Skip to content

第4章:多渠道集成

4.1 渠道系统概述

多渠道集成是Nanobot2的重要特性,允许AI助手通过不同的通信渠道与用户交互,如命令行、Telegram、Discord等。渠道系统采用统一的接口设计,便于扩展和管理。

4.1.1 渠道系统架构

渠道系统由以下组件组成:

  1. 渠道基类:定义渠道的基本接口和行为
  2. 渠道管理器:负责渠道的注册、管理和状态控制
  3. 内置渠道:系统自带的常用渠道
  4. 自定义渠道:用户或开发者创建的渠道

4.2 渠道基类设计

渠道基类是所有渠道的基础,定义了渠道必须实现的方法和属性。

4.2.1 核心实现

python
from abc import ABC, abstractmethod

class Channel(ABC):
    """
    渠道基类
    
    所有渠道必须继承自此类并实现start和send_message方法
    """
    
    def __init__(self, name: str):
        self.name = name
        self.enabled = True
        self.running = False
    
    @abstractmethod
    async def start(self):
        """
        启动渠道
        """
        pass
    
    @abstractmethod
    async def stop(self):
        """
        停止渠道
        """
        pass
    
    @abstractmethod
    async def send_message(self, user_id: str, content: str):
        """
        发送消息
        
        Args:
            user_id: 用户ID
            content: 消息内容
        """
        pass
    
    @abstractmethod
    async def receive_message(self, user_id: str, content: str):
        """
        接收消息
        
        Args:
            user_id: 用户ID
            content: 消息内容
        """
        pass
    
    def get_name(self) -> str:
        """
        获取渠道名称
        
        Returns:
            渠道名称
        """
        return self.name
    
    def is_enabled(self) -> bool:
        """
        检查渠道是否启用
        
        Returns:
            是否启用
        """
        return self.enabled
    
    def enable(self):
        """
        启用渠道
        """
        self.enabled = True
    
    def disable(self):
        """
        禁用渠道
        """
        self.enabled = False
    
    def is_running(self) -> bool:
        """
        检查渠道是否运行中
        
        Returns:
            是否运行中
        """
        return self.running
    
    def set_running(self, running: bool):
        """
        设置渠道运行状态
        
        Args:
            running: 是否运行中
        """
        self.running = running

4.2.2 渠道属性

属性描述默认值
name渠道名称构造函数参数
enabled渠道是否启用True
running渠道是否运行中False

4.3 渠道管理器

渠道管理器负责渠道的注册、管理和状态控制,是渠道系统的核心组件。

4.3.1 核心实现

python
class ChannelManager:
    """
    渠道管理器
    
    负责渠道的注册、管理和状态控制
    """
    
    def __init__(self):
        self.channels = {}
    
    def register_channel(self, channel: Channel):
        """
        注册渠道
        
        Args:
            channel: 渠道实例
        """
        channel_name = channel.get_name()
        self.channels[channel_name] = channel
        print(f"注册渠道: {channel_name}")
    
    def unregister_channel(self, channel_name: str):
        """
        注销渠道
        
        Args:
            channel_name: 渠道名称
        """
        if channel_name in self.channels:
            del self.channels[channel_name]
            print(f"注销渠道: {channel_name}")
    
    def get_channel(self, channel_name: str) -> Channel:
        """
        获取渠道
        
        Args:
            channel_name: 渠道名称
        
        Returns:
            渠道实例
        """
        return self.channels.get(channel_name)
    
    def list_channels(self) -> list:
        """
        列出所有渠道
        
        Returns:
            渠道名称列表
        """
        return list(self.channels.keys())
    
    async def start_channel(self, channel_name: str) -> bool:
        """
        启动渠道
        
        Args:
            channel_name: 渠道名称
        
        Returns:
            是否启动成功
        """
        channel = self.get_channel(channel_name)
        if not channel:
            print(f"渠道不存在: {channel_name}")
            return False
        
        if not channel.is_enabled():
            print(f"渠道未启用: {channel_name}")
            return False
        
        if channel.is_running():
            print(f"渠道已运行: {channel_name}")
            return True
        
        try:
            await channel.start()
            channel.set_running(True)
            print(f"渠道启动成功: {channel_name}")
            return True
        except Exception as e:
            print(f"渠道启动失败: {channel_name}, 错误: {e}")
            return False
    
    async def stop_channel(self, channel_name: str) -> bool:
        """
        停止渠道
        
        Args:
            channel_name: 渠道名称
        
        Returns:
            是否停止成功
        """
        channel = self.get_channel(channel_name)
        if not channel:
            print(f"渠道不存在: {channel_name}")
            return False
        
        if not channel.is_running():
            print(f"渠道未运行: {channel_name}")
            return True
        
        try:
            await channel.stop()
            channel.set_running(False)
            print(f"渠道停止成功: {channel_name}")
            return True
        except Exception as e:
            print(f"渠道停止失败: {channel_name}, 错误: {e}")
            return False
    
    async def start_all_channels(self):
        """
        启动所有渠道
        """
        for channel_name in self.list_channels():
            await self.start_channel(channel_name)
    
    async def stop_all_channels(self):
        """
        停止所有渠道
        """
        for channel_name in self.list_channels():
            await self.stop_channel(channel_name)

4.3.2 全局渠道管理器

为了方便在系统的不同部分使用渠道管理器,我们提供了一个全局渠道管理器实例:

python
# 全局渠道管理器实例
_channel_manager = None

def get_channel_manager() -> ChannelManager:
    """
    获取全局渠道管理器实例
    
    Returns:
        渠道管理器实例
    """
    global _channel_manager
    if _channel_manager is None:
        _channel_manager = ChannelManager()
    return _channel_manager

4.4 内置渠道实现

Nanobot2内置了几个常用渠道,如命令行、Telegram和Discord。

4.4.1 命令行渠道

命令行渠道是最基本的渠道,允许用户通过终端与Nanobot2交互:

python
import asyncio
import sys
from nanobot.channels import Channel
from nanobot.agent import get_agent

class CLIChannel(Channel):
    """
    命令行渠道
    
    允许用户通过终端与Nanobot2交互
    """
    
    def __init__(self):
        super().__init__("cli")
        self.agent = get_agent()
        self.running = False
        self.input_task = None
    
    async def start(self):
        """
        启动命令行渠道
        """
        print("启动命令行渠道...")
        self.running = True
        self.input_task = asyncio.create_task(self._input_loop())
        print("命令行渠道启动成功!")
        print("输入 'exit' 退出,输入 'help' 获取帮助")
    
    async def stop(self):
        """
        停止命令行渠道
        """
        print("停止命令行渠道...")
        self.running = False
        if self.input_task:
            self.input_task.cancel()
            try:
                await self.input_task
            except asyncio.CancelledError:
                pass
        print("命令行渠道停止成功!")
    
    async def send_message(self, user_id: str, content: str):
        """
        发送消息到命令行
        
        Args:
            user_id: 用户ID
            content: 消息内容
        """
        print(f"Nanobot2: {content}")
    
    async def receive_message(self, user_id: str, content: str):
        """
        接收消息并处理
        
        Args:
            user_id: 用户ID
            content: 消息内容
        """
        if content.strip().lower() == "exit":
            print("再见!")
            await self.stop()
            return
        
        if content.strip().lower() == "help":
            help_message = """
            命令帮助:
            exit - 退出程序
            help - 获取帮助
            其他输入将被发送给AI助手
            """
            print(help_message)
            return
        
        # 处理消息
        response = await self.agent.process_message(user_id, self.get_name(), content)
        await self.send_message(user_id, response)
    
    async def _input_loop(self):
        """
        输入循环
        """
        try:
            while self.running:
                # 等待用户输入
                content = await self._get_input()
                if content is None:
                    break
                
                # 处理输入
                await self.receive_message("cli-user", content)
        except asyncio.CancelledError:
            pass
        except Exception as e:
            print(f"命令行渠道出错: {e}")
    
    async def _get_input(self) -> str:
        """
        异步获取用户输入
        
        Returns:
            用户输入内容
        """
        loop = asyncio.get_event_loop()
        try:
            content = await loop.run_in_executor(
                None, 
                input, "你: "
            )
            return content
        except EOFError:
            return None

4.4.2 Telegram渠道

Telegram渠道允许用户通过Telegram与Nanobot2交互:

python
import asyncio
from nanobot.channels import Channel
from nanobot.agent import get_agent

class TelegramChannel(Channel):
    """
    Telegram渠道
    
    允许用户通过Telegram与Nanobot2交互
    """
    
    def __init__(self, token: str):
        super().__init__("telegram")
        self.token = token
        self.agent = get_agent()
        self.running = False
        self.updater = None
    
    async def start(self):
        """
        启动Telegram渠道
        """
        try:
            from telegram import Update
            from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
            
            print("启动Telegram渠道...")
            
            # 创建应用
            application = Application.builder().token(self.token).build()
            
            # 添加处理器
            application.add_handler(CommandHandler("start", self._handle_start))
            application.add_handler(CommandHandler("help", self._handle_help))
            application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self._handle_message))
            
            # 启动应用
            self.updater = application
            await application.initialize()
            await application.start()
            await application.updater.start_polling()
            
            self.running = True
            print("Telegram渠道启动成功!")
        except Exception as e:
            print(f"启动Telegram渠道失败: {e}")
            raise
    
    async def stop(self):
        """
        停止Telegram渠道
        """
        print("停止Telegram渠道...")
        if self.updater:
            await self.updater.stop()
            await self.updater.shutdown()
        self.running = False
        print("Telegram渠道停止成功!")
    
    async def send_message(self, user_id: str, content: str):
        """
        发送消息到Telegram
        
        Args:
            user_id: 用户ID
            content: 消息内容
        """
        if not self.running or not self.updater:
            return
        
        try:
            await self.updater.bot.send_message(chat_id=user_id, text=content)
        except Exception as e:
            print(f"发送Telegram消息失败: {e}")
    
    async def receive_message(self, user_id: str, content: str):
        """
        接收消息并处理
        
        Args:
            user_id: 用户ID
            content: 消息内容
        """
        # 处理消息
        response = await self.agent.process_message(user_id, self.get_name(), content)
        await self.send_message(user_id, response)
    
    async def _handle_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """
        处理/start命令
        """
        user_id = str(update.effective_chat.id)
        await self.send_message(user_id, "你好!我是Nanobot2,你的AI助手。有什么可以帮助你的吗?")
    
    async def _handle_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """
        处理/help命令
        """
        user_id = str(update.effective_chat.id)
        help_message = """
        命令帮助:
        /start - 开始对话
        /help - 获取帮助
        其他消息将被发送给AI助手
        """
        await self.send_message(user_id, help_message)
    
    async def _handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """
        处理文本消息
        """
        user_id = str(update.effective_chat.id)
        content = update.message.text
        await self.receive_message(user_id, content)

4.4.3 Discord渠道

Discord渠道允许用户通过Discord与Nanobot2交互:

python
import asyncio
from nanobot.channels import Channel
from nanobot.agent import get_agent

class DiscordChannel(Channel):
    """
    Discord渠道
    
    允许用户通过Discord与Nanobot2交互
    """
    
    def __init__(self, token: str):
        super().__init__("discord")
        self.token = token
        self.agent = get_agent()
        self.running = False
        self.client = None
    
    async def start(self):
        """
        启动Discord渠道
        """
        try:
            import discord
            from discord.ext import commands
            
            print("启动Discord渠道...")
            
            # 创建客户端
            intents = discord.Intents.default()
            intents.message_content = True
            
            bot = commands.Bot(command_prefix="!", intents=intents)
            
            # 事件处理
            @bot.event
            async def on_ready():
                print(f"Discord机器人已登录: {bot.user}")
                print("Discord渠道启动成功!")
            
            @bot.event
            async def on_message(message):
                # 忽略机器人自己的消息
                if message.author == bot.user:
                    return
                
                # 处理命令
                await bot.process_commands(message)
                
                # 处理普通消息
                if not message.content.startswith(bot.command_prefix):
                    user_id = str(message.author.id)
                    content = message.content
                    await self.receive_message(user_id, content)
            
            @bot.command()
            async def help(ctx):
                """
                显示帮助信息
                """
                help_message = """
                命令帮助:
                !help - 获取帮助
                其他消息将被发送给AI助手
                """
                await ctx.send(help_message)
            
            self.client = bot
            self.running = True
            
            # 启动机器人
            await bot.start(self.token)
        except Exception as e:
            print(f"启动Discord渠道失败: {e}")
            raise
    
    async def stop(self):
        """
        停止Discord渠道
        """
        print("停止Discord渠道...")
        if self.client and self.running:
            await self.client.close()
        self.running = False
        print("Discord渠道停止成功!")
    
    async def send_message(self, user_id: str, content: str):
        """
        发送消息到Discord
        
        Args:
            user_id: 用户ID
            content: 消息内容
        """
        if not self.running or not self.client:
            return
        
        try:
            user = await self.client.fetch_user(int(user_id))
            await user.send(content)
        except Exception as e:
            print(f"发送Discord消息失败: {e}")
    
    async def receive_message(self, user_id: str, content: str):
        """
        接收消息并处理
        
        Args:
            user_id: 用户ID
            content: 消息内容
        """
        # 处理消息
        response = await self.agent.process_message(user_id, self.get_name(), content)
        await self.send_message(user_id, response)

4.5 渠道注册

为了方便使用内置渠道,我们提供了一个注册内置渠道的函数:

python
from nanobot.config import get_config

def register_builtin_channels():
    """
    注册内置渠道
    """
    from nanobot.channels import get_channel_manager
    from nanobot.channels.builtin import CLIChannel, TelegramChannel, DiscordChannel
    
    channel_manager = get_channel_manager()
    config = get_config()
    
    # 注册命令行渠道
    cli_channel = CLIChannel()
    channel_manager.register_channel(cli_channel)
    
    # 注册Telegram渠道(如果启用)
    telegram_config = config.get("channels.telegram", {})
    if telegram_config.get("enabled", False):
        token = telegram_config.get("token", "")
        if token:
            telegram_channel = TelegramChannel(token)
            channel_manager.register_channel(telegram_channel)
        else:
            print("Telegram渠道未配置token,跳过注册")
    
    # 注册Discord渠道(如果启用)
    discord_config = config.get("channels.discord", {})
    if discord_config.get("enabled", False):
        token = discord_config.get("token", "")
        if token:
            discord_channel = DiscordChannel(token)
            channel_manager.register_channel(discord_channel)
        else:
            print("Discord渠道未配置token,跳过注册")
    
    print("内置渠道注册完成")

4.6 自定义渠道开发

4.6.1 自定义渠道示例

要创建自定义渠道,只需继承Channel基类并实现必要的方法:

python
from nanobot.channels import Channel
from nanobot.agent import get_agent

class WebChannel(Channel):
    """
    Web渠道
    
    允许用户通过Web界面与Nanobot2交互
    """
    
    def __init__(self):
        super().__init__("web")
        self.agent = get_agent()
        self.running = False
        self.server = None
    
    async def start(self):
        """
        启动Web渠道
        """
        print("启动Web渠道...")
        # 这里可以启动一个Web服务器
        self.running = True
        print("Web渠道启动成功!")
    
    async def stop(self):
        """
        停止Web渠道
        """
        print("停止Web渠道...")
        # 这里可以停止Web服务器
        self.running = False
        print("Web渠道停止成功!")
    
    async def send_message(self, user_id: str, content: str):
        """
        发送消息到Web界面
        
        Args:
            user_id: 用户ID
            content: 消息内容
        """
        # 这里可以实现向Web界面发送消息的逻辑
        print(f"发送消息到用户 {user_id}: {content}")
    
    async def receive_message(self, user_id: str, content: str):
        """
        接收消息并处理
        
        Args:
            user_id: 用户ID
            content: 消息内容
        """
        # 处理消息
        response = await self.agent.process_message(user_id, self.get_name(), content)
        await self.send_message(user_id, response)

4.6.2 注册自定义渠道

创建自定义渠道后,需要将其注册到渠道管理器:

python
from nanobot.channels import get_channel_manager
from my_channels import WebChannel

channel_manager = get_channel_manager()
channel_manager.register_channel(WebChannel())

4.7 渠道集成测试

4.7.1 单元测试

为渠道系统编写单元测试,确保各功能正常工作:

python
import unittest
from unittest.mock import AsyncMock, Mock
from nanobot.channels import ChannelManager
from nanobot.channels.builtin import CLIChannel

class TestChannelManager(unittest.TestCase):
    def setUp(self):
        self.channel_manager = ChannelManager()
        self.cli_channel = CLIChannel()
        self.channel_manager.register_channel(self.cli_channel)
    
    async def test_start_channel(self):
        # 测试启动渠道
        result = await self.channel_manager.start_channel("cli")
        self.assertTrue(result)
        self.assertTrue(self.cli_channel.is_running())
    
    async def test_stop_channel(self):
        # 先启动渠道
        await self.channel_manager.start_channel("cli")
        
        # 测试停止渠道
        result = await self.channel_manager.stop_channel("cli")
        self.assertTrue(result)
        self.assertFalse(self.cli_channel.is_running())

if __name__ == "__main__":
    unittest.main()

4.7.2 集成测试

运行完整的集成测试,确保渠道系统与其他模块正常协作:

bash
python -m pytest tests/test_channels.py -v

4.8 渠道系统扩展

4.8.1 渠道配置

为了使渠道更加灵活,可以支持渠道配置:

python
class ConfigurableChannel(Channel):
    """
    可配置渠道基类
    """
    
    def __init__(self, name: str, config: dict = None):
        super().__init__(name)
        self.config = config or {}
    
    def get_config(self, key: str, default: Any = None) -> Any:
        """
        获取渠道配置
        
        Args:
            key: 配置键
            default: 默认值
        
        Returns:
            配置值
        """
        return self.config.get(key, default)
    
    def set_config(self, key: str, value: Any):
        """
        设置渠道配置
        
        Args:
            key: 配置键
            value: 配置值
        """
        self.config[key] = value

4.8.2 渠道状态监控

为了监控渠道的状态,可以添加状态监控功能:

python
class MonitorableChannel(Channel):
    """
    可监控渠道基类
    """
    
    def __init__(self, name: str):
        super().__init__(name)
        self.status = "idle"
        self.error_count = 0
    
    def get_status(self) -> str:
        """
        获取渠道状态
        
        Returns:
            渠道状态
        """
        return self.status
    
    def set_status(self, status: str):
        """
        设置渠道状态
        
        Args:
            status: 渠道状态
        """
        self.status = status
    
    def increment_error_count(self):
        """
        增加错误计数
        """
        self.error_count += 1
    
    def get_error_count(self) -> int:
        """
        获取错误计数
        
        Returns:
            错误计数
        """
        return self.error_count

4.9 小结

本章介绍了Nanobot2渠道系统的设计和实现,包括:

  1. 渠道系统架构:渠道基类、渠道管理器、内置渠道和自定义渠道
  2. 渠道基类设计:定义渠道的基本接口和行为
  3. 渠道管理器:负责渠道的注册、管理和状态控制
  4. 内置渠道实现:命令行、Telegram和Discord渠道
  5. 自定义渠道开发:如何创建和注册自定义渠道
  6. 渠道集成测试:单元测试和集成测试
  7. 渠道系统扩展:渠道配置和状态监控

渠道系统使Nanobot2能够通过多种方式与用户交互,大大提高了其实用性和可访问性。在接下来的章节中,我们将学习高级功能,如定时任务系统和心跳检测,进一步增强Nanobot2的能力。