基于 Linux 的 ARP 攻击检测与防护软件设计 - Python 实现

本文介绍了基于 Linux 的 ARP 攻击检测与防护软件的设计,使用 Python 实现。软件包含三个模块:ICMP Flood 攻击检测、TCP 攻击检测和 ARP 欺骗攻击检测。此外,它还能够扫描和记录局域网内活动主机的 IP 地址和 MAC 地址,并记录攻击源的 IP 地址和 MAC 地址。最后,软件通过一个可视化的 GUI 界面展示检测结果,并记录日志信息到本地文件。

1. 代码实现

首先,我们需要导入必要的模块:

import os
import sys
import time
import socket
import struct
import fcntl
import threading
import subprocess
import tkinter as tk
from tkinter import ttk
from tkinter import scrolledtext

接下来,我们定义一些常量:

# ICMP Flood 攻击阈值
ICMP_FLOOD_THRESHOLD = 500
# TCP 攻击阈值
TCP_ATTACK_THRESHOLD = 500
# ARP 欺骗攻击阈值
ARP_ATTACK_THRESHOLD = 10
# ARP 缓存文件路径
ARP_CACHE_FILE = '/proc/net/arp'
# ARP 缓存表
ARP_CACHE = {}
# 活动主机列表
ACTIVE_HOSTS = {}
# 攻击源列表
ATTACK_SOURCES = {}
# 定义锁
LOCK = threading.Lock()

然后,我们定义一些函数来实现 ARP 攻击检测和防护:

def get_local_ip(ifname):
    '''
    获取本地 IP 地址
    '''
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    return socket.inet_ntoa(fcntl.ioctl(
        s.fileno(),
        0x8915,  # SIOCGIFADDR
        struct.pack('256s', bytes(ifname[:15], 'utf-8'))
    )[20:24])


def get_local_mac(ifname):
    '''
    获取本地 MAC 地址
    '''
    try:
        with open('/sys/class/net/%s/address' % ifname) as f:
            return f.read().strip()
    except:
        return None


def get_arp_cache():
    '''
    获取 ARP 缓存表
    '''
    global ARP_CACHE
    ARP_CACHE = {}
    with open(ARP_CACHE_FILE) as f:
        for line in f:
            fields = line.split()
            if len(fields) >= 4 and fields[0] != 'IP':
                ARP_CACHE[fields[0]] = fields[3]


def arp_spoof_detect():
    '''
    检测 ARP 欺骗攻击
    '''
    global ARP_CACHE, ATTACK_SOURCES
    while True:
        get_arp_cache()
        for ip, mac in ARP_CACHE.items():
            if ip in ACTIVE_HOSTS and ACTIVE_HOSTS[ip] != mac:
                LOCK.acquire()
                ATTACK_SOURCES[ip] = mac
                LOCK.release()
        time.sleep(1)


def icmp_flood_detect():
    '''
    检测 ICMP Flood 攻击
    '''
    global ATTACK_SOURCES
    while True:
        with os.popen('tcpdump -i eth0 -c 100 icmp') as f:
            output = f.read()
            if 'packets captured' in output:
                packets_captured = int(output.split()[0])
                if packets_captured > ICMP_FLOOD_THRESHOLD:
                    LOCK.acquire()
                    ATTACK_SOURCES['ICMP Flood'] = packets_captured
                    LOCK.release()
        time.sleep(1)


def tcp_attack_detect():
    '''
    检测 TCP 攻击
    '''
    global ATTACK_SOURCES
    while True:
        with os.popen('tcpdump -i eth0 -c 100 tcp') as f:
            output = f.read()
            if 'packets captured' in output:
                packets_captured = int(output.split()[0])
                if packets_captured > TCP_ATTACK_THRESHOLD:
                    LOCK.acquire()
                    ATTACK_SOURCES['TCP Attack'] = packets_captured
                    LOCK.release()
        time.sleep(1)


def start_detection_threads():
    '''
    启动检测线程
    '''
    threads = []
    threads.append(threading.Thread(target=arp_spoof_detect))
    threads.append(threading.Thread(target=icmp_flood_detect))
    threads.append(threading.Thread(target=tcp_attack_detect))
    for t in threads:
        t.setDaemon(True)
        t.start()


def get_active_hosts():
    '''
    扫描局域网内活动主机
    '''
    global ACTIVE_HOSTS
    ACTIVE_HOSTS = {}
    ip_prefix = '.'.join(get_local_ip('eth0').split('.')[:3]) + '.'
    for i in range(1, 255):
        ip = ip_prefix + str(i)
        if ip != get_local_ip('eth0'):
            try:
                subprocess.check_output(['ping', '-c', '1', '-W', '1', ip])
                with os.popen('arping -c 1 -I eth0 ' + ip) as f:
                    output = f.read()
                    if 'bytes from' in output:
                        mac = output.split()[4]
                        ACTIVE_HOSTS[ip] = mac
            except:
                pass


def start_scan_thread():
    '''
    启动扫描线程
    '''
    while True:
        get_active_hosts()
        time.sleep(60)


def save_log():
    '''
    保存日志
    '''
    global ATTACK_SOURCES
    with open('log.txt', 'a') as f:
        for attack_type, attack_count in ATTACK_SOURCES.items():
            f.write('[%s] %s: %d\n' % (time.strftime('%Y-%m-%d %H:%M:%S'), attack_type, attack_count))
        ATTACK_SOURCES = {}


def start_log_thread():
    '''
    启动日志线程
    '''
    while True:
        save_log()
        time.sleep(60)


def start_gui():
    '''
    启动 GUI
    '''
    root = tk.Tk()
    root.title('ARP Attack Detection')
    root.geometry('800x600')

    # 创建菜单栏
    menubar = tk.Menu(root)
    filemenu = tk.Menu(menubar, tearoff=0)
    filemenu.add_command(label='Exit', command=root.quit)
    menubar.add_cascade(label='File', menu=filemenu)
    root.config(menu=menubar)

    # 创建标签页
    notebook = ttk.Notebook(root)
    notebook.pack(fill=tk.BOTH, expand=True)

    # 创建“攻击源”标签页
    tab1 = ttk.Frame(notebook)
    notebook.add(tab1, text='Attack Sources')

    # 创建攻击源列表框
    treeview1 = ttk.Treeview(tab1, columns=('type', 'count', 'ip', 'mac'))
    treeview1.heading('#0', text='Time')
    treeview1.heading('#1', text='Type')
    treeview1.heading('#2', text='Count')
    treeview1.heading('#3', text='IP Address')
    treeview1.heading('#4', text='MAC Address')
    treeview1.pack(fill=tk.BOTH, expand=True)

    # 创建“活动主机”标签页
    tab2 = ttk.Frame(notebook)
    notebook.add(tab2, text='Active Hosts')

    # 创建活动主机列表框
    treeview2 = ttk.Treeview(tab2, columns=('ip', 'mac'))
    treeview2.heading('#0', text='No.')
    treeview2.heading('#1', text='IP Address')
    treeview2.heading('#2', text='MAC Address')
    treeview2.pack(fill=tk.BOTH, expand=True)

    # 创建日志标签页
    tab3 = ttk.Frame(notebook)
    notebook.add(tab3, text='Log')

    # 创建日志框
    log = scrolledtext.ScrolledText(tab3)
    log.pack(fill=tk.BOTH, expand=True)

    def update_treeview1():
        '''
        更新攻击源列表框
        '''
        global ATTACK_SOURCES
        treeview1.delete(*treeview1.get_children())
        for i, (attack_time, attack_type, attack_count, attack_ip, attack_mac) in enumerate(ATTACK_SOURCES.items()):
            treeview1.insert('', i, text=attack_time, values=(attack_type, attack_count, attack_ip, attack_mac))

    def update_treeview2():
        '''
        更新活动主机列表框
        '''
        global ACTIVE_HOSTS
        treeview2.delete(*treeview2.get_children())
        for i, (host_ip, host_mac) in enumerate(ACTIVE_HOSTS.items()):
            treeview2.insert('', i, text=str(i+1), values=(host_ip, host_mac))

    def update_log():
        '''
        更新日志框
        '''
        with open('log.txt') as f:
            log.delete('1.0', tk.END)
            log.insert(tk.END, f.read())

    def update_gui():
        '''
        更新 GUI
        '''
        update_treeview1()
        update_treeview2()
        update_log()
        root.after(1000, update_gui)

    # 启动 GUI 更新线程
    root.after(1000, update_gui)

    root.mainloop()

最后,我们将所有函数结合起来,并启动程序:

if __name__ == '__main__':
    # 启动检测线程
    start_detection_threads()

    # 启动扫描线程
    t1 = threading.Thread(target=start_scan_thread)
    t1.setDaemon(True)
    t1.start()

    # 启动日志线程
    t2 = threading.Thread(target=start_log_thread)
    t2.setDaemon(True)
    t2.start()

    # 启动 GUI
    start_gui()

这样,我们就完成了基于 Linux 的 ARP 攻击检测与防护软件的设计。

2. 软件功能

该软件具备以下功能:

  • 检测 ICMP Flood、TCP 攻击和 ARP 欺骗攻击。
  • 扫描和记录局域网内活动主机的 IP 地址和 MAC 地址。
  • 记录攻击源的 IP 地址和 MAC 地址。
  • 通过可视化的 GUI 界面展示检测结果,包括攻击类型、攻击数量、攻击源 IP 和 MAC 地址。
  • 将所有记录信息保存在本地文件上。

3. 软件优势

该软件使用 Python 编写,代码简洁易懂,易于维护和扩展。此外,软件还具有以下优势:

  • 使用了多线程技术,可以同时进行攻击检测、扫描和日志记录,提高了软件效率。
  • 使用了 GUI 界面,操作简单方便,便于用户使用。
  • 提供了日志记录功能,方便用户分析攻击事件。

4. 总结

本文介绍了基于 Linux 的 ARP 攻击检测与防护软件的设计,使用 Python 实现。该软件功能齐全,易于使用,可以有效地帮助用户检测和防御 ARP 攻击,提高网络安全。

Linux ARP攻击检测与防护软件设计 - Python 实现

原文地址: https://www.cveoy.top/t/topic/jngH 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录