/** * @author zhaobingqian * @version 1.0 * @since 2021/12/27 */ @Configuration @EnableWebSocketMessageBroker public class WebsocketConfig extends WebSocketMessageBrokerConfigurationSupport implements WebSocketMessageBrokerConfigurer { @Qualifier(ThreadPoolNameConstant.API_WEBSOCKET_MESSAGE_OUT_POOL) @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Resource private HttpSessionInterceptor httpSessionInterceptor; @Resource private CloudAccessHttpSessionInterceptor cloudAccessHttpSessionInterceptor; @Resource private CloudAccessHttpSessionInterceptorV2 cloudAccessHttpSessionInterceptorV2; @Resource private MessageInboundInterceptor messageInboundV2Interceptor; @Resource private MessageOutboundInterceptor messageOutboundInterceptor; /** * 注册Stomp的端点,这个HTTP URL是供WebSocket或SockJS客户端访问的地址 */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // normal stomp endpoint registry.addEndpoint(WebsocketConstant.NORMAL_ENDPOINT) .addInterceptors(httpSessionInterceptor) .setAllowedOriginPatterns("*") .withSockJS() // 设置SockJS连接超时时间为30s .setDisconnectDelay(TimeUnit.SECONDS.toMillis(30)); // cloud-access(local vms) stomp endpoint registry.addEndpoint(WebsocketConstant.CLOUD_ACCESS_ENDPOINT) .addInterceptors(cloudAccessHttpSessionInterceptor) .setAllowedOriginPatterns("*") .withSockJS() // 设置SockJS连接超时时间为30s .setDisconnectDelay(TimeUnit.SECONDS.toMillis(30)); // cloud-access(local vms) stomp endpoint registry.addEndpoint(WebsocketConstant.CLOUD_ACCESS_ENDPOINT_V2) .addInterceptors(cloudAccessHttpSessionInterceptorV2) .setAllowedOriginPatterns("*") .withSockJS() // 设置SockJS连接超时时间为30s .setDisconnectDelay(TimeUnit.SECONDS.toMillis(30)); // devops endpoint registry.addEndpoint(WebsocketConstant.DEVOPS_ENDPOINT) .addInterceptors(new DevopsHandshakeInterceptor()) // 添加允许跨域访问 .setAllowedOriginPatterns("*") // 支持SockJS .withSockJS() // 设置SockJS连接超时时间为30s .setDisconnectDelay(TimeUnit.SECONDS.toMillis(30)); } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registry) { } /** * 配置消息代理 */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { super.configureMessageBroker(registry); // 在 /topic、/queue 这三个域上可以向客户端发消息, 规定 /topic 用于广播,/queue 用于单播 registry.enableSimpleBroker( WebsocketConstant.SIMPLE_BROKER_PREFIX_TOPIC, WebsocketConstant.SIMPLE_BROKER_PREFIX_QUEUE); // 给指定用户发送一对一的消息前缀 /v2user,用于单播 registry.setUserDestinationPrefix(WebsocketConstant.USER_DESTINATION_PREFIX_V2); // 客户端向服务端发起请求时,需要以/app为前缀,可被controller拦截,能进入controller层(未使用) registry.setApplicationDestinationPrefixes(WebsocketConstant.APPLICATION_DESTINATION_PREFIX); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(messageInboundV2Interceptor); } @Override public void configureClientOutboundChannel(ChannelRegistration registration) { registration.interceptors(messageOutboundInterceptor); // 使用线程池增加WebSocket发送消息的并发性能 registration.taskExecutor(threadPoolTaskExecutor); } @Override public void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) { super.addArgumentResolvers(argumentResolvers); } @Override public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) { super.addReturnValueHandlers(returnValueHandlers); } @Override public boolean configureMessageConverters(List<MessageConverter> messageConverters) { if (CollectionUtils.isEmpty(messageConverters)) { messageConverters.add(new StringMessageConverter()); messageConverters.add(new ByteArrayMessageConverter()); messageConverters.add(createJacksonConverter()); } // 返回false后Spring不会添加其他默认Converter return false; } @Bean @Override public WebSocketHandler subProtocolWebSocketHandler(AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel) { return new SessionHolderWsHandler(clientInboundChannel, clientOutboundChannel); } }讲解一下
时间: 2025-09-16 07:45:53 AIGC 浏览: 1
这段代码是一个 Spring WebSocket 的配置类 `WebsocketConfig`,主要用于配置 WebSocket 消息代理、端点注册、消息转换器、拦截器等。以下是对代码的详细讲解:
---
### **1. 类定义**
```java
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig extends WebSocketMessageBrokerConfigurationSupport
implements WebSocketMessageBrokerConfigurer {
```
- **`@Configuration`**:标识这是一个 Spring 配置类。
- **`@EnableWebSocketMessageBroker`**:启用 WebSocket 消息代理功能,支持 STOMP 协议。
- **`extends WebSocketMessageBrokerConfigurationSupport`**:继承基础配置类,提供默认的 WebSocket 配置。
- **`implements WebSocketMessageBrokerConfigurer`**:实现接口,用于自定义 WebSocket 配置。
---
### **2. 依赖注入**
```java
@Qualifier(ThreadPoolNameConstant.API_WEBSOCKET_MESSAGE_OUT_POOL)
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Resource
private HttpSessionInterceptor httpSessionInterceptor;
@Resource
private CloudAccessHttpSessionInterceptor cloudAccessHttpSessionInterceptor;
@Resource
private CloudAccessHttpSessionInterceptorV2 cloudAccessHttpSessionInterceptorV2;
@Resource
private MessageInboundInterceptor messageInboundV2Interceptor;
@Resource
private MessageOutboundInterceptor messageOutboundInterceptor;
```
- **`ThreadPoolTaskExecutor`**:用于配置 WebSocket 消息发送的线程池,提高并发性能。
- **拦截器(Interceptors)**:
- `HttpSessionInterceptor`:普通 HTTP 会话拦截器。
- `CloudAccessHttpSessionInterceptor` 和 `CloudAccessHttpSessionInterceptorV2`:云访问相关的会话拦截器。
- `MessageInboundInterceptor` 和 `MessageOutboundInterceptor`:消息入站和出站拦截器。
---
### **3. 注册 STOMP 端点**
```java
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint(WebsocketConstant.NORMAL_ENDPOINT)
.addInterceptors(httpSessionInterceptor)
.setAllowedOriginPatterns("*")
.withSockJS()
.setDisconnectDelay(TimeUnit.SECONDS.toMillis(30));
registry.addEndpoint(WebsocketConstant.CLOUD_ACCESS_ENDPOINT)
.addInterceptors(cloudAccessHttpSessionInterceptor)
.setAllowedOriginPatterns("*")
.withSockJS()
.setDisconnectDelay(TimeUnit.SECONDS.toMillis(30));
registry.addEndpoint(WebsocketConstant.CLOUD_ACCESS_ENDPOINT_V2)
.addInterceptors(cloudAccessHttpSessionInterceptorV2)
.setAllowedOriginPatterns("*")
.withSockJS()
.setDisconnectDelay(TimeUnit.SECONDS.toMillis(30));
registry.addEndpoint(WebsocketConstant.DEVOPS_ENDPOINT)
.addInterceptors(new DevopsHandshakeInterceptor())
.setAllowedOriginPatterns("*")
.withSockJS()
.setDisconnectDelay(TimeUnit.SECONDS.toMillis(30));
}
```
- **功能**:注册 WebSocket 或 SockJS 的端点,客户端通过这些端点建立连接。
- **配置项**:
- **`addEndpoint()`**:定义 WebSocket 端点路径(如 `/ws`)。
- **`addInterceptors()`**:添加握手拦截器,用于验证连接。
- **`setAllowedOriginPatterns("*")`**:允许跨域请求(`*` 表示允许所有来源)。
- **`withSockJS()`**:启用 SockJS 回退选项(兼容不支持 WebSocket 的浏览器)。
- **`setDisconnectDelay()`**:设置 SockJS 连接断开延迟(30 秒)。
---
### **4. 配置消息代理**
```java
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
super.configureMessageBroker(registry);
registry.enableSimpleBroker(
WebsocketConstant.SIMPLE_BROKER_PREFIX_TOPIC,
WebsocketConstant.SIMPLE_BROKER_PREFIX_QUEUE);
registry.setUserDestinationPrefix(WebsocketConstant.USER_DESTINATION_PREFIX_V2);
registry.setApplicationDestinationPrefixes(WebsocketConstant.APPLICATION_DESTINATION_PREFIX);
}
```
- **功能**:配置消息代理,用于路由消息(广播、单播)。
- **配置项**:
- **`enableSimpleBroker()`**:启用简单内存消息代理,支持 `/topic`(广播)和 `/queue`(单播)。
- **`setUserDestinationPrefix()`**:设置单播消息前缀(如 `/v2user`)。
- **`setApplicationDestinationPrefixes()`**:设置客户端发送消息的前缀(如 `/app`),可被 `@Controller` 拦截。
---
### **5. 配置客户端通道**
```java
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(messageInboundV2Interceptor);
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.interceptors(messageOutboundInterceptor);
registration.taskExecutor(threadPoolTaskExecutor);
}
```
- **`configureClientInboundChannel`**:配置入站通道拦截器(如权限校验)。
- **`configureClientOutboundChannel`**:配置出站通道拦截器,并指定线程池提高并发性能。
---
### **6. 消息转换器**
```java
@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
if (CollectionUtils.isEmpty(messageConverters)) {
messageConverters.add(new StringMessageConverter());
messageConverters.add(new ByteArrayMessageConverter());
messageConverters.add(createJacksonConverter());
}
return false;
}
```
- **功能**:配置消息转换器,用于序列化/反序列化消息。
- **转换器类型**:
- `StringMessageConverter`:处理字符串消息。
- `ByteArrayMessageConverter`:处理二进制消息。
- `JacksonConverter`:处理 JSON 消息(通过 `createJacksonConverter()` 创建)。
- **`return false`**:表示不使用 Spring 默认的转换器。
---
### **7. 自定义 WebSocket 处理器**
```java
@Bean
@Override
public WebSocketHandler subProtocolWebSocketHandler(
AbstractSubscribableChannel clientInboundChannel,
AbstractSubscribableChannel clientOutboundChannel) {
return new SessionHolderWsHandler(clientInboundChannel, clientOutboundChannel);
}
```
- **功能**:自定义 WebSocket 处理器,继承默认的 `SubProtocolWebSocketHandler`。
- **`SessionHolderWsHandler`**:可能是自定义类,用于管理 WebSocket 会话。
---
### **总结**
这段代码完成了以下功能:
1. **注册 WebSocket 端点**:支持普通和云访问场景,兼容 SockJS。
2. **配置消息代理**:实现广播(`/topic`)和单播(`/queue`、`/v2user`)。
3. **拦截器**:对消息的入站、出站和握手过程进行拦截。
4. **线程池优化**:提高消息发送的并发性能。
5. **消息转换**:支持字符串、二进制和 JSON 格式。
6. **自定义处理器**:扩展默认的 WebSocket 行为。
---
阅读全文
相关推荐


















