一、SSE技术背景与适用场景
Server-Sent Events(SSE)是一种基于HTTP协议的服务器推送技术,允许客户端建立持久连接并接收服务器持续发送的流式数据。相较于WebSocket的全双工通信,SSE具有以下优势:
- 基于标准HTTP协议,无需额外端口或协议支持
- 自动重连机制保障连接稳定性
- 支持自定义事件类型与ID标识
- 天然适配需要单向数据推送的场景(如实时日志、股票行情、AI推理结果流)
在Uniapp小程序开发中,SSE特别适用于需要接收服务器持续输出的场景。例如某AI大模型推理服务,其输出结果采用流式传输方式,每产生一个token便通过SSE通道推送至客户端,此时使用SSE可实现边推理边显示的效果,显著提升用户体验。
二、核心实现方案解析
2.1 基础请求配置
实现SSE通信的关键在于正确配置HTTP请求参数。以下配置项需特别注意:
uni.request({url: 'https://api.example.com/stream', // 替换为实际接口method: 'GET',enableChunked: true, // 启用分块传输编码responseType: 'arraybuffer', // 以二进制形式接收数据header: {'Accept': 'text/event-stream','Content-Type': 'application/json','X-API-KEY': 'your_api_key' // 实际开发中应从安全配置获取}})
关键参数说明:
enableChunked:必须设置为true以支持分块传输responseType:必须使用arraybuffer处理二进制数据流- 请求头需明确声明Accept类型为text/event-stream
2.2 数据流处理机制
接收到的原始数据需经过三步处理:
- 二进制转换:将ArrayBuffer转换为可读字符串
- 消息分割:按双换行符分割完整消息
- 数据提取:解析SSE标准格式中的data字段
requestTask.onChunkReceived((res) => {// 二进制转换流程const arrayBuffer = res.dataconst uint8Array = new Uint8Array(arrayBuffer)const base64Data = uni.arrayBufferToBase64(uint8Array)const decodedData = Buffer.from(base64Data, 'base64').toString('utf8')// 消息分割处理const messages = decodedData.split('\n\n')messages.forEach(message => {if (message.includes('data:')) {const dataMatch = message.match(/data:(\{.*\})/)if (dataMatch) {const jsonData = JSON.parse(dataMatch[1])processSSEData(jsonData) // 自定义数据处理函数}}})})
2.3 特殊格式处理
针对AI推理场景中常见的特殊格式,需实现以下解析逻辑:
- Think标签处理:识别并分割包裹的中间结果
- 增量内容合并:维护上下文状态实现内容连续性
- 错误状态检测:识别服务器推送的错误事件
function processSSEData(data) {const content = data.choices?.[0]?.delta?.content || ''// Think标签处理逻辑if (content.includes('<think>')) {return {id: data.id,content: content.split('<think>')[0],isThinking: true}}if (content.includes('</think>')) {const baseContent = this.currentContext || ''return {id: data.id,content: baseContent + content.split('</think>')[1],isThinking: false}}// 常规内容处理return {id: data.id,content: this.currentContext? this.currentContext + content: content}}
三、完整实现示例
3.1 请求管理类封装
class SSEClient {constructor(url, apiKey) {this.url = urlthis.apiKey = apiKeythis.currentContext = ''this.isInThink = falsethis.requestTask = null}connect() {this.requestTask = uni.request({url: this.url,enableChunked: true,responseType: 'arraybuffer',header: {'Accept': 'text/event-stream','X-API-KEY': this.apiKey},method: 'GET'})this.setupEventListeners()}setupEventListeners() {this.requestTask.onHeadersReceived((res) => {console.log('Headers received:', res)})this.requestTask.onChunkReceived((res) => {try {const decoded = this.decodeChunk(res.data)this.processMessages(decoded)} catch (error) {console.error('Chunk processing failed:', error)}})}decodeChunk(arrayBuffer) {const uint8 = new Uint8Array(arrayBuffer)const base64 = uni.arrayBufferToBase64(uint8)return Buffer.from(base64, 'base64').toString('utf8')}processMessages(dataStr) {const messages = dataStr.split('\n\n')messages.forEach(msg => {if (msg.startsWith('data:')) {const jsonStr = msg.replace('data:', '').trim()try {const parsed = JSON.parse(jsonStr)this.handleParsedData(parsed)} catch (e) {console.error('JSON parse error:', e)}}})}handleParsedData(data) {// 实现具体业务逻辑console.log('Received data:', data)}disconnect() {if (this.requestTask) {this.requestTask.abort()this.requestTask = null}}}
3.2 页面集成示例
// pages/stream-demo/index.vueexport default {data() {return {sseClient: null,messages: []}},onLoad() {this.initSSEConnection()},onUnload() {this.cleanup()},methods: {initSSEConnection() {this.sseClient = new SSEClient('https://api.example.com/stream','your_api_key')this.sseClient.handleParsedData = (data) => {this.messages.push({id: data.id,content: data.choices[0].delta.content})// 触发视图更新this.$forceUpdate()}this.sseClient.connect()},cleanup() {if (this.sseClient) {this.sseClient.disconnect()}}}}
四、常见问题与解决方案
4.1 连接稳定性问题
现象:开发工具正常但真机频繁断开
解决方案:
- 增加心跳机制:每30秒发送注释行保持连接
- 实现自动重连:捕获abort错误后延迟重试
- 优化网络配置:检查小程序基础库版本兼容性
4.2 数据乱序问题
现象:接收到的消息顺序与发送不一致
解决方案:
- 依赖SSE协议的ID字段实现排序
- 在业务层维护消息队列缓冲机制
- 确保服务器正确设置Last-Event-ID头
4.3 性能优化建议
- 节流处理:对高频更新的内容进行合并显示
- 虚拟列表:长消息列表采用虚拟滚动技术
- Web Worker:将复杂解析逻辑移至Worker线程
五、进阶实践建议
- 多事件类型支持:扩展解析逻辑处理event字段
- 重试策略优化:实现指数退避重连机制
- 离线缓存:结合本地存储实现断点续传
- 监控告警:集成日志服务跟踪连接状态
通过系统化的SSE实现方案,开发者可以构建出稳定高效的实时通信应用。建议在实际开发中结合具体业务场景,对上述代码进行适应性调整,并充分测试各种网络环境下的表现。对于高并发场景,可考虑使用消息队列服务作为后端缓冲,进一步提升系统可靠性。