高级 · 6-8周
项目十:具身大模型综合项目
LLM任务规划+VLA执行+技能库映射+失败重规划——构建完整的语言指令到机器人执行的端到端闭环系统。
这个项目把Phase 1-12所学的全部知识串联成一个闭环——LLM理解指令→分解为技能序列→VLA执行每项技能→视觉验证结果→失败重规划。这是目前具身智能领域最前沿的架构:让机器人不仅能执行预编程任务,还能理解新的自然语言指令并自主推理如何完成。
用户: "把红色罐子放到蓝色盒子里"
│
▼
┌──────────────────────────────────────────┐
│ Phase 1: LLM 任务规划 (GPT-4/Qwen) │
│ 输入: 场景描述 + 用户指令 │
│ 输出: JSON任务序列 + 前置条件 │
│ [{"skill":"find","target":"red_can"}, │
│ {"skill":"pick","target":"red_can"}, │
│ {"skill":"place","target":"blue_box"}] │
└──────────────┬───────────────────────────┘
│
▼
┌──────────────────────────────────────────┐
│ Phase 2: 技能库映射 + 参数化 │
│ find(red_can) → 视觉检测器("red can") │
│ pick(red_can) → ACT策略(目标位姿) │
│ place(blue_box) → 运动规划(目标位姿) │
└──────────────┬───────────────────────────┘
│
▼
┌──────────────────────────────────────────┐
│ Phase 3: 顺序执行 + 结果验证 │
│ for each skill: │
│ execute() → 视觉验证 → success? │
│ if fail: retry(2次) or replan │
└──────────────┬───────────────────────────┘
│
▼
机器人完成指令 ✓
import json
from typing import Dict, List

import numpy as np
import openai
class LLMTaskPlanner:
    """LLM-backed task planner: natural-language instruction -> skill sequence.

    The planner sends a system prompt (skill list + visible objects) and the
    user instruction to a chat model, parses the JSON reply and checks that
    the resulting plan is logically consistent (pick/place bookkeeping).
    """

    SKILL_LIBRARY = [
        "find(object)",         # visually search for the target object
        "pick(object)",         # grasp an object
        "place(location)",      # put the held object down
        "push(object,dir)",     # push an object
        "open(drawer)",         # open a drawer
        "close(drawer)",        # close a drawer
        "pour(source,target)",  # pour liquid
        "wipe(surface)",        # wipe a surface
    ]

    # Bare skill names ("find", "pick", ...) used to reject hallucinated skills.
    _SKILL_NAMES = frozenset(s.split("(", 1)[0] for s in SKILL_LIBRARY)

    # The only placeholder is the literal text "{objects}". The JSON example
    # contains real braces, so this template must NOT be passed to
    # str.format(); _build_prompt() substitutes the placeholder instead.
    SYSTEM_PROMPT = (
        "你是一个机器人任务规划器。你有以下技能可用:\n"
        + "\n".join(f"- {s}" for s in SKILL_LIBRARY)
        + "\n"
        "场景中可见物体:{objects}\n"
        "请将用户指令分解为技能序列。输出JSON格式:\n"
        '{"plan": [{"skill": "skill_name", "target": "object", '
        '"params": {}, "precondition": "..."}]}\n'
        "规则:\n"
        "1. 每个技能必须有明确的前置条件\n"
        "2. pick前必须先find\n"
        "3. place前必须先有物体在手中(来自上一个pick)\n"
        "4. 考虑任务可行性——如果找不到物体,返回error\n"
    )

    DEFAULT_MODEL = "gpt-4"

    def __init__(self, model: str = DEFAULT_MODEL):
        """Create a planner.

        Args:
            model: Chat model name. NOTE(review): JSON mode
                (``response_format``) is only accepted by models that support
                it -- confirm the default against the deployed API.
        """
        self.model = model
        # Remembered by plan() so replan() can rebuild the conversation.
        self._last_instruction = ""
        self._last_objects = []

    def _build_prompt(self, scene_objects: List[str]) -> str:
        """Return the system prompt with the visible objects filled in."""
        return self.SYSTEM_PROMPT.replace("{objects}", ", ".join(scene_objects))

    def _request_plan(self, messages: List[Dict]) -> Dict:
        """Send ``messages`` to the chat model and return the parsed JSON reply.

        Raises:
            ValueError: if the reply is not valid JSON (``JSONDecodeError``
                is a ``ValueError`` subclass).
        """
        request = {
            "model": self.model,
            "messages": messages,
            "temperature": 0.0,  # deterministic output
            "response_format": {"type": "json_object"},
        }
        if hasattr(openai, "OpenAI"):
            # openai>=1.0: the module-level ChatCompletion API was removed.
            response = openai.OpenAI().chat.completions.create(**request)
        else:
            response = openai.ChatCompletion.create(**request)
        return json.loads(response.choices[0].message.content)

    def plan(self, instruction: str, scene_objects: List[str]) -> Dict:
        """Convert a natural-language instruction into a validated task plan.

        Args:
            instruction: The user's instruction.
            scene_objects: Names of the objects currently visible.

        Returns:
            The parsed plan, ``{"plan": [step, ...]}``.

        Raises:
            ValueError: if the model reply is not a logically valid plan.
        """
        self._last_instruction = instruction
        self._last_objects = list(scene_objects)
        messages = [
            {"role": "system", "content": self._build_prompt(scene_objects)},
            {"role": "user", "content": instruction},
        ]
        plan = self._request_plan(messages)
        self._validate_plan(plan)
        return plan

    def replan(self, plan: Dict, feedback):
        """Ask the model for a revised plan after an execution failure.

        The model is asked for the COMPLETE revised plan, with the steps that
        already succeeded kept unchanged at the front, so an executor can
        resume at the index of the failed step.

        Args:
            plan: The plan that was being executed.
            feedback: Failure description; a string, or any object that can
                be dumped as JSON.

        Returns:
            The revised plan, or ``None`` when the model reports the task as
            infeasible or returns an invalid plan.
        """
        if isinstance(feedback, str):
            feedback_text = feedback
        else:
            # default=str is deliberate: feedback may carry non-JSON values
            # (e.g. array positions) that only need to be human-readable.
            feedback_text = json.dumps(feedback, ensure_ascii=False, default=str)

        objects = getattr(self, "_last_objects", [])
        instruction = getattr(self, "_last_instruction", "")
        messages = [
            {"role": "system", "content": self._build_prompt(objects)},
            {"role": "user", "content": instruction},
            {"role": "assistant", "content": json.dumps(plan, ensure_ascii=False)},
            {
                "role": "user",
                "content": (
                    "执行失败,反馈如下:\n" + feedback_text + "\n"
                    "请输出修订后的完整计划(已成功执行的步骤保持不变并放在最前面);"
                    '如果任务无法完成,输出 {"error": "原因"}。'
                ),
            },
        ]
        try:
            new_plan = self._request_plan(messages)
            self._validate_plan(new_plan)
        except ValueError as exc:
            print(f"⚠️ 重规划失败: {exc}")
            return None
        return new_plan

    def _validate_plan(self, plan):
        """Check the logical consistency of a plan.

        Simulates what the gripper holds while walking through the steps.

        Raises:
            ValueError: if the reply carries no step list, a step is
                malformed or names an unknown skill, a ``pick`` happens while
                something is already held, or a ``place`` happens with an
                empty hand.
        """
        steps = plan.get('plan') if isinstance(plan, dict) else None
        if not isinstance(steps, list):
            reason = plan.get('error') if isinstance(plan, dict) else None
            raise ValueError(f"LLM未返回有效计划: {reason or plan!r}")

        holding = None  # object currently in the gripper
        for i, step in enumerate(steps):
            if not isinstance(step, dict) or 'skill' not in step or 'target' not in step:
                raise ValueError(f"Step {i}: 缺少skill或target字段")
            skill = step['skill']
            if skill not in self._SKILL_NAMES:
                raise ValueError(f"Step {i}: 未知技能{skill}")

            if skill == 'pick':
                if holding is not None:
                    raise ValueError(f"Step {i}: 手中有{holding},不能pick新物体")
                # Soft check only: warn when the previous step is not a find.
                if i == 0 or steps[i - 1].get('skill') != 'find':
                    print(f"⚠️ Step {i}: pick缺少前置find")
                holding = step['target']
            elif skill == 'place':
                if holding is None:
                    raise ValueError(f"Step {i}: 手中无物体,不能place")
                holding = None
class SkillLibrary:
    """Robot skill library: each skill bundles perception, planning and execution.

    Every skill returns a dict with at least a boolean ``'success'`` key and,
    on failure, a human-readable ``'reason'``.
    """

    def __init__(self, robot, detectors, policies):
        """Store the robot interface and the perception/policy registries.

        Args:
            robot: Robot interface used for sensing and motion.
            detectors: Mapping ``{name: vision_model}``.
            policies: Mapping ``{name: policy}`` or ``{name: (policy, config)}``.
        """
        self.robot = robot
        self.detectors = detectors  # {name: vision_model}
        self.policies = policies    # {name: policy} or {name: (policy, config)}

    def find(self, target: str) -> dict:
        """Visually search for ``target`` and return its 3D position.

        Returns:
            ``{'success': True, 'position': ..., 'label': ...}`` for the first
            matching detection with a valid (positive) depth reading,
            otherwise ``{'success': False, 'reason': ...}``.
        """
        detector = self.detectors.get('yolo')
        if detector is None:
            # Without this guard the call below fails with an opaque TypeError.
            return {'success': False, 'reason': '未配置yolo检测器'}

        camera_img = self.robot.get_camera_image()
        depth_img = self.robot.get_depth_image()

        # Detection on the colour image, then depth lookup -> 3D position.
        for det in detector(camera_img):
            if det.label != target:
                continue
            # Detector centres may be sub-pixel floats; array indices must be
            # integers. Assumes the depth image is indexed [row, col] = [v, u]
            # as in the original code -- TODO confirm.
            u, v = (int(round(c)) for c in det.center)
            depth = depth_img[v, u]
            if depth > 0:
                # Pixel -> 3D via the robot's back-projection helper.
                point_3d = self.robot.pixel_to_3d(u, v, depth)
                return {'success': True, 'position': point_3d, 'label': det.label}
        return {'success': False, 'reason': f'未检测到{target}'}

    def pick(self, target: str) -> dict:
        """Grasp ``target``.

        Returns:
            ``{'success': bool, 'holding': target-or-None}``; a failed grasp
            additionally carries a ``'reason'``. A failed search or an
            unreachable target returns that failure dict directly.
        """
        # 1. Locate the object.
        result = self.find(target)
        if not result['success']:
            return result

        # 2. Reject targets outside the workspace.
        if not self.robot.is_reachable(result['position']):
            return {'success': False, 'reason': '目标超出工作空间'}

        # 3. Run the grasp policy. The registry may hold either the bare
        #    policy or a (policy, config) pair; only the callable is used here.
        entry = self.policies['pick']
        policy = entry[0] if isinstance(entry, (tuple, list)) else entry
        obs = self.robot.get_obs()
        action = policy(obs, target_position=result['position'])
        self.robot.execute(action)

        # 4. Verify via the gripper state.
        # NOTE(review): a reading below 0.01 is treated as "object in hand";
        # on many grippers a fully closed reading means the grasp missed --
        # confirm the units/semantics of get_gripper_state().
        gripper_closed = self.robot.get_gripper_state() < 0.01
        if gripper_closed:
            return {'success': True, 'holding': target}
        return {'success': False, 'holding': None, 'reason': '夹爪未闭合,抓取失败'}

    def place(self, target: str, approach_height: float = 0.15,
              release_height: float = 0.02) -> dict:
        """Place the held object at ``target``.

        Args:
            target: Label of the placement location/container.
            approach_height: Offset added to the target's third coordinate
                for the approach/retreat pose (default 0.15).
            release_height: Offset added to the target's third coordinate at
                which the gripper opens (default 0.02).

        Returns:
            ``{'success': True}`` after the motion sequence, or a failure dict
            when the target is not found or not reachable.
        """
        # 1. Locate the placement target.
        result = self.find(target)
        if not result['success']:
            return result

        # 2. Same workspace check as pick(), so we never start a motion
        #    towards an unreachable pose while holding an object.
        if not self.robot.is_reachable(result['position']):
            return {'success': False, 'reason': '目标超出工作空间'}

        # 3. Plan approach and release poses above the target.
        target_pos = np.asarray(result['position'], dtype=float)
        approach_pos = target_pos + np.array([0, 0, approach_height])
        place_pos = target_pos + np.array([0, 0, release_height])

        # 4. Approach, descend, release, retreat.
        self.robot.move_to(approach_pos)
        self.robot.move_to(place_pos)
        self.robot.open_gripper()
        self.robot.move_to(approach_pos)
        return {'success': True}
class TaskExecutor:
    """Run a planned skill sequence with retries, failure detection and replanning."""

    def __init__(self, planner, skill_lib, max_retries=2, max_replans=3):
        """Create an executor.

        Args:
            planner: Object with ``plan(instruction, scene_objects)`` and,
                optionally, ``replan(plan, feedback)`` returning a revised
                plan or ``None``.
            skill_lib: Object whose public methods are the skills; each takes
                the step target and returns a dict with a ``'success'`` key.
            max_retries: Extra attempts per skill after the first failure.
            max_replans: Upper bound on replanning rounds per instruction, so
                a planner that keeps proposing failing plans cannot loop
                forever.
        """
        self.planner = planner
        self.skills = skill_lib
        self.max_retries = max_retries
        self.max_replans = max_replans

    def execute_instruction(self, instruction: str, scene_objects: list):
        """Run the full instruction -> execute -> verify -> replan loop.

        Returns:
            ``{'success': bool, 'log': [...], 'replans': int}`` where ``log``
            has one entry per skill attempt.
        """
        # Step 1: ask the LLM planner for a task plan.
        plan = self.planner.plan(instruction, scene_objects)
        print(f"📋 任务计划: {json.dumps(plan, indent=2, ensure_ascii=False)}")

        # Step 2: execute the skills in order.
        execution_log = []
        step_idx = 0
        replans = 0
        while step_idx < len(plan['plan']):
            step = plan['plan'][step_idx]
            skill_name = step['skill']
            print(f"\n🔧 执行 Step {step_idx+1}: {skill_name}({step['target']})")

            # Step 3: call the skill (with retries).
            success, attempts = self._run_step(step, step_idx, execution_log)
            if success:
                step_idx += 1
                continue

            # Step 4: replan after the skill exhausted its attempts.
            print(f" 🔄 重规划: 技能{skill_name}失败{attempts}次")
            new_plan = None
            replan = getattr(self.planner, 'replan', None)
            if replans < self.max_replans and callable(replan):
                # Tell the planner which skill failed so it can revise the plan.
                feedback = self._generate_feedback(plan, execution_log, step_idx)
                new_plan = replan(plan, feedback)
                replans += 1

            # A usable revised plan must still contain a step at the current
            # index; otherwise the loop would fall through and report success
            # without ever completing the failed step.
            usable = (
                isinstance(new_plan, dict)
                and isinstance(new_plan.get('plan'), list)
                and len(new_plan['plan']) > step_idx
            )
            if not usable:
                print(" 💥 任务无法完成,中止")
                return {'success': False, 'log': execution_log, 'replans': replans}
            plan = new_plan
            # step_idx is intentionally unchanged: resume at the failed step.

        print("\n✅ 任务完成!")
        return {'success': True, 'log': execution_log, 'replans': replans}

    def _resolve_skill(self, skill_name):
        """Return the public skill callable named ``skill_name``, or ``None``.

        Skill names come from LLM output, so private/dunder attributes and
        non-callables are refused instead of being passed to ``getattr``
        blindly.
        """
        if not isinstance(skill_name, str) or skill_name.startswith('_'):
            return None
        skill_fn = getattr(self.skills, skill_name, None)
        return skill_fn if callable(skill_fn) else None

    def _run_step(self, step, step_idx, execution_log):
        """Attempt one step up to ``max_retries + 1`` times, logging each attempt.

        Returns:
            ``(success, attempts)`` where ``attempts`` is the number of tries made.
        """
        skill_name = step['skill']
        target = step['target']

        skill_fn = self._resolve_skill(skill_name)
        if skill_fn is None:
            # Retrying cannot help when the skill does not exist.
            result = {'success': False, 'reason': f'未知技能: {skill_name}'}
            execution_log.append({
                'step': step_idx, 'skill': skill_name,
                'target': target, 'retry': 0,
                'success': False,
                'details': result,
            })
            print(f" ❌ 失败: {result['reason']}")
            return False, 1

        total_attempts = self.max_retries + 1
        for retry in range(total_attempts):
            result = skill_fn(target)
            execution_log.append({
                'step': step_idx, 'skill': skill_name,
                'target': target, 'retry': retry,
                'success': result['success'],
                'details': result,
            })
            if result['success']:
                return True, retry + 1
            print(f" ❌ 失败 (retry {retry+1}/{total_attempts}): {result.get('reason', 'unknown')}")
        return False, total_attempts

    def _generate_feedback(self, plan, execution_log, step_idx):
        """Summarise the failure of step ``step_idx`` for the planner.

        Returns:
            A dict with the failed step's index, skill and target, the
            reasons of its most recent consecutive failed attempts, and the
            plan split into completed and remaining steps.
        """
        steps = plan['plan']
        failed = steps[step_idx]

        # Collect the trailing run of failed attempts for this step.
        recent = []
        for entry in reversed(execution_log):
            if entry['step'] != step_idx or entry['success']:
                break
            recent.append(entry)
        recent.reverse()

        return {
            'failed_step': step_idx,
            'failed_skill': failed['skill'],
            'target': failed['target'],
            'reasons': [e['details'].get('reason', 'unknown') for e in recent],
            'completed_steps': steps[:step_idx],
            'remaining_steps': steps[step_idx:],
        }
| 场景 | 指令 | 难度 | 预期成功率 |
|---|---|---|---|
| 简单抓取 | "把苹果拿起来" | ★☆☆☆☆ | > 90% |
| 放置任务 | "把苹果放进红色碗里" | ★★☆☆☆ | > 75% |
| 多步操作 | "把苹果放进碗里,再把碗推到桌子左边" | ★★★★☆ | > 50% |
| 失败恢复 | 遮挡目标→"把苹果放进碗里" | ★★★★☆ | > 40%(重规划后) |
| 推理任务 | "把最重的那个方块放进盒子里" | ★★★★★ | > 30% |
class EmbodiedBenchmark:
    """Aggregate success, step, retry and replan statistics over test cases."""

    # Metrics that are fractions in [0, 1] and are printed as percentages.
    _RATE_METRICS = ('task_success_rate', 'replan_rate')

    def evaluate(self, executor, test_cases):
        """Run every test case through ``executor`` and summarise the results.

        Each case must expose ``instruction`` and ``objects`` attributes. The
        per-attempt execution log is expected to carry ``'retry'`` and
        ``'success'`` keys, with ``retry == 0`` marking the first attempt of a
        skill invocation.

        Metrics:
            task_success_rate: fraction of cases that succeeded.
            avg_steps_to_complete: mean number of skill invocations, over
                successful cases only.
            avg_retries_per_skill: mean extra attempts per skill invocation.
            replan_rate: fraction of cases in which at least one skill
                invocation exhausted its attempts (the trigger for
                replanning).

        Returns:
            ``{metric_name: mean-or-None}``; ``None`` when a metric has no
            samples (e.g. no test cases, or no successful case).
        """
        metrics = {
            'task_success_rate': [],
            'avg_steps_to_complete': [],
            'avg_retries_per_skill': [],
            'replan_rate': [],
        }
        for case in test_cases:
            result = executor.execute_instruction(case.instruction, case.objects)
            invocations = self._group_attempts(result.get('log', []))

            metrics['task_success_rate'].append(bool(result['success']))
            metrics['replan_rate'].append(
                any(not group[-1]['success'] for group in invocations)
            )
            if invocations:
                retries = sum(len(group) - 1 for group in invocations)
                metrics['avg_retries_per_skill'].append(retries / len(invocations))
            if result['success']:
                metrics['avg_steps_to_complete'].append(len(invocations))

        summary = {name: self._mean(values) for name, values in metrics.items()}
        for name, value in summary.items():
            if value is None:
                print(f"{name}: n/a")
            elif name in self._RATE_METRICS:
                print(f"{name}: {value*100:.1f}%")
            else:
                # Counts are not fractions; printing them as "%" is misleading.
                print(f"{name}: {value:.2f}")
        return summary

    @staticmethod
    def _group_attempts(log):
        """Split a flat attempt log into one list per skill invocation."""
        groups = []
        for entry in log:
            if entry.get('retry', 0) == 0 or not groups:
                groups.append([])
            groups[-1].append(entry)
        return groups

    @staticmethod
    def _mean(values):
        """Return the arithmetic mean of ``values``, or ``None`` when empty."""
        return sum(values) / len(values) if values else None