π0 把大语言模型的「预训练 + 微调」范式搬到机器人领域:先在海量多任务、多机器人数据上预训练,再在小量下游数据上微调到具体任务。目标是训练一个通用机器人基础策略,能处理折衣服、收拾桌子、装盒子等复杂长程任务。
| 方法 | 痛点 |
|---|---|
| 离散化动作 token + 自回归(RT-2 / OpenVLA) | 动作维度大时 token 数爆炸(双臂 14 维);量化损失精度;逐 token 解码慢 |
| Diffusion 扩散模型生成动作 | 训练目标复杂,推理需要很多步去噪,实时性差 |
π0 选择 Flow Matching 的理由:
| 组件 | 配置 | 参数量 |
|---|---|---|
| VLM 主干 (Expert 1) | width=2048, depth=18, GQA(1 KV head) | ~2B |
| Action Expert (Expert 2) | width=1024, depth=18 | ~311M |
| 图像编码器 | SigLIP-So400m/14, 224×224 | — |
| 总参数量 | PaliGemma + Action Expert | ~3.3B |
[图像 tokens, 语言 tokens] → 全双向[状态 token] → 对前面因果可见[噪声动作 tokens × action_horizon] → 对前面因果可见,block 内互相可见| 字段 | 说明 | 形状 |
|---|---|---|
image/base_0_rgb | 第三视角相机 RGB | (224, 224, 3) |
image/left_wrist_0_rgb | 左手腕相机(可选) | (224, 224, 3) |
image/right_wrist_0_rgb | 右手腕相机(可选) | (224, 224, 3) |
state | 机器人本体状态(关节角+夹爪) | (s,) |
tokenized_prompt | 语言指令 token id | (l,) |
| 输出 | 动作块(action chunk) | (action_horizon, action_dim) |
Action Horizon(动作时域):π0 默认 50 步,DROID 配置 10 步。Action Dim 因平台不同:DROID=8,ALOHA 双臂=14。
# pi0.py → compute_loss
# 1. 采样噪声和流匹配时间步 τ
noise = randn(actions.shape)
time = beta(1.5, 1) * 0.999 + 0.001 # β(1.5,1)采样,偏向高 τ
# 2. 构造中间状态动作 x_τ(线性插值路径)
x_t = time * noise + (1 - time) * actions # τ=0 干净动作,τ=1 纯噪声
# 3. 回归目标 = 速度场 u_t = noise - actions
u_t = noise - actions
# 4. 一次前向:前缀(图+语言)+后缀(x_τ, τ) 送入 Transformer
v_t = model(observation, x_t, time) # 模型预测速度场
# 5. 损失 = MSE(v_t, u_t)
loss = mean(square(v_t - u_t), axis=-1)
# pi0.py → sample_actions
dt = -1.0 / num_steps # num_steps=10
x = randn(batch, action_horizon, action_dim) # 从纯噪声出发
# ★ 第 1 步:前缀前向,缓存 KV cache(图像+语言只算一次!)
kv_cache = model.forward_prefix(observation)
# ★ 第 2~10 步:仅重算后缀(动作 token),复用 KV cache
for step in range(10):
v_t = model.forward_suffix(x, t, kv_cache)
x = x + dt * v_t # 欧拉步
t = t + dt
return x # x_0 = 生成的动作块
| 指标 | 数值 |
|---|---|
| 预训练数据 | 10,000+ 小时机器人数据 |
| 私有数据覆盖 | 7 种机器人构型,68 个任务(单臂/双臂/移动) |
| 开源数据 | Open X-Embodiment (OXE) 混合 |
| 数据加权 | 权重 = n^0.43(缓解长尾不平衡) |
| 预训练步数 | 700,000 步(主模型) |
openpi/
├── src/openpi/ # 核心库
│ ├── models/ # ★ 模型实现
│ │ ├── pi0.py # π0 主模型(flow matching 核心)
│ │ ├── pi0_config.py # Pi0Config(action_dim/horizon)
│ │ ├── pi0_fast.py # π0-FAST(自回归 + FAST tokenizer)
│ │ ├── gemma.py # ★ 双专家 Gemma Transformer + SigLIP
│ │ ├── model.py # Observation / Actions 数据结构
│ │ └── lora.py # LoRA 适配器
│ ├── policies/ # ★ 策略封装(输入输出映射)
│ │ ├── policy.py # Policy.infer() 推理管线
│ │ └── policy_config.py # create_trained_policy()
│ ├── training/
│ │ ├── config.py # ★★ 所有 TrainConfig 注册表
│ │ ├── data_loader.py # LeRobot / RLDS 数据加载
│ │ └── checkpoints.py # checkpoint + norm_stats
│ ├── serving/ # 推理服务(websocket)
│ └── transforms.py # 数据变换管线
├── scripts/
│ ├── train.py # ★ JAX 训练入口
│ ├── train_pytorch.py # ★ PyTorch 训练入口
│ ├── serve_policy.py # ★ 启动推理服务器
│ └── compute_norm_stats.py # ★ 计算归一化统计量
├── packages/openpi-client/ # ★ 轻量客户端(机器人端安装)
└── examples/ # 各平台示例(DROID/ALOHA/LIBERO/UR5)
| 模块 | 职责 |
|---|---|
models/pi0.py | embed_prefix(图+语言)、embed_suffix(状态+噪声动作+τ)、compute_loss、sample_actions |
models/gemma.py | 双专家 Gemma:一个 Module 接受 configs=[2b_config, 300m_config],两套权重通过 attention 交互 |
training/config.py | 所有训练配置注册表,get_config("pi05_droid") 按名取配置 |
policies/policy.py | Policy.infer(obs):input_transforms → sample_actions → output_transforms |
transforms.py | Resize、归一化、tokenize 等数据预处理管线 |
uv run examples/libero/convert_libero_data_to_lerobot.py --data_dir /path/to/your/data
你的数据需包含:图像帧、本体状态、动作、语言指令。
在 training/config.py 的 _CONFIGS 中添加配置:
TrainConfig(
name="pi05_my_robot",
model=pi0_config.Pi0Config(pi05=True, action_horizon=10),
data=LeRobotLiberoDataConfig(repo_id="your_org/your_dataset"),
batch_size=256,
lr_schedule=CosineDecaySchedule(warmup_steps=10_000, peak_lr=5e-5),
ema_decay=0.999,
weight_loader=CheckpointWeightLoader("gs://openpi-assets/checkpoints/pi05_base"),
num_train_steps=30_000,
)
# (a) 计算归一化统计量(必做)
uv run scripts/compute_norm_stats.py --config-name pi05_my_robot
# (b) 训练
XLA_PYTHON_CLIENT_MEM_FRACTION=0.9 uv run scripts/train.py pi05_my_robot \
--exp-name=my_experiment --overwrite
# 启动策略服务器
uv run scripts/serve_policy.py policy:checkpoint \
--policy.config=pi05_my_robot \
--policy.dir=checkpoints/pi05_my_robot/my_experiment/20000
model=pi0_config.Pi0Config(
paligemma_variant="gemma_2b_lora", # rank=16
action_expert_variant="gemma_300m_lora", # rank=32
)
LoRA 冻结主干、只训练低秩适配器,显存降至 22.5GB。
方式 A — 进程内推理(最简单)
from openpi.training import config as _config
from openpi.policies import policy_config
from openpi.shared import download
config = _config.get_config("pi05_droid")
ckpt = download.maybe_download("gs://openpi-assets/checkpoints/pi05_droid")
policy = policy_config.create_trained_policy(config, ckpt)
action_chunk = policy.infer(example)["actions"] # → (action_horizon, action_dim)
方式 B — 远程推理(推荐,机器人端零依赖)
# GPU 服务器端
uv run scripts/serve_policy.py --env DROID
# 机器人端(pip install openpi-client)
from openpi_client import websocket_client_policy, image_tools
client = websocket_client_policy.WebsocketClientPolicy(host="GPU_IP", port=8000)
observation = {
"observation/image": image_tools.convert_to_uint8(
image_tools.resize_with_pad(img, 224, 224)),
"observation/state": state,
"prompt": "pick up the fork",
}
action_chunk = client.infer(observation)["actions"] # (action_horizon, action_dim)
git clone --recurse-submodules https://github.com/Physical-Intelligence/openpi.git
cd openpi
# 用 uv 管理依赖(需先装 uv: curl -LsSf https://astral.sh/uv/install.sh | sh)
GIT_LFS_SKIP_SMUDGE=1 uv sync
GIT_LFS_SKIP_SMUDGE=1 uv pip install -e .
from openpi.training import config as _config
from openpi.policies import policy_config
from openpi.shared import download
import numpy as np
# 1. 选配置 + 自动下载 checkpoint
config = _config.get_config("pi05_droid")
ckpt = download.maybe_download("gs://openpi-assets/checkpoints/pi05_droid")
# 2. 创建策略(自动组装 transforms + norm_stats)
policy = policy_config.create_trained_policy(config, ckpt)
# 3. 构造观测(DROID 格式)
example = {
"observation/exterior_image_1_left": np.random.randint(256, (224,224,3), dtype=np.uint8),
"observation/wrist_image_left": np.random.randint(256, (224,224,3), dtype=np.uint8),
"observation/joint_position": np.random.rand(7),
"observation/gripper_position": np.random.rand(1),
"prompt": "pick up the fork",
}
# 4. 推理 → 动作块
actions = policy.infer(example)["actions"]
print(actions.shape) # (action_horizon, action_dim) e.g. (15, 8)
uv run scripts/serve_policy.py --env DROID # 自动用 pi05_droid 默认 checkpoint
| 模式 | 显存 | 示例 GPU |
|---|---|---|
| 推理 | >8GB | RTX 4090 |
| 微调(LoRA) | >22.5GB | RTX 4090 |
| 微调(全量) | >70GB | A100 80GB / H100 |
多卡可用 FSDP(fsdp_devices=N)分摊显存。远程推理方案下机器人端无需 GPU。大规模 DROID 训练(100k 步)需 8×H100 约 2 天。
| 指标 | 数值 |
|---|---|
| 总参数量 | ~3.3B(PaliGemma 2B + Action Expert 300M) |
| 预训练数据 | 10,000+ 小时 |
| 机器人覆盖 | 7 种构型,68 个任务 |
| 预训练步数 | 700,000 步 |
| Action chunk 时域 | 默认 50 步(可配置) |
| Flow matching 推理步数 | 10 步欧拉积分 |
| 流匹配时间步采样 | Beta(1.5, 1) |
| 图像输入 | 最多 3 路相机(224×224,SigLIP-So400m/14) |
| 词表大小 | 257,152(PaliGemma) |