6. 二次开发快速上手
6. 二次开发快速上手
Section titled “6. 二次开发快速上手”本章节将通过几个简单的示例程序演示如何快速跑起第一个二次开发程序,对此建立一个基本认知。
快速上手教程以二次开发程序部署在 ORIN 上且使用 Python venv 虚拟环境为例。
6.1 前置条件
Section titled “6.1 前置条件”6.1.1 连接 ORIN
Section titled “6.1.1 连接 ORIN”连接 ORIN 有多种方式,以下方式任选其一即可。
-
有线连接
将 PC 的有线网口 ip 调整为固定 ip 192.168.2.100,然后将网线两端一端连接 PC 有线网口,一端连接机器人背后右侧调试网口。
ping 192.168.2.50 # 确认是否联通ssh agi@192.168.2.50 -
无线 AP 热点连接
开发机连接上 ORIN AP 热点,一般以 Robot- 开头,默认密码可在 AimDK 包中查看 password.md 文件,然后通过 AP 热点 IP ssh 登录到机器人上
Terminal window ssh agi@192.168.88.88 -
通过 X86 WiFi 跳转
需要开发机和机器人连接到同一个无线网络,从 AimMaster 的设置-无线局域网界面上获取机器人 ip,实际为X86 WiFi的 ip,ssh 连接上后再通过 192.168.100.110 跳转到 ORIN
Terminal window ssh agi@192.168.xxx.xxx # x86 ipssh agi@192.168.100.110ORIN 和 X86 之间通过网线直连在 192.168.100.0/24 网段上互联,可以通过 192.168.100.100 和 192.168.100.110 ip 进行互相跳转。
也可以使用 ssh 的 proxyjump 功能一次性跳转过去(仍需输入两次密码)
Terminal window ssh -J agi@192.168.xxx.xxx agi@192.168.100.110proxyjump 功能同样可用于 rsync 将本地文件夹快速同步到 ORIN 上无需在 x86 上中转传输,如下所示(x86 wifi ip 和文件夹需要修改):
Terminal window rsync -avzP --exclude ".git/" \-e "ssh -J agi@192.168.xxx.xxx" \/path/to/local/dir agi@192.168.100.110:/agibot/data/home/agi/Desktop/working_dir
6.1.2 创建部署程序文件夹
Section titled “6.1.2 创建部署程序文件夹”推荐的程序部署目录为 /agibot/data/home/agi/Desktop,机上磁盘清理模块会定时清理空间,该文件夹为白名单文件夹不会被清理。
程序、数据放置于其他路径可能造成分区写满、文件被误删等后果,请严格遵循使用 /agibot/data/home/agi/Desktop 目录的约定。
mkdir -p /agibot/data/home/agi/Desktop如遇权限问题,可考虑使用如下命令进行文件夹创建
sudo mkdir -p /agibot/data/home/agi/Desktopsudo chown -R agi:agi /agibot/data/home/agi6.1.3 创建 Python 虚拟环境
Section titled “6.1.3 创建 Python 虚拟环境”cd /agibot/data/home/agi/Desktoppython -m venv mydevsource mydev/bin/activate后续示例都假设已创建虚拟环境并激活,不再重复此步骤。
6.2 获取电池电量程序运行示例
Section titled “6.2 获取电池电量程序运行示例”本示例采用 HTTP 请求获取机器人 BMS 状态。
直接在 ORIN 命令行运行如下脚本:
curl -i -H 'content-type:application/json' \ -H 'timeout: 1000' -X POST 'http://192.168.100.100:56421/rpc/aimdk.protocol.HalBmsService/GetBmsState' \ -d '{}'返回结果应如下所示(具体字段含义在后续接口章节中有详细解释):
HTTP/1.1 200 OKAccess-Control-Allow-Origin: *Content-Type: application/jsonContent-Length: 705
{"data":{"ver":{"hardware_major":48,"hardware_minor":48,"hardware_revision":49,"software_major":51,"software_minor":48,"software_revision":49},"voltage":54410,"current":-304,"power":0,"temperature":308,"capacity":13971,"charge":99,"power_supply_health":"PowerSupplyHealth_GOOD","power_supply_status":"PowerSupplyStatus_IDEL","cycles_num":129,"cycles_capacity":12,"abnormal_state":"PowerAbnormalStatus_NORMAL","charger_state":"ChargerConnected","bms_state":"BatteryStatus_Connected","max_current":25490,"battery_firmware_type":"BatteryFirmwareType_OLD","battery_key_state":"BatteryKeyStatus_SHORT_CURCUIT","battery_pack_state":"BatteryPackStatus_NORMAL","battery_comm_state":"BatteryCommunication_NORMAL"}}6.3 获取机器人上肢关节状态示例
Section titled “6.3 获取机器人上肢关节状态示例”将以下代码在 ORIN 上保存为 /agibot/data/home/agi/Desktop/joint_state.py 文件。
#!/usr/bin/env python3import rclpyfrom rclpy.node import Nodefrom rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicyfrom sensor_msgs.msg import JointState
class JointStateSubscriber(Node): def __init__(self, topic_name: str): super().__init__("joint_state_subscriber")
self.topic_name = topic_name
qos_profile = QoSProfile( history=QoSHistoryPolicy.KEEP_LAST, depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT )
self.subscription = self.create_subscription(JointState, topic_name, self.listener_callback, qos_profile)
def listener_callback(self, msg: JointState): self.get_logger().info(f"=== Received {self.topic_name} ===") self.get_logger().info(f" header: {msg.header}") self.get_logger().info(f" name: {msg.name}") self.get_logger().info(f" position: {msg.position}") self.get_logger().info(f" velocity: {msg.velocity}") self.get_logger().info(f" effort: {msg.effort}")
def main(args=None): rclpy.init(args=args) # 手臂关节状态: /motion/control/arm_joint_state # 脖子关节状态: /motion/control/neck_joint_state joint_state_node = JointStateSubscriber("/motion/control/arm_joint_state") try: rclpy.spin(joint_state_node) except KeyboardInterrupt: pass finally: joint_state_node.destroy_node() rclpy.shutdown()
if __name__ == "__main__": main()然后执行如下命令
source /opt/ros/humble/setup.bashexport ROS_DOMAIN_ID=232export ROS_LOCALHOST_ONLY=0export FASTRTPS_DEFAULT_PROFILES_FILE=/agibot/software/v0/entry/bin/cfg/ros_dds_configuration.xmlpython3 /agibot/data/home/agi/Desktop/joint_state.py6.4 控制摇头接口示例
Section titled “6.4 控制摇头接口示例”我们为头部同时提供了 HTTP 接口和 ROS2 接口。
6.4.1 摇头 HTTP 示例
Section titled “6.4.1 摇头 HTTP 示例”使用 HTTP 接口控制较为简单,如下脚本保存并运行即可
#!/usr/bin/env python3
## 功能:设置头部关节值
import requestsfrom datetime import datetime
# 生成请求头部def create_header(): now = datetime.utcnow() header = { "timestamp": { "seconds": int(now.timestamp()), "nanos": now.microsecond * 1000, "ms_since_epoch": int(now.timestamp() * 1000), }, "control_source": "ControlSource_SAFE", } return header
def create_neck_data(): neck_data = { "shake": { "name": "idx27_head_joint1", "position": 0.5, "velocity": 0.0, "effort": 0.0, }, "nod": { "name": "idx28_head_joint2", "position": 0.5, "velocity": 0.0, "effort": 0.0, }, }
return neck_data
def service_call(): # 发送服务请求
url = ( "http://192.168.100.100:56322/rpc/aimdk.protocol.McMotionService/SetNeckCommand" ) headers = {"Content-Type": "application/json", "timeout": "60000"}
payload = {"header": create_header(), "data": create_neck_data()}
response = requests.Session().post(url, headers=headers, json=payload)
return response
# 主程序入口def main(): response = service_call() print(response.text) requests.Session().close()
if __name__ == "__main__": main()其中的 velocity 和 effort 均无效,仅 position 为有效值。
6.4.2 摇头 ROS2 示例
Section titled “6.4.2 摇头 ROS2 示例”使用 ROS2 接口调用摇头程序需要一些前置条件:
-
需要先切换运动控制模块的状态机至 RL_LOCOMOTION_ARM_EXT_JOINT_SERVO 状态,可在 AimMaster 上进行切换至上肢动作模式,后续详细接口讲解也会介绍如何使用程序接口进行切换。
-
调用如下命令释放动作播放模块对于上半身关节的控制
Terminal window curl -i \-H 'content-type:application/json' \-H 'timeout: 1000' \-X POST http://192.168.100.100:56444/rpc/aimdk.protocol.MotionCommandService/DisableMotionPlayer \-d '{}'
接下来将以下脚本保存为 /agibot/data/home/agi/Desktop/neck.py
#!/usr/bin/env python3import rclpyfrom rclpy.node import Nodefrom rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicyfrom sensor_msgs.msg import JointState
class NeckPublisher(Node): def __init__(self, neck_topic_name: str): super().__init__("neck_publisher")
qos_profile = QoSProfile( history=QoSHistoryPolicy.KEEP_LAST, depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT, )
self.publisher = self.create_publisher(JointState, neck_topic_name, qos_profile)
timer_period = 0.02 self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self): msg = JointState() msg.header.stamp = self.get_clock().now().to_msg()
# 仅支持位控 # 第一个值代表摇头,范围 -0.785 - 0.785 # 第二个值无效,仍需要填写 msg.name = ["idx27_head_joint1", "idx28_head_joint2"] msg.position = [0.5, 0.5]
# 以下字段无实际作用,但必须设置,否则可能导致运控 crash msg.velocity = [0.0, 0.0] msg.effort = [0.0, 0.0]
self.publisher.publish(msg)
def main(args=None): rclpy.init(args=args) neck_publisher = NeckPublisher("/motion/control/neck_joint_command") try: rclpy.spin(neck_publisher) except KeyboardInterrupt: pass finally: neck_publisher.destroy_node() rclpy.shutdown()
if __name__ == "__main__": main()然后运行
source /opt/ros/humble/setup.bashexport ROS_DOMAIN_ID=232export ROS_LOCALHOST_ONLY=0export FASTRTPS_DEFAULT_PROFILES_FILE=/agibot/software/v0/entry/bin/cfg/ros_dds_configuration.xmlpython3 /agibot/data/home/agi/Desktop/neck.py