19.1 WebSocket 通信
19.1 WebSocket 通信
实时聊天应用的核心在于双向通信能力，而 WebSocket 提供了全双工通信通道。本节将深入讲解如何在现代前端应用中实现健壮的 WebSocket 集成。
WebSocket 基础架构
1. 客户端基础实现
/**
 * WebSocket client wrapper for the chat application.
 *
 * This fragment holds the constructor, connection setup and inbound message
 * parsing. It calls `emit`, `setupHeartbeat`, `clearHeartbeat`,
 * `flushMessageQueue` and `handleReconnect`, which are not defined in this
 * fragment.
 */
class ChatSocket {
  /**
   * @param {string} url - Endpoint handed to `new WebSocket(...)`.
   */
  constructor(url) {
    this.socket = null;
    this.url = url;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000;
    this.messageQueue = [];
    this.eventListeners = {};
    this.heartbeatInterval = 30000; // heartbeat every 30 seconds
    this.heartbeatTimer = null;
  }

  /**
   * Opens the underlying WebSocket and wires its lifecycle handlers.
   *
   * Calling this while a socket is already CONNECTING or OPEN is a no-op, so
   * repeated calls cannot leave a second, orphaned connection behind.
   */
  connect() {
    const state = this.socket?.readyState;
    if (state === WebSocket.CONNECTING || state === WebSocket.OPEN) {
      return;
    }

    const socket = new WebSocket(this.url);
    this.socket = socket;
    // Handlers belong to this particular socket. Once it has been replaced,
    // its late events must not touch the state of the newer connection.
    const isCurrent = () => this.socket === socket;

    socket.onopen = () => {
      if (!isCurrent()) return;
      this.reconnectAttempts = 0;
      this.setupHeartbeat();
      this.flushMessageQueue();
      this.emit('connected');
    };

    socket.onmessage = (event) => {
      if (!isCurrent()) return;
      this.handleIncomingMessage(event.data);
    };

    socket.onclose = (event) => {
      if (!isCurrent()) return;
      this.clearHeartbeat();
      this.emit('disconnected', event);
      this.handleReconnect();
    };

    socket.onerror = (error) => {
      if (!isCurrent()) return;
      this.emit('error', error);
    };
  }

  /**
   * Parses one inbound frame as JSON and emits it as a `message` event.
   * Frames whose `type` is `pong` are dropped; frames that fail to parse (or
   * whose parsed value cannot be read) are logged and dropped.
   *
   * @param {string} data - Raw frame payload (`event.data`).
   */
  handleIncomingMessage(data) {
    try {
      const message = JSON.parse(data);
      if (message.type === 'pong') {
        return; // heartbeat replies are not surfaced to listeners
      }
      this.emit('message', message);
    } catch (err) {
      console.error('消息解析失败:', err);
    }
  }
}
核心功能实现
2. 自动重连机制
class ChatSocket {
  // ...other code

  /**
   * Schedules the next reconnect attempt with exponential back-off, or emits
   * `reconnect_failed` once `maxReconnectAttempts` has been reached.
   *
   * The n-th attempt waits `reconnectDelay * 2^(n-1)` ms, so the first retry
   * waits exactly `reconnectDelay`. Emits `reconnecting` with
   * `{ attempt, delay }` before the timer is armed.
   */
  handleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      this.emit('reconnect_failed');
      return;
    }

    this.reconnectAttempts++;
    const delay = this.reconnectDelay * 2 ** (this.reconnectAttempts - 1);
    this.emit('reconnecting', {
      attempt: this.reconnectAttempts,
      delay
    });

    // Keep at most one pending reconnect, and keep its handle so destroy()
    // can cancel it.
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, delay);
  }

  /**
   * Shuts the connection down: disables further reconnects, cancels a
   * reconnect that is already waiting on its back-off timer, closes the
   * socket and stops the heartbeat.
   */
  destroy() {
    this.maxReconnectAttempts = 0;
    // Without this, a reconnect scheduled before destroy() would still fire
    // and open a fresh socket.
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
    this.socket?.close();
    this.clearHeartbeat();
  }
}
3. 心跳检测
class ChatSocket {
  // ...other code

  /**
   * (Re)starts the heartbeat: any running timer is stopped first, then a
   * `{ type: 'ping' }` message is sent every `heartbeatInterval` ms for as
   * long as the socket reports OPEN at the moment the timer ticks.
   */
  setupHeartbeat() {
    this.clearHeartbeat();

    const sendPingIfOpen = () => {
      const isOpen = this.socket?.readyState === WebSocket.OPEN;
      if (!isOpen) {
        return;
      }
      this.send({ type: 'ping' });
    };

    this.heartbeatTimer = setInterval(sendPingIfOpen, this.heartbeatInterval);
  }

  /**
   * Stops the heartbeat timer, if one is running, and forgets its handle.
   */
  clearHeartbeat() {
    if (!this.heartbeatTimer) {
      return;
    }
    clearInterval(this.heartbeatTimer);
    this.heartbeatTimer = null;
  }
}
消息管理
4. 消息队列与发送
class ChatSocket {
  // ...other code

  /**
   * Serializes `message` to JSON and sends it, or queues it when the socket
   * is not OPEN.
   *
   * A queued message triggers `connect()` only when nothing is already in
   * progress: no reconnect cycle is running (`reconnectAttempts === 0`) and
   * the socket is not currently CONNECTING. Otherwise a send issued right
   * after `connect()` would open a second socket.
   *
   * @param {*} message - Any JSON-serializable value.
   */
  send(message) {
    const payload = JSON.stringify(message);
    const state = this.socket?.readyState;

    if (state === WebSocket.OPEN) {
      this.socket.send(payload);
      return;
    }

    this.messageQueue.push(payload);
    if (this.reconnectAttempts === 0 && state !== WebSocket.CONNECTING) {
      this.connect();
    }
  }

  /**
   * Sends queued payloads in FIFO order while the socket is OPEN.
   *
   * Payloads are only removed from the queue when they can actually be
   * written; if the socket is missing or leaves the OPEN state, the rest
   * stays queued for the next flush instead of being discarded.
   */
  flushMessageQueue() {
    while (
      this.messageQueue.length > 0 &&
      this.socket?.readyState === WebSocket.OPEN
    ) {
      this.socket.send(this.messageQueue.shift());
    }
  }
}
5. 事件系统
class ChatSocket {
  // ...other code

  /**
   * Registers `callback` for `event`, creating the listener list on first use.
   *
   * @param {string} event - Event name.
   * @param {Function} callback - Invoked with the arguments given to `emit`.
   */
  on(event, callback) {
    const registry = this.eventListeners;
    if (!registry[event]) {
      registry[event] = [];
    }
    registry[event].push(callback);
  }

  /**
   * Removes every registration of `callback` for `event`. Unknown events are
   * ignored.
   *
   * @param {string} event - Event name.
   * @param {Function} callback - The same function reference passed to `on`.
   */
  off(event, callback) {
    const registered = this.eventListeners[event];
    if (!registered) {
      return;
    }
    this.eventListeners[event] = registered.filter(
      (handler) => handler !== callback
    );
  }

  /**
   * Calls each listener of `event` in registration order with `args`.
   * A listener that throws is logged and does not stop the remaining ones.
   *
   * @param {string} event - Event name.
   * @param {...*} args - Forwarded to every listener.
   */
  emit(event, ...args) {
    const registered = this.eventListeners[event];
    if (!registered) {
      return;
    }

    // The count is fixed up front: listeners appended while this emit is
    // running are not invoked until the next emit.
    const count = registered.length;
    for (let index = 0; index < count; index++) {
      const handler = registered[index];
      try {
        handler(...args);
      } catch (err) {
        console.error(`事件处理错误 ${event}:`, err);
      }
    }
  }
}
与React集成
6. 自定义Hook实现
import { useEffect, useRef, useState } from 'react';
/**
 * React hook that owns one `ChatSocket` per `url`.
 *
 * @param {string} url - WebSocket endpoint; changing it tears the old socket
 *   down and opens a new one.
 * @returns {{ isConnected: boolean, messages: Array, sendMessage: Function }}
 *   Connection flag, every message received so far, and a function that
 *   sends `{ type: 'chat_message', content, timestamp }`.
 */
export function useWebSocket(url) {
  const socketRef = useRef(null);
  const [isConnected, setIsConnected] = useState(false);
  const [messages, setMessages] = useState([]);

  useEffect(() => {
    const socket = new ChatSocket(url);
    socketRef.current = socket;
    // A socket closes asynchronously, so its last events can arrive after
    // the cleanup below has run. This flag keeps them out of React state.
    let isActive = true;

    // A new socket starts disconnected, whatever the previous one reported.
    setIsConnected(false);

    const handleConnected = () => {
      if (isActive) setIsConnected(true);
    };
    const handleDisconnected = () => {
      if (isActive) setIsConnected(false);
    };
    const handleMessage = (message) => {
      if (isActive) setMessages((prev) => [...prev, message]);
    };

    socket.on('connected', handleConnected);
    socket.on('disconnected', handleDisconnected);
    socket.on('message', handleMessage);
    socket.connect();

    return () => {
      isActive = false;
      socket.off('connected', handleConnected);
      socket.off('disconnected', handleDisconnected);
      socket.off('message', handleMessage);
      socket.destroy();
      // Drop the reference so sendMessage cannot write to (and thereby
      // re-open) a socket that has been destroyed.
      if (socketRef.current === socket) {
        socketRef.current = null;
      }
    };
  }, [url]);

  const sendMessage = (content) => {
    if (socketRef.current) {
      socketRef.current.send({
        type: 'chat_message',
        content,
        timestamp: Date.now()
      });
    }
  };

  return { isConnected, messages, sendMessage };
}
安全增强
7. 认证与消息验证
/**
 * ChatSocket variant that authenticates after connecting and rejects inbound
 * messages whose `signature` field does not match a locally computed digest.
 *
 * NOTE(review): the digest is SHA-256 over `JSON.stringify(message) + token`,
 * as in the original sketch. The sender must serialize the message the same
 * way; a production scheme would normally use a keyed MAC instead (for
 * example HMAC via `crypto.subtle.sign`) — confirm against the server.
 */
class SecureChatSocket extends ChatSocket {
  /**
   * @param {string} url - WebSocket endpoint.
   * @param {string} authToken - Token sent in the `authenticate` message and
   *   mixed into the signature digest.
   */
  constructor(url, authToken) {
    super(url);
    this.authToken = authToken;
    // Verification is asynchronous; chaining the checks keeps messages in
    // arrival order.
    this.verificationChain = Promise.resolve();
    // Registered once here, not in connect(): connect() runs again on every
    // reconnect, and each extra listener would send another `authenticate`.
    this.on('connected', () => {
      this.send({ type: 'authenticate', token: this.authToken });
    });
  }

  /**
   * Queues `data` for signature verification and, if valid, hands it to the
   * base class. Failures are logged and emitted as `security_error`.
   *
   * @param {string} data - Raw frame payload.
   * @returns {Promise<void>} Settles once this frame has been processed.
   */
  handleIncomingMessage(data) {
    this.verificationChain = this.verificationChain.then(() =>
      this.verifyAndDispatch(data)
    );
    return this.verificationChain;
  }

  /**
   * Verifies one frame and dispatches it. Never rejects: parse errors and
   * signature mismatches both end up in the catch block.
   *
   * @param {string} data - Raw frame payload.
   * @returns {Promise<void>}
   */
  async verifyAndDispatch(data) {
    try {
      const message = JSON.parse(data);
      // The digest is asynchronous and must be awaited; comparing the
      // Promise itself with `!==` rejects every message.
      const expected = await this.calculateSignature(message);
      if (message.signature !== expected) {
        throw new Error('Invalid message signature');
      }
      super.handleIncomingMessage(data);
    } catch (err) {
      console.error('安全验证失败:', err);
      this.emit('security_error', err);
    }
  }

  /**
   * Computes the expected signature of `message`.
   *
   * The `signature` field is excluded from the digested content, since a
   * signature cannot cover itself.
   *
   * @param {object} message - Parsed message.
   * @returns {Promise<string>} Lower-case hex SHA-256 digest.
   */
  async calculateSignature(message) {
    const { signature, ...unsigned } = message;
    const bytes = new TextEncoder().encode(
      JSON.stringify(unsigned) + this.authToken
    );
    const digest = await crypto.subtle.digest('SHA-256', bytes);
    return Array.from(new Uint8Array(digest), (byte) =>
      byte.toString(16).padStart(2, '0')
    ).join('');
  }
}
性能优化
8. 消息压缩与批处理
/**
 * ChatSocket variant that coalesces outgoing messages: everything passed to
 * `send` within one `batchInterval` window is delivered as a single
 * `{ type: 'batch', payload, compressed }` envelope.
 */
class OptimizedChatSocket extends ChatSocket {
  /**
   * @param {string} url - WebSocket endpoint.
   */
  constructor(url) {
    super(url);
    this.batchInterval = 100; // batching window in milliseconds
    this.batchTimer = null;
    this.batchBuffer = [];
  }

  /**
   * Buffers `message` and makes sure a flush is scheduled.
   *
   * @param {*} message - Any JSON-serializable value.
   */
  send(message) {
    this.batchBuffer.push(message);
    if (!this.batchTimer) {
      this.batchTimer = setTimeout(() => {
        this.flushBatch();
      }, this.batchInterval);
    }
  }

  /**
   * Sends everything buffered so far as one envelope.
   *
   * Timer and buffer are reset before the send. If the underlying send
   * throws (for instance on a payload that cannot be serialized), the next
   * `send` can still schedule a fresh flush; the failing batch is dropped
   * rather than retried forever.
   */
  flushBatch() {
    if (this.batchTimer !== null) {
      clearTimeout(this.batchTimer);
      this.batchTimer = null;
    }
    if (this.batchBuffer.length === 0) {
      return;
    }
    const pending = this.batchBuffer;
    this.batchBuffer = [];
    super.send(this.compressMessages(pending));
  }

  /**
   * Wraps `messages` in the batch envelope.
   *
   * Placeholder: nothing is actually compressed here, although the envelope
   * is flagged `compressed: true`. Real compression (e.g. pako) belongs here.
   *
   * @param {Array} messages - Buffered messages, oldest first.
   * @returns {{ type: string, payload: Array, compressed: boolean }}
   */
  compressMessages(messages) {
    return {
      type: 'batch',
      payload: messages,
      compressed: true
    };
  }

  /**
   * Cancels a pending flush and discards buffered messages before tearing
   * the connection down, so the batch timer cannot fire against a destroyed
   * socket.
   */
  destroy() {
    if (this.batchTimer !== null) {
      clearTimeout(this.batchTimer);
      this.batchTimer = null;
    }
    this.batchBuffer = [];
    super.destroy();
  }
}
错误恢复策略
9. 消息确认与重发
/**
 * ChatSocket variant with at-least-once delivery: outgoing messages carry a
 * sequence number and are re-sent until the peer acknowledges them with
 * `{ type: 'ack', seq }` or the retry budget is used up.
 */
class ReliableChatSocket extends ChatSocket {
  /**
   * @param {string} url - WebSocket endpoint.
   */
  constructor(url) {
    super(url);
    this.pendingMessages = new Map();
    this.sequenceNumber = 0;
    this.ackTimeout = 3000; // ms to wait for an ack before re-sending
    this.maxRetries = 3; // re-sends before a message is reported as failed
    this.ackCheckInterval = 1000; // ms between sweeps of pendingMessages
    this.ackTimer = null;
  }

  /**
   * Sends `message` with a fresh `seq` and tracks it until acknowledged.
   *
   * Control frames (`ack`, `ping`) are passed straight to the base class.
   * Stamping an ack with a new `seq` would overwrite the sequence number it
   * acknowledges, and tracking it would make the peers acknowledge each
   * other's acks without end.
   *
   * @param {object} message - Message to deliver.
   */
  send(message) {
    if (message?.type === 'ack' || message?.type === 'ping') {
      super.send(message);
      return;
    }

    const seq = this.sequenceNumber++;
    const messageWithSeq = { ...message, seq };
    this.pendingMessages.set(seq, {
      message: messageWithSeq,
      timestamp: Date.now(),
      retries: 0
    });
    super.send(messageWithSeq);
    this.startAckCheck();
  }

  /**
   * Consumes acks, forwards everything else to the base class and
   * acknowledges any message that carries a `seq`.
   *
   * @param {string} data - Raw frame payload.
   */
  handleIncomingMessage(data) {
    let message;
    try {
      message = JSON.parse(data);
    } catch {
      // Let the base class deal with (and report) the malformed frame.
      super.handleIncomingMessage(data);
      return;
    }

    if (message?.type === 'ack') {
      this.pendingMessages.delete(message.seq);
      return;
    }

    super.handleIncomingMessage(data);

    if (message?.seq !== undefined) {
      // Sent via the base class so the ack keeps the peer's seq untouched.
      super.send({ type: 'ack', seq: message.seq });
    }
  }

  /**
   * Starts the periodic ack sweep unless one is already scheduled.
   *
   * Only one timer exists at a time; it re-arms itself every
   * `ackCheckInterval` ms while messages are still pending.
   */
  startAckCheck() {
    if (this.ackTimer !== null) {
      return;
    }
    this.sweepPendingMessages();
    if (this.pendingMessages.size > 0) {
      this.ackTimer = setTimeout(() => {
        this.ackTimer = null;
        this.startAckCheck();
      }, this.ackCheckInterval);
    }
  }

  /**
   * Re-sends overdue messages; an overdue message that has already used all
   * its retries is dropped and reported through `message_failed`. The final
   * retry gets a full `ackTimeout` before it is given up on.
   */
  sweepPendingMessages() {
    const now = Date.now();
    for (const [seq, item] of [...this.pendingMessages]) {
      const isOverdue = now - item.timestamp > this.ackTimeout;
      if (!isOverdue) {
        continue;
      }
      if (item.retries < this.maxRetries) {
        item.retries++;
        item.timestamp = now;
        super.send(item.message);
      } else {
        this.pendingMessages.delete(seq);
        this.emit('message_failed', item.message);
      }
    }
  }

  /**
   * Stops the ack sweep and forgets unacknowledged messages before tearing
   * the connection down.
   */
  destroy() {
    if (this.ackTimer !== null) {
      clearTimeout(this.ackTimer);
      this.ackTimer = null;
    }
    this.pendingMessages.clear();
    super.destroy();
  }
}
完整应用示例
// App.jsx
import { useWebSocket } from './useWebSocket';
function ChatApp() {
const [input, setInput] = useState('');
const { isConnected, messages, sendMessage } = useWebSocket(
'wss://api.example.com/chat'
);
const handleSubmit = (e) => {
e.preventDefault();
if (input.trim()) {
sendMessage(input);
setInput('');
}
};
return (
<div className="chat-container">
<div className="status">
{isConnected ? '🟢 已连接' : '🔴 连接中...'}
</div>
<div className="messages">
{messages.map((msg, i) => (
<div key={i} className="message">
{msg.content}
</div>
))}
</div>
<form onSubmit={handleSubmit}>
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
disabled={!isConnected}
/>
<button type="submit" disabled={!isConnected}>
发送
</button>
</form>
</div>
);
}
服务端配合建议
Node.js WebSocket服务器示例
// server.js
import WebSocket, { WebSocketServer } from 'ws';
import { createServer } from 'http';
// HTTP server that the WebSocket server attaches to.
const server = createServer();
const wss = new WebSocketServer({ server });
// Connected sockets, keyed by the `sec-websocket-key` header of the
// handshake request.
const clients = new Map();

wss.on('connection', (ws, req) => {
  const clientId = req.headers['sec-websocket-key'];
  clients.set(clientId, ws);

  ws.on('message', (data) => {
    try {
      const message = JSON.parse(data);

      // The client sends `{ type: 'ping' }` as a heartbeat and ignores
      // `pong` replies. Answer the sender only; broadcasting heartbeats
      // would show up as empty chat messages on every client.
      if (message?.type === 'ping') {
        ws.send(JSON.stringify({ type: 'pong' }));
        return;
      }

      // Serialize once and broadcast the same frame to every open client.
      const outgoing = JSON.stringify({
        ...message,
        timestamp: Date.now()
      });
      clients.forEach((client) => {
        if (client.readyState === WebSocket.OPEN) {
          client.send(outgoing);
        }
      });
    } catch (err) {
      console.error('消息处理错误:', err);
    }
  });

  // An `error` event without a listener is thrown by the EventEmitter and
  // would take the whole process down.
  ws.on('error', (err) => {
    console.error('连接错误:', err);
  });

  ws.on('close', () => {
    clients.delete(clientId);
  });
});

server.listen(8080, () => {
  console.log('WebSocket 服务器运行在 ws://localhost:8080');
});
调试技巧
- 浏览器开发者工具:
  - Chrome → Network → WS 选项卡查看 WebSocket 流量
  - 过滤 WebSocket 帧内容
- 消息日志:
// 在ChatSocket类中添加
logMessage(direction, message) {
console.log(`[WS ${direction}]`, {
timestamp: new Date().toISOString(),
message: message.length > 100
? `${message.substring(0, 100)}...`
: message
});
}
// Call sites: add these inside send() and handleIncomingMessage().
this.logMessage('outgoing', payload); // in send(): `payload` is the serialized JSON string
this.logMessage('incoming', data); // in handleIncomingMessage(): `data` is the raw frame
- 模拟延迟:
// Development only: simulate network latency by delaying every
// WebSocket#send by a random amount of up to one second.
//
// A WebSocket delivers frames in order, so the simulation must too. Each
// call queues its payload per socket, and every timer that fires sends the
// OLDEST queued payload instead of its own. The timers can then expire in
// any order without reordering frames.
if (process.env.NODE_ENV === 'development') {
  const originalSend = WebSocket.prototype.send;
  const queuedBySocket = new WeakMap();

  WebSocket.prototype.send = function (data) {
    let queued = queuedBySocket.get(this);
    if (!queued) {
      queued = [];
      queuedBySocket.set(this, queued);
    }
    queued.push(data);

    setTimeout(() => {
      const next = queued.shift();
      try {
        originalSend.call(this, next);
      } catch (err) {
        // The real send runs outside the caller's stack here, so an
        // exception could not be caught by the code that called send().
        console.error('延迟发送失败:', err);
      }
    }, Math.random() * 1000);
  };
}
通过以上实现,你已经可以构建一个功能完整、健壮的WebSocket通信系统。接下来我们将学习如何集成前端框架构建聊天界面。
#前端开发
分享于 2025-03-25