进阶 · 3-4周
项目五:RGB-D物体6D位姿估计
构建点云处理流水线:滤波、配准、ICP精炼,输出可供机械臂抓取的6D候选姿态
1. 项目概述
构建基于RGB-D的物体位姿估计流水线,用于机器人抓取:
2. 难度:★★★★☆ (4/5)
3. 预估时间:3-4周
---
4. 1. 项目结构
rgbd_pose/
├── data/
│ ├── models/ # 物体CAD模型 (ply/obj)
│ ├── scenes/ # 场景点云
│ └── results/ # 评估结果
├── src/
│ ├── preprocess.py # 点云预处理
│ ├── global_reg.py # 全局粗配准
│ ├── icp_refine.py # ICP精配准
│ ├── grasp_gen.py # 抓取姿态生成
│ ├── evaluate.py # 评估脚本
│ └── visualize.py # 可视化
├── config/
│ └── params.yaml
└── requirements.txt
---
5. 2. 核心代码
2.1 点云预处理
"kw">class="kw">import open3d "kw">as o3d
"kw">class="kw">import numpy "kw">as np
"kw">class PointCloudProcessor:
"""RGB-D点云预处理流水线"""
"kw">def __init__("kw">self, voxel_size=0.005):
"kw">self.voxel_size = voxel_size
"kw">def load_rgbd_to_pointcloud("kw">self, rgb_path, depth_path, K, depth_scale=1000.0):
"""从RGB-D图像生成点云"""
color = o3d.io.read_image(rgb_path)
depth = o3d.io.read_image(depth_path)
rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth(
color, depth,
depth_scale=depth_scale,
depth_trunc=3.0,
convert_rgb_to_intensity="kw">False
)
intrinsics = o3d.camera.PinholeCameraIntrinsic(
width=color.width, height=color.height,
fx=K[0,0], fy=K[1,1], cx=K[0,2], cy=K[1,2]
)
"kw">class="kw">return o3d.geometry.PointCloud.create_from_rgbd_image(
rgbd, intrinsics
)
"kw">def preprocess("kw">self, pcd, crop_box="kw">None):
"""预处理流水线"""
# 1. 裁剪工作空间
"kw">if crop_box "kw">is "kw">not "kw">None:
pcd = pcd.crop(
o3d.geometry.AxisAlignedBoundingBox(*crop_box)
)
# 2. 降采样(保持几何结构)
pcd = pcd.voxel_down_sample("kw">self.voxel_size)
# 3. 统计滤波去除离群点
pcd, _ = pcd.remove_statistical_outlier(
nb_neighbors=30, std_ratio=2.0
)
# 4. 估计法向量
pcd.estimate_normals(
o3d.geometry.KDTreeSearchParamHybrid(
radius="kw">self.voxel_size * 2, max_nn=30
)
)
# 5. 法向量一致性定向(朝向相机)
pcd.orient_normals_towards_camera_location(
np.array([0, 0, 0])
)
"kw">class="kw">return pcd
"kw">def segment_plane_and_objects("kw">self, pcd, distance_threshold=0.01,
ransac_n=3, num_iterations=1000):
"""RANSAC平面分割:分离桌面和物体"""
# 分割最大平面(桌面)
plane_model, inliers = pcd.segment_plane(
distance_threshold=distance_threshold,
ransac_n=ransac_n,
num_iterations=num_iterations
)
[a, b, c, d] = plane_model
"kw">print(f"平面方程: {a:.3f}x + {b:.3f}y + {c:.3f}z + {d:.3f} = 0")
# 桌面点云和物体点云
plane_pcd = pcd.select_by_index(inliers)
objects_pcd = pcd.select_by_index(inliers, invert="kw">True)
"kw">class="kw">return plane_pcd, objects_pcd
"kw">def cluster_objects("kw">self, objects_pcd, eps=0.02, min_points=100):
"""DBSCAN聚类:分离单个物体"""
labels = np.array(
objects_pcd.cluster_dbscan(eps=eps, min_points=min_points)
)
clusters = []
"kw">for label "kw">in np.unique(labels):
"kw">if label == -1: # 噪声
"kw">class="kw">continue
cluster = objects_pcd.select_by_index(
np.where(labels == label)[0]
)
clusters.append(cluster)
"kw">print(f"找到 {"kw">len(clusters)} 个物体簇")
"kw">class="kw">return clusters
2.2 全局配准 (FPFH + RANSAC)
"kw">class GlobalRegistrator:
"""FPFH特征 + RANSAC全局粗配准"""
"kw">def __init__("kw">self, voxel_size=0.005):
"kw">self.voxel_size = voxel_size
"kw">def compute_fpfh("kw">self, pcd):
"""计算FPFH特征"""
radius_normal = "kw">self.voxel_size * 2
radius_feature = "kw">self.voxel_size * 5
pcd.estimate_normals(
o3d.geometry.KDTreeSearchParamHybrid(
radius=radius_normal, max_nn=30
)
)
fpfh = o3d.pipelines.registration.compute_fpfh_feature(
pcd,
o3d.geometry.KDTreeSearchParamHybrid(
radius=radius_feature, max_nn=100
)
)
"kw">class="kw">return fpfh
"kw">def register("kw">self, source_pcd, target_pcd):
"""FPFH + RANSAC 全局配准"""
source_fpfh = "kw">self.compute_fpfh(source_pcd)
target_fpfh = "kw">self.compute_fpfh(target_pcd)
distance_threshold = "kw">self.voxel_size * 1.5
result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
source_pcd, target_pcd,
source_fpfh, target_fpfh,
mutual_filter="kw">True,
max_correspondence_distance=distance_threshold,
estimation_method=o3d.pipelines.registration.
TransformationEstimationPointToPoint("kw">False),
ransac_n=3,
checkers=[
o3d.pipelines.registration.
CorrespondenceCheckerBasedOnEdgeLength(0.9),
o3d.pipelines.registration.
CorrespondenceCheckerBasedOnDistance(distance_threshold),
],
criteria=o3d.pipelines.registration.
RANSACConvergenceCriteria(1000000, 500)
)
"kw">print(f"RANSAC配准: fit={result.fitness:.4f}, rmse={result.inlier_rmse:.4f}")
"kw">class="kw">return result.transformation, result
2.3 ICP精配准
"kw">class ICPRefiner:
"""ICP多阶段精配准"""
"kw">def __init__("kw">self):
"kw">self.results = []
"kw">def refine("kw">self, source, target, init_transform,
max_correspondence_distances=[0.05, 0.02, 0.01, 0.005]):
"""多分辨率ICP"""
current_transform = init_transform
"kw">for i, dist "kw">in enumerate(max_correspondence_distances):
reg = o3d.pipelines.registration.registration_icp(
source, target, dist, current_transform,
o3d.pipelines.registration.
TransformationEstimationPointToPoint(),
o3d.pipelines.registration.
ICPConvergenceCriteria(
relative_fitness=1e-6,
relative_rmse=1e-6,
max_iteration=100
)
)
current_transform = reg.transformation
"kw">self.results.append({
'level': i,
'distance_threshold': dist,
'fitness': reg.fitness,
'inlier_rmse': reg.inlier_rmse,
})
"kw">print(f" Level {i}: dist={dist:.3f}, "
f"fitness={reg.fitness:.4f}, rmse={reg.inlier_rmse:.4f}")
"kw">class="kw">return current_transform
"kw">def colored_icp("kw">self, source, target, init_transform):
"""彩色ICP(利用颜色信息)"""
# 体素降采样
source_down = source.voxel_down_sample(0.005)
target_down = target.voxel_down_sample(0.005)
source_down.estimate_normals(
o3d.geometry.KDTreeSearchParamHybrid(radius=0.01, max_nn=30)
)
target_down.estimate_normals(
o3d.geometry.KDTreeSearchParamHybrid(radius=0.01, max_nn=30)
)
result = o3d.pipelines.registration.registration_colored_icp(
source_down, target_down, 0.02, init_transform,
o3d.pipelines.registration.
TransformationEstimationForColoredICP(),
o3d.pipelines.registration.
ICPConvergenceCriteria(
relative_fitness=1e-6, relative_rmse=1e-6,
max_iteration=100
)
)
"kw">class="kw">return result.transformation
2.4 抓取姿态生成
"kw">class GraspGenerator:
"""从物体位姿生成抓取候选"""
"kw">def __init__("kw">self, gripper_width=0.08):
"kw">self.gripper_width = gripper_width
"kw">def generate_top_grasps("kw">self, object_pose, n_angles=8):
"""生成顶部抓取候选"""
grasps = []
center = object_pose[:3, 3]
"kw">for i "kw">in "kw">range(n_angles):
angle = 2 * np.pi * i / n_angles
# 旋转绕z轴的抓取姿态
R_grasp = np.array([
[np.cos(angle), -np.sin(angle), 0],
[np.sin(angle), np.cos(angle), 0],
[0, 0, 1]
])
T_grasp = np.eye(4)
T_grasp[:3, :3] = R_grasp
T_grasp[:3, 3] = center + np.array([0, 0, 0.1]) # 上方10cm接近
grasps.append(T_grasp)
"kw">class="kw">return grasps
"kw">def score_grasp("kw">self, grasp_pose, object_pcd, table_pcd):
"""启发式抓取评分"""
score = 0.0
# 1. 接近方向:抓取器Z轴应接近垂直
approach_dir = grasp_pose[:3, 2]
gravity_dir = np.array([0, 0, -1])
alignment = abs(np.dot(approach_dir, gravity_dir))
score += alignment * 3
# 2. 远离桌面碰撞
grasp_height = grasp_pose[2, 3]
"kw">if grasp_height > 0.05: # 高于桌面5cm
score += 2
# 3. 物体中心距离
obj_center = np.mean(np.asarray(object_pcd.points), axis=0)
dist_to_center = np.linalg.norm(grasp_pose[:3, 3] - obj_center)
score += np.exp(-dist_to_center * 10) * 2
"kw">class="kw">return score
2.5 端到端位姿估计流水线
"kw">class PoseEstimationPipeline:
"""完整6D位姿估计流水线"""
"kw">def __init__("kw">self, model_paths, voxel_size=0.005):
"""
model_paths: dict {物体名: 模型点云路径}
"""
"kw">self.processor = PointCloudProcessor(voxel_size)
"kw">self.global_reg = GlobalRegistrator(voxel_size)
"kw">self.icp = ICPRefiner()
# 加载物体模型并预处理
"kw">self.models = {}
"kw">for name, path "kw">in model_paths.items():
model = o3d.io.read_point_cloud(path)
model = "kw">self.processor.preprocess(model)
# 模型可能尺度不一致,归一化到单位球内
bbox = model.get_axis_aligned_bounding_box()
scale = 1.0 / max(bbox.get_extent())
model.scale(scale, center=model.get_center())
"kw">self.models[name] = model
"kw">print(f"加载了 {"kw">len("kw">self.models)} 个物体模型")
"kw">def estimate("kw">self, scene_pcd, target_objects="kw">None):
"""
估计场景中物体的6D位姿
返回: list of {name, T, fitness, rmse}
"""
"kw">if target_objects "kw">is "kw">None:
target_objects = list("kw">self.models.keys())
# 预处理
scene_pcd = "kw">self.processor.preprocess(scene_pcd)
# 分割物体
_, objects_pcd = "kw">self.processor.segment_plane_and_objects(scene_pcd)
clusters = "kw">self.processor.cluster_objects(objects_pcd)
results = []
"kw">for cluster "kw">in clusters:
best_fitness = 0
best_result = "kw">None
"kw">for name "kw">in target_objects:
model = "kw">self.models[name]
# 全局配准
T_ransac, ransac_result = "kw">self.global_reg.register(
model, cluster
)
# ICP精炼
T_refined = "kw">self.icp.refine(model, cluster, T_ransac)
# 评估
source_temp = o3d.geometry.PointCloud(model)
source_temp.transform(T_refined)
eval_result = o3d.pipelines.registration.evaluate_registration(
source_temp, cluster, 0.01, T_refined
)
"kw">if eval_result.fitness > best_fitness:
best_fitness = eval_result.fitness
best_result = {
'name': name,
'transform': T_refined,
'fitness': eval_result.fitness,
'rmse': eval_result.inlier_rmse,
}
"kw">if best_result "kw">and best_result['fitness'] > 0.3:
results.append(best_result)
"kw">print(f"物体: {best_result['name']}, "
f"fitness={best_result['fitness']:.3f}")
"kw">class="kw">return results
---
6. 3. 评估指标
"kw">def evaluate_pose(estimated, ground_truth):
"""评估位姿估计精度"""
R_est, t_est = estimated[:3,:3], estimated[:3,3]
R_gt, t_gt = ground_truth[:3,:3], ground_truth[:3,3]
# 平移误差
translation_error = np.linalg.norm(t_est - t_gt)
# 旋转误差 (角度)
R_diff = R_est @ R_gt.T
# 从旋转矩阵提取角度
trace = np.clip((np.trace(R_diff) - 1) / 2, -1, 1)
rotation_error = np.arccos(trace) * 180 / np.pi
# ADD-S指标 (Average Distance of model points)
# 用模型点云采样计算
"kw">class="kw">return {
'translation_error_mm': translation_error * 1000,
'rotation_error_deg': rotation_error,
}
---
7. 4. 验收标准
---