import queueimport threadingfrom concurrent.futures import ThreadPoolExecutor, as_completedfrom threading import Eventdef push_data(data_queue: queue.Queue): while not stop_event.is_set(): data = "123" data_queue.put(data) stop_event.wait(0.5)def process_data(data): # 处理数据逻辑 print("Processing data:", data)if __name__ == '__main__': data_queue = queue.Queue() stop_event = Event() # 创建线程池 with ThreadPoolExecutor(max_workers=5) as executor: # 启动数据生产线程 t = threading.Thread(target=push_data, args=(data_queue,)) t.start() # 从队列中获取数据并处理 while not stop_event.is_set(): try: data = data_queue.get(timeout=1) except queue.Empty: continue # 提交任务到线程池 future = executor.submit(process_data, data) future.add_done_callback(lambda f: f.result()) # 关闭数据生产线程 stop_event.set() t.join() 代码优化,不要 Event
时间: 2024-02-25 14:59:50 浏览: 140
可以使用 `threading.Event` 代替 `queue.Queue` 来控制线程的结束。`threading.Event` 是一个线程同步的工具,可以让一个或多个线程等待某个事件的发生。具体地,可以创建一个 `Event` 对象,调用 `wait()` 方法等待事件的发生,调用 `set()` 方法通知事件的发生。
下面是一个使用 `Event` 改写后的代码:
```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
def push_data(data_list: list):
while data_list:
data = data_list.pop()
data_queue.put(data)
def process_data(data):
# 处理数据逻辑
print("Processing data:", data)
if __name__ == '__main__':
data_queue = queue.Queue()
data_list = ["123", "456", "789"]
# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
# 启动数据生产线程
t = threading.Thread(target=push_data, args=(data_list,))
t.start()
# 从队列中获取数据并处理
while True:
try:
data = data_queue.get(timeout=1)
except queue.Empty:
if not t.is_alive():
break
else:
continue
# 提交任务到线程池
future = executor.submit(process_data, data)
future.add_done_callback(lambda f: f.result())
t.join()
```
在这个代码中,我们将数据存储在一个普通的列表中,用一个专门的线程 `push_data` 来将数据逐一放入队列中。主线程不断从队列中获取数据并处理,如果队列为空并且 `push_data` 线程已经结束,则主线程也结束。这样就可以避免使用 `Event` 对象了。
阅读全文
相关推荐



















