中级 · 2周

项目一:强化学习机器人导航

从零实现Q-Learning和DQN,在Gymnasium环境中训练导航智能体——理解强化学习的核心机制如何驱动自主决策

中级难度 预计2周 10个章节

1. 项目概述

本项目将从头构建一个基于深度强化学习的机器人导航系统。你将学习:

  • 如何将机器人导航问题建模为马尔可夫决策过程(MDP)
  • 如何实现Q-Learning和DQN算法
  • 如何在Gymnasium环境中训练和评估智能体
  • 如何实现Sim-to-Real迁移的基本概念
  • 难度:⭐⭐⭐

    预计时间:2周

    前置知识:Python基础、线性代数、强化学习基础


    ---


    2. 目录

  • 项目环境搭建
  • 问题定义与建模
  • Q-Learning算法实现
  • 深度Q网络(DQN)实现
  • 训练与评估
  • 进阶:PPO算法
  • 可视化与调试
  • ---


    3. 1. 项目环境搭建

    1.1 创建项目目录

    cd ~/Documents/具身智能
    mkdir -p projects/robot_navigation
    cd projects/robot_navigation

    1.2 创建虚拟环境(推荐)

    # 创建虚拟环境
    python -m venv venv
    
    # 激活虚拟环境
    # macOS/Linux:
    source venv/bin/activate
    # Windows:
    # venv\Scripts\activate

    1.3 安装依赖

    创建 requirements.txt


    # 强化学习核心
    gymnasium>=0.26.0
    stable-baselines3>=1.6.0
    
    # 深度学习框架(选择一种)
    torch>=1.10.0  # 推荐PyTorch
    
    # 可视化
    matplotlib>=3.4.0
    pygame>=2.0.0
    
    # 数据处理
    numpy>=1.21.0

    安装依赖:


    pip install -r requirements.txt

    1.4 验证安装

    创建 check_env.py


    #!/usr/bin/env python3
    """验证环境安装"""
    
    "kw">class="kw">import sys
    "kw">print(f"Python版本: {sys.version}")
    
    # 检查核心库
    "kw">try:
        "kw">class="kw">import gymnasium
        "kw">print(f"✓ Gymnasium版本: {gymnasium.__version__}")
    "kw">class="kw">except ImportError:
        "kw">print("✗ Gymnasium未安装")
    
    "kw">try:
        "kw">class="kw">import torch
        "kw">print(f"✓ PyTorch版本: {torch.__version__}")
        "kw">print(f"  CUDA可用: {torch.cuda.is_available()}")
    "kw">class="kw">except ImportError:
        "kw">print("✗ PyTorch未安装")
    
    "kw">try:
        "kw">class="kw">import numpy "kw">as np
        "kw">print(f"✓ NumPy版本: {np.__version__}")
    "kw">class="kw">except ImportError:
        "kw">print("✗ NumPy未安装")
    
    "kw">try:
        "kw">class="kw">import matplotlib.pyplot "kw">as plt
        "kw">print(f"✓ Matplotlib版本: {plt.matplotlib.__version__}")
    "kw">class="kw">except ImportError:
        "kw">print("✗ Matplotlib未安装")
    
    "kw">print("\n环境验证完成!")

    运行:


    python check_env.py

    ---


    4. 2. 问题定义与建模

    2.1 机器人导航问题

    目标:让机器人在二维网格世界中从起始位置移动到目标位置,同时避开障碍物。


    环境描述

  • 网格世界:10×10的格子
  • 状态:机器人位置 (x, y)
  • 动作:上、下、左、右 四个方向
  • 奖励:接近目标正奖励,碰到障碍物负奖励
  • 2.2 创建自定义Gymnasium环境

    创建 envs/grid_world.py


    """
    自定义网格世界导航环境
    基于Gymnasium API实现
    """
    
    "kw">class="kw">import numpy "kw">as np
    "kw">class="kw">import gymnasium "kw">as gym
    "kw">from gymnasium "kw">class="kw">import spaces
    
    
    "kw">class GridWorldEnv(gym.Env):
        """
        10×10网格世界导航环境
        
        状态空间:机器人位置 (0-99)
        动作空间:0=上, 1=下, 2=左, 3=右
        """
        
        metadata = {'render_modes': ['human', 'rgb_array'], 'render_fps': 10}
        
        "kw">def __init__("kw">self, grid_size=10, max_steps=100, render_mode="kw">None):
            super().__init__()
            
            "kw">self.grid_size = grid_size
            "kw">self.max_steps = max_steps
            "kw">self.render_mode = render_mode
            
            # 状态空间:grid_size × grid_size
            "kw">self.observation_space = spaces.Discrete(grid_size * grid_size)
            
            # 动作空间:4个方向
            "kw">self.action_space = spaces.Discrete(4)
            
            # 定义网格(0=可通过, 1=障碍物, 2=目标)
            "kw">self.grid = np.zeros((grid_size, grid_size), dtype=np.int8)
            
            # 设置障碍物
            "kw">self._setup_obstacles()
            
            # 状态
            "kw">self.agent_pos = "kw">None
            "kw">self.goal_pos = "kw">None
            "kw">self.steps = "kw">None
            
            # 渲染
            "kw">self.window = "kw">None
            "kw">self.clock = "kw">None
        
        "kw">def _setup_obstacles("kw">self):
            """设置障碍物布局"""
            # L形障碍
            "kw">self.grid[3, 3:7] = 1
            "kw">self.grid[3:8, 3] = 1
            
            # 分散的障碍
            "kw">self.grid[1, 5] = 1
            "kw">self.grid[5, 1] = 1
            "kw">self.grid[7, 8] = 1
            "kw">self.grid[8, 2] = 1
        
        "kw">def _pos_to_state("kw">self, pos):
            """位置转状态"""
            "kw">class="kw">return pos[0] * "kw">self.grid_size + pos[1]
        
        "kw">def _state_to_pos("kw">self, state):
            """状态转位置"""
            "kw">class="kw">return (state // "kw">self.grid_size, state % "kw">self.grid_size)
        
        "kw">def reset("kw">self, seed="kw">None, options="kw">None):
            """重置环境"""
            super().reset(seed=seed)
            
            # 随机设置起点(角落区域)
            "kw">self.agent_pos = (
                "kw">self.np_random.integers(0, 3),
                "kw">self.np_random.integers(0, 3)
            )
            
            # 设置目标位置(对角区域)
            "kw">self.goal_pos = (
                "kw">self.np_random.integers("kw">self.grid_size-3, "kw">self.grid_size),
                "kw">self.np_random.integers("kw">self.grid_size-3, "kw">self.grid_size)
            )
            
            # 确保目标不在障碍物上
            "kw">while "kw">self.grid["kw">self.goal_pos] == 1:
                "kw">self.goal_pos = (
                    "kw">self.np_random.integers("kw">self.grid_size-3, "kw">self.grid_size),
                    "kw">self.np_random.integers("kw">self.grid_size-3, "kw">self.grid_size)
                )
            
            "kw">self.steps = 0
            
            observation = "kw">self._pos_to_state("kw">self.agent_pos)
            info = {}
            
            "kw">if "kw">self.render_mode == 'human':
                "kw">self._render_frame()
            
            "kw">class="kw">return observation, info
        
        "kw">def step("kw">self, action):
            """执行动作"""
            row, col = "kw">self.agent_pos
            
            # 动作映射:0=上, 1=下, 2=左, 3=右
            new_row, new_col = row, col
            "kw">if action == 0:  # 上
                new_row = max(0, row - 1)
            "kw">elif action == 1:  # 下
                new_row = min("kw">self.grid_size - 1, row + 1)
            "kw">elif action == 2:  # 左
                new_col = max(0, col - 1)
            "kw">elif action == 3:  # 右
                new_col = min("kw">self.grid_size - 1, col + 1)
            
            # 检查碰撞
            "kw">if "kw">self.grid[new_row, new_col] == 1:
                # 碰到障碍物,保持原位,给予负奖励
                reward = -1.0
                terminated = "kw">False
            "kw">else:
                "kw">self.agent_pos = (new_row, new_col)
                
                # 计算奖励
                distance = abs(new_row - "kw">self.goal_pos[0]) + abs(new_col - "kw">self.goal_pos[1])
                reward = -0.1  # 每步小惩罚
                
                # 到达目标
                "kw">if "kw">self.agent_pos == "kw">self.goal_pos:
                    reward = 10.0
                    terminated = "kw">True
                "kw">else:
                    # 接近目标奖励
                    reward += 0.1 * ("kw">self.grid_size * 2 - distance) / ("kw">self.grid_size * 2)
                    terminated = "kw">False
            
            "kw">self.steps += 1
            
            # 超时
            "kw">if "kw">self.steps >= "kw">self.max_steps:
                terminated = "kw">True
                reward = -1.0
            
            observation = "kw">self._pos_to_state("kw">self.agent_pos)
            info = {
                'agent_pos': "kw">self.agent_pos,
                'goal_pos': "kw">self.goal_pos,
                'steps': "kw">self.steps
            }
            
            "kw">if "kw">self.render_mode == 'human':
                "kw">self._render_frame()
            
            "kw">class="kw">return observation, reward, terminated, "kw">False, info
        
        "kw">def render("kw">self):
            """渲染环境"""
            "kw">if "kw">self.render_mode == 'human':
                "kw">class="kw">return "kw">self._render_frame()
            "kw">class="kw">return "kw">None
        
        "kw">def _render_frame("kw">self):
            """绘制一帧"""
            "kw">class="kw">import pygame
            "kw">class="kw">import numpy "kw">as np
            
            "kw">if "kw">self.window "kw">is "kw">None:
                pygame.init()
                "kw">self.window = pygame.display.set_mode((500, 500))
                pygame.display.set_caption("Grid World Navigation")
            
            "kw">if "kw">self.clock "kw">is "kw">None:
                "kw">self.clock = pygame.time.Clock()
            
            # 清屏
            "kw">self.window.fill((30, 30, 40))
            
            # 绘制网格
            cell_size = 500 // "kw">self.grid_size
            
            "kw">for row "kw">in "kw">range("kw">self.grid_size):
                "kw">for col "kw">in "kw">range("kw">self.grid_size):
                    rect = pygame.Rect(col * cell_size, row * cell_size, cell_size, cell_size)
                    
                    "kw">if "kw">self.grid[row, col] == 1:
                        pygame.draw.rect("kw">self.window, (100, 100, 100), rect)
                    "kw">elif (row, col) == "kw">self.goal_pos:
                        pygame.draw.rect("kw">self.window, (0, 200, 100), rect)
                    "kw">else:
                        pygame.draw.rect("kw">self.window, (60, 60, 80), rect, 1)
            
            # 绘制起点
            start = (0, 0)
            start_rect = pygame.Rect(start[1] * cell_size, start[0] * cell_size, cell_size, cell_size)
            pygame.draw.rect("kw">self.window, (100, 100, 200), start_rect)
            
            # 绘制智能体
            agent_rect = pygame.Rect(
                "kw">self.agent_pos[1] * cell_size + 5,
                "kw">self.agent_pos[0] * cell_size + 5,
                cell_size - 10,
                cell_size - 10
            )
            pygame.draw.circle("kw">self.window, (255, 200, 0), agent_rect.center, cell_size // 3)
            
            pygame.display.flip()
            "kw">self.clock.tick("kw">self.metadata['render_fps'])
        
        "kw">def close("kw">self):
            """关闭环境"""
            "kw">if "kw">self.window "kw">is "kw">not "kw">None:
                "kw">class="kw">import pygame
                pygame.quit()
                "kw">self.window = "kw">None
        
        "kw">def get_state("kw">self):
            """获取当前状态(用于调试)"""
            "kw">class="kw">return {
                'agent_pos': "kw">self.agent_pos,
                'goal_pos': "kw">self.goal_pos,
                'steps': "kw">self.steps,
                'grid': "kw">self.grid.copy()
            }

    2.3 测试环境

    创建 test_env.py


    """测试自定义环境"""
    
    "kw">class="kw">import sys
    sys.path.append('.')
    
    "kw">from envs.grid_world "kw">class="kw">import GridWorldEnv
    
    "kw">def test_environment():
        """测试环境功能"""
        "kw">print("=" * 50)
        "kw">print("测试 GridWorldEnv")
        "kw">print("=" * 50)
        
        # 创建环境
        env = GridWorldEnv(grid_size=10, render_mode="kw">None)
        
        "kw">print(f"状态空间: {env.observation_space}")
        "kw">print(f"动作空间: {env.action_space}")
        "kw">print(f"网格大小: {env.grid_size}×{env.grid_size}")
        
        # 重置环境
        observation, info = env.reset()
        state = env.get_state()
        
        "kw">print(f"\n初始状态:")
        "kw">print(f"  智能体位置: {state['agent_pos']}")
        "kw">print(f"  目标位置: {state['goal_pos']}")
        "kw">print(f"  观察值: {observation}")
        
        # 测试几个动作
        "kw">print("\n执行10步随机动作:")
        total_reward = 0
        
        "kw">for step "kw">in "kw">range(10):
            action = env.action_space.sample()
            observation, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            
            state = env.get_state()
            "kw">print(f"  步骤{step+1}: 动作={action}, 奖励={reward:.2f}, "
                  f"位置={state['agent_pos']}, 完成={terminated}")
            
            "kw">if terminated:
                "kw">print(f"  任务完成!总奖励: {total_reward:.2f}")
                "kw">break
        
        env.close()
        "kw">print("\n环境测试完成!")
    
    "kw">if __name__ == '__main__':
        test_environment()

    运行测试:


    python test_env.py

    ---


    5. 3. Q-Learning算法实现

    3.1 算法原理

    Q-Learning是一种基于值函数的强化学习算法,通过迭代更新Q表来学习最优策略。


    Q值更新公式

    Q(s, a) ← Q(s, a) + α[r + γ max_a' Q(s', a') - Q(s, a)]

    其中:

  • α:学习率
  • γ:折扣因子
  • r:即时奖励
  • max_a' Q(s', a'):下一状态的最大Q值
  • 3.2 Q-Learning实现

    创建 agents/q_learning.py


    """
    Q-Learning 智能体实现
    """
    
    "kw">class="kw">import numpy "kw">as np
    "kw">from collections "kw">class="kw">import defaultdict
    
    
    "kw">class QLearningAgent:
        """
        Q-Learning 强化学习智能体
        
        使用表格形式存储Q值,适用于离散状态和动作空间
        """
        
        "kw">def __init__("kw">self, n_states, n_actions, learning_rate=0.1, gamma=0.99,
                     epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01):
            """
            初始化Q-Learning智能体
            
            参数:
                n_states: 状态空间大小
                n_actions: 动作空间大小
                learning_rate: 学习率 (α)
                gamma: 折扣因子 (γ)
                epsilon: 探索率初始值
                epsilon_decay: 探索率衰减率
                epsilon_min: 探索率最小值
            """
            "kw">self.n_states = n_states
            "kw">self.n_actions = n_actions
            "kw">self.lr = learning_rate
            "kw">self.gamma = gamma
            "kw">self.epsilon = epsilon
            "kw">self.epsilon_decay = epsilon_decay
            "kw">self.epsilon_min = epsilon_min
            
            # 初始化Q表
            "kw">self.q_table = np.zeros((n_states, n_actions))
            
            # 训练统计
            "kw">self.training_history = []
        
        "kw">def select_action("kw">self, state):
            """
            ε-greedy策略选择动作
            
            参数:
                state: 当前状态
                
            返回:
                action: 选择的动作
            """
            "kw">if np.random.random() < "kw">self.epsilon:
                # 随机探索
                "kw">class="kw">return np.random.randint("kw">self.n_actions)
            "kw">else:
                # 利用已知最优
                "kw">class="kw">return np.argmax("kw">self.q_table[state])
        
        "kw">def update("kw">self, state, action, reward, next_state):
            """
            更新Q值
            
            Q(s, a) ← Q(s, a) + α[r + γ max Q(s', a') - Q(s, a)]
            
            参数:
                state: 当前状态
                action: 执行的动作
                reward: 获得的奖励
                next_state: 下一状态
            """
            current_q = "kw">self.q_table[state, action]
            max_next_q = np.max("kw">self.q_table[next_state])
            
            # TD误差
            td_error = reward + "kw">self.gamma * max_next_q - current_q
            
            # 更新Q值
            "kw">self.q_table[state, action] = current_q + "kw">self.lr * td_error
            
            "kw">class="kw">return td_error
        
        "kw">def decay_epsilon("kw">self):
            """衰减探索率"""
            "kw">self.epsilon = max("kw">self.epsilon_min, "kw">self.epsilon * "kw">self.epsilon_decay)
        
        "kw">def train("kw">self, env, n_episodes=500, max_steps=100):
            """
            训练智能体
            
            参数:
                env: Gymnasium环境
                n_episodes: 训练的回合数
                max_steps: 每回合最大步数
                
            返回:
                episode_rewards: 每回合的总奖励列表
            """
            episode_rewards = []
            
            "kw">for episode "kw">in "kw">range(n_episodes):
                state, _ = env.reset()
                total_reward = 0
                td_errors = []
                
                "kw">for step "kw">in "kw">range(max_steps):
                    # 选择动作
                    action = "kw">self.select_action(state)
                    
                    # 执行动作
                    next_state, reward, terminated, truncated, info = env.step(action)
                    
                    # 更新Q值
                    td_error = "kw">self.update(state, action, reward, next_state)
                    td_errors.append(abs(td_error))
                    
                    total_reward += reward
                    state = next_state
                    
                    "kw">if terminated "kw">or truncated:
                        "kw">break
                
                # 衰减探索率
                "kw">self.decay_epsilon()
                
                episode_rewards.append(total_reward)
                
                # 记录训练历史
                "kw">self.training_history.append({
                    'episode': episode,
                    'reward': total_reward,
                    'epsilon': "kw">self.epsilon,
                    'mean_td_error': np.mean(td_errors) "kw">if td_errors "kw">else 0
                })
                
                # 打印训练进度
                "kw">if (episode + 1) % 50 == 0:
                    recent_avg = np.mean(episode_rewards[-50:])
                    "kw">print(f"Episode {episode+1}/{n_episodes} | "
                          f"平均奖励: {recent_avg:.2f} | "
                          f"ε: {"kw">self.epsilon:.4f}")
            
            "kw">class="kw">return episode_rewards
        
        "kw">def get_policy("kw">self):
            """
            获取当前最优策略
            
            返回:
                policy: 每个状态对应的最优动作
            """
            "kw">class="kw">return np.argmax("kw">self.q_table, axis=1)
        
        "kw">def evaluate("kw">self, env, n_episodes=10, max_steps=100):
            """
            评估训练好的智能体
            
            参数:
                env: Gymnasium环境
                n_episodes: 评估的回合数
                max_steps: 每回合最大步数
                
            返回:
                mean_reward: 平均奖励
                success_rate: 成功率
            """
            episode_rewards = []
            successes = 0
            
            "kw">for episode "kw">in "kw">range(n_episodes):
                state, _ = env.reset()
                total_reward = 0
                
                "kw">for step "kw">in "kw">range(max_steps):
                    action = np.argmax("kw">self.q_table[state])  # 贪心策略
                    next_state, reward, terminated, truncated, _ = env.step(action)
                    
                    total_reward += reward
                    state = next_state
                    
                    "kw">if terminated:
                        "kw">if reward > 0:  # 成功完成任务
                            successes += 1
                        "kw">break
                    "kw">if truncated:
                        "kw">break
                
                episode_rewards.append(total_reward)
            
            "kw">class="kw">return np.mean(episode_rewards), successes / n_episodes
    
    
    "kw">def visualize_q_table(q_table, grid_size=10):
        """
        可视化Q表
        
        参数:
            q_table: Q值表
            grid_size: 网格大小
        """
        "kw">class="kw">import matplotlib.pyplot "kw">as plt
        "kw">class="kw">import numpy "kw">as np
        
        # 提取每个状态的最优动作和Q值
        best_actions = np.argmax(q_table, axis=1).reshape(grid_size, grid_size)
        max_q_values = np.max(q_table, axis=1).reshape(grid_size, grid_size)
        
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))
        
        # 最优动作可视化
        ax = axes[0]
        im = ax.imshow(best_actions, cmap='viridis', aspect='auto')
        ax.set_title('最优动作 (0=上, 1=下, 2=左, 3=右)')
        ax.set_xlabel('列')
        ax.set_ylabel('行')
        plt.colorbar(im, ax=ax)
        
        # Q值可视化
        ax = axes[1]
        im = ax.imshow(max_q_values, cmap='plasma', aspect='auto')
        ax.set_title('最大Q值')
        ax.set_xlabel('列')
        ax.set_ylabel('行')
        plt.colorbar(im, ax=ax)
        
        plt.tight_layout()
        plt.savefig('q_table_visualization.png', dpi=150)
        plt.show()
        "kw">print("Q表可视化已保存到 q_table_visualization.png")
    
    
    "kw">if __name__ == '__main__':
        # 测试Q-Learning
        "kw">from envs.grid_world "kw">class="kw">import GridWorldEnv
        
        "kw">print("=" * 50)
        "kw">print("Q-Learning 算法测试")
        "kw">print("=" * 50)
        
        # 创建环境
        env = GridWorldEnv(grid_size=10)
        
        # 创建智能体
        agent = QLearningAgent(
            n_states=env.observation_space.n,
            n_actions=env.action_space.n,
            learning_rate=0.1,
            gamma=0.95,
            epsilon=1.0,
            epsilon_decay=0.99,
            epsilon_min=0.01
        )
        
        # 训练
        "kw">print("\n开始训练...")
        rewards = agent.train(env, n_episodes=500, max_steps=100)
        
        # 评估
        "kw">print("\n评估训练结果...")
        mean_reward, success_rate = agent.evaluate(env, n_episodes=50)
        "kw">print(f"平均奖励: {mean_reward:.2f}")
        "kw">print(f"成功率: {success_rate:.2%}")
        
        # 可视化Q表
        visualize_q_table(agent.q_table)
        
        env.close()

    3.3 运行Q-Learning

    python agents/q_learning.py

    ---


    6. 4. 深度Q网络(DQN)实现

    4.1 DQN原理

    当状态空间很大时(如图像),表格形式的Q-Learning不再适用。DQN使用深度神经网络来逼近Q函数。


    DQN的关键技术

  • 经验回放 (Experience Replay):存储转移样本,随机采样更新
  • 目标网络 (Target Network):定期复制网络参数,稳定训练
  • 4.2 DQN实现

    创建 agents/dqn.py


    """
    深度Q网络 (DQN) 实现
    """
    
    "kw">class="kw">import numpy "kw">as np
    "kw">class="kw">import torch
    "kw">class="kw">import torch.nn "kw">as nn
    "kw">class="kw">import torch.optim "kw">as optim
    "kw">from collections "kw">class="kw">import deque, namedtuple
    "kw">class="kw">import random
    
    
    # 定义转移样本
    Transition = namedtuple('Transition', 
                           ('state', 'action', 'reward', 'next_state', 'done'))
    
    
    "kw">class ReplayBuffer:
        """
        经验回放缓冲区
        存储转移样本,用于随机采样打破数据相关性
        """
        
        "kw">def __init__("kw">self, capacity=10000):
            "kw">self.buffer = deque(maxlen=capacity)
        
        "kw">def push("kw">self, state, action, reward, next_state, done):
            """添加转移样本到缓冲区"""
            "kw">self.buffer.append(Transition(state, action, reward, next_state, done))
        
        "kw">def sample("kw">self, batch_size):
            """随机采样一批样本"""
            batch = random.sample("kw">self.buffer, batch_size)
            
            states = torch.FloatTensor(np.array([t.state "kw">for t "kw">in batch]))
            actions = torch.LongTensor([t.action "kw">for t "kw">in batch])
            rewards = torch.FloatTensor([t.reward "kw">for t "kw">in batch])
            next_states = torch.FloatTensor(np.array([t.next_state "kw">for t "kw">in batch]))
            dones = torch.FloatTensor([t.done "kw">for t "kw">in batch])
            
            "kw">class="kw">return states, actions, rewards, next_states, dones
        
        "kw">def __len__("kw">self):
            "kw">class="kw">return "kw">len("kw">self.buffer)
    
    
    "kw">class QNetwork(nn.Module):
        """
        Q网络
        用于逼近Q(s, a)
        """
        
        "kw">def __init__("kw">self, state_dim, action_dim, hidden_dim=128):
            super().__init__()
            
            "kw">self.network = nn.Sequential(
                nn.Linear(state_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, action_dim)
            )
        
        "kw">def forward("kw">self, x):
            "kw">class="kw">return "kw">self.network(x)
    
    
    "kw">class DQNAgent:
        """
        DQN智能体
        """
        
        "kw">def __init__("kw">self, state_dim, action_dim, hidden_dim=128,
                     learning_rate=0.001, gamma=0.99, epsilon=1.0,
                     epsilon_decay=0.995, epsilon_min=0.01,
                     target_update_freq=10, replay_buffer_size=10000):
            
            "kw">self.state_dim = state_dim
            "kw">self.action_dim = action_dim
            "kw">self.gamma = gamma
            "kw">self.epsilon = epsilon
            "kw">self.epsilon_decay = epsilon_decay
            "kw">self.epsilon_min = epsilon_min
            "kw">self.target_update_freq = target_update_freq
            
            # 设备
            "kw">self.device = torch.device("cuda" "kw">if torch.cuda.is_available() "kw">else "cpu")
            "kw">print(f"使用设备: {"kw">self.device}")
            
            # Q网络和目标网络
            "kw">self.q_network = QNetwork(state_dim, action_dim, hidden_dim).to("kw">self.device)
            "kw">self.target_network = QNetwork(state_dim, action_dim, hidden_dim).to("kw">self.device)
            "kw">self.target_network.load_state_dict("kw">self.q_network.state_dict())
            
            # 优化器
            "kw">self.optimizer = optim.Adam("kw">self.q_network.parameters(), lr=learning_rate)
            
            # 经验回放缓冲区
            "kw">self.replay_buffer = ReplayBuffer(replay_buffer_size)
            
            # 训练统计
            "kw">self.training_history = []
            "kw">self.update_count = 0
        
        "kw">def select_action("kw">self, state, training="kw">True):
            """ε-greedy策略选择动作"""
            "kw">if training "kw">and np.random.random() < "kw">self.epsilon:
                "kw">class="kw">return np.random.randint("kw">self.action_dim)
            "kw">else:
                "kw">with torch.no_grad():
                    state_tensor = torch.FloatTensor(state).unsqueeze(0).to("kw">self.device)
                    q_values = "kw">self.q_network(state_tensor)
                    "kw">class="kw">return q_values.argmax().item()
        
        "kw">def update("kw">self, batch_size=64):
            """更新Q网络"""
            "kw">if "kw">len("kw">self.replay_buffer) < batch_size:
                "kw">class="kw">return "kw">None
            
            # 采样
            states, actions, rewards, next_states, dones = \
                "kw">self.replay_buffer.sample(batch_size)
            
            states = states.to("kw">self.device)
            actions = actions.to("kw">self.device)
            rewards = rewards.to("kw">self.device)
            next_states = next_states.to("kw">self.device)
            dones = dones.to("kw">self.device)
            
            # 计算当前Q值
            current_q = "kw">self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze()
            
            # 计算目标Q值(使用目标网络)
            "kw">with torch.no_grad():
                max_next_q = "kw">self.target_network(next_states).max(1)[0]
                target_q = rewards + "kw">self.gamma * max_next_q * (1 - dones)
            
            # 计算损失
            loss = nn.functional.mse_loss(current_q, target_q)
            
            # 反向传播
            "kw">self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_("kw">self.q_network.parameters(), 1.0)
            "kw">self.optimizer.step()
            
            # 定期更新目标网络
            "kw">self.update_count += 1
            "kw">if "kw">self.update_count % "kw">self.target_update_freq == 0:
                "kw">self.target_network.load_state_dict("kw">self.q_network.state_dict())
            
            "kw">class="kw">return loss.item()
        
        "kw">def decay_epsilon("kw">self):
            """衰减探索率"""
            "kw">self.epsilon = max("kw">self.epsilon_min, "kw">self.epsilon * "kw">self.epsilon_decay)
        
        "kw">def train("kw">self, env, n_episodes=500, max_steps=100, batch_size=64):
            """训练智能体"""
            episode_rewards = []
            losses = []
            
            "kw">for episode "kw">in "kw">range(n_episodes):
                state, _ = env.reset()
                # 将状态转为one-hot或直接使用索引
                state = np.eye(env.observation_space.n)[state]  # one-hot编码
                
                total_reward = 0
                episode_loss = []
                
                "kw">for step "kw">in "kw">range(max_steps):
                    # 选择动作
                    action = "kw">self.select_action(state)
                    
                    # 执行动作
                    next_state, reward, terminated, truncated, _ = env.step(action)
                    next_state_onehot = np.eye(env.observation_space.n)[next_state]
                    
                    done = terminated "kw">or truncated
                    
                    # 存储转移
                    "kw">self.replay_buffer.push(state, action, reward, next_state_onehot, done)
                    
                    # 更新网络
                    loss = "kw">self.update(batch_size)
                    "kw">if loss "kw">is "kw">not "kw">None:
                        episode_loss.append(loss)
                    
                    total_reward += reward
                    state = next_state_onehot
                    
                    "kw">if done:
                        "kw">break
                
                "kw">self.decay_epsilon()
                episode_rewards.append(total_reward)
                losses.append(np.mean(episode_loss) "kw">if episode_loss "kw">else 0)
                
                "kw">if (episode + 1) % 50 == 0:
                    recent_avg = np.mean(episode_rewards[-50:])
                    "kw">print(f"Episode {episode+1}/{n_episodes} | "
                          f"平均奖励: {recent_avg:.2f} | "
                          f"ε: {"kw">self.epsilon:.4f}")
            
            "kw">self.training_history = {
                'rewards': episode_rewards,
                'losses': losses
            }
            
            "kw">class="kw">return episode_rewards
        
        "kw">def evaluate("kw">self, env, n_episodes=10, max_steps=100):
            """评估智能体"""
            episode_rewards = []
            successes = 0
            
            "kw">for episode "kw">in "kw">range(n_episodes):
                state, _ = env.reset()
                state = np.eye(env.observation_space.n)[state]
                total_reward = 0
                
                "kw">for step "kw">in "kw">range(max_steps):
                    action = "kw">self.select_action(state, training="kw">False)
                    next_state, reward, terminated, truncated, _ = env.step(action)
                    next_state = np.eye(env.observation_space.n)[next_state]
                    
                    total_reward += reward
                    state = next_state
                    
                    "kw">if terminated:
                        "kw">if reward > 0:
                            successes += 1
                        "kw">break
                    "kw">if truncated:
                        "kw">break
                
                episode_rewards.append(total_reward)
            
            "kw">class="kw">return np.mean(episode_rewards), successes / n_episodes
    
    
    "kw">def plot_training_history(history):
        """绘制训练曲线"""
        "kw">class="kw">import matplotlib.pyplot "kw">as plt
        
        fig, axes = plt.subplots(1, 2, figsize=(14, 5))
        
        # 奖励曲线
        axes[0].plot(history['rewards'])
        axes[0].set_xlabel('Episode')
        axes[0].set_ylabel('Total Reward')
        axes[0].set_title('Training Rewards')
        axes[0].grid("kw">True)
        
        # 损失曲线
        axes[1].plot(history['losses'])
        axes[1].set_xlabel('Episode')
        axes[1].set_ylabel('Loss')
        axes[1].set_title('Training Loss')
        axes[1].grid("kw">True)
        
        plt.tight_layout()
        plt.savefig('dqn_training.png', dpi=150)
        plt.show()
    
    
    "kw">if __name__ == '__main__':
        "kw">from envs.grid_world "kw">class="kw">import GridWorldEnv
        
        "kw">print("=" * 50)
        "kw">print("深度Q网络 (DQN) 算法测试")
        "kw">print("=" * 50)
        
        # 创建环境
        env = GridWorldEnv(grid_size=10)
        
        # 创建智能体
        state_dim = env.observation_space.n  # 100个状态
        action_dim = env.action_space.n     # 4个动作
        
        agent = DQNAgent(
            state_dim=state_dim,
            action_dim=action_dim,
            hidden_dim=128,
            learning_rate=0.001,
            gamma=0.99,
            epsilon=1.0,
            epsilon_decay=0.99,
            epsilon_min=0.01
        )
        
        # 训练
        "kw">print("\n开始训练...")
        rewards = agent.train(env, n_episodes=500, max_steps=100, batch_size=64)
        
        # 评估
        "kw">print("\n评估训练结果...")
        mean_reward, success_rate = agent.evaluate(env, n_episodes=50)
        "kw">print(f"平均奖励: {mean_reward:.2f}")
        "kw">print(f"成功率: {success_rate:.2%}")
        
        # 绘制训练曲线
        plot_training_history(agent.training_history)
        
        env.close()

    ---


    7. 5. 训练与评估

    5.1 训练脚本

    创建 train.py


    """训练和评估脚本"""
    
    "kw">class="kw">import argparse
    "kw">class="kw">import numpy "kw">as np
    "kw">class="kw">import matplotlib.pyplot "kw">as plt
    "kw">from envs.grid_world "kw">class="kw">import GridWorldEnv
    "kw">from agents.q_learning "kw">class="kw">import QLearningAgent
    "kw">from agents.dqn "kw">class="kw">import DQNAgent
    
    
    "kw">def plot_comparison(q_rewards, dqn_rewards, title="Algorithm Comparison"):
        """绘制算法对比图"""
        plt.figure(figsize=(12, 5))
        
        # 奖励曲线
        plt.subplot(1, 2, 1)
        plt.plot(q_rewards, label='Q-Learning', alpha=0.7)
        plt.plot(dqn_rewards, label='DQN', alpha=0.7)
        plt.xlabel('Episode')
        plt.ylabel('Total Reward')
        plt.title('Training Rewards')
        plt.legend()
        plt.grid("kw">True)
        
        # 移动平均
        window = 20
        plt.subplot(1, 2, 2)
        q_ma = np.convolve(q_rewards, np.ones(window)/window, mode='valid')
        dqn_ma = np.convolve(dqn_rewards, np.ones(window)/window, mode='valid')
        plt.plot(q_ma, label='Q-Learning (MA)', linewidth=2)
        plt.plot(dqn_ma, label='DQN (MA)', linewidth=2)
        plt.xlabel('Episode')
        plt.ylabel('Moving Average Reward')
        plt.title(f'{window}-Episode Moving Average')
        plt.legend()
        plt.grid("kw">True)
        
        plt.tight_layout()
        plt.savefig('algorithm_comparison.png', dpi=150)
        plt.show()
    
    
    "kw">def main():
        parser = argparse.ArgumentParser(description='训练强化学习智能体')
        parser.add_argument('--algorithm', type=str, default='dqn', 
                           choices=['qlearning', 'dqn'],
                           help='选择算法')
        parser.add_argument('--episodes', type=int, default=500,
                           help='训练回合数')
        args = parser.parse_args()
        
        # 创建环境
        env = GridWorldEnv(grid_size=10)
        
        "kw">if args.algorithm == 'qlearning':
            "kw">print("训练 Q-Learning 智能体...")
            agent = QLearningAgent(
                n_states=env.observation_space.n,
                n_actions=env.action_space.n,
                learning_rate=0.1,
                gamma=0.95,
                epsilon=1.0,
                epsilon_decay=0.99,
                epsilon_min=0.01
            )
            rewards = agent.train(env, n_episodes=args.episodes, max_steps=100)
            mean_reward, success_rate = agent.evaluate(env, n_episodes=50)
            
        "kw">else:  # dqn
            "kw">print("训练 DQN 智能体...")
            agent = DQNAgent(
                state_dim=env.observation_space.n,
                action_dim=env.action_space.n,
                hidden_dim=128,
                learning_rate=0.001,
                gamma=0.99,
                epsilon=1.0,
                epsilon_decay=0.99,
                epsilon_min=0.01
            )
            rewards = agent.train(env, n_episodes=args.episodes, max_steps=100, batch_size=64)
            mean_reward, success_rate = agent.evaluate(env, n_episodes=50)
        
        "kw">print(f"\n最终结果:")
        "kw">print(f"  平均奖励: {mean_reward:.2f}")
        "kw">print(f"  成功率: {success_rate:.2%}")
        
        env.close()
    
    
    "kw">if __name__ == '__main__':
        main()

    5.2 运行训练

    # 训练Q-Learning
    python train.py --algorithm qlearning --episodes 500
    
    # 训练DQN
    python train.py --algorithm dqn --episodes 500

    ---


    8. 6. 进阶:PPO算法

    6.1 PPO原理

    PPO (Proximal Policy Optimization) 是一种策略梯度算法,通过限制策略更新幅度来提高训练稳定性。


    核心思想

    L^CLIP(θ) = E[ min(r(θ) * A_t, clip(r(θ), 1-ε, 1+ε) * A_t) ]
    
    其中 r(θ) = π_θ(a|s) / π_θ_old(a|s) 是概率比率

    6.2 使用Stable-Baselines3实现PPO

    创建 train_ppo.py


    """
    使用Stable-Baselines3实现PPO
    """
    
    "kw">class="kw">import gymnasium "kw">as gym
    "kw">from stable_baselines3 "kw">class="kw">import PPO
    "kw">from stable_baselines3.common.env_util "kw">class="kw">import make_vec_env
    "kw">class="kw">import numpy "kw">as np
    
    
    "kw">def main():
        # 创建向量化环境(加速训练)
        env = make_vec_env('CartPole-v1', n_envs=4)
        
        # 创建PPO模型
        model = PPO(
            'MlpPolicy',
            env,
            learning_rate=3e-4,
            n_steps=2048,
            batch_size=64,
            n_epochs=10,
            gamma=0.99,
            gae_lambda=0.95,
            clip_range=0.2,
            ent_coef=0.01,
            verbose=1
        )
        
        # 训练
        "kw">print("训练 PPO 智能体...")
        model.learn(total_timesteps=100000, progress_bar="kw">True)
        
        # 评估
        "kw">print("\n评估训练结果...")
        eval_env = gym.make('CartPole-v1')
        
        obs, _ = eval_env.reset()
        total_reward = 0
        n_episodes = 10
        
        "kw">for episode "kw">in "kw">range(n_episodes):
            episode_reward = 0
            done = "kw">False
            
            "kw">while "kw">not done:
                action, _ = model.predict(obs)
                obs, reward, terminated, truncated, _ = eval_env.step(action)
                episode_reward += reward
                done = terminated "kw">or truncated
                
                "kw">if terminated "kw">or truncated:
                    obs, _ = eval_env.reset()
                    total_reward += episode_reward
                    "kw">print(f"Episode {episode+1}: reward = {episode_reward}")
                    "kw">break
        
        "kw">print(f"\n平均奖励: {total_reward / n_episodes:.2f}")
        
        # 保存模型
        model.save("ppo_cartpole")
        "kw">print("模型已保存到 ppo_cartpole.zip")
    
    
    "kw">if __name__ == '__main__':
        main()

    ---


    9. 7. 可视化与调试

    7.1 创建训练可视化工具

    创建 visualize.py


    """
    训练过程可视化工具
    """
    
    "kw">class="kw">import numpy "kw">as np
    "kw">class="kw">import matplotlib.pyplot "kw">as plt
    "kw">from matplotlib.patches "kw">class="kw">import FancyBboxPatch, Circle, Rectangle
    
    
    "kw">class TrainingVisualizer:
        """训练过程可视化"""
        
        "kw">def __init__("kw">self, grid_size=10):
            "kw">self.grid_size = grid_size
        
        "kw">def visualize_episode("kw">self, episode_data):
            """
            可视化一个回合的轨迹
            
            episode_data: list of (state, action, reward) tuples
            """
            fig, axes = plt.subplots(2, 5, figsize=(15, 6))
            axes = axes.flatten()
            
            # 只显示前10步
            "kw">for i, (state, action, reward) "kw">in enumerate(episode_data[:10]):
                ax = axes[i]
                
                # 计算位置
                row, col = state // "kw">self.grid_size, state % "kw">self.grid_size
                
                # 绘制网格
                ax.set_xlim(-0.5, "kw">self.grid_size - 0.5)
                ax.set_ylim(-0.5, "kw">self.grid_size - 0.5)
                ax.set_aspect('equal')
                
                "kw">for r "kw">in "kw">range("kw">self.grid_size):
                    "kw">for c "kw">in "kw">range("kw">self.grid_size):
                        rect = Rectangle((c, r), 1, 1, fill="kw">False, edgecolor='gray', linewidth=0.5)
                        ax.add_patch(rect)
                
                # 绘制智能体
                agent = Circle((col + 0.5, row + 0.5), 0.3, color='orange')
                ax.add_patch(agent)
                
                # 绘制动作方向
                action_dirs = {0: (0, 0.5), 1: (0, -0.5), 2: (-0.5, 0), 3: (0.5, 0)}
                dx, dy = action_dirs[action]
                ax.annotate('', xy=(col + 0.5 + dx, row + 0.5 + dy),
                           xytext=(col + 0.5, row + 0.5),
                           arrowprops=dict(arrowstyle='->', color='red', lw=2))
                
                ax.set_title(f'Step {i+1}: a={action}, r={reward:.1f}')
                ax.axis('off')
            
            plt.tight_layout()
            plt.savefig('episode_visualization.png', dpi=150)
            plt.show()
        
        "kw">def plot_learning_curve("kw">self, rewards, window=20):
            """绘制学习曲线"""
            fig, axes = plt.subplots(1, 2, figsize=(14, 5))
            
            # 原始奖励
            axes[0].plot(rewards, alpha=0.5)
            axes[0].set_xlabel('Episode')
            axes[0].set_ylabel('Total Reward')
            axes[0].set_title('Training Rewards')
            axes[0].grid("kw">True)
            
            # 移动平均
            ma = np.convolve(rewards, np.ones(window)/window, mode='valid')
            axes[1].plot(ma, color='orange', linewidth=2)
            axes[1].set_xlabel('Episode')
            axes[1].set_ylabel(f'{window}-Episode Moving Average')
            axes[1].set_title('Learning Curve (Smoothed)')
            axes[1].grid("kw">True)
            
            plt.tight_layout()
            plt.savefig('learning_curve.png', dpi=150)
            plt.show()
    
    
    "kw">if __name__ == '__main__':
        # 演示可视化
        "kw">class="kw">import numpy "kw">as np
        
        # 生成模拟数据
        episode_data = []
        state = 0
        "kw">for i "kw">in "kw">range(15):
            action = np.random.randint(4)
            reward = np.random.randn()
            episode_data.append((state, action, reward))
            state = (state + np.random.randint(1, 4)) % 100
        
        visualizer = TrainingVisualizer(grid_size=10)
        visualizer.visualize_episode(episode_data)
        
        # 模拟学习曲线
        rewards = np.cumsum(np.random.randn(500)) + 50
        rewards = rewards - np.arange(500) * 0.1 + np.random.rand(500) * 20
        visualizer.plot_learning_curve(rewards)

    ---


    10. 总结与下一步

    项目总结

    完成本项目后,你将掌握:

  • ✅ 自定义Gymnasium环境的创建
  • ✅ Q-Learning算法的实现和训练
  • ✅ DQN算法的实现和训练
  • ✅ 训练过程的可视化和评估
  • 扩展建议

  • 环境扩展
  • - 添加随机障碍物生成

    - 实现3D环境

    - 添加传感器噪声


  • 算法扩展
  • - 实现Double DQN

    - 实现Dueling DQN

    - 实现PPO算法


  • 应用扩展
  • - 迁移到Gazebo仿真环境

    - 结合视觉输入(CNN)

    - 实现多智能体协作


    参考资源

  • Gymnasium官方文档
  • Stable-Baselines3文档
  • Reinforcement Learning: An Introduction
  • ---


    下一步项目二:机器人抓取仿真