Appearance
数据库集成源代码导览
MongoDB 源代码分析
Mongoose 核心组件
Mongoose 的源代码结构如下:
mongoose/
├── lib/
│ ├── connection.js # 数据库连接
│ ├── model.js # 模型定义
│ ├── schema.js # Schema 定义
│ ├── document.js # 文档实例
│ ├── query.js # 查询构建器
│ ├── types/ # 数据类型
│ │ ├── array.js
│ │ ├── string.js
│ │ ├── number.js
│ │ └── ...
│ ├── plugins/ # 插件系统
│ ├── middleware/ # 中间件
│ │ ├── pre.js
│ │ ├── post.js
│ │ └── ...
│ └── utils.js # 工具函数
├── test/ # 测试文件
└── package.json
连接管理
javascript
// connection.js 核心逻辑
class Connection {
constructor(uri, options) {
this.uri = uri;
this.options = options;
this.readyState = 'disconnected';
this.models = {};
this.collections = {};
}
// 建立连接
async connect() {
const { MongoClient } = require('mongodb');
this.client = new MongoClient(this.uri, this.options);
await this.client.connect();
this.db = this.client.db();
this.readyState = 'connected';
this.emit('connected');
}
// 关闭连接
async close() {
await this.client.close();
this.readyState = 'disconnected';
this.emit('disconnected');
}
}
Schema 定义
javascript
// schema.js 核心逻辑
class Schema {
definition(obj, options) {
this.paths = {};
this.virtuals = {};
this.methods = {};
this.statics = {};
this.middleware = {
pre: {},
post: {}
};
// 定义字段
for (const key in obj) {
this.path(key, obj[key]);
}
}
// 添加字段
path(name, schemaType) {
this.paths[name] = schemaType;
return this;
}
// 添加方法
method(name, fn) {
this.methods[name] = fn;
return this;
}
// 添加静态方法
static(name, fn) {
this.statics[name] = fn;
return this;
}
// 添加中间件
pre(hook, fn) {
if (!this.middleware.pre[hook]) {
this.middleware.pre[hook] = [];
}
this.middleware.pre[hook].push(fn);
}
post(hook, fn) {
if (!this.middleware.post[hook]) {
this.middleware.post[hook] = [];
}
this.middleware.post[hook].push(fn);
}
}
模型定义
javascript
// model.js 核心逻辑
class Model {
constructor(doc, fields) {
this._doc = doc || {};
this._isNew = !doc;
this._modified = {};
// 添加 Schema 方法
for (const name in this.schema.methods) {
this[name] = this.schema.methods[name];
}
}
// 保存文档
async save() {
// 执行 pre 中间件
await this._execMiddleware('pre', 'save');
// 验证数据
await this.validate();
// 保存到数据库
if (this._isNew) {
const result = await this.collection.insertOne(this._doc);
this._doc._id = result.insertedId;
this._isNew = false;
} else {
await this.collection.updateOne(
{ _id: this._doc._id },
{ $set: this._modified }
);
}
// 执行 post 中间件
await this._execMiddleware('post', 'save');
}
// 验证数据
async validate() {
const errors = [];
for (const path in this.schema.paths) {
const schemaType = this.schema.paths[path];
const value = this._doc[path];
// 验证必填字段
if (schemaType.required && !value) {
errors.push({ path, message: `${path} is required` });
}
// 验证类型
if (value && typeof value !== schemaType.type) {
errors.push({ path, message: `${path} must be ${schemaType.type}` });
}
}
if (errors.length > 0) {
throw new ValidationError(errors);
}
}
// 执行中间件
async _execMiddleware(type, hook) {
const middleware = this.schema.middleware[type][hook] || [];
for (const fn of middleware) {
await fn.call(this);
}
}
}
查询构建器
javascript
// query.js 核心逻辑
class Query {
constructor(model, conditions) {
this.model = model;
this.conditions = conditions || {};
this.projection = {};
this.options = {};
this.sort = {};
this.limit = 0;
this.skip = 0;
}
// 添加查询条件
where(path, value) {
this.conditions[path] = value;
return this;
}
// 投影字段
select(fields) {
this.projection = fields;
return this;
}
// 排序
sort(sort) {
this.sort = sort;
return this;
}
// 限制数量
limit(limit) {
this.limit = limit;
return this;
}
// 跳过数量
skip(skip) {
this.skip = skip;
return this;
}
// 执行查询
async exec() {
const cursor = this.collection.find(this.conditions);
if (Object.keys(this.projection).length > 0) {
cursor.project(this.projection);
}
if (Object.keys(this.sort).length > 0) {
cursor.sort(this.sort);
}
if (this.skip > 0) {
cursor.skip(this.skip);
}
if (this.limit > 0) {
cursor.limit(this.limit);
}
const docs = await cursor.toArray();
return docs.map(doc => new this.model(doc));
}
// 查找单个文档
async findOne() {
const doc = await this.collection.findOne(this.conditions);
return doc ? new this.model(doc) : null;
}
// 统计数量
async countDocuments() {
return await this.collection.countDocuments(this.conditions);
}
}
PostgreSQL 源代码分析
pg 核心组件
pg 的源代码结构如下:
pg/
├── lib/
│ ├── client.js # 客户端连接
│ ├── query.js # 查询执行
│ ├── connection.js # 连接管理
│ ├── pool.js # 连接池
│ ├── types/ # 数据类型转换
│ │ ├── array.js
│ │ ├── json.js
│ │ ├── numeric.js
│ │ └── ...
│ └── utils.js # 工具函数
├── test/ # 测试文件
└── package.json
连接管理
javascript
// client.js 核心逻辑
class Client {
constructor(config) {
this.config = config;
this.connection = null;
this.queryQueue = [];
}
// 建立连接
async connect() {
const net = require('net');
this.connection = net.connect({
host: this.config.host,
port: this.config.port
});
// 发送启动消息
this._sendStartupMessage();
// 等待认证
await this._authenticate();
// 处理查询队列
this._processQueryQueue();
}
// 发送查询
async query(text, values) {
return new Promise((resolve, reject) => {
this.queryQueue.push({ text, values, resolve, reject });
this._processQueryQueue();
});
}
// 处理查询队列
_processQueryQueue() {
if (this.queryQueue.length === 0) return;
const { text, values, resolve, reject } = this.queryQueue.shift();
// 发送查询消息
this._sendQueryMessage(text, values);
// 等待响应
this.connection.once('readyForQuery', () => {
resolve(this.lastResult);
});
}
// 关闭连接
async end() {
await this.query('SELECT pg_terminate_backend($1)', [this.connection.processID]);
this.connection.end();
}
}
连接池
javascript
// pool.js 核心逻辑
class Pool {
constructor(config) {
this.config = config;
this.clients = [];
this.waiting = [];
this.maxClients = config.max || 10;
this.minClients = config.min || 2;
}
// 获取连接
async connect() {
// 检查是否有空闲连接
const client = this.clients.find(c => !c.inUse);
if (client) {
client.inUse = true;
return client;
}
// 检查是否可以创建新连接
if (this.clients.length < this.maxClients) {
const newClient = await this._createClient();
newClient.inUse = true;
this.clients.push(newClient);
return newClient;
}
// 等待连接释放
return new Promise(resolve => {
this.waiting.push(resolve);
});
}
// 释放连接
release(client) {
client.inUse = false;
// 检查是否有等待的请求
if (this.waiting.length > 0) {
const resolve = this.waiting.shift();
client.inUse = true;
resolve(client);
}
}
// 创建客户端
async _createClient() {
const { Client } = require('./client');
const client = new Client(this.config);
await client.connect();
return client;
}
// 关闭连接池
async end() {
await Promise.all(this.clients.map(c => c.end()));
this.clients = [];
}
}
查询执行
javascript
// query.js 核心逻辑
class Query {
constructor(text, values, client) {
this.text = text;
this.values = values || [];
this.client = client;
this.result = null;
this.rows = [];
}
// 执行查询
async execute() {
// 参数化查询
const preparedText = this._prepareQuery();
// 发送查询
await this.client.query(preparedText, this.values);
// 等待结果
await this._waitForResult();
return this.rows;
}
// 准备查询
_prepareQuery() {
let text = this.text;
const values = this.values;
// 替换参数占位符
for (let i = 0; i < values.length; i++) {
const value = this._escapeValue(values[i]);
text = text.replace(`$${i + 1}`, value);
}
return text;
}
// 转义值
_escapeValue(value) {
if (value === null) return 'NULL';
if (typeof value === 'string') return `'${value.replace(/'/g, "''")}'`;
if (typeof value === 'number') return value.toString();
if (typeof value === 'boolean') return value ? 'TRUE' : 'FALSE';
if (Array.isArray(value)) return `ARRAY[${value.map(v => this._escapeValue(v)).join(',')}]`;
return `'${JSON.stringify(value).replace(/'/g, "''")}'`;
}
// 等待结果
async _waitForResult() {
return new Promise(resolve => {
this.client.connection.on('data', data => {
this._parseResult(data);
resolve();
});
});
}
// 解析结果
_parseResult(data) {
// 解析 PostgreSQL 协议消息
const messages = this._parseMessages(data);
for (const message of messages) {
if (message.type === 'DataRow') {
this.rows.push(message.row);
} else if (message.type === 'CommandComplete') {
this.result = message;
}
}
}
}
数据访问层实现
Repository 模式
javascript
// repository.js 核心逻辑
class Repository {
constructor(model) {
this.model = model;
}
// 创建
async create(data) {
const instance = new this.model(data);
await instance.save();
return instance;
}
// 查找所有
async findAll(options = {}) {
const query = this.model.find();
if (options.where) {
query.where(options.where);
}
if (options.sort) {
query.sort(options.sort);
}
if (options.limit) {
query.limit(options.limit);
}
if (options.skip) {
query.skip(options.skip);
}
return await query.exec();
}
// 查找单个
async findOne(conditions) {
return await this.model.findOne(conditions);
}
// 更新
async update(conditions, data) {
return await this.model.updateMany(conditions, data);
}
// 删除
async delete(conditions) {
return await this.model.deleteMany(conditions);
}
// 统计
async count(conditions) {
return await this.model.countDocuments(conditions);
}
}
总结
Mongoose(MongoDB 的 ODM)和 pg(PostgreSQL 的 Node.js 客户端)的源代码展示了不同的设计理念。MongoDB 使用文档模型,提供了灵活的数据结构和丰富的查询能力;PostgreSQL 使用关系模型,提供了强大的事务和查询优化能力。理解这两个数据库客户端库的源代码结构,有助于我们更好地使用和扩展数据库功能。
