import sys
import threading

from PyQt5.QtCore import pyqtSignal
from PyQt5.QtWidgets import (
    QApplication,
    QComboBox,
    QHBoxLayout,
    QHeaderView,
    QInputDialog,
    QMainWindow,
    QPushButton,
    QTableWidget,
    QTableWidgetItem,
    QTextEdit,
    QVBoxLayout,
    QWidget,
)
from scapy.all import hexdump, sniff

class Sniffer(QMainWindow):
    """Main window of a small PyQt5 + Scapy packet sniffer.

    The window offers a protocol selector, an optional address filter,
    start/stop/reset buttons, a table with one row per captured packet and a
    text pane showing the dissection and hexdump of the selected rows.

    Capturing runs on a background thread. Captured packets are handed to the
    GUI thread through ``packetCaptured`` so that widgets and ``self.packets``
    are only ever modified from the GUI thread.
    """

    # Carries each captured packet from the capture thread to the GUI thread.
    packetCaptured = pyqtSignal(object)
    # Carries the text of an exception that ended a capture session.
    sniffFailed = pyqtSignal(str)

    # BPF expression for each protocol choice (keys are the lower-cased combo
    # box texts). Anything not listed, including 'all', falls back to 'ip'.
    _PROTOCOL_FILTERS = {
        'http': 'tcp port 80',
        'https': 'tcp port 443',
        'ftp': 'tcp port 21',
        'smtp': 'tcp port 25',
        'imap': 'tcp port 143',
        'pop3': 'tcp port 110',
        'dns': 'udp port 53',
    }

    # For each address-filter choice: (BPF qualifier, prompt shown to the user).
    _ADDRESS_FILTERS = {
        'Source IP': ('src host', '请输入源 IP 地址:'),
        'Destination IP': ('dst host', '请输入目标 IP 地址:'),
        'Source MAC': ('ether src', '请输入源 MAC 地址:'),
        'Destination MAC': ('ether dst', '请输入目标 MAC 地址:'),
    }

    def __init__(self):
        """Create the window and build its widgets."""
        super().__init__()
        self.initUI()

    def initUI(self):
        """Build the widget tree, wire up signals and initialise state."""
        self.setWindowTitle('PyQt5 网络嗅探器')
        self.setGeometry(100, 100, 1200, 800)

        mainLayout = QVBoxLayout()

        controlLayout1 = QHBoxLayout()
        controlLayout2 = QHBoxLayout()

        self.comboBoxProtocol = QComboBox(self)
        self.comboBoxProtocol.addItems(['All', 'HTTP', 'HTTPS', 'FTP', 'SMTP', 'IMAP', 'POP3', 'DNS'])
        controlLayout1.addWidget(self.comboBoxProtocol)

        self.comboBoxFilter = QComboBox(self)
        self.comboBoxFilter.addItems(['All', 'Source IP', 'Destination IP', 'Source MAC', 'Destination MAC'])
        controlLayout1.addWidget(self.comboBoxFilter)

        self.startButton = QPushButton('开始嗅探', self)
        self.startButton.clicked.connect(self.start_sniffing)
        controlLayout2.addWidget(self.startButton)

        self.stopButton = QPushButton('停止嗅探', self)
        self.stopButton.clicked.connect(self.stop_sniffing)
        controlLayout2.addWidget(self.stopButton)

        self.resetButton = QPushButton('重置', self)
        self.resetButton.clicked.connect(self.reset_table)
        controlLayout2.addWidget(self.resetButton)

        mainLayout.addLayout(controlLayout1)
        mainLayout.addLayout(controlLayout2)

        self.tableWidget = QTableWidget(self)
        self.tableWidget.setColumnCount(5)
        self.tableWidget.setHorizontalHeaderLabels(['源 IP', '源端口', '目标 IP', '目标端口', '长度'])
        self.tableWidget.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
        self.tableWidget.selectionModel().selectionChanged.connect(self.show_packet_detail)
        mainLayout.addWidget(self.tableWidget)

        self.textEdit = QTextEdit(self)
        mainLayout.addWidget(self.textEdit)

        container = QWidget()
        container.setLayout(mainLayout)
        self.setCentralWidget(container)

        # Cross-thread hand-off: the slots below run on the GUI thread even
        # when the signals are emitted from the capture thread.
        self.packetCaptured.connect(self._add_packet_row)
        self.sniffFailed.connect(self._on_sniff_failed)

        self.stop_sniffing_flag = False
        # Stop event of the most recently started capture session, if any.
        self._stop_event = None
        # Captured packets; index i corresponds to table row i.
        self.packets = []

    def packet_callback(self, packet):
        """Record a captured packet and show it as a new table row.

        Used as Scapy's ``prn`` callback, so it is normally invoked on the
        capture thread. It only emits a signal; the table and ``self.packets``
        are updated by the connected slot on the GUI thread.
        """
        self.packetCaptured.emit(packet)

    def _add_packet_row(self, packet):
        """Append ``packet`` to ``self.packets`` and add its table row."""
        self.packets.append(packet)

        if packet.haslayer('IP'):
            ip_layer = packet['IP']
            src_ip, dst_ip = ip_layer.src, ip_layer.dst
        else:
            src_ip = dst_ip = '-'
        # Looking the field up on the whole packet avoids indexing a layer
        # that may not exist. NOTE(review): relies on Scapy resolving field
        # names through payload layers, as the original hasattr() check did.
        src_port = getattr(packet, 'sport', '-')
        dst_port = getattr(packet, 'dport', '-')

        row = self.tableWidget.rowCount()
        self.tableWidget.insertRow(row)
        cells = (src_ip, src_port, dst_ip, dst_port, len(packet))
        for column, value in enumerate(cells):
            self.tableWidget.setItem(row, column, QTableWidgetItem(str(value)))

    def _on_sniff_failed(self, message):
        """Show why a capture session ended and allow starting a new one."""
        self.textEdit.append('嗅探失败: ' + message)
        self.startButton.setEnabled(True)

    def start_sniffing(self):
        """Build the BPF filter from the selectors and start capturing.

        If an address filter is selected the user is asked for the address in
        a dialog; cancelling the dialog or entering nothing aborts the start.
        """
        protocol = self.comboBoxProtocol.currentText().lower()
        filter_str = self._PROTOCOL_FILTERS.get(protocol, 'ip')

        filter_option = self.comboBoxFilter.currentText()
        if filter_option in self._ADDRESS_FILTERS:
            qualifier, prompt = self._ADDRESS_FILTERS[filter_option]
            # A modal dialog keeps the event loop alive; console input()
            # would block the GUI thread and needs an attached terminal.
            address, accepted = QInputDialog.getText(self, filter_option, prompt)
            address = address.strip()
            if not accepted or not address:
                return
            filter_str += ' and ' + qualifier + ' ' + address

        self.startButton.setEnabled(False)
        self.stop_sniffing_flag = False
        # A fresh event per session: clearing a shared flag could revive an
        # earlier capture thread that has not yet noticed it was stopped.
        stop_event = threading.Event()
        self._stop_event = stop_event
        # Daemon thread: a capture blocked waiting for traffic must not keep
        # the process alive after the window is closed.
        worker = threading.Thread(
            target=self.sniff_thread,
            args=(filter_str, stop_event),
            daemon=True,
        )
        worker.start()

    def sniff_thread(self, filter_str, stop_event=None):
        """Capture packets matching ``filter_str`` until asked to stop.

        Args:
            filter_str: BPF filter expression passed to Scapy's ``sniff``.
            stop_event: Optional ``threading.Event`` that ends this session
                when set. When omitted, ``self.stop_sniffing_flag`` is used.

        The stop condition is only evaluated when a packet arrives, so the
        thread may linger until the next matching packet is seen. Any
        exception raised by ``sniff`` is reported through ``sniffFailed``.
        """
        def should_stop(_packet):
            if stop_event is not None:
                return stop_event.is_set()
            return self.stop_sniffing_flag

        def deliver(packet):
            # Drop a packet that arrives after the session was stopped.
            if not should_stop(packet):
                self.packet_callback(packet)

        try:
            # store=False: packets are already kept in self.packets.
            sniff(prn=deliver, filter=filter_str, stop_filter=should_stop, store=False)
        except Exception as exc:  # thread boundary: report instead of dying silently
            self.sniffFailed.emit(str(exc))

    def stop_sniffing(self):
        """Ask the current capture session to stop and re-enable Start."""
        self.stop_sniffing_flag = True
        if self._stop_event is not None:
            self._stop_event.set()
        self.startButton.setEnabled(True)

    def reset_table(self):
        """Remove all rows, clear the detail pane and forget the packets."""
        self.tableWidget.setRowCount(0)
        self.textEdit.clear()
        self.packets = []

    def show_packet_detail(self):
        """Show dissection and hexdump of every selected row's packet."""
        self.textEdit.clear()
        for selection in self.tableWidget.selectedRanges():
            for row in range(selection.topRow(), selection.bottomRow() + 1):
                if row >= len(self.packets):
                    # No packet recorded for this row; nothing to show.
                    continue
                packet = self.packets[row]
                self.textEdit.append(packet.show(dump=True))
                self.textEdit.append('Hexdump:\n')
                # hexdump() returns the text when dump=True.
                self.textEdit.append(hexdump(packet, dump=True))

# Script entry point: create the Qt application, show the sniffer window and
# hand control to the event loop until the window is closed.
if __name__ == '__main__':
    qt_app = QApplication(sys.argv)
    window = Sniffer()
    window.show()
    # Propagate the event loop's return value as the process exit status.
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
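# Enhanced network sniffer based on PyQt5 and Scapy
#
# Original source: https://www.cveoy.top/t/topic/bpUM  Copyright belongs to the author. Do not repost or scrape.
#
# (Site footer) Free AI - click here; no registration or login required.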