Uniapp小程序实现SSE流式响应的完整实践指南

一、SSE技术背景与适用场景

Server-Sent Events(SSE)是一种基于HTTP协议的服务器推送技术,允许客户端建立持久连接并接收服务器持续发送的流式数据。相较于WebSocket的全双工通信,SSE具有以下优势:

  1. 基于标准HTTP协议,无需额外端口或协议支持
  2. 自动重连机制保障连接稳定性
  3. 支持自定义事件类型与ID标识
  4. 天然适配需要单向数据推送的场景(如实时日志、股票行情、AI推理结果流)

在Uniapp小程序开发中,SSE特别适用于需要接收服务器持续输出的场景。例如某AI大模型推理服务,其输出结果采用流式传输方式,每产生一个token便通过SSE通道推送至客户端,此时使用SSE可实现边推理边显示的效果,显著提升用户体验。

二、核心实现方案解析

2.1 基础请求配置

实现SSE通信的关键在于正确配置HTTP请求参数。以下配置项需特别注意:

  1. uni.request({
  2. url: 'https://api.example.com/stream', // 替换为实际接口
  3. method: 'GET',
  4. enableChunked: true, // 启用分块传输编码
  5. responseType: 'arraybuffer', // 以二进制形式接收数据
  6. header: {
  7. 'Accept': 'text/event-stream',
  8. 'Content-Type': 'application/json',
  9. 'X-API-KEY': 'your_api_key' // 实际开发中应从安全配置获取
  10. }
  11. })

关键参数说明

  • enableChunked:必须设置为true以支持分块传输
  • responseType:必须使用arraybuffer处理二进制数据流
  • 请求头需明确声明Accept类型为text/event-stream

2.2 数据流处理机制

接收到的原始数据需经过三步处理:

  1. 二进制转换:将ArrayBuffer转换为可读字符串
  2. 消息分割:按双换行符分割完整消息
  3. 数据提取:解析SSE标准格式中的data字段
  1. requestTask.onChunkReceived((res) => {
  2. // 二进制转换流程
  3. const arrayBuffer = res.data
  4. const uint8Array = new Uint8Array(arrayBuffer)
  5. const base64Data = uni.arrayBufferToBase64(uint8Array)
  6. const decodedData = Buffer.from(base64Data, 'base64').toString('utf8')
  7. // 消息分割处理
  8. const messages = decodedData.split('\n\n')
  9. messages.forEach(message => {
  10. if (message.includes('data:')) {
  11. const dataMatch = message.match(/data:(\{.*\})/)
  12. if (dataMatch) {
  13. const jsonData = JSON.parse(dataMatch[1])
  14. processSSEData(jsonData) // 自定义数据处理函数
  15. }
  16. }
  17. })
  18. })

2.3 特殊格式处理

针对AI推理场景中常见的特殊格式,需实现以下解析逻辑:

  1. Think标签处理:识别并分割包裹的中间结果
  2. 增量内容合并:维护上下文状态实现内容连续性
  3. 错误状态检测:识别服务器推送的错误事件
  1. function processSSEData(data) {
  2. const content = data.choices?.[0]?.delta?.content || ''
  3. // Think标签处理逻辑
  4. if (content.includes('<think>')) {
  5. return {
  6. id: data.id,
  7. content: content.split('<think>')[0],
  8. isThinking: true
  9. }
  10. }
  11. if (content.includes('</think>')) {
  12. const baseContent = this.currentContext || ''
  13. return {
  14. id: data.id,
  15. content: baseContent + content.split('</think>')[1],
  16. isThinking: false
  17. }
  18. }
  19. // 常规内容处理
  20. return {
  21. id: data.id,
  22. content: this.currentContext
  23. ? this.currentContext + content
  24. : content
  25. }
  26. }

三、完整实现示例

3.1 请求管理类封装

  1. class SSEClient {
  2. constructor(url, apiKey) {
  3. this.url = url
  4. this.apiKey = apiKey
  5. this.currentContext = ''
  6. this.isInThink = false
  7. this.requestTask = null
  8. }
  9. connect() {
  10. this.requestTask = uni.request({
  11. url: this.url,
  12. enableChunked: true,
  13. responseType: 'arraybuffer',
  14. header: {
  15. 'Accept': 'text/event-stream',
  16. 'X-API-KEY': this.apiKey
  17. },
  18. method: 'GET'
  19. })
  20. this.setupEventListeners()
  21. }
  22. setupEventListeners() {
  23. this.requestTask.onHeadersReceived((res) => {
  24. console.log('Headers received:', res)
  25. })
  26. this.requestTask.onChunkReceived((res) => {
  27. try {
  28. const decoded = this.decodeChunk(res.data)
  29. this.processMessages(decoded)
  30. } catch (error) {
  31. console.error('Chunk processing failed:', error)
  32. }
  33. })
  34. }
  35. decodeChunk(arrayBuffer) {
  36. const uint8 = new Uint8Array(arrayBuffer)
  37. const base64 = uni.arrayBufferToBase64(uint8)
  38. return Buffer.from(base64, 'base64').toString('utf8')
  39. }
  40. processMessages(dataStr) {
  41. const messages = dataStr.split('\n\n')
  42. messages.forEach(msg => {
  43. if (msg.startsWith('data:')) {
  44. const jsonStr = msg.replace('data:', '').trim()
  45. try {
  46. const parsed = JSON.parse(jsonStr)
  47. this.handleParsedData(parsed)
  48. } catch (e) {
  49. console.error('JSON parse error:', e)
  50. }
  51. }
  52. })
  53. }
  54. handleParsedData(data) {
  55. // 实现具体业务逻辑
  56. console.log('Received data:', data)
  57. }
  58. disconnect() {
  59. if (this.requestTask) {
  60. this.requestTask.abort()
  61. this.requestTask = null
  62. }
  63. }
  64. }

3.2 页面集成示例

  1. // pages/stream-demo/index.vue
  2. export default {
  3. data() {
  4. return {
  5. sseClient: null,
  6. messages: []
  7. }
  8. },
  9. onLoad() {
  10. this.initSSEConnection()
  11. },
  12. onUnload() {
  13. this.cleanup()
  14. },
  15. methods: {
  16. initSSEConnection() {
  17. this.sseClient = new SSEClient(
  18. 'https://api.example.com/stream',
  19. 'your_api_key'
  20. )
  21. this.sseClient.handleParsedData = (data) => {
  22. this.messages.push({
  23. id: data.id,
  24. content: data.choices[0].delta.content
  25. })
  26. // 触发视图更新
  27. this.$forceUpdate()
  28. }
  29. this.sseClient.connect()
  30. },
  31. cleanup() {
  32. if (this.sseClient) {
  33. this.sseClient.disconnect()
  34. }
  35. }
  36. }
  37. }

四、常见问题与解决方案

4.1 连接稳定性问题

现象:开发工具正常但真机频繁断开
解决方案

  1. 增加心跳机制:每30秒发送注释行保持连接
  2. 实现自动重连:捕获abort错误后延迟重试
  3. 优化网络配置:检查小程序基础库版本兼容性

4.2 数据乱序问题

现象:接收到的消息顺序与发送不一致
解决方案

  1. 依赖SSE协议的ID字段实现排序
  2. 在业务层维护消息队列缓冲机制
  3. 确保服务器正确设置Last-Event-ID头

4.3 性能优化建议

  1. 节流处理:对高频更新的内容进行合并显示
  2. 虚拟列表:长消息列表采用虚拟滚动技术
  3. Web Worker:将复杂解析逻辑移至Worker线程

五、进阶实践建议

  1. 多事件类型支持:扩展解析逻辑处理event字段
  2. 重试策略优化:实现指数退避重连机制
  3. 离线缓存:结合本地存储实现断点续传
  4. 监控告警:集成日志服务跟踪连接状态

通过系统化的SSE实现方案,开发者可以构建出稳定高效的实时通信应用。建议在实际开发中结合具体业务场景,对上述代码进行适应性调整,并充分测试各种网络环境下的表现。对于高并发场景,可考虑使用消息队列服务作为后端缓冲,进一步提升系统可靠性。