跳转到内容

6. 二次开发快速上手

本章节将通过几个简单的示例程序演示如何快速跑起第一个二次开发程序,对此建立一个基本认知。

二次开发程序推荐部署在个人 PC 或额外加装算力板上,系统要求为 Ubuntu 22.04 installed ROS2 Humble with Fastdds,通过网线直连到机器人背部调试网口(192.168.2.50)或预留 to_orin 网口(192.168.100.100)。本教程中以个人 PC网线直连调试网口(192.168.2.50 )为例,网口不同的情况只需修改 ip 即可。

首先将 PC 的有线网口 ip 调整为固定 ip 192.168.2.150,然后将网线两端一端连接 PC 有线网口,一端连接机器人背后右侧调试网口。

机器人开机后,在个人 PC 上应该能够 ping 通 192.168.2.50

Terminal window
ping 192.168.2.50

首先 ssh 连接到机器人,将相应配置文件备份后修改配置文件

Terminal window
ssh agi@192.168.2.50
Terminal window
cd /agibot/software/v0/entry/bin/cfg/
cp ros_dds_configuration.xml ros_dds_configuration.xml.original
cp privileged_ros_dds_configuration.xml privileged_ros_dds_configuration.xml.original

然后使用 vim 修改以上两个配置文件

Terminal window
vim ros_dds_configuration.xml
vim privileged_ros_dds_configuration.xml

此处仅展示修改后的片段(两个文件以上片段均相同)

<interfaceWhiteList>
<address>127.0.0.1</address>
<address>10.42.0.100</address>
<address>192.168.2.50</address>
</interfaceWhiteList>

修改后重启机器人,待机器人初始化完成后,在个人 PC 上执行如下命令,能够看到机上诸多话题并正常 echo 对应话题即为成功。

Terminal window
source /opt/ros/humble/setup.bash
export ROS_DOMAIN_ID=232
export ROS_LOCALHOST_ONLY=0
ros2 topic list --no-daemon
ros2 topic echo /motion/control/arm_joint_state

本示例采用 HTTP 请求获取机器人 BMS 状态,仅依赖 5.1.1 节中有线连接。另外若机器人与 PC 连接到统一局域网中,可通过机器人在局域网中的 ip 来访问相应服务。

直接在个人 PC 上运行如下脚本:

Terminal window
curl -i -H 'content-type:application/json' \
-H 'timeout: 1000' -X POST 'http://192.168.2.50:56421/rpc/aimdk.protocol.HalBmsService/GetBmsState' \
-d '{}'

返回结果应如下所示(具体字段含义在后续接口章节中有详细解释):

HTTP/1.1 200 OK
Access-Control-Allow-Origin: *
Content-Type: application/json
Content-Length: 709
{"data":{"ver":{"hardware_major":49,"hardware_minor":48,"hardware_revision":49,"software_major":53,"software_minor":48,"software_revision":49},"voltage":54200,"current":28,"power":0,"temperature":285,"capacity":14090,"charge":100,"power_supply_health":"PowerSupplyHealth_GOOD","power_supply_status":"PowerSupplyStatus_IDEL","cycles_num":100,"cycles_capacity":10,"abnormal_state":"PowerAbnormalStatus_OVERVOLTAGE","charger_state":"ChargerConnected","bms_state":"BatteryStatus_Connected","max_current":29568,"battery_firmware_type":"BatteryFirmwareType_OLD","battery_key_state":"BatteryKeyStatus_SHORT_CURCUIT","battery_pack_state":"BatteryPackStatus_NORMAL","battery_comm_state":"BatteryCommunication_NORMAL"}}

6.3 获取机器人上肢关节状态示例

Section titled “6.3 获取机器人上肢关节状态示例”

本实例采用 ROS2 程序订阅机上对应话题,依赖 5.1 节中所有前置条件。不推荐使用局域无线网获取 ROS2 话题等数据(延迟高不可控,且 ROS2 话题暴露在局域网中可能存在多机器人干扰等问题,安全性大幅降低)。

将以下代码在 PC 上保存为 joint_state.py 文件。

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
from sensor_msgs.msg import JointState
class JointStateSubscriber(Node):
def __init__(self, topic_name: str):
super().__init__("joint_state_subscriber")
self.topic_name = topic_name
qos_profile = QoSProfile(
history=QoSHistoryPolicy.KEEP_LAST, depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT
)
self.subscription = self.create_subscription(JointState, topic_name, self.listener_callback, qos_profile)
def listener_callback(self, msg: JointState):
self.get_logger().info(f"=== Received {self.topic_name} ===")
self.get_logger().info(f" header: {msg.header}")
self.get_logger().info(f" name: {msg.name}")
self.get_logger().info(f" position: {msg.position}")
self.get_logger().info(f" velocity: {msg.velocity}")
self.get_logger().info(f" effort: {msg.effort}")
def main(args=None):
rclpy.init(args=args)
# 手臂关节状态: /motion/control/arm_joint_state
# 脖子关节状态: /motion/control/neck_joint_state
joint_state_node = JointStateSubscriber("/motion/control/arm_joint_state")
try:
rclpy.spin(joint_state_node)
except KeyboardInterrupt:
pass
finally:
joint_state_node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

然后执行如下命令

Terminal window
source /opt/ros/humble/setup.bash
export ROS_DOMAIN_ID=232
export ROS_LOCALHOST_ONLY=0
python3 joint_state.py

我们为头部同时提供了 HTTP 接口和 ROS2 接口。

使用 HTTP 接口控制较为简单,如下脚本

#!/usr/bin/env python3
## 功能:设置头部关节值
import requests
from datetime import datetime
# 生成请求头部
def create_header():
now = datetime.utcnow()
header = {
"timestamp": {
"seconds": int(now.timestamp()),
"nanos": now.microsecond * 1000,
"ms_since_epoch": int(now.timestamp() * 1000),
},
"control_source": "ControlSource_SAFE",
}
return header
def create_neck_data():
neck_data = {
"shake": {
"name": "idx27_head_joint1",
"position": 0.5,
"velocity": 0.0,
"effort": 0.0,
},
"nod": {
"name": "idx28_head_joint2",
"position": 0.5,
"velocity": 0.0,
"effort": 0.0,
},
}
return neck_data
def service_call():
# 发送服务请求
url = (
"http://192.168.2.50:56322/rpc/aimdk.protocol.McMotionService/SetNeckCommand"
)
headers = {"Content-Type": "application/json", "timeout": "60000"}
payload = {"header": create_header(), "data": create_neck_data()}
response = requests.Session().post(url, headers=headers, json=payload)
return response
# 主程序入口
def main():
response = service_call()
print(response.text)
requests.Session().close()
if __name__ == "__main__":
main()

其中 idx28_head_joint2 是无效的,但是必须保留,青春款机器人仅有摇头一个自由度,其中的 velocity 和 effort 均无效,仅 position 会生效。

使用 ROS2 接口调用摇头程序需要一些前置条件:

  1. 需要先切换运动控制模块的状态机至 RL_LOCOMOTION_ARM_EXT_JOINT_SERVO 状态,可在 AimMaster 上进行切换,后续详细接口讲解也会介绍如何使用程序接口进行切换。

  2. 调用如下命令释放动作播放模块对于上半身关节的控制

    Terminal window
    curl -i \
    -H 'content-type:application/json' \
    -H 'timeout: 1000' \
    -X POST http://192.168.2.50:56444/rpc/aimdk.protocol.MotionCommandService/DisableMotionPlayer \
    -d '{}'

接下来将以下脚本保存为 neck.py

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
from sensor_msgs.msg import JointState
class NeckPublisher(Node):
def __init__(self, neck_topic_name: str):
super().__init__("neck_publisher")
qos_profile = QoSProfile(
history=QoSHistoryPolicy.KEEP_LAST,
depth=10,
reliability=QoSReliabilityPolicy.BEST_EFFORT,
)
self.publisher = self.create_publisher(JointState, neck_topic_name, qos_profile)
timer_period = 0.02
self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self):
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
# 仅支持位控
# 第一个值代表摇头,范围 -0.785 - 0.785
# 第二个值无效,仍需要填写
msg.name = ["idx27_head_joint1", "idx28_head_joint2"]
msg.position = [0.5, 0.5]
# 以下字段无实际作用,但必须设置,否则可能导致运控 crash
msg.velocity = [0.0, 0.0]
msg.effort = [0.0, 0.0]
self.publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
neck_publisher = NeckPublisher("/motion/control/neck_joint_command")
try:
rclpy.spin(neck_publisher)
except KeyboardInterrupt:
pass
finally:
neck_publisher.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

然后运行

source /opt/ros/humble/setup.bash
export ROS_DOMAIN_ID=232
export ROS_LOCALHOST_ONLY=0
python3 neck.py