在使用 as_completed() 时,应该将其放在一个 for 循环中,而不是直接调用。因此,应该将代码修改为:

for future in as_completed(futures, timeout=60):
    try:
        future.result()
    except Exception as e:
        print(e)

修改后的代码:

import requests
import time
import os
import urllib3
import sys
import random
from bs4 import BeautifulSoup
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed  # 添加 as_completed
from threading import Lock
from colorama import Fore, init

now_time = time.strftime('%Y-%m-%d %H-%M')

# 读取 Dorks
def work(dorks):
    with open(dorks, mode='r', encoding='utf-8') as file:
        read_content = file.readlines()
        # 将内容加入列表
        content = [result.strip() for result in read_content]
        # 返回数量丢给任务池
        return len(read_content), content

# Google 搜索
def google_serach(query, locks, filename):
    try:
        # 关闭 HTTPS 报错信息
        urllib3.disable_warnings()
        filename = os.path.join(os.getcwd(), f'{filename}.txt')
        domains = ['fr', 'it', 'ca', 'co.uk', 'ru', 'co,jp', 'co.kr', 'com.au', 'co.in', 'com.br', 'com.ar', 'co.za', 'co.nz', 'es', 'se', 'nl', 'ch', 'at', 'dk', 'be', 'pl', 'fi', 'ie', 'pt', 'gr', 'tw', 'com', 'uk', 'de', 'br', 'ca', 'kr', 'mx', 'au', 'za']
        random_domain = random.choice(domains)
        url = f'https://www.google.{random_domain}/search?q={query}&num=100'
        # 请求头
        headers = {
                   'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
                   'accept-language': 'zh-CN,zh;q=0.9',
                   'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                   'referer': 'https://www.google.com/',
                   'origin': 'https://www.google.com',
                   'Sec-Fetch-Site': 'same-origin',
                   'Sec-Fetch-Mode': 'navigate',
                   'Sec-Fetch-User': '?1',
                   'Sec-Fetch-Dest': 'document'
        }
        # 代理
        proxies = {'http': 'http://127.0.0.1:7890', 'https': 'http://127.0.0.1:7890'}
        response = requests.get(url=url, headers=headers, proxies=proxies, verify=False, timeout=5)
        soup = BeautifulSoup(response.content, 'html.parser')
        # 查找全部 div 标签
        find_div = soup.find_all('div', {'class': 'yuRUbf'})
        # 开启线程锁
        locks.acquire()
        # 加入列表
        get_url = [url.findNext('a')['href'] + '
' for url in find_div if 'google.com.tw' not in url.findNext('a')['href']]
        global url_num, dork_finish_num
        url_num += len(get_url)
        dork_finish_num += 1
        print(Fore.GREEN + f'
{now_time}[INFO]{ '-' * 10}>已获取 Url 数量:{url_num}  Dorsk 数量:{dork_finish_num} / {dork_total_num}', end='' + Fore.RESET)
        # 写入文件
        write_info(filename, get_url)
        # 释放线程锁
        locks.release()

    except TimeoutError:
        pass

# 写入文件函数
def write_info(filename, get_url):
    with open(filename, mode='a+', encoding='utf-8') as file:
        file.writelines(get_url)

if __name__ == '__main__':
    while True:
        try:
            init()  # 初始化颜色模块
            dorks_file = input(Fore.YELLOW + f'
{now_time}[INFO]{ '-' * 10}>input file:' + Fore.RESET)
            print('')
            filename = input(Fore.YELLOW + f'
{now_time}[INFO]{ '-' * 10}>output file:' + Fore.RESET)
            # 接受 work 函数返回的元组
            dork_total_num, query_list = work(dorks_file)
            # 定义全局变量完成数量/URL 数量
            dork_finish_num = url_num = 0

            # 定义进程池和线程池数量
            process_pool = ProcessPoolExecutor(max_workers=4)
            thread_pool = ThreadPoolExecutor(max_workers=20)

            # 定义全局锁
            threads_lock = Lock()

            # 分配进程池任务
            futures = []
            for dokr_list in query_list:
                # 使用 as_completed 获取结果,并设置 timeout
                future = thread_pool.submit(google_serach, dokr_list, threads_lock, filename)
                futures.append(future)
            for future in as_completed(futures, timeout=60):
                try:
                    future.result()
                except Exception as e:
                    print(e)

            process_pool.shutdown(wait=True)
            thread_pool.shutdown(wait=True)
            if len(sys.argv) == 1:
                pass
            input(Fore.YELLOW + f'

{now_time}[INFO]{'-' * 10}>程序运行完毕,按回车退出' + Fore.RESET)
            break
        # 文件为空
        except FileNotFoundError:
            print(Fore.RED + f'{now_time}[Error]{'-' * 10}>文件不存在' + Fore.RESET)
        # 中断异常
        except KeyboardInterrupt:
            sys.exit(1)

解释:

  • as_completed() 返回一个迭代器,它会按完成顺序生成 future 对象。
  • for 循环中,我们使用 future.result() 获取每个 future 对象的结果。
  • 如果出现异常,则使用 try...except 块捕获异常,并打印错误信息。

通过这种修改,我们可以正确地使用 as_completed() 函数,并在每个 future 对象完成时及时处理结果或异常。

注意:

  • timeout 参数指定了等待所有 future 对象完成的最大时间。如果超过了这个时间,as_completed() 迭代器将停止生成 future 对象。
  • 如果您需要在所有 future 对象完成后执行一些操作,可以使用 wait() 函数。

希望这篇文章能够帮助您理解并解决 as_completed() 函数的使用错误。如果您还有其他问题,请随时提问。

Python concurrent.futures.as_completed() 使用错误:TimeoutError: 65287 (of 65760) futures unfinished

原文地址: https://www.cveoy.top/t/topic/oZQm 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录