RabbitMQ 四种交换机
时间: 2025-05-27 17:25:42 浏览: 16
### RabbitMQ 的四种交换机类型及用法
#### 1. **Direct Exchange**
Direct Exchange 是一种基于精确匹配路由键(Routing Key)的交换机。当消息被发送到 Direct 类型的交换机时,它会根据消息中的 Routing Key 将其传递给绑定该 Routing Key 的队列。
- 使用场景:适用于需要将不同类型的消息分类并发送至特定队列的情况。
- 实现方式:生产者在发送消息时需指定一个 Routing Key,而消费者则通过绑定相应的 Routing Key 到队列上来接收对应的消息[^1]。
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明exchange和queue,并设置binding key
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = ['info', 'error']
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
def callback(ch, method, properties, body):
print(f"[x] {method.routing_key}: {body}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
```
---
#### 2. **Fanout Exchange**
Fanout Exchange 不关心消息的 Routing Key,而是将接收到的所有消息广播到与其绑定的所有队列中。
- 使用场景:适合用于通知机制或广播消息的应用场合。
- 特点:无需配置 Routing Key 即可实现消息的广泛传播[^2]。
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明fanout类型的exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print('[*] Waiting for logs.')
def callback(ch, method, properties, body):
print(f"[x] Received {body}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
```
---
#### 3. **Topic Exchange**
Topic Exchange 支持模糊匹配和通配符的方式进行消息路由。它的 Routing Key 可以是一个由点号分隔的字符串,支持 `*` 和 `#` 两个特殊字符:
- `*` 表示匹配单个词;
- `#` 表示匹配零个或多个词。
- 使用场景:适用于复杂的订阅/发布模型,允许灵活地筛选消息。
- 配置说明:可以通过 Topic Exchange 定义更精细的消息过滤规则[^5]。
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明topic类型的exchange
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
bindings_keys = ['#'] # 或者 '*.critical'
for binding_key in bindings_keys:
channel.queue_bind(
exchange='topic_logs',
queue=queue_name,
routing_key=binding_key
)
print("[*] Waiting for logs.")
def callback(ch, method, properties, body):
print(f"[x] {method.routing_key}:{body}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
```
---
#### 4. **Headers Exchange**
Headers Exchange 不依赖于 Routing Key 进行消息路由,而是依据消息头字段的内容决定目标队列。这种交换机通常用于更加复杂的消息匹配逻辑。
- 使用场景:适用于无法仅靠简单的 Routing Key 来区分消息的情况下。
- 工作原理:通过设置 Headers 参数来控制消息流向哪些队列[^3]。
```python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明headers类型的exchange
channel.exchange_declare(exchange='header_exchange', exchange_type='headers')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
arguments = {'x-match': 'all', 'key1': 'value1'}
channel.queue_bind(
exchange='header_exchange',
queue=queue_name,
arguments=arguments
)
print("[*] Waiting for messages with headers matching criteria.")
def callback(ch, method, properties, body):
print(f"[x] Received {body} with headers {properties.headers}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
```
---
### 总结
RabbitMQ 提供了多种交换机类型以满足不同的业务需求。每种交换机都有其独特的特性和适用范围,在实际开发过程中应根据具体的业务场景选择合适的交换机类型[^4]。
阅读全文
相关推荐

















