utf-8 python3

from ncclient import manager # 导入ncclient库,用于Netconf协议连接 from paramiko import SSHClient,AutoAddPolicy # 导入paramiko库,用于SSH连接 import datetime # 导入datetime库,用于处理时间 import time # 导入time库,用于控制时间

div_ip='10.1.0.6' # 设备IP ssh_usr='python' # SSH登录用户名 ssh_pwd='Huawei@123' # SSH登录密码 cn_usr='ncclient' # Netconf登录用户名 cn_pwd='Huawei@123' # Netconf登录密码 Command_file='command.txt' # 命令文件名 save=datetime.datetime.now().strftime('%Y-%m-%d-%H')+'_X_T1_AGG1.zip' # 存储文件名,包含日期和设备名称

class Device: def init(self,ip,usr,pwd): self.host=ip self.username=usr self.password=pwd self.client=self.get_connect() # 获取SSH连接 self.cli=self.client.invoke_shell() # 打开SSH连接shell self.cli.send('n screen-length 0 temporary ') # 发送换行符和命令 self.cli.recv(65535) # 接收命令回显

# SSH连接函数
def get_connect(self):
    sshcon=SSHClient()  # 创建SSH连接
    sshcon.set_missing_host_key_policy(AutoAddPolicy)  # 设置SSH连接策略
    sshcon.connect(hostname=self.host,username=self.username,password=self.password)  # 连接SSH
    return sshcon  # 返回SSH连接对象

def cmd_running(self):
    with open(Command_file,'r') as f:  # 打开命令文件
        cmd_list=f.readlines()  # 读取命令文件中的所有命令
    for cmd in cmd_list:  # 循环执行每个命令
        self.cli.send(cmd)  # 发送命令
        time.sleep(1)  # 等待1秒
    dis_th=self.cli.recv(65535).decode('utf-8')  # 接收命令回显
    time_n=datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')  # 获取当前时间
    file_name=f'{self.host}_{time_n}'
    with open(file_name,'w') as f:  # 打开文件,写入命令回显
        f.write(dis_th)
    return dis_th

def chenk_fans(self,cmd,dis_th):
    if cmd=='display fan

': # 如果是查询风扇状态的命令 status='normal' in dis_th # 查找回显中是否包含normal字符串 if not status: # 如果回显中没有normal字符串 print('All fans are faulty.') # 打印警告信息

def close(self):
    self.client.close()  # 关闭SSH连接

def save_running(self):
    cmd=f'save {save}

y y ' # 生成保存命令 self.cli.send(cmd) # 发送保存命令 time.sleep(1) # 等待1秒 dis_th=self.cli.recv(65535).decode('utf-8') # 接收命令回显 print(dis_th) # 打印命令回显

def download(self):
    with SSHClient() as ssh_con:  # 打开SSH连接
        ssh_con.set_missing_host_key_policy(AutoAddPolicy)  # 设置SSH连接策略
        ssh_con.connect(hostname=self.host,username=self.username,password=self.password)  # 连接SSH
        with ssh_con.open_sftp() as sftp_con:  # 打开SFTP连接
            sftp_con.get(remoutepath=f'/{save}',localpath=save)  # 下载远程文件

def netconf_run(self):
    self.cli.send('sys

netconf y protocol inbound ssh port 830 commit ') # 发送Netconf命令 time.sleep(10) # 等待10秒 dis_th=self.cli.recv(65535).decode('utf-8') # 接收命令回显 print(dis_th) # 打印命令回显

def netconf(ip,usr,pwd): netconf=Device(ip,usr,pwd) # 创建Netconf连接对象 netconf.netconf_run() # 开启Netconf netconf.close() # 关闭连接

def monitor(ip,usr,pwd): while True: # 循环执行监控任务 monitor=Device(ip,usr,pwd) # 创建SSH连接对象 dis_th=monitor.cmd_running() # 执行命令 monitor.chenk_fans(Command_file,dis_th) # 检查风扇状态 now=datetime.datetime.now() # 获取当前时间 save_time_24h=now.replace(hour=0,minute=0,second=0,microsecond=0)+datetime.timedelta(days=1) # 计算下次保存时间 save_time_5min=now+datetime.timedelta(minutes=1) # 计算下次保存时间 if now>=save_time_24h: # 如果当前时间大于等于下次保存时间(每天零点) monitor.save_running() # 执行保存命令 monitor.download() # 下载保存文件 elif now>=save_time_5min: # 如果当前时间大于等于下次保存时间(每5分钟) monitor.save_running() # 执行保存命令 monitor.download() # 下载保存文件 monitor.close() time.sleep(300) # 等待5分钟

if name=='main': try: #netconf(div_ip,ssh_usr,ssh_pwd) # 开启Netconf monitor(div_ip,ssh_usr,ssh_pwd) # 执行监控任务 except KeyboardInterrupt: print('已经停止') # 捕获键盘中断信号,停止程序执行

Python 网络设备监控脚本:自动执行命令、保存配置并下载

原文地址: https://www.cveoy.top/t/topic/n9Em 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录