Real-Robot Deployment

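Jetson edge deployment, TensorRT optimization, real-time ROS 2 QoS configuration, and systematic Sim2Real transfer: getting code that works in simulation to run on real hardware.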

4-6 weeks
8 chapters
Code examples
Acceptance experiments

Contents of this stage

  1. Edge deployment
  2. Sim2Real transfer

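1. Edge Device Deployment: Jetson and Raspberry Pi

Mechanical analogy

When you install a Siemens PLC in a control cabinet, you have to think about power supply, heat dissipation, I/O interfaces, and real-time behavior. Edge AI deployment is the same: the Jetson Orin is your "AI PLC", and it needs TensorRT optimization, Docker containers, and ROS 2 node lifecycle management.

Model optimization pipeline: PyTorch → ONNX → TensorRT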

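import torch

# Export to ONNX (`model` is your trained policy network, already moved to the GPU)
model.eval()
dummy_input = torch.randn(1, 3, 224, 224).cuda()
torch.onnx.export(model, dummy_input, "policy.onnx", opset_version=17)
# Convert to TensorRT (run this on the Jetson)
# trtexec --onnx=policy.onnx --saveEngine=policy.trt --fp16
# FP16 → typically 2-4x faster, with accuracy loss usually < 0.5% (model-dependent; measure on your own data)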
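
To get a feel for why FP16 usually costs so little accuracy, the sketch below rounds a handful of weights to half precision and compares one dot product against the full-precision result. It is a toy illustration, not TensorRT's behavior: the weights and inputs are made up, only the operands are rounded, and the accumulation stays in Python's double precision.

```python
import struct

def to_fp16(x: float) -> float:
    """Round a Python float to the nearest IEEE 754 half-precision value."""
    return struct.unpack("<e", struct.pack("<e", x))[0]

def dot(weights, inputs):
    """Plain dot product, standing in for one output neuron of a policy layer."""
    return sum(w * x for w, x in zip(weights, inputs))

# Hypothetical weights and activations, chosen only for illustration.
weights = [0.1234567, -0.7654321, 0.3333333, 0.9876543]
inputs = [1.5, -2.25, 0.75, 3.0]

exact = dot(weights, inputs)
halved = dot([to_fp16(w) for w in weights], [to_fp16(x) for x in inputs])
rel_error = abs(halved - exact) / abs(exact)

print(f"full-precision result: {exact:.6f}")
print(f"FP16-rounded result:   {halved:.6f}")
print(f"relative error:        {rel_error:.2e}")
```

Half precision keeps about 11 significant bits, so each operand is off by at most roughly 0.05%. Whether that matters for a given policy still has to be checked on real evaluation data.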

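ROS 2 QoS real-time configuration

Parameter | Control topics | Sensor topics
Reliability | RELIABLE (no dropped messages) | BEST_EFFORT (stale data may be dropped)
Durability | TRANSIENT_LOCAL (late subscribers receive history) | VOLATILE (new data only)
Depth | 1 (only the latest command matters) | 10 (buffer a little history)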
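
The effect of the Depth setting can be pictured with a bounded queue. The sketch below is a toy model in plain Python, not rclpy: it assumes a subscriber that has fallen behind while four messages were published, and shows what a keep-last queue of a given depth would still hold. The function name `deliver` and the message labels are made up for illustration.

```python
from collections import deque

def deliver(messages, depth):
    """Model a keep-last history queue: only the newest `depth` messages survive."""
    queue = deque(maxlen=depth)
    for msg in messages:
        queue.append(msg)  # when the queue is full, the oldest entry is discarded
    return list(queue)

published = ["cmd_0", "cmd_1", "cmd_2", "cmd_3"]

control_view = deliver(published, depth=1)   # control topic: latest command only
sensor_view = deliver(published, depth=10)   # sensor topic: short history kept

print("depth=1  ->", control_view)
print("depth=10 ->", sensor_view)
```

With depth 1 a slow controller never acts on a stale command, which is the reason the table pairs that depth with control topics.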

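2. Sim2Real Transfer in Practice

Core challenges

A policy trained in simulation often performs much worse on the real robot. The usual causes are inaccurate physical parameters (friction, motor latency), the visual domain gap, and noise in state observations. Your mechanical background is a major advantage here: you already have an intuitive feel for the gap between an "ideal model" and a "real system".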

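# Sim2Real transfer checklist
checklist = {
    "Calibration": ["Camera intrinsics calibrated?", "Hand-eye calibration accuracy < 2 mm?"],
    "Dynamics": ["Joint friction coefficients measured?", "Motor torque constants verified?"],
    "Perception": ["Domain adaptation applied to images?", "Depth-map noise matches simulation?"],
    "Control": ["Control frequency > 50 Hz?", "Communication latency < 10 ms?"],
    "Safety": ["Emergency stop tested?", "Force/torque limits configured?"],
}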

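Acceptance criteria

  • Deploy a policy model on the Jetson with inference latency < 50 ms
  • Complete at least one Sim2Real transfer and quantify the success-rate gap between simulation and the real robot
  • Analyze where the gap comes from (the percentage attributable to perception, control, and dynamics) and write a transfer report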
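
One way to quantify the success-rate gap is to report it together with an uncertainty estimate, since real-robot trials are usually few. The sketch below uses the normal approximation for the difference of two independent proportions; the trial counts are hypothetical and the function name `success_gap` is an assumption of this example.

```python
import math

def success_gap(sim_success, sim_trials, real_success, real_trials, z=1.96):
    """Return (gap, half_width): sim-minus-real success rate and an approximate 95% interval."""
    p_sim = sim_success / sim_trials
    p_real = real_success / real_trials
    gap = p_sim - p_real
    # Standard error of the difference of two independent proportions.
    se = math.sqrt(p_sim * (1 - p_sim) / sim_trials
                   + p_real * (1 - p_real) / real_trials)
    return gap, z * se

# Hypothetical counts: 90/100 successes in simulation, 25/50 on the real robot.
gap, half_width = success_gap(sim_success=90, sim_trials=100,
                              real_success=25, real_trials=50)
print(f"success-rate gap = {gap:.2f} ± {half_width:.2f}")
```

A wide interval is a hint that more real-robot episodes are needed before attributing the gap to perception, control, or dynamics.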

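3. ROS 2 Real-Time Control and QoS Configuration in Depth

Real-time requirements

Robot control has strict real-time requirements: position control loops run above 100 Hz, force control above 500 Hz, and visual servoing above 30 Hz. Your ROS 2 QoS and executor configuration directly determines whether you can reach these rates.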
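
These rates translate directly into time budgets. The sketch below converts each rate into a cycle period and checks whether a given inference latency fits inside a share of that period. The 30% share is borrowed from the inference alert threshold in the monitoring table of chapter 8; the 5 ms latency and the helper names are assumptions of this example.

```python
def period_ms(rate_hz: float) -> float:
    """Length of one control cycle in milliseconds."""
    return 1000.0 / rate_hz

def fits_budget(latency_ms: float, rate_hz: float, fraction: float = 0.3) -> bool:
    """True if a stage's latency stays within `fraction` of the control period."""
    return latency_ms <= fraction * period_ms(rate_hz)

# Minimum rates quoted in the text above.
loops = {"position control": 100, "force control": 500, "visual servoing": 30}

for name, rate in loops.items():
    print(f"{name:18s} {rate:4d} Hz -> period {period_ms(rate):5.2f} ms, "
          f"5 ms inference fits: {fits_budget(5.0, rate)}")
```

Under these assumptions a 5 ms policy fits a 30 Hz visual-servoing loop but not a 100 Hz position loop, which is why the policy usually runs at a lower rate than the low-level controller.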

Recommended QoS configuration

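from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy

# Control commands: must be delivered reliably
control_qos = QoSProfile(
    history=HistoryPolicy.KEEP_LAST,
    depth=1,
    reliability=ReliabilityPolicy.RELIABLE,   # commands must not be dropped
    durability=DurabilityPolicy.TRANSIENT_LOCAL, # late-joining subscribers also get the latest command
)

# Sensor data: stale samples may be dropped to keep latency low
sensor_qos = QoSProfile(
    history=HistoryPolicy.KEEP_LAST,
    depth=5,
    reliability=ReliabilityPolicy.BEST_EFFORT,   # fresh data takes priority
    durability=DurabilityPolicy.VOLATILE,
)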
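
🔧 Engineering connection: When you design a PLC control system, you consider the scan cycle time and the I/O refresh rate. ROS 2 QoS configuration is the "digital" counterpart of setting scan cycles and signal priorities.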

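4. Controller Gain Tuning: From Simulation to the Real Robot

From mechanical debugging to controller tuning

You have tuned damping and stiffness in mechanical systems, and controller gain tuning uses the same way of thinking. The physical meaning of the three PID gains: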

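Gain | Physical role | Consequence if too high | Mechanical analogy
$K_p$ | Response speed | Overshoot/oscillation | Spring stiffness
$K_d$ | Damping/stability | Amplifies sensor noise | Shock absorber
$K_i$ | Eliminates steady-state error | Integral windup | Sustained bias force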

import numpy as np

class AdaptivePID:
    """PID controller with output clamping."""

    def __init__(self, Kp, Kd, Ki, max_output):
        self.Kp, self.Kd, self.Ki = Kp, Kd, Ki
        self.max_output = max_output      # symmetric actuator limit
        self.prev_error = self.integral = 0.0

    def update(self, target, current, dt):
        error = target - current
        self.integral += error * dt                   # accumulates without bound (no anti-windup)
        derivative = (error - self.prev_error) / dt   # finite-difference derivative of the error
        output = self.Kp * error + self.Kd * derivative + self.Ki * self.integral
        self.prev_error = error
        # Clamp the command to the actuator range
        return np.clip(output, -self.max_output, self.max_output)
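
To see what the integral gain buys you, the sketch below runs a PID loop against a toy plant with a constant disturbance, once with the integral term switched off and once with it on. It is an illustration under stated assumptions rather than a tuning recipe: the plant model (x' = u + disturbance), the gains, and the class name `ClampedPID` are all made up for this example. Unlike the class above, it freezes the integrator while the output is saturated (a simple anti-windup scheme) and skips the derivative on the first call.

```python
class ClampedPID:
    """PID with output clamping and conditional-integration anti-windup."""

    def __init__(self, kp, kd, ki, max_output):
        self.kp, self.kd, self.ki = kp, kd, ki
        self.max_output = max_output
        self.integral = 0.0
        self.prev_error = None  # None marks the first call (avoids a derivative kick)

    def update(self, target, current, dt):
        error = target - current
        derivative = 0.0 if self.prev_error is None else (error - self.prev_error) / dt
        self.prev_error = error
        candidate = self.integral + error * dt
        raw = self.kp * error + self.kd * derivative + self.ki * candidate
        output = max(-self.max_output, min(self.max_output, raw))
        if raw == output:
            # Commit the integral only while the actuator is not saturated.
            self.integral = candidate
        return output

def simulate(pid, target=1.0, disturbance=-0.5, dt=0.001, steps=3000):
    """Toy plant x' = u + disturbance, explicit Euler; returns the final error."""
    x = 0.0
    for _ in range(steps):
        u = pid.update(target, x, dt)
        x += (u + disturbance) * dt
    return target - x

error_p = simulate(ClampedPID(kp=10.0, kd=0.0, ki=0.0, max_output=5.0))
error_pi = simulate(ClampedPID(kp=10.0, kd=0.0, ki=25.0, max_output=5.0))
print(f"steady-state error, P only:  {error_p:.4f}")
print(f"steady-state error, with Ki: {error_pi:.6f}")
```

With proportional action alone the loop settles at an offset of disturbance divided by the proportional gain; adding the integral term removes that offset, matching the "eliminates steady-state error" row of the table.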

Acceptance criteria

  • Complete PID tuning on the real robot (steady-state position error < 0.5 mm)
  • Raise the Sim2Real transfer success rate from 50% to > 80%

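5. TensorRT Optimization in Depth: The Secret Behind Going from 100 ms to 5 ms

Performance bottlenecks in model deployment

Running a PyTorch-trained model directly on the Jetson: 100 ms per inference. After TensorRT optimization: 5 ms. Where does this 20x difference come from?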

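Technique | How it works | Speedup | Accuracy loss
Layer fusion | Conv+BN+ReLU → a single CBR kernel | 1.5-2x | -
Precision calibration | FP32 → FP16/INT8 | 2-4x | <1%
Kernel auto-tuning | Picks the best implementation for the specific GPU architecture | 1.5-3x | -
Dynamic tensor memory | Pre-allocates and reuses GPU memory | 1.2-1.5x | -
Multi-stream parallelism | Parallel inference over multiple input batches | near-linear | -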
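
Layer fusion works because batch normalization at inference time is just a fixed scale and shift, which can be folded into the preceding layer's weight and bias. The sketch below shows the algebra on a single scalar channel. It is a hand-written illustration of the idea, not TensorRT's implementation, and the parameter values are arbitrary.

```python
import math

def conv_bn_relu(x, w, b, gamma, beta, mean, var, eps=1e-5):
    """Unfused reference: affine 'convolution', then batch norm, then ReLU."""
    y = w * x + b
    y = gamma * (y - mean) / math.sqrt(var + eps) + beta
    return max(0.0, y)

def fold_batch_norm(w, b, gamma, beta, mean, var, eps=1e-5):
    """Fold the batch-norm scale and shift into the weight and bias."""
    scale = gamma / math.sqrt(var + eps)
    return w * scale, (b - mean) * scale + beta

def fused_cbr(x, w_folded, b_folded):
    """Fused kernel: one multiply-add followed by ReLU."""
    return max(0.0, w_folded * x + b_folded)

# Arbitrary parameters for one channel.
params = dict(w=0.8, b=0.1, gamma=1.5, beta=-0.2, mean=0.3, var=0.25)
w_f, b_f = fold_batch_norm(**params)

max_diff = max(abs(conv_bn_relu(x, **params) - fused_cbr(x, w_f, b_f))
               for x in (-1.0, 0.0, 0.5, 2.0))
print(f"largest difference between fused and unfused outputs: {max_diff:.2e}")
```

The fused version does one multiply-add instead of three separate passes over the data, which is where the saving in memory traffic comes from.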
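
import os

import numpy as np
import pycuda.autoinit  # noqa: F401  (creates the CUDA context on import)
import pycuda.driver as cuda
import tensorrt as trt

class TensorRTPolicy:
    """TensorRT-optimized policy model: a production-style inference engine.

    Uses the binding-based API of TensorRT 8.x; TensorRT 10 replaced it
    with name-based tensor calls.
    """

    def __init__(self, engine_path):
        self.logger = trt.Logger(trt.Logger.WARNING)
        self.runtime = trt.Runtime(self.logger)

        with open(engine_path, 'rb') as f:
            self.engine = self.runtime.deserialize_cuda_engine(f.read())

        self.context = self.engine.create_execution_context()

        # Pre-allocate input/output buffers once (the key optimization)
        self.inputs, self.outputs, self.bindings = [], [], []
        for binding in self.engine:
            shape = tuple(self.engine.get_binding_shape(binding))
            size = trt.volume(shape)
            # np.dtype(...) gives an instance, so .itemsize is a real byte count
            dtype = np.dtype(trt.nptype(self.engine.get_binding_dtype(binding)))

            # Allocate on the GPU (PyCUDA here; CuPy is an alternative)
            buf = cuda.mem_alloc(size * dtype.itemsize)
            if self.engine.binding_is_input(binding):
                self.inputs.append((buf, dtype, shape))
            else:
                self.outputs.append((buf, dtype, shape))
            self.bindings.append(int(buf))

    def infer(self, input_data):
        """Run inference; the latency target is < 5 ms."""
        # Copy the input to the GPU (must be contiguous and of the engine's dtype)
        input_buf, in_dtype, _ = self.inputs[0]
        cuda.memcpy_htod(input_buf, np.ascontiguousarray(input_data, dtype=in_dtype))

        # Run inference (synchronous)
        self.context.execute_v2(self.bindings)

        # Copy the output back to the CPU
        out_buf, out_dtype, out_shape = self.outputs[0]
        output = np.empty(out_shape, dtype=out_dtype)
        cuda.memcpy_dtoh(output, out_buf)
        return output

# The key to INT8 quantization: a calibration dataset
def build_int8_calibrator(calib_data, cache_file='calibration.cache'):
    """INT8 calibration; requires a representative dataset.

    Assumes `calib_data` is a sequence of contiguous NumPy batches of equal shape.
    """
    class RobotCalibrator(trt.IInt8EntropyCalibrator2):
        def __init__(self):
            trt.IInt8EntropyCalibrator2.__init__(self)
            self.cache_file = cache_file
            self.batch_idx = 0
            self.data = calib_data
            # Size elided in the source; it must be the byte size of one batch
            self.device_input = cuda.mem_alloc(...)

        def get_batch_size(self):
            return self.data[0].shape[0]

        def get_batch(self, names):
            if self.batch_idx >= len(self.data):
                return None  # signals the end of calibration
            batch = self.data[self.batch_idx]
            cuda.memcpy_htod(self.device_input, batch)
            self.batch_idx += 1
            return [int(self.device_input)]

        def read_calibration_cache(self):
            # Reuse a previous calibration run if a cache file exists
            if os.path.exists(self.cache_file):
                with open(self.cache_file, 'rb') as f:
                    return f.read()
            return None

        def write_calibration_cache(self, cache):
            with open(self.cache_file, 'wb') as f:
                f.write(cache)

    return RobotCalibrator()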
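
🔧 Engineering connection: TensorRT's layer fusion is like optimizing a gearbox drive train: you remove the intermediate bearing housings and couple directly with a flange, cutting down the number of stages the energy has to pass through. INT8 quantization is "trading precision for speed", just like choosing between tolerance grades IT7 and IT9: it depends on what the application requires.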
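
The "precision for speed" trade can be made concrete with a few lines of arithmetic. The sketch below uses symmetric max-abs quantization, a simpler scheme than the entropy calibration TensorRT performs, to map activations onto 8-bit integers and back. The sample values and helper names are made up for illustration.

```python
def calibrate_scale(samples):
    """Max-abs calibration: map the largest observed magnitude to 127."""
    return max(abs(v) for v in samples) / 127.0

def quantize(value, scale):
    """Round to the nearest INT8 step and clip to the symmetric range."""
    q = round(value / scale)
    return max(-127, min(127, q))

def dequantize(q, scale):
    """Map an INT8 code back to an approximate real value."""
    return q * scale

# Hypothetical activations collected from a calibration run.
samples = [-2.0, -0.5, 0.1, 1.27, 0.8]
scale = calibrate_scale(samples)

for v in samples:
    q = quantize(v, scale)
    print(f"{v:6.2f} -> code {q:4d} -> {dequantize(q, scale):8.4f}")

worst = max(abs(dequantize(quantize(v, scale), scale) - v) for v in samples)
print(f"worst round-trip error: {worst:.4f} (step size {scale:.4f})")
```

Values inside the calibrated range come back within half a step; values outside it are clipped. That is why the calibration set has to be representative of what the robot sees in operation.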

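6. Docker Containerization: Reproducible Deployment of Robot Software

Why robot systems need Docker

The robot software stack is extremely complex: ROS 2 + CUDA + TensorRT + PyTorch + assorted drivers. The Jetsons on different robots may run different JetPack versions. Docker provides environment consistency: "if it runs on your Jetson, it runs on any Jetson", as long as the host's JetPack release is compatible with the image's L4T base.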

# Dockerfile for ROS 2 + PyTorch + TensorRT on Jetson
FROM nvcr.io/nvidia/l4t-ros:r36.3.0-ros2-humble

# Install system dependencies (clean the apt cache to keep the layer small)
RUN apt-get update && apt-get install -y \
    python3-pip python3-opencv libopenblas-dev \
    ros-humble-cv-bridge ros-humble-vision-msgs \
    ros-humble-rosbag2-storage-mcap \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt /tmp/
RUN pip3 install --no-cache-dir -r /tmp/requirements.txt

# Install PyTorch for Jetson (official wheel)
RUN pip3 install --no-cache-dir \
    https://developer.download.nvidia.com/compute/redist/jp/v60/pytorch/...

# Copy the ROS 2 workspace
COPY ros2_ws /opt/ros2_ws
RUN . /opt/ros/humble/setup.sh && \
    cd /opt/ros2_ws && colcon build --symlink-install

# Startup script (must be executable)
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

Docker Compose for multi-node robotics

# docker-compose.yml: multi-node robot system
version: '3.8'

services:
  perception:
    build: ./perception
    network_mode: "host"
    runtime: nvidia
    environment:
      - ROS_DOMAIN_ID=42
    devices:
      - /dev/video0:/dev/video0  # USB camera (device nodes go under `devices`, not `volumes`)
    volumes:
      - ./config:/opt/config:ro

  control:
    build: ./control
    network_mode: "host"
    runtime: nvidia
    environment:
      - ROS_DOMAIN_ID=42
    privileged: true  # needed for real-time scheduling priority
    devices:
      - /dev/ttyUSB0:/dev/ttyUSB0  # robot serial port

  policy:
    build: ./policy
    network_mode: "host"
    runtime: nvidia
    environment:
      - ROS_DOMAIN_ID=42
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

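Real-time ROS 2 in containers

# Grant the container real-time scheduling privileges:
# docker run --privileged --ulimit rtprio=99 my_robot_image

# entrypoint.sh: configure real-time behavior at startup
# (the file itself must begin with a "#!/bin/bash" line, since it uses `source`)
# Let RT tasks use 100% of the CPU; this needs a privileged container,
# and a runaway RT task can then starve the rest of the system
echo -1 > /proc/sys/kernel/sched_rt_runtime_us
source /opt/ros/humble/setup.bash
source /opt/ros2_ws/install/setup.bash
exec ros2 launch my_robot_system bringup.launch.py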

7. Robot CI/CD: Continuous Integration and Continuous Deployment

The particular challenges of robot CI/CD

# .github/workflows/robot-ci.yml
name: Robot CI Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  unit-tests:
    runs-on: ubuntu-22.04
    container:
      image: osrf/ros:humble-desktop
    steps:
      - uses: actions/checkout@v4
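      - name: Build & Test
        shell: bash  # container jobs default to sh, which has no `source`
        run: |
          source /opt/ros/humble/setup.bash
          colcon build --symlink-install
          colcon test --event-handlers console_direct+
          colcon test-result --verbose  # exits non-zero if any test failed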
  
  simulation-tests:
    runs-on: ubuntu-22.04
    steps:
      - uses: actions/checkout@v4
      - name: Run Gazebo Simulation Tests
        run: |
          # Launch headless Gazebo and run the test scenarios
          python3 -m pytest tests/simulation/ --gazebo-headless
  
  policy-eval:
    runs-on: ubuntu-22.04
    steps:
      - uses: actions/checkout@v4
      - name: Evaluate Policy Success Rate
        run: |
          python3 eval_policy.py --n-episodes 100
          # Fail the job if the success rate is below 80%
          python3 check_threshold.py --metric success_rate --threshold 0.8
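
The workflow above calls `check_threshold.py` without showing it. A minimal sketch of what such a gate could look like follows. Everything beyond the two flags used in the workflow is an assumption of this example: the `--results` option, the `eval_results.json` file name, and the idea that `eval_policy.py` writes its metrics as a flat JSON object. The demo at the bottom writes a temporary results file so the sketch runs on its own.

```python
import argparse
import json
import os
import tempfile

def passes(metrics: dict, metric: str, threshold: float) -> bool:
    """True if the named metric meets or exceeds the threshold."""
    return metrics[metric] >= threshold

def main(argv=None) -> int:
    """Return 0 when the gate passes and 1 otherwise (CI treats non-zero as failure)."""
    parser = argparse.ArgumentParser(description="Fail CI when a metric is too low.")
    parser.add_argument("--metric", required=True)
    parser.add_argument("--threshold", type=float, required=True)
    # Hypothetical: assumes the evaluation step wrote its metrics to this JSON file.
    parser.add_argument("--results", default="eval_results.json")
    args = parser.parse_args(argv)

    with open(args.results) as f:
        metrics = json.load(f)

    ok = passes(metrics, args.metric, args.threshold)
    status = "PASS" if ok else "FAIL"
    print(f"[{status}] {args.metric}={metrics[args.metric]} (threshold {args.threshold})")
    return 0 if ok else 1

# Demo with a temporary results file; a real script would end with sys.exit(main()).
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "eval_results.json")
    with open(path, "w") as f:
        json.dump({"success_rate": 0.83}, f)
    exit_code = main(["--metric", "success_rate", "--threshold", "0.8",
                      "--results", path])
print("exit code:", exit_code)
```

Returning the status as a process exit code is what lets the CI step fail the job without any extra glue.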

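Deployment pipeline: from code to the real robot

Git push → CI tests (unit + simulation) → build Docker image → push to container registry → Jetson pulls the update → health check → policy switch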

#!/bin/bash
# deploy.sh: automatic update script that runs on the robot
set -e

echo "[Deploy] Pulling latest image..."
docker pull registry.example.com/robot-policy:latest

echo "[Deploy] Running smoke test..."
if ! docker run --rm registry.example.com/robot-policy:latest \
    python3 -c "from policy import main; print('OK')"; then
    echo "[Deploy] Smoke test failed, aborting."
    exit 1
fi

echo "[Deploy] Gracefully switching policy..."
# Ask the running policy node to switch over via a ROS 2 service
ros2 service call /policy_switch std_srvs/srv/Trigger

echo "[Deploy] Restarting control stack..."
# `stop` plus `up --force-recreate` restarts only the named services
docker-compose stop control policy
docker-compose up -d --force-recreate control policy

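Acceptance criteria

  • Complete TensorRT optimization on the Jetson, with inference latency < 10 ms
  • Write a complete Dockerfile and docker-compose file that bring up the robot system with one command
  • Set up a CI pipeline: every PR automatically runs simulation tests and policy evaluation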

8. Monitoring and Logging: Operating Production-Grade Robots

Key monitoring metrics for robot systems

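Category | Metric | Alert threshold | Mechanical analogy
Real-time | Control loop period | > target period + 20% | PLC scan timeout
Inference | Policy inference latency | > 30% of the control period | Algorithm execution overrun
Temperature | GPU/CPU temperature | > 85°C | Motor temperature rise
Communication | ROS 2 packet loss rate | > 1% | Communication bit error rate
Memory | System memory usage | > 90% | Buffer overflow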
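
The thresholds in this table can be turned into a small rule check. The sketch below is one possible encoding. The field names in the sample dictionary (`loop_period_ms`, `inference_ms`, `temp_c`, `packet_loss`, `memory_percent`) are hypothetical and would need to match whatever your monitor actually records.

```python
def check_alerts(sample: dict, control_period_ms: float) -> list:
    """Return the categories from the threshold table that the sample violates."""
    alerts = []
    if sample["loop_period_ms"] > 1.2 * control_period_ms:   # target period + 20%
        alerts.append("real-time")
    if sample["inference_ms"] > 0.3 * control_period_ms:     # 30% of the control period
        alerts.append("inference")
    if sample["temp_c"] > 85:                                 # degrees Celsius
        alerts.append("temperature")
    if sample["packet_loss"] > 0.01:                          # 1% as a fraction
        alerts.append("communication")
    if sample["memory_percent"] > 90:
        alerts.append("memory")
    return alerts

# Hypothetical readings for a 100 Hz loop (10 ms target period).
healthy = {"loop_period_ms": 10.1, "inference_ms": 2.0, "temp_c": 62,
           "packet_loss": 0.0, "memory_percent": 55}
degraded = {"loop_period_ms": 12.5, "inference_ms": 2.0, "temp_c": 70,
            "packet_loss": 0.0, "memory_percent": 95}

print("healthy :", check_alerts(healthy, control_period_ms=10.0))
print("degraded:", check_alerts(degraded, control_period_ms=10.0))
```

Keeping the rules in one function makes it straightforward to unit-test them and to log which category fired alongside the raw sample.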

import json
import time
from datetime import datetime

import psutil

class RobotMonitor:
    """Lightweight robot monitor: records CPU, memory, disk, and GPU temperature."""

    def __init__(self, log_dir='/var/log/robot'):
        self.log_dir = log_dir   # must already exist and be writable
        self.metrics = []

    def sample(self):
        """Sample the current system state."""
        sample = {
            'timestamp': datetime.now().isoformat(),
            'cpu_percent': psutil.cpu_percent(interval=None),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_percent': psutil.disk_usage('/').percent,
        }

        # GPU temperature (Jetson); the sysfs path and units vary by board and JetPack release
        try:
            with open('/sys/devices/gpu.0/temp') as f:
                sample['gpu_temp'] = int(f.read().strip())
        except (OSError, ValueError):
            pass  # not on a Jetson, or the file is unreadable: skip this metric

        self.metrics.append(sample)
        return sample

    def flush(self):
        """Persist buffered samples to disk as JSON Lines."""
        log_file = f"{self.log_dir}/robot_metrics_{datetime.now():%Y%m%d}.jsonl"
        with open(log_file, 'a') as f:
            for m in self.metrics:
                f.write(json.dumps(m) + '\n')
        self.metrics.clear()

Latency benchmarking: the engineering data you need

import time

import numpy as np

def benchmark_latency(pipeline, n_trials=1000):
    """Measure end-to-end pipeline latency, stage by stage."""
    latencies = {
        'image_capture': [],
        'inference': [],
        'control_output': [],
        'total': [],
    }

    for _ in range(n_trials):
        t0 = time.perf_counter()
        img = pipeline.capture_image()
        t1 = time.perf_counter()

        action = pipeline.inference(img)
        t2 = time.perf_counter()

        pipeline.send_command(action)
        t3 = time.perf_counter()

        latencies['image_capture'].append(t1 - t0)
        latencies['inference'].append(t2 - t1)
        latencies['control_output'].append(t3 - t2)
        latencies['total'].append(t3 - t0)

    # Summary report: p50 / p95 / p99 / max
    for key, vals in latencies.items():
        vals = np.array(vals) * 1000  # seconds → ms
        print(f"{key:20s} p50={np.percentile(vals,50):.1f}ms "
              f"p95={np.percentile(vals,95):.1f}ms p99={np.percentile(vals,99):.1f}ms "
              f"max={vals.max():.1f}ms")

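Final acceptance

  • TensorRT deployment with inference latency < 10 ms (p95)
  • Docker Compose brings up the complete robot system with one command
  • CI runs the simulation tests automatically and blocks the merge when the success rate is below 80%
  • The monitoring system records CPU, memory, GPU, and latency metrics and supports post-incident analysis
  • Sim2Real transfer success rate raised from an initial 50% to > 80%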