多线程文件下载工具 - 支持HTTP、FTP、磁力链接
import argparse
import ftplib
import logging
import os
import re
import threading
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
import transmissionrpc
from tqdm import tqdm
def check_dir(path):
    """Ensure *path* is a directory, creating it if necessary.

    Raises ``argparse.ArgumentTypeError`` on failure, which is the error type
    argparse expects from a ``type=`` callable.

    Args:
        path: Directory path to validate.

    Returns:
        The same *path*, unchanged.

    Raises:
        argparse.ArgumentTypeError: If *path* exists but is not a directory.
    """
    try:
        # exist_ok avoids the race between a separate exists() check and the
        # later makedirs() call.
        os.makedirs(path, exist_ok=True)
    except FileExistsError:
        raise argparse.ArgumentTypeError(f'{path} is not a directory') from None
    return path
def check_url(url):
    """Validate that *url* uses one of the supported schemes.

    Accepted prefixes are ``http://``, ``https://``, ``ftp://`` and
    ``magnet:`` (matched case-sensitively at the start of the string).

    Args:
        url: The URL string to validate.

    Returns:
        The same *url*, unchanged.

    Raises:
        argparse.ArgumentTypeError: If *url* starts with none of the prefixes.
    """
    if not url.startswith(('http://', 'https://', 'ftp://', 'magnet:')):
        raise argparse.ArgumentTypeError(f'{url} is not a valid url')
    return url
def _split_ranges(size, parts):
    """Split ``[0, size)`` into at most *parts* non-empty ``(start, end)`` spans."""
    chunk_size = size // parts + 1
    return [(start, min(start + chunk_size, size))
            for start in range(0, size, chunk_size)]


def _download_single_stream(url, filename):
    """Stream *url* into *filename* over one connection (total size unknown)."""
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        with open(filename, 'wb') as f, tqdm(unit='B', unit_scale=True,
                                             desc=os.path.basename(filename),
                                             ncols=80, position=0) as bar:
            for chunk in response.iter_content(chunk_size=64 * 1024):
                f.write(chunk)
                bar.update(len(chunk))


def download_file(url, filename, num_threads=4):
    """Download *url* over HTTP(S) into *filename* using parallel range requests.

    The size is taken from the ``Content-Length`` of a HEAD request, the byte
    span is divided into at most *num_threads* ranges, and each range is
    handed to ``download_range`` on a thread pool together with the shared
    output file object and progress bar. When the size cannot be determined
    the file is fetched over a single streaming connection instead.

    Args:
        url: HTTP or HTTPS URL to fetch.
        filename: Destination path; overwritten if it exists.
        num_threads: Maximum number of parallel workers (values below 1 are
            treated as 1).

    Raises:
        RuntimeError: If any range failed or the written file does not have
            the advertised size. The partial file is removed first.
        requests.RequestException: If the HEAD request or the single-stream
            fallback fails.
    """
    num_threads = max(1, int(num_threads))

    # requests.head() does not follow redirects unless asked to, so without
    # allow_redirects the length would describe the redirect response.
    response = requests.head(url, allow_redirects=True, timeout=30)
    length = response.headers.get('Content-Length') if response.ok else None
    if length is None:
        # No usable size: ranges cannot be planned, so fetch in one piece.
        _download_single_stream(url, filename)
        logging.info(f'Download completed: {filename}')
        return

    size = int(length)
    failed = False
    with open(filename, 'wb') as f:
        with tqdm(total=size, unit='B', unit_scale=True,
                  desc=os.path.basename(filename), ncols=80, position=0) as bar:
            with ThreadPoolExecutor(max_workers=num_threads) as executor:
                pending = [
                    executor.submit(download_range, url, start, end, f, bar)
                    for start, end in _split_ranges(size, num_threads)
                ]
                for future in as_completed(pending):
                    try:
                        future.result()
                    except Exception as e:
                        # Remember the failure: the size check alone cannot
                        # detect a missing range in the middle of the file.
                        failed = True
                        logging.error(f'Download failed: {e}')

    if failed or os.path.getsize(filename) != size:
        os.remove(filename)
        raise RuntimeError(f'Download incomplete: {filename}')
    logging.info(f'Download completed: {filename}')
# Serialises seek()+write() pairs issued by download_range workers that share
# one file object, so a chunk is always written at its own offset.
_RANGE_WRITE_LOCK = threading.Lock()


def download_range(url, start, end, fileobj, bar):
    """Fetch bytes ``[start, end)`` of *url* and write them at that offset.

    Args:
        url: HTTP or HTTPS URL to fetch.
        start: Offset of the first byte to fetch (inclusive).
        end: Offset one past the last byte to fetch (exclusive).
        fileobj: Seekable binary file object, possibly shared with other
            workers; data is written at absolute offsets ``start..end``.
        bar: Progress bar; ``bar.update(n)`` is called for every chunk written.

    Raises:
        RuntimeError: If the server ignored the ``Range`` header for a range
            that does not start at 0, or delivered fewer bytes than requested.
        requests.RequestException: On transport errors or an HTTP error status.
    """
    remaining = end - start
    if remaining <= 0:
        # Nothing to fetch; an empty span would yield an invalid Range header.
        return

    headers = {'Range': f'bytes={start}-{end - 1}'}
    offset = start
    with requests.get(url, headers=headers, stream=True, timeout=30) as response:
        response.raise_for_status()
        # Anything other than 206 means the body is the whole resource, which
        # only lines up with this range when the range starts at byte 0.
        if response.status_code != 206 and start != 0:
            raise RuntimeError(
                f'Server ignored range request {headers["Range"]} '
                f'(HTTP {response.status_code})'
            )
        for chunk in response.iter_content(chunk_size=64 * 1024):
            if not chunk:
                continue
            # Never write past this worker's own span.
            chunk = chunk[:remaining]
            with _RANGE_WRITE_LOCK:
                fileobj.seek(offset)
                fileobj.write(chunk)
            offset += len(chunk)
            remaining -= len(chunk)
            bar.update(len(chunk))
            if remaining == 0:
                break

    if remaining:
        raise RuntimeError(
            f'Range {start}-{end - 1} of {url} ended {remaining} bytes short'
        )
def download_ftp(url, filename, num_threads=4):
    """Download an ``ftp://`` URL into *filename* over a single connection.

    The transfer goes through ``urllib.request.urlopen`` because ``requests``
    ships transport adapters for HTTP and HTTPS only. The response is
    streamed to disk in 64 KiB chunks while a progress bar is updated.

    Args:
        url: FTP URL to fetch. The scheme is not re-checked here.
        filename: Destination path; overwritten if it exists.
        num_threads: Accepted for interface compatibility and ignored; the
            transfer is a single stream.

    Raises:
        RuntimeError: If the server reported a length and a different number
            of bytes was received. The partial file is removed first.
        urllib.error.URLError: If the URL cannot be opened.
    """
    with urllib.request.urlopen(url, timeout=30) as response:
        length = response.headers.get('Content-Length')
        total_size = int(length.strip()) if length else None
        written = 0
        with open(filename, 'wb') as f, tqdm(total=total_size, unit='B',
                                             unit_scale=True,
                                             desc=os.path.basename(filename),
                                             ncols=80, position=0) as bar:
            while True:
                chunk = response.read(64 * 1024)
                if not chunk:
                    break
                f.write(chunk)
                written += len(chunk)
                bar.update(len(chunk))

    # Only verifiable when the server told us how many bytes to expect.
    if total_size is not None and written != total_size:
        os.remove(filename)
        raise RuntimeError(f'Download incomplete: {filename}')
    logging.info(f'Download completed: {filename}')
# Serialises seek()+write() pairs issued by download_range_ftp workers that
# share one file object.
_FTP_RANGE_WRITE_LOCK = threading.Lock()


def download_range_ftp(url, start, end, fileobj, bar):
    """Fetch bytes ``[start, end)`` of an FTP file and write them at that offset.

    FTP has no ``Range`` header; the span is obtained by opening a dedicated
    control connection, issuing ``REST start`` before ``RETR`` and reading
    ``end - start`` bytes from the data connection.

    Args:
        url: ``ftp://[user[:password]@]host[:port]/path`` URL. The path is
            interpreted relative to the login directory.
        start: Offset of the first byte to fetch (inclusive).
        end: Offset one past the last byte to fetch (exclusive).
        fileobj: Seekable binary file object, possibly shared with other
            workers; data is written at absolute offsets ``start..end``.
        bar: Progress bar; ``bar.update(n)`` is called for every chunk written.

    Raises:
        ValueError: If *url* has no host or no file path.
        RuntimeError: If the server delivered fewer bytes than requested.
        ftplib.Error, OSError: On protocol or network failures, including a
            server that rejects ``REST``.
    """
    remaining = end - start
    if remaining <= 0:
        return

    parts = urllib.parse.urlsplit(url)
    raw_path = parts.path[1:] if parts.path.startswith('/') else parts.path
    path = urllib.parse.unquote(raw_path)
    if not parts.hostname or not path:
        raise ValueError(f'{url} is not a usable FTP file url')

    ftp = ftplib.FTP()
    try:
        ftp.connect(parts.hostname, parts.port or ftplib.FTP_PORT, timeout=30)
        # Empty credentials make ftplib log in anonymously.
        ftp.login(urllib.parse.unquote(parts.username or ''),
                  urllib.parse.unquote(parts.password or ''))
        ftp.voidcmd('TYPE I')  # binary mode, so REST offsets are byte offsets
        offset = start
        with ftp.transfercmd(f'RETR {path}', rest=start) as conn:
            while remaining > 0:
                chunk = conn.recv(min(64 * 1024, remaining))
                if not chunk:
                    break
                with _FTP_RANGE_WRITE_LOCK:
                    fileobj.seek(offset)
                    fileobj.write(chunk)
                offset += len(chunk)
                remaining -= len(chunk)
                bar.update(len(chunk))
    finally:
        # The transfer is normally abandoned before the end of the file, so
        # drop the control connection instead of waiting for a reply to QUIT.
        ftp.close()

    if remaining:
        raise RuntimeError(
            f'Range {start}-{end - 1} of {url} ended {remaining} bytes short'
        )
def download_torrent(magnet, filename, host='localhost', port=9091):
    """Queue *magnet* on a Transmission daemon via its RPC interface.

    The function returns once the torrent has been added; it does not wait
    for the transfer to finish.

    Args:
        magnet: Magnet link to add.
        filename: Path whose parent directory is used as the download
            directory; the file name itself is only used in the log message.
        host: Host name of the Transmission RPC server.
        port: Port of the Transmission RPC server.
    """
    # The daemon resolves paths on its own side, so send an absolute directory
    # rather than one relative to this process's working directory (this also
    # covers a bare file name, whose dirname would be empty).
    download_dir = os.path.dirname(os.path.abspath(filename))
    tc = transmissionrpc.Client(host, port=port)
    tc.add_torrent(magnet, download_dir=download_dir)
    logging.info(f'Download started: {filename}')
def main():
    """Parse the command line and dispatch to the matching downloader.

    The target file name is derived from the last path segment of the URL and
    placed in the output directory; if a file of that name already exists a
    numeric suffix (``_1``, ``_2``, ...) is inserted before the extension.
    """
    parser = argparse.ArgumentParser(description='A command-line tool for downloading files.')
    # NOTE(review): only '-u/--url' survives in the recovered source; the
    # remaining option names, defaults and help texts are a reconstruction of
    # what the code below needs (output_dir, num_threads) - confirm them.
    parser.add_argument('-u', '--url', metavar='URL', type=check_url, required=True,
                        help='http(s)://, ftp:// or magnet: link to download')
    parser.add_argument('-o', '--output-dir', metavar='DIR', type=check_dir, default='.',
                        help='directory to save into (created if missing)')
    parser.add_argument('-n', '--num-threads', metavar='N', type=int, default=4,
                        help='number of parallel connections')
    args = parser.parse_args()
    url = args.url
    output_dir = args.output_dir
    num_threads = args.num_threads

    # Take the basename after unquoting so an encoded '/' cannot escape
    # output_dir; fall back to a fixed name when the URL has no file part.
    filename = os.path.basename(urllib.parse.unquote(urllib.parse.urlsplit(url).path))
    if filename in ('', '.', '..'):
        filename = 'download'
    filename = os.path.join(output_dir, filename)

    if os.path.isfile(filename):
        # Do not overwrite: pick the first free "<stem>_<i><ext>" name.
        stem, ext = os.path.splitext(filename)
        i = 1
        while os.path.isfile(f'{stem}_{i}{ext}'):
            i += 1
        filename = f'{stem}_{i}{ext}'

    if url.startswith('http'):
        download_file(url, filename, num_threads)
    elif url.startswith('ftp'):
        download_ftp(url, filename, num_threads)
    elif url.startswith('magnet'):
        download_torrent(url, filename)
    else:
        logging.error(f'The url {url} does not start with http, ftp or magnet.')
# Configure logging and run the CLI only when executed as a script, not when
# the module is imported.
if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    main()
原文地址: https://www.cveoy.top/t/topic/oP1c 著作权归作者所有。请勿转载和采集!