8.2 Server-Sent Events
8.2 Server-Sent Events (SSE)
Server-Sent Events(SSE)是一种轻量级的服务器推送技术,允许服务器通过HTTP连接主动向客户端发送事件流。与WebSocket不同,SSE设计用于单向服务器到客户端的通信,是实现实时更新的高效解决方案。
协议基础与核心特性
1. 与WebSocket对比
| 特性 | SSE | WebSocket |
|---|---|---|
| 通信方向 | 单向(服务器→客户端) | 双向 |
| 协议基础 | 基于HTTP | 独立协议 |
| 重连机制 | 内置自动重连 | 需手动实现 |
| 数据格式 | 文本(UTF-8) | 文本/二进制 |
| 浏览器支持 | 除IE外的所有现代浏览器 | 所有现代浏览器 |
| 适用场景 | 通知、日志流、状态更新 | 聊天、游戏、实时协作 |
2. 事件流格式示例
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
event: status
data: {"userCount": 42}
data: 这是一条多行
data: 消息示例
id: 12345
retry: 5000
: 这是注释行
客户端API实现
1. 基础事件监听
const eventSource = new EventSource('/api/updates');
// 通用消息处理
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);
updateUI(JSON.parse(event.data));
};
// 特定事件类型处理
eventSource.addEventListener('status', (event) => {
const data = JSON.parse(event.data);
document.getElementById('userCount').textContent = data.userCount;
});
// 错误处理
eventSource.onerror = (error) => {
console.error('SSE连接错误:', error);
if (eventSource.readyState === EventSource.CLOSED) {
console.log('连接已关闭,尝试重连...');
}
};
2. 高级连接管理
class SSEClient {
constructor(url) {
this.url = url;
this.eventSource = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.connect();
}
connect() {
this.eventSource = new EventSource(this.url);
this.eventSource.onopen = () => {
this.reconnectAttempts = 0;
console.log('SSE连接已建立');
};
this.eventSource.onerror = () => {
if (this.eventSource.readyState === EventSource.CLOSED) {
this.handleDisconnection();
}
};
// 注册自定义处理器
this.eventSource.addEventListener('update', this.handleUpdate.bind(this));
}
handleDisconnection() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);
this.reconnectAttempts++;
console.log(`将在 ${delay}ms 后尝试重连...`);
setTimeout(() => this.connect(), delay);
} else {
console.error('达到最大重连次数');
}
}
close() {
this.eventSource.close();
console.log('SSE连接已主动关闭');
}
handleUpdate(event) {
try {
const data = JSON.parse(event.data);
console.log('处理更新:', data);
// 更新逻辑...
} catch (error) {
console.error('消息解析失败:', error);
}
}
}
// 使用示例
const sseClient = new SSEClient('/api/real-time');
服务器端实现
1. Node.js Express实现
const express = require('express');
const app = express();
app.get('/updates', (req, res) => {
// 设置SSE专用头部
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// 发送初始数据
sendEvent(res, 'init', { message: '连接已建立', timestamp: Date.now() });
// 定时发送更新
const intervalId = setInterval(() => {
const data = {
userCount: Math.floor(Math.random() * 100),
serverTime: new Date().toISOString()
};
sendEvent(res, 'status', data);
}, 3000);
// 客户端断开时清理
req.on('close', () => {
clearInterval(intervalId);
console.log('客户端断开连接');
res.end();
});
});
function sendEvent(res, event, data) {
const eventData = `event: ${event}\n` +
`data: ${JSON.stringify(data)}\n` +
`id: ${Date.now()}\n\n`;
res.write(eventData);
}
app.listen(3000, () => {
console.log('SSE服务运行在 http://localhost:3000');
});
2. 消息格式生成工具
class SSEMessage {
constructor(event, data, options = {}) {
this.event = event;
this.data = data;
this.id = options.id || Date.now();
this.retry = options.retry;
this.comment = options.comment;
}
toString() {
let msg = '';
if (this.comment) {
msg += `: ${this.comment}\n`;
}
if (this.event) {
msg += `event: ${this.event}\n`;
}
if (Array.isArray(this.data)) {
this.data.forEach(line => {
msg += `data: ${line}\n`;
});
} else {
msg += `data: ${this.data}\n`;
}
if (this.id) {
msg += `id: ${this.id}\n`;
}
if (this.retry) {
msg += `retry: ${this.retry}\n`;
}
return msg + '\n';
}
}
// 使用示例
const message = new SSEMessage('update', { value: 42 }, { retry: 5000 });
res.write(message.toString());
高级应用模式
1. 事件源分区
// 客户端订阅特定频道
const userEventSource = new EventSource('/api/events?channel=user_updates');
const systemEventSource = new EventSource('/api/events?channel=system_alerts');
// 服务器端实现
app.get('/events', (req, res) => {
const channel = req.query.channel || 'default';
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// 基于频道的消息分发
const sendChannelMessage = (msg) => {
if (msg.channel === channel) {
res.write(new SSEMessage(msg.type, msg.data).toString());
}
};
// 模拟消息发布
setInterval(() => {
const messages = [
{ channel: 'user_updates', type: 'profile', data: { userId: 123 } },
{ channel: 'system_alerts', type: 'maintenance', data: { start: '2023-12-01T00:00:00Z' } }
];
messages.forEach(sendChannelMessage);
}, 5000);
});
2. 历史消息重放
app.get('/events-with-history', async (req, res) => {
// 1. 设置SSE头部
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// 2. 从数据库获取最近10条消息
const recentMessages = await Message.find()
.sort({ createdAt: -1 })
.limit(10);
// 3. 发送历史消息(从旧到新)
recentMessages.reverse().forEach(msg => {
res.write(new SSEMessage(msg.type, msg.data, { id: msg._id }).toString());
});
// 4. 实时新消息
const listener = (msg) => {
res.write(new SSEMessage(msg.type, msg.data, { id: msg._id }).toString());
};
Message.on('new', listener);
// 5. 清理
req.on('close', () => {
Message.off('new', listener);
});
});
性能优化与调试
1. 连接复用策略
// 共享同一个EventSource连接
const globalEventSource = (function() {
let instance;
return function() {
if (!instance) {
instance = new EventSource('/api/events');
instance.onerror = () => {
instance = null; // 出错时重置
};
}
return instance;
};
})();
// 各模块注册自己的处理器
function setupNotificationHandler() {
const es = globalEventSource();
es.addEventListener('notification', handleNotification);
}
2. 消息压缩技术
// 服务器端发送压缩消息
function sendCompressed(res, event, data) {
const jsonStr = JSON.stringify(data);
const compressed = zlib.deflateSync(jsonStr).toString('base64');
const msg = new SSEMessage(event, {
_compressed: true,
payload: compressed
});
res.write(msg.toString());
}
// 客户端解压处理
eventSource.addEventListener('compressed-data', (event) => {
const { _compressed, payload } = JSON.parse(event.data);
if (_compressed) {
const jsonStr = pako.inflate(atob(payload), { to: 'string' });
const data = JSON.parse(jsonStr);
processData(data);
}
});
3. Chrome调试技巧
// 在开发者工具中监控EventSource
function monitorEventSource() {
const original = window.EventSource;
window.EventSource = function(url, config) {
console.log('创建EventSource:', url);
const es = new original(url, config);
es.addEventListener('open', () => {
console.log('SSE连接已打开');
});
es.addEventListener('error', (e) => {
console.error('SSE错误:', e);
});
return es;
};
}
// 调用一次开启监控
monitorEventSource();
安全最佳实践
1. 认证与授权
// 携带认证令牌
const eventSource = new EventSource('/api/events', {
withCredentials: true // 发送cookie
});
// 或使用URL参数
const token = 'user-auth-token';
const es = new EventSource(`/api/events?token=${encodeURIComponent(token)}`);
// 服务器端验证
app.get('/events', authenticate, (req, res) => {
if (!req.user) {
res.writeHead(401);
return res.end();
}
// 正常SSE逻辑...
});
2. 输入输出过滤
// 安全的SSE消息生成
function createSafeMessage(event, data) {
// 过滤事件类型
const safeEvent = event.replace(/[^\w-]/g, '');
// 净化数据
const safeData = {
message: sanitize(data.message), // 实现XSS过滤
timestamp: Date.now()
};
return new SSEMessage(safeEvent, safeData);
}
// 使用
res.write(createSafeMessage('user_message', userInput).toString());
实际应用案例
1. 实时股票行情展示
// 前端实现
class StockTicker {
constructor() {
this.sse = new EventSource('/api/stocks');
this.stocks = {};
this.sse.addEventListener('price_update', (e) => {
const { symbol, price, change } = JSON.parse(e.data);
this.updateStock(symbol, price, change);
});
}
updateStock(symbol, price, change) {
this.stocks[symbol] = { price, change };
const element = document.querySelector(`[data-stock="${symbol}"]`);
if (element) {
element.textContent = `${price.toFixed(2)} (${change > 0 ? '+' : ''}${change.toFixed(2)}%)`;
element.style.color = change >= 0 ? 'green' : 'red';
}
}
}
// 后端模拟数据
app.get('/stocks', (req, res) => {
setupSSEHeaders(res);
const symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN'];
setInterval(() => {
symbols.forEach(symbol => {
const price = 100 + Math.random() * 50;
const change = (Math.random() - 0.5) * 5;
res.write(new SSEMessage('price_update', {
symbol,
price,
change
}).toString());
});
}, 1000);
});
2. 后台任务进度监控
// 前端进度显示
const progressSource = new EventSource('/api/task/progress');
progressSource.addEventListener('progress', (e) => {
const { taskId, progress, status } = JSON.parse(e.data);
const progressBar = document.getElementById(`task-${taskId}-progress`);
if (progressBar) {
progressBar.value = progress;
progressBar.nextElementSibling.textContent = `${progress}% ${status}`;
if (progress === 100) {
progressSource.close();
}
}
});
// 后端任务处理
app.post('/long-task', (req, res) => {
const taskId = generateTaskId();
// 立即返回任务ID
res.json({ taskId });
// 在后台执行任务
executeLongTask(taskId, (progress, status) => {
// 通过SSE发送进度更新
sseClients.forEach(client => {
client.write(new SSEMessage('progress', {
taskId,
progress,
status
}).toString());
});
});
});
Server-Sent Events为Web应用提供了一种简单高效的服务器推送机制,特别适合以下场景:
- 实时通知系统
- 数据仪表盘更新
- 后台任务进度报告
- 实时日志流监控
相比WebSocket,SSE的优势在于:
- 更简单的实现(基于HTTP)
- 内置重连机制
- 不需要额外协议
- 更好的浏览器兼容性
开发时需要注意:
- 合理设置
retry时间(建议2-30秒) - 处理连接中断和错误恢复
- 对敏感数据进行适当过滤
- 考虑连接数限制(浏览器通常限制每个源6个SSE连接)
#前端开发
分享于 2025-05-20
上一篇:8.1 WebSocket 基础
下一篇:8.3 WebRTC 简介