一、前言:
Pacer 是一种数据发送调度机制。它的主要功能是根据网络带宽限制、网络拥塞控制的反馈以及媒体的发送策略,对数据包的发送进行适配和节奏调度,以避免网络拥塞、减少丢包并保证流媒体传输的平滑性。
二、核心概念:
2.1 Pacer 的作用:
- 流量平滑:音频、视频等媒体数据包可能在生成时间上分布不均,通过 Pacer,可以将这些数据包的发送节奏调整成更均匀的形式,避免突发流量冲击网络。
- 速率控制:Pacer 会根据传输码率(由拥塞控制算法提供,例如 Google 的 Congestion Control),动态调整包的发送速率,确保发送端不超过网络容量。
- 优先级处理:Pacer 支持区分不同的流类型(例如音频、视频、FEC 冗余流等),为高优先级流(如音频)留出更多资源或者保证其更低的延迟。
2.2 Pacer 的工作流程:
- 音视频流媒体按照编码器的设定发送数据包。
- 数据包会先进入一个队列(Packet Queue)。
- Pacer 通过定时器或事件驱动机制,根据当前允许的发送速率(由拥塞控制模块动态提供)从队列中提取数据包发送。
- 如果发送速率受限,多余的包会继续留在队列中等待下一轮调度。
以下是简化的机制模型:
[Media Encoder] -> [Packet Queue] -> Pacer -> [Network Sender]
三、核心代码:
先想想,根据前面的概念阐述,如果要让你写,你怎么写呢?是不是重点实现以下几个模块:
packet_router.h/cpp
:负责将数据包分发到具体的网络发送端。packet_sender.h/cpp
:实现了调度核心逻辑。packet_queue.h/cpp
:实现用于存储待发送数据包的队列。
现在具体看看webrtc如何做的。
3.1、Pacer对象创建:
-
PeerConnection当中创建Call对象;
// 从上到下调用顺序如下: PeerConnectionFactory::CreatePeerConnectionOrError --> PeerConnectionFactory::CreateCall_w --> CallFactory::CreateCall --> Call::Create
-
Call对象创建
RtpTransportControllerSend
:创建Call时候首先得创建Call::Config,便于以后创建Pacer对象时候作为参数;
Call* Call::Create(const Call::Config& config,Clock* clock,rtc::scoped_refptr<SharedModuleThread> call_thread,std::unique_ptr<ProcessThread> pacer_thread) {RTC_DCHECK(config.task_queue_factory);return new internal::Call(clock, config,std::make_unique<RtpTransportControllerSend>(clock, config.event_log, config.network_state_predictor_factory,config.network_controller_factory, config.bitrate_config,std::move(pacer_thread), config.task_queue_factory, config.trials),std::move(call_thread), config.task_queue_factory); }
这儿的
config.trials
,会导致后面创建PacedSender对象的时候,处理模式为kPeriodic,也就是周期处理模式; -
RtpTransportControllerSend
创建PacerSender
对象:RtpTransportControllerSend
是 WebRTC 中负责发送侧 RTP 流量控制的核心模块。它是 RTP 传输的控制器,负责管理和协调发送侧的各种功能,包括比特率管理(带宽估算 BWE)、节奏控制(Pacer)、网络状态预测和反馈处理等。我们先看看构造函数:RtpTransportControllerSend::RtpTransportControllerSend(Clock* clock,webrtc::RtcEventLog* event_log,NetworkStatePredictorFactoryInterface* predictor_factory,NetworkControllerFactoryInterface* controller_factory,const BitrateConstraints& bitrate_config,std::unique_ptr<ProcessThread> process_thread,TaskQueueFactory* task_queue_factory,const WebRtcKeyValueConfig* trials): clock_(clock),event_log_(event_log),bitrate_configurator_(bitrate_config),process_thread_started_(false),process_thread_(std::move(process_thread)),use_task_queue_pacer_(IsEnabled(trials, "WebRTC-TaskQueuePacer")),process_thread_pacer_(use_task_queue_pacer_? nullptr: new PacedSender(clock,&packet_router_,event_log,trials,process_thread_.get())),task_queue_pacer_(use_task_queue_pacer_? new TaskQueuePacedSender(clock,&packet_router_,event_log,trials,task_queue_factory,/*hold_back_window = */ PacingController::kMinSleepTime): nullptr),observer_(nullptr),controller_factory_override_(controller_factory),controller_factory_fallback_(std::make_unique<GoogCcNetworkControllerFactory>(predictor_factory)),process_interval_(controller_factory_fallback_->GetProcessInterval()),last_report_block_time_(Timestamp::Millis(clock_->TimeInMilliseconds())),reset_feedback_on_route_change_(!IsEnabled(trials, "WebRTC-Bwe-NoFeedbackReset")),send_side_bwe_with_overhead_(!IsDisabled(trials, "WebRTC-SendSideBwe-WithOverhead")),add_pacing_to_cwin_(IsEnabled(trials, "WebRTC-AddPacingToCongestionWindowPushback")),relay_bandwidth_cap_("relay_cap", DataRate::PlusInfinity()),transport_overhead_bytes_per_packet_(0),network_available_(false),retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),task_queue_(task_queue_factory->CreateTaskQueue("rtp_send_controller",TaskQueueFactory::Priority::NORMAL)) {ParseFieldTrial({&relay_bandwidth_cap_},trials->Lookup("WebRTC-Bwe-NetworkRouteConstraints"));initial_config_.constraints = ConvertConstraints(bitrate_config, clock_);initial_config_.event_log = event_log;initial_config_.key_value_config = trials;RTC_DCHECK(bitrate_config.start_bitrate_bps > 0);pacer()->SetPacingRates(DataRate::BitsPerSec(bitrate_config.start_bitrate_bps), DataRate::Zero());if (absl::StartsWith(trials->Lookup("WebRTC-LazyPacerStart"), "Disabled")) {EnsureStarted();} }
我们分段看下,主要分为四大步:
(1) Pacer 类型选择
构造函数根据配置决定使用
TaskQueuePacedSender
或传统的PacedSender
:use_task_queue_pacer_(IsEnabled(trials, "WebRTC-TaskQueuePacer")), process_thread_pacer_(use_task_queue_pacer_? nullptr: new PacedSender(clock, &packet_router_, event_log, trials, process_thread_.get())), task_queue_pacer_(use_task_queue_pacer_? new TaskQueuePacedSender(clock, &packet_router_, event_log, trials,task_queue_factory,/*hold_back_window=*/PacingController::kMinSleepTime): nullptr),
- 如果
WebRTC-TaskQueuePacer
被启用,task_queue_pacer_
将被初始化,表示使用 TaskQueue 驱动的实现。 - 如果未启用,则使用传统的
PacedSender
,并传入线程process_thread_
用于驱动调度。
(2) 任务队列初始化
任务队列
task_queue_
的创建逻辑:task_queue_(task_queue_factory->CreateTaskQueue("rtp_send_controller",TaskQueueFactory::Priority::NORMAL))
- 为
rtp_send_controller
创建了一个中等优先级的任务队列。这意味着发送侧 RTP 控制器的任务拥有一定优先,但不会抢占核心实时任务(如音频编码)。
(3) 启动策略
根据
WebRTC-LazyPacerStart
配置决定是否延迟启动 Pacer:if (absl::StartsWith(trials->Lookup("WebRTC-LazyPacerStart"), "Disabled")) {EnsureStarted(); }
- “Lazy Start” 功能通常用于优化资源启动时的开销。如果禁用延迟启动,则直接调用
EnsureStarted()
启动所有功能。
(4) Pacer 初始化
为 Pacer 设置初始的码率:
pacer()->SetPacingRates(DataRate::BitsPerSec(bitrate_config.start_bitrate_bps), DataRate::Zero());
SetPacingRates
需要传入目标的比特率(start_bitrate_bps
)和填充数据的比特率(这里为0
)。
- 如果
3.2、PacedSender:
PacedSender 是 WebRTC 中最核心的 Pacer 调度类,负责实现基于速率的节奏化数据发送。
PacedSender::PacedSender(Clock* clock,PacketRouter* packet_router,RtcEventLog* event_log,const WebRtcKeyValueConfig* field_trials,ProcessThread* process_thread): process_mode_( (field_trials != nullptr &&absl::StartsWith(field_trials->Lookup("WebRTC-Pacer-DynamicProcess"), "Enabled"))? PacingController::ProcessMode::kDynamic: PacingController::ProcessMode::kPeriodic),pacing_controller_(clock, // 构造 PacingControllerpacket_router,event_log,field_trials,process_mode_),clock_(clock),process_thread_(process_thread) {if (process_thread_)process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE);
}
PacedSender
主要干两件事:
- 设置
PacedSender
处理模式为周期模式ProcessMode::kPeriodic
(field_trials带回来的); - 创建了一个
PacingController
对象到pacing_controller_
,Pacer模块中的主要逻辑都是由PacingController
完成的;
构建好PacedSender
和PacingCOntroller
对象之后,最后启动Pacer线程,以后所有需要发送的数据包,都需要通过PacingController
的ProcessPackets
去发送。
3.3、两个重要函数:
1)PacingController::NextSendTime():
Pacing模块当中,获取每次执行的时间(5ms)
Timestamp PacingController::NextSendTime() const {// 。。。// kPeriodic模式(其实目前也仅仅支持这种周期模式)if (mode_ == ProcessMode::kPeriodic) {// In periodic non-probing mode, we just have a fixed interval.// 在周期性非探测模式下,我们只有一个固定间隔。就是使用上次处理的时间,加上min_packet_limit_(默认是5ms)return last_process_time_ + min_packet_limit_;}// ...return last_process_time_ + kPausedProcessInterval;
}
2)PacingController::ProcessPackets():
周期发送数据包。这个函数同样很长,我只保留关键逻辑:
- 关键逻辑:
void PacingController::ProcessPackets() {// 动态模式目前版本不会用到(现在只用到周期模式)if (mode_ == ProcessMode::kDynamic) {// 。。。}// 判断当前是否应该发送保活包(当没有发送音视频数据的是时候,应该发送保活)if (ShouldSendKeepalive(now)) {// 。。。}// 如果处于暂停状态(重新进行媒体协商的时候,需要置为暂停状态),不能发送音视频数据包if (paused_) {return;}// 如果逝去的时间大于0(说明已经过了一段时间了),为media budget设置目标码率// webrtc是通过media budget设置目标码率的,ProcessPackets 正是根据media budget控制码流大小的// 如果发现media budget是500kbps,那么每次处理就是按照500kbps进行发送if (elapsed_time > TimeDelta::Zero()) {// 将前面设置给pacer的目标码率设置给media_budget}// 是否需要探测带宽码率,这块主要根据拥塞控制来if (is_probing) {// 获得pacing_info}// The paused state is checked in the loop since it leaves the critical// section allowing the paused state to be changed from other code.// 核心逻辑while (!paused_) { // 是否处于暂停状态// 下面单独分析}if (is_probing) {// 发送探测包}
}
- 核心逻辑:(同样因为代码很多,只保留关键逻辑)
// 核心逻辑while (!paused_) { // 是否处于暂停状态// Fetch the next packet, so long as queue is not empty or budget is not// exhausted.// 从Packet队列( RoundRobinPacketQueue packet_queue_ )中,取出我们要发送的packet// 如果我们获取到有数据的packet,那么就发送出去,但是如果我们没有获取到有数据的packet,rtp_packet为null,又分为两种情况:// 1.packet队列的确没有音视频数据包,这个时候,我们应该发送padding,让底层网络给我们将带宽留着// 否则,底层网络会将带宽进行回收,等到真正需要发送数据的时候,没有带宽,会造成延迟;// 2.有数据,但是我们现在已经发送的数据,达到了我们设定的目标码流,也不能让发送了。std::unique_ptr<RtpPacketToSend> rtp_packet =GetPendingPacket(pacing_info, target_send_time, now);if (rtp_packet == nullptr) {// 没有获取到数据包// 1、检查是否应该发送padding包,如果需要,将Padding包插入队列,并重新执行while循环。// 2、如果不需要,则退出while循环,说明这次不能再发送数据了。DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);if (padding_to_add > DataSize::Zero()) {std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =packet_sender_->GeneratePadding(padding_to_add);if (padding_packets.empty()) {// No padding packets were generated, quite send loop.break;}for (auto& packet : padding_packets) {EnqueuePacket(std::move(packet));}// Continue loop to send the padding that was just added.continue;}// Can't fetch new packet and no padding to send, exit send loop.break;}// 否则,说明我们获取了一个packet,那么计算packet的payload 大小是多少字节const RtpPacketMediaType packet_type = *rtp_packet->packet_type();DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +rtp_packet->padding_size());// 将Packet发送出去packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);for (auto& packet : packet_sender_->FetchFec()) {EnqueuePacket(std::move(packet));}data_sent += packet_size;// Send done, update send/process time to the target send time.// 记录发送的packet大小和时间(下次的发送时间就是根据我们这次更新的时间计算出来的)OnPacketSent(packet_type, packet_size, target_send_time);}
核心逻辑就是发数据包,有媒体数据就考虑发送媒体数据,没有就发送填充数据,进行保活,当然,发送两者的时候都应该考虑目标码率,已经超过我们设置的目标码率就等等再发。
-
获取待发送的数据包:
看看上面获取数据包的函数
GetPendingPacket
究竟干了啥:std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(const PacedPacketInfo& pacing_info,Timestamp target_send_time,Timestamp now) {if (packet_queue_.Empty()) {return nullptr;}// First, check if there is any reason _not_ to send the next queued packet.// Unpaced audio packets and probes are exempted from send checks.// 不需要平滑处理的特殊情况:// pace_audio_ 为false,说明音频包不需要平滑处理,并且,// packet_queue_.LeadingAudioPacketEnqueueTime().has_value() 说明队列中下一个数据包是音频包bool unpaced_audio_packet =!pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().has_value();bool is_probe = pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe;if (!unpaced_audio_packet && !is_probe) {// 网络拥塞状况下不能发送if (Congested()) {// Don't send anything if congested.return nullptr;}// 没有拥塞,会检测模式是否为周期模式if (mode_ == ProcessMode::kPeriodic) {// media_budget_ 中可供我们使用的字节数不大于0,表示我们已经达到了目标码率,我们也不发送if (media_budget_.bytes_remaining() <= 0) {// Not enough budget.return nullptr;}} else {// 动态模式现在还不成熟,不关心}}// 如果不是上面的情况,那么,我们需要从队列中pop出一个packet,让ProcessPackets函数将其发送出去return packet_queue_.Pop(); }
主要就是先做了一些发送状态的校验:网络拥塞情况下不能发送、达到目标码率了也不能发送,如果校验通过可以发送,就从队列中取出一个数据包进行发送。
四、总结:
本文主要介绍了Pacer模块的作用,以及相关重要的框架代码,其实还有很多细节后面再打开看看。