Skip to content

第36天:Agent感知与规划

学习目标

  • 理解Agent的感知机制
  • 掌握环境建模方法
  • 学习状态表示技术
  • 掌握规划算法
  • 理解任务分解策略

Agent感知机制

感知流程

感知管道

环境 → 传感器 → 数据采集 → 预处理 → 特征提取 → 状态表示 → 推理决策

实现示例

python
class PerceptionPipeline:
    """Turn an environment into a state through a fixed chain of stages.

    The stages run in this order: collect sensor data, preprocess it,
    extract features, build the state.

    Args:
        sensors: Iterable of objects exposing ``name`` and
            ``read(environment)``.
        preprocessor: Object exposing ``process(raw_data)``.
        feature_extractor: Object exposing ``extract(processed_data)``.
    """

    def __init__(self, sensors, preprocessor, feature_extractor):
        self.sensors = sensors
        self.preprocessor = preprocessor
        self.feature_extractor = feature_extractor

    def perceive(self, environment):
        """Run every stage once and return the resulting state."""
        readings = self.collect_data(environment)
        cleaned = self.preprocess(readings)
        return self.build_state(self.extract_features(cleaned))

    def collect_data(self, environment):
        """Return a dict mapping each sensor's name to its reading."""
        return {
            sensor.name: sensor.read(environment) for sensor in self.sensors
        }

    def preprocess(self, raw_data):
        """Delegate cleaning of the raw readings to the preprocessor."""
        cleaned = self.preprocessor.process(raw_data)
        return cleaned

    def extract_features(self, processed_data):
        """Delegate feature extraction to the feature extractor."""
        extracted = self.feature_extractor.extract(processed_data)
        return extracted

    def build_state(self, features):
        """Wrap the features in a ``State``.

        NOTE(review): ``State`` is not defined in this snippet; it is
        presumably provided elsewhere -- confirm before running.
        """
        return State(features)

环境建模

环境类型

确定性环境

  • 状态转换完全确定
  • 相同行动总是产生相同结果
  • 适合简单任务

随机环境

  • 状态转换有概率性
  • 相同行动可能产生不同结果
  • 需要概率推理

部分可观察环境

  • 只能观察到部分环境信息
  • 需要推断隐藏状态
  • 常见于现实世界

环境建模方法

基于状态空间

python
class StateSpaceEnvironment:
    """Deterministic environment described by an explicit transition table.

    Args:
        states: Collection of all states.
        actions: Collection of all actions; its iteration order determines
            the order of the result of ``get_possible_actions``.
        transitions: Nested lookup where ``transitions[state][action]`` is
            the successor state.
    """

    def __init__(self, states, actions, transitions):
        self.states = states
        self.actions = actions
        self.transitions = transitions

    def get_next_state(self, state, action):
        """Return the successor of ``state`` under ``action``.

        Raises whatever the underlying lookup raises (``KeyError`` for
        dicts) when the transition is not defined.
        """
        return self.transitions[state][action]

    def get_possible_actions(self, state):
        """Return the actions that have a transition defined from ``state``.

        A state with no entry in the transition table (for example a
        terminal state) has no possible actions, so an empty list is
        returned instead of propagating the lookup error.
        """
        try:
            outgoing = self.transitions[state]
        except (KeyError, IndexError):
            return []
        return [action for action in self.actions if action in outgoing]

基于概率模型

python
import random


class ProbabilisticEnvironment:
    """Stochastic environment whose transitions are sampled from a table.

    Args:
        states: Collection of all states.
        actions: Collection of all actions.
        transition_probs: Nested mapping where
            ``transition_probs[state][action]`` is a dict mapping each
            successor state to its probability.
        rng: Optional object exposing ``random()`` that returns a float in
            ``[0, 1)``. Defaults to the ``random`` module; pass a seeded
            ``random.Random`` for reproducible runs.
    """

    def __init__(self, states, actions, transition_probs, rng=None):
        self.states = states
        self.actions = actions
        self.transition_probs = transition_probs
        self.rng = rng if rng is not None else random

    def get_next_state(self, state, action):
        """Sample a successor of ``state`` under ``action``."""
        probs = self.transition_probs[state][action]
        return self.sample_from_distribution(probs)

    def sample_from_distribution(self, probs):
        """Draw one key from ``probs`` by walking the cumulative sum.

        Args:
            probs: Mapping from outcome to probability.

        Returns:
            The sampled outcome.

        Raises:
            IndexError: If ``probs`` is empty.
        """
        r = self.rng.random()
        cumulative = 0
        for state, prob in probs.items():
            cumulative += prob
            # Strict comparison: with ``<=`` a draw of exactly 0.0 would
            # select a leading outcome whose probability is zero.
            if r < cumulative:
                return state
        # Reached only when the probabilities sum to less than the draw
        # (for example through rounding). Prefer the last outcome that can
        # actually occur; fall back to the last key if none is positive.
        candidates = [s for s, p in probs.items() if p > 0] or list(probs)
        return candidates[-1]

状态表示

状态空间表示

离散状态空间

python
class DiscreteState:
    """State identified by ``state_id``; ``features`` is attached payload.

    Equality and hashing use only ``state_id``, so two instances with the
    same id but different features are the same dict key / set member.

    Args:
        state_id: Hashable identifier of the state.
        features: Arbitrary feature data carried with the state.
    """

    def __init__(self, state_id, features):
        self.state_id = state_id
        self.features = features

    def __eq__(self, other):
        # Returning NotImplemented lets Python fall back to its default
        # handling, so comparing with an unrelated object (a string, None)
        # yields False instead of raising AttributeError.
        if not isinstance(other, DiscreteState):
            return NotImplemented
        return self.state_id == other.state_id

    def __hash__(self):
        return hash(self.state_id)

    def __repr__(self):
        return f"{type(self).__name__}({self.state_id!r}, {self.features!r})"

连续状态空间

python
import numpy as np


class ContinuousState:
    """State represented as a point in a real-valued vector space.

    Args:
        vector: Sequence of numbers; it is copied into a float NumPy array.
    """

    def __init__(self, vector):
        self.vector = np.array(vector, dtype=float)

    def distance(self, other):
        """Return the Euclidean distance between this state and ``other``."""
        return np.linalg.norm(self.vector - other.vector)

    def similarity(self, other):
        """Return ``1 / (1 + distance)``.

        The value is 1.0 for identical vectors and decreases towards 0 as
        the distance grows.
        """
        return 1 / (1 + self.distance(other))

特征表示

特征提取

python
class FeatureExtractor:
    """Run one extractor per configured feature type over the same data.

    Args:
        feature_types: Iterable of feature-type keys to extract.
        extractors: Optional mapping from feature-type key to an object
            exposing ``extract(data)``. Entries here take precedence over
            the built-in "text" / "image" / "numeric" extractors.
    """

    def __init__(self, feature_types, extractors=None):
        self.feature_types = feature_types
        self.extractors = dict(extractors) if extractors else {}

    def extract(self, data):
        """Return a dict mapping each feature type to its extracted value."""
        return {
            feature_type: self.get_extractor(feature_type).extract(data)
            for feature_type in self.feature_types
        }

    def get_extractor(self, feature_type):
        """Return the extractor responsible for ``feature_type``.

        Raises:
            KeyError: If ``feature_type`` is neither registered nor one of
                the built-in types.
        """
        if feature_type in self.extractors:
            return self.extractors[feature_type]
        # Factories defer construction so that only the requested extractor
        # is instantiated, rather than all three on every lookup.
        # NOTE(review): the three extractor classes are not defined in this
        # snippet; they are presumably provided elsewhere -- confirm.
        factories = {
            "text": lambda: TextFeatureExtractor(),
            "image": lambda: ImageFeatureExtractor(),
            "numeric": lambda: NumericFeatureExtractor(),
        }
        return factories[feature_type]()

特征编码

python
class FeatureEncoder:
    """Encode the string-valued entries of a feature dict.

    ``encoding_type`` selects the scheme: ``"one_hot"``, ``"embedding"``,
    or anything else for no encoding.

    NOTE(review): ``_one_hot`` and ``_get_embedding`` are called below but
    are not defined in this snippet; presumably a subclass or a later
    section supplies them -- confirm.
    """

    def __init__(self, encoding_type):
        self.encoding_type = encoding_type

    def encode(self, features):
        """Return the encoded features, or ``features`` itself when the
        encoding type is not recognised."""
        if self.encoding_type == "one_hot":
            return self.one_hot_encode(features)
        if self.encoding_type == "embedding":
            return self.embedding_encode(features)
        return features

    def one_hot_encode(self, features):
        """Return a new dict with string values passed through ``_one_hot``;
        other values are kept as they are."""
        return {
            name: self._one_hot(item) if isinstance(item, str) else item
            for name, item in features.items()
        }

    def embedding_encode(self, features):
        """Return a new dict with string values passed through
        ``_get_embedding``; other values are kept as they are."""
        return {
            name: self._get_embedding(item) if isinstance(item, str) else item
            for name, item in features.items()
        }

规划算法

A*算法

算法原理

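A*算法是一种启发式搜索算法,结合了Dijkstra算法(按已付出的代价 g(n) 扩展)和贪心最佳优先搜索(按启发式估计 h(n) 扩展)的优点:每次扩展评估值 f(n) = g(n) + h(n) 最小的节点。当启发函数 h 不高估真实代价(即"可采纳")时,A*保证找到代价最小的路径。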

实现

python
import heapq
from typing import List, Dict, Tuple, Optional

class AStarPlanner:
    """A* search over an environment where every action costs 1.

    Args:
        heuristic: Callable ``heuristic(state, goal_state)`` returning the
            estimated remaining cost.
    """

    def __init__(self, heuristic):
        self.heuristic = heuristic

    def plan(self, start_state, goal_state, environment):
        """Search for a sequence of actions from start to goal.

        Args:
            start_state: Hashable start state.
            goal_state: State compared with ``==`` against expanded states.
            environment: Object exposing ``get_possible_actions(state)``
                and ``get_next_state(state, action)``.

        Returns:
            The list of actions leading from ``start_state`` to
            ``goal_state`` (empty when they are equal), or ``None`` when
            the goal is never reached.
        """
        g_score = {start_state: 0}
        came_from = {}
        # Heap entries are (f, tie, g, state). The unique, increasing ``tie``
        # settles equal priorities so states themselves are never compared;
        # states therefore only need to be hashable, not orderable.
        tie = 0
        open_heap = [(self.heuristic(start_state, goal_state), tie, 0, start_state)]

        while open_heap:
            _, _, g_at_push, current = heapq.heappop(open_heap)

            # Lazy deletion: a cheaper route to ``current`` was found after
            # this entry was pushed, so the entry is stale.
            if g_at_push > g_score[current]:
                continue

            if current == goal_state:
                return self.reconstruct_path(came_from, current)

            tentative_g_score = g_at_push + 1
            for action in environment.get_possible_actions(current):
                neighbor = environment.get_next_state(current, action)

                if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
                    came_from[neighbor] = (current, action)
                    g_score[neighbor] = tentative_g_score
                    priority = tentative_g_score + self.heuristic(neighbor, goal_state)
                    tie += 1
                    # Always push: the older, worse entry (if any) is
                    # skipped by the stale check above.
                    heapq.heappush(
                        open_heap, (priority, tie, tentative_g_score, neighbor)
                    )

        return None

    def reconstruct_path(self, came_from, current):
        """Follow ``came_from`` back from ``current`` and return the actions
        in forward order."""
        path = []
        while current in came_from:
            current, action = came_from[current]
            path.append(action)
        return path[::-1]

STRIPS规划

算法原理

STRIPS(Stanford Research Institute Problem Solver)最初是一个自动规划器,其问题描述方式(用前提条件和效果来描述动作)后来成为经典的规划表示语言。

实现

python
class STRIPSPlanner:
    """Greedy forward planner over set-of-facts states.

    Effects are add-only: applying an action unions its effects into the
    state and never removes a fact, so a literal such as ``"not_at_b"`` is
    just another fact.
    """

    def __init__(self):
        self.actions = []

    def add_action(self, name, preconditions, effects):
        """Register an action.

        Args:
            name: Label reported in the resulting plan.
            preconditions: Iterable of facts that must hold to apply it.
            effects: Iterable of facts added by applying it.
        """
        self.actions.append({
            "name": name,
            "preconditions": set(preconditions),
            "effects": set(effects),
        })

    def plan(self, initial_state, goal_state):
        """Greedily pick actions until every goal fact holds.

        Args:
            initial_state: Iterable of facts that hold initially.
            goal_state: Iterable of facts that must hold at the end.

        Returns:
            List of action names, or ``None`` when no applicable action
            leads to a state that has not been visited yet.
        """
        goal = set(goal_state)
        current_state = set(initial_state)
        # Remember visited states so the loop stops once no action can make
        # progress, instead of re-applying a no-op action forever.
        visited = {frozenset(current_state)}
        plan = []

        while not goal.issubset(current_state):
            candidates = [
                action
                for action in self.get_applicable_actions(current_state)
                if frozenset(self.apply_effects(current_state, action["effects"]))
                not in visited
            ]

            if not candidates:
                return None

            best_action = self.select_best_action(candidates, goal)
            plan.append(best_action["name"])
            current_state = self.apply_effects(current_state, best_action["effects"])
            visited.add(frozenset(current_state))

        return plan

    def get_applicable_actions(self, state):
        """Return the actions whose preconditions all hold in ``state``."""
        return [
            action
            for action in self.actions
            if action["preconditions"].issubset(state)
        ]

    def select_best_action(self, actions, goal_state):
        """Return the action whose effects share the most facts with the goal.

        Ties go to the earliest action; ``None`` is returned for an empty
        ``actions``.
        """
        return max(
            actions,
            key=lambda action: len(action["effects"].intersection(goal_state)),
            default=None,
        )

    def apply_effects(self, state, effects):
        """Return a new set containing ``state`` plus ``effects``."""
        return set(state) | set(effects)

层次化规划

算法原理

层次化规划将任务分解为多个层次,每个层次处理不同抽象级别的任务。

实现

python
class HierarchicalPlanner:
    """Refine a goal by passing it through successive decomposition levels.

    Args:
        hierarchy: Sequence of objects exposing ``decompose(goal)``; index 0
            is the first level applied, the last index the final one.
    """

    def __init__(self, hierarchy):
        self.hierarchy = hierarchy

    def plan(self, goal):
        """Return the flat list of tasks obtained by decomposing ``goal``
        through every level in order."""
        plan = []
        self._plan_at_level(goal, 0, plan)
        return plan

    def _plan_at_level(self, goal, level, plan):
        """Decompose ``goal`` at ``level`` and append the results to ``plan``."""
        if level >= len(self.hierarchy):
            plan.append(goal)
            return

        for subtask in self.hierarchy[level].decompose(goal):
            if self.is_primitive(subtask, level):
                plan.append(subtask)
            else:
                # Hand the subtask to the NEXT level. Re-running the same
                # level on its own output never terminates when the
                # decomposer returns a task unchanged.
                self._plan_at_level(subtask, level + 1, plan)

    def is_primitive(self, task, level):
        """Return True when ``level`` is the last level of the hierarchy,
        i.e. tasks produced there are not decomposed further."""
        return level == len(self.hierarchy) - 1

任务分解

任务分解策略

基于规则的分解

python
class RuleBasedDecomposer:
    """Decompose a task with the first rule that matches it.

    Args:
        rules: Iterable of objects exposing ``matches(task)`` and
            ``apply(task)``; earlier rules take priority.
    """

    def __init__(self, rules):
        self.rules = rules

    def decompose(self, task):
        """Return the first matching rule's result, or ``[task]`` when no
        rule matches."""
        chosen = next((rule for rule in self.rules if rule.matches(task)), None)
        if chosen is None:
            return [task]
        return chosen.apply(task)

基于学习的分解

python
class LearningBasedDecomposer:
    """Decompose a task by asking a model to predict its subtasks.

    Args:
        model: Object exposing ``predict(features)``.
    """

    # Task attributes copied into the feature dict, in this order.
    _FEATURE_FIELDS = ("type", "complexity", "dependencies")

    def __init__(self, model):
        self.model = model

    def decompose(self, task):
        """Return whatever the model predicts for the task's features."""
        return self.model.predict(self.extract_features(task))

    def extract_features(self, task):
        """Return a dict with the task's ``type``, ``complexity`` and
        ``dependencies`` attributes."""
        return {field: getattr(task, field) for field in self._FEATURE_FIELDS}

目标层次

目标层次结构

主目标
├── 子目标1
│   ├── 子任务1.1
│   └── 子任务1.2
├── 子目标2
│   ├── 子任务2.1
│   └── 子任务2.2
└── 子目标3
    ├── 子任务3.1
    └── 子任务3.2

实现

python
class GoalHierarchy:
    """Tree of goals that can be flattened into its primitive leaves.

    Goals are expected to expose ``is_primitive()`` and, when not
    primitive, a ``subgoals`` iterable.
    """

    def __init__(self):
        self.root = None

    def set_root(self, goal):
        """Install ``goal`` as the top of the hierarchy."""
        self.root = goal

    def decompose(self):
        """Return the primitive goals under the root in depth-first,
        left-to-right order; an empty list when no root is set."""
        return [] if self.root is None else self._decompose_goal(self.root)

    def _decompose_goal(self, goal):
        """Return the primitive goals found at or below ``goal``."""
        return list(self._iter_primitives(goal))

    def _iter_primitives(self, goal):
        """Yield primitive goals at or below ``goal``, depth first."""
        if goal.is_primitive():
            yield goal
            return
        for subgoal in goal.subgoals:
            yield from self._iter_primitives(subgoal)

实践练习

练习1:实现简单的A*规划器

python
class SimpleAStarPlanner:
    """Exercise wrapper: A* planning with a Manhattan-distance heuristic."""

    def __init__(self):
        """Nothing to set up; the planner holds no state."""

    def heuristic(self, state, goal):
        """Return the Manhattan distance between ``state`` and ``goal``,
        read from their ``x`` and ``y`` attributes."""
        dx = state.x - goal.x
        dy = state.y - goal.y
        return abs(dx) + abs(dy)

    def plan(self, start, goal, grid):
        """Delegate the search to ``AStarPlanner`` using this heuristic."""
        return AStarPlanner(self.heuristic).plan(start, goal, grid)

练习2:实现STRIPS规划器

python
class SimpleSTRIPSPlanner:
    """Exercise wrapper: a two-location domain on top of ``STRIPSPlanner``."""

    def __init__(self):
        self.planner = STRIPSPlanner()
        self.setup_actions()

    def setup_actions(self):
        """Register the two movement actions between locations a and b."""
        # Each row is (name, preconditions, effects).
        domain = (
            ("move_to_a", {"at_b"}, {"at_a", "not_at_b"}),
            ("move_to_b", {"at_a"}, {"at_b", "not_at_a"}),
        )
        for name, preconditions, effects in domain:
            self.planner.add_action(name, preconditions, effects)

    def plan(self, initial, goal):
        """Return the wrapped planner's plan from ``initial`` to ``goal``."""
        result = self.planner.plan(initial, goal)
        return result

总结

本节我们学习了Agent的感知与规划:

  1. Agent的感知机制和感知流程
  2. 环境建模方法
  3. 状态表示技术
  4. 规划算法(A*、STRIPS、层次化规划)
  5. 任务分解策略

这些技术为Agent能够自主感知环境、制定计划并执行行动提供了基础。

参考资源