Python 多线程下载文件工具
import math
import os
import sys
import threading
import time
import urllib.parse
import urllib.request

import requests
from tqdm import tqdm
class DownloadThread(threading.Thread):
    """Worker thread that downloads one byte range of a URL into a shared file.

    Args:
        url: Address of the resource to download.
        start: First byte offset (inclusive) of the range to request.
        end: Last byte offset (inclusive) of the range to request.
        fileobj: Seekable binary file object shared by all workers.
        bar: Progress object exposing ``update(n)`` (e.g. a ``tqdm`` bar).

    Attributes:
        range_start / range_end: The requested byte range.
        error: The exception that aborted ``run()``, or ``None``.
    """

    # Shared by every instance: a seek() followed by a write() on the same
    # file object must not be interleaved with another worker's seek/write.
    _write_lock = threading.Lock()

    def __init__(self, url, start, end, fileobj, bar):
        super().__init__()
        self.url = url
        # Deliberately NOT stored as ``self.start``: that would replace the
        # inherited ``Thread.start()`` method with an int and make the
        # thread impossible to start.
        self.range_start = start
        self.range_end = end
        self.fileobj = fileobj
        self.bar = bar
        self.error = None

    def run(self):
        """Fetch the byte range and write it at its offset in the file."""
        try:
            self._download_range()
        except Exception as exc:
            # Remember the failure for whoever joins this thread, then
            # re-raise so the default threading excepthook still reports it.
            self.error = exc
            raise

    def _download_range(self):
        """Stream the requested range from the server into ``fileobj``."""
        headers = {'Range': 'bytes=%d-%d' % (self.range_start, self.range_end)}
        with requests.get(self.url, headers=headers, stream=True) as response:
            # Do not write an HTTP error page into the target file.
            response.raise_for_status()
            offset = self.range_start
            for chunk in response.iter_content(chunk_size=1024):
                if not chunk:
                    break
                self._write_at(offset, chunk)
                offset += len(chunk)

    def _write_at(self, offset, data):
        """Write *data* at absolute position *offset* and advance the bar."""
        with self._write_lock:
            self.fileobj.seek(offset)
            self.fileobj.write(data)
            self.bar.update(len(data))
def _split_ranges(file_size, num_parts):
    """Split ``file_size`` bytes into contiguous inclusive ``(start, end)`` ranges.

    The number of parts is clamped to ``[1, 32]`` and never exceeds
    ``file_size``; the last range absorbs the division remainder.
    """
    num_parts = min(max(num_parts, 1), 32, file_size)
    part_size = file_size // num_parts
    ranges = []
    for index in range(num_parts):
        start = index * part_size
        is_last = index == num_parts - 1
        end = file_size - 1 if is_last else start + part_size - 1
        ranges.append((start, end))
    return ranges


def download_file(url: str, filename: str, num_threads: int = 4) -> str:
    """Download *url* to *filename* using several range-request threads.

    The file size is taken from the ``Content-Length`` header of an initial
    request. When the header is missing, the download falls back to
    ``download_single_thread``.

    Args:
        url: Address of the resource to download.
        filename: Path of the file to create (truncated if it exists).
        num_threads: Requested number of worker threads; clamped to 1..32
            and to the file size.

    Returns:
        *filename*.

    Raises:
        ValueError: If the server reports a size of 0 bytes.
        RuntimeError: If a worker thread recorded a failure.
    """
    # Only the headers are needed here; leaving the ``with`` block closes the
    # connection without reading (and throwing away) the response body.
    with requests.get(url, stream=True) as response:
        size_header = response.headers.get('Content-Length')

    if size_header is None:
        print('The file size is unknown, use single thread download.')
        download_single_thread(url=url, filename=filename)
        return filename

    file_size = int(size_header)
    # A reported size of 0 is treated as an invalid download URL.
    if file_size == 0:
        raise ValueError('Failed to get file size, please check the download URL')

    # NOTE(review): this assumes the server honours ``Range`` requests;
    # confirm (e.g. via ``Accept-Ranges``) for servers where that is in doubt.
    ranges = _split_ranges(file_size, num_threads)

    with open(filename, 'wb') as file:
        with tqdm(total=file_size, unit='B', unit_scale=True,
                  desc=os.path.basename(filename), ncols=80, position=0) as bar:
            thread_pool = [
                DownloadThread(url=url, start=start, end=end, fileobj=file, bar=bar)
                for start, end in ranges
            ]
            for thread in thread_pool:
                thread.daemon = True
                thread.start()
            # Join every worker before inspecting results so that no thread
            # is still writing when the file is closed.
            for thread in thread_pool:
                thread.join()
            failures = [
                error
                for error in (getattr(t, 'error', None) for t in thread_pool)
                if error is not None
            ]
            if failures:
                raise RuntimeError(
                    '%d download thread(s) failed' % len(failures)
                ) from failures[0]
    return filename
def download_single_thread(url: str, filename: str) -> str:
    """Download *url* to *filename* with a single request.

    Args:
        url: Address of the resource to download.
        filename: Path of the file to write.

    Returns:
        *filename*, so callers can chain on the saved path.
    """
    urllib.request.urlretrieve(url, filename)
    return filename
def download(input_str: str, filename: str, num_threads: int = 4) -> None:
    """Download *input_str* and report the outcome on stdout.

    Args:
        input_str: An HTTP(S) URL, or a ``magnet:`` link.
        filename: Target file path, or an existing directory in which the
            file name is derived from *input_str*.
        num_threads: Number of worker threads passed to ``download_file``.

    Any exception raised while downloading is caught and printed; nothing is
    propagated to the caller.
    """
    try:
        if os.path.isdir(filename):
            # Derive the name from the URL path only, so a query string or
            # fragment ("a.zip?token=1") does not end up in the file name.
            # Fall back to the raw basename when the path part is empty
            # (e.g. for magnet links).
            url_path = urllib.parse.urlsplit(input_str).path
            base_name = os.path.basename(url_path) or os.path.basename(input_str)
            output_filename = os.path.join(filename, base_name)
        else:
            output_filename = filename

        if input_str.startswith('magnet:'):
            # NOTE(review): download_magnet is not defined in the visible
            # source; unless it is provided elsewhere this branch ends in the
            # failure message below.
            download_magnet(input_str, output_filename)
        else:
            download_file(input_str, output_filename, num_threads)
        print(f'Download succeed! The file is saved as: {output_filename}')
    except Exception as e:
        print('Download failed, exception:', e)
# Script entry point: python download.py url output_file
if __name__ == '__main__':
    if len(sys.argv) < 3:
        print('Usage: python download.py url output_file')
        sys.exit(1)

    url = sys.argv[1]
    # Downloads are always placed under ./Download/.
    filename = os.path.join('.', 'Download', sys.argv[2])
    # exist_ok avoids the race between checking for the directory and
    # creating it.
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    download(input_str=url, filename=filename, num_threads=4)
原文地址: https://www.cveoy.top/t/topic/oHOH 著作权归作者所有。请勿转载和采集!