WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue

devtools/2024/12/30 21:52:17/

WebRTC服务质量(01)- Qos概述
WebRTC服务质量(02)- RTP协议
WebRTC服务质量(03)- RTCP协议
WebRTC服务质量(04)- 重传机制(01) RTX NACK概述
WebRTC服务质量(05)- 重传机制(02) NACK判断丢包
WebRTC服务质量(06)- 重传机制(03) NACK找到真正的丢包
WebRTC服务质量(07)- 重传机制(04) 接收NACK消息
WebRTC服务质量(08)- 重传机制(05) RTX机制
WebRTC服务质量(09)- Pacer机制(01) 流程概述
WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue
WebRTC服务质量(11)- Pacer机制(03) IntervalBudget
WebRTC服务质量(12)- Pacer机制(04) 向Pacer中插入数据

一、前言:

RoundRobinPacketQueue 是Pacer模块当中一个非常重要的循环队列模块,主要目的是在多流场景中,根据每个流的 优先级 以及流内的 RTP 包的 入队顺序类型 和其他因素,动态决定数据包的发送顺序。

二、类关系图:

2.1、类图:

在这里插入图片描述

关键部分加了注释,便于理解:

  1. 多流支持: 每个流按 SSRC(Synchronization Source Identifier,同步信源标识符)进行区分,每个流维护独立的优先级队列。
  2. 多级优先级调度: 不同流间按优先级调度(基于 StreamPrioKey),单个流内的包按严格的队列顺序管理。
  3. 静态与动态属性结合: 例如,属性包括每个包的优先级(priority)、入队时间(enqueue_time)、重传标志、带宽开销等。
  4. 分组调度: 使用轮询(Round-Robin)机制,从不同流根据流的权重和优先级选取包发送,提高音频、视频和反馈等多种类型数据的实时性。

2.2、重要成员关系:

在这里插入图片描述

  1. stream_priorities当中存放StreamPrioKey和ssrc对应关系;
  2. streams当中存放ssrc和Stream的对应关系;
  3. 这样就建立了StreamPrioKey和Stream之间的关系;
  4. 反过来,也可以通过Stream当中的priority_it找到stream_priorities当中的某一项;

小结下:重要成员变量的功能

成员变量功能
streams_保存所有流,键是流的 ssrc,每条流存有独立队列和优先级信息。
stream_priorities_将所有流按照 StreamPrioKey 排列,用于实现流间优先级调度。
enqueue_times_一个多重集合,保存所有包的入队时间,便于快速找到最早入队的包时间。
size_packets_size_分别记录队列中包的个数和总字节数,动态调整。
transport_overhead_per_packet_计算包传输的额外开销(如包头)。
time_last_updated_用于统计队列的更新时间,辅助计算入队和等待时间等。

三、重要函数:

3.1、Push

Push 会根据包的优先级、流的权重、总队列大小,以及包的类型等,将包插入到对应的流(Stream)队列,并更新其他与队列状态关联的元数据。总体思路如下:

处理一个新包时:

  1. 确定该包所属的流,如果不存在该流,则创建一个 Stream 对象。
  2. 将包包装成为 QueuedPacket 并插入到流的优先级队列。
  3. 更新流的优先级键 StreamPrioKey,并在 stream_priorities_ 中重新排序。
  4. 更新队列元数据(如队列包大小、队列时间等)。
void RoundRobinPacketQueue::Push(int priority,Timestamp enqueue_time,uint64_t enqueue_order,std::unique_ptr<RtpPacketToSend> packet) {RTC_DCHECK(packet->packet_type().has_value());if (size_packets_ == 0) {// Single packet fast-path.single_packet_queue_.emplace(QueuedPacket(priority, enqueue_time, enqueue_order,enqueue_times_.end(), std::move(packet)));UpdateQueueTime(enqueue_time);single_packet_queue_->SubtractPauseTime(pause_time_sum_);size_packets_ = 1;size_ += PacketSize(*single_packet_queue_);} else {MaybePromoteSinglePacketToNormalQueue();Push(QueuedPacket(priority, enqueue_time, enqueue_order,enqueue_times_.insert(enqueue_time), std::move(packet)));}
}void RoundRobinPacketQueue::Push(QueuedPacket packet) {auto stream_info_it = streams_.find(packet.Ssrc());if (stream_info_it == streams_.end()) {stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;stream_info_it->second.priority_it = stream_priorities_.end();stream_info_it->second.ssrc = packet.Ssrc();}Stream* stream = &stream_info_it->second;if (stream->priority_it == stream_priorities_.end()) {RTC_CHECK(!IsSsrcScheduled(stream->ssrc));stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());} else if (packet.Priority() < stream->priority_it->first.priority) {stream_priorities_.erase(stream->priority_it);stream->priority_it = stream_priorities_.emplace(StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());}if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {packet.UpdateEnqueueTimeIterator(enqueue_times_.insert(packet.EnqueueTime()));} else {UpdateQueueTime(packet.EnqueueTime());packet.SubtractPauseTime(pause_time_sum_);size_packets_ += 1;size_ += PacketSize(packet);}stream->packet_queue.push(packet);
}
  • 从streams中找packet所属的Ssrc的stream,如果没有,则在streams中插入一项;
  • 查看stream的priority_it是否等于stream_priorities_的end()
    • 如果相等,则在stream_priorities_中插入一项;
    • 否则,如果新包的优先高(注意,优先级数值越小表示优先级越高),则更新其ssrc对应队列的优先级;
  • 更新队列总时长;
  • 入队时间减去暂停时间(一般不会有暂停);
  • 队列总包数+1;
  • 队列总字节大小 = 包的负载大小+Padding大小;
  • 插入到stream中的packet_queue中;

3.2、Pop

Pop 会轮询不同的流并从当前优先级最高的流中取出一个包发送,同时维护包的发送顺序。总体思路如下:

  1. 调用 GetHighestPriorityStream 获取当前优先级最高的流。
  2. 从该流的优先级队列(PriorityPacketQueue)中取出队首包,并更新流的状态(如剩余大小、时间等)。
  3. 如果该流没有剩余包,删除对应的流优先级键。
std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {if (single_packet_queue_.has_value()) {RTC_DCHECK(stream_priorities_.empty());std::unique_ptr<RtpPacketToSend> rtp_packet(single_packet_queue_->RtpPacket());single_packet_queue_.reset();queue_time_sum_ = TimeDelta::Zero();size_packets_ = 0;size_ = DataSize::Zero();return rtp_packet;}RTC_DCHECK(!Empty());Stream* stream = GetHighestPriorityStream();const QueuedPacket& queued_packet = stream->packet_queue.top();stream_priorities_.erase(stream->priority_it);// Calculate the total amount of time spent by this packet in the queue// while in a non-paused state. Note that the |pause_time_sum_ms_| was// subtracted from |packet.enqueue_time_ms| when the packet was pushed, and// by subtracting it now we effectively remove the time spent in in the// queue while in a paused state.TimeDelta time_in_non_paused_state =time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;queue_time_sum_ -= time_in_non_paused_state;RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());enqueue_times_.erase(queued_packet.EnqueueTimeIterator());// Update |bytes| of this stream. The general idea is that the stream that// has sent the least amount of bytes should have the highest priority.// The problem with that is if streams send with different rates, in which// case a "budget" will be built up for the stream sending at the lower// rate. To avoid building a too large budget we limit |bytes| to be within// kMaxLeading bytes of the stream that has sent the most amount of bytes.DataSize packet_size = PacketSize(queued_packet);stream->size =std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);max_size_ = std::max(max_size_, stream->size);size_ -= packet_size;size_packets_ -= 1;RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());stream->packet_queue.pop();// If there are packets left to be sent, schedule the stream again.RTC_CHECK(!IsSsrcScheduled(stream->ssrc));if (stream->packet_queue.empty()) {stream->priority_it = stream_priorities_.end();} else {int priority = stream->packet_queue.top().Priority();stream->priority_it = stream_priorities_.emplace(StreamPrioKey(priority, stream->size), stream->ssrc);}return rtp_packet;
}
  • 获得优先级最高的stream;

  • 从stream的packet_queue当中取出第一个包;

  • 将stream在stream_priorities_中的项删除;

    • 思考下虽然包删除了,但是stream还在,为啥要删除?

      答案:因为stream_priorities_ 是multimap,允许出现相同key,也就是说,这个包没有,它的stream优先级全靠它撑着(因为是先pop优先级高的包),所以,现在不应该由它撑着了,换下一个优先级最高的包撑着。

  • 计算Packet入队后距离现在的时间(不包括暂停时间);

  • 将这段时间从队列的总时间减去;

  • enqueue_times_中将Packet的项删除;

  • 总包数减去1;

  • 总字节数减去包的字节数;

  • 将包从stream中的queue中弹出;

  • 如果stream中的队列为空,则令stream的priority_it指向stream_priorities_end();

  • 否则,从stream队列头部取出Packet,将该Packet的priority插入到stream_priorities_中;

四、优先级调度:

  • 每个流有单独的优先级队列(PriorityPacketQueue),保存 QueuedPacket 对象。

  • QueuedPacket 是一个包装类,表示单个 RTP 数据包及其附属信息(例如入队时间、优先级、是否为重传包等)。

  • 列表中的流按 StreamPrioKey 保存在 stream_priorities_ 中,通过此键决定流次序:

    struct StreamPrioKey {int priority;    // 流的优先级,数值越低,优先级越高DataSize size;   // 数据包大小,用于平衡负载
    };
    

    优先规则:优先级低(priority 值小) > 数据包大小小(size 值小)。

五、轮询调度:

  • 核心逻辑通过 RoundRobin 的方式轮询多个流。但由于流可能有不同的优先级,某些流会被更多次轮询到。
  • 函数 GetHighestPriorityStream 定位当前最高优先级的流,从流对应的队列中取得包然后发送。

六、Stream定义与管理:

每个流由 Stream 类表示,Stream 是该流独有的数据结构,包括:

  • 当前队列状态,如总字节数、包大小和优先级。
  • 内部维护单独的优先级队列(PriorityPacketQueue)。
  • 调度时实时更新优先级,确保新加入高优先级包时能调整队列次序。

七、其他特性:

7.1、重传支持:

  • 标记是否重传的包,在出队时可能依据该标记进行特殊处理。

7.2、时间相关:

  • queue_time_sum_pause_time_sum_ 用于统计包在队列中的存留时间,这对于带宽控制和流量管理很有用。
  • 提供接口如 AverageQueueTime 计算平均队列时间,用于监控流的实时性。

7.3、队列字节限制:

队列有最大可存储的字节数(max_size_),以防止占用过多资源。

7.4、暂停/恢复功能:

可以通过 SetPauseState 暂停或者恢复队列处理。劝你最好别用!!!

八、总结:

RoundRobinPacketQueue 是一个高效的多流、多优先级调度队列,适用于 RTP 媒体数据的分组发送场景。它通过流内、流间的双重调度机制,结合优先级动态提升、统计队列时间和暂停控制等特性,确保在带宽有限的网络环境中最大程度提高数据的实时性和发送效率,是 WebRTC Pacer 模块的核心部分。


http://www.ppmy.cn/devtools/146386.html

相关文章

读书笔记~管理修炼-缄默效应

缄默效应&#xff1a;学会正确批评下属 员工明明犯了错误&#xff0c;却不及时告知你&#xff0c;总是拖到最后一刻无法弥补时才不得不承认出了问题——你遇到过这样的问题吗&#xff1f; 这其实是缄默效应在发挥作用。 在职场中&#xff0c;即使再扁平化的环境&…

接口自动化测试完整版(文档+视频)

1. 什么是接口测试 顾名思义&#xff0c;接口测试是对系统或组件之间的接口进行测试&#xff0c;主要是校验数据的交换&#xff0c;传递和控制管理过程&#xff0c;以及相互逻辑依赖关系。其中接口协议分为HTTP,WebService,Dubbo,Thrift,Socket等类型&#xff0c;测试类型又主…

【Golang 面试题】每日 3 题(八)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…

设计模式之外观模式:从电脑组装到系统架构的简化之道

~犬&#x1f4f0;余~ “我欲贱而贵&#xff0c;愚而智&#xff0c;贫而富&#xff0c;可乎&#xff1f; 曰&#xff1a;其唯学乎” 一、外观模式概述 \quad 在软件开发中&#xff0c;我们经常会遇到一些复杂的系统&#xff0c;这些系统可能包含许多子系统和组件。直接使用这些子…

网络安全公司150强

本清单主要列出专门或主要专注于网络安全的公司。 公司名称 业务描述 1Password 企业密码安全管理 A10 Networks 安全云应用服务 Abnormal Security 云原生邮件安全 Absolute 终端防护平台 Agari 电子邮件和网络钓鱼威胁防护 Aqua Security 云原生应用保护 Arcse…

Express.js 有哪些常用的中间件?

在使用 Express.js 开发应用程序时&#xff0c;中间件&#xff08;Middleware&#xff09;是处理请求和响应的关键组件。它们可以执行各种任务&#xff0c;如解析请求体、添加HTTP头部、记录日志等。以下是一些常用的中间件&#xff1a; body-parser 用于解析传入的请求体。它…

光滑曲线弧长公式的推导

前言 本文将介绍如何用定积分计算空间中一段光滑曲线的弧长。首先我们会给出光滑曲线以及曲线弧长的定义&#xff0c;然后从定义出发&#xff0c;用求黎曼和的思想推导出弧长的计算公式。 光滑曲线的定义 设平面曲线的参数方程为 { x x ( t ) , y y ( t ) , t ∈ [ T 1 , …

css一道光闪过动效

展示效果 <div class"box">slogan </div> css部分 .box {position: relative;width: 183px;height: 22px;border-radius: 6px 0 6px 0;background-color: #005ed9;overflow: hidden;display: flex;align-items: center;justify-content: center;font-s…