基于Linux的ARP攻击检测防护软件的设计与实现
以下是基于Linux的ARP攻击检测防护软件的Python代码实现。
首先,我们需要导入相关的模块和库:
import os
import sys
import time
import socket
import struct
import threading
import tkinter as tk
from tkinter import messagebox
然后,我们定义一些全局变量:
# Per-host counts in one 5-second window above which an alert is raised.
ICMP_THRESHOLD = 100  # ICMP flood
TCP_THRESHOLD = 100   # TCP flood
ARP_THRESHOLD = 100   # ARP spoofing (declared here; the scanner does not consult it)
# Seconds between two ARP sweeps of the subnet.
ARP_INTERVAL = 10
# /24 prefix that is swept; hosts .1 through .254 are probed.
LOCAL_NET_PREFIX = '192.168.0.'

# Shared mutable state: written by the detector threads, read by the GUI.
active_hosts = {}    # ip -> {'mac': str, 'icmp_count': int, 'tcp_count': int}
attack_sources = {}  # mac -> list of IPs that MAC has answered for
event_list = []      # human-readable alert strings, oldest first
接下来,我们定义一些函数来实现各个模块的功能。
首先是ICMP Flood攻击检测模块:
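def icmp_flood_detection():
    """Poll per-host ICMP counters every 5 s and flag hosts over ICMP_THRESHOLD.

    Runs forever, so it is meant to be the target of a background thread. Each
    cycle copies and resets ``active_hosts[host]['icmp_count']`` for every
    known host, then appends an entry to ``event_list`` (and shows a warning
    dialog) for each host whose count exceeded ``ICMP_THRESHOLD``.

    NOTE(review): nothing in this file increments ``icmp_count``; the counters
    only become meaningful once a capture loop feeds them.
    """
    while True:
        time.sleep(5)
        # Rebuilt every cycle so a host that left active_hosts is not reported
        # again with its stale count.
        icmp_count = {}
        # Snapshot the items: the ARP scanner thread inserts and deletes
        # entries concurrently, and iterating a dict while another thread
        # resizes it raises RuntimeError.
        for host, info in list(active_hosts.items()):
            icmp_count[host] = info['icmp_count']
            info['icmp_count'] = 0
        for host, count in icmp_count.items():
            if count > ICMP_THRESHOLD:
                event_list.append(f'ICMP Flood攻击:{host},数量:{count}')
                # NOTE(review): tkinter is not thread-safe; a dialog raised
                # from this worker thread should be marshalled to the GUI
                # thread (e.g. via root.after) rather than called directly.
                messagebox.showwarning('警告', f'遭受ICMP Flood攻击:{host},数量:{count}')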
然后是TCP攻击检测模块:
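def tcp_attack_detection():
    """Poll per-host TCP counters every 5 s and flag hosts over TCP_THRESHOLD.

    Runs forever, so it is meant to be the target of a background thread. Each
    cycle copies and resets ``active_hosts[host]['tcp_count']`` for every
    known host, then appends an entry to ``event_list`` (and shows a warning
    dialog) for each host whose count exceeded ``TCP_THRESHOLD``.

    NOTE(review): nothing in this file increments ``tcp_count``; the counters
    only become meaningful once a capture loop feeds them.
    """
    while True:
        time.sleep(5)
        # Rebuilt every cycle so a host that left active_hosts is not reported
        # again with its stale count.
        tcp_count = {}
        # Snapshot the items: the ARP scanner thread inserts and deletes
        # entries concurrently, and iterating a dict while another thread
        # resizes it raises RuntimeError.
        for host, info in list(active_hosts.items()):
            tcp_count[host] = info['tcp_count']
            info['tcp_count'] = 0
        for host, count in tcp_count.items():
            if count > TCP_THRESHOLD:
                event_list.append(f'TCP攻击:{host},数量:{count}')
                # NOTE(review): tkinter is not thread-safe; a dialog raised
                # from this worker thread should be marshalled to the GUI
                # thread (e.g. via root.after) rather than called directly.
                messagebox.showwarning('警告', f'遭受TCP攻击:{host},数量:{count}')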
最后是ARP欺骗攻击检测模块:
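def arp_attack_detection():
    """Probe every address under LOCAL_NET_PREFIX by ARP and flag MAC reuse.

    Runs forever, sleeping ``ARP_INTERVAL`` seconds between sweeps. Each sweep
    sends an ARP request to .1 through .254: hosts that answer are kept in
    ``active_hosts`` with their current MAC (new hosts get zeroed counters),
    hosts that stop answering are dropped. ``attack_sources`` maps each MAC
    seen so far to the list of IPs it has claimed; an IP that is new, or whose
    MAC just changed, is reported when that MAC already claims other IPs,
    which is the usual footprint of ARP spoofing.

    NOTE(review): old claims are never aged out, so a device that legitimately
    moves to a new address also looks like MAC reuse here. ``ARP_THRESHOLD``
    is defined at module level but not consulted.
    """
    while True:
        for i in range(1, 255):
            ip = LOCAL_NET_PREFIX + str(i)
            mac = send_arp_request(ip)
            if not mac:
                # No answer: forget the host (pop() tolerates unknown IPs).
                active_hosts.pop(ip, None)
                continue
            entry = active_hosts.get(ip)
            if entry is None:
                active_hosts[ip] = {'mac': mac, 'icmp_count': 0, 'tcp_count': 0}
            elif entry['mac'] == mac:
                continue  # Known host, unchanged MAC: nothing to check.
            else:
                entry['mac'] = mac  # Same IP now answers from another MAC.
            # Other IPs this MAC has already answered for, excluding ip itself.
            claimed_elsewhere = [h for h in attack_sources.get(mac, []) if h != ip]
            if claimed_elsewhere:
                event_list.append(f'ARP欺骗攻击:{ip},攻击源:{claimed_elsewhere}')
                # NOTE(review): tkinter is not thread-safe; route this dialog
                # through the GUI thread (e.g. root.after) instead.
                messagebox.showwarning('警告', f'遭受ARP欺骗攻击:{ip},攻击源:{claimed_elsewhere}')
        # Record which IPs each MAC has claimed, once per IP, so the lists do
        # not grow by one duplicate entry per sweep.
        for host, info in list(active_hosts.items()):
            claims = attack_sources.setdefault(info['mac'], [])
            if host not in claims:
                claims.append(host)
        time.sleep(ARP_INTERVAL)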
其中,发送ARP请求包的函数如下:
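def send_arp_request(ip, timeout=2.0):
    """Send one ARP request for ``ip`` and return the answering MAC, or None.

    Builds an Ethernet/ARP request (broadcast destination, this host's MAC and
    IP as sender) and writes it to a raw AF_PACKET socket bound to the
    default-route interface, then reads ARP frames from the same socket until
    an ARP reply (opcode 2) whose sender protocol address equals ``ip``
    arrives or ``timeout`` seconds elapse.

    Args:
        ip: Dotted-quad IPv4 address to resolve.
        timeout: Seconds to wait for a reply before giving up.

    Returns:
        The reply's sender MAC as ``'aa:bb:cc:dd:ee:ff'``, or ``None`` when no
        reply arrived in time, no default-route interface exists, or the raw
        socket could not be opened (AF_PACKET needs CAP_NET_RAW, normally root).

    Raises:
        OSError: if ``ip`` is not a valid IPv4 address.
    """
    eth_p_arp = 0x0806  # EtherType for ARP.
    target_ip = socket.inet_aton(ip)  # Validate before touching the network.

    def mac_to_bytes(mac):
        """Convert 'aa:bb:cc:dd:ee:ff' to its 6-byte form."""
        return bytes.fromhex(mac.replace(':', ''))

    iface = get_local_interface()
    if iface is None:
        return None
    local_mac = mac_to_bytes(get_local_mac())
    arp_packet = struct.pack(
        '!6s6sHHHBBH6s4s6s4s',
        mac_to_bytes('ff:ff:ff:ff:ff:ff'),  # Ethernet destination: broadcast
        local_mac,                          # Ethernet source
        eth_p_arp,                          # EtherType
        1,                                  # HTYPE: Ethernet
        0x0800,                             # PTYPE: IPv4
        6, 4,                               # HLEN, PLEN
        1,                                  # OPER: request
        local_mac,                          # sender hardware address
        socket.inet_aton(get_local_ip()),   # sender protocol address
        bytes(6),                           # target hardware address (unknown)
        target_ip,                          # target protocol address
    )
    try:
        # One socket for both directions, opened before sending so a fast
        # reply cannot arrive before we are listening; the protocol argument
        # limits delivery to ARP frames. Closed by the context manager on
        # every exit path.
        with socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(eth_p_arp)) as sock:
            sock.bind((iface, 0))
            sock.settimeout(timeout)
            sock.send(arp_packet)
            deadline = time.monotonic() + timeout  # Bounds the total wait, not just one recv().
            while time.monotonic() < deadline:
                frame = sock.recv(65535)
                if len(frame) < 42:
                    continue  # Shorter than Ethernet (14) + ARP (28) headers.
                eth_type = struct.unpack('!H', frame[12:14])[0]
                if eth_type != eth_p_arp:
                    continue
                arp_op = struct.unpack('!H', frame[20:22])[0]
                arp_spa = socket.inet_ntoa(frame[28:32])
                if arp_op != 2 or arp_spa != ip:
                    continue
                return ':'.join(f'{b:02x}' for b in frame[22:28])
    except OSError:
        # Permission denied, unknown interface, and socket.timeout all land here.
        return None
    return None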
接下来,我们定义一些辅助函数来获取本地IP地址、MAC地址和接口:
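def get_local_ip():
    """Return the IPv4 address of the interface that would route to the Internet.

    Connecting a UDP socket to 8.8.8.8 sends no packet; it only makes the
    kernel choose the outgoing interface, whose address is then read back from
    the socket name. The socket is closed even when connect() fails.

    Returns:
        The local address as a dotted-quad string.

    Raises:
        OSError: if no route to the probe address exists.
    """
    probe = ('8.8.8.8', 80)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.connect(probe)
        return sock.getsockname()[0]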
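def get_local_mac(interface=None):
    """Return the MAC address of ``interface`` as 'aa:bb:cc:dd:ee:ff'.

    Reads ``/sys/class/net/<interface>/address`` (Linux sysfs). If that file
    cannot be read (unknown interface, non-Linux host), falls back to
    ``uuid.getnode()``; per the uuid docs that may be a locally generated
    number rather than a real hardware address.

    Args:
        interface: Interface name; defaults to the default-route interface
            reported by ``get_local_interface()``.
    """
    if interface is None:
        interface = get_local_interface()
    try:
        with open(f'/sys/class/net/{interface}/address') as address_file:
            return address_file.read().strip().lower()
    except OSError:
        import uuid  # Only needed on the fallback path.
        node = uuid.getnode()
        return ':'.join(f'{(node >> shift) & 0xff:02x}' for shift in (40, 32, 24, 16, 8, 0))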
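def get_local_interface(route_file='/proc/net/route'):
    """Return the name of the interface carrying the default route, or None.

    Parses the kernel routing table in ``route_file`` (``/proc/net/route``
    layout: a header line, then whitespace-separated columns with destination
    and flags in hex). The first row whose destination is ``00000000`` and
    whose flags carry RTF_GATEWAY (0x2) wins.

    Args:
        route_file: Path of the routing table to read; exposed so the parser
            can be exercised on a saved copy.

    Returns:
        The interface name, or ``None`` when no default route is present.
    """
    rtf_gateway = 0x2
    with open(route_file) as table:
        for row in table:
            fields = row.split()
            if len(fields) < 4:
                continue  # Blank or truncated row.
            iface, destination, _gateway, flags = fields[:4]
            if destination != '00000000':
                continue
            try:
                if int(flags, 16) & rtf_gateway:
                    return iface
            except ValueError:
                continue  # Malformed flags column.
    return None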
然后,我们定义一个可视化界面来显示所有事件和记录:
class Application(tk.Frame):
    """Main window: the list of recorded events plus a manual refresh button.

    The frame packs itself into ``master`` on construction. ``refresh`` reloads
    the listbox from the module-level ``event_list`` and is only triggered by
    the button, so new events appear on the next click. The "records saved in"
    label displays ``<cwd>/record.txt``; nothing in this file writes that file.
    """

    def __init__(self, master=None):
        super().__init__(master)
        self.master = master
        self.pack()
        self.create_widgets()

    def create_widgets(self):
        """Create the widgets, then pack them top to bottom in display order."""
        record_path = os.path.join(os.getcwd(), 'record.txt')
        self.event_label = tk.Label(self, text='事件记录:')
        self.event_listbox = tk.Listbox(self, width=80, height=10)
        self.refresh_button = tk.Button(self, text='刷新', command=self.refresh)
        self.record_label = tk.Label(self, text='记录保存在:')
        self.record_path_label = tk.Label(self, text=record_path)
        ordered = (
            self.event_label,
            self.event_listbox,
            self.refresh_button,
            self.record_label,
            self.record_path_label,
        )
        for widget in ordered:
            widget.pack()

    def refresh(self):
        """Replace the listbox contents with the current ``event_list``."""
        self.event_listbox.delete(0, tk.END)
        for entry in event_list:
            self.event_listbox.insert(tk.END, entry)
最后,我们通过多线程运行各检测模块,并启动可视化界面:
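if __name__ == '__main__':
    # Raw AF_PACKET sockets need CAP_NET_RAW; without it send_arp_request()
    # returns None for every host and the tool silently detects nothing.
    if os.geteuid() != 0:
        print('警告: 未以root运行,ARP扫描可能无法发送原始数据包', file=sys.stderr)
    # Daemon threads so closing the window ends the process; the detection
    # loops never return on their own.
    for target in (icmp_flood_detection, tcp_attack_detection, arp_attack_detection):
        threading.Thread(target=target, name=target.__name__, daemon=True).start()
    root = tk.Tk()
    root.title('ARP攻击检测防护软件')
    app = Application(master=root)
    app.mainloop()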
完整的Python代码如下:
import os
import sys
import time
import socket
import struct
import threading
import tkinter as tk
from tkinter import messagebox
# Per-host counts in one 5-second window above which an alert is raised.
ICMP_THRESHOLD = 100  # ICMP flood
TCP_THRESHOLD = 100   # TCP flood
ARP_THRESHOLD = 100   # ARP spoofing (declared here; the scanner does not consult it)
# Seconds between two ARP sweeps of the subnet.
ARP_INTERVAL = 10
# /24 prefix that is swept; hosts .1 through .254 are probed.
LOCAL_NET_PREFIX = '192.168.0.'

# Shared mutable state: written by the detector threads, read by the GUI.
active_hosts = {}    # ip -> {'mac': str, 'icmp_count': int, 'tcp_count': int}
attack_sources = {}  # mac -> list of IPs that MAC has answered for
event_list = []      # human-readable alert strings, oldest first
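def icmp_flood_detection():
    """Poll per-host ICMP counters every 5 s and flag hosts over ICMP_THRESHOLD.

    Runs forever, so it is meant to be the target of a background thread. Each
    cycle copies and resets ``active_hosts[host]['icmp_count']`` for every
    known host, then appends an entry to ``event_list`` (and shows a warning
    dialog) for each host whose count exceeded ``ICMP_THRESHOLD``.

    NOTE(review): nothing in this file increments ``icmp_count``; the counters
    only become meaningful once a capture loop feeds them.
    """
    while True:
        time.sleep(5)
        # Rebuilt every cycle so a host that left active_hosts is not reported
        # again with its stale count.
        icmp_count = {}
        # Snapshot the items: the ARP scanner thread inserts and deletes
        # entries concurrently, and iterating a dict while another thread
        # resizes it raises RuntimeError.
        for host, info in list(active_hosts.items()):
            icmp_count[host] = info['icmp_count']
            info['icmp_count'] = 0
        for host, count in icmp_count.items():
            if count > ICMP_THRESHOLD:
                event_list.append(f'ICMP Flood攻击:{host},数量:{count}')
                # NOTE(review): tkinter is not thread-safe; a dialog raised
                # from this worker thread should be marshalled to the GUI
                # thread (e.g. via root.after) rather than called directly.
                messagebox.showwarning('警告', f'遭受ICMP Flood攻击:{host},数量:{count}')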
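def tcp_attack_detection():
    """Poll per-host TCP counters every 5 s and flag hosts over TCP_THRESHOLD.

    Runs forever, so it is meant to be the target of a background thread. Each
    cycle copies and resets ``active_hosts[host]['tcp_count']`` for every
    known host, then appends an entry to ``event_list`` (and shows a warning
    dialog) for each host whose count exceeded ``TCP_THRESHOLD``.

    NOTE(review): nothing in this file increments ``tcp_count``; the counters
    only become meaningful once a capture loop feeds them.
    """
    while True:
        time.sleep(5)
        # Rebuilt every cycle so a host that left active_hosts is not reported
        # again with its stale count.
        tcp_count = {}
        # Snapshot the items: the ARP scanner thread inserts and deletes
        # entries concurrently, and iterating a dict while another thread
        # resizes it raises RuntimeError.
        for host, info in list(active_hosts.items()):
            tcp_count[host] = info['tcp_count']
            info['tcp_count'] = 0
        for host, count in tcp_count.items():
            if count > TCP_THRESHOLD:
                event_list.append(f'TCP攻击:{host},数量:{count}')
                # NOTE(review): tkinter is not thread-safe; a dialog raised
                # from this worker thread should be marshalled to the GUI
                # thread (e.g. via root.after) rather than called directly.
                messagebox.showwarning('警告', f'遭受TCP攻击:{host},数量:{count}')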
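def arp_attack_detection():
    """Probe every address under LOCAL_NET_PREFIX by ARP and flag MAC reuse.

    Runs forever, sleeping ``ARP_INTERVAL`` seconds between sweeps. Each sweep
    sends an ARP request to .1 through .254: hosts that answer are kept in
    ``active_hosts`` with their current MAC (new hosts get zeroed counters),
    hosts that stop answering are dropped. ``attack_sources`` maps each MAC
    seen so far to the list of IPs it has claimed; an IP that is new, or whose
    MAC just changed, is reported when that MAC already claims other IPs,
    which is the usual footprint of ARP spoofing.

    NOTE(review): old claims are never aged out, so a device that legitimately
    moves to a new address also looks like MAC reuse here. ``ARP_THRESHOLD``
    is defined at module level but not consulted.
    """
    while True:
        for i in range(1, 255):
            ip = LOCAL_NET_PREFIX + str(i)
            mac = send_arp_request(ip)
            if not mac:
                # No answer: forget the host (pop() tolerates unknown IPs).
                active_hosts.pop(ip, None)
                continue
            entry = active_hosts.get(ip)
            if entry is None:
                active_hosts[ip] = {'mac': mac, 'icmp_count': 0, 'tcp_count': 0}
            elif entry['mac'] == mac:
                continue  # Known host, unchanged MAC: nothing to check.
            else:
                entry['mac'] = mac  # Same IP now answers from another MAC.
            # Other IPs this MAC has already answered for, excluding ip itself.
            claimed_elsewhere = [h for h in attack_sources.get(mac, []) if h != ip]
            if claimed_elsewhere:
                event_list.append(f'ARP欺骗攻击:{ip},攻击源:{claimed_elsewhere}')
                # NOTE(review): tkinter is not thread-safe; route this dialog
                # through the GUI thread (e.g. root.after) instead.
                messagebox.showwarning('警告', f'遭受ARP欺骗攻击:{ip},攻击源:{claimed_elsewhere}')
        # Record which IPs each MAC has claimed, once per IP, so the lists do
        # not grow by one duplicate entry per sweep.
        for host, info in list(active_hosts.items()):
            claims = attack_sources.setdefault(info['mac'], [])
            if host not in claims:
                claims.append(host)
        time.sleep(ARP_INTERVAL)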
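def send_arp_request(ip, timeout=2.0):
    """Send one ARP request for ``ip`` and return the answering MAC, or None.

    Builds an Ethernet/ARP request (broadcast destination, this host's MAC and
    IP as sender) and writes it to a raw AF_PACKET socket bound to the
    default-route interface, then reads ARP frames from the same socket until
    an ARP reply (opcode 2) whose sender protocol address equals ``ip``
    arrives or ``timeout`` seconds elapse.

    Args:
        ip: Dotted-quad IPv4 address to resolve.
        timeout: Seconds to wait for a reply before giving up.

    Returns:
        The reply's sender MAC as ``'aa:bb:cc:dd:ee:ff'``, or ``None`` when no
        reply arrived in time, no default-route interface exists, or the raw
        socket could not be opened (AF_PACKET needs CAP_NET_RAW, normally root).

    Raises:
        OSError: if ``ip`` is not a valid IPv4 address.
    """
    eth_p_arp = 0x0806  # EtherType for ARP.
    target_ip = socket.inet_aton(ip)  # Validate before touching the network.

    def mac_to_bytes(mac):
        """Convert 'aa:bb:cc:dd:ee:ff' to its 6-byte form."""
        return bytes.fromhex(mac.replace(':', ''))

    iface = get_local_interface()
    if iface is None:
        return None
    local_mac = mac_to_bytes(get_local_mac())
    arp_packet = struct.pack(
        '!6s6sHHHBBH6s4s6s4s',
        mac_to_bytes('ff:ff:ff:ff:ff:ff'),  # Ethernet destination: broadcast
        local_mac,                          # Ethernet source
        eth_p_arp,                          # EtherType
        1,                                  # HTYPE: Ethernet
        0x0800,                             # PTYPE: IPv4
        6, 4,                               # HLEN, PLEN
        1,                                  # OPER: request
        local_mac,                          # sender hardware address
        socket.inet_aton(get_local_ip()),   # sender protocol address
        bytes(6),                           # target hardware address (unknown)
        target_ip,                          # target protocol address
    )
    try:
        # One socket for both directions, opened before sending so a fast
        # reply cannot arrive before we are listening; the protocol argument
        # limits delivery to ARP frames. Closed by the context manager on
        # every exit path.
        with socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(eth_p_arp)) as sock:
            sock.bind((iface, 0))
            sock.settimeout(timeout)
            sock.send(arp_packet)
            deadline = time.monotonic() + timeout  # Bounds the total wait, not just one recv().
            while time.monotonic() < deadline:
                frame = sock.recv(65535)
                if len(frame) < 42:
                    continue  # Shorter than Ethernet (14) + ARP (28) headers.
                eth_type = struct.unpack('!H', frame[12:14])[0]
                if eth_type != eth_p_arp:
                    continue
                arp_op = struct.unpack('!H', frame[20:22])[0]
                arp_spa = socket.inet_ntoa(frame[28:32])
                if arp_op != 2 or arp_spa != ip:
                    continue
                return ':'.join(f'{b:02x}' for b in frame[22:28])
    except OSError:
        # Permission denied, unknown interface, and socket.timeout all land here.
        return None
    return None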
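def get_local_ip():
    """Return the IPv4 address of the interface that would route to the Internet.

    Connecting a UDP socket to 8.8.8.8 sends no packet; it only makes the
    kernel choose the outgoing interface, whose address is then read back from
    the socket name. The socket is closed even when connect() fails.

    Returns:
        The local address as a dotted-quad string.

    Raises:
        OSError: if no route to the probe address exists.
    """
    probe = ('8.8.8.8', 80)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.connect(probe)
        return sock.getsockname()[0]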
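def get_local_mac(interface=None):
    """Return the MAC address of ``interface`` as 'aa:bb:cc:dd:ee:ff'.

    Reads ``/sys/class/net/<interface>/address`` (Linux sysfs). If that file
    cannot be read (unknown interface, non-Linux host), falls back to
    ``uuid.getnode()``; per the uuid docs that may be a locally generated
    number rather than a real hardware address.

    Args:
        interface: Interface name; defaults to the default-route interface
            reported by ``get_local_interface()``.
    """
    if interface is None:
        interface = get_local_interface()
    try:
        with open(f'/sys/class/net/{interface}/address') as address_file:
            return address_file.read().strip().lower()
    except OSError:
        import uuid  # Only needed on the fallback path.
        node = uuid.getnode()
        return ':'.join(f'{(node >> shift) & 0xff:02x}' for shift in (40, 32, 24, 16, 8, 0))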
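def get_local_interface(route_file='/proc/net/route'):
    """Return the name of the interface carrying the default route, or None.

    Parses the kernel routing table in ``route_file`` (``/proc/net/route``
    layout: a header line, then whitespace-separated columns with destination
    and flags in hex). The first row whose destination is ``00000000`` and
    whose flags carry RTF_GATEWAY (0x2) wins.

    Args:
        route_file: Path of the routing table to read; exposed so the parser
            can be exercised on a saved copy.

    Returns:
        The interface name, or ``None`` when no default route is present.
    """
    rtf_gateway = 0x2
    with open(route_file) as table:
        for row in table:
            fields = row.split()
            if len(fields) < 4:
                continue  # Blank or truncated row.
            iface, destination, _gateway, flags = fields[:4]
            if destination != '00000000':
                continue
            try:
                if int(flags, 16) & rtf_gateway:
                    return iface
            except ValueError:
                continue  # Malformed flags column.
    return None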
class Application(tk.Frame):
    """Main window: the list of recorded events plus a manual refresh button.

    The frame packs itself into ``master`` on construction. ``refresh`` reloads
    the listbox from the module-level ``event_list`` and is only triggered by
    the button, so new events appear on the next click. The "records saved in"
    label displays ``<cwd>/record.txt``; nothing in this file writes that file.
    """

    def __init__(self, master=None):
        super().__init__(master)
        self.master = master
        self.pack()
        self.create_widgets()

    def create_widgets(self):
        """Create the widgets, then pack them top to bottom in display order."""
        record_path = os.path.join(os.getcwd(), 'record.txt')
        self.event_label = tk.Label(self, text='事件记录:')
        self.event_listbox = tk.Listbox(self, width=80, height=10)
        self.refresh_button = tk.Button(self, text='刷新', command=self.refresh)
        self.record_label = tk.Label(self, text='记录保存在:')
        self.record_path_label = tk.Label(self, text=record_path)
        ordered = (
            self.event_label,
            self.event_listbox,
            self.refresh_button,
            self.record_label,
            self.record_path_label,
        )
        for widget in ordered:
            widget.pack()

    def refresh(self):
        """Replace the listbox contents with the current ``event_list``."""
        self.event_listbox.delete(0, tk.END)
        for entry in event_list:
            self.event_listbox.insert(tk.END, entry)
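if __name__ == '__main__':
    # Raw AF_PACKET sockets need CAP_NET_RAW; without it send_arp_request()
    # returns None for every host and the tool silently detects nothing.
    if os.geteuid() != 0:
        print('警告: 未以root运行,ARP扫描可能无法发送原始数据包', file=sys.stderr)
    # Daemon threads so closing the window ends the process; the detection
    # loops never return on their own.
    for target in (icmp_flood_detection, tcp_attack_detection, arp_attack_detection):
        threading.Thread(target=target, name=target.__name__, daemon=True).start()
    root = tk.Tk()
    root.title('ARP攻击检测防护软件')
    app = Application(master=root)
    app.mainloop()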