进阶 · 3-4周

项目二:机器人抓取仿真

使用PyBullet物理引擎构建完整抓取系统,包括抓取检测、视觉伺服和PPO策略训练

进阶难度 预计3-4周 9个章节

1. 项目概述

本项目将实现一个完整的机器人抓取系统,包括:

  • 使用PyBullet或Gazebo进行物理仿真
  • 实现基于视觉的抓取检测网络
  • 训练深度强化学习策略进行抓取
  • 实现Sim-to-Real的基本概念
  • 难度:⭐⭐⭐⭐

    预计时间:3-4周

    前置知识:Python、深度学习、ROS基础


    ---


    2. 目录

  • 项目环境搭建
  • PyBullet物理仿真基础
  • 抓取检测网络实现
  • 视觉伺服控制
  • 强化学习抓取策略
  • Sim-to-Real迁移
  • ---


    3. 1. 项目环境搭建

    1.1 安装依赖

    创建 projects/robot_grasping/requirements.txt


    # 物理仿真
    pybullet>=3.2.0
    
    # 深度学习
    torch>=1.10.0
    torchvision>=0.11.0
    
    # 视觉
    opencv-python>=4.5.0
    pillow>=8.3.0
    
    # 工具库
    numpy>=1.21.0
    matplotlib>=3.4.0
    scipy>=1.7.0
    pyyaml>=6.0

    安装:


    cd projects/robot_grasping
    pip install -r requirements.txt

    1.2 项目目录结构

    robot_grasping/
    ├── envs/
    │   ├── __init__.py
    │   └── grasping_env.py      # 抓取仿真环境
    ├── models/
    │   ├── __init__.py
    │   ├── graspnet.py          # 抓取检测网络
    │   └── policy.py            # 抓取策略网络
    ├── utils/
    │   ├── __init__.py
    │   └── transforms.py        # 坐标变换工具
    ├── train_graspnet.py         # 训练抓取网络
    ├── train_rl.py              # 训练RL策略
    ├── evaluate.py              # 评估
    └── configs/
        └── default.yaml          # 配置文件

    ---


    4. 2. PyBullet物理仿真基础

    2.1 PyBullet简介

    PyBullet是一个Python接口,用于Bullet物理引擎。它提供:

  • 刚体动力学仿真
  • 碰撞检测
  • 正逆运动学
  • URDF/SDF模型加载
  • 2.2 创建抓取仿真环境

    创建 envs/grasping_env.py


    """
    机器人抓取仿真环境
    使用PyBullet实现物理仿真
    """
    
    "kw">class="kw">import pybullet "kw">as p
    "kw">class="kw">import pybullet_data
    "kw">class="kw">import numpy "kw">as np
    "kw">from gymnasium "kw">class="kw">import spaces
    "kw">class="kw">import os
    
    
    "kw">class RobotGraspingEnv:
        """
        机器人抓取仿真环境
        
        包含:
        - 机械臂模型
        - 目标物体
        - 相机系统
        """
        
        "kw">def __init__("kw">self, render="kw">False, gui="kw">True):
            """
            初始化仿真环境
            
            参数:
                render: 是否渲染
                gui: 是否使用GUI界面
            """
            "kw">self.render = render
            "kw">self.gui = gui
            
            # 连接物理引擎
            "kw">if gui:
                "kw">self.physics_client = p.connect(p.GUI)
            "kw">else:
                "kw">self.physics_client = p.connect(p.DIRECT)
            
            # 设置搜索路径
            p.setAdditionalSearchPath(pybullet_data.getDataPath())
            
            # 仿真参数
            "kw">self.time_step = 1.0 / 240.0  # 240Hz
            "kw">self.max_iterations = 500
            
            # 初始化环境
            "kw">self._setup_world()
            "kw">self._setup_robot()
            "kw">self._setup_objects()
            "kw">self._setup_camera()
            
            # 状态和动作空间
            "kw">self.observation_space = spaces.Box(
                low=0, high=255, shape=(224, 224, 3), dtype=np.uint8
            )
            "kw">self.action_space = spaces.Box(
                low=-1, high=1, shape=(7,), dtype=np.float32
            )
        
        "kw">def _setup_world("kw">self):
            """设置物理世界"""
            # 重力
            p.setGravity(0, 0, -9.81)
            
            # 时间步
            p.setTimeStep("kw">self.time_step)
            
            # 渲染选项
            p.configureDebugVisualizer(p.COV_ENABLE_TINY_RENDERER, 0)
            p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
            
            # 添加地面
            plane_id = p.loadURDF("plane.urdf")
            p.changeVisualShape(plane_id, -1, rgbaColor=[0.5, 0.5, 0.5, 1])
        
        "kw">def _setup_robot("kw">self):
            """设置机械臂"""
            # 加载Franka Panda机械臂
            robot_path = os.path.join(
                pybullet_data.getDataPath(), 
                "franka_panda/panda.urdf"
            )
            
            "kw">self.robot_id = p.loadURDF(
                robot_path,
                basePosition=[0, 0, 0],
                baseOrientation=[0, 0, 0, 1],
                useMaximalCoordinates="kw">False
            )
            
            # 机械臂配置
            "kw">self.num_joints = p.getNumJoints("kw">self.robot_id)
            
            # 获取关节信息
            "kw">self.joint_indices = []
            "kw">self.joint_limits = []
            
            "kw">for i "kw">in "kw">range("kw">self.num_joints):
                joint_info = p.getJointInfo("kw">self.robot_id, i)
                "kw">if joint_info[2] != p.JOINT_FIXED:
                    "kw">self.joint_indices.append(i)
                    lower = joint_info[8]
                    upper = joint_info[9]
                    "kw">self.joint_limits.append((lower, upper))
            
            # 设置控制模式
            p.setJointMotorControlArray(
                "kw">self.robot_id,
                "kw">self.joint_indices,
                p.POSITION_CONTROL,
                forces=[500] * "kw">len("kw">self.joint_indices)
            )
            
            # 末端执行器索引(最后一个关节)
            "kw">self.end_effector_index = 11  # Panda的末端
        
        "kw">def _setup_objects("kw">self):
            """设置目标物体"""
            # 随机生成物体位置
            "kw">self.object_ids = []
            
            # 添加一个立方体作为抓取目标
            object_position = [
                0.4 + np.random.uniform(-0.1, 0.1),
                0.0 + np.random.uniform(-0.1, 0.1),
                0.05
            ]
            
            object_orientation = p.getQuaternionFromEuler([
                0, 0, np.random.uniform(0, np.pi)
            ])
            
            collision_shape = p.createCollisionShape(
                p.GEOM_BOX,
                halfExtents=[0.03, 0.03, 0.03]
            )
            
            visual_shape = p.createVisualShape(
                p.GEOM_BOX,
                halfExtents=[0.03, 0.03, 0.03],
                rgbaColor=[1, 0.2, 0.2, 1]
            )
            
            "kw">self.object_id = p.createMultiBody(
                baseMass=0.1,
                baseCollisionShapeIndex=collision_shape,
                baseVisualShapeIndex=visual_shape,
                basePosition=object_position,
                baseOrientation=object_orientation
            )
            
            "kw">self.object_ids.append("kw">self.object_id)
        
        "kw">def _setup_camera("kw">self):
            """设置相机参数"""
            "kw">self.camera_config = {
                'width': 224,
                'height': 224,
                'fov': 60,
                'near': 0.01,
                'far': 10.0,
                'eye_position': [0.5, 0, 0.8],
                'target_position': [0.3, 0, 0],
                'up_vector': [0, 0, 1]
            }
        
        "kw">def reset("kw">self):
            """重置环境"""
            # 重置机械臂到初始位置
            initial_positions = [0, -0.785, 0, -2.356, 0, 1.571, 0.785, 0, 0]
            "kw">for i, pos "kw">in enumerate(initial_positions[:"kw">len("kw">self.joint_indices)]):
                p.resetJointState("kw">self.robot_id, "kw">self.joint_indices[i], pos)
            
            # 移除旧物体
            "kw">for obj_id "kw">in "kw">self.object_ids:
                p.removeBody(obj_id)
            
            # 添加新物体
            "kw">self._setup_objects()
            
            # 执行几步仿真稳定
            "kw">for _ "kw">in "kw">range(100):
                p.stepSimulation()
            
            # 获取观察
            observation = "kw">self._get_observation()
            
            "kw">class="kw">return observation, {}
        
        "kw">def step("kw">self, action):
            """执行一步仿真"""
            # 解码动作
            target_positions = "kw">self._decode_action(action)
            
            # 控制机械臂
            p.setJointMotorControlArray(
                "kw">self.robot_id,
                "kw">self.joint_indices[:7],  # 前7个关节
                p.POSITION_CONTROL,
                targetPositions=target_positions,
                forces=[500] * 7
            )
            
            # 仿真一步
            "kw">for _ "kw">in "kw">range(10):  # 10个子步骤
                p.stepSimulation()
            
            # 获取观察和奖励
            observation = "kw">self._get_observation()
            reward = "kw">self._calculate_reward()
            
            # 检查完成
            done = "kw">self._check_done()
            
            info = {
                'object_position': "kw">self._get_object_position(),
                'end_effector_position': "kw">self._get_end_effector_position()
            }
            
            "kw">class="kw">return observation, reward, done, "kw">False, info
        
        "kw">def _decode_action("kw">self, action):
            """将动作向量解码为关节目标位置"""
            # 动作 = 目标关节角度(相对于当前)
            current_positions = []
            "kw">for joint_idx "kw">in "kw">self.joint_indices[:7]:
                state = p.getJointState("kw">self.robot_id, joint_idx)
                current_positions.append(state[0])
            
            target_positions = np.array(current_positions) + np.array(action[:7]) * 0.1
            "kw">class="kw">return target_positions.tolist()
        
        "kw">def _get_observation("kw">self):
            """获取RGB图像观察"""
            # 计算相机视图矩阵
            view_matrix = p.computeViewMatrixFromYawPitchRoll(
                cameraTargetPosition="kw">self.camera_config['target_position'],
                distance="kw">self.camera_config['width'] / 2,
                yaw=45,
                pitch=-30,
                roll=0,
                upAxisIndex=2
            )
            
            # 计算投影矩阵
            proj_matrix = p.computeProjectionMatrixFOV(
                fov="kw">self.camera_config['fov'],
                aspect="kw">self.camera_config['width'] / "kw">self.camera_config['height'],
                nearVal="kw">self.camera_config['near'],
                farVal="kw">self.camera_config['far']
            )
            
            # 渲染图像
            (_, _, img, _, _) = p.getCameraImage(
                width="kw">self.camera_config['width'],
                height="kw">self.camera_config['height'],
                viewMatrix=view_matrix,
                projectionMatrix=proj_matrix,
                renderer=p.ER_BULLET_HARDWARE_OPENGL
            )
            
            # 处理图像(RGBA转RGB)
            img = np.array(img, dtype=np.uint8)
            img = img[:, :, :3]  # 去掉alpha通道
            
            "kw">class="kw">return img
        
        "kw">def _calculate_reward("kw">self):
            """计算奖励"""
            object_pos = "kw">self._get_object_position()
            ee_pos = "kw">self._get_end_effector_position()
            
            # 抓取判定:末端执行器靠近物体
            distance = np.linalg.norm(np.array(object_pos) - np.array(ee_pos))
            
            reward = 0
            
            "kw">if distance < 0.05:  # 抓取范围
                reward += 5.0
            
            # 惩罚距离
            reward -= distance * 0.5
            
            # 物体高度奖励
            "kw">if object_pos[2] > 0.1:  # 举起物体
                reward += object_pos[2] * 10
            
            "kw">class="kw">return reward
        
        "kw">def _check_done("kw">self):
            """检查是否完成"""
            object_pos = "kw">self._get_object_position()
            
            # 检查物体是否被举起
            "kw">if object_pos[2] > 0.15:
                "kw">class="kw">return "kw">True
            
            "kw">class="kw">return "kw">False
        
        "kw">def _get_object_position("kw">self):
            """获取物体位置"""
            pos, orn = p.getBasePositionAndOrientation("kw">self.object_id)
            "kw">class="kw">return pos
        
        "kw">def _get_end_effector_position("kw">self):
            """获取末端执行器位置"""
            link_state = p.getLinkState("kw">self.robot_id, "kw">self.end_effector_index)
            "kw">class="kw">return link_state[0]
        
        "kw">def get_gripper_width("kw">self):
            """获取夹爪开度"""
            # Panda的夹爪是最后一个关节
            gripper_joint = "kw">self.joint_indices[-1]
            state = p.getJointState("kw">self.robot_id, gripper_joint)
            "kw">class="kw">return state[0]
        
        "kw">def close("kw">self):
            """关闭环境"""
            p.disconnect()
        
        "kw">def render("kw">self):
            """渲染当前状态"""
            "kw">class="kw">return "kw">self._get_observation()
    
    
    "kw">def test_environment():
        """测试环境"""
        "kw">print("创建抓取仿真环境...")
        env = RobotGraspingEnv(render="kw">True, gui="kw">True)
        
        "kw">print("测试重置...")
        obs, _ = env.reset()
        "kw">print(f"观察形状: {obs.shape}")
        
        "kw">print("测试执行动作...")
        "kw">for i "kw">in "kw">range(20):
            action = np.random.uniform(-0.5, 0.5, 7)
            obs, reward, done, _, info = env.step(action)
            
            "kw">if i % 5 == 0:
                "kw">print(f"步骤{i}: reward={reward:.3f}, done={done}")
            
            "kw">if done:
                "kw">print("任务完成!")
                "kw">break
        
        env.close()
        "kw">print("环境测试完成!")
    
    
    "kw">if __name__ == '__main__':
        test_environment()

    2.3 运行测试

    python -m envs.grasping_env

    ---


    5. 3. 抓取检测网络实现

    3.1 GraspNet架构

    抓取检测网络预测每个像素位置的最佳抓取参数:

  • 抓取质量分数
  • 抓取角度(θ)
  • 抓取宽度(w)
  • 创建 models/graspnet.py


    """
    抓取检测网络 (GraspNet)
    基于RGB-D图像预测抓取参数
    """
    
    "kw">class="kw">import torch
    "kw">class="kw">import torch.nn "kw">as nn
    "kw">class="kw">import torch.nn.functional "kw">as F
    
    
    "kw">class GraspNet(nn.Module):
        """
        抓取检测网络
        
        输入: RGB-D图像
        输出: 
            - quality: 抓取质量分数 (H, W, 1)
            - angle: 抓取角度 (H, W, 18) - 0-180度,每10度一个通道
            - width: 抓取宽度 (H, W, 1)
        """
        
        "kw">def __init__("kw">self, input_channels=4, num_angle_bins=18):
            super().__init__()
            
            "kw">self.num_angle_bins = num_angle_bins
            
            # 编码器(ResNet风格)
            "kw">self.encoder = nn.Sequential(
                # Block 1
                "kw">self._conv_block(input_channels, 32, kernel_size=3, stride=2),
                "kw">self._conv_block(32, 32, kernel_size=3, stride=1),
                
                # Block 2
                "kw">self._conv_block(32, 64, kernel_size=3, stride=2),
                "kw">self._conv_block(64, 64, kernel_size=3, stride=1),
                
                # Block 3
                "kw">self._conv_block(64, 128, kernel_size=3, stride=2),
                "kw">self._conv_block(128, 128, kernel_size=3, stride=1),
                
                # Block 4
                "kw">self._conv_block(128, 256, kernel_size=3, stride=2),
                "kw">self._conv_block(256, 256, kernel_size=3, stride=1),
            )
            
            # 解码器
            "kw">self.decoder = nn.Sequential(
                # Decoder Block 1
                nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
                nn.BatchNorm2d(128),
                nn.ReLU(inplace="kw">True),
                
                # Decoder Block 2
                nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
                nn.BatchNorm2d(64),
                nn.ReLU(inplace="kw">True),
                
                # Decoder Block 3
                nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
                nn.BatchNorm2d(32),
                nn.ReLU(inplace="kw">True),
                
                # Decoder Block 4
                nn.ConvTranspose2d(32, 32, kernel_size=4, stride=2, padding=1),
                nn.BatchNorm2d(32),
                nn.ReLU(inplace="kw">True),
            )
            
            # 预测头
            "kw">self.quality_head = nn.Conv2d(32, 1, kernel_size=1)
            "kw">self.angle_head = nn.Conv2d(32, num_angle_bins, kernel_size=1)
            "kw">self.width_head = nn.Conv2d(32, 1, kernel_size=1)
        
        "kw">def _conv_block("kw">self, in_channels, out_channels, kernel_size=3, stride=1):
            """卷积块"""
            padding = kernel_size // 2
            "kw">class="kw">return nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace="kw">True),
                nn.Conv2d(out_channels, out_channels, kernel_size, 1, padding),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace="kw">True)
            )
        
        "kw">def forward("kw">self, x):
            """
            前向传播
            
            参数:
                x: 输入张量 (B, C, H, W)
                   C = 4 (RGB + Depth)
            
            返回:
                quality: 抓取质量 (B, 1, H, W)
                angle: 抓取角度 (B, 18, H, W)
                width: 抓取宽度 (B, 1, H, W)
            """
            # 编码
            features = "kw">self.encoder(x)
            
            # 解码
            decoded = "kw">self.decoder(features)
            
            # 预测
            quality = torch.sigmoid("kw">self.quality_head(decoded))
            angle = "kw">self.angle_head(decoded)  # softmax在损失函数中
            width = torch.sigmoid("kw">self.width_head(decoded)) * 0.15  # 最大宽度15cm
            
            "kw">class="kw">return quality, angle, width
        
        "kw">def predict_grasp("kw">self, x):
            """
            预测最佳抓取
            
            参数:
                x: 输入图像 (1, C, H, W)
            
            返回:
                best_grasp: (x, y, theta, width) 归一化到[0,1]
            """
            "kw">self.eval()
            "kw">with torch.no_grad():
                quality, angle, width = "kw">self.forward(x)
            
            # 找到最高质量的位置
            quality = quality.squeeze().cpu().numpy()
            angle_map = angle.squeeze().cpu().numpy()
            width_map = width.squeeze().cpu().numpy()
            
            # 质量最大的位置
            idx = np.argmax(quality)
            y, x = idx // quality.shape[1], idx % quality.shape[1]
            
            # 角度
            angle_probs = angle_map[:, y, x]
            angle_idx = np.argmax(angle_probs)
            theta = angle_idx * (180 / "kw">self.num_angle_bins)
            
            # 宽度
            w = width_map[0, y, x]
            
            "kw">class="kw">return {
                'x': x / quality.shape[1],
                'y': y / quality.shape[0],
                'theta': theta / 180,
                'width': w
            }
    
    
    "kw">class GraspLoss(nn.Module):
        """抓取检测损失函数"""
        
        "kw">def __init__("kw">self, angle_weight=1.0, width_weight=0.5):
            super().__init__()
            "kw">self.angle_weight = angle_weight
            "kw">self.width_weight = width_weight
        
        "kw">def forward("kw">self, pred_quality, pred_angle, pred_width, 
                    target_quality, target_angle, target_width, target_mask):
            """
            计算损失
            
            参数:
                pred_*: 预测值
                target_*: 目标值
                target_mask: 有效样本掩码
            """
            # 质量损失(二元交叉熵)
            quality_loss = F.binary_cross_entropy(
                pred_quality, target_quality, reduction='none'
            )
            quality_loss = (quality_loss * target_mask).sum() / (target_mask.sum() + 1e-6)
            
            # 角度损失(交叉熵)
            angle_loss = F.cross_entropy(
                pred_angle, target_angle.long(), reduction='none'
            )
            angle_loss = (angle_loss * target_mask.squeeze(1)).sum() / (target_mask.sum() + 1e-6)
            
            # 宽度损失(L1)
            width_loss = F.l1_loss(
                pred_width, target_width, reduction='none'
            )
            width_loss = (width_loss * target_mask).sum() / (target_mask.sum() + 1e-6)
            
            # 总损失
            total_loss = quality_loss + "kw">self.angle_weight * angle_loss + "kw">self.width_weight * width_loss
            
            "kw">class="kw">return total_loss, {
                'quality_loss': quality_loss.item(),
                'angle_loss': angle_loss.item(),
                'width_loss': width_loss.item()
            }
    
    
    "kw">def create_model(input_channels=4, num_angle_bins=18):
        """创建抓取网络模型"""
        model = GraspNet(input_channels=input_channels, num_angle_bins=num_angle_bins)
        "kw">class="kw">return model
    
    
    # 测试模型
    "kw">if __name__ == '__main__':
        model = create_model()
        "kw">print(f"模型参数数量: {sum(p.numel() "kw">for p "kw">in model.parameters()):,}")
        
        # 测试前向传播
        x = torch.randn(2, 4, 224, 224)
        quality, angle, width = model(x)
        
        "kw">print(f"输入: {x.shape}")
        "kw">print(f"质量: {quality.shape}")
        "kw">print(f"角度: {angle.shape}")
        "kw">print(f"宽度: {width.shape}")

    ---


    6. 4. 视觉伺服控制

    4.1 视觉伺服基础

    视觉伺服(Visual Servoing)使用视觉反馈控制机器人运动。


    两种控制策略

  • 位置-based (PBVS):估计目标3D位置,计算期望关节角度
  • 图像-based (IBVS):直接在图像空间控制
  • 创建 utils/visual_servo.py


    """
    视觉伺服控制器
    """
    
    "kw">class="kw">import numpy "kw">as np
    "kw">class="kw">import torch
    "kw">from scipy.spatial.transform "kw">class="kw">import Rotation "kw">as R
    
    
    "kw">class ImageBasedVisualServo:
        """
        基于图像的视觉伺服控制器
        
        控制末端执行器跟踪图像中的目标点
        """
        
        "kw">def __init__("kw">self, camera_config, lambda_=0.5):
            """
            初始化视觉伺服控制器
            
            参数:
                camera_config: 相机内参和配置
                lambda_: 控制增益
            """
            "kw">self.K = camera_config['intrinsics']  # 相机内参矩阵
            "kw">self.lambda_ = lambda_
            
            # 交互矩阵参数
            "kw">self.Z = 0.5  # 假设深度
        
        "kw">def compute_interaction_matrix("kw">self, point, Z="kw">None):
            """
            计算交互矩阵(2×6)
            
            参数:
                point: 图像点 (u, v)
                Z: 点的深度
            
            返回:
                L: 交互矩阵
            """
            "kw">if Z "kw">is "kw">None:
                Z = "kw">self.Z
            
            u, v = point
            
            L = np.array([
                [-1/Z, 0, u/Z, u*v, -(1+u**2), v],
                [0, -1/Z, v/Z, 1+v**2, -u*v, -u]
            ])
            
            "kw">class="kw">return L
        
        "kw">def compute_velocity("kw">self, current_points, desired_points, Z="kw">None):
            """
            计算末端执行器速度
            
            参数:
                current_points: 当前图像点
                desired_points: 期望图像点
                Z: 深度
            
            返回:
                v: 线速度和角速度 (6,)
            """
            # 计算图像误差
            error = np.array(current_points) - np.array(desired_points)
            error = error.flatten()
            
            # 堆叠交互矩阵
            L_total = []
            "kw">for cp "kw">in current_points:
                L_total.append("kw">self.compute_interaction_matrix(cp, Z))
            L_total = np.vstack(L_total)
            
            # 计算伪逆
            "kw">try:
                L_pinv = np.linalg.pinv(L_total)
                v = -"kw">self.lambda_ * L_pinv @ error
            "kw">class="kw">except:
                v = np.zeros(6)
            
            "kw">class="kw">return v
        
        "kw">def compute_joint_velocities("kw">self, end_effector_pose, velocity, jacobian):
            """
            将末端速度转换为关节速度
            
            参数:
                end_effector_pose: 末端执行器位姿 (x, y, z, roll, pitch, yaw)
                velocity: 末端速度 (vx, vy, vz, wx, wy, wz)
                jacobian: 雅可比矩阵
            
            返回:
                q_dot: 关节速度
            """
            # 速度到齐次变换
            T = "kw">self.pose_to_matrix(end_effector_pose)
            
            # 提取旋转部分
            R_ee = T[:3, :3]
            
            # 速度转换(从末端坐标系到基座坐标系)
            v_linear = R_ee @ velocity[:3]
            v_angular = R_ee @ velocity[3:]
            
            # 组合速度
            v_cartesian = np.concatenate([v_linear, v_angular])
            
            # 计算关节速度
            q_dot = np.linalg.pinv(jacobian) @ v_cartesian
            
            "kw">class="kw">return q_dot
        
        @staticmethod
        "kw">def pose_to_matrix(pose):
            """将位姿转为齐次变换矩阵"""
            position = pose[:3]
            euler = pose[3:]
            
            rotation = R.from_euler('xyz', euler)
            R_matrix = rotation.as_matrix()
            
            T = np.eye(4)
            T[:3, :3] = R_matrix
            T[:3, 3] = position
            
            "kw">class="kw">return T
        
        "kw">def pixel_to_3d("kw">self, u, v, Z="kw">None):
            """
            将像素坐标转换为3D点
            
            参数:
                u, v: 像素坐标
                Z: 深度
            
            返回:
                point_3d: 3D点 (x, y, z)
            """
            "kw">if Z "kw">is "kw">None:
                Z = "kw">self.Z
            
            fx, fy = "kw">self.K[0, 0], "kw">self.K[1, 1]
            cx, cy = "kw">self.K[0, 2], "kw">self.K[1, 2]
            
            x = (u - cx) * Z / fx
            y = (v - cy) * Z / fy
            
            "kw">class="kw">return np.array([x, y, Z])
    
    
    "kw">class PositionBasedVisualServo:
        """
        基于位置的视觉伺服控制器
        
        估计目标3D位置,控制机械臂到达
        """
        
        "kw">def __init__("kw">self, camera_config, lambda_=0.5):
            "kw">self.K = camera_config['intrinsics']
            "kw">self.lambda_ = lambda_
        
        "kw">def estimate_3d_position("kw">self, point, depth_map):
            """
            从深度图估计3D位置
            
            参数:
                point: 图像点 (u, v)
                depth_map: 深度图
            
            返回:
                position_3d: 3D位置
            """
            u, v = int(point[0]), int(point[1])
            
            "kw">if v < depth_map.shape[0] "kw">and u < depth_map.shape[1]:
                Z = depth_map[v, u]
            "kw">else:
                Z = 0.5  # 默认深度
            
            fx, fy = "kw">self.K[0, 0], "kw">self.K[1, 1]
            cx, cy = "kw">self.K[0, 2], "kw">self.K[1, 2]
            
            x = (u - cx) * Z / fx
            y = (v - cy) * Z / fy
            
            "kw">class="kw">return np.array([x, y, Z])
        
        "kw">def compute_end_effector_velocity("kw">self, current_pos, target_pos):
            """
            计算末端执行器速度
            
            参数:
                current_pos: 当前末端位置
                target_pos: 目标位置
            
            返回:
                velocity: 末端速度 (vx, vy, vz, wx, wy, wz)
            """
            # 位置误差
            position_error = np.array(target_pos) - np.array(current_pos)
            
            # 线速度(比例控制)
            v_linear = "kw">self.lambda_ * position_error
            
            # 角速度(设为0,简化为纯位置控制)
            v_angular = np.zeros(3)
            
            "kw">class="kw">return np.concatenate([v_linear, v_angular])
    
    
    "kw">class HybridVisualServo:
        """
        混合视觉伺服控制器
        结合位置和图像控制
        """
        
        "kw">def __init__("kw">self, camera_config, lambda_=0.5, alpha=0.5):
            "kw">self.position_controller = PositionBasedVisualServo(camera_config, lambda_)
            "kw">self.image_controller = ImageBasedVisualServo(camera_config, lambda_)
            "kw">self.alpha = alpha  # 混合权重
        
        "kw">def compute_velocity("kw">self, current_pos, current_pixels, 
                            target_pos, target_pixels, depth_map):
            """
            计算混合速度
            
            参数:
                current_pos: 当前末端位置
                current_pixels: 当前图像点
                target_pos: 目标位置
                target_pixels: 目标图像点
                depth_map: 深度图
            """
            # 位置控制
            v_position = "kw">self.position_controller.compute_end_effector_velocity(
                current_pos, target_pos
            )
            
            # 图像控制
            target_3d = "kw">self.position_controller.estimate_3d_position(target_pixels, depth_map)
            current_3d = "kw">self.position_controller.estimate_3d_position(current_pixels, depth_map)
            
            v_image = "kw">self.image_controller.compute_velocity(
                [current_pixels], [target_pixels], 
                Z=(target_3d[2] + current_3d[2]) / 2
            )
            
            # 混合
            v_total = "kw">self.alpha * v_position + (1 - "kw">self.alpha) * np.append(v_image, np.zeros(3))
            
            "kw">class="kw">return v_total

    ---


    7. 5. 强化学习抓取策略

    5.1 抓取任务建模

    状态:RGB-D图像 + 机械臂关节状态

    动作:末端执行器位姿变化

    奖励:成功抓取正奖励,距离惩罚


    5.2 PPO抓取策略

    创建 models/grasp_policy.py


    """
    抓取强化学习策略
    使用PPO算法训练
    """
    
    "kw">class="kw">import torch
    "kw">class="kw">import torch.nn "kw">as nn
    "kw">class="kw">import torch.nn.functional "kw">as F
    "kw">from torch.distributions "kw">class="kw">import Normal
    
    
    "kw">class GraspPolicyNetwork(nn.Module):
        """
        抓取策略网络
        
        输入: 
            - 图像特征 (来自CNN)
            - 机械臂状态
        输出:
            - 动作均值和方差
        """
        
        "kw">def __init__("kw">self, image_feature_dim=512, state_dim=14, action_dim=7, hidden_dim=256):
            super().__init__()
            
            "kw">self.action_dim = action_dim
            
            # 图像特征处理
            "kw">self.image_conv = nn.Sequential(
                nn.Conv2d(3, 32, 3, stride=2),
                nn.ReLU(),
                nn.Conv2d(32, 64, 3, stride=2),
                nn.ReLU(),
                nn.Conv2d(64, 128, 3, stride=2),
                nn.ReLU(),
                nn.AdaptiveAvgPool2d((7, 7)),
                nn.Flatten()
            )
            
            # 特征融合
            "kw">self.fusion = nn.Sequential(
                nn.Linear(128 * 7 * 7 + state_dim, hidden_dim),
                nn.ReLU(),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU()
            )
            
            # 动作均值
            "kw">self.mean_head = nn.Linear(hidden_dim, action_dim)
            
            # 动作方差(可学习或固定)
            "kw">self.log_std = nn.Parameter(torch.zeros(action_dim))
        
        "kw">def forward("kw">self, image, state):
            """
            前向传播
            
            参数:
                image: 图像 (B, 3, 224, 224)
                state: 机械臂状态 (B, 14) = 7关节位置 + 7关节速度
            
            返回:
                action_mean: 动作均值
                action_std: 动作标准差
            """
            # 图像特征
            image_features = "kw">self.image_conv(image)
            
            # 融合
            combined = torch.cat([image_features, state], dim=-1)
            fused = "kw">self.fusion(combined)
            
            # 动作均值
            action_mean = "kw">self.mean_head(fused)
            
            # 动作标准差
            action_std = torch.exp("kw">self.log_std).expand_as(action_mean)
            
            "kw">class="kw">return action_mean, action_std
        
        "kw">def sample_action("kw">self, image, state, deterministic="kw">False):
            """
            采样动作
            
            参数:
                deterministic: 是否使用确定性策略
            """
            action_mean, action_std = "kw">self.forward(image, state)
            
            "kw">if deterministic:
                "kw">class="kw">return action_mean
            
            dist = Normal(action_mean, action_std)
            action = dist.sample()
            action = torch.tanh(action)  # 限制到[-1, 1]
            
            "kw">class="kw">return action
        
        "kw">def get_log_prob("kw">self, image, state, action):
            """计算动作的对数概率"""
            action_mean, action_std = "kw">self.forward(image, state)
            
            dist = Normal(action_mean, action_std)
            log_prob = dist.log_prob(action)
            
            "kw">class="kw">return log_prob.sum(dim=-1)
    
    
    "kw">class GraspPPO:
        """
        抓取PPO算法实现
        """
        
        "kw">def __init__("kw">self, image_feature_dim, state_dim, action_dim,
                     lr=3e-4, gamma=0.99, lamda=0.95, clip_epsilon=0.2,
                     value_coef=0.5, entropy_coef=0.01):
            "kw">self.gamma = gamma
            "kw">self.lamda = lamda
            "kw">self.clip_epsilon = clip_epsilon
            "kw">self.value_coef = value_coef
            "kw">self.entropy_coef = entropy_coef
            
            # 网络
            "kw">self.policy = GraspPolicyNetwork(image_feature_dim, state_dim, action_dim)
            "kw">self.value_network = nn.Sequential(
                nn.Linear(128 * 7 * 7 + state_dim, 256),
                nn.ReLU(),
                nn.Linear(256, 1)
            )
            
            # 优化器
            "kw">self.optimizer = torch.optim.Adam(
                list("kw">self.policy.parameters()) + list("kw">self.value_network.parameters()),
                lr=lr
            )
        
        "kw">def compute_gae("kw">self, rewards, values, dones, next_value):
            """计算GAE"""
            advantages = []
            gae = 0
            
            next_value = torch.tensor(next_value) "kw">if "kw">not isinstance(next_value, torch.Tensor) "kw">else next_value
            
            "kw">for t "kw">in reversed("kw">range("kw">len(rewards))):
                "kw">if t == "kw">len(rewards) - 1:
                    next_val = next_value
                "kw">else:
                    next_val = values[t + 1]
                
                delta = rewards[t] + "kw">self.gamma * next_val * (1 - dones[t]) - values[t]
                gae = delta + "kw">self.gamma * "kw">self.lamda * (1 - dones[t]) * gae
                advantages.insert(0, gae)
            
            returns = [adv + val "kw">for adv, val "kw">in zip(advantages, values)]
            
            "kw">class="kw">return advantages, returns
        
        "kw">def update("kw">self, batch):
            """更新策略"""
            states, actions, old_log_probs, advantages, returns = batch
            
            # 新策略
            new_log_probs = "kw">self.policy.get_log_prob(states, actions)
            entropy = "kw">self.policy.get_log_std.mean()
            
            # PPO损失
            ratio = torch.exp(new_log_probs - old_log_probs)
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - "kw">self.clip_epsilon, 1 + "kw">self.clip_epsilon) * advantages
            
            policy_loss = -torch.min(surr1, surr2).mean()
            entropy_loss = -entropy * "kw">self.entropy_coef
            
            # 价值损失
            values = "kw">self.value_network(states).squeeze()
            value_loss = F.mse_loss(values, returns)
            
            # 总损失
            total_loss = policy_loss + entropy_loss + "kw">self.value_coef * value_loss
            
            # 反向传播
            "kw">self.optimizer.zero_grad()
            total_loss.backward()
            torch.nn.utils.clip_grad_norm_("kw">self.policy.parameters(), 0.5)
            "kw">self.optimizer.step()
            
            "kw">class="kw">return {
                'policy_loss': policy_loss.item(),
                'value_loss': value_loss.item(),
                'entropy': entropy.item()
            }
        
        "kw">def save("kw">self, path):
            """保存模型"""
            torch.save({
                'policy': "kw">self.policy.state_dict(),
                'value': "kw">self.value_network.state_dict()
            }, path)
        
        "kw">def load("kw">self, path):
            """加载模型"""
            checkpoint = torch.load(path)
            "kw">self.policy.load_state_dict(checkpoint['policy'])
            "kw">self.value_network.load_state_dict(checkpoint[&#x27;value'])

    ---


    8. 6. Sim-to-Real迁移

    6.1 Sim-to-Real挑战

    仿真到真实的迁移面临以下挑战:

  • 物理参数差异:摩擦力、惯性、质量
  • 视觉差异:纹理、光照、噪声
  • 控制延迟:仿真无延迟,真实系统有延迟
  • 6.2 域随机化

    创建 utils/domain_randomization.py


    """
    域随机化工具
    增加仿真多样性以提高迁移成功率
    """
    
    "kw">class="kw">import numpy "kw">as np
    "kw">class="kw">import pybullet "kw">as p
    
    
    "kw">class DomainRandomizer:
        """
        域随机化
        
        随机化仿真参数以提高泛化能力
        """
        
        "kw">def __init__("kw">self, physics_client):
            "kw">self.physics_client = physics_client
            
            # 参数范围
            "kw">self.params = {
                'friction': (0.5, 2.0),
                'mass': (0.8, 1.2),
                'gravity': (9.5, 10.0),
                'camera_noise': (0, 0.05),
                'light_intensity': (0.7, 1.3)
            }
        
        "kw">def randomize("kw">self):
            """应用随机化"""
            "kw">self._randomize_physics()
            "kw">self._randomize_visual()
        
        "kw">def _randomize_physics("kw">self):
            """随机化物理参数"""
            # 摩擦力
            friction = np.random.uniform(*"kw">self.params['friction'])
            "kw">for i "kw">in "kw">range(p.getNumBodies()):
                p.changeDynamics(i, -1, lateralFriction=friction)
            
            # 重力
            gravity = np.random.uniform(*"kw">self.params['gravity'])
            p.setGravity(0, 0, -gravity)
        
        "kw">def _randomize_visual("kw">self):
            """随机化视觉参数"""
            # 可以在这里修改渲染参数
            "kw">pass
        
        "kw">def get_camera_noise("kw">self):
            """获取相机噪声"""
            "kw">class="kw">return np.random.normal(0, "kw">self.params['camera_noise'][1], size=(224, 224, 3))
        
        "kw">def apply_camera_noise("kw">self, image):
            """对图像添加噪声"""
            noise = "kw">self.get_camera_noise()
            noisy_image = np.clip(image + noise * 255, 0, 255).astype(np.uint8)
            "kw">class="kw">return noisy_image
    
    
    "kw">class TextureRandomizer:
        """纹理随机化"""
        
        "kw">def __init__("kw">self):
            "kw">self.textures = []
        
        "kw">def load_textures("kw">self, texture_paths):
            """加载纹理"""
            "kw">for path "kw">in texture_paths:
                texture_id = p.loadTexture(path)
                "kw">self.textures.append(texture_id)
        
        "kw">def apply_random_texture("kw">self, body_id):
            """应用随机纹理"""
            "kw">if "kw">self.textures:
                texture_id = np.random.choice("kw">self.textures)
                p.changeVisualShape(
                    body_id, -1,
                    textureUniqueId=texture_id
                )

    6.3 域适应

    创建 utils/adaptation.py


    """
    域适应方法
    """
    
    "kw">class="kw">import torch
    "kw">class="kw">import torch.nn "kw">as nn
    
    
    "kw">class DomainAdaptationNetwork(nn.Module):
        """
        域适应网络
        
        学习域不变特征
        """
        
        "kw">def __init__("kw">self, feature_dim=256):
            super().__init__()
            
            # 特征提取器
            "kw">self.feature_extractor = nn.Sequential(
                nn.Conv2d(3, 32, 3, stride=2),
                nn.ReLU(),
                nn.Conv2d(32, 64, 3, stride=2),
                nn.ReLU(),
                nn.Conv2d(64, 128, 3, stride=2),
                nn.ReLU(),
                nn.AdaptiveAvgPool2d((7, 7)),
                nn.Flatten(),
                nn.Linear(128 * 7 * 7, feature_dim)
            )
            
            # 域分类器(用于对抗训练)
            "kw">self.domain_classifier = nn.Sequential(
                nn.Linear(feature_dim, 128),
                nn.ReLU(),
                nn.Linear(128, 1)
            )
        
        "kw">def forward("kw">self, x, alpha=1.0):
            """
            前向传播
            
            参数:
                x: 输入图像
                alpha: 梯度反转强度
            """
            features = "kw">self.feature_extractor(x)
            
            # 域预测(带梯度反转)
            "kw">if "kw">self.training:
                domain_output = "kw">self.domain_classifier(features)
                # 梯度反转
                domain_output = domain_output * -alpha
            "kw">else:
                domain_output = "kw">None
            
            "kw">class="kw">return features, domain_output
    
    
    "kw">def train_domain_adaptation(source_data, target_data, model, epochs=100):
        """
        训练域适应
        
        目标是最小化源域和目标域之间的差异
        """
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.BCEWithLogitsLoss()
        
        "kw">for epoch "kw">in "kw">range(epochs):
            # 源域损失
            source_features, source_domain = model(source_data)
            source_loss = criterion(source_domain, torch.ones("kw">len(source_data), 1))
            
            # 目标域损失
            target_features, target_domain = model(target_data)
            target_loss = criterion(target_domain, torch.zeros("kw">len(target_data), 1))
            
            # 特征对齐损失
            feature_loss = torch.mean(torch.abs(source_features - target_features))
            
            # 总损失
            total_loss = source_loss + target_loss + 0.1 * feature_loss
            
            optimizer.zero_grad()
            total_loss.backward()
            optimizer.step()
            
            "kw">if (epoch + 1) % 10 == 0:
                "kw">print(f"Epoch {epoch+1}: loss={total_loss.item():.4f}")

    ---


    9. 项目总结

    完成清单

  • ✅ 创建PyBullet抓取仿真环境
  • ✅ 实现抓取检测网络(GraspNet)
  • ✅ 实现视觉伺服控制器
  • ✅ 训练PPO抓取策略
  • ✅ 域随机化和Sim-to-Real基础
  • 扩展方向

  • 更真实的仿真:使用Gazebo + MoveIt
  • 更好的抓取检测:使用深度学习的抓取检测
  • 多指抓取:扩展到灵巧手抓取
  • 实际部署:在真实机器人上测试
  • 参考资源

  • PyBullet文档
  • GraspNet论文
  • Visual Servoing书籍
  • ---


    下一步:继续学习阶段七的具身智能核心算法