Python 队列
时间: 2025-05-19 19:12:36 浏览: 16
### Python 中队列的使用方法与实现
#### 使用标准库中的 `queue.Queue` 实现线程安全队列
Python 提供了一个内置模块 `queue`,用于创建多生产者/消费者场景下的线程安全队列。该模块支持多种类型的队列操作,例如 FIFO 队列、LIFO 队列以及优先级队列。
以下是基本的 `queue.Queue` 示例:
```python
import threading
from queue import Queue
def worker(q):
while True:
item = q.get()
if item is None:
break
print(f"Processing {item}")
q.task_done()
q = Queue()
threads = []
for i in range(4): # 创建四个工作线程
t = threading.Thread(target=worker, args=(q,))
t.start()
threads.append(t)
for item in range(10): # 向队列中放入十个任务
q.put(item)
# 停止所有线程
for _ in range(4):
q.put(None)
for t in threads:
t.join()
```
上述代码展示了如何通过 `Queue` 类来管理多个线程之间的通信[^1]。
---
#### 使用 `collections.deque` 构建高效的双端队列
对于单线程环境或者不需要线程同步的情况下,可以利用 `deque` 来构建高效队列。`deque` 是双向队列的一种实现形式,在两端执行插入和删除操作的时间复杂度均为 O(1)。
下面是一个简单的例子展示其用法:
```python
from collections import deque
dq = deque(maxlen=5) # 设置最大长度为5
dq.appendleft("a") # 左侧追加元素
dq.extend(["b", "c"]) # 右侧批量扩展
print(dq.popleft()) # 移除并返回左侧第一个元素
print(list(dq)) # 转换为列表查看当前状态
```
这种结构非常适合处理数据流缓冲区等问题[^1]。
---
#### 持久化本地消息队列:`persist-queue`
如果需要保存未完成的任务到磁盘上以便程序重启后继续运行,则可以选择第三方库 `persist-queue`。它提供了文件系统级别的持久存储功能,并且保持良好的性能表现。
安装命令如下所示:
```bash
pip install persist-queue
```
下面是具体的应用实例:
```python
from persistqueue import Queue
q = Queue('mypath', multithreading=True, auto_commit=False)
q.put("task_a")
q.put("task_b")
while not q.empty():
task = q.get()
try:
process(task) # 处理逻辑替换为你自己的函数
finally:
q.task_done() # 确认已完成此项工作
```
这里需要注意的是每次调用 `.get()` 方法获取新项目之后都应当匹配相应的 `.task_done()` 调用来通知内部机制释放资源[^2]。
---
#### 函数式编程工具增强队列操作能力
除了传统的面向对象风格外,还可以借助一些专注于函数式的开源包简化复杂的集合变换过程。比如 Coconut 就是一种兼容 PEP8 的变体语言设计专门服务于此类需求;而像 CyToolz 或 Funcy 这样的框架则进一步封装了许多高级特性使得开发者能够更加简洁明快地表达意图[^3]。
假设我们想过滤掉某些不符合条件的数据项再送入目标容器当中去的话就可以这样写:
```python
from toolz import filter as tz_filter
from funcy import collecting
@collecting
def filtered_queue(items):
yield from tz_filter(lambda x: condition(x), items)
result_queue = list(filtered_queue([1, 2, 3]))
```
在这里,“condition”代表任意布尔判断依据的具体定义部分。
---
问题
阅读全文
相关推荐

















