springboot集成rocketmq stream
时间: 2025-05-30 16:01:35 浏览: 22
### Spring Boot集成RocketMQ Stream实现消息传递功能
#### 一、Spring Cloud Stream简介
Spring Cloud Stream 是一个用于构建微服务应用的消息驱动框架,它能够基于 Spring Boot 创建独立的应用程序。该框架支持多种消息中间件产品的个性化自动配置,并引入了发布-订阅模式、消费者组以及分区的核心概念[^3]。
#### 二、环境准备
为了完成 RocketMQ 和 Spring Boot 的集成,需先确保已安装并运行 RocketMQ 消息队列服务器。如果是在 Windows 平台上操作,则可以按照相关文档指导完成 RocketMQ 的部署与启动过程[^2]。
#### 三、依赖配置
在 Maven 工程中的 `pom.xml` 文件添加如下依赖项以便于使用 Spring Cloud Stream 及其对 RocketMQ 支持的功能模块:
```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-rocketmq</artifactId>
<version>最新版本号</version>
</dependency>
```
注意替换上述代码片段中的 `最新版本号` 字段为实际可用的版本编号[^1]。
#### 四、配置文件设置
编辑项目的 application.yml 或者 application.properties 文件加入必要的连接参数设定:
```yaml
spring:
cloud:
stream:
rocketmq:
binder:
addresses: http://localhost:8080 # 替换为实际地址
bindings:
input-channel:
destination: test-topic
group: consumer-group
output-channel:
destination: test-topic
```
此部分定义了一个输入通道(input-channel)和输出通道(output-channel),分别对应到名为test-topic的主题上,并指定了消费群组名称consumer-group[^4]。
#### 五、编写业务逻辑代码
下面展示一段简单的生产者与消费者的例子演示如何通过 Spring Cloud Stream 发送接收消息。
##### 生产者端
创建一个 REST 控制器类用作触发事件向指定主题推送数据包:
```java
@RestController
public class MessageProducer {
@Autowired
private Source source;
public void sendMessage(String message){
this.source.output().send(MessageBuilder.withPayload(message).build());
}
}
```
##### 消费者端
同样地,在另一个组件内部监听来自特定渠道的信息流做进一步处理工作:
```java
@Component
@EnableBinding(Sink.class)
public class MessageConsumer {
@StreamListener(Sink.INPUT)
public void receiveMessage(String payload){
System.out.println("Received : "+payload);
}
}
```
以上即完成了基本的消息投递机制建设流程概述。
#### 六、高级特性——事务消息
对于某些场景可能还需要考虑可靠性保障措施比如采用事务型通信方式。此时可自定义 sendTransactionMessage 方法来满足需求:
```java
@Service
public class TransactionalMessageService {
@Resource(name="rocketMQTemplate")
private RocketMQTemplate template;
public String sendTransactionalMessages(){
List<String> messages = Arrays.asList("msg1","msg2",...,"msg10");
ExecutorService executor = Executors.newFixedThreadPool(5);
CountDownLatch latch = new CountDownLatch(messages.size());
for (String msg : messages) {
Future<?> future = executor.submit(() -> {
try{
SendResult result = template.sendMessageInTransaction(
"topicName",
null,
MessageBuilder.withPayload(msg).setHeader("transactionId", UUID.randomUUID()).build(),
null);
System.out.printf("Sent %s with status=%s%n", msg, result.getSendStatus());
}finally{
latch.countDown();
}
});
}
try {latch.await();} catch (InterruptedException e){e.printStackTrace();}
return "All done";
}
}
```
阅读全文
相关推荐


















