public class SSEService { private static final Map<String,SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); public SseEmitter crateSse(String uid) { SseEmitter sseEmitter = new SseEmitter(0L); sseEmitter.onCompletion(() -> { log.info("[{}]结束链接" , uid); sseEmitterMap.remove(uid); }); sseEmitter.onTimeout(() -> { log.info("[{}]链接超时",uid); }); sseEmitter.onError(throwable -> { try{ log.info("[{}]链接异常,{}",uid,throwable.toString()); sseEmitter.send(SseEmitter.event() .id(uid) .name("发生异常") .data("发生异常请重试") .reconnectTime(3000)); sseEmitterMap.put(uid,sseEmitter); }catch (IOException e){ e.printStackTrace(); } }); try{ sseEmitter.send(SseEmitter.event().reconnectTime(5000)); }catch (IOException e){ e.printStackTrace(); } sseEmitterMap.put(uid,sseEmitter); log.info("[{}]创建sse连接成功!",uid); return sseEmitter; } public boolean sendMessage(String uid,String messageId,String message){ if(StringUtils.isEmpty(message)){ log.info("[{}]参数异常,msg为空",uid); return false; } SseEmitter sseEmitter = sseEmitterMap.get(uid); if(sseEmitter == null){ log.info("[{}]sse连接不存在",uid); return false; } try{ sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(60000).data(message)); log.info("用户{},消息ID:{},推送成功:{}",uid,messageId,message); return true; }catch (IOException e){ sseEmitterMap.remove(uid); log.info("用户{},消息ID:{},消息推送失败:{}",uid,messageId,message); sseEmitter.complete(); return false; } } public void closeSse(String uid){ if(sseEmitterMap.containsKey(uid)){ SseEmitter sseEmitter = sseEmitterMap.get(uid); sseEmitter.complete(); sseEmitterMap.remove(uid); }else { log.info("用户{}连接已关闭",uid); } } }这是发送数据,怎么接收数据
时间: 2025-07-04 07:45:49 浏览: 3
<think>好的,我现在需要帮助用户理解如何接收通过SSE(Server-Sent Events)发送的数据。用户提供了一个Java类SSEService,用于服务器端发送数据,现在想知道客户端如何接收这些数据。
首先,我得回忆一下SSE的基本工作原理。SSE允许服务器主动向客户端推送数据,基于HTTP协议,客户端通过EventSource API来监听事件。用户的代码中,服务器端使用SseEmitter来管理连接,并提供了创建、发送和关闭的方法。那客户端应该怎么接收呢?
接下来,我需要考虑不同的客户端环境。常见的客户端可能是Web前端,使用JavaScript,或者可能是其他类型的客户端,比如移动应用或后端服务。但最常见的是Web前端,所以应该重点讲解如何使用JavaScript的EventSource来接收。
用户提供的代码中有几个关键点:创建SSE连接时发送了reconnectTime,发送消息时指定了event的id、reconnectTime和数据。客户端在接收时可能需要处理不同的事件类型,但通常默认是message事件。需要确认服务器发送的事件名称,如果服务器没有指定name,客户端可能需要监听默认事件。
另外,用户代码中的sendMessage方法发送的事件没有指定name,所以客户端应该监听默认事件,或者服务器发送的事件名称。例如,如果服务器发送时用了.name("事件名"),客户端需要监听对应的事件名。但根据提供的代码,sendMessage中没有设置name,所以事件名称可能是默认的,或者服务器发送的是未命名事件,客户端使用onmessage处理。
还需要考虑连接的生命周期管理,比如如何处理连接关闭、错误和超时。在服务器端的代码中,有onCompletion、onTimeout和onError的处理,客户端也需要相应处理这些情况,比如自动重连。
可能的遗漏点:跨域问题。如果客户端和服务器不在同一个域,可能需要CORS配置。但用户的问题可能不涉及这方面,所以暂时不提,除非用户后续提问。
然后,组织步骤:首先介绍客户端使用EventSource的基本方法,然后处理不同的事件,错误处理,最后给出完整示例。同时,对比不同客户端环境,但主要聚焦于Web前端。
需要检查是否有错误,比如在服务器发送数据时,是否必须设置event的name,如果未设置,客户端如何接收。例如,在SSE中,如果没有指定事件名,客户端使用onmessage来接收数据。如果指定了事件名,客户端需要addEventListener来监听特定事件。
在用户的sendMessage方法中,发送的事件没有设置name,所以客户端应该用onmessage来接收。但服务器端的onError处理中发送了一个name为“发生异常”的事件,这时候客户端需要监听该特定事件。
因此,在回答中需要分情况说明:默认消息和自定义事件的处理。
最后,确保示例代码正确,包括EventSource的创建、事件监听、错误处理等。同时提醒用户可能的兼容性和连接管理问题,比如关闭连接时调用close()方法。</think>以下是接收SSE数据的客户端实现步骤分析:
---
### 一、SSE接收原理
SSE(Server-Sent Events)基于HTTP长连接,客户端通过`EventSource` API监听事件流。服务端代码使用`SseEmitter`管理连接,客户端需按照以下逻辑接收:
---
### 二、Web前端接收实现(JavaScript)
#### 1. 基础接收代码
```javascript
// 创建EventSource连接(对应服务端的crateSse方法)
const eventSource = new EventSource('http://你的服务地址/sse-endpoint?uid=用户ID');
// 监听默认消息事件(服务端sendMessage发送的无name事件)
eventSource.onmessage = (event) => {
console.log('收到消息:',
`消息ID: ${event.lastEventId}`, // 对应服务端.event().id(messageId)
`数据: ${event.data}` // 对应服务端.data(message)
);
};
// 监听自定义事件(对应服务端onError中的.name("发生异常"))
eventSource.addEventListener("发生异常", (event) => {
console.error('服务端异常:', event.data);
});
// 错误处理
eventSource.onerror = (error) => {
console.error('连接异常:', error);
eventSource.close(); // 可选重连逻辑
};
```
#### 2. 关键参数说明
| 服务端行为 | 客户端对应处理 |
|--------------------------|-------------------------------|
| `.id(messageId)` | `event.lastEventId` 读取 |
| `.data(message)` | `event.data` 读取 |
| `.name("事件名")` | `addEventListener("事件名")` |
| `reconnectTime(毫秒)` | 自动重连间隔 |
---
### 三、其他客户端类型
#### 1. Android客户端
需使用第三方库(如`OkHttp`)实现SSE监听:
```kotlin
val request = Request.Builder().url("http://服务地址").build()
val listener = object : EventSourceListener() {
override fun onEvent(event: EventSource, id: String?, type: String?, data: String) {
when (type) {
"发生异常" -> handleError(data)
else -> handleMessage(id, data)
}
}
}
OkHttpClient().newBuilder().build().newEventSource(request, listener)
```
#### 2. Java后端客户端
使用`Spring`的`WebClient`接收:
```java
WebClient client = WebClient.create();
client.get()
.uri("/sse-endpoint")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.subscribe(data -> System.out.println("Received: " + data));
```
---
### 四、连接生命周期管理
```javascript
// 主动关闭连接(对应服务端closeSse方法)
function closeSSE() {
eventSource.close();
console.log('连接已关闭');
}
// 自动重连机制(服务端设置reconnectTime后)
eventSource.onerror = () => {
setTimeout(() => {
new EventSource(...); // 重新创建连接
}, 5000); // 建议使用服务端返回的重连时间
};
```
---
### 五、调试建议
1. 检查响应头需包含:
```http
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
```
2. 使用`curl`测试:
```bash
curl -N http://服务地址/sse-endpoint
```
---
### 六、与服务端代码的映射关系
| 服务端方法 | 客户端触发行为 |
|--------------------|-----------------------------------|
| `sseEmitter.send()`| `onmessage`/`addEventListener` |
| `onCompletion()` | 连接正常关闭时触发 |
| `onTimeout()` | 客户端收到超时信号(需配合重连) |
---
通过上述实现,客户端可以完整接收服务端通过`SSEService`推送的数据流,建议在Web端优先使用EventSource API,注意处理跨域问题和连接状态维护。
阅读全文
相关推荐


















