Aubo ROS2 集成使用手册
本文面向现场使用人员、集成开发人员以及需要在 ARCS / ROS2 环境中接入 Aubo 机器人的开发者。本文档描述 ros_integrate 提供的 ROS2 模块,以及示教器侧接入 ROS2 的实现方式。
ros_integrate 的核心作用有两点:
- 将
aubo_sdk的功能封装为可直接使用的 ROS2 Topic、Service 和 Action - 为示教器 / ARCS 脚本侧提供 ROS2 接入模块
1. 项目概览
ros_integrate 将 aubo_sdk 的功能整理为统一、可直接访问的 ROS2 模块,并支持示教器侧接入 ROS2。
本手册说明 ros_integrate 的能力范围、接口组织方式以及各类能力的适用场景。
从使用对象来看,相关能力分为两组。
示教器侧能力:
- 在示教器 / ARCS 脚本侧通过 Lua 模块收发 ROS2 消息
SDK 侧能力:
- 通过 Service 访问 Aubo SDK 开放出来的查询、配置、运动和算法能力
- 读取机器人状态 Topic
- 在高阶连续控制场景下发送
joint_cmd - 在伺服会话管理场景下使用
servo_modeAction
除接口清单外,本手册还包括以下内容:
- 各类接口对应的功能范围
- 各接口的职责划分
- 推荐的调用顺序
所有 Topic、Service 和 Action 均带机器人名前缀。
例如机器人名为 rob1 时:
/rob1/joint_states/rob1/joint_cmd/rob1/servo_mode/rob1/RobotConfig/getDof/rob1/MotionControl/moveJoint
多机器人场景下,每台机器人都有独立命名空间,接口互不冲突。
2. 部署
本章面向已经拿到 ros_integrate 交付包、需要在目标 ARCS 机器上安装的现场用户。
ros_integrate 不是一个独立启动的 ROS2 应用,而是作为 aubo_control 的扩展模块加载。部署过程会同时落地以下内容:
- 最小 ROS2 Humble 运行时
ros_integrate扩展模块- Lua 侧
ros_bind模块 - C++ 侧共享库
libros_bindings.so aubo_control.conf中的扩展加载配置
部署完成后,aubo_control 启动时会加载 ros_integrate,再由插件发布 Topic、注册 Service 和 Action。
2.1 部署包内容
标准部署包中通常包含以下目录和文件:
.
├── setup.sh
├── ros2_runtime/
│ ├── minimal-ros2-humble-x86_64-v0.3.zip
│ └── minimal-ros2-humble-arm64-v0.3.zip
├── extensions/
│ └── ros_integrate/
│ └── ros_integrate.so
└── lib/
├── libros_bindings.so
└── lua/
└── 5.3/
└── ros_bind.so
其中:
setup.sh是目标机部署入口ros2_runtime/存放不同架构的最小 ROS2 运行时压缩包extensions/ros_integrate/ros_integrate.so是 ARCS 扩展模块lib/lua/5.3/ros_bind.so是示教器 / ARCS Lua 侧加载的ros_bind模块lib/libros_bindings.so是 C++ 与 Lua 共用的 ROS2 bridge 库
不同交付版本的压缩包文件名可能略有变化,实际部署时以交付包中的文件为准。
如果在源码仓库中调试部署脚本,脚本路径为 scripts/setup.sh;如果使用构建后的交付包,脚本位于交付包根目录。
2.2 目标机前置条件
目标机需要满足以下条件:
- 系统中已经安装并配置 ARCS /
aubo_control /etc/default/arcs存在,并能提供ARCS_WS环境变量ARCS_WS/config/aubo_control.conf文件存在- 当前用户具备写入
/opt/、$ARCS_WS、/etc/ld.so.conf.d/的权限 - 系统中已安装
rsync和unzip
可用以下命令检查基础环境:
source /etc/default/arcs
echo "$ARCS_WS"
test -f "$ARCS_WS/config/aubo_control.conf"
command -v rsync
command -v unzip
如果上述检查失败,应先修复 ARCS 环境或安装缺失命令。
2.3 执行部署
进入部署包根目录后,建议先执行预演:
./setup.sh --dry-run
预演模式只打印将要执行的复制、解压和配置动作,不真正写入系统。
确认路径和文件无误后,再执行正式部署:
sudo ./setup.sh
脚本会根据 uname -m 自动选择运行时包:
x86_64:使用 x86_64 运行时压缩包aarch64:使用 arm64 运行时压缩包
当前脚本会将运行时解压并合并到 /opt/,将扩展、Lua 模块和共享库同步到 ARCS 工作区。
2.4 脚本写入内容
setup.sh 的主要落地动作如下:
- 从
/etc/default/arcs读取ARCS_WS - 解压
ros2_runtime/中匹配当前架构的最小 ROS2 运行时 - 将运行时目录合并到
/opt/ - 将
extensions/合并到$ARCS_WS/extensions - 将
lib/lua/合并到$ARCS_WS/lib/lua - 将
lib/libros_bindings.so覆盖到$ARCS_WS/lib/libros_bindings.so - 若
aubo_control.conf中还没有alias = "ros_integrate",则追加扩展配置 - 写入
/etc/ld.so.conf.d/ros-humble.conf - 执行
ldconfig
目录同步采用合并覆盖策略:
- 源目录中同名文件会覆盖目标文件
- 目标目录中多余文件不会被删除
2.5 aubo_control.conf 配置
首次部署时,脚本会向 $ARCS_WS/config/aubo_control.conf 追加类似配置:
[[Extension]]
location = "extensions/ros_integrate/ros_integrate.so"
bundle = "ros_integrate"
alias = "ros_integrate"
enable = true
min_arcs_version = "0.32.1-rc.6"
[Extension.options]
ament_prefix_env = "/opt/ros/humble"
ld_library_env = "/opt/ros/humble/lib"
rmw_imp = "rmw_fastrtps_cpp"
domin_id = 0
localhost_only = 0
legacy_names = false
debug = false
enable_fjt_action = false
fjt_mode = 1
fjt_time = 0.0
fjt_send_period = 0.0
# rmw_qos_file = "/root/arcs_ws/config/fastdds.xml"
关键字段说明:
location:扩展模块在 ARCS 工作区中的相对路径bundle:扩展包名alias:扩展别名,脚本用它判断是否已经配置过enable:是否启用该扩展min_arcs_version:最低 ARCS 版本要求
Extension.options 字段说明:
ament_prefix_env:ROS2 安装前缀路径,用于补齐AMENT_PREFIX_PATH;默认可使用/opt/ros/humbleld_library_env:ROS2 动态库路径,用于补齐LD_LIBRARY_PATH;默认可使用/opt/ros/humble/librmw_imp:指定 ROS2 RMW 实现,例如rmw_fastrtps_cpp;留空时按当前环境默认实现处理domin_id:设置ROS_DOMAIN_ID;该字段名沿用当前代码拼写,配置时需要写作domin_idlocalhost_only:设置ROS_LOCALHOST_ONLY;0表示允许跨主机通信,1表示仅本机通信legacy_names:是否启用旧命名兼容;一般新项目保持falsedebug:是否打开ros_integrate调试日志;现场排查问题时可临时设为trueenable_fjt_action:是否默认启用FollowJointTrajectoryAction 入口;启用入口不代表后台自动占用伺服控制权fjt_mode:FollowJointTrajectory默认伺服模式;未通过joint_cmd动态覆盖时使用该值fjt_time:FollowJointTrajectory每个轨迹点传给底层伺服接口的默认时间参数;0.0表示自动使用轨迹相邻点时间差fjt_send_period:FollowJointTrajectory会话默认发送周期;0.0表示使用内部默认发送周期rmw_qos_file:可选 Fast DDS QoS XML 文件路径;配置后会启用FASTRTPS_DEFAULT_PROFILES_FILE和RMW_FASTRTPS_USE_QOS_FROM_XML
如果运行时又通过 joint_cmd 发送 fjt_control JSON,enable_fjt_action、fjt_mode、fjt_time 和 fjt_send_period 的运行时命令会优先生效。
2.6 部署后检查
部署完成后,建议按以下顺序检查。
检查文件是否落地
source /etc/default/arcs
test -f "$ARCS_WS/extensions/ros_integrate/ros_integrate.so"
test -f "$ARCS_WS/lib/lua/5.3/ros_bind.so"
test -f "$ARCS_WS/lib/libros_bindings.so"
test -d /opt/ros/humble
检查动态库路径
cat /etc/ld.so.conf.d/ros-humble.conf
ldconfig -p | grep libros_bindings
检查配置是否追加
grep -n 'alias = "ros_integrate"' "$ARCS_WS/config/aubo_control.conf"
检查 ROS2 接口是否出现
启动或重启 aubo_control 后,在 ROS2 环境中查看接口:
source /opt/ros/humble/setup.bash
ros2 topic list | grep joint_states
ros2 service list | grep RobotConfig
ros2 action list | grep servo_mode
若机器人名为 rob1,通常应能看到类似接口:
/rob1/joint_states/rob1/joint_cmd/rob1/servo_mode/rob1/RobotConfig/getDof
2.7 常见问题
找不到 /etc/default/arcs
说明目标机 ARCS 环境尚未按预期配置。应先确认 ARCS 安装是否完整,并确保 /etc/default/arcs 中存在 ARCS_WS。
提示缺少 rsync 或 unzip
安装缺失命令后重新执行部署:
sudo apt-get update
sudo apt-get install -y rsync unzip
提示找不到运行时压缩包
确认部署包中存在 ros2_runtime/ 目录,并且其中有与当前架构匹配的最小 ROS2 运行时包。
可用以下命令查看当前架构:
uname -m
部署后没有 ROS2 Topic 或 Service
建议依次检查:
aubo_control.conf中是否存在并启用了ros_integrateros_integrate.so是否已经同步到$ARCS_WS/extensions/ros_integrate//opt/ros/humble和$ARCS_WS/lib是否已经写入动态库搜索路径aubo_control是否已经重启ROS_DOMAIN_ID、ROS_LOCALHOST_ONLY是否与外部 ROS2 节点一致
重复执行部署是否安全
部署脚本支持重复执行。目录会合并覆盖,aubo_control.conf 中如果已经存在 alias = "ros_integrate",脚本不会重复追加同一段扩展配置。
3. Lua 模块
系统提供一个面向示教器 / ARCS 脚本侧使用的 Lua 5.3 模块:
ros_bind
Lua 侧典型使用方式为:
local ros_bind = require("ros_bind")
这个模块主要用于示教器侧脚本和 ROS2 系统之间的通信。常用方式包括:
- 使用
ros_publisher_factory创建一个 Topic 发布对象 - 使用
ros_subscriber_factory创建一个 Topic 订阅对象
创建时通常需要提供:
- Topic 名称
- 消息类型名称
- 可选的队列深度参数
keep_last
典型接入场景如下:
- 在示教器 / ARCS 脚本中发布或订阅标准 ROS2 Topic
- 在示教器侧脚本环境中与外部 ROS2 节点交换数据
- 以脚本方式实现轻量级通信逻辑接入
注意事项:
- 对
std_msgs原子类型,Lua 可以直接使用string、boolean、number - 对非原子消息,不提供 Lua table 到 ROS 消息的自动映射
- 非原子消息通常需要以二进制序列化后的 Lua string 形式收发
keep_last是可选队列深度参数,省略时默认值为10
最小示例如下:
local ros_bind = require("ros_bind")
local pub = ros_bind.ros_publisher_factory(
"/demo/from_lua",
"std_msgs/msg/String"
)
local sub = ros_bind.ros_subscriber_factory(
"/demo/from_ros",
"std_msgs/msg/String"
)
pub:write("hello from lua")
local ok, msg = sub:read_timeout(1000)
if ok then
textmsg("recv:", msg)
else
textmsg("timeout")
end
4. Topic 模块
4.1 /<robot_name>/joint_states Topic
joint_states 的作用
发布机器人当前关节状态。
joint_states 的典型用途
- 获取当前关节角
- 监控机器人状态
- 为上层规划、记录、可视化提供输入
joint_states 的使用建议
在执行伺服控制、轨迹回放、闭环控制前,先确认该 Topic 正常更新。
4.2 /<robot_name>/joint_cmd Topic
joint_cmd 的作用
该 Topic 是连续伺服控制场景下的主要输入接口。客户端向该 Topic 发布字符串消息,服务端将其解析为伺服控制指令。
joint_cmd 支持的输入格式
推荐使用 JSON 协议。
4.2.1 JSON 格式
支持两种 type:
servo_controlservo_point
这两类协议为推荐输入格式,详见第 7 章。
5. Service 模块
系统提供了大量 ROS2 Service,用于统一访问 Aubo 控制能力。
5.1 Service 命名规则
统一格式:
/<robot_name>/<Class>/<Method>
例如:
/rob1/RobotConfig/getDof/rob1/RobotState/getJointPositions/rob1/MotionControl/moveJoint/rob1/RobotAlgorithm/inverseKinematics
5.2 Service 分类总览
主要服务类别包括:
MathSerialSocketSystemInfoRuntimeMachineRegisterControlSyncMoveTraceRobotConfigMotionControlForceControlIoControlRobotAlgorithmRobotManageRobotState
说明: 并非清单中的所有历史接口都已对外开放。若某些接口在现场环境中无法调用或未提供,请以当前发布版本实际开放情况为准。
5.3 Service 通用调用方法
所有 Service 的调用方式基本一致:
- 确认机器人命名空间
- 确认 Service 路径
- 按对应
.srv类型构造请求 - 调用后检查:
codemessageresult
一般情况下,Service 返回值至少包含:
code:调用状态码message:描述信息result:实际结果
通用判定建议
code == 0:一般表示调用成功code != 0:表示失败或异常,需要查看message
5.4 Service 分类与用途
下面按功能分类说明各类 Service 的常见用途。
5.4.1 RobotConfig:机器人配置查询与设置
常见用途:
- 查询自由度:
getDof - 查询机器人名称:
getName - 查询周期:
getCycletime - 读取/设置关节限制、TCP 限制
- 设置碰撞等级、重力、负载、TCP 偏移
典型使用场景:
- 上位机启动时读取 DOF
- 控制前读取控制周期
- 工艺切换时更新工具或负载参数
示例中的关键调用:
/<robot>/RobotConfig/getDof
用途:获取关节自由度,用来校验 joint_cmd 点长度和控制参数。
5.4.2 RobotState:机器人状态查询
常见用途:
- 获取当前关节位置:
getJointPositions - 判断机器人是否稳定:
isSteady - 获取各种当前状态量
示例中的关键调用:
/<robot>/RobotState/getJointPositions/<robot>/RobotState/isSteady
典型使用流程:
- 调
getJointPositions获取当前姿态 - 若需要先移动到起始点,则调用
moveJoint - 再轮询
isSteady等待机器人稳定
5.4.3 MotionControl:运动与伺服控制
这是运动控制相关的核心 Service 集合。
常见能力包括:
moveJointmoveLinemoveCircleservoJointservoCartesiansetServoModeisServoModeEnabledgetExecIdstopMovestartMove
示例中的关键调用:
/<robot>/MotionControl/moveJoint/<robot>/MotionControl/getExecId
moveJoint 的典型用途
在正式开始伺服之前,可先将机器人移动到轨迹起始点。
示例参数含义:
q:目标关节位置a:加速度v:速度blend_radius:混合半径duration:持续时间
getExecId 的典型用途
用于判断运动是否已经开始或结束:
-1通常表示当前空闲- 非
-1通常表示有运动任务正在执行
示例脚本通过该接口判断 moveJoint 是否执行完成。
5.4.4 RobotAlgorithm:算法服务
常见用途:
- 逆运动学:
inverseKinematics - 正运动学
- 姿态变换与算法计算
示例中的关键调用:
/<robot>/RobotAlgorithm/inverseKinematics
典型使用场景:
- 当轨迹选择
cartesian类型时,首点如果要先通过moveJoint到位,需要先做逆解,得到对应关节角
5.4.5 Math:数学工具服务
包含位姿运算、姿态转换、坐标变换等工具,例如:
poseAddposeSubposeTransposeInverserpyToQuaternionquaternionToRpy
适用场景:
- 坐标变换
- 工具姿态计算
- 上位机几何辅助计算
5.4.6 RuntimeMachine 类服务
用于程序加载、启动、停止、暂停、恢复等运行时控制,例如:
loadProgramrunProgramstartstopabortpauseresume
适用场景:
- 控制机器人程序执行流程
- 与示教/运行环境联动
5.4.7 RegisterControl 类服务
用于寄存器、变量、modbus 等控制,例如:
getBoolInput/setBoolInputgetInt32Register/setInt16RegistersetWatchDogmodbusAddSignalmodbusGetSignalStatus
适用场景:
- 与 PLC / 工业 IO / modbus 设备交互
- 在机器人任务中共享变量
5.4.8 SyncMove 类服务
用于同步运动、坐标系管理、轴组控制,例如:
syncMoveSegmentsyncMoveOffframeAddframeAttachaxisGroupMoveJoint
适用场景:
- 多轴联动
- 外部轴组控制
- 工件坐标系管理
5.4.9 ForceControl 类服务
用于力控相关接口,例如:
fcEnablefcDisableisFcEnabledfcSetSensorThresholdsfcSetSensorLimits
适用场景:
- 柔顺控制
- 力传感器阈值配置
- 力控工艺
5.4.10 Serial / Socket 类服务
用于串口和网络通信:
serialOpen/serialCloseserialReadString/serialSendStringsocketOpen/socketClosesocketReadString/socketSendString
适用场景:
- 与外部设备通信
- 工装、扫码枪、传感器接入
5.4.11 SystemInfo / Trace 类服务
SystemInfo:版本、构建信息、系统时间Trace:文本日志、提示、通知
适用场景:
- 系统诊断
- 状态追踪
- 联调日志输出
5.5 Service 使用建议
- 所有调用前先确认机器人命名空间
- 优先检查
code == 0 - 对运动类接口,调用后结合状态接口判断是否真正执行完成
- 对算法类和配置类接口,注意参数维度与单位
- 多机器人时不要混用不同前缀的 service 路径
6. Action 模块
核心 Action 为:
/<robot_name>/servo_mode
该 Action 本身不直接承载轨迹点,而用于补充伺服会话管理与状态反馈。
在本文档中,可将该 Action 视为扩展能力接口,而非基础调用入口。
- 推荐通过
/<robot_name>/joint_cmd发送servo_control + servo_point - 联调时建议优先采用第 7 章定义的 JSON 协议
servo_modeAction 更适合作为扩展接口或补充接口使用
6.1 Action 名称
例如机器人名为 rob1:
/rob1/servo_mode
6.2 Action 的职责
servo_mode Action 负责:
- 检查是否允许进入伺服模式
- 建立伺服会话
- 在会话期间持续提供 feedback
- 检查 watchdog 超时
- 处理取消、错误、会话关闭
而真正的目标点数据仍然通过:
/<robot_name>/joint_cmd
持续发送。
也就是说:
- Action 管会话
- Topic 管点流
推荐控制路径如下:
- 先向
/<robot_name>/joint_cmd发送servo_control - 再持续发送
servo_point - 结束时再发送
servo_control stop
6.3 模式简介
无论通过 joint_cmd 还是 Action 管理伺服过程,均会涉及伺服模式号。
常见模式说明如下:
1:截断式规划伺服,实时调整目标点和规划路线2:透传模式,直接下发3:透传模式,带缓存4:1ms 透传模式,带缓存5:规划伺服模式,强调经过所有目标点6:截断式规划伺服,可叠加力控7:规划伺服模式,可叠加力控
初次联调时,建议优先从行为更易观测的关节模式开始,再逐步切换至复杂模式。
6.4 Action 的适用场景
建议在以下场景中使用 Action:
- 已完成 Service 与
joint_cmd的基础联调 - 需要额外的会话状态反馈
- 需要基于 goal / feedback / result 机制管理伺服过程
- 需要排查伺服会话被拒绝、取消或提前结束等问题
servo_mode Action 的字段、反馈、结果与示例见附录。
7. joint_cmd JSON 协议
这是主要用户接口之一。
joint_cmd 支持两类 JSON:
servo_controlservo_point
7.1 servo_control 协议
servo_control 的作用
用于开启、切换或停止伺服会话。
servo_control 标准格式
{
"type": "servo_control",
"command": "set_mode",
"mode": 1,
"watchdog_timeout": 0.5,
"send_period": 0.005
}
servo_control 字段说明
字段 servo_control.type
固定为:
servo_control
字段 command
支持:
stopset_mode
含义:
stop:停止伺服会话set_mode:设置并启动伺服模式
字段 mode
- 类型:整数
- 作用:指定伺服模式
- 规则:
- 示例中使用
1到7 0通常表示退出伺服模式1:截断式规划伺服模式,会实时调整目标点和规划路线2:透传模式,直接下发3:透传模式,缓存4:1ms 透传模式,缓存5:规划伺服模式,保证经过所有目标点6:截断式规划伺服模式,可叠加力控7:规划伺服模式,可叠加力控
- 示例中使用
字段 servo_control.watchdog_timeout
- 类型:浮点数,单位秒
- 作用:设置看门狗超时
- 规则:
- 推荐不小于
0.05 - 取值过小时可能导致会话因超时而退出
- 推荐不小于
字段 send_period
- 类型:浮点数,单位秒
- 作用:期望发送周期
- 规则:
- 如果未设置或小于等于 0,则使用机器人控制周期
典型用法
启动伺服
{
"type": "servo_control",
"command": "set_mode",
"mode": 4,
"watchdog_timeout": 0.5,
"send_period": 0.02
}
停止伺服
{
"type": "servo_control",
"command": "stop"
}
7.2 servo_point 协议
servo_point 的作用
在已启动的伺服会话中发送目标点。
servo_point 标准格式
{
"type": "servo_point",
"session_id": 123456789,
"point_id": 1,
"point_type": "joint",
"position": [0, 0, 0, 0, 0, 0],
"time": 0.02,
"lookahead_time": 0.1,
"gain": 200.0
}
servo_point 字段说明
字段 servo_point.type
固定为:
servo_point
如果未传 type,实现通常仍按点命令解析;工程使用中仍建议显式传入。
字段 session_id
- 类型:整数
- 作用:标识一次伺服会话
- 规则:
- 建议一次会话内保持不变
- 若服务端首次收到带
session_id的点,会锁定该值 - 后续若切换为其他
session_id,该点会被丢弃
字段 point_id
- 类型:整数
- 作用:标识点序号
- 规则:
- JSON 点流必须带
point_id - 必须严格递增
- 若
point_id小于等于上一次已接受点,则该点会被丢弃
- JSON 点流必须带
字段 point_type
支持:
jointcartesian
也支持数值:
0表示joint1表示cartesian
字段 position
- 类型:数组
- 规则:
point_type = joint时,长度必须等于机器人 DOFpoint_type = cartesian时,长度必须为 6
字段 lookahead_time
- 类型:浮点数
- 作用:伺服前瞻时间
positionServo实现使用该参数
字段 gain
- 类型:浮点数
- 作用:伺服增益
positionServo实现使用该参数
字段 acceleration
- 兼容预留字段
- 实现不依赖该字段
字段 velocity
- 兼容预留字段
- 实现不依赖该字段
字段 time
- 类型:浮点数
- 作用:点对应的伺服时间参数
- 示例中显式发送该字段
- 若未单独指定,通常可与
send_period保持一致
7.3 servo_point 的处理规则
关节点模式 joint 说明
- 直接将
position作为关节目标 - 长度必须等于 DOF
- 适合作为初始联调输入类型
笛卡尔模式 cartesian 说明
position必须为 6 维- 服务端会先根据当前关节状态做逆运动学
- 再将逆解结果下发给位置伺服
- 该模式仍在持续优化中,现场使用前建议完成充分联调与稳定性验证
综合考虑如下:
cartesian更灵活- 但依赖逆解成功
- 初次联调建议优先使用
joint
7.4 JSON 点流有效性规则
对于 JSON 点流,以下条件非常关键:
- 必须处于活动伺服会话中
point_id必须存在point_id必须严格递增- 若会话已锁定
session_id,后续点必须保持相同session_id position维度必须正确- 对
cartesian点,逆解必须成功
否则该点可能被直接丢弃,或导致命令执行失败。
7.5 推荐发送策略
建议每次会话都使用:
- 一个固定
session_id - 从
1开始递增的point_id - 稳定的
send_period
该策略便于问题定位,也更符合服务端处理逻辑。
8. 附录:示例脚本说明
在完成前述接口与调用流程理解后,可再查阅本章中的完整脚本与补充说明。
8.0 完整脚本
#!/usr/bin/env python3
"""
ros_control 伺服路径的最小 joint_cmd 示例。
示例功能说明:
- 发送 servo_control JSON 以建立或退出服务端 servo session
- 发送 servo_point JSON 作为 latest-reference 关节目标点流
- 支持从当前位姿密集插补至全 0 位的测试,以及 csv 轨迹回放
- 在 csv 模式下支持继续选择点位类型(joint/cartesian)
采用的 JSON 协议:
- servo_control: {"type":"servo_control","command":"set_mode"/"stop",...}
- servo_point: {"type":"servo_point","session_id":...,"point_id":...,
"point_type":"joint","position":[...],"time":...,
"lookahead_time":...,"gain":...}
说明:
- `--servo-mode 0` 表示退出伺服模式
- `--servo-mode 1` 表示截断式规划伺服模式,会实时调整目标点和规划路线
- `--servo-mode 2` 表示透传模式(直接下发)
- `--servo-mode 3` 表示透传模式(缓存)
- `--servo-mode 4` 表示 1ms 透传模式(缓存)
- `--servo-mode 5` 表示规划伺服模式,能保证经过所有目标点
- `--servo-mode 6` 表示截断式规划伺服模式,可叠加力控
- `--servo-mode 7` 表示规划伺服模式,可叠加力控
- 所有伺服模式都会发送 `send_period` 和 `time`
常用测试示例:
- current:
uv run python example_joint_cmd_position_servo.py rob1 --mode current --servo-mode 4 --period 0.02 --time 0.02
- current:
uv run python example_joint_cmd_position_servo.py rob1 --mode current --servo-mode 3 --period 0.02 --time 0.02
- csv:
uv run python example_joint_cmd_position_servo.py rob1 --mode csv --servo-mode 3 --traj /path/to/traj.csv --point-type joint --period 0.02 --time 0.02
"""
import argparse
import json
import os
import subprocess
import sys
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from aubo_interfaces.srv import (
GetDof,
GetExecId,
GetJointPositions,
InverseKinematics,
IsSteady,
MoveJoint,
)
DEFAULT_TRAJ = '/root/GitLab/aubo_sdk/example/trajs/record6.offt'
def parse_args():
parser = argparse.ArgumentParser(
description='Test servo control through joint_cmd JSON.',
formatter_class=argparse.RawTextHelpFormatter,
epilog=(
'Examples:\n'
' uv run python example_joint_cmd_position_servo.py rob1 --mode current --servo-mode 4 --period 0.02 --time 0.02\n'
' uv run python example_joint_cmd_position_servo.py rob1 --mode current --servo-mode 3 --period 0.02 --time 0.02\n'
' uv run python example_joint_cmd_position_servo.py rob1 --mode csv --servo-mode 3 --traj /path/to/traj.csv --point-type joint --period 0.02 --time 0.02'
),
)
parser.add_argument('robot', nargs='?', default='')
parser.add_argument('--mode', choices=('current', 'csv'), default='')
parser.add_argument(
'--traj',
default='',
help='csv 模式下指定轨迹文件路径;未指定时会在交互模式下提示输入',
)
parser.add_argument(
'--point-type',
choices=('joint', 'cartesian'),
default='',
help='csv 模式下指定点位类型;未指定时会在交互模式下提示选择',
)
parser.add_argument('--period', type=float, default=-1.0)
parser.add_argument('--watchdog', type=float, default=0.5)
parser.add_argument(
'--servo-mode',
type=int,
choices=(1, 2, 3, 4, 5, 6, 7),
default=0,
help='伺服模式号;未指定时可交互选择',
)
parser.add_argument(
'--time',
type=float,
default=-1.0,
help='servo_point.time;未指定时会在交互模式下提示输入',
)
parser.add_argument('--gain', type=float, default=200.0)
parser.add_argument('--lookahead-time', type=float, default=0.1)
parser.add_argument(
'--interp-step',
type=float,
default=0.005,
help='current 模式下,从当前位姿插补到全 0 位时的单关节最大步长',
)
parser.add_argument('--prefeed', type=int, default=10)
parser.add_argument('--hold', type=int, default=10)
parser.add_argument('--delta', type=float, default=0.02)
parser.add_argument('--joint-index', type=int, default=0)
parser.add_argument('--move-to-first', action='store_true', default=True)
return parser.parse_args()
class JointCmdPositionServoTest(Node):
def __init__(self, args):
super().__init__('example_joint_cmd_position_servo')
self.args = args
self._argv = sys.argv[1:]
self.args.servo_mode = self._resolve_servo_mode()
self.robot = self._resolve_robot_name(args.robot)
self.args.mode = self._resolve_run_mode()
self._resolve_servo_timing()
self.pub = self.create_publisher(String, f'/{self.robot}/joint_cmd', 20)
self.get_dof_cli = self.create_client(
GetDof, f'/{self.robot}/RobotConfig/getDof'
)
self.get_joint_positions_cli = self.create_client(
GetJointPositions, f'/{self.robot}/RobotState/getJointPositions'
)
self.get_exec_id_cli = self.create_client(
GetExecId, f'/{self.robot}/MotionControl/getExecId'
)
self.is_steady_cli = self.create_client(
IsSteady, f'/{self.robot}/RobotState/isSteady'
)
self.move_joint_cli = self.create_client(
MoveJoint, f'/{self.robot}/MotionControl/moveJoint'
)
self.inverse_kinematics_cli = self.create_client(
InverseKinematics, f'/{self.robot}/RobotAlgorithm/inverseKinematics'
)
self._session_id = 0
self._point_id = 0
self._traj_points = []
self._point_type = 'joint'
self.dof = self._resolve_dof()
if self.args.joint_index >= self.dof:
self.args.joint_index = max(0, self.dof - 1)
log_text = (
f'robot={self.robot} mode={self.args.mode} '
f'joint_cmd_topic=/{self.robot}/joint_cmd '
f'servo_mode={self.args.servo_mode} period={self.args.period:.4f}s '
f'dof={self.dof}'
)
if self.args.mode == 'csv':
log_text += f' traj={self.resolve_traj_path()}'
self.get_logger().info(log_text)
def _arg_was_provided(self, name):
return any(arg == name or arg.startswith(f'{name}=') for arg in self._argv)
def _resolve_run_mode(self):
if self.args.mode in ('csv', 'current'):
return self.args.mode
env_value = os.environ.get('AUBO_RUN_MODE', '').strip().lower()
if env_value in ('1', 'csv', 'csv_playback'):
return 'csv'
if env_value in ('2', 'current', 'current_start', 'prefeed'):
return 'current'
if not sys.stdin.isatty():
return 'csv'
print('选择运行模式:')
print(' 1. csv - 使用指定轨迹文件回放')
print(' 2. current - 从当前位姿密集插补到全 0 位')
while True:
try:
choice = input('模式 [1/2] (默认 1): ').strip().lower()
except EOFError:
return 'csv'
if choice in ('', '1', 'csv', 'csv_playback'):
return 'csv'
if choice in ('2', 'current', 'current_start', 'prefeed'):
return 'current'
print(f'无效选择: {choice!r},请输入 1 或 2。')
def _resolve_robot_name(self, cli_value):
candidate = cli_value.strip().lstrip('/')
if candidate:
return candidate
env_value = os.environ.get('AUBO_ROBOT_NAME', '').strip().lstrip('/')
if env_value:
return env_value
robot_names = self._discover_servo_robots()
if robot_names:
return self._select_robot_name(robot_names)
return 'rob1'
def _resolve_servo_mode(self):
if self.args.servo_mode in (1, 2, 3, 4, 5, 6, 7):
return self.args.servo_mode
env_value = os.environ.get('AUBO_SERVO_MODE', '').strip().lower()
if env_value in ('1', '2', '3', '4', '5', '6', '7'):
return int(env_value)
if env_value in ('position', 'positionservo', 'mode4'):
return 4
if env_value in ('mode1',):
return 1
if env_value in ('mode2',):
return 2
if env_value in ('servojoint', 'joint', 'mode3'):
return 3
if env_value in ('mode5',):
return 5
if env_value in ('mode6',):
return 6
if env_value in ('mode7',):
return 7
if not sys.stdin.isatty():
return 4
print('选择伺服模式:')
print(' 1. mode 1 - 截断式规划伺服,实时调整目标点/规划路线')
print(' 2. mode 2 - 透传模式(直接下发)')
print(' 3. mode 3 - 透传模式(缓存)')
print(' 4. mode 4 - 1ms 透传模式(缓存)')
print(' 5. mode 5 - 规划伺服,保证经过所有目标点')
print(' 6. mode 6 - 截断式规划伺服,可叠加力控')
print(' 7. mode 7 - 规划伺服,可叠加力控')
while True:
try:
choice = input('伺服模式 [1/2/3/4/5/6/7] (默认 4): ').strip().lower()
except EOFError:
return 4
if choice in ('', '4', 'position', 'positionservo', 'mode4'):
return 4
if choice in ('1', 'mode1'):
return 1
if choice in ('2', 'mode2'):
return 2
if choice in ('3', 'servojoint', 'joint', 'mode3'):
return 3
if choice in ('5', 'mode5'):
return 5
if choice in ('6', 'mode6'):
return 6
if choice in ('7', 'mode7'):
return 7
print(f'无效选择: {choice!r},请输入 1、2、3、4、5、6 或 7。')
def _resolve_servo_timing(self):
period_provided = self._arg_was_provided('--period')
time_provided = self._arg_was_provided('--time')
if not period_provided:
env_period = os.environ.get('AUBO_SERVO_PERIOD', '').strip()
if env_period:
try:
value = float(env_period)
if value > 0.0:
self.args.period = value
period_provided = True
except ValueError:
pass
if not time_provided:
env_time = os.environ.get('AUBO_SERVO_TIME', '').strip()
if env_time:
try:
value = float(env_time)
if value >= 0.0:
self.args.time = value
time_provided = True
except ValueError:
pass
if not sys.stdin.isatty():
if not period_provided:
self.args.period = 0.02
if self.args.time < 0.0:
self.args.time = 0.02
return
if not period_provided:
while True:
try:
value = input(
'send_period (s) [默认 0.0200]: '
).strip()
except EOFError:
value = ''
if value == '':
self.args.period = 0.02
break
try:
parsed = float(value)
except ValueError:
print(f'无效数值: {value!r},请输入正数。')
continue
if parsed <= 0.0:
print(f'无效数值: {value!r},请输入正数。')
continue
self.args.period = parsed
break
if not time_provided:
while True:
try:
value = input(
'time (s) [默认 0.0200]: '
).strip()
except EOFError:
value = ''
if value == '':
self.args.time = 0.02
break
try:
parsed = float(value)
except ValueError:
print(f'无效数值: {value!r},请输入非负数。')
continue
if parsed < 0.0:
print(f'无效数值: {value!r},请输入非负数。')
continue
self.args.time = parsed
break
if self.args.time < 0.0:
self.args.time = 0.02
def _discover_servo_robots(self):
try:
result = subprocess.run(
['ros2', 'action', 'list'],
check=False,
capture_output=True,
text=True,
timeout=2.0,
)
except Exception:
return self._discover_joint_cmd_robots()
robot_names = []
for line in result.stdout.splitlines():
action_name = line.strip()
if action_name.endswith('/servo_mode'):
robot_names.append(action_name.strip('/').split('/')[0])
robot_names = sorted(set(robot_names))
if robot_names:
return robot_names
return self._discover_joint_cmd_robots()
def _discover_joint_cmd_robots(self):
try:
result = subprocess.run(
['ros2', 'topic', 'list'],
check=False,
capture_output=True,
text=True,
timeout=2.0,
)
except Exception:
return []
robot_names = []
for line in result.stdout.splitlines():
topic = line.strip()
if topic.endswith('/joint_cmd'):
robot_names.append(topic.strip('/').split('/')[0])
return sorted(set(robot_names))
def _select_robot_name(self, robot_names):
if len(robot_names) == 1:
self.get_logger().info(f'found 1 robot: {robot_names[0]}')
return robot_names[0]
for idx, robot_name in enumerate(robot_names, start=1):
self.get_logger().info(f' {idx}. {robot_name}')
if not sys.stdin.isatty():
self.get_logger().warning(
f'non-interactive stdin detected, defaulting to {robot_names[0]}'
)
return robot_names[0]
while True:
try:
choice = input(
f'选择机器人 [1-{len(robot_names)}] '
f'(默认 1: {robot_names[0]}): '
).strip()
except EOFError:
self.get_logger().warning(
f'input closed, defaulting to {robot_names[0]}'
)
return robot_names[0]
if choice == '':
return robot_names[0]
if choice.isdigit():
index = int(choice)
if 1 <= index <= len(robot_names):
return robot_names[index - 1]
print(f'无效选择: {choice!r},请输入编号。')
def call_srv(self, client, request, name, timeout_sec=3.0):
if not client.wait_for_service(timeout_sec=timeout_sec):
self.get_logger().error(f'service not available: {name}')
return None
future = client.call_async(request)
rclpy.spin_until_future_complete(self, future)
response = future.result()
if response is None:
self.get_logger().error(f'service call failed: {name}')
return response
def _resolve_dof(self):
resp = self.call_srv(
self.get_dof_cli,
GetDof.Request(),
f'/{self.robot}/RobotConfig/getDof',
)
if resp is not None and resp.code == 0 and resp.result > 0:
return int(resp.result)
return 6
def get_joint_positions(self):
resp = self.call_srv(
self.get_joint_positions_cli,
GetJointPositions.Request(),
f'/{self.robot}/RobotState/getJointPositions',
)
if resp is None or resp.code != 0:
self.get_logger().error('getJointPositions failed')
return None
values = list(resp.result)
if values:
self.dof = len(values)
return values
def get_exec_id(self):
resp = self.call_srv(
self.get_exec_id_cli,
GetExecId.Request(),
f'/{self.robot}/MotionControl/getExecId',
)
if resp is None:
return None
if resp.code != 0:
self.get_logger().warning(
f'getExecId code={resp.code} message={resp.message}'
)
return resp.result
def is_near_start_pose(self, tolerance=1e-3):
if not self._traj_points:
self.get_logger().error('trajectory not loaded')
return False
current_q = self.get_joint_positions()
if current_q is None:
return False
start_q = self.resolve_move_joint_target(self._traj_points[0])
if start_q is None:
return False
max_err = max(abs(current_q[i] - start_q[i]) for i in range(self.dof))
self.get_logger().info(
f'compare current pose to traj start: max_err={max_err:.6f}, '
f'tolerance={tolerance:.6f}'
)
return max_err <= tolerance
def move_to_first(self, point):
joint_target = self.resolve_move_joint_target(point)
if joint_target is None:
return False
req = MoveJoint.Request()
req.q = list(joint_target)
req.a = 0.3
req.v = 0.3
req.blend_radius = 0.0
req.duration = 0.0
resp = self.call_srv(
self.move_joint_cli,
req,
f'/{self.robot}/MotionControl/moveJoint',
timeout_sec=5.0,
)
if resp is None:
return False
self.get_logger().info(
f'moveJoint result={resp.result} code={resp.code} message={resp.message}'
)
if resp.result == 0:
return True
if resp.result == 13:
self.get_logger().warning(
'moveJoint returned 13 (ignored); checking whether robot is already at trajectory start'
)
if self.is_near_start_pose():
self.get_logger().info(
'robot is already at trajectory start; skip waiting for moveJoint'
)
return True
return False
def resolve_move_joint_target(self, point):
if self._point_type == 'joint':
if len(point) != self.dof:
self.get_logger().error(
f'joint point dimension mismatch: expected {self.dof}, got {len(point)}'
)
return None
return list(point)
if self._point_type != 'cartesian':
self.get_logger().error(f'unsupported point_type: {self._point_type}')
return None
if len(point) != 6:
self.get_logger().error(
f'cartesian point dimension mismatch: expected 6, got {len(point)}'
)
return None
qnear = self.get_joint_positions()
if qnear is None:
self.get_logger().error('failed to read current joints before inverseKinematics')
return None
req = InverseKinematics.Request()
req.qnear = list(qnear)
req.pose = list(point)
resp = self.call_srv(
self.inverse_kinematics_cli,
req,
f'/{self.robot}/RobotAlgorithm/inverseKinematics',
)
if resp is None or resp.code != 0:
self.get_logger().error(
'inverseKinematics failed for cartesian first point'
)
return None
joint_target, ik_rc = self.parse_ik_result(resp.result)
if joint_target is None:
self.get_logger().error(
f'inverseKinematics returned unsupported result payload: {resp.result!r}'
)
return None
if ik_rc not in (None, 0):
self.get_logger().warning(
f'inverseKinematics returned ik_rc={ik_rc} for cartesian first point'
)
if len(joint_target) != self.dof:
self.get_logger().error(
f'inverseKinematics result dimension mismatch: expected {self.dof}, '
f'got {len(joint_target)}'
)
return None
self.get_logger().info(
f'resolved cartesian first point to joint target via inverseKinematics: {joint_target}'
)
return joint_target
def parse_ik_result(self, value):
def try_cast_sequence(seq):
try:
return [float(item) for item in seq]
except (TypeError, ValueError):
return None
if isinstance(value, (list, tuple)):
direct = try_cast_sequence(value)
if direct is not None:
return direct, 0
if value and isinstance(value[0], (list, tuple)):
joint_target = try_cast_sequence(value[0])
if joint_target is None:
return None, None
ik_code = value[1] if len(value) > 1 else 0
try:
ik_code = int(ik_code)
except (TypeError, ValueError):
ik_code = None
return joint_target, ik_code
return None, None
if not isinstance(value, str):
return None, None
text = value.strip()
if not text:
return None, None
try:
parsed = json.loads(text)
if isinstance(parsed, list):
direct = try_cast_sequence(parsed)
if direct is not None:
return direct, 0
if parsed and isinstance(parsed[0], list):
joint_target = try_cast_sequence(parsed[0])
if joint_target is None:
return None, None
ik_code = parsed[1] if len(parsed) > 1 else 0
try:
ik_code = int(ik_code)
except (TypeError, ValueError):
ik_code = None
return joint_target, ik_code
except (ValueError, TypeError):
pass
try:
return [float(item.strip()) for item in text.split(',') if item.strip()], 0
except ValueError:
return None, None
def wait_until_steady(self, timeout_sec=10.0):
deadline = time.time() + timeout_sec
while rclpy.ok() and time.time() < deadline:
resp = self.call_srv(
self.is_steady_cli,
IsSteady.Request(),
f'/{self.robot}/RobotState/isSteady',
)
if resp is not None and resp.code == 0 and resp.result:
return True
time.sleep(0.1)
self.get_logger().error('robot did not become steady')
return False
def wait_move_done(self, timeout_sec=30.0):
start_deadline = time.time() + min(timeout_sec, 3.0)
exec_id = -1
while rclpy.ok() and time.time() < start_deadline:
value = self.get_exec_id()
if value is None:
return False
exec_id = value
if exec_id != -1:
self.get_logger().info(f'moveJoint started exec_id={exec_id}')
break
time.sleep(0.05)
if exec_id == -1:
if self.is_near_start_pose():
self.get_logger().info(
'moveJoint did not start, but robot is already at trajectory start'
)
return self.wait_until_steady()
self.get_logger().error('moveJoint did not start within timeout')
return False
finish_deadline = time.time() + timeout_sec
while rclpy.ok() and time.time() < finish_deadline:
value = self.get_exec_id()
if value is None:
return False
if value == -1:
self.get_logger().info('moveJoint exec_id returned to idle')
return self.wait_until_steady(timeout_sec=5.0)
time.sleep(0.05)
self.get_logger().error('moveJoint did not finish within timeout')
return False
def publish_json(self, payload, tag):
msg = String()
msg.data = json.dumps(payload, separators=(',', ':'))
self.pub.publish(msg)
self.get_logger().info(f'joint_cmd[{tag}] {msg.data}')
def start_session(self):
self._session_id = self.get_clock().now().nanoseconds
self._point_id = 0
# ros_control joint_cmd JSON 协议说明:
# 1) {"type":"servo_control","command":"set_mode",...}
# 用来进入/退出服务端的 servo session。
# 2) {"type":"servo_point","session_id":...,"point_id":...,"position":[...],...}
# 用来在该 session 内发送一条 latest-reference 伺服点。
# 3) mode 语义以底层伺服模式定义为准。
# 4) 所有模式都显式发送 send_period 和 time。
def publish_set_mode(self):
self.start_session()
payload = {
'type': 'servo_control',
'command': 'set_mode',
'mode': self.args.servo_mode,
'watchdog_timeout': self.args.watchdog,
'send_period': self.args.period,
}
self.publish_json(payload, 'set_mode')
def publish_stop(self):
payload = {
'type': 'servo_control',
'command': 'stop',
}
self.publish_json(payload, 'stop')
def publish_point(self, values, tag):
self._point_id += 1
payload = {
'type': 'servo_point',
'session_id': self._session_id,
'point_id': self._point_id,
'point_type': self._point_type,
'position': list(values),
'time': self.args.time if self.args.time >= 0.0 else self.args.period,
'lookahead_time': self.args.lookahead_time,
'gain': self.args.gain,
}
self.publish_json(payload, tag)
def build_current_test_points(self):
current = self.get_joint_positions()
if current is None:
return []
# current 模式采用从当前位姿到全 0 位的密集插补轨迹,
# 用于验证 latest-reference joint stream 在连续轨迹下的表现。
target = [0.0] * len(current)
max_delta = max(abs(current[i] - target[i]) for i in range(len(current)))
step = max(self.args.interp_step, 1e-4)
segments = max(1, int(max_delta / step + 0.999999))
points = []
for _ in range(self.args.prefeed):
points.append(list(current))
for idx in range(1, segments + 1):
ratio = idx / segments
point = [
current[j] + (target[j] - current[j]) * ratio
for j in range(len(current))
]
points.append(point)
for _ in range(self.args.hold):
points.append(list(target))
self.get_logger().info(
f'build current mode trajectory: start={current} target={target} '
f'segments={segments} interp_step={step:.6f}'
)
self._traj_points = points
return points
def resolve_traj_path(self):
if self.args.traj:
return self.args.traj
env_value = os.environ.get('AUBO_TRAJ_PATH', '').strip()
if env_value:
self.args.traj = env_value
return self.args.traj
if sys.stdin.isatty():
while True:
try:
value = input(
f'请输入 csv 轨迹文件路径 (默认: {DEFAULT_TRAJ}): '
).strip()
except EOFError:
value = ''
self.args.traj = value or DEFAULT_TRAJ
if os.path.isfile(self.args.traj):
return self.args.traj
print(f'轨迹文件不存在: {self.args.traj!r},请重新输入。')
self.args.traj = DEFAULT_TRAJ
return self.args.traj
def resolve_csv_point_type(self):
if self.args.mode != 'csv':
self._point_type = 'joint'
return self._point_type
if self.args.point_type in ('joint', 'cartesian'):
self._point_type = self.args.point_type
return self._point_type
env_value = os.environ.get('AUBO_POINT_TYPE', '').strip().lower()
if env_value in ('joint', 'cartesian'):
self.args.point_type = env_value
self._point_type = env_value
return self._point_type
if sys.stdin.isatty():
while True:
try:
choice = input(
'请选择 csv 点位类型 [1/2] (默认 1): '
'\n 1. joint'
'\n 2. cartesian'
'\n> '
).strip().lower()
except EOFError:
choice = ''
if choice in ('', '1', 'joint'):
self.args.point_type = 'joint'
self._point_type = 'joint'
return self._point_type
if choice in ('2', 'cartesian'):
self.args.point_type = 'cartesian'
self._point_type = 'cartesian'
return self._point_type
print(f'无效选择: {choice!r},请输入 1 或 2。')
self.args.point_type = 'joint'
self._point_type = 'joint'
return self._point_type
def load_csv_points(self):
traj_path = self.resolve_traj_path()
if not traj_path:
return []
if not os.path.isfile(traj_path):
self.get_logger().error(f'trajectory not found: {traj_path}')
return []
point_type = self.resolve_csv_point_type()
expected_size = self.dof if point_type == 'joint' else 6
points = []
with open(traj_path, 'r', encoding='utf-8', errors='replace') as file:
for lineno, line in enumerate(file, start=1):
text = line.strip()
if not text:
continue
try:
point = [float(x.strip()) for x in text.split(',')]
except ValueError:
self.get_logger().error(f'invalid line {lineno}: {text}')
return []
points.append(point)
if not points:
self.get_logger().error('trajectory is empty')
return []
if any(len(point) != len(points[0]) for point in points):
self.get_logger().error('trajectory dimension mismatch')
return []
if any(len(point) != expected_size for point in points):
self.get_logger().error(
f'trajectory point size mismatch: point_type={point_type} '
f'expected={expected_size} actual={len(points[0])}'
)
return []
self._traj_points = points
self.get_logger().info(
f'loaded csv trajectory: {traj_path} point_type={point_type}'
)
return points
def run(self):
if self.args.mode == 'csv':
points = self.load_csv_points()
if not points:
return 1
if self.args.move_to_first:
if not self.move_to_first(points[0]):
return 1
if not self.wait_move_done():
return 1
else:
self._point_type = 'joint'
points = self.build_current_test_points()
if not points:
return 1
self.publish_set_mode()
time.sleep(min(self.args.period, 0.05))
try:
next_tp = time.monotonic()
for index, point in enumerate(points, start=1):
self.publish_point(point, f'point_{index}')
next_tp += self.args.period
delay = next_tp - time.monotonic()
if delay > 0.0:
time.sleep(delay)
time.sleep(min(self.args.watchdog * 0.5, 0.2))
finally:
self.publish_stop()
return 0
def main():
args = parse_args()
rclpy.init()
node = JointCmdPositionServoTest(args)
try:
rc = node.run()
finally:
node.destroy_node()
rclpy.shutdown()
raise SystemExit(rc)
if __name__ == '__main__':
main()
8.1 脚本主流程说明
脚本主流程可以概括成下面这一段:
def run(self):
if self.args.mode == 'csv':
points = self.load_csv_points()
if not points:
return 1
if self.args.move_to_first:
if not self.move_to_first(points[0]):
return 1
if not self.wait_move_done():
return 1
else:
self._point_type = 'joint'
points = self.build_current_test_points()
if not points:
return 1
self.publish_set_mode()
time.sleep(min(self.args.period, 0.05))
try:
next_tp = time.monotonic()
for index, point in enumerate(points, start=1):
self.publish_point(point, f'point_{index}')
next_tp += self.args.period
delay = next_tp - time.monotonic()
if delay > 0.0:
time.sleep(delay)
time.sleep(min(self.args.watchdog * 0.5, 0.2))
finally:
self.publish_stop()
return 0
该段代码对应的调用顺序如下:
- 先准备好要发送的点列
- 如果是
csv模式,可先moveJoint到起点 - 发送
servo_control set_mode建立伺服会话 - 按固定周期连续发送
servo_point - 结束时无论是否异常都发送
stop
8.2 脚本中使用到的 Service
脚本初始化时会创建这些 Service 客户端:
self.get_dof_cli = self.create_client(
GetDof, f'/{self.robot}/RobotConfig/getDof'
)
self.get_joint_positions_cli = self.create_client(
GetJointPositions, f'/{self.robot}/RobotState/getJointPositions'
)
self.get_exec_id_cli = self.create_client(
GetExecId, f'/{self.robot}/MotionControl/getExecId'
)
self.is_steady_cli = self.create_client(
IsSteady, f'/{self.robot}/RobotState/isSteady'
)
self.move_joint_cli = self.create_client(
MoveJoint, f'/{self.robot}/MotionControl/moveJoint'
)
self.inverse_kinematics_cli = self.create_client(
InverseKinematics, f'/{self.robot}/RobotAlgorithm/inverseKinematics'
)
它们分别用于:
getDof:读取机器人自由度getJointPositions:读取当前关节位置getExecId:判断moveJoint是否已经启动或结束isSteady:确认机器人是否重新稳定moveJoint:在csv模式下先移动到轨迹起点inverseKinematics:当csv输入的是笛卡尔点时,把第一点转换成关节目标
8.3 current 模式
current 模式的核心逻辑如下:
def build_current_test_points(self):
current = self.get_joint_positions()
if current is None:
return []
target = [0.0] * len(current)
max_delta = max(abs(current[i] - target[i]) for i in range(len(current)))
step = max(self.args.interp_step, 1e-4)
segments = max(1, int(max_delta / step + 0.999999))
points = []
for _ in range(self.args.prefeed):
points.append(list(current))
for idx in range(1, segments + 1):
ratio = idx / segments
point = [
current[j] + (target[j] - current[j]) * ratio
for j in range(len(current))
]
points.append(point)
for _ in range(self.args.hold):
points.append(list(target))
return points
处理逻辑如下:
- 先读取机器人当前关节位置
- 把目标位置设为全 0 位
- 根据
interp_step计算需要插补多少段 - 前置重复发送若干次当前点,用于提高伺服会话启动阶段的接收稳定性
- 中间发送一串从当前位姿逐步过渡到目标位姿的关节点
- 末尾重复发送若干次目标点,用于结束阶段稳定保持
该模式适合作为基础联调验证方式,因为其不依赖外部轨迹文件。
8.4 csv 模式
csv 模式的核心逻辑如下:
def load_csv_points(self):
traj_path = self.resolve_traj_path()
if not os.path.isfile(traj_path):
return []
point_type = self.resolve_csv_point_type()
expected_size = self.dof if point_type == 'joint' else 6
points = []
with open(traj_path, 'r', encoding='utf-8', errors='replace') as file:
for lineno, line in enumerate(file, start=1):
text = line.strip()
if not text:
continue
point = [float(x.strip()) for x in text.split(',')]
points.append(point)
if any(len(point) != expected_size for point in points):
return []
self._traj_points = points
return points
配合主流程里的这段判断:
if self.args.move_to_first:
if not self.move_to_first(points[0]):
return 1
if not self.wait_move_done():
return 1
csv 模式的主要特征如下:
- 从文本轨迹中逐行读取点位
- 支持
joint和cartesian两种点类型 - 如果是
joint,每行长度必须等于 DOF - 如果是
cartesian,每行长度必须为 6 - 在正式进入伺服前,可先将机器人移动至轨迹第一点附近
其中真正发送到 joint_cmd 的 JSON 格式如下:
def publish_set_mode(self):
self.start_session()
payload = {
'type': 'servo_control',
'command': 'set_mode',
'mode': self.args.servo_mode,
'watchdog_timeout': self.args.watchdog,
'send_period': self.args.period,
}
self.publish_json(payload, 'set_mode')
def publish_point(self, values, tag):
self._point_id += 1
payload = {
'type': 'servo_point',
'session_id': self._session_id,
'point_id': self._point_id,
'point_type': self._point_type,
'position': list(values),
'lookahead_time': self.args.lookahead_time,
'gain': self.args.gain,
}
self.publish_json(payload, tag)
关键字段说明如下:
servo_control用于开始或结束会话session_id用于标识同一轮伺服会话point_id需要在同一会话内持续递增point_type指明当前点是关节点还是笛卡尔点position是本次目标点数据lookahead_time与gain为实现实际消费的参数