# Huawei S5700 switch monitoring script: automated monitoring and backup with Python
# Import the required libraries
import time
from datetime import datetime

import paramiko
from ncclient import manager
# Define the switch class
class Switch:
    """Connection wrapper for one switch, reachable over SSH and NETCONF.

    Attributes:
        ip: Address passed to both the SSH and the NETCONF connect calls.
        username: Login name used for both transports.
        password: Login password used for both transports.
        ssh: The connected ``paramiko.SSHClient``, or ``None`` when closed.
        netconf: The ncclient session from ``manager.connect``, or ``None``.
    """

    def __init__(self, ip, username, password):
        """Store the connection parameters; no network I/O happens here."""
        self.ip = ip
        self.username = username
        self.password = password
        # Defined up front so the close/execute methods can test for
        # "not connected" instead of hitting AttributeError.
        self.ssh = None
        self.netconf = None

    def connect_ssh(self):
        """Open an SSH session to the switch and keep it in ``self.ssh``."""
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts host keys that are not yet
        # known, so the device identity is not verified on first contact.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(self.ip, username=self.username, password=self.password)
        # Publish the client only after connect() succeeded, so a failed
        # attempt does not leave a half-initialised session behind.
        self.ssh = client

    def execute_command(self, command):
        """Run ``command`` over SSH and return its decoded standard output.

        If no SSH session is open (never connected, or already closed with
        ``close_ssh``), one is opened first.

        Args:
            command: Command line to send to the device.

        Returns:
            Everything the command wrote to stdout, decoded to ``str``.
        """
        if self.ssh is None:
            self.connect_ssh()
        _stdin, stdout, _stderr = self.ssh.exec_command(command)
        return stdout.read().decode()

    def close_ssh(self):
        """Close the SSH session if one is open; otherwise do nothing."""
        if self.ssh is not None:
            self.ssh.close()
            self.ssh = None

    def connect_netconf(self):
        """Open a NETCONF session on port 830 and keep it in ``self.netconf``."""
        # NOTE(review): hostkey_verify=False turns off host key checking
        # for the NETCONF transport.
        self.netconf = manager.connect(
            host=self.ip,
            username=self.username,
            password=self.password,
            port=830,
            hostkey_verify=False,
        )

    def configure_netconf(self, config):
        """Apply ``config`` to the running datastore via ``edit_config``.

        Args:
            config: Configuration payload handed unchanged to
                ``edit_config(target='running', config=...)``.
        """
        self.netconf.edit_config(target='running', config=config)

    def close_netconf(self):
        """Close the NETCONF session if one is open; otherwise do nothing."""
        if self.netconf is not None:
            self.netconf.close_session()
            self.netconf = None
# Define the monitoring class
class Monitor:
    """Health checks for a switch, derived from CLI command output.

    Every ``check_*`` method sends one display command through
    ``switch.execute_command`` and maps the returned text to a status
    string: ``'Normal'``, ``'warning'`` or ``'faulty'``.

    NOTE(review): the marker strings searched for below are taken as-is
    from the original script; confirm them against real device output.
    """

    # Utilisation in percent at or above which 'warning' is reported.
    CPU_THRESHOLD = 80
    MEMORY_THRESHOLD = 80

    def __init__(self, switch):
        """Remember the switch whose ``execute_command`` will be used."""
        self.switch = switch

    def _check_marker(self, command, marker, missing='faulty'):
        """Return 'Normal' if ``marker`` occurs in the output, else ``missing``."""
        result = self.switch.execute_command(command)
        return 'Normal' if marker in result else missing

    def _check_utilization(self, command, marker, threshold):
        """Parse the percentage that follows ``marker`` and grade it.

        Returns 'faulty' when the marker is absent or the text between the
        marker and the next '%' is not a number, 'Normal' when the value is
        below ``threshold``, and 'warning' otherwise.
        """
        result = self.switch.execute_command(command)
        if marker not in result:
            return 'faulty'
        try:
            utilization = float(result.split(marker)[1].split('%')[0])
        except ValueError:
            # Unparseable output is reported as a status, not raised,
            # so one odd reply cannot abort the whole monitoring pass.
            return 'faulty'
        return 'Normal' if utilization < threshold else 'warning'

    def check_power(self):
        """Check the power supply state reported by ``display device``."""
        return self._check_marker('display device', 'Power supply: Normal')

    def check_fan(self):
        """Check the fan state reported by ``display device``."""
        return self._check_marker('display device', 'Fan: Normal')

    def check_lacp(self):
        """Check whether ``display lacp verbose`` reports LACP as enabled."""
        return self._check_marker('display lacp verbose', 'LACP protocol: Enabled')

    def check_cpu(self):
        """Grade the CPU utilisation reported by ``display cpu-usage``."""
        return self._check_utilization(
            'display cpu-usage', 'CPU utilization: ', self.CPU_THRESHOLD
        )

    def check_memory(self):
        """Grade the memory utilisation reported by ``display memory-usage``."""
        return self._check_utilization(
            'display memory-usage', 'Memory utilization: ', self.MEMORY_THRESHOLD
        )

    def check_ospf(self):
        """Return 'Normal' if ``display ospf peer`` mentions 'Full', else 'warning'."""
        return self._check_marker('display ospf peer', 'Full', missing='warning')

    def monitor_status(self):
        """Run every check once and print one timestamped summary line."""
        power_status = self.check_power()
        fan_status = self.check_fan()
        lacp_status = self.check_lacp()
        cpu_status = self.check_cpu()
        memory_status = self.check_memory()
        ospf_status = self.check_ospf()
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(f"[{now}] Power status: {power_status}, Fan status: {fan_status}, LACP status: {lacp_status}, CPU status: {cpu_status}, Memory status: {memory_status}, OSPF status: {ospf_status}")
# Define the backup class
class Backup:
    """Save the switch configuration and upload it with the ``tftp`` command.

    Attributes:
        switch: Object providing ``ip`` and ``execute_command``.
        tftp_server: Address placed after ``tftp`` in the upload command.
        config_file: Remote file name, ``<switch ip>_<YYYYmmddHHMMSS>.cfg``,
            fixed when the instance is created.
    """

    def __init__(self, switch, tftp_server=None):
        """Prepare a backup job for ``switch``.

        Args:
            switch: Object providing ``ip`` and ``execute_command``.
            tftp_server: Address of the TFTP server to upload to. When
                omitted, the switch's own IP is used, which is what the
                original script did.
                NOTE(review): that default makes the device upload to
                itself; pass the real server address.
        """
        self.switch = switch
        self.tftp_server = tftp_server if tftp_server is not None else switch.ip
        self.config_file = f"{self.switch.ip}_{datetime.now().strftime('%Y%m%d%H%M%S')}.cfg"

    def backup_config(self):
        """Save the configuration to flash, then push that file over TFTP."""
        # NOTE(review): command syntax is kept from the original script;
        # verify it, and whether the save prompts for confirmation.
        command = 'save configuration to flash:/config.cfg'
        self.switch.execute_command(command)
        command = f'tftp {self.tftp_server} put flash:/config.cfg {self.config_file}'
        self.switch.execute_command(command)
        print(f"Config file {self.config_file} backed up.")
# Read the command file
with open('commands.txt', 'r', encoding='utf-8') as f:
    # One command per line; drop the trailing newline and skip blank lines
    # so no empty command is sent to the device.
    commands = [line.strip() for line in f if line.strip()]

# Create the switch object.
# NOTE(review): address and credentials are hard-coded; move them to
# configuration before using this against real equipment.
switch = Switch('192.168.1.1', 'admin', 'password')

# Connect over SSH.
switch.connect_ssh()

try:
    # Run every command from the file and print its output.
    for command in commands:
        result = switch.execute_command(command)
        print(result)

    # Create the monitor object.
    monitor = Monitor(switch)

    # Connect over NETCONF.
    switch.connect_netconf()
    try:
        # Configure the device log host.
        # TODO: the NETCONF payload was lost from the source this file was
        # recovered from; fill it in. Nothing is sent while it is empty.
        config = ""
        if config.strip():
            switch.configure_netconf(config)
    finally:
        # Close NETCONF.
        switch.close_netconf()

    # Remember the day of the last backup. Comparing dates fires once
    # after each midnight; matching the exact second '00:00:00' would
    # almost never hit with a 5-minute polling interval.
    last_backup_day = datetime.now().date()

    # Monitor the status and print the result.
    while True:
        monitor.monitor_status()

        # Analyse the monitoring result and give a hint.
        fan_status = monitor.check_fan()
        if fan_status == 'faulty':
            print("Fan faulty, please replace it.")

        # Back up the configuration file once every 24 hours.
        today = datetime.now().date()
        if today != last_backup_day:
            backup = Backup(switch)
            backup.backup_config()
            last_backup_day = today

        time.sleep(300)  # monitor once every 5 minutes
except KeyboardInterrupt:
    # Ctrl-C is the only way out of the loop; exit without a traceback.
    print("Monitoring stopped.")
finally:
    # Close SSH only now: the monitoring loop above still needs it.
    switch.close_ssh()
# Original source: https://www.cveoy.top/t/topic/n2d7 — copyright belongs to the author. Please do not repost or scrape.