一、SSE协议基础原理
Server-Sent Events(SSE)是一种基于HTTP协议的轻量级服务器推送技术,其核心设计理念是通过持久化连接实现单向数据流传输。相比WebSocket的全双工通信,SSE具有更简单的实现机制和更好的浏览器兼容性,特别适合实时消息推送、日志更新等场景。
1.1 协议交互机制
客户端发起请求时需设置Accept: text/event-stream头部,服务器响应包含三个关键头部:
Content-Type: text/event-streamCache-Control: no-cacheConnection: keep-alive
这种配置确保:
- 浏览器正确解析事件流格式
- 禁用缓存保证数据实时性
- 维持长连接减少握手开销
1.2 数据格式规范
服务器发送的数据需遵循特定格式:
event: customEvent\nid: 12345\ndata: {"key":"value"}\n\n
关键要素:
- 每条消息以双换行符
\n\n终止 - 支持自定义事件类型(event字段)
- 可包含消息ID(id字段)用于断线重连
- 数据部分可以是纯文本或JSON格式
二、Java服务端实现方案
以Spring Boot框架为例,完整实现包含以下核心组件:
2.1 基础控制器实现
@RestController@RequestMapping("/api/sse")public class SseController {@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter createStream() {SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时// 业务逻辑处理...return emitter;}}
关键点:
produces = MediaType.TEXT_EVENT_STREAM_VALUE声明响应类型SseEmitter默认超时时间为30秒,可根据业务调整- 返回类型必须为
SseEmitter或其子类
2.2 高级功能实现
2.2.1 完整生命周期管理
public SseEmitter createEnhancedStream() {SseEmitter emitter = new SseEmitter(60_000L);emitter.onCompletion(() -> {// 连接正常关闭处理log.info("Connection completed");});emitter.onTimeout(() -> {// 超时处理逻辑emitter.complete();});emitter.onError((ex) -> {// 异常处理逻辑log.error("Connection error", ex);});// 异步发送数据CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 10; i++) {emitter.send(SseEmitter.event().id(String.valueOf(i)).name("progress").data("Processing item " + i).reconnectTime(5000));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
2.2.2 消息发送模式对比
| 发送方式 | 特点 | 适用场景 |
|---|---|---|
emitter.send("data") |
简单字符串发送 | 纯文本通知 |
SseEmitter.event() |
支持完整事件结构 | 需要事件ID、类型等元数据 |
reconnectTime() |
设置自动重连时间 | 网络不稳定环境 |
2.3 性能优化策略
-
连接池管理:
@Beanpublic SseEmitterFactory sseEmitterFactory() {return new DefaultSseEmitterFactory() {@Overridepublic SseEmitter createSseEmitter() {return new SseEmitter(60_000L) {@Overrideprotected void extendResponse(ServerHttpResponse outputMessage) {// 自定义头部设置outputMessage.getHeaders().set("X-Accel-Buffering", "no");}};}};}
-
异步处理架构:
@Servicepublic class SseService {private final BlockingQueue<SseEmitter> emitters = new LinkedBlockingQueue<>();@PostConstructpublic void init() {new Thread(() -> {while (true) {try {SseEmitter emitter = emitters.take();sendUpdates(emitter);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}).start();}public void register(SseEmitter emitter) {emitters.add(emitter);}private void sendUpdates(SseEmitter emitter) {// 实际数据发送逻辑}}
三、典型应用场景
3.1 AI实时问答系统
@GetMapping("/qa/stream")public SseEmitter askQuestion(@RequestParam String query) {SseEmitter emitter = new SseEmitter(120_000L);// 模拟异步处理CompletableFuture.runAsync(() -> {try {// 调用AI服务获取分块结果List<String> chunks = aiService.processInChunks(query);for (String chunk : chunks) {emitter.send(SseEmitter.event().data(chunk).name("answer-chunk"));}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
3.2 实时监控仪表盘
@GetMapping("/monitor/metrics")public SseEmitter streamMetrics() {SseEmitter emitter = new SseEmitter();ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {try {Map<String, Object> metrics = monitoringService.getLatestMetrics();emitter.send(SseEmitter.event().data(objectMapper.writeValueAsString(metrics)).name("metrics-update"));} catch (Exception e) {emitter.completeWithError(e);scheduler.shutdown();}}, 0, 1, TimeUnit.SECONDS);emitter.onCompletion(() -> scheduler.shutdown());return emitter;}
四、生产环境注意事项
-
连接管理:
- 实现心跳机制防止连接空闲超时
- 建立连接池监控指标(活跃连接数、错误率等)
-
错误处理:
- 区分客户端断开(
IOException)和业务异常 - 实现指数退避重连策略
- 区分客户端断开(
-
安全考虑:
- 添加CSRF保护
- 实现会话级限流
- 对敏感数据进行加密传输
-
兼容性处理:
- 检测浏览器SSE支持情况
- 提供WebSocket作为降级方案
通过合理应用SSE技术,开发者可以构建出低延迟、高效率的实时数据推送服务。在实际项目中,建议结合业务特点选择合适的消息发送模式,并建立完善的监控体系确保服务稳定性。对于高并发场景,可考虑结合消息队列实现异步处理,进一步提升系统吞吐量。