Skip to content

第79天:个人助理Agent-核心功能开发

学习目标

  • 掌握Agent核心实现
  • 学习任务规划实现
  • 理解工具调用实现
  • 掌握记忆管理实现
  • 学习Agent循环实现

Agent核心实现

Agent基类

python
import asyncio
import hashlib
import json
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

import openai

class BaseAgent:
    """Generic LLM-driven agent.

    Keeps a conversation history and runs a decide/act loop — the LLM picks
    one of four actions per round (respond, tool, think, plan) until it can
    answer or the iteration budget runs out.

    Subclasses must implement ``_execute_tool`` and ``_create_plan``.
    """

    def __init__(
        self,
        llm_client: "openai.OpenAI",
        name: str = "Assistant",
        description: str = "AI智能助手"
    ):
        self.llm_client = llm_client
        self.name = name
        self.description = description
        # Full dialogue log; each entry carries role, content and timestamp.
        self.conversation_history: List[Dict] = []
        self.current_task = None
        self.state = "idle"

    async def process_input(
        self,
        user_input: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Handle one user turn: log it, run the agent loop, log the reply.

        Returns the loop's result dict. Errors are returned as data
        (``success: False`` plus the message) rather than raised, so a
        single bad turn does not kill the session.
        """
        self.conversation_history.append({
            "role": "user",
            "content": user_input,
            "timestamp": datetime.now().isoformat()
        })

        try:
            result = await self._execute_agent_loop(user_input, context)

            self.conversation_history.append({
                "role": "assistant",
                "content": result["response"],
                "timestamp": datetime.now().isoformat(),
                "actions": result.get("actions", [])
            })

            return result

        except Exception as e:
            error_response = {
                "response": f"处理失败: {str(e)}",
                "error": str(e),
                "success": False
            }

            self.conversation_history.append({
                "role": "assistant",
                "content": error_response["response"],
                "timestamp": datetime.now().isoformat()
            })

            return error_response

    async def _execute_agent_loop(
        self,
        user_input: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Run the decide/act loop for at most ``max_iterations`` rounds.

        Returns a result dict with ``response``, ``actions`` and ``success``.
        """
        max_iterations = 10

        current_state = {
            "user_input": user_input,
            "context": context or {},
            "actions": [],
            "observations": []
        }

        for _ in range(max_iterations):
            decision = await self._make_decision(current_state)
            # Use .get(): even in JSON mode the model may omit keys, and a
            # KeyError here would abort the whole turn.
            action = decision.get("action")

            if action == "respond":
                return {
                    "response": decision.get("response", ""),
                    "actions": current_state["actions"],
                    "success": True
                }

            elif action == "tool":
                tool_result = await self._execute_tool(
                    decision.get("tool"),
                    decision.get("parameters", {})
                )

                current_state["actions"].append({
                    "tool": decision.get("tool"),
                    "parameters": decision.get("parameters", {}),
                    "result": tool_result
                })

                current_state["observations"].append({
                    "type": "tool_result",
                    "content": tool_result
                })

            elif action == "think":
                current_state["observations"].append({
                    "type": "thought",
                    "content": decision.get("thought", "")
                })

            elif action == "plan":
                plan = await self._create_plan(
                    decision.get("goal", ""),
                    current_state
                )

                current_state["actions"].append({
                    "action": "plan",
                    "plan": plan
                })

        # Budget exhausted without a final answer.
        return {
            "response": "我需要更多信息来帮助您",
            "actions": current_state["actions"],
            "success": False
        }

    async def _make_decision(self, state: Dict) -> Dict:
        """Ask the LLM for the next action as a JSON object.

        Falls back to a plain "respond" decision on any API or parse error.
        """
        prompt = self._build_decision_prompt(state)

        try:
            # The OpenAI client is synchronous; run the blocking HTTP call
            # in a worker thread so it does not stall the event loop.
            completion = await asyncio.to_thread(
                self.llm_client.chat.completions.create,
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": self._get_system_prompt()},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )

            return json.loads(completion.choices[0].message.content)

        except Exception:
            return {
                "action": "respond",
                "response": "我遇到了一些问题,请稍后再试"
            }

    def _get_system_prompt(self) -> str:
        """Build the system prompt from the agent's name and description."""
        return f"""你是{self.name}{self.description}

你的能力:
- 理解用户意图
- 规划任务步骤
- 调用工具完成任务
- 记忆重要信息
- 学习用户偏好

请根据用户输入和当前状态,决定下一步行动。"""

    def _build_decision_prompt(self, state: Dict) -> str:
        """Render the current state into the per-round decision prompt."""
        prompt = f"""用户输入:{state['user_input']}

当前状态:
{self._format_state(state)}

请决定下一步行动,返回JSON格式:
{{
  "action": "respond|tool|think|plan",
  "response": "如果action是respond,提供回复",
  "tool": "如果action是tool,指定工具名称",
  "parameters": "如果action是tool,提供工具参数",
  "thought": "如果action是think,提供思考内容",
  "goal": "如果action是plan,提供规划目标"
}}"""

        return prompt

    def _format_state(self, state: Dict) -> str:
        """Format executed actions and observations as a bullet list."""
        parts = []

        if state.get("actions"):
            parts.append("已执行的操作:")
            for action in state["actions"]:
                parts.append(f"- {action}")

        if state.get("observations"):
            parts.append("观察结果:")
            for obs in state["observations"]:
                parts.append(f"- {obs}")

        return "\n".join(parts)

    async def _execute_tool(
        self,
        tool_name: str,
        parameters: Dict
    ) -> Dict:
        """Execute one tool call. Subclasses must override."""
        raise NotImplementedError("子类必须实现_execute_tool方法")

    async def _create_plan(
        self,
        goal: str,
        state: Dict
    ) -> Dict:
        """Create an execution plan for a goal. Subclasses must override."""
        raise NotImplementedError("子类必须实现_create_plan方法")

个人助理Agent

python
class PersonalAssistantAgent(BaseAgent):
    """Personal-assistant agent.

    Executes tools through a ``tool_manager`` and records each tool call in
    short-term memory via ``memory_storage``.
    """

    def __init__(
        self,
        llm_client: "openai.OpenAI",
        tool_manager,
        memory_storage
    ):
        super().__init__(
            llm_client,
            name="个人助理",
            description="您的智能个人助理,帮助您管理任务、日程和信息"
        )

        self.tool_manager = tool_manager
        self.memory_storage = memory_storage

    async def _execute_tool(
        self,
        tool_name: str,
        parameters: Dict
    ) -> Dict:
        """Call a registered tool; failures are returned as data, not raised."""
        try:
            result = await self.tool_manager.call_tool(
                tool_name,
                parameters
            )

            # Log the call so later decisions can see what was already done.
            self.memory_storage.add_short_term(
                f"调用工具{tool_name},参数:{parameters},结果:{result}"
            )

            return result

        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    async def _create_plan(
        self,
        goal: str,
        state: Dict
    ) -> Dict:
        """Ask the LLM for a step-by-step plan as JSON.

        Returns ``{"steps": []}`` on any API or parse error.
        """
        prompt = f"""请为以下目标创建执行计划:

目标:{goal}

当前状态:
{self._format_state(state)}

请返回JSON格式的计划:
{{
  "steps": [
    {{
      "step": 1,
      "action": "tool|respond",
      "description": "步骤描述",
      "tool": "如果action是tool,指定工具",
      "parameters": "工具参数"
    }}
  ]
}}"""

        try:
            # Blocking client call → worker thread, keeps the loop responsive.
            completion = await asyncio.to_thread(
                self.llm_client.chat.completions.create,
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的任务规划器"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )

            return json.loads(completion.choices[0].message.content)

        except Exception:
            return {
                "steps": []
            }

任务规划实现

任务分解器

python
class TaskDecomposer:
    """Breaks a task into executable subtasks via the LLM and validates
    that each subtask's tool is actually available."""

    def __init__(self, llm_client: "openai.OpenAI"):
        self.llm_client = llm_client

    async def decompose(
        self,
        task: str,
        context: Optional[Dict] = None
    ) -> List[Dict]:
        """Ask the LLM to split *task* into subtasks.

        Returns the ``subtasks`` list, or ``[]`` on any API/parse error.
        """
        prompt = f"""请将以下任务分解为可执行的子任务:

任务:{task}

上下文:
{context if context else "无"}

请返回JSON格式的子任务列表:
{{
  "subtasks": [
    {{
      "id": "task_1",
      "name": "子任务名称",
      "description": "子任务描述",
      "tool": "需要的工具",
      "parameters": "工具参数",
      "dependencies": ["依赖的子任务ID"],
      "estimated_time": "预估时间(分钟)"
    }}
  ]
}}"""

        try:
            # The OpenAI client is synchronous; keep the event loop free.
            completion = await asyncio.to_thread(
                self.llm_client.chat.completions.create,
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "你是一个专业的任务分解器"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                response_format={"type": "json_object"}
            )

            return json.loads(
                completion.choices[0].message.content
            ).get("subtasks", [])

        except Exception:
            return []

    async def validate_subtasks(
        self,
        subtasks: List[Dict],
        available_tools: List[str]
    ) -> Dict:
        """Split subtasks by whether their tool exists in *available_tools*.

        Returns ``valid``/``invalid`` lists plus a ``validation_passed`` flag.
        """
        valid_subtasks = []
        invalid_subtasks = []

        for subtask in subtasks:
            tool = subtask.get("tool")

            if tool in available_tools:
                valid_subtasks.append(subtask)
            else:
                invalid_subtasks.append(subtask)

        return {
            "valid": valid_subtasks,
            "invalid": invalid_subtasks,
            "validation_passed": len(invalid_subtasks) == 0
        }

任务执行器

python
class TaskExecutor:
    """Executes subtasks in dependency order via the tool manager.

    BUG FIX vs. the original: a failed task was never added to the
    "executed" set, so ``_find_ready_tasks`` kept returning it and
    ``execute`` retried it forever (infinite loop). Now every scheduled
    task is recorded in ``attempted`` (success or failure), dependents of
    a failed task are skipped, and the loop always terminates.
    """

    def __init__(
        self,
        tool_manager,
        memory_storage
    ):
        self.tool_manager = tool_manager
        self.memory_storage = memory_storage

    async def execute(
        self,
        subtasks: List[Dict]
    ) -> Dict:
        """Run all subtasks whose dependencies complete successfully.

        Returns the execution log, per-task results, and ``all_success``
        (True only if every subtask ran and succeeded).
        """
        execution_log = []
        results = {}

        task_graph = self._build_dependency_graph(subtasks)

        completed = set()   # finished successfully
        attempted = set()   # scheduled at least once (success or failure)

        while len(attempted) < len(subtasks):
            ready_tasks = self._find_ready_tasks(
                task_graph,
                completed,
                attempted
            )

            # No runnable task left: either a dependency failed, is
            # missing, or the graph has a cycle — stop instead of spinning.
            if not ready_tasks:
                break

            for task_id in ready_tasks:
                task = self._find_task_by_id(subtasks, task_id)
                attempted.add(task_id)

                try:
                    result = await self._execute_single_task(task)

                    execution_log.append({
                        "task_id": task_id,
                        "status": "success",
                        "result": result
                    })

                    results[task_id] = result
                    completed.add(task_id)

                except Exception as e:
                    execution_log.append({
                        "task_id": task_id,
                        "status": "failed",
                        "error": str(e)
                    })

                    results[task_id] = {
                        "success": False,
                        "error": str(e)
                    }

        return {
            "execution_log": execution_log,
            "results": results,
            # Also require that nothing was skipped, not just that every
            # logged attempt succeeded.
            "all_success": len(execution_log) == len(subtasks) and all(
                log["status"] == "success"
                for log in execution_log
            )
        }

    def _build_dependency_graph(self, subtasks: List[Dict]) -> Dict:
        """Build node/edge maps: task_id -> task, task_id -> dependency ids."""
        graph = {
            "nodes": {},
            "edges": {}
        }

        for task in subtasks:
            task_id = task["id"]
            graph["nodes"][task_id] = task
            graph["edges"][task_id] = task.get("dependencies", [])

        return graph

    def _find_ready_tasks(
        self,
        graph: Dict,
        completed: set,
        attempted: set
    ) -> List[str]:
        """Tasks not yet attempted whose dependencies all completed."""
        ready_tasks = []

        for task_id, dependencies in graph["edges"].items():
            if task_id in attempted:
                continue

            if all(
                dep in completed
                for dep in dependencies
            ):
                ready_tasks.append(task_id)

        return ready_tasks

    def _find_task_by_id(
        self,
        subtasks: List[Dict],
        task_id: str
    ) -> Dict:
        """Return the subtask dict with the given id, or raise ValueError."""
        for task in subtasks:
            if task["id"] == task_id:
                return task

        raise ValueError(f"任务不存在: {task_id}")

    async def _execute_single_task(
        self,
        task: Dict
    ) -> Dict:
        """Run one subtask's tool and record the outcome in short-term memory."""
        tool = task["tool"]
        parameters = task["parameters"]

        result = await self.tool_manager.call_tool(
            tool,
            parameters
        )

        self.memory_storage.add_short_term(
            f"执行任务{task['id']}{task['name']},结果:{result}"
        )

        return result

工具调用实现

工具管理器

python
class ToolManager:
    """Registry for named tools with JSON-Schema-style parameter validation
    and a simple keyword-based discovery helper."""

    def __init__(self):
        self.tools = {}          # name -> async handler
        self.tool_schemas = {}   # name -> schema dict

    def register_tool(
        self,
        name: str,
        schema: Dict,
        handler: Callable
    ):
        """Register an async handler and its parameter schema under *name*."""
        self.tools[name] = handler
        self.tool_schemas[name] = schema

    async def call_tool(
        self,
        name: str,
        parameters: Dict
    ) -> Dict:
        """Validate *parameters* against the tool's schema and invoke it.

        Raises ValueError for unknown tools or invalid parameters; handler
        exceptions are captured and returned as ``success: False``.
        """
        if name not in self.tools:
            raise ValueError(f"工具不存在: {name}")

        schema = self.tool_schemas[name]

        self._validate_parameters(parameters, schema)

        handler = self.tools[name]

        try:
            result = await handler(parameters)

            return {
                "success": True,
                "result": result
            }

        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    def _validate_parameters(
        self,
        parameters: Dict,
        schema: Dict
    ):
        """Check required keys and primitive types; raise ValueError on mismatch.

        Note: ``bool`` is a subclass of ``int`` in Python, so booleans must be
        rejected explicitly for "number"/"integer" — the original accepted
        ``True`` as a valid integer.
        """
        required = schema.get("required", [])
        properties = schema.get("properties", {})

        for param in required:
            if param not in parameters:
                raise ValueError(f"缺少必需参数: {param}")

        for param_name, param_value in parameters.items():
            if param_name not in properties:
                continue

            param_schema = properties[param_name]
            param_type = param_schema.get("type")
            is_bool = isinstance(param_value, bool)

            if param_type == "string" and not isinstance(param_value, str):
                raise ValueError(f"参数{param_name}必须是字符串")
            elif param_type == "number" and (is_bool or not isinstance(param_value, (int, float))):
                raise ValueError(f"参数{param_name}必须是数字")
            elif param_type == "integer" and (is_bool or not isinstance(param_value, int)):
                raise ValueError(f"参数{param_name}必须是整数")
            elif param_type == "boolean" and not is_bool:
                raise ValueError(f"参数{param_name}必须是布尔值")

    def get_tool_schemas(self) -> Dict:
        """Return the full name -> schema mapping."""
        return self.tool_schemas

    def discover_tools(self, query: str) -> List[str]:
        """Return tool names whose description matches any query keyword."""
        relevant_tools = []

        for tool_name, schema in self.tool_schemas.items():
            description = schema.get("description", "")

            if self._is_relevant(query, description):
                relevant_tools.append(tool_name)

        return relevant_tools

    def _is_relevant(self, query: str, description: str) -> bool:
        """Case-insensitive substring match of any query word in description."""
        keywords = query.lower().split()
        description_lower = description.lower()

        return any(
            keyword in description_lower
            for keyword in keywords
        )

个人助理工具集

python
class PersonalAssistantTools:
    """Built-in tool handlers for the personal assistant.

    Calendar/todo/email/note handlers are stubs that echo structured
    results; IDs are derived from content with hashlib so they are stable
    across processes (built-in ``hash()`` varies with PYTHONHASHSEED).
    """

    @staticmethod
    def _stable_id(prefix: str, *parts: str) -> str:
        """Deterministic content-derived id: '<prefix>_<md5 digest prefix>'."""
        digest = hashlib.md5("|".join(parts).encode("utf-8")).hexdigest()[:12]
        return f"{prefix}_{digest}"

    @staticmethod
    async def search_web(parameters: Dict) -> Dict:
        """Query the DuckDuckGo Instant Answer API; returns top 5 topics."""
        query = parameters["query"]

        import requests

        try:
            response = requests.get(
                "https://api.duckduckgo.com/",
                # format=json is required for a JSON body; without it the
                # endpoint returns JavaScript and .json() fails.
                params={"q": query, "format": "json"},
                timeout=10,  # never hang the agent loop on a slow endpoint
            )

            results = response.json().get("RelatedTopics", [])

            return {
                "query": query,
                "results": results[:5],
                "count": len(results)
            }

        except Exception as e:
            raise Exception(f"搜索失败: {str(e)}")

    @staticmethod
    async def create_calendar_event(parameters: Dict) -> Dict:
        """Create a calendar event stub; requires title and start_time."""
        title = parameters["title"]
        start_time = parameters["start_time"]
        end_time = parameters.get("end_time")
        description = parameters.get("description", "")

        event_id = PersonalAssistantTools._stable_id("evt", title, start_time)

        return {
            "event_id": event_id,
            "title": title,
            "start_time": start_time,
            "end_time": end_time,
            "description": description,
            "status": "created"
        }

    @staticmethod
    async def get_calendar_events(parameters: Dict) -> Dict:
        """List calendar events for a date (stub: always empty)."""
        date = parameters.get("date")

        return {
            "date": date,
            "events": [],
            "count": 0
        }

    @staticmethod
    async def create_todo(parameters: Dict) -> Dict:
        """Create a todo item stub; priority defaults to "medium"."""
        title = parameters["title"]
        description = parameters.get("description", "")
        priority = parameters.get("priority", "medium")

        todo_id = PersonalAssistantTools._stable_id("todo", title)

        return {
            "todo_id": todo_id,
            "title": title,
            "description": description,
            "priority": priority,
            "status": "pending",
            "created_at": datetime.now().isoformat()
        }

    @staticmethod
    async def get_todos(parameters: Dict) -> Dict:
        """List todos, optionally filtered by status (stub: always empty)."""
        status = parameters.get("status")

        return {
            "todos": [],
            "count": 0
        }

    @staticmethod
    async def send_email(parameters: Dict) -> Dict:
        """Send an email stub; requires to, subject and body."""
        to = parameters["to"]
        subject = parameters["subject"]
        body = parameters["body"]

        email_id = PersonalAssistantTools._stable_id("email", subject)

        return {
            "email_id": email_id,
            "to": to,
            "subject": subject,
            "status": "sent"
        }

    @staticmethod
    async def take_note(parameters: Dict) -> Dict:
        """Save a note stub; title and tags are optional."""
        content = parameters["content"]
        title = parameters.get("title", "")
        tags = parameters.get("tags", [])

        note_id = PersonalAssistantTools._stable_id("note", content)

        return {
            "note_id": note_id,
            "title": title,
            "content": content,
            "tags": tags,
            "created_at": datetime.now().isoformat()
        }

    @staticmethod
    async def search_notes(parameters: Dict) -> Dict:
        """Search notes by query (stub: always empty)."""
        query = parameters["query"]

        return {
            "query": query,
            "notes": [],
            "count": 0
        }

记忆管理实现

记忆管理器

python
class MemoryManager:
    """Facade over a memory storage backend with three tiers:
    short_term, long_term, and knowledge.

    BUG FIX vs. the original: ``store`` crashed with ``AttributeError``
    for long_term/knowledge when *metadata* was left at its default
    ``None`` (it called ``metadata.get``). A local empty-dict fallback is
    used for key lookup; the original *metadata* value is still passed
    through to the storage backend unchanged.
    """

    def __init__(self, storage):
        self.storage = storage

    async def store(
        self,
        content: str,
        memory_type: str = "short_term",
        metadata: Optional[Dict] = None
    ) -> str:
        """Store *content* in the given tier; returns the storage key.

        Raises ValueError for unknown memory types.
        """
        meta = metadata or {}

        if memory_type == "short_term":
            return self.storage.add_short_term(content, metadata)
        elif memory_type == "long_term":
            key = meta.get("key", f"ltm_{hash(content)}")
            self.storage.add_long_term(key, content, metadata)
            return key
        elif memory_type == "knowledge":
            key = meta.get("key", f"knl_{hash(content)}")
            self.storage.add_knowledge(key, content)
            return key
        else:
            raise ValueError(f"不支持的记忆类型: {memory_type}")

    async def retrieve(
        self,
        query: str,
        memory_types: Optional[List[str]] = None
    ) -> Dict:
        """Retrieve memories matching *query* from the requested tiers.

        Defaults to searching all three tiers.
        """
        if memory_types is None:
            memory_types = ["short_term", "long_term", "knowledge"]

        results = {}

        if "short_term" in memory_types:
            results["short_term"] = self.storage.retrieve_short_term()

        if "long_term" in memory_types:
            # Long-term lookup is best-match only (single result).
            key = self._find_best_match(query, self.storage.long_term_memory)
            if key:
                results["long_term"] = [self.storage.retrieve_long_term(key)]
            else:
                results["long_term"] = []

        if "knowledge" in memory_types:
            results["knowledge"] = self.storage.retrieve_knowledge(query)

        return results

    def _find_best_match(
        self,
        query: str,
        memory: Dict
    ) -> Optional[str]:
        """Return the key whose content best matches *query*.

        Uses Jaccard word similarity; requires score > 0.5 to count as a hit.
        """
        if not memory:
            return None

        best_key = None
        best_score = 0

        for key, value in memory.items():
            score = self._calculate_similarity(query, value["content"])

            if score > best_score:
                best_score = score
                best_key = key

        return best_key if best_score > 0.5 else None

    def _calculate_similarity(
        self,
        query: str,
        content: str
    ) -> float:
        """Jaccard similarity over lowercase word sets, in [0.0, 1.0]."""
        query_words = set(query.lower().split())
        content_words = set(content.lower().split())

        if not query_words or not content_words:
            return 0.0

        intersection = query_words & content_words
        union = query_words | content_words

        return len(intersection) / len(union)

    async def update(
        self,
        key: str,
        content: str,
        memory_type: str = "long_term"
    ):
        """Overwrite the stored content for *key* and bump updated_at.

        Silently does nothing if the key does not exist.
        """
        if memory_type == "long_term":
            if key in self.storage.long_term_memory:
                self.storage.long_term_memory[key]["content"] = content
                self.storage.long_term_memory[key]["updated_at"] = datetime.now().isoformat()
        elif memory_type == "knowledge":
            if key in self.storage.knowledge_base:
                self.storage.knowledge_base[key]["knowledge"] = content
                self.storage.knowledge_base[key]["updated_at"] = datetime.now().isoformat()

    async def delete(
        self,
        key: str,
        memory_type: str = "long_term"
    ):
        """Remove *key* from the given tier; no-op if absent."""
        if memory_type == "long_term" and key in self.storage.long_term_memory:
            del self.storage.long_term_memory[key]
        elif memory_type == "knowledge" and key in self.storage.knowledge_base:
            del self.storage.knowledge_base[key]

Agent循环实现

Agent循环

python
class AgentLoop:
    """External driver for a PersonalAssistantAgent.

    Repeatedly asks the agent for a decision and dispatches it (respond /
    tool / think / plan) until the agent answers or the iteration budget
    is spent. Mirrors BaseAgent's internal loop but exposes iteration
    count in the result.
    """

    def __init__(
        self,
        agent: PersonalAssistantAgent,
        max_iterations: int = 10
    ):
        self.agent = agent
        self.max_iterations = max_iterations

    async def run(
        self,
        user_input: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Drive the decide/act cycle; returns response, actions, iterations."""
        state = {
            "user_input": user_input,
            "context": context or {},
            "actions": [],
            "observations": []
        }
        iteration = 0

        while iteration < self.max_iterations:
            iteration += 1
            decision = await self._make_decision(state)
            kind = decision["action"]

            if kind == "respond":
                # Final answer — report how many rounds it took.
                return {
                    "response": decision["response"],
                    "actions": state["actions"],
                    "iterations": iteration,
                    "success": True
                }

            if kind == "tool":
                outcome = await self._execute_tool(
                    decision["tool"], decision["parameters"]
                )
                state["actions"].append({
                    "tool": decision["tool"],
                    "parameters": decision["parameters"],
                    "result": outcome
                })
                state["observations"].append(
                    {"type": "tool_result", "content": outcome}
                )
            elif kind == "think":
                state["observations"].append(
                    {"type": "thought", "content": decision["thought"]}
                )
            elif kind == "plan":
                new_plan = await self._create_plan(decision["goal"], state)
                state["actions"].append({"action": "plan", "plan": new_plan})

        # Budget exhausted without a final answer.
        return {
            "response": "我需要更多信息来帮助您",
            "actions": state["actions"],
            "iterations": iteration,
            "success": False
        }

    async def _make_decision(self, state: Dict) -> Dict:
        # Thin delegation — decision logic lives on the agent.
        return await self.agent._make_decision(state)

    async def _execute_tool(
        self,
        tool_name: str,
        parameters: Dict
    ) -> Dict:
        return await self.agent._execute_tool(tool_name, parameters)

    async def _create_plan(
        self,
        goal: str,
        state: Dict
    ) -> Dict:
        return await self.agent._create_plan(goal, state)

实践练习

练习1:实现Agent核心

python
def implement_agent_core():
    """Exercise 1: wire up a PersonalAssistantAgent with its collaborators.

    NOTE(review): ``MemoryStorage`` is not defined in this file — presumably
    provided by an earlier lesson; confirm the import before running.
    """
    return PersonalAssistantAgent(
        openai.OpenAI(api_key="your-api-key"),
        ToolManager(),
        MemoryStorage(),
    )

练习2:实现任务规划

python
def implement_task_planning():
    """Exercise 2: build a TaskDecomposer backed by an OpenAI client."""
    return TaskDecomposer(openai.OpenAI(api_key="your-api-key"))

练习3:实现工具调用

python
def implement_tool_calling():
    """Exercise 3: create an empty ToolManager ready for tool registration."""
    return ToolManager()

总结

本节我们学习了个人助理Agent的核心功能开发:

  1. Agent核心实现
  2. 任务规划实现
  3. 工具调用实现
  4. 记忆管理实现
  5. Agent循环实现

Agent的核心是理解、规划、执行、记忆的循环。

参考资源