跳转到内容

8. 交互二开使用指南

交互相关的端侧二次开发提供两类开发模式:

  1. 智元交互简单拓展
    • 智元交互能力保留,由智元提供 tts 和动作表情等基本接口,可以通过这些接口操作机器人进行基本展厅导览、讲解等流程,同时机器人可正常走智元语音交互方案。
  2. 二次开发完全接管
    • 关闭智元交互能力,由智元提供包含本体降噪后的机器人麦克风输入,完全由二次开发程序接管语音交互相关内容。

以下为二者详细对比表格

模式智元交互能力是否保留智元侧提供接口联网需求适用场景开发难度及工作量
智元交互简单拓展tts播放、扬声器播放智元交互方案以及 tts 接口都需要联网开发简单展厅导览演示 demo,固定流程编排较为简单,工作量取决于流程复杂程度,一般较小
二次开发完全接管降噪音频、扬声器机器人初始化时需要联网,后续使用无需联网自行开发完整交互方案难度较大,需要有完整的相关研发团队,工作量较大

该模式下课使用智元提供的表情、动作播放、TTS播放、人脸识别等接口实现一定的功能,但目前多数接口暂未开放,后续将在开放相关接口后做详细说明。

交互二开涉及关闭智元交互能力、麦克风音频获取、扬声器使用、三个部分。以下将从这三个部分介绍如何进行交互二开。

要想二次开发完全接管,首先需关闭智元交互能力,仅输出包含本体降噪后的麦克风音频,操作如下:

调整 agent 模块为 only_voice 模式,可以使用麦克风管理部分 SetAgentPropertiesRequest 接口。

  • only_voice:仅输出降噪麦克音频 /agent/process_audio_output,后续链路全部断开

    所需调用的 rpc 如下:

    Terminal window
    curl -i \
    -H 'content-type:application/json' \
    -X POST 'http://192.168.100.110:59301/rpc/aimdk.protocol.AgentControlService/SetAgentPropertiesRequest' \
    -d '{ "contents": { "properties": { "2": "only_voice" } } }'

    调用后需要重启机器人才能生效,可以待第二步的修改完成后再一块重启即可。重启后可以通过使用麦克风管理部分 GetAgentPropertiesRequest 接口查询当前交互运行模式,判断模式是否修改成功。 (如需恢复,则将SetAgentPropertiesRequest 接口中的 “only_voice” 改为 “normal” 即可)

注意:要获取以下音频需要机器人开机时至少联网 2 分钟以上完成音频相关鉴权操作,否则将无原始音频输出,如需离线使用,请首先保证该接口有音频输出后再断网

退出交互链路后,可通过降噪麦克音频 Topic 接口 /agent/process_audio_output 获得本体降噪后的麦克风音频信息。获取本体降噪后的麦克风音频示例程序如下:

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSHistoryPolicy, QoSProfile, QoSReliabilityPolicy
from ros2_plugin_proto.msg import RosMsgWrapper
from aimdk.protocol_pb2 import ProcessedAudioOutput, AudioVADState
import datetime
import os
class AudioSubscriber(Node):
def __init__(self):
super().__init__("audio_subscriber")
# 音频缓冲区,按 stream_id 分别存储
self.audio_buffers = {} # {stream_id: bytearray()}
self.recording_state = {} # {stream_id: bool} 记录是否正在录音
# 创建音频文件存储目录
self.audio_output_dir = "audio_recordings"
os.makedirs(self.audio_output_dir, exist_ok=True)
qos_profile = QoSProfile(
history=QoSHistoryPolicy.KEEP_LAST,
depth=10,
reliability=QoSReliabilityPolicy.BEST_EFFORT,
)
self.subscription = self.create_subscription(
RosMsgWrapper,
"/agent/process_audio_output/pb_3Aaimdk_2Eprotocol_2EProcessedAudioOutput",
self.audio_callback,
qos_profile,
)
self.get_logger().info("开始订阅降噪音频数据。..")
def audio_callback(self, msg):
try:
# 检查序列化类型是否为 pb
if msg.serialization_type != "pb":
self.get_logger().warn(f"不支持的序列化类型:{msg.serialization_type}")
return
# 将 data 字段从 list[bytes] 转换为 bytes
audio_data_bytes = b"".join(msg.data)
# 使用生成的 protobuf 类解析消息
processed_audio = ProcessedAudioOutput()
processed_audio.ParseFromString(audio_data_bytes)
self.get_logger().info(
f"收到音频数据:stream_id={processed_audio.stream_id}, "
f"vad_state={processed_audio.vad_state}, "
f"audio_size={len(processed_audio.audio_data)} bytes"
)
# 根据 VAD 状态处理音频
self.handle_vad_state(processed_audio)
except Exception as e:
self.get_logger().error(f"处理音频消息时出错:{e}")
def handle_vad_state(self, processed_audio):
"""处理不同的 VAD 状态"""
vad_state = processed_audio.vad_state
stream_id = processed_audio.stream_id
audio_data = processed_audio.audio_data
# 初始化该 stream_id 的缓冲区(如果不存在)
if stream_id not in self.audio_buffers:
self.audio_buffers[stream_id] = bytearray()
self.recording_state[stream_id] = False
# VAD 状态名称映射
vad_state_names = {
AudioVADState.AUDIO_VAD_STATE_NONE: "无语音",
AudioVADState.AUDIO_VAD_STATE_BEGIN: "语音开始",
AudioVADState.AUDIO_VAD_STATE_PROCESSING: "语音处理中",
AudioVADState.AUDIO_VAD_STATE_END: "语音结束",
}
stream_names = {1: "内置麦克风", 2: "外置麦克风"}
self.get_logger().info(
f"[{stream_names.get(stream_id, f'未知流{stream_id}')}] "
f"VAD 状态:{vad_state_names.get(vad_state, f'未知状态{vad_state}')} "
f"音频数据:{len(audio_data)} bytes"
)
# 根据 VAD 状态处理音频数据
if vad_state == AudioVADState.AUDIO_VAD_STATE_BEGIN:
self.get_logger().info("🎤 检测到语音开始")
# 开始新的录音,清空缓冲区
self.audio_buffers[stream_id].clear()
self.recording_state[stream_id] = True
# 添加当前音频数据
if len(audio_data) > 0:
self.audio_buffers[stream_id].extend(audio_data)
elif vad_state == AudioVADState.AUDIO_VAD_STATE_PROCESSING:
self.get_logger().info("🔄 语音处理中。..")
# 如果正在录音,继续添加音频数据到缓冲区
if self.recording_state[stream_id] and len(audio_data) > 0:
self.audio_buffers[stream_id].extend(audio_data)
elif vad_state == AudioVADState.AUDIO_VAD_STATE_END:
self.get_logger().info("✅ 语音结束")
# 添加最后的音频数据
if self.recording_state[stream_id] and len(audio_data) > 0:
self.audio_buffers[stream_id].extend(audio_data)
# 保存完整的音频段
if (
self.recording_state[stream_id]
and len(self.audio_buffers[stream_id]) > 0
):
self.save_audio_segment(bytes(self.audio_buffers[stream_id]), stream_id)
# 结束录音
self.recording_state[stream_id] = False
elif vad_state == AudioVADState.AUDIO_VAD_STATE_NONE:
# 无语音状态,不进行录音
if self.recording_state[stream_id]:
self.get_logger().info("⏹️ 录音状态重置")
self.recording_state[stream_id] = False
# 输出当前缓冲区状态
if stream_id in self.audio_buffers:
buffer_size = len(self.audio_buffers[stream_id])
recording = self.recording_state[stream_id]
self.get_logger().debug(
f"[Stream {stream_id}] 缓冲区大小:{buffer_size} bytes, 录音状态:{recording}"
)
def save_audio_segment(self, audio_data, stream_id):
"""保存音频段 16kHz, 16 位,单声道 PCM"""
if len(audio_data) > 0:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
# 按 stream_id 创建子目录
stream_dir = os.path.join(self.audio_output_dir, f"stream_{stream_id}")
os.makedirs(stream_dir, exist_ok=True)
# 生成文件名
stream_names = {1: "internal_mic", 2: "external_mic"}
stream_name = stream_names.get(stream_id, f"stream_{stream_id}")
filename = f"{stream_name}_{timestamp}.pcm"
filepath = os.path.join(stream_dir, filename)
try:
with open(filepath, "wb") as f:
f.write(audio_data)
self.get_logger().info(
f"音频段已保存:{filepath} (大小:{len(audio_data)} bytes)"
)
# 记录音频文件的时长(假设 16kHz, 16 位,单声道)
sample_rate = 16000
bits_per_sample = 16
channels = 1
bytes_per_sample = bits_per_sample // 8
total_samples = len(audio_data) // (bytes_per_sample * channels)
duration_seconds = total_samples / sample_rate
self.get_logger().info(
f"音频时长:{duration_seconds:.2f} 秒 ({total_samples} 样本)"
)
except Exception as e:
self.get_logger().error(f"保存音频文件失败:{e}")
def get_buffer_info(self):
"""获取所有缓冲区的信息(用于调试)"""
info = {}
for stream_id in self.audio_buffers:
info[stream_id] = {
"buffer_size": len(self.audio_buffers[stream_id]),
"recording": self.recording_state[stream_id],
}
return info
def main(args=None):
rclpy.init(args=args)
audio_subscriber = AudioSubscriber()
try:
audio_subscriber.get_logger().info("正在监听降噪音频数据,按 Ctrl+C 退出。..")
rclpy.spin(audio_subscriber)
except KeyboardInterrupt:
audio_subscriber.get_logger().info("收到退出信号,正在关闭。..")
finally:
audio_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

以上程序依赖 python 协议包 a2_aimdk 和 ros2 协议包 ros2_plugin_proto,相应的包已经放置在 AimDK 开发包的 prebuilt 目录下,python 包使用 pip install prebuilt/a2_aimdk-3.0.0-py3-none-any.whl 安装,ros2 包需要 source prebuilt/ros2_plugin_proto_aarch64/share/ros2_plugin_proto/local_setup.bash后使用。

请注意,以上程序会接收 ros2 消息,需要设置如下环境变量:

Terminal window
export ROS_DOMAIN_ID=232
export FASTRTPS_DEFAULT_PROFILES_FILE=/agibot/software/v0/entry/cfg/ros_dds_configuration.xml

其中音频数据为 16kHz 采样率,16 位单声道 PCM 数据(小端序),输出的音频为纯净人声,已经过降噪处理,回声消除等,可以直接用于 ASR 识别。

ProcessedAudioOutput消息包含以下字段:

字段名类型说明
headerHeader通用消息头,包含时间戳和消息 ID
stream_iduint32音频流 ID(1:内置麦克风,2:外置麦克风)
vad_stateAudioVADState语音活动检测状态
audio_databytes降噪处理后的 PCM 音频数据

AudioVADState 枚举定义:

枚举值数值说明
AUDIO_VAD_STATE_NONE0无语音
AUDIO_VAD_STATE_BEGIN1语音开始
AUDIO_VAD_STATE_PROCESSING2语音处理中
AUDIO_VAD_STATE_END3语音结束

注意:当前版本外置麦克风的接口输出 vad_state 存在问题,预期一条语音输入的状态为 122222222223,实际输出的状态为 0111111111112,此问题仅在外置麦场景下出现(内置麦正常),并计划在后续版本修复。当前版本建议手动对状态执行 +1 补偿。

要想使用扬声器首先需获取音频焦点:

Terminal window
ros2 service call /audio_5Fmsgs/srv/RequestAudioFocus audio_msgs/srv/RequestAudioFocus "{focus_requester: {pkg_name: audio_examples_sender, priority: 6, priority_weight: 1}}"

pkg_name为ROS包名,意味着音频最后必须以在ROS包工程内的形式进行发送。priority为优先级固定为6即可,priority_weight为音频权重,可设置为1-100任意数。

申请音频焦点后即可将自身TTS返回的pcm数据流通过音频流播放接口进行播放,扬声器播放流式音频具体例程如下,需打包成ros_pkg。

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from builtin_interfaces.msg import Time as TimeMsg
from audio_msgs.msg import AudioPlayback,AudioInfo,AudioData # 需确保 audio msg 可导入
class AudioPlaybackPublisher(Node):
def __init__(self):
super().__init__('audio_playback_publisher')
self.pub = self.create_publisher(AudioPlayback, '/audiohal/audio/playback', 10)
self.timer = self.create_timer(1.0, self.timer_callback) # 每秒发送
def timer_callback(self):
msg = AudioPlayback()
# 填写时间戳(字段名 attachments 中为 stamps)
now = self.get_clock().now().to_msg()
# 兼容性:优先使用 stamps,否则尝试 stamp
if hasattr(msg, 'stamps'):
msg.stamps = now
elif hasattr(msg, 'stamp'):
msg.stamp = now
# 填写 info(AudioInfo)
info = AudioInfo()
info.channels = 1 # 单声道
info.sample_rate = 16000 # 16kHz
info.sample_format = 'S16LE'
info.coding_format = 'pcm'
msg.info = info
# 填写 data(AudioData),示例给一小段原始 bytes
sample_bytes = b'\x00\x01\x02\x03\x04\x05\x06\x07' # 示例数据
msg.data = AudioData(data=list(sample_bytes))
msg.pkg_name = 'audio_examples_sender'
msg.token_id = 'token-0001'
self.pub.publish(msg)
self.get_logger().info(
f'Published AudioPlayback pkg={msg.pkg_name} token={msg.token_id} '
f'data_len={len(msg.data.data)} sample_rate={msg.info.sample_rate}'
)
def main(args=None):
rclpy.init(args=args)
node = AudioPlaybackPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

以上程序在AimDK开发包agent目录下展示,且程序依赖 python 协议包 a2_aimdk 和 ros2 协议包 ros2_plugin_proto,相应的包已经放置在 AimDK 开发包的 prebuilt 目录下,python 包使用 pip install prebuilt/a2_aimdk-3.0.0-py3-none-any.whl 安装,ros2 包需要 source prebuilt/audio_msgs_proto_aarch64/share/audio_msgs/local_setup.bash后使用。

若 agent 设置为 only_voice 模式,智元还提供唤醒结果上报能力, Topic 接口为 /agent/wakeup/pb_3Aaimdk_2Eprotocol_2EWakeUpResult

获取唤醒结果上报示例程序如下:

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import (
QoSDurabilityPolicy,
QoSHistoryPolicy,
QoSProfile,
QoSReliabilityPolicy,
)
from ros2_plugin_proto.msg import RosMsgWrapper
from aimdk.protocol_pb2 import WakeUpResult
TOPIC = "/agent/wakeup/pb_3Aaimdk_2Eprotocol_2EWakeUpResult"
class WakeUpSubscriber(Node):
def __init__(self):
super().__init__("wakeup_result_subscriber")
qos_profile = QoSProfile(
history=QoSHistoryPolicy.KEEP_LAST,
depth=10,
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.VOLATILE,
)
self.subscription = self.create_subscription(
RosMsgWrapper,
TOPIC,
self.wakeup_callback,
qos_profile,
)
self.get_logger().info(f"已开始订阅 WakeUpResult: {TOPIC}")
def wakeup_callback(self, msg):
try:
if msg.serialization_type != "pb":
self.get_logger().warn(
f"收到不支持的序列化类型: {msg.serialization_type}"
)
return
# 拼接 bytes
raw_bytes = b"".join(msg.data)
# 解析 protobuf
wakeup_result = WakeUpResult()
wakeup_result.ParseFromString(raw_bytes)
# 日志输出
import json
from google.protobuf.json_format import MessageToDict
self.get_logger().info(
f"WakeUpResult: {json.dumps(MessageToDict(wakeup_result, preserving_proto_field_name=True), ensure_ascii=False, indent=2)}"
)
except Exception as e:
self.get_logger().error(f"解析 WakeUpResult 数据时出现错误: {e}")
def main(args=None):
rclpy.init(args=args)
node = WakeUpSubscriber()
try:
node.get_logger().info("正在监听 WakeUpResult,按 Ctrl+C 退出...")
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info("退出中...")
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

以上程序依赖 python 协议包 a2_aimdk 和 ros2 协议包 ros2_plugin_proto,相应的包已经放置在 AimDK 开发包的 prebuilt 目录下,python 包使用 pip install prebuilt/a2_aimdk-3.0.0-py3-none-any.whl 安装,ros2 包需要 source prebuilt/ros2_plugin_proto_aarch64/share/ros2_plugin_proto/local_setup.bash后使用。

请注意,以上程序会接收 ros2 消息,需要设置如下环境变量:

Terminal window
export ROS_DOMAIN_ID=232
export FASTRTPS_DEFAULT_PROFILES_FILE=/agibot/software/v0/entry/cfg/ros_dds_configuration.xml