Skip to content

WebSocket 实时通信源代码导览

ws 库核心组件

ws 库的源代码结构如下:

ws/
├── lib/
│   ├── receiver.js       # 消息接收器
│   ├── sender.js         # 消息发送器
│   ├── websocket.js      # WebSocket 类
│   └── websocket-server.js  # WebSocket 服务器
├── test/               # 测试文件
└── package.json

WebSocket 服务器源代码

服务器实现

javascript
// server.js 核心逻辑
const WebSocket = require('ws');

/**
 * Room-based messaging server built on top of `ws`.
 *
 * Keeps two indexes in memory:
 *  - `clients`: clientId -> { ws, id, rooms: Set<roomName>, metadata }
 *  - `rooms`:   roomName -> Set<clientId>
 *
 * Incoming frames are expected to be JSON objects with a `type` of
 * `join`, `leave`, `message` or `broadcast`.
 */
class WebSocketServer {
  /**
   * @param {object} [options]
   * @param {number} [options.port=8080] Port to listen on. `0` is passed
   *   through so the OS can pick a free port.
   * @param {string} [options.path='/'] Path handed to `WebSocket.Server`.
   */
  constructor(options = {}) {
    this.wss = new WebSocket.Server({
      // `!= null` instead of `||`: an explicit port 0 must not fall back to 8080.
      port: options.port != null ? options.port : 8080,
      path: options.path || '/'
    });

    this.clients = new Map();
    this.rooms = new Map();

    this.setupEventHandlers();
  }

  /** Wires the server-level `connection` and `error` events. */
  setupEventHandlers() {
    this.wss.on('connection', (ws, req) => {
      this.handleConnection(ws, req);
    });

    this.wss.on('error', (error) => {
      console.error('WebSocket server error:', error);
    });
  }

  /**
   * Registers a freshly connected socket and sends it its client id.
   *
   * @param {object} ws Connected socket.
   * @param {object} req Upgrade request; read by `extractMetadata`.
   */
  handleConnection(ws, req) {
    const clientId = this.generateClientId();
    const client = {
      ws,
      id: clientId,
      rooms: new Set(),
      metadata: this.extractMetadata(req)
    };

    this.clients.set(clientId, client);
    this.setupClientHandlers(ws, clientId);

    // Welcome message: tells the client which id the server assigned to it.
    this.sendTo(client, {
      type: 'connected',
      clientId: clientId
    });

    console.log(`Client connected: ${clientId}`);
  }

  /**
   * Wires the per-socket `message`, `close` and `error` events.
   *
   * @param {object} ws Socket to listen on.
   * @param {string} clientId Id the socket was registered under.
   */
  setupClientHandlers(ws, clientId) {
    ws.on('message', (data) => {
      this.handleMessage(clientId, data);
    });

    ws.on('close', () => {
      this.handleDisconnection(clientId);
    });

    ws.on('error', (error) => {
      console.error(`Client error (${clientId}):`, error);
    });
  }

  /**
   * Parses one incoming frame and dispatches it by `type`.
   * Parse and handler errors are logged, never rethrown, so one bad frame
   * cannot take the server down.
   *
   * @param {string} clientId Sender id.
   * @param {*} data Raw frame payload; must be JSON text once stringified.
   */
  handleMessage(clientId, data) {
    try {
      const message = JSON.parse(data);

      switch (message.type) {
        case 'join':
          this.handleJoinRoom(clientId, message.room);
          break;
        case 'leave':
          this.handleLeaveRoom(clientId, message.room);
          break;
        case 'message':
          this.handleRoomMessage(clientId, message);
          break;
        case 'broadcast':
          this.handleBroadcast(clientId, message);
          break;
        default:
          console.warn(`Unknown message type: ${message.type}`);
      }
    } catch (error) {
      console.error('Error handling message:', error);
    }
  }

  /**
   * Adds a client to a room, creating the room on first use.
   * The requester always gets a `joined` ack; other members are told
   * (`user_joined`) only when membership actually changed.
   *
   * @param {string} clientId Client requesting to join.
   * @param {*} roomName Room key taken from the client's message.
   */
  handleJoinRoom(clientId, roomName) {
    const client = this.clients.get(clientId);
    if (!client) return;

    // A missing name would create a room keyed `undefined`; an object/array
    // name is compared by identity as a Map key and could never be matched
    // by a later message, so both are rejected.
    if (roomName == null || typeof roomName === 'object') {
      console.warn(`Ignoring join with invalid room name from ${clientId}`);
      return;
    }

    const isNewMember = !client.rooms.has(roomName);

    if (!this.rooms.has(roomName)) {
      this.rooms.set(roomName, new Set());
    }
    this.rooms.get(roomName).add(clientId);
    client.rooms.add(roomName);

    this.sendTo(client, {
      type: 'joined',
      room: roomName
    });

    // Do not re-announce a client that was already in the room.
    if (isNewMember) {
      this.broadcastToRoom(roomName, {
        type: 'user_joined',
        clientId: clientId,
        room: roomName
      }, clientId);
    }

    console.log(`Client ${clientId} joined room ${roomName}`);
  }

  /**
   * Removes a client from a room and drops the room once it is empty.
   * The requester always gets a `left` ack; remaining members are told
   * (`user_left`) only if the client really was a member, so a non-member
   * cannot inject leave notifications into a room.
   *
   * @param {string} clientId Client requesting to leave.
   * @param {*} roomName Room key taken from the client's message.
   */
  handleLeaveRoom(clientId, roomName) {
    const client = this.clients.get(clientId);
    if (!client) return;

    const wasMember = client.rooms.delete(roomName);

    const members = this.rooms.get(roomName);
    if (members) {
      members.delete(clientId);
      if (members.size === 0) {
        this.rooms.delete(roomName);
      }
    }

    this.sendTo(client, {
      type: 'left',
      room: roomName
    });

    if (wasMember) {
      this.broadcastToRoom(roomName, {
        type: 'user_left',
        clientId: clientId,
        room: roomName
      }, clientId);
    }

    console.log(`Client ${clientId} left room ${roomName}`);
  }

  /**
   * Relays a chat message to every member of `message.room`, sender included.
   * Ignored unless the sender is a member of that room.
   *
   * @param {string} clientId Sender id.
   * @param {{room: *, content: *}} message Parsed client message.
   */
  handleRoomMessage(clientId, message) {
    const client = this.clients.get(clientId);
    if (!client || !client.rooms.has(message.room)) {
      return;
    }

    this.broadcastToRoom(message.room, {
      type: 'message',
      clientId: clientId,
      room: message.room,
      content: message.content,
      timestamp: Date.now()
    });
  }

  /**
   * Relays a message to every connected client, sender included.
   *
   * @param {string} clientId Sender id.
   * @param {{content: *}} message Parsed client message.
   */
  handleBroadcast(clientId, message) {
    this.broadcast({
      type: 'broadcast',
      clientId: clientId,
      content: message.content,
      timestamp: Date.now()
    });
  }

  /**
   * Cleans up after a closed socket: removes the client from every room,
   * drops rooms that became empty, forgets the client, and tells the
   * remaining members of each room that the client left.
   *
   * @param {string} clientId Id of the client whose socket closed.
   */
  handleDisconnection(clientId) {
    const client = this.clients.get(clientId);
    if (!client) return;

    const roomsStillOpen = [];
    client.rooms.forEach((roomName) => {
      const members = this.rooms.get(roomName);
      if (!members) return;

      members.delete(clientId);
      if (members.size === 0) {
        this.rooms.delete(roomName);
      } else {
        roomsStillOpen.push(roomName);
      }
    });

    this.clients.delete(clientId);

    // Same notification an explicit `leave` produces, so peers do not keep
    // a dropped client listed as present.
    roomsStillOpen.forEach((roomName) => {
      this.broadcastToRoom(roomName, {
        type: 'user_left',
        clientId: clientId,
        room: roomName
      }, clientId);
    });

    console.log(`Client disconnected: ${clientId}`);
  }

  /**
   * Sends `message` to every client whose socket is open.
   *
   * @param {object} message JSON-serializable payload.
   * @param {?string} [excludeClientId=null] Client id to skip.
   */
  broadcast(message, excludeClientId = null) {
    // Serialize once instead of once per recipient.
    const data = JSON.stringify(message);

    this.clients.forEach((client, clientId) => {
      if (clientId !== excludeClientId) {
        this.sendSerialized(client, data);
      }
    });
  }

  /**
   * Sends `message` to every member of a room whose socket is open.
   * Does nothing if the room does not exist.
   *
   * @param {*} roomName Room key.
   * @param {object} message JSON-serializable payload.
   * @param {?string} [excludeClientId=null] Client id to skip.
   */
  broadcastToRoom(roomName, message, excludeClientId = null) {
    const room = this.rooms.get(roomName);
    if (!room) return;

    // Serialize once instead of once per recipient.
    const data = JSON.stringify(message);

    room.forEach((clientId) => {
      if (clientId === excludeClientId) return;

      const client = this.clients.get(clientId);
      if (client) {
        this.sendSerialized(client, data);
      }
    });
  }

  /**
   * Serializes `message` and sends it to one client if its socket is open.
   *
   * @param {{ws: object}} client Entry from `this.clients`.
   * @param {object} message JSON-serializable payload.
   * @returns {boolean} `true` if the frame was handed to the socket.
   */
  sendTo(client, message) {
    return this.sendSerialized(client, JSON.stringify(message));
  }

  /**
   * Sends an already-serialized frame to one client if its socket is open.
   *
   * @param {{ws: object}} client Entry from `this.clients`.
   * @param {string} data Serialized frame.
   * @returns {boolean} `true` if the frame was handed to the socket.
   */
  sendSerialized(client, data) {
    if (client.ws.readyState !== WebSocket.OPEN) {
      return false;
    }
    client.ws.send(data);
    return true;
  }

  /**
   * Builds an id of the form `client_<ms timestamp>_<up to 9 base-36 chars>`.
   * Uses `Math.random()`, so the id is not suitable as a secret.
   *
   * @returns {string}
   */
  generateClientId() {
    // slice(2, 11) yields the same 9 characters as the deprecated substr(2, 9).
    return `client_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Collects connection metadata from the upgrade request.
   *
   * @param {object} req Request providing `socket.remoteAddress` and `headers`.
   * @returns {{ip: *, userAgent: *, connectedAt: number}}
   */
  extractMetadata(req) {
    return {
      ip: req.socket.remoteAddress,
      userAgent: req.headers['user-agent'],
      connectedAt: Date.now()
    };
  }
}

module.exports = WebSocketServer;

WebSocket 客户端源代码

客户端实现

javascript
// client.js 核心逻辑
/**
 * Thin client for the room-based JSON protocol: connects to a WebSocket
 * URL, tracks the server-assigned client id and the rooms the server has
 * confirmed, and re-emits server messages as named events.
 *
 * Relies on a `WebSocket` constructor being in scope.
 */
class WebSocketClient {
  /**
   * @param {string} url WebSocket endpoint passed to `new WebSocket(url)`.
   */
  constructor(url) {
    this.url = url;
    this.ws = null;
    this.clientId = null;
    this.rooms = new Set();
    this.eventHandlers = new Map();
  }

  /**
   * Opens the socket and wires its callbacks.
   *
   * @returns {Promise<void>} Resolves on `open`; rejects with the error
   *   event if `error` fires first (later calls to reject are no-ops).
   */
  connect() {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(this.url);

      this.ws.onopen = () => {
        console.log('Connected to WebSocket server');
        resolve();
      };

      this.ws.onerror = (error) => {
        console.error('WebSocket error:', error);
        reject(error);
      };

      this.ws.onmessage = (event) => {
        this.handleMessage(event.data);
      };

      this.ws.onclose = () => {
        console.log('Disconnected from WebSocket server');
        this.emit('disconnected');
      };
    });
  }

  /**
   * Parses one server frame, updates local state and emits the matching
   * event. Parse errors and exceptions thrown by handlers are logged, not
   * rethrown.
   *
   * @param {string} data Raw JSON text received from the server.
   */
  handleMessage(data) {
    try {
      const message = JSON.parse(data);

      switch (message.type) {
        case 'connected':
          this.clientId = message.clientId;
          this.emit('connected', message);
          break;
        case 'joined':
          // Room membership is recorded only once the server confirms it.
          this.rooms.add(message.room);
          this.emit('joined', message);
          break;
        case 'left':
          this.rooms.delete(message.room);
          this.emit('left', message);
          break;
        case 'message':
          this.emit('message', message);
          break;
        case 'broadcast':
          this.emit('broadcast', message);
          break;
        case 'user_joined':
          this.emit('user_joined', message);
          break;
        case 'user_left':
          this.emit('user_left', message);
          break;
        default:
          console.warn(`Unknown message type: ${message.type}`);
      }
    } catch (error) {
      console.error('Error handling message:', error);
    }
  }

  /**
   * Asks the server to add this client to a room.
   *
   * @param {string} roomName Room to join.
   * @returns {boolean} `true` if the request was sent, `false` if the
   *   socket is missing or not open.
   */
  joinRoom(roomName) {
    return this.sendPayload({
      type: 'join',
      room: roomName
    });
  }

  /**
   * Asks the server to remove this client from a room.
   *
   * @param {string} roomName Room to leave.
   * @returns {boolean} `true` if the request was sent.
   */
  leaveRoom(roomName) {
    return this.sendPayload({
      type: 'leave',
      room: roomName
    });
  }

  /**
   * Sends a chat message addressed to one room.
   *
   * @param {string} roomName Target room.
   * @param {*} content JSON-serializable message body.
   * @returns {boolean} `true` if the message was sent.
   */
  sendMessage(roomName, content) {
    return this.sendPayload({
      type: 'message',
      room: roomName,
      content: content
    });
  }

  /**
   * Sends a message addressed to every connected client.
   *
   * @param {*} content JSON-serializable message body.
   * @returns {boolean} `true` if the message was sent.
   */
  broadcast(content) {
    return this.sendPayload({
      type: 'broadcast',
      content: content
    });
  }

  /**
   * Serializes and sends a payload if the socket exists and is open.
   * The `this.ws` check keeps calls made before `connect()` from throwing
   * on a null socket.
   *
   * @param {object} payload JSON-serializable message.
   * @returns {boolean} `true` if the payload was handed to the socket.
   */
  sendPayload(payload) {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      return false;
    }
    this.ws.send(JSON.stringify(payload));
    return true;
  }

  /**
   * Registers a handler for an event name.
   *
   * @param {string} event Event name (e.g. `connected`, `message`).
   * @param {function(*): void} handler Called with the event data.
   */
  on(event, handler) {
    if (!this.eventHandlers.has(event)) {
      this.eventHandlers.set(event, []);
    }
    this.eventHandlers.get(event).push(handler);
  }

  /**
   * Removes one previously registered handler.
   *
   * @param {string} event Event name.
   * @param {function(*): void} handler The same function passed to `on`.
   * @returns {boolean} `true` if a handler was removed.
   */
  off(event, handler) {
    const handlers = this.eventHandlers.get(event);
    if (!handlers) return false;

    const index = handlers.indexOf(handler);
    if (index === -1) return false;

    handlers.splice(index, 1);
    if (handlers.length === 0) {
      this.eventHandlers.delete(event);
    }
    return true;
  }

  /**
   * Calls every handler registered for `event`, in registration order.
   *
   * @param {string} event Event name.
   * @param {*} [data] Value passed to each handler.
   */
  emit(event, data) {
    const handlers = this.eventHandlers.get(event);
    if (handlers) {
      // Iterate a snapshot so a handler calling off() mid-dispatch does not
      // cause a sibling handler to be skipped.
      handlers.slice().forEach((handler) => handler(data));
    }
  }

  /** Closes the socket if one was created. */
  disconnect() {
    if (this.ws) {
      this.ws.close();
    }
  }
}

module.exports = WebSocketClient;

总结

WebSocket 的源代码展示了实时通信的核心机制。理解 WebSocket 服务器和客户端的实现,有助于我们更好地实现和扩展实时通信功能。