# 导入所需库

import time
from datetime import datetime

import paramiko
from ncclient import manager

# 定义交换机类

class Switch:
    """Manage one switch through an SSH session and a NETCONF session.

    The constructor opens both sessions immediately. The ``get_*`` methods run
    a ``display ...`` command over SSH and pick fields out of the text output
    by fixed line and column position.

    NOTE(review): the line/column positions used by the ``get_*`` methods are
    taken as-is from the original script; confirm them against the real output
    of the target device model.
    """

    def __init__(self, ip, username, password):
        """Store the credentials and open the SSH and NETCONF sessions.

        Args:
            ip: Address of the switch, used for both SSH and NETCONF.
            username: Login name used for both sessions.
            password: Password used for both sessions.

        Raises:
            Exception: Whatever ``paramiko`` or ``ncclient`` raise when a
                connection cannot be established.
        """
        self.ip = ip
        self.username = username
        self.password = password

        # Establish the SSH connection. Unknown host keys are accepted
        # automatically (AutoAddPolicy).
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh.connect(self.ip, username=self.username, password=self.password)

        # Establish the NETCONF connection (port 830, host key not verified).
        try:
            self.netconf = manager.connect(
                host=self.ip,
                username=self.username,
                password=self.password,
                port=830,
                hostkey_verify=False,
            )
        except Exception:
            # Do not leave the SSH session open when NETCONF cannot connect.
            self.ssh.close()
            raise

    def __enter__(self):
        """Return the switch itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close both sessions when leaving a ``with`` block."""
        self.close()
        return False

    def close(self):
        """Close the NETCONF session and the SSH connection."""
        try:
            self.netconf.close_session()
        finally:
            self.ssh.close()

    @staticmethod
    def _line_tokens(output, line_index):
        """Return the whitespace-separated fields of one line of ``output``.

        Returns an empty list when ``output`` has no line at ``line_index``.
        """
        lines = output.splitlines()
        if line_index >= len(lines):
            return []
        return lines[line_index].split()

    def _status_fields(self, command):
        """Run ``command`` and return the third output line minus its first field."""
        return self._line_tokens(self.execute_command(command), 2)[1:]

    def _usage_field(self, command):
        """Run ``command`` and return the second field of the second output line.

        Returns an empty string when that field is not present.
        """
        tokens = self._line_tokens(self.execute_command(command), 1)
        return tokens[1] if len(tokens) > 1 else ''

    def execute_command(self, command):
        """Run ``command`` over SSH and return its standard output as text.

        Args:
            command: Command line to execute on the switch.

        Returns:
            The command's standard output decoded as UTF-8.
        """
        stdin, stdout, stderr = self.ssh.exec_command(command)
        return stdout.read().decode('utf-8')

    def get_fan_status(self):
        """Return the fan status fields parsed from ``display fan``."""
        return self._status_fields('display fan')

    def get_power_status(self):
        """Return the power status fields parsed from ``display power``."""
        return self._status_fields('display power')

    def get_lacp_status(self):
        """Return the LACP status fields parsed from ``display lacp summary``."""
        return self._status_fields('display lacp summary')

    def get_cpu_utilization(self):
        """Return the CPU utilization field parsed from ``display cpu-usage``."""
        return self._usage_field('display cpu-usage')

    def get_memory_utilization(self):
        """Return the memory utilization field parsed from ``display memory-usage``."""
        return self._usage_field('display memory-usage')

    def get_ospf_neighbour_status(self):
        """Return the OSPF neighbour fields parsed from ``display ospf peer``."""
        return self._status_fields('display ospf peer')

    def save_config(self):
        """Send the ``save`` command and wait five seconds afterwards."""
        self.execute_command('save')
        # Wait 5 seconds to make sure the configuration file has been written.
        time.sleep(5)

    def backup_config(self, local_path='config_backup.cfg',
                      remote_path='flash:/config_backup.cfg'):
        """Download the configuration backup file from the switch over SFTP.

        Args:
            local_path: Destination file on the local machine.
            remote_path: Source file on the switch.
        """
        sftp = self.ssh.open_sftp()
        try:
            sftp.get(remote_path, local_path)
        finally:
            # Release the SFTP channel even when the transfer fails.
            sftp.close()

    def configure_loghost(self, loghost='10.1.60.2'):
        """Push a log host entry to the running configuration over NETCONF.

        Args:
            loghost: Address of the log host written into the ``<name>``
                element.
        """
        # Local import keeps this method self-contained.
        from xml.sax.saxutils import escape

        config_template = '''
    <config>
      <system>
        <log>
          <host>
            <name>{loghost}</name>
          </host>
        </log>
      </system>
    </config>
    '''.format(loghost=escape(loghost))
        self.netconf.edit_config(target='running', config=config_template)

# 读取命令文件并执行

# Run every command listed in commands.txt on the switch and print each result.
with open('commands.txt') as f:
    # Strip the trailing newline from each line and skip blank lines so that
    # only real commands are sent to the device.
    commands = [line.strip() for line in f if line.strip()]

switch = Switch('192.168.1.1', 'username', 'password')
try:
    for command in commands:
        print(switch.execute_command(command))
finally:
    # Release both sessions opened by the Switch constructor.
    switch.ssh.close()
    switch.netconf.close_session()

# 监控设备状态并输出结果

# Poll the switch every 5 minutes, print its status, and back up the
# configuration once per calendar day.
# Start from today's date so the first backup happens on the first cycle
# after midnight.
last_backup_date = datetime.now().date()

while True:
    # One connection per cycle, shared by every step below and closed at the
    # end of the cycle.
    switch = Switch('192.168.1.1', 'username', 'password')
    try:
        fan_status = switch.get_fan_status()
        power_status = switch.get_power_status()
        lacp_status = switch.get_lacp_status()
        cpu_utilization = switch.get_cpu_utilization()
        memory_utilization = switch.get_memory_utilization()
        ospf_neighbour_status = switch.get_ospf_neighbour_status()

        print('Fan status:', fan_status)
        print('Power status:', power_status)
        print('LACP status:', lacp_status)
        print('CPU utilization:', cpu_utilization)
        print('Memory utilization:', memory_utilization)
        print('OSPF neighbour status:', ospf_neighbour_status)

        # Analyse the monitoring results and raise a warning when needed.
        if 'faulty' in fan_status:
            print('Fan is faulty!')
        if 'faulty' in power_status:
            print('Power is faulty!')

        # Save and back up the configuration once per day. Comparing dates
        # works with a 5-minute polling interval, whereas an exact match on
        # '00:00:00' would practically never be hit.
        today = datetime.now().date()
        if today != last_backup_date:
            switch.save_config()
            switch.backup_config()
            last_backup_date = today

        # Configure the device log host.
        # NOTE(review): this is re-applied on every cycle, as in the original
        # script; confirm whether a single call at start-up is enough.
        switch.configure_loghost()
    finally:
        # Release both sessions opened by the Switch constructor.
        switch.ssh.close()
        switch.netconf.close_session()

    # Monitor the status once every 5 minutes.
    time.sleep(300)
# Python 脚本监控华为交换机状态并备份配置

# 原文地址: https://www.cveoy.top/t/topic/n2eA 著作权归作者所有。请勿转载和采集!

# 免费AI点我,无需注册和登录