import argparse
import logging
import os
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor

import requests
import transmissionrpc
from tqdm import tqdm

def download_torrent(magnet, filename, host='localhost', port=9091):
    """Queue a magnet link on a Transmission daemon.

    The torrent is only handed to the daemon; this function does not wait
    for the transfer to finish.

    Args:
        magnet: Magnet URI (or any torrent reference ``add_torrent`` accepts).
        filename: Output path; only its directory part is used, as the
            download directory.
        host: Address of the Transmission RPC server.
        port: Port of the Transmission RPC server.
    """
    tc = transmissionrpc.Client(host, port=port)
    # The daemon resolves the directory itself, so send an absolute path:
    # dirname() alone is '' for a bare filename and relative otherwise.
    target_dir = os.path.dirname(os.path.abspath(filename))
    tc.add_torrent(magnet, download_dir=target_dir)
    logging.info(f'下载已开始:{filename}')

def download_file(url, filename, num_threads=4):
    """Download *url* to *filename* over HTTP, in parallel byte ranges when possible.

    A HEAD request is used to learn the file size and whether the server
    advertises byte-range support. If it does, the file is split into at
    most ``num_threads`` contiguous ranges and each one is handed to
    ``download_range`` on a thread pool. Otherwise (unknown size, no
    ``Accept-Ranges: bytes``, or ``num_threads <= 1``) the body is streamed
    over a single connection.

    Args:
        url: HTTP(S) URL to fetch.
        filename: Path of the output file; it is created or truncated.
        num_threads: Upper bound on the number of concurrent range requests.

    Raises:
        requests.HTTPError: If the single-stream GET returns an error status.
        Exception: Whatever a range worker raised is re-raised here.
    """
    # requests.head() does not follow redirects unless asked to, so without
    # this the headers could describe a 3xx response instead of the file.
    probe = requests.head(url, allow_redirects=True)
    # Some servers reject HEAD; treat that as "size unknown" and let the
    # plain GET below decide whether the URL is actually reachable.
    length = probe.headers.get('Content-Length') if probe.ok else None
    size = int(length) if length else 0
    ranged = (
        size > 0
        and num_threads > 1
        and probe.headers.get('Accept-Ranges', '').lower() == 'bytes'
    )

    with open(filename, 'wb') as f:
        with tqdm(total=size or None, unit='B', unit_scale=True,
                  desc=os.path.basename(filename), ncols=80, position=0) as bar:
            if ranged:
                # Ceiling division; stepping by chunk_size never yields an
                # empty range (which would be an invalid Range header).
                chunk_size = -(-size // num_threads)
                ranges = [
                    (start, min(start + chunk_size, size), f)
                    for start in range(0, size, chunk_size)
                ]
                with ThreadPoolExecutor(max_workers=num_threads) as executor:
                    pending = [
                        executor.submit(download_range, url, byte_range, bar)
                        for byte_range in ranges
                    ]
                # submit() stores worker exceptions on the Future; result()
                # re-raises them so a failed range is not reported as success.
                for task in pending:
                    task.result()
            else:
                with requests.get(url, stream=True) as response:
                    response.raise_for_status()
                    for chunk in response.iter_content(chunk_size=64 * 1024):
                        if chunk:
                            f.write(chunk)
                            bar.update(len(chunk))

    logging.info(f'下载已完成:{filename}')

def download_range(url, future, bar):
    """Fetch one byte range of *url* and write it at its own offset in the output file.

    Args:
        url: HTTP(S) URL to fetch.
        future: ``(start, end, fileobj)`` tuple: the half-open byte range
            ``[start, end)`` and the open output file. ``fileobj`` must be a
            regular file opened by path, because it is reopened through
            ``fileobj.name``.
        bar: Progress bar; ``bar.update(n)`` is called for every chunk written.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        RuntimeError: If the server ignored the ``Range`` header for a range
            that does not start at byte 0.
    """
    start, end, fileobj = future
    if start >= end:
        # Nothing to fetch; 'bytes=N-(N-1)' would be an invalid Range header.
        return

    headers = {'Range': f'bytes={start}-{end - 1}'}
    with requests.get(url, headers=headers, stream=True) as response:
        response.raise_for_status()
        # A 200 means the whole body is being sent. That is only safe to
        # write when this range starts at offset 0.
        if response.status_code != 206 and start != 0:
            raise RuntimeError(
                f'server ignored Range request (HTTP {response.status_code}) for {url}'
            )
        # Every worker receives the same file object and therefore the same
        # file position; writing through it would lay chunks down in arrival
        # order. Use a private handle positioned at this range's offset.
        with open(fileobj.name, 'r+b') as out:
            out.seek(start)
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    out.write(chunk)
                    bar.update(len(chunk))

def download_ftp(url, filename, num_threads=4):
    """Download *url* to *filename*; supports ``ftp://`` as well as HTTP(S) URLs.

    ``ftp://`` URLs are streamed over a single connection with
    ``urllib.request``: ``requests`` only provides adapters for http/https
    and raises ``InvalidSchema`` for any other scheme. ``num_threads`` is not
    used for such URLs.

    For other URLs a streamed GET is issued first. If the response reports a
    size and ``Accept-Ranges: bytes`` and ``num_threads > 1``, that probe is
    closed and the file is fetched as at most ``num_threads`` byte ranges via
    ``download_range_ftp``. Otherwise the probe response itself is streamed
    to disk.

    Args:
        url: URL to fetch.
        filename: Path of the output file; it is created or truncated.
        num_threads: Upper bound on the number of concurrent range requests.

    Raises:
        urllib.error.URLError: If the FTP transfer cannot be opened.
        requests.HTTPError: If the HTTP GET returns an error status.
        Exception: Whatever a range worker raised is re-raised here.
    """
    label = os.path.basename(filename)

    if urllib.parse.urlsplit(url).scheme.lower() == 'ftp':
        with urllib.request.urlopen(url) as remote:
            length = remote.headers.get('Content-Length')
            total = int(length) if length else None
            with tqdm(total=total, unit='B', unit_scale=True, desc=label,
                      ncols=80, position=0) as bar, open(filename, 'wb') as f:
                while True:
                    chunk = remote.read(64 * 1024)
                    if not chunk:
                        break
                    f.write(chunk)
                    bar.update(len(chunk))
        logging.info(f'下载已完成:{filename}')
        return

    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        length = response.headers.get('Content-Length')
        total_size = int(length.strip()) if length else 0
        ranged = (
            total_size > 0
            and num_threads > 1
            and response.headers.get('Accept-Ranges', '').lower() == 'bytes'
        )
        if not ranged:
            # Stream the response that is already open instead of buffering
            # the whole body in memory via response.content.
            with tqdm(total=total_size or None, unit='B', unit_scale=True,
                      desc=label, ncols=80, position=0) as bar, \
                    open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    if chunk:
                        f.write(chunk)
                        bar.update(len(chunk))
            logging.info(f'下载已完成:{filename}')
            return
    # Leaving the block above closes the probe, so its connection is not
    # held open while the ranges are downloaded.

    # Ceiling division; stepping by chunk_size never yields an empty range.
    chunk_size = -(-total_size // num_threads)
    with tqdm(total=total_size, unit='B', unit_scale=True, desc=label,
              ncols=80, position=0) as bar, open(filename, 'wb') as f:
        slices = [
            (start, min(start + chunk_size, total_size), f, bar)
            for start in range(0, total_size, chunk_size)
        ]
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            pending = [
                executor.submit(download_range_ftp, url, piece)
                for piece in slices
            ]
        # result() re-raises anything a worker raised, so a failed range is
        # not reported as a completed download.
        for task in pending:
            task.result()
    logging.info(f'下载已完成:{filename}')

def download_range_ftp(url, future):
    """Fetch one byte range of *url* and write it at its own offset in the output file.

    Args:
        url: HTTP(S) URL to fetch.
        future: ``(start, end, fileobj, bar)`` tuple: the half-open byte
            range ``[start, end)``, the open output file, and a progress bar
            whose ``update(n)`` is called for every chunk written.
            ``fileobj`` must be a regular file opened by path, because it is
            reopened through ``fileobj.name``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        RuntimeError: If the server ignored the ``Range`` header for a range
            that does not start at byte 0.
    """
    start, end, fileobj, bar = future
    if start >= end:
        # Nothing to fetch; 'bytes=N-(N-1)' would be an invalid Range header.
        return

    headers = {'Range': f'bytes={start}-{end - 1}'}
    with requests.get(url, headers=headers, stream=True) as response:
        response.raise_for_status()
        # A 200 means the whole body is being sent. That is only safe to
        # write when this range starts at offset 0.
        if response.status_code != 206 and start != 0:
            raise RuntimeError(
                f'server ignored Range request (HTTP {response.status_code}) for {url}'
            )
        # All workers share one file object and thus one file position;
        # writing through it would lay chunks down in arrival order. Use a
        # private handle positioned at this range's offset instead.
        with open(fileobj.name, 'r+b') as out:
            out.seek(start)
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    out.write(chunk)
                    bar.update(len(chunk))

def main():
    """Parse the command line and run the selected downloader.

    Exactly one of ``--http``, ``--ftp`` or ``--magnet`` must be given. The
    function returns without downloading when the output directory does not
    exist or when the user declines to overwrite an existing file.
    """
    parser = argparse.ArgumentParser(description='用于下载文件的命令行工具')
    # Non-empty metavars: an empty one renders as a blank in the help text.
    parser.add_argument('-u', '--url', metavar='<URL>', type=str, required=True, help='下载链接')
    parser.add_argument('-o', '--output', metavar='<OUTPUT>', type=str, required=True, help='输出文件名')
    parser.add_argument('-t', '--threads', metavar='<NUM_THREADS>', type=int, default=4, help='下载使用的线程数')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--http', action='store_true', help='使用HTTP下载')
    group.add_argument('--ftp', action='store_true', help='使用FTP下载')
    group.add_argument('--magnet', action='store_true', help='使用磁力链接下载')
    args = parser.parse_args()
    url, filename, num_threads = args.url, args.output, args.threads

    if num_threads < 1:
        # Zero or negative counts cannot be used to split the download.
        parser.error('线程数必须为正整数')

    # dirname() is '' for a bare filename, which means "current directory";
    # testing '' directly would reject a perfectly valid output path.
    target_dir = os.path.dirname(filename) or os.curdir
    if not os.path.isdir(target_dir):
        logging.error(f'错误:文件路径 {filename} 不正确。')
        return

    if os.path.exists(filename):
        ans = input(f'文件 {filename} 已存在,是否覆盖?(Y/N)').strip().upper()
        if ans != 'Y':
            logging.info('下载已被用户取消。')
            return

    if args.http:
        download_file(url, filename, num_threads)
    elif args.ftp:
        download_ftp(url, filename, num_threads)
    elif args.magnet:
        download_torrent(url, filename)
    else:
        # Defensive only: the required mutually exclusive group above means
        # argparse has already rejected a command line with no mode flag.
        logging.error('错误:不支持的下载选项')

# Script entry point. The dunder names must be spelled out in full: a bare
# `name` is undefined and raises NameError as soon as the module is loaded.
if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    main()

# 多线程下载工具:使用 HTTP、FTP 或磁力链接下载文件
#
# 原文地址: https://www.cveoy.top/t/topic/oO69 著作权归作者所有。请勿转载和采集!
#
# 免费AI点我,无需注册和登录