Skip to content

第5章:高级功能

5.1 定时任务系统

定时任务系统是Nanobot2的重要高级功能,允许系统定期执行各种任务,如清理过期会话、发送提醒、更新数据等。定时任务系统采用异步设计,确保任务执行不会阻塞系统的其他操作。

5.1.1 定时任务基类

定时任务基类是所有定时任务的基础,定义了任务必须实现的方法和属性。

python
from abc import ABC, abstractmethod

class CronTask(ABC):
    """
    定时任务基类
    
    所有定时任务必须继承自此类并实现run方法
    """
    
    def __init__(self, interval: int = 60):
        """
        初始化定时任务
        
        Args:
            interval: 执行间隔(秒)
        """
        self.interval = interval
        self.running = False
        self.task = None
    
    @abstractmethod
    async def run(self):
        """
        执行任务
        """
        pass
    
    def get_interval(self) -> int:
        """
        获取执行间隔
        
        Returns:
            执行间隔(秒)
        """
        return self.interval
    
    def is_running(self) -> bool:
        """
        检查任务是否运行中
        
        Returns:
            是否运行中
        """
        return self.running
    
    def set_running(self, running: bool):
        """
        设置任务运行状态
        
        Args:
            running: 是否运行中
        """
        self.running = running
    
    def set_task(self, task):
        """
        设置任务对象
        
        Args:
            task: 任务对象
        """
        self.task = task
    
    def get_task(self):
        """
        获取任务对象
        
        Returns:
            任务对象
        """
        return self.task

5.1.2 定时任务管理器

定时任务管理器负责任务的注册、管理和执行,是定时任务系统的核心组件。

python
import asyncio

class CronManager:
    """
    定时任务管理器
    
    负责定时任务的注册、管理和执行
    """
    
    def __init__(self):
        self.tasks = {}
    
    def register_task(self, name: str, task: CronTask):
        """
        注册定时任务
        
        Args:
            name: 任务名称
            task: 任务实例
        """
        self.tasks[name] = task
        print(f"注册定时任务: {name}, 间隔: {task.get_interval()}秒")
    
    def unregister_task(self, name: str):
        """
        注销定时任务
        
        Args:
            name: 任务名称
        """
        if name in self.tasks:
            task = self.tasks[name]
            if task.is_running():
                task.set_running(False)
                if task.get_task():
                    task.get_task().cancel()
            del self.tasks[name]
            print(f"注销定时任务: {name}")
    
    def get_task(self, name: str) -> CronTask:
        """
        获取定时任务
        
        Args:
            name: 任务名称
        
        Returns:
            任务实例
        """
        return self.tasks.get(name)
    
    def list_tasks(self) -> list:
        """
        列出所有定时任务
        
        Returns:
            任务名称列表
        """
        return list(self.tasks.keys())
    
    async def start_task(self, name: str):
        """
        启动定时任务
        
        Args:
            name: 任务名称
        """
        task = self.get_task(name)
        if not task:
            print(f"定时任务不存在: {name}")
            return
        
        if task.is_running():
            print(f"定时任务已运行: {name}")
            return
        
        task.set_running(True)
        task_task = asyncio.create_task(self._run_task(task, name))
        task.set_task(task_task)
        print(f"启动定时任务: {name}")
    
    async def stop_task(self, name: str):
        """
        停止定时任务
        
        Args:
            name: 任务名称
        """
        task = self.get_task(name)
        if not task:
            print(f"定时任务不存在: {name}")
            return
        
        if not task.is_running():
            print(f"定时任务未运行: {name}")
            return
        
        task.set_running(False)
        if task.get_task():
            task.get_task().cancel()
            try:
                await task.get_task()
            except asyncio.CancelledError:
                pass
        print(f"停止定时任务: {name}")
    
    async def start_all_tasks(self):
        """
        启动所有定时任务
        """
        for task_name in self.list_tasks():
            await self.start_task(task_name)
    
    async def stop_all_tasks(self):
        """
        停止所有定时任务
        """
        for task_name in self.list_tasks():
            await self.stop_task(task_name)
    
    async def _run_task(self, task: CronTask, name: str):
        """
        运行定时任务
        
        Args:
            task: 任务实例
            name: 任务名称
        """
        try:
            while task.is_running():
                try:
                    # 执行任务
                    await task.run()
                except Exception as e:
                    print(f"定时任务执行失败: {name}, 错误: {e}")
                
                # 等待下一次执行
                await asyncio.sleep(task.get_interval())
        except asyncio.CancelledError:
            pass
        except Exception as e:
            print(f"定时任务运行出错: {name}, 错误: {e}")
        finally:
            task.set_running(False)
            print(f"定时任务停止: {name}")

5.1.3 内置定时任务

Nanobot2内置了几个常用的定时任务,如清理过期会话和系统心跳。

5.1.3.1 清理任务

python
class CleanupTask(CronTask):
    """
    清理任务
    
    负责清理过期会话和其他临时数据
    """
    
    def __init__(self, interval: int = 3600):
        """
        初始化清理任务
        
        Args:
            interval: 执行间隔(秒),默认1小时
        """
        super().__init__(interval)
    
    async def run(self):
        """
        执行清理任务
        """
        print("执行清理任务...")
        
        # 清理过期会话
        from nanobot.session import get_session_manager
        session_manager = get_session_manager()
        session_manager.cleanup_expired_sessions()
        
        # 这里可以添加其他清理操作
        # 例如:清理临时文件、缓存等
        
        print("清理任务执行完成")

5.1.3.2 心跳任务

python
class HeartbeatTask(CronTask):
    """
    心跳任务
    
    负责发送系统心跳,监控系统状态
    """
    
    def __init__(self, interval: int = 30):
        """
        初始化心跳任务
        
        Args:
            interval: 执行间隔(秒),默认30秒
        """
        super().__init__(interval)
    
    async def run(self):
        """
        执行心跳任务
        """
        # 心跳检测由heartbeat模块负责
        # 这里可以添加其他心跳相关操作
        pass

5.1.4 定时任务注册

为了方便使用内置定时任务,我们提供了一个注册内置任务的函数:

python
def register_builtin_tasks():
    """
    注册内置定时任务
    """
    from nanobot.cron import get_cron_manager
    from nanobot.cron import CleanupTask, HeartbeatTask
    
    cron_manager = get_cron_manager()
    
    # 注册内置定时任务
    cron_manager.register_task("cleanup", CleanupTask())
    cron_manager.register_task("heartbeat", HeartbeatTask())
    
    print("内置定时任务注册完成")

5.1.5 自定义定时任务

要创建自定义定时任务,只需继承CronTask基类并实现run方法:

python
from nanobot.cron import CronTask

class ReminderTask(CronTask):
    """
    提醒任务
    
    负责定期发送提醒
    """
    
    def __init__(self, interval: int = 3600):
        """
        初始化提醒任务
        
        Args:
            interval: 执行间隔(秒),默认1小时
        """
        super().__init__(interval)
    
    async def run(self):
        """
        执行提醒任务
        """
        print("执行提醒任务...")
        
        # 这里可以实现提醒逻辑
        # 例如:发送邮件、消息等
        
        print("提醒任务执行完成")

5.1.6 定时任务测试

为定时任务系统编写单元测试,确保各功能正常工作:

python
import unittest
import asyncio
from unittest.mock import AsyncMock, Mock
from nanobot.cron import CronManager, CronTask

class TestTask(CronTask):
    def __init__(self, interval: int = 1):
        super().__init__(interval)
        self.executed = False
    
    async def run(self):
        self.executed = True

class TestCronManager(unittest.TestCase):
    def setUp(self):
        self.cron_manager = CronManager()
        self.test_task = TestTask()
        self.cron_manager.register_task("test", self.test_task)
    
    async def test_start_task(self):
        # 测试启动任务
        await self.cron_manager.start_task("test")
        # 等待任务执行
        await asyncio.sleep(2)
        # 验证任务是否执行
        self.assertTrue(self.test_task.executed)
        # 停止任务
        await self.cron_manager.stop_task("test")

if __name__ == "__main__":
    unittest.main()

5.2 心跳检测

心跳检测是Nanobot2的重要监控功能,用于实时监控系统的健康状态和各个组件的运行情况。心跳检测系统会定期发送心跳信号,报告系统状态,并在发现问题时及时预警。

5.2.1 心跳监控器

心跳监控器负责监控系统的健康状态,包括各个组件的状态和系统的整体状态。

python
import time
import asyncio
from typing import Dict, Any, List, callable

class HeartbeatMonitor:
    """
    心跳监控器
    
    负责监控系统健康状态
    """
    
    def __init__(self, message_bus, interval: int = 30):
        """
        初始化心跳监控器
        
        Args:
            message_bus: 消息总线
            interval: 心跳间隔(秒)
        """
        self.message_bus = message_bus
        self.interval = interval
        self.running = False
        self.monitor_task = None
        self.components: Dict[str, Dict[str, Any]] = {}
        self.last_heartbeat = 0
    
    def register_component(self, component_name: str, health_check: callable = None):
        """
        注册组件
        
        Args:
            component_name: 组件名称
            health_check: 健康检查函数
        """
        self.components[component_name] = {
            "last_heartbeat": time.time(),
            "status": "healthy",
            "health_check": health_check,
            "errors": []
        }
        print(f"注册组件: {component_name}")
    
    def unregister_component(self, component_name: str):
        """
        注销组件
        
        Args:
            component_name: 组件名称
        """
        if component_name in self.components:
            del self.components[component_name]
            print(f"注销组件: {component_name}")
    
    def update_component_heartbeat(self, component_name: str):
        """
        更新组件心跳
        
        Args:
            component_name: 组件名称
        """
        if component_name in self.components:
            self.components[component_name]["last_heartbeat"] = time.time()
            self.components[component_name]["status"] = "healthy"
    
    def mark_component_error(self, component_name: str, error: str):
        """
        标记组件错误
        
        Args:
            component_name: 组件名称
            error: 错误信息
        """
        if component_name in self.components:
            self.components[component_name]["status"] = "error"
            self.components[component_name]["errors"].append({
                "timestamp": time.time(),
                "error": error
            })
            # 限制错误记录数量
            if len(self.components[component_name]["errors"]) > 10:
                self.components[component_name]["errors"] = self.components[component_name]["errors"][-10:]
    
    def get_component_status(self, component_name: str) -> Dict[str, Any]:
        """
        获取组件状态
        
        Args:
            component_name: 组件名称
        
        Returns:
            组件状态
        """
        return self.components.get(component_name, {
            "status": "unknown",
            "last_heartbeat": 0,
            "errors": []
        })
    
    def get_system_status(self) -> Dict[str, Any]:
        """
        获取系统状态
        
        Returns:
            系统状态
        """
        healthy_components = 0
        error_components = 0
        unknown_components = 0
        
        for component_name, component in self.components.items():
            if component["status"] == "healthy":
                healthy_components += 1
            elif component["status"] == "error":
                error_components += 1
            else:
                unknown_components += 1
        
        # 检查组件是否过期
        current_time = time.time()
        for component_name, component in self.components.items():
            if current_time - component["last_heartbeat"] > self.interval * 2:
                component["status"] = "error"
                component["errors"].append({
                    "timestamp": current_time,
                    "error": "心跳超时"
                })
        
        # 确定系统整体状态
        if error_components > 0:
            system_status = "error"
        elif healthy_components == len(self.components):
            system_status = "healthy"
        else:
            system_status = "unknown"
        
        return {
            "status": system_status,
            "components": {
                "total": len(self.components),
                "healthy": healthy_components,
                "error": error_components,
                "unknown": unknown_components
            },
            "last_heartbeat": self.last_heartbeat,
            "timestamp": current_time
        }
    
    async def start(self):
        """
        启动心跳监控
        """
        if self.running:
            return
        
        self.running = True
        self.monitor_task = asyncio.create_task(self._monitor_loop())
        print(f"心跳监控启动成功,间隔: {self.interval}秒")
    
    async def stop(self):
        """
        停止心跳监控
        """
        if not self.running:
            return
        
        self.running = False
        
        if self.monitor_task:
            self.monitor_task.cancel()
            try:
                await self.monitor_task
            except asyncio.CancelledError:
                pass
        
        print("心跳监控停止成功")
    
    async def _monitor_loop(self):
        """
        监控循环
        """
        while self.running:
            try:
                # 更新系统心跳
                self.last_heartbeat = time.time()
                
                # 执行健康检查
                await self._perform_health_checks()
                
                # 获取系统状态
                system_status = self.get_system_status()
                
                # 发布心跳事件
                await self.message_bus.publish("heartbeat", {
                    "timestamp": self.last_heartbeat,
                    "status": system_status
                })
                
                # 打印系统状态
                if system_status["status"] != "healthy":
                    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 系统状态: {system_status['status']}")
                    print(f"  组件状态: 健康={system_status['components']['healthy']}, 错误={system_status['components']['error']}, 未知={system_status['components']['unknown']}")
                
                # 等待下一次心跳
                await asyncio.sleep(self.interval)
                
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"心跳监控出错: {e}")
                await asyncio.sleep(1)
    
    async def _perform_health_checks(self):
        """
        执行健康检查
        """
        for component_name, component in self.components.items():
            if component.get("health_check"):
                try:
                    # 执行健康检查
                    result = component["health_check"]()
                    if result:
                        component["status"] = "healthy"
                    else:
                        component["status"] = "error"
                        component["errors"].append({
                            "timestamp": time.time(),
                            "error": "健康检查失败"
                        })
                except Exception as e:
                    component["status"] = "error"
                    component["errors"].append({
                        "timestamp": time.time(),
                        "error": str(e)
                    })
    
    def is_running(self) -> bool:
        """
        检查心跳监控是否运行
        
        Returns:
            是否运行
        """
        return self.running

5.2.2 全局心跳监控器

为了方便在系统的不同部分使用心跳监控器,我们提供了一个全局心跳监控器实例:

python
# 全局心跳监控器实例
_heartbeat_monitor = None

def get_heartbeat_monitor() -> HeartbeatMonitor:
    """
    获取全局心跳监控器实例
    
    Returns:
        心跳监控器实例
    """
    global _heartbeat_monitor
    if _heartbeat_monitor is None:
        from nanobot.bus import get_bus
        message_bus = get_bus()
        from nanobot.config import get_config
        config = get_config()
        interval = config.get("heartbeat.interval", 30)
        _heartbeat_monitor = HeartbeatMonitor(message_bus, interval)
    return _heartbeat_monitor

# 初始化心跳监控
def initialize_heartbeat():
    """
    初始化心跳监控
    """
    from nanobot.heartbeat import get_heartbeat_monitor
    monitor = get_heartbeat_monitor()
    
    # 注册核心组件
    monitor.register_component("config")
    monitor.register_component("bus")
    monitor.register_component("session")
    monitor.register_component("providers")
    monitor.register_component("agent")
    monitor.register_component("skills")
    monitor.register_component("channels")
    monitor.register_component("cron")
    
    # 启动心跳监控
    import asyncio
    asyncio.create_task(monitor.start())

5.2.3 心跳检测集成

心跳检测系统与其他模块的集成:

  1. 核心启动:在系统启动时初始化心跳监控
  2. 组件注册:各模块在初始化时注册到心跳监控器
  3. 心跳更新:各模块定期更新自己的心跳
  4. 错误标记:各模块在发生错误时标记错误状态
  5. 状态监控:系统管理员可以通过命令行查看系统状态

5.2.4 心跳检测测试

为心跳检测系统编写单元测试,确保各功能正常工作:

python
import unittest
import asyncio
from unittest.mock import AsyncMock, Mock
from nanobot.heartbeat import HeartbeatMonitor

class TestHeartbeatMonitor(unittest.TestCase):
    def setUp(self):
        self.mock_bus = Mock()
        self.mock_bus.publish = AsyncMock()
        self.monitor = HeartbeatMonitor(self.mock_bus, interval=1)
    
    async def test_start_stop(self):
        # 测试启动和停止
        await self.monitor.start()
        self.assertTrue(self.monitor.is_running())
        await asyncio.sleep(0.5)
        await self.monitor.stop()
        self.assertFalse(self.monitor.is_running())
    
    def test_register_component(self):
        # 测试注册组件
        self.monitor.register_component("test")
        status = self.monitor.get_component_status("test")
        self.assertEqual(status["status"], "healthy")

if __name__ == "__main__":
    unittest.main()

5.3 系统监控与日志

5.3.1 日志系统

Nanobot2使用Python的标准日志模块,结合自定义的日志配置,提供全面的日志记录功能:

python
import logging
import logging.handlers
import os
import time

def setup_logging():
    """
    设置日志系统
    """
    # 创建日志目录
    log_dir = os.path.expanduser("~/.nanobot/logs")
    os.makedirs(log_dir, exist_ok=True)
    
    # 日志文件名
    log_file = os.path.join(log_dir, f"nanobot_{time.strftime('%Y-%m-%d')}.log")
    
    # 配置日志
    logger = logging.getLogger("nanobot")
    logger.setLevel(logging.INFO)
    
    # 避免重复添加处理器
    if logger.handlers:
        return logger
    
    # 文件处理器
    file_handler = logging.handlers.RotatingFileHandler(
        log_file,
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5
    )
    file_handler.setLevel(logging.INFO)
    
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    
    # 日志格式
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
    # 添加处理器
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

# 使用示例
logger = setup_logging()
logger.info("系统启动")
logger.error("发生错误")

5.3.2 系统状态命令

通过命令行接口,可以查看系统的详细状态:

bash
python -m nanobot.cli status

或使用心跳命令:

bash
python -m nanobot.cli heartbeat status

5.4 高级功能集成测试

运行完整的集成测试,确保高级功能与其他模块正常协作:

bash
python -m pytest tests/test_advanced.py -v

5.5 小结

本章介绍了Nanobot2的高级功能,包括:

  1. 定时任务系统:支持定期执行各种任务,如清理过期会话、发送提醒等
  2. 心跳检测:实时监控系统健康状态和各个组件的运行情况
  3. 系统监控与日志:全面的日志记录和系统状态查看功能

这些高级功能使Nanobot2更加健壮、可靠和可维护。定时任务系统确保系统能够自动执行各种维护操作,心跳检测系统实时监控系统状态,及时发现和处理问题,日志系统记录系统的运行情况,便于故障排查和系统优化。

在接下来的章节中,我们将学习Nanobot2的部署与运维,包括项目打包、容器化部署、系统监控和自动重启等内容。