Appearance
第5章:高级功能
5.1 定时任务系统
定时任务系统是Nanobot2的重要高级功能,允许系统定期执行各种任务,如清理过期会话、发送提醒、更新数据等。定时任务系统采用异步设计,确保任务执行不会阻塞系统的其他操作。
5.1.1 定时任务基类
定时任务基类是所有定时任务的基础,定义了任务必须实现的方法和属性。
python
from abc import ABC, abstractmethod
class CronTask(ABC):
"""
定时任务基类
所有定时任务必须继承自此类并实现run方法
"""
def __init__(self, interval: int = 60):
"""
初始化定时任务
Args:
interval: 执行间隔(秒)
"""
self.interval = interval
self.running = False
self.task = None
@abstractmethod
async def run(self):
"""
执行任务
"""
pass
def get_interval(self) -> int:
"""
获取执行间隔
Returns:
执行间隔(秒)
"""
return self.interval
def is_running(self) -> bool:
"""
检查任务是否运行中
Returns:
是否运行中
"""
return self.running
def set_running(self, running: bool):
"""
设置任务运行状态
Args:
running: 是否运行中
"""
self.running = running
def set_task(self, task):
"""
设置任务对象
Args:
task: 任务对象
"""
self.task = task
def get_task(self):
"""
获取任务对象
Returns:
任务对象
"""
return self.task5.1.2 定时任务管理器
定时任务管理器负责任务的注册、管理和执行,是定时任务系统的核心组件。
python
import asyncio
class CronManager:
"""
定时任务管理器
负责定时任务的注册、管理和执行
"""
def __init__(self):
self.tasks = {}
def register_task(self, name: str, task: CronTask):
"""
注册定时任务
Args:
name: 任务名称
task: 任务实例
"""
self.tasks[name] = task
print(f"注册定时任务: {name}, 间隔: {task.get_interval()}秒")
def unregister_task(self, name: str):
"""
注销定时任务
Args:
name: 任务名称
"""
if name in self.tasks:
task = self.tasks[name]
if task.is_running():
task.set_running(False)
if task.get_task():
task.get_task().cancel()
del self.tasks[name]
print(f"注销定时任务: {name}")
def get_task(self, name: str) -> CronTask:
"""
获取定时任务
Args:
name: 任务名称
Returns:
任务实例
"""
return self.tasks.get(name)
def list_tasks(self) -> list:
"""
列出所有定时任务
Returns:
任务名称列表
"""
return list(self.tasks.keys())
async def start_task(self, name: str):
"""
启动定时任务
Args:
name: 任务名称
"""
task = self.get_task(name)
if not task:
print(f"定时任务不存在: {name}")
return
if task.is_running():
print(f"定时任务已运行: {name}")
return
task.set_running(True)
task_task = asyncio.create_task(self._run_task(task, name))
task.set_task(task_task)
print(f"启动定时任务: {name}")
async def stop_task(self, name: str):
"""
停止定时任务
Args:
name: 任务名称
"""
task = self.get_task(name)
if not task:
print(f"定时任务不存在: {name}")
return
if not task.is_running():
print(f"定时任务未运行: {name}")
return
task.set_running(False)
if task.get_task():
task.get_task().cancel()
try:
await task.get_task()
except asyncio.CancelledError:
pass
print(f"停止定时任务: {name}")
async def start_all_tasks(self):
"""
启动所有定时任务
"""
for task_name in self.list_tasks():
await self.start_task(task_name)
async def stop_all_tasks(self):
"""
停止所有定时任务
"""
for task_name in self.list_tasks():
await self.stop_task(task_name)
async def _run_task(self, task: CronTask, name: str):
"""
运行定时任务
Args:
task: 任务实例
name: 任务名称
"""
try:
while task.is_running():
try:
# 执行任务
await task.run()
except Exception as e:
print(f"定时任务执行失败: {name}, 错误: {e}")
# 等待下一次执行
await asyncio.sleep(task.get_interval())
except asyncio.CancelledError:
pass
except Exception as e:
print(f"定时任务运行出错: {name}, 错误: {e}")
finally:
task.set_running(False)
print(f"定时任务停止: {name}")5.1.3 内置定时任务
Nanobot2内置了几个常用的定时任务,如清理过期会话和系统心跳。
5.1.3.1 清理任务
python
class CleanupTask(CronTask):
"""
清理任务
负责清理过期会话和其他临时数据
"""
def __init__(self, interval: int = 3600):
"""
初始化清理任务
Args:
interval: 执行间隔(秒),默认1小时
"""
super().__init__(interval)
async def run(self):
"""
执行清理任务
"""
print("执行清理任务...")
# 清理过期会话
from nanobot.session import get_session_manager
session_manager = get_session_manager()
session_manager.cleanup_expired_sessions()
# 这里可以添加其他清理操作
# 例如:清理临时文件、缓存等
print("清理任务执行完成")5.1.3.2 心跳任务
python
class HeartbeatTask(CronTask):
"""
心跳任务
负责发送系统心跳,监控系统状态
"""
def __init__(self, interval: int = 30):
"""
初始化心跳任务
Args:
interval: 执行间隔(秒),默认30秒
"""
super().__init__(interval)
async def run(self):
"""
执行心跳任务
"""
# 心跳检测由heartbeat模块负责
# 这里可以添加其他心跳相关操作
pass5.1.4 定时任务注册
为了方便使用内置定时任务,我们提供了一个注册内置任务的函数:
python
def register_builtin_tasks():
"""
注册内置定时任务
"""
from nanobot.cron import get_cron_manager
from nanobot.cron import CleanupTask, HeartbeatTask
cron_manager = get_cron_manager()
# 注册内置定时任务
cron_manager.register_task("cleanup", CleanupTask())
cron_manager.register_task("heartbeat", HeartbeatTask())
print("内置定时任务注册完成")5.1.5 自定义定时任务
要创建自定义定时任务,只需继承CronTask基类并实现run方法:
python
from nanobot.cron import CronTask
class ReminderTask(CronTask):
"""
提醒任务
负责定期发送提醒
"""
def __init__(self, interval: int = 3600):
"""
初始化提醒任务
Args:
interval: 执行间隔(秒),默认1小时
"""
super().__init__(interval)
async def run(self):
"""
执行提醒任务
"""
print("执行提醒任务...")
# 这里可以实现提醒逻辑
# 例如:发送邮件、消息等
print("提醒任务执行完成")5.1.6 定时任务测试
为定时任务系统编写单元测试,确保各功能正常工作:
python
import unittest
import asyncio
from unittest.mock import AsyncMock, Mock
from nanobot.cron import CronManager, CronTask
class TestTask(CronTask):
def __init__(self, interval: int = 1):
super().__init__(interval)
self.executed = False
async def run(self):
self.executed = True
class TestCronManager(unittest.TestCase):
def setUp(self):
self.cron_manager = CronManager()
self.test_task = TestTask()
self.cron_manager.register_task("test", self.test_task)
async def test_start_task(self):
# 测试启动任务
await self.cron_manager.start_task("test")
# 等待任务执行
await asyncio.sleep(2)
# 验证任务是否执行
self.assertTrue(self.test_task.executed)
# 停止任务
await self.cron_manager.stop_task("test")
if __name__ == "__main__":
unittest.main()5.2 心跳检测
心跳检测是Nanobot2的重要监控功能,用于实时监控系统的健康状态和各个组件的运行情况。心跳检测系统会定期发送心跳信号,报告系统状态,并在发现问题时及时预警。
5.2.1 心跳监控器
心跳监控器负责监控系统的健康状态,包括各个组件的状态和系统的整体状态。
python
import time
import asyncio
from typing import Dict, Any, List, callable
class HeartbeatMonitor:
"""
心跳监控器
负责监控系统健康状态
"""
def __init__(self, message_bus, interval: int = 30):
"""
初始化心跳监控器
Args:
message_bus: 消息总线
interval: 心跳间隔(秒)
"""
self.message_bus = message_bus
self.interval = interval
self.running = False
self.monitor_task = None
self.components: Dict[str, Dict[str, Any]] = {}
self.last_heartbeat = 0
def register_component(self, component_name: str, health_check: callable = None):
"""
注册组件
Args:
component_name: 组件名称
health_check: 健康检查函数
"""
self.components[component_name] = {
"last_heartbeat": time.time(),
"status": "healthy",
"health_check": health_check,
"errors": []
}
print(f"注册组件: {component_name}")
def unregister_component(self, component_name: str):
"""
注销组件
Args:
component_name: 组件名称
"""
if component_name in self.components:
del self.components[component_name]
print(f"注销组件: {component_name}")
def update_component_heartbeat(self, component_name: str):
"""
更新组件心跳
Args:
component_name: 组件名称
"""
if component_name in self.components:
self.components[component_name]["last_heartbeat"] = time.time()
self.components[component_name]["status"] = "healthy"
def mark_component_error(self, component_name: str, error: str):
"""
标记组件错误
Args:
component_name: 组件名称
error: 错误信息
"""
if component_name in self.components:
self.components[component_name]["status"] = "error"
self.components[component_name]["errors"].append({
"timestamp": time.time(),
"error": error
})
# 限制错误记录数量
if len(self.components[component_name]["errors"]) > 10:
self.components[component_name]["errors"] = self.components[component_name]["errors"][-10:]
def get_component_status(self, component_name: str) -> Dict[str, Any]:
"""
获取组件状态
Args:
component_name: 组件名称
Returns:
组件状态
"""
return self.components.get(component_name, {
"status": "unknown",
"last_heartbeat": 0,
"errors": []
})
def get_system_status(self) -> Dict[str, Any]:
"""
获取系统状态
Returns:
系统状态
"""
healthy_components = 0
error_components = 0
unknown_components = 0
for component_name, component in self.components.items():
if component["status"] == "healthy":
healthy_components += 1
elif component["status"] == "error":
error_components += 1
else:
unknown_components += 1
# 检查组件是否过期
current_time = time.time()
for component_name, component in self.components.items():
if current_time - component["last_heartbeat"] > self.interval * 2:
component["status"] = "error"
component["errors"].append({
"timestamp": current_time,
"error": "心跳超时"
})
# 确定系统整体状态
if error_components > 0:
system_status = "error"
elif healthy_components == len(self.components):
system_status = "healthy"
else:
system_status = "unknown"
return {
"status": system_status,
"components": {
"total": len(self.components),
"healthy": healthy_components,
"error": error_components,
"unknown": unknown_components
},
"last_heartbeat": self.last_heartbeat,
"timestamp": current_time
}
async def start(self):
"""
启动心跳监控
"""
if self.running:
return
self.running = True
self.monitor_task = asyncio.create_task(self._monitor_loop())
print(f"心跳监控启动成功,间隔: {self.interval}秒")
async def stop(self):
"""
停止心跳监控
"""
if not self.running:
return
self.running = False
if self.monitor_task:
self.monitor_task.cancel()
try:
await self.monitor_task
except asyncio.CancelledError:
pass
print("心跳监控停止成功")
async def _monitor_loop(self):
"""
监控循环
"""
while self.running:
try:
# 更新系统心跳
self.last_heartbeat = time.time()
# 执行健康检查
await self._perform_health_checks()
# 获取系统状态
system_status = self.get_system_status()
# 发布心跳事件
await self.message_bus.publish("heartbeat", {
"timestamp": self.last_heartbeat,
"status": system_status
})
# 打印系统状态
if system_status["status"] != "healthy":
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 系统状态: {system_status['status']}")
print(f" 组件状态: 健康={system_status['components']['healthy']}, 错误={system_status['components']['error']}, 未知={system_status['components']['unknown']}")
# 等待下一次心跳
await asyncio.sleep(self.interval)
except asyncio.CancelledError:
break
except Exception as e:
print(f"心跳监控出错: {e}")
await asyncio.sleep(1)
async def _perform_health_checks(self):
"""
执行健康检查
"""
for component_name, component in self.components.items():
if component.get("health_check"):
try:
# 执行健康检查
result = component["health_check"]()
if result:
component["status"] = "healthy"
else:
component["status"] = "error"
component["errors"].append({
"timestamp": time.time(),
"error": "健康检查失败"
})
except Exception as e:
component["status"] = "error"
component["errors"].append({
"timestamp": time.time(),
"error": str(e)
})
def is_running(self) -> bool:
"""
检查心跳监控是否运行
Returns:
是否运行
"""
return self.running5.2.2 全局心跳监控器
为了方便在系统的不同部分使用心跳监控器,我们提供了一个全局心跳监控器实例:
python
# 全局心跳监控器实例
_heartbeat_monitor = None
def get_heartbeat_monitor() -> HeartbeatMonitor:
"""
获取全局心跳监控器实例
Returns:
心跳监控器实例
"""
global _heartbeat_monitor
if _heartbeat_monitor is None:
from nanobot.bus import get_bus
message_bus = get_bus()
from nanobot.config import get_config
config = get_config()
interval = config.get("heartbeat.interval", 30)
_heartbeat_monitor = HeartbeatMonitor(message_bus, interval)
return _heartbeat_monitor
# 初始化心跳监控
def initialize_heartbeat():
"""
初始化心跳监控
"""
from nanobot.heartbeat import get_heartbeat_monitor
monitor = get_heartbeat_monitor()
# 注册核心组件
monitor.register_component("config")
monitor.register_component("bus")
monitor.register_component("session")
monitor.register_component("providers")
monitor.register_component("agent")
monitor.register_component("skills")
monitor.register_component("channels")
monitor.register_component("cron")
# 启动心跳监控
import asyncio
asyncio.create_task(monitor.start())5.2.3 心跳检测集成
心跳检测系统与其他模块的集成:
- 核心启动:在系统启动时初始化心跳监控
- 组件注册:各模块在初始化时注册到心跳监控器
- 心跳更新:各模块定期更新自己的心跳
- 错误标记:各模块在发生错误时标记错误状态
- 状态监控:系统管理员可以通过命令行查看系统状态
5.2.4 心跳检测测试
为心跳检测系统编写单元测试,确保各功能正常工作:
python
import unittest
import asyncio
from unittest.mock import AsyncMock, Mock
from nanobot.heartbeat import HeartbeatMonitor
class TestHeartbeatMonitor(unittest.TestCase):
def setUp(self):
self.mock_bus = Mock()
self.mock_bus.publish = AsyncMock()
self.monitor = HeartbeatMonitor(self.mock_bus, interval=1)
async def test_start_stop(self):
# 测试启动和停止
await self.monitor.start()
self.assertTrue(self.monitor.is_running())
await asyncio.sleep(0.5)
await self.monitor.stop()
self.assertFalse(self.monitor.is_running())
def test_register_component(self):
# 测试注册组件
self.monitor.register_component("test")
status = self.monitor.get_component_status("test")
self.assertEqual(status["status"], "healthy")
if __name__ == "__main__":
unittest.main()5.3 系统监控与日志
5.3.1 日志系统
Nanobot2使用Python的标准日志模块,结合自定义的日志配置,提供全面的日志记录功能:
python
import logging
import logging.handlers
import os
import time
def setup_logging():
"""
设置日志系统
"""
# 创建日志目录
log_dir = os.path.expanduser("~/.nanobot/logs")
os.makedirs(log_dir, exist_ok=True)
# 日志文件名
log_file = os.path.join(log_dir, f"nanobot_{time.strftime('%Y-%m-%d')}.log")
# 配置日志
logger = logging.getLogger("nanobot")
logger.setLevel(logging.INFO)
# 避免重复添加处理器
if logger.handlers:
return logger
# 文件处理器
file_handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setLevel(logging.INFO)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 日志格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# 添加处理器
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
# 使用示例
logger = setup_logging()
logger.info("系统启动")
logger.error("发生错误")5.3.2 系统状态命令
通过命令行接口,可以查看系统的详细状态:
bash
python -m nanobot.cli status或使用心跳命令:
bash
python -m nanobot.cli heartbeat status5.4 高级功能集成测试
运行完整的集成测试,确保高级功能与其他模块正常协作:
bash
python -m pytest tests/test_advanced.py -v5.5 小结
本章介绍了Nanobot2的高级功能,包括:
- 定时任务系统:支持定期执行各种任务,如清理过期会话、发送提醒等
- 心跳检测:实时监控系统健康状态和各个组件的运行情况
- 系统监控与日志:全面的日志记录和系统状态查看功能
这些高级功能使Nanobot2更加健壮、可靠和可维护。定时任务系统确保系统能够自动执行各种维护操作,心跳检测系统实时监控系统状态,及时发现和处理问题,日志系统记录系统的运行情况,便于故障排查和系统优化。
在接下来的章节中,我们将学习Nanobot2的部署与运维,包括项目打包、容器化部署、系统监控和自动重启等内容。
