基于Python的Linux ARP攻击检测与防护软件
以下是基于Python的Linux ARP攻击检测与防护软件的基本代码实现:
import os
import sys
import time
import socket
import struct
import fcntl
import subprocess
import threading
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
# ICMP flood detection module
class IcmpFloodDetector:
    """Count ICMP echo requests per source address from tcpdump output.

    A source is reported on every echo request it sends once its running
    count exceeds ``threshold``.
    """

    def __init__(self, iface, threshold=100):
        """Create a detector bound to one network interface.

        Args:
            iface: Interface name handed to ``tcpdump -i``.
            threshold: Per-source echo-request count above which a flood
                is reported.
        """
        self.iface = iface
        self.threshold = threshold
        # Maps source address -> number of echo requests seen so far.
        self.icmp_count = {}
        # Held while icmp_count is read or updated.
        self.icmp_lock = threading.Lock()

    def detect(self):
        """Capture up to 1000 ICMP packets and report flooding sources.

        Blocks until tcpdump exits. Results are printed to stdout.
        """
        # Argument list instead of a shell string: the interface name is
        # never interpreted by a shell. -l keeps tcpdump line-buffered when
        # writing to a pipe; -n keeps addresses numeric.
        cmd = ["tcpdump", "-l", "-n", "-i", self.iface, "-c", "1000", "icmp"]
        try:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        except OSError as err:
            print("Failed to start tcpdump: {}".format(err))
            return
        with proc:
            for raw_line in proc.stdout:
                # The pipe yields bytes; decode once at the I/O boundary.
                self._handle_line(raw_line.decode(errors="replace"))

    def _handle_line(self, line):
        """Update the per-source counter from one decoded tcpdump line."""
        if "ICMP echo request" not in line:
            return
        src_ip = self.get_ip(line)
        if not src_ip:
            return
        with self.icmp_lock:
            count = self.icmp_count.get(src_ip, 0) + 1
            self.icmp_count[src_ip] = count
            if count > self.threshold:
                print("ICMP Flood detected from {}".format(src_ip))

    def get_ip(self, line):
        """Return the source address found between ``"IP "`` and ``" > "``.

        Args:
            line: One tcpdump output line, as ``str`` or ``bytes``.

        Returns:
            The source address text, or ``""`` when the line does not
            contain both markers in the expected order.
        """
        if isinstance(line, bytes):
            line = line.decode(errors="replace")
        marker = line.find("IP ")
        arrow = line.find(" > ")
        if marker == -1 or arrow == -1 or arrow < marker + 3:
            return ""
        return line[marker + 3:arrow]
# TCP attack detection module
class TcpAttackDetector:
    """Count TCP SYN segments per source address from tcpdump output.

    A source is reported on every SYN it sends once its running count
    exceeds ``threshold``.
    """

    def __init__(self, iface, threshold=100):
        """Create a detector bound to one network interface.

        Args:
            iface: Interface name handed to ``tcpdump -i``.
            threshold: Per-source SYN count above which an attack is
                reported.
        """
        self.iface = iface
        self.threshold = threshold
        # Maps source address (without port) -> number of SYNs seen so far.
        self.tcp_count = {}
        # Held while tcp_count is read or updated.
        self.tcp_lock = threading.Lock()

    def detect(self):
        """Capture up to 1000 TCP packets and report SYN-heavy sources.

        Blocks until tcpdump exits. Results are printed to stdout.
        """
        # Argument list instead of a shell string: the interface name is
        # never interpreted by a shell. -l keeps tcpdump line-buffered when
        # writing to a pipe; -n keeps addresses and ports numeric so the
        # port suffix can be stripped reliably.
        cmd = ["tcpdump", "-l", "-n", "-i", self.iface, "-c", "1000", "tcp"]
        try:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        except OSError as err:
            print("Failed to start tcpdump: {}".format(err))
            return
        with proc:
            for raw_line in proc.stdout:
                # The pipe yields bytes; decode once at the I/O boundary.
                self._handle_line(raw_line.decode(errors="replace"))

    def _handle_line(self, line):
        """Update the per-source counter from one decoded tcpdump line."""
        # "Flags [S]" matches a bare SYN only; a SYN-ACK prints as "[S.]".
        if "Flags [S]" not in line:
            return
        src_ip = self.get_ip(line)
        if not src_ip:
            return
        with self.tcp_lock:
            count = self.tcp_count.get(src_ip, 0) + 1
            self.tcp_count[src_ip] = count
            if count > self.threshold:
                print("TCP Attack detected from {}".format(src_ip))

    def get_ip(self, line):
        """Return the source IP found between ``"IP "`` and ``" > "``.

        For TCP, tcpdump prints the source as ``a.b.c.d.port``. The numeric
        port is removed so that SYNs sent from many source ports are counted
        against the same host.

        Args:
            line: One tcpdump output line, as ``str`` or ``bytes``.

        Returns:
            The source address text, or ``""`` when the line does not
            contain both markers in the expected order.
        """
        if isinstance(line, bytes):
            line = line.decode(errors="replace")
        marker = line.find("IP ")
        arrow = line.find(" > ")
        if marker == -1 or arrow == -1 or arrow < marker + 3:
            return ""
        source = line[marker + 3:arrow]
        host, _, port = source.rpartition(".")
        # Only strip when the shape is exactly dotted-quad plus numeric port.
        if source.count(".") == 4 and port.isdigit():
            return host
        return source
# ARP spoofing detection module
class ArpAttackDetector:
    """Detect IP -> MAC changes between successive reads of the ARP cache.

    Each ``detect()`` call reads the cache once and compares it with the
    mappings remembered from earlier calls on the same instance, so a change
    can only be noticed from the second call onwards.
    """

    def __init__(self, iface):
        """Create a detector bound to one network interface.

        Args:
            iface: Interface name handed to ``arp -i``.
        """
        self.iface = iface
        # Maps IP address -> MAC address last seen for it.
        self.arp_table = {}
        # Held while arp_table is read or updated.
        self.arp_lock = threading.Lock()

    def detect(self):
        """Read the ARP cache once and report any changed mapping.

        Results are printed to stdout.
        """
        # Argument list instead of a shell string: the interface name is
        # never interpreted by a shell.
        cmd = ["arp", "-i", self.iface, "-n"]
        try:
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        except OSError as err:
            print("Failed to run arp: {}".format(err))
            return
        with proc:
            for raw_line in proc.stdout:
                # The pipe yields bytes; decode once at the I/O boundary.
                self._handle_line(raw_line.decode(errors="replace"))

    def _handle_line(self, line):
        """Record one decoded ``arp -n`` row.

        Returns:
            True when the row's MAC differs from the one remembered for the
            same IP, False otherwise (including skipped rows).
        """
        if "incomplete" in line:
            return False
        parts = line.split()
        if len(parts) < 3:
            return False
        ip, mac = parts[0], parts[2]
        # The header row ("Address HWtype HWaddress ...") has no MAC in the
        # third column; keep it out of the table.
        if mac.count(":") != 5:
            return False
        with self.arp_lock:
            previous = self.arp_table.get(ip)
            changed = previous is not None and previous != mac
            if changed:
                print("ARP Attack detected from {}: {} -> {}".format(ip, previous, mac))
            self.arp_table[ip] = mac
        return changed
# IP/MAC address scanning module
class IpMacScanner:
    """Discover IP -> MAC mappings by sending ARP requests on one interface.

    The scan covers host numbers 1-254 under the first three octets of the
    interface's own IPv4 address (i.e. it assumes a /24 network).
    """

    ETH_P_ARP = 0x0806      # EtherType for ARP
    SIOCGIFADDR = 0x8915    # ioctl request: get interface IPv4 address

    def __init__(self, iface):
        """Create a scanner bound to one network interface.

        Args:
            iface: Interface name used for the raw socket and for looking
                up the local MAC and IPv4 address.
        """
        self.iface = iface
        # Maps IP address -> MAC address for every host that answered.
        self.ip_mac_table = {}
        # Held while ip_mac_table is updated.
        self.ip_mac_lock = threading.Lock()

    def scan(self):
        """Probe hosts 1-254 of the local prefix and record responders."""
        ip_prefix = self.get_ip_prefix()
        if not ip_prefix:
            # No usable IPv4 address on the interface: nothing to scan.
            return
        for host in range(1, 255):
            ip = ip_prefix + str(host)
            mac = self.get_mac(ip)
            if mac:
                with self.ip_mac_lock:
                    self.ip_mac_table[ip] = mac

    def get_ip_prefix(self):
        """Return the interface address up to and including its last dot.

        Returns:
            For example ``"192.168.1."``; ``""`` when the address cannot be
            read.
        """
        try:
            ip = self.get_ip_address(self.iface)
        except OSError:
            return ""
        return ip[:ip.rfind('.') + 1]

    def get_mac(self, ip, timeout=0.2):
        """Resolve ``ip`` to a MAC address with a single ARP request.

        Args:
            ip: Dotted-quad IPv4 address to query.
            timeout: Seconds to wait for a matching reply.

        Returns:
            The MAC as ``"aa:bb:cc:dd:ee:ff"``, or None when no matching
            reply arrives in time or the request cannot be sent (raw sockets
            normally require elevated privileges).
        """
        try:
            with socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                               socket.htons(self.ETH_P_ARP)) as sock:
                sock.bind((self.iface, 0))
                sock.send(self.create_arp_request(ip))
                deadline = time.monotonic() + timeout
                while True:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        return None
                    sock.settimeout(remaining)
                    pkt = sock.recvfrom(2048)[0]
                    # Unrelated ARP traffic also reaches this socket; only
                    # accept a reply whose sender IP is the one we asked for.
                    mac = self._parse_arp_reply(pkt, ip)
                    if mac:
                        return mac
        except (OSError, ValueError, struct.error):
            # Covers timeouts, permission errors, a missing interface and
            # malformed addresses; the scan treats all of them as "no answer".
            return None

    def create_arp_request(self, ip):
        """Build a broadcast Ethernet frame carrying an ARP request for ``ip``.

        Returns:
            42 bytes: a 14-byte Ethernet header followed by the 28-byte
            ARP payload.
        """
        src_mac = bytes.fromhex(self.get_mac_address(self.iface).replace(':', ''))
        src_ip = socket.inet_aton(self.get_ip_address(self.iface))
        dst_ip = socket.inet_aton(ip)
        eth_header = struct.pack("!6s6sH", b"\xff" * 6, src_mac, self.ETH_P_ARP)
        arp_payload = struct.pack(
            "!HHBBH6s4s6s4s",
            0x0001,         # hardware type: Ethernet
            0x0800,         # protocol type: IPv4
            6,              # hardware address length
            4,              # protocol address length
            0x0001,         # opcode: request
            src_mac,
            src_ip,
            b"\x00" * 6,    # target MAC is unknown in a request
            dst_ip,
        )
        return eth_header + arp_payload

    @staticmethod
    def _parse_arp_reply(pkt, target_ip):
        """Return the sender MAC if ``pkt`` is an ARP reply from ``target_ip``."""
        if len(pkt) < 42:
            return None
        if pkt[12:14] != b"\x08\x06":    # EtherType is not ARP
            return None
        if pkt[20:22] != b"\x00\x02":    # opcode is not reply
            return None
        if pkt[28:32] != socket.inet_aton(target_ip):
            return None
        return pkt[22:28].hex(':')

    def get_mac_address(self, iface):
        """Return the MAC address text read from sysfs for ``iface``."""
        with open('/sys/class/net/{}/address'.format(iface), 'r') as f:
            return f.read().strip()

    def get_ip_address(self, iface):
        """Return the IPv4 address of ``iface`` as a dotted-quad string.

        Raises:
            OSError: If the ioctl fails.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            ifreq = struct.pack('256s', iface[:15].encode())
            result = fcntl.ioctl(sock.fileno(), self.SIOCGIFADDR, ifreq)
        return socket.inet_ntoa(result[20:24])
# GUI module
class MainWindow(QMainWindow):
    """Main window with one button per detector/scanner and a status label.

    Each button starts the corresponding job on a background thread so the
    GUI thread is not blocked while the job runs.
    """

    def __init__(self, iface="eth0"):
        """Build the window.

        Args:
            iface: Network interface every detector and the scanner are
                started on.
        """
        super().__init__()
        self.iface = iface
        self.setWindowTitle("ARP Attack Detector")
        self.icmp_flood_button = self._add_button(
            "ICMP Flood Detection", 10, self.start_icmp_flood_detection)
        self.tcp_attack_button = self._add_button(
            "TCP Attack Detection", 50, self.start_tcp_attack_detection)
        self.arp_attack_button = self._add_button(
            "ARP Attack Detection", 90, self.start_arp_attack_detection)
        self.scan_button = self._add_button(
            "IP/MAC Address Scan", 130, self.start_ip_mac_scan)
        self.status_label = QLabel("Ready", self)
        self.status_label.setGeometry(10, 170, 200, 30)

    def _add_button(self, text, y, handler):
        """Create a 200x30 button at (10, y) wired to ``handler``."""
        button = QPushButton(text, self)
        button.setGeometry(10, y, 200, 30)
        button.clicked.connect(handler)
        return button

    def _run_in_background(self, status_text, target):
        """Show ``status_text`` and run ``target`` on a new thread."""
        self.status_label.setText(status_text)
        # Daemon thread: a capture that is still blocked on its subprocess
        # must not keep the interpreter alive after the window is closed.
        self.thread = threading.Thread(target=target, daemon=True)
        self.thread.start()

    def start_icmp_flood_detection(self):
        """Start ICMP flood detection on the configured interface."""
        self.detector = IcmpFloodDetector(self.iface)
        self._run_in_background("Detecting ICMP Flood...", self.detector.detect)

    def start_tcp_attack_detection(self):
        """Start TCP attack detection on the configured interface."""
        self.detector = TcpAttackDetector(self.iface)
        self._run_in_background("Detecting TCP Attack...", self.detector.detect)

    def start_arp_attack_detection(self):
        """Start ARP attack detection on the configured interface."""
        self.detector = ArpAttackDetector(self.iface)
        self._run_in_background("Detecting ARP Attack...", self.detector.detect)

    def start_ip_mac_scan(self):
        """Start the IP/MAC address scan on the configured interface."""
        self.scanner = IpMacScanner(self.iface)
        self._run_in_background("Scanning IP/MAC Address...", self.scanner.scan)
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the main window
    # at (100, 100) with a 220x210 size, and exit with the event loop's
    # return code.
    qt_app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.setGeometry(100, 100, 220, 210)
    main_window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
该代码实现了一个基本的可视化界面,包括四个按钮:ICMP Flood检测、TCP攻击检测、ARP欺骗攻击检测和IP/MAC地址扫描。当用户点击其中一个按钮时,程序会启动一个新线程进行相应的检测或扫描,并在界面上显示当前状态。检测结果目前仅输出到控制台,IP/MAC扫描结果保存在内存中的 ip_mac_table 里;代码尚未实现将结果保存到本地文件的功能。
该软件可以帮助用户检测ARP欺骗等网络攻击,为网络安全防护提供依据;当前代码只实现了检测与扫描,尚未实现主动防护功能。用户可以根据自己的需求调整软件的功能和参数(例如网卡名称 eth0 和告警阈值 100)。
原文地址: https://www.cveoy.top/t/topic/jm1Q 著作权归作者所有。请勿转载和采集!