# Day 36: Agent Perception and Planning

## Learning Objectives

- Understand agent perception mechanisms
- Master environment modeling methods
- Learn state representation techniques
- Master planning algorithms
- Understand task decomposition strategies
## Agent Perception Mechanisms

### Perception Flow

The perception pipeline:

```
Environment → Sensors → Data Collection → Preprocessing → Feature Extraction → State Representation → Reasoning & Decision
```

Implementation example:
```python
class PerceptionPipeline:
    def __init__(self, sensors, preprocessor, feature_extractor):
        self.sensors = sensors
        self.preprocessor = preprocessor
        self.feature_extractor = feature_extractor

    def perceive(self, environment):
        raw_data = self.collect_data(environment)
        processed_data = self.preprocess(raw_data)
        features = self.extract_features(processed_data)
        state = self.build_state(features)
        return state

    def collect_data(self, environment):
        data = {}
        for sensor in self.sensors:
            data[sensor.name] = sensor.read(environment)
        return data

    def preprocess(self, raw_data):
        return self.preprocessor.process(raw_data)

    def extract_features(self, processed_data):
        return self.feature_extractor.extract(processed_data)

    def build_state(self, features):
        return State(features)
```

## Environment Modeling
### Environment Types

Deterministic environments:

- State transitions are fully determined
- The same action always produces the same result
- Suitable for simple tasks

Stochastic environments:

- State transitions are probabilistic
- The same action may produce different results
- Requires probabilistic reasoning

Partially observable environments:

- Only part of the environment state can be observed
- Hidden state must be inferred
- Common in real-world settings
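To make the contrast concrete, here is a minimal sketch (the states `s0`/`s1` and action `go` are invented for illustration) of a deterministic versus a stochastic transition function:

```python
import random

# Deterministic: a plain lookup — the same (state, action) pair
# always yields the same next state.
det_transitions = {("s0", "go"): "s1"}

def det_step(state, action):
    return det_transitions[(state, action)]

# Stochastic: each (state, action) pair maps to a probability
# distribution over possible next states.
stoch_transitions = {("s0", "go"): {"s1": 0.8, "s0": 0.2}}

def stoch_step(state, action, rng=random):
    dist = stoch_transitions[(state, action)]
    states, weights = zip(*dist.items())
    return rng.choices(states, weights=weights, k=1)[0]

print(det_step("s0", "go"))    # always "s1"
print(stoch_step("s0", "go"))  # "s1" roughly 80% of the time
```

A partially observable environment would add one more layer: the agent receives an observation that is correlated with, but not equal to, the hidden state.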
### Environment Modeling Methods

State-space based:
```python
class StateSpaceEnvironment:
    def __init__(self, states, actions, transitions):
        self.states = states
        self.actions = actions
        self.transitions = transitions

    def get_next_state(self, state, action):
        return self.transitions[state][action]

    def get_possible_actions(self, state):
        return [action for action in self.actions
                if action in self.transitions[state]]
```

Probability-model based:
```python
import random

class ProbabilisticEnvironment:
    def __init__(self, states, actions, transition_probs):
        self.states = states
        self.actions = actions
        self.transition_probs = transition_probs

    def get_next_state(self, state, action):
        probs = self.transition_probs[state][action]
        return self.sample_from_distribution(probs)

    def sample_from_distribution(self, probs):
        # Inverse-CDF sampling over a {state: probability} mapping
        r = random.random()
        cumulative = 0.0
        for state, prob in probs.items():
            cumulative += prob
            if r <= cumulative:
                return state
        return list(probs.keys())[-1]  # guard against floating-point round-off
```

## State Representation
### State-Space Representation

Discrete state spaces:

```python
class DiscreteState:
    def __init__(self, state_id, features):
        self.state_id = state_id
        self.features = features

    def __eq__(self, other):
        return self.state_id == other.state_id

    def __hash__(self):
        return hash(self.state_id)
```

Continuous state spaces:
```python
import numpy as np

class ContinuousState:
    def __init__(self, vector):
        self.vector = np.array(vector)

    def distance(self, other):
        return np.linalg.norm(self.vector - other.vector)

    def similarity(self, other):
        # Maps distance in [0, ∞) to similarity in (0, 1]
        return 1 / (1 + self.distance(other))
```

### Feature Representation
Feature extraction:

```python
class FeatureExtractor:
    def __init__(self, feature_types):
        self.feature_types = feature_types

    def extract(self, data):
        features = {}
        for feature_type in self.feature_types:
            extractor = self.get_extractor(feature_type)
            features[feature_type] = extractor.extract(data)
        return features

    def get_extractor(self, feature_type):
        extractors = {
            "text": TextFeatureExtractor(),
            "image": ImageFeatureExtractor(),
            "numeric": NumericFeatureExtractor()
        }
        return extractors[feature_type]
```

Feature encoding:
```python
class FeatureEncoder:
    def __init__(self, encoding_type):
        self.encoding_type = encoding_type

    def encode(self, features):
        if self.encoding_type == "one_hot":
            return self.one_hot_encode(features)
        elif self.encoding_type == "embedding":
            return self.embedding_encode(features)
        else:
            return features

    def one_hot_encode(self, features):
        encoded = {}
        for key, value in features.items():
            if isinstance(value, str):
                encoded[key] = self._one_hot(value)
            else:
                encoded[key] = value
        return encoded

    def embedding_encode(self, features):
        encoded = {}
        for key, value in features.items():
            if isinstance(value, str):
                encoded[key] = self._get_embedding(value)
            else:
                encoded[key] = value
        return encoded
```

## Planning Algorithms
### A* Algorithm

Algorithm principle:

A* is a heuristic search algorithm that combines the strengths of Dijkstra's algorithm and greedy best-first search.
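At each step, A* expands the frontier node with the smallest value of the evaluation function

```
f(n) = g(n) + h(n)
```

where g(n) is the exact cost accumulated from the start to n, and h(n) is the heuristic estimate of the remaining cost to the goal. When h(n) never overestimates the true remaining cost (an admissible heuristic), A* is guaranteed to find an optimal path.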
Implementation:

```python
import heapq
import itertools

class AStarPlanner:
    def __init__(self, heuristic):
        self.heuristic = heuristic

    def plan(self, start_state, goal_state, environment):
        # A running counter breaks ties so heapq never has to compare states
        counter = itertools.count()
        open_set = []
        heapq.heappush(open_set, (0, next(counter), start_state))
        came_from = {}
        g_score = {start_state: 0}

        while open_set:
            current = heapq.heappop(open_set)[2]
            if current == goal_state:
                return self.reconstruct_path(came_from, current)
            for action in environment.get_possible_actions(current):
                neighbor = environment.get_next_state(current, action)
                tentative_g_score = g_score[current] + 1  # unit step cost
                if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
                    came_from[neighbor] = (current, action)
                    g_score[neighbor] = tentative_g_score
                    f_score = tentative_g_score + self.heuristic(neighbor, goal_state)
                    heapq.heappush(open_set, (f_score, next(counter), neighbor))
        return None  # goal unreachable

    def reconstruct_path(self, came_from, current):
        path = []
        while current in came_from:
            current, action = came_from[current]
            path.append(action)
        return path[::-1]
```

### STRIPS Planning
Algorithm principle:

STRIPS (Stanford Research Institute Problem Solver) is a classic planning representation and problem-solving formalism.
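Before the planner itself, it helps to see how a single STRIPS action can be represented and applied. A minimal sketch (the block-world predicates are hypothetical; effects are split into the classic add and delete lists):

```python
# A STRIPS action: name, preconditions, and effects.
# Classic STRIPS splits effects into an add list and a delete list.
pickup = {
    "name": "pickup(block_a)",
    "preconditions": {"on_table(block_a)", "hand_empty"},
    "add": {"holding(block_a)"},
    "delete": {"on_table(block_a)", "hand_empty"},
}

def apply(state, action):
    """Apply a STRIPS action if its preconditions hold in the state."""
    if not action["preconditions"].issubset(state):
        return None  # action not applicable
    return (state - action["delete"]) | action["add"]

state = {"on_table(block_a)", "hand_empty"}
print(apply(state, pickup))  # {'holding(block_a)'}
```

Note that the simplified planner below folds both lists into a single `effects` set that only adds facts.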
Implementation:

```python
class STRIPSPlanner:
    def __init__(self):
        self.actions = []

    def add_action(self, name, preconditions, effects):
        self.actions.append({
            "name": name,
            "preconditions": set(preconditions),
            "effects": set(effects)
        })

    def plan(self, initial_state, goal_state, max_steps=100):
        plan = []
        current_state = set(initial_state)
        # max_steps prevents an infinite loop when the greedy choice
        # stops making progress toward the goal
        for _ in range(max_steps):
            if goal_state.issubset(current_state):
                return plan
            applicable_actions = self.get_applicable_actions(current_state)
            if not applicable_actions:
                return None
            best_action = self.select_best_action(applicable_actions, goal_state)
            plan.append(best_action["name"])
            current_state = self.apply_effects(current_state, best_action["effects"])
        return None

    def get_applicable_actions(self, state):
        return [action for action in self.actions
                if action["preconditions"].issubset(state)]

    def select_best_action(self, actions, goal_state):
        # Greedy: pick the action whose effects satisfy the most goal facts
        return max(actions, key=lambda a: len(a["effects"] & goal_state))

    def apply_effects(self, state, effects):
        # Simplified model: effects only add facts; classic STRIPS
        # also maintains a delete list
        new_state = state.copy()
        new_state.update(effects)
        return new_state
```

### Hierarchical Planning
Algorithm principle:

Hierarchical planning decomposes a task across multiple levels, with each level handling the task at a different degree of abstraction.
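The idea can be sketched with a toy two-level hierarchy (the trip-planning task names and the decomposition table are invented for illustration):

```python
# Hypothetical decomposition table: one dict per level,
# mapping an abstract task to its subtasks.
hierarchy = [
    {"plan_trip": ["book_transport", "book_hotel"]},           # level 0
    {"book_transport": ["search_flights", "pay_flight"],       # level 1
     "book_hotel": ["search_hotels", "pay_hotel"]},
]

def decompose(task, level=0):
    """Recursively expand a task through each level of the hierarchy."""
    if level >= len(hierarchy):
        return [task]  # past the last level, tasks are primitive
    subtasks = hierarchy[level].get(task, [task])  # unknown tasks pass through
    result = []
    for sub in subtasks:
        result.extend(decompose(sub, level + 1))
    return result

print(decompose("plan_trip"))
# ['search_flights', 'pay_flight', 'search_hotels', 'pay_hotel']
```

Each level refines the abstract tasks one step; whatever survives the last level is treated as a primitive action.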
Implementation:

```python
class HierarchicalPlanner:
    def __init__(self, hierarchy):
        # hierarchy: a list of levels, each exposing decompose(task) -> subtasks
        self.hierarchy = hierarchy

    def plan(self, goal):
        plan = []
        self._plan_at_level(goal, 0, plan)
        return plan

    def _plan_at_level(self, goal, level, plan):
        # Past the last level, the task is a primitive action
        if level >= len(self.hierarchy):
            plan.append(goal)
            return
        # Decompose at this level, then refine each subtask one level down
        for subtask in self.hierarchy[level].decompose(goal):
            self._plan_at_level(subtask, level + 1, plan)
```

## Task Decomposition
### Task Decomposition Strategies

Rule-based decomposition:

```python
class RuleBasedDecomposer:
    def __init__(self, rules):
        self.rules = rules

    def decompose(self, task):
        for rule in self.rules:
            if rule.matches(task):
                return rule.apply(task)
        return [task]  # no rule applies: treat the task as atomic
```

Learning-based decomposition:
```python
class LearningBasedDecomposer:
    def __init__(self, model):
        self.model = model

    def decompose(self, task):
        features = self.extract_features(task)
        subtasks = self.model.predict(features)
        return subtasks

    def extract_features(self, task):
        return {
            "type": task.type,
            "complexity": task.complexity,
            "dependencies": task.dependencies
        }
```

### Goal Hierarchy
Goal hierarchy structure:

```
Main goal
├── Subgoal 1
│   ├── Subtask 1.1
│   └── Subtask 1.2
├── Subgoal 2
│   ├── Subtask 2.1
│   └── Subtask 2.2
└── Subgoal 3
    ├── Subtask 3.1
    └── Subtask 3.2
```

Implementation:
```python
class GoalHierarchy:
    def __init__(self):
        self.root = None

    def set_root(self, goal):
        self.root = goal

    def decompose(self):
        if self.root is None:
            return []
        return self._decompose_goal(self.root)

    def _decompose_goal(self, goal):
        if goal.is_primitive():
            return [goal]
        subtasks = []
        for subgoal in goal.subgoals:
            subtasks.extend(self._decompose_goal(subgoal))
        return subtasks
```

## Hands-on Exercises
### Exercise 1: Implement a Simple A* Planner

```python
class SimpleAStarPlanner:
    def __init__(self):
        pass

    def heuristic(self, state, goal):
        # Manhattan distance: admissible on a 4-connected grid
        return abs(state.x - goal.x) + abs(state.y - goal.y)

    def plan(self, start, goal, grid):
        # grid must expose get_possible_actions() and get_next_state()
        planner = AStarPlanner(self.heuristic)
        return planner.plan(start, goal, grid)
```

### Exercise 2: Implement a STRIPS Planner
```python
class SimpleSTRIPSPlanner:
    def __init__(self):
        self.planner = STRIPSPlanner()
        self.setup_actions()

    def setup_actions(self):
        self.planner.add_action(
            "move_to_a",
            {"at_b"},
            {"at_a", "not_at_b"}
        )
        self.planner.add_action(
            "move_to_b",
            {"at_a"},
            {"at_b", "not_at_a"}
        )

    def plan(self, initial, goal):
        return self.planner.plan(initial, goal)
```

## Summary
In this lesson we covered agent perception and planning:

- Agent perception mechanisms and the perception pipeline
- Environment modeling methods
- State representation techniques
- Planning algorithms (A*, STRIPS, hierarchical planning)
- Task decomposition strategies

Together, these techniques give an agent the foundation to perceive its environment, form plans, and act autonomously.
