Python网络安全工具:使用Scapy检测ARP欺骗、ICMP泛洪和TCP攻击

这篇文章将介绍如何使用Python和Scapy库构建一个简单的网络安全工具。该工具将具备以下功能:

  • ARP欺骗检测: 监测网络上的ARP欺骗攻击。
  • ICMP泛洪检测: 监测针对主机的ICMP泛洪攻击。
  • TCP攻击检测: 监测针对主机的TCP攻击。
  • 网络扫描: 扫描网络以查找活动主机。

代码

import tkinter as tk
import os
import sys
import time
import threading
import netifaces
from scapy.all import *
from scapy.layers.l2 import ARP
from tkinter import messagebox
from scapy.layers.l2 import getmacbyip

class ArpDetector:
    def __init__(self, interface, timeout=1):
        self.interface = interface
        self.ip_mac_map = {}
        self.attacker_ip = None
        self.attacker_mac = None
        self.thread = None
        self.stop_event = threading.Event()
        self.timeout = timeout

    def start(self):
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

    def stop(self):
        self.stop_event.set()
        self.thread.join()

    def run(self):
        self.attacker_ip = netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['addr']
        self.attacker_mac = getmacbyip(self.attacker_ip)
        print(f'Attacker IP: {self.attacker_ip}, MAC: {self.attacker_mac}')

        while not self.stop_event.is_set():
            self.scan_network()
            self.detect_arp_spoofing()
            time.sleep(5)

    def scan_network(self):
        for ip in netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['addr'].split('.')[:-1]:
            for i in range(1, 255):
                target_ip = f'{ip}.{i}'
                if target_ip != self.attacker_ip:
                    arp_request = ARP(pdst=target_ip)
                    arp_reply = sr1(arp_request, timeout=self.timeout, verbose=0)
                    if arp_reply and arp_reply.hwsrc not in ('00:00:00:00:00:00', self.attacker_mac):
                        self.ip_mac_map[target_ip] = arp_reply.hwsrc

    def detect_arp_spoofing(self):
        for target_ip, target_mac in self.ip_mac_map.items():
            arp_request = ARP(op=1, pdst=target_ip, hwdst=target_mac, psrc=self.attacker_ip, hwsrc=self.attacker_mac)
            arp_reply = sr1(arp_request, timeout=self.timeout, verbose=0)
            if arp_reply and arp_reply.hwsrc != target_mac:
                print(f'ARP spoofing detected: {target_ip} ({target_mac}) -> {arp_reply.hwsrc}')

class IcmpFloodDetector:
    def __init__(self, interface, timeout=1):
        self.interface = interface
        self.attacker_ip = None
        self.thread = None
        self.stop_event = threading.Event()
        self.timeout = timeout

    def start(self):
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

    def stop(self):
        self.stop_event.set()
        self.thread.join()

    def run(self):
        self.attacker_ip = netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['addr']
        print(f'Attacker IP: {self.attacker_ip}')

        while not self.stop_event.is_set():
            self.detect_icmp_flood()
            time.sleep(5)

    def detect_icmp_flood(self):
        icmp_packets = sniff(filter=f'icmp and src host {self.attacker_ip}', timeout=self.timeout, count=10)
        if len(icmp_packets) == 10:
            print('ICMP flood detected')

class TcpAttackDetector:
    def __init__(self, interface, timeout=1):
        self.interface = interface
        self.attacker_ip = None
        self.thread = None
        self.stop_event = threading.Event()
        self.timeout = timeout

    def start(self):
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

    def stop(self):
        self.stop_event.set()
        self.thread.join()

    def run(self):
        self.attacker_ip = netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['addr']
        print(f'Attacker IP: {self.attacker_ip}')

        while not self.stop_event.is_set():
            self.detect_tcp_attack()
            time.sleep(5)

    def detect_tcp_attack(self):
        tcp_packets = sniff(filter=f'tcp and src host {self.attacker_ip}', timeout=self.timeout, count=10)
        if len(tcp_packets) == 10:
            print('TCP attack detected')

class NetworkScanner:
    def __init__(self, interface):
        self.interface = interface
        self.ip_mac_map = {}
        self.thread = None
        self.stop_event = threading.Event()

    def start(self):
        self.thread = threading.Thread(target=self.run)
        self.thread.start()

    def stop(self):
        self.stop_event.set()
        self.thread.join()

    def run(self):
        while not self.stop_event.is_set():
            self.scan_network()
            time.sleep(10)

    def scan_network(self):
        for ip in netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['addr'].split('.')[:-1]:
            for i in range(1, 255):
                target_ip = f'{ip}.{i}'
                if target_ip not in self.ip_mac_map:
                    arp_request = ARP(pdst=target_ip)
                    arp_reply = sr1(arp_request, timeout=1, verbose=0)
                    if arp_reply and arp_reply.hwsrc not in ('00:00:00:00:00:00', 'ff:ff:ff:ff:ff:ff'):
                        self.ip_mac_map[target_ip] = arp_reply.hwsrc
                        print(f'Found: {target_ip} ({arp_reply.hwsrc})')
                        self.save_to_file(target_ip, arp_reply.hwsrc)

    def save_to_file(self, ip, mac):
        with open('network_scan.txt', 'a') as f:
            f.write(f'{ip},{mac}
')

class LoginWindow:
    def __init__(self):
        self.root = tk.Tk()
        self.root.title('Login')
        self.root.geometry('300x150')

        tk.Label(self.root, text='Username').place(x=50, y=30)
        self.username_entry = tk.Entry(self.root)
        self.username_entry.place(x=120, y=30)

        tk.Label(self.root, text='Password').place(x=50, y=60)
        self.password_entry = tk.Entry(self.root, show='*')
        self.password_entry.place(x=120, y=60)

        self.login_button = tk.Button(self.root, text='Login', command=self.login)
        self.login_button.place(x=100, y=100)

        self.register_button = tk.Button(self.root, text='Register', command=self.register)
        self.register_button.place(x=170, y=100)

        self.root.mainloop()

    def show_main_window(self):
            self.root.destroy()
            MainWindow()

    def login(self):
        username = self.username_entry.get()
        password = self.password_entry.get()
        for user in users:
            if user['username'] == username and user['password'] == password:
                # 登录成功,跳转到主界面
                self.show_main_window()
                return
            # 登录失败,弹出错误提示
        messagebox.showinfo('错误', '用户名或密码错误')

    def register(self):
        username = self.username_entry.get()
        password = self.password_entry.get()
        for user in users:
            if user['username'] == username:
                # 用户名已存在,弹出错误提示
                messagebox.showerror('错误', '用户名已存在')
                return

        # 用户名不存在,将新用户添加到用户列表中
        users.append({'username': username, 'password': password})
        # 注册成功,弹出成功提示
        messagebox.showinfo('提示', '注册成功,请登录')



class MainWindow:
    def __init__(self):
        self.root = tk.Tk()
        self.root.title('网络安全工具')
        self.root.geometry('400x300')

        self.status_label = tk.Label(self.root, text='Idle')
        self.status_label.pack()

        self.start_button = tk.Button(self.root, text='启动所有检测', command=self.start_detection)
        self.start_button.pack()

        self.stop_button = tk.Button(self.root, text='停止所有检测', command=self.stop_detection, state=tk.DISABLED)
        self.stop_button.pack()

        self.arp_button = tk.Button(self.root, text='ARP欺骗检测', command=self.toggle_arp_detection, state=tk.DISABLED)
        self.arp_button.pack()

        self.icmp_button = tk.Button(self.root, text='ICMP泛洪检测', command=self.toggle_icmp_detection, state=tk.DISABLED)
        self.icmp_button.pack()

        self.tcp_button = tk.Button(self.root, text='TCP攻击检测', command=self.toggle_tcp_detection, state=tk.DISABLED)
        self.tcp_button.pack()

        self.network_button = tk.Button(self.root, text='网络扫描', command=self.toggle_network_scanner, state=tk.DISABLED)
        self.network_button.pack()

        self.root.mainloop()

    def start_detection(self):
        self.status_label.config(text='运行中')
        self.start_button.config(state=tk.DISABLED)
        self.stop_button.config(state=tk.NORMAL)
        self.arp_button.config(state=tk.NORMAL)
        self.icmp_button.config(state=tk.NORMAL)
        self.tcp_button.config(state=tk.NORMAL)
        self.network_button.config(state=tk.NORMAL)

        self.arp_detector = ArpDetector('ens33')
        self.arp_detector.start()

        self.icmp_flood_detector = IcmpFloodDetector('ens33')
        self.icmp_flood_detector.start()

        self.tcp_attack_detector = TcpAttackDetector('ens33')
        self.tcp_attack_detector.start()

        self.network_scanner = NetworkScanner('ens33')
        self.network_scanner.start()

    def stop_detection(self):
        self.status_label.config(text='Idle')
        self.start_button.config(state=tk.NORMAL)
        self.stop_button.config(state=tk.DISABLED)
        self.arp_button.config(state=tk.DISABLED)
        self.icmp_button.config(state=tk.DISABLED)
        self.tcp_button.config(state=tk.DISABLED)
        self.network_button.config(state=tk.DISABLED)

        self.arp_detector.stop()
        self.icmp_flood_detector.stop()
        self.tcp_attack_detector.stop()
        self.network_scanner.stop()

    def toggle_arp_detection(self):
        if self.arp_detector.thread is None or not self.arp_detector.thread.is_alive():
            self.arp_detector = ArpDetector('ens33')
            self.arp_detector.start()
            self.arp_button.config(text='停止ARP欺骗检测')
        else:
            self.arp_detector.stop()
            self.arp_button.config(text='启动ARP欺骗检测')

    def toggle_icmp_detection(self):
        if self.icmp_flood_detector.thread is None or not self.icmp_flood_detector.thread.is_alive():
            self.icmp_flood_detector = IcmpFloodDetector('ens33')
            self.icmp_flood_detector.start()
            self.icmp_button.config(text='停止ICMP泛洪检测')
        else:
            self.icmp_flood_detector.stop()
            self.icmp_button.config(text='启动ICMP泛洪检测')

    def toggle_tcp_detection(self):
        if self.tcp_attack_detector.thread is None or not self.tcp_attack_detector.thread.is_alive():
            self.tcp_attack_detector = TcpAttackDetector('ens33')
            self.tcp_attack_detector.start()
            self.tcp_button.config(text='停止TCP攻击检测')
        else:
            self.tcp_attack_detector.stop()
            self.tcp_button.config(text='启动TCP攻击检测')

    def toggle_network_scanner(self):
        if self.network_scanner.thread is None or not self.network_scanner.thread.is_alive():
            self.network_scanner = NetworkScanner('ens33')
            self.network_scanner.start()
            self.network_button.config(text='停止网络扫描')
        else:
            self.network_scanner.stop()
            self.network_button.config(text='启动网络扫描')

if __name__ == '__main__':
    users = [{'username': 'admin', 'password': 'admin'}]
    login_window = LoginWindow()


使用方法:

  1. 安装所需的库: pip install scapy netifaces
  2. 将代码保存为Python文件(例如network_security_tool.py)。
  3. 使用以下命令运行该工具: sudo python3 network_security_tool.py
  4. 该工具将启动一个GUI界面,您可以使用该界面启动和停止不同的检测模块。

注意:

  • 该工具需要root权限才能运行。
  • 您需要将代码中的'ens33'替换为您要使用的网络接口的名称。

总结

这个简单的网络安全工具演示了如何使用Python和Scapy库执行各种网络安全任务。您可以扩展此代码以添加更多功能,例如数据包分析、入侵防御等。

Python网络安全工具:使用Scapy检测ARP欺骗、ICMP泛洪和TCP攻击

原文地址: https://www.cveoy.top/t/topic/jnVg 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录