import pandas as pd import os def process_item_mapping(): try: # 路径配置 input_purchases = 'delete/TranLine_Item_Purchases.csv' input_mapping = 'Mapping.csv' output_file = 'add/Updated_TranLine_Item_Purchases.csv' output_unmatched = 'add/Unmatched_Records.csv' # 自动创建输出目录 os.makedirs('add', exist_ok=True) # 读取数据(处理常见编码问题) purchases = pd.read_csv(input_purchases, encoding='utf-8-sig') mapping = pd.read_csv(input_mapping, encoding='utf-8-sig') # 列存在性验证 required_columns = {'Line Unique Key', 'Item'} if not required_columns.issubset(purchases.columns): missing = required_columns - set(purchases.columns) raise ValueError(f"购买表中缺少必要列:{missing}") # 执行左连接 merged = pd.merge( purchases, mapping[['Line Unique Key', 'Item Internal ID']], on='Line Unique Key', how='left', validate='one_to_one' # 确保键唯一性 ) # 列位置调整 if 'Item' in merged.columns: item_pos = merged.columns.get_loc('Item') + 1 cols = list(merged.columns) cols.insert(item_pos, cols.pop(cols.index('Item Internal ID'))) merged = merged[cols] # 生成未匹配报告 unmatched = merged[merged['Item Internal ID'].isna()] if not unmatched.empty: unmatched.to_csv(output_unmatched, index=False) print(f"发现{len(unmatched)}条未匹配记录,已保存到:{output_unmatched}") # 保存结果 merged.to_csv(output_file, index=False, encoding='utf-8-sig') print(f"处理完成!结果已保存到:{output_file}") print(f"总记录数:{len(merged)} | 匹配成功率:{merged['Item Internal ID'].notna().mean():.2%}") except FileNotFoundError as e: print(f"文件不存在错误:{str(e)}") except pd.errors.EmptyDataError: print("错误:CSV文件内容为空或格式错误") except Exception as e: print(f"未预料的错误:{str(e)}") if __name__ == "__main__": process_item_mapping(),帮我优化一下,遍历delete文件夹里的所有文件,不再是某一个文件
时间: 2025-03-13 19:18:01 浏览: 50
<think>好的,我需要帮用户优化他们的Python脚本,原来的脚本只能处理delete文件夹下的特定文件TranLine_Item_Purchases.csv,现在用户希望遍历delete文件夹里的所有文件。首先,我需要理解用户现有的代码结构和功能。
用户当前的process_item_mapping函数有几个主要步骤:读取购买数据和映射文件,进行左连接,调整列顺序,生成未匹配报告,保存结果。问题在于输入文件路径是硬编码的,只能处理一个特定文件。用户希望改为处理delete文件夹下的所有CSV文件。
首先,我需要修改代码,使其遍历delete目录中的所有CSV文件。可以使用os.listdir或glob模块来获取文件列表。考虑到可能需要过滤非CSV文件,用glob.glob('delete/*.csv')更合适,这样可以自动获取所有CSV文件。
接下来,需要为每个输入文件生成对应的输出文件名。原来的输出是add/Updated_TranLine_Item_Purchases.csv,现在可能需要为每个输入文件生成不同的输出文件名,比如在原文件名前加前缀,或者用其他方式命名,避免覆盖。同时,未匹配记录的文件名也需要相应调整,例如使用原文件名加上“_unmatched”后缀。
然后,处理流程需要循环每个文件。读取每个文件时,需要保持原有的列验证、合并、保存等步骤。要注意的是,如果多个文件有相同的列结构,才能进行相同的处理。如果不同文件结构不同,可能需要额外处理,但用户可能假设所有文件结构一致,所以暂时按原逻辑处理。
另外,异常处理需要考虑每个文件的处理过程中可能出现的错误,例如某个文件损坏或格式不对,应该捕获异常并提示用户,同时继续处理其他文件,而不是整个程序终止。因此,在循环内部需要单独的try-except块,确保一个文件的错误不会影响其他文件。
输出目录add需要确保存在,原代码已经用os.makedirs处理,所以这部分可以保持不变。但输出文件的路径生成需要动态创建,例如输出到add/processed目录,或者保持原结构。用户原来的输出文件直接放在add下,可能需要调整路径以避免覆盖,例如为每个输入文件创建一个子目录,或者添加前缀。
还有,原代码中的映射文件Mapping.csv是否需要每个处理文件都读取一次?目前代码是在函数开始时就读取了,这样在循环处理每个文件时,映射文件只需读取一次,提高效率。这部分可以保持不变。
列存在性验证部分,原来的required_columns需要针对每个输入文件进行检查,确保每个文件都有所需的列。这一步在循环内部处理每个文件时进行。
合并后的数据保存时,可能需要不同的输出文件名。例如,输入文件是delete/file1.csv,输出可以是add/file1_updated.csv,未匹配的文件是add/file1_unmatched.csv。这样每个处理后的文件都有对应的输出,避免覆盖。
此外,用户可能需要保留原始文件名的一部分,方便追踪。例如,使用os.path.basename和os.path.splitext来提取文件名和扩展名,然后构造新的输出文件名。
最后,测试优化后的代码是否能够正确处理多个文件,处理过程中是否有足够的错误提示,以及生成的文件是否符合预期。需要确保循环中的每个步骤都正确,特别是文件路径的处理和异常捕获。
总结修改步骤:
1. 使用glob获取delete下所有CSV文件。
2. 循环处理每个文件。
3. 动态生成输出和未匹配文件的路径。
4. 在循环内部处理每个文件,并捕获异常。
5. 调整输出文件名以避免覆盖。
6. 确保映射文件只读取一次,提高效率。
7. 验证每个文件的列是否存在。
8. 处理完成后,输出相应的成功或错误信息。
可能遇到的问题:如果某个文件没有所需的列,应该跳过并提示用户;如果输出目录结构需要调整,可能需要创建子目录;处理大量文件时,性能是否受影响,但用户可能更关注功能的正确性,暂时可以忽略性能优化。
现在根据这些思考,编写优化后的代码,并确保代码结构清晰,注释明确,便于用户理解。</think>---
### **优化后的代码方案**
```python
import pandas as pd
import os
from glob import glob # 新增模块
def process_item_mapping():
try:
# 路径配置
input_mapping = 'Mapping.csv'
output_dir = 'add'
os.makedirs(output_dir, exist_ok=True)
# 读取映射表(仅需一次)
mapping = pd.read_csv(input_mapping, encoding='utf-8-sig')
# 遍历delete文件夹下所有CSV文件
for input_purchases in glob('delete/*.csv'): # 关键修改点
# 动态生成输出路径
base_name = os.path.basename(input_purchases) # 获取原文件名
output_file = f"{output_dir}/Updated_{base_name}"
output_unmatched = f"{output_dir}/Unmatched_{base_name}"
# 处理单个文件
try:
# 读取购买数据
purchases = pd.read_csv(input_purchases, encoding='utf-8-sig')
# 列存在性验证
required_columns = {'Line Unique Key', 'Item'}
if not required_columns.issubset(purchases.columns):
missing = required_columns - set(purchases.columns)
print(f"跳过文件 {base_name}:缺少列 {missing}")
continue
# 执行左连接
merged = pd.merge(
purchases,
mapping[['Line Unique Key', 'Item Internal ID']],
on='Line Unique Key',
how='left',
validate='one_to_one'
)
# 列位置调整
if 'Item' in merged.columns:
item_pos = merged.columns.get_loc('Item') + 1
cols = list(merged.columns)
cols.insert(item_pos, cols.pop(cols.index('Item Internal ID')))
merged = merged[cols]
# 生成未匹配报告
unmatched = merged[merged['Item Internal ID'].isna()]
if not unmatched.empty:
unmatched.to_csv(output_unmatched, index=False)
print(f"发现 {len(unmatched)} 条未匹配记录 → {os.path.basename(output_unmatched)}")
# 保存结果
merged.to_csv(output_file, index=False, encoding='utf-8-sig')
print(f"处理完成:{base_name} → {os.path.basename(output_file)}")
print(f"总记录数:{len(merged)} | 匹配成功率:{merged['Item Internal ID'].notna().mean():.2%}\n")
except pd.errors.EmptyDataError:
print(f"文件 {base_name} 内容为空或格式错误,已跳过")
except Exception as e:
print(f"处理 {base_name} 时发生错误:{str(e)}")
except FileNotFoundError as e:
print(f"关键文件缺失:{str(e)}")
except Exception as e:
print(f"系统级错误:{str(e)}")
if __name__ == "__main__":
process_item_mapping()
```
---
### **核心改进说明**
1. **自动化遍历文件**
- 使用 `glob('delete/*.csv')` 获取目标文件夹下所有CSV文件路径
- 动态处理任意数量的输入文件
2. **智能输出命名**
- 原始文件 `delete/example.csv`
- 生成文件:
- 主输出:`add/Updated_example.csv`
- 未匹配记录:`add/Unmatched_example.csv`
3. **增强的错误处理**
```python
for input_purchases in glob(...):
try:
# 处理单个文件
except:
# 捕获单个文件错误,不影响其他文件处理
```
- 文件级错误隔离:单个文件的异常不会中断整个流程
- 明确跳过有问题的文件并给出原因
4. **保留原数据特征**
- 继承原有列位置调整逻辑
- 保持相同的合并验证规则 (`validate='one_to_one'`)
---
### **使用效果示例**
假设 `delete` 文件夹包含3个文件:
```
delete/
A.csv
B.csv
C.xlsx (非CSV文件会被自动过滤)
```
运行后将生成:
```
add/
Updated_A.csv
Unmatched_A.csv
Updated_B.csv
Unmatched_B.csv
```
---
### **验证方法**
```bash
# 查看处理后的文件列表
ls add
# 检查任意输出文件的结构
head -n 5 add/Updated_A.csv
```
阅读全文