Zhao Dongyu's Blog

A life which is unexamined is not worth living.

0%

RLToken实践

1.安装 ROS 2 Humble

1.1 Set locale

1
2
3
4
5
6
7
8
locale  # check for UTF-8

sudo apt update && sudo apt install locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8

locale # verify settings

1.2 Setup Sources

1
2
3
4
5
6
7
sudo apt install software-properties-common
sudo add-apt-repository universe

sudo apt update && sudo apt install curl -y
export ROS_APT_SOURCE_VERSION=$(curl -s https://api.github.com/repos/ros-infrastructure/ros-apt-source/releases/latest | grep -F "tag_name" | awk -F'"' '{print $4}')
curl -L -o /tmp/ros2-apt-source.deb "https://github.com/ros-infrastructure/ros-apt-source/releases/download/${ROS_APT_SOURCE_VERSION}/ros2-apt-source_${ROS_APT_SOURCE_VERSION}.$(. /etc/os-release && echo ${UBUNTU_CODENAME:-${VERSION_CODENAME}})_all.deb"
sudo dpkg -i /tmp/ros2-apt-source.deb

1.3 Install ROS 2 packages

1
2
3
sudo apt update
sudo apt upgrade
sudo apt install ros-humble-desktop

2. 安装AgileX Robotic Arm ROS2 Driver

2.1 Install Python SDK

1
2
3
git clone https://github.com/agilexrobotics/pyAgxArm.git
cd pyAgxArm
pip3 install .

2.2 Install ROS2 Driver

1
2
3
4
5
mkdir -p ~/agx_arm_ws/src
cd ~/agx_arm_ws/src
git clone -b ros2 --recurse-submodules https://github.com/agilexrobotics/agx_arm_ros.git
cd agx_arm_ros/
git submodule update --remote --recursive

2.3 Install Dependencies

1
2
3
cd ~/agx_arm_ws/src/agx_arm_ros/scripts/
chmod +x agx_arm_install_deps.sh
bash ./agx_arm_install_deps.sh

2.4 Build and Source Workspace

1
2
3
cd ~/agx_arm_ws
colcon build
source install/setup.bash

3. 实践

3.1 加载环境(每个新终端执行一次):

1
2
source /opt/ros/humble/setup.bash
source ~/agx_arm_ws/install/setup.bash

若已在 ~/.bashrc 中写入上述 source,新开终端可省略。

3.2 激活 CAN

1
bash ~/agx_arm_ws/src/agx_arm_ros/scripts/can_activate.sh
-------------------START-----------------------
Both ethtool and can-utils are installed.
Expected to configure a single CAN module, detected interface can0 with corresponding USB address 1-8:1.0.
Interface can0 is already activated with a bitrate of 1000000.
The interface name is already can0.
-------------------OVER------------------------

3.3 运行节点

1
2
3
4
5
ros2 launch agx_arm_ctrl start_single_agx_arm.launch.py \
can_port:=can0 \
arm_type:=piper \
effector_type:=agx_gripper \
tcp_offset:='[0.0, 0.0, 0.12, 0.0, 0.0, 0.0]'
参数 含义 本机示例 / 可选值
can_port CAN 网卡名 **can0**
arm_type 机械臂型号 piper / nero
effector_type 末端 none / agx_gripper / revo2
tcp_offset TCP 相对法兰偏移 TCP_OFFSET.md
auto_enable 启动是否自动使能 默认 true
speed_percent 速度 (%) 0–100
namespace 多臂命名空间 可选

3.4 控制

操作 命令
使能 ros2 service call /enable_agx_arm std_srvs/srv/SetBool "{data: true}"
失能 ros2 service call /enable_agx_arm std_srvs/srv/SetBool "{data: false}"
回零 ros2 service call /move_home std_srvs/srv/Empty
急停(保持位姿) ros2 service call /emergency_stop std_srvs/srv/Empty

修改 test_arm_gripper_joint_states.yaml 的内容实现具体的动作控制

1
2
cd ~/agx_arm_ws/src/agx_arm_ros/test/piper/
ros2 topic pub -1 /control/joint_states sensor_msgs/msg/JointState "$(cat test_arm_gripper_joint_states.yaml)"

cat test_arm_gripper_joint_states.yaml

name:
- joint1
- joint2
- joint3
- joint4
- joint5
- joint6
- gripper
position: [0.0, 0.2, -0.2, 0.0, -0.0, 0.0, 0.05]
velocity: []
effort: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5]

4. 状态查看

1
2
3
ros2 topic echo /feedback/joint_states
ros2 topic echo /feedback/tcp_pose
ros2 topic echo /feedback/arm_status
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ros2 topic list 
/control/joint_states
/control/move_c
/control/move_j
/control/move_js
/control/move_l
/control/move_mit
/control/move_p
/feedback/arm_status
/feedback/gripper_status
/feedback/joint_states
/feedback/leader_joint_angles
/feedback/tcp_pose
/parameter_events
/rosout

5. Demo

python /home/midea/Documents/ai_platform/test.py

代码图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[启动节点 JointStatePrinter]
|
+--> [订阅 /feedback/joint_states] --> [拿到 JointState]
| |
| v
| [提取 joint1-6 + gripper]
| |
| v
| [基于当前姿态生成轨迹]
| |
| v
| [20Hz 取下一步控制量]
| |
| v
| [发布到 /control/joint_states]
|
+--> [打开相机 /dev/video4] --> [拿到 frame]
|
v
[处理相机数据: 记录尺寸/可选预览]
|
v
[1Hz 打印状态信息(含反馈与相机)]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/usr/bin/env python3
"""Subscribe /feedback/joint_states and preview camera."""

import math

import cv2
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState

# ROS 2 topics used by the node: feedback is subscribed, control is published.
FEEDBACK_TOPIC = "/feedback/joint_states"
CONTROL_TOPIC = "/control/joint_states"
# V4L2 device index, i.e. /dev/video4.
CAMERA_INDEX = 4
# Joint names expected in feedback and sent in every control message
# (six arm joints plus the gripper opening).
TARGET_NAMES = ["joint1", "joint2", "joint3", "joint4", "joint5", "joint6", "gripper"]
# Gripper opening width limits in meters; positions are clamped to this range.
GRIPPER_MIN_M = 0.0
GRIPPER_MAX_M = 0.1
# Motion amplitudes (radians / meters). Increase these to make movement larger.
JOINT2_DELTA_RAD = 0.12
JOINT3_DELTA_RAD = 0.12
JOINT4_DELTA_RAD = 0.06
GRIPPER_DELTA_M = 0.05


class JointStatePrinter(Node):
    """Stream a small cyclic motion to the arm while printing feedback.

    On the first usable feedback message the node captures the current pose,
    builds a 50-step sinusoidal trajectory around it, and then publishes one
    step to CONTROL_TOPIC every 50 ms (20 Hz), looping forever. It also polls
    an OpenCV/V4L2 camera at 20 Hz (with an optional preview window) and
    prints a status summary once per second.
    """

    def __init__(self) -> None:
        super().__init__("joint_state_printer")
        # Most recent feedback message; None until the first one arrives.
        self.latest_msg: JointState | None = None
        # Shape of the last captured frame; None until a frame is read.
        # NOTE(review): may be (h, w) for grayscale or (h, w, c) for color.
        self.latest_frame_shape: tuple[int, ...] | None = None
        self.show_window = False
        self.window_name = "camera_preview"

        self.cap = cv2.VideoCapture(CAMERA_INDEX, cv2.CAP_V4L2)
        if self.cap.isOpened():
            self.get_logger().info(f"OpenCV camera opened: /dev/video{CAMERA_INDEX}")
            try:
                cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
                cv2.resizeWindow(self.window_name, 1280, 720)
                self.show_window = True
            except cv2.error:
                # Headless environment: keep capturing frames, skip the preview.
                self.get_logger().warn("No GUI backend, run without preview window.")
        else:
            self.get_logger().warn(f"OpenCV camera open failed: /dev/video{CAMERA_INDEX}")

        self.ctrl_pub = self.create_publisher(JointState, CONTROL_TOPIC, 10)
        self.traj_msgs: list[JointState] = []  # pre-built one-cycle trajectory
        self.traj_idx = 0  # index of the next step to publish
        self.traj_cycle = 0  # number of completed loops
        self.traj_started = False  # True once a base pose was captured

        self.create_subscription(JointState, FEEDBACK_TOPIC, self._callback, 10)
        self.create_timer(0.05, self._read_camera_frame)
        self.create_timer(0.05, self._publish_traj_step)  # 20Hz
        self.create_timer(1.0, self._print_latest)
        self.get_logger().info(
            f"Subscribed to {FEEDBACK_TOPIC}; publish planned stream to {CONTROL_TOPIC} at 20Hz"
        )

    def _callback(self, msg: JointState) -> None:
        """Store feedback; on first complete message, build the trajectory."""
        self.latest_msg = msg
        if self.traj_started:
            return
        base = self._extract_target_positions(msg)
        if base is None:
            self.get_logger().warn("Feedback JointState missing required joints, waiting next message.")
            return
        self.traj_msgs = self._build_trajectory(base, steps=50)
        self.traj_idx = 0
        self.traj_cycle = 0
        self.traj_started = True
        self.get_logger().info("Generated 50-step motion stream from current feedback position (loop mode).")

    def _extract_target_positions(self, msg: JointState) -> list[float] | None:
        """Return [joint1..joint6, gripper] from *msg*, or None if a joint is missing.

        The gripper value is taken from a "gripper" joint if present, otherwise
        reconstructed from the two finger joints, otherwise defaulted to 0.05 m,
        and finally clamped to [GRIPPER_MIN_M, GRIPPER_MAX_M].
        """
        name_to_pos = {n: p for n, p in zip(msg.name, msg.position)}
        out: list[float] = []
        for joint in TARGET_NAMES[:6]:
            if joint not in name_to_pos:
                return None
            out.append(float(name_to_pos[joint]))
        if "gripper" in name_to_pos:
            out.append(float(name_to_pos["gripper"]))
        elif "gripper_joint1" in name_to_pos and "gripper_joint2" in name_to_pos:
            # Opening width is the sum of both finger displacements.
            out.append(abs(float(name_to_pos["gripper_joint1"])) + abs(float(name_to_pos["gripper_joint2"])))
        else:
            out.append(0.05)
        out[6] = max(GRIPPER_MIN_M, min(GRIPPER_MAX_M, out[6]))
        return out

    def _build_trajectory(self, base: list[float], steps: int) -> list[JointState]:
        """Build *steps* JointState messages tracing one sinusoidal cycle around *base*."""
        traj: list[JointState] = []
        for i in range(steps):
            t = i / max(steps - 1, 1)
            s = math.sin(2.0 * math.pi * t)  # one full period over the cycle
            s2 = math.sin(4.0 * math.pi * t)  # two periods, used for joint4
            p = list(base)
            # Larger adjustments around current pose.
            p[1] += JOINT2_DELTA_RAD * s
            p[2] += -JOINT3_DELTA_RAD * s
            p[3] += JOINT4_DELTA_RAD * s2
            # Gripper opening width in meters, clamped to [0, 0.1].
            p[6] = max(GRIPPER_MIN_M, min(GRIPPER_MAX_M, p[6] + GRIPPER_DELTA_M * s))

            m = JointState()
            # Copy so each message owns its name list instead of aliasing TARGET_NAMES.
            m.name = list(TARGET_NAMES)
            m.position = p
            m.velocity = []
            m.effort = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5]
            traj.append(m)
        return traj

    def _publish_traj_step(self) -> None:
        """20 Hz timer: publish the next trajectory step, wrapping at the end."""
        if not self.traj_started or not self.traj_msgs:
            return
        if self.traj_idx >= len(self.traj_msgs):
            self.traj_idx = 0
            self.traj_cycle += 1
            self.get_logger().info(f"Looping motion stream, cycle={self.traj_cycle}")
        msg = self.traj_msgs[self.traj_idx]
        msg.header.stamp = self.get_clock().now().to_msg()
        self.ctrl_pub.publish(msg)
        self.traj_idx += 1

    def _read_camera_frame(self) -> None:
        """20 Hz timer: grab a frame, remember its shape, optionally preview it."""
        if not self.cap.isOpened():
            return
        ok, frame = self.cap.read()
        if not ok:
            return
        self.latest_frame_shape = frame.shape
        if self.show_window:
            cv2.imshow(self.window_name, frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):
                # 'q' in the preview window triggers a clean shutdown.
                self.get_logger().info("Pressed q, shutting down...")
                if rclpy.ok():
                    rclpy.shutdown()

    def _print_latest(self) -> None:
        """1 Hz timer: print the latest feedback, stream progress and frame size."""
        if self.latest_msg is None:
            return
        msg = self.latest_msg
        stamp = f"{msg.header.stamp.sec}.{msg.header.stamp.nanosec:09d}"
        print("==== /feedback/joint_states ====")
        print(f"stamp: {stamp}")
        print(f"name: {list(msg.name)}")
        print(f"position: {list(msg.position)}")
        print(f"velocity: {list(msg.velocity)}")
        print(f"effort: {list(msg.effort)}")
        if self.traj_started:
            print(f"stream_progress: {self.traj_idx}/{len(self.traj_msgs)} cycle={self.traj_cycle}")
        if self.latest_frame_shape is not None:
            # Grayscale frames have a 2-tuple shape; don't assume 3 channels.
            h, w = self.latest_frame_shape[:2]
            c = self.latest_frame_shape[2] if len(self.latest_frame_shape) > 2 else 1
            print(f"camera_frame: {w}x{h} channels={c}")
        else:
            print("camera_frame: N/A")


def main() -> None:
    """Entry point: spin the node until interrupted, then release resources."""
    rclpy.init()
    printer = JointStatePrinter()
    try:
        rclpy.spin(printer)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the camera and GUI resources before tearing down ROS.
        if printer.cap.isOpened():
            printer.cap.release()
        if printer.show_window:
            cv2.destroyAllWindows()
        printer.destroy_node()
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == "__main__":
    main()

使用以下指令确定camera index

python3 -c " import cv2 for i in range(8): c = cv2.VideoCapture(i, cv2.CAP_V4L2) print(i, 'open=', c.isOpened()) c.release() "

Thanks for your support.