Python 下载工具:支持 HTTP、FTP、磁力链接下载
import transmissionrpc
import argparse
import os
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
def download_torrent(magnet, filename):
tc = transmissionrpc.Client('localhost', port=9091)
tc.add_torrent(magnet, download_dir=os.path.dirname(filename))
print(f'Download started: {filename}')
def download_file(url, filename, num_threads=4):
response = urllib.request.urlopen(url)
size = int(response.getheader('Content-Length'))
chunk_size = int(size / num_threads) + 1
with open(filename, 'wb') as f:
with tqdm(total=size, unit='B', unit_scale=True, desc=os.path.basename(filename), ncols=80, position=0) as bar:
futures = []
for i in range(num_threads):
start = i * chunk_size
end = min(start + chunk_size, size)
futures.append((start, end, f))
with ThreadPoolExecutor(max_workers=num_threads) as executor:
for future in futures:
executor.submit(download_range, url, future, bar)
print(f'Download complete: {filename}')
def download_range(url, future, bar):
start, end, fileobj = future
req = urllib.request.Request(url, headers={'Range': f'bytes={start}-{end-1}'})
with urllib.request.urlopen(req) as response:
fileobj.seek(start)
while True:
chunk = response.read(1024)
if not chunk:
break
fileobj.write(chunk)
bar.update(len(chunk))
def download_ftp(url, filename, num_threads=4):
with urllib.request.urlopen(url) as response:
total_size = response.getheader('Content-Length')
if total_size:
total_size = int(total_size.strip())
with tqdm(total=total_size, unit='B', unit_scale=True, desc=os.path.basename(filename), ncols=80, position=0) as bar:
with open(filename, 'wb') as f:
chunk_size = int(total_size / num_threads) + 1
futures = []
for i in range(num_threads):
start = i * chunk_size
end = min(start + chunk_size, total_size)
futures.append((start, end, f, bar))
with ThreadPoolExecutor(max_workers=num_threads) as executor:
for future in futures:
executor.submit(download_range_ftp, url, future)
print(f'Download complete: {filename}')
else:
with open(filename, 'wb') as f:
f.write(response.read())
print(f'Download complete: {filename}')
def download_range_ftp(url, future):
start, end, fileobj, bar = future
req = urllib.request.Request(url)
req.headers['Range'] = f'bytes={start}-{end-1}'
with urllib.request.urlopen(req) as response:
while True:
chunk = response.read(1024)
if not chunk:
break
fileobj.write(chunk)
bar.update(len(chunk))
def main():
parser = argparse.ArgumentParser(description='A command-line tool for downloading files')
parser.add_argument('-u', '--url', metavar='<URL>', type=str, required=True, help='The download URL')
parser.add_argument('-o', '--output', metavar='<FILENAME>', type=str, required=True, help='The output filename')
parser.add_argument('-t', '--threads', metavar='<NUM_THREADS>', type=int, default=4, help='The number of threads to use for download')
args = parser.parse_args()
url, filename, num_threads = args.url, args.output, args.threads
if not os.path.isfile(filename):
if os.path.isdir(os.path.dirname(filename)):
if url.startswith('ftp'):
download_ftp(url, filename, num_threads)
elif url.startswith('http') or url.startswith('https'):
download_file(url, filename, num_threads)
elif url.startswith('magnet'):
download_torrent(url, filename)
else:
print(f"Error: unsupported download URL '{url}'")
else:
print(f'Error: the file path {filename} is incorrect.')
else:
ans = input(f'The file {filename} exists, do you want to overwrite it? (Y/N)').upper()
if ans == 'Y':
os.remove(filename)
if url.startswith('ftp'):
download_ftp(url, filename, num_threads)
elif url.startswith('http') or url.startswith('https'):
download_file(url, filename, num_threads)
elif url.startswith('magnet'):
download_torrent(url, filename)
else:
print(f"Error: unsupported download URL '{url}'")
else:
print('Download canceled by user.')
if __name__ == '__main__':
main()
This code provides a command-line tool for downloading files using Python. It supports three types of download URLs:
- HTTP/HTTPS: Uses multi-threaded download for faster speed.
- FTP: Supports multi-threaded download for FTP resources.
- Magnet links: Uses the
transmissionrpclibrary to download torrents via Transmission daemon.
Usage:
python downloader.py -u <URL> -o <OUTPUT_FILENAME> [-t <NUM_THREADS>]
Example:
python downloader.py -u https://www.example.com/file.zip -o downloads/file.zip -t 8
python downloader.py -u ftp://ftp.example.com/file.txt -o downloads/file.txt
python downloader.py -u magnet:?xt=urn:btih:XXXXXXXXXXXXXXXXXXXXXXXX -o downloads/torrent.file
Features:
- Multi-threaded download for faster speed.
- Progress bar for monitoring download progress.
- Handles file overwrite with user confirmation.
- Supports both HTTP/HTTPS and FTP download protocols.
- Supports downloading torrents using Transmission daemon.
Requirements:
- Python 3.x
transmissionrpcargparseurllib.requestconcurrent.futurestqdm
Installation:
pip install transmissionrpc argparse urllib3 tqdm
Note:
- To use the torrent download functionality, you need to have Transmission daemon installed and running.
- The code uses f-strings, so Python 3.6 or higher is required.
- The code is provided as-is and without any warranty. Use at your own risk.
原文地址: https://www.cveoy.top/t/topic/oKZ9 著作权归作者所有。请勿转载和采集!