Python 并发编程:使用 wait 函数解决 as_completed 超时问题

在使用 Python 的 concurrent.futures 模块进行并发编程时,as_completed 函数可以方便地管理多个任务并获取已完成的任务。然而,当使用 as_completed 函数时,可能会遇到超时问题。

问题描述:

for future in as_completed(futures, timeout=60):
  File 'C:\Program Files\Python310\lib\concurrent\futures\_base.py', line 241, in as_completed
    raise TimeoutError(
concurrent.futures._base.TimeoutError: 65287 (of 65760) futures unfinished

这个错误提示表明在 60 秒的超时时间内,只有部分任务完成。如果需要等待所有任务完成,则可以使用 wait 函数代替 as_completed 函数。

修改后的代码:

import requests
import time
import os
import urllib3
import sys
import random
from bs4 import BeautifulSoup
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait  # 修改导入的模块
from threading import Lock
from colorama import Fore, init

now_time = time.strftime('%Y-%m-%d %H-%M')

# 读取Dorks
def work(dorks):
    with open(dorks, mode='r', encoding='utf-8') as file:
        read_content = file.readlines()
        # 将内容加入列表
        content = [result.strip() for result in read_content]
        # 返回数量丢给任务池
        return len(read_content), content

# Google搜索
def google_serach(query, locks, filename):
    try:
        # 关闭HTTPS报错信息
        urllib3.disable_warnings()
        filename = os.path.join(os.getcwd(), f'{filename}.txt')
        domains = ['fr', 'it', 'ca', 'co.uk', 'ru', 'co,jp', 'co.kr', 'com.au', 'co.in', 'com.br', 'com.ar', 'co.za', 'co.nz', 'es', 'se', 'nl', 'ch', 'at', 'dk', 'be', 'pl', 'fi', 'ie', 'pt', 'gr', 'tw', 'com', 'uk', 'de', 'br', 'ca', 'kr', 'mx', 'au', 'za']
        random_domain = random.choice(domains)
        url = f'https://www.google.{random_domain}/search?q={query}&num=100'
        # 请求头
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
            'accept-language': 'zh-CN,zh;q=0.9',
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'referer': 'https://www.google.com/',
            'origin': 'https://www.google.com',
            'Sec-Fetch-Site': 'same-origin',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-User': '?1',
            'Sec-Fetch-Dest': 'document'
        }
        # 代理
        proxies = {'http': 'http://127.0.0.1:7890', 'https': 'http://127.0.0.1:7890'}
        response = requests.get(url=url, headers=headers, proxies=proxies, verify=False, timeout=5)
        soup = BeautifulSoup(response.content, 'html.parser')
        # 查找全部div标签
        find_div = soup.find_all('div', {'class': 'yuRUbf'})
        # 开启线程锁
        locks.acquire()
        # 加入列表
        get_url = [url.findNext('a')['href'] + '
' for url in find_div if 'google.com.tw' not in url.findNext('a')['href']]
        global url_num, dork_finish_num
        url_num += len(get_url)
        dork_finish_num += 1
        print(Fore.GREEN + f'
{now_time}[INFO]{ '-' * 10}>已获取Url数量:{url_num}  Dorsk数量:{dork_finish_num} / {dork_total_num}', end='' + Fore.RESET)
        # 写入文件
        write_info(filename, get_url)
        # 释放线程锁
        locks.release()

    except TimeoutError:
        pass

# 写入文件函数
def write_info(filename, get_url):
    with open(filename, mode='a+', encoding='utf-8') as file:
        file.writelines(get_url)

if __name__ == '__main__':
    while True:
        try:
            init()  # 初始化颜色模块
            dorks_file = input(Fore.YELLOW + f'
{now_time}[INFO]{ '-' * 10}>input file:' + Fore.RESET)
            print('')
            filename = input(Fore.YELLOW + f'
{now_time}[INFO]{ '-' * 10}>output file:' + Fore.RESET)
            # 接受work函数返回的元组
            dork_total_num, query_list = work(dorks_file)
            # 定义全局变量完成数量/URL数量
            dork_finish_num = url_num = 0

            # 定义进程池和线程池数量
            process_pool = ProcessPoolExecutor(max_workers=4)
            thread_pool = ThreadPoolExecutor(max_workers=20)

            # 定义全局锁
            threads_lock = Lock()

            # 分配进程池任务
            futures = []
            for dokr_list in query_list:
                future = thread_pool.submit(google_serach, dokr_list, threads_lock, filename)
                futures.append(future)

            # 等待所有任务完成
            wait(futures, timeout=60)

            process_pool.shutdown(wait=True)
            thread_pool.shutdown(wait=True)
            if len(sys.argv) == 1:
                pass
            input(Fore.YELLOW + f'

{now_time}[INFO]{'-' * 10}>程序运行完毕,按回车退出' + Fore.RESET)
            break
        # 文件为空
        except FileNotFoundError:
            print(Fore.RED + f'{now_time}[Error]{'-' * 10}>文件不存在' + Fore.RESET)
        # 中断异常
        except KeyboardInterrupt:
            sys.exit(1)

代码说明:

  1. as_completed 函数替换为 wait 函数,并设置超时时间为 60 秒。
  2. 使用 wait 函数可以等待所有任务完成,即使部分任务超时,也不会抛出 TimeoutError 错误。

总结:

在使用 concurrent.futures 模块进行并发编程时,如果需要等待所有任务完成,即使部分任务超时,可以使用 wait 函数代替 as_completed 函数。wait 函数可以避免 TimeoutError 错误,并确保所有任务都得到处理。

注意:

使用 wait 函数会阻塞主线程,直到所有任务完成或超时。如果需要在等待任务完成时进行其他操作,可以使用 as_completed 函数,并设置超时时间,并在超时时间内处理未完成的任务。

Python 并发编程:使用 `wait` 函数解决 `as_completed` 超时问题

原文地址: https://www.cveoy.top/t/topic/oZQv 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录