19.1 WebSocket 通信

19.1 WebSocket 通信

实时聊天应用的核心在于双向通信能力。WebSocket 提供了全双工通信通道,本节将深入讲解如何在现代前端应用中实现健壮的 WebSocket 集成。

WebSocket 基础架构

1. 客户端基础实现

/**
 * Thin client wrapper around a WebSocket connection.
 *
 * This part of the class holds the connection state, wires the four socket
 * callbacks in connect(), and turns raw incoming frames into 'message'
 * events. It relies on setupHeartbeat(), clearHeartbeat(),
 * flushMessageQueue(), handleReconnect() and emit() being provided by the
 * rest of the class.
 */
class ChatSocket {
  /**
   * @param {string} url - Address handed to the WebSocket constructor by connect().
   */
  constructor(url) {
    // Connection target and current socket (none until connect() runs).
    this.socket = null;
    this.url = url;

    // Reconnection bookkeeping.
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000;

    // Outgoing queue and listener registry both start empty.
    this.messageQueue = [];
    this.eventListeners = {};

    // Heartbeat settings.
    this.heartbeatInterval = 30000; // 30-second heartbeat
    this.heartbeatTimer = null;
  }

  /**
   * Creates a new WebSocket for `this.url`, stores it on `this.socket` and
   * attaches the open/message/close/error callbacks.
   */
  connect() {
    const ws = new WebSocket(this.url);
    this.socket = ws;

    const onopen = () => {
      // A successful open resets the retry counter before anything else runs.
      this.reconnectAttempts = 0;
      this.setupHeartbeat();
      this.flushMessageQueue();
      this.emit('connected');
    };

    const onmessage = ({ data }) => {
      this.handleIncomingMessage(data);
    };

    const onclose = (closeEvent) => {
      this.clearHeartbeat();
      this.emit('disconnected', closeEvent);
      this.handleReconnect();
    };

    const onerror = (failure) => {
      this.emit('error', failure);
    };

    Object.assign(ws, { onopen, onmessage, onclose, onerror });
  }

  /**
   * Parses one incoming frame as JSON and emits it as a 'message' event.
   * Frames whose `type` is 'pong' are dropped; anything that fails while
   * parsing or dispatching is logged instead of thrown.
   *
   * @param {string} data - Raw frame payload, expected to be JSON text.
   */
  handleIncomingMessage(data) {
    try {
      const message = JSON.parse(data);
      // Heartbeat replies are not forwarded to listeners.
      if (message.type !== 'pong') {
        this.emit('message', message);
      }
    } catch (err) {
      console.error('消息解析失败:', err);
    }
  }
}

核心功能实现

2. 自动重连机制

class ChatSocket {
  // ...other code

  /**
   * Schedules the next reconnection attempt with exponential back-off, or
   * emits 'reconnect_failed' once `maxReconnectAttempts` has been reached.
   *
   * The delay is `reconnectDelay * 2 ** attempt`, where `attempt` is the
   * already-incremented counter. A 'reconnecting' event carrying
   * `{ attempt, delay }` is emitted for every scheduled attempt.
   *
   * The pending timer handle is kept on `this.reconnectTimer` so destroy()
   * can cancel it.
   */
  handleReconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      const delay = this.reconnectDelay * 2 ** this.reconnectAttempts;

      // Never keep two back-off timers alive at once.
      clearTimeout(this.reconnectTimer);
      // Scheduled before emitting so that a listener calling destroy()
      // from inside the 'reconnecting' handler cancels this very timer.
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null;
        this.connect();
      }, delay);

      this.emit('reconnecting', {
        attempt: this.reconnectAttempts,
        delay
      });
    } else {
      this.emit('reconnect_failed');
    }
  }

  /**
   * Permanently shuts the connection down: disables further reconnects,
   * cancels a reconnect that is already scheduled, closes the socket and
   * stops the heartbeat.
   */
  destroy() {
    // With a limit of 0, handleReconnect() will not schedule anything new.
    this.maxReconnectAttempts = 0;

    // Without this, a back-off timer armed before destroy() would still
    // fire and reopen the connection after teardown.
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;

    this.socket?.close();
    this.clearHeartbeat();
  }
}

3. 心跳检测

class ChatSocket {
  // ...other code

  /**
   * (Re)starts the heartbeat: any previous timer is cleared, then a
   * `{ type: 'ping' }` message is sent every `heartbeatInterval`
   * milliseconds for as long as the socket reports the OPEN state.
   */
  setupHeartbeat() {
    this.clearHeartbeat();

    const sendPing = () => {
      const isOpen = this.socket?.readyState === WebSocket.OPEN;
      if (!isOpen) {
        return; // skip this tick; the interval keeps running
      }
      this.send({ type: 'ping' });
    };

    this.heartbeatTimer = setInterval(sendPing, this.heartbeatInterval);
  }

  /**
   * Stops the heartbeat timer, if one is running, and forgets its handle.
   */
  clearHeartbeat() {
    if (!this.heartbeatTimer) {
      return;
    }
    clearInterval(this.heartbeatTimer);
    this.heartbeatTimer = null;
  }
}

消息管理

4. 消息队列与发送

class ChatSocket {
  // ...other code

  /**
   * Serialises `message` as JSON and sends it immediately when the socket
   * is OPEN; otherwise the payload is appended to `messageQueue`.
   *
   * A queued send also starts a connection, but only when there is no live
   * socket (none yet, or the last one is CLOSED) and no reconnect cycle is
   * in progress (`reconnectAttempts === 0`). While a socket is CONNECTING
   * or CLOSING nothing new is opened: a second connect() at that point
   * would leave two sockets alive with the same handlers attached.
   *
   * @param {*} message - Any JSON-serialisable value.
   */
  send(message) {
    const payload = JSON.stringify(message);
    const state = this.socket?.readyState;

    if (state === WebSocket.OPEN) {
      this.socket.send(payload);
      return;
    }

    this.messageQueue.push(payload);

    const hasNoLiveSocket = state === undefined || state === WebSocket.CLOSED;
    if (hasNoLiveSocket && this.reconnectAttempts === 0) {
      this.connect();
    }
  }

  /**
   * Sends queued payloads in FIFO order while the socket is OPEN.
   *
   * A payload is removed from the queue only after `socket.send` returned,
   * so nothing is lost when the socket is missing, not open, or throws
   * part-way through; the remainder stays queued for the next flush.
   */
  flushMessageQueue() {
    while (
      this.messageQueue.length > 0 &&
      this.socket?.readyState === WebSocket.OPEN
    ) {
      this.socket.send(this.messageQueue[0]);
      this.messageQueue.shift();
    }
  }
}

5. 事件系统

class ChatSocket {
  // ...other code

  /**
   * Registers `callback` to be invoked whenever `event` is emitted.
   *
   * @param {string} event - Event name.
   * @param {Function} callback - Listener to add.
   */
  on(event, callback) {
    const existing = this.eventListeners[event];
    if (existing) {
      existing.push(callback);
      return;
    }
    this.eventListeners[event] = [callback];
  }

  /**
   * Removes every registration of `callback` for `event`. Unknown events
   * are ignored.
   *
   * @param {string} event - Event name.
   * @param {Function} callback - Listener to remove.
   */
  off(event, callback) {
    const current = this.eventListeners[event];
    if (!current) {
      return;
    }
    this.eventListeners[event] = current.filter(
      (registered) => registered !== callback
    );
  }

  /**
   * Invokes all listeners of `event` with `args`, in registration order.
   * An exception thrown by one listener is logged and does not prevent the
   * remaining listeners from running.
   *
   * @param {string} event - Event name.
   * @param {...*} args - Arguments forwarded to each listener.
   */
  emit(event, ...args) {
    const listeners = this.eventListeners[event];
    if (!listeners) {
      return;
    }

    const invokeSafely = (listener) => {
      try {
        listener(...args);
      } catch (err) {
        console.error(`事件处理错误 ${event}:`, err);
      }
    };

    listeners.forEach(invokeSafely);
  }
}

与React集成

6. 自定义Hook实现

import { useEffect, useRef, useState } from 'react';

/**
 * React hook that owns one ChatSocket for `url` and exposes its state.
 *
 * A new socket is created (and the previous one destroyed) whenever `url`
 * changes, and the current one is destroyed on unmount.
 *
 * @param {string} url - Address passed to the ChatSocket constructor.
 * @returns {{ isConnected: boolean, messages: Array, sendMessage: Function }}
 *   Connection flag, every message received so far, and a function that
 *   sends a `chat_message` with the given content.
 */
export function useWebSocket(url) {
  const socketRef = useRef(null);
  const [isConnected, setIsConnected] = useState(false);
  const [messages, setMessages] = useState([]);

  useEffect(() => {
    const client = new ChatSocket(url);
    socketRef.current = client;

    const markOnline = () => {
      setIsConnected(true);
    };
    const markOffline = () => {
      setIsConnected(false);
    };
    const appendMessage = (incoming) => {
      // Functional update so consecutive messages never overwrite each other.
      setMessages((previous) => previous.concat([incoming]));
    };

    client.on('connected', markOnline);
    client.on('disconnected', markOffline);
    client.on('message', appendMessage);

    client.connect();

    // Cleanup: tear the socket down when `url` changes or on unmount.
    return () => {
      client.destroy();
    };
  }, [url]);

  /**
   * Sends `content` as a chat message stamped with the current time.
   * Does nothing while no socket has been created yet.
   */
  function sendMessage(content) {
    const client = socketRef.current;
    if (!client) {
      return;
    }
    client.send({
      type: 'chat_message',
      content,
      timestamp: Date.now()
    });
  }

  return { isConnected, messages, sendMessage };
}

安全增强

7. 认证与消息验证

/**
 * ChatSocket variant that authenticates after every (re)connection and
 * verifies a signature on each incoming message before dispatching it.
 */
class SecureChatSocket extends ChatSocket {
  /**
   * @param {string} url - Address passed on to ChatSocket.
   * @param {string} authToken - Token sent in the `authenticate` message
   *   and mixed into the signature digest.
   */
  constructor(url, authToken) {
    super(url);
    this.authToken = authToken;

    // Verification is asynchronous; chaining the checks keeps messages in
    // arrival order even if individual digests resolve out of order.
    this.verificationChain = Promise.resolve();

    // Registered exactly once. Adding this listener inside connect() stacked
    // one more copy per reconnect, so the authenticate message was sent
    // 2, 3, 4... times after successive reconnections.
    this.on('connected', () => {
      this.send({ type: 'authenticate', token: this.authToken });
    });
  }

  /**
   * Queues one incoming frame for signature verification and dispatch.
   *
   * @param {string} data - Raw frame payload, expected to be JSON text.
   * @returns {Promise<void>} Settles once this frame has been handled;
   *   never rejects (failures are reported as 'security_error' events).
   */
  handleIncomingMessage(data) {
    this.verificationChain = this.verificationChain.then(() =>
      this.verifyAndDispatch(data)
    );
    return this.verificationChain;
  }

  /**
   * Verifies the signature of one frame and forwards it to the base class
   * on success. Any failure (bad JSON, bad signature, digest error) is
   * logged and emitted as 'security_error'.
   *
   * @param {string} data - Raw frame payload.
   * @returns {Promise<void>}
   */
  async verifyAndDispatch(data) {
    try {
      const message = JSON.parse(data);

      // The signature cannot cover itself, so it is removed before hashing.
      const { signature, ...unsigned } = message;

      // calculateSignature() is asynchronous. Comparing its Promise directly
      // with the signature (as before) was always unequal and rejected
      // every single message.
      const expected = SecureChatSocket.toSignatureString(
        await this.calculateSignature(unsigned)
      );

      // Verify the message signature.
      if (signature !== expected) {
        throw new Error('Invalid message signature');
      }

      super.handleIncomingMessage(data);
    } catch (err) {
      console.error('安全验证失败:', err);
      this.emit('security_error', err);
    }
  }

  /**
   * Computes the SHA-256 digest of the JSON-serialised message followed by
   * the auth token.
   *
   * NOTE(review): this is a placeholder scheme. A production implementation
   * would presumably use a keyed MAC (e.g. HMAC via `crypto.subtle.sign`)
   * and a canonical serialisation agreed with the server; confirm against
   * the server protocol.
   *
   * @param {object} message - Message without its `signature` field.
   * @returns {Promise<ArrayBuffer>} Raw digest bytes.
   */
  calculateSignature(message) {
    return crypto.subtle.digest(
      'SHA-256',
      new TextEncoder().encode(JSON.stringify(message) + this.authToken)
    );
  }

  /**
   * Normalises a calculateSignature() result for comparison: strings (from
   * an overriding implementation) pass through, binary digests become
   * lowercase hex.
   *
   * NOTE(review): assumes the server transmits signatures as lowercase hex;
   * confirm the wire encoding.
   *
   * @param {ArrayBuffer|string} digest
   * @returns {string}
   */
  static toSignatureString(digest) {
    if (typeof digest === 'string') {
      return digest;
    }
    return Array.from(new Uint8Array(digest), (byte) =>
      byte.toString(16).padStart(2, '0')
    ).join('');
  }
}

性能优化

8. 消息压缩与批处理

/**
 * ChatSocket variant that collects outgoing messages for a short window
 * and sends them as a single `batch` message.
 */
class OptimizedChatSocket extends ChatSocket {
  /**
   * @param {string} url - Address passed on to ChatSocket.
   */
  constructor(url) {
    super(url);
    this.batchInterval = 100; // 100 ms batching window
    this.batchTimer = null;
    this.batchBuffer = [];
  }

  /**
   * Buffers `message` and makes sure a flush is scheduled at most
   * `batchInterval` milliseconds from the first buffered message.
   *
   * @param {*} message - Any JSON-serialisable value.
   */
  send(message) {
    this.batchBuffer.push(message);

    if (!this.batchTimer) {
      this.batchTimer = setTimeout(() => {
        this.flushBatch();
      }, this.batchInterval);
    }
  }

  /**
   * Sends everything buffered so far as one batch message.
   *
   * The timer and buffer are reset BEFORE sending. Previously they were
   * reset afterwards, so an exception from the send (for instance a
   * non-serialisable message) left `batchTimer` set forever and no later
   * batch was ever scheduled.
   */
  flushBatch() {
    clearTimeout(this.batchTimer);
    this.batchTimer = null;

    if (this.batchBuffer.length === 0) {
      return;
    }

    const batch = this.batchBuffer;
    this.batchBuffer = [];
    super.send(this.compressMessages(batch));
  }

  /**
   * Wraps `messages` in a batch envelope.
   *
   * Placeholder: no compression is performed here even though the envelope
   * is flagged `compressed: true`; real compression (e.g. with pako) is
   * still to be implemented.
   *
   * @param {Array} messages - Buffered messages.
   * @returns {{ type: string, payload: Array, compressed: boolean }}
   */
  compressMessages(messages) {
    return {
      type: 'batch',
      payload: messages,
      compressed: true
    };
  }

  /**
   * Cancels the pending batch before tearing the connection down. Without
   * this, a batch timer armed before destroy() would fire afterwards and
   * its send would reopen the connection.
   */
  destroy() {
    clearTimeout(this.batchTimer);
    this.batchTimer = null;
    this.batchBuffer = [];
    super.destroy();
  }
}

错误恢复策略

9. 消息确认与重发

/**
 * ChatSocket variant that numbers outgoing messages, waits for an `ack`
 * per sequence number, retransmits unacknowledged messages and acknowledges
 * incoming messages that carry a `seq`.
 *
 * NOTE(review): every message passed to send(), including heartbeat pings
 * from the base class, is tracked and expects an ack; confirm the server
 * acknowledges all message types or exempt control messages.
 */
class ReliableChatSocket extends ChatSocket {
  /**
   * @param {string} url - Address passed on to ChatSocket.
   */
  constructor(url) {
    super(url);
    this.pendingMessages = new Map();
    this.sequenceNumber = 0;
    this.ackTimer = null;
  }

  /**
   * Sends `message` with the next sequence number attached and tracks it
   * until an ack arrives.
   *
   * @param {object} message - Message to send; a `seq` field is added.
   */
  send(message) {
    const seq = this.sequenceNumber++;
    const messageWithSeq = { ...message, seq };

    this.pendingMessages.set(seq, {
      message: messageWithSeq,
      timestamp: Date.now(),
      retries: 0
    });

    super.send(messageWithSeq);
    this.startAckCheck();
  }

  /**
   * Handles one incoming frame: acks clear the matching pending entry,
   * everything else goes to the base class and is acknowledged when it
   * carries a `seq`.
   *
   * @param {string} data - Raw frame payload, expected to be JSON text.
   */
  handleIncomingMessage(data) {
    let message;
    try {
      message = JSON.parse(data);
    } catch (err) {
      // Malformed frames are reported instead of throwing out of the
      // socket's message callback.
      console.error('消息解析失败:', err);
      return;
    }

    if (message?.type === 'ack') {
      this.pendingMessages.delete(message.seq);
      return;
    }

    super.handleIncomingMessage(data);

    // Send the acknowledgement through the base class. Routing it through
    // this.send() replaced the peer's seq with our own sequence number and
    // tracked the ack itself as a message awaiting an ack.
    if (message?.seq !== undefined) {
      super.send({ type: 'ack', seq: message.seq });
    }
  }

  /**
   * Runs one pass over the pending messages and, while any remain,
   * schedules the next pass one second later.
   *
   * A message is only acted on once it has been unacknowledged for longer
   * than the timeout: it is retransmitted up to three times, then dropped
   * with a 'message_failed' event. Only a single timer is kept, however
   * often this method is called.
   */
  startAckCheck() {
    clearTimeout(this.ackTimer);
    this.ackTimer = null;

    const now = Date.now();
    const timeout = 3000; // 3-second timeout
    const maxRetries = 3;

    this.pendingMessages.forEach((item, seq) => {
      if (now - item.timestamp <= timeout) {
        return; // still within its waiting period
      }

      if (item.retries < maxRetries) {
        item.retries++;
        item.timestamp = now;
        super.send(item.message);
      } else {
        this.pendingMessages.delete(seq);
        this.emit('message_failed', item.message);
      }
    });

    if (this.pendingMessages.size > 0) {
      this.ackTimer = setTimeout(() => this.startAckCheck(), 1000);
    }
  }

  /**
   * Stops the ack timer before tearing the connection down so that no
   * retransmission is attempted afterwards.
   */
  destroy() {
    clearTimeout(this.ackTimer);
    this.ackTimer = null;
    super.destroy();
  }
}

完整应用示例

// App.jsx
import { useState } from 'react';
import { useWebSocket } from './useWebSocket';

function ChatApp() {
  const [input, setInput] = useState('');
  const { isConnected, messages, sendMessage } = useWebSocket(
    'wss://api.example.com/chat'
  );

  const handleSubmit = (e) => {
    e.preventDefault();
    if (input.trim()) {
      sendMessage(input);
      setInput('');
    }
  };

  return (
    <div className="chat-container">
      <div className="status">
        {isConnected ? '🟢 已连接' : '🔴 连接中...'}
      </div>
      <div className="messages">
        {messages.map((msg, i) => (
          <div key={i} className="message">
            {msg.content}
          </div>
        ))}
      </div>
      <form onSubmit={handleSubmit}>
        <input
          type="text"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          disabled={!isConnected}
        />
        <button type="submit" disabled={!isConnected}>
          发送
        </button>
      </form>
    </div>
  );
}

服务端配合建议

Node.js WebSocket服务器示例

// server.js
import WebSocket, { WebSocketServer } from 'ws';
import { createServer } from 'http';

// HTTP server that the WebSocket server attaches to.
const server = createServer();
const wss = new WebSocketServer({ server });

// Connected sockets, keyed by the handshake's Sec-WebSocket-Key header.
const clients = new Map();

wss.on('connection', (ws, req) => {
  const clientId = req.headers['sec-websocket-key'];
  clients.set(clientId, ws);

  ws.on('message', (data) => {
    try {
      const message = JSON.parse(data);

      // Only plain JSON objects can be re-stamped and relayed; spreading a
      // string, number or array would produce a meaningless payload.
      if (message === null || typeof message !== 'object' || Array.isArray(message)) {
        throw new TypeError('消息必须是 JSON 对象');
      }

      // Serialise once so every client receives the same timestamp.
      const outgoing = JSON.stringify({
        ...message,
        timestamp: Date.now()
      });

      // Broadcast the message to all clients (including the sender).
      clients.forEach((client) => {
        if (client.readyState === WebSocket.OPEN) {
          client.send(outgoing);
        }
      });
    } catch (err) {
      console.error('消息处理错误:', err);
    }
  });

  // An 'error' event with no listener is thrown by the EventEmitter and
  // would take the whole server process down.
  ws.on('error', (err) => {
    console.error('WebSocket 连接错误:', err);
  });

  ws.on('close', () => {
    clients.delete(clientId);
  });
});

server.listen(8080, () => {
  console.log('WebSocket 服务器运行在 ws://localhost:8080');
});

调试技巧

  1. 浏览器开发者工具

    • Chrome → Network → WS 选项卡查看WebSocket流量
    • 过滤WebSocket帧内容
  2. 消息日志

// 在ChatSocket类中添加
logMessage(direction, message) {
  console.log(`[WS ${direction}]`, {
    timestamp: new Date().toISOString(),
    message: message.length > 100 
      ? `${message.substring(0, 100)}...` 
      : message
  });
}

// Call these from send() and handleIncomingMessage() respectively
this.logMessage('outgoing', payload);
this.logMessage('incoming', data);
  3. 模拟延迟
/**
 * Patches `SocketClass.prototype.send` so that every send is delayed by a
 * random amount of up to `maxDelayMs` milliseconds.
 *
 * Frames still leave in the order they were sent: each timer releases the
 * OLDEST frame still waiting on that socket rather than "its own" frame.
 * Giving every frame an independent random timeout (as before) let later
 * frames overtake earlier ones, which a WebSocket connection never does.
 *
 * @param {Function} SocketClass - Class whose prototype `send` is wrapped.
 * @param {number} [maxDelayMs=1000] - Upper bound for the random delay.
 */
function simulateSendLatency(SocketClass, maxDelayMs = 1000) {
  const originalSend = SocketClass.prototype.send;
  // Per-socket FIFO of frames that have been "sent" but not yet released.
  const waitingFrames = new WeakMap();

  SocketClass.prototype.send = function delayedSend(data) {
    let queue = waitingFrames.get(this);
    if (!queue) {
      queue = [];
      waitingFrames.set(this, queue);
    }
    queue.push(data);

    setTimeout(() => {
      originalSend.call(this, queue.shift());
    }, Math.random() * maxDelayMs);
  };
}

// Simulate network latency in the development environment only.
if (process.env.NODE_ENV === 'development') {
  simulateSendLatency(WebSocket);
}

通过以上实现,你已经可以构建一个功能完整、健壮的 WebSocket 通信系统。接下来我们将学习如何集成前端框架构建聊天界面。

#前端开发 分享于 2025-03-25

【 内容由 AI 共享,不代表本站观点,请谨慎参考 】