活动介绍

def process_single_pdf(pdf_path): """处理单个PDF文件(供多进程调用)""" file_name = os.path.basename(pdf_path) print(file_name) all_results = [] all_tables = {} all_wrong = {} try: with pdfplumber.open(pdf_path) as pdf: for page in pdf.pages: page_num = page.page_number tables = page.extract_tables({ "vertical_strategy": "lines", "horizontal_strategy": "lines", "explicit_vertical_lines": page.curves + page.edges, "explicit_horizontal_lines": page.curves + page.edges, "snap_tolerance": 8, "join_tolerance": 6, "text_x_tolerance": 6, "text_y_tolerance": 6 }) if tables: cleaned_tables = [ [ [cell.strip() if cell else "" for cell in row] for row in table ] for table in tables ] [filtered_df,error_flag]= process_single_table(cleaned_tables[0]).copy() # 显式复制 filtered_df['来源页码'] = page_num all_results.append(filtered_df) if error_flag >0: try: wrong_df = pd.DataFrame(cleaned_tables[0]) all_wrong[f"Page_{page_num}"] = wrong_df except Exception as e: print(f"错了也存不进去咯: {str(e)}") except Exception as e: print(f"处理文件 {file_name} 时出错: {str(e)}") # 返回包含错误信息的空DataFrame error_df = pd.DataFrame({"错误信息": [f"{file_name}处理失败: {str(e)}"]}) return pd.DataFrame() # 防御性处理:当没有有效数据时返回空DataFrame if len(all_results) > 0: df_concat = pd.concat(all_results, ignore_index=True) df_concat = df_concat.loc[df_concat['护理指标'] != '其他'] df_ranjinle = merge_nursing_indicators(df_concat) return df_ranjinle else: return pd.DataFrame() # 返回带列名的空D 如何仅在all_wrong非空时存为文件?

时间: 2025-07-12 16:44:47 浏览: 11
<think>好的,我现在需要帮助用户解决他们的问题:如何在处理PDF文件时,仅在`all_wrong`非空的情况下将其保存为文件。用户提供的代码中有一个`all_wrong`字典,用于存储错误的数据表。用户希望只有当这个字典里有内容时,才将其保存,避免生成空的文件。 首先,我需要仔细阅读用户提供的代码,理解各个部分的功能。函数`process_single_pdf`的作用是处理单个PDF文件,提取其中的表格数据,并进行清洗和处理。在处理过程中,如果发现错误(`error_flag > 0`),就会将原始的错误表格数据存入`all_wrong`字典,键是页码,值是对应的DataFrame。 接下来,用户的问题是在函数的最后部分,如何判断`all_wrong`是否有内容,如果有,则保存为文件。当前代码中,处理完所有页面后,返回的是合并后的有效数据`df_ranjinle`或空DataFrame。但用户希望在这里添加逻辑,仅在`all_wrong`非空时保存。 首先,我需要确定保存的条件。`all_wrong`是一个字典,当它非空时,即`len(all_wrong) > 0`,才需要保存。这时候需要将字典中的每个DataFrame保存到文件中,可能是Excel格式,每个页码对应一个工作表。 然后,保存的路径需要确定。用户可能需要指定一个输出目录,或者将文件保存在与原PDF相同的目录下,但文件名需要做区分,例如在原文件名后添加“_错误表”等后缀。这里可能需要用户提供更多信息,但根据现有代码,可以假设有一个全局变量或参数来指定输出路径,或者直接在函数内部处理。 另外,保存的方式可以是Excel文件,因为每个页码的DataFrame可以作为不同的sheet。使用pandas的ExcelWriter可以实现这一点。例如,使用`pd.ExcelWriter`,遍历`all_wrong`的每个键值对,将DataFrame写入不同的sheet。 需要注意的是,保存文件的代码应该放在函数返回之前,即在最后的判断之后。例如,在返回`df_ranjinle`或空DataFrame之前,先检查`all_wrong`是否非空,如果是,就执行保存操作。 现在,考虑代码结构。用户现在的代码在函数末尾返回处理后的数据,所以保存`all_wrong`的操作需要在这个返回之前完成。例如: 在`if len(all_results) > 0:`块内部,处理完`df_ranjinle`之后,或者在最后的else之前,添加对`all_wrong`的判断。 但需要注意,`all_wrong`是在处理每个页面时可能被填充的,所以无论是否有有效数据,都可能存在错误表。例如,可能存在部分页面处理出错,但其他页面成功,此时`all_results`可能有数据,而`all_wrong`也有数据。因此,保存`all_wrong`的逻辑应该独立于`all_results`的判断。 因此,正确的做法是在函数末尾,处理完所有页面后,检查`all_wrong`是否非空,如果是,则保存。无论是否生成了有效的`df_ranjinle`,只要存在错误表,就需要保存。 接下来,保存的具体实现。假设输出路径是原PDF的同目录,文件名加上“_错误表.xlsx”。可以使用`os.path`模块来构建路径。例如: ```python if all_wrong: output_path = os.path.join(os.path.dirname(pdf_path), f"{os.path.splitext(file_name)[0]}_错误表.xlsx") with pd.ExcelWriter(output_path) as writer: for sheet_name, df in all_wrong.items(): df.to_excel(writer, sheet_name=sheet_name, index=False) ``` 这里,使用`os.path.dirname(pdf_path)`获取原PDF的目录,`os.path.splitext(file_name)[0]`获取不带扩展名的文件名,然后拼接成新的输出路径。使用ExcelWriter,遍历`all_wrong`中的每个DataFrame,写入不同的sheet。 需要注意的是,Excel的sheet名称有长度限制(31字符),并且不能包含特殊字符,例如这里的键是`Page_1`这样的格式,应该是合法的。 另外,需要处理可能的异常,比如写入文件时的权限问题等,但根据用户现有代码,可能不需要处理,或者可以添加try-except块。 最后,将这段代码添加到函数末尾,返回之前的位置。例如: ```python # 仅在all_wrong非空时保存错误表 if all_wrong: output_path = os.path.join(os.path.dirname(pdf_path), f"{os.path.splitext(file_name)[0]}_错误表.xlsx") try: with pd.ExcelWriter(output_path) as writer: for sheet_name, df in all_wrong.items(): df.to_excel(writer, sheet_name=sheet_name, index=False) print(f"已保存错误表至: {output_path}") except Exception as e: print(f"保存错误表失败: {str(e)}") if len(all_results) > 0: df_concat = pd.concat(all_results, ignore_index=True) df_concat = df_concat.loc[df_concat['护理指标'] != '其他'] df_ranjinle = merge_nursing_indicators(df_concat) return df_ranjinle else: return pd.DataFrame() ``` 这样,无论是否有有效数据,只要存在错误表,就会保存。并且保存操作在返回之前完成。 另外,需要考虑多进程环境下文件写入是否会有冲突,但用户函数是处理单个PDF,每个进程处理不同的文件,所以文件名应该是唯一的,不会冲突。 最后,测试一下,如果all_wrong为空,不执行保存操作,否则执行。这符合用户的需求。</think>以下是针对代码中`如何仅在all_wrong非空时存为文件?`问题的分步解决方案: --- ### **实现步骤** 1. **添加保存条件判断** 在函数返回前增加逻辑判断,仅当`all_wrong`字典中包含数据时执行保存操作 ```python # 仅在all_wrong非空时保存错误表 if all_wrong: # 等价于 len(all_wrong) > 0 # 构建输出路径(与原PDF同目录) base_name = os.path.splitext(file_name)[0] # 去除原文件扩展名 output_path = os.path.join( os.path.dirname(pdf_path), f"{base_name}_错误表.xlsx" )
阅读全文

相关推荐

import os import pdfplumber import pandas as pd from multiprocessing import Pool # 设置全局参数 PDF_FOLDER = 'your_folder_path' # 替换为实际文件夹路径 tb_header = pd.read_excel("护理表表头.xlsx", header=0) tb_df = tb_header.copy() def process_single_pdf(pdf_path): """处理单个PDF文件(供多进程调用)""" file_name = os.path.basename(pdf_path) result = {file_name: {}} try: with pdfplumber.open(pdf_path) as pdf: for page in pdf.pages: page_num = page.page_number tables = page.extract_tables({ "vertical_strategy": "lines", "horizontal_strategy": "lines", "explicit_vertical_lines": page.curves + page.edges, "explicit_horizontal_lines": page.curves + page.edges, "snap_tolerance": 8, "join_tolerance": 6, "text_x_tolerance": 6, "text_y_tolerance": 6 }) if tables: cleaned_tables = [ [ [cell.strip() if cell else "" for cell in row] for row in table ] for table in tables ] result[file_name][f"Page_{page_num}"] = cleaned_tables except Exception as e: print(f"处理文件 {file_name} 时出错: {str(e)}") result[file_name] = {"error": str(e)} return result def batch_process_pdfs(folder_path, workers=8): """批量处理PDF文件(多进程版本)""" all_results = {} pdf_files = [ os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.lower().endswith('.pdf') ] with Pool(processes=workers) as pool: results = pool.map(process_single_pdf, pdf_files) for res in results: all_results.update(res) return all_results # 使用示例 if __name__ == "__main__": # 启动多进程处理(workers数量建议设置为CPU核心数×2) final_output = batch_process_pdfs(PDF_FOLDER, workers=8) # 查看结果结构 print(f"已处理 {len(final_output)} 个文件") for filename, content in final_output.items(): print(f"文件 {filename} 包含 {len(content)} 页表格数据") 似乎对于这段代码,只有所有文件都处理完毕才会有输出(final_output)?

import fitz import re import os import argparse from multiprocessing import Pool, freeze_support def extract_name(pdf_path): """从单个PDF提取姓名""" doc = fitz.open(pdf_path) name = None for page in doc: text = page.get_text() match = re.search(r'姓名[::]?\s*\n*([^\s\n]+)', text) if match: name = match.group(1).strip() break doc.close() return name def process_single_pdf(args): """处理单个PDF的包装函数(多进程兼容)""" input_path, output_dir = args file_name = os.path.basename(input_path) output_path = os.path.join(output_dir, file_name) target_name = extract_name(input_path) if not target_name: print(f"警告:{file_name} 未找到姓名信息") return doc = fitz.open(input_path) for page in doc: text_instances = page.search_for(target_name) for rect in text_instances: page.draw_rect(rect, color=(1,1,1), fill=(1,1,1), overlay=True) doc.save(output_path) doc.close() def batch_process(input_dir, workers=4): """批量处理入口函数""" # 创建输出目录 output_dir = os.path.join(input_dir, "processed_pdfs") os.makedirs(output_dir, exist_ok=True) # 获取所有PDF文件路径 pdf_files = [ os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.lower().endswith(".pdf") ] # 准备多进程参数 task_args = [(f, output_dir) for f in pdf_files] # 使用多进程池 with Pool(processes=workers) as pool: pool.map(process_single_pdf, task_args) if __name__ == "__main__": freeze_support() # 确保多进程在打包后正常工作 parser = argparse.ArgumentParser(description="批量处理PDF文件,擦除敏感姓名信息") parser.add_argument("--input_dir", required=True, help="输入目录路径(包含待处理PDF)") parser.add_argument("--workers", type=int, default=4, help="并行工作进程数(默认4)") args = parser.parse_args() batch_process(args.input_dir)

def process_single_pdf(pdf_path): “”“处理单个PDF文件(供多进程调用)”“” file_name = os.path.basename(pdf_path) print(file_name) all_results = [] all_tables = {} all_wrong = {} try: with pdfplumber.open(pdf_path) as pdf: for page in pdf.pages: page_num = page.page_number tables = page.extract_tables({ “vertical_strategy”: “lines”, “horizontal_strategy”: “lines”, “explicit_vertical_lines”: page.curves + page.edges, “explicit_horizontal_lines”: page.curves + page.edges, “snap_tolerance”: 8, “join_tolerance”: 6, “text_x_tolerance”: 6, “text_y_tolerance”: 6 }) if tables: cleaned_tables = [ [ [cell.strip() if cell else "" for cell in row] for row in table ] for table in tables ] [filtered_df,error_flag]= process_single_table(cleaned_tables[0]).copy() # 显式复制 filtered_df['来源页码'] = page_num all_results.append(filtered_df) if error_flag >0: try: wrong_df = pd.DataFrame(cleaned_tables[0]) all_wrong[f"Page_{page_num}"] = wrong_df except Exception as e: print(f"错了也存不进去咯: {str(e)}") except Exception as e: print(f"处理文件 {file_name} 时出错: {str(e)}") # 返回包含错误信息的空DataFrame error_df = pd.DataFrame({"错误信息": [f"{file_name}处理失败: {str(e)}"]}) return pd.DataFrame() # 防御性处理:当没有有效数据时返回空DataFrame if len(all_results) > 0: df_concat = pd.concat(all_results, ignore_index=True) df_concat = df_concat.loc[df_concat['护理指标'] != '其他'] df_ranjinle = merge_nursing_indicators(df_concat) return df_ranjinle else: return pd.DataFrame() # 返回带列名的空D 如何仅在all_wrong非空时将其存为文件到指定目录?文件名与file_name相同