Zhao Dongyu's Blog

A life which is unexamined is not worth living.


CUDA Learning Notes


Notes on learning and practicing Dynamo P/D (prefill/decode) disaggregation.


I'm learning C++ templates entirely because of Nvidia. A while back I watched an Nvidia video that stressed building solid C++ knowledge; recently, learning cutlass has meant dealing with heavy use of C++ templates; and on Zhihu someone wrote that after reading C++ Templates - The Complete Guide, 2nd Edition they felt refreshed and amazed that such tricks exist. So: C++ templates, let's go!


RLT: blog & paper notes

VLA (Vision-Language-Action) models can already handle "general manipulation", but they fall short on the last millimeter: motions slow down, the robot has to probe repeatedly, and fine-grained tasks often fail.

RL is good at optimizing the critical details, so fine-tuning with reinforcement learning (RL) is the obvious fix. But RL training has drawbacks:
- RL on a large model → slow and expensive
- RL on a small model → learns fast but isn't smart
How do we keep the VLA's generalization while also getting RL's learning efficiency?

(1) Extract a "compressed representation" from the VLA: the RL token.

(2) Do RL with small networks while keeping the VLA frozen. Train: a small actor and a small critic. Input: the RL token and the action. Output: a corrected action.

Module        Role
VLA           provides understanding + the initial action
RL token      provides the state representation
Actor-Critic  does the local optimization
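
A minimal sketch of this division of labor; `vla`, `encode_rl_token`, `actor`, and `critic` are hypothetical callables for illustration, not the paper's actual API.

# Sketch only: the frozen-VLA + small actor/critic split in RLT.
def rlt_step(observation, vla, encode_rl_token, actor, critic):
    embedding, reference_action = vla(observation)   # frozen VLA: understanding + initial action
    rl_token = encode_rl_token(embedding)            # compressed state representation
    action = actor(rl_token, reference_action)       # small actor outputs the corrected action
    value = critic(rl_token, action)                 # small critic scores it for local optimization
    return action, value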

RLT's core innovations

Innovation 1: the RL token (the key piece) 👉 a "compressed state representation" designed specifically for RL. What it does: preserves VLA knowledge, reduces dimensionality, improves sample efficiency.

Innovation 2: action-chunk RL 👉 run RL over "action chunks" instead of single steps. Benefits: a shorter horizon, easier learning.

Innovation 3: optimize actions directly (not residuals) 👉 the actor outputs actions directly, rather than residual noise.

Innovation 4: use the VLA as a "reference policy". Two key points: (1) the reference action is fed as input, so RL is not guessing blindly; (2) a regularization constraint keeps the policy from drifting too far from the VLA.

The essential change: from ❌ RL exploring from scratch, to ✅ RL fine-tuning "in the neighborhood of a good policy".

RLT = giving RL a "smart-brain interface".

Chunk-level RL makes RL more "flexible" (adjusting more often) and shortens the time scale (horizon).

Without chunks: 50 Hz × 10 s = 500 steps, and the reward signal is hard to propagate back. With chunks: each decision covers 10 steps, so the horizon shrinks 10×.
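
As a quick sanity check of the numbers above (plain arithmetic, nothing paper-specific):

control_hz = 50                                  # low-level control rate
episode_s = 10                                   # episode length in seconds
chunk_len = 10                                   # low-level steps per action chunk

low_level_steps = control_hz * episode_s         # 500 decisions without chunking
chunk_decisions = low_level_steps // chunk_len   # 50 decisions with 10-step chunks
print(low_level_steps, chunk_decisions)          # 500 50 -> horizon shrinks 10x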

Why off-policy? Because data is expensive and must be reused.

Compress the VLA's information with a "bottleneck structure".

Because the representation for the RL token must retain enough information to enable the decoder to reconstruct the inputs, it acts as a bottleneck.

The RL token must retain the important information, otherwise the reconstruction fails and the loss is large. ✔ At the same time it is low-dimensional 👉 well suited to RL.

On top of the RL token: learn a small policy that "corrects the VLA's actions".

Picking up a screwdriver may be easy, but lining it up with a tiny screw quickly and precisely is much harder. To raise that ceiling, you need RL.

Reinforcement learning (RL) is a natural way to improve such tasks: by trying, observing outcomes, and adjusting, a robot can refine behaviors that are difficult to learn from demonstrations alone.

RLT targets fine-grained tasks that demand careful manipulation, learning from minutes or hours of real-world experience.

RLT adds a special output token that provides a compact interface between the VLA and a lightweight RL policy, allowing the robot to adapt its behavior in just a few hours without fine-tuning the full model.

How it is implemented:

We train the VLA to produce an "RL token" that can then provide a concise summary of the VLA's internal representations. This RL token is then used as the input into a much smaller model that can be trained with RL in real time.

The RL token is used by an actor and critic, which are trained with a sample-efficient off-policy RL method.

RL Token

  • a concise summary of the VLA's internal representations

RLT first modifies the VLA by adding an encoder-decoder Transformer. This module is trained through a bottleneck to predict the model's internal embeddings, which yields a compressed representation we call the RL token. The RL token summarizes the information that the RL actor (policy) and critic (value function) need about the current observation. As a result, even very small actor and critic networks can learn and improve on top of the model's own rich internal representations.
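
A rough PyTorch-style sketch of the bottleneck-and-reconstruct training signal, assuming the VLA's internal embeddings are available as a tensor. The paper uses an encoder-decoder Transformer; this sketch uses plain MLPs and made-up dimensions purely to show the idea.

import torch
import torch.nn as nn
import torch.nn.functional as F

class RLTokenBottleneck(nn.Module):
    """Compress VLA internal embeddings into a small RL token via a reconstruction bottleneck."""

    def __init__(self, embed_dim=2048, token_dim=64):
        super().__init__()
        # Encoder squeezes the rich embedding down to a low-dimensional RL token.
        self.encoder = nn.Sequential(nn.Linear(embed_dim, 256), nn.GELU(), nn.Linear(256, token_dim))
        # Decoder must reconstruct the original embedding from the token,
        # which forces the token to keep the important information.
        self.decoder = nn.Sequential(nn.Linear(token_dim, 256), nn.GELU(), nn.Linear(256, embed_dim))

    def forward(self, vla_embedding):
        rl_token = self.encoder(vla_embedding)
        recon = self.decoder(rl_token)
        recon_loss = F.mse_loss(recon, vla_embedding.detach())
        return rl_token, recon_loss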

In essence, the big model does perception and the small model makes decisions. RLT = use a bottleneck Transformer to compress the big model's internal representations into an RL token, so that a small RL network can also exploit the big model's capabilities while learning.

First, the RL policy predicts action chunks rather than low-level per-step controls. This matches the VLA's action structure and lets the policy learn temporally coherent motions.
Second, the RL policy does not decide from scratch: the actor receives the VLA's predicted action as input, so it learns to "modify the VLA's action" rather than replace it entirely. We also add a regularization term to the policy update so the policy stays close to this reference action (the VLA's output). As a result, when the VLA already performs well, the policy does not drift far; it only deviates when the critic judges that a better action exists. To stop the policy from simply copying the VLA early in training, we introduce reference-action dropout: 👉 the reference-action input is randomly dropped, forcing the actor to learn to generate actions on its own.
Finally, human interventions can be fed directly into the RL update: when the robot gets stuck or makes a mistake, the human's correction is incorporated into training, further improving the policy. These design choices turn online RL into a general recipe that can be attached to any pretrained VLA without task-specific engineering.
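
A hedged sketch of what the actor update could look like with the reference action, the stay-close regularizer, and reference-action dropout. Names such as `reg_weight` and `drop_prob`, and the exact form of the loss, are assumptions for illustration, not the paper's formulation.

import torch

def actor_loss(actor, critic, rl_token, vla_action, reg_weight=0.1, drop_prob=0.2):
    # Reference-action dropout: occasionally hide the VLA action so the
    # actor learns to produce actions on its own instead of copying the input.
    if torch.rand(()) < drop_prob:
        reference = torch.zeros_like(vla_action)
    else:
        reference = vla_action

    action = actor(rl_token, reference)

    # Maximize the critic's value of the proposed action...
    q_value = critic(rl_token, action)
    # ...while staying close to the VLA's action unless the critic says otherwise.
    stay_close = ((action - vla_action) ** 2).mean()

    return -q_value.mean() + reg_weight * stay_close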

How does RL use the VLA? VLA → provides the initial action; RL actor → "fine-tunes" on top of it.

Why action chunks? Low-level control: learn every single step (hard, data-hungry). Chunks: learn a whole segment of actions at once (efficient).

How is exploration controlled?
- the reference-action input (the VLA action)
- the regularization constraint (stay close)
- the critic decides when to deviate

Trust the VLA by default; deviate only when something better exists.

The point of reference-action dropout: it prevents actor = copy(VLA) and forces the actor to be able to decide on its own.

The role of human intervention 👉 essentially online error correction: human feedback → fed directly into the RL update. Similar to an imitation learning + RL hybrid.
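
One guess at the mechanics, tying this to the off-policy point above: intervention steps simply go into the same replay buffer as autonomous steps, flagged so the update can treat them specially. This is an assumption for illustration, not the paper's implementation.

import random

class ReplayBuffer:
    """Minimal off-policy buffer that also accepts human-intervention transitions."""

    def __init__(self, capacity=100_000):
        self.capacity = capacity
        self.data = []

    def add(self, transition, human_intervention=False):
        # Autonomous rollouts and human corrections share the same buffer;
        # the flag lets the update up-weight or imitate intervention actions.
        transition = dict(transition, human=human_intervention)
        if len(self.data) >= self.capacity:
            self.data.pop(0)
        self.data.append(transition)

    def sample(self, batch_size=256):
        return random.sample(self.data, min(batch_size, len(self.data)))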

Let RL start not from zero but "safely fine-tune" on top of the VLA, adapting with as little data as possible.

👉 At inference time, the RL actor is what actually outputs the action. 👉 The VLA supplies the "prior action + representation power". 👉 The RL token is just the "compressed state representation" fed to the actor/critic.

Observation (image / state)
  ↓
VLA → produces:
      1️⃣ internal embeddings
      2️⃣ an initial action (the reference action)
  ↓
RL token (compressed embedding)
  ↓
RL actor (input: RL token + VLA action)
  ↓
Final action (executed)
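
Read top to bottom, this is a chunked control loop: each RL decision produces a whole action chunk that is rolled out at the low-level control rate before re-planning. The sketch below uses placeholder callables (`get_observation`, `vla`, `encode_rl_token`, `actor`, `send_command`); it is an illustration, not the paper's code.

import time

def run_episode(get_observation, vla, encode_rl_token, actor, send_command,
                num_chunks=50, control_hz=50):
    # Each loop iteration = one RL decision over a whole action chunk.
    for _ in range(num_chunks):
        obs = get_observation()
        embedding, reference_action = vla(obs)             # frozen VLA: embedding + initial action
        rl_token = encode_rl_token(embedding)              # compressed state for the actor
        chunk = actor(rl_token, reference_action)          # e.g. 10 low-level commands
        for command in chunk:                              # roll out the chunk at the control rate
            send_command(command)
            time.sleep(1.0 / control_hz)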

Download the dataset

modelscope download \
--dataset agibot_world/GenieSim3.0-Dataset \
--local_dir /home/x/dataset \
--include "dataset/pick_block_color/g1/*"

Split archive (pick_block_color_g1.tar.gz.*):

Part                            Size
pick_block_color_g1.tar.gz.000  37.58 GB
pick_block_color_g1.tar.gz.001  37.58 GB
pick_block_color_g1.tar.gz.002  37.58 GB
pick_block_color_g1.tar.gz.003  36.98 GB
Total                           ≈ 149.72 GB
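
The parts can then be concatenated and streamed into tar in one go; the path below assumes the modelscope download layout shown above and may need adjusting:

cd /home/x/dataset/dataset/pick_block_color/g1   # assumed download location
cat pick_block_color_g1.tar.gz.* | tar -xzf -    # concatenate the parts and extract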

git clone https://github.com/huggingface/lerobot.git
cd lerobot
git switch -c feat/RL-token --track origin/feat/RL-token

A roundup of documents that never led anywhere. Not of much value in themselves, but jotted down for the record.


1. Install ROS 2 Humble

1.1 Set locale

locale  # check for UTF-8

sudo apt update && sudo apt install locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8

locale # verify settings

1.2 Setup Sources

sudo apt install software-properties-common
sudo add-apt-repository universe

sudo apt update && sudo apt install curl -y
export ROS_APT_SOURCE_VERSION=$(curl -s https://api.github.com/repos/ros-infrastructure/ros-apt-source/releases/latest | grep -F "tag_name" | awk -F'"' '{print $4}')
curl -L -o /tmp/ros2-apt-source.deb "https://github.com/ros-infrastructure/ros-apt-source/releases/download/${ROS_APT_SOURCE_VERSION}/ros2-apt-source_${ROS_APT_SOURCE_VERSION}.$(. /etc/os-release && echo ${UBUNTU_CODENAME:-${VERSION_CODENAME}})_all.deb"
sudo dpkg -i /tmp/ros2-apt-source.deb

1.3 Install ROS 2 packages

sudo apt update
sudo apt upgrade
sudo apt install ros-humble-desktop

2. Install the AgileX Robotic Arm ROS 2 Driver

2.1 Install Python SDK

git clone https://github.com/agilexrobotics/pyAgxArm.git
cd pyAgxArm
pip3 install .

2.2 Install ROS2 Driver

mkdir -p ~/agx_arm_ws/src
cd ~/agx_arm_ws/src
git clone -b ros2 --recurse-submodules https://github.com/agilexrobotics/agx_arm_ros.git
cd agx_arm_ros/
git submodule update --remote --recursive

2.3 Install Dependencies

cd ~/agx_arm_ws/src/agx_arm_ros/scripts/
chmod +x agx_arm_install_deps.sh
bash ./agx_arm_install_deps.sh

2.4 Build and Source Workspace

cd ~/agx_arm_ws
colcon build
source install/setup.bash

3. Practice

3.1 Load the environment (run once in every new terminal):

source /opt/ros/humble/setup.bash
source ~/agx_arm_ws/install/setup.bash

If the source lines above are already in ~/.bashrc, new terminals can skip this step.
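
For example, appended once like this (paths match the workspace above):

echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
echo "source ~/agx_arm_ws/install/setup.bash" >> ~/.bashrc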

3.2 Activate CAN

bash ~/agx_arm_ws/src/agx_arm_ros/scripts/can_activate.sh
-------------------START-----------------------
Both ethtool and can-utils are installed.
Expected to configure a single CAN module, detected interface can0 with corresponding USB address 1-8:1.0.
Interface can0 is already activated with a bitrate of 1000000.
The interface name is already can0.
-------------------OVER------------------------

3.3 Run the node

ros2 launch agx_arm_ctrl start_single_agx_arm.launch.py \
can_port:=can0 \
arm_type:=piper \
effector_type:=agx_gripper \
tcp_offset:='[0.0, 0.0, 0.12, 0.0, 0.0, 0.0]'

Parameter       Meaning                          Example / options
can_port        CAN interface name               can0
arm_type        Arm model                        piper / nero
effector_type   End effector                     none / agx_gripper / revo2
tcp_offset      TCP offset from the flange       see TCP_OFFSET.md
auto_enable     Auto-enable on startup           default: true
speed_percent   Speed (%)                        0–100
namespace       Namespace for multi-arm setups   optional

3.4 Control

Operation                   Command
Enable                      ros2 service call /enable_agx_arm std_srvs/srv/SetBool "{data: true}"
Disable                     ros2 service call /enable_agx_arm std_srvs/srv/SetBool "{data: false}"
Return to home              ros2 service call /move_home std_srvs/srv/Empty
Emergency stop (hold pose)  ros2 service call /emergency_stop std_srvs/srv/Empty

Edit the contents of test_arm_gripper_joint_states.yaml to command a specific motion:

cd ~/agx_arm_ws/src/agx_arm_ros/test/piper/
ros2 topic pub -1 /control/joint_states sensor_msgs/msg/JointState "$(cat test_arm_gripper_joint_states.yaml)"

cat test_arm_gripper_joint_states.yaml

name:
- joint1
- joint2
- joint3
- joint4
- joint5
- joint6
- gripper
position: [0.0, 0.2, -0.2, 0.0, -0.0, 0.0, 0.05]
velocity: []
effort: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5]

4. Check status

ros2 topic echo /feedback/joint_states
ros2 topic echo /feedback/tcp_pose
ros2 topic echo /feedback/arm_status
# ros2 topic list 
/control/joint_states
/control/move_c
/control/move_j
/control/move_js
/control/move_l
/control/move_mit
/control/move_p
/feedback/arm_status
/feedback/gripper_status
/feedback/joint_states
/feedback/leader_joint_angles
/feedback/tcp_pose
/parameter_events
/rosout

5. Demo

python /home/midea/Documents/ai_platform/test.py

Code flow diagram

[Start node JointStatePrinter]
  |
  +--> [subscribe /feedback/joint_states] --> [receive JointState]
  |                                                 |
  |                                                 v
  |                                  [extract joint1-6 + gripper]
  |                                                 |
  |                                                 v
  |                        [generate trajectory around current pose]
  |                                                 |
  |                                                 v
  |                          [take next control step at 20 Hz]
  |                                                 |
  |                                                 v
  |                           [publish to /control/joint_states]
  |
  +--> [open camera /dev/video4] --> [grab frame]
                                          |
                                          v
                [process camera data: record size / optional preview]
                                          |
                                          v
            [print status at 1 Hz (feedback + camera info)]
#!/usr/bin/env python3
"""Subscribe /feedback/joint_states and preview camera."""

import math

import cv2
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState

FEEDBACK_TOPIC = "/feedback/joint_states"
CONTROL_TOPIC = "/control/joint_states"
CAMERA_INDEX = 4
TARGET_NAMES = ["joint1", "joint2", "joint3", "joint4", "joint5", "joint6", "gripper"]
GRIPPER_MIN_M = 0.0
GRIPPER_MAX_M = 0.1
# Motion amplitudes (radians / meters). Increase these to make movement larger.
JOINT2_DELTA_RAD = 0.12
JOINT3_DELTA_RAD = 0.12
JOINT4_DELTA_RAD = 0.06
GRIPPER_DELTA_M = 0.05


class JointStatePrinter(Node):
    def __init__(self) -> None:
        super().__init__("joint_state_printer")
        self.latest_msg: JointState | None = None
        self.latest_frame_shape: tuple[int, int, int] | None = None
        self.show_window = False
        self.window_name = "camera_preview"

        self.cap = cv2.VideoCapture(CAMERA_INDEX, cv2.CAP_V4L2)
        if self.cap.isOpened():
            self.get_logger().info(f"OpenCV camera opened: /dev/video{CAMERA_INDEX}")
            try:
                cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
                cv2.resizeWindow(self.window_name, 1280, 720)
                self.show_window = True
            except cv2.error:
                self.get_logger().warn("No GUI backend, run without preview window.")
        else:
            self.get_logger().warn(f"OpenCV camera open failed: /dev/video{CAMERA_INDEX}")

        self.ctrl_pub = self.create_publisher(JointState, CONTROL_TOPIC, 10)
        self.traj_msgs: list[JointState] = []
        self.traj_idx = 0
        self.traj_cycle = 0
        self.traj_started = False

        self.create_subscription(JointState, FEEDBACK_TOPIC, self._callback, 10)
        self.create_timer(0.05, self._read_camera_frame)
        self.create_timer(0.05, self._publish_traj_step)  # 20Hz
        self.create_timer(1.0, self._print_latest)
        self.get_logger().info(
            f"Subscribed to {FEEDBACK_TOPIC}; publish planned stream to {CONTROL_TOPIC} at 20Hz"
        )

    def _callback(self, msg: JointState) -> None:
        self.latest_msg = msg
        if self.traj_started:
            return
        base = self._extract_target_positions(msg)
        if base is None:
            self.get_logger().warn("Feedback JointState missing required joints, waiting next message.")
            return
        self.traj_msgs = self._build_trajectory(base, steps=50)
        self.traj_idx = 0
        self.traj_cycle = 0
        self.traj_started = True
        self.get_logger().info("Generated 50-step motion stream from current feedback position (loop mode).")

    def _extract_target_positions(self, msg: JointState) -> list[float] | None:
        name_to_pos = {n: p for n, p in zip(msg.name, msg.position)}
        out: list[float] = []
        for joint in TARGET_NAMES[:6]:
            if joint not in name_to_pos:
                return None
            out.append(float(name_to_pos[joint]))
        if "gripper" in name_to_pos:
            out.append(float(name_to_pos["gripper"]))
        elif "gripper_joint1" in name_to_pos and "gripper_joint2" in name_to_pos:
            out.append(abs(float(name_to_pos["gripper_joint1"])) + abs(float(name_to_pos["gripper_joint2"])))
        else:
            out.append(0.05)
        out[6] = max(GRIPPER_MIN_M, min(GRIPPER_MAX_M, out[6]))
        return out

    def _build_trajectory(self, base: list[float], steps: int) -> list[JointState]:
        traj: list[JointState] = []
        for i in range(steps):
            t = i / max(steps - 1, 1)
            s = math.sin(2.0 * math.pi * t)
            s2 = math.sin(4.0 * math.pi * t)
            p = list(base)
            # Larger adjustments around current pose.
            p[1] += JOINT2_DELTA_RAD * s
            p[2] += -JOINT3_DELTA_RAD * s
            p[3] += JOINT4_DELTA_RAD * s2
            # Gripper opening width in meters, clamped to [0, 0.1].
            p[6] = max(GRIPPER_MIN_M, min(GRIPPER_MAX_M, p[6] + GRIPPER_DELTA_M * s))

            m = JointState()
            m.name = TARGET_NAMES
            m.position = p
            m.velocity = []
            m.effort = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5]
            traj.append(m)
        return traj

    def _publish_traj_step(self) -> None:
        if not self.traj_started or not self.traj_msgs:
            return
        if self.traj_idx >= len(self.traj_msgs):
            self.traj_idx = 0
            self.traj_cycle += 1
            self.get_logger().info(f"Looping motion stream, cycle={self.traj_cycle}")
        msg = self.traj_msgs[self.traj_idx]
        msg.header.stamp = self.get_clock().now().to_msg()
        self.ctrl_pub.publish(msg)
        self.traj_idx += 1

    def _read_camera_frame(self) -> None:
        if not self.cap.isOpened():
            return
        ok, frame = self.cap.read()
        if not ok:
            return
        self.latest_frame_shape = frame.shape
        if self.show_window:
            cv2.imshow(self.window_name, frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):
                self.get_logger().info("Pressed q, shutting down...")
                if rclpy.ok():
                    rclpy.shutdown()

    def _print_latest(self) -> None:
        if self.latest_msg is None:
            return
        msg = self.latest_msg
        stamp = f"{msg.header.stamp.sec}.{msg.header.stamp.nanosec:09d}"
        print("==== /feedback/joint_states ====")
        print(f"stamp: {stamp}")
        print(f"name: {list(msg.name)}")
        print(f"position: {list(msg.position)}")
        print(f"velocity: {list(msg.velocity)}")
        print(f"effort: {list(msg.effort)}")
        if self.traj_started:
            print(f"stream_progress: {self.traj_idx}/{len(self.traj_msgs)} cycle={self.traj_cycle}")
        if self.latest_frame_shape is not None:
            h, w, c = self.latest_frame_shape
            print(f"camera_frame: {w}x{h} channels={c}")
        else:
            print("camera_frame: N/A")


def main() -> None:
    rclpy.init()
    node = JointStatePrinter()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        if node.cap.isOpened():
            node.cap.release()
        if node.show_window:
            cv2.destroyAllWindows()
        node.destroy_node()
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == "__main__":
    main()

Use the following command to determine the camera index:

python3 -c "
import cv2
for i in range(8):
    c = cv2.VideoCapture(i, cv2.CAP_V4L2)
    print(i, 'open=', c.isOpened())
    c.release()
"