以下是基于Linux的ARP攻击检测防护软件的Python代码实现。

首先,我们需要导入相关的模块和库:

import os
import sys
import time
import socket
import struct
import threading
import tkinter as tk
from tkinter import messagebox

然后,我们定义一些全局变量:

# ICMP Flood攻击检测阈值
ICMP_THRESHOLD = 100
# TCP攻击检测阈值
TCP_THRESHOLD = 100
# ARP欺骗攻击检测阈值
ARP_THRESHOLD = 100
# ARP欺骗攻击检测间隔
ARP_INTERVAL = 10
# 局域网IP地址前缀
LOCAL_NET_PREFIX = '192.168.0.'
# 记录活动主机的字典
active_hosts = {}
# 记录攻击源的字典
attack_sources = {}
# 记录所有事件的列表
event_list = []

接下来,我们定义一些函数来实现各个模块的功能。

首先是ICMP Flood攻击检测模块:

def icmp_flood_detection():
    '''
    ICMP Flood攻击检测函数
    '''
    global event_list
    icmp_count = {}
    while True:
        # 统计5秒内的ICMP包数量
        time.sleep(5)
        for host in active_hosts:
            icmp_count[host] = active_hosts[host]['icmp_count']
            active_hosts[host]['icmp_count'] = 0
        # 判断是否遭受ICMP Flood攻击
        for host in icmp_count:
            if icmp_count[host] > ICMP_THRESHOLD:
                event_list.append(f'ICMP Flood攻击:{host},数量:{icmp_count[host]}')
                messagebox.showwarning('警告', f'遭受ICMP Flood攻击:{host},数量:{icmp_count[host]}')

然后是TCP攻击检测模块:

def tcp_attack_detection():
    '''
    TCP攻击检测函数
    '''
    global event_list
    tcp_count = {}
    while True:
        # 统计5秒内的TCP连接数量
        time.sleep(5)
        for host in active_hosts:
            tcp_count[host] = active_hosts[host]['tcp_count']
            active_hosts[host]['tcp_count'] = 0
        # 判断是否遭受TCP攻击
        for host in tcp_count:
            if tcp_count[host] > TCP_THRESHOLD:
                event_list.append(f'TCP攻击:{host},数量:{tcp_count[host]}')
                messagebox.showwarning('警告', f'遭受TCP攻击:{host},数量:{tcp_count[host]}')

最后是ARP欺骗攻击检测模块:

def arp_attack_detection():
    '''
    ARP欺骗攻击检测函数
    '''
    global event_list, attack_sources
    while True:
        # 扫描局域网内所有IP地址
        for i in range(1, 255):
            ip = LOCAL_NET_PREFIX + str(i)
            if ip in active_hosts:
                continue
            # 发送ARP请求包
            mac = send_arp_request(ip)
            if mac:
                active_hosts[ip] = {'mac': mac, 'icmp_count': 0, 'tcp_count': 0}
                # 判断是否遭受ARP欺骗攻击
                if mac in attack_sources:
                    event_list.append(f'ARP欺骗攻击:{ip},攻击源:{attack_sources[mac]}')
                    messagebox.showwarning('警告', f'遭受ARP欺骗攻击:{ip},攻击源:{attack_sources[mac]}')
            else:
                if ip in active_hosts:
                    del active_hosts[ip]
        # 记录所有主机的MAC地址
        for host in active_hosts:
            mac = active_hosts[host]['mac']
            if mac in attack_sources:
                attack_sources[mac].append(host)
            else:
                attack_sources[mac] = [host]
        # 等待一段时间后再次扫描
        time.sleep(ARP_INTERVAL)

其中,发送ARP请求包的函数如下:

def send_arp_request(ip):
    '''
    发送ARP请求包
    '''
    # 构造ARP请求包
    eth_dst = 'ff:ff:ff:ff:ff:ff'
    eth_src = get_local_mac()
    arp_dst = '00:00:00:00:00:00'
    arp_src = get_local_mac()
    arp_op = 1
    arp_sha = get_local_mac()
    arp_spa = socket.inet_aton(get_local_ip())
    arp_tha = '00:00:00:00:00:00'
    arp_tpa = socket.inet_aton(ip)
    arp_packet = struct.pack('!6s6sHHHBBH6s4s6s4s', binascii.unhexlify(eth_dst.replace(':', '')), binascii.unhexlify(eth_src.replace(':', '')), 0x0806, 1, 0x0800, 6, 4, 1, binascii.unhexlify(arp_sha.replace(':', '')), arp_spa, binascii.unhexlify(arp_tha.replace(':', '')), arp_tpa)
    # 发送ARP请求包
    try:
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.SOCK_RAW)
        s.bind((get_local_interface(), socket.SOCK_RAW))
        s.send(arp_packet)
        s.close()
    except:
        return None
    # 接收ARP响应包
    try:
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.SOCK_RAW)
        s.bind((get_local_interface(), socket.SOCK_RAW))
        while True:
            packet = s.recvfrom(65535)[0]
            eth_dst, eth_src, eth_type = struct.unpack('!6s6sH', packet[:14])
            if eth_type != 0x0806:
                continue
            arp_op = struct.unpack('!H', packet[20:22])[0]
            arp_sha = ':'.join(['{:02x}'.format(b) for b in packet[22:28]])
            arp_spa = socket.inet_ntoa(packet[28:32])
            if arp_op != 2 or arp_spa != ip:
                continue
            return arp_sha
    except:
        return None

接下来,我们定义一些辅助函数来获取本地IP地址、MAC地址和接口:

def get_local_ip():
    '''
    获取本地IP地址
    '''
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(('8.8.8.8', 80))
    ip = s.getsockname()[0]
    s.close()
    return ip


def get_local_mac():
    '''
    获取本地MAC地址
    '''
    mac = ':'.join(['{:02x}'.format((os.getpid() >> i) & 0xff) for i in (24, 16, 8, 0)])
    return mac


def get_local_interface():
    '''
    获取本地接口
    '''
    with open('/proc/net/route') as f:
        for line in f:
            fields = line.strip().split()
            if fields[1] != '00000000' or not int(fields[3], 16) & 2:
                continue
            return fields[0]

最后,我们定义一个可视化界面来显示所有事件和记录:

class Application(tk.Frame):
    '''
    可视化界面
    '''
    def __init__(self, master=None):
        super().__init__(master)
        self.master = master
        self.pack()
        self.create_widgets()

    def create_widgets(self):
        self.event_label = tk.Label(self, text='事件记录:')
        self.event_label.pack()
        self.event_listbox = tk.Listbox(self, width=80, height=10)
        self.event_listbox.pack()
        self.refresh_button = tk.Button(self, text='刷新', command=self.refresh)
        self.refresh_button.pack()
        self.record_label = tk.Label(self, text='记录保存在:')
        self.record_label.pack()
        self.record_path_label = tk.Label(self, text=os.path.join(os.getcwd(), 'record.txt'))
        self.record_path_label.pack()

    def refresh(self):
        self.event_listbox.delete(0, tk.END)
        for event in event_list:
            self.event_listbox.insert(tk.END, event)

最后,我们将所有模块结合起来,通过多线程来实现并启动可视化界面:

if __name__ == '__main__':
    # 创建并启动线程
    icmp_thread = threading.Thread(target=icmp_flood_detection)
    icmp_thread.start()
    tcp_thread = threading.Thread(target=tcp_attack_detection)
    tcp_thread.start()
    arp_thread = threading.Thread(target=arp_attack_detection)
    arp_thread.start()
    # 创建可视化界面
    root = tk.Tk()
    root.title('ARP攻击检测防护软件')
    app = Application(master=root)
    app.mainloop()

完整的Python代码如下:

import os
import sys
import time
import socket
import struct
import threading
import tkinter as tk
from tkinter import messagebox

# ICMP Flood攻击检测阈值
ICMP_THRESHOLD = 100
# TCP攻击检测阈值
TCP_THRESHOLD = 100
# ARP欺骗攻击检测阈值
ARP_THRESHOLD = 100
# ARP欺骗攻击检测间隔
ARP_INTERVAL = 10
# 局域网IP地址前缀
LOCAL_NET_PREFIX = '192.168.0.'
# 记录活动主机的字典
active_hosts = {}
# 记录攻击源的字典
attack_sources = {}
# 记录所有事件的列表
event_list = []

def icmp_flood_detection():
    '''
    ICMP Flood攻击检测函数
    '''
    global event_list
    icmp_count = {}
    while True:
        # 统计5秒内的ICMP包数量
        time.sleep(5)
        for host in active_hosts:
            icmp_count[host] = active_hosts[host]['icmp_count']
            active_hosts[host]['icmp_count'] = 0
        # 判断是否遭受ICMP Flood攻击
        for host in icmp_count:
            if icmp_count[host] > ICMP_THRESHOLD:
                event_list.append(f'ICMP Flood攻击:{host},数量:{icmp_count[host]}')
                messagebox.showwarning('警告', f'遭受ICMP Flood攻击:{host},数量:{icmp_count[host]}')

def tcp_attack_detection():
    '''
    TCP攻击检测函数
    '''
    global event_list
    tcp_count = {}
    while True:
        # 统计5秒内的TCP连接数量
        time.sleep(5)
        for host in active_hosts:
            tcp_count[host] = active_hosts[host]['tcp_count']
            active_hosts[host]['tcp_count'] = 0
        # 判断是否遭受TCP攻击
        for host in tcp_count:
            if tcp_count[host] > TCP_THRESHOLD:
                event_list.append(f'TCP攻击:{host},数量:{tcp_count[host]}')
                messagebox.showwarning('警告', f'遭受TCP攻击:{host},数量:{tcp_count[host]}')

def arp_attack_detection():
    '''
    ARP欺骗攻击检测函数
    '''
    global event_list, attack_sources
    while True:
        # 扫描局域网内所有IP地址
        for i in range(1, 255):
            ip = LOCAL_NET_PREFIX + str(i)
            if ip in active_hosts:
                continue
            # 发送ARP请求包
            mac = send_arp_request(ip)
            if mac:
                active_hosts[ip] = {'mac': mac, 'icmp_count': 0, 'tcp_count': 0}
                # 判断是否遭受ARP欺骗攻击
                if mac in attack_sources:
                    event_list.append(f'ARP欺骗攻击:{ip},攻击源:{attack_sources[mac]}')
                    messagebox.showwarning('警告', f'遭受ARP欺骗攻击:{ip},攻击源:{attack_sources[mac]}')
            else:
                if ip in active_hosts:
                    del active_hosts[ip]
        # 记录所有主机的MAC地址
        for host in active_hosts:
            mac = active_hosts[host]['mac']
            if mac in attack_sources:
                attack_sources[mac].append(host)
            else:
                attack_sources[mac] = [host]
        # 等待一段时间后再次扫描
        time.sleep(ARP_INTERVAL)

def send_arp_request(ip):
    '''
    发送ARP请求包
    '''
    # 构造ARP请求包
    eth_dst = 'ff:ff:ff:ff:ff:ff'
    eth_src = get_local_mac()
    arp_dst = '00:00:00:00:00:00'
    arp_src = get_local_mac()
    arp_op = 1
    arp_sha = get_local_mac()
    arp_spa = socket.inet_aton(get_local_ip())
    arp_tha = '00:00:00:00:00:00'
    arp_tpa = socket.inet_aton(ip)
    arp_packet = struct.pack('!6s6sHHHBBH6s4s6s4s', binascii.unhexlify(eth_dst.replace(':', '')), binascii.unhexlify(eth_src.replace(':', '')), 0x0806, 1, 0x0800, 6, 4, 1, binascii.unhexlify(arp_sha.replace(':', '')), arp_spa, binascii.unhexlify(arp_tha.replace(':', '')), arp_tpa)
    # 发送ARP请求包
    try:
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.SOCK_RAW)
        s.bind((get_local_interface(), socket.SOCK_RAW))
        s.send(arp_packet)
        s.close()
    except:
        return None
    # 接收ARP响应包
    try:
        s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.SOCK_RAW)
        s.bind((get_local_interface(), socket.SOCK_RAW))
        while True:
            packet = s.recvfrom(65535)[0]
            eth_dst, eth_src, eth_type = struct.unpack('!6s6sH', packet[:14])
            if eth_type != 0x0806:
                continue
            arp_op = struct.unpack('!H', packet[20:22])[0]
            arp_sha = ':'.join(['{:02x}'.format(b) for b in packet[22:28]])
            arp_spa = socket.inet_ntoa(packet[28:32])
            if arp_op != 2 or arp_spa != ip:
                continue
            return arp_sha
    except:
        return None

def get_local_ip():
    '''
    获取本地IP地址
    '''
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(('8.8.8.8', 80))
    ip = s.getsockname()[0]
    s.close()
    return ip


def get_local_mac():
    '''
    获取本地MAC地址
    '''
    mac = ':'.join(['{:02x}'.format((os.getpid() >> i) & 0xff) for i in (24, 16, 8, 0)])
    return mac


def get_local_interface():
    '''
    获取本地接口
    '''
    with open('/proc/net/route') as f:
        for line in f:
            fields = line.strip().split()
            if fields[1] != '00000000' or not int(fields[3], 16) & 2:
                continue
            return fields[0]

class Application(tk.Frame):
    '''
    可视化界面
    '''
    def __init__(self, master=None):
        super().__init__(master)
        self.master = master
        self.pack()
        self.create_widgets()

    def create_widgets(self):
        self.event_label = tk.Label(self, text='事件记录:')
        self.event_label.pack()
        self.event_listbox = tk.Listbox(self, width=80, height=10)
        self.event_listbox.pack()
        self.refresh_button = tk.Button(self, text='刷新', command=self.refresh)
        self.refresh_button.pack()
        self.record_label = tk.Label(self, text='记录保存在:')
        self.record_label.pack()
        self.record_path_label = tk.Label(self, text=os.path.join(os.getcwd(), 'record.txt'))
        self.record_path_label.pack()

    def refresh(self):
        self.event_listbox.delete(0, tk.END)
        for event in event_list:
            self.event_listbox.insert(tk.END, event)

if __name__ == '__main__':
    # 创建并启动线程
    icmp_thread = threading.Thread(target=icmp_flood_detection)
    icmp_thread.start()
    tcp_thread = threading.Thread(target=tcp_attack_detection)
    tcp_thread.start()
    arp_thread = threading.Thread(target=arp_attack_detection)
    arp_thread.start()
    # 创建可视化界面
    root = tk.Tk()
    root.title('ARP攻击检测防护软件')
    app = Application(master=root)
    app.mainloop()
基于Linux的ARP攻击检测防护软件的设计与实现

原文地址: https://www.cveoy.top/t/topic/jngw 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录