Python 并发编程:使用 `wait` 函数解决 `as_completed` 超时问题
Python 并发编程:使用 wait 函数解决 as_completed 超时问题
在使用 Python 的 concurrent.futures 模块进行并发编程时,as_completed 函数可以方便地管理多个任务并获取已完成的任务。然而,当使用 as_completed 函数时,可能会遇到超时问题。
问题描述:
for future in as_completed(futures, timeout=60):
File 'C:\Program Files\Python310\lib\concurrent\futures\_base.py', line 241, in as_completed
raise TimeoutError(
concurrent.futures._base.TimeoutError: 65287 (of 65760) futures unfinished
这个错误提示表明在 60 秒的超时时间内,只有部分任务完成。如果需要等待所有任务完成,则可以使用 wait 函数代替 as_completed 函数。
修改后的代码:
import requests
import time
import os
import urllib3
import sys
import random
from bs4 import BeautifulSoup
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait # 修改导入的模块
from threading import Lock
from colorama import Fore, init
now_time = time.strftime('%Y-%m-%d %H-%M')
# 读取Dorks
def work(dorks):
with open(dorks, mode='r', encoding='utf-8') as file:
read_content = file.readlines()
# 将内容加入列表
content = [result.strip() for result in read_content]
# 返回数量丢给任务池
return len(read_content), content
# Google搜索
def google_serach(query, locks, filename):
try:
# 关闭HTTPS报错信息
urllib3.disable_warnings()
filename = os.path.join(os.getcwd(), f'{filename}.txt')
domains = ['fr', 'it', 'ca', 'co.uk', 'ru', 'co,jp', 'co.kr', 'com.au', 'co.in', 'com.br', 'com.ar', 'co.za', 'co.nz', 'es', 'se', 'nl', 'ch', 'at', 'dk', 'be', 'pl', 'fi', 'ie', 'pt', 'gr', 'tw', 'com', 'uk', 'de', 'br', 'ca', 'kr', 'mx', 'au', 'za']
random_domain = random.choice(domains)
url = f'https://www.google.{random_domain}/search?q={query}&num=100'
# 请求头
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
'accept-language': 'zh-CN,zh;q=0.9',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'referer': 'https://www.google.com/',
'origin': 'https://www.google.com',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-User': '?1',
'Sec-Fetch-Dest': 'document'
}
# 代理
proxies = {'http': 'http://127.0.0.1:7890', 'https': 'http://127.0.0.1:7890'}
response = requests.get(url=url, headers=headers, proxies=proxies, verify=False, timeout=5)
soup = BeautifulSoup(response.content, 'html.parser')
# 查找全部div标签
find_div = soup.find_all('div', {'class': 'yuRUbf'})
# 开启线程锁
locks.acquire()
# 加入列表
get_url = [url.findNext('a')['href'] + '
' for url in find_div if 'google.com.tw' not in url.findNext('a')['href']]
global url_num, dork_finish_num
url_num += len(get_url)
dork_finish_num += 1
print(Fore.GREEN + f'
{now_time}[INFO]{ '-' * 10}>已获取Url数量:{url_num} Dorsk数量:{dork_finish_num} / {dork_total_num}', end='' + Fore.RESET)
# 写入文件
write_info(filename, get_url)
# 释放线程锁
locks.release()
except TimeoutError:
pass
# 写入文件函数
def write_info(filename, get_url):
with open(filename, mode='a+', encoding='utf-8') as file:
file.writelines(get_url)
if __name__ == '__main__':
while True:
try:
init() # 初始化颜色模块
dorks_file = input(Fore.YELLOW + f'
{now_time}[INFO]{ '-' * 10}>input file:' + Fore.RESET)
print('')
filename = input(Fore.YELLOW + f'
{now_time}[INFO]{ '-' * 10}>output file:' + Fore.RESET)
# 接受work函数返回的元组
dork_total_num, query_list = work(dorks_file)
# 定义全局变量完成数量/URL数量
dork_finish_num = url_num = 0
# 定义进程池和线程池数量
process_pool = ProcessPoolExecutor(max_workers=4)
thread_pool = ThreadPoolExecutor(max_workers=20)
# 定义全局锁
threads_lock = Lock()
# 分配进程池任务
futures = []
for dokr_list in query_list:
future = thread_pool.submit(google_serach, dokr_list, threads_lock, filename)
futures.append(future)
# 等待所有任务完成
wait(futures, timeout=60)
process_pool.shutdown(wait=True)
thread_pool.shutdown(wait=True)
if len(sys.argv) == 1:
pass
input(Fore.YELLOW + f'
{now_time}[INFO]{'-' * 10}>程序运行完毕,按回车退出' + Fore.RESET)
break
# 文件为空
except FileNotFoundError:
print(Fore.RED + f'{now_time}[Error]{'-' * 10}>文件不存在' + Fore.RESET)
# 中断异常
except KeyboardInterrupt:
sys.exit(1)
代码说明:
- 将
as_completed函数替换为wait函数,并设置超时时间为 60 秒。 - 使用
wait函数可以等待所有任务完成,即使部分任务超时,也不会抛出TimeoutError错误。
总结:
在使用 concurrent.futures 模块进行并发编程时,如果需要等待所有任务完成,即使部分任务超时,可以使用 wait 函数代替 as_completed 函数。wait 函数可以避免 TimeoutError 错误,并确保所有任务都得到处理。
注意:
使用 wait 函数会阻塞主线程,直到所有任务完成或超时。如果需要在等待任务完成时进行其他操作,可以使用 as_completed 函数,并设置超时时间,并在超时时间内处理未完成的任务。
原文地址: https://www.cveoy.top/t/topic/oZQv 著作权归作者所有。请勿转载和采集!