pytthon实现webrtc通过whip推送实时流式音频流

ops/2025/1/18 10:08:45/

需求背景:通过whip的方式推送流式的实时音频流到流媒体服务器平台(基于srs服务器改造的平台)数据传递采用48khz、16bit、双声道音频流,将需要发送的数据,从16khz、16bit、单声道音频流转换成所需传递的格式,并对源音频流进行大小切割,阻塞限速,按照实际播放器的一倍速进行推送;

核心代码:

1:将音频流转换成可传递的音频帧

    async def bytes_to_audio_frames(self, byte_array, num_samples_per_frame=320):encode_array = base64.b64decode(byte_array)audio_data = np.frombuffer(encode_array, dtype=np.int16)samples = int(AUDIO_PTIME * self.sample_rate)num_samples = len(audio_data)logger.debug(f"stmId={self.manager.stmId},bytes_to_audio_frames")for i in range(0, num_samples, num_samples_per_frame):frame_samples = audio_data[i:i + num_samples_per_frame]frame = av.AudioFrame(samples=len(frame_samples), layout='mono', format='s16')frame.planes[0].update(frame_samples.tobytes())frame.rate = self.sample_rateframe.pts = self.last_pts * samples# 将帧放入播放队列self.playback_queue.put_nowait(frame)self.last_pts += 1

2:按照实际一倍速阻塞推,同时添加到轨道中。原理:以第一帧时间为基准时间,根据音频的采样率等信息,预估一个下一帧的到达时间,通过比对下一帧的实际到达时间跟预估时间差值,决定等待时长,每一帧的等待时间都是不一样的,这样才能达到音频播放最优效果;

  async def recv(self):if not self.playback_queue.empty():# 异步地从 playback_queue 中获取音频帧try:frame = await self.playback_queue.get()  # 异步获取一帧音频数据samples = int(AUDIO_PTIME * self.sample_rate)if hasattr(self, "_timestamp"):self._timestamp += sampleswait = self._start + (self._timestamp / self.sample_rate) - time.time()logger.info(f"stmId={self.manager.stmId},"f"音频帧时间戳(累加字节数)={self._timestamp},"f"该帧期望到达时间:{(self._start + (self._timestamp / self.sample_rate)) * 1000}ms,"f"此刻到达时间戳:{time.time() * 1000}ms,"f"实际音频需等待时间{wait * 1000}ms,"f"frame: {frame}")await asyncio.sleep(wait)else:self._start = time.time()self._timestamp = 0logger.info(f"stmId={self.manager.stmId},第一帧初始化,"f"音频帧时间戳(累加字节数)={self._timestamp},"f"该帧期望到达时间:{(self._start + (self._timestamp / self.sample_rate)) * 1000}ms,"f"此刻到达时间戳:{time.time() * 1000}ms")return frameexcept asyncio.QueueEmpty:logger.error(f"stmId={self.manager.stmId},recv音频异常时,修改manager.flag=False,触发关闭连接")self.manager.flag = Falsereturn Noneelse:logger.info(f"stmId={self.manager.stmId},recv检测无后续音频,修改manager.flag=False,触发关闭连接")self.manager.flag = Falsereturn None

3:通道及轨道关闭。如果音频推送完成,通过回调函数告知程序,程序关闭对应轨道及通道

    @propertydef flag(self):return self._flag@flag.setterdef flag(self, value):if self._flag != value:self._flag = valueif self._flag is False:logger.info(f"stmId={self.stmId},音频帧推送完毕,触发关闭事件")self.pushEndStatus()asyncio.create_task(self.close_connection())async def close_connection(self):try:if self.pc:await self.pc.close()del self.pushManager.connections[self.stmId]logger.info(f"通道关闭,移除{self.stmId}绑定关系,关闭后剩余列表为:{self.pushManager.connections}")self.pc = Noneself.is_connected = Falselogger.info(f"function=close_connection,stmId={self.stmId},push连接已关闭")else:logger.info(f"stmId={self.stmId},尝试关闭连接时 self.pc 为 None")except Exception as e:logger.error(f"stmId={self.stmId},关闭连接时发生错误: {e}")

4:轨道建立代码

    async def push_run(self, url, audioData, stmId, manager):if self.pc is None:rtc_conf = RTCConfiguration()rtc_conf.iceServers = []self.pc = RTCPeerConnection(rtc_conf)logger.debug(f"stmId={stmId}, audioData: {audioData}")logger.info(f"push通道创建RTCPeerConnection, url: {url}, stmId:{stmId}")if self.audio_track is None:self.audio_track = AudioTrack(manager)await self.audio_track.add_audio_data(audioData)self.pc.addTransceiver(self.audio_track, "sendonly")# 绑定事件到onicecandidateself.pc.onicecandidate = lambda candidate: asyncio.create_task(send_candidate(candidate))try:offer = await self.pc.createOffer()await self.pc.setLocalDescription(offer)answer = await send_sdp(offer, url)if answer:await self.pc.setRemoteDescription(answer)self.is_connected = True  # 更新连接状态logger.info(f"stmId={stmId}-push收到有效offer,设置远程描述成功")else:logger.info(f"stmId={stmId}收到无效answer")returnexcept Exception as e:logger.error(f"push通道建立连接过程中发生错误: {e}")returnself.pc.on("connectionstatechange", self.on_connection_state_change)else:logger.info(f"push通道的RTCPeerConnection 已存在,准备添加数据: {audioData}")await self.audio_track.add_audio_data(audioData)async def send_sdp(e_sdp, url):async with aiohttp.ClientSession() as session:async with session.post(url,data=e_sdp.sdp.encode(),headers={"Content-Type": "application/sdp","Content-Length": str(len(e_sdp.sdp))},ssl=False) as response:response_data = await response.text()return RTCSessionDescription(sdp=response_data, type='answer')async def send_candidate(candidate):if candidate:logger.info(f"收集到的候选: {candidate}")


http://www.ppmy.cn/ops/151061.html

相关文章

使用 Java 操作 Excel 的实用教程

💖 欢迎来到我的博客! 非常高兴能在这里与您相遇。在这里,您不仅能获得有趣的技术分享,还能感受到轻松愉快的氛围。无论您是编程新手,还是资深开发者,都能在这里找到属于您的知识宝藏,学习和成长…

用户中心项目教程(一)--Ant design pro初始化的学习和使用

文章目录 1.项目定位2.项目开发流程3.需求分析4.技术选型5.Ant design pro初始化5.1快速使用5.2初始化过程 6.项目依赖的报错处理6.1项目出现的问题6.2怎么查看问题6.3怎么解决报错6.4关于pnpm的安装 7.项目启动和运行7.1项目如何启动7.2双击跳转7.3登录和注册7.4页面分析7.5关…

Vue2+OpenLayers实现点位拖拽功能(提供Gitee源码)

目录 一、案例截图 二、安装OpenLayers库 三、代码实现 3.1、初始化变量 3.2、创建一个点 3.3、将点添加到地图上 3.4、实现点位拖拽 3.5、完整代码 四、Gitee源码 一、案例截图 可以随意拖拽点位到你想要的位置 二、安装OpenLayers库 npm install ol 三、代码实现…

Red Hat8:搭建FTP服务器

一、匿名FTP访问 1、搭建yum源 2、安装VSFTPD 3、 打开配置文件 4、设置配置文件如下几个参数 5、重启vsftpd服务 6、进入图形化界面配置网络 添加IP地址 激活连接 7、查看IP地址 7、没有ftp命令就安装一个 8、遇到拒绝连接 (1)关闭防火墙 &#x…

分布式CAP理论介绍

分布式CAP理论是分布式系统设计中的一个核心概念,由加州大学伯克利分校的Eric Brewer教授在2000年的ACM研讨会上首次提出,随后在2002年由Seth Gilbert和Nancy Lynch从理论上证明。以下是对分布式CAP理论的详细剖析: 文章目录 一、CAP理论的基本概念二、CAP理论的取舍策略三、…

Git版本控制 – 创建和维护项目Repository

Git版本控制 – 创建和维护项目Repository Version Control by Git - Create and Maintain a Repository for a Project 1. 本地数据 a. 创建本地项目项目文件夹 b. 初始化本地Git环境 c. 执行相应Git命令 2. 远程数据Repository a. 创建remote 的Repository 1&#xff09…

使用nginx搭建通用的图片代理服务器,支持http/https/重定向式图片地址

从http切换至https 许多不同ip的图片地址需要统一进行代理 部分图片地址是重定向地址 nginx配置 主站地址:https://192.168.123.100/ 主站nginx配置 server {listen 443 ssl;server_name localhost;#ssl证书ssl_certificate ../ssl/ca.crt; #私钥文件ssl_ce…

生成树机制实验

1 实验内容 1、基于已有代码,实现生成树运行机制,对于给定拓扑(four_node_ring.py),计算输出相应状态下的生成树拓扑 2、构造一个不少于7个节点,冗余链路不少于2条的拓扑,节点和端口的命名规则可参考four_node_ring.py,使用stp程序计算输出生成树拓扑 2 实验原理 一、…