Skip to content

微服务架构源代码导览

服务注册与发现

服务注册

javascript
// service-registry.js 核心逻辑
/**
 * In-memory service registry.
 *
 * Keeps, per service name, the list of registered instances together with
 * the time of registration and of the most recent heartbeat. Instances are
 * identified by their `url` within a service name.
 */
class ServiceRegistry {
  /**
   * @param {number} [heartbeatTimeout=30000] - Milliseconds after the last
   *   heartbeat during which an instance is still returned by `discover`.
   */
  constructor(heartbeatTimeout = 30000) {
    this.services = new Map();
    this.heartbeatTimeout = heartbeatTimeout;
  }

  /**
   * Registers an instance of a service.
   *
   * Registering a URL that is already present for the same service refreshes
   * the existing record instead of adding a duplicate, so a restarting
   * service does not show up twice in `discover`.
   *
   * @param {{name: string, url: string, metadata: *}} service
   * @returns {{url: string, metadata: *, registeredAt: number, lastHeartbeat: number}}
   *   The stored instance record.
   */
  register(service) {
    const { name, url, metadata } = service;

    if (!this.services.has(name)) {
      this.services.set(name, []);
    }

    const instances = this.services.get(name);
    const now = Date.now();
    const existing = instances.find(instance => instance.url === url);

    if (existing) {
      existing.metadata = metadata;
      existing.registeredAt = now;
      existing.lastHeartbeat = now;
      return existing;
    }

    const instance = {
      url,
      metadata,
      registeredAt: now,
      lastHeartbeat: now
    };

    instances.push(instance);
    return instance;
  }

  /**
   * Removes the instance with the given URL from a service.
   *
   * @param {string} name - Service name.
   * @param {string} url - Instance URL.
   * @returns {boolean} `true` if an instance was removed, `false` otherwise.
   */
  deregister(name, url) {
    const instances = this.services.get(name);
    if (!instances) {
      return false;
    }

    const index = instances.findIndex(instance => instance.url === url);
    if (index === -1) {
      return false;
    }

    instances.splice(index, 1);

    // Drop the map entry once the last instance is gone so that names of
    // services that no longer exist do not accumulate.
    if (instances.length === 0) {
      this.services.delete(name);
    }

    return true;
  }

  /**
   * Returns the instances of a service whose last heartbeat is more recent
   * than the heartbeat timeout. Expired instances are filtered out of the
   * result but are not removed from the registry.
   *
   * @param {string} name - Service name.
   * @returns {Array<Object>} Live instance records; empty if none.
   */
  discover(name) {
    const instances = this.services.get(name);
    if (!instances) {
      return [];
    }

    const now = Date.now();
    return instances.filter(
      instance => now - instance.lastHeartbeat < this.heartbeatTimeout
    );
  }

  /**
   * Records a heartbeat for an instance.
   *
   * @param {string} name - Service name.
   * @param {string} url - Instance URL.
   * @returns {boolean} `true` if the instance exists and was refreshed.
   */
  heartbeat(name, url) {
    const instances = this.services.get(name);
    if (!instances) {
      return false;
    }

    const instance = instances.find(candidate => candidate.url === url);
    if (!instance) {
      return false;
    }

    instance.lastHeartbeat = Date.now();
    return true;
  }
}

服务发现

javascript
// service-discovery.js 核心逻辑
/**
 * Client-side service discovery.
 *
 * Looks up the instances of a service from a registry over HTTP, caches the
 * result per service name for `cacheTimeout` milliseconds, and picks one
 * instance according to a load-balancing strategy.
 */
class ServiceDiscovery {
  /**
   * @param {string} registryUrl - Base URL of the registry; instances are
   *   fetched from `${registryUrl}/services/${serviceName}`.
   */
  constructor(registryUrl) {
    this.registryUrl = registryUrl;
    this.cache = new Map();
    this.cacheTimeout = 5000; // cache entries are reused for 5 seconds
    // Single counter shared by every service that uses round-robin.
    this.roundRobinCounter = 0;
  }

  /**
   * Returns the instances of a service, using the cache while it is fresh.
   *
   * @param {string} serviceName
   * @returns {Promise<Array<Object>>} Instance list (possibly empty).
   * @throws {Error} If the registry request fails.
   */
  async discover(serviceName) {
    const cached = this.cache.get(serviceName);
    if (cached && Date.now() - cached.timestamp < this.cacheTimeout) {
      return cached.instances;
    }

    const instances = await this.fetchInstances(serviceName);

    this.cache.set(serviceName, {
      instances,
      timestamp: Date.now()
    });

    return instances;
  }

  /**
   * Fetches the instance list of a service from the registry.
   *
   * @param {string} serviceName
   * @returns {Promise<Array<Object>>} The `instances` array of the response
   *   body, or an empty array when the body carries no such array.
   * @throws {Error} If the registry answers with a non-2xx status.
   */
  async fetchInstances(serviceName) {
    const response = await fetch(`${this.registryUrl}/services/${serviceName}`);

    // Without this check an error payload would be parsed, cached and later
    // fail with an unrelated TypeError in selectInstance.
    if (!response.ok) {
      throw new Error(
        `Service registry request failed for ${serviceName}: HTTP ${response.status}`
      );
    }

    const data = await response.json();
    return data && Array.isArray(data.instances) ? data.instances : [];
  }

  /**
   * Picks one instance of a service.
   *
   * @param {string} serviceName
   * @param {string} [strategy='round-robin'] - `'round-robin'`, `'random'`
   *   or `'least-connections'`; any other value selects the first instance.
   * @returns {Promise<Object>} The selected instance.
   * @throws {Error} If the service has no instances.
   */
  async selectInstance(serviceName, strategy = 'round-robin') {
    const instances = await this.discover(serviceName);

    if (instances.length === 0) {
      throw new Error(`No instances found for service: ${serviceName}`);
    }

    switch (strategy) {
      case 'round-robin':
        return this.roundRobin(instances);
      case 'random':
        return this.random(instances);
      case 'least-connections':
        return this.leastConnections(instances);
      default:
        return instances[0];
    }
  }

  /**
   * Returns the next instance in rotation.
   * @param {Array<Object>} instances - Non-empty instance list.
   * @returns {Object}
   */
  roundRobin(instances) {
    const index = this.getRoundRobinIndex(instances.length);
    return instances[index];
  }

  /**
   * Returns a uniformly random instance.
   * @param {Array<Object>} instances - Non-empty instance list.
   * @returns {Object}
   */
  random(instances) {
    const index = Math.floor(Math.random() * instances.length);
    return instances[index];
  }

  /**
   * Placeholder strategy: connection counts are not tracked, so the first
   * instance is always returned.
   * @param {Array<Object>} instances - Non-empty instance list.
   * @returns {Object}
   */
  leastConnections(instances) {
    return instances[0];
  }

  /**
   * Returns the current rotation index for a list of the given length and
   * advances the shared counter.
   * @param {number} length - Number of instances (must be > 0).
   * @returns {number} Index in `[0, length)`.
   */
  getRoundRobinIndex(length) {
    const index = this.roundRobinCounter % length;
    this.roundRobinCounter++;

    return index;
  }
}

服务通信

HTTP 客户端

javascript
// http-client.js 核心逻辑
/**
 * HTTP client that resolves the target host through service discovery and
 * exchanges JSON with the selected instance.
 */
class HttpClient {
  /**
   * @param {{selectInstance: function(string): Promise<{url: string}>}} serviceDiscovery
   *   Object used to pick an instance for a service name.
   */
  constructor(serviceDiscovery) {
    this.serviceDiscovery = serviceDiscovery;
    this.requestTimeout = 5000; // milliseconds
  }

  /**
   * Sends a request to one instance of a service.
   *
   * @param {string} serviceName - Name passed to service discovery.
   * @param {Object} options
   * @param {string} options.path - Path appended to the instance URL.
   * @param {string} [options.method='GET'] - HTTP method.
   * @param {Object} [options.headers] - Request headers.
   * @param {*} [options.body] - Value serialised with `JSON.stringify`;
   *   omitted from the request when `null` or `undefined`.
   * @returns {Promise<*>} Parsed JSON body, or `null` for a 204 response.
   * @throws {Error} On a non-2xx status, or when the request is aborted
   *   after `requestTimeout` milliseconds.
   */
  async request(serviceName, options) {
    const instance = await this.serviceDiscovery.selectInstance(serviceName);
    const url = `${instance.url}${options.path}`;

    // Only null/undefined mean "no body"; 0, false and '' are valid JSON.
    const hasBody = options.body !== undefined && options.body !== null;

    let headers = options.headers || {};
    // The body is always JSON, so label it unless the caller already did.
    // Non-plain header containers (e.g. Headers, arrays) are passed through
    // untouched.
    const isPlainObject =
      Object.prototype.toString.call(headers) === '[object Object]';
    if (hasBody && isPlainObject) {
      const hasContentType = Object.keys(headers).some(
        key => key.toLowerCase() === 'content-type'
      );
      if (!hasContentType) {
        headers = { ...headers, 'Content-Type': 'application/json' };
      }
    }

    // fetch has no `timeout` option; enforce the limit with an abort signal.
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), this.requestTimeout);

    try {
      const response = await fetch(url, {
        method: options.method || 'GET',
        headers,
        body: hasBody ? JSON.stringify(options.body) : undefined,
        signal: controller.signal
      });

      if (!response.ok) {
        throw new Error(`HTTP error! status: ${response.status}`);
      }

      // 204 No Content has no body to parse.
      if (response.status === 204) {
        return null;
      }

      // Awaited inside the try so the timeout also covers reading the body.
      return await response.json();
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Sends a GET request.
   * @param {string} serviceName
   * @param {string} path
   * @param {Object} [headers]
   * @returns {Promise<*>}
   */
  async get(serviceName, path, headers) {
    return this.request(serviceName, {
      method: 'GET',
      path,
      headers
    });
  }

  /**
   * Sends a POST request with a JSON body.
   * @param {string} serviceName
   * @param {string} path
   * @param {*} body
   * @param {Object} [headers]
   * @returns {Promise<*>}
   */
  async post(serviceName, path, body, headers) {
    return this.request(serviceName, {
      method: 'POST',
      path,
      body,
      headers
    });
  }

  /**
   * Sends a PUT request with a JSON body.
   * @param {string} serviceName
   * @param {string} path
   * @param {*} body
   * @param {Object} [headers]
   * @returns {Promise<*>}
   */
  async put(serviceName, path, body, headers) {
    return this.request(serviceName, {
      method: 'PUT',
      path,
      body,
      headers
    });
  }

  /**
   * Sends a DELETE request.
   * @param {string} serviceName
   * @param {string} path
   * @param {Object} [headers]
   * @returns {Promise<*>}
   */
  async delete(serviceName, path, headers) {
    return this.request(serviceName, {
      method: 'DELETE',
      path,
      headers
    });
  }
}

配置管理

配置中心

javascript
// config-center.js 核心逻辑
/**
 * Configuration centre: reads and writes per-service configuration through a
 * storage backend and notifies subscribers when a configuration is written.
 */
class ConfigCenter {
  /**
   * @param {{get: Function, set: Function}} storage - Backend called as
   *   `storage.get(serviceName, version)` and
   *   `storage.set(serviceName, version, config)`.
   */
  constructor(storage) {
    this.storage = storage;
    this.listeners = new Map();
  }

  /**
   * Reads the configuration of a service.
   *
   * @param {string} serviceName
   * @param {string} [version='latest']
   * @returns {Promise<*>} Whatever the storage backend returns.
   */
  async getConfig(serviceName, version = 'latest') {
    const config = await this.storage.get(serviceName, version);
    return config;
  }

  /**
   * Stores a configuration and then notifies the service's subscribers.
   *
   * @param {string} serviceName
   * @param {*} config
   * @param {string} version
   * @returns {Promise<void>}
   */
  async setConfig(serviceName, config, version) {
    await this.storage.set(serviceName, version, config);

    this.notifyListeners(serviceName, config, version);
  }

  /**
   * Subscribes a callback to configuration changes of a service.
   *
   * @param {string} serviceName
   * @param {function(*, string): void} callback - Called with
   *   `(config, version)` after each `setConfig` for the service.
   * @returns {function(): void} Function that removes this subscription.
   * @throws {TypeError} If `callback` is not a function.
   */
  subscribe(serviceName, callback) {
    // Reject early: a non-function would otherwise only fail at notify time.
    if (typeof callback !== 'function') {
      throw new TypeError('callback must be a function');
    }

    if (!this.listeners.has(serviceName)) {
      this.listeners.set(serviceName, []);
    }

    this.listeners.get(serviceName).push(callback);

    return () => this.unsubscribe(serviceName, callback);
  }

  /**
   * Removes a previously subscribed callback; does nothing if it is not
   * subscribed.
   *
   * @param {string} serviceName
   * @param {Function} callback
   */
  unsubscribe(serviceName, callback) {
    const callbacks = this.listeners.get(serviceName);
    if (!callbacks) {
      return;
    }

    const index = callbacks.indexOf(callback);
    if (index !== -1) {
      callbacks.splice(index, 1);
    }

    // Do not keep empty lists around for services nobody listens to.
    if (callbacks.length === 0) {
      this.listeners.delete(serviceName);
    }
  }

  /**
   * Calls every subscriber of a service with `(config, version)`.
   *
   * Iterates over a snapshot so that a listener may unsubscribe (or
   * subscribe others) while being notified without disturbing the loop. An
   * exception thrown by one listener is logged and does not prevent the
   * remaining listeners from being called.
   *
   * @param {string} serviceName
   * @param {*} config
   * @param {string} version
   */
  notifyListeners(serviceName, config, version) {
    const callbacks = this.listeners.get(serviceName);
    if (!callbacks) return;

    for (const callback of [...callbacks]) {
      try {
        callback(config, version);
      } catch (error) {
        console.error(
          `Config listener for service "${serviceName}" failed:`,
          error
        );
      }
    }
  }
}

总结

微服务架构的源代码展示了服务注册与发现、服务通信和配置管理的核心机制。理解这些核心组件的实现,有助于我们更好地构建微服务系统。