多线程下载工具:使用 HTTP、FTP 或磁力链接下载文件
import transmissionrpc import argparse import os import requests import logging from concurrent.futures import ThreadPoolExecutor from tqdm import tqdm
def download_torrent(magnet, filename): tc = transmissionrpc.Client('localhost', port=9091) tc.add_torrent(magnet, download_dir=os.path.dirname(filename)) logging.info(f'下载已开始:{filename}')
def download_file(url, filename, num_threads=4): response = requests.head(url) size = int(response.headers.get('Content-Length')) chunk_size = int(size / num_threads) + 1
with open(filename, 'wb') as f:
with tqdm(total=size, unit='B', unit_scale=True, desc=os.path.basename(filename), ncols=80, position=0) as bar:
futures = []
for i in range(num_threads):
start = i * chunk_size
end = min(start + chunk_size, size)
futures.append((start, end, f))
with ThreadPoolExecutor(max_workers=num_threads) as executor:
for future in futures:
executor.submit(download_range, url, future, bar)
logging.info(f'下载已完成:{filename}')
def download_range(url, future, bar): start, end, fileobj = future headers = {'Range': f'bytes={start}-{end-1}'} with requests.get(url, headers=headers, stream=True) as response: for chunk in response.iter_content(chunk_size=1024): if chunk: fileobj.write(chunk) bar.update(len(chunk))
def download_ftp(url, filename, num_threads=4): with requests.get(url, stream=True) as response: total_size = response.headers.get('Content-Length') if total_size: total_size = int(total_size.strip()) with tqdm(total=total_size, unit='B', unit_scale=True, desc=os.path.basename(filename), ncols=80, position=0) as bar: with open(filename, 'wb') as f: chunk_size = int(total_size / num_threads) + 1 futures = [] for i in range(num_threads): start = i * chunk_size end = min(start + chunk_size, total_size) futures.append((start, end, f, bar))
with ThreadPoolExecutor(max_workers=num_threads) as executor:
for future in futures:
executor.submit(download_range_ftp, url, future)
logging.info(f'下载已完成:{filename}')
else:
with open(filename, 'wb') as f:
f.write(response.content)
logging.info(f'下载已完成:{filename}')
def download_range_ftp(url, future): start, end, fileobj, bar = future headers = {'Range': f'bytes={start}-{end-1}'} with requests.get(url, headers=headers, stream=True) as response: for chunk in response.iter_content(chunk_size=1024): if chunk: fileobj.write(chunk) bar.update(len(chunk))
def main():
parser = argparse.ArgumentParser(description='用于下载文件的命令行工具')
parser.add_argument('-u', '--url', metavar='
if not os.path.exists(os.path.dirname(filename)):
logging.error(f'错误:文件路径 {filename} 不正确。')
return
if os.path.exists(filename):
ans = input(f'文件 {filename} 已存在,是否覆盖?(Y/N)').upper()
if ans != 'Y':
logging.info('下载已被用户取消。')
return
if args.http:
download_file(url, filename, num_threads)
elif args.ftp:
download_ftp(url, filename, num_threads)
elif args.magnet:
download_torrent(url, filename)
else:
logging.error(f'错误:不支持的下载选项')
if name == 'main': logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S') main()
原文地址: https://www.cveoy.top/t/topic/oO69 著作权归作者所有。请勿转载和采集!