8.2 Server-Sent Events

8.2 Server-Sent Events (SSE)

Server-Sent Events(SSE)是一种轻量级的服务器推送技术,允许服务器通过HTTP连接主动向客户端发送事件流。与WebSocket不同,SSE设计用于单向服务器到客户端的通信,是实现实时更新的高效解决方案。

协议基础与核心特性

1. 与WebSocket对比

特性 SSE WebSocket
通信方向 单向(服务器→客户端) 双向
协议基础 基于HTTP 独立协议
重连机制 内置自动重连 需手动实现
数据格式 文本(UTF-8) 文本/二进制
浏览器支持 除IE外的所有现代浏览器 所有现代浏览器
适用场景 通知、日志流、状态更新 聊天、游戏、实时协作

2. 事件流格式示例

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

event: status
data: {"userCount": 42}

data: 这是一条多行
data: 消息示例

id: 12345
retry: 5000
: 这是注释行

客户端API实现

1. 基础事件监听

const eventSource = new EventSource('/api/updates');

// 通用消息处理
eventSource.onmessage = (event) => {
  console.log('收到消息:', event.data);
  updateUI(JSON.parse(event.data));
};

// 特定事件类型处理
eventSource.addEventListener('status', (event) => {
  const data = JSON.parse(event.data);
  document.getElementById('userCount').textContent = data.userCount;
});

// 错误处理
eventSource.onerror = (error) => {
  console.error('SSE连接错误:', error);
  if (eventSource.readyState === EventSource.CLOSED) {
    console.log('连接已关闭,尝试重连...');
  }
};

2. 高级连接管理

class SSEClient {
  constructor(url) {
    this.url = url;
    this.eventSource = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.connect();
  }
  
  connect() {
    this.eventSource = new EventSource(this.url);
    
    this.eventSource.onopen = () => {
      this.reconnectAttempts = 0;
      console.log('SSE连接已建立');
    };
    
    this.eventSource.onerror = () => {
      if (this.eventSource.readyState === EventSource.CLOSED) {
        this.handleDisconnection();
      }
    };
    
    // 注册自定义处理器
    this.eventSource.addEventListener('update', this.handleUpdate.bind(this));
  }
  
  handleDisconnection() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);
      this.reconnectAttempts++;
      
      console.log(`将在 ${delay}ms 后尝试重连...`);
      setTimeout(() => this.connect(), delay);
    } else {
      console.error('达到最大重连次数');
    }
  }
  
  close() {
    this.eventSource.close();
    console.log('SSE连接已主动关闭');
  }
  
  handleUpdate(event) {
    try {
      const data = JSON.parse(event.data);
      console.log('处理更新:', data);
      // 更新逻辑...
    } catch (error) {
      console.error('消息解析失败:', error);
    }
  }
}

// 使用示例
const sseClient = new SSEClient('/api/real-time');

服务器端实现

1. Node.js Express实现

const express = require('express');
const app = express();

app.get('/updates', (req, res) => {
  // 设置SSE专用头部
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });
  
  // 发送初始数据
  sendEvent(res, 'init', { message: '连接已建立', timestamp: Date.now() });
  
  // 定时发送更新
  const intervalId = setInterval(() => {
    const data = { 
      userCount: Math.floor(Math.random() * 100),
      serverTime: new Date().toISOString()
    };
    sendEvent(res, 'status', data);
  }, 3000);
  
  // 客户端断开时清理
  req.on('close', () => {
    clearInterval(intervalId);
    console.log('客户端断开连接');
    res.end();
  });
});

function sendEvent(res, event, data) {
  const eventData = `event: ${event}\n` +
                   `data: ${JSON.stringify(data)}\n` +
                   `id: ${Date.now()}\n\n`;
  res.write(eventData);
}

app.listen(3000, () => {
  console.log('SSE服务运行在 http://localhost:3000');
});

2. 消息格式生成工具

class SSEMessage {
  constructor(event, data, options = {}) {
    this.event = event;
    this.data = data;
    this.id = options.id || Date.now();
    this.retry = options.retry;
    this.comment = options.comment;
  }
  
  toString() {
    let msg = '';
    
    if (this.comment) {
      msg += `: ${this.comment}\n`;
    }
    
    if (this.event) {
      msg += `event: ${this.event}\n`;
    }
    
    if (Array.isArray(this.data)) {
      this.data.forEach(line => {
        msg += `data: ${line}\n`;
      });
    } else {
      msg += `data: ${this.data}\n`;
    }
    
    if (this.id) {
      msg += `id: ${this.id}\n`;
    }
    
    if (this.retry) {
      msg += `retry: ${this.retry}\n`;
    }
    
    return msg + '\n';
  }
}

// 使用示例
const message = new SSEMessage('update', { value: 42 }, { retry: 5000 });
res.write(message.toString());

高级应用模式

1. 事件源分区

// 客户端订阅特定频道
const userEventSource = new EventSource('/api/events?channel=user_updates');
const systemEventSource = new EventSource('/api/events?channel=system_alerts');

// 服务器端实现
app.get('/events', (req, res) => {
  const channel = req.query.channel || 'default';
  
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });
  
  // 基于频道的消息分发
  const sendChannelMessage = (msg) => {
    if (msg.channel === channel) {
      res.write(new SSEMessage(msg.type, msg.data).toString());
    }
  };
  
  // 模拟消息发布
  setInterval(() => {
    const messages = [
      { channel: 'user_updates', type: 'profile', data: { userId: 123 } },
      { channel: 'system_alerts', type: 'maintenance', data: { start: '2023-12-01T00:00:00Z' } }
    ];
    
    messages.forEach(sendChannelMessage);
  }, 5000);
});

2. 历史消息重放

app.get('/events-with-history', async (req, res) => {
  // 1. 设置SSE头部
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });
  
  // 2. 从数据库获取最近10条消息
  const recentMessages = await Message.find()
    .sort({ createdAt: -1 })
    .limit(10);
  
  // 3. 发送历史消息(从旧到新)
  recentMessages.reverse().forEach(msg => {
    res.write(new SSEMessage(msg.type, msg.data, { id: msg._id }).toString());
  });
  
  // 4. 实时新消息
  const listener = (msg) => {
    res.write(new SSEMessage(msg.type, msg.data, { id: msg._id }).toString());
  };
  
  Message.on('new', listener);
  
  // 5. 清理
  req.on('close', () => {
    Message.off('new', listener);
  });
});

性能优化与调试

1. 连接复用策略

// 共享同一个EventSource连接
const globalEventSource = (function() {
  let instance;
  
  return function() {
    if (!instance) {
      instance = new EventSource('/api/events');
      instance.onerror = () => {
        instance = null; // 出错时重置
      };
    }
    return instance;
  };
})();

// 各模块注册自己的处理器
function setupNotificationHandler() {
  const es = globalEventSource();
  es.addEventListener('notification', handleNotification);
}

2. 消息压缩技术

// 服务器端发送压缩消息
function sendCompressed(res, event, data) {
  const jsonStr = JSON.stringify(data);
  const compressed = zlib.deflateSync(jsonStr).toString('base64');
  
  const msg = new SSEMessage(event, {
    _compressed: true,
    payload: compressed
  });
  
  res.write(msg.toString());
}

// 客户端解压处理
eventSource.addEventListener('compressed-data', (event) => {
  const { _compressed, payload } = JSON.parse(event.data);
  if (_compressed) {
    const jsonStr = pako.inflate(atob(payload), { to: 'string' });
    const data = JSON.parse(jsonStr);
    processData(data);
  }
});

3. Chrome调试技巧

// 在开发者工具中监控EventSource
function monitorEventSource() {
  const original = window.EventSource;
  
  window.EventSource = function(url, config) {
    console.log('创建EventSource:', url);
    
    const es = new original(url, config);
    
    es.addEventListener('open', () => {
      console.log('SSE连接已打开');
    });
    
    es.addEventListener('error', (e) => {
      console.error('SSE错误:', e);
    });
    
    return es;
  };
}

// 调用一次开启监控
monitorEventSource();

安全最佳实践

1. 认证与授权

// 携带认证令牌
const eventSource = new EventSource('/api/events', {
  withCredentials: true // 发送cookie
});

// 或使用URL参数
const token = 'user-auth-token';
const es = new EventSource(`/api/events?token=${encodeURIComponent(token)}`);

// 服务器端验证
app.get('/events', authenticate, (req, res) => {
  if (!req.user) {
    res.writeHead(401);
    return res.end();
  }
  
  // 正常SSE逻辑...
});

2. 输入输出过滤

// 安全的SSE消息生成
function createSafeMessage(event, data) {
  // 过滤事件类型
  const safeEvent = event.replace(/[^\w-]/g, '');
  
  // 净化数据
  const safeData = {
    message: sanitize(data.message), // 实现XSS过滤
    timestamp: Date.now()
  };
  
  return new SSEMessage(safeEvent, safeData);
}

// 使用
res.write(createSafeMessage('user_message', userInput).toString());

实际应用案例

1. 实时股票行情展示

// 前端实现
class StockTicker {
  constructor() {
    this.sse = new EventSource('/api/stocks');
    this.stocks = {};
    
    this.sse.addEventListener('price_update', (e) => {
      const { symbol, price, change } = JSON.parse(e.data);
      this.updateStock(symbol, price, change);
    });
  }
  
  updateStock(symbol, price, change) {
    this.stocks[symbol] = { price, change };
    
    const element = document.querySelector(`[data-stock="${symbol}"]`);
    if (element) {
      element.textContent = `${price.toFixed(2)} (${change > 0 ? '+' : ''}${change.toFixed(2)}%)`;
      element.style.color = change >= 0 ? 'green' : 'red';
    }
  }
}

// 后端模拟数据
app.get('/stocks', (req, res) => {
  setupSSEHeaders(res);
  
  const symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN'];
  
  setInterval(() => {
    symbols.forEach(symbol => {
      const price = 100 + Math.random() * 50;
      const change = (Math.random() - 0.5) * 5;
      
      res.write(new SSEMessage('price_update', {
        symbol,
        price,
        change
      }).toString());
    });
  }, 1000);
});

2. 后台任务进度监控

// 前端进度显示
const progressSource = new EventSource('/api/task/progress');

progressSource.addEventListener('progress', (e) => {
  const { taskId, progress, status } = JSON.parse(e.data);
  
  const progressBar = document.getElementById(`task-${taskId}-progress`);
  if (progressBar) {
    progressBar.value = progress;
    progressBar.nextElementSibling.textContent = `${progress}% ${status}`;
    
    if (progress === 100) {
      progressSource.close();
    }
  }
});

// 后端任务处理
app.post('/long-task', (req, res) => {
  const taskId = generateTaskId();
  
  // 立即返回任务ID
  res.json({ taskId });
  
  // 在后台执行任务
  executeLongTask(taskId, (progress, status) => {
    // 通过SSE发送进度更新
    sseClients.forEach(client => {
      client.write(new SSEMessage('progress', {
        taskId,
        progress,
        status
      }).toString());
    });
  });
});

Server-Sent Events为Web应用提供了一种简单高效的服务器推送机制,特别适合以下场景:

  • 实时通知系统
  • 数据仪表盘更新
  • 后台任务进度报告
  • 实时日志流监控

相比WebSocket,SSE的优势在于:

  • 更简单的实现(基于HTTP)
  • 内置重连机制
  • 不需要额外协议
  • 更好的浏览器兼容性

开发时需要注意:

  • 合理设置retry时间(建议2-30秒)
  • 处理连接中断和错误恢复
  • 对敏感数据进行适当过滤
  • 考虑连接数限制(浏览器通常限制每个源6个SSE连接)
#前端开发 分享于 2025-05-20

上一篇:8.1 WebSocket 基础 下一篇:8.3 WebRTC 简介
【 内容由 AI 共享,不代表本站观点,请谨慎参考 】