Python 网络设备监控脚本:自动执行命令、保存配置并下载
utf-8 python3
from ncclient import manager # 导入ncclient库,用于Netconf协议连接 from paramiko import SSHClient,AutoAddPolicy # 导入paramiko库,用于SSH连接 import datetime # 导入datetime库,用于处理时间 import time # 导入time库,用于控制时间
div_ip='10.1.0.6' # 设备IP ssh_usr='python' # SSH登录用户名 ssh_pwd='Huawei@123' # SSH登录密码 cn_usr='ncclient' # Netconf登录用户名 cn_pwd='Huawei@123' # Netconf登录密码 Command_file='command.txt' # 命令文件名 save=datetime.datetime.now().strftime('%Y-%m-%d-%H')+'_X_T1_AGG1.zip' # 存储文件名,包含日期和设备名称
class Device: def init(self,ip,usr,pwd): self.host=ip self.username=usr self.password=pwd self.client=self.get_connect() # 获取SSH连接 self.cli=self.client.invoke_shell() # 打开SSH连接shell self.cli.send('n screen-length 0 temporary ') # 发送换行符和命令 self.cli.recv(65535) # 接收命令回显
# SSH连接函数
def get_connect(self):
sshcon=SSHClient() # 创建SSH连接
sshcon.set_missing_host_key_policy(AutoAddPolicy) # 设置SSH连接策略
sshcon.connect(hostname=self.host,username=self.username,password=self.password) # 连接SSH
return sshcon # 返回SSH连接对象
def cmd_running(self):
with open(Command_file,'r') as f: # 打开命令文件
cmd_list=f.readlines() # 读取命令文件中的所有命令
for cmd in cmd_list: # 循环执行每个命令
self.cli.send(cmd) # 发送命令
time.sleep(1) # 等待1秒
dis_th=self.cli.recv(65535).decode('utf-8') # 接收命令回显
time_n=datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S') # 获取当前时间
file_name=f'{self.host}_{time_n}'
with open(file_name,'w') as f: # 打开文件,写入命令回显
f.write(dis_th)
return dis_th
def chenk_fans(self,cmd,dis_th):
if cmd=='display fan
': # 如果是查询风扇状态的命令 status='normal' in dis_th # 查找回显中是否包含normal字符串 if not status: # 如果回显中没有normal字符串 print('All fans are faulty.') # 打印警告信息
def close(self):
self.client.close() # 关闭SSH连接
def save_running(self):
cmd=f'save {save}
y y ' # 生成保存命令 self.cli.send(cmd) # 发送保存命令 time.sleep(1) # 等待1秒 dis_th=self.cli.recv(65535).decode('utf-8') # 接收命令回显 print(dis_th) # 打印命令回显
def download(self):
with SSHClient() as ssh_con: # 打开SSH连接
ssh_con.set_missing_host_key_policy(AutoAddPolicy) # 设置SSH连接策略
ssh_con.connect(hostname=self.host,username=self.username,password=self.password) # 连接SSH
with ssh_con.open_sftp() as sftp_con: # 打开SFTP连接
sftp_con.get(remoutepath=f'/{save}',localpath=save) # 下载远程文件
def netconf_run(self):
self.cli.send('sys
netconf y protocol inbound ssh port 830 commit ') # 发送Netconf命令 time.sleep(10) # 等待10秒 dis_th=self.cli.recv(65535).decode('utf-8') # 接收命令回显 print(dis_th) # 打印命令回显
def netconf(ip,usr,pwd): netconf=Device(ip,usr,pwd) # 创建Netconf连接对象 netconf.netconf_run() # 开启Netconf netconf.close() # 关闭连接
def monitor(ip,usr,pwd): while True: # 循环执行监控任务 monitor=Device(ip,usr,pwd) # 创建SSH连接对象 dis_th=monitor.cmd_running() # 执行命令 monitor.chenk_fans(Command_file,dis_th) # 检查风扇状态 now=datetime.datetime.now() # 获取当前时间 save_time_24h=now.replace(hour=0,minute=0,second=0,microsecond=0)+datetime.timedelta(days=1) # 计算下次保存时间 save_time_5min=now+datetime.timedelta(minutes=1) # 计算下次保存时间 if now>=save_time_24h: # 如果当前时间大于等于下次保存时间(每天零点) monitor.save_running() # 执行保存命令 monitor.download() # 下载保存文件 elif now>=save_time_5min: # 如果当前时间大于等于下次保存时间(每5分钟) monitor.save_running() # 执行保存命令 monitor.download() # 下载保存文件 monitor.close() time.sleep(300) # 等待5分钟
if name=='main': try: #netconf(div_ip,ssh_usr,ssh_pwd) # 开启Netconf monitor(div_ip,ssh_usr,ssh_pwd) # 执行监控任务 except KeyboardInterrupt: print('已经停止') # 捕获键盘中断信号,停止程序执行
原文地址: https://www.cveoy.top/t/topic/n9Em 著作权归作者所有。请勿转载和采集!