Aubo ROS2 集成使用手册

本文面向现场使用人员、集成开发人员以及需要在 ARCS / ROS2 环境中接入 Aubo 机器人的开发者。本文档描述 ros_integrate 提供的 ROS2 模块,以及示教器侧接入 ROS2 的实现方式。

ros_integrate 的核心作用有两点:

  • aubo_sdk 的功能封装为可直接使用的 ROS2 Topic、Service 和 Action
  • 为示教器 / ARCS 脚本侧提供 ROS2 接入模块

1. 项目概览

ros_integrateaubo_sdk 的功能整理为统一、可直接访问的 ROS2 模块,并支持示教器侧接入 ROS2。

本手册说明 ros_integrate 的能力范围、接口组织方式以及各类能力的适用场景。

从使用对象来看,相关能力分为两组。

示教器侧能力:

  • 在示教器 / ARCS 脚本侧通过 Lua 模块收发 ROS2 消息

SDK 侧能力:

  • 通过 Service 访问 Aubo SDK 开放出来的查询、配置、运动和算法能力
  • 读取机器人状态 Topic
  • 在高阶连续控制场景下发送 joint_cmd
  • 在伺服会话管理场景下使用 servo_mode Action

除接口清单外,本手册还包括以下内容:

  • 各类接口对应的功能范围
  • 各接口的职责划分
  • 推荐的调用顺序

所有 Topic、Service 和 Action 均带机器人名前缀。

例如机器人名为 rob1 时:

  • /rob1/joint_states
  • /rob1/joint_cmd
  • /rob1/servo_mode
  • /rob1/RobotConfig/getDof
  • /rob1/MotionControl/moveJoint

多机器人场景下,每台机器人都有独立命名空间,接口互不冲突。


2. 部署

本章面向已经拿到 ros_integrate 交付包、需要在目标 ARCS 机器上安装的现场用户。

ros_integrate 不是一个独立启动的 ROS2 应用,而是作为 aubo_control 的扩展模块加载。部署过程会同时落地以下内容:

  • 最小 ROS2 Humble 运行时
  • ros_integrate 扩展模块
  • Lua 侧 ros_bind 模块
  • C++ 侧共享库 libros_bindings.so
  • aubo_control.conf 中的扩展加载配置

部署完成后,aubo_control 启动时会加载 ros_integrate,再由插件发布 Topic、注册 Service 和 Action。


2.1 部署包内容

标准部署包中通常包含以下目录和文件:

.
├── setup.sh
├── ros2_runtime/
│   ├── minimal-ros2-humble-x86_64-v0.3.zip
│   └── minimal-ros2-humble-arm64-v0.3.zip
├── extensions/
│   └── ros_integrate/
│       └── ros_integrate.so
└── lib/
    ├── libros_bindings.so
    └── lua/
        └── 5.3/
            └── ros_bind.so

其中:

  • setup.sh 是目标机部署入口
  • ros2_runtime/ 存放不同架构的最小 ROS2 运行时压缩包
  • extensions/ros_integrate/ros_integrate.so 是 ARCS 扩展模块
  • lib/lua/5.3/ros_bind.so 是示教器 / ARCS Lua 侧加载的 ros_bind 模块
  • lib/libros_bindings.so 是 C++ 与 Lua 共用的 ROS2 bridge 库

不同交付版本的压缩包文件名可能略有变化,实际部署时以交付包中的文件为准。

如果在源码仓库中调试部署脚本,脚本路径为 scripts/setup.sh;如果使用构建后的交付包,脚本位于交付包根目录。


2.2 目标机前置条件

目标机需要满足以下条件:

  • 系统中已经安装并配置 ARCS / aubo_control
  • /etc/default/arcs 存在,并能提供 ARCS_WS 环境变量
  • ARCS_WS/config/aubo_control.conf 文件存在
  • 当前用户具备写入 /opt/$ARCS_WS/etc/ld.so.conf.d/ 的权限
  • 系统中已安装 rsyncunzip

可用以下命令检查基础环境:

source /etc/default/arcs
echo "$ARCS_WS"
test -f "$ARCS_WS/config/aubo_control.conf"
command -v rsync
command -v unzip

如果上述检查失败,应先修复 ARCS 环境或安装缺失命令。


2.3 执行部署

进入部署包根目录后,建议先执行预演:

./setup.sh --dry-run

预演模式只打印将要执行的复制、解压和配置动作,不真正写入系统。

确认路径和文件无误后,再执行正式部署:

sudo ./setup.sh

脚本会根据 uname -m 自动选择运行时包:

  • x86_64:使用 x86_64 运行时压缩包
  • aarch64:使用 arm64 运行时压缩包

当前脚本会将运行时解压并合并到 /opt/,将扩展、Lua 模块和共享库同步到 ARCS 工作区。


2.4 脚本写入内容

setup.sh 的主要落地动作如下:

  1. /etc/default/arcs 读取 ARCS_WS
  2. 解压 ros2_runtime/ 中匹配当前架构的最小 ROS2 运行时
  3. 将运行时目录合并到 /opt/
  4. extensions/ 合并到 $ARCS_WS/extensions
  5. lib/lua/ 合并到 $ARCS_WS/lib/lua
  6. lib/libros_bindings.so 覆盖到 $ARCS_WS/lib/libros_bindings.so
  7. aubo_control.conf 中还没有 alias = "ros_integrate",则追加扩展配置
  8. 写入 /etc/ld.so.conf.d/ros-humble.conf
  9. 执行 ldconfig

目录同步采用合并覆盖策略:

  • 源目录中同名文件会覆盖目标文件
  • 目标目录中多余文件不会被删除

2.5 aubo_control.conf 配置

首次部署时,脚本会向 $ARCS_WS/config/aubo_control.conf 追加类似配置:

[[Extension]]
    location = "extensions/ros_integrate/ros_integrate.so"
    bundle = "ros_integrate"

    alias = "ros_integrate"
    enable = true

    min_arcs_version = "0.32.1-rc.6"

    [Extension.options]
        ament_prefix_env = "/opt/ros/humble"
        ld_library_env = "/opt/ros/humble/lib"
        rmw_imp = "rmw_fastrtps_cpp"
        domin_id = 0
        localhost_only = 0
        legacy_names = false
        debug = false
        enable_fjt_action = false
        fjt_mode = 1
        fjt_time = 0.0
        fjt_send_period = 0.0
        # rmw_qos_file = "/root/arcs_ws/config/fastdds.xml"

关键字段说明:

  • location:扩展模块在 ARCS 工作区中的相对路径
  • bundle:扩展包名
  • alias:扩展别名,脚本用它判断是否已经配置过
  • enable:是否启用该扩展
  • min_arcs_version:最低 ARCS 版本要求

Extension.options 字段说明:

  • ament_prefix_env:ROS2 安装前缀路径,用于补齐 AMENT_PREFIX_PATH;默认可使用 /opt/ros/humble
  • ld_library_env:ROS2 动态库路径,用于补齐 LD_LIBRARY_PATH;默认可使用 /opt/ros/humble/lib
  • rmw_imp:指定 ROS2 RMW 实现,例如 rmw_fastrtps_cpp;留空时按当前环境默认实现处理
  • domin_id:设置 ROS_DOMAIN_ID;该字段名沿用当前代码拼写,配置时需要写作 domin_id
  • localhost_only:设置 ROS_LOCALHOST_ONLY0 表示允许跨主机通信,1 表示仅本机通信
  • legacy_names:是否启用旧命名兼容;一般新项目保持 false
  • debug:是否打开 ros_integrate 调试日志;现场排查问题时可临时设为 true
  • enable_fjt_action:是否默认启用 FollowJointTrajectory Action 入口;启用入口不代表后台自动占用伺服控制权
  • fjt_modeFollowJointTrajectory 默认伺服模式;未通过 joint_cmd 动态覆盖时使用该值
  • fjt_timeFollowJointTrajectory 每个轨迹点传给底层伺服接口的默认时间参数;0.0 表示自动使用轨迹相邻点时间差
  • fjt_send_periodFollowJointTrajectory 会话默认发送周期;0.0 表示使用内部默认发送周期
  • rmw_qos_file:可选 Fast DDS QoS XML 文件路径;配置后会启用 FASTRTPS_DEFAULT_PROFILES_FILERMW_FASTRTPS_USE_QOS_FROM_XML

如果运行时又通过 joint_cmd 发送 fjt_control JSON,enable_fjt_actionfjt_modefjt_timefjt_send_period 的运行时命令会优先生效。


2.6 部署后检查

部署完成后,建议按以下顺序检查。

检查文件是否落地

source /etc/default/arcs
test -f "$ARCS_WS/extensions/ros_integrate/ros_integrate.so"
test -f "$ARCS_WS/lib/lua/5.3/ros_bind.so"
test -f "$ARCS_WS/lib/libros_bindings.so"
test -d /opt/ros/humble

检查动态库路径

cat /etc/ld.so.conf.d/ros-humble.conf
ldconfig -p | grep libros_bindings

检查配置是否追加

grep -n 'alias = "ros_integrate"' "$ARCS_WS/config/aubo_control.conf"

检查 ROS2 接口是否出现

启动或重启 aubo_control 后,在 ROS2 环境中查看接口:

source /opt/ros/humble/setup.bash
ros2 topic list | grep joint_states
ros2 service list | grep RobotConfig
ros2 action list | grep servo_mode

若机器人名为 rob1,通常应能看到类似接口:

  • /rob1/joint_states
  • /rob1/joint_cmd
  • /rob1/servo_mode
  • /rob1/RobotConfig/getDof

2.7 常见问题

找不到 /etc/default/arcs

说明目标机 ARCS 环境尚未按预期配置。应先确认 ARCS 安装是否完整,并确保 /etc/default/arcs 中存在 ARCS_WS

提示缺少 rsyncunzip

安装缺失命令后重新执行部署:

sudo apt-get update
sudo apt-get install -y rsync unzip

提示找不到运行时压缩包

确认部署包中存在 ros2_runtime/ 目录,并且其中有与当前架构匹配的最小 ROS2 运行时包。

可用以下命令查看当前架构:

uname -m

部署后没有 ROS2 Topic 或 Service

建议依次检查:

  1. aubo_control.conf 中是否存在并启用了 ros_integrate
  2. ros_integrate.so 是否已经同步到 $ARCS_WS/extensions/ros_integrate/
  3. /opt/ros/humble$ARCS_WS/lib 是否已经写入动态库搜索路径
  4. aubo_control 是否已经重启
  5. ROS_DOMAIN_IDROS_LOCALHOST_ONLY 是否与外部 ROS2 节点一致

重复执行部署是否安全

部署脚本支持重复执行。目录会合并覆盖,aubo_control.conf 中如果已经存在 alias = "ros_integrate",脚本不会重复追加同一段扩展配置。


3. Lua 模块

系统提供一个面向示教器 / ARCS 脚本侧使用的 Lua 5.3 模块:

  • ros_bind

Lua 侧典型使用方式为:

local ros_bind = require("ros_bind")

这个模块主要用于示教器侧脚本和 ROS2 系统之间的通信。常用方式包括:

  • 使用 ros_publisher_factory 创建一个 Topic 发布对象
  • 使用 ros_subscriber_factory 创建一个 Topic 订阅对象

创建时通常需要提供:

  • Topic 名称
  • 消息类型名称
  • 可选的队列深度参数 keep_last

典型接入场景如下:

  • 在示教器 / ARCS 脚本中发布或订阅标准 ROS2 Topic
  • 在示教器侧脚本环境中与外部 ROS2 节点交换数据
  • 以脚本方式实现轻量级通信逻辑接入

注意事项:

  • std_msgs 原子类型,Lua 可以直接使用 stringbooleannumber
  • 对非原子消息,不提供 Lua table 到 ROS 消息的自动映射
  • 非原子消息通常需要以二进制序列化后的 Lua string 形式收发
  • keep_last 是可选队列深度参数,省略时默认值为 10

最小示例如下:

local ros_bind = require("ros_bind")

local pub = ros_bind.ros_publisher_factory(
  "/demo/from_lua",
  "std_msgs/msg/String"
)

local sub = ros_bind.ros_subscriber_factory(
  "/demo/from_ros",
  "std_msgs/msg/String"
)

pub:write("hello from lua")

local ok, msg = sub:read_timeout(1000)
if ok then
  textmsg("recv:", msg)
else
  textmsg("timeout")
end

4. Topic 模块

4.1 /<robot_name>/joint_states Topic

joint_states 的作用

发布机器人当前关节状态。

joint_states 的典型用途

  • 获取当前关节角
  • 监控机器人状态
  • 为上层规划、记录、可视化提供输入

joint_states 的使用建议

在执行伺服控制、轨迹回放、闭环控制前,先确认该 Topic 正常更新。


4.2 /<robot_name>/joint_cmd Topic

joint_cmd 的作用

该 Topic 是连续伺服控制场景下的主要输入接口。客户端向该 Topic 发布字符串消息,服务端将其解析为伺服控制指令。

joint_cmd 支持的输入格式

推荐使用 JSON 协议。

4.2.1 JSON 格式

支持两种 type

  • servo_control
  • servo_point

这两类协议为推荐输入格式,详见第 7 章。


5. Service 模块

系统提供了大量 ROS2 Service,用于统一访问 Aubo 控制能力。


5.1 Service 命名规则

统一格式:

/<robot_name>/<Class>/<Method>

例如:

  • /rob1/RobotConfig/getDof
  • /rob1/RobotState/getJointPositions
  • /rob1/MotionControl/moveJoint
  • /rob1/RobotAlgorithm/inverseKinematics

5.2 Service 分类总览

主要服务类别包括:

  • Math
  • Serial
  • Socket
  • SystemInfo
  • RuntimeMachine
  • RegisterControl
  • SyncMove
  • Trace
  • RobotConfig
  • MotionControl
  • ForceControl
  • IoControl
  • RobotAlgorithm
  • RobotManage
  • RobotState

说明: 并非清单中的所有历史接口都已对外开放。若某些接口在现场环境中无法调用或未提供,请以当前发布版本实际开放情况为准。


5.3 Service 通用调用方法

所有 Service 的调用方式基本一致:

  1. 确认机器人命名空间
  2. 确认 Service 路径
  3. 按对应 .srv 类型构造请求
  4. 调用后检查:
    • code
    • message
    • result

一般情况下,Service 返回值至少包含:

  • code:调用状态码
  • message:描述信息
  • result:实际结果

通用判定建议

  • code == 0:一般表示调用成功
  • code != 0:表示失败或异常,需要查看 message

5.4 Service 分类与用途

下面按功能分类说明各类 Service 的常见用途。


5.4.1 RobotConfig:机器人配置查询与设置

常见用途:

  • 查询自由度:getDof
  • 查询机器人名称:getName
  • 查询周期:getCycletime
  • 读取/设置关节限制、TCP 限制
  • 设置碰撞等级、重力、负载、TCP 偏移

典型使用场景:

  • 上位机启动时读取 DOF
  • 控制前读取控制周期
  • 工艺切换时更新工具或负载参数

示例中的关键调用:

  • /<robot>/RobotConfig/getDof

用途:获取关节自由度,用来校验 joint_cmd 点长度和控制参数。


5.4.2 RobotState:机器人状态查询

常见用途:

  • 获取当前关节位置:getJointPositions
  • 判断机器人是否稳定:isSteady
  • 获取各种当前状态量

示例中的关键调用:

  • /<robot>/RobotState/getJointPositions
  • /<robot>/RobotState/isSteady

典型使用流程:

  1. getJointPositions 获取当前姿态
  2. 若需要先移动到起始点,则调用 moveJoint
  3. 再轮询 isSteady 等待机器人稳定

5.4.3 MotionControl:运动与伺服控制

这是运动控制相关的核心 Service 集合。

常见能力包括:

  • moveJoint
  • moveLine
  • moveCircle
  • servoJoint
  • servoCartesian
  • setServoMode
  • isServoModeEnabled
  • getExecId
  • stopMove
  • startMove

示例中的关键调用:

  • /<robot>/MotionControl/moveJoint
  • /<robot>/MotionControl/getExecId

moveJoint 的典型用途

在正式开始伺服之前,可先将机器人移动到轨迹起始点。

示例参数含义:

  • q:目标关节位置
  • a:加速度
  • v:速度
  • blend_radius:混合半径
  • duration:持续时间

getExecId 的典型用途

用于判断运动是否已经开始或结束:

  • -1 通常表示当前空闲
  • -1 通常表示有运动任务正在执行

示例脚本通过该接口判断 moveJoint 是否执行完成。


5.4.4 RobotAlgorithm:算法服务

常见用途:

  • 逆运动学:inverseKinematics
  • 正运动学
  • 姿态变换与算法计算

示例中的关键调用:

  • /<robot>/RobotAlgorithm/inverseKinematics

典型使用场景:

  • 当轨迹选择 cartesian 类型时,首点如果要先通过 moveJoint 到位,需要先做逆解,得到对应关节角

5.4.5 Math:数学工具服务

包含位姿运算、姿态转换、坐标变换等工具,例如:

  • poseAdd
  • poseSub
  • poseTrans
  • poseInverse
  • rpyToQuaternion
  • quaternionToRpy

适用场景:

  • 坐标变换
  • 工具姿态计算
  • 上位机几何辅助计算

5.4.6 RuntimeMachine 类服务

用于程序加载、启动、停止、暂停、恢复等运行时控制,例如:

  • loadProgram
  • runProgram
  • start
  • stop
  • abort
  • pause
  • resume

适用场景:

  • 控制机器人程序执行流程
  • 与示教/运行环境联动

5.4.7 RegisterControl 类服务

用于寄存器、变量、modbus 等控制,例如:

  • getBoolInput / setBoolInput
  • getInt32Register / setInt16Register
  • setWatchDog
  • modbusAddSignal
  • modbusGetSignalStatus

适用场景:

  • 与 PLC / 工业 IO / modbus 设备交互
  • 在机器人任务中共享变量

5.4.8 SyncMove 类服务

用于同步运动、坐标系管理、轴组控制,例如:

  • syncMoveSegment
  • syncMoveOff
  • frameAdd
  • frameAttach
  • axisGroupMoveJoint

适用场景:

  • 多轴联动
  • 外部轴组控制
  • 工件坐标系管理

5.4.9 ForceControl 类服务

用于力控相关接口,例如:

  • fcEnable
  • fcDisable
  • isFcEnabled
  • fcSetSensorThresholds
  • fcSetSensorLimits

适用场景:

  • 柔顺控制
  • 力传感器阈值配置
  • 力控工艺

5.4.10 Serial / Socket 类服务

用于串口和网络通信:

  • serialOpen / serialClose
  • serialReadString / serialSendString
  • socketOpen / socketClose
  • socketReadString / socketSendString

适用场景:

  • 与外部设备通信
  • 工装、扫码枪、传感器接入

5.4.11 SystemInfo / Trace 类服务

  • SystemInfo:版本、构建信息、系统时间
  • Trace:文本日志、提示、通知

适用场景:

  • 系统诊断
  • 状态追踪
  • 联调日志输出

5.5 Service 使用建议

  1. 所有调用前先确认机器人命名空间
  2. 优先检查 code == 0
  3. 对运动类接口,调用后结合状态接口判断是否真正执行完成
  4. 对算法类和配置类接口,注意参数维度与单位
  5. 多机器人时不要混用不同前缀的 service 路径

6. Action 模块

核心 Action 为:

  • /<robot_name>/servo_mode

该 Action 本身不直接承载轨迹点,而用于补充伺服会话管理与状态反馈。

在本文档中,可将该 Action 视为扩展能力接口,而非基础调用入口。

  • 推荐通过 /<robot_name>/joint_cmd 发送 servo_control + servo_point
  • 联调时建议优先采用第 7 章定义的 JSON 协议
  • servo_mode Action 更适合作为扩展接口或补充接口使用

6.1 Action 名称

例如机器人名为 rob1

  • /rob1/servo_mode

6.2 Action 的职责

servo_mode Action 负责:

  • 检查是否允许进入伺服模式
  • 建立伺服会话
  • 在会话期间持续提供 feedback
  • 检查 watchdog 超时
  • 处理取消、错误、会话关闭

而真正的目标点数据仍然通过:

  • /<robot_name>/joint_cmd

持续发送。

也就是说:

  • Action 管会话
  • Topic 管点流

推荐控制路径如下:

  • 先向 /<robot_name>/joint_cmd 发送 servo_control
  • 再持续发送 servo_point
  • 结束时再发送 servo_control stop

6.3 模式简介

无论通过 joint_cmd 还是 Action 管理伺服过程,均会涉及伺服模式号。

常见模式说明如下:

  • 1:截断式规划伺服,实时调整目标点和规划路线
  • 2:透传模式,直接下发
  • 3:透传模式,带缓存
  • 4:1ms 透传模式,带缓存
  • 5:规划伺服模式,强调经过所有目标点
  • 6:截断式规划伺服,可叠加力控
  • 7:规划伺服模式,可叠加力控

初次联调时,建议优先从行为更易观测的关节模式开始,再逐步切换至复杂模式。


6.4 Action 的适用场景

建议在以下场景中使用 Action:

  • 已完成 Service 与 joint_cmd 的基础联调
  • 需要额外的会话状态反馈
  • 需要基于 goal / feedback / result 机制管理伺服过程
  • 需要排查伺服会话被拒绝、取消或提前结束等问题

servo_mode Action 的字段、反馈、结果与示例见附录。


7. joint_cmd JSON 协议

这是主要用户接口之一。

joint_cmd 支持两类 JSON:

  • servo_control
  • servo_point

7.1 servo_control 协议

servo_control 的作用

用于开启、切换或停止伺服会话。

servo_control 标准格式

{
  "type": "servo_control",
  "command": "set_mode",
  "mode": 1,
  "watchdog_timeout": 0.5,
  "send_period": 0.005
}

servo_control 字段说明

字段 servo_control.type

固定为:

  • servo_control

字段 command

支持:

  • stop
  • set_mode

含义:

  • stop:停止伺服会话
  • set_mode:设置并启动伺服模式

字段 mode

  • 类型:整数
  • 作用:指定伺服模式
  • 规则:
    • 示例中使用 17
    • 0 通常表示退出伺服模式
    • 1:截断式规划伺服模式,会实时调整目标点和规划路线
    • 2:透传模式,直接下发
    • 3:透传模式,缓存
    • 4:1ms 透传模式,缓存
    • 5:规划伺服模式,保证经过所有目标点
    • 6:截断式规划伺服模式,可叠加力控
    • 7:规划伺服模式,可叠加力控

字段 servo_control.watchdog_timeout

  • 类型:浮点数,单位秒
  • 作用:设置看门狗超时
  • 规则:
    • 推荐不小于 0.05
    • 取值过小时可能导致会话因超时而退出

字段 send_period

  • 类型:浮点数,单位秒
  • 作用:期望发送周期
  • 规则:
    • 如果未设置或小于等于 0,则使用机器人控制周期

典型用法

启动伺服

{
  "type": "servo_control",
  "command": "set_mode",
  "mode": 4,
  "watchdog_timeout": 0.5,
  "send_period": 0.02
}

停止伺服

{
  "type": "servo_control",
  "command": "stop"
}

7.2 servo_point 协议

servo_point 的作用

在已启动的伺服会话中发送目标点。

servo_point 标准格式

{
  "type": "servo_point",
  "session_id": 123456789,
  "point_id": 1,
  "point_type": "joint",
  "position": [0, 0, 0, 0, 0, 0],
  "time": 0.02,
  "lookahead_time": 0.1,
  "gain": 200.0
}

servo_point 字段说明

字段 servo_point.type

固定为:

  • servo_point

如果未传 type,实现通常仍按点命令解析;工程使用中仍建议显式传入。

字段 session_id

  • 类型:整数
  • 作用:标识一次伺服会话
  • 规则:
    • 建议一次会话内保持不变
    • 若服务端首次收到带 session_id 的点,会锁定该值
    • 后续若切换为其他 session_id,该点会被丢弃

字段 point_id

  • 类型:整数
  • 作用:标识点序号
  • 规则:
    • JSON 点流必须带 point_id
    • 必须严格递增
    • point_id 小于等于上一次已接受点,则该点会被丢弃

字段 point_type

支持:

  • joint
  • cartesian

也支持数值:

  • 0 表示 joint
  • 1 表示 cartesian

字段 position

  • 类型:数组
  • 规则:
    • point_type = joint 时,长度必须等于机器人 DOF
    • point_type = cartesian 时,长度必须为 6

字段 lookahead_time

  • 类型:浮点数
  • 作用:伺服前瞻时间
  • positionServo 实现使用该参数

字段 gain

  • 类型:浮点数
  • 作用:伺服增益
  • positionServo 实现使用该参数

字段 acceleration

  • 兼容预留字段
  • 实现不依赖该字段

字段 velocity

  • 兼容预留字段
  • 实现不依赖该字段

字段 time

  • 类型:浮点数
  • 作用:点对应的伺服时间参数
  • 示例中显式发送该字段
  • 若未单独指定,通常可与 send_period 保持一致

7.3 servo_point 的处理规则

关节点模式 joint 说明

  • 直接将 position 作为关节目标
  • 长度必须等于 DOF
  • 适合作为初始联调输入类型

笛卡尔模式 cartesian 说明

  • position 必须为 6 维
  • 服务端会先根据当前关节状态做逆运动学
  • 再将逆解结果下发给位置伺服
  • 该模式仍在持续优化中,现场使用前建议完成充分联调与稳定性验证

综合考虑如下:

  • cartesian 更灵活
  • 但依赖逆解成功
  • 初次联调建议优先使用 joint

7.4 JSON 点流有效性规则

对于 JSON 点流,以下条件非常关键:

  1. 必须处于活动伺服会话中
  2. point_id 必须存在
  3. point_id 必须严格递增
  4. 若会话已锁定 session_id,后续点必须保持相同 session_id
  5. position 维度必须正确
  6. cartesian 点,逆解必须成功

否则该点可能被直接丢弃,或导致命令执行失败。


7.5 推荐发送策略

建议每次会话都使用:

  • 一个固定 session_id
  • 1 开始递增的 point_id
  • 稳定的 send_period

该策略便于问题定位,也更符合服务端处理逻辑。


8. 附录:示例脚本说明

在完成前述接口与调用流程理解后,可再查阅本章中的完整脚本与补充说明。

8.0 完整脚本

#!/usr/bin/env python3

"""
ros_control 伺服路径的最小 joint_cmd 示例。

示例功能说明:
- 发送 servo_control JSON 以建立或退出服务端 servo session
- 发送 servo_point JSON 作为 latest-reference 关节目标点流
- 支持从当前位姿密集插补至全 0 位的测试,以及 csv 轨迹回放
- 在 csv 模式下支持继续选择点位类型(joint/cartesian)

采用的 JSON 协议:
- servo_control: {"type":"servo_control","command":"set_mode"/"stop",...}
- servo_point: {"type":"servo_point","session_id":...,"point_id":...,
  "point_type":"joint","position":[...],"time":...,
  "lookahead_time":...,"gain":...}

说明:
- `--servo-mode 0` 表示退出伺服模式
- `--servo-mode 1` 表示截断式规划伺服模式,会实时调整目标点和规划路线
- `--servo-mode 2` 表示透传模式(直接下发)
- `--servo-mode 3` 表示透传模式(缓存)
- `--servo-mode 4` 表示 1ms 透传模式(缓存)
- `--servo-mode 5` 表示规划伺服模式,能保证经过所有目标点
- `--servo-mode 6` 表示截断式规划伺服模式,可叠加力控
- `--servo-mode 7` 表示规划伺服模式,可叠加力控
- 所有伺服模式都会发送 `send_period` 和 `time`

常用测试示例:
- current:
  uv run python example_joint_cmd_position_servo.py rob1 --mode current --servo-mode 4 --period 0.02 --time 0.02
- current:
  uv run python example_joint_cmd_position_servo.py rob1 --mode current --servo-mode 3 --period 0.02 --time 0.02
- csv:
  uv run python example_joint_cmd_position_servo.py rob1 --mode csv --servo-mode 3 --traj /path/to/traj.csv --point-type joint --period 0.02 --time 0.02
"""

import argparse
import json
import os
import subprocess
import sys
import time

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

from aubo_interfaces.srv import (
    GetDof,
    GetExecId,
    GetJointPositions,
    InverseKinematics,
    IsSteady,
    MoveJoint,
)


DEFAULT_TRAJ = '/root/GitLab/aubo_sdk/example/trajs/record6.offt'


def parse_args():
    parser = argparse.ArgumentParser(
        description='Test servo control through joint_cmd JSON.',
        formatter_class=argparse.RawTextHelpFormatter,
        epilog=(
            'Examples:\n'
            '  uv run python example_joint_cmd_position_servo.py rob1 --mode current --servo-mode 4 --period 0.02 --time 0.02\n'
            '  uv run python example_joint_cmd_position_servo.py rob1 --mode current --servo-mode 3 --period 0.02 --time 0.02\n'
            '  uv run python example_joint_cmd_position_servo.py rob1 --mode csv --servo-mode 3 --traj /path/to/traj.csv --point-type joint --period 0.02 --time 0.02'
        ),
    )
    parser.add_argument('robot', nargs='?', default='')
    parser.add_argument('--mode', choices=('current', 'csv'), default='')
    parser.add_argument(
        '--traj',
        default='',
        help='csv 模式下指定轨迹文件路径;未指定时会在交互模式下提示输入',
    )
    parser.add_argument(
        '--point-type',
        choices=('joint', 'cartesian'),
        default='',
        help='csv 模式下指定点位类型;未指定时会在交互模式下提示选择',
    )
    parser.add_argument('--period', type=float, default=-1.0)
    parser.add_argument('--watchdog', type=float, default=0.5)
    parser.add_argument(
        '--servo-mode',
        type=int,
        choices=(1, 2, 3, 4, 5, 6, 7),
        default=0,
        help='伺服模式号;未指定时可交互选择',
    )
    parser.add_argument(
        '--time',
        type=float,
        default=-1.0,
        help='servo_point.time;未指定时会在交互模式下提示输入',
    )
    parser.add_argument('--gain', type=float, default=200.0)
    parser.add_argument('--lookahead-time', type=float, default=0.1)
    parser.add_argument(
        '--interp-step',
        type=float,
        default=0.005,
        help='current 模式下,从当前位姿插补到全 0 位时的单关节最大步长',
    )
    parser.add_argument('--prefeed', type=int, default=10)
    parser.add_argument('--hold', type=int, default=10)
    parser.add_argument('--delta', type=float, default=0.02)
    parser.add_argument('--joint-index', type=int, default=0)
    parser.add_argument('--move-to-first', action='store_true', default=True)
    return parser.parse_args()


class JointCmdPositionServoTest(Node):
    def __init__(self, args):
        super().__init__('example_joint_cmd_position_servo')
        self.args = args
        self._argv = sys.argv[1:]
        self.args.servo_mode = self._resolve_servo_mode()
        self.robot = self._resolve_robot_name(args.robot)
        self.args.mode = self._resolve_run_mode()
        self._resolve_servo_timing()
        self.pub = self.create_publisher(String, f'/{self.robot}/joint_cmd', 20)
        self.get_dof_cli = self.create_client(
            GetDof, f'/{self.robot}/RobotConfig/getDof'
        )
        self.get_joint_positions_cli = self.create_client(
            GetJointPositions, f'/{self.robot}/RobotState/getJointPositions'
        )
        self.get_exec_id_cli = self.create_client(
            GetExecId, f'/{self.robot}/MotionControl/getExecId'
        )
        self.is_steady_cli = self.create_client(
            IsSteady, f'/{self.robot}/RobotState/isSteady'
        )
        self.move_joint_cli = self.create_client(
            MoveJoint, f'/{self.robot}/MotionControl/moveJoint'
        )
        self.inverse_kinematics_cli = self.create_client(
            InverseKinematics, f'/{self.robot}/RobotAlgorithm/inverseKinematics'
        )

        self._session_id = 0
        self._point_id = 0
        self._traj_points = []
        self._point_type = 'joint'
        self.dof = self._resolve_dof()
        if self.args.joint_index >= self.dof:
            self.args.joint_index = max(0, self.dof - 1)

        log_text = (
            f'robot={self.robot} mode={self.args.mode} '
            f'joint_cmd_topic=/{self.robot}/joint_cmd '
            f'servo_mode={self.args.servo_mode} period={self.args.period:.4f}s '
            f'dof={self.dof}'
        )
        if self.args.mode == 'csv':
            log_text += f' traj={self.resolve_traj_path()}'
        self.get_logger().info(log_text)

    def _arg_was_provided(self, name):
        return any(arg == name or arg.startswith(f'{name}=') for arg in self._argv)

    def _resolve_run_mode(self):
        if self.args.mode in ('csv', 'current'):
            return self.args.mode

        env_value = os.environ.get('AUBO_RUN_MODE', '').strip().lower()
        if env_value in ('1', 'csv', 'csv_playback'):
            return 'csv'
        if env_value in ('2', 'current', 'current_start', 'prefeed'):
            return 'current'

        if not sys.stdin.isatty():
            return 'csv'

        print('选择运行模式:')
        print('  1. csv     - 使用指定轨迹文件回放')
        print('  2. current - 从当前位姿密集插补到全 0 位')
        while True:
            try:
                choice = input('模式 [1/2] (默认 1): ').strip().lower()
            except EOFError:
                return 'csv'

            if choice in ('', '1', 'csv', 'csv_playback'):
                return 'csv'
            if choice in ('2', 'current', 'current_start', 'prefeed'):
                return 'current'
            print(f'无效选择: {choice!r},请输入 1 或 2。')

    def _resolve_robot_name(self, cli_value):
        candidate = cli_value.strip().lstrip('/')
        if candidate:
            return candidate

        env_value = os.environ.get('AUBO_ROBOT_NAME', '').strip().lstrip('/')
        if env_value:
            return env_value

        robot_names = self._discover_servo_robots()
        if robot_names:
            return self._select_robot_name(robot_names)

        return 'rob1'

    def _resolve_servo_mode(self):
        if self.args.servo_mode in (1, 2, 3, 4, 5, 6, 7):
            return self.args.servo_mode

        env_value = os.environ.get('AUBO_SERVO_MODE', '').strip().lower()
        if env_value in ('1', '2', '3', '4', '5', '6', '7'):
            return int(env_value)
        if env_value in ('position', 'positionservo', 'mode4'):
            return 4
        if env_value in ('mode1',):
            return 1
        if env_value in ('mode2',):
            return 2
        if env_value in ('servojoint', 'joint', 'mode3'):
            return 3
        if env_value in ('mode5',):
            return 5
        if env_value in ('mode6',):
            return 6
        if env_value in ('mode7',):
            return 7

        if not sys.stdin.isatty():
            return 4

        print('选择伺服模式:')
        print('  1. mode 1  - 截断式规划伺服,实时调整目标点/规划路线')
        print('  2. mode 2  - 透传模式(直接下发)')
        print('  3. mode 3  - 透传模式(缓存)')
        print('  4. mode 4  - 1ms 透传模式(缓存)')
        print('  5. mode 5  - 规划伺服,保证经过所有目标点')
        print('  6. mode 6  - 截断式规划伺服,可叠加力控')
        print('  7. mode 7  - 规划伺服,可叠加力控')
        while True:
            try:
                choice = input('伺服模式 [1/2/3/4/5/6/7] (默认 4): ').strip().lower()
            except EOFError:
                return 4

            if choice in ('', '4', 'position', 'positionservo', 'mode4'):
                return 4
            if choice in ('1', 'mode1'):
                return 1
            if choice in ('2', 'mode2'):
                return 2
            if choice in ('3', 'servojoint', 'joint', 'mode3'):
                return 3
            if choice in ('5', 'mode5'):
                return 5
            if choice in ('6', 'mode6'):
                return 6
            if choice in ('7', 'mode7'):
                return 7
            print(f'无效选择: {choice!r},请输入 1、2、3、4、5、6 或 7。')

    def _resolve_servo_timing(self):
        period_provided = self._arg_was_provided('--period')
        time_provided = self._arg_was_provided('--time')

        if not period_provided:
            env_period = os.environ.get('AUBO_SERVO_PERIOD', '').strip()
            if env_period:
                try:
                    value = float(env_period)
                    if value > 0.0:
                        self.args.period = value
                        period_provided = True
                except ValueError:
                    pass

        if not time_provided:
            env_time = os.environ.get('AUBO_SERVO_TIME', '').strip()
            if env_time:
                try:
                    value = float(env_time)
                    if value >= 0.0:
                        self.args.time = value
                        time_provided = True
                except ValueError:
                    pass

        if not sys.stdin.isatty():
            if not period_provided:
                self.args.period = 0.02
            if self.args.time < 0.0:
                self.args.time = 0.02
            return

        if not period_provided:
            while True:
                try:
                    value = input(
                        'send_period (s) [默认 0.0200]: '
                    ).strip()
                except EOFError:
                    value = ''

                if value == '':
                    self.args.period = 0.02
                    break
                try:
                    parsed = float(value)
                except ValueError:
                    print(f'无效数值: {value!r},请输入正数。')
                    continue
                if parsed <= 0.0:
                    print(f'无效数值: {value!r},请输入正数。')
                    continue
                self.args.period = parsed
                break

        if not time_provided:
            while True:
                try:
                    value = input(
                        'time (s) [默认 0.0200]: '
                    ).strip()
                except EOFError:
                    value = ''

                if value == '':
                    self.args.time = 0.02
                    break
                try:
                    parsed = float(value)
                except ValueError:
                    print(f'无效数值: {value!r},请输入非负数。')
                    continue
                if parsed < 0.0:
                    print(f'无效数值: {value!r},请输入非负数。')
                    continue
                self.args.time = parsed
                break

        if self.args.time < 0.0:
            self.args.time = 0.02

    def _discover_servo_robots(self):
        try:
            result = subprocess.run(
                ['ros2', 'action', 'list'],
                check=False,
                capture_output=True,
                text=True,
                timeout=2.0,
            )
        except Exception:
            return self._discover_joint_cmd_robots()

        robot_names = []
        for line in result.stdout.splitlines():
            action_name = line.strip()
            if action_name.endswith('/servo_mode'):
                robot_names.append(action_name.strip('/').split('/')[0])

        robot_names = sorted(set(robot_names))
        if robot_names:
            return robot_names
        return self._discover_joint_cmd_robots()

    def _discover_joint_cmd_robots(self):
        try:
            result = subprocess.run(
                ['ros2', 'topic', 'list'],
                check=False,
                capture_output=True,
                text=True,
                timeout=2.0,
            )
        except Exception:
            return []

        robot_names = []
        for line in result.stdout.splitlines():
            topic = line.strip()
            if topic.endswith('/joint_cmd'):
                robot_names.append(topic.strip('/').split('/')[0])
        return sorted(set(robot_names))

    def _select_robot_name(self, robot_names):
        if len(robot_names) == 1:
            self.get_logger().info(f'found 1 robot: {robot_names[0]}')
            return robot_names[0]

        for idx, robot_name in enumerate(robot_names, start=1):
            self.get_logger().info(f'  {idx}. {robot_name}')

        if not sys.stdin.isatty():
            self.get_logger().warning(
                f'non-interactive stdin detected, defaulting to {robot_names[0]}'
            )
            return robot_names[0]

        while True:
            try:
                choice = input(
                    f'选择机器人 [1-{len(robot_names)}] '
                    f'(默认 1: {robot_names[0]}): '
                ).strip()
            except EOFError:
                self.get_logger().warning(
                    f'input closed, defaulting to {robot_names[0]}'
                )
                return robot_names[0]

            if choice == '':
                return robot_names[0]
            if choice.isdigit():
                index = int(choice)
                if 1 <= index <= len(robot_names):
                    return robot_names[index - 1]
            print(f'无效选择: {choice!r},请输入编号。')

    def call_srv(self, client, request, name, timeout_sec=3.0):
        if not client.wait_for_service(timeout_sec=timeout_sec):
            self.get_logger().error(f'service not available: {name}')
            return None
        future = client.call_async(request)
        rclpy.spin_until_future_complete(self, future)
        response = future.result()
        if response is None:
            self.get_logger().error(f'service call failed: {name}')
        return response

    def _resolve_dof(self):
        resp = self.call_srv(
            self.get_dof_cli,
            GetDof.Request(),
            f'/{self.robot}/RobotConfig/getDof',
        )
        if resp is not None and resp.code == 0 and resp.result > 0:
            return int(resp.result)
        return 6

    def get_joint_positions(self):
        resp = self.call_srv(
            self.get_joint_positions_cli,
            GetJointPositions.Request(),
            f'/{self.robot}/RobotState/getJointPositions',
        )
        if resp is None or resp.code != 0:
            self.get_logger().error('getJointPositions failed')
            return None
        values = list(resp.result)
        if values:
            self.dof = len(values)
        return values

    def get_exec_id(self):
        resp = self.call_srv(
            self.get_exec_id_cli,
            GetExecId.Request(),
            f'/{self.robot}/MotionControl/getExecId',
        )
        if resp is None:
            return None
        if resp.code != 0:
            self.get_logger().warning(
                f'getExecId code={resp.code} message={resp.message}'
            )
        return resp.result

    def is_near_start_pose(self, tolerance=1e-3):
        if not self._traj_points:
            self.get_logger().error('trajectory not loaded')
            return False

        current_q = self.get_joint_positions()
        if current_q is None:
            return False

        start_q = self.resolve_move_joint_target(self._traj_points[0])
        if start_q is None:
            return False
        max_err = max(abs(current_q[i] - start_q[i]) for i in range(self.dof))
        self.get_logger().info(
            f'compare current pose to traj start: max_err={max_err:.6f}, '
            f'tolerance={tolerance:.6f}'
        )
        return max_err <= tolerance

    def move_to_first(self, point):
        joint_target = self.resolve_move_joint_target(point)
        if joint_target is None:
            return False

        req = MoveJoint.Request()
        req.q = list(joint_target)
        req.a = 0.3
        req.v = 0.3
        req.blend_radius = 0.0
        req.duration = 0.0
        resp = self.call_srv(
            self.move_joint_cli,
            req,
            f'/{self.robot}/MotionControl/moveJoint',
            timeout_sec=5.0,
        )
        if resp is None:
            return False
        self.get_logger().info(
            f'moveJoint result={resp.result} code={resp.code} message={resp.message}'
        )
        if resp.result == 0:
            return True
        if resp.result == 13:
            self.get_logger().warning(
                'moveJoint returned 13 (ignored); checking whether robot is already at trajectory start'
            )
            if self.is_near_start_pose():
                self.get_logger().info(
                    'robot is already at trajectory start; skip waiting for moveJoint'
                )
                return True
        return False

    def resolve_move_joint_target(self, point):
        if self._point_type == 'joint':
            if len(point) != self.dof:
                self.get_logger().error(
                    f'joint point dimension mismatch: expected {self.dof}, got {len(point)}'
                )
                return None
            return list(point)

        if self._point_type != 'cartesian':
            self.get_logger().error(f'unsupported point_type: {self._point_type}')
            return None

        if len(point) != 6:
            self.get_logger().error(
                f'cartesian point dimension mismatch: expected 6, got {len(point)}'
            )
            return None

        qnear = self.get_joint_positions()
        if qnear is None:
            self.get_logger().error('failed to read current joints before inverseKinematics')
            return None

        req = InverseKinematics.Request()
        req.qnear = list(qnear)
        req.pose = list(point)
        resp = self.call_srv(
            self.inverse_kinematics_cli,
            req,
            f'/{self.robot}/RobotAlgorithm/inverseKinematics',
        )
        if resp is None or resp.code != 0:
            self.get_logger().error(
                'inverseKinematics failed for cartesian first point'
            )
            return None
        joint_target, ik_rc = self.parse_ik_result(resp.result)
        if joint_target is None:
            self.get_logger().error(
                f'inverseKinematics returned unsupported result payload: {resp.result!r}'
            )
            return None
        if ik_rc not in (None, 0):
            self.get_logger().warning(
                f'inverseKinematics returned ik_rc={ik_rc} for cartesian first point'
            )
        if len(joint_target) != self.dof:
            self.get_logger().error(
                f'inverseKinematics result dimension mismatch: expected {self.dof}, '
                f'got {len(joint_target)}'
            )
            return None
        self.get_logger().info(
            f'resolved cartesian first point to joint target via inverseKinematics: {joint_target}'
        )
        return joint_target

    def parse_ik_result(self, value):
        def try_cast_sequence(seq):
            try:
                return [float(item) for item in seq]
            except (TypeError, ValueError):
                return None

        if isinstance(value, (list, tuple)):
            direct = try_cast_sequence(value)
            if direct is not None:
                return direct, 0
            if value and isinstance(value[0], (list, tuple)):
                joint_target = try_cast_sequence(value[0])
                if joint_target is None:
                    return None, None
                ik_code = value[1] if len(value) > 1 else 0
                try:
                    ik_code = int(ik_code)
                except (TypeError, ValueError):
                    ik_code = None
                return joint_target, ik_code
            return None, None

        if not isinstance(value, str):
            return None, None

        text = value.strip()
        if not text:
            return None, None

        try:
            parsed = json.loads(text)
            if isinstance(parsed, list):
                direct = try_cast_sequence(parsed)
                if direct is not None:
                    return direct, 0
                if parsed and isinstance(parsed[0], list):
                    joint_target = try_cast_sequence(parsed[0])
                    if joint_target is None:
                        return None, None
                    ik_code = parsed[1] if len(parsed) > 1 else 0
                    try:
                        ik_code = int(ik_code)
                    except (TypeError, ValueError):
                        ik_code = None
                    return joint_target, ik_code
        except (ValueError, TypeError):
            pass

        try:
            return [float(item.strip()) for item in text.split(',') if item.strip()], 0
        except ValueError:
            return None, None

    def wait_until_steady(self, timeout_sec=10.0):
        deadline = time.time() + timeout_sec
        while rclpy.ok() and time.time() < deadline:
            resp = self.call_srv(
                self.is_steady_cli,
                IsSteady.Request(),
                f'/{self.robot}/RobotState/isSteady',
            )
            if resp is not None and resp.code == 0 and resp.result:
                return True
            time.sleep(0.1)
        self.get_logger().error('robot did not become steady')
        return False

    def wait_move_done(self, timeout_sec=30.0):
        start_deadline = time.time() + min(timeout_sec, 3.0)
        exec_id = -1
        while rclpy.ok() and time.time() < start_deadline:
            value = self.get_exec_id()
            if value is None:
                return False
            exec_id = value
            if exec_id != -1:
                self.get_logger().info(f'moveJoint started exec_id={exec_id}')
                break
            time.sleep(0.05)

        if exec_id == -1:
            if self.is_near_start_pose():
                self.get_logger().info(
                    'moveJoint did not start, but robot is already at trajectory start'
                )
                return self.wait_until_steady()
            self.get_logger().error('moveJoint did not start within timeout')
            return False

        finish_deadline = time.time() + timeout_sec
        while rclpy.ok() and time.time() < finish_deadline:
            value = self.get_exec_id()
            if value is None:
                return False
            if value == -1:
                self.get_logger().info('moveJoint exec_id returned to idle')
                return self.wait_until_steady(timeout_sec=5.0)
            time.sleep(0.05)

        self.get_logger().error('moveJoint did not finish within timeout')
        return False

    def publish_json(self, payload, tag):
        msg = String()
        msg.data = json.dumps(payload, separators=(',', ':'))
        self.pub.publish(msg)
        self.get_logger().info(f'joint_cmd[{tag}] {msg.data}')

    def start_session(self):
        self._session_id = self.get_clock().now().nanoseconds
        self._point_id = 0

    # ros_control joint_cmd JSON 协议说明:
    # 1) {"type":"servo_control","command":"set_mode",...}
    #    用来进入/退出服务端的 servo session。
    # 2) {"type":"servo_point","session_id":...,"point_id":...,"position":[...],...}
    #    用来在该 session 内发送一条 latest-reference 伺服点。
    # 3) mode 语义以底层伺服模式定义为准。
    # 4) 所有模式都显式发送 send_period 和 time。
    def publish_set_mode(self):
        self.start_session()
        payload = {
            'type': 'servo_control',
            'command': 'set_mode',
            'mode': self.args.servo_mode,
            'watchdog_timeout': self.args.watchdog,
            'send_period': self.args.period,
        }
        self.publish_json(payload, 'set_mode')

    def publish_stop(self):
        payload = {
            'type': 'servo_control',
            'command': 'stop',
        }
        self.publish_json(payload, 'stop')

    def publish_point(self, values, tag):
        self._point_id += 1
        payload = {
            'type': 'servo_point',
            'session_id': self._session_id,
            'point_id': self._point_id,
            'point_type': self._point_type,
            'position': list(values),
            'time': self.args.time if self.args.time >= 0.0 else self.args.period,
            'lookahead_time': self.args.lookahead_time,
            'gain': self.args.gain,
        }
        self.publish_json(payload, tag)

    def build_current_test_points(self):
        current = self.get_joint_positions()
        if current is None:
            return []

        # current 模式采用从当前位姿到全 0 位的密集插补轨迹,
        # 用于验证 latest-reference joint stream 在连续轨迹下的表现。
        target = [0.0] * len(current)
        max_delta = max(abs(current[i] - target[i]) for i in range(len(current)))
        step = max(self.args.interp_step, 1e-4)
        segments = max(1, int(max_delta / step + 0.999999))

        points = []
        for _ in range(self.args.prefeed):
            points.append(list(current))

        for idx in range(1, segments + 1):
            ratio = idx / segments
            point = [
                current[j] + (target[j] - current[j]) * ratio
                for j in range(len(current))
            ]
            points.append(point)

        for _ in range(self.args.hold):
            points.append(list(target))

        self.get_logger().info(
            f'build current mode trajectory: start={current} target={target} '
            f'segments={segments} interp_step={step:.6f}'
        )
        self._traj_points = points
        return points

    def resolve_traj_path(self):
        if self.args.traj:
            return self.args.traj

        env_value = os.environ.get('AUBO_TRAJ_PATH', '').strip()
        if env_value:
            self.args.traj = env_value
            return self.args.traj

        if sys.stdin.isatty():
            while True:
                try:
                    value = input(
                        f'请输入 csv 轨迹文件路径 (默认: {DEFAULT_TRAJ}): '
                    ).strip()
                except EOFError:
                    value = ''

                self.args.traj = value or DEFAULT_TRAJ
                if os.path.isfile(self.args.traj):
                    return self.args.traj
                print(f'轨迹文件不存在: {self.args.traj!r},请重新输入。')

        self.args.traj = DEFAULT_TRAJ
        return self.args.traj

    def resolve_csv_point_type(self):
        if self.args.mode != 'csv':
            self._point_type = 'joint'
            return self._point_type

        if self.args.point_type in ('joint', 'cartesian'):
            self._point_type = self.args.point_type
            return self._point_type

        env_value = os.environ.get('AUBO_POINT_TYPE', '').strip().lower()
        if env_value in ('joint', 'cartesian'):
            self.args.point_type = env_value
            self._point_type = env_value
            return self._point_type

        if sys.stdin.isatty():
            while True:
                try:
                    choice = input(
                        '请选择 csv 点位类型 [1/2] (默认 1): '
                        '\n  1. joint'
                        '\n  2. cartesian'
                        '\n> '
                    ).strip().lower()
                except EOFError:
                    choice = ''

                if choice in ('', '1', 'joint'):
                    self.args.point_type = 'joint'
                    self._point_type = 'joint'
                    return self._point_type
                if choice in ('2', 'cartesian'):
                    self.args.point_type = 'cartesian'
                    self._point_type = 'cartesian'
                    return self._point_type
                print(f'无效选择: {choice!r},请输入 1 或 2。')

        self.args.point_type = 'joint'
        self._point_type = 'joint'
        return self._point_type

    def load_csv_points(self):
        traj_path = self.resolve_traj_path()
        if not traj_path:
            return []
        if not os.path.isfile(traj_path):
            self.get_logger().error(f'trajectory not found: {traj_path}')
            return []

        point_type = self.resolve_csv_point_type()
        expected_size = self.dof if point_type == 'joint' else 6

        points = []
        with open(traj_path, 'r', encoding='utf-8', errors='replace') as file:
            for lineno, line in enumerate(file, start=1):
                text = line.strip()
                if not text:
                    continue
                try:
                    point = [float(x.strip()) for x in text.split(',')]
                except ValueError:
                    self.get_logger().error(f'invalid line {lineno}: {text}')
                    return []
                points.append(point)

        if not points:
            self.get_logger().error('trajectory is empty')
            return []
        if any(len(point) != len(points[0]) for point in points):
            self.get_logger().error('trajectory dimension mismatch')
            return []
        if any(len(point) != expected_size for point in points):
            self.get_logger().error(
                f'trajectory point size mismatch: point_type={point_type} '
                f'expected={expected_size} actual={len(points[0])}'
            )
            return []
        self._traj_points = points
        self.get_logger().info(
            f'loaded csv trajectory: {traj_path} point_type={point_type}'
        )
        return points

    def run(self):
        if self.args.mode == 'csv':
            points = self.load_csv_points()
            if not points:
                return 1
            if self.args.move_to_first:
                if not self.move_to_first(points[0]):
                    return 1
                if not self.wait_move_done():
                    return 1
        else:
            self._point_type = 'joint'
            points = self.build_current_test_points()
            if not points:
                return 1

        self.publish_set_mode()
        time.sleep(min(self.args.period, 0.05))

        try:
            next_tp = time.monotonic()
            for index, point in enumerate(points, start=1):
                self.publish_point(point, f'point_{index}')
                next_tp += self.args.period
                delay = next_tp - time.monotonic()
                if delay > 0.0:
                    time.sleep(delay)
            time.sleep(min(self.args.watchdog * 0.5, 0.2))
        finally:
            self.publish_stop()
        return 0


def main():
    args = parse_args()
    rclpy.init()
    node = JointCmdPositionServoTest(args)
    try:
        rc = node.run()
    finally:
        node.destroy_node()
        rclpy.shutdown()
    raise SystemExit(rc)


if __name__ == '__main__':
    main()

8.1 脚本主流程说明

脚本主流程可以概括成下面这一段:

def run(self):
    if self.args.mode == 'csv':
        points = self.load_csv_points()
        if not points:
            return 1
        if self.args.move_to_first:
            if not self.move_to_first(points[0]):
                return 1
            if not self.wait_move_done():
                return 1
    else:
        self._point_type = 'joint'
        points = self.build_current_test_points()
        if not points:
            return 1

    self.publish_set_mode()
    time.sleep(min(self.args.period, 0.05))

    try:
        next_tp = time.monotonic()
        for index, point in enumerate(points, start=1):
            self.publish_point(point, f'point_{index}')
            next_tp += self.args.period
            delay = next_tp - time.monotonic()
            if delay > 0.0:
                time.sleep(delay)
        time.sleep(min(self.args.watchdog * 0.5, 0.2))
    finally:
        self.publish_stop()
    return 0

该段代码对应的调用顺序如下:

  1. 先准备好要发送的点列
  2. 如果是 csv 模式,可先 moveJoint 到起点
  3. 发送 servo_control set_mode 建立伺服会话
  4. 按固定周期连续发送 servo_point
  5. 结束时无论是否异常都发送 stop

8.2 脚本中使用到的 Service

脚本初始化时会创建这些 Service 客户端:

self.get_dof_cli = self.create_client(
    GetDof, f'/{self.robot}/RobotConfig/getDof'
)
self.get_joint_positions_cli = self.create_client(
    GetJointPositions, f'/{self.robot}/RobotState/getJointPositions'
)
self.get_exec_id_cli = self.create_client(
    GetExecId, f'/{self.robot}/MotionControl/getExecId'
)
self.is_steady_cli = self.create_client(
    IsSteady, f'/{self.robot}/RobotState/isSteady'
)
self.move_joint_cli = self.create_client(
    MoveJoint, f'/{self.robot}/MotionControl/moveJoint'
)
self.inverse_kinematics_cli = self.create_client(
    InverseKinematics, f'/{self.robot}/RobotAlgorithm/inverseKinematics'
)

它们分别用于:

  • getDof:读取机器人自由度
  • getJointPositions:读取当前关节位置
  • getExecId:判断 moveJoint 是否已经启动或结束
  • isSteady:确认机器人是否重新稳定
  • moveJoint:在 csv 模式下先移动到轨迹起点
  • inverseKinematics:当 csv 输入的是笛卡尔点时,把第一点转换成关节目标

8.3 current 模式

current 模式的核心逻辑如下:

def build_current_test_points(self):
    current = self.get_joint_positions()
    if current is None:
        return []

    target = [0.0] * len(current)
    max_delta = max(abs(current[i] - target[i]) for i in range(len(current)))
    step = max(self.args.interp_step, 1e-4)
    segments = max(1, int(max_delta / step + 0.999999))

    points = []
    for _ in range(self.args.prefeed):
        points.append(list(current))

    for idx in range(1, segments + 1):
        ratio = idx / segments
        point = [
            current[j] + (target[j] - current[j]) * ratio
            for j in range(len(current))
        ]
        points.append(point)

    for _ in range(self.args.hold):
        points.append(list(target))

    return points

处理逻辑如下:

  • 先读取机器人当前关节位置
  • 把目标位置设为全 0 位
  • 根据 interp_step 计算需要插补多少段
  • 前置重复发送若干次当前点,用于提高伺服会话启动阶段的接收稳定性
  • 中间发送一串从当前位姿逐步过渡到目标位姿的关节点
  • 末尾重复发送若干次目标点,用于结束阶段稳定保持

该模式适合作为基础联调验证方式,因为其不依赖外部轨迹文件。


8.4 csv 模式

csv 模式的核心逻辑如下:

def load_csv_points(self):
    traj_path = self.resolve_traj_path()
    if not os.path.isfile(traj_path):
        return []

    point_type = self.resolve_csv_point_type()
    expected_size = self.dof if point_type == 'joint' else 6

    points = []
    with open(traj_path, 'r', encoding='utf-8', errors='replace') as file:
        for lineno, line in enumerate(file, start=1):
            text = line.strip()
            if not text:
                continue
            point = [float(x.strip()) for x in text.split(',')]
            points.append(point)

    if any(len(point) != expected_size for point in points):
        return []
    self._traj_points = points
    return points

配合主流程里的这段判断:

if self.args.move_to_first:
    if not self.move_to_first(points[0]):
        return 1
    if not self.wait_move_done():
        return 1

csv 模式的主要特征如下:

  • 从文本轨迹中逐行读取点位
  • 支持 jointcartesian 两种点类型
  • 如果是 joint,每行长度必须等于 DOF
  • 如果是 cartesian,每行长度必须为 6
  • 在正式进入伺服前,可先将机器人移动至轨迹第一点附近

其中真正发送到 joint_cmd 的 JSON 格式如下:

def publish_set_mode(self):
    self.start_session()
    payload = {
        'type': 'servo_control',
        'command': 'set_mode',
        'mode': self.args.servo_mode,
        'watchdog_timeout': self.args.watchdog,
        'send_period': self.args.period,
    }
    self.publish_json(payload, 'set_mode')

def publish_point(self, values, tag):
    self._point_id += 1
    payload = {
        'type': 'servo_point',
        'session_id': self._session_id,
        'point_id': self._point_id,
        'point_type': self._point_type,
        'position': list(values),
        'lookahead_time': self.args.lookahead_time,
        'gain': self.args.gain,
    }
    self.publish_json(payload, tag)

关键字段说明如下:

  • servo_control 用于开始或结束会话
  • session_id 用于标识同一轮伺服会话
  • point_id 需要在同一会话内持续递增
  • point_type 指明当前点是关节点还是笛卡尔点
  • position 是本次目标点数据
  • lookahead_timegain 为实现实际消费的参数

results matching ""

    No results matching ""