3. 二次开发接口概述
3. 二次开发接口概述
Section titled “3. 二次开发接口概述”上述软件系统中模块大多数均使用 AimRT 作为底层框架,AimRT 是一款对标 ROS2 的机器人运行时底层框架,由智元机器人公司在 2024/09 开源,AimRT 中提供了两种基本的底层通信方式,分别为 Channel 和 RPC(更多内容可以参考 AimRT 官方文档),其中两种通信方式又可以对应多个不同通信后端。
为降低二次开发心智负担,我们为二次开发提供了两种形式使用较为简单和通用的接口,分别为 HTTP JSON RPC 和 ROS2 Topic,其中 HTTP JSON RPC 适用于低频接口多对一调用,ROS2 Topic 适用于高频接口多对多调用。以下为二者通信关系图。

针对开发语言,二次开发程序不限制具体语言。只要能够收发 HTTP 请求,即可调用所有 HTTP JSON RPC 接口;只要支持 ROS 2 Jazzy(基于 Fast DDS),即可通过订阅和发布 ROS 2 Topic 调用所有本体接口。
这里给出两个具体的接口调用示例,建立更加清晰的接口认知:
3.1 HTTP JSON RPC
Section titled “3.1 HTTP JSON RPC”以下为查询静默模式的命令行 RPC 请求示例
curl --location --request POST 'http://10.42.10.10:59301/rpc/aimdk.protocol.AgentControlService/GetVoiceEnable' \ --header 'Content-Type: application/json' \ --data-raw '{}'其中
10.42.10.10为机器人 HDU 在内网互联网段中的 IP 地址;10.42.10.12为 MDU 在同一网段中的 IP 地址。文档中的接口 URL 会明确写出应访问哪一台工控机,如需远程调用,请改为对应设备在当前网络中的可达 IP(可通过 AimMaster 查看或在目标设备执行ip a查询)。59301相应服务的 HTTP 端口号,这个一般会在详细接口示例中给出。content-type:application/json为请求头,表示请求体为 json 格式,必须要加上这个请求头。aimdk.protocol.AgentControlService/GetVoiceEnable为查询静默模式 RPC 接口名称,各个接口的具体名称在对应接口的文档和示例中可以找到。'{}'为请求体,这里为空代表其中的字段都为默认值,如果需要传入特定字段,可以传入 json 字符串,例如'{"bms_state": 1}'表示传入 bms_state 字段,值为 1(仅为举例,具体字段名称和值需要参考对应接口的文档和示例)。
3.2 ROS2 Topic
Section titled “3.2 ROS2 Topic”3.2.1 环境变量设置
Section titled “3.2.1 环境变量设置”话题接口采用 ROS2 原生话题,在调用任意 ROS2 Topic 接口之前必须设置一些环境变量,如下:
export ROS_DOMAIN_ID=232export FASTRTPS_DEFAULT_PROFILES_FILE=/agibot/software/v0/entry/cfg/ros_dds_configuration.xml如果您的程序需要以 root 权限启动则将上述 dds 配置文件改为使用以下配置文件:
Terminal window export FASTRTPS_DEFAULT_PROFILES_FILE=/agibot/software/v0/entry/bin/privileged_ros_dds_configuration.xml在
ros_dds_configuration.xml中同时启用了共享内存通信和 udp 通信,而在privileged_ros_dds_configuration.xml中则仅启用了 udp 通信,用以规避共享内存的权限问题。
注意环境变量仅在当前终端有效,若新开终端则需重新设置
3.2.2 QoS 设置
Section titled “3.2.2 QoS 设置”另外还需注意机上话题默认 QoS 为
history: keep_last depth: 10 reliability: best_effort使用时需保持 QoS 一致以保证通信顺畅。
3.2.3 调用示例
Section titled “3.2.3 调用示例”以下为订阅关节状态的 ROS2 Topic 示例脚本。
#!/usr/bin/env python3import rclpyfrom rclpy.node import Nodefrom rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicyfrom sensor_msgs.msg import JointState
class JointStateSubscriber(Node): def __init__(self, topic_name: str): super().__init__("joint_state_subscriber")
self.topic_name = topic_name
qos_profile = QoSProfile( history=QoSHistoryPolicy.KEEP_LAST, depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT )
self.subscription = self.create_subscription(JointState, topic_name, self.listener_callback, qos_profile)
def listener_callback(self, msg: JointState): self.get_logger().info(f"=== Received {self.topic_name} ===") self.get_logger().info(f" header: {msg.header}") self.get_logger().info(f" name: {msg.name}") self.get_logger().info(f" position: {msg.position}") self.get_logger().info(f" velocity: {msg.velocity}") self.get_logger().info(f" effort: {msg.effort}")
def main(args=None): rclpy.init(args=args) # 手臂关节状态: /motion/control/arm_joint_state # 脖子关节状态: /motion/control/neck_joint_state joint_state_node = JointStateSubscriber("/motion/control/arm_joint_state") try: rclpy.spin(joint_state_node) except KeyboardInterrupt: pass finally: joint_state_node.destroy_node() rclpy.shutdown()
if __name__ == "__main__": main()