import os
import argparse
import requests
import logging
import re
import transmissionrpc
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm


def check_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)
    elif not os.path.isdir(path):
        raise argparse.ArgumentTypeError(f'{path} is not a directory')
    return path


def check_url(url):
    pattern = r'^https?://|^ftp://|^magnet:'
    if not re.match(pattern, url):
        raise argparse.ArgumentTypeError(f'{url} is not a valid url')
    return url


def download_file(url, filename, num_threads=4):
    response = requests.head(url)
    size = int(response.headers.get('Content-Length'))
    chunk_size = int(size / num_threads) + 1

    with open(filename, 'wb') as f:
        with tqdm(total=size, unit='B', unit_scale=True, desc=os.path.basename(filename), ncols=80, position=0) as bar:
            futures = []
            for i in range(num_threads):
                start = i * chunk_size
                end = min(start + chunk_size, size)
                futures.append((url, start, end, f))

            with ThreadPoolExecutor(max_workers=num_threads) as executor:
                for future in as_completed(executor.submit(download_range, *future, bar) for future in futures):
                    try:
                        future.result()
                    except Exception as e:
                        logging.error(f'Download failed: {e}')

    logging.info(f'Download completed: {filename}')


def download_range(url, start, end, fileobj, bar):
    headers = {'Range': f'bytes={start}-{end-1}'}
    with requests.get(url, headers=headers, stream=True) as response:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                fileobj.write(chunk)
                bar.update(len(chunk))


def download_ftp(url, filename, num_threads=4):
    with requests.get(url, stream=True) as response:
        total_size = response.headers.get('Content-Length')
        if total_size:
            total_size = int(total_size.strip())
            with tqdm(total=total_size, unit='B', unit_scale=True, desc=os.path.basename(filename), ncols=80, position=0) as bar:
                with open(filename, 'wb') as f:
                    chunk_size = int(total_size / num_threads) + 1
                    futures = []
                    for i in range(num_threads):
                        start = i * chunk_size
                        end = min(start + chunk_size, total_size)
                        futures.append((url, start, end, f, bar))

                    with ThreadPoolExecutor(max_workers=num_threads) as executor:
                        for future in as_completed(executor.submit(download_range_ftp, *future) for future in futures):
                            try:
                                future.result()
                            except Exception as e:
                                logging.error(f'Download failed: {e}')
            logging.info(f'Download completed: {filename}')
        else:
            with open(filename, 'wb') as f:
                f.write(response.content)
            logging.info(f'Download completed: {filename}')


def download_range_ftp(url, start, end, fileobj, bar):
    headers = {'Range': f'bytes={start}-{end-1}'}
    with requests.get(url, headers=headers, stream=True) as response:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                fileobj.write(chunk)
                bar.update(len(chunk))


def download_torrent(magnet, filename):
    tc = transmissionrpc.Client('localhost', port=9091)
    tc.add_torrent(magnet, download_dir=os.path.dirname(filename))
    logging.info(f'Download started: {filename}')


def main():
    parser = argparse.ArgumentParser(description='A command-line tool for downloading files.')
    parser.add_argument('-u', '--url', metavar='<URL>', type=check_url, required=True, help='download url')
    parser.add_argument('-o', '--output', metavar='<FILENAME>', type=str, required=True, help='output filename')
    parser.add_argument('-t', '--threads', metavar='<NUM_THREADS>', type=int, default=4, help='number of threads for downloading')
    parser.add_argument('-d', '--dir', metavar='<DIRECTORY>', type=check_dir, default='.', help='output directory')
    args = parser.parse_args()
    url, filename, num_threads, output_dir = args.url, args.output, args.threads, args.dir

    filename = os.path.join(output_dir, filename)
    if os.path.isfile(filename):
        i = 1
        while True:
            new_filename = f'{os.path.splitext(filename)[0]}_{i}{os.path.splitext(filename)[1]}'
            if not os.path.isfile(new_filename):
                filename = new_filename
                break
            i += 1

    if url.startswith('http'):
        download_file(url, filename, num_threads)
    elif url.startswith('ftp'):
        download_ftp(url, filename, num_threads)
    elif url.startswith('magnet'):
        download_torrent(url, filename)
    else:
        logging.error(f'The url {url} does not start with http, ftp or magnet.')


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    main()

该工具支持使用多线程下载文件,并提供进度条和错误日志记录功能。

使用方式:

python download.py -u <URL> -o <FILENAME> -t <NUM_THREADS> -d <DIRECTORY>

参数说明:

  • -u--url: 下载 URL
  • -o--output: 输出文件名
  • -t--threads: 下载线程数 (默认 4)
  • -d--dir: 输出目录 (默认当前目录)

示例:

python download.py -u https://example.com/file.zip -o file.zip -t 8 -d /path/to/download/dir

注意事项:

  • 该工具需要安装 requeststransmissionrpctqdmargparselogging 库。
  • 使用磁力链接下载时,需要安装 transmission-daemon 并确保 transmissionrpc 库能够连接到它。
  • 该工具支持 HTTP、FTP 和磁力链接下载,不支持其他协议。
  • 下载速度受网络状况影响。
  • 下载完成后,文件会保存到指定目录。

使用建议:

  • 在使用前,请先了解目标网站的下载限制。
  • 选择合适的下载线程数,避免占用过多系统资源。
  • 可以根据需要修改代码,添加其他功能。
多线程文件下载工具:支持HTTP、FTP和磁力链接

原文地址: https://www.cveoy.top/t/topic/oP08 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录