6. 二次开发快速上手
6. 二次开发快速上手
Section titled “6. 二次开发快速上手”本章节将通过几个简单的示例程序演示如何快速跑起第一个二次开发程序,对此建立一个基本认知。
6.1 前置条件
Section titled “6.1 前置条件”二次开发程序推荐部署在个人 PC 或额外加装算力板上,系统要求为 Ubuntu 22.04 installed ROS2 Humble with Fastdds,通过网线直连到机器人背部调试网口(192.168.2.50)或预留 to_orin 网口(192.168.100.100)。本教程中以个人 PC网线直连调试网口(192.168.2.50 )为例,网口不同的情况只需修改 ip 即可。
6.1.1 有线连接调试网口
Section titled “6.1.1 有线连接调试网口”首先将 PC 的有线网口 ip 调整为固定 ip 192.168.2.150,然后将网线两端一端连接 PC 有线网口,一端连接机器人背后右侧调试网口。
机器人开机后,在个人 PC 上应该能够 ping 通 192.168.2.50
ping 192.168.2.506.1.2 添加 DDS 配置白名单
Section titled “6.1.2 添加 DDS 配置白名单”首先 ssh 连接到机器人,将相应配置文件备份后修改配置文件
ssh agi@192.168.2.50cd /agibot/software/v0/entry/bin/cfg/cp ros_dds_configuration.xml ros_dds_configuration.xml.originalcp privileged_ros_dds_configuration.xml privileged_ros_dds_configuration.xml.original然后使用 vim 修改以上两个配置文件
vim ros_dds_configuration.xmlvim privileged_ros_dds_configuration.xml此处仅展示修改后的片段(两个文件以上片段均相同)
<interfaceWhiteList> <address>127.0.0.1</address> <address>10.42.0.100</address> <address>192.168.2.50</address> </interfaceWhiteList>修改后重启机器人,待机器人初始化完成后,在个人 PC 上执行如下命令,能够看到机上诸多话题并正常 echo 对应话题即为成功。
source /opt/ros/humble/setup.bashexport ROS_DOMAIN_ID=232export ROS_LOCALHOST_ONLY=0ros2 topic list --no-daemonros2 topic echo /motion/control/arm_joint_state6.2 获取电池电量程序运行示例
Section titled “6.2 获取电池电量程序运行示例”本示例采用 HTTP 请求获取机器人 BMS 状态,仅依赖 5.1.1 节中有线连接。另外若机器人与 PC 连接到统一局域网中,可通过机器人在局域网中的 ip 来访问相应服务。
直接在个人 PC 上运行如下脚本:
curl -i -H 'content-type:application/json' \ -H 'timeout: 1000' -X POST 'http://192.168.2.50:56421/rpc/aimdk.protocol.HalBmsService/GetBmsState' \ -d '{}'返回结果应如下所示(具体字段含义在后续接口章节中有详细解释):
HTTP/1.1 200 OKAccess-Control-Allow-Origin: *Content-Type: application/jsonContent-Length: 709
{"data":{"ver":{"hardware_major":49,"hardware_minor":48,"hardware_revision":49,"software_major":53,"software_minor":48,"software_revision":49},"voltage":54200,"current":28,"power":0,"temperature":285,"capacity":14090,"charge":100,"power_supply_health":"PowerSupplyHealth_GOOD","power_supply_status":"PowerSupplyStatus_IDEL","cycles_num":100,"cycles_capacity":10,"abnormal_state":"PowerAbnormalStatus_OVERVOLTAGE","charger_state":"ChargerConnected","bms_state":"BatteryStatus_Connected","max_current":29568,"battery_firmware_type":"BatteryFirmwareType_OLD","battery_key_state":"BatteryKeyStatus_SHORT_CURCUIT","battery_pack_state":"BatteryPackStatus_NORMAL","battery_comm_state":"BatteryCommunication_NORMAL"}}6.3 获取机器人上肢关节状态示例
Section titled “6.3 获取机器人上肢关节状态示例”本实例采用 ROS2 程序订阅机上对应话题,依赖 5.1 节中所有前置条件。不推荐使用局域无线网获取 ROS2 话题等数据(延迟高不可控,且 ROS2 话题暴露在局域网中可能存在多机器人干扰等问题,安全性大幅降低)。
将以下代码在 PC 上保存为 joint_state.py 文件。
#!/usr/bin/env python3import rclpyfrom rclpy.node import Nodefrom rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicyfrom sensor_msgs.msg import JointState
class JointStateSubscriber(Node): def __init__(self, topic_name: str): super().__init__("joint_state_subscriber")
self.topic_name = topic_name
qos_profile = QoSProfile( history=QoSHistoryPolicy.KEEP_LAST, depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT )
self.subscription = self.create_subscription(JointState, topic_name, self.listener_callback, qos_profile)
def listener_callback(self, msg: JointState): self.get_logger().info(f"=== Received {self.topic_name} ===") self.get_logger().info(f" header: {msg.header}") self.get_logger().info(f" name: {msg.name}") self.get_logger().info(f" position: {msg.position}") self.get_logger().info(f" velocity: {msg.velocity}") self.get_logger().info(f" effort: {msg.effort}")
def main(args=None): rclpy.init(args=args) # 手臂关节状态: /motion/control/arm_joint_state # 脖子关节状态: /motion/control/neck_joint_state joint_state_node = JointStateSubscriber("/motion/control/arm_joint_state") try: rclpy.spin(joint_state_node) except KeyboardInterrupt: pass finally: joint_state_node.destroy_node() rclpy.shutdown()
if __name__ == "__main__": main()然后执行如下命令
source /opt/ros/humble/setup.bashexport ROS_DOMAIN_ID=232export ROS_LOCALHOST_ONLY=0python3 joint_state.py6.4 控制摇头接口示例
Section titled “6.4 控制摇头接口示例”我们为头部同时提供了 HTTP 接口和 ROS2 接口。
6.4.1 摇头 HTTP 示例
Section titled “6.4.1 摇头 HTTP 示例”使用 HTTP 接口控制较为简单,如下脚本
#!/usr/bin/env python3
## 功能:设置头部关节值
import requestsfrom datetime import datetime
# 生成请求头部def create_header(): now = datetime.utcnow() header = { "timestamp": { "seconds": int(now.timestamp()), "nanos": now.microsecond * 1000, "ms_since_epoch": int(now.timestamp() * 1000), }, "control_source": "ControlSource_SAFE", } return header
def create_neck_data(): neck_data = { "shake": { "name": "idx27_head_joint1", "position": 0.5, "velocity": 0.0, "effort": 0.0, }, "nod": { "name": "idx28_head_joint2", "position": 0.5, "velocity": 0.0, "effort": 0.0, }, }
return neck_data
def service_call(): # 发送服务请求
url = ( "http://192.168.2.50:56322/rpc/aimdk.protocol.McMotionService/SetNeckCommand" ) headers = {"Content-Type": "application/json", "timeout": "60000"}
payload = {"header": create_header(), "data": create_neck_data()}
response = requests.Session().post(url, headers=headers, json=payload)
return response
# 主程序入口def main(): response = service_call() print(response.text) requests.Session().close()
if __name__ == "__main__": main()其中 idx28_head_joint2 是无效的,但是必须保留,青春款机器人仅有摇头一个自由度,其中的 velocity 和 effort 均无效,仅 position 会生效。
6.4.2 摇头 ROS2 示例
Section titled “6.4.2 摇头 ROS2 示例”使用 ROS2 接口调用摇头程序需要一些前置条件:
-
需要先切换运动控制模块的状态机至 RL_LOCOMOTION_ARM_EXT_JOINT_SERVO 状态,可在 AimMaster 上进行切换,后续详细接口讲解也会介绍如何使用程序接口进行切换。
-
调用如下命令释放动作播放模块对于上半身关节的控制
Terminal window curl -i \-H 'content-type:application/json' \-H 'timeout: 1000' \-X POST http://192.168.2.50:56444/rpc/aimdk.protocol.MotionCommandService/DisableMotionPlayer \-d '{}'
接下来将以下脚本保存为 neck.py
#!/usr/bin/env python3import rclpyfrom rclpy.node import Nodefrom rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicyfrom sensor_msgs.msg import JointState
class NeckPublisher(Node): def __init__(self, neck_topic_name: str): super().__init__("neck_publisher")
qos_profile = QoSProfile( history=QoSHistoryPolicy.KEEP_LAST, depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT, )
self.publisher = self.create_publisher(JointState, neck_topic_name, qos_profile)
timer_period = 0.02 self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self): msg = JointState() msg.header.stamp = self.get_clock().now().to_msg()
# 仅支持位控 # 第一个值代表摇头,范围 -0.785 - 0.785 # 第二个值无效,仍需要填写 msg.name = ["idx27_head_joint1", "idx28_head_joint2"] msg.position = [0.5, 0.5]
# 以下字段无实际作用,但必须设置,否则可能导致运控 crash msg.velocity = [0.0, 0.0] msg.effort = [0.0, 0.0]
self.publisher.publish(msg)
def main(args=None): rclpy.init(args=args) neck_publisher = NeckPublisher("/motion/control/neck_joint_command") try: rclpy.spin(neck_publisher) except KeyboardInterrupt: pass finally: neck_publisher.destroy_node() rclpy.shutdown()
if __name__ == "__main__": main()然后运行
source /opt/ros/humble/setup.bashexport ROS_DOMAIN_ID=232export ROS_LOCALHOST_ONLY=0python3 neck.py