8. 交互二开使用指南
8. 交互二开使用指南
Section titled “8. 交互二开使用指南”交互相关的端侧二次开发提供两类开发模式:
- 智元交互简单拓展
- 智元交互能力保留,由智元提供 tts 和动作表情等基本接口,可以通过这些接口操作机器人进行基本展厅导览、讲解等流程,同时机器人可正常走智元语音交互方案。
- 二次开发完全接管
- 关闭智元交互能力,由智元提供包含本体降噪后的机器人麦克风输入,完全由二次开发程序接管语音交互相关内容。
以下为二者详细对比表格
| 模式 | 智元交互能力是否保留 | 智元侧提供接口 | 联网需求 | 适用场景 | 开发难度及工作量 |
|---|---|---|---|---|---|
| 智元交互简单拓展 | 是 | tts播放、扬声器播放 | 智元交互方案以及 tts 接口都需要联网 | 开发简单展厅导览演示 demo,固定流程编排 | 较为简单,工作量取决于流程复杂程度,一般较小 |
| 二次开发完全接管 | 否 | 降噪音频、扬声器 | 机器人初始化时需要联网,后续使用无需联网 | 自行开发完整交互方案 | 难度较大,需要有完整的相关研发团队,工作量较大 |
8.1 智元交互简单拓展
Section titled “8.1 智元交互简单拓展”该模式下课使用智元提供的表情、动作播放、TTS播放、人脸识别等接口实现一定的功能,但目前多数接口暂未开放,后续将在开放相关接口后做详细说明。
8.2 二次开发完全接管
Section titled “8.2 二次开发完全接管”交互二开涉及关闭智元交互能力、麦克风音频获取、扬声器使用、三个部分。以下将从这三个部分介绍如何进行交互二开。
8.2.1 关闭智元交互能力
Section titled “8.2.1 关闭智元交互能力”要想二次开发完全接管,首先需关闭智元交互能力,仅输出包含本体降噪后的麦克风音频,操作如下:
调整 agent 模块为 only_voice 模式,可以使用麦克风管理部分 SetAgentPropertiesRequest 接口。
-
only_voice:仅输出降噪麦克音频 /agent/process_audio_output,后续链路全部断开
所需调用的 rpc 如下:
Terminal window curl -i \-H 'content-type:application/json' \-X POST 'http://192.168.100.110:59301/rpc/aimdk.protocol.AgentControlService/SetAgentPropertiesRequest' \-d '{ "contents": { "properties": { "2": "only_voice" } } }'调用后需要重启机器人才能生效,可以待第二步的修改完成后再一块重启即可。重启后可以通过使用麦克风管理部分
GetAgentPropertiesRequest接口查询当前交互运行模式,判断模式是否修改成功。 (如需恢复,则将SetAgentPropertiesRequest接口中的 “only_voice” 改为 “normal” 即可)
8.2.2 麦克风音频获取
Section titled “8.2.2 麦克风音频获取”注意:要获取以下音频需要机器人开机时至少联网 2 分钟以上完成音频相关鉴权操作,否则将无原始音频输出,如需离线使用,请首先保证该接口有音频输出后再断网
退出交互链路后,可通过降噪麦克音频 Topic 接口 /agent/process_audio_output 获得本体降噪后的麦克风音频信息。获取本体降噪后的麦克风音频示例程序如下:
#!/usr/bin/env python3
import rclpyfrom rclpy.node import Nodefrom rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicyfrom ros2_plugin_proto.msg import RosMsgWrapper
from aimdk.protocol_pb2 import ProcessedAudioOutput, AudioVADState
import datetimeimport os
class AudioSubscriber(Node): def __init__(self): super().__init__("audio_subscriber")
# 音频缓冲区,按 stream_id 分别存储 self.audio_buffers = {} # {stream_id: bytearray()} self.recording_state = {} # {stream_id: bool} 记录是否正在录音
# 创建音频文件存储目录 self.audio_output_dir = "audio_recordings" os.makedirs(self.audio_output_dir, exist_ok=True)
qos_profile = QoSProfile( history=QoSHistoryPolicy.KEEP_LAST, depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT, )
self.subscription = self.create_subscription( RosMsgWrapper, "/agent/process_audio_output/pb_3Aaimdk_2Eprotocol_2EProcessedAudioOutput", self.audio_callback, qos_profile, )
self.get_logger().info("开始订阅降噪音频数据。..")
def audio_callback(self, msg): try: # 检查序列化类型是否为 pb if msg.serialization_type != "pb": self.get_logger().warn(f"不支持的序列化类型:{msg.serialization_type}") return
# 将 data 字段从 list[bytes] 转换为 bytes audio_data_bytes = b"".join(msg.data)
# 使用生成的 protobuf 类解析消息 processed_audio = ProcessedAudioOutput() processed_audio.ParseFromString(audio_data_bytes)
self.get_logger().info( f"收到音频数据:stream_id={processed_audio.stream_id}, " f"vad_state={processed_audio.vad_state}, " f"audio_size={len(processed_audio.audio_data)} bytes" )
# 根据 VAD 状态处理音频 self.handle_vad_state(processed_audio)
except Exception as e: self.get_logger().error(f"处理音频消息时出错:{e}")
def handle_vad_state(self, processed_audio): """处理不同的 VAD 状态""" vad_state = processed_audio.vad_state stream_id = processed_audio.stream_id audio_data = processed_audio.audio_data
# 初始化该 stream_id 的缓冲区(如果不存在) if stream_id not in self.audio_buffers: self.audio_buffers[stream_id] = bytearray() self.recording_state[stream_id] = False
# VAD 状态名称映射 vad_state_names = { AudioVADState.AUDIO_VAD_STATE_NONE: "无语音", AudioVADState.AUDIO_VAD_STATE_BEGIN: "语音开始", AudioVADState.AUDIO_VAD_STATE_PROCESSING: "语音处理中", AudioVADState.AUDIO_VAD_STATE_END: "语音结束", }
stream_names = {1: "内置麦克风", 2: "外置麦克风"}
self.get_logger().info( f"[{stream_names.get(stream_id, f'未知流{stream_id}')}] " f"VAD 状态:{vad_state_names.get(vad_state, f'未知状态{vad_state}')} " f"音频数据:{len(audio_data)} bytes" )
# 根据 VAD 状态处理音频数据 if vad_state == AudioVADState.AUDIO_VAD_STATE_BEGIN: self.get_logger().info("🎤 检测到语音开始") # 开始新的录音,清空缓冲区 self.audio_buffers[stream_id].clear() self.recording_state[stream_id] = True # 添加当前音频数据 if len(audio_data) > 0: self.audio_buffers[stream_id].extend(audio_data)
elif vad_state == AudioVADState.AUDIO_VAD_STATE_PROCESSING: self.get_logger().info("🔄 语音处理中。..") # 如果正在录音,继续添加音频数据到缓冲区 if self.recording_state[stream_id] and len(audio_data) > 0: self.audio_buffers[stream_id].extend(audio_data)
elif vad_state == AudioVADState.AUDIO_VAD_STATE_END: self.get_logger().info("✅ 语音结束") # 添加最后的音频数据 if self.recording_state[stream_id] and len(audio_data) > 0: self.audio_buffers[stream_id].extend(audio_data)
# 保存完整的音频段 if ( self.recording_state[stream_id] and len(self.audio_buffers[stream_id]) > 0 ): self.save_audio_segment(bytes(self.audio_buffers[stream_id]), stream_id)
# 结束录音 self.recording_state[stream_id] = False
elif vad_state == AudioVADState.AUDIO_VAD_STATE_NONE: # 无语音状态,不进行录音 if self.recording_state[stream_id]: self.get_logger().info("⏹️ 录音状态重置") self.recording_state[stream_id] = False
# 输出当前缓冲区状态 if stream_id in self.audio_buffers: buffer_size = len(self.audio_buffers[stream_id]) recording = self.recording_state[stream_id] self.get_logger().debug( f"[Stream {stream_id}] 缓冲区大小:{buffer_size} bytes, 录音状态:{recording}" )
def save_audio_segment(self, audio_data, stream_id): """保存音频段 16kHz, 16 位,单声道 PCM""" if len(audio_data) > 0: timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
# 按 stream_id 创建子目录 stream_dir = os.path.join(self.audio_output_dir, f"stream_{stream_id}") os.makedirs(stream_dir, exist_ok=True)
# 生成文件名 stream_names = {1: "internal_mic", 2: "external_mic"} stream_name = stream_names.get(stream_id, f"stream_{stream_id}") filename = f"{stream_name}_{timestamp}.pcm" filepath = os.path.join(stream_dir, filename)
try: with open(filepath, "wb") as f: f.write(audio_data) self.get_logger().info( f"音频段已保存:{filepath} (大小:{len(audio_data)} bytes)" )
# 记录音频文件的时长(假设 16kHz, 16 位,单声道) sample_rate = 16000 bits_per_sample = 16 channels = 1 bytes_per_sample = bits_per_sample // 8 total_samples = len(audio_data) // (bytes_per_sample * channels) duration_seconds = total_samples / sample_rate
self.get_logger().info( f"音频时长:{duration_seconds:.2f} 秒 ({total_samples} 样本)" )
except Exception as e: self.get_logger().error(f"保存音频文件失败:{e}")
def get_buffer_info(self): """获取所有缓冲区的信息(用于调试)""" info = {} for stream_id in self.audio_buffers: info[stream_id] = { "buffer_size": len(self.audio_buffers[stream_id]), "recording": self.recording_state[stream_id], } return info
def main(args=None): rclpy.init(args=args)
audio_subscriber = AudioSubscriber()
try: audio_subscriber.get_logger().info("正在监听降噪音频数据,按 Ctrl+C 退出。..") rclpy.spin(audio_subscriber) except KeyboardInterrupt: audio_subscriber.get_logger().info("收到退出信号,正在关闭。..") finally: audio_subscriber.destroy_node() rclpy.shutdown()
if __name__ == "__main__": main()以上程序依赖 python 协议包 a2_aimdk 和 ros2 协议包 ros2_plugin_proto,相应的包已经放置在 AimDK 开发包的 prebuilt 目录下,python 包使用 pip install prebuilt/a2_aimdk-3.0.0-py3-none-any.whl 安装,ros2 包需要 source prebuilt/ros2_plugin_proto_aarch64/share/ros2_plugin_proto/local_setup.bash后使用。
请注意,以上程序会接收 ros2 消息,需要设置如下环境变量:
export ROS_DOMAIN_ID=232export FASTRTPS_DEFAULT_PROFILES_FILE=/agibot/software/v0/entry/cfg/ros_dds_configuration.xml其中音频数据为 16kHz 采样率,16 位单声道 PCM 数据(小端序),输出的音频为纯净人声,已经过降噪处理,回声消除等,可以直接用于 ASR 识别。
ProcessedAudioOutput消息包含以下字段:
| 字段名 | 类型 | 说明 |
|---|---|---|
| header | Header | 通用消息头,包含时间戳和消息 ID |
| stream_id | uint32 | 音频流 ID(1:内置麦克风,2:外置麦克风) |
| vad_state | AudioVADState | 语音活动检测状态 |
| audio_data | bytes | 降噪处理后的 PCM 音频数据 |
AudioVADState 枚举定义:
| 枚举值 | 数值 | 说明 |
|---|---|---|
| AUDIO_VAD_STATE_NONE | 0 | 无语音 |
| AUDIO_VAD_STATE_BEGIN | 1 | 语音开始 |
| AUDIO_VAD_STATE_PROCESSING | 2 | 语音处理中 |
| AUDIO_VAD_STATE_END | 3 | 语音结束 |
注意:当前版本外置麦克风的接口输出 vad_state 存在问题,预期一条语音输入的状态为 122222222223,实际输出的状态为 0111111111112,此问题仅在外置麦场景下出现(内置麦正常),并计划在后续版本修复。当前版本建议手动对状态执行 +1 补偿。
8.2.3 扬声器音频播放
Section titled “8.2.3 扬声器音频播放”要想使用扬声器首先需获取音频焦点:
ros2 service call /audio_5Fmsgs/srv/RequestAudioFocus audio_msgs/srv/RequestAudioFocus "{focus_requester: {pkg_name: audio_examples_sender, priority: 6, priority_weight: 1}}"pkg_name为ROS包名,意味着音频最后必须以在ROS包工程内的形式进行发送。priority为优先级固定为6即可,priority_weight为音频权重,可设置为1-100任意数。
申请音频焦点后即可将自身TTS返回的pcm数据流通过音频流播放接口进行播放,扬声器播放流式音频具体例程如下,需打包成ros_pkg。
#!/usr/bin/env python3
import rclpyfrom rclpy.node import Nodefrom builtin_interfaces.msg import Time as TimeMsgfrom audio_msgs.msg import AudioPlayback,AudioInfo,AudioData # 需确保 audio msg 可导入
class AudioPlaybackPublisher(Node): def __init__(self): super().__init__('audio_playback_publisher') self.pub = self.create_publisher(AudioPlayback, '/audiohal/audio/playback', 10) self.timer = self.create_timer(1.0, self.timer_callback) # 每秒发送
def timer_callback(self): msg = AudioPlayback()
# 填写时间戳(字段名 attachments 中为 stamps) now = self.get_clock().now().to_msg() # 兼容性:优先使用 stamps,否则尝试 stamp if hasattr(msg, 'stamps'): msg.stamps = now elif hasattr(msg, 'stamp'): msg.stamp = now
# 填写 info(AudioInfo) info = AudioInfo() info.channels = 1 # 单声道 info.sample_rate = 16000 # 16kHz info.sample_format = 'S16LE' info.coding_format = 'pcm' msg.info = info
# 填写 data(AudioData),示例给一小段原始 bytes sample_bytes = b'\x00\x01\x02\x03\x04\x05\x06\x07' # 示例数据 msg.data = AudioData(data=list(sample_bytes))
msg.pkg_name = 'audio_examples_sender' msg.token_id = 'token-0001'
self.pub.publish(msg) self.get_logger().info( f'Published AudioPlayback pkg={msg.pkg_name} token={msg.token_id} ' f'data_len={len(msg.data.data)} sample_rate={msg.info.sample_rate}' )
def main(args=None): rclpy.init(args=args) node = AudioPlaybackPublisher() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown()
if __name__ == '__main__': main()以上程序在AimDK开发包agent目录下展示,且程序依赖 python 协议包 a2_aimdk 和 ros2 协议包 ros2_plugin_proto,相应的包已经放置在 AimDK 开发包的 prebuilt 目录下,python 包使用 pip install prebuilt/a2_aimdk-3.0.0-py3-none-any.whl 安装,ros2 包需要 source prebuilt/audio_msgs_proto_aarch64/share/audio_msgs/local_setup.bash后使用。
8.3 唤醒结果上报
Section titled “8.3 唤醒结果上报”若 agent 设置为 only_voice 模式,智元还提供唤醒结果上报能力, Topic 接口为 /agent/wakeup/pb_3Aaimdk_2Eprotocol_2EWakeUpResult
获取唤醒结果上报示例程序如下:
#!/usr/bin/env python3
import rclpyfrom rclpy.node import Nodefrom rclpy.qos import ( QoSDurabilityPolicy, QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy,)from ros2_plugin_proto.msg import RosMsgWrapper
from aimdk.protocol_pb2 import WakeUpResult
TOPIC = "/agent/wakeup/pb_3Aaimdk_2Eprotocol_2EWakeUpResult"
class WakeUpSubscriber(Node): def __init__(self): super().__init__("wakeup_result_subscriber")
qos_profile = QoSProfile( history=QoSHistoryPolicy.KEEP_LAST, depth=10, reliability=QoSReliabilityPolicy.RELIABLE, durability=QoSDurabilityPolicy.VOLATILE, )
self.subscription = self.create_subscription( RosMsgWrapper, TOPIC, self.wakeup_callback, qos_profile, )
self.get_logger().info(f"已开始订阅 WakeUpResult: {TOPIC}")
def wakeup_callback(self, msg): try: if msg.serialization_type != "pb": self.get_logger().warn( f"收到不支持的序列化类型: {msg.serialization_type}" ) return
# 拼接 bytes raw_bytes = b"".join(msg.data)
# 解析 protobuf wakeup_result = WakeUpResult() wakeup_result.ParseFromString(raw_bytes)
# 日志输出 import json from google.protobuf.json_format import MessageToDict
self.get_logger().info( f"WakeUpResult: {json.dumps(MessageToDict(wakeup_result, preserving_proto_field_name=True), ensure_ascii=False, indent=2)}" )
except Exception as e: self.get_logger().error(f"解析 WakeUpResult 数据时出现错误: {e}")
def main(args=None): rclpy.init(args=args) node = WakeUpSubscriber()
try: node.get_logger().info("正在监听 WakeUpResult,按 Ctrl+C 退出...") rclpy.spin(node) except KeyboardInterrupt: node.get_logger().info("退出中...") finally: node.destroy_node() rclpy.shutdown()
if __name__ == "__main__": main()以上程序依赖 python 协议包 a2_aimdk 和 ros2 协议包 ros2_plugin_proto,相应的包已经放置在 AimDK 开发包的 prebuilt 目录下,python 包使用 pip install prebuilt/a2_aimdk-3.0.0-py3-none-any.whl 安装,ros2 包需要 source prebuilt/ros2_plugin_proto_aarch64/share/ros2_plugin_proto/local_setup.bash后使用。
请注意,以上程序会接收 ros2 消息,需要设置如下环境变量:
export ROS_DOMAIN_ID=232export FASTRTPS_DEFAULT_PROFILES_FILE=/agibot/software/v0/entry/cfg/ros_dds_configuration.xml