基于 Linux 的 ARP 攻击检测防护软件，设计模块包括：MAC flood、DHCP flood、CC 攻击和 ARP 欺骗攻击的检测，并根据攻击类型输出信息；扫描并记录局域网内活动主机的 IP 地址和 MAC 地址；具备一定的防御功能；通过中文按钮实现可视化界面；将结果保存到日志中。请用 Python 基本代码实现，并将各模块有机地结合起来。
以下是基于 Python 的 ARP 攻击检测防护软件的基本代码实现（依赖 Linux 的 AF_PACKET 原始套接字和 fcntl ioctl，需以 root 或 CAP_NET_RAW 权限运行；代码中只有检测与日志记录，"防御"动作和可视化界面尚未实现）：
import fcntl
import ipaddress
import logging
import os
import signal
import socket
import struct
import sys
import threading
import time
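# Constants
MAX_BYTES = 65535             # largest frame recv() is asked for
ETHERNET_HEADER_LENGTH = 14   # dst MAC (6) + src MAC (6) + EtherType (2)
ARP_HEADER_LENGTH = 28        # fixed-size ARP body for Ethernet/IPv4
ARP_REQUEST = 1               # ARP opcode: who-has
ARP_REPLY = 2                 # ARP opcode: is-at

# Logging: an alert without a timestamp cannot be correlated with other
# evidence (switch logs, pcaps), so every record carries time and level.
logging.basicConfig(
    filename='arp_attack.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
)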
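# Look up the local IPv4 and MAC address of an interface
def get_ip_mac(ifname):
    """Return ``(ip, mac)`` for the Linux interface *ifname*.

    ``ip`` is dotted-quad text; ``mac`` is 12 lowercase hex digits with no
    separators (the same shape the original returned).

    Two ioctls are required: SIOCGIFADDR (0x8915) fills a ``sockaddr_in``
    whose address is at bytes 20..24 of the ifreq, while the hardware
    address is only returned by SIOCGIFHWADDR (0x8927) at bytes 18..24.
    The original issued SIOCGIFADDR once and read both fields from that
    buffer, so the "MAC" it returned was the tail of the IP sockaddr.

    Raises OSError if the interface does not exist or has no IPv4 address.
    """
    SIOCGIFADDR = 0x8915
    SIOCGIFHWADDR = 0x8927
    # ifr_name is IFNAMSIZ (16) bytes including the NUL terminator, and the
    # ioctl needs bytes, not str.
    ifreq = struct.pack('256s', ifname.encode()[:15])
    # The helper socket is only a handle for the ioctl; close it afterwards
    # (the original leaked one per call).
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        addr_info = fcntl.ioctl(s.fileno(), SIOCGIFADDR, ifreq)
        hw_info = fcntl.ioctl(s.fileno(), SIOCGIFHWADDR, ifreq)
    ip = socket.inet_ntoa(addr_info[20:24])
    mac = hw_info[18:24].hex()
    return ip, mac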
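# Build and send an ARP who-has request
def build_arp_request(src_mac, src_ip, target_ip):
    """Return the 42-byte Ethernet+ARP frame asking who has *target_ip*.

    *src_mac* is hex text with or without ':' separators; *src_ip* and
    *target_ip* are dotted-quad strings.  Kept separate from the sending
    code so the frame layout can be tested without a raw socket.

    Raises ValueError if *src_mac* is not 6 bytes of hex, and OSError from
    inet_aton for a malformed IP.
    """
    sender_mac = bytes.fromhex(src_mac.replace(':', ''))
    if len(sender_mac) != 6:
        raise ValueError('MAC address must be 6 bytes: %r' % src_mac)
    broadcast = b'\xff' * 6
    zero_mac = b'\x00' * 6
    ether_header = struct.pack('!6s6sH', broadcast, sender_mac, 0x0806)
    arp_body = struct.pack(
        '!HHBBH6s4s6s4s',
        0x0001,                       # hardware type: Ethernet
        0x0800,                       # protocol type: IPv4
        6, 4,                         # hardware / protocol address sizes
        1,                            # opcode: ARP request
        sender_mac, socket.inet_aton(src_ip),
        zero_mac, socket.inet_aton(target_ip),
    )
    return ether_header + arp_body


def send_arp_request(ifname, target_ip):
    """Broadcast an ARP request for *target_ip* out of *ifname*.

    Needs CAP_NET_RAW (normally root).  The original packed 17-character
    MAC strings into 6-byte fields (silently truncated) and passed the
    bare constant SOCK_RAW as the link-layer protocol; the protocol must
    be the EtherType in network byte order, for both socket() and bind().
    The socket is closed after the send instead of being leaked.
    """
    src_ip, src_mac = get_ip_mac(ifname)
    frame = build_arp_request(src_mac, src_ip, target_ip)
    eth_p_arp = socket.htons(0x0806)
    with socket.socket(socket.AF_PACKET, socket.SOCK_RAW, eth_p_arp) as s:
        s.bind((ifname, eth_p_arp))
        s.send(frame)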
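# Detect MAC flooding (CAM-table overflow)
def ethernet_source(frame):
    """Return the source MAC of an Ethernet *frame* as 'aa:bb:cc:dd:ee:ff'
    text, or None when the frame is too short to hold a header.

    The source address is bytes 6..12.  The original read bytes 14..20,
    which is the start of the payload, so it never saw a real MAC.
    Untagged frames only; an 802.1Q tag is not accounted for.
    """
    if len(frame) < 14:
        return None
    return ':'.join('%02x' % b for b in frame[6:12])


def detect_mac_flood(ifname, threshold=1000, window=10.0):
    """Watch *ifname* and log when a MAC flood is in progress.

    A CAM-table overflow attack emits frames from a large number of
    *distinct* source MACs, so the detector counts how many different
    sources appear within *window* seconds and alerts when that exceeds
    *threshold*.  (The original counted frames from a single source,
    which only identifies a busy host.)  The defaults are a starting
    point; tune them to the size of the LAN.  Runs until the thread is
    killed; needs CAP_NET_RAW.
    """
    eth_p_all = socket.htons(0x0003)
    seen = set()
    window_start = time.monotonic()
    # One socket for the life of the detector; the original opened a new
    # raw socket per frame and never closed any of them.
    with socket.socket(socket.AF_PACKET, socket.SOCK_RAW, eth_p_all) as s:
        s.bind((ifname, eth_p_all))
        while True:
            src = ethernet_source(s.recv(MAX_BYTES))
            if src is None:
                continue
            now = time.monotonic()
            if now - window_start > window:
                seen.clear()
                window_start = now
            seen.add(src)
            if len(seen) > threshold:
                logging.warning(
                    'MAC flood attack detected: %d distinct source MACs in %.0fs',
                    len(seen), window)
                seen.clear()
                window_start = now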
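# Detect DHCP request floods (DHCP starvation)
def dhcp_request_source(frame):
    """Return the Ethernet source MAC (text) if *frame* is a DHCP client
    request -- IPv4/UDP to port 67 carrying a BOOTREQUEST -- else None.

    Offsets are absolute from the start of the frame: EtherType 12..14,
    IPv4 version/IHL byte 14, protocol byte 23, UDP destination port
    36..38 and the BOOTP ``op`` byte at 42.  The original added the
    14-byte Ethernet header length on top of offsets that were already
    absolute, so every field test was misaligned and nothing matched.
    Frames with IPv4 options (IHL != 5) are ignored, as in the original.
    """
    if len(frame) < 43:
        return None
    is_ipv4 = frame[12:14] == b'\x08\x00' and frame[14] == 0x45
    is_udp_to_67 = frame[23] == 0x11 and frame[36:38] == b'\x00\x43'
    if not (is_ipv4 and is_udp_to_67 and frame[42] == 1):
        return None
    return ':'.join('%02x' % b for b in frame[6:12])


def detect_dhcp_flood(ifname, threshold=1000, window=10.0):
    """Watch *ifname* for DHCP request floods.

    Counts DHCP client requests within *window* seconds and logs an alert,
    naming the busiest source MAC, once the total exceeds *threshold*.
    The total is used rather than a per-MAC count because starvation
    tools randomise the source address per request.  Runs until the
    thread is killed; needs CAP_NET_RAW.
    """
    eth_p_ip = socket.htons(0x0800)
    per_source = {}
    total = 0
    window_start = time.monotonic()
    # Single long-lived socket (the original opened one per frame).
    with socket.socket(socket.AF_PACKET, socket.SOCK_RAW, eth_p_ip) as s:
        s.bind((ifname, eth_p_ip))
        while True:
            src = dhcp_request_source(s.recv(MAX_BYTES))
            if src is None:
                continue
            now = time.monotonic()
            if now - window_start > window:
                per_source.clear()
                total = 0
                window_start = now
            per_source[src] = per_source.get(src, 0) + 1
            total += 1
            if total > threshold:
                busiest = max(per_source, key=per_source.get)
                logging.warning(
                    'DHCP flood attack detected: %d requests from %d sources in %.0fs, busiest %s',
                    total, len(per_source), window, busiest)
                per_source.clear()
                total = 0
                window_start = now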
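# Detect CC (Challenge Collapsar) HTTP request floods
def http_syn_source(frame, ports=(80, 443)):
    """Return the source IP (text) if *frame* is an IPv4 TCP SYN (without
    ACK) to one of *ports*, else None.

    EtherType is checked at 12..14, the IP protocol byte at 23, and the
    TCP header is located from the IHL so IP options are handled.  The
    source IP is read from 26..30, as in the original.
    """
    if len(frame) < 34:
        return None
    if frame[12:14] != b'\x08\x00' or frame[14] >> 4 != 4 or frame[23] != 6:
        return None
    tcp = 14 + (frame[14] & 0x0f) * 4
    if len(frame) < tcp + 14:
        return None
    dst_port = struct.unpack('!H', frame[tcp + 2:tcp + 4])[0]
    # SYN set and ACK clear: a new connection attempt, not a handshake reply.
    syn_only = (frame[tcp + 13] & 0x12) == 0x02
    if dst_port not in ports or not syn_only:
        return None
    return socket.inet_ntoa(frame[26:30])


def detect_cc_attack(ifname, threshold=1000, window=10.0, ports=(80, 443)):
    """Watch *ifname* for a CC (HTTP request flood) attack.

    Counts TCP SYNs to *ports* per source IP within *window* seconds and
    logs an alert for any source exceeding *threshold*.  A SYN count is a
    coarse proxy: it misses floods carried over existing keep-alive
    connections and it will flag a legitimately busy client, but it is
    the signal available at layer 2 without reassembling HTTP.  The
    original flagged frames whose source and destination IP were equal,
    which is the LAND attack signature and unrelated to CC.  Runs until
    the thread is killed; needs CAP_NET_RAW.
    """
    eth_p_ip = socket.htons(0x0800)
    counts = {}
    window_start = time.monotonic()
    # Single long-lived socket (the original opened one per frame).
    with socket.socket(socket.AF_PACKET, socket.SOCK_RAW, eth_p_ip) as s:
        s.bind((ifname, eth_p_ip))
        while True:
            src_ip = http_syn_source(s.recv(MAX_BYTES), ports)
            if src_ip is None:
                continue
            now = time.monotonic()
            if now - window_start > window:
                counts.clear()
                window_start = now
            counts[src_ip] = counts.get(src_ip, 0) + 1
            if counts[src_ip] > threshold:
                logging.warning('CC attack detected: %s (%d SYNs in %.0fs)',
                                src_ip, counts[src_ip], window)
                counts[src_ip] = 0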
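# Detect ARP spoofing
def check_arp_frame(frame, table):
    """Inspect one Ethernet frame for ARP spoofing evidence.

    *table* maps IP text -> MAC text for bindings already seen and is
    updated in place.  Returns an alert message, or None when *frame* is
    not ARP or is consistent with what has been seen.

    Two indicators are used:
    * the ARP sender hardware address (bytes 22..28) differs from the
      Ethernet source (bytes 6..12);
    * the sender IP (bytes 28..32) was previously bound to another MAC.
    The first binding observed for an IP is kept rather than overwritten
    so a spoofed reply cannot rewrite the reference table and silence
    later alerts; a legitimate MAC change (replaced NIC) will therefore
    keep alerting until the process restarts.  Sender IP 0.0.0.0 (ARP
    probe) is skipped.  The EtherType is checked at 12..14 -- the original
    looked at 26..28, so it never recognised an ARP frame -- and the
    original's bookkeeping of *target* IPs is dropped: it flagged any IP
    seen first as a target and later as a sender, which is normal traffic.
    NOTE(review): some gateway-redundancy setups may answer with a virtual
    MAC that differs from the frame source; confirm before acting on the
    first indicator alone.
    """
    if len(frame) < 42 or frame[12:14] != b'\x08\x06':
        return None
    eth_src = ':'.join('%02x' % b for b in frame[6:12])
    sender_mac = ':'.join('%02x' % b for b in frame[22:28])
    sender_ip = socket.inet_ntoa(frame[28:32])
    if sender_ip == '0.0.0.0':
        return None
    if sender_mac != eth_src:
        return ('ARP spoofing attack detected: %s claims MAC %s but frame came from %s'
                % (sender_ip, sender_mac, eth_src))
    known = table.get(sender_ip)
    if known is None:
        table[sender_ip] = sender_mac
        return None
    if known != sender_mac:
        return ('ARP spoofing attack detected: %s moved from %s to %s'
                % (sender_ip, known, sender_mac))
    return None


def detect_arp_spoof(ifname):
    """Watch ARP traffic on *ifname* and log spoofing indications.

    The socket is bound to EtherType 0x0806 so only ARP frames are
    delivered; the IP->MAC table persists for the life of the thread.
    Runs until the thread is killed; needs CAP_NET_RAW.
    """
    eth_p_arp = socket.htons(0x0806)
    bindings = {}
    # Single long-lived socket (the original opened one per frame).
    with socket.socket(socket.AF_PACKET, socket.SOCK_RAW, eth_p_arp) as s:
        s.bind((ifname, eth_p_arp))
        while True:
            alert = check_arp_frame(s.recv(MAX_BYTES), bindings)
            if alert:
                logging.warning(alert)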
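# Sweep the local subnet with ARP requests
def scan_network(ifname, prefix_len=24, interval=0.1):
    """Send an ARP request to every host address in *ifname*'s subnet.

    The original derived the prefix by splitting the *interface name* on
    '.', so it scanned ``.1``..``.254`` with an empty prefix; the subnet
    has to come from the interface's own IPv4 address.  The network is
    assumed to be a /``prefix_len`` (default /24) because the address
    lookup does not return the netmask -- pass the real length if the LAN
    uses another mask.  The host's own address is skipped.  Replies are
    not collected by this function.  Requires CAP_NET_RAW.
    """
    own_ip, _ = get_ip_mac(ifname)
    network = ipaddress.ip_network('%s/%d' % (own_ip, prefix_len), strict=False)
    for host in network.hosts():
        target = str(host)
        if target == own_ip:
            continue
        send_arp_request(ifname, target)
        # Pace the sweep so the scanner itself does not look like a flood.
        time.sleep(interval)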
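# Start the detectors and the scanner
def start_modules(ifname):
    """Start the four detectors and the subnet scanner, one thread each.

    Threads are daemon threads: the detectors block in recv() forever,
    and non-daemon threads (as in the original) keep the interpreter
    alive after the main thread is told to exit on SIGINT.  Returns the
    started Thread objects so a caller can inspect them.
    """
    workers = [
        threading.Thread(target=detect_mac_flood, args=(ifname,),
                         name='mac-flood', daemon=True),
        threading.Thread(target=detect_dhcp_flood, args=(ifname,),
                         name='dhcp-flood', daemon=True),
        threading.Thread(target=detect_cc_attack, args=(ifname,),
                         name='cc-attack', daemon=True),
        threading.Thread(target=detect_arp_spoof, args=(ifname,),
                         name='arp-spoof', daemon=True),
        threading.Thread(target=scan_network, args=(ifname,),
                         name='scan', daemon=True),
    ]
    for worker in workers:
        worker.start()
    return workers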
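# Stop everything (called from the SIGINT handler)
def stop_modules():
    """Terminate the program.

    The original re-sent SIGINT to its own PID from inside the SIGINT
    handler, which only re-invokes the handler and never exits.  Flushing
    the log and raising SystemExit unwinds the main thread instead; the
    worker threads never return, so they must be daemon threads for the
    interpreter to actually exit.

    Raises SystemExit(0).
    """
    logging.shutdown()
    raise SystemExit(0)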
# Visual interface (not implemented)
def start_gui():
    """Placeholder for the visual interface.

    Nothing is wired up yet, so the program runs headless and reports only
    through the log file.  Returns None.
    """
    return None  # TODO: build the button-driven UI
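# Entry point
def main(argv=None):
    """Validate the interface argument, start all modules, wait for SIGINT.

    Returns 1 on a usage or environment error (printed to stdout, as the
    original did); otherwise sleeps until stop_modules() ends the process.
    """
    args = sys.argv[1:] if argv is None else argv
    if len(args) < 1:
        print('Usage: python arp_attack.py <interface>')
        return 1
    ifname = args[0]
    # Reject anything that is not a plain interface name before it reaches
    # the sysfs path or the ioctl buffer.
    if not ifname or '/' in ifname or len(ifname) > 15:
        print('Invalid interface name: %r' % ifname)
        return 1
    if not os.path.isdir('/sys/class/net/%s' % ifname):
        print('Unknown interface: %s' % ifname)
        return 1
    if os.geteuid() != 0:
        # AF_PACKET raw sockets need CAP_NET_RAW; failing here is clearer
        # than five threads dying with PermissionError.
        print('Raw packet capture requires root (CAP_NET_RAW)')
        return 1
    # Install the handler before any thread starts; the original installed
    # it afterwards, leaving a window where Ctrl-C hit the default handler.
    signal.signal(signal.SIGINT, lambda signum, frame: stop_modules())
    start_modules(ifname)
    start_gui()
    while True:
        time.sleep(1)


if __name__ == '__main__':
    sys.exit(main())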