进阶 · 3-4周
项目二:机器人抓取仿真
使用PyBullet物理引擎构建完整抓取系统,包括抓取检测、视觉伺服和PPO策略训练
1. 项目概述
本项目将实现一个完整的机器人抓取系统,包括:
难度:⭐⭐⭐⭐
预计时间:3-4周
前置知识:Python、深度学习、ROS基础
---
2. 目录
---
3. 1. 项目环境搭建
1.1 安装依赖
创建 projects/robot_grasping/requirements.txt:
# 物理仿真
pybullet>=3.2.0
# 深度学习
torch>=1.10.0
torchvision>=0.11.0
# 视觉
opencv-python>=4.5.0
pillow>=8.3.0
# 工具库
numpy>=1.21.0
matplotlib>=3.4.0
scipy>=1.7.0
pyyaml>=6.0
安装:
cd projects/robot_grasping
pip install -r requirements.txt
1.2 项目目录结构
robot_grasping/
├── envs/
│ ├── __init__.py
│ └── grasping_env.py # 抓取仿真环境
├── models/
│ ├── __init__.py
│ ├── graspnet.py # 抓取检测网络
│ └── policy.py # 抓取策略网络
├── utils/
│ ├── __init__.py
│ └── transforms.py # 坐标变换工具
├── train_graspnet.py # 训练抓取网络
├── train_rl.py # 训练RL策略
├── evaluate.py # 评估
└── configs/
└── default.yaml # 配置文件
---
4. 2. PyBullet物理仿真基础
2.1 PyBullet简介
PyBullet是一个Python接口,用于Bullet物理引擎。它提供:
2.2 创建抓取仿真环境
创建 envs/grasping_env.py:
"""
机器人抓取仿真环境
使用PyBullet实现物理仿真
"""
"kw">class="kw">import pybullet "kw">as p
"kw">class="kw">import pybullet_data
"kw">class="kw">import numpy "kw">as np
"kw">from gymnasium "kw">class="kw">import spaces
"kw">class="kw">import os
"kw">class RobotGraspingEnv:
"""
机器人抓取仿真环境
包含:
- 机械臂模型
- 目标物体
- 相机系统
"""
"kw">def __init__("kw">self, render="kw">False, gui="kw">True):
"""
初始化仿真环境
参数:
render: 是否渲染
gui: 是否使用GUI界面
"""
"kw">self.render = render
"kw">self.gui = gui
# 连接物理引擎
"kw">if gui:
"kw">self.physics_client = p.connect(p.GUI)
"kw">else:
"kw">self.physics_client = p.connect(p.DIRECT)
# 设置搜索路径
p.setAdditionalSearchPath(pybullet_data.getDataPath())
# 仿真参数
"kw">self.time_step = 1.0 / 240.0 # 240Hz
"kw">self.max_iterations = 500
# 初始化环境
"kw">self._setup_world()
"kw">self._setup_robot()
"kw">self._setup_objects()
"kw">self._setup_camera()
# 状态和动作空间
"kw">self.observation_space = spaces.Box(
low=0, high=255, shape=(224, 224, 3), dtype=np.uint8
)
"kw">self.action_space = spaces.Box(
low=-1, high=1, shape=(7,), dtype=np.float32
)
"kw">def _setup_world("kw">self):
"""设置物理世界"""
# 重力
p.setGravity(0, 0, -9.81)
# 时间步
p.setTimeStep("kw">self.time_step)
# 渲染选项
p.configureDebugVisualizer(p.COV_ENABLE_TINY_RENDERER, 0)
p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
# 添加地面
plane_id = p.loadURDF("plane.urdf")
p.changeVisualShape(plane_id, -1, rgbaColor=[0.5, 0.5, 0.5, 1])
"kw">def _setup_robot("kw">self):
"""设置机械臂"""
# 加载Franka Panda机械臂
robot_path = os.path.join(
pybullet_data.getDataPath(),
"franka_panda/panda.urdf"
)
"kw">self.robot_id = p.loadURDF(
robot_path,
basePosition=[0, 0, 0],
baseOrientation=[0, 0, 0, 1],
useMaximalCoordinates="kw">False
)
# 机械臂配置
"kw">self.num_joints = p.getNumJoints("kw">self.robot_id)
# 获取关节信息
"kw">self.joint_indices = []
"kw">self.joint_limits = []
"kw">for i "kw">in "kw">range("kw">self.num_joints):
joint_info = p.getJointInfo("kw">self.robot_id, i)
"kw">if joint_info[2] != p.JOINT_FIXED:
"kw">self.joint_indices.append(i)
lower = joint_info[8]
upper = joint_info[9]
"kw">self.joint_limits.append((lower, upper))
# 设置控制模式
p.setJointMotorControlArray(
"kw">self.robot_id,
"kw">self.joint_indices,
p.POSITION_CONTROL,
forces=[500] * "kw">len("kw">self.joint_indices)
)
# 末端执行器索引(最后一个关节)
"kw">self.end_effector_index = 11 # Panda的末端
"kw">def _setup_objects("kw">self):
"""设置目标物体"""
# 随机生成物体位置
"kw">self.object_ids = []
# 添加一个立方体作为抓取目标
object_position = [
0.4 + np.random.uniform(-0.1, 0.1),
0.0 + np.random.uniform(-0.1, 0.1),
0.05
]
object_orientation = p.getQuaternionFromEuler([
0, 0, np.random.uniform(0, np.pi)
])
collision_shape = p.createCollisionShape(
p.GEOM_BOX,
halfExtents=[0.03, 0.03, 0.03]
)
visual_shape = p.createVisualShape(
p.GEOM_BOX,
halfExtents=[0.03, 0.03, 0.03],
rgbaColor=[1, 0.2, 0.2, 1]
)
"kw">self.object_id = p.createMultiBody(
baseMass=0.1,
baseCollisionShapeIndex=collision_shape,
baseVisualShapeIndex=visual_shape,
basePosition=object_position,
baseOrientation=object_orientation
)
"kw">self.object_ids.append("kw">self.object_id)
"kw">def _setup_camera("kw">self):
"""设置相机参数"""
"kw">self.camera_config = {
'width': 224,
'height': 224,
'fov': 60,
'near': 0.01,
'far': 10.0,
'eye_position': [0.5, 0, 0.8],
'target_position': [0.3, 0, 0],
'up_vector': [0, 0, 1]
}
"kw">def reset("kw">self):
"""重置环境"""
# 重置机械臂到初始位置
initial_positions = [0, -0.785, 0, -2.356, 0, 1.571, 0.785, 0, 0]
"kw">for i, pos "kw">in enumerate(initial_positions[:"kw">len("kw">self.joint_indices)]):
p.resetJointState("kw">self.robot_id, "kw">self.joint_indices[i], pos)
# 移除旧物体
"kw">for obj_id "kw">in "kw">self.object_ids:
p.removeBody(obj_id)
# 添加新物体
"kw">self._setup_objects()
# 执行几步仿真稳定
"kw">for _ "kw">in "kw">range(100):
p.stepSimulation()
# 获取观察
observation = "kw">self._get_observation()
"kw">class="kw">return observation, {}
"kw">def step("kw">self, action):
"""执行一步仿真"""
# 解码动作
target_positions = "kw">self._decode_action(action)
# 控制机械臂
p.setJointMotorControlArray(
"kw">self.robot_id,
"kw">self.joint_indices[:7], # 前7个关节
p.POSITION_CONTROL,
targetPositions=target_positions,
forces=[500] * 7
)
# 仿真一步
"kw">for _ "kw">in "kw">range(10): # 10个子步骤
p.stepSimulation()
# 获取观察和奖励
observation = "kw">self._get_observation()
reward = "kw">self._calculate_reward()
# 检查完成
done = "kw">self._check_done()
info = {
'object_position': "kw">self._get_object_position(),
'end_effector_position': "kw">self._get_end_effector_position()
}
"kw">class="kw">return observation, reward, done, "kw">False, info
"kw">def _decode_action("kw">self, action):
"""将动作向量解码为关节目标位置"""
# 动作 = 目标关节角度(相对于当前)
current_positions = []
"kw">for joint_idx "kw">in "kw">self.joint_indices[:7]:
state = p.getJointState("kw">self.robot_id, joint_idx)
current_positions.append(state[0])
target_positions = np.array(current_positions) + np.array(action[:7]) * 0.1
"kw">class="kw">return target_positions.tolist()
"kw">def _get_observation("kw">self):
"""获取RGB图像观察"""
# 计算相机视图矩阵
view_matrix = p.computeViewMatrixFromYawPitchRoll(
cameraTargetPosition="kw">self.camera_config['target_position'],
distance="kw">self.camera_config['width'] / 2,
yaw=45,
pitch=-30,
roll=0,
upAxisIndex=2
)
# 计算投影矩阵
proj_matrix = p.computeProjectionMatrixFOV(
fov="kw">self.camera_config['fov'],
aspect="kw">self.camera_config['width'] / "kw">self.camera_config['height'],
nearVal="kw">self.camera_config['near'],
farVal="kw">self.camera_config['far']
)
# 渲染图像
(_, _, img, _, _) = p.getCameraImage(
width="kw">self.camera_config['width'],
height="kw">self.camera_config['height'],
viewMatrix=view_matrix,
projectionMatrix=proj_matrix,
renderer=p.ER_BULLET_HARDWARE_OPENGL
)
# 处理图像(RGBA转RGB)
img = np.array(img, dtype=np.uint8)
img = img[:, :, :3] # 去掉alpha通道
"kw">class="kw">return img
"kw">def _calculate_reward("kw">self):
"""计算奖励"""
object_pos = "kw">self._get_object_position()
ee_pos = "kw">self._get_end_effector_position()
# 抓取判定:末端执行器靠近物体
distance = np.linalg.norm(np.array(object_pos) - np.array(ee_pos))
reward = 0
"kw">if distance < 0.05: # 抓取范围
reward += 5.0
# 惩罚距离
reward -= distance * 0.5
# 物体高度奖励
"kw">if object_pos[2] > 0.1: # 举起物体
reward += object_pos[2] * 10
"kw">class="kw">return reward
"kw">def _check_done("kw">self):
"""检查是否完成"""
object_pos = "kw">self._get_object_position()
# 检查物体是否被举起
"kw">if object_pos[2] > 0.15:
"kw">class="kw">return "kw">True
"kw">class="kw">return "kw">False
"kw">def _get_object_position("kw">self):
"""获取物体位置"""
pos, orn = p.getBasePositionAndOrientation("kw">self.object_id)
"kw">class="kw">return pos
"kw">def _get_end_effector_position("kw">self):
"""获取末端执行器位置"""
link_state = p.getLinkState("kw">self.robot_id, "kw">self.end_effector_index)
"kw">class="kw">return link_state[0]
"kw">def get_gripper_width("kw">self):
"""获取夹爪开度"""
# Panda的夹爪是最后一个关节
gripper_joint = "kw">self.joint_indices[-1]
state = p.getJointState("kw">self.robot_id, gripper_joint)
"kw">class="kw">return state[0]
"kw">def close("kw">self):
"""关闭环境"""
p.disconnect()
"kw">def render("kw">self):
"""渲染当前状态"""
"kw">class="kw">return "kw">self._get_observation()
"kw">def test_environment():
"""测试环境"""
"kw">print("创建抓取仿真环境...")
env = RobotGraspingEnv(render="kw">True, gui="kw">True)
"kw">print("测试重置...")
obs, _ = env.reset()
"kw">print(f"观察形状: {obs.shape}")
"kw">print("测试执行动作...")
"kw">for i "kw">in "kw">range(20):
action = np.random.uniform(-0.5, 0.5, 7)
obs, reward, done, _, info = env.step(action)
"kw">if i % 5 == 0:
"kw">print(f"步骤{i}: reward={reward:.3f}, done={done}")
"kw">if done:
"kw">print("任务完成!")
"kw">break
env.close()
"kw">print("环境测试完成!")
"kw">if __name__ == '__main__':
test_environment()
2.3 运行测试
python -m envs.grasping_env
---
5. 3. 抓取检测网络实现
3.1 GraspNet架构
抓取检测网络预测每个像素位置的最佳抓取参数:
创建 models/graspnet.py:
"""
抓取检测网络 (GraspNet)
基于RGB-D图像预测抓取参数
"""
"kw">class="kw">import torch
"kw">class="kw">import torch.nn "kw">as nn
"kw">class="kw">import torch.nn.functional "kw">as F
"kw">class GraspNet(nn.Module):
"""
抓取检测网络
输入: RGB-D图像
输出:
- quality: 抓取质量分数 (H, W, 1)
- angle: 抓取角度 (H, W, 18) - 0-180度,每10度一个通道
- width: 抓取宽度 (H, W, 1)
"""
"kw">def __init__("kw">self, input_channels=4, num_angle_bins=18):
super().__init__()
"kw">self.num_angle_bins = num_angle_bins
# 编码器(ResNet风格)
"kw">self.encoder = nn.Sequential(
# Block 1
"kw">self._conv_block(input_channels, 32, kernel_size=3, stride=2),
"kw">self._conv_block(32, 32, kernel_size=3, stride=1),
# Block 2
"kw">self._conv_block(32, 64, kernel_size=3, stride=2),
"kw">self._conv_block(64, 64, kernel_size=3, stride=1),
# Block 3
"kw">self._conv_block(64, 128, kernel_size=3, stride=2),
"kw">self._conv_block(128, 128, kernel_size=3, stride=1),
# Block 4
"kw">self._conv_block(128, 256, kernel_size=3, stride=2),
"kw">self._conv_block(256, 256, kernel_size=3, stride=1),
)
# 解码器
"kw">self.decoder = nn.Sequential(
# Decoder Block 1
nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(inplace="kw">True),
# Decoder Block 2
nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(inplace="kw">True),
# Decoder Block 3
nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
nn.BatchNorm2d(32),
nn.ReLU(inplace="kw">True),
# Decoder Block 4
nn.ConvTranspose2d(32, 32, kernel_size=4, stride=2, padding=1),
nn.BatchNorm2d(32),
nn.ReLU(inplace="kw">True),
)
# 预测头
"kw">self.quality_head = nn.Conv2d(32, 1, kernel_size=1)
"kw">self.angle_head = nn.Conv2d(32, num_angle_bins, kernel_size=1)
"kw">self.width_head = nn.Conv2d(32, 1, kernel_size=1)
"kw">def _conv_block("kw">self, in_channels, out_channels, kernel_size=3, stride=1):
"""卷积块"""
padding = kernel_size // 2
"kw">class="kw">return nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace="kw">True),
nn.Conv2d(out_channels, out_channels, kernel_size, 1, padding),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace="kw">True)
)
"kw">def forward("kw">self, x):
"""
前向传播
参数:
x: 输入张量 (B, C, H, W)
C = 4 (RGB + Depth)
返回:
quality: 抓取质量 (B, 1, H, W)
angle: 抓取角度 (B, 18, H, W)
width: 抓取宽度 (B, 1, H, W)
"""
# 编码
features = "kw">self.encoder(x)
# 解码
decoded = "kw">self.decoder(features)
# 预测
quality = torch.sigmoid("kw">self.quality_head(decoded))
angle = "kw">self.angle_head(decoded) # softmax在损失函数中
width = torch.sigmoid("kw">self.width_head(decoded)) * 0.15 # 最大宽度15cm
"kw">class="kw">return quality, angle, width
"kw">def predict_grasp("kw">self, x):
"""
预测最佳抓取
参数:
x: 输入图像 (1, C, H, W)
返回:
best_grasp: (x, y, theta, width) 归一化到[0,1]
"""
"kw">self.eval()
"kw">with torch.no_grad():
quality, angle, width = "kw">self.forward(x)
# 找到最高质量的位置
quality = quality.squeeze().cpu().numpy()
angle_map = angle.squeeze().cpu().numpy()
width_map = width.squeeze().cpu().numpy()
# 质量最大的位置
idx = np.argmax(quality)
y, x = idx // quality.shape[1], idx % quality.shape[1]
# 角度
angle_probs = angle_map[:, y, x]
angle_idx = np.argmax(angle_probs)
theta = angle_idx * (180 / "kw">self.num_angle_bins)
# 宽度
w = width_map[0, y, x]
"kw">class="kw">return {
'x': x / quality.shape[1],
'y': y / quality.shape[0],
'theta': theta / 180,
'width': w
}
"kw">class GraspLoss(nn.Module):
"""抓取检测损失函数"""
"kw">def __init__("kw">self, angle_weight=1.0, width_weight=0.5):
super().__init__()
"kw">self.angle_weight = angle_weight
"kw">self.width_weight = width_weight
"kw">def forward("kw">self, pred_quality, pred_angle, pred_width,
target_quality, target_angle, target_width, target_mask):
"""
计算损失
参数:
pred_*: 预测值
target_*: 目标值
target_mask: 有效样本掩码
"""
# 质量损失(二元交叉熵)
quality_loss = F.binary_cross_entropy(
pred_quality, target_quality, reduction='none'
)
quality_loss = (quality_loss * target_mask).sum() / (target_mask.sum() + 1e-6)
# 角度损失(交叉熵)
angle_loss = F.cross_entropy(
pred_angle, target_angle.long(), reduction='none'
)
angle_loss = (angle_loss * target_mask.squeeze(1)).sum() / (target_mask.sum() + 1e-6)
# 宽度损失(L1)
width_loss = F.l1_loss(
pred_width, target_width, reduction='none'
)
width_loss = (width_loss * target_mask).sum() / (target_mask.sum() + 1e-6)
# 总损失
total_loss = quality_loss + "kw">self.angle_weight * angle_loss + "kw">self.width_weight * width_loss
"kw">class="kw">return total_loss, {
'quality_loss': quality_loss.item(),
'angle_loss': angle_loss.item(),
'width_loss': width_loss.item()
}
"kw">def create_model(input_channels=4, num_angle_bins=18):
"""创建抓取网络模型"""
model = GraspNet(input_channels=input_channels, num_angle_bins=num_angle_bins)
"kw">class="kw">return model
# 测试模型
"kw">if __name__ == '__main__':
model = create_model()
"kw">print(f"模型参数数量: {sum(p.numel() "kw">for p "kw">in model.parameters()):,}")
# 测试前向传播
x = torch.randn(2, 4, 224, 224)
quality, angle, width = model(x)
"kw">print(f"输入: {x.shape}")
"kw">print(f"质量: {quality.shape}")
"kw">print(f"角度: {angle.shape}")
"kw">print(f"宽度: {width.shape}")
---
6. 4. 视觉伺服控制
4.1 视觉伺服基础
视觉伺服(Visual Servoing)使用视觉反馈控制机器人运动。
两种控制策略:
创建 utils/visual_servo.py:
"""
视觉伺服控制器
"""
"kw">class="kw">import numpy "kw">as np
"kw">class="kw">import torch
"kw">from scipy.spatial.transform "kw">class="kw">import Rotation "kw">as R
"kw">class ImageBasedVisualServo:
"""
基于图像的视觉伺服控制器
控制末端执行器跟踪图像中的目标点
"""
"kw">def __init__("kw">self, camera_config, lambda_=0.5):
"""
初始化视觉伺服控制器
参数:
camera_config: 相机内参和配置
lambda_: 控制增益
"""
"kw">self.K = camera_config['intrinsics'] # 相机内参矩阵
"kw">self.lambda_ = lambda_
# 交互矩阵参数
"kw">self.Z = 0.5 # 假设深度
"kw">def compute_interaction_matrix("kw">self, point, Z="kw">None):
"""
计算交互矩阵(2×6)
参数:
point: 图像点 (u, v)
Z: 点的深度
返回:
L: 交互矩阵
"""
"kw">if Z "kw">is "kw">None:
Z = "kw">self.Z
u, v = point
L = np.array([
[-1/Z, 0, u/Z, u*v, -(1+u**2), v],
[0, -1/Z, v/Z, 1+v**2, -u*v, -u]
])
"kw">class="kw">return L
"kw">def compute_velocity("kw">self, current_points, desired_points, Z="kw">None):
"""
计算末端执行器速度
参数:
current_points: 当前图像点
desired_points: 期望图像点
Z: 深度
返回:
v: 线速度和角速度 (6,)
"""
# 计算图像误差
error = np.array(current_points) - np.array(desired_points)
error = error.flatten()
# 堆叠交互矩阵
L_total = []
"kw">for cp "kw">in current_points:
L_total.append("kw">self.compute_interaction_matrix(cp, Z))
L_total = np.vstack(L_total)
# 计算伪逆
"kw">try:
L_pinv = np.linalg.pinv(L_total)
v = -"kw">self.lambda_ * L_pinv @ error
"kw">class="kw">except:
v = np.zeros(6)
"kw">class="kw">return v
"kw">def compute_joint_velocities("kw">self, end_effector_pose, velocity, jacobian):
"""
将末端速度转换为关节速度
参数:
end_effector_pose: 末端执行器位姿 (x, y, z, roll, pitch, yaw)
velocity: 末端速度 (vx, vy, vz, wx, wy, wz)
jacobian: 雅可比矩阵
返回:
q_dot: 关节速度
"""
# 速度到齐次变换
T = "kw">self.pose_to_matrix(end_effector_pose)
# 提取旋转部分
R_ee = T[:3, :3]
# 速度转换(从末端坐标系到基座坐标系)
v_linear = R_ee @ velocity[:3]
v_angular = R_ee @ velocity[3:]
# 组合速度
v_cartesian = np.concatenate([v_linear, v_angular])
# 计算关节速度
q_dot = np.linalg.pinv(jacobian) @ v_cartesian
"kw">class="kw">return q_dot
@staticmethod
"kw">def pose_to_matrix(pose):
"""将位姿转为齐次变换矩阵"""
position = pose[:3]
euler = pose[3:]
rotation = R.from_euler('xyz', euler)
R_matrix = rotation.as_matrix()
T = np.eye(4)
T[:3, :3] = R_matrix
T[:3, 3] = position
"kw">class="kw">return T
"kw">def pixel_to_3d("kw">self, u, v, Z="kw">None):
"""
将像素坐标转换为3D点
参数:
u, v: 像素坐标
Z: 深度
返回:
point_3d: 3D点 (x, y, z)
"""
"kw">if Z "kw">is "kw">None:
Z = "kw">self.Z
fx, fy = "kw">self.K[0, 0], "kw">self.K[1, 1]
cx, cy = "kw">self.K[0, 2], "kw">self.K[1, 2]
x = (u - cx) * Z / fx
y = (v - cy) * Z / fy
"kw">class="kw">return np.array([x, y, Z])
"kw">class PositionBasedVisualServo:
"""
基于位置的视觉伺服控制器
估计目标3D位置,控制机械臂到达
"""
"kw">def __init__("kw">self, camera_config, lambda_=0.5):
"kw">self.K = camera_config['intrinsics']
"kw">self.lambda_ = lambda_
"kw">def estimate_3d_position("kw">self, point, depth_map):
"""
从深度图估计3D位置
参数:
point: 图像点 (u, v)
depth_map: 深度图
返回:
position_3d: 3D位置
"""
u, v = int(point[0]), int(point[1])
"kw">if v < depth_map.shape[0] "kw">and u < depth_map.shape[1]:
Z = depth_map[v, u]
"kw">else:
Z = 0.5 # 默认深度
fx, fy = "kw">self.K[0, 0], "kw">self.K[1, 1]
cx, cy = "kw">self.K[0, 2], "kw">self.K[1, 2]
x = (u - cx) * Z / fx
y = (v - cy) * Z / fy
"kw">class="kw">return np.array([x, y, Z])
"kw">def compute_end_effector_velocity("kw">self, current_pos, target_pos):
"""
计算末端执行器速度
参数:
current_pos: 当前末端位置
target_pos: 目标位置
返回:
velocity: 末端速度 (vx, vy, vz, wx, wy, wz)
"""
# 位置误差
position_error = np.array(target_pos) - np.array(current_pos)
# 线速度(比例控制)
v_linear = "kw">self.lambda_ * position_error
# 角速度(设为0,简化为纯位置控制)
v_angular = np.zeros(3)
"kw">class="kw">return np.concatenate([v_linear, v_angular])
"kw">class HybridVisualServo:
"""
混合视觉伺服控制器
结合位置和图像控制
"""
"kw">def __init__("kw">self, camera_config, lambda_=0.5, alpha=0.5):
"kw">self.position_controller = PositionBasedVisualServo(camera_config, lambda_)
"kw">self.image_controller = ImageBasedVisualServo(camera_config, lambda_)
"kw">self.alpha = alpha # 混合权重
"kw">def compute_velocity("kw">self, current_pos, current_pixels,
target_pos, target_pixels, depth_map):
"""
计算混合速度
参数:
current_pos: 当前末端位置
current_pixels: 当前图像点
target_pos: 目标位置
target_pixels: 目标图像点
depth_map: 深度图
"""
# 位置控制
v_position = "kw">self.position_controller.compute_end_effector_velocity(
current_pos, target_pos
)
# 图像控制
target_3d = "kw">self.position_controller.estimate_3d_position(target_pixels, depth_map)
current_3d = "kw">self.position_controller.estimate_3d_position(current_pixels, depth_map)
v_image = "kw">self.image_controller.compute_velocity(
[current_pixels], [target_pixels],
Z=(target_3d[2] + current_3d[2]) / 2
)
# 混合
v_total = "kw">self.alpha * v_position + (1 - "kw">self.alpha) * np.append(v_image, np.zeros(3))
"kw">class="kw">return v_total
---
7. 5. 强化学习抓取策略
5.1 抓取任务建模
状态:RGB-D图像 + 机械臂关节状态
动作:末端执行器位姿变化
奖励:成功抓取正奖励,距离惩罚
5.2 PPO抓取策略
创建 models/grasp_policy.py:
"""
抓取强化学习策略
使用PPO算法训练
"""
"kw">class="kw">import torch
"kw">class="kw">import torch.nn "kw">as nn
"kw">class="kw">import torch.nn.functional "kw">as F
"kw">from torch.distributions "kw">class="kw">import Normal
"kw">class GraspPolicyNetwork(nn.Module):
"""
抓取策略网络
输入:
- 图像特征 (来自CNN)
- 机械臂状态
输出:
- 动作均值和方差
"""
"kw">def __init__("kw">self, image_feature_dim=512, state_dim=14, action_dim=7, hidden_dim=256):
super().__init__()
"kw">self.action_dim = action_dim
# 图像特征处理
"kw">self.image_conv = nn.Sequential(
nn.Conv2d(3, 32, 3, stride=2),
nn.ReLU(),
nn.Conv2d(32, 64, 3, stride=2),
nn.ReLU(),
nn.Conv2d(64, 128, 3, stride=2),
nn.ReLU(),
nn.AdaptiveAvgPool2d((7, 7)),
nn.Flatten()
)
# 特征融合
"kw">self.fusion = nn.Sequential(
nn.Linear(128 * 7 * 7 + state_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU()
)
# 动作均值
"kw">self.mean_head = nn.Linear(hidden_dim, action_dim)
# 动作方差(可学习或固定)
"kw">self.log_std = nn.Parameter(torch.zeros(action_dim))
"kw">def forward("kw">self, image, state):
"""
前向传播
参数:
image: 图像 (B, 3, 224, 224)
state: 机械臂状态 (B, 14) = 7关节位置 + 7关节速度
返回:
action_mean: 动作均值
action_std: 动作标准差
"""
# 图像特征
image_features = "kw">self.image_conv(image)
# 融合
combined = torch.cat([image_features, state], dim=-1)
fused = "kw">self.fusion(combined)
# 动作均值
action_mean = "kw">self.mean_head(fused)
# 动作标准差
action_std = torch.exp("kw">self.log_std).expand_as(action_mean)
"kw">class="kw">return action_mean, action_std
"kw">def sample_action("kw">self, image, state, deterministic="kw">False):
"""
采样动作
参数:
deterministic: 是否使用确定性策略
"""
action_mean, action_std = "kw">self.forward(image, state)
"kw">if deterministic:
"kw">class="kw">return action_mean
dist = Normal(action_mean, action_std)
action = dist.sample()
action = torch.tanh(action) # 限制到[-1, 1]
"kw">class="kw">return action
"kw">def get_log_prob("kw">self, image, state, action):
"""计算动作的对数概率"""
action_mean, action_std = "kw">self.forward(image, state)
dist = Normal(action_mean, action_std)
log_prob = dist.log_prob(action)
"kw">class="kw">return log_prob.sum(dim=-1)
"kw">class GraspPPO:
"""
抓取PPO算法实现
"""
"kw">def __init__("kw">self, image_feature_dim, state_dim, action_dim,
lr=3e-4, gamma=0.99, lamda=0.95, clip_epsilon=0.2,
value_coef=0.5, entropy_coef=0.01):
"kw">self.gamma = gamma
"kw">self.lamda = lamda
"kw">self.clip_epsilon = clip_epsilon
"kw">self.value_coef = value_coef
"kw">self.entropy_coef = entropy_coef
# 网络
"kw">self.policy = GraspPolicyNetwork(image_feature_dim, state_dim, action_dim)
"kw">self.value_network = nn.Sequential(
nn.Linear(128 * 7 * 7 + state_dim, 256),
nn.ReLU(),
nn.Linear(256, 1)
)
# 优化器
"kw">self.optimizer = torch.optim.Adam(
list("kw">self.policy.parameters()) + list("kw">self.value_network.parameters()),
lr=lr
)
"kw">def compute_gae("kw">self, rewards, values, dones, next_value):
"""计算GAE"""
advantages = []
gae = 0
next_value = torch.tensor(next_value) "kw">if "kw">not isinstance(next_value, torch.Tensor) "kw">else next_value
"kw">for t "kw">in reversed("kw">range("kw">len(rewards))):
"kw">if t == "kw">len(rewards) - 1:
next_val = next_value
"kw">else:
next_val = values[t + 1]
delta = rewards[t] + "kw">self.gamma * next_val * (1 - dones[t]) - values[t]
gae = delta + "kw">self.gamma * "kw">self.lamda * (1 - dones[t]) * gae
advantages.insert(0, gae)
returns = [adv + val "kw">for adv, val "kw">in zip(advantages, values)]
"kw">class="kw">return advantages, returns
"kw">def update("kw">self, batch):
"""更新策略"""
states, actions, old_log_probs, advantages, returns = batch
# 新策略
new_log_probs = "kw">self.policy.get_log_prob(states, actions)
entropy = "kw">self.policy.get_log_std.mean()
# PPO损失
ratio = torch.exp(new_log_probs - old_log_probs)
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1 - "kw">self.clip_epsilon, 1 + "kw">self.clip_epsilon) * advantages
policy_loss = -torch.min(surr1, surr2).mean()
entropy_loss = -entropy * "kw">self.entropy_coef
# 价值损失
values = "kw">self.value_network(states).squeeze()
value_loss = F.mse_loss(values, returns)
# 总损失
total_loss = policy_loss + entropy_loss + "kw">self.value_coef * value_loss
# 反向传播
"kw">self.optimizer.zero_grad()
total_loss.backward()
torch.nn.utils.clip_grad_norm_("kw">self.policy.parameters(), 0.5)
"kw">self.optimizer.step()
"kw">class="kw">return {
'policy_loss': policy_loss.item(),
'value_loss': value_loss.item(),
'entropy': entropy.item()
}
"kw">def save("kw">self, path):
"""保存模型"""
torch.save({
'policy': "kw">self.policy.state_dict(),
'value': "kw">self.value_network.state_dict()
}, path)
"kw">def load("kw">self, path):
"""加载模型"""
checkpoint = torch.load(path)
"kw">self.policy.load_state_dict(checkpoint['policy'])
"kw">self.value_network.load_state_dict(checkpoint['value'])
---
8. 6. Sim-to-Real迁移
6.1 Sim-to-Real挑战
仿真到真实的迁移面临以下挑战:
6.2 域随机化
创建 utils/domain_randomization.py:
"""
域随机化工具
增加仿真多样性以提高迁移成功率
"""
"kw">class="kw">import numpy "kw">as np
"kw">class="kw">import pybullet "kw">as p
"kw">class DomainRandomizer:
"""
域随机化
随机化仿真参数以提高泛化能力
"""
"kw">def __init__("kw">self, physics_client):
"kw">self.physics_client = physics_client
# 参数范围
"kw">self.params = {
'friction': (0.5, 2.0),
'mass': (0.8, 1.2),
'gravity': (9.5, 10.0),
'camera_noise': (0, 0.05),
'light_intensity': (0.7, 1.3)
}
"kw">def randomize("kw">self):
"""应用随机化"""
"kw">self._randomize_physics()
"kw">self._randomize_visual()
"kw">def _randomize_physics("kw">self):
"""随机化物理参数"""
# 摩擦力
friction = np.random.uniform(*"kw">self.params['friction'])
"kw">for i "kw">in "kw">range(p.getNumBodies()):
p.changeDynamics(i, -1, lateralFriction=friction)
# 重力
gravity = np.random.uniform(*"kw">self.params['gravity'])
p.setGravity(0, 0, -gravity)
"kw">def _randomize_visual("kw">self):
"""随机化视觉参数"""
# 可以在这里修改渲染参数
"kw">pass
"kw">def get_camera_noise("kw">self):
"""获取相机噪声"""
"kw">class="kw">return np.random.normal(0, "kw">self.params['camera_noise'][1], size=(224, 224, 3))
"kw">def apply_camera_noise("kw">self, image):
"""对图像添加噪声"""
noise = "kw">self.get_camera_noise()
noisy_image = np.clip(image + noise * 255, 0, 255).astype(np.uint8)
"kw">class="kw">return noisy_image
"kw">class TextureRandomizer:
"""纹理随机化"""
"kw">def __init__("kw">self):
"kw">self.textures = []
"kw">def load_textures("kw">self, texture_paths):
"""加载纹理"""
"kw">for path "kw">in texture_paths:
texture_id = p.loadTexture(path)
"kw">self.textures.append(texture_id)
"kw">def apply_random_texture("kw">self, body_id):
"""应用随机纹理"""
"kw">if "kw">self.textures:
texture_id = np.random.choice("kw">self.textures)
p.changeVisualShape(
body_id, -1,
textureUniqueId=texture_id
)
6.3 域适应
创建 utils/adaptation.py:
"""
域适应方法
"""
"kw">class="kw">import torch
"kw">class="kw">import torch.nn "kw">as nn
"kw">class DomainAdaptationNetwork(nn.Module):
"""
域适应网络
学习域不变特征
"""
"kw">def __init__("kw">self, feature_dim=256):
super().__init__()
# 特征提取器
"kw">self.feature_extractor = nn.Sequential(
nn.Conv2d(3, 32, 3, stride=2),
nn.ReLU(),
nn.Conv2d(32, 64, 3, stride=2),
nn.ReLU(),
nn.Conv2d(64, 128, 3, stride=2),
nn.ReLU(),
nn.AdaptiveAvgPool2d((7, 7)),
nn.Flatten(),
nn.Linear(128 * 7 * 7, feature_dim)
)
# 域分类器(用于对抗训练)
"kw">self.domain_classifier = nn.Sequential(
nn.Linear(feature_dim, 128),
nn.ReLU(),
nn.Linear(128, 1)
)
"kw">def forward("kw">self, x, alpha=1.0):
"""
前向传播
参数:
x: 输入图像
alpha: 梯度反转强度
"""
features = "kw">self.feature_extractor(x)
# 域预测(带梯度反转)
"kw">if "kw">self.training:
domain_output = "kw">self.domain_classifier(features)
# 梯度反转
domain_output = domain_output * -alpha
"kw">else:
domain_output = "kw">None
"kw">class="kw">return features, domain_output
"kw">def train_domain_adaptation(source_data, target_data, model, epochs=100):
"""
训练域适应
目标是最小化源域和目标域之间的差异
"""
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.BCEWithLogitsLoss()
"kw">for epoch "kw">in "kw">range(epochs):
# 源域损失
source_features, source_domain = model(source_data)
source_loss = criterion(source_domain, torch.ones("kw">len(source_data), 1))
# 目标域损失
target_features, target_domain = model(target_data)
target_loss = criterion(target_domain, torch.zeros("kw">len(target_data), 1))
# 特征对齐损失
feature_loss = torch.mean(torch.abs(source_features - target_features))
# 总损失
total_loss = source_loss + target_loss + 0.1 * feature_loss
optimizer.zero_grad()
total_loss.backward()
optimizer.step()
"kw">if (epoch + 1) % 10 == 0:
"kw">print(f"Epoch {epoch+1}: loss={total_loss.item():.4f}")
---
9. 项目总结
完成清单
扩展方向
参考资源
---
下一步:继续学习阶段七的具身智能核心算法