基于PyQt5和Scapy的增强型网络嗅探器
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QComboBox, QTableWidget, QTableWidgetItem, QVBoxLayout, QHBoxLayout, QWidget, QTextEdit, QHeaderView
from scapy.all import sniff, hexdump
import threading
class Sniffer(QMainWindow):
def __init__(self):
super().__init__()
self.initUI()
def initUI(self):
self.setWindowTitle('PyQt5 网络嗅探器')
self.setGeometry(100, 100, 1200, 800)
mainLayout = QVBoxLayout()
controlLayout1 = QHBoxLayout()
controlLayout2 = QHBoxLayout()
self.comboBoxProtocol = QComboBox(self)
self.comboBoxProtocol.addItems(['All', 'HTTP', 'HTTPS', 'FTP', 'SMTP', 'IMAP', 'POP3', 'DNS'])
controlLayout1.addWidget(self.comboBoxProtocol)
self.comboBoxFilter = QComboBox(self)
self.comboBoxFilter.addItems(['All', 'Source IP', 'Destination IP', 'Source MAC', 'Destination MAC'])
controlLayout1.addWidget(self.comboBoxFilter)
self.startButton = QPushButton('开始嗅探', self)
self.startButton.clicked.connect(self.start_sniffing)
controlLayout2.addWidget(self.startButton)
self.stopButton = QPushButton('停止嗅探', self)
self.stopButton.clicked.connect(self.stop_sniffing)
controlLayout2.addWidget(self.stopButton)
self.resetButton = QPushButton('重置', self)
self.resetButton.clicked.connect(self.reset_table)
controlLayout2.addWidget(self.resetButton)
mainLayout.addLayout(controlLayout1)
mainLayout.addLayout(controlLayout2)
self.tableWidget = QTableWidget(self)
self.tableWidget.setColumnCount(5)
self.tableWidget.setHorizontalHeaderLabels(['源 IP', '源端口', '目标 IP', '目标端口', '长度'])
self.tableWidget.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
self.tableWidget.selectionModel().selectionChanged.connect(self.show_packet_detail)
mainLayout.addWidget(self.tableWidget)
self.textEdit = QTextEdit(self)
mainLayout.addWidget(self.textEdit)
container = QWidget()
container.setLayout(mainLayout)
self.setCentralWidget(container)
self.stop_sniffing_flag = False
self.packets = []
def packet_callback(self, packet):
self.packets.append(packet)
src_ip = packet['IP'].src if packet.haslayer('IP') else '-'
dst_ip = packet['IP'].dst if packet.haslayer('IP') else '-'
src_port = packet[1].sport if hasattr(packet[1], 'sport') else '-'
dst_port = packet[1].dport if hasattr(packet[1], 'dport') else '-'
length = len(packet)
rowPosition = self.tableWidget.rowCount()
self.tableWidget.insertRow(rowPosition)
self.tableWidget.setItem(rowPosition, 0, QTableWidgetItem(src_ip))
self.tableWidget.setItem(rowPosition, 1, QTableWidgetItem(str(src_port)))
self.tableWidget.setItem(rowPosition, 2, QTableWidgetItem(dst_ip))
self.tableWidget.setItem(rowPosition, 3, QTableWidgetItem(str(dst_port)))
self.tableWidget.setItem(rowPosition, 4, QTableWidgetItem(str(length)))
def start_sniffing(self):
self.startButton.setEnabled(False)
protocol = self.comboBoxProtocol.currentText().lower()
filter_option = self.comboBoxFilter.currentText()
if protocol == 'all':
filter_str = 'ip'
else:
filter_str = {
'http': 'tcp port 80',
'https': 'tcp port 443',
'ftp': 'tcp port 21',
'smtp': 'tcp port 25',
'imap': 'tcp port 143',
'pop3': 'tcp port 110',
'dns': 'udp port 53'
}.get(protocol, 'ip')
if filter_option == 'Source IP':
filter_str += ' and src host ' + input('请输入源 IP 地址:')
elif filter_option == 'Destination IP':
filter_str += ' and dst host ' + input('请输入目标 IP 地址:')
elif filter_option == 'Source MAC':
filter_str += ' and ether src ' + input('请输入源 MAC 地址:')
elif filter_option == 'Destination MAC':
filter_str += ' and ether dst ' + input('请输入目标 MAC 地址:')
self.stop_sniffing_flag = False
t = threading.Thread(target=self.sniff_thread, args=(filter_str,))
t.start()
def sniff_thread(self, filter_str):
sniff(prn=self.packet_callback, filter=filter_str, stop_filter=lambda x: self.stop_sniffing_flag)
def stop_sniffing(self):
self.stop_sniffing_flag = True
self.startButton.setEnabled(True)
def reset_table(self):
self.tableWidget.setRowCount(0)
self.textEdit.clear()
self.packets = []
def show_packet_detail(self):
self.textEdit.clear()
for selectionRange in self.tableWidget.selectedRanges():
for row in range(selectionRange.topRow(), selectionRange.bottomRow() + 1):
packet = self.packets[row]
self.textEdit.append(packet.show(dump=True))
self.textEdit.append('Hexdump:\n')
hexdump(packet, dump=True, full=True, file=self.textEdit)
if __name__ == '__main__':
app = QApplication(sys.argv)
mainWin = Sniffer()
mainWin.show()
sys.exit(app.exec_())
原文地址: https://www.cveoy.top/t/topic/bpUM 著作权归作者所有。请勿转载和采集!