WebrtcNode publish 流程

news/2024/12/5 7:31:26/

WebrtcNode publish 流程

1. AmqpClient - RpcServer New message received

AmqpClient - RpcServer New message received {method: 'publish',args: ['67f9309ce6e645fc8a4bb9cac6406eb2','webrtc',{transportId: '67f9309ce6e645fc8a4bb9cac6406eb2',tracks: [Array],controller: 'conference-56eea170c80b82268f69@192.168.221.62_0',owner: 'LRvj457Mpnk8QX0uAAAB'}],corrID: 13,replyTo: 'amq.gen-p1mY173rLRg1y9y5sqwpWA'
}

这个消息是从conference-agent 发送过来的。

2. WebrtcNode - publish

webrtc_agent/webrtc/index.js

WebrtcNode - publish, 
operationId: 67f9309ce6e645fc8a4bb9cac6406eb2 
connectionType: webrtc 
options: {transportId: '67f9309ce6e645fc8a4bb9cac6406eb2',tracks: [{mid: '0',source: 'mic',type: 'audio',formatPreference: [Object]},{mid: '1',source: 'camera',type: 'video',formatPreference: [Object]}],controller: 'conference-56eea170c80b82268f69@192.168.221.62_0',owner: 'LRvj457Mpnk8QX0uAAAB'
}2023-04-04T11:45:26.657  - DEBUG: WebrtcNode - publish, options.formatPreference {optional: [{ codec: 'opus', sampleRate: 48000, channelNum: 2 },{ codec: 'isac', sampleRate: 16000 },{ codec: 'isac', sampleRate: 32000 },{ codec: 'g722', sampleRate: 16000, channelNum: 1 },{ codec: 'pcma' },{ codec: 'pcmu' },{ codec: 'aac' },{ codec: 'ac3' },{ codec: 'nellymoser' },{ codec: 'ilbc' }]
}
2023-04-04T11:45:26.657  - DEBUG: WebrtcNode - publish, options.formatPreference {optional: [{ codec: 'h264' },{ codec: 'vp8' },{ codec: 'vp9' },{ codec: 'av1' },{ codec: 'h265' }]
}
		/** For operations on type webrtc, publicTrackId is connectionId.* For operations on type internal, operationId is connectionId.*/// functions: publish, unpublish, subscribe, unsubscribe, linkup, cutoff// options = {//   transportId,//   tracks = [{mid, type, formatPreference, scalabilityMode}],//   controller, owner// }// 其中 controller - 发送消息,发送给谁 to// // formatPreference = {preferred: MediaFormat, optional: [MediaFormat]}that.publish = function (operationId, connectionType, options, callback) {log.debug('publish, operationId:', operationId, 'connectionType:', connectionType, 'options:', options);if (mappingTransports.has(operationId)) {return callback('callback', {type: 'failed', reason: 'Connection already exists:'+operationId});}// WrtcConnectionvar conn = null;if (connectionType === 'webrtc') {if (!options.transportId) {// Generate a transportId}// 1. 创建 WebRTCConnectionconn = createWebRTCConnection(options.transportId, options.controller, options.owner);// 2. addTrackOperationoptions.tracks.forEach(function trackOp(t) { // t = options.tracksconn.addTrackOperation(operationId, 'sendonly', t);});mappingTransports.set(operationId, options.transportId);callback('callback', 'ok');} else {log.error('Connection type invalid:' + connectionType);}if (!conn) {log.error('Create connection failed', operationId, connectionType);callback('callback', {type: 'failed', reason: 'Create Connection failed'});}};

???callback 是哪里来的

2.1 createWebRTCConnection

创建WebRtcConnection

小节 3

2.2 addTrackOperation

小节 4

3. WebrtcNode - createWebRTCConnection——返回WrtcConnection

webrtc_agent/webrtc/index.js

 var createWebRTCConnection = function (transportId, controller, owner) {if (peerConnections.has(transportId)) {log.debug('PeerConnection already created:', transportId);return peerConnections.get(transportId);}var connection = new WrtcConnection({connectionId: transportId,threadPool: threadPool,ioThreadPool: ioThreadPool,network_interfaces: global.config.webrtc.network_interfaces,owner,}, function onTransportStatus(status) {notifyTransportStatus(controller, transportId, status);}, function onTrack(trackInfo) {handleTrackInfo(transportId, trackInfo, controller);});// map 存放WebRtcconnecitonpeerConnections.set(transportId, connection);// map 在同一个transportId,存放trackId对应的publicTrackId的mapmappingPublicId.set(transportId, new Map());connection.controller = controller;return connection;};

3.1 peerConnections 成员

存放 WrtcConnection,transportId与connection一一对应

// Map { transportId => WrtcConnection }
var peerConnections = new Map();
----------------------------------       
peerConnections.set(transportId, connection);

3.2 mappingPublicId成员

// Map { transportId => Map { trackId => publicTrackId } }
var mappingPublicId = new Map();
--------------------------------------------
mappingPublicId.set(transportId, new Map());
  • 主要是存放publicTrackId, 在handleTrackInfotrack-added,或 destroyTransport 中获取publicTrackId
  • createWebRTCConnection 的时候,只是创建了空的Map,而Map中存放的{ trackId => publicTrackId } },是在 handleTrackInfotrackInfo.type === 'track-added' 中存入的。

dist-debug/webrtc_agent/webrtc/index.js

 var handleTrackInfo = function (transportId, trackInfo, controller) {var publicTrackId;var updateInfo;if (trackInfo.type === 'track-added') {// Generate public track IDconst track = trackInfo.track;publicTrackId = transportId + '-' + track.id;if (mediaTracks.has(publicTrackId)) {log.error('Conflict public track id:', publicTrackId, transportId, track.id);return;}...mappingPublicId.get(transportId).set(track.id, publicTrackId);...}

mappingTransports

		// Map { operationId => transportId }var mappingTransports = new Map();

主要是在publish,subscribe的时候,存放了transportId,在publish,subscribe 重复创建connection。

====关于id的小结

  • transportId, 对应的就是WebRtcConnection,各种连接conneciton id都是这个;存放在peerConnections

  • trackId,就是对应的track,例如audio track,videotrack, 从0开始,往上递增;在同一个transportId下,tackId 对应的就是publicTrackId;

  • publicTrackId = transportId + ‘-’ + trackId; publicTrackId 对应的就是 WrtcTrack, 存放在mediatTracks;

  • operationId 和transportId 一一对应,看日志,目前两个值是一样的, 存放在mappingTransports;
    在这里插入图片描述

3.3 new WrtcConnection——创建rtc连接,并初始化

webrtc_agent/webrtc/wrtcConnection.js

 module.exports = function (spec, on_status, on_track) {...wrtc = new Connection(wrtcId, threadPool, ioThreadPool, { ipAddresses });// CallBase后面的文章会讲到wrtc.callBase = new CallBase();// wrtc.addMediaStream(wrtcId, {label: ''}, direction === 'in');initWebRtcConnection(wrtc);return that;
};

3.3.1 new Connection

小节 3.4 , 创建WebrtcConnection

3.3.2 WrtcConnection - initWebRtcConnection

小节 3.5,主要是注册监听事件, sdp,candidate 协商结果, 都是从c++ callback 回来

3.4 new Connection——创建c++的WebrtcConnection

webrtc_agent/webrtc/connection.js

class Connection extends EventEmitter {constructor (id, threadPool, ioThreadPool, options = {}) {super();log.info(`message: Connection, id: ${id}`);this.id = id;this.threadPool = threadPool;this.ioThreadPool = ioThreadPool;this.mediaConfiguration = 'default';this.mediaStreams = new Map();this.initialized = false;this.options = options;this.ipAddresses = options.ipAddresses || '';this.trickleIce = options.trickleIce || false;this.metadata = this.options.metadata || {};this.isProcessingRemoteSdp = false;this.ready = false;
// native 的addon.WebRtcConnectionthis.wrtc = this._createWrtc();}
...
}

3.4.1 -----------Connection._createWrtc

 _createWrtc() {var wrtc = new addon.WebRtcConnection(this.threadPool, this.ioThreadPool, this.id,global.config.webrtc.stunserver,global.config.webrtc.stunport,global.config.webrtc.minport,global.config.webrtc.maxport,false, //this.trickleIce,this._getMediaConfiguration(this.mediaConfiguration),false,'', // turnserver,'', // turnport,'', //turnusername,'', //turnpass,'', //networkinterfacethis.ipAddresses);return wrtc;}

3.4.2 NAN_METHOD(WebRtcConnection::New)

source/agent/webrtc/rtcConn/WebRtcConnection.cc

NAN_METHOD(WebRtcConnection::New) {...WebRtcConnection* obj = new WebRtcConnection();obj->me = std::make_shared<erizo::WebRtcConnection>(worker, io_worker, wrtcId, iceConfig,rtp_mappings, ext_mappings, obj);uv_async_init(uv_default_loop(), &obj->async_, &WebRtcConnection::eventsCallback);obj->Wrap(info.This());info.GetReturnValue().Set(info.This());obj->asyncResource_ = new Nan::AsyncResource("WebRtcConnectionCallback");...
}

3.4.3 WebRtcConnection::WebRtcConnection

source/agent/webrtc/rtcConn/erizo/src/erizo/WebRtcConnection.cpp

WebRtcConnection::WebRtcConnection(std::shared_ptr<Worker> worker, std::shared_ptr<IOWorker> io_worker,const std::string& connection_id, const IceConfig& ice_config, const std::vector<RtpMap> rtp_mappings,const std::vector<erizo::ExtMap> ext_mappings, WebRtcConnectionEventListener* listener) :connection_id_{connection_id},audio_enabled_{false}, video_enabled_{false}, bundle_{false}, conn_event_listener_{listener},ice_config_{ice_config}, rtp_mappings_{rtp_mappings}, extension_processor_{ext_mappings},worker_{worker}, io_worker_{io_worker},remote_sdp_{std::make_shared<SdpInfo>(rtp_mappings)}, local_sdp_{std::make_shared<SdpInfo>(rtp_mappings)},audio_muted_{false}, video_muted_{false}, first_remote_sdp_processed_{false}{ELOG_INFO("%s message: constructor, stunserver: %s, stunPort: %d, minPort: %d, maxPort: %d",toLog(), ice_config.stun_server.c_str(), ice_config.stun_port, ice_config.min_port, ice_config.max_port);stats_ = std::make_shared<Stats>();// distributor_ = std::unique_ptr<BandwidthDistributionAlgorithm>(new TargetVideoBWDistributor());global_state_ = CONN_INITIAL;trickle_enabled_ = ice_config_.should_trickle;slide_show_mode_ = false;sending_ = true;
}

3.5 WrtcConnection - initWebRtcConnection

webrtc_agent/webrtc/wrtcConnection.js

/** Given a WebRtcConnection waits for the state CANDIDATES_GATHERED for set remote SDP.*/// wrtc 是Connection,Connection 继承于EventEmittervar initWebRtcConnection = function (wrtc) {// EventEmitter.on()用于监听事件// 在c++从回调到js中,就是在Connection.init中触发// Connection wrtcwrtc.on('status_event', (evt, status) => {if (evt.type === 'answer') {processAnswer(evt.sdp);const message = localSdp.toString();log.debug('Answer SDP', message);on_status({type: 'answer', sdp: message});} else if (evt.type === 'candidate') {let message = evt.candidate;networkInterfaces.forEach((i) => {if (i.ip_address && i.replaced_ip_address) {message = message.replace(new RegExp(i.ip_address, 'g'), i.replaced_ip_address);}});on_status({type: 'candidate', candidate: message});} else if (evt.type === 'failed') {log.warn('ICE failed, ', status, wrtc.id);on_status({type: 'failed', reason: 'Ice procedure failed.'});} else if (evt.type === 'ready') {log.debug('Connection ready, ', wrtc.wrtcId);on_status({type: 'ready'});}});// Connection wrtc, 小节 3.5.2//  var wrtcId = spec.connectionId;就是transportIdwrtc.init(wrtcId);};

3.5.1 Connection/EventEmitter.on——监听事件

wrtc.on('status_event', (evt, status) => {if (evt.type === 'answer') {processAnswer(evt.sdp);const message = localSdp.toString();log.debug('Answer SDP', message);on_status({type: 'answer', sdp: message});} else if (evt.type === 'candidate') {let message = evt.candidate;networkInterfaces.forEach((i) => {if (i.ip_address && i.replaced_ip_address) {message = message.replace(new RegExp(i.ip_address, 'g'), i.replaced_ip_address);}});on_status({type: 'candidate', candidate: message});} else if (evt.type === 'failed') {log.warn('ICE failed, ', status, wrtc.id);on_status({type: 'failed', reason: 'Ice procedure failed.'});} else if (evt.type === 'ready') {log.debug('Connection ready, ', wrtc.wrtcId);on_status({type: 'ready'});}});

3.5.2 Connection.init——初始化c++的WebRtcConnection

webrtc_agent/webrtc/connection.js

//  streamId 就是 var wrtcId = spec.connectionId;就是transportId
init(streamId) {if (this.initialized) {return false;}const firstStreamId = streamId;this.initialized = true;log.debug(`message: Init Connection, connectionId: ${this.id} `+`${logger.objectToLog(this.options)}`);this.sessionVersion = 0;// WebRtcConnection c++ wrapper, 调用c++this.wrtc.init((newStatus, mess, streamId) => {// 对应日志3.5.6log.debug('message: WebRtcConnection status update, ' +'id: ' + this.id + ', status: ' + newStatus +', ' + logger.objectToLog(this.metadata) + mess);switch(newStatus) {case CONN_INITIAL:// 触发3.5.1this.emit('status_event', {type: 'started'}, newStatus);break;case CONN_SDP_PROCESSED:this.isProcessingRemoteSdp = false;// this.latestSdp = mess;// this._maybeSendAnswer(newStatus, streamId);break;case CONN_SDP:this.latestSdp = mess;this._maybeSendAnswer(newStatus, streamId);break;case CONN_GATHERED:this.alreadyGathered = true;this.latestSdp = mess;this._maybeSendAnswer(newStatus, firstStreamId);break;case CONN_CANDIDATE:mess = mess.replace(this.options.privateRegexp, this.options.publicIP);this.emit('status_event', {type: 'candidate', candidate: mess}, newStatus);break;case CONN_FAILED:log.warn('message: failed the ICE process, ' + 'code: ' + WARN_BAD_CONNECTION +', id: ' + this.id);this.emit('status_event', {type: 'failed', sdp: mess}, newStatus);break;case CONN_READY:log.debug('message: connection ready, ' + 'id: ' + this.id +', ' + 'status: ' + newStatus + ' ' + mess + ',' + streamId);if (!this.ready) {this.ready = true;this.emit('status_event', {type: 'ready'}, newStatus);}break;}});if (this.options.createOffer) {log.debug('message: create offer requested, id:', this.id);const audioEnabled = this.options.createOffer.audio;const videoEnabled = this.options.createOffer.video;const bundle = this.options.createOffer.bundle;// WebRtcConnection c++ wrapper, 调用c++this.wrtc.createOffer(videoEnabled, audioEnabled, bundle);}// 触发3.5.1,代码就是3.5this.emit('status_event', {type: 'initializing'});return true;}

3.5.3 NAN_METHOD(WebRtcConnection::init)

source/agent/webrtc/rtcConn/WebRtcConnection.cc

NAN_METHOD(WebRtcConnection::init) {WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());std::shared_ptr<erizo::WebRtcConnection> me = obj->me;obj->eventCallback_ = new Nan::Callback(info[0].As<Function>());bool r = me->init();info.GetReturnValue().Set(Nan::New(r));
}

3.5.4 WebRtcConnection::init()

source/agent/webrtc/rtcConn/erizo/src/erizo/WebRtcConnection.cpp

bool WebRtcConnection::init() {maybeNotifyWebRtcConnectionEvent(global_state_, "");return true;
}

3.5.5 WebRtcConnection::maybeNotifyWebRtcConnectionEvent

void WebRtcConnection::maybeNotifyWebRtcConnectionEvent(const WebRTCEvent& event, const std::string& message,const std::string& stream_id) {boost::mutex::scoped_lock lock(event_listener_mutex_);if (!conn_event_listener_) {return;}conn_event_listener_->notifyEvent(event, message, stream_id);
}

—> 3.5.2

this.wrtc.init((newStatus, mess, streamId) => {log.debug('message: WebRtcConnection status update, ' +'id: ' + this.id + ', status: ' + newStatus +', ' + logger.objectToLog(this.metadata) + mess);switch(newStatus) {case CONN_INITIAL:this.emit('status_event', {type: 'started'}, newStatus);break;....});

—> 3.5.1

wrtc.on('status_event', (evt, status) => {...
});

3.5.6 log—》3.5.2

这里对应的就是3.5.2 小节中 从c++ 调用到js的状态更新,CONN_INITIAL=101 初始化成功

Connection - message: WebRtcConnection status update, 
id: 67f9309ce6e645fc8a4bb9cac6406eb2, status: 101, {}

CONN_INITIAL = 101

3.5.7 status

const CONN_INITIAL        = 101;
const CONN_STARTED        = 102;
const CONN_GATHERED       = 103;
const CONN_READY          = 104;
const CONN_FINISHED       = 105;
const CONN_CANDIDATE      = 201;
const CONN_SDP            = 202;
const CONN_SDP_PROCESSED  = 203;
const CONN_FAILED         = 500;
const WARN_BAD_CONNECTION = 502;

4. WrtcConnection.addTrackOperation

dist/webrtc_agent/webrtc/wrtcConnection.js

// option = {mid, type, formatPreference, scalabilityMode}that.addTrackOperation = function (operationId, sdpDirection, option) {var ret = false;var {mid, type, formatPreference, scalabilityMode} = option;if (!operationMap.has(mid)) {log.debug(`MID ${mid} for operation ${operationId} add`);const enabled = true;// mapoperationMap.set(mid, {operationId, type, sdpDirection, formatPreference, enabled});if (scalabilityMode) {operationMap.get(mid).scalabilityMode = scalabilityMode;}ret = true;} else {log.warn(`MID ${mid} has mapped operation ${operationMap.get(mid).operationId}`);}return ret;};

4.1 WrtcConnection.operationMap

  // mid => { operationId, sdpDirection, type, formatPreference, rids, enabled, finalFormat }var operationMap = new Map();

mid 就是trackId,

存放了track相关的信息,如operationId, sdpDirection, type, formatPreference, rids, enabled, finalFormat

5. 流程图

在这里插入图片描述


http://www.ppmy.cn/news/107597.html

相关文章

c#快速入门(下)

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;那个传说中的man的主页 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;题目大解析2 目录 &#x1f449;&#x1f3fb;Inline和lambda委托和lambda &#x1f449;&#x1f…

深入解析CPU性能火焰图生成的内部原理

在进行CPU性能优化的时候&#xff0c;我们经常先需要分析出来我们的应用程序中的CPU资源在哪些函数中使用的比较多&#xff0c;这样才能高效地优化。一个非常好的分析工具就是《性能之巅》作者 Brendan Gregg 发明的火焰图。 我们今天就来介绍下火焰图的使用方法&#xff0c;以…

仪表板展示 | DataEase看世界:数据呈现世界油价变化

背景介绍 最近几个月&#xff0c;全球能源市场一直处于动荡不安的状态&#xff0c;与石油相关的新闻也非常频繁。2023年2月10日&#xff0c;面对西方多轮限价举措&#xff0c;俄罗斯副总理亚历山大诺瓦克宣布&#xff0c;俄罗斯将在3月把每日原油产量下调50万桶。目前&#xf…

Apache应用和配置

目录 构建虚拟 Web 主机基于域名的虚拟主机基于IP地址的虚拟主机基于端口的虚拟主机 Apache 连接保持构建Web虚拟目录与用户授权限制Apache 日志分割 构建虚拟 Web 主机 虚拟Web主机指的是在同一台服务器中运行多个Web站点&#xff0c;其中每一个站点实际上并不独立占用整个服务…

Linux——进程的等待

目录 前言&#xff1a; 一.进程等待 父进程回收子进程信息的相关函数1&#xff1a;wait函数 实验案例1&#xff1a;设置wait函数参数为NULL 实验案例2&#xff1a;wait函数带wstatus参数的案例&#xff1a;当子进程正常运行完退出时 情况3&#xff1a; wait函数带wstatus参数…

ABP VNext认证授权获取Token

ABP VNext认证授权获取Token 1.Password授权方式获取1.1 请求说明1.2 请求示例1.3 请求参数 2.authorization_code模式获取2.1 无认证授权&#xff0c;跳转至授权认证中心2.2 用户密码登录2.3 登录成功&#xff0c;服务器会跳转至redirect_url所指地址 1.Password授权方式获取 …

数据库的三大设计范式和BCNF

数据库的三大设计范式 第一范式&#xff08;1NF&#xff09;&#xff1a;确保数据表中的每个列都是原子的&#xff0c;即每个列都包含不可再分的数据项。这意味着在每个列中不能有重复的数据&#xff0c;也不能包含多个值。每个数据项应该是独立的&#xff0c;以便能够对其进行…

线程的取消和互斥

线程的互斥和同步 临界资源概念&#xff1a; 不能同时访问的资源&#xff0c;比如写文件&#xff0c;只能由一个线程写&#xff0c;同时写会写乱。 比如外设打印机&#xff0c;打印的时候只能由一个程序使用。 外设基本上都是不能共享的资源。 生活中比如卫生间&#xff0…