from flask import Flask, render_template, request, redirect, url_for, jsonify
import os
import time
import sys
import platform
from scapy.all import *

app = Flask(__name__, template_folder='path/to/templates')  # 指定模板目录路径
app.secret_key = 'secret_key'

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/register.html', methods=['GET', 'POST'])
def register():
    if request.method == 'POST':
        with open('user.txt', 'a') as f:
            f.write(request.form['username'] + ' ' + request.form['password'] + '\n')
        flash('注册成功!')
        return redirect(url_for('login'))
    return render_template('register.html')

@app.route('/login.html', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        with open('user.txt', 'r') as f:
            users = f.readlines()
            for user in users:
                if user.split()[0] == request.form['username'] and user.split()[1] == request.form['password']:
                    session['username'] = request.form['username']
                    flash('登录成功!')
                    return redirect(url_for('func.html'))
            flash('用户名或密码错误!')
    return render_template('login.html')

@app.route('/func')
def func():
    if 'username' not in session:
        return redirect(url_for('login'))
    return render_template('func')

@app.route('/icmp_flood')
def icmp_flood():
    sniff_filter = 'icmp'
    sniff_timeout = 10
    sniff_count = 0
    sniff_packets = sniff(filter=sniff_filter, timeout=sniff_timeout)
    for packet in sniff_packets:
        sniff_count += 1
    if sniff_count > 100:
        flash('检测到ICMP flood攻击!')
        with open('detection_log.txt', 'a') as f:
            f.write('ICMP flood攻击检测 - 检测结果:检测到ICMP flood攻击\n')
    else:
        flash('未检测到ICMP flood攻击!')
        with open('detection_log.txt', 'a') as f:
            f.write('ICMP flood攻击检测 - 检测结果:未检测到ICMP flood攻击\n')
    return redirect(url_for('func'))

@app.route('/tcp_attack')
def tcp_attack():
    sniff_filter = 'tcp'
    sniff_timeout = 10
    sniff_count = 0
    sniff_packets = sniff(filter=sniff_filter, timeout=sniff_timeout)
    for packet in sniff_packets:
        sniff_count += 1
    if sniff_count > 100:
        flash('检测到TCP攻击!')
        with open('detection_log.txt', 'a') as f:
            f.write('TCP攻击检测 - 检测结果:检测到TCP攻击\n')
    else:
        flash('未检测到TCP攻击!')
        with open('detection_log.txt', 'a') as f:
            f.write('TCP攻击检测 - 检测结果:未检测到TCP攻击\n')
    return redirect(url_for('func'))

@app.route('/arp_spoofing')
def arp_spoofing():
    sniff_filter = 'arp'
    sniff_timeout = 10
    sniff_count = 0
    sniff_packets = sniff(filter=sniff_filter, timeout=sniff_timeout)
    for packet in sniff_packets:
        sniff_count += 1
    if sniff_count > 100:
        flash('检测到ARP欺骗攻击!')
        with open('detection_log.txt', 'a') as f:
            f.write('ARP欺骗攻击检测 - 检测结果:检测到ARP欺骗攻击\n')
    else:
        flash('未检测到ARP欺骗攻击!')
        with open('detection_log.txt', 'a') as f:
            f.write('ARP欺骗攻击检测 - 检测结果:未检测到ARP欺骗攻击\n')
    return redirect(url_for('func'))

@app.route('/detect_attack')
def detect_attack():
    with open('clients.txt', 'r') as f:
        clients = f.readlines()
        for client in clients:
            ip = client.split()[0]
            mac = client.split()[1]
            sniff_filter = 'arp and src host ' + ip
            sniff_timeout = 10
            sniff_count = 0
            sniff_packets = sniff(filter=sniff_filter, timeout=sniff_timeout)
            for packet in sniff_packets:
                sniff_count += 1
            if sniff_count > 100:
                flash('检测到攻击源IP地址为' + ip + ',MAC地址为' + mac + '!')
                with open('detection_log.txt', 'a') as f:
                    f.write('警告' + '检测到攻击源IP地址为' + ip + ',MAC地址为' + mac + '!\n')
            else:
                flash('未检测到攻击源!')
                with open('detection_log.txt', 'a') as f:
                    f.write('提示' + '未检测到攻击源!\n')
    return redirect(url_for('func'))

@app.route('/scan_network')
def scan_network():
    network = '192.168.31.0/24'
    arp_request = ARP(pdst=network)
    broadcast = Ether(dst='ff:ff:ff:ff:ff:ff')
    arp_broadcast = broadcast/arp_request
    answered_list = srp(arp_broadcast, timeout=1, verbose=False)[0]
    clients = []
    for element in answered_list:
        client = {'ip': element[1].psrc, 'mac': element[1].hwsrc}
        clients.append(client)
    with open('clients.txt', 'w') as f:
        for client in clients:
            f.write(client['ip'] + ' ' + client['mac'] + '\n')
    flash('扫描完成!')
    return redirect(url_for('func.html'))

@app.route('/view_detection_log')
def view_detection_log():
    with open('detection_log.txt', 'r') as f:
        detection_log = f.read()
    flash(detection_log)
    return redirect(url_for('func.html'))

if __name__ == '__main__':
    app.run()

注意:

  • 以上代码使用了 scapy 库来进行网络数据包的分析,请确保你已经安装了 scapy 库。
  • 代码中的 template_folder 变量需要替换为你的模板目录的实际路径。
  • 代码中的 user.txt 文件用来存储用户注册信息, clients.txt 文件用来存储网络扫描结果, detection_log.txt 文件用来记录检测到的攻击日志。
  • 代码中使用 flash() 函数来显示消息提示,需要在你的模板文件中使用 {{ get_flashed_messages() }} 来显示这些消息。
  • 这只是一个简单的网络安全检测系统,你可以根据自己的需求进行扩展和改进。
Flask网络安全检测系统: 识别和防御常见网络攻击

原文地址: https://www.cveoy.top/t/topic/jobv 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录