import os import json import pandas as pd parquet_file_path = 'D:/work/LAION5B_super/LAION5B_super/LAION5B_super' OUTPUT_JSONL = 'LAION5B_super.jsonl' parquet_file_list = os.listdir(parquet_file_path) def read_parquet(file_name): parquet_file = os.path.join(parquet_file_path, file_name) data = pd.read_parquet(parquet_file, filters=[('status', '=', 'success')]) data['key'] = pd.to_numeric(data['key'], errors='coerce') data = data[pd.notna(data['key']) & (data['key'].apply(lambda x: x.is_integer()))] return data def generate_josn(file_name): data = read_parquet(file_name) img_path = file_name.split('.')[0] with open(OUTPUT_JSONL, 'w', encoding='utf-8') as f: for _, row in data.iterrows(): key = int(row['key']) key_str = f"{key:010d}" image_path = f'{img_path}/{key_str}.jpg' josn_data = { "id": int(row['SAMPLE_ID']), "image": image_path, "width": row['width'], "height": row['height'], "conversations": [ {"from": "human", "value": "<image>\nWhat is the content of this picture?"}, {"from": "gpt", "value": row['caption']} ] } f.write(json.dumps(josn_data) + '\n') def main(): for parquet_name in parquet_file_list: generate_josn(parquet_name) if __name__ == '__main__': main() 使用多线程优化一下上述代码
时间: 2025-07-23 10:05:56 浏览: 5
<think>我们使用多线程来优化上述代码,主要针对每个parquet文件的处理过程(即每个文件调用generate_json函数)进行并行处理。
由于每个文件的处理是独立的,我们可以使用多线程同时处理多个文件。
注意点:
1. 全局解释器锁(GIL)在I/O密集型任务中不会成为瓶颈,这里主要是文件读取和写入,属于I/O密集型,因此多线程可以提升效率。
2. 但是,在写入同一个文件(OUTPUT_JSONL)时,多个线程同时写入会导致混乱,因此需要加锁确保每次只有一个线程写入。
3. 我们使用线程池(ThreadPoolExecutor)来管理线程,避免手动管理线程的麻烦。
修改步骤:
- 将main函数修改为使用线程池来并行处理每个parquet文件。
- 在generate_json函数中,对写入文件的操作加锁,防止多个线程同时写入同一个文件导致内容交错。
但是,注意原代码中generate_json函数是以“写”模式打开文件,每次打开都会清空文件。如果多个线程同时处理,后写入的会覆盖前面的。
因此,我们需要改变写入方式:
方案1:每个线程处理一个文件,然后将结果写入同一个文件,但需要保证写入是互斥的,并且不能覆盖,而是追加。
方案2:每个线程写入一个临时文件,最后合并。但这样需要额外的合并步骤。
这里采用方案1,并且修改:
1. 将打开文件的方式改为追加模式('a'),这样每次写入不会覆盖之前的内容。
2. 但是,第一次写入时,如果文件已存在,我们需要清空文件吗?原代码是每次调用generate_json都会清空整个文件,这显然是不对的,因为我们要处理多个文件。
3. 因此,我们调整:在main函数中,在开始处理所有文件之前,先清空或创建输出文件(只做一次),然后每个线程处理一个文件,并将结果追加到该文件。
具体修改:
- 在main函数中,在启动线程之前,先以'w'模式打开文件并立即关闭,这样清空文件(如果之前存在)。
- 在generate_json函数中,以追加模式('a')打开文件,并写入数据。
但是,这样每个线程在写入时都会打开和关闭文件多次(每个文件打开一次,然后写入该文件的所有行)。由于我们处理的是多个文件,每个文件会打开一次输出文件,然后写入多行。由于打开文件是追加模式,所以不会覆盖。
然而,频繁地打开和关闭同一个文件可能会影响性能,而且多个线程同时打开同一个文件进行写入,虽然操作系统通常支持,但为了确保线程安全,我们使用锁来控制对文件的写入。
所以,我们引入一个全局锁,用于在写入文件时加锁。
另一种做法:每个线程将处理完的数据先保存在内存中(比如一个列表),然后一次性写入,但是这样如果单个文件很大,内存可能不够。
权衡后,我们选择在每次写入一行时加锁。但是,如果每个文件有大量行,频繁加锁可能影响性能。因此,我们可以在每个线程内,将整个文件的数据处理完后,一次性写入(即先构建一个字符串,包含所有json行,然后一次性写入)。这样每个线程只写入一次,减少加锁次数。
修改generate_json函数:
- 不再在循环中逐行写入,而是先构建一个字符串(包含该文件的所有行),然后加锁一次性写入。
这样,每个线程只写入一次,且写入时加锁。
步骤:
1. 在generate_json内部,先构建一个字符串(或字符串列表),然后加锁写入文件。
2. 使用锁来保证写入的原子性。
我们定义全局锁,但注意,由于线程池会在多个线程中运行,锁需要在所有线程中共享。
具体实现:
由于我们使用线程池,可以在main函数中创建锁,然后作为参数传递给每个线程任务。
代码调整:
1. 修改generate_json函数,增加一个lock参数,并在写入文件时使用锁。
2. 在main函数中,创建线程池,并传递同一个锁给每个任务。
另外,原代码中,generate_json函数会调用read_parquet读取文件,然后处理数据并写入。我们将其修改为:先处理整个文件的数据,生成一个大的字符串(包含所有行的json数据),然后一次性写入(加锁)。
这样减少加锁次数,提高效率。
注意:如果某个文件非常大,一次性构建字符串可能占用较多内存,但考虑到每个文件是parquet文件,且已经被过滤,数据量不会太大,可以接受。
修改后的generate_json函数:
伪代码:
data = read_parquet(file_name)
img_path = ... # 和原来一样
content = "" # 用于累积该文件的所有行
for 每一行 in data:
构建json_data,并转换为json字符串,加上换行符
将这一行字符串添加到content中
然后加锁,打开文件(追加模式),写入content,然后解锁。
但是,这样如果文件很大,content字符串会很大。我们可以改为使用列表,最后用"\n".join(list)来合并,但这样还是会生成一个大字符串。
另一种:在循环中逐行构建,但先不合并,最后再合并。或者我们可以在加锁后,逐行写入(这样加锁期间要写很多行,锁持有时间较长)。但考虑到我们每个文件只写一次,且锁只在写入时持有,而写入是I/O操作,速度较慢,所以持有锁时间较长可能会影响并发。但每个文件只写一次,所以总体锁竞争不会太激烈。
权衡后,我们选择一次性写入,因为每个文件的数据量不会太大(原代码是每个文件写一次,现在改为每个文件只写一次,所以锁竞争次数等于文件个数)。
具体代码修改如下:
注意:原代码中,generate_json函数是每个文件调用一次,现在我们将文件写入改为一次性,并加锁。
另外,原代码中,在generate_json函数内以'w'模式打开文件,会覆盖,现在改为在main函数开始时清空文件,然后每个线程以追加模式写入(加锁)。
步骤:
1. 在main函数开始,先清空输出文件(只做一次)。
2. 创建锁对象。
3. 使用线程池,每个任务处理一个文件,并传入锁。
但是,原代码中,parquet_file_list是文件列表,每个文件都会被处理。
修改后的代码:
注意:由于原代码中generate_json函数没有返回任何值,我们修改后也不返回。
另外,异常处理:如果在读取或处理某个文件时出错,我们希望不会影响其他文件,并且最好能记录错误。
由于问题要求优化多线程,我们暂时不考虑异常处理,但实际应用中应该加入。
代码实现:
我们将使用concurrent.futures中的ThreadPoolExecutor。
由于文件数量可能很多,我们可以设置最大线程数(例如根据CPU核心数,或者根据I/O需求设置更多)。
修改后的main函数:
伪代码:
清空输出文件(创建或清空)
创建锁
创建线程池
提交每个文件的任务(传入文件名和锁)
等待所有任务完成
注意:原代码中,parquet_file_list是文件列表,我们直接遍历这个列表,每个文件作为一个任务。
具体代码:</think>以下是对代码的多线程优化实现,主要改进点在于使用线程池并行处理Parquet文件,并确保线程安全的文件写入操作:
```python
import os
import json
import pandas as pd
import threading
from concurrent.futures import ThreadPoolExecutor
parquet_file_path = 'D:/work/LAION5B_super/LAION5B_super/LAION5B_super'
OUTPUT_JSONL = 'LAION5B_super.jsonl'
# 创建线程锁确保文件写入安全
file_lock = threading.Lock()
def read_parquet(file_name):
"""读取并处理Parquet文件"""
parquet_file = os.path.join(parquet_file_path, file_name)
data = pd.read_parquet(parquet_file, filters=[('status', '=', 'success')])
data['key'] = pd.to_numeric(data['key'], errors='coerce')
data = data[pd.notna(data['key']) & (data['key'].apply(lambda x: x.is_integer()))]
return data
def process_file(parquet_name):
"""处理单个Parquet文件并写入JSONL"""
try:
data = read_parquet(parquet_name)
img_path = parquet_name.split('.')[0]
output_lines = []
for _, row in data.iterrows():
key = int(row['key'])
key_str = f"{key:010d}"
image_path = f'{img_path}/{key_str}.jpg'
json_data = {
"id": int(row['SAMPLE_ID']),
"image": image_path,
"width": row['width'],
"height": row['height'],
"conversations": [
{"from": "human", "value": "<image>\nWhat is the content of this picture?"},
{"from": "gpt", "value": row['caption']}
]
}
output_lines.append(json.dumps(json_data))
# 加锁写入文件
with file_lock:
with open(OUTPUT_JSONL, 'a', encoding='utf-8') as f:
f.write('\n'.join(output_lines) + '\n')
print(f"Processed: {parquet_name}")
except Exception as e:
print(f"Error processing {parquet_name}: {str(e)}")
def main():
"""主函数使用线程池处理文件"""
parquet_file_list = os.listdir(parquet_file_path)
# 清空或创建输出文件
open(OUTPUT_JSONL, 'w').close()
# 创建线程池 (根据CPU核心数设置线程数)
max_workers = min(32, (os.cpu_count() or 1) * 4)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(process_file, parquet_file_list)
if __name__ == '__main__':
main()
```
### 优化说明:
1. **线程池管理**:
- 使用`ThreadPoolExecutor`管理线程资源
- 根据CPU核心数自动设置最佳线程数(`max_workers = min(32, (os.cpu_count() or 1) * 4)`)
2. **线程安全写入**:
- 添加全局锁`file_lock`确保多线程写入文件时不冲突
- 采用批量写入策略(先收集所有JSON行,再一次性写入)
3. **性能优化**:
- 每个文件处理结果先缓存在内存中,减少文件I/O次数
- 使用追加模式(`'a'`)写入避免重复打开文件
4. **错误处理**:
- 添加异常捕获防止单个文件处理失败影响整体
- 输出处理进度提示
5. **文件初始化**:
- 程序启动时先清空输出文件(`open(OUTPUT_JSONL, 'w').close()`)
### 注意事项:
- 适合I/O密集型任务(文件读取和写入)
- 线程数不宜过多(避免内存溢出)
- 处理超大文件时需监控内存使用
- 输出文件路径需确保有写入权限
阅读全文
相关推荐


















