跳转到内容

6. 二次开发快速上手

本章节将通过几个简单的示例程序演示如何快速跑起第一个二次开发程序,对此建立一个基本认知。

快速上手教程以二次开发程序部署在 ORIN 上且使用 Python venv 虚拟环境为例。

连接 ORIN 有多种方式,以下方式任选其一即可。

  1. 有线连接

    将 PC 的有线网口 ip 调整为固定 ip 192.168.2.100,然后将网线两端一端连接 PC 有线网口,一端连接机器人背后右侧调试网口。

    ping 192.168.2.50 # 确认是否联通
    ssh agi@192.168.2.50
  2. 无线 AP 热点连接

    开发机连接上 ORIN AP 热点,一般以 Robot- 开头,默认密码可在 AimDK 包中查看 password.md 文件,然后通过 AP 热点 IP ssh 登录到机器人上

    Terminal window
    ssh agi@192.168.88.88
  3. 通过 X86 WiFi 跳转

    需要开发机和机器人连接到同一个无线网络,从 AimMaster 的设置-无线局域网界面上获取机器人 ip,实际为X86 WiFi的 ip,ssh 连接上后再通过 192.168.100.110 跳转到 ORIN

    Terminal window
    ssh agi@192.168.xxx.xxx # x86 ip
    ssh agi@192.168.100.110

    ORIN 和 X86 之间通过网线直连在 192.168.100.0/24 网段上互联,可以通过 192.168.100.100 和 192.168.100.110 ip 进行互相跳转。

    也可以使用 ssh 的 proxyjump 功能一次性跳转过去(仍需输入两次密码)

    Terminal window
    ssh -J agi@192.168.xxx.xxx agi@192.168.100.110

    proxyjump 功能同样可用于 rsync 将本地文件夹快速同步到 ORIN 上无需在 x86 上中转传输,如下所示(x86 wifi ip 和文件夹需要修改):

    Terminal window
    rsync -avzP --exclude ".git/" \
    -e "ssh -J agi@192.168.xxx.xxx" \
    /path/to/local/dir agi@192.168.100.110:/agibot/data/home/agi/Desktop/working_dir

推荐的程序部署目录为 /agibot/data/home/agi/Desktop,机上磁盘清理模块会定时清理空间,该文件夹为白名单文件夹不会被清理。

程序、数据放置于其他路径可能造成分区写满、文件被误删等后果,请严格遵循使用 /agibot/data/home/agi/Desktop 目录的约定。

Terminal window
mkdir -p /agibot/data/home/agi/Desktop

如遇权限问题,可考虑使用如下命令进行文件夹创建

Terminal window
sudo mkdir -p /agibot/data/home/agi/Desktop
sudo chown -R agi:agi /agibot/data/home/agi
Terminal window
cd /agibot/data/home/agi/Desktop
python -m venv mydev
source mydev/bin/activate

后续示例都假设已创建虚拟环境并激活,不再重复此步骤。

本示例采用 HTTP 请求获取机器人 BMS 状态。

直接在 ORIN 命令行运行如下脚本:

curl -i -H 'content-type:application/json' \
-H 'timeout: 1000' -X POST 'http://192.168.100.100:56421/rpc/aimdk.protocol.HalBmsService/GetBmsState' \
-d '{}'

返回结果应如下所示(具体字段含义在后续接口章节中有详细解释):

HTTP/1.1 200 OK
Access-Control-Allow-Origin: *
Content-Type: application/json
Content-Length: 705
{"data":{"ver":{"hardware_major":48,"hardware_minor":48,"hardware_revision":49,"software_major":51,"software_minor":48,"software_revision":49},"voltage":54410,"current":-304,"power":0,"temperature":308,"capacity":13971,"charge":99,"power_supply_health":"PowerSupplyHealth_GOOD","power_supply_status":"PowerSupplyStatus_IDEL","cycles_num":129,"cycles_capacity":12,"abnormal_state":"PowerAbnormalStatus_NORMAL","charger_state":"ChargerConnected","bms_state":"BatteryStatus_Connected","max_current":25490,"battery_firmware_type":"BatteryFirmwareType_OLD","battery_key_state":"BatteryKeyStatus_SHORT_CURCUIT","battery_pack_state":"BatteryPackStatus_NORMAL","battery_comm_state":"BatteryCommunication_NORMAL"}}

6.3 获取机器人上肢关节状态示例

Section titled “6.3 获取机器人上肢关节状态示例”

将以下代码在 ORIN 上保存为 /agibot/data/home/agi/Desktop/joint_state.py 文件。

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
from sensor_msgs.msg import JointState
class JointStateSubscriber(Node):
def __init__(self, topic_name: str):
super().__init__("joint_state_subscriber")
self.topic_name = topic_name
qos_profile = QoSProfile(
history=QoSHistoryPolicy.KEEP_LAST, depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT
)
self.subscription = self.create_subscription(JointState, topic_name, self.listener_callback, qos_profile)
def listener_callback(self, msg: JointState):
self.get_logger().info(f"=== Received {self.topic_name} ===")
self.get_logger().info(f" header: {msg.header}")
self.get_logger().info(f" name: {msg.name}")
self.get_logger().info(f" position: {msg.position}")
self.get_logger().info(f" velocity: {msg.velocity}")
self.get_logger().info(f" effort: {msg.effort}")
def main(args=None):
rclpy.init(args=args)
# 手臂关节状态: /motion/control/arm_joint_state
# 脖子关节状态: /motion/control/neck_joint_state
joint_state_node = JointStateSubscriber("/motion/control/arm_joint_state")
try:
rclpy.spin(joint_state_node)
except KeyboardInterrupt:
pass
finally:
joint_state_node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

然后执行如下命令

source /opt/ros/humble/setup.bash
export ROS_DOMAIN_ID=232
export ROS_LOCALHOST_ONLY=0
export FASTRTPS_DEFAULT_PROFILES_FILE=/agibot/software/v0/entry/bin/cfg/ros_dds_configuration.xml
python3 /agibot/data/home/agi/Desktop/joint_state.py

我们为头部同时提供了 HTTP 接口和 ROS2 接口。

使用 HTTP 接口控制较为简单,如下脚本保存并运行即可

#!/usr/bin/env python3
## 功能:设置头部关节值
import requests
from datetime import datetime
# 生成请求头部
def create_header():
now = datetime.utcnow()
header = {
"timestamp": {
"seconds": int(now.timestamp()),
"nanos": now.microsecond * 1000,
"ms_since_epoch": int(now.timestamp() * 1000),
},
"control_source": "ControlSource_SAFE",
}
return header
def create_neck_data():
neck_data = {
"shake": {
"name": "idx27_head_joint1",
"position": 0.5,
"velocity": 0.0,
"effort": 0.0,
},
"nod": {
"name": "idx28_head_joint2",
"position": 0.5,
"velocity": 0.0,
"effort": 0.0,
},
}
return neck_data
def service_call():
# 发送服务请求
url = (
"http://192.168.100.100:56322/rpc/aimdk.protocol.McMotionService/SetNeckCommand"
)
headers = {"Content-Type": "application/json", "timeout": "60000"}
payload = {"header": create_header(), "data": create_neck_data()}
response = requests.Session().post(url, headers=headers, json=payload)
return response
# 主程序入口
def main():
response = service_call()
print(response.text)
requests.Session().close()
if __name__ == "__main__":
main()

其中的 velocity 和 effort 均无效,仅 position 为有效值。

使用 ROS2 接口调用摇头程序需要一些前置条件:

  1. 需要先切换运动控制模块的状态机至 RL_LOCOMOTION_ARM_EXT_JOINT_SERVO 状态,可在 AimMaster 上进行切换至上肢动作模式,后续详细接口讲解也会介绍如何使用程序接口进行切换。

  2. 调用如下命令释放动作播放模块对于上半身关节的控制

    Terminal window
    curl -i \
    -H 'content-type:application/json' \
    -H 'timeout: 1000' \
    -X POST http://192.168.100.100:56444/rpc/aimdk.protocol.MotionCommandService/DisableMotionPlayer \
    -d '{}'

接下来将以下脚本保存为 /agibot/data/home/agi/Desktop/neck.py

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
from sensor_msgs.msg import JointState
class NeckPublisher(Node):
def __init__(self, neck_topic_name: str):
super().__init__("neck_publisher")
qos_profile = QoSProfile(
history=QoSHistoryPolicy.KEEP_LAST,
depth=10,
reliability=QoSReliabilityPolicy.BEST_EFFORT,
)
self.publisher = self.create_publisher(JointState, neck_topic_name, qos_profile)
timer_period = 0.02
self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self):
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
# 仅支持位控
# 第一个值代表摇头,范围 -0.785 - 0.785
# 第二个值无效,仍需要填写
msg.name = ["idx27_head_joint1", "idx28_head_joint2"]
msg.position = [0.5, 0.5]
# 以下字段无实际作用,但必须设置,否则可能导致运控 crash
msg.velocity = [0.0, 0.0]
msg.effort = [0.0, 0.0]
self.publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
neck_publisher = NeckPublisher("/motion/control/neck_joint_command")
try:
rclpy.spin(neck_publisher)
except KeyboardInterrupt:
pass
finally:
neck_publisher.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

然后运行

source /opt/ros/humble/setup.bash
export ROS_DOMAIN_ID=232
export ROS_LOCALHOST_ONLY=0
export FASTRTPS_DEFAULT_PROFILES_FILE=/agibot/software/v0/entry/bin/cfg/ros_dds_configuration.xml
python3 /agibot/data/home/agi/Desktop/neck.py