需求背景:通过 WHIP 方式将实时音频流推送到流媒体服务器平台(基于 SRS 服务器改造的平台)。数据传输采用 48 kHz、16 bit、双声道音频流,因此需要将待发送的 16 kHz、16 bit、单声道音频流转换成所需格式,并对源音频流按大小切片、阻塞限速,按照播放器的实际 1 倍速进行推送。
核心代码:
1:将音频流转换成可传递的音频帧
async def bytes_to_audio_frames(self, byte_array, num_samples_per_frame=320):
    """Decode a base64 PCM payload and enqueue it as fixed-size audio frames.

    The decoded bytes are interpreted as native-endian signed 16-bit samples
    (one channel, at ``self.sample_rate``). They are cut into chunks of
    ``num_samples_per_frame`` samples. Each chunk becomes an ``av.AudioFrame``
    that is put on ``self.playback_queue``, where ``recv`` picks it up.

    Args:
        byte_array: base64-encoded PCM payload (str or bytes).
        num_samples_per_frame: samples per emitted frame. Every frame advances
            pts by ``int(AUDIO_PTIME * self.sample_rate)`` samples, so this
            should normally equal that value (e.g. 320 at 16 kHz if
            AUDIO_PTIME is 20 ms).

    Raises:
        binascii.Error: if ``byte_array`` is not valid base64.
        ValueError: if the decoded payload has an odd number of bytes.
    """
    pcm = np.frombuffer(base64.b64decode(byte_array), dtype=np.int16)
    # Samples per ptime: the fixed pts step for every frame.
    samples = int(AUDIO_PTIME * self.sample_rate)
    logger.debug(f"stmId={self.manager.stmId},bytes_to_audio_frames")
    for offset in range(0, len(pcm), num_samples_per_frame):
        chunk = pcm[offset:offset + num_samples_per_frame]
        if len(chunk) < num_samples_per_frame:
            # Pad the trailing partial chunk with silence. The pts step here
            # and the per-frame pacing in recv() both treat every frame as a
            # full frame, and the encoder may hold back or reject a short one.
            chunk = np.pad(chunk, (0, num_samples_per_frame - len(chunk)))
        frame = av.AudioFrame(samples=len(chunk), layout='mono', format='s16')
        frame.planes[0].update(chunk.tobytes())
        frame.rate = self.sample_rate
        frame.pts = self.last_pts * samples
        # Hand the frame to the playback queue consumed by recv().
        self.playback_queue.put_nowait(frame)
        self.last_pts += 1
2:按实际 1 倍速阻塞推送,同时将音频帧交给轨道发送。原理:以第一帧的时间为基准时间,根据采样率等信息推算每一帧的期望发送时间,再比较当前实际时间与期望时间的差值来决定等待时长。由于每一帧的期望时间都是相对基准时间计算的,各帧的等待时间各不相同,处理耗时不会累积成误差,从而使音频按正常速度平稳播放。
async def recv(self):
    """Return the next queued audio frame, paced to real-time playback speed.

    The first frame records a wall-clock origin in ``self._start``. Every
    later frame advances ``self._timestamp`` by one ptime worth of samples
    and sleeps until ``_start + _timestamp / sample_rate``. Each deadline is
    computed from the origin, so time spent elsewhere is absorbed by the next
    wait instead of accumulating as drift.

    When the queue is empty the push is treated as finished: this sets
    ``self.manager.flag = False`` (which, per the log message, triggers
    closing the connection) and returns None.
    NOTE(review): aiortc tracks usually end a stream by raising
    MediaStreamError; returning None relies on the caller tolerating it.
    Confirm.
    """
    try:
        # Non-blocking get: QueueEmpty means no audio is left, so end the push.
        frame = self.playback_queue.get_nowait()
    except asyncio.QueueEmpty:
        logger.info(f"stmId={self.manager.stmId},recv检测无后续音频,修改manager.flag=False,触发关闭连接")
        self.manager.flag = False
        return None

    samples = int(AUDIO_PTIME * self.sample_rate)
    if hasattr(self, "_timestamp"):
        # _timestamp counts samples (the log label says bytes, but it is samples).
        self._timestamp += samples
        expected = self._start + (self._timestamp / self.sample_rate)
        now = time.time()
        wait = expected - now
        logger.info(
            f"stmId={self.manager.stmId},"
            f"音频帧时间戳(累加字节数)={self._timestamp},"
            f"该帧期望到达时间:{expected * 1000}ms,"
            f"此刻到达时间戳:{now * 1000}ms,"
            f"实际音频需等待时间{wait * 1000}ms,"
            f"frame: {frame}"
        )
        # A negative wait (behind schedule) makes sleep return immediately.
        await asyncio.sleep(wait)
    else:
        # First frame: establish the timing origin and send without waiting.
        self._start = time.time()
        self._timestamp = 0
        logger.info(
            f"stmId={self.manager.stmId},第一帧初始化,"
            f"音频帧时间戳(累加字节数)={self._timestamp},"
            f"该帧期望到达时间:{(self._start + (self._timestamp / self.sample_rate)) * 1000}ms,"
            f"此刻到达时间戳:{time.time() * 1000}ms"
        )
    return frame
3:通道及轨道关闭。音频推送完成后,将 flag 置为 False,其 setter 会调用 pushEndStatus() 通知程序,并异步关闭对应的轨道及通道。
@property
def flag(self):
    """Push state flag; it is set to False to signal that the push has finished."""
    return self._flag

@flag.setter
def flag(self, value):
    """Update the flag; on a change to False, report completion and close.

    Only an actual change of value has any effect, so assigning False
    repeatedly does not schedule a second close.
    """
    if self._flag != value:
        self._flag = value
        if self._flag is False:
            logger.info(f"stmId={self.stmId},音频帧推送完毕,触发关闭事件")
            self.pushEndStatus()
            # Keep a strong reference to the task. The event loop holds only
            # weak references, so an unreferenced task can be garbage
            # collected before close_connection() finishes.
            self._close_task = asyncio.create_task(self.close_connection())

async def close_connection(self):
    """Close the peer connection and drop this stream from the registry.

    Failures in ``pc.close()`` are logged rather than raised. Either way,
    ``self.pc`` is reset to None, ``is_connected`` to False, and the
    ``stmId`` entry is removed from ``self.pushManager.connections``, so no
    stale state survives a failed close.
    """
    if not self.pc:
        logger.info(f"stmId={self.stmId},尝试关闭连接时 self.pc 为 None")
        return
    try:
        await self.pc.close()
    except Exception as e:
        # Best effort: log and still release local state in `finally`.
        logger.error(f"stmId={self.stmId},关闭连接时发生错误: {e}", exc_info=True)
    finally:
        # pop() instead of del: a missing entry must not skip the state reset.
        self.pushManager.connections.pop(self.stmId, None)
        logger.info(f"通道关闭,移除{self.stmId}绑定关系,关闭后剩余列表为:{self.pushManager.connections}")
        self.pc = None
        self.is_connected = False
        logger.info(f"function=close_connection,stmId={self.stmId},push连接已关闭")
4:轨道建立代码
async def push_run(self, url, audioData, stmId, manager):
    """Queue ``audioData`` for pushing to ``url`` via WHIP, connecting on first use.

    When no peer connection exists yet, this:
    - creates an RTCPeerConnection with no ICE servers;
    - attaches ``self.audio_track`` (created from ``manager`` if missing) as a
      send-only transceiver;
    - POSTs the SDP offer to ``url`` and applies the returned answer.

    Once a connection exists, a call only appends ``audioData`` to the
    existing track.

    If negotiation fails (no valid answer, or an exception), the half-built
    peer connection is closed and ``self.pc`` is reset to None. The next call
    then renegotiates instead of queuing audio into a connection that never
    connects. ``self.audio_track`` is kept and reused by that next attempt.
    """
    if self.pc is not None:
        logger.info(f"push通道的RTCPeerConnection 已存在,准备添加数据: {audioData}")
        await self.audio_track.add_audio_data(audioData)
        return

    rtc_conf = RTCConfiguration()
    rtc_conf.iceServers = []
    self.pc = RTCPeerConnection(rtc_conf)
    logger.debug(f"stmId={stmId}, audioData: {audioData}")
    logger.info(f"push通道创建RTCPeerConnection, url: {url}, stmId:{stmId}")
    if self.audio_track is None:
        self.audio_track = AudioTrack(manager)
    await self.audio_track.add_audio_data(audioData)
    self.pc.addTransceiver(self.audio_track, "sendonly")
    # Bind a handler to onicecandidate.
    # NOTE(review): aiortc gathers candidates inside setLocalDescription and,
    # as far as I know, never calls an `onicecandidate` attribute. Confirm
    # whether this hook ever fires.
    self.pc.onicecandidate = lambda candidate: asyncio.create_task(send_candidate(candidate))

    negotiated = False
    try:
        offer = await self.pc.createOffer()
        await self.pc.setLocalDescription(offer)
        answer = await send_sdp(offer, url)
        if answer:
            await self.pc.setRemoteDescription(answer)
            self.is_connected = True  # update connection state
            logger.info(f"stmId={stmId}-push收到有效offer,设置远程描述成功")
            negotiated = True
        else:
            logger.info(f"stmId={stmId}收到无效answer")
    except Exception as e:
        logger.error(f"push通道建立连接过程中发生错误: {e}", exc_info=True)

    if not negotiated:
        # Tear down the failed connection so the next call starts over.
        failed_pc, self.pc = self.pc, None
        self.is_connected = False
        try:
            await failed_pc.close()
        except Exception as e:
            logger.error(f"stmId={stmId},关闭建连失败的RTCPeerConnection时发生错误: {e}")
        return

    self.pc.on("connectionstatechange", self.on_connection_state_change)

async def send_sdp(e_sdp, url):
    """POST an SDP offer to a WHIP endpoint and return the server's answer.

    Returns:
        An ``RTCSessionDescription`` of type "answer" built from the response
        body. Returns None when the server replies with a non-2xx status or
        an empty body, so callers can test the result for truthiness.
    """
    body = e_sdp.sdp.encode()
    async with aiohttp.ClientSession() as session:
        # NOTE(review): ssl=False disables TLS certificate verification.
        # This is acceptable only for a trusted internal server. Confirm.
        async with session.post(
            url,
            data=body,
            # Content-Length must count encoded bytes, not characters.
            headers={"Content-Type": "application/sdp", "Content-Length": str(len(body))},
            ssl=False,
        ) as response:
            response_data = await response.text()
            if not 200 <= response.status < 300 or not response_data.strip():
                logger.error(f"WHIP推流请求失败, url={url}, status={response.status}, body={response_data}")
                return None
            return RTCSessionDescription(sdp=response_data, type='answer')

async def send_candidate(candidate):
    """Log a locally gathered ICE candidate; it is not forwarded anywhere."""
    if candidate:
        logger.info(f"收集到的候选: {candidate}")