import os; import argparse; import requests; import logging; import re; import transmissionrpc; from concurrent.futures import ThreadPoolExecutor, as_completed; from tqdm import tqdm; from filelock import FileLock; def c…
已修改代码如下:
import os
import argparse
import requests
import logging
import re
import transmissionrpc
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from filelock import FileLock
def check_dir(path):
    """Argparse ``type`` callback that ensures *path* is a directory.

    The directory, including any missing parents, is created when it does
    not exist yet.

    Args:
        path: Directory path as typed on the command line.

    Returns:
        The same ``path`` value, unchanged.

    Raises:
        argparse.ArgumentTypeError: If ``path`` exists but is not a
            directory, or if the directory cannot be created.
    """
    try:
        # exist_ok=True removes the check-then-create race that a separate
        # os.path.exists() test followed by os.makedirs() would have.
        os.makedirs(path, exist_ok=True)
    except FileExistsError:
        # Something that is not a directory already occupies the path.
        raise argparse.ArgumentTypeError(f'{path} is not a directory') from None
    except OSError as exc:
        # Report permission / invalid-path problems as a normal argparse
        # usage error instead of an unhandled traceback.
        raise argparse.ArgumentTypeError(
            f'cannot create directory {path}: {exc}'
        ) from exc
    return path
def check_url(url):
    """Argparse ``type`` callback that accepts only supported URL schemes.

    Args:
        url: URL string as typed on the command line.

    Returns:
        The same ``url`` value, unchanged.

    Raises:
        argparse.ArgumentTypeError: If ``url`` does not begin with
            ``http://``, ``https://``, ``ftp://`` or ``magnet:``.
    """
    # The match is case-sensitive and anchored at the start of the string.
    accepted_prefixes = ('http://', 'https://', 'ftp://', 'magnet:')
    if not url.startswith(accepted_prefixes):
        raise argparse.ArgumentTypeError(f'{url} is not a valid url')
    return url
def _split_ranges(size, parts):
    """Split ``[0, size)`` into at most *parts* non-empty ``(start, end)`` spans.

    The spans are contiguous, half-open and cover every byte exactly once.
    An empty list is returned when ``size`` is not positive.
    """
    if size <= 0:
        return []
    parts = max(1, parts)
    step = -(-size // parts)  # ceiling division, done in exact integers
    return [(start, min(start + step, size)) for start in range(0, size, step)]


def download_file(url, filename, num_threads=4):
    """Download *url* to *filename* using several ranged HTTP requests.

    The total size is taken from the ``Content-Length`` of a HEAD request,
    the byte range is divided between workers, and each worker is run via
    ``download_range``.

    Args:
        url: HTTP(S) URL to fetch.
        filename: Destination path; it is created or truncated.
        num_threads: Maximum number of concurrent range requests. Values
            below 1 are treated as 1.

    Raises:
        Exception: If the server reports no ``Content-Length``, if any
            range worker fails, or if the resulting file size differs from
            the reported size. The partial file is removed in the latter
            two cases.
        requests.HTTPError: If the HEAD request returns an error status.
    """
    # HEAD does not follow redirects by default in requests; follow them so
    # the size describes the resource the later GET requests will reach.
    response = requests.head(url, allow_redirects=True)
    response.raise_for_status()

    length = response.headers.get('Content-Length')
    if length is None:
        raise Exception(f'Download failed: no Content-Length for {url}')
    size = int(length)

    # Only non-empty ranges are scheduled, so no worker is ever handed an
    # inverted range such as "bytes=9-7" for small files.
    ranges = _split_ranges(size, num_threads)
    failed = False

    with open(filename, 'wb') as f:
        with tqdm(total=size, unit='B', unit_scale=True, desc=os.path.basename(filename), ncols=80, position=0) as bar:
            if ranges:
                with ThreadPoolExecutor(max_workers=len(ranges)) as executor:
                    pending = [
                        executor.submit(download_range, url, start, end, f, bar)
                        for start, end in ranges
                    ]
                    for future in as_completed(pending):
                        try:
                            future.result()
                        except Exception as e:
                            # Remember the failure: the size check alone
                            # cannot detect a missing middle range.
                            failed = True
                            logging.error(f'Download failed: {e}')

    if failed or os.path.getsize(filename) != size:
        os.remove(filename)
        raise Exception(f'Download incomplete: {filename}')
    logging.info(f'Download completed: {filename}')
def download_range(url, start, end, fileobj, bar):
    """Fetch bytes ``[start, end)`` of *url* and write them into the target file.

    Args:
        url: HTTP(S) URL to fetch.
        start: Offset of the first byte to fetch (inclusive).
        end: Offset one past the last byte to fetch (exclusive).
        fileobj: Open file object for the destination; only its ``name`` is
            used, the data is written through a separate ``rb+`` handle.
        bar: Progress bar whose ``update`` is called with each written
            chunk length.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        RuntimeError: If the server ignored the ``Range`` header for a
            range that does not start at offset 0.
    """
    remaining = end - start
    if remaining <= 0:
        # Empty or inverted range: "bytes=9-7" is not a valid Range header,
        # so there is nothing meaningful to request.
        return

    headers = {'Range': f'bytes={start}-{end-1}'}
    with requests.get(url, headers=headers, stream=True) as response:
        response.raise_for_status()
        # A non-206 reply carries the whole resource from byte 0. That is
        # only usable when this range itself begins at byte 0; otherwise
        # writing it at `start` would corrupt the file.
        if response.status_code != 206 and start != 0:
            raise RuntimeError(
                f'Server ignored range request bytes={start}-{end - 1} for {url}'
            )
        with open(fileobj.name, 'rb+') as f:
            # NOTE(review): the lock is held for the whole body transfer,
            # so workers write one after another; kept to preserve the
            # original locking behaviour.
            with FileLock(f'{fileobj.name}.lock'):
                f.seek(start)
                for chunk in response.iter_content(chunk_size=1024):
                    if not chunk:
                        continue
                    # Never write past this worker's own range.
                    chunk = chunk[:remaining]
                    f.write(chunk)
                    bar.update(len(chunk))
                    remaining -= len(chunk)
                    if remaining <= 0:
                        break
def download_ftp(url, filename, num_threads=4):
    """Download *url* (normally an ``ftp://`` URL) to *filename*.

    The transfer uses a single stream opened with
    ``urllib.request.urlopen``. ``requests`` ships no transport adapter for
    the ``ftp`` scheme, and HTTP ``Range`` headers have no meaning for FTP,
    so the previous multi-request approach could not work for FTP URLs.

    Args:
        url: URL to fetch.
        filename: Destination path; it is created or truncated.
        num_threads: Accepted for interface compatibility; unused because
            the transfer is a single stream.

    Raises:
        Exception: If the server reported a size and the number of bytes
            received differs from it. The partial file is removed.
        urllib.error.URLError: If the URL cannot be opened.
    """
    # Imported locally so this function does not depend on a change to the
    # module's import block.
    from urllib.request import urlopen

    block_size = 64 * 1024
    written = 0

    with urlopen(url) as response:
        length = response.headers.get('Content-Length')
        # The size is optional; without it the bar has no total.
        total_size = int(length.strip()) if length else None
        with tqdm(total=total_size, unit='B', unit_scale=True, desc=os.path.basename(filename), ncols=80, position=0) as bar:
            with open(filename, 'wb') as f:
                while True:
                    chunk = response.read(block_size)
                    if not chunk:
                        break
                    f.write(chunk)
                    written += len(chunk)
                    bar.update(len(chunk))

    if total_size is not None and written != total_size:
        os.remove(filename)
        raise Exception(f'Download incomplete: {filename}')
    logging.info(f'Download completed: {filename}')
def download_range_ftp(url, start, end, fileobj, bar):
    """Fetch bytes ``[start, end)`` of *url* into the target file.

    This function was a line-for-line copy of ``download_range``; it now
    delegates to it so both entry points always behave the same.

    Args:
        url: URL to fetch.
        start: Offset of the first byte to fetch (inclusive).
        end: Offset one past the last byte to fetch (exclusive).
        fileobj: Open file object for the destination; only its ``name`` is
            used by ``download_range``.
        bar: Progress bar updated with each written chunk length.
    """
    return download_range(url, start, end, fileobj, bar)
def download_torrent(magnet, filename):
    """Hand *magnet* to a local Transmission daemon for downloading.

    The torrent is added through the RPC endpoint on ``localhost:9091``
    with the directory part of *filename* as the download directory. The
    call only queues the torrent; it does not wait for completion.

    Args:
        magnet: Magnet link to add.
        filename: Output path chosen by the user; only its directory part
            is passed to the daemon.
    """
    # The daemon is a separate process, so a relative directory such as
    # "./Download" would not refer to this script's working directory.
    # Send an absolute path instead.
    download_dir = os.path.abspath(os.path.dirname(filename))
    tc = transmissionrpc.Client('localhost', port=9091)
    tc.add_torrent(magnet, download_dir=download_dir)
    logging.info(f'Download started: {filename}')
def main():
    """Parse the command line and dispatch to the matching downloader.

    Options: ``-u/--url`` (required), ``-o/--output`` (required file name),
    ``-t/--threads`` (default 4) and ``-d/--dir`` (default ``./Download``).
    If the target file already exists, ``_1``, ``_2``, ... is inserted
    before the extension until an unused name is found.
    """
    parser = argparse.ArgumentParser(description='A command-line tool for downloading files.')
    parser.add_argument('-u', '--url', metavar='<URL>', type=check_url, required=True, help='download url')
    parser.add_argument('-o', '--output', metavar='<FILENAME>', type=str, required=True, help='output filename')
    parser.add_argument('-t', '--threads', metavar='<NUM_THREADS>', type=int, default=4, help='number of threads for downloading')
    parser.add_argument('-d', '--dir', metavar='<DIRECTORY>', type=check_dir, default='./Download', help='output directory')
    args = parser.parse_args()

    # Reject thread counts that would otherwise surface later as a
    # ZeroDivisionError or a ThreadPoolExecutor ValueError.
    if args.threads < 1:
        parser.error('number of threads must be at least 1')

    url, filename, num_threads, output_dir = args.url, args.output, args.threads, args.dir

    # NOTE(review): check_dir is the `type` of --dir and creates the
    # directory itself, so this prompt looks unreachable in normal runs.
    if not os.path.exists(output_dir):
        create_dir = input("Directory does not exist. Do you want to create it? (y/n) ")
        if create_dir.lower() == 'y':
            os.makedirs(output_dir)
        else:
            return

    filename = os.path.join(output_dir, filename)
    if os.path.isfile(filename):
        # Split once, then probe name_1.ext, name_2.ext, ... for a free name.
        stem, ext = os.path.splitext(filename)
        counter = 1
        while os.path.isfile(f'{stem}_{counter}{ext}'):
            counter += 1
        filename = f'{stem}_{counter}{ext}'

    if url.startswith('http'):
        download_file(url, filename, num_threads)
    elif url.startswith('ftp'):
        download_ftp(url, filename, num_threads)
    elif url.startswith('magnet'):
        download_torrent(url, filename)
    else:
        logging.error(f'The url {url} does not start with http, ftp or magnet.')
if __name__ == '__main__':
    # Script entry point: set up INFO-level logging with timestamps, then
    # run the command-line interface.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    main()
在原代码的基础上,修改了以下内容:
- 在 `main` 函数中加入了判断是否存在下载文件夹的代码,并询问用户是否要创建文件夹。
- 将默认下载文件夹路径改为 `./Download`。
- 在 `check_dir` 函数中,若路径不存在则创建对应的文件夹。
原文地址: https://www.cveoy.top/t/topic/hnwo 著作权归作者所有。请勿转载和采集!