rocketMq
时间: 2025-06-05 16:52:00 浏览: 19
### RocketMQ 使用指南概述
RocketMQ 是阿里巴巴开源的一款分布式消息中间件,具有高吞吐量、低延迟和可靠的特性。以下是关于如何使用 RocketMQ 的一些核心概念和操作方法。
#### 1. 生产者发送消息
生产者负责将消息发送到 RocketMQ 中的消息队列。常见的发送方式包括同步发送、异步发送以及单向发送。其中单向发送适合于不需要确认机制的场景,例如日志收集[^4]。
```java
public class OnewayProducer {
public static void main(String[] args) throws Exception {
// 初始化生产者实例并设置组名
DefaultMQProducer producer = new DefaultMQProducer("message-group");
// 配置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 循环发送消息
for (int i = 0; i < 10; i++) {
Message msg = new Message(
"TopicTest", // 主题名称
"TagA", // 消息标签
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) // 消息体
);
// 单向发送消息
producer.sendOneway(msg);
}
// 关闭生产者
producer.shutdown();
}
}
```
#### 2. 消费者接收消息
消费者用于从 RocketMQ 接收消息,并支持两种消费模式:集群消费模式和广播消费模式。默认情况下采用的是集群消费模式,在这种模式下每条消息只会被其中一个消费者实例消费[^5]。
```java
public class PushConsumerDemo {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 实例化消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
// 设置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题及其过滤条件
consumer.subscribe("TopicTest", "*");
// 注册回调函数来处理接收到的消息
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 开始启动消费者服务
consumer.start();
System.out.println("Consumer Started.");
}
}
```
#### 3. RocketMQ Streams 流计算框架简介
除了基本的消息传递功能外,RocketMQ 还提供了流计算的能力——即 **RocketMQ Streams**。它允许开发者基于消息流构建复杂的实时数据分析应用[^2]。
---
### 如何选择合适的消息中间件?
当面对多种消息中间件的选择时,需考虑具体的业务需求。例如,如果更关注数据的一致性、可靠性和较高的分区数量,则 RocketMQ 可能是一个更好的选项;而 Kafka 更擅长大规模的数据处理任务,在性能优化方面表现突出[^3]。
---
###
阅读全文
相关推荐
















