Appearance
WebSocket 实时通信源代码导览
ws 库核心组件
ws 库的源代码结构如下:
ws/
├── lib/
│ ├── receiver.js # 消息接收器
│ ├── sender.js # 消息发送器
│ ├── websocket.js # WebSocket 类
│ └── websocket-server.js # WebSocket 服务器
├── test/ # 测试文件
└── package.json

WebSocket 服务器源代码
服务器实现
javascript
// server.js 核心逻辑
const WebSocket = require('ws');
/**
 * Room-based messaging server layered on a `WebSocket.Server` instance.
 *
 * State kept by the instance:
 *   - `clients`: Map of clientId -> { ws, id, rooms: Set<roomName>, metadata }
 *   - `rooms`:   Map of roomName -> Set<clientId>
 *
 * Incoming frames are expected to be JSON objects with a `type` field of
 * `join`, `leave`, `message` or `broadcast`.
 */
class WebSocketServer {
  /**
   * @param {object} [options]
   * @param {number} [options.port=8080] - Port handed to the underlying server.
   * @param {string} [options.path='/'] - Path handed to the underlying server.
   */
  constructor(options = {}) {
    this.wss = new WebSocket.Server({
      // Only fall back when the port is missing: `||` would silently turn an
      // explicit port 0 into 8080.
      port: options.port != null ? options.port : 8080,
      path: options.path || '/'
    });
    this.clients = new Map();
    this.rooms = new Map();
    this.setupEventHandlers();
  }

  /** Wires the server-level `connection` and `error` events. */
  setupEventHandlers() {
    // Connection event
    this.wss.on('connection', (ws, req) => {
      this.handleConnection(ws, req);
    });
    // Error event
    this.wss.on('error', (error) => {
      console.error('WebSocket server error:', error);
    });
  }

  /**
   * Registers a newly connected socket and sends it its generated id.
   * @param {object} ws - The connected socket.
   * @param {object} req - The upgrade request, used for metadata only.
   */
  handleConnection(ws, req) {
    // Generate the client id
    const clientId = this.generateClientId();
    // Store the client record
    const client = {
      ws,
      id: clientId,
      rooms: new Set(),
      metadata: this.extractMetadata(req)
    };
    this.clients.set(clientId, client);
    // Attach per-client event handlers
    this.setupClientHandlers(ws, clientId);
    // Send the welcome message
    this.sendToClient(client, {
      type: 'connected',
      clientId: clientId
    });
    console.log(`Client connected: ${clientId}`);
  }

  /**
   * Wires the `message`, `close` and `error` events of one client socket.
   * @param {object} ws
   * @param {string} clientId
   */
  setupClientHandlers(ws, clientId) {
    // Message event
    ws.on('message', (data) => {
      this.handleMessage(clientId, data);
    });
    // Close event
    ws.on('close', () => {
      this.handleDisconnection(clientId);
    });
    // Error event
    ws.on('error', (error) => {
      console.error(`Client error (${clientId}):`, error);
    });
  }

  /**
   * Parses one incoming frame and dispatches on its `type`.
   * Parse and handler errors are logged and never propagate to the caller.
   * @param {string} clientId
   * @param {*} data - Raw frame payload; passed to `JSON.parse`.
   */
  handleMessage(clientId, data) {
    try {
      const message = JSON.parse(data);
      // Valid JSON such as `null` or `42` has no usable `type`; drop it
      // explicitly instead of relying on a TypeError reaching the catch.
      if (message === null || typeof message !== 'object') {
        console.warn(`Ignoring non-object message from ${clientId}`);
        return;
      }
      switch (message.type) {
        case 'join':
          this.handleJoinRoom(clientId, message.room);
          break;
        case 'leave':
          this.handleLeaveRoom(clientId, message.room);
          break;
        case 'message':
          this.handleRoomMessage(clientId, message);
          break;
        case 'broadcast':
          this.handleBroadcast(clientId, message);
          break;
        default:
          console.warn(`Unknown message type: ${message.type}`);
      }
    } catch (error) {
      console.error('Error handling message:', error);
    }
  }

  /**
   * Adds a client to a room (creating the room on demand), acknowledges with
   * `joined` and notifies the other members with `user_joined`.
   * Requests whose room name is not a non-empty string are ignored.
   * @param {string} clientId
   * @param {string} roomName
   */
  handleJoinRoom(clientId, roomName) {
    const client = this.clients.get(clientId);
    if (!client) return;
    // A missing room would otherwise create a room keyed by `undefined`.
    if (typeof roomName !== 'string' || roomName === '') {
      console.warn(`Client ${clientId} sent a join request without a valid room name`);
      return;
    }
    // Create the room if it does not exist yet
    let members = this.rooms.get(roomName);
    if (!members) {
      members = new Set();
      this.rooms.set(roomName, members);
    }
    // Add the client to the room
    members.add(clientId);
    client.rooms.add(roomName);
    // Acknowledge to the client
    this.sendToClient(client, {
      type: 'joined',
      room: roomName
    });
    // Notify the other clients in the room
    this.broadcastToRoom(roomName, {
      type: 'user_joined',
      clientId: clientId,
      room: roomName
    }, clientId);
    console.log(`Client ${clientId} joined room ${roomName}`);
  }

  /**
   * Removes a client from a room and acknowledges with `left`.
   * `user_left` is broadcast only when the client really was a member, so a
   * client cannot announce departures in rooms it never joined.
   * @param {string} clientId
   * @param {string} roomName
   */
  handleLeaveRoom(clientId, roomName) {
    const client = this.clients.get(clientId);
    if (!client) return;
    // `Set#delete` reports whether the client was actually in the room.
    const wasMember = client.rooms.delete(roomName);
    this.removeFromRoom(clientId, roomName);
    // Acknowledge to the client (kept even for non-members so a caller
    // waiting for `left` is not left hanging).
    this.sendToClient(client, {
      type: 'left',
      room: roomName
    });
    if (!wasMember) return;
    // Notify the other clients in the room
    this.broadcastToRoom(roomName, {
      type: 'user_left',
      clientId: clientId,
      room: roomName
    }, clientId);
    console.log(`Client ${clientId} left room ${roomName}`);
  }

  /**
   * Relays a chat message to every member of `message.room`, sender included.
   * Ignored when the sender is unknown or not a member of that room.
   * @param {string} clientId
   * @param {{room: string, content: *}} message
   */
  handleRoomMessage(clientId, message) {
    const client = this.clients.get(clientId);
    if (!client || !client.rooms.has(message.room)) {
      return;
    }
    // Broadcast the message to the room
    this.broadcastToRoom(message.room, {
      type: 'message',
      clientId: clientId,
      room: message.room,
      content: message.content,
      timestamp: Date.now()
    });
  }

  /**
   * Relays `message.content` to every connected client, sender included.
   * @param {string} clientId
   * @param {{content: *}} message
   */
  handleBroadcast(clientId, message) {
    // Broadcast the message to all clients
    this.broadcast({
      type: 'broadcast',
      clientId: clientId,
      content: message.content,
      timestamp: Date.now()
    });
  }

  /**
   * Forgets a client: drops it from every room it had joined, tells the
   * remaining members with `user_left`, and deletes its record.
   * @param {string} clientId
   */
  handleDisconnection(clientId) {
    const client = this.clients.get(clientId);
    if (!client) return;
    // Delete the record first so no notification below can target it.
    this.clients.delete(clientId);
    // Remove the client from all of its rooms
    client.rooms.forEach((roomName) => {
      this.removeFromRoom(clientId, roomName);
      this.broadcastToRoom(roomName, {
        type: 'user_left',
        clientId: clientId,
        room: roomName
      });
    });
    console.log(`Client disconnected: ${clientId}`);
  }

  /**
   * Drops `clientId` from the member set of `roomName` and deletes the room
   * once it is empty. No-op when the room does not exist.
   * @param {string} clientId
   * @param {string} roomName
   */
  removeFromRoom(clientId, roomName) {
    const members = this.rooms.get(roomName);
    if (!members) return;
    members.delete(clientId);
    // Delete the room when it becomes empty
    if (members.size === 0) {
      this.rooms.delete(roomName);
    }
  }

  /**
   * Serializes `message` and sends it to one client if its socket is open.
   * @param {{ws: object}|undefined} client - A record from `this.clients`.
   * @param {object} message
   * @returns {boolean} `true` when the frame was handed to the socket.
   */
  sendToClient(client, message) {
    if (!client || client.ws.readyState !== WebSocket.OPEN) {
      return false;
    }
    client.ws.send(JSON.stringify(message));
    return true;
  }

  /**
   * Sends `message` to every client whose socket is open.
   * @param {object} message
   * @param {string|null} [excludeClientId=null] - Client to skip, if any.
   */
  broadcast(message, excludeClientId = null) {
    // Serialize once instead of once per recipient.
    const payload = JSON.stringify(message);
    this.clients.forEach((client, clientId) => {
      if (clientId !== excludeClientId && client.ws.readyState === WebSocket.OPEN) {
        client.ws.send(payload);
      }
    });
  }

  /**
   * Sends `message` to every member of `roomName` whose socket is open.
   * No-op when the room does not exist.
   * @param {string} roomName
   * @param {object} message
   * @param {string|null} [excludeClientId=null] - Member to skip, if any.
   */
  broadcastToRoom(roomName, message, excludeClientId = null) {
    const room = this.rooms.get(roomName);
    if (!room) return;
    // Serialize once instead of once per recipient.
    const payload = JSON.stringify(message);
    room.forEach((clientId) => {
      if (clientId === excludeClientId) return;
      const client = this.clients.get(clientId);
      if (client && client.ws.readyState === WebSocket.OPEN) {
        client.ws.send(payload);
      }
    });
  }

  /**
   * Builds an id of the form `client_<epoch ms>_<up to 9 base-36 chars>`.
   * Based on `Math.random()`, so it is not suitable as a secret.
   * @returns {string}
   */
  generateClientId() {
    // slice(2, 11) yields the same characters as the deprecated substr(2, 9).
    return `client_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Collects connection metadata from the upgrade request.
   * @param {object} req
   * @returns {{ip: *, userAgent: *, connectedAt: number}}
   */
  extractMetadata(req) {
    return {
      ip: req.socket.remoteAddress,
      userAgent: req.headers['user-agent'],
      connectedAt: Date.now()
    };
  }
}
module.exports = WebSocketServer;

WebSocket 客户端源代码
客户端实现
javascript
// client.js 核心逻辑
/**
 * Client counterpart of the room-based protocol: joins and leaves rooms,
 * sends room and broadcast messages, and re-emits server frames as events.
 *
 * Relies on a `WebSocket` constructor being in scope (it is not imported in
 * this block) that supports `onopen`/`onerror`/`onmessage`/`onclose`.
 */
class WebSocketClient {
  /**
   * @param {string} url - Endpoint passed to `new WebSocket(url)` on connect.
   */
  constructor(url) {
    this.url = url;
    this.ws = null;
    this.clientId = null;
    this.rooms = new Set();
    this.eventHandlers = new Map();
  }

  /**
   * Opens the socket.
   * @returns {Promise<void>} Resolves on `open`; rejects with the error event
   *   if an error fires before the promise has settled.
   */
  connect() {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(this.url);
      this.ws.onopen = () => {
        console.log('Connected to WebSocket server');
        resolve();
      };
      this.ws.onerror = (error) => {
        console.error('WebSocket error:', error);
        // No-op once the promise has settled, so later errors would be lost
        // to callers; surface them through the event API as well.
        reject(error);
        this.emit('error', error);
      };
      this.ws.onmessage = (event) => {
        this.handleMessage(event.data);
      };
      this.ws.onclose = () => {
        console.log('Disconnected from WebSocket server');
        this.emit('disconnected');
      };
    });
  }

  /**
   * Parses a server frame, updates local state (`clientId`, `rooms`) and
   * emits an event named after the frame's `type`.
   * Parse and handler errors are logged and never propagate.
   * @param {string} data - Raw JSON text received from the server.
   */
  handleMessage(data) {
    try {
      const message = JSON.parse(data);
      // Valid JSON such as `null` has no `type`; drop it explicitly.
      if (message === null || typeof message !== 'object') {
        console.warn('Ignoring non-object message');
        return;
      }
      switch (message.type) {
        case 'connected':
          this.clientId = message.clientId;
          break;
        case 'joined':
          this.rooms.add(message.room);
          break;
        case 'left':
          this.rooms.delete(message.room);
          break;
        case 'message':
        case 'broadcast':
        case 'user_joined':
        case 'user_left':
          // Pure pass-through events: no local state to update.
          break;
        default:
          console.warn(`Unknown message type: ${message.type}`);
          return;
      }
      this.emit(message.type, message);
    } catch (error) {
      console.error('Error handling message:', error);
    }
  }

  /**
   * Serializes `payload` and sends it if the socket exists and is open.
   * Guards against `this.ws` still being `null` before `connect()`.
   * @param {object} payload
   * @returns {boolean} `true` when the frame was handed to the socket.
   */
  send(payload) {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      return false;
    }
    this.ws.send(JSON.stringify(payload));
    return true;
  }

  /**
   * Asks the server to add this client to a room.
   * @param {string} roomName
   * @returns {boolean} Whether the request was sent.
   */
  joinRoom(roomName) {
    return this.send({
      type: 'join',
      room: roomName
    });
  }

  /**
   * Asks the server to remove this client from a room.
   * @param {string} roomName
   * @returns {boolean} Whether the request was sent.
   */
  leaveRoom(roomName) {
    return this.send({
      type: 'leave',
      room: roomName
    });
  }

  /**
   * Sends a message addressed to one room.
   * @param {string} roomName
   * @param {*} content - Any JSON-serializable value.
   * @returns {boolean} Whether the message was sent.
   */
  sendMessage(roomName, content) {
    return this.send({
      type: 'message',
      room: roomName,
      content: content
    });
  }

  /**
   * Sends a message addressed to every connected client.
   * @param {*} content - Any JSON-serializable value.
   * @returns {boolean} Whether the message was sent.
   */
  broadcast(content) {
    return this.send({
      type: 'broadcast',
      content: content
    });
  }

  /**
   * Registers a handler for an event name.
   * @param {string} event
   * @param {function(*): void} handler
   */
  on(event, handler) {
    if (!this.eventHandlers.has(event)) {
      this.eventHandlers.set(event, []);
    }
    this.eventHandlers.get(event).push(handler);
  }

  /**
   * Invokes every handler registered for `event`, in registration order.
   * @param {string} event
   * @param {*} [data]
   */
  emit(event, data) {
    const handlers = this.eventHandlers.get(event);
    if (handlers) {
      handlers.forEach((handler) => handler(data));
    }
  }

  /** Closes the socket if one has been created. */
  disconnect() {
    if (this.ws) {
      this.ws.close();
    }
  }
}
module.exports = WebSocketClient;

总结
WebSocket 的源代码展示了实时通信的核心机制。理解 WebSocket 服务器和客户端的实现,有助于我们更好地实现和扩展实时通信功能。
