Appearance
第79天:个人助理Agent-核心功能开发
学习目标
- 掌握Agent核心实现
- 学习任务规划实现
- 理解工具调用实现
- 掌握记忆管理实现
- 学习Agent循环实现
Agent核心实现
Agent基类
python
import asyncio
import inspect
import json
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

import openai
class BaseAgent:
    """Skeleton of a conversational LLM agent.

    Keeps a conversation history and runs a bounded decide/act loop.
    Tool execution and planning are delegated to subclasses, which must
    implement `_execute_tool` and `_create_plan`.
    """

    def __init__(
        self,
        llm_client: openai.OpenAI,
        name: str = "Assistant",
        description: str = "AI智能助手"
    ):
        self.llm_client = llm_client
        self.name = name
        self.description = description
        # Chronological list of user/assistant turns with ISO timestamps.
        self.conversation_history: List[Dict] = []
        self.current_task = None
        self.state = "idle"

    async def process_input(
        self,
        user_input: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Record the user turn, run the agent loop, record the reply.

        Never raises: any exception is converted into a failure dict so
        callers always receive a response.
        """
        self.conversation_history.append({
            "role": "user",
            "content": user_input,
            "timestamp": datetime.now().isoformat()
        })
        try:
            result = await self._execute_agent_loop(user_input, context)
            self.conversation_history.append({
                "role": "assistant",
                "content": result["response"],
                "timestamp": datetime.now().isoformat(),
                "actions": result.get("actions", [])
            })
            return result
        except Exception as e:
            error_response = {
                "response": f"处理失败: {str(e)}",
                "error": str(e),
                "success": False
            }
            self.conversation_history.append({
                "role": "assistant",
                "content": error_response["response"],
                "timestamp": datetime.now().isoformat()
            })
            return error_response

    async def _execute_agent_loop(
        self,
        user_input: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Bounded decide/act loop: respond | tool | think | plan.

        Gives up with a fallback message after `max_iterations` decisions
        without a final answer.
        """
        max_iterations = 10
        iteration = 0
        current_state = {
            "user_input": user_input,
            "context": context or {},
            "actions": [],
            "observations": []
        }
        while iteration < max_iterations:
            iteration += 1
            decision = await self._make_decision(current_state)
            # The decision is parsed from LLM-produced JSON and may be
            # missing keys; use .get() so a malformed decision cannot
            # crash the loop with a KeyError (fix over direct indexing).
            action = decision.get("action")
            if action == "respond":
                return {
                    "response": decision.get("response", ""),
                    "actions": current_state["actions"],
                    "success": True
                }
            elif action == "tool":
                tool_result = await self._execute_tool(
                    decision.get("tool", ""),
                    decision.get("parameters", {})
                )
                current_state["actions"].append({
                    "tool": decision.get("tool", ""),
                    "parameters": decision.get("parameters", {}),
                    "result": tool_result
                })
                current_state["observations"].append({
                    "type": "tool_result",
                    "content": tool_result
                })
            elif action == "think":
                current_state["observations"].append({
                    "type": "thought",
                    "content": decision.get("thought", "")
                })
            elif action == "plan":
                plan = await self._create_plan(
                    decision.get("goal", ""),
                    current_state
                )
                current_state["actions"].append({
                    "action": "plan",
                    "plan": plan
                })
        return {
            "response": "我需要更多信息来帮助您",
            "actions": current_state["actions"],
            "success": False
        }

    async def _make_decision(self, state: Dict) -> Dict:
        """Ask the LLM for the next action as a JSON object.

        Falls back to a plain "respond" decision on any API or parse error.
        """
        prompt = self._build_decision_prompt(state)
        try:
            # The OpenAI client is synchronous; run it in a worker thread
            # so the event loop is not blocked while waiting on the API.
            completion = await asyncio.to_thread(
                self.llm_client.chat.completions.create,
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": self._get_system_prompt()},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )
            return json.loads(completion.choices[0].message.content)
        except Exception:
            return {
                "action": "respond",
                "response": "我遇到了一些问题,请稍后再试"
            }

    def _get_system_prompt(self) -> str:
        """System prompt describing the assistant's identity and abilities."""
        return f"""你是{self.name},{self.description}。
你的能力:
- 理解用户意图
- 规划任务步骤
- 调用工具完成任务
- 记忆重要信息
- 学习用户偏好
请根据用户输入和当前状态,决定下一步行动。"""

    def _build_decision_prompt(self, state: Dict) -> str:
        """Render the loop state into the decision prompt for the LLM."""
        prompt = f"""用户输入:{state['user_input']}
当前状态:
{self._format_state(state)}
请决定下一步行动,返回JSON格式:
{{
"action": "respond|tool|think|plan",
"response": "如果action是respond,提供回复",
"tool": "如果action是tool,指定工具名称",
"parameters": "如果action是tool,提供工具参数",
"thought": "如果action是think,提供思考内容",
"goal": "如果action是plan,提供规划目标"
}}"""
        return prompt

    def _format_state(self, state: Dict) -> str:
        """Render executed actions and observations as a bullet list."""
        parts = []
        if state.get("actions"):
            parts.append("已执行的操作:")
            for action in state["actions"]:
                parts.append(f"- {action}")
        if state.get("observations"):
            parts.append("观察结果:")
            for obs in state["observations"]:
                parts.append(f"- {obs}")
        return "\n".join(parts)

    async def _execute_tool(
        self,
        tool_name: str,
        parameters: Dict
    ) -> Dict:
        """Abstract: run the named tool and return its result dict."""
        raise NotImplementedError("子类必须实现_execute_tool方法")

    async def _create_plan(
        self,
        goal: str,
        state: Dict
    ) -> Dict:
        """Abstract: produce an execution plan dict for `goal`."""
        raise NotImplementedError("子类必须实现_create_plan方法")
个人助理Agent
python
class PersonalAssistantAgent(BaseAgent):
    """Concrete personal-assistant agent backed by a tool manager and memory."""

    def __init__(
        self,
        llm_client: openai.OpenAI,
        tool_manager,
        memory_storage
    ):
        super().__init__(
            llm_client,
            name="个人助理",
            description="您的智能个人助理,帮助您管理任务、日程和信息"
        )
        self.tool_manager = tool_manager      # resolves and invokes named tools
        self.memory_storage = memory_storage  # short-term memory backend for call logs

    async def _execute_tool(
        self,
        tool_name: str,
        parameters: Dict
    ) -> Dict:
        """Run a tool through the tool manager and log the call to memory.

        Never raises: failures are reported as {"success": False, "error": ...}.
        """
        try:
            result = await self.tool_manager.call_tool(
                tool_name,
                parameters
            )
            self.memory_storage.add_short_term(
                f"调用工具{tool_name},参数:{parameters},结果:{result}"
            )
            return result
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    async def _create_plan(
        self,
        goal: str,
        state: Dict
    ) -> Dict:
        """Ask the LLM for a step-by-step JSON plan for `goal`.

        Returns {"steps": []} on any API or parse failure.
        """
        prompt = f"""请为以下目标创建执行计划:
目标:{goal}
当前状态:
{self._format_state(state)}
请返回JSON格式的计划:
{{
"steps": [
{{
"step": 1,
"action": "tool|respond",
"description": "步骤描述",
"tool": "如果action是tool,指定工具",
"parameters": "工具参数"
}}
]
}}"""
        try:
            # Run the synchronous OpenAI client off the event loop.
            completion = await asyncio.to_thread(
                self.llm_client.chat.completions.create,
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的任务规划器"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )
            return json.loads(completion.choices[0].message.content)
        except Exception:
            return {
                "steps": []
            }
任务规划实现
任务分解器
python
class TaskDecomposer:
    """Splits a natural-language task into tool-executable subtasks via the LLM."""

    def __init__(self, llm_client: openai.OpenAI):
        self.llm_client = llm_client

    async def decompose(
        self,
        task: str,
        context: Optional[Dict] = None
    ) -> List[Dict]:
        """Return a list of subtask dicts for `task`; empty list on failure."""
        prompt = f"""请将以下任务分解为可执行的子任务:
任务:{task}
上下文:
{context if context else "无"}
请返回JSON格式的子任务列表:
{{
"subtasks": [
{{
"id": "task_1",
"name": "子任务名称",
"description": "子任务描述",
"tool": "需要的工具",
"parameters": "工具参数",
"dependencies": ["依赖的子任务ID"],
"estimated_time": "预估时间(分钟)"
}}
]
}}"""
        try:
            # Run the synchronous OpenAI client off the event loop.
            completion = await asyncio.to_thread(
                self.llm_client.chat.completions.create,
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的任务分解器"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )
            return json.loads(completion.choices[0].message.content).get("subtasks", [])
        except Exception:
            return []

    async def validate_subtasks(
        self,
        subtasks: List[Dict],
        available_tools: List[str]
    ) -> Dict:
        """Partition subtasks by whether their requested tool is available."""
        # Set gives O(1) membership tests instead of scanning the list per subtask.
        tool_set = set(available_tools)
        valid_subtasks = []
        invalid_subtasks = []
        for subtask in subtasks:
            if subtask.get("tool") in tool_set:
                valid_subtasks.append(subtask)
            else:
                invalid_subtasks.append(subtask)
        return {
            "valid": valid_subtasks,
            "invalid": invalid_subtasks,
            "validation_passed": len(invalid_subtasks) == 0
        }
任务执行器
python
class TaskExecutor:
    """Executes subtasks in dependency order using a tool manager."""

    def __init__(
        self,
        tool_manager,
        memory_storage
    ):
        self.tool_manager = tool_manager
        self.memory_storage = memory_storage

    async def execute(
        self,
        subtasks: List[Dict]
    ) -> Dict:
        """Run all subtasks whose dependencies completed successfully.

        Bug fix: the original never recorded failed tasks, so a failing
        task stayed "ready" and was retried in an infinite loop. Failed
        tasks now go into `failed_tasks`; they are not retried, and tasks
        depending on them are skipped (the loop exits when nothing is ready).
        """
        execution_log = []
        results = {}
        task_graph = self._build_dependency_graph(subtasks)
        executed_tasks = set()  # completed successfully
        failed_tasks = set()    # raised an exception; never retried
        while len(executed_tasks) + len(failed_tasks) < len(subtasks):
            ready_tasks = [
                task_id
                for task_id in self._find_ready_tasks(task_graph, executed_tasks)
                if task_id not in failed_tasks
            ]
            if not ready_tasks:
                # Remaining tasks are blocked by unmet or failed dependencies.
                break
            for task_id in ready_tasks:
                task = self._find_task_by_id(subtasks, task_id)
                try:
                    result = await self._execute_single_task(task)
                    execution_log.append({
                        "task_id": task_id,
                        "status": "success",
                        "result": result
                    })
                    results[task_id] = result
                    executed_tasks.add(task_id)
                except Exception as e:
                    execution_log.append({
                        "task_id": task_id,
                        "status": "failed",
                        "error": str(e)
                    })
                    results[task_id] = {
                        "success": False,
                        "error": str(e)
                    }
                    failed_tasks.add(task_id)
        return {
            "execution_log": execution_log,
            "results": results,
            "all_success": all(
                log["status"] == "success"
                for log in execution_log
            )
        }

    def _build_dependency_graph(self, subtasks: List[Dict]) -> Dict:
        """Build {"nodes": id->task, "edges": id->dependency ids}."""
        graph = {
            "nodes": {},
            "edges": {}
        }
        for task in subtasks:
            task_id = task["id"]
            graph["nodes"][task_id] = task
            graph["edges"][task_id] = task.get("dependencies", [])
        return graph

    def _find_ready_tasks(
        self,
        graph: Dict,
        executed_tasks: set
    ) -> List[str]:
        """Return ids of unexecuted tasks whose dependencies all completed."""
        ready_tasks = []
        for task_id, dependencies in graph["edges"].items():
            if task_id in executed_tasks:
                continue
            if all(
                dep in executed_tasks
                for dep in dependencies
            ):
                ready_tasks.append(task_id)
        return ready_tasks

    def _find_task_by_id(
        self,
        subtasks: List[Dict],
        task_id: str
    ) -> Dict:
        """Linear lookup of a subtask; raises ValueError if absent."""
        for task in subtasks:
            if task["id"] == task_id:
                return task
        raise ValueError(f"任务不存在: {task_id}")

    async def _execute_single_task(
        self,
        task: Dict
    ) -> Dict:
        """Invoke the task's tool and log the execution to short-term memory."""
        tool = task["tool"]
        parameters = task["parameters"]
        result = await self.tool_manager.call_tool(
            tool,
            parameters
        )
        self.memory_storage.add_short_term(
            f"执行任务{task['id']}:{task['name']},结果:{result}"
        )
        return result
工具调用实现
工具管理器
python
class ToolManager:
    """Registry of named tools with JSON-schema-style parameter validation."""

    def __init__(self):
        self.tools: Dict[str, Callable] = {}        # tool name -> handler
        self.tool_schemas: Dict[str, Dict] = {}     # tool name -> schema

    def register_tool(
        self,
        name: str,
        schema: Dict,
        handler: Callable
    ):
        """Register `handler` under `name`; `schema` describes its parameters."""
        self.tools[name] = handler
        self.tool_schemas[name] = schema

    async def call_tool(
        self,
        name: str,
        parameters: Dict
    ) -> Dict:
        """Validate parameters and invoke the named tool.

        Raises ValueError for an unknown tool or invalid parameters;
        handler exceptions are converted into a failure dict.
        Generalized: accepts both sync and async handlers (the original
        required every handler to be a coroutine function).
        """
        if name not in self.tools:
            raise ValueError(f"工具不存在: {name}")
        schema = self.tool_schemas[name]
        self._validate_parameters(parameters, schema)
        handler = self.tools[name]
        try:
            result = handler(parameters)
            if inspect.isawaitable(result):
                result = await result
            return {
                "success": True,
                "result": result
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    def _validate_parameters(
        self,
        parameters: Dict,
        schema: Dict
    ):
        """Check required keys and primitive types; raises ValueError on mismatch."""
        required = schema.get("required", [])
        properties = schema.get("properties", {})
        for param in required:
            if param not in parameters:
                raise ValueError(f"缺少必需参数: {param}")
        for param_name, param_value in parameters.items():
            # Parameters without a declared schema are passed through unchecked.
            if param_name not in properties:
                continue
            param_type = properties[param_name].get("type")
            if param_type == "string" and not isinstance(param_value, str):
                raise ValueError(f"参数{param_name}必须是字符串")
            elif param_type == "number" and not isinstance(param_value, (int, float)):
                raise ValueError(f"参数{param_name}必须是数字")
            elif param_type == "integer" and not isinstance(param_value, int):
                raise ValueError(f"参数{param_name}必须是整数")
            elif param_type == "boolean" and not isinstance(param_value, bool):
                raise ValueError(f"参数{param_name}必须是布尔值")

    def get_tool_schemas(self) -> Dict:
        """Return the full name->schema mapping."""
        return self.tool_schemas

    def discover_tools(self, query: str) -> List[str]:
        """Return names of tools whose description shares a keyword with `query`."""
        relevant_tools = []
        for tool_name, schema in self.tool_schemas.items():
            description = schema.get("description", "")
            if self._is_relevant(query, description):
                relevant_tools.append(tool_name)
        return relevant_tools

    def _is_relevant(self, query: str, description: str) -> bool:
        """Naive keyword match: any query word appearing in the description."""
        keywords = query.lower().split()
        description_lower = description.lower()
        return any(
            keyword in description_lower
            for keyword in keywords
        )
个人助理工具集
python
class PersonalAssistantTools:
    """Stub tool implementations for the personal assistant.

    Except for `search_web`, every method returns a canned in-memory
    record so the agent pipeline can be exercised without external
    services. IDs are derived from `hash()` of the salient field.
    """

    @staticmethod
    async def search_web(parameters: Dict) -> Dict:
        """Query DuckDuckGo and return up to five related topics."""
        query = parameters["query"]
        import requests
        try:
            resp = requests.get(
                "https://api.duckduckgo.com/",
                params={"q": query}
            )
            topics = resp.json().get("RelatedTopics", [])
        except Exception as e:
            raise Exception(f"搜索失败: {str(e)}")
        return {
            "query": query,
            "results": topics[:5],
            "count": len(topics)
        }

    @staticmethod
    async def create_calendar_event(parameters: Dict) -> Dict:
        """Pretend to create a calendar event and return its record."""
        title = parameters["title"]
        start = parameters["start_time"]
        return {
            "event_id": f"evt_{hash(title)}_{hash(start)}",
            "title": title,
            "start_time": start,
            "end_time": parameters.get("end_time"),
            "description": parameters.get("description", ""),
            "status": "created",
        }

    @staticmethod
    async def get_calendar_events(parameters: Dict) -> Dict:
        """Stub: always reports an empty calendar for the requested date."""
        return {
            "date": parameters.get("date"),
            "events": [],
            "count": 0,
        }

    @staticmethod
    async def create_todo(parameters: Dict) -> Dict:
        """Pretend to create a todo item; priority defaults to "medium"."""
        title = parameters["title"]
        return {
            "todo_id": f"todo_{hash(title)}",
            "title": title,
            "description": parameters.get("description", ""),
            "priority": parameters.get("priority", "medium"),
            "status": "pending",
            "created_at": datetime.now().isoformat(),
        }

    @staticmethod
    async def get_todos(parameters: Dict) -> Dict:
        """Stub: returns an empty todo list (the status filter is accepted but unused)."""
        return {
            "todos": [],
            "count": 0,
        }

    @staticmethod
    async def send_email(parameters: Dict) -> Dict:
        """Pretend to send an email and return a delivery record."""
        subject = parameters["subject"]
        parameters["body"]  # required field; read to surface a KeyError early
        return {
            "email_id": f"email_{hash(subject)}",
            "to": parameters["to"],
            "subject": subject,
            "status": "sent",
        }

    @staticmethod
    async def take_note(parameters: Dict) -> Dict:
        """Pretend to save a note and return its record."""
        content = parameters["content"]
        return {
            "note_id": f"note_{hash(content)}",
            "title": parameters.get("title", ""),
            "content": content,
            "tags": parameters.get("tags", []),
            "created_at": datetime.now().isoformat(),
        }

    @staticmethod
    async def search_notes(parameters: Dict) -> Dict:
        """Stub: note search always comes back empty."""
        return {
            "query": parameters["query"],
            "notes": [],
            "count": 0,
        }
记忆管理实现
记忆管理器
python
class MemoryManager:
    """Facade over a memory storage backend (short-term / long-term / knowledge)."""

    def __init__(self, storage):
        self.storage = storage

    async def store(
        self,
        content: str,
        memory_type: str = "short_term",
        metadata: Optional[Dict] = None
    ) -> str:
        """Persist `content` in the requested memory tier; returns its key.

        Bug fix: `metadata` defaults to None, and the original called
        `metadata.get(...)` unconditionally for long_term/knowledge,
        crashing with AttributeError when no metadata was supplied.
        """
        if memory_type == "short_term":
            return self.storage.add_short_term(content, metadata)
        meta = metadata or {}
        if memory_type == "long_term":
            key = meta.get("key", f"ltm_{hash(content)}")
            self.storage.add_long_term(key, content, metadata)
            return key
        elif memory_type == "knowledge":
            key = meta.get("key", f"knl_{hash(content)}")
            self.storage.add_knowledge(key, content)
            return key
        else:
            raise ValueError(f"不支持的记忆类型: {memory_type}")

    async def retrieve(
        self,
        query: str,
        memory_types: Optional[List[str]] = None
    ) -> Dict:
        """Fetch matching memories per tier; defaults to all three tiers."""
        if memory_types is None:
            memory_types = ["short_term", "long_term", "knowledge"]
        results = {}
        if "short_term" in memory_types:
            results["short_term"] = self.storage.retrieve_short_term()
        if "long_term" in memory_types:
            # Long-term is keyed storage; return the single best lexical match.
            key = self._find_best_match(query, self.storage.long_term_memory)
            if key:
                results["long_term"] = [self.storage.retrieve_long_term(key)]
            else:
                results["long_term"] = []
        if "knowledge" in memory_types:
            results["knowledge"] = self.storage.retrieve_knowledge(query)
        return results

    def _find_best_match(
        self,
        query: str,
        memory: Dict
    ) -> Optional[str]:
        """Return the key whose content is most similar to `query`.

        Requires similarity > 0.5 to count as a match; None otherwise.
        """
        if not memory:
            return None
        best_key = None
        best_score = 0
        for key, value in memory.items():
            score = self._calculate_similarity(query, value["content"])
            if score > best_score:
                best_score = score
                best_key = key
        return best_key if best_score > 0.5 else None

    def _calculate_similarity(
        self,
        query: str,
        content: str
    ) -> float:
        """Jaccard similarity over lowercase word sets; 0.0 when either is empty."""
        query_words = set(query.lower().split())
        content_words = set(content.lower().split())
        if not query_words or not content_words:
            return 0.0
        intersection = query_words & content_words
        union = query_words | content_words
        return len(intersection) / len(union)

    async def update(
        self,
        key: str,
        content: str,
        memory_type: str = "long_term"
    ):
        """Replace the stored content for `key`; silently no-op if absent."""
        if memory_type == "long_term":
            if key in self.storage.long_term_memory:
                self.storage.long_term_memory[key]["content"] = content
                self.storage.long_term_memory[key]["updated_at"] = datetime.now().isoformat()
        elif memory_type == "knowledge":
            if key in self.storage.knowledge_base:
                self.storage.knowledge_base[key]["knowledge"] = content
                self.storage.knowledge_base[key]["updated_at"] = datetime.now().isoformat()

    async def delete(
        self,
        key: str,
        memory_type: str = "long_term"
    ):
        """Remove `key` from the given tier; silently no-op if absent."""
        if memory_type == "long_term" and key in self.storage.long_term_memory:
            del self.storage.long_term_memory[key]
        elif memory_type == "knowledge" and key in self.storage.knowledge_base:
            del self.storage.knowledge_base[key]
Agent循环实现
Agent循环
python
class AgentLoop:
    """Standalone decide/act loop driving a PersonalAssistantAgent.

    Delegates every decision, tool call and planning step to the wrapped
    agent, but enforces its own configurable iteration budget and reports
    the number of iterations used.
    """

    def __init__(
        self,
        agent: PersonalAssistantAgent,
        max_iterations: int = 10
    ):
        self.agent = agent
        self.max_iterations = max_iterations  # hard cap on decision steps per run

    async def run(
        self,
        user_input: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Loop until the agent produces a response or the budget runs out."""
        state = {
            "user_input": user_input,
            "context": context or {},
            "actions": [],
            "observations": [],
        }
        steps_done = 0
        while steps_done < self.max_iterations:
            steps_done += 1
            decision = await self._make_decision(state)
            kind = decision["action"]
            if kind == "respond":
                # Final answer reached: report it with the step count.
                return {
                    "response": decision["response"],
                    "actions": state["actions"],
                    "iterations": steps_done,
                    "success": True,
                }
            if kind == "tool":
                outcome = await self._execute_tool(
                    decision["tool"],
                    decision["parameters"],
                )
                state["actions"].append({
                    "tool": decision["tool"],
                    "parameters": decision["parameters"],
                    "result": outcome,
                })
                state["observations"].append({
                    "type": "tool_result",
                    "content": outcome,
                })
            elif kind == "think":
                state["observations"].append({
                    "type": "thought",
                    "content": decision["thought"],
                })
            elif kind == "plan":
                new_plan = await self._create_plan(decision["goal"], state)
                state["actions"].append({
                    "action": "plan",
                    "plan": new_plan,
                })
        # Budget exhausted without a final response.
        return {
            "response": "我需要更多信息来帮助您",
            "actions": state["actions"],
            "iterations": steps_done,
            "success": False,
        }

    async def _make_decision(self, state: Dict) -> Dict:
        """Delegate the decision step to the wrapped agent."""
        return await self.agent._make_decision(state)

    async def _execute_tool(
        self,
        tool_name: str,
        parameters: Dict
    ) -> Dict:
        """Delegate tool execution to the wrapped agent."""
        return await self.agent._execute_tool(tool_name, parameters)

    async def _create_plan(
        self,
        goal: str,
        state: Dict
    ) -> Dict:
        """Delegate planning to the wrapped agent."""
        return await self.agent._create_plan(goal, state)
实践练习
练习1:实现Agent核心
python
def implement_agent_core():
    """Exercise 1: wire up a PersonalAssistantAgent with its collaborators."""
    # NOTE(review): replace the placeholder key (e.g. with an env var) before running.
    llm_client = openai.OpenAI(api_key="your-api-key")
    tool_manager = ToolManager()
    # NOTE(review): `MemoryStorage` is not defined anywhere in this file —
    # presumably provided by an earlier lesson/module; confirm before running.
    memory_storage = MemoryStorage()
    agent = PersonalAssistantAgent(
        llm_client,
        tool_manager,
        memory_storage
    )
    return agent
练习2:实现任务规划
python
def implement_task_planning():
    """Exercise 2: build a TaskDecomposer bound to an OpenAI client."""
    client = openai.OpenAI(api_key="your-api-key")
    return TaskDecomposer(client)
练习3:实现工具调用
python
def implement_tool_calling():
    """Exercise 3: build an empty ToolManager ready for tool registration."""
    return ToolManager()
总结
本节我们学习了个人助理Agent的核心功能开发:
- Agent核心实现
- 任务规划实现
- 工具调用实现
- 记忆管理实现
- Agent循环实现
Agent的核心是理解、规划、执行、记忆的循环。
