Python系统运维自动化通过调用相关库和模块,能够快速完成多台服务器的批量操作,同时配合规范的脚本管理方案,可大幅降低日常运维的工作量和出错概率。

批量操作的核心实现思路
批量操作的核心是先建立与目标服务器的连接,再统一下发执行指令,常用的连接方式是SSH协议,Python中可以使用paramiko库实现SSH连接功能。
基于paramiko的批量SSH操作示例
以下代码实现了批量连接多台服务器并执行指定命令的功能:
import paramiko
import time
# 定义服务器列表,包含IP、端口、用户名、密码
server_list = [
{"host": "192.168.0.1", "port": 22, "user": "root", "pwd": "test123"},
{"host": "127.0.0.1", "port": 22, "user": "root", "pwd": "test456"}
]
# 定义要执行的命令
command = "df -h"
def batch_run_command(servers, cmd):
results = {}
for server in servers:
try:
# 创建SSH客户端实例
ssh = paramiko.SSHClient()
# 自动添加未知主机密钥
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(
hostname=server["host"],
port=server["port"],
username=server["user"],
password=server["pwd"],
timeout=10
)
# 执行命令
stdin, stdout, stderr = ssh.exec_command(cmd)
# 获取命令输出
output = stdout.read().decode("utf-8")
error = stderr.read().decode("utf-8")
# 存储结果
results[server["host"]] = {
"output": output if output else "无输出",
"error": error if error else "无错误"
}
# 关闭连接
ssh.close()
except Exception as e:
results[server["host"]] = {"error": str(e)}
return results
if __name__ == "__main__":
res = batch_run_command(server_list, command)
for host, info in res.items():
print(f"服务器 {host} 执行结果:")
print(f"输出:{info.get('output')}")
print(f"错误:{info.get('error')}")
print("-" * 30)
运维脚本的规范化管理
随着运维脚本数量增多,需要建立统一的管理规范,避免脚本混乱、重复开发的问题,常见的管理维度包括目录结构、版本控制、执行日志三个方面。
推荐的脚本目录结构
可以按照功能模块划分脚本目录,参考结构如下:
- ops_scripts:根目录
- ops_scripts/batch_ops:批量操作相关脚本
- ops_scripts/monitor:监控相关脚本
- ops_scripts/deploy:部署相关脚本
- ops_scripts/common:公共工具模块
- ops_scripts/logs:脚本执行日志存放目录
- ops_scripts/config:配置文件存放目录
脚本执行日志记录实现
为所有运维脚本添加统一的日志记录功能,方便后续排查问题,以下是公共日志模块的实现代码:
import logging
import os
import time
def init_logger(script_name):
# 日志存放目录
log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../logs")
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 日志文件名,按日期划分
log_file = os.path.join(log_dir, f"{script_name}_{time.strftime('%Y%m%d')}.log")
# 配置日志格式
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(filename)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(log_file, encoding="utf-8"),
logging.StreamHandler()
]
)
return logging.getLogger()
在批量操作脚本中引入该日志模块,即可自动记录每次批量任务的执行情况和异常信息。
批量操作与脚本管理的结合实践
可以将批量操作逻辑封装成独立函数,放在公共模块中,不同业务脚本直接调用即可,同时配合配置文件管理服务器列表,避免硬编码。
配置文件示例
使用YAML格式存储服务器配置,内容如下:
# config/server_config.yaml
servers:
- host: 192.168.0.1
port: 22
user: root
pwd: test123
- host: 127.0.0.1
port: 22
user: root
pwd: test456
batch_command: df -h
读取配置并执行批量任务
结合配置文件的完整批量任务脚本代码如下:
import yaml
import sys
import os
# 添加公共模块路径
sys.path.append(os.path.join(os.path.dirname(__file__), "../common"))
from ssh_batch import batch_run_command
from logger import init_logger
def main():
logger = init_logger("batch_disk_check")
# 读取配置文件
config_path = os.path.join(os.path.dirname(__file__), "../config/server_config.yaml")
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
servers = config.get("servers", [])
command = config.get("batch_command", "df -h")
logger.info(f"开始执行批量命令:{command},目标服务器数量:{len(servers)}")
try:
results = batch_run_command(servers, command)
for host, info in results.items():
logger.info(f"服务器 {host} 输出:{info.get('output')}")
if info.get("error"):
logger.error(f"服务器 {host} 错误:{info.get('error')}")
logger.info("批量任务执行完成")
except Exception as e:
logger.error(f"批量任务执行失败:{str(e)}")
if __name__ == "__main__":
main()
这种实现方式将配置、公共逻辑、业务脚本分离,后续新增批量操作任务时,只需要修改配置文件或者新增少量业务代码即可,大幅提升了脚本的可维护性。