Skip to content

数据库集成源代码导览

MongoDB 源代码分析

Mongoose 核心组件

Mongoose 的源代码结构如下:

mongoose/
├── lib/
│   ├── connection.js       # 数据库连接
│   ├── model.js          # 模型定义
│   ├── schema.js         # Schema 定义
│   ├── document.js       # 文档实例
│   ├── query.js          # 查询构建器
│   ├── types/           # 数据类型
│   │   ├── array.js
│   │   ├── string.js
│   │   ├── number.js
│   │   └── ...
│   ├── plugins/          # 插件系统
│   ├── middleware/       # 中间件
│   │   ├── pre.js
│   │   ├── post.js
│   │   └── ...
│   └── utils.js         # 工具函数
├── test/              # 测试文件
└── package.json

连接管理

javascript
// connection.js 核心逻辑
class Connection {
  constructor(uri, options) {
    this.uri = uri;
    this.options = options;
    this.readyState = 'disconnected';
    this.models = {};
    this.collections = {};
  }

  // 建立连接
  async connect() {
    const { MongoClient } = require('mongodb');
    this.client = new MongoClient(this.uri, this.options);
    
    await this.client.connect();
    this.db = this.client.db();
    this.readyState = 'connected';
    
    this.emit('connected');
  }

  // 关闭连接
  async close() {
    await this.client.close();
    this.readyState = 'disconnected';
    this.emit('disconnected');
  }
}

Schema 定义

javascript
// schema.js 核心逻辑
class Schema {
  definition(obj, options) {
    this.paths = {};
    this.virtuals = {};
    this.methods = {};
    this.statics = {};
    this.middleware = {
      pre: {},
      post: {}
    };

    // 定义字段
    for (const key in obj) {
      this.path(key, obj[key]);
    }
  }

  // 添加字段
  path(name, schemaType) {
    this.paths[name] = schemaType;
    return this;
  }

  // 添加方法
  method(name, fn) {
    this.methods[name] = fn;
    return this;
  }

  // 添加静态方法
  static(name, fn) {
    this.statics[name] = fn;
    return this;
  }

  // 添加中间件
  pre(hook, fn) {
    if (!this.middleware.pre[hook]) {
      this.middleware.pre[hook] = [];
    }
    this.middleware.pre[hook].push(fn);
  }

  post(hook, fn) {
    if (!this.middleware.post[hook]) {
      this.middleware.post[hook] = [];
    }
    this.middleware.post[hook].push(fn);
  }
}

模型定义

javascript
// model.js 核心逻辑
class Model {
  constructor(doc, fields) {
    this._doc = doc || {};
    this._isNew = !doc;
    this._modified = {};
    
    // 添加 Schema 方法
    for (const name in this.schema.methods) {
      this[name] = this.schema.methods[name];
    }
  }

  // 保存文档
  async save() {
    // 执行 pre 中间件
    await this._execMiddleware('pre', 'save');

    // 验证数据
    await this.validate();

    // 保存到数据库
    if (this._isNew) {
      const result = await this.collection.insertOne(this._doc);
      this._doc._id = result.insertedId;
      this._isNew = false;
    } else {
      await this.collection.updateOne(
        { _id: this._doc._id },
        { $set: this._modified }
      );
    }

    // 执行 post 中间件
    await this._execMiddleware('post', 'save');
  }

  // 验证数据
  async validate() {
    const errors = [];

    for (const path in this.schema.paths) {
      const schemaType = this.schema.paths[path];
      const value = this._doc[path];

      // 验证必填字段
      if (schemaType.required && !value) {
        errors.push({ path, message: `${path} is required` });
      }

      // 验证类型
      if (value && typeof value !== schemaType.type) {
        errors.push({ path, message: `${path} must be ${schemaType.type}` });
      }
    }

    if (errors.length > 0) {
      throw new ValidationError(errors);
    }
  }

  // 执行中间件
  async _execMiddleware(type, hook) {
    const middleware = this.schema.middleware[type][hook] || [];
    for (const fn of middleware) {
      await fn.call(this);
    }
  }
}

查询构建器

javascript
// query.js 核心逻辑
class Query {
  constructor(model, conditions) {
    this.model = model;
    this.conditions = conditions || {};
    this.projection = {};
    this.options = {};
    this.sort = {};
    this.limit = 0;
    this.skip = 0;
  }

  // 添加查询条件
  where(path, value) {
    this.conditions[path] = value;
    return this;
  }

  // 投影字段
  select(fields) {
    this.projection = fields;
    return this;
  }

  // 排序
  sort(sort) {
    this.sort = sort;
    return this;
  }

  // 限制数量
  limit(limit) {
    this.limit = limit;
    return this;
  }

  // 跳过数量
  skip(skip) {
    this.skip = skip;
    return this;
  }

  // 执行查询
  async exec() {
    const cursor = this.collection.find(this.conditions);

    if (Object.keys(this.projection).length > 0) {
      cursor.project(this.projection);
    }

    if (Object.keys(this.sort).length > 0) {
      cursor.sort(this.sort);
    }

    if (this.skip > 0) {
      cursor.skip(this.skip);
    }

    if (this.limit > 0) {
      cursor.limit(this.limit);
    }

    const docs = await cursor.toArray();
    return docs.map(doc => new this.model(doc));
  }

  // 查找单个文档
  async findOne() {
    const doc = await this.collection.findOne(this.conditions);
    return doc ? new this.model(doc) : null;
  }

  // 统计数量
  async countDocuments() {
    return await this.collection.countDocuments(this.conditions);
  }
}

PostgreSQL 源代码分析

pg 核心组件

pg 的源代码结构如下:

pg/
├── lib/
│   ├── client.js          # 客户端连接
│   ├── query.js           # 查询执行
│   ├── connection.js      # 连接管理
│   ├── pool.js           # 连接池
│   ├── types/            # 数据类型转换
│   │   ├── array.js
│   │   ├── json.js
│   │   ├── numeric.js
│   │   └── ...
│   └── utils.js          # 工具函数
├── test/               # 测试文件
└── package.json

连接管理

javascript
// client.js 核心逻辑
class Client {
  constructor(config) {
    this.config = config;
    this.connection = null;
    this.queryQueue = [];
  }

  // 建立连接
  async connect() {
    const net = require('net');
    this.connection = net.connect({
      host: this.config.host,
      port: this.config.port
    });

    // 发送启动消息
    this._sendStartupMessage();

    // 等待认证
    await this._authenticate();

    // 处理查询队列
    this._processQueryQueue();
  }

  // 发送查询
  async query(text, values) {
    return new Promise((resolve, reject) => {
      this.queryQueue.push({ text, values, resolve, reject });
      this._processQueryQueue();
    });
  }

  // 处理查询队列
  _processQueryQueue() {
    if (this.queryQueue.length === 0) return;

    const { text, values, resolve, reject } = this.queryQueue.shift();

    // 发送查询消息
    this._sendQueryMessage(text, values);

    // 等待响应
    this.connection.once('readyForQuery', () => {
      resolve(this.lastResult);
    });
  }

  // 关闭连接
  async end() {
    await this.query('SELECT pg_terminate_backend($1)', [this.connection.processID]);
    this.connection.end();
  }
}

连接池

javascript
// pool.js 核心逻辑
class Pool {
  constructor(config) {
    this.config = config;
    this.clients = [];
    this.waiting = [];
    this.maxClients = config.max || 10;
    this.minClients = config.min || 2;
  }

  // 获取连接
  async connect() {
    // 检查是否有空闲连接
    const client = this.clients.find(c => !c.inUse);
    if (client) {
      client.inUse = true;
      return client;
    }

    // 检查是否可以创建新连接
    if (this.clients.length < this.maxClients) {
      const newClient = await this._createClient();
      newClient.inUse = true;
      this.clients.push(newClient);
      return newClient;
    }

    // 等待连接释放
    return new Promise(resolve => {
      this.waiting.push(resolve);
    });
  }

  // 释放连接
  release(client) {
    client.inUse = false;

    // 检查是否有等待的请求
    if (this.waiting.length > 0) {
      const resolve = this.waiting.shift();
      client.inUse = true;
      resolve(client);
    }
  }

  // 创建客户端
  async _createClient() {
    const { Client } = require('./client');
    const client = new Client(this.config);
    await client.connect();
    return client;
  }

  // 关闭连接池
  async end() {
    await Promise.all(this.clients.map(c => c.end()));
    this.clients = [];
  }
}

查询执行

javascript
// query.js 核心逻辑
class Query {
  constructor(text, values, client) {
    this.text = text;
    this.values = values || [];
    this.client = client;
    this.result = null;
    this.rows = [];
  }

  // 执行查询
  async execute() {
    // 参数化查询
    const preparedText = this._prepareQuery();

    // 发送查询
    await this.client.query(preparedText, this.values);

    // 等待结果
    await this._waitForResult();

    return this.rows;
  }

  // 准备查询
  _prepareQuery() {
    let text = this.text;
    const values = this.values;

    // 替换参数占位符
    for (let i = 0; i < values.length; i++) {
      const value = this._escapeValue(values[i]);
      text = text.replace(`$${i + 1}`, value);
    }

    return text;
  }

  // 转义值
  _escapeValue(value) {
    if (value === null) return 'NULL';
    if (typeof value === 'string') return `'${value.replace(/'/g, "''")}'`;
    if (typeof value === 'number') return value.toString();
    if (typeof value === 'boolean') return value ? 'TRUE' : 'FALSE';
    if (Array.isArray(value)) return `ARRAY[${value.map(v => this._escapeValue(v)).join(',')}]`;
    return `'${JSON.stringify(value).replace(/'/g, "''")}'`;
  }

  // 等待结果
  async _waitForResult() {
    return new Promise(resolve => {
      this.client.connection.on('data', data => {
        this._parseResult(data);
        resolve();
      });
    });
  }

  // 解析结果
  _parseResult(data) {
    // 解析 PostgreSQL 协议消息
    const messages = this._parseMessages(data);

    for (const message of messages) {
      if (message.type === 'DataRow') {
        this.rows.push(message.row);
      } else if (message.type === 'CommandComplete') {
        this.result = message;
      }
    }
  }
}

数据访问层实现

Repository 模式

javascript
// repository.js 核心逻辑
class Repository {
  constructor(model) {
    this.model = model;
  }

  // 创建
  async create(data) {
    const instance = new this.model(data);
    await instance.save();
    return instance;
  }

  // 查找所有
  async findAll(options = {}) {
    const query = this.model.find();

    if (options.where) {
      query.where(options.where);
    }

    if (options.sort) {
      query.sort(options.sort);
    }

    if (options.limit) {
      query.limit(options.limit);
    }

    if (options.skip) {
      query.skip(options.skip);
    }

    return await query.exec();
  }

  // 查找单个
  async findOne(conditions) {
    return await this.model.findOne(conditions);
  }

  // 更新
  async update(conditions, data) {
    return await this.model.updateMany(conditions, data);
  }

  // 删除
  async delete(conditions) {
    return await this.model.deleteMany(conditions);
  }

  // 统计
  async count(conditions) {
    return await this.model.countDocuments(conditions);
  }
}

总结

MongoDB 和 PostgreSQL 的源代码展示了不同的设计理念。MongoDB 使用文档模型,提供了灵活的数据结构和丰富的查询能力;PostgreSQL 使用关系模型,提供了强大的事务和查询优化能力。理解两种数据库的源代码结构,有助于我们更好地使用和扩展数据库功能。