中级 · 2周
项目一:强化学习机器人导航
从零实现Q-Learning和DQN,在Gymnasium环境中训练导航智能体——理解强化学习的核心机制如何驱动自主决策
1. 项目概述
本项目将从头构建一个基于深度强化学习的机器人导航系统。你将学习:
难度:⭐⭐⭐
预计时间:2周
前置知识:Python基础、线性代数、强化学习基础
---
2. 目录
---
3. 1. 项目环境搭建
1.1 创建项目目录
cd ~/Documents/具身智能
mkdir -p projects/robot_navigation
cd projects/robot_navigation
1.2 创建虚拟环境(推荐)
# 创建虚拟环境
python -m venv venv
# 激活虚拟环境
# macOS/Linux:
source venv/bin/activate
# Windows:
# venv\Scripts\activate
1.3 安装依赖
创建 requirements.txt:
# 强化学习核心
gymnasium>=0.28.1
stable-baselines3>=2.0.0
# 深度学习框架(选择一种)
torch>=1.10.0 # 推荐PyTorch
# 可视化
matplotlib>=3.4.0
pygame>=2.0.0
# 数据处理
numpy>=1.21.0
安装依赖:
pip install -r requirements.txt
1.4 验证安装
创建 check_env.py:
#!/usr/bin/env python3
"""Verify that the packages this project depends on are installed."""
import importlib
import sys


def check_package(module_name, display_name):
    """Try to import a package and print its version.

    Args:
        module_name: Importable module name, e.g. ``"numpy"``.
        display_name: Human-readable name used in the printed report.

    Returns:
        The imported module, or ``None`` when the import fails.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        print(f"✗ {display_name}未安装")
        return None
    print(f"✓ {display_name}版本: {getattr(module, '__version__', 'unknown')}")
    return module


def main():
    """Print the Python version and the status of every required package."""
    print(f"Python版本: {sys.version}")

    # Each package is probed independently so that one missing package does
    # not hide the status of the others.
    check_package("gymnasium", "Gymnasium")
    torch = check_package("torch", "PyTorch")
    if torch is not None:
        print(f" CUDA可用: {torch.cuda.is_available()}")
    check_package("numpy", "NumPy")
    check_package("matplotlib", "Matplotlib")
    # These two are listed in requirements.txt as well, so verify them too.
    check_package("pygame", "Pygame")
    check_package("stable_baselines3", "Stable-Baselines3")

    print("\n环境验证完成!")


if __name__ == "__main__":
    main()
运行:
python check_env.py
---
4. 2. 问题定义与建模
2.1 机器人导航问题
目标:让机器人在二维网格世界中从起始位置移动到目标位置,同时避开障碍物。
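环境描述:10×10 的网格,其中部分格子是障碍物;智能体每回合从左上角 3×3 区域内随机出发,目标随机出现在右下角 3×3 区域内;每步可选择上、下、左、右四个动作;撞到障碍物时停在原地并获得 -1 的奖励,普通移动的奖励约为 -0.1(离目标越近惩罚越小),到达目标获得 +10 的奖励,超过最大步数时回合结束并获得 -1 的奖励。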
2.2 创建自定义Gymnasium环境
创建 envs/grid_world.py:
"""
自定义网格世界导航环境
基于Gymnasium API实现
"""
"kw">class="kw">import numpy "kw">as np
"kw">class="kw">import gymnasium "kw">as gym
"kw">from gymnasium "kw">class="kw">import spaces
"kw">class GridWorldEnv(gym.Env):
"""
10×10网格世界导航环境
状态空间:机器人位置 (0-99)
动作空间:0=上, 1=下, 2=左, 3=右
"""
metadata = {'render_modes': ['human', 'rgb_array'], 'render_fps': 10}
"kw">def __init__("kw">self, grid_size=10, max_steps=100, render_mode="kw">None):
super().__init__()
"kw">self.grid_size = grid_size
"kw">self.max_steps = max_steps
"kw">self.render_mode = render_mode
# 状态空间:grid_size × grid_size
"kw">self.observation_space = spaces.Discrete(grid_size * grid_size)
# 动作空间:4个方向
"kw">self.action_space = spaces.Discrete(4)
# 定义网格(0=可通过, 1=障碍物, 2=目标)
"kw">self.grid = np.zeros((grid_size, grid_size), dtype=np.int8)
# 设置障碍物
"kw">self._setup_obstacles()
# 状态
"kw">self.agent_pos = "kw">None
"kw">self.goal_pos = "kw">None
"kw">self.steps = "kw">None
# 渲染
"kw">self.window = "kw">None
"kw">self.clock = "kw">None
"kw">def _setup_obstacles("kw">self):
"""设置障碍物布局"""
# L形障碍
"kw">self.grid[3, 3:7] = 1
"kw">self.grid[3:8, 3] = 1
# 分散的障碍
"kw">self.grid[1, 5] = 1
"kw">self.grid[5, 1] = 1
"kw">self.grid[7, 8] = 1
"kw">self.grid[8, 2] = 1
"kw">def _pos_to_state("kw">self, pos):
"""位置转状态"""
"kw">class="kw">return pos[0] * "kw">self.grid_size + pos[1]
"kw">def _state_to_pos("kw">self, state):
"""状态转位置"""
"kw">class="kw">return (state // "kw">self.grid_size, state % "kw">self.grid_size)
"kw">def reset("kw">self, seed="kw">None, options="kw">None):
"""重置环境"""
super().reset(seed=seed)
# 随机设置起点(角落区域)
"kw">self.agent_pos = (
"kw">self.np_random.integers(0, 3),
"kw">self.np_random.integers(0, 3)
)
# 设置目标位置(对角区域)
"kw">self.goal_pos = (
"kw">self.np_random.integers("kw">self.grid_size-3, "kw">self.grid_size),
"kw">self.np_random.integers("kw">self.grid_size-3, "kw">self.grid_size)
)
# 确保目标不在障碍物上
"kw">while "kw">self.grid["kw">self.goal_pos] == 1:
"kw">self.goal_pos = (
"kw">self.np_random.integers("kw">self.grid_size-3, "kw">self.grid_size),
"kw">self.np_random.integers("kw">self.grid_size-3, "kw">self.grid_size)
)
"kw">self.steps = 0
observation = "kw">self._pos_to_state("kw">self.agent_pos)
info = {}
"kw">if "kw">self.render_mode == 'human':
"kw">self._render_frame()
"kw">class="kw">return observation, info
"kw">def step("kw">self, action):
"""执行动作"""
row, col = "kw">self.agent_pos
# 动作映射:0=上, 1=下, 2=左, 3=右
new_row, new_col = row, col
"kw">if action == 0: # 上
new_row = max(0, row - 1)
"kw">elif action == 1: # 下
new_row = min("kw">self.grid_size - 1, row + 1)
"kw">elif action == 2: # 左
new_col = max(0, col - 1)
"kw">elif action == 3: # 右
new_col = min("kw">self.grid_size - 1, col + 1)
# 检查碰撞
"kw">if "kw">self.grid[new_row, new_col] == 1:
# 碰到障碍物,保持原位,给予负奖励
reward = -1.0
terminated = "kw">False
"kw">else:
"kw">self.agent_pos = (new_row, new_col)
# 计算奖励
distance = abs(new_row - "kw">self.goal_pos[0]) + abs(new_col - "kw">self.goal_pos[1])
reward = -0.1 # 每步小惩罚
# 到达目标
"kw">if "kw">self.agent_pos == "kw">self.goal_pos:
reward = 10.0
terminated = "kw">True
"kw">else:
# 接近目标奖励
reward += 0.1 * ("kw">self.grid_size * 2 - distance) / ("kw">self.grid_size * 2)
terminated = "kw">False
"kw">self.steps += 1
# 超时
"kw">if "kw">self.steps >= "kw">self.max_steps:
terminated = "kw">True
reward = -1.0
observation = "kw">self._pos_to_state("kw">self.agent_pos)
info = {
'agent_pos': "kw">self.agent_pos,
'goal_pos': "kw">self.goal_pos,
'steps': "kw">self.steps
}
"kw">if "kw">self.render_mode == 'human':
"kw">self._render_frame()
"kw">class="kw">return observation, reward, terminated, "kw">False, info
"kw">def render("kw">self):
"""渲染环境"""
"kw">if "kw">self.render_mode == 'human':
"kw">class="kw">return "kw">self._render_frame()
"kw">class="kw">return "kw">None
"kw">def _render_frame("kw">self):
"""绘制一帧"""
"kw">class="kw">import pygame
"kw">class="kw">import numpy "kw">as np
"kw">if "kw">self.window "kw">is "kw">None:
pygame.init()
"kw">self.window = pygame.display.set_mode((500, 500))
pygame.display.set_caption("Grid World Navigation")
"kw">if "kw">self.clock "kw">is "kw">None:
"kw">self.clock = pygame.time.Clock()
# 清屏
"kw">self.window.fill((30, 30, 40))
# 绘制网格
cell_size = 500 // "kw">self.grid_size
"kw">for row "kw">in "kw">range("kw">self.grid_size):
"kw">for col "kw">in "kw">range("kw">self.grid_size):
rect = pygame.Rect(col * cell_size, row * cell_size, cell_size, cell_size)
"kw">if "kw">self.grid[row, col] == 1:
pygame.draw.rect("kw">self.window, (100, 100, 100), rect)
"kw">elif (row, col) == "kw">self.goal_pos:
pygame.draw.rect("kw">self.window, (0, 200, 100), rect)
"kw">else:
pygame.draw.rect("kw">self.window, (60, 60, 80), rect, 1)
# 绘制起点
start = (0, 0)
start_rect = pygame.Rect(start[1] * cell_size, start[0] * cell_size, cell_size, cell_size)
pygame.draw.rect("kw">self.window, (100, 100, 200), start_rect)
# 绘制智能体
agent_rect = pygame.Rect(
"kw">self.agent_pos[1] * cell_size + 5,
"kw">self.agent_pos[0] * cell_size + 5,
cell_size - 10,
cell_size - 10
)
pygame.draw.circle("kw">self.window, (255, 200, 0), agent_rect.center, cell_size // 3)
pygame.display.flip()
"kw">self.clock.tick("kw">self.metadata['render_fps'])
"kw">def close("kw">self):
"""关闭环境"""
"kw">if "kw">self.window "kw">is "kw">not "kw">None:
"kw">class="kw">import pygame
pygame.quit()
"kw">self.window = "kw">None
"kw">def get_state("kw">self):
"""获取当前状态(用于调试)"""
"kw">class="kw">return {
'agent_pos': "kw">self.agent_pos,
'goal_pos': "kw">self.goal_pos,
'steps': "kw">self.steps,
'grid': "kw">self.grid.copy()
}
2.3 测试环境
创建 test_env.py:
"""测试自定义环境"""
"kw">class="kw">import sys
sys.path.append('.')
"kw">from envs.grid_world "kw">class="kw">import GridWorldEnv
"kw">def test_environment():
"""测试环境功能"""
"kw">print("=" * 50)
"kw">print("测试 GridWorldEnv")
"kw">print("=" * 50)
# 创建环境
env = GridWorldEnv(grid_size=10, render_mode="kw">None)
"kw">print(f"状态空间: {env.observation_space}")
"kw">print(f"动作空间: {env.action_space}")
"kw">print(f"网格大小: {env.grid_size}×{env.grid_size}")
# 重置环境
observation, info = env.reset()
state = env.get_state()
"kw">print(f"\n初始状态:")
"kw">print(f" 智能体位置: {state['agent_pos']}")
"kw">print(f" 目标位置: {state['goal_pos']}")
"kw">print(f" 观察值: {observation}")
# 测试几个动作
"kw">print("\n执行10步随机动作:")
total_reward = 0
"kw">for step "kw">in "kw">range(10):
action = env.action_space.sample()
observation, reward, terminated, truncated, info = env.step(action)
total_reward += reward
state = env.get_state()
"kw">print(f" 步骤{step+1}: 动作={action}, 奖励={reward:.2f}, "
f"位置={state['agent_pos']}, 完成={terminated}")
"kw">if terminated:
"kw">print(f" 任务完成!总奖励: {total_reward:.2f}")
"kw">break
env.close()
"kw">print("\n环境测试完成!")
"kw">if __name__ == '__main__':
test_environment()
运行测试:
python test_env.py
---
5. 3. Q-Learning算法实现
3.1 算法原理
Q-Learning是一种基于值函数的强化学习算法,通过迭代更新Q表来学习最优策略。
Q值更新公式:
Q(s, a) ← Q(s, a) + α[r + γ max_a' Q(s', a') - Q(s, a)]
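其中:
- α:学习率,控制每次更新的步长
- γ:折扣因子,权衡即时奖励与未来奖励
- r:执行动作 a 后获得的即时奖励
- s':执行动作后到达的下一状态;max_a' Q(s', a') 是该状态下所有动作中最大的Q值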
3.2 Q-Learning实现
创建 agents/q_learning.py:
"""
Q-Learning 智能体实现
"""
"kw">class="kw">import numpy "kw">as np
"kw">from collections "kw">class="kw">import defaultdict
"kw">class QLearningAgent:
"""
Q-Learning 强化学习智能体
使用表格形式存储Q值,适用于离散状态和动作空间
"""
"kw">def __init__("kw">self, n_states, n_actions, learning_rate=0.1, gamma=0.99,
epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
"""
初始化Q-Learning智能体
参数:
n_states: 状态空间大小
n_actions: 动作空间大小
learning_rate: 学习率 (α)
gamma: 折扣因子 (γ)
epsilon: 探索率初始值
epsilon_decay: 探索率衰减率
epsilon_min: 探索率最小值
"""
"kw">self.n_states = n_states
"kw">self.n_actions = n_actions
"kw">self.lr = learning_rate
"kw">self.gamma = gamma
"kw">self.epsilon = epsilon
"kw">self.epsilon_decay = epsilon_decay
"kw">self.epsilon_min = epsilon_min
# 初始化Q表
"kw">self.q_table = np.zeros((n_states, n_actions))
# 训练统计
"kw">self.training_history = []
"kw">def select_action("kw">self, state):
"""
ε-greedy策略选择动作
参数:
state: 当前状态
返回:
action: 选择的动作
"""
"kw">if np.random.random() < "kw">self.epsilon:
# 随机探索
"kw">class="kw">return np.random.randint("kw">self.n_actions)
"kw">else:
# 利用已知最优
"kw">class="kw">return np.argmax("kw">self.q_table[state])
"kw">def update("kw">self, state, action, reward, next_state):
"""
更新Q值
Q(s, a) ← Q(s, a) + α[r + γ max Q(s', a') - Q(s, a)]
参数:
state: 当前状态
action: 执行的动作
reward: 获得的奖励
next_state: 下一状态
"""
current_q = "kw">self.q_table[state, action]
max_next_q = np.max("kw">self.q_table[next_state])
# TD误差
td_error = reward + "kw">self.gamma * max_next_q - current_q
# 更新Q值
"kw">self.q_table[state, action] = current_q + "kw">self.lr * td_error
"kw">class="kw">return td_error
"kw">def decay_epsilon("kw">self):
"""衰减探索率"""
"kw">self.epsilon = max("kw">self.epsilon_min, "kw">self.epsilon * "kw">self.epsilon_decay)
"kw">def train("kw">self, env, n_episodes=500, max_steps=100):
"""
训练智能体
参数:
env: Gymnasium环境
n_episodes: 训练的回合数
max_steps: 每回合最大步数
返回:
episode_rewards: 每回合的总奖励列表
"""
episode_rewards = []
"kw">for episode "kw">in "kw">range(n_episodes):
state, _ = env.reset()
total_reward = 0
td_errors = []
"kw">for step "kw">in "kw">range(max_steps):
# 选择动作
action = "kw">self.select_action(state)
# 执行动作
next_state, reward, terminated, truncated, info = env.step(action)
# 更新Q值
td_error = "kw">self.update(state, action, reward, next_state)
td_errors.append(abs(td_error))
total_reward += reward
state = next_state
"kw">if terminated "kw">or truncated:
"kw">break
# 衰减探索率
"kw">self.decay_epsilon()
episode_rewards.append(total_reward)
# 记录训练历史
"kw">self.training_history.append({
'episode': episode,
'reward': total_reward,
'epsilon': "kw">self.epsilon,
'mean_td_error': np.mean(td_errors) "kw">if td_errors "kw">else 0
})
# 打印训练进度
"kw">if (episode + 1) % 50 == 0:
recent_avg = np.mean(episode_rewards[-50:])
"kw">print(f"Episode {episode+1}/{n_episodes} | "
f"平均奖励: {recent_avg:.2f} | "
f"ε: {"kw">self.epsilon:.4f}")
"kw">class="kw">return episode_rewards
"kw">def get_policy("kw">self):
"""
获取当前最优策略
返回:
policy: 每个状态对应的最优动作
"""
"kw">class="kw">return np.argmax("kw">self.q_table, axis=1)
"kw">def evaluate("kw">self, env, n_episodes=10, max_steps=100):
"""
评估训练好的智能体
参数:
env: Gymnasium环境
n_episodes: 评估的回合数
max_steps: 每回合最大步数
返回:
mean_reward: 平均奖励
success_rate: 成功率
"""
episode_rewards = []
successes = 0
"kw">for episode "kw">in "kw">range(n_episodes):
state, _ = env.reset()
total_reward = 0
"kw">for step "kw">in "kw">range(max_steps):
action = np.argmax("kw">self.q_table[state]) # 贪心策略
next_state, reward, terminated, truncated, _ = env.step(action)
total_reward += reward
state = next_state
"kw">if terminated:
"kw">if reward > 0: # 成功完成任务
successes += 1
"kw">break
"kw">if truncated:
"kw">break
episode_rewards.append(total_reward)
"kw">class="kw">return np.mean(episode_rewards), successes / n_episodes
"kw">def visualize_q_table(q_table, grid_size=10):
"""
可视化Q表
参数:
q_table: Q值表
grid_size: 网格大小
"""
"kw">class="kw">import matplotlib.pyplot "kw">as plt
"kw">class="kw">import numpy "kw">as np
# 提取每个状态的最优动作和Q值
best_actions = np.argmax(q_table, axis=1).reshape(grid_size, grid_size)
max_q_values = np.max(q_table, axis=1).reshape(grid_size, grid_size)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 最优动作可视化
ax = axes[0]
im = ax.imshow(best_actions, cmap='viridis', aspect='auto')
ax.set_title('最优动作 (0=上, 1=下, 2=左, 3=右)')
ax.set_xlabel('列')
ax.set_ylabel('行')
plt.colorbar(im, ax=ax)
# Q值可视化
ax = axes[1]
im = ax.imshow(max_q_values, cmap='plasma', aspect='auto')
ax.set_title('最大Q值')
ax.set_xlabel('列')
ax.set_ylabel('行')
plt.colorbar(im, ax=ax)
plt.tight_layout()
plt.savefig('q_table_visualization.png', dpi=150)
plt.show()
"kw">print("Q表可视化已保存到 q_table_visualization.png")
"kw">if __name__ == '__main__':
# 测试Q-Learning
"kw">from envs.grid_world "kw">class="kw">import GridWorldEnv
"kw">print("=" * 50)
"kw">print("Q-Learning 算法测试")
"kw">print("=" * 50)
# 创建环境
env = GridWorldEnv(grid_size=10)
# 创建智能体
agent = QLearningAgent(
n_states=env.observation_space.n,
n_actions=env.action_space.n,
learning_rate=0.1,
gamma=0.95,
epsilon=1.0,
epsilon_decay=0.99,
epsilon_min=0.01
)
# 训练
"kw">print("\n开始训练...")
rewards = agent.train(env, n_episodes=500, max_steps=100)
# 评估
"kw">print("\n评估训练结果...")
mean_reward, success_rate = agent.evaluate(env, n_episodes=50)
"kw">print(f"平均奖励: {mean_reward:.2f}")
"kw">print(f"成功率: {success_rate:.2%}")
# 可视化Q表
visualize_q_table(agent.q_table)
env.close()
3.3 运行Q-Learning
python agents/q_learning.py
---
6. 4. 深度Q网络(DQN)实现
4.1 DQN原理
当状态空间很大时(如图像),表格形式的Q-Learning不再适用。DQN使用深度神经网络来逼近Q函数。
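DQN的关键技术:
- 经验回放(Experience Replay):将转移样本存入缓冲区并随机采样,打破样本之间的时间相关性
- 目标网络(Target Network):用一个定期同步的网络副本计算目标Q值,使训练目标更稳定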
4.2 DQN实现
创建 agents/dqn.py:
"""
深度Q网络 (DQN) 实现
"""
"kw">class="kw">import numpy "kw">as np
"kw">class="kw">import torch
"kw">class="kw">import torch.nn "kw">as nn
"kw">class="kw">import torch.optim "kw">as optim
"kw">from collections "kw">class="kw">import deque, namedtuple
"kw">class="kw">import random
# One stored environment step.
Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'next_state', 'done'))


class ReplayBuffer:
    """Fixed-capacity experience replay buffer.

    Stores transitions and returns uniformly sampled mini-batches, which
    breaks the temporal correlation between consecutive steps.
    """

    def __init__(self, capacity=10000):
        """Create a buffer that keeps at most ``capacity`` transitions."""
        # A bounded deque drops the oldest transition once it is full.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        self.buffer.append(Transition(state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Sample a mini-batch without replacement.

        Args:
            batch_size: Number of transitions; must not exceed ``len(self)``.

        Returns:
            ``(states, actions, rewards, next_states, dones)`` as tensors.
            ``actions`` is int64; all others are float32 (``dones`` holds 0/1).

        Raises:
            ValueError: If ``batch_size`` is larger than the buffer
                (raised by ``random.sample``).
        """
        batch = random.sample(self.buffer, batch_size)

        # Stack array-valued fields with NumPy first: building a tensor from
        # a list of arrays element by element is much slower.
        states = torch.as_tensor(
            np.array([t.state for t in batch]), dtype=torch.float32)
        actions = torch.as_tensor(
            [t.action for t in batch], dtype=torch.int64)
        rewards = torch.as_tensor(
            [t.reward for t in batch], dtype=torch.float32)
        next_states = torch.as_tensor(
            np.array([t.next_state for t in batch]), dtype=torch.float32)
        dones = torch.as_tensor(
            [t.done for t in batch], dtype=torch.float32)

        return states, actions, rewards, next_states, dones

    def __len__(self):
        """Return the number of stored transitions."""
        return len(self.buffer)
"kw">class QNetwork(nn.Module):
"""
Q网络
用于逼近Q(s, a)
"""
"kw">def __init__("kw">self, state_dim, action_dim, hidden_dim=128):
super().__init__()
"kw">self.network = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, action_dim)
)
"kw">def forward("kw">self, x):
"kw">class="kw">return "kw">self.network(x)
"kw">class DQNAgent:
"""
DQN智能体
"""
"kw">def __init__("kw">self, state_dim, action_dim, hidden_dim=128,
learning_rate=0.001, gamma=0.99, epsilon=1.0,
epsilon_decay=0.995, epsilon_min=0.01,
target_update_freq=10, replay_buffer_size=10000):
"kw">self.state_dim = state_dim
"kw">self.action_dim = action_dim
"kw">self.gamma = gamma
"kw">self.epsilon = epsilon
"kw">self.epsilon_decay = epsilon_decay
"kw">self.epsilon_min = epsilon_min
"kw">self.target_update_freq = target_update_freq
# 设备
"kw">self.device = torch.device("cuda" "kw">if torch.cuda.is_available() "kw">else "cpu")
"kw">print(f"使用设备: {"kw">self.device}")
# Q网络和目标网络
"kw">self.q_network = QNetwork(state_dim, action_dim, hidden_dim).to("kw">self.device)
"kw">self.target_network = QNetwork(state_dim, action_dim, hidden_dim).to("kw">self.device)
"kw">self.target_network.load_state_dict("kw">self.q_network.state_dict())
# 优化器
"kw">self.optimizer = optim.Adam("kw">self.q_network.parameters(), lr=learning_rate)
# 经验回放缓冲区
"kw">self.replay_buffer = ReplayBuffer(replay_buffer_size)
# 训练统计
"kw">self.training_history = []
"kw">self.update_count = 0
"kw">def select_action("kw">self, state, training="kw">True):
"""ε-greedy策略选择动作"""
"kw">if training "kw">and np.random.random() < "kw">self.epsilon:
"kw">class="kw">return np.random.randint("kw">self.action_dim)
"kw">else:
"kw">with torch.no_grad():
state_tensor = torch.FloatTensor(state).unsqueeze(0).to("kw">self.device)
q_values = "kw">self.q_network(state_tensor)
"kw">class="kw">return q_values.argmax().item()
"kw">def update("kw">self, batch_size=64):
"""更新Q网络"""
"kw">if "kw">len("kw">self.replay_buffer) < batch_size:
"kw">class="kw">return "kw">None
# 采样
states, actions, rewards, next_states, dones = \
"kw">self.replay_buffer.sample(batch_size)
states = states.to("kw">self.device)
actions = actions.to("kw">self.device)
rewards = rewards.to("kw">self.device)
next_states = next_states.to("kw">self.device)
dones = dones.to("kw">self.device)
# 计算当前Q值
current_q = "kw">self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze()
# 计算目标Q值(使用目标网络)
"kw">with torch.no_grad():
max_next_q = "kw">self.target_network(next_states).max(1)[0]
target_q = rewards + "kw">self.gamma * max_next_q * (1 - dones)
# 计算损失
loss = nn.functional.mse_loss(current_q, target_q)
# 反向传播
"kw">self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_("kw">self.q_network.parameters(), 1.0)
"kw">self.optimizer.step()
# 定期更新目标网络
"kw">self.update_count += 1
"kw">if "kw">self.update_count % "kw">self.target_update_freq == 0:
"kw">self.target_network.load_state_dict("kw">self.q_network.state_dict())
"kw">class="kw">return loss.item()
"kw">def decay_epsilon("kw">self):
"""衰减探索率"""
"kw">self.epsilon = max("kw">self.epsilon_min, "kw">self.epsilon * "kw">self.epsilon_decay)
"kw">def train("kw">self, env, n_episodes=500, max_steps=100, batch_size=64):
"""训练智能体"""
episode_rewards = []
losses = []
"kw">for episode "kw">in "kw">range(n_episodes):
state, _ = env.reset()
# 将状态转为one-hot或直接使用索引
state = np.eye(env.observation_space.n)[state] # one-hot编码
total_reward = 0
episode_loss = []
"kw">for step "kw">in "kw">range(max_steps):
# 选择动作
action = "kw">self.select_action(state)
# 执行动作
next_state, reward, terminated, truncated, _ = env.step(action)
next_state_onehot = np.eye(env.observation_space.n)[next_state]
done = terminated "kw">or truncated
# 存储转移
"kw">self.replay_buffer.push(state, action, reward, next_state_onehot, done)
# 更新网络
loss = "kw">self.update(batch_size)
"kw">if loss "kw">is "kw">not "kw">None:
episode_loss.append(loss)
total_reward += reward
state = next_state_onehot
"kw">if done:
"kw">break
"kw">self.decay_epsilon()
episode_rewards.append(total_reward)
losses.append(np.mean(episode_loss) "kw">if episode_loss "kw">else 0)
"kw">if (episode + 1) % 50 == 0:
recent_avg = np.mean(episode_rewards[-50:])
"kw">print(f"Episode {episode+1}/{n_episodes} | "
f"平均奖励: {recent_avg:.2f} | "
f"ε: {"kw">self.epsilon:.4f}")
"kw">self.training_history = {
'rewards': episode_rewards,
'losses': losses
}
"kw">class="kw">return episode_rewards
"kw">def evaluate("kw">self, env, n_episodes=10, max_steps=100):
"""评估智能体"""
episode_rewards = []
successes = 0
"kw">for episode "kw">in "kw">range(n_episodes):
state, _ = env.reset()
state = np.eye(env.observation_space.n)[state]
total_reward = 0
"kw">for step "kw">in "kw">range(max_steps):
action = "kw">self.select_action(state, training="kw">False)
next_state, reward, terminated, truncated, _ = env.step(action)
next_state = np.eye(env.observation_space.n)[next_state]
total_reward += reward
state = next_state
"kw">if terminated:
"kw">if reward > 0:
successes += 1
"kw">break
"kw">if truncated:
"kw">break
episode_rewards.append(total_reward)
"kw">class="kw">return np.mean(episode_rewards), successes / n_episodes
"kw">def plot_training_history(history):
"""绘制训练曲线"""
"kw">class="kw">import matplotlib.pyplot "kw">as plt
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 奖励曲线
axes[0].plot(history['rewards'])
axes[0].set_xlabel('Episode')
axes[0].set_ylabel('Total Reward')
axes[0].set_title('Training Rewards')
axes[0].grid("kw">True)
# 损失曲线
axes[1].plot(history['losses'])
axes[1].set_xlabel('Episode')
axes[1].set_ylabel('Loss')
axes[1].set_title('Training Loss')
axes[1].grid("kw">True)
plt.tight_layout()
plt.savefig('dqn_training.png', dpi=150)
plt.show()
"kw">if __name__ == '__main__':
"kw">from envs.grid_world "kw">class="kw">import GridWorldEnv
"kw">print("=" * 50)
"kw">print("深度Q网络 (DQN) 算法测试")
"kw">print("=" * 50)
# 创建环境
env = GridWorldEnv(grid_size=10)
# 创建智能体
state_dim = env.observation_space.n # 100个状态
action_dim = env.action_space.n # 4个动作
agent = DQNAgent(
state_dim=state_dim,
action_dim=action_dim,
hidden_dim=128,
learning_rate=0.001,
gamma=0.99,
epsilon=1.0,
epsilon_decay=0.99,
epsilon_min=0.01
)
# 训练
"kw">print("\n开始训练...")
rewards = agent.train(env, n_episodes=500, max_steps=100, batch_size=64)
# 评估
"kw">print("\n评估训练结果...")
mean_reward, success_rate = agent.evaluate(env, n_episodes=50)
"kw">print(f"平均奖励: {mean_reward:.2f}")
"kw">print(f"成功率: {success_rate:.2%}")
# 绘制训练曲线
plot_training_history(agent.training_history)
env.close()
---
7. 5. 训练与评估
5.1 训练脚本
创建 train.py:
"""训练和评估脚本"""
"kw">class="kw">import argparse
"kw">class="kw">import numpy "kw">as np
"kw">class="kw">import matplotlib.pyplot "kw">as plt
"kw">from envs.grid_world "kw">class="kw">import GridWorldEnv
"kw">from agents.q_learning "kw">class="kw">import QLearningAgent
"kw">from agents.dqn "kw">class="kw">import DQNAgent
"kw">def plot_comparison(q_rewards, dqn_rewards, title="Algorithm Comparison"):
"""绘制算法对比图"""
plt.figure(figsize=(12, 5))
# 奖励曲线
plt.subplot(1, 2, 1)
plt.plot(q_rewards, label='Q-Learning', alpha=0.7)
plt.plot(dqn_rewards, label='DQN', alpha=0.7)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Training Rewards')
plt.legend()
plt.grid("kw">True)
# 移动平均
window = 20
plt.subplot(1, 2, 2)
q_ma = np.convolve(q_rewards, np.ones(window)/window, mode='valid')
dqn_ma = np.convolve(dqn_rewards, np.ones(window)/window, mode='valid')
plt.plot(q_ma, label='Q-Learning (MA)', linewidth=2)
plt.plot(dqn_ma, label='DQN (MA)', linewidth=2)
plt.xlabel('Episode')
plt.ylabel('Moving Average Reward')
plt.title(f'{window}-Episode Moving Average')
plt.legend()
plt.grid("kw">True)
plt.tight_layout()
plt.savefig('algorithm_comparison.png', dpi=150)
plt.show()
"kw">def main():
parser = argparse.ArgumentParser(description='训练强化学习智能体')
parser.add_argument('--algorithm', type=str, default='dqn',
choices=['qlearning', 'dqn'],
help='选择算法')
parser.add_argument('--episodes', type=int, default=500,
help='训练回合数')
args = parser.parse_args()
# 创建环境
env = GridWorldEnv(grid_size=10)
"kw">if args.algorithm == 'qlearning':
"kw">print("训练 Q-Learning 智能体...")
agent = QLearningAgent(
n_states=env.observation_space.n,
n_actions=env.action_space.n,
learning_rate=0.1,
gamma=0.95,
epsilon=1.0,
epsilon_decay=0.99,
epsilon_min=0.01
)
rewards = agent.train(env, n_episodes=args.episodes, max_steps=100)
mean_reward, success_rate = agent.evaluate(env, n_episodes=50)
"kw">else: # dqn
"kw">print("训练 DQN 智能体...")
agent = DQNAgent(
state_dim=env.observation_space.n,
action_dim=env.action_space.n,
hidden_dim=128,
learning_rate=0.001,
gamma=0.99,
epsilon=1.0,
epsilon_decay=0.99,
epsilon_min=0.01
)
rewards = agent.train(env, n_episodes=args.episodes, max_steps=100, batch_size=64)
mean_reward, success_rate = agent.evaluate(env, n_episodes=50)
"kw">print(f"\n最终结果:")
"kw">print(f" 平均奖励: {mean_reward:.2f}")
"kw">print(f" 成功率: {success_rate:.2%}")
env.close()
"kw">if __name__ == '__main__':
main()
5.2 运行训练
# 训练Q-Learning
python train.py --algorithm qlearning --episodes 500
# 训练DQN
python train.py --algorithm dqn --episodes 500
---
8. 6. 进阶:PPO算法
6.1 PPO原理
PPO (Proximal Policy Optimization) 是一种策略梯度算法,通过限制策略更新幅度来提高训练稳定性。
核心思想:
L^CLIP(θ) = E[ min(r(θ) * A_t, clip(r(θ), 1-ε, 1+ε) * A_t) ]
其中 r(θ) = π_θ(a|s) / π_θ_old(a|s) 是概率比率
6.2 使用Stable-Baselines3实现PPO
创建 train_ppo.py:
"""
使用Stable-Baselines3实现PPO
"""
"kw">class="kw">import gymnasium "kw">as gym
"kw">from stable_baselines3 "kw">class="kw">import PPO
"kw">from stable_baselines3.common.env_util "kw">class="kw">import make_vec_env
"kw">class="kw">import numpy "kw">as np
"kw">def main():
# 创建向量化环境(加速训练)
env = make_vec_env('CartPole-v1', n_envs=4)
# 创建PPO模型
model = PPO(
'MlpPolicy',
env,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
gae_lambda=0.95,
clip_range=0.2,
ent_coef=0.01,
verbose=1
)
# 训练
"kw">print("训练 PPO 智能体...")
model.learn(total_timesteps=100000, progress_bar="kw">True)
# 评估
"kw">print("\n评估训练结果...")
eval_env = gym.make('CartPole-v1')
obs, _ = eval_env.reset()
total_reward = 0
n_episodes = 10
"kw">for episode "kw">in "kw">range(n_episodes):
episode_reward = 0
done = "kw">False
"kw">while "kw">not done:
action, _ = model.predict(obs)
obs, reward, terminated, truncated, _ = eval_env.step(action)
episode_reward += reward
done = terminated "kw">or truncated
"kw">if terminated "kw">or truncated:
obs, _ = eval_env.reset()
total_reward += episode_reward
"kw">print(f"Episode {episode+1}: reward = {episode_reward}")
"kw">break
"kw">print(f"\n平均奖励: {total_reward / n_episodes:.2f}")
# 保存模型
model.save("ppo_cartpole")
"kw">print("模型已保存到 ppo_cartpole.zip")
"kw">if __name__ == '__main__':
main()
---
9. 7. 可视化与调试
7.1 创建训练可视化工具
创建 visualize.py:
"""
训练过程可视化工具
"""
"kw">class="kw">import numpy "kw">as np
"kw">class="kw">import matplotlib.pyplot "kw">as plt
"kw">from matplotlib.patches "kw">class="kw">import FancyBboxPatch, Circle, Rectangle
"kw">class TrainingVisualizer:
"""训练过程可视化"""
"kw">def __init__("kw">self, grid_size=10):
"kw">self.grid_size = grid_size
"kw">def visualize_episode("kw">self, episode_data):
"""
可视化一个回合的轨迹
episode_data: list of (state, action, reward) tuples
"""
fig, axes = plt.subplots(2, 5, figsize=(15, 6))
axes = axes.flatten()
# 只显示前10步
"kw">for i, (state, action, reward) "kw">in enumerate(episode_data[:10]):
ax = axes[i]
# 计算位置
row, col = state // "kw">self.grid_size, state % "kw">self.grid_size
# 绘制网格
ax.set_xlim(-0.5, "kw">self.grid_size - 0.5)
ax.set_ylim(-0.5, "kw">self.grid_size - 0.5)
ax.set_aspect('equal')
"kw">for r "kw">in "kw">range("kw">self.grid_size):
"kw">for c "kw">in "kw">range("kw">self.grid_size):
rect = Rectangle((c, r), 1, 1, fill="kw">False, edgecolor='gray', linewidth=0.5)
ax.add_patch(rect)
# 绘制智能体
agent = Circle((col + 0.5, row + 0.5), 0.3, color='orange')
ax.add_patch(agent)
# 绘制动作方向
action_dirs = {0: (0, 0.5), 1: (0, -0.5), 2: (-0.5, 0), 3: (0.5, 0)}
dx, dy = action_dirs[action]
ax.annotate('', xy=(col + 0.5 + dx, row + 0.5 + dy),
xytext=(col + 0.5, row + 0.5),
arrowprops=dict(arrowstyle='->', color='red', lw=2))
ax.set_title(f'Step {i+1}: a={action}, r={reward:.1f}')
ax.axis('off')
plt.tight_layout()
plt.savefig('episode_visualization.png', dpi=150)
plt.show()
"kw">def plot_learning_curve("kw">self, rewards, window=20):
"""绘制学习曲线"""
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 原始奖励
axes[0].plot(rewards, alpha=0.5)
axes[0].set_xlabel('Episode')
axes[0].set_ylabel('Total Reward')
axes[0].set_title('Training Rewards')
axes[0].grid("kw">True)
# 移动平均
ma = np.convolve(rewards, np.ones(window)/window, mode='valid')
axes[1].plot(ma, color='orange', linewidth=2)
axes[1].set_xlabel('Episode')
axes[1].set_ylabel(f'{window}-Episode Moving Average')
axes[1].set_title('Learning Curve (Smoothed)')
axes[1].grid("kw">True)
plt.tight_layout()
plt.savefig('learning_curve.png', dpi=150)
plt.show()
"kw">if __name__ == '__main__':
# 演示可视化
"kw">class="kw">import numpy "kw">as np
# 生成模拟数据
episode_data = []
state = 0
"kw">for i "kw">in "kw">range(15):
action = np.random.randint(4)
reward = np.random.randn()
episode_data.append((state, action, reward))
state = (state + np.random.randint(1, 4)) % 100
visualizer = TrainingVisualizer(grid_size=10)
visualizer.visualize_episode(episode_data)
# 模拟学习曲线
rewards = np.cumsum(np.random.randn(500)) + 50
rewards = rewards - np.arange(500) * 0.1 + np.random.rand(500) * 20
visualizer.plot_learning_curve(rewards)
---
10. 总结与下一步
项目总结
完成本项目后,你将掌握:
扩展建议
- 添加随机障碍物生成
- 实现3D环境
- 添加传感器噪声
- 实现Double DQN
- 实现Dueling DQN
- 实现PPO算法
- 迁移到Gazebo仿真环境
- 结合视觉输入(CNN)
- 实现多智能体协作
参考资源
---
下一步:项目二:机器人抓取仿真