高级 · 6-8周

项目十:具身大模型综合项目

LLM任务规划+VLA执行+技能库映射+失败重规划——构建完整的语言指令到机器人执行的端到端闭环系统。

高级难度 预计6-8周 7个章节

1. 项目背景与顶层设计

为什么这是终极项目

这个项目把Phase 1-12所学的全部知识串联成一个闭环——LLM理解指令→分解为技能序列→VLA执行每项技能→视觉验证结果→失败重规划。这是目前具身智能领域最前沿的架构:让机器人不仅能执行预编程任务,还能理解新的自然语言指令并自主推理如何完成。

系统架构

用户: "把红色罐子放到蓝色盒子里"
    │
    ▼
┌──────────────────────────────────────────┐
│  Phase 1: LLM 任务规划 (GPT-4/Qwen)       │
│  输入: 场景描述 + 用户指令                  │
│  输出: JSON任务序列 + 前置条件              │
│  [{"skill":"find","target":"red_can"},     │
│   {"skill":"pick","target":"red_can"},     │
│   {"skill":"place","target":"blue_box"}]   │
└──────────────┬───────────────────────────┘
               │
               ▼
┌──────────────────────────────────────────┐
│  Phase 2: 技能库映射 + 参数化               │
│  find(red_can) → 视觉检测器("red can")     │
│  pick(red_can) → ACT策略(目标位姿)         │
│  place(blue_box) → 运动规划(目标位姿)       │
└──────────────┬───────────────────────────┘
               │
               ▼
┌──────────────────────────────────────────┐
│  Phase 3: 顺序执行 + 结果验证              │
│  for each skill:                          │
│    execute() → 视觉验证 → success?         │
│    if fail: retry(2次) or replan          │
└──────────────┬───────────────────────────┘
               │
               ▼
          机器人完成指令 ✓

学习目标

2. LLM任务规划器实现

import json, openai
from typing import List, Dict

class LLMTaskPlanner:
    """基于LLM的机器人任务规划器"""
    
    SKILL_LIBRARY = [
        "find(object)",       # 视觉搜索目标物体
        "pick(object)",       # 抓取物体
        "place(location)",    # 放置物体
        "push(object,dir)",   # 推动物体
        "open(drawer)",       # 打开抽屉
        "close(drawer)",      # 关闭抽屉
        "pour(source,target)",# 倒液体
        "wipe(surface)",      # 擦拭表面
    ]
    
    SYSTEM_PROMPT = f"""你是一个机器人任务规划器。你有以下技能可用:
{chr(10).join(f'- {s}' for s in SKILL_LIBRARY)}

场景中可见物体:{{objects}}

请将用户指令分解为技能序列。输出JSON格式:
{{"plan": [{{"skill": "skill_name", "target": "object", "params": {{}}, "precondition": "..."}}]}}

规则:
1. 每个技能必须有明确的前置条件
2. pick前必须先find
3. place前必须先有物体在手中(来自上一个pick)
4. 考虑任务可行性——如果找不到物体,返回error
"""
    
    def plan(self, instruction: str, scene_objects: List[str]) -> Dict:
        """将自然语言指令转换为任务序列"""
        
        prompt = self.SYSTEM_PROMPT.format(objects=', '.join(scene_objects))
        
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": instruction},
            ],
            temperature=0.0,  # 确定性输出
            response_format={"type": "json_object"},
        )
        
        plan = json.loads(response.choices[0].message.content)
        self._validate_plan(plan)
        return plan
    
    def _validate_plan(self, plan):
        """验证计划的逻辑正确性"""
        holding = None  # 当前手中物体
        
        for i, step in enumerate(plan['plan']):
            skill = step['skill']
            
            if skill == 'pick':
                if holding is not None:
                    raise ValueError(f"Step {i}: 手中有{holding},不能pick新物体")
                # 检查前一步是否为find
                if i == 0 or plan['plan'][i-1]['skill'] != 'find':
                    print(f"⚠️ Step {i}: pick缺少前置find")
                holding = step['target']
            
            elif skill == 'place':
                if holding is None:
                    raise ValueError(f"Step {i}: 手中无物体,不能place")
                holding = None

3. 技能库实现

class SkillLibrary:
    """机器人技能库——每个技能封装感知+规划+执行"""
    
    def __init__(self, robot, detectors, policies):
        self.robot = robot
        self.detectors = detectors  # {name: vision_model}
        self.policies = policies    # {name: (policy, config)}
    
    def find(self, target: str) -> dict:
        """视觉搜索目标物体,返回3D位姿"""
        detector = self.detectors.get('yolo')
        camera_img = self.robot.get_camera_image()
        depth_img = self.robot.get_depth_image()
        
        # YOLO检测 + 深度图→3D位姿
        detections = detector(camera_img)
        for det in detections:
            if det.label == target:
                u, v = det.center
                depth = depth_img[v, u]
                if depth > 0:
                    # 像素→3D(内参反投影)
                    point_3d = self.robot.pixel_to_3d(u, v, depth)
                    return {'success': True, 'position': point_3d, 'label': det.label}
        
        return {'success': False, 'reason': f'未检测到{target}'}
    
    def pick(self, target: str) -> dict:
        """抓取目标物体"""
        # 1. 先查找
        result = self.find(target)
        if not result['success']:
            return result
        
        # 2. 判断目标位姿是否可达
        if not self.robot.is_reachable(result['position']):
            return {'success': False, 'reason': '目标超出工作空间'}
        
        # 3. 调用VLA策略执行抓取
        policy = self.policies['pick']
        obs = self.robot.get_obs()
        action = policy(obs, target_position=result['position'])
        self.robot.execute(action)
        
        # 4. 验证:检查夹爪是否闭合(物体在手中)
        gripper_closed = self.robot.get_gripper_state() < 0.01
        return {'success': gripper_closed, 'holding': target if gripper_closed else None}
    
    def place(self, target: str) -> dict:
        """放置物体到目标位置"""
        # 1. 查找放置目标
        result = self.find(target)
        if not result['success']:
            return result
        
        # 2. 规划放置轨迹
        target_pos = result['position']
        # 在上方+安全距离处释放
        approach_pos = target_pos + np.array([0, 0, 0.15])
        place_pos = target_pos + np.array([0, 0, 0.02])  # 容器内2cm
        
        # 3. 执行运动+释放
        self.robot.move_to(approach_pos)
        self.robot.move_to(place_pos)
        self.robot.open_gripper()
        self.robot.move_to(approach_pos)
        
        return {'success': True}

4. 主执行循环——带重试和重规划

class TaskExecutor:
    """执行任务序列,带失败检测与重规划"""
    
    def __init__(self, planner, skill_lib, max_retries=2):
        self.planner = planner
        self.skills = skill_lib
        self.max_retries = max_retries
    
    def execute_instruction(self, instruction: str, scene_objects: list):
        """完整的指令→执行→验证→重规划循环"""
        
        # Step 1: LLM任务规划
        plan = self.planner.plan(instruction, scene_objects)
        print(f"📋 任务计划: {json.dumps(plan, indent=2, ensure_ascii=False)}")
        
        # Step 2: 顺序执行每个技能
        execution_log = []
        step_idx = 0
        
        while step_idx < len(plan['plan']):
            step = plan['plan'][step_idx]
            skill_name = step['skill']
            
            print(f"
🔧 执行 Step {step_idx+1}: {skill_name}({step['target']})")
            
            # Step 3: 调用技能
            skill_fn = getattr(self.skills, skill_name)
            
            success = False
            for retry in range(self.max_retries + 1):
                result = skill_fn(step['target'])
                execution_log.append({
                    'step': step_idx, 'skill': skill_name,
                    'target': step['target'], 'retry': retry,
                    'success': result['success'],
                    'details': result
                })
                
                if result['success']:
                    success = True
                    break
                
                print(f"  ❌ 失败 (retry {retry+1}/{self.max_retries}): {result.get('reason', 'unknown')}")
            
            if not success:
                # Step 4: 失败重规划
                print(f"  🔄 重规划: 技能{skill_name}失败3次")
                
                # 告诉LLM哪个技能失败了,让它重新规划剩余任务
                feedback = self._generate_feedback(plan, execution_log, step_idx)
                plan = self.planner.replan(plan, feedback)
                
                if plan is None:
                    print("  💥 任务无法完成,中止")
                    return {'success': False, 'log': execution_log}
                
                continue  # 从当前步重新开始
            
            step_idx += 1
        
        print(f"
✅ 任务完成!")
        return {'success': True, 'log': execution_log}

5. 端到端测试与评估

测试场景设计

场景指令难度预期成功率
简单抓取"把苹果拿起来"★☆☆☆☆> 90%
放置任务"把苹果放进红色碗里"★★☆☆☆> 75%
多步操作"把苹果放进碗里,再把碗推到桌子左边"★★★★☆> 50%
失败恢复遮挡目标→"把苹果放进碗里"★★★★☆> 40%(重规划后)
推理任务"把最重的那个方块放进盒子里"★★★★★> 30%

评估指标

class EmbodiedBenchmark:
    def evaluate(self, executor, test_cases):
        metrics = {
            'task_success_rate': [],
            'avg_steps_to_complete': [],
            'avg_retries_per_skill': [],
            'replan_rate': [],
        }
        for case in test_cases:
            result = executor.execute_instruction(case.instruction, case.objects)
            metrics['task_success_rate'].append(result['success'])
            # ... 计算其他指标
        
        for k, v in metrics.items():
            print(f"{k}: {np.mean(v)*100:.1f}%")

验收标准