Appearance
第1章:项目初始化与基础架构
1.1 项目结构设计
一个良好的项目结构对于代码的可维护性和可扩展性至关重要。Nanobot2采用模块化设计,将不同功能划分为独立的模块,便于开发和维护。
1.1.1 核心目录结构
nanobot2/
├── src/
│ └── nanobot/
│ ├── __init__.py # 包初始化文件
│ ├── config.py # 配置管理
│ ├── bus.py # 消息总线
│ ├── session.py # 会话管理
│ ├── providers.py # LLM提供商抽象
│ ├── agent/ # Agent核心
│ ├── skills/ # 技能系统
│ ├── channels/ # 渠道集成
│ ├── cron.py # 定时任务
│ ├── heartbeat.py # 心跳检测
│ ├── core.py # 核心启动
│ └── cli.py # 命令行接口
├── tests/ # 测试代码
├── requirements.txt # 依赖项
├── setup.py # 安装配置
├── pyproject.toml # 项目配置
├── run.sh # 运行脚本
├── README.md # 项目说明
└── LICENSE # 许可证1.1.2 模块职责划分
| 模块 | 职责 | 文件位置 |
|---|---|---|
| 配置管理 | 管理系统配置,支持嵌套配置访问 | src/nanobot/config.py |
| 消息总线 | 实现事件驱动通信,支持模块间解耦 | src/nanobot/bus.py |
| 会话管理 | 管理用户会话和上下文 | src/nanobot/session.py |
| LLM提供商 | 抽象不同LLM提供商,统一接口 | src/nanobot/providers.py |
| Agent核心 | 实现核心推理逻辑,处理用户请求 | src/nanobot/agent/ |
| 技能系统 | 管理和执行各种技能 | src/nanobot/skills/ |
| 渠道集成 | 支持多种通信渠道 | src/nanobot/channels/ |
| 定时任务 | 管理和执行定时任务 | src/nanobot/cron.py |
| 心跳检测 | 监控系统状态和健康情况 | src/nanobot/heartbeat.py |
| 核心启动 | 系统初始化和生命周期管理 | src/nanobot/core.py |
| 命令行接口 | 提供命令行交互功能 | src/nanobot/cli.py |
1.2 配置管理系统
配置管理是系统的基础,负责加载、解析和提供配置信息。Nanobot2采用JSON格式存储配置,并支持嵌套配置访问。
1.2.1 核心实现
python
class ConfigManager:
def __init__(self, config_path: str = "~/.nanobot/config.json"):
self.config_path = os.path.expanduser(config_path)
self.config = {}
self.load_config()
def get(self, key: str, default: Any = None) -> Any:
keys = key.split(".")
value = self.config
for k in keys:
if isinstance(value, dict) and k in value:
value = value[k]
else:
return default
return value1.2.2 配置文件结构
json
{
"providers": {
"openai": {
"api_key": "your-api-key",
"model": "gpt-3.5-turbo"
},
"anthropic": {
"api_key": "your-api-key",
"model": "claude-2"
}
},
"channels": {
"cli": true,
"telegram": {
"enabled": false,
"token": "your-token"
},
"discord": {
"enabled": false,
"token": "your-token"
}
},
"heartbeat": {
"interval": 30
},
"session": {
"timeout": 3600
}
}1.3 消息总线实现
消息总线是系统的神经中枢,负责模块间的通信,采用事件驱动模式,实现模块间的解耦。
1.3.1 核心实现
python
class MessageBus:
def __init__(self):
self.subscribers: dict[str, List[Callable]] = {}
def subscribe(self, event_type: str, handler: Callable):
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(handler)
async def publish(self, event_type: str, data: Any = None):
if event_type in self.subscribers:
for handler in self.subscribers[event_type]:
if asyncio.iscoroutinefunction(handler):
await handler(data)
else:
handler(data)1.3.2 事件类型
| 事件类型 | 描述 | 数据结构 |
|---|---|---|
user.message | 用户发送消息 | {"user_id": str, "channel": str, "content": str} |
assistant.response | 助手生成响应 | {"user_id": str, "channel": str, "content": str} |
skill.executed | 技能执行完成 | {"skill_name": str, "result": str} |
channel.connected | 渠道连接成功 | {"channel_name": str} |
system.started | 系统启动完成 | {"startup_time": float, "current_time": float} |
heartbeat | 系统心跳 | {"timestamp": float, "status": dict} |
1.4 会话管理系统
会话管理负责维护用户会话状态和上下文,确保对话的连续性和一致性。
1.4.1 核心实现
python
class SessionManager:
def __init__(self, timeout: int = 3600):
self.sessions: dict[str, Session] = {}
self.timeout = timeout
def get_or_create_session(self, user_id: str, channel: str) -> Session:
session_key = f"{channel}:{user_id}"
if session_key not in self.sessions:
self.sessions[session_key] = Session(user_id, channel)
else:
self.sessions[session_key].update_last_active()
return self.sessions[session_key]
def cleanup_expired_sessions(self):
current_time = time.time()
expired_sessions = []
for session_key, session in self.sessions.items():
if current_time - session.last_active > self.timeout:
expired_sessions.append(session_key)
for session_key in expired_sessions:
del self.sessions[session_key]
print(f"清理过期会话: {session_key}")1.4.2 会话数据结构
python
class Session:
def __init__(self, user_id: str, channel: str):
self.user_id = user_id
self.channel = channel
self.messages = []
self.last_active = time.time()
self.context = {}
def add_message(self, role: str, content: str):
self.messages.append({
"role": role,
"content": content,
"timestamp": time.time()
})
self.update_last_active()
def get_messages(self, limit: int = 50):
return self.messages[-limit:]
def update_last_active(self):
self.last_active = time.time()
def set_context(self, key: str, value: Any):
self.context[key] = value
def get_context(self, key: str, default: Any = None):
return self.context.get(key, default)1.5 项目初始化步骤
1.5.1 创建项目目录
bash
mkdir -p nanobot2/src/nanobot/{agent,skills,channels}
mkdir -p nanobot2/tests1.5.2 初始化Python包
创建setup.py和pyproject.toml文件,配置项目依赖和打包信息。
1.5.3 安装依赖
bash
pip install -r requirements.txt1.5.4 配置文件
创建~/.nanobot/config.json配置文件,设置API密钥和其他配置项。
1.6 测试运行
完成基础架构搭建后,可以运行以下命令启动Nanobot2:
bash
python -m nanobot.cli start或使用启动脚本:
bash
./run.sh1.7 小结
本章介绍了Nanobot2项目的初始化过程和基础架构设计,包括:
- 项目结构设计:模块化的目录结构和职责划分
- 配置管理系统:支持嵌套配置访问的JSON配置
- 消息总线实现:事件驱动的模块间通信
- 会话管理系统:维护用户会话状态和上下文
这些基础组件为后续的核心引擎、技能系统和多渠道集成奠定了基础。在接下来的章节中,我们将深入学习这些组件的实现细节和使用方法。
