Skip to content

第3章:原版Nanobot源码解析

核心代码结构分析

Nanobot的核心代码结构清晰,组织合理,便于理解和维护。整体代码结构如下:

1. 项目根目录结构

nanobot/
├── src/
│   └── nanobot/
│       ├── __init__.py        # 包初始化文件
│       ├── config.py           # 配置管理
│       ├── bus.py              # 消息总线
│       ├── session.py          # 会话管理
│       ├── providers.py        # LLM提供商抽象
│       ├── agent/              # Agent核心
│       ├── skills/             # 技能系统
│       ├── channels/           # 渠道集成
│       ├── cron.py             # 定时任务
│       ├── heartbeat.py        # 心跳检测
│       ├── core.py             # 核心启动
│       └── cli.py              # 命令行接口
├── tests/                      # 测试代码
├── requirements.txt            # 依赖项
├── setup.py                    # 安装配置
├── pyproject.toml              # 项目配置
├── run.sh                      # 运行脚本
├── README.md                   # 项目说明
└── LICENSE                     # 许可证

2. 核心模块代码结构

2.1 配置管理(config.py)

核心代码结构

  • ConfigManager 类:管理配置的加载、解析和访问
  • get_config() 函数:获取全局配置管理器实例
  • 配置文件格式:JSON
  • 配置访问方式:支持点号分隔的嵌套访问

2.2 消息总线(bus.py)

核心代码结构

  • MessageBus 类:实现事件的发布和订阅
  • get_bus() 函数:获取全局消息总线实例
  • 事件处理:支持同步和异步事件处理器
  • 事件存储:使用字典存储事件和对应的处理器

2.3 会话管理(session.py)

核心代码结构

  • Session 类:管理单个会话的状态和消息历史
  • SessionManager 类:管理多个会话的创建和销毁
  • get_session_manager() 函数:获取全局会话管理器实例
  • 会话存储:使用字典存储会话实例
  • 会话过期:支持会话过期和清理

2.4 LLM提供商管理(providers.py)

核心代码结构

  • LLMProvider 基类:定义提供商的统一接口
  • 具体提供商实现:如 OpenAIProviderAnthropicProvider
  • ProviderManager 类:管理多个提供商的注册和使用
  • get_provider_manager() 函数:获取全局提供商管理器实例
  • 提供商注册:支持动态注册新的提供商

2.5 Agent核心(agent/agent.py)

核心代码结构

  • Agent 类:实现思考-行动-观察循环
  • get_agent() 函数:获取全局Agent实例
  • 核心方法:process_message()generate_response()
  • 上下文管理:构建和维护对话上下文
  • 技能协调:协调技能的执行

2.6 技能系统(skills/)

核心代码结构

  • Skill 基类:定义技能的统一接口
  • SkillManager 类:管理技能的注册和执行
  • get_skill_manager() 函数:获取全局技能管理器实例
  • 内置技能:如搜索、天气、计算器等
  • 技能注册:支持动态注册新的技能

2.7 渠道集成(channels/)

核心代码结构

  • Channel 基类:定义渠道的统一接口
  • ChannelManager 类:管理渠道的注册和使用
  • get_channel_manager() 函数:获取全局渠道管理器实例
  • 内置渠道:如命令行、Telegram、Discord等
  • 渠道注册:支持动态注册新的渠道

2.8 定时任务(cron.py)

核心代码结构

  • CronTask 基类:定义定时任务的统一接口
  • 具体任务实现:如 CleanupTaskHeartbeatTask
  • CronManager 类:管理定时任务的注册和执行
  • get_cron_manager() 函数:获取全局定时任务管理器实例
  • 任务调度:支持定时执行任务

2.9 心跳检测(heartbeat.py)

核心代码结构

  • HeartbeatMonitor 类:监控系统状态和健康状况
  • get_heartbeat_monitor() 函数:获取全局心跳监控器实例
  • 状态管理:跟踪系统各组件的状态
  • 异常检测:检测和报告异常情况

2.10 核心启动(core.py)

核心代码结构

  • start() 函数:启动系统
  • stop() 函数:停止系统
  • status() 函数:获取系统状态
  • 模块初始化:按依赖顺序初始化各模块
  • 信号处理:处理系统信号,实现优雅退出

关键模块源码解读

1. 消息总线实现

核心源码

python
class MessageBus:
    def __init__(self):
        self._handlers = {}
        self._lock = asyncio.Lock()
    
    async def subscribe(self, event, handler):
        if event not in self._handlers:
            self._handlers[event] = []
        self._handlers[event].append(handler)
    
    async def unsubscribe(self, event, handler):
        if event in self._handlers:
            self._handlers[event].remove(handler)
    
    async def publish(self, event, data):
        if event in self._handlers:
            for handler in self._handlers[event]:
                try:
                    if asyncio.iscoroutinefunction(handler):
                        await handler(data)
                    else:
                        handler(data)
                except Exception as e:
                    print(f"处理事件 {event} 时出错: {e}")

解读

  • 使用字典存储事件和对应的处理器
  • 支持同步和异步事件处理器
  • 内置错误处理,确保一个处理器的错误不会影响其他处理器
  • 线程安全:使用 asyncio.Lock 确保并发安全

2. Agent核心实现

核心源码

python
class Agent:
    def __init__(self, config_manager, message_bus, session_manager, provider_manager, skill_manager):
        self.config_manager = config_manager
        self.message_bus = message_bus
        self.session_manager = session_manager
        self.provider_manager = provider_manager
        self.skill_manager = skill_manager
    
    async def process_message(self, user_id, content, channel="cli"):
        # 获取或创建会话
        session = self.session_manager.get_session(user_id)
        
        # 添加用户消息到会话
        session.add_message("user", content)
        
        # 构建上下文
        context = self._build_context(session)
        
        # 生成响应
        response = await self.generate_response(context)
        
        # 添加助手消息到会话
        session.add_message("assistant", response)
        
        # 发布响应事件
        await self.message_bus.publish(f"channel.{channel}.response", {
            "user_id": user_id,
            "content": response
        })
        
        return response
    
    async def generate_response(self, context):
        # 获取默认提供商
        provider = self.provider_manager.get_default_provider()
        
        # 调用LLM生成响应
        response = await provider.generate(context)
        
        # 检查是否需要执行技能
        skill_call = self._parse_skill_call(response)
        if skill_call:
            # 执行技能
            skill_result = await self.skill_manager.execute_skill(
                skill_call["name"],
                skill_call["params"]
            )
            
            # 基于技能结果生成最终响应
            final_response = await self._generate_response_from_skill_result(
                context, skill_result
            )
            return final_response
        
        return response

解读

  • 实现了完整的消息处理流程:获取会话 → 添加消息 → 构建上下文 → 生成响应 → 添加响应 → 发布事件
  • 支持技能调用:解析LLM响应中的技能调用请求,执行对应技能
  • 上下文管理:构建包含历史消息的上下文,提供给LLM
  • 多渠道支持:通过channel参数支持不同的通信渠道

3. 技能系统实现

核心源码

python
class Skill:
    def __init__(self, name, description):
        self.name = name
        self.description = description
    
    async def execute(self, **kwargs):
        raise NotImplementedError("子类必须实现execute方法")

class SkillManager:
    def __init__(self, message_bus):
        self.message_bus = message_bus
        self._skills = {}
    
    def register_skill(self, skill_class, config=None):
        try:
            skill = skill_class(config or {})
            self._skills[skill.name] = skill
            print(f"注册技能: {skill.name}")
        except Exception as e:
            print(f"注册技能失败: {e}")
    
    async def execute_skill(self, skill_name, params):
        if skill_name not in self._skills:
            return f"技能 {skill_name} 不存在"
        
        try:
            skill = self._skills[skill_name]
            result = await skill.execute(**params)
            return result
        except Exception as e:
            return f"执行技能 {skill_name} 时出错: {e}"

解读

  • 采用基类+子类的设计模式,定义了技能的统一接口
  • 支持动态注册技能:通过 register_skill 方法注册新技能
  • 技能执行:通过 execute_skill 方法执行指定技能
  • 错误处理:内置错误处理,确保技能执行失败不会影响系统

4. 渠道集成实现

核心源码

python
class Channel:
    def __init__(self, name, config):
        self.name = name
        self.config = config
        self.running = False
    
    async def start(self):
        raise NotImplementedError("子类必须实现start方法")
    
    async def stop(self):
        raise NotImplementedError("子类必须实现stop方法")
    
    async def send_message(self, user_id, content):
        raise NotImplementedError("子类必须实现send_message方法")

class ChannelManager:
    def __init__(self, config_manager, message_bus):
        self.config_manager = config_manager
        self.message_bus = message_bus
        self._channels = {}
    
    def register_channel(self, channel_class, config=None):
        try:
            channel = channel_class(config or {})
            self._channels[channel.name] = channel
            print(f"注册渠道: {channel.name}")
        except Exception as e:
            print(f"注册渠道失败: {e}")
    
    async def start_all(self):
        for channel_name, channel in self._channels.items():
            await channel.start()
    
    async def stop_all(self):
        for channel_name, channel in self._channels.items():
            await channel.stop()
    
    async def send_message(self, channel_name, user_id, content):
        if channel_name not in self._channels:
            print(f"渠道 {channel_name} 不存在")
            return False
        
        try:
            channel = self._channels[channel_name]
            await channel.send_message(user_id, content)
            return True
        except Exception as e:
            print(f"发送消息到渠道 {channel_name} 时出错: {e}")
            return False

解读

  • 采用基类+子类的设计模式,定义了渠道的统一接口
  • 支持动态注册渠道:通过 register_channel 方法注册新渠道
  • 渠道管理:提供 start_allstop_all 方法管理所有渠道
  • 消息发送:通过 send_message 方法向指定渠道发送消息
  • 错误处理:内置错误处理,确保渠道操作失败不会影响系统

核心算法和实现细节

1. 上下文构建算法

实现细节

  • 从会话历史中获取最近的消息
  • 根据配置限制消息数量和长度
  • 格式化消息为LLM可理解的格式
  • 添加上下文提示和系统指令

代码示例

python
def _build_context(self, session):
    # 获取历史消息
    messages = session.get_messages()
    
    # 限制消息数量
    max_messages = self.config_manager.get("agent.max_messages", 20)
    messages = messages[-max_messages:]
    
    # 构建上下文
    context = []
    
    # 添加系统指令
    system_prompt = self.config_manager.get("agent.system_prompt", "你是一个 helpful 的助手")
    context.append({"role": "system", "content": system_prompt})
    
    # 添加历史消息
    for message in messages:
        context.append({
            "role": message["role"],
            "content": message["content"]
        })
    
    return context

2. 技能调用解析算法

实现细节

  • 解析LLM响应中的技能调用格式
  • 提取技能名称和参数
  • 验证技能是否存在
  • 准备技能执行参数

代码示例

python
def _parse_skill_call(self, response):
    # 检查响应是否包含技能调用格式
    if "[SKILL_CALL]" in response and "[/SKILL_CALL]" in response:
        # 提取技能调用部分
        skill_call_str = response.split("[SKILL_CALL]")[1].split("[/SKILL_CALL]")[0]
        
        # 解析技能名称和参数
        try:
            skill_data = json.loads(skill_call_str)
            return skill_data
        except json.JSONDecodeError:
            print(f"解析技能调用失败: {skill_call_str}")
    
    return None

3. 会话过期清理算法

实现细节

  • 定期检查所有会话的最后活动时间
  • 比较当前时间和最后活动时间
  • 清理超过过期时间的会话
  • 发布会话清理事件

代码示例

python
def cleanup_expired_sessions(self):
    current_time = time.time()
    expired_sessions = []
    
    for session_id, session in self._sessions.items():
        last_activity = session.get_last_activity()
        expiration_time = self.config_manager.get("session.expiration_time", 3600)  # 默认1小时
        
        if current_time - last_activity > expiration_time:
            expired_sessions.append(session_id)
    
    # 清理过期会话
    for session_id in expired_sessions:
        del self._sessions[session_id]
        print(f"清理过期会话: {session_id}")
    
    # 发布清理事件
    if expired_sessions:
        asyncio.create_task(self.message_bus.publish("session.cleanup", {
            "count": len(expired_sessions)
        }))

4. 定时任务调度算法

实现细节

  • 为每个任务创建一个异步任务
  • 任务循环中定期执行任务逻辑
  • 处理任务执行错误
  • 支持任务的启动和停止

代码示例

python
async def _run_loop(self):
    while self.running:
        try:
            # 执行任务
            await self.run()
            
            # 等待下一次执行
            for _ in range(self.interval):
                if not self.running:
                    break
                await asyncio.sleep(1)
                
        except asyncio.CancelledError:
            break
        except Exception as e:
            print(f"执行定时任务 {self.name} 时出错: {e}")
            await asyncio.sleep(1)

性能优化和设计决策

1. 性能优化

1.1 异步编程

  • 使用 asyncio 实现异步I/O操作
  • 避免阻塞操作,提高系统响应速度
  • 支持并发处理多个请求

1.2 内存优化

  • 限制会话历史消息数量
  • 定期清理过期会话
  • 使用轻量级数据结构

1.3 网络优化

  • 批量处理API请求
  • 实现请求缓存
  • 优化网络连接管理

1.4 计算优化

  • 避免重复计算
  • 缓存计算结果
  • 优化算法复杂度

2. 设计决策

2.1 模块化设计

  • 优点:代码结构清晰,便于理解和维护
  • 缺点:模块间通信开销增加
  • 决策理由:权衡代码可维护性和性能,选择模块化设计

2.2 事件驱动架构

  • 优点:松耦合,可扩展性强
  • 缺点:事件追踪和调试较复杂
  • 决策理由:适合AI助手这种需要处理多种事件的系统

2.3 多提供商支持

  • 优点:灵活性高,避免单一依赖
  • 缺点:增加代码复杂度
  • 决策理由:提供更多选择,适应不同用户的需求

2.4 多渠道支持

  • 优点:用户可以通过多种方式与助手交互
  • 缺点:增加代码复杂度和维护成本
  • 决策理由:提高用户体验,扩大应用场景

2.5 轻量级设计

  • 优点:启动速度快,资源占用低
  • 缺点:功能相对有限
  • 决策理由:专注于核心功能,提高系统可靠性和性能

小结

Nanobot的源码实现体现了现代Python项目的最佳实践,包括模块化设计、事件驱动架构、异步编程等。通过对核心模块源码的解析,我们可以看到Nanobot是如何实现其轻量级、高性能的设计目标的。

在手写Nanobot的过程中,我们将参考原版Nanobot的设计理念和实现细节,同时根据实际需求进行适当的调整和扩展。通过这种方式,我们不仅可以学习到AI助手的核心实现技术,还可以培养良好的系统设计和代码编写习惯。