Skip to content

第41天:SubAgent(子智能体)

学习目标

  • 理解SubAgent的概念和原理
  • 掌握子智能体协作模式
  • 学习任务分配机制
  • 了解通信协议
  • 掌握协调策略

SubAgent概念

什么是SubAgent

SubAgent(子智能体)是大型Agent系统的组成部分,专门负责特定领域或功能的智能体。

核心特点

  • 专业化:每个SubAgent专注于特定领域
  • 模块化:可以独立开发和测试
  • 可组合:多个SubAgent可以组合成复杂系统
  • 可扩展:易于添加新的SubAgent

优势

  1. 降低单个Agent的复杂度
  2. 提高系统的可维护性
  3. 支持并行处理
  4. 便于专业化优化

SubAgent架构

┌─────────────────────────────────────────┐
│         Main Agent (Coordinator)      │
├─────────────────────────────────────────┤
│                                     │
│  ┌─────────┐  ┌─────────┐          │
│  │ SubAgent│  │ SubAgent│          │
│  │   1     │  │   2     │          │
│  └─────────┘  └─────────┘          │
│       │             │                │
│       └──────┬──────┘                │
│              ▼                        │
│  ┌─────────────────────────┐          │
│  │   Communication Hub    │          │
│  └─────────────────────────┘          │
│              │                        │
│  ┌─────────┐  ┌─────────┐          │
│  │ SubAgent│  │ SubAgent│          │
│  │   3     │  │   4     │          │
│  └─────────┘  └─────────┘          │
│                                     │
└─────────────────────────────────────────┘

子智能体协作模式

1. 层次化协作

模式描述

SubAgent按照层次结构组织,上层SubAgent协调下层SubAgent。

实现

python
class HierarchicalSubAgent:
    def __init__(self, name: str, role: str):
        self.name = name
        self.role = role
        self.subordinates = []
        self.superior = None
    
    def add_subordinate(self, subagent):
        subagent.superior = self
        self.subordinates.append(subagent)
    
    def delegate_task(self, task: Task) -> Dict:
        if self.can_handle(task):
            return self.execute(task)
        else:
            return self.delegate_to_subordinate(task)
    
    def delegate_to_subordinate(self, task: Task) -> Dict:
        best_subordinate = self.select_best_subordinate(task)
        return best_subordinate.delegate_task(task)
    
    def select_best_subordinate(self, task: Task) -> 'HierarchicalSubAgent':
        scores = []
        
        for subordinate in self.subordinates:
            score = self.evaluate_capability(subordinate, task)
            scores.append((subordinate, score))
        
        return max(scores, key=lambda x: x[1])[0]
    
    def evaluate_capability(self, subagent: 'HierarchicalSubAgent', task: Task) -> float:
        return len(set(subagent.role.lower().split()) & 
                  set(task.type.lower().split())) / max(
                      len(subagent.role.lower().split()),
                      len(task.type.lower().split())
                  )
    
    def can_handle(self, task: Task) -> bool:
        return self.evaluate_capability(self, task) > 0.5
    
    def execute(self, task: Task) -> Dict:
        return {
            "agent": self.name,
            "task": task.description,
            "status": "completed",
            "result": f"Task executed by {self.name}"
        }

2. 平行协作

模式描述

多个SubAgent并行处理不同的任务或任务的不同部分。

实现

python
import asyncio
from typing import List, Dict

class ParallelSubAgent:
    def __init__(self, name: str, capabilities: List[str]):
        self.name = name
        self.capabilities = capabilities
    
    async def execute_task(self, task: Task) -> Dict:
        if not self.can_handle(task):
            return {
                "status": "failed",
                "error": f"{self.name} cannot handle this task"
            }
        
        await asyncio.sleep(1)
        
        return {
            "agent": self.name,
            "task": task.description,
            "status": "completed",
            "result": f"Task executed by {self.name}"
        }
    
    def can_handle(self, task: Task) -> bool:
        return any(cap in task.type.lower() 
                  for cap in self.capabilities)

class ParallelCoordinator:
    def __init__(self, subagents: List[ParallelSubAgent]):
        self.subagents = subagents
    
    async def execute_parallel(self, tasks: List[Task]) -> List[Dict]:
        assignments = self.assign_tasks(tasks)
        
        results = await asyncio.gather(*[
            subagent.execute_task(task)
            for subagent, task in assignments
        ])
        
        return results
    
    def assign_tasks(self, tasks: List[Task]) -> List[tuple]:
        assignments = []
        
        for task in tasks:
            capable_agents = [
                agent for agent in self.subagents
                if agent.can_handle(task)
            ]
            
            if capable_agents:
                best_agent = self.select_least_loaded(capable_agents)
                assignments.append((best_agent, task))
        
        return assignments
    
    def select_least_loaded(self, agents: List[ParallelSubAgent]) -> ParallelSubAgent:
        return agents[0]

3. 管道协作

模式描述

SubAgent按照流水线方式处理任务,每个SubAgent处理任务的一个阶段。

实现

python
class PipelineSubAgent:
    def __init__(self, name: str, stage: str):
        self.name = name
        self.stage = stage
        self.next_agent = None
    
    def set_next(self, next_agent: 'PipelineSubAgent'):
        self.next_agent = next_agent
    
    def process(self, task: Task) -> Dict:
        result = self.execute_stage(task)
        
        if self.next_agent:
            next_task = Task(
                description=f"{task.description} (after {self.stage})",
                type=task.type,
                data=result
            )
            return self.next_agent.process(next_task)
        
        return result
    
    def execute_stage(self, task: Task) -> Dict:
        return {
            "agent": self.name,
            "stage": self.stage,
            "input": task.data if hasattr(task, 'data') else None,
            "output": f"Processed by {self.name} at {self.stage} stage"
        }

class Pipeline:
    def __init__(self):
        self.head = None
        self.tail = None
    
    def add_stage(self, agent: PipelineSubAgent):
        if not self.head:
            self.head = agent
            self.tail = agent
        else:
            self.tail.set_next(agent)
            self.tail = agent
    
    def execute(self, task: Task) -> Dict:
        if not self.head:
            return {"status": "failed", "error": "No pipeline defined"}
        
        return self.head.process(task)

4. 竞争协作

模式描述

多个SubAgent竞争处理同一个任务,选择最佳结果。

实现

python
class CompetitiveSubAgent:
    def __init__(self, name: str, capability: str, quality_score: float):
        self.name = name
        self.capability = capability
        self.quality_score = quality_score
    
    def execute_task(self, task: Task) -> Dict:
        return {
            "agent": self.name,
            "task": task.description,
            "status": "completed",
            "result": f"Result from {self.name}",
            "quality": self.quality_score * 0.9 + 0.1 * random.random()
        }
    
    def can_handle(self, task: Task) -> bool:
        return self.capability.lower() in task.type.lower()

class CompetitiveCoordinator:
    def __init__(self, subagents: List[CompetitiveSubAgent]):
        self.subagents = subagents
    
    def execute_competitive(self, task: Task) -> Dict:
        capable_agents = [
            agent for agent in self.subagents
            if agent.can_handle(task)
        ]
        
        if not capable_agents:
            return {
                "status": "failed",
                "error": "No capable agent found"
            }
        
        results = [
            agent.execute_task(task)
            for agent in capable_agents
        ]
        
        best_result = max(results, key=lambda x: x["quality"])
        
        return {
            "status": "completed",
            "result": best_result,
            "all_results": results
        }

任务分配机制

基于能力的分配

python
class CapabilityBasedAllocator:
    def __init__(self, subagents: List[SubAgent]):
        self.subagents = subagents
        self.capability_matrix = self._build_capability_matrix()
    
    def _build_capability_matrix(self) -> Dict[str, Dict[str, float]]:
        matrix = {}
        
        for agent in self.subagents:
            matrix[agent.name] = {}
            for capability in agent.capabilities:
                matrix[agent.name][capability] = 1.0
        
        return matrix
    
    def allocate(self, task: Task) -> SubAgent:
        scores = []
        
        for agent in self.subagents:
            score = self._calculate_score(agent, task)
            scores.append((agent, score))
        
        return max(scores, key=lambda x: x[1])[0]
    
    def _calculate_score(self, agent: SubAgent, task: Task) -> float:
        score = 0
        
        for capability in agent.capabilities:
            if capability.lower() in task.type.lower():
                score += 1
        
        return score / len(agent.capabilities)

基于负载的分配

python
class LoadBasedAllocator:
    def __init__(self, subagents: List[SubAgent]):
        self.subagents = subagents
        self.load_tracker = {agent.name: 0 for agent in subagents}
    
    def allocate(self, task: Task) -> SubAgent:
        capable_agents = [
            agent for agent in self.subagents
            if agent.can_handle(task)
        ]
        
        if not capable_agents:
            raise Exception("No capable agent found")
        
        least_loaded = min(
            capable_agents,
            key=lambda x: self.load_tracker[x.name]
        )
        
        self.load_tracker[least_loaded.name] += 1
        
        return least_loaded
    
    def complete_task(self, agent: SubAgent):
        self.load_tracker[agent.name] = max(
            0, 
            self.load_tracker[agent.name] - 1
        )

基于优先级的分配

python
class PriorityBasedAllocator:
    def __init__(self, subagents: List[SubAgent]):
        self.subagents = subagents
        self.task_queue = PriorityQueue()
    
    def allocate(self, task: Task) -> SubAgent:
        self.task_queue.put(task)
        
        capable_agents = [
            agent for agent in self.subagents
            if agent.can_handle(task)
        ]
        
        if not capable_agents:
            raise Exception("No capable agent found")
        
        return self._select_agent(capable_agents, task)
    
    def _select_agent(self, agents: List[SubAgent], task: Task) -> SubAgent:
        if task.priority == "high":
            return agents[0]
        elif task.priority == "medium":
            return agents[len(agents) // 2]
        else:
            return agents[-1]

通信协议

消息格式

python
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime

@dataclass
class Message:
    sender: str
    receiver: str
    content: Any
    message_type: str
    timestamp: datetime
    message_id: str
    reply_to: Optional[str] = None
    
    def to_dict(self) -> Dict:
        return {
            "sender": self.sender,
            "receiver": self.receiver,
            "content": self.content,
            "message_type": self.message_type,
            "timestamp": self.timestamp.isoformat(),
            "message_id": self.message_id,
            "reply_to": self.reply_to
        }
    
    @classmethod
    def from_dict(cls, data: Dict) -> 'Message':
        return cls(
            sender=data["sender"],
            receiver=data["receiver"],
            content=data["content"],
            message_type=data["message_type"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            message_id=data["message_id"],
            reply_to=data.get("reply_to")
        )

消息总线

python
import uuid
from typing import Callable, Dict

class MessageBus:
    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}
        self.message_history = []
    
    def subscribe(self, agent_name: str, handler: Callable):
        if agent_name not in self.subscribers:
            self.subscribers[agent_name] = []
        self.subscribers[agent_name].append(handler)
    
    def publish(self, message: Message):
        self.message_history.append(message)
        
        if message.receiver in self.subscribers:
            for handler in self.subscribers[message.receiver]:
                handler(message)
    
    def send(self, sender: str, receiver: str, content: Any, 
             message_type: str, reply_to: Optional[str] = None) -> Message:
        message = Message(
            sender=sender,
            receiver=receiver,
            content=content,
            message_type=message_type,
            timestamp=datetime.now(),
            message_id=str(uuid.uuid4()),
            reply_to=reply_to
        )
        
        self.publish(message)
        return message
    
    def get_history(self, agent_name: str) -> List[Message]:
        return [
            msg for msg in self.message_history
            if msg.sender == agent_name or msg.receiver == agent_name
        ]

请求-响应模式

python
class RequestResponseProtocol:
    def __init__(self, message_bus: MessageBus):
        self.message_bus = message_bus
        self.pending_requests = {}
    
    def send_request(self, sender: str, receiver: str, 
                    content: Any, timeout: float = 30.0) -> Dict:
        request_id = str(uuid.uuid4())
        
        self.message_bus.send(
            sender=sender,
            receiver=receiver,
            content=content,
            message_type="request",
            reply_to=request_id
        )
        
        return self._wait_for_response(request_id, timeout)
    
    def _wait_for_response(self, request_id: str, 
                         timeout: float) -> Dict:
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            if request_id in self.pending_requests:
                return self.pending_requests.pop(request_id)
            time.sleep(0.1)
        
        raise TimeoutError(f"Request {request_id} timed out")
    
    def handle_response(self, message: Message):
        if message.reply_to:
            self.pending_requests[message.reply_to] = message.content

协调策略

集中式协调

python
class CentralizedCoordinator:
    def __init__(self, subagents: List[SubAgent]):
        self.subagents = subagents
        self.task_queue = []
        self.completed_tasks = []
    
    def add_task(self, task: Task):
        self.task_queue.append(task)
    
    def coordinate(self) -> Dict:
        results = []
        
        while self.task_queue:
            task = self.task_queue.pop(0)
            
            assigned_agent = self._assign_task(task)
            result = assigned_agent.execute(task)
            
            results.append(result)
            self.completed_tasks.append(result)
        
        return {
            "status": "completed",
            "results": results
        }
    
    def _assign_task(self, task: Task) -> SubAgent:
        capable_agents = [
            agent for agent in self.subagents
            if agent.can_handle(task)
        ]
        
        if not capable_agents:
            raise Exception("No capable agent found")
        
        return capable_agents[0]

分布式协调

python
class DistributedCoordinator:
    def __init__(self, subagents: List[SubAgent]):
        self.subagents = subagents
        self.message_bus = MessageBus()
        self._setup_subscriptions()
    
    def _setup_subscriptions(self):
        for agent in self.subagents:
            self.message_bus.subscribe(
                agent.name,
                lambda msg: self.handle_message(agent, msg)
            )
    
    def handle_message(self, agent: SubAgent, message: Message):
        if message.message_type == "task":
            result = agent.execute(message.content)
            
            self.message_bus.send(
                sender=agent.name,
                receiver=message.sender,
                content=result,
                message_type="result",
                reply_to=message.message_id
            )
    
    def coordinate(self, tasks: List[Task]) -> Dict:
        results = []
        
        for task in tasks:
            self.message_bus.send(
                sender="coordinator",
                receiver="all",
                content=task,
                message_type="task"
            )
        
        return {
            "status": "completed",
            "results": results
        }

实践练习

练习1:实现层次化SubAgent系统

python
class SimpleHierarchicalSystem:
    def __init__(self):
        self.root = HierarchicalSubAgent("Root", "coordinator")
        
        research = HierarchicalSubAgent("Research", "research")
        development = HierarchicalSubAgent("Development", "development")
        testing = HierarchicalSubAgent("Testing", "testing")
        
        self.root.add_subordinate(research)
        self.root.add_subordinate(development)
        self.root.add_subordinate(testing)
    
    def execute_task(self, task: Task):
        return self.root.delegate_task(task)

练习2:实现并行SubAgent系统

python
class SimpleParallelSystem:
    def __init__(self):
        agents = [
            ParallelSubAgent("Agent1", ["research", "analysis"]),
            ParallelSubAgent("Agent2", ["development", "coding"]),
            ParallelSubAgent("Agent3", ["testing", "qa"])
        ]
        self.coordinator = ParallelCoordinator(agents)
    
    async def execute_tasks(self, tasks: List[Task]):
        return await self.coordinator.execute_parallel(tasks)

总结

本节我们学习了SubAgent(子智能体):

  1. SubAgent的概念和架构
  2. 子智能体协作模式(层次化、平行、管道、竞争)
  3. 任务分配机制(基于能力、负载、优先级)
  4. 通信协议(消息格式、消息总线、请求-响应)
  5. 协调策略(集中式、分布式)

SubAgent系统使我们能够构建更复杂、更强大的AI Agent应用。

参考资源