进阶 · 3-4周

项目五:RGB-D物体6D位姿估计

构建点云处理流水线:滤波、配准、ICP精炼,输出可供机械臂抓取的6D候选姿态

进阶难度 预计3-4周 9个章节

1. 项目概述

构建基于RGB-D的物体位姿估计流水线,用于机器人抓取:

  • 点云预处理(滤波、降采样、法向量估计)
  • 全局配准(FPFH特征 + RANSAC)
  • ICP精炼
  • 抓取姿态生成
  • 评估与可视化
  • 2. 难度:★★★★☆ (4/5)

    3. 预估时间:3-4周

    ---


    4. 1. 项目结构

    rgbd_pose/
    ├── data/
    │   ├── models/          # 物体CAD模型 (ply/obj)
    │   ├── scenes/          # 场景点云
    │   └── results/         # 评估结果
    ├── src/
    │   ├── preprocess.py    # 点云预处理
    │   ├── global_reg.py    # 全局粗配准
    │   ├── icp_refine.py    # ICP精配准
    │   ├── grasp_gen.py     # 抓取姿态生成
    │   ├── evaluate.py      # 评估脚本
    │   └── visualize.py     # 可视化
    ├── config/
    │   └── params.yaml
    └── requirements.txt

    ---


    5. 2. 核心代码

    2.1 点云预处理

    "kw">class="kw">import open3d "kw">as o3d
    "kw">class="kw">import numpy "kw">as np
    
    "kw">class PointCloudProcessor:
        """RGB-D点云预处理流水线"""
    
        "kw">def __init__("kw">self, voxel_size=0.005):
            "kw">self.voxel_size = voxel_size
    
        "kw">def load_rgbd_to_pointcloud("kw">self, rgb_path, depth_path, K, depth_scale=1000.0):
            """从RGB-D图像生成点云"""
            color = o3d.io.read_image(rgb_path)
            depth = o3d.io.read_image(depth_path)
    
            rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(
                color, depth,
                depth_scale=depth_scale,
                depth_trunc=3.0,
                convert_rgb_to_intensity="kw">False
            )
    
            intrinsics = o3d.camera.PinholeCameraIntrinsic(
                width=color.width, height=color.height,
                fx=K[0,0], fy=K[1,1], cx=K[0,2], cy=K[1,2]
            )
    
            "kw">class="kw">return o3d.geometry.PointCloud.create_from_rgbd_image(
                rgbd, intrinsics
            )
    
        "kw">def preprocess("kw">self, pcd, crop_box="kw">None):
            """预处理流水线"""
            # 1. 裁剪工作空间
            "kw">if crop_box "kw">is "kw">not "kw">None:
                pcd = pcd.crop(
                    o3d.geometry.AxisAlignedBoundingBox(*crop_box)
                )
    
            # 2. 降采样(保持几何结构)
            pcd = pcd.voxel_down_sample("kw">self.voxel_size)
    
            # 3. 统计滤波去除离群点
            pcd, _ = pcd.remove_statistical_outlier(
                nb_neighbors=30, std_ratio=2.0
            )
    
            # 4. 估计法向量
            pcd.estimate_normals(
                o3d.geometry.KDTreeSearchParamHybrid(
                    radius="kw">self.voxel_size * 2, max_nn=30
                )
            )
    
            # 5. 法向量一致性定向(朝向相机)
            pcd.orient_normals_towards_camera_location(
                np.array([0, 0, 0])
            )
    
            "kw">class="kw">return pcd
    
        "kw">def segment_plane_and_objects("kw">self, pcd, distance_threshold=0.01,
                                       ransac_n=3, num_iterations=1000):
            """RANSAC平面分割:分离桌面和物体"""
            # 分割最大平面(桌面)
            plane_model, inliers = pcd.segment_plane(
                distance_threshold=distance_threshold,
                ransac_n=ransac_n,
                num_iterations=num_iterations
            )
            [a, b, c, d] = plane_model
            "kw">print(f"平面方程: {a:.3f}x + {b:.3f}y + {c:.3f}z + {d:.3f} = 0")
    
            # 桌面点云和物体点云
            plane_pcd = pcd.select_by_index(inliers)
            objects_pcd = pcd.select_by_index(inliers, invert="kw">True)
    
            "kw">class="kw">return plane_pcd, objects_pcd
    
        "kw">def cluster_objects("kw">self, objects_pcd, eps=0.02, min_points=100):
            """DBSCAN聚类:分离单个物体"""
            labels = np.array(
                objects_pcd.cluster_dbscan(eps=eps, min_points=min_points)
            )
    
            clusters = []
            "kw">for label "kw">in np.unique(labels):
                "kw">if label == -1:  # 噪声
                    "kw">class="kw">continue
                cluster = objects_pcd.select_by_index(
                    np.where(labels == label)[0]
                )
                clusters.append(cluster)
    
            "kw">print(f"找到 {"kw">len(clusters)} 个物体簇")
            "kw">class="kw">return clusters

    2.2 全局配准 (FPFH + RANSAC)

    "kw">class GlobalRegistrator:
        """FPFH特征 + RANSAC全局粗配准"""
    
        "kw">def __init__("kw">self, voxel_size=0.005):
            "kw">self.voxel_size = voxel_size
    
        "kw">def compute_fpfh("kw">self, pcd):
            """计算FPFH特征"""
            radius_normal = "kw">self.voxel_size * 2
            radius_feature = "kw">self.voxel_size * 5
    
            pcd.estimate_normals(
                o3d.geometry.KDTreeSearchParamHybrid(
                    radius=radius_normal, max_nn=30
                )
            )
    
            fpfh = o3d.pipelines.registration.compute_fpfh_feature(
                pcd,
                o3d.geometry.KDTreeSearchParamHybrid(
                    radius=radius_feature, max_nn=100
                )
            )
            "kw">class="kw">return fpfh
    
        "kw">def register("kw">self, source_pcd, target_pcd):
            """FPFH + RANSAC 全局配准"""
            source_fpfh = "kw">self.compute_fpfh(source_pcd)
            target_fpfh = "kw">self.compute_fpfh(target_pcd)
    
            distance_threshold = "kw">self.voxel_size * 1.5
    
            result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
                source_pcd, target_pcd,
                source_fpfh, target_fpfh,
                mutual_filter="kw">True,
                max_correspondence_distance=distance_threshold,
                estimation_method=o3d.pipelines.registration.
                    TransformationEstimationPointToPoint("kw">False),
                ransac_n=3,
                checkers=[
                    o3d.pipelines.registration.
                        CorrespondenceCheckerBasedOnEdgeLength(0.9),
                    o3d.pipelines.registration.
                        CorrespondenceCheckerBasedOnDistance(distance_threshold),
                ],
                criteria=o3d.pipelines.registration.
                    RANSACConvergenceCriteria(1000000, 500)
            )
    
            "kw">print(f"RANSAC配准: fit={result.fitness:.4f}, rmse={result.inlier_rmse:.4f}")
            "kw">class="kw">return result.transformation, result

    2.3 ICP精配准

    "kw">class ICPRefiner:
        """ICP多阶段精配准"""
    
        "kw">def __init__("kw">self):
            "kw">self.results = []
    
        "kw">def refine("kw">self, source, target, init_transform,
                   max_correspondence_distances=[0.05, 0.02, 0.01, 0.005]):
            """多分辨率ICP"""
            current_transform = init_transform
    
            "kw">for i, dist "kw">in enumerate(max_correspondence_distances):
                reg = o3d.pipelines.registration.registration_icp(
                    source, target, dist, current_transform,
                    o3d.pipelines.registration.
                        TransformationEstimationPointToPoint(),
                    o3d.pipelines.registration.
                        ICPConvergenceCriteria(
                            relative_fitness=1e-6,
                            relative_rmse=1e-6,
                            max_iteration=100
                        )
                )
    
                current_transform = reg.transformation
                "kw">self.results.append({
                    'level': i,
                    'distance_threshold': dist,
                    'fitness': reg.fitness,
                    'inlier_rmse': reg.inlier_rmse,
                })
                "kw">print(f"  Level {i}: dist={dist:.3f}, "
                      f"fitness={reg.fitness:.4f}, rmse={reg.inlier_rmse:.4f}")
    
            "kw">class="kw">return current_transform
    
        "kw">def colored_icp("kw">self, source, target, init_transform):
            """彩色ICP(利用颜色信息)"""
            # 体素降采样
            source_down = source.voxel_down_sample(0.005)
            target_down = target.voxel_down_sample(0.005)
    
            source_down.estimate_normals(
                o3d.geometry.KDTreeSearchParamHybrid(radius=0.01, max_nn=30)
            )
            target_down.estimate_normals(
                o3d.geometry.KDTreeSearchParamHybrid(radius=0.01, max_nn=30)
            )
    
            result = o3d.pipelines.registration.registration_colored_icp(
                source_down, target_down, 0.02, init_transform,
                o3d.pipelines.registration.
                    TransformationEstimationForColoredICP(),
                o3d.pipelines.registration.
                    ICPConvergenceCriteria(
                        relative_fitness=1e-6, relative_rmse=1e-6,
                        max_iteration=100
                    )
            )
            "kw">class="kw">return result.transformation

    2.4 抓取姿态生成

    "kw">class GraspGenerator:
        """从物体位姿生成抓取候选"""
    
        "kw">def __init__("kw">self, gripper_width=0.08):
            "kw">self.gripper_width = gripper_width
    
        "kw">def generate_top_grasps("kw">self, object_pose, n_angles=8):
            """生成顶部抓取候选"""
            grasps = []
            center = object_pose[:3, 3]
    
            "kw">for i "kw">in "kw">range(n_angles):
                angle = 2 * np.pi * i / n_angles
                # 旋转绕z轴的抓取姿态
                R_grasp = np.array([
                    [np.cos(angle), -np.sin(angle), 0],
                    [np.sin(angle),  np.cos(angle), 0],
                    [0,              0,             1]
                ])
    
                T_grasp = np.eye(4)
                T_grasp[:3, :3] = R_grasp
                T_grasp[:3, 3] = center + np.array([0, 0, 0.1])  # 上方10cm接近
    
                grasps.append(T_grasp)
    
            "kw">class="kw">return grasps
    
        "kw">def score_grasp("kw">self, grasp_pose, object_pcd, table_pcd):
            """启发式抓取评分"""
            score = 0.0
    
            # 1. 接近方向:抓取器Z轴应接近垂直
            approach_dir = grasp_pose[:3, 2]
            gravity_dir = np.array([0, 0, -1])
            alignment = abs(np.dot(approach_dir, gravity_dir))
            score += alignment * 3
    
            # 2. 远离桌面碰撞
            grasp_height = grasp_pose[2, 3]
            "kw">if grasp_height > 0.05:  # 高于桌面5cm
                score += 2
    
            # 3. 物体中心距离
            obj_center = np.mean(np.asarray(object_pcd.points), axis=0)
            dist_to_center = np.linalg.norm(grasp_pose[:3, 3] - obj_center)
            score += np.exp(-dist_to_center * 10) * 2
    
            "kw">class="kw">return score

    2.5 端到端位姿估计流水线

    "kw">class PoseEstimationPipeline:
        """完整6D位姿估计流水线"""
    
        "kw">def __init__("kw">self, model_paths, voxel_size=0.005):
            """
            model_paths: dict {物体名: 模型点云路径}
            """
            "kw">self.processor = PointCloudProcessor(voxel_size)
            "kw">self.global_reg = GlobalRegistrator(voxel_size)
            "kw">self.icp = ICPRefiner()
    
            # 加载物体模型并预处理
            "kw">self.models = {}
            "kw">for name, path "kw">in model_paths.items():
                model = o3d.io.read_point_cloud(path)
                model = "kw">self.processor.preprocess(model)
                # 模型可能尺度不一致,归一化到单位球内
                bbox = model.get_axis_aligned_bounding_box()
                scale = 1.0 / max(bbox.get_extent())
                model.scale(scale, center=model.get_center())
                "kw">self.models[name] = model
    
            "kw">print(f"加载了 {"kw">len("kw">self.models)} 个物体模型")
    
        "kw">def estimate("kw">self, scene_pcd, target_objects="kw">None):
            """
            估计场景中物体的6D位姿
            返回: list of {name, T, fitness, rmse}
            """
            "kw">if target_objects "kw">is "kw">None:
                target_objects = list("kw">self.models.keys())
    
            # 预处理
            scene_pcd = "kw">self.processor.preprocess(scene_pcd)
    
            # 分割物体
            _, objects_pcd = "kw">self.processor.segment_plane_and_objects(scene_pcd)
            clusters = "kw">self.processor.cluster_objects(objects_pcd)
    
            results = []
            "kw">for cluster "kw">in clusters:
                best_fitness = 0
                best_result = "kw">None
    
                "kw">for name "kw">in target_objects:
                    model = "kw">self.models[name]
    
                    # 全局配准
                    T_ransac, ransac_result = "kw">self.global_reg.register(
                        model, cluster
                    )
    
                    # ICP精炼
                    T_refined = "kw">self.icp.refine(model, cluster, T_ransac)
    
                    # 评估
                    source_temp = o3d.geometry.PointCloud(model)
                    source_temp.transform(T_refined)
                    eval_result = o3d.pipelines.registration.evaluate_registration(
                        source_temp, cluster, 0.01, T_refined
                    )
    
                    "kw">if eval_result.fitness > best_fitness:
                        best_fitness = eval_result.fitness
                        best_result = {
                            'name': name,
                            'transform': T_refined,
                            'fitness': eval_result.fitness,
                            'rmse': eval_result.inlier_rmse,
                        }
    
                "kw">if best_result "kw">and best_result['fitness'] > 0.3:
                    results.append(best_result)
                    "kw">print(f"物体: {best_result['name']}, "
                          f"fitness={best_result['fitness']:.3f}")
    
            "kw">class="kw">return results

    ---


    6. 3. 评估指标

    "kw">def evaluate_pose(estimated, ground_truth):
        """评估位姿估计精度"""
        R_est, t_est = estimated[:3,:3], estimated[:3,3]
        R_gt, t_gt = ground_truth[:3,:3], ground_truth[:3,3]
    
        # 平移误差
        translation_error = np.linalg.norm(t_est - t_gt)
    
        # 旋转误差 (角度)
        R_diff = R_est @ R_gt.T
        # 从旋转矩阵提取角度
        trace = np.clip((np.trace(R_diff) - 1) / 2, -1, 1)
        rotation_error = np.arccos(trace) * 180 / np.pi
    
        # ADD-S指标 (Average Distance of model points)
        # 用模型点云采样计算
    
        "kw">class="kw">return {
            'translation_error_mm': translation_error * 1000,
            'rotation_error_deg': rotation_error,
        }

    ---


    7. 4. 验收标准

  • 点云预处理:能正确降采样、去噪、分割桌面和物体(IoU > 0.85)
  • 位姿估计:在BOP挑战格式的数据集上,ADD-S < 2cm 且 < 5° 的物体 > 70%
  • ICP收敛:粗配准后的ICP能在30步内收敛,最终RMSE < 5mm
  • 端到端Demo:任给一张RGB-D图像 → 输出物体类型和6D位姿 → 可视化
  • ---


    8. 5. 数据集

  • BOP Challenge: https://bop.felk.cvut.cz/datasets/
  • YCB-Video: 21个日常物体的视频序列
  • LineMOD: 15个无纹理物体的标准基准
  • 9. 参考资源

  • Open3D Tutorial
  • BOP Toolkit
  • ICP Paper