Appearance
微服务架构源代码导览
服务注册与发现
服务注册
javascript
// service-registry.js 核心逻辑
/**
 * In-memory service registry that tracks the instances of each named service.
 *
 * Instances are stored in a Map keyed by service name. Each instance records
 * its URL, the caller-supplied metadata, and two timestamps taken from
 * Date.now() (milliseconds): when it was first registered and when it last
 * sent a heartbeat.
 */
class ServiceRegistry {
  /**
   * @param {object} [options]
   * @param {number} [options.heartbeatTimeout=30000] Milliseconds without a
   *   heartbeat after which discover() stops returning an instance.
   */
  constructor({ heartbeatTimeout = 30000 } = {}) {
    this.services = new Map();
    this.heartbeatTimeout = heartbeatTimeout;
  }

  /**
   * Registers an instance of a service.
   *
   * Registering a (name, url) pair that is already present refreshes the
   * existing entry (metadata and heartbeat) instead of adding a duplicate, so
   * a restarting instance cannot be listed twice.
   *
   * @param {{name: string, url: string, metadata: *}} service
   * @returns {object} The stored instance record.
   */
  register(service) {
    const { name, url, metadata } = service;
    // Single clock read so registeredAt and lastHeartbeat agree exactly.
    const now = Date.now();

    if (!this.services.has(name)) {
      this.services.set(name, []);
    }
    const instances = this.services.get(name);

    const existing = instances.find((instance) => instance.url === url);
    if (existing) {
      existing.metadata = metadata;
      existing.lastHeartbeat = now;
      return existing;
    }

    const instance = {
      url,
      metadata,
      registeredAt: now,
      lastHeartbeat: now,
    };
    instances.push(instance);
    return instance;
  }

  /**
   * Removes the instance with the given URL from a service.
   *
   * @param {string} name Service name.
   * @param {string} url Instance URL.
   * @returns {boolean} true if an instance was removed, false otherwise.
   */
  deregister(name, url) {
    const instances = this.services.get(name);
    if (!instances) {
      return false;
    }

    const index = instances.findIndex((instance) => instance.url === url);
    if (index === -1) {
      return false;
    }

    instances.splice(index, 1);
    // Drop the key once the last instance is gone so the map does not
    // accumulate empty lists for services that no longer exist.
    if (instances.length === 0) {
      this.services.delete(name);
    }
    return true;
  }

  /**
   * Returns the instances of a service whose last heartbeat is more recent
   * than the heartbeat timeout. Expired instances are filtered out of the
   * result but are not removed from the registry.
   *
   * @param {string} name Service name.
   * @returns {object[]} Live instance records (empty if the service is unknown).
   */
  discover(name) {
    const instances = this.services.get(name);
    if (!instances) {
      return [];
    }

    const now = Date.now();
    return instances.filter(
      (instance) => now - instance.lastHeartbeat < this.heartbeatTimeout,
    );
  }

  /**
   * Records a heartbeat for the instance with the given URL.
   *
   * @param {string} name Service name.
   * @param {string} url Instance URL.
   * @returns {boolean} true if the instance was found and refreshed.
   */
  heartbeat(name, url) {
    const instances = this.services.get(name);
    if (!instances) {
      return false;
    }

    const instance = instances.find((candidate) => candidate.url === url);
    if (!instance) {
      return false;
    }

    instance.lastHeartbeat = Date.now();
    return true;
  }
}
// --- Section heading from the original page: 服务发现 (Service Discovery) ---
javascript
// service-discovery.js 核心逻辑
/**
 * Client-side service discovery: fetches instance lists from a registry over
 * HTTP, caches them briefly, and picks an instance with a load-balancing
 * strategy.
 */
class ServiceDiscovery {
  /**
   * @param {string} registryUrl Base URL of the registry HTTP API.
   * @param {object} [options]
   * @param {number} [options.cacheTimeout=5000] Milliseconds a fetched
   *   instance list is served from the cache.
   */
  constructor(registryUrl, { cacheTimeout = 5000 } = {}) {
    this.registryUrl = registryUrl;
    this.cache = new Map();
    this.cacheTimeout = cacheTimeout;
    // One rotation counter per service, so traffic to one service does not
    // shift the rotation of another.
    this.roundRobinCounters = new Map();
  }

  /**
   * Returns the instances of a service, from the cache when it is fresh
   * enough and from the registry otherwise.
   *
   * @param {string} serviceName
   * @returns {Promise<object[]>}
   */
  async discover(serviceName) {
    const cached = this.cache.get(serviceName);
    if (cached && Date.now() - cached.timestamp < this.cacheTimeout) {
      return cached.instances;
    }

    const instances = await this.fetchInstances(serviceName);
    this.cache.set(serviceName, {
      instances,
      timestamp: Date.now(),
    });
    return instances;
  }

  /**
   * Fetches the instance list for a service from the registry.
   *
   * @param {string} serviceName
   * @returns {Promise<object[]>} The `instances` array of the JSON response,
   *   or an empty array when the response carries none.
   * @throws {Error} When the registry answers with a non-2xx status.
   */
  async fetchInstances(serviceName) {
    // Tolerate a trailing slash on the base URL and escape the name so that
    // characters such as "/" or " " cannot alter the request path.
    const baseUrl = String(this.registryUrl).replace(/\/+$/, '');
    const response = await fetch(
      `${baseUrl}/services/${encodeURIComponent(serviceName)}`,
    );
    if (!response.ok) {
      throw new Error(
        `Failed to fetch instances for service: ${serviceName} (status: ${response.status})`,
      );
    }

    const data = await response.json();
    // Always hand back an array so callers can safely read `.length`.
    return data && Array.isArray(data.instances) ? data.instances : [];
  }

  /**
   * Picks one instance of a service.
   *
   * @param {string} serviceName
   * @param {string} [strategy='round-robin'] 'round-robin', 'random' or
   *   'least-connections'; any other value selects the first instance.
   * @returns {Promise<object>}
   * @throws {Error} When the service has no instances.
   */
  async selectInstance(serviceName, strategy = 'round-robin') {
    const instances = await this.discover(serviceName);
    if (instances.length === 0) {
      throw new Error(`No instances found for service: ${serviceName}`);
    }

    switch (strategy) {
      case 'round-robin':
        return this.roundRobin(instances, serviceName);
      case 'random':
        return this.random(instances);
      case 'least-connections':
        return this.leastConnections(instances);
      default:
        return instances[0];
    }
  }

  /**
   * Returns the next instance in rotation.
   *
   * @param {object[]} instances
   * @param {string} [serviceName=''] Key of the rotation counter to advance;
   *   calls that omit it share a single counter.
   * @returns {object}
   */
  roundRobin(instances, serviceName = '') {
    const index = this.getRoundRobinIndex(instances.length, serviceName);
    return instances[index];
  }

  /**
   * Returns a uniformly random instance (Math.random based).
   *
   * @param {object[]} instances
   * @returns {object}
   */
  random(instances) {
    const index = Math.floor(Math.random() * instances.length);
    return instances[index];
  }

  /**
   * Placeholder strategy: per-instance connection counts are not tracked, so
   * this simply returns the first instance.
   *
   * @param {object[]} instances
   * @returns {object}
   */
  leastConnections(instances) {
    return instances[0];
  }

  /**
   * Returns the current rotation index for a counter and advances it.
   *
   * @param {number} length Number of instances to rotate over.
   * @param {string} [key=''] Counter to use (one per service name).
   * @returns {number} Index in [0, length).
   */
  getRoundRobinIndex(length, key = '') {
    const count = this.roundRobinCounters.get(key) || 0;
    this.roundRobinCounters.set(key, count + 1);
    return count % length;
  }
}
// --- Section headings from the original page: 服务通信 (Service Communication) ---
HTTP 客户端
javascript
// http-client.js 核心逻辑
/**
 * HTTP client that resolves a service name to an instance through service
 * discovery and then issues a JSON request against that instance.
 */
class HttpClient {
  /**
   * @param {{selectInstance: function(string): Promise<{url: string}>}} serviceDiscovery
   *   Object used to pick the target instance for each request.
   */
  constructor(serviceDiscovery) {
    this.serviceDiscovery = serviceDiscovery;
    this.requestTimeout = 5000;
  }

  /**
   * Sends a request to one instance of a service.
   *
   * @param {string} serviceName
   * @param {object} [options]
   * @param {string} [options.method='GET']
   * @param {string} [options.path=''] Path appended to the instance URL.
   * @param {object} [options.headers] Request headers; not mutated.
   * @param {*} [options.body] Value serialized with JSON.stringify; omitted
   *   only when null or undefined.
   * @returns {Promise<*>} Parsed JSON body, or null for a 204 response.
   * @throws {Error} On a non-2xx status or when the request exceeds
   *   `this.requestTimeout` milliseconds.
   */
  async request(serviceName, options = {}) {
    const instance = await this.serviceDiscovery.selectInstance(serviceName);
    const url = `${instance.url}${options.path || ''}`;

    // `0`, `false` and `''` are valid JSON payloads; only null/undefined
    // mean "no body".
    const hasBody = options.body != null;

    // Copy into a Headers object so the caller's headers are never mutated
    // and the Content-Type check is case-insensitive.
    const headers = new Headers(options.headers || {});
    if (hasBody && !headers.has('Content-Type')) {
      headers.set('Content-Type', 'application/json');
    }

    // fetch() has no `timeout` option; the timeout is enforced by aborting
    // the request through an AbortController.
    const timeoutMs = this.requestTimeout;
    const controller = new AbortController();
    const timer = setTimeout(() => {
      controller.abort(
        new Error(`Request to ${url} timed out after ${timeoutMs}ms`),
      );
    }, timeoutMs);

    try {
      const response = await fetch(url, {
        method: options.method || 'GET',
        headers,
        body: hasBody ? JSON.stringify(options.body) : undefined,
        signal: controller.signal,
      });
      if (!response.ok) {
        throw new Error(`HTTP error! status: ${response.status}`);
      }
      // 204 No Content has no body, so parsing it as JSON would throw.
      if (response.status === 204) {
        return null;
      }
      // Awaited inside the try so the timeout also covers reading the body.
      return await response.json();
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Sends a GET request.
   * @param {string} serviceName
   * @param {string} path
   * @param {object} [headers]
   * @returns {Promise<*>}
   */
  async get(serviceName, path, headers) {
    return this.request(serviceName, { method: 'GET', path, headers });
  }

  /**
   * Sends a POST request with a JSON body.
   * @param {string} serviceName
   * @param {string} path
   * @param {*} body
   * @param {object} [headers]
   * @returns {Promise<*>}
   */
  async post(serviceName, path, body, headers) {
    return this.request(serviceName, { method: 'POST', path, body, headers });
  }

  /**
   * Sends a PUT request with a JSON body.
   * @param {string} serviceName
   * @param {string} path
   * @param {*} body
   * @param {object} [headers]
   * @returns {Promise<*>}
   */
  async put(serviceName, path, body, headers) {
    return this.request(serviceName, { method: 'PUT', path, body, headers });
  }

  /**
   * Sends a DELETE request.
   * @param {string} serviceName
   * @param {string} path
   * @param {object} [headers]
   * @returns {Promise<*>}
   */
  async delete(serviceName, path, headers) {
    return this.request(serviceName, { method: 'DELETE', path, headers });
  }
}
// --- Section headings from the original page: 配置管理 (Configuration Management) ---
配置中心
javascript
// config-center.js 核心逻辑
/**
 * Configuration center: reads and writes per-service configuration through a
 * storage backend and notifies subscribers when a configuration is written.
 */
class ConfigCenter {
  /**
   * @param {{get: function(string, string): Promise<*>,
   *          set: function(string, string, *): Promise<*>}} storage
   *   Backend called as `get(serviceName, version)` and
   *   `set(serviceName, version, config)`.
   */
  constructor(storage) {
    this.storage = storage;
    this.listeners = new Map();
  }

  /**
   * Reads the configuration of a service.
   *
   * @param {string} serviceName
   * @param {string} [version='latest']
   * @returns {Promise<*>} Whatever the storage backend returns.
   */
  async getConfig(serviceName, version = 'latest') {
    return this.storage.get(serviceName, version);
  }

  /**
   * Writes the configuration of a service, then notifies its subscribers.
   *
   * @param {string} serviceName
   * @param {*} config
   * @param {string} [version='latest'] Defaults to the same version that
   *   getConfig() reads by default, so a write without a version is readable
   *   without one.
   * @returns {Promise<void>}
   */
  async setConfig(serviceName, config, version = 'latest') {
    await this.storage.set(serviceName, version, config);
    this.notifyListeners(serviceName, config, version);
  }

  /**
   * Subscribes a callback to configuration writes of a service.
   *
   * @param {string} serviceName
   * @param {function(*, string): void} callback Called with (config, version).
   * @returns {function(): void} Function that removes this subscription.
   * @throws {TypeError} When callback is not a function.
   */
  subscribe(serviceName, callback) {
    if (typeof callback !== 'function') {
      throw new TypeError('callback must be a function');
    }
    if (!this.listeners.has(serviceName)) {
      this.listeners.set(serviceName, []);
    }
    this.listeners.get(serviceName).push(callback);
    return () => this.unsubscribe(serviceName, callback);
  }

  /**
   * Removes a previously subscribed callback; unknown callbacks are ignored.
   *
   * @param {string} serviceName
   * @param {function} callback
   */
  unsubscribe(serviceName, callback) {
    const callbacks = this.listeners.get(serviceName);
    if (!callbacks) {
      return;
    }

    const index = callbacks.indexOf(callback);
    if (index !== -1) {
      callbacks.splice(index, 1);
    }
    // Do not keep empty lists around for services nobody listens to.
    if (callbacks.length === 0) {
      this.listeners.delete(serviceName);
    }
  }

  /**
   * Calls every subscriber of a service with (config, version).
   *
   * @param {string} serviceName
   * @param {*} config
   * @param {string} version
   */
  notifyListeners(serviceName, config, version) {
    const callbacks = this.listeners.get(serviceName);
    if (!callbacks) return;

    // Iterate over a snapshot: a listener that unsubscribes while being
    // notified must not cause the next listener to be skipped.
    for (const callback of [...callbacks]) {
      try {
        callback(config, version);
      } catch (err) {
        // One failing listener must neither block the others nor turn an
        // already-persisted write into a rejected setConfig().
        console.error(
          `Config listener for service "${serviceName}" failed:`,
          err,
        );
      }
    }
  }
}
// --- Section heading from the original page: 总结 (Summary) ---
微服务架构的源代码展示了服务注册与发现、服务通信和配置管理的核心机制。理解这些核心组件的实现,有助于我们更好地构建微服务系统。
