多线程文件下载工具:支持HTTP、FTP和磁力链接
import argparse
import ftplib
import logging
import os
import re
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import unquote, urlparse

import requests
import transmissionrpc
from tqdm import tqdm
def check_dir(path):
    """Validate (and create if needed) an output directory for argparse.

    Intended for use as an ``argparse`` ``type=`` callback.

    Args:
        path: Directory path given on the command line.

    Returns:
        ``path`` unchanged, once it is known to be an existing directory.

    Raises:
        argparse.ArgumentTypeError: If ``path`` exists but is not a
            directory, or if the directory cannot be created.
    """
    if os.path.exists(path) and not os.path.isdir(path):
        raise argparse.ArgumentTypeError(f'{path} is not a directory')
    try:
        # exist_ok=True closes the gap between the existence check above and
        # the creation: another process creating the directory is not an error.
        os.makedirs(path, exist_ok=True)
    except OSError as err:
        # Report creation problems (a parent component is a file, permission
        # denied, empty path, ...) as a normal argparse usage error rather
        # than an uncaught traceback.
        raise argparse.ArgumentTypeError(
            f'cannot create directory {path}: {err}'
        ) from err
    return path
def check_url(url):
    """Accept only URLs this tool can dispatch; for use as an argparse type.

    A URL is accepted when it begins with ``http://``, ``https://``,
    ``ftp://`` or ``magnet:`` (case-sensitive).

    Args:
        url: The URL string given on the command line.

    Returns:
        ``url`` unchanged when it has a supported prefix.

    Raises:
        argparse.ArgumentTypeError: If ``url`` has none of the prefixes.
    """
    supported_prefixes = ('http://', 'https://', 'ftp://', 'magnet:')
    if url.startswith(supported_prefixes):
        return url
    raise argparse.ArgumentTypeError(f'{url} is not a valid url')
# Guards every seek+write pair on a shared output file so that concurrent
# range workers cannot move the file position between another worker's seek
# and its write.
_WRITE_LOCK = threading.Lock()

# Seconds to wait for a connection or for more data before giving up.
_REQUEST_TIMEOUT = 30

# Bytes requested from the response stream per iteration.
_CHUNK_SIZE = 64 * 1024

# Ask for the unencoded body: with gzip/deflate the Content-Length from HEAD
# would describe the compressed size while iter_content yields decoded bytes.
_IDENTITY = {'Accept-Encoding': 'identity'}


def _probe_size(url):
    """Return the size in bytes reported by a HEAD request, or None if unknown."""
    try:
        response = requests.head(
            url, headers=_IDENTITY, allow_redirects=True, timeout=_REQUEST_TIMEOUT
        )
        response.raise_for_status()
    except requests.RequestException as e:
        logging.warning(f'Could not determine size of {url}: {e}')
        return None
    length = response.headers.get('Content-Length', '').strip()
    return int(length) if length.isdigit() else None


def _download_stream(url, fileobj, bar):
    """Download the whole of ``url`` sequentially into ``fileobj``."""
    with requests.get(
        url, headers=_IDENTITY, stream=True, timeout=_REQUEST_TIMEOUT
    ) as response:
        response.raise_for_status()
        for chunk in response.iter_content(chunk_size=_CHUNK_SIZE):
            if chunk:
                fileobj.write(chunk)
                bar.update(len(chunk))


def download_file(url, filename, num_threads=4):
    """Download an HTTP(S) URL to ``filename`` using parallel range requests.

    The size is taken from a HEAD request and split into at most
    ``num_threads`` contiguous, non-empty byte ranges, each fetched by
    ``download_range`` on its own thread. If the size cannot be determined
    (or is zero) the file is fetched as a single sequential stream instead.

    Failures are logged rather than raised; the completion message is logged
    only when every part succeeded.

    Args:
        url: HTTP or HTTPS URL to fetch.
        filename: Path of the output file; created or truncated.
        num_threads: Maximum number of parallel range requests. Values below
            1 are treated as 1.
    """
    num_threads = max(1, num_threads)
    size = _probe_size(url)
    failed = False
    with open(filename, 'wb') as f, tqdm(
        total=size, unit='B', unit_scale=True,
        desc=os.path.basename(filename), ncols=80, position=0,
    ) as bar:
        if not size:
            try:
                _download_stream(url, f, bar)
            except (requests.RequestException, OSError) as e:
                failed = True
                logging.error(f'Download failed: {e}')
        else:
            # Ceiling division; stepping by chunk_size yields only non-empty
            # ranges even when size < num_threads.
            chunk_size = -(-size // num_threads)
            ranges = [
                (start, min(start + chunk_size, size))
                for start in range(0, size, chunk_size)
            ]
            with ThreadPoolExecutor(max_workers=len(ranges)) as executor:
                futures = [
                    executor.submit(download_range, url, start, end, f, bar)
                    for start, end in ranges
                ]
                for future in as_completed(futures):
                    try:
                        future.result()
                    except Exception as e:
                        # Worker boundary: record any failure and let the
                        # remaining ranges finish.
                        failed = True
                        logging.error(f'Download failed: {e}')
    if failed:
        logging.error(f'Download incomplete: {filename}')
    else:
        logging.info(f'Download completed: {filename}')


def download_range(url, start, end, fileobj, bar):
    """Fetch bytes ``[start, end)`` of ``url`` and write them at offset ``start``.

    Safe to call concurrently from several threads on the same ``fileobj``:
    each write is positioned with ``seek`` under a shared lock.

    Args:
        url: HTTP or HTTPS URL to fetch.
        start: Offset of the first byte to fetch.
        end: Offset one past the last byte to fetch.
        fileobj: Seekable binary file object opened for writing.
        bar: Progress bar; ``bar.update(n)`` is called per chunk written.

    Raises:
        requests.RequestException: On connection errors or an HTTP error status.
        RuntimeError: If the server ignores the Range header for a range that
            does not start at 0, or sends fewer bytes than requested.
    """
    if end <= start:
        return
    headers = {'Range': f'bytes={start}-{end - 1}', **_IDENTITY}
    with requests.get(
        url, headers=headers, stream=True, timeout=_REQUEST_TIMEOUT
    ) as response:
        response.raise_for_status()
        # A 200 reply carries the whole body from byte 0; that is only usable
        # for a range that itself starts at 0 (it is truncated below).
        if response.status_code != 206 and start != 0:
            raise RuntimeError(
                f'server ignored Range request for bytes {start}-{end - 1}'
            )
        offset = start
        for chunk in response.iter_content(chunk_size=_CHUNK_SIZE):
            if not chunk:
                continue
            # Never write past this worker's range into a neighbour's.
            chunk = chunk[:end - offset]
            if not chunk:
                break
            with _WRITE_LOCK:
                fileobj.seek(offset)
                fileobj.write(chunk)
                bar.update(len(chunk))
            offset += len(chunk)
        if offset < end:
            raise RuntimeError(
                f'connection ended early: got {offset - start} of {end - start} '
                f'bytes for range {start}-{end - 1}'
            )
# Guards every seek+write pair on a shared output file so that concurrent
# range workers cannot move the file position between another worker's seek
# and its write.
_FTP_WRITE_LOCK = threading.Lock()

# Seconds to wait on the FTP control and data connections.
_FTP_TIMEOUT = 30

# Maximum bytes read from the data connection per recv call.
_FTP_BLOCK_SIZE = 64 * 1024


def _ftp_connect(url):
    """Open a logged-in, binary-mode FTP session; return ``(ftp, remote_path)``.

    Credentials come from the URL (``ftp://user:pass@host/...``); without
    them ftplib logs in anonymously. One leading ``/`` is dropped from the
    path so it is sent relative to the login directory, the same way
    ``urllib`` treats FTP URLs.
    """
    parts = urlparse(url)
    if not parts.hostname:
        raise ValueError(f'{url} has no host')
    ftp = ftplib.FTP(timeout=_FTP_TIMEOUT)
    try:
        ftp.connect(parts.hostname, parts.port or 0)  # 0 -> default port
        ftp.login(unquote(parts.username or ''), unquote(parts.password or ''))
        ftp.voidcmd('TYPE I')  # binary mode; needed for SIZE and REST offsets
    except Exception:
        ftp.close()
        raise
    remote_path = unquote(parts.path)
    if remote_path.startswith('/'):
        remote_path = remote_path[1:]
    return ftp, remote_path


def download_ftp(url, filename, num_threads=4):
    """Download an FTP URL to ``filename`` using parallel ranged transfers.

    The file size is queried with the FTP SIZE command and split into at most
    ``num_threads`` contiguous, non-empty byte ranges, each fetched by
    ``download_range_ftp`` over its own connection. If the size is unknown
    (or zero) the file is fetched as one sequential transfer instead.

    Failures are logged rather than raised; the completion message is logged
    only when every part succeeded.

    Args:
        url: ``ftp://`` URL, optionally with ``user:password@`` credentials.
        filename: Path of the output file; created or truncated.
        num_threads: Maximum number of parallel connections. Values below 1
            are treated as 1.
    """
    num_threads = max(1, num_threads)
    try:
        ftp, remote_path = _ftp_connect(url)
    except ftplib.all_errors as e:
        logging.error(f'Download failed: {e}')
        return
    try:
        total_size = ftp.size(remote_path)
    except ftplib.all_errors:
        # SIZE is optional; fall back to one sequential transfer.
        total_size = None
    finally:
        ftp.close()

    if total_size:
        # Ceiling division; stepping by chunk_size yields only non-empty
        # ranges even when total_size < num_threads.
        chunk_size = -(-total_size // num_threads)
        ranges = [
            (start, min(start + chunk_size, total_size))
            for start in range(0, total_size, chunk_size)
        ]
    else:
        ranges = [(0, None)]  # end=None: read until the server closes

    failed = False
    with open(filename, 'wb') as f, tqdm(
        total=total_size or None, unit='B', unit_scale=True,
        desc=os.path.basename(filename), ncols=80, position=0,
    ) as bar:
        with ThreadPoolExecutor(max_workers=len(ranges)) as executor:
            futures = [
                executor.submit(download_range_ftp, url, start, end, f, bar)
                for start, end in ranges
            ]
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    # Worker boundary: record any failure and let the
                    # remaining ranges finish.
                    failed = True
                    logging.error(f'Download failed: {e}')
    if failed:
        logging.error(f'Download incomplete: {filename}')
    else:
        logging.info(f'Download completed: {filename}')


def download_range_ftp(url, start, end, fileobj, bar):
    """Fetch bytes ``[start, end)`` of an FTP file and write them at ``start``.

    Opens its own FTP connection, restarts the transfer at ``start`` with
    REST, and stops reading once the range is complete. Safe to call
    concurrently on the same ``fileobj``: each write is positioned with
    ``seek`` under a shared lock.

    Args:
        url: ``ftp://`` URL of the file.
        start: Offset of the first byte to fetch.
        end: Offset one past the last byte to fetch, or ``None`` to read to
            the end of the file.
        fileobj: Seekable binary file object opened for writing.
        bar: Progress bar; ``bar.update(n)`` is called per block written.

    Raises:
        OSError: If the transfer ends before ``end`` is reached, or on
            socket errors.
        ftplib.Error: On FTP protocol errors (login, REST, RETR).
    """
    if end is not None and end <= start:
        return
    ftp, remote_path = _ftp_connect(url)
    try:
        conn = ftp.transfercmd(f'RETR {remote_path}', rest=start or None)
        offset = start
        try:
            while end is None or offset < end:
                want = _FTP_BLOCK_SIZE if end is None else min(_FTP_BLOCK_SIZE, end - offset)
                data = conn.recv(want)
                if not data:
                    break
                with _FTP_WRITE_LOCK:
                    fileobj.seek(offset)
                    fileobj.write(data)
                    bar.update(len(data))
                offset += len(data)
        finally:
            conn.close()
        if end is not None and offset < end:
            raise OSError(
                f'connection ended early: got {offset - start} of {end - start} '
                f'bytes for range {start}-{end - 1}'
            )
    finally:
        # Close without QUIT: the transfer is usually cut short, so the
        # server's pending transfer reply would be read as the answer to QUIT.
        ftp.close()
def download_torrent(magnet, filename):
    """Hand a magnet link to the Transmission daemon on localhost:9091.

    Only the directory part of ``filename`` is used, as the download
    directory; the call returns once the torrent has been added and does not
    wait for the download.

    Args:
        magnet: Magnet URI to add.
        filename: Output path; its parent directory becomes the torrent's
            download directory.
    """
    # The daemon is a separate process with its own working directory, so a
    # relative directory such as '.' would not refer to the caller's location.
    download_dir = os.path.dirname(os.path.abspath(filename))
    tc = transmissionrpc.Client('localhost', port=9091)
    tc.add_torrent(magnet, download_dir=download_dir)
    logging.info(f'Download started: {filename}')
def main():
    """Parse the command line and dispatch to the matching downloader.

    The output path is ``<dir>/<output>``; if a file with that name already
    exists, ``_1``, ``_2``, ... is inserted before the extension until an
    unused name is found. The URL scheme selects the HTTP, FTP or magnet
    downloader.
    """
    parser = argparse.ArgumentParser(description='A command-line tool for downloading files.')
    parser.add_argument('-u', '--url', metavar='<URL>', type=check_url, required=True, help='download url')
    parser.add_argument('-o', '--output', metavar='<FILENAME>', type=str, required=True, help='output filename')
    parser.add_argument('-t', '--threads', metavar='<NUM_THREADS>', type=int, default=4, help='number of threads for downloading')
    parser.add_argument('-d', '--dir', metavar='<DIRECTORY>', type=check_dir, default='.', help='output directory')
    args = parser.parse_args()
    if args.threads < 1:
        # Zero or negative counts would otherwise surface later as a
        # ZeroDivisionError or a ThreadPoolExecutor ValueError.
        parser.error('number of threads must be at least 1')

    url, num_threads = args.url, args.threads
    filename = os.path.join(args.dir, args.output)
    if os.path.isfile(filename):
        # Do not overwrite an existing file: pick the first free "<stem>_<i><ext>".
        stem, ext = os.path.splitext(filename)
        i = 1
        while os.path.isfile(f'{stem}_{i}{ext}'):
            i += 1
        filename = f'{stem}_{i}{ext}'

    if url.startswith('http'):
        download_file(url, filename, num_threads)
    elif url.startswith('ftp'):
        download_ftp(url, filename, num_threads)
    elif url.startswith('magnet'):
        download_torrent(url, filename)
    else:
        logging.error(f'The url {url} does not start with http, ftp or magnet.')
if __name__ == '__main__':
    # Script entry point: set up INFO-level, timestamped logging, then run
    # the command-line interface.
    log_options = {
        'level': logging.INFO,
        'format': '%(asctime)s %(levelname)s: %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S',
    }
    logging.basicConfig(**log_options)
    main()
该工具支持使用多线程下载文件,并提供进度条和错误日志记录功能。
使用方式:
python download.py -u <URL> -o <FILENAME> -t <NUM_THREADS> -d <DIRECTORY>
参数说明:
- -u 或 --url:下载 URL
- -o 或 --output:输出文件名
- -t 或 --threads:下载线程数(默认 4)
- -d 或 --dir:输出目录(默认当前目录)
示例:
python download.py -u https://example.com/file.zip -o file.zip -t 8 -d /path/to/download/dir
注意事项:
- 该工具需要安装 requests、transmissionrpc、tqdm 库;argparse 和 logging 属于 Python 标准库,无需单独安装。
- 使用磁力链接下载时,需要安装 transmission-daemon 并确保 transmissionrpc 库能够连接到它。
- 该工具支持 HTTP、FTP 和磁力链接下载,不支持其他协议。
- 下载速度受网络状况影响。
- 下载完成后,文件会保存到指定目录。
使用建议:
- 在使用前,请先了解目标网站的下载限制。
- 选择合适的下载线程数,避免占用过多系统资源。
- 可以根据需要修改代码,添加其他功能。
原文地址: https://www.cveoy.top/t/topic/oP08 著作权归作者所有。请勿转载和采集!