rocketMq
时间: 2025-06-05 08:50:47 浏览: 24
### RocketMQ 入门指南与常见用法
#### 1. RocketMQ 的基本概念
RocketMQ 是一款由阿里巴巴自主研发的高性能消息中间件,具有分布式架构、高吞吐量、高可用性和高可靠性等诸多优点[^3]。它的核心组件包括 NameServer、Broker、Producer 和 Consumer,这些组件共同协作完成消息的生产、存储和消费。
- **NameServer**: 提供路由服务,负责管理 Broker 集群的状态并为 Producer 和 Consumer 提供地址查询。
- **Broker**: 实际存储消息的服务端节点,分为 Master 和 Slave,支持主从同步以提高可靠性。
- **Producer**: 生产者角色,负责向指定 Topic 发送消息。
- **Consumer**: 消费者角色,负责订阅特定 Topic 并处理接收到的消息。
#### 2. 环境准备
在开始使用 RocketMQ 之前,需要安装 JDK (建议版本 8 及以上),并下载 RocketMQ 官方发布的二进制包或源码编译版。以下是启动单机环境的基本步骤:
1. 解压 RocketMQ 压缩包至目标目录;
2. 启动 NameServer:
```bash
nohup sh bin/mqnamesrv &
```
3. 启动 Broker:
```bash
nohup sh bin/mqborker -n localhost:9876 &
```
确认服务正常运行后,可以通过 `tail` 查看日志文件中的状态信息。
#### 3. 示例代码:简单生产者与消费者实现
##### (1)生产者示例
以下是一个简单的生产者代码片段,展示如何向指定 Topic 发送一条消息:
```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class ProducerExample {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group_name");
producer.setNamesrvAddr("localhost:9876"); // 设置 NameServer 地址
producer.start();
Message msg = new Message("TestTopic", "TagA", ("Hello RocketMQ").getBytes());
producer.send(msg);
producer.shutdown();
}
}
```
##### (2)消费者示例
下面是一段消费者代码,演示如何订阅某个 Topic 并接收消息:
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerExample {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("localhost:9876"); // 设置 NameServer 地址
consumer.subscribe("TestTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 返回成功状态
}
});
consumer.start();
}
}
```
#### 4. 关键配置项说明
为了优化性能和满足不同场景的需求,开发者可以根据实际情况调整以下参数:
- **发送超时时间** (`sendMsgTimeout`):设置消息发送的最大等待时间,默认值为 3 秒。
- **重试次数** (`retryTimesWhenSendFailed`):当消息发送失败时允许的最大重试次数。
- **批量大小** (`batchSize`):对于大批量数据传输,启用批量化可以显著提升效率。
此外,在集群部署环境下还需要特别注意 NameServer 和 Broker 的心跳检测机制以及故障切换逻辑[^3]。
---
阅读全文
相关推荐
















