import os import argparse import requests import logging import re import transmissionrpc from concurrent.futures import ThreadPoolExecutor, as_completed from tqdm import tqdm

def check_dir(path): if not os.path.exists(path): os.makedirs(path) elif not os.path.isdir(path): raise argparse.ArgumentTypeError(f'{path} is not a directory') return path

def check_url(url): pattern = r'^https?://|^ftp://|^magnet:' if not re.match(pattern, url): raise argparse.ArgumentTypeError(f'{url} is not a valid url') return url

def download_file(url, filename, num_threads=4): response = requests.head(url) size = int(response.headers.get('Content-Length')) chunk_size = int(size / num_threads) + 1

with open(filename, 'wb') as f:
    with tqdm(total=size, unit='B', unit_scale=True, desc=os.path.basename(filename), ncols=80, position=0) as bar:
        futures = []
        for i in range(num_threads):
            start = i * chunk_size
            end = min(start + chunk_size, size)
            futures.append((url, start, end, f))

        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            for future in as_completed(executor.submit(download_range, *future, bar) for future in futures):
                try:
                    future.result()
                except Exception as e:
                    logging.error(f'Download failed: {e}')

if os.path.getsize(filename) != size:
    os.remove(filename)
    raise Exception(f'Download incomplete: {filename}')
logging.info(f'Download completed: {filename}')

def download_range(url, start, end, fileobj, bar): headers = {'Range': f'bytes={start}-{end-1}'}
with requests.get(url, headers=headers, stream=True) as response: for chunk in response.iter_content(chunk_size=1024): if chunk: fileobj.write(chunk) bar.update(len(chunk))

def download_ftp(url, filename, num_threads=4): with requests.get(url, stream=True) as response: total_size = response.headers.get('Content-Length') if total_size: total_size = int(total_size.strip()) with tqdm(total=total_size, unit='B', unit_scale=True, desc=os.path.basename(filename), ncols=80, position=0) as bar: with open(filename, 'wb') as f: chunk_size = int(total_size / num_threads) + 1 futures = [] for i in range(num_threads): start = i * chunk_size end = min(start + chunk_size, total_size) futures.append((url, start, end, f, bar))

                with ThreadPoolExecutor(max_workers=num_threads) as executor:
                    for future in as_completed(executor.submit(download_range_ftp, *future) for future in futures):
                        try:
                            future.result()
                        except Exception as e:
                            logging.error(f'Download failed: {e}')
        if os.path.getsize(filename) != total_size:
            os.remove(filename)
            raise Exception(f'Download incomplete: {filename}')
        logging.info(f'Download completed: {filename}')
    else:
        with open(filename, 'wb') as f:
            f.write(response.content)
        logging.info(f'Download completed: {filename}')

def download_range_ftp(url, start, end, fileobj, bar): headers = {'Range': f'bytes={start}-{end-1}'}
with requests.get(url, headers=headers, stream=True) as response: for chunk in response.iter_content(chunk_size=1024): if chunk: fileobj.write(chunk) bar.update(len(chunk))

def download_torrent(magnet, filename): tc = transmissionrpc.Client('localhost', port=9091) tc.add_torrent(magnet, download_dir=os.path.dirname(filename)) logging.info(f'Download started: {filename}')

def main(): parser = argparse.ArgumentParser(description='A command-line tool for downloading files.') parser.add_argument('-u', '--url', metavar='', type=check_url, required=True, help='download url') parser.add_argument('-o', '--output', metavar='', type=str, required=True, help='output filename') parser.add_argument('-t', '--threads', metavar='<NUM_THREADS>', type=int, default=4, help='number of threads for downloading') parser.add_argument('-d', '--dir', metavar='', type=check_dir, default='.', help='output directory') args = parser.parse_args() url, filename, num_threads, output_dir = args.url, args.output, args.threads, args.dir

filename = os.path.join(output_dir, filename)
if os.path.isfile(filename):
    i = 1
    while True:
        new_filename = f'{os.path.splitext(filename)[0]}_{i}{os.path.splitext(filename)[1]}'
        if not os.path.isfile(new_filename):
            filename = new_filename
            break
        i += 1

if url.startswith('http'):
    download_file(url, filename, num_threads)
elif url.startswith('ftp'):
    download_ftp(url, filename, num_threads)
elif url.startswith('magnet'):
    download_torrent(url, filename)
else:
    logging.error(f'The url {url} does not start with http, ftp or magnet.')

if name == 'main': logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S') main()

多线程文件下载工具 - 支持HTTP、FTP、磁力链接

原文地址: https://www.cveoy.top/t/topic/oP1c 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录