Improve this code:

```python
import requests

with open('PRRSV.txt', 'r') as f:
    file_names = f.readlines()
    for file_name in file_names:
        file_name = file_name.strip()
        url = f"ftp://ftp.ncbi.nlm.nih.gov/genomes/genbank/all/{file_name}/{file_name}_genomic.gbff.gz"
        response = requests.get(url)
        with open(f'{file_name}.gbff.gz', 'wb') as f:
            f.write(response.content)
```

Your code downloads genome data files from NCBI and saves them locally. If you want to improve it, consider the following points:

1. Add exception handling to cope with network failures, file read/write errors, and similar problems.
2. Use multi-threading or asynchronous IO to speed up the downloads.
3. Separate downloading from saving, so the code is easier to maintain and modify.
4. Add some logging so problems are easier to trace and the program's progress is visible.

Note also that the `requests` library does not support `ftp://` URLs; NCBI serves the same directory tree over HTTPS, so the scheme is switched to `https://` below.

Here is one possible improved version of the code for reference:

```python
import os
import logging
import requests
import concurrent.futures

# Set up the logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh.setFormatter(formatter)
logger.addHandler(sh)

# Download directory and number of worker threads
DOWNLOAD_DIR = 'genomes'
THREADS = 4

# Download a single file
def download_file(url, file_path):
    try:
        logger.debug(f"downloading {url}")
        response = requests.get(url, stream=True)
        response.raise_for_status()
        with open(file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024):
                f.write(chunk)
        logger.debug(f"downloaded {file_path}")
    except Exception as e:
        logger.error(f"failed to download {url}: {e}")

# Download every file in the list
def download_files(file_list):
    with concurrent.futures.ThreadPoolExecutor(max_workers=THREADS) as executor:
        futures = []
        for file_name in file_list:
            file_name = file_name.strip()
            # requests cannot fetch ftp:// URLs; NCBI exposes the same path over HTTPS
            url = f"https://ftp.ncbi.nlm.nih.gov/genomes/genbank/all/{file_name}/{file_name}_genomic.gbff.gz"
            file_path = os.path.join(DOWNLOAD_DIR, f"{file_name}.gbff.gz")
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            futures.append(executor.submit(download_file, url, file_path))
        for future in concurrent.futures.as_completed(futures):
            future.result()

# Entry point
def main():
    with open('PRRSV.txt', 'r') as f:
        file_names = f.readlines()
    download_files(file_names)

if __name__ == '__main__':
    main()
```
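Point 2 can also be addressed with asynchronous IO instead of a thread pool. Below is a minimal sketch using `aiohttp` and `asyncio`; it is not part of the original answer and assumes the extra `aiohttp` dependency, the same `PRRSV.txt` input file, and the HTTPS mirror of the NCBI FTP site. The `CONCURRENCY` constant is an illustrative stand-in for the `THREADS` setting above.

```python
import asyncio
import os

import aiohttp

DOWNLOAD_DIR = 'genomes'
CONCURRENCY = 4  # assumed limit on simultaneous downloads, analogous to THREADS above

async def fetch(session, sem, url, file_path):
    """Download one file, streaming the response body to disk in chunks."""
    async with sem:  # cap the number of downloads running at once
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                # Plain open() is used for brevity; it briefly blocks the event loop,
                # which a library such as aiofiles could avoid.
                with open(file_path, 'wb') as f:
                    async for chunk in resp.content.iter_chunked(8192):
                        f.write(chunk)
        except Exception as e:
            print(f"failed to download {url}: {e}")

async def main():
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)
    with open('PRRSV.txt') as f:
        names = [line.strip() for line in f if line.strip()]
    sem = asyncio.Semaphore(CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch(
                session,
                sem,
                f"https://ftp.ncbi.nlm.nih.gov/genomes/genbank/all/{name}/{name}_genomic.gbff.gz",
                os.path.join(DOWNLOAD_DIR, f"{name}.gbff.gz"),
            )
            for name in names
        ]
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(main())
```

The semaphore plays the same role as `max_workers` in the threaded version: it keeps the number of in-flight requests bounded so the server is not flooded.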

Related recommendations

```python
import os
import requests
from urllib.parse import urljoin
from bs4 import BeautifulSoup
import logging
from pathlib import Path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def get_page_content(url):
    """Fetch the page HTML"""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        response.encoding = response.apparent_encoding
        return response.text
    except requests.exceptions.RequestException as e:
        logging.error(f"Request failed: {e}")
        return None

def extract_file_links(html, base_url):
    """Extract downloadable file links"""
    soup = BeautifulSoup(html, 'html.parser')
    file_links = []
    # Find all links whose href ends with a known file extension
    for link in soup.find_all('a', href=True):
        href = link['href']
        if any(href.lower().endswith(ext) for ext in ('.pdf', '.doc', '.docx', '.xls', '.xlsx', '.zip', '.rar')):
            absolute_url = urljoin(base_url, href)
            file_links.append((absolute_url, link.text.strip()))
    return file_links

def download_file(url, save_dir):
    """Download a single file"""
    try:
        response = requests.get(url, stream=True, timeout=15)
        response.raise_for_status()
        # Derive the file name from the URL
        filename = os.path.basename(url.split('?')[0])  # strip URL parameters
        if not filename:
            filename = f"file_{hash(url)}"
        # Create the save directory
        Path(save_dir).mkdir(parents=True, exist_ok=True)
        # Save the file
        file_path = os.path.join(save_dir, filename)
        with open(file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        logging.info(f"Downloaded: {filename}")
        return True
    except Exception as e:
        logging.error(f"Download failed {url}: {str(e)}")
        return False

def main():
    target_url = "http://www.nhc.gov.cn/lljks/zcwj2/new_list.shtml"
    save_directory = "./downloaded_files"
    # Fetch the page
    html_content = get_page_content(target_url)
    if not html_content:
        return
    # Extract file links
    file_links = extract_file_links(html_content, target_url)
    if not file_links:
        logging.warning("No downloadable files found")
        return
    # Download all files
    success_count = 0
    for url, name in file_links:
        if download_file(url, save_directory):
            success_count += 1
    logging.info(f"Finished: {success_count}/{len(file_links)} files downloaded")

if __name__ == "__main__":
    main()
```