Appearance
第41天:SubAgent(子智能体)
学习目标
- 理解SubAgent的概念和原理
- 掌握子智能体协作模式
- 学习任务分配机制
- 了解通信协议
- 掌握协调策略
SubAgent概念
什么是SubAgent
SubAgent(子智能体)是大型Agent系统的组成部分,专门负责特定领域或功能的智能体。
核心特点:
- 专业化:每个SubAgent专注于特定领域
- 模块化:可以独立开发和测试
- 可组合:多个SubAgent可以组合成复杂系统
- 可扩展:易于添加新的SubAgent
优势:
- 降低单个Agent的复杂度
- 提高系统的可维护性
- 支持并行处理
- 便于专业化优化
SubAgent架构
┌─────────────────────────────────────────┐
│ Main Agent (Coordinator) │
├─────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ │
│ │ SubAgent│ │ SubAgent│ │
│ │ 1 │ │ 2 │ │
│ └─────────┘ └─────────┘ │
│ │ │ │
│ └──────┬──────┘ │
│ ▼ │
│ ┌─────────────────────────┐ │
│ │ Communication Hub │ │
│ └─────────────────────────┘ │
│ │ │
│ ┌─────────┐ ┌─────────┐ │
│ │ SubAgent│ │ SubAgent│ │
│ │ 3 │ │ 4 │ │
│ └─────────┘ └─────────┘ │
│ │
└─────────────────────────────────────────┘子智能体协作模式
1. 层次化协作
模式描述:
SubAgent按照层次结构组织,上层SubAgent协调下层SubAgent。
实现:
python
class HierarchicalSubAgent:
def __init__(self, name: str, role: str):
self.name = name
self.role = role
self.subordinates = []
self.superior = None
def add_subordinate(self, subagent):
subagent.superior = self
self.subordinates.append(subagent)
def delegate_task(self, task: Task) -> Dict:
if self.can_handle(task):
return self.execute(task)
else:
return self.delegate_to_subordinate(task)
def delegate_to_subordinate(self, task: Task) -> Dict:
best_subordinate = self.select_best_subordinate(task)
return best_subordinate.delegate_task(task)
def select_best_subordinate(self, task: Task) -> 'HierarchicalSubAgent':
scores = []
for subordinate in self.subordinates:
score = self.evaluate_capability(subordinate, task)
scores.append((subordinate, score))
return max(scores, key=lambda x: x[1])[0]
def evaluate_capability(self, subagent: 'HierarchicalSubAgent', task: Task) -> float:
return len(set(subagent.role.lower().split()) &
set(task.type.lower().split())) / max(
len(subagent.role.lower().split()),
len(task.type.lower().split())
)
def can_handle(self, task: Task) -> bool:
return self.evaluate_capability(self, task) > 0.5
def execute(self, task: Task) -> Dict:
return {
"agent": self.name,
"task": task.description,
"status": "completed",
"result": f"Task executed by {self.name}"
}2. 平行协作
模式描述:
多个SubAgent并行处理不同的任务或任务的不同部分。
实现:
python
import asyncio
from typing import List, Dict
class ParallelSubAgent:
def __init__(self, name: str, capabilities: List[str]):
self.name = name
self.capabilities = capabilities
async def execute_task(self, task: Task) -> Dict:
if not self.can_handle(task):
return {
"status": "failed",
"error": f"{self.name} cannot handle this task"
}
await asyncio.sleep(1)
return {
"agent": self.name,
"task": task.description,
"status": "completed",
"result": f"Task executed by {self.name}"
}
def can_handle(self, task: Task) -> bool:
return any(cap in task.type.lower()
for cap in self.capabilities)
class ParallelCoordinator:
def __init__(self, subagents: List[ParallelSubAgent]):
self.subagents = subagents
async def execute_parallel(self, tasks: List[Task]) -> List[Dict]:
assignments = self.assign_tasks(tasks)
results = await asyncio.gather(*[
subagent.execute_task(task)
for subagent, task in assignments
])
return results
def assign_tasks(self, tasks: List[Task]) -> List[tuple]:
assignments = []
for task in tasks:
capable_agents = [
agent for agent in self.subagents
if agent.can_handle(task)
]
if capable_agents:
best_agent = self.select_least_loaded(capable_agents)
assignments.append((best_agent, task))
return assignments
def select_least_loaded(self, agents: List[ParallelSubAgent]) -> ParallelSubAgent:
return agents[0]3. 管道协作
模式描述:
SubAgent按照流水线方式处理任务,每个SubAgent处理任务的一个阶段。
实现:
python
class PipelineSubAgent:
def __init__(self, name: str, stage: str):
self.name = name
self.stage = stage
self.next_agent = None
def set_next(self, next_agent: 'PipelineSubAgent'):
self.next_agent = next_agent
def process(self, task: Task) -> Dict:
result = self.execute_stage(task)
if self.next_agent:
next_task = Task(
description=f"{task.description} (after {self.stage})",
type=task.type,
data=result
)
return self.next_agent.process(next_task)
return result
def execute_stage(self, task: Task) -> Dict:
return {
"agent": self.name,
"stage": self.stage,
"input": task.data if hasattr(task, 'data') else None,
"output": f"Processed by {self.name} at {self.stage} stage"
}
class Pipeline:
def __init__(self):
self.head = None
self.tail = None
def add_stage(self, agent: PipelineSubAgent):
if not self.head:
self.head = agent
self.tail = agent
else:
self.tail.set_next(agent)
self.tail = agent
def execute(self, task: Task) -> Dict:
if not self.head:
return {"status": "failed", "error": "No pipeline defined"}
return self.head.process(task)4. 竞争协作
模式描述:
多个SubAgent竞争处理同一个任务,选择最佳结果。
实现:
python
class CompetitiveSubAgent:
def __init__(self, name: str, capability: str, quality_score: float):
self.name = name
self.capability = capability
self.quality_score = quality_score
def execute_task(self, task: Task) -> Dict:
return {
"agent": self.name,
"task": task.description,
"status": "completed",
"result": f"Result from {self.name}",
"quality": self.quality_score * 0.9 + 0.1 * random.random()
}
def can_handle(self, task: Task) -> bool:
return self.capability.lower() in task.type.lower()
class CompetitiveCoordinator:
def __init__(self, subagents: List[CompetitiveSubAgent]):
self.subagents = subagents
def execute_competitive(self, task: Task) -> Dict:
capable_agents = [
agent for agent in self.subagents
if agent.can_handle(task)
]
if not capable_agents:
return {
"status": "failed",
"error": "No capable agent found"
}
results = [
agent.execute_task(task)
for agent in capable_agents
]
best_result = max(results, key=lambda x: x["quality"])
return {
"status": "completed",
"result": best_result,
"all_results": results
}任务分配机制
基于能力的分配
python
class CapabilityBasedAllocator:
def __init__(self, subagents: List[SubAgent]):
self.subagents = subagents
self.capability_matrix = self._build_capability_matrix()
def _build_capability_matrix(self) -> Dict[str, Dict[str, float]]:
matrix = {}
for agent in self.subagents:
matrix[agent.name] = {}
for capability in agent.capabilities:
matrix[agent.name][capability] = 1.0
return matrix
def allocate(self, task: Task) -> SubAgent:
scores = []
for agent in self.subagents:
score = self._calculate_score(agent, task)
scores.append((agent, score))
return max(scores, key=lambda x: x[1])[0]
def _calculate_score(self, agent: SubAgent, task: Task) -> float:
score = 0
for capability in agent.capabilities:
if capability.lower() in task.type.lower():
score += 1
return score / len(agent.capabilities)基于负载的分配
python
class LoadBasedAllocator:
def __init__(self, subagents: List[SubAgent]):
self.subagents = subagents
self.load_tracker = {agent.name: 0 for agent in subagents}
def allocate(self, task: Task) -> SubAgent:
capable_agents = [
agent for agent in self.subagents
if agent.can_handle(task)
]
if not capable_agents:
raise Exception("No capable agent found")
least_loaded = min(
capable_agents,
key=lambda x: self.load_tracker[x.name]
)
self.load_tracker[least_loaded.name] += 1
return least_loaded
def complete_task(self, agent: SubAgent):
self.load_tracker[agent.name] = max(
0,
self.load_tracker[agent.name] - 1
)基于优先级的分配
python
class PriorityBasedAllocator:
def __init__(self, subagents: List[SubAgent]):
self.subagents = subagents
self.task_queue = PriorityQueue()
def allocate(self, task: Task) -> SubAgent:
self.task_queue.put(task)
capable_agents = [
agent for agent in self.subagents
if agent.can_handle(task)
]
if not capable_agents:
raise Exception("No capable agent found")
return self._select_agent(capable_agents, task)
def _select_agent(self, agents: List[SubAgent], task: Task) -> SubAgent:
if task.priority == "high":
return agents[0]
elif task.priority == "medium":
return agents[len(agents) // 2]
else:
return agents[-1]通信协议
消息格式
python
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
@dataclass
class Message:
sender: str
receiver: str
content: Any
message_type: str
timestamp: datetime
message_id: str
reply_to: Optional[str] = None
def to_dict(self) -> Dict:
return {
"sender": self.sender,
"receiver": self.receiver,
"content": self.content,
"message_type": self.message_type,
"timestamp": self.timestamp.isoformat(),
"message_id": self.message_id,
"reply_to": self.reply_to
}
@classmethod
def from_dict(cls, data: Dict) -> 'Message':
return cls(
sender=data["sender"],
receiver=data["receiver"],
content=data["content"],
message_type=data["message_type"],
timestamp=datetime.fromisoformat(data["timestamp"]),
message_id=data["message_id"],
reply_to=data.get("reply_to")
)消息总线
python
import uuid
from typing import Callable, Dict
class MessageBus:
def __init__(self):
self.subscribers: Dict[str, List[Callable]] = {}
self.message_history = []
def subscribe(self, agent_name: str, handler: Callable):
if agent_name not in self.subscribers:
self.subscribers[agent_name] = []
self.subscribers[agent_name].append(handler)
def publish(self, message: Message):
self.message_history.append(message)
if message.receiver in self.subscribers:
for handler in self.subscribers[message.receiver]:
handler(message)
def send(self, sender: str, receiver: str, content: Any,
message_type: str, reply_to: Optional[str] = None) -> Message:
message = Message(
sender=sender,
receiver=receiver,
content=content,
message_type=message_type,
timestamp=datetime.now(),
message_id=str(uuid.uuid4()),
reply_to=reply_to
)
self.publish(message)
return message
def get_history(self, agent_name: str) -> List[Message]:
return [
msg for msg in self.message_history
if msg.sender == agent_name or msg.receiver == agent_name
]请求-响应模式
python
class RequestResponseProtocol:
def __init__(self, message_bus: MessageBus):
self.message_bus = message_bus
self.pending_requests = {}
def send_request(self, sender: str, receiver: str,
content: Any, timeout: float = 30.0) -> Dict:
request_id = str(uuid.uuid4())
self.message_bus.send(
sender=sender,
receiver=receiver,
content=content,
message_type="request",
reply_to=request_id
)
return self._wait_for_response(request_id, timeout)
def _wait_for_response(self, request_id: str,
timeout: float) -> Dict:
start_time = time.time()
while time.time() - start_time < timeout:
if request_id in self.pending_requests:
return self.pending_requests.pop(request_id)
time.sleep(0.1)
raise TimeoutError(f"Request {request_id} timed out")
def handle_response(self, message: Message):
if message.reply_to:
self.pending_requests[message.reply_to] = message.content协调策略
集中式协调
python
class CentralizedCoordinator:
def __init__(self, subagents: List[SubAgent]):
self.subagents = subagents
self.task_queue = []
self.completed_tasks = []
def add_task(self, task: Task):
self.task_queue.append(task)
def coordinate(self) -> Dict:
results = []
while self.task_queue:
task = self.task_queue.pop(0)
assigned_agent = self._assign_task(task)
result = assigned_agent.execute(task)
results.append(result)
self.completed_tasks.append(result)
return {
"status": "completed",
"results": results
}
def _assign_task(self, task: Task) -> SubAgent:
capable_agents = [
agent for agent in self.subagents
if agent.can_handle(task)
]
if not capable_agents:
raise Exception("No capable agent found")
return capable_agents[0]分布式协调
python
class DistributedCoordinator:
def __init__(self, subagents: List[SubAgent]):
self.subagents = subagents
self.message_bus = MessageBus()
self._setup_subscriptions()
def _setup_subscriptions(self):
for agent in self.subagents:
self.message_bus.subscribe(
agent.name,
lambda msg: self.handle_message(agent, msg)
)
def handle_message(self, agent: SubAgent, message: Message):
if message.message_type == "task":
result = agent.execute(message.content)
self.message_bus.send(
sender=agent.name,
receiver=message.sender,
content=result,
message_type="result",
reply_to=message.message_id
)
def coordinate(self, tasks: List[Task]) -> Dict:
results = []
for task in tasks:
self.message_bus.send(
sender="coordinator",
receiver="all",
content=task,
message_type="task"
)
return {
"status": "completed",
"results": results
}实践练习
练习1:实现层次化SubAgent系统
python
class SimpleHierarchicalSystem:
def __init__(self):
self.root = HierarchicalSubAgent("Root", "coordinator")
research = HierarchicalSubAgent("Research", "research")
development = HierarchicalSubAgent("Development", "development")
testing = HierarchicalSubAgent("Testing", "testing")
self.root.add_subordinate(research)
self.root.add_subordinate(development)
self.root.add_subordinate(testing)
def execute_task(self, task: Task):
return self.root.delegate_task(task)练习2:实现并行SubAgent系统
python
class SimpleParallelSystem:
def __init__(self):
agents = [
ParallelSubAgent("Agent1", ["research", "analysis"]),
ParallelSubAgent("Agent2", ["development", "coding"]),
ParallelSubAgent("Agent3", ["testing", "qa"])
]
self.coordinator = ParallelCoordinator(agents)
async def execute_tasks(self, tasks: List[Task]):
return await self.coordinator.execute_parallel(tasks)总结
本节我们学习了SubAgent(子智能体):
- SubAgent的概念和架构
- 子智能体协作模式(层次化、平行、管道、竞争)
- 任务分配机制(基于能力、负载、优先级)
- 通信协议(消息格式、消息总线、请求-响应)
- 协调策略(集中式、分布式)
SubAgent系统使我们能够构建更复杂、更强大的AI Agent应用。
