springboot消息
时间: 2025-01-31 21:07:02 浏览: 38
### Spring Boot 消息处理机制及实现方式
#### 使用 Redis 发布/订阅模式的消息队列
在构建分布式系统时,消息传递是一个重要组成部分。为了确保不同组件之间的高效通信,在Spring Boot项目中可以利用Redis的发布/订阅特性来创建轻量级的消息队列[^1]。
```java
// 配置类用于设置RedisTemplate序列化器和其他配置项
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
```
对于生产者端来说,只需要向指定频道发送消息即可;而对于消费者,则监听该频道并接收相应数据:
```java
// 生产者服务类负责推送消息到特定主题
@Service
public class ProducerService {
private final StringRedisTemplate stringRedisTemplate;
public ProducerService(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
void sendMessageToChannel(String channelName, Message message) {
stringRedisTemplate.convertAndSend(channelName,message.toString());
}
}
// 订阅者监听某个或某些通道上的事件,并执行相应的业务逻辑
@Component
@RequiredArgsConstructor
public class SubscriberListener implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
public void onMessage(Message message, byte[] pattern) {
logger.info("Received message '{}' from topic '{}'",message.toString(),new String(pattern));
// 处理接收到的信息...
}
// 可以定义多个方法分别对应不同的channel
}
```
以上代码片段展示了如何借助于`StringRedisTemplate`对象来进行简单的消息广播操作以及怎样注册一个实现了`MessageListener`接口的服务实例去捕获来自目标topic的数据流。
#### 异步消息处理支持
除了同步的方式外,还可以采用异步的方式来优化系统的整体表现力。Spring Boot 提供了强大的异步编程模型,允许开发者通过简单地添加 `@Async` 注解就能让方法变为非阻塞调用形式[^2]。
需要注意的是,默认情况下所有的异步任务都会被提交给同一个线程池内排队等待被执行。如果希望自定义调度策略的话就需要额外声明一个名为 "taskExecutor" 的 Bean 来覆盖默认行为。
```java
@EnableAsync
@SpringBootApplication
public class AsyncApplication {
...
}
// 自定义线程池配置
@Bean(name="customTaskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.initialize();
return executor;
}
// 应用了@Async的方法将在后台运行而不影响主线程流程
@Async("customTaskExecutor")
public Future<Boolean> asyncProcessWithResult(@NonNull final Long id) throws InterruptedException{
Thread.sleep(id * 10L);
log.debug("Processing finished for ID={}",id);
return new AsyncResult<>(true);
}
```
上述例子说明了如何开启全局异步功能开关并通过重载 Executor 参数改变并发控制参数从而更好地适应实际应用场景下的需求变化趋势。
阅读全文
相关推荐









