WebRTC服务质量(09)- Pacer机制(01) 流程概述

news/2024/12/24 8:33:29/

一、前言:

Pacer 是一种数据发送调度机制。它的主要功能是根据网络带宽限制、网络拥塞控制的反馈以及媒体的发送策略,对数据包的发送进行适配和节奏调度,以避免网络拥塞、减少丢包并保证流媒体传输的平滑性。

二、核心概念:

2.1 Pacer 的作用:

  • 流量平滑:音频、视频等媒体数据包可能在生成时间上分布不均,通过 Pacer,可以将这些数据包的发送节奏调整成更均匀的形式,避免突发流量冲击网络
  • 速率控制:Pacer 会根据传输码率(由拥塞控制算法提供,例如 Google 的 Congestion Control),动态调整包的发送速率,确保发送端不超过网络容量。
  • 优先级处理:Pacer 支持区分不同的流类型(例如音频、视频、FEC 冗余流等),为高优先级流(如音频)留出更多资源或者保证其更低的延迟。

2.2 Pacer 的工作流程:

  1. 音视频流媒体按照编码器的设定发送数据包。
  2. 数据包会先进入一个队列(Packet Queue)。
  3. Pacer 通过定时器或事件驱动机制,根据当前允许的发送速率(由拥塞控制模块动态提供)从队列中提取数据包发送。
  4. 如果发送速率受限,多余的包会继续留在队列中等待下一轮调度。

以下是简化的机制模型:

[Media Encoder] -> [Packet Queue] -> Pacer -> [Network Sender]

三、核心代码:

先想想,根据前面的概念阐述,如果要让你写,你怎么写呢?是不是重点实现以下几个模块:

  • packet_router.h/cpp:负责将数据包分发到具体的网络发送端。
  • packet_sender.h/cpp:实现了调度核心逻辑。
  • packet_queue.h/cpp:实现用于存储待发送数据包的队列。

现在具体看看webrtc如何做的。

3.1、Pacer对象创建:

  • PeerConnection当中创建Call对象;

    // 从上到下调用顺序如下:
    PeerConnectionFactory::CreatePeerConnectionOrError -->
    PeerConnectionFactory::CreateCall_w -->
    CallFactory::CreateCall --> 
    Call::Create
    
  • Call对象创建RtpTransportControllerSend:

    创建Call时候首先得创建Call::Config,便于以后创建Pacer对象时候作为参数;

    Call* Call::Create(const Call::Config& config,Clock* clock,rtc::scoped_refptr<SharedModuleThread> call_thread,std::unique_ptr<ProcessThread> pacer_thread) {RTC_DCHECK(config.task_queue_factory);return new internal::Call(clock, config,std::make_unique<RtpTransportControllerSend>(clock, config.event_log, config.network_state_predictor_factory,config.network_controller_factory, config.bitrate_config,std::move(pacer_thread), config.task_queue_factory, config.trials),std::move(call_thread), config.task_queue_factory);
    }
    

    这儿的config.trials,会导致后面创建PacedSender对象的时候,处理模式为kPeriodic,也就是周期处理模式;

  • RtpTransportControllerSend创建PacerSender对象:

    RtpTransportControllerSend 是 WebRTC 中负责发送侧 RTP 流量控制的核心模块。它是 RTP 传输的控制器,负责管理和协调发送侧的各种功能,包括比特率管理(带宽估算 BWE)、节奏控制(Pacer)、网络状态预测和反馈处理等。我们先看看构造函数:

    RtpTransportControllerSend::RtpTransportControllerSend(Clock* clock,webrtc::RtcEventLog* event_log,NetworkStatePredictorFactoryInterface* predictor_factory,NetworkControllerFactoryInterface* controller_factory,const BitrateConstraints& bitrate_config,std::unique_ptr<ProcessThread> process_thread,TaskQueueFactory* task_queue_factory,const WebRtcKeyValueConfig* trials): clock_(clock),event_log_(event_log),bitrate_configurator_(bitrate_config),process_thread_started_(false),process_thread_(std::move(process_thread)),use_task_queue_pacer_(IsEnabled(trials, "WebRTC-TaskQueuePacer")),process_thread_pacer_(use_task_queue_pacer_? nullptr: new PacedSender(clock,&packet_router_,event_log,trials,process_thread_.get())),task_queue_pacer_(use_task_queue_pacer_? new TaskQueuePacedSender(clock,&packet_router_,event_log,trials,task_queue_factory,/*hold_back_window = */ PacingController::kMinSleepTime): nullptr),observer_(nullptr),controller_factory_override_(controller_factory),controller_factory_fallback_(std::make_unique<GoogCcNetworkControllerFactory>(predictor_factory)),process_interval_(controller_factory_fallback_->GetProcessInterval()),last_report_block_time_(Timestamp::Millis(clock_->TimeInMilliseconds())),reset_feedback_on_route_change_(!IsEnabled(trials, "WebRTC-Bwe-NoFeedbackReset")),send_side_bwe_with_overhead_(!IsDisabled(trials, "WebRTC-SendSideBwe-WithOverhead")),add_pacing_to_cwin_(IsEnabled(trials, "WebRTC-AddPacingToCongestionWindowPushback")),relay_bandwidth_cap_("relay_cap", DataRate::PlusInfinity()),transport_overhead_bytes_per_packet_(0),network_available_(false),retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),task_queue_(task_queue_factory->CreateTaskQueue("rtp_send_controller",TaskQueueFactory::Priority::NORMAL)) {ParseFieldTrial({&relay_bandwidth_cap_},trials->Lookup("WebRTC-Bwe-NetworkRouteConstraints"));initial_config_.constraints = ConvertConstraints(bitrate_config, clock_);initial_config_.event_log = event_log;initial_config_.key_value_config = trials;RTC_DCHECK(bitrate_config.start_bitrate_bps > 0);pacer()->SetPacingRates(DataRate::BitsPerSec(bitrate_config.start_bitrate_bps), DataRate::Zero());if (absl::StartsWith(trials->Lookup("WebRTC-LazyPacerStart"), "Disabled")) {EnsureStarted();}
    }
    

    我们分段看下,主要分为四大步:

    (1) Pacer 类型选择

    构造函数根据配置决定使用 TaskQueuePacedSender 或传统的 PacedSender

    use_task_queue_pacer_(IsEnabled(trials, "WebRTC-TaskQueuePacer")),
    process_thread_pacer_(use_task_queue_pacer_? nullptr: new PacedSender(clock, &packet_router_, event_log, trials, process_thread_.get())),
    task_queue_pacer_(use_task_queue_pacer_? new TaskQueuePacedSender(clock, &packet_router_, event_log, trials,task_queue_factory,/*hold_back_window=*/PacingController::kMinSleepTime): nullptr),
    
    • 如果 WebRTC-TaskQueuePacer 被启用,task_queue_pacer_ 将被初始化,表示使用 TaskQueue 驱动的实现。
    • 如果未启用,则使用传统的 PacedSender,并传入线程 process_thread_ 用于驱动调度。
    (2) 任务队列初始化

    任务队列 task_queue_ 的创建逻辑:

    task_queue_(task_queue_factory->CreateTaskQueue("rtp_send_controller",TaskQueueFactory::Priority::NORMAL))
    
    • rtp_send_controller 创建了一个中等优先级的任务队列。这意味着发送侧 RTP 控制器的任务拥有一定优先,但不会抢占核心实时任务(如音频编码)。
    (3) 启动策略

    根据 WebRTC-LazyPacerStart 配置决定是否延迟启动 Pacer:

    if (absl::StartsWith(trials->Lookup("WebRTC-LazyPacerStart"), "Disabled")) {EnsureStarted();
    }
    
    • “Lazy Start” 功能通常用于优化资源启动时的开销。如果禁用延迟启动,则直接调用 EnsureStarted() 启动所有功能。
    (4) Pacer 初始化

    为 Pacer 设置初始的码率:

    pacer()->SetPacingRates(DataRate::BitsPerSec(bitrate_config.start_bitrate_bps), DataRate::Zero());
    
    • SetPacingRates 需要传入目标的比特率(start_bitrate_bps)和填充数据的比特率(这里为 0)。

3.2、PacedSender:

PacedSender 是 WebRTC 中最核心的 Pacer 调度类,负责实现基于速率的节奏化数据发送。

PacedSender::PacedSender(Clock* clock,PacketRouter* packet_router,RtcEventLog* event_log,const WebRtcKeyValueConfig* field_trials,ProcessThread* process_thread): process_mode_( (field_trials != nullptr &&absl::StartsWith(field_trials->Lookup("WebRTC-Pacer-DynamicProcess"), "Enabled"))? PacingController::ProcessMode::kDynamic: PacingController::ProcessMode::kPeriodic),pacing_controller_(clock, // 构造 PacingControllerpacket_router,event_log,field_trials,process_mode_),clock_(clock),process_thread_(process_thread) {if (process_thread_)process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE);
}

PacedSender 主要干两件事:

  • 设置PacedSender 处理模式为周期模式ProcessMode::kPeriodic (field_trials带回来的);
  • 创建了一个PacingController对象到pacing_controller_ ,Pacer模块中的主要逻辑都是由PacingController完成的;

构建好PacedSenderPacingCOntroller对象之后,最后启动Pacer线程,以后所有需要发送的数据包,都需要通过PacingControllerProcessPackets去发送。

3.3、两个重要函数:

1)PacingController::NextSendTime():

Pacing模块当中,获取每次执行的时间(5ms)

Timestamp PacingController::NextSendTime() const {// 。。。// kPeriodic模式(其实目前也仅仅支持这种周期模式)if (mode_ == ProcessMode::kPeriodic) {// In periodic non-probing mode, we just have a fixed interval.// 在周期性非探测模式下,我们只有一个固定间隔。就是使用上次处理的时间,加上min_packet_limit_(默认是5ms)return last_process_time_ + min_packet_limit_;}// ...return last_process_time_ + kPausedProcessInterval;
}

2)PacingController::ProcessPackets():

周期发送数据包。这个函数同样很长,我只保留关键逻辑:

  • 关键逻辑:
void PacingController::ProcessPackets() {// 动态模式目前版本不会用到(现在只用到周期模式)if (mode_ == ProcessMode::kDynamic) {// 。。。}// 判断当前是否应该发送保活包(当没有发送音视频数据的是时候,应该发送保活)if (ShouldSendKeepalive(now)) {// 。。。}// 如果处于暂停状态(重新进行媒体协商的时候,需要置为暂停状态),不能发送音视频数据包if (paused_) {return;}// 如果逝去的时间大于0(说明已经过了一段时间了),为media budget设置目标码率// webrtc是通过media budget设置目标码率的,ProcessPackets 正是根据media budget控制码流大小的// 如果发现media budget是500kbps,那么每次处理就是按照500kbps进行发送if (elapsed_time > TimeDelta::Zero()) {// 将前面设置给pacer的目标码率设置给media_budget}// 是否需要探测带宽码率,这块主要根据拥塞控制来if (is_probing) {// 获得pacing_info}// The paused state is checked in the loop since it leaves the critical// section allowing the paused state to be changed from other code.// 核心逻辑while (!paused_) { // 是否处于暂停状态// 下面单独分析}if (is_probing) {// 发送探测包}
}
  • 核心逻辑:(同样因为代码很多,只保留关键逻辑)
  // 核心逻辑while (!paused_) { // 是否处于暂停状态// Fetch the next packet, so long as queue is not empty or budget is not// exhausted.// 从Packet队列( RoundRobinPacketQueue packet_queue_ )中,取出我们要发送的packet// 如果我们获取到有数据的packet,那么就发送出去,但是如果我们没有获取到有数据的packet,rtp_packet为null,又分为两种情况:// 1.packet队列的确没有音视频数据包,这个时候,我们应该发送padding,让底层网络给我们将带宽留着//   否则,底层网络会将带宽进行回收,等到真正需要发送数据的时候,没有带宽,会造成延迟;// 2.有数据,但是我们现在已经发送的数据,达到了我们设定的目标码流,也不能让发送了。std::unique_ptr<RtpPacketToSend> rtp_packet =GetPendingPacket(pacing_info, target_send_time, now);if (rtp_packet == nullptr) {// 没有获取到数据包// 1、检查是否应该发送padding包,如果需要,将Padding包插入队列,并重新执行while循环。// 2、如果不需要,则退出while循环,说明这次不能再发送数据了。DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);if (padding_to_add > DataSize::Zero()) {std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =packet_sender_->GeneratePadding(padding_to_add);if (padding_packets.empty()) {// No padding packets were generated, quite send loop.break;}for (auto& packet : padding_packets) {EnqueuePacket(std::move(packet));}// Continue loop to send the padding that was just added.continue;}// Can't fetch new packet and no padding to send, exit send loop.break;}// 否则,说明我们获取了一个packet,那么计算packet的payload 大小是多少字节const RtpPacketMediaType packet_type = *rtp_packet->packet_type();DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +rtp_packet->padding_size());// 将Packet发送出去packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);for (auto& packet : packet_sender_->FetchFec()) {EnqueuePacket(std::move(packet));}data_sent += packet_size;// Send done, update send/process time to the target send time.// 记录发送的packet大小和时间(下次的发送时间就是根据我们这次更新的时间计算出来的)OnPacketSent(packet_type, packet_size, target_send_time);}

核心逻辑就是发数据包,有媒体数据就考虑发送媒体数据,没有就发送填充数据,进行保活,当然,发送两者的时候都应该考虑目标码率,已经超过我们设置的目标码率就等等再发。

  • 获取待发送的数据包:

    看看上面获取数据包的函数GetPendingPacket究竟干了啥:

    std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(const PacedPacketInfo& pacing_info,Timestamp target_send_time,Timestamp now) {if (packet_queue_.Empty()) {return nullptr;}// First, check if there is any reason _not_ to send the next queued packet.// Unpaced audio packets and probes are exempted from send checks.// 不需要平滑处理的特殊情况:// pace_audio_ 为false,说明音频包不需要平滑处理,并且,// packet_queue_.LeadingAudioPacketEnqueueTime().has_value() 说明队列中下一个数据包是音频包bool unpaced_audio_packet =!pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().has_value();bool is_probe = pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe;if (!unpaced_audio_packet && !is_probe) {// 网络拥塞状况下不能发送if (Congested()) {// Don't send anything if congested.return nullptr;}// 没有拥塞,会检测模式是否为周期模式if (mode_ == ProcessMode::kPeriodic) {// media_budget_ 中可供我们使用的字节数不大于0,表示我们已经达到了目标码率,我们也不发送if (media_budget_.bytes_remaining() <= 0) {// Not enough budget.return nullptr;}} else {// 动态模式现在还不成熟,不关心}}// 如果不是上面的情况,那么,我们需要从队列中pop出一个packet,让ProcessPackets函数将其发送出去return packet_queue_.Pop();
    }

    主要就是先做了一些发送状态的校验:网络拥塞情况下不能发送、达到目标码率了也不能发送,如果校验通过可以发送,就从队列中取出一个数据包进行发送。

四、总结:

本文主要介绍了Pacer模块的作用,以及相关重要的框架代码,其实还有很多细节后面再打开看看。


http://www.ppmy.cn/news/1557692.html

相关文章

每天40分玩转Django:Django文件上传

Django文件上传 一、今日学习内容概述 学习模块重要程度主要内容基础文件上传⭐⭐⭐⭐⭐文件字段、基本配置自定义存储⭐⭐⭐⭐⭐存储后端、云存储集成文件处理⭐⭐⭐⭐图片处理、文件验证异步上传⭐⭐⭐⭐AJAX上传、进度显示 二、模型和表单设计 # models.py from django.…

Android OpenGLES2.0开发(十):FBO离屏渲染

人生是一场单程的旅行&#xff0c;即使有些遗憾我们也没有从头再来的机会&#xff0c;与其纠结无法改变的过去不如微笑着珍惜未来。 Android OpenGLES开发&#xff1a;EGL环境搭建Android OpenGLES2.0开发&#xff08;一&#xff09;&#xff1a;艰难的开始Android OpenGLES2.0…

面向对象 类函数的区别 实例方法 类方法 静态方法 抽象方法

前言&#xff1a;面向对象类方法的说明&#xff1a; 实例方法 定义&#xff1a;实例方法是在类中定义的&#xff0c;用于操作类的实例&#xff08;对象&#xff09;的属性和行为的方法。它的第一个参数通常是self&#xff08;在 Python 中&#xff09;或this&#xff08;在 Jav…

#渗透测试#漏洞挖掘#红蓝攻防#护网#sql注入介绍06-基于子查询的SQL注入(Subquery-Based SQL Injection)

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…

基于字节大模型的论文翻译(含免费源码)

基于字节大模型的论文翻译 源代码&#xff1a; &#x1f44f; star ✨ https://github.com/boots-coder/LLM-application 展示 项目简介 本项目是一个基于大语言模型&#xff08;Large Language Model, LLM&#xff09;的论文阅读与翻译辅助工具。它通过用户界面&#xff08…

【Django篇】--动手实践Django基础知识

一、url视图映射 在url.py中定义两个视图函数&#xff0c;并添加到urlpatterns中用于访问。 from django.contrib import admin from django.urls import path from django.shortcuts import HttpResponse# 默认的地址为&#xff1a;http://127.0.0.1:8000/# 如果我想要访问默…

Java 8使用Stream流去除一个list中包含另一个list已存在的某个字段的对象

项目场景&#xff1a; 在Java中&#xff0c;我们经常会遇到需要对List中的数据进行操作的情况。有时候&#xff0c;我们需要从一个List中删除另一个List已经包含的数据。这种情况下&#xff0c;我们可以使用Java Stream来简洁高效地完成操作。 代码示例 假设我们有两个对象列表…

CentOS7网络配置,解决不能联网、ping不通外网、主机的问题

1. 重置 关闭Centos系统 编辑->虚拟网络编辑器 还原默认设置 2. 记录基本信息 查看网关地址,并记录在小本本上 查看网段,记录下 3. 修改网卡配置 启动Centos系统 非root用户,切换root su root查看Mac地址 ifconfig 或 ip addr记录下来 修改配置文件 vim /et…