Python 多线程下载器:高效下载文件
python/nimport sys/nimport os/nimport math/nimport time/nimport requests/nimport urllib.request/nimport threading/nfrom tqdm import tqdm/n/n/nclass DownloadThread(threading.Thread):/n def __init__(self, url, start, end, fileobj, bar):/n threading.Thread.__init__(self)/n self.url = url/n self.start = start/n self.end = end/n self.fileobj = fileobj/n self.bar = bar/n/n def run(self):/n headers = {'Range': 'bytes=%d-%d' % (self.start, self.end)}/n response = requests.get(self.url, headers=headers, stream=True)/n for chunk in response.iter_content(chunk_size=1024):/n if not chunk:/n break/n self.fileobj.write(chunk)/n self.bar.update(len(chunk))/n/n/ndef download_file(url:str, filename:str, num_threads:int=4) -> None:/n response = requests.get(url, stream=True)/n if 'Content-Length' in response.headers:/n file_size = int(response.headers['Content-Length'])/n else:/n print(/'The file size is unknown, use single thread download./')/n download_single_thread(url=url, filename=filename)/n return/n/n # 判断文件大小是否为0/n if file_size == 0:/n raise ValueError(/'Failed to get file size, please check the download URL/') /n/n with open(filename, 'wb') as file:/n with tqdm(total=file_size, unit='B', unit_scale=True,/n desc=os.path.basename(filename), ncols=80, position=0) as bar:/n/n download_time = 0/n chunk_size = 1024/n avg_speed = 0/n last_speed = 0/n while True:/n start = time.time()/n chunk = response.raw.read(chunk_size*num_threads)/n end = time.time()/n/n if not chunk:/n break/n/n used_time = end - start/n download_time += used_time/n/n speed = chunk_size*num_threads/used_time/n avg_speed = (avg_speed + speed) / 2/n/n if avg_speed > 0:/n last_speed = avg_speed/n num_threads = int(file_size / avg_speed / chunk_size) + 1/n num_threads = min(max(num_threads, 1), 32)/n/n thread_pool = []/n start = 0/n end = -1/n for i in range(num_threads):/n start = end + 1/n end = start + chunk_size - 1/n/n if i == num_threads - 1:/n end = file_size - 1/n/n thread = DownloadThread(url=url, start=start, end=end, fileobj=file, bar=bar)/n thread.setDaemon(True)/n thread_pool.append(thread)/n/n for thread in thread_pool:/n thread.start()/n/n for thread in thread_pool:/n thread.join()/n/n bar.update(chunk_size*num_threads)/n/n bar.close()/n/n return filename/n/n/ndef download_single_thread(url:str, filename:str) -> None:/n urllib.request.urlretrieve(url, filename)/n return filename/n/n/ndef download(input_str:str, filename:str, num_threads:int=4) -> None:/n try:/n if input_str.startswith('magnet:'):/n download_magnet(input_str, output_filename)/n if os.path.isdir(filename):/n output_filename = os.path.join('./Download', os.path.basename(input_str))/n else:/n output_filename = os.path.join('./Download', os.path.basename(input_str))/n if not os.path.exists('.//Download'):/n os.makedirs('.//Download')/n else:/n download_file(input_str, output_filename, num_threads)/n /n print(f/'Download succeed! The file is saved as: {output_filename}/')/n except Exception as e:/n print('Download failed, exception:', e)/n/n/nif __name__ == '__main__':/n if len(sys.argv) < 3:/n print(/'Usage: python download.py url output_file/')/n sys.exit(1)/n/n url = sys.argv[1]/n filename = sys.argv[2]/n/n download(input_str=url, filename=filename, num_threads=4)/n/n/n使用方法:/n/n1. 将代码保存为 download.py 文件。/n2. 打开命令行,进入代码所在目录。/n3. 使用以下命令执行代码:/n/nbash/npython download.py [URL] [文件名]/n/n/n例如:/n/nbash/npython download.py https://www.example.com/file.zip file.zip/n/n/n优化建议:/n/n1. 可以添加命令行参数解析,使得使用更加方便。/n2. 可以添加断点续传功能,提高下载效率和稳定性。/n3. 可以添加多种下载方式的支持,例如FTP等。/n4. 可以添加日志记录功能,方便排查问题和优化性能。/n/n代码说明:/n/n- DownloadThread 类:负责下载文件的一部分数据。/n- download_file 函数:根据文件大小动态调整线程数量,并使用进度条显示下载进度。/n- download_single_thread 函数:使用单线程下载文件。/n- download 函数:处理下载请求,根据不同的输入类型调用不同的下载函数。/n/n注意:/n/n- 该代码只支持从 URL 地址下载文件。/n- 下载速度会受到网络带宽等因素的影响。/n- 在使用过程中,请确保目标文件路径可写。/n- 该代码仅供学习参考,请勿用于任何非法用途。/n/n版权声明:/n/n该代码由 Bard 生成,可以自由使用和修改。但请保留版权信息。/n/n免责声明:/n/n该代码仅供学习参考,使用过程中产生的任何问题,由使用者自行负责。作者不对该代码的使用结果承担任何责任。', 'content_html': 'python/nimport sys/nimport os/nimport math/nimport time/nimport requests/nimport urllib.request/nimport threading/nfrom tqdm import tqdm/n/n/nclass DownloadThread(threading.Thread):/n def __init__(self, url, start, end, fileobj, bar):/n threading.Thread.__init__(self)/n self.url = url/n self.start = start/n self.end = end/n self.fileobj = fileobj/n self.bar = bar/n/n def run(self):/n headers = {'Range': 'bytes=%d-%d' % (self.start, self.end)}/n response = requests.get(self.url, headers=headers, stream=True)/n for chunk in response.iter_content(chunk_size=1024):/n if not chunk:/n break/n self.fileobj.write(chunk)/n self.bar.update(len(chunk))/n/n/ndef download_file(url:str, filename:str, num_threads:int=4) -> None:/n response = requests.get(url, stream=True)/n if 'Content-Length' in response.headers:/n file_size = int(response.headers['Content-Length'])/n else:/n print(/'The file size is unknown, use single thread download./')/n download_single_thread(url=url, filename=filename)/n return/n/n # 判断文件大小是否为0/n if file_size == 0:/n raise ValueError(/'Failed to get file size, please check the download URL/') /n/n with open(filename, 'wb') as file:/n with tqdm(total=file_size, unit='B', unit_scale=True,/n desc=os.path.basename(filename), ncols=80, position=0) as bar:/n/n download_time = 0/n chunk_size = 1024/n avg_speed = 0/n last_speed = 0/n while True:/n start = time.time()/n chunk = response.raw.read(chunk_size*num_threads)/n end = time.time()/n/n if not chunk:/n break/n/n used_time = end - start/n download_time += used_time/n/n speed = chunk_size*num_threads/used_time/n avg_speed = (avg_speed + speed) / 2/n/n if avg_speed > 0:/n last_speed = avg_speed/n num_threads = int(file_size / avg_speed / chunk_size) + 1/n num_threads = min(max(num_threads, 1), 32)/n/n thread_pool = []/n start = 0/n end = -1/n for i in range(num_threads):/n start = end + 1/n end = start + chunk_size - 1/n/n if i == num_threads - 1:/n end = file_size - 1/n/n thread = DownloadThread(url=url, start=start, end=end, fileobj=file, bar=bar)/n thread.setDaemon(True)/n thread_pool.append(thread)/n/n for thread in thread_pool:/n thread.start()/n/n for thread in thread_pool:/n thread.join()/n/n bar.update(chunk_size*num_threads)/n/n bar.close()/n/n return filename/n/n/ndef download_single_thread(url:str, filename:str) -> None:/n urllib.request.urlretrieve(url, filename)/n return filename/n/n/ndef download(input_str:str, filename:str, num_threads:int=4) -> None:/n try:/n if input_str.startswith('magnet:'):/n download_magnet(input_str, output_filename)/n if os.path.isdir(filename):/n output_filename = os.path.join('./Download', os.path.basename(input_str))/n else:/n output_filename = os.path.join('./Download', os.path.basename(input_str))/n if not os.path.exists('.//Download'):/n os.makedirs('.//Download')/n else:/n download_file(input_str, output_filename, num_threads)/n /n print(f/'Download succeed! The file is saved as: {output_filename}/')/n except Exception as e:/n print('Download failed, exception:', e)/n/n/nif __name__ == '__main__':/n if len(sys.argv) < 3:/n print(/'Usage: python download.py url output_file/')/n sys.exit(1)/n/n url = sys.argv[1]/n filename = sys.argv[2]/n/n download(input_str=url, filename=filename, num_threads=4)/n/n/n使用方法:/n/n1. 将代码保存为 download.py 文件。/n2. 打开命令行,进入代码所在目录。/n3. 使用以下命令执行代码:/n/nbash/npython download.py [URL] [文件名]/n/n/n例如:/n/nbash/npython download.py https://www.example.com/file.zip file.zip/n/n/n优化建议:/n/n1. 可以添加命令行参数解析,使得使用更加方便。/n2. 可以添加断点续传功能,提高下载效率和稳定性。/n3. 可以添加多种下载方式的支持,例如FTP等。/n4. 可以添加日志记录功能,方便排查问题和优化性能。/n/n代码说明:/n/n- DownloadThread 类:负责下载文件的一部分数据。/n- download_file 函数:根据文件大小动态调整线程数量,并使用进度条显示下载进度。/n- download_single_thread 函数:使用单线程下载文件。/n- download 函数:处理下载请求,根据不同的输入类型调用不同的下载函数。/n/n注意:/n/n- 该代码只支持从 URL 地址下载文件。/n- 下载速度会受到网络带宽等因素的影响。/n- 在使用过程中,请确保目标文件路径可写。/n- 该代码仅供学习参考,请勿用于任何非法用途。/n/n版权声明:/n/n该代码由 Bard 生成,可以自由使用和修改。但请保留版权信息。/n/n免责声明:/n/n该代码仅供学习参考,使用过程中产生的任何问题,由使用者自行负责。作者不对该代码的使用结果承担任何责任。', 'url': nul
原文地址: https://www.cveoy.top/t/topic/oHPA 著作权归作者所有。请勿转载和采集!