mediasoup源码分享(四)channel返回信令及notify通知

news/2024/12/22 20:02:42/

channel返回信令及notify通知

    • 前言
    • 正常成功返回Accept
    • 异常失败返回Reject
    • mediasoup主动推送,Emit
    • mediasoup推送数据到信令服务的具体实现:

前言

mediasoup源码分析(三)channel创建及信令交互一文中介绍了mediasoup中channel的创建,接下来在本文中,会说明mediasoup c++层如何将处理后的数据返回到信令服务中。

在 Mediasoup 中,C++ 编写的媒体层(mediasoup-worker)与信令服务之间的通信通常是通过 JSON 格式的数据进行的。这种数据格式易于解析和生成,并且被广泛用于 Web 服务和客户端之间的通信。
一个有三个返回的接口

正常成功返回Accept

void Request::Accept(Json::Value& data){MS_TRACE();static Json::Value emptyData(Json::objectValue);static const Json::StaticString JsonStringId{ "id" };static const Json::StaticString JsonStringAccepted{ "accepted" };static const Json::StaticString JsonStringData{ "data" };MS_ASSERT(!this->replied, "Request already replied");this->replied = true;Json::Value json(Json::objectValue);json[JsonStringId]       = Json::UInt{ this->id };json[JsonStringAccepted] = true;if (data.isObject() || data.isArray())json[JsonStringData] = data;elsejson[JsonStringData] = emptyData;this->channel->Send(json);}

数据格式eg:

  1. 无数据
createRouter返回
{"accepted":true,"data":{},"id":12610940}
  1. 有数据
createWebRtcTransport返回
{"accepted": true,"data": {"dtlsLocalParameters": {"fingerprints": [{"algorithm": "sha-1","value": "4F:6A:93:00:BF:8D:BD:C6:B5:18:64:65:35:83:7A:6F:2F:BF:FE:45"}, {"algorithm": "sha-224","value": "04:9D:E6:35:24:62:C6:1D:B3:34:63:DB:B0:28:8E:77:4D:DD:AD:6B:F7:7A:F9:3D:7D:44:F1:CC"}, {"algorithm": "sha-256","value": "15:8C:71:05:B6:F9:B8:CD:09:7A:BB:1A:D2:CC:64:53:D4:4B:8C:E3:1F:56:C3:2F:F5:C1:73:F8:06:86:5C:C2"}, {"algorithm": "sha-384","value": "FB:61:03:17:88:DE:A8:F7:B3:52:1F:7E:0B:4A:9D:0A:4F:E2:83:B2:F1:60:3E:B4:0F:42:48:AA:48:01:26:A5:82:57:82:33:33:77:9C:3F:4B:C0:C7:30:4F:D0:1C:E4"}, {"algorithm": "sha-512","value": "78:30:EA:FD:D9:2B:FB:9E:45:43:BE:FE:05:D2:33:DC:38:70:63:D7:0F:8E:DC:4F:C9:88:44:75:0C:31:38:70:64:E2:56:3E:78:71:0E:E8:CE:DA:28:97:05:0F:99:90:0A:E6:CE:B1:D0:33:F3:E6:B9:00:67:EC:19:BD:77:48"}],"role": "auto"},"dtlsState": "new","headerExtensionIds": {},"iceLocalCandidates": [{"family": "ipv4","foundation": "udpcandidate","ip": "0.0.0.0","port": 52000,"priority": 1078862079,"protocol": "udp","type": "host"}],"iceLocalParameters": {"iceLite": true,"password": "t6f2mqxz6d3cy46xe7cfr3ywruk9ftji","usernameFragment": "21256833097-1899696"},"iceRole": "controlled","iceState": "new","rtpListener": {"muxIdTable": {},"ridTable": {},"ssrcTable": {}},"transportId": 1899696},"id": 62863332
}

异常失败返回Reject

	void Request::Reject(const char* reason){MS_TRACE();static const Json::StaticString JsonStringId{ "id" };static const Json::StaticString JsonStringRejected{ "rejected" };static const Json::StaticString JsonStringReason{ "reason" };MS_ASSERT(!this->replied, "Request already replied");this->replied = true;Json::Value json(Json::objectValue);json[JsonStringId]       = Json::UInt{ this->id };json[JsonStringRejected] = true;if (reason != nullptr)json[JsonStringReason] = reason;this->channel->Send(json);}

数据格式eg

{"rejected":true,"reason":{},"id":12610945}

mediasoupEmit_122">mediasoup主动推送,Emit

	void Notifier::Emit(uint32_t targetId, const std::string& event, Json::Value& data){MS_TRACE();static const Json::StaticString JsonStringTargetId{ "targetId" };static const Json::StaticString JsonStringEvent{ "event" };static const Json::StaticString JsonStringData{ "data" };Json::Value json(Json::objectValue);json[JsonStringTargetId] = Json::UInt{ targetId };json[JsonStringEvent]    = event;json[JsonStringData]     = data;this->channel->Send(json);}

数据格式eg

{"data":{"entries":[[87344059,-89],[42826186,-24]]},"event":"audiolevels","targetId":37286065}

mediasoup_148">mediasoup推送数据到信令服务的具体实现:

由上面三段代码可以看到,无论是Accept,还是Reject,亦或者是Emit,最终实际都调用的是UnixStreamSocket模块的send接口channel->Send接口

void UnixStreamSocket::Send(Json::Value& msg)
{if (IsClosed())return;// MS_TRACE_STD();std::ostringstream stream;std::string nsPayload;size_t nsPayloadLen;size_t nsNumLen;size_t nsLen;this->jsonWriter->write(msg, &stream);nsPayload    = stream.str();nsPayloadLen = nsPayload.length();if (nsPayloadLen > MessageMaxSize){MS_ERROR_STD("mesage too big");return;}if (nsPayloadLen == 0){nsNumLen       = 1;WriteBuffer[0] = '0';WriteBuffer[1] = ':';WriteBuffer[2] = ',';}else{nsNumLen = static_cast<size_t>(std::ceil(std::log10(static_cast<double>(nsPayloadLen) + 1)));std::sprintf(reinterpret_cast<char*>(WriteBuffer), "%zu:", nsPayloadLen);std::memcpy(WriteBuffer + nsNumLen + 1, nsPayload.c_str(), nsPayloadLen);WriteBuffer[nsNumLen + nsPayloadLen + 1] = ',';}nsLen = nsNumLen + nsPayloadLen + 2;Write(WriteBuffer, nsLen);
}
  • this->jsonWriter:指的是当前 UnixStreamSocket 类实例的 jsonWriter 成员,这个成员是一个指向 JSON 写入器的指针。这个写入器负责将 Json::Value 对象转换成 JSON 格式的字符串。
  • write(msg, &stream):是 jsonWriter 指针所指向对象的一个成员函数,它接受两个参数:
    • msg:要序列化的 Json::Value 对象。
    • &stream:一个引用,指向 std::ostringstream 类型的 stream 对象。这个对象是一个输出流,用于接收序列化后的 JSON 字符串。

当这个函数调用执行时,jsonWriter 会将 msg 对象中的数据转换成 JSON 格式,并将结果输出到 stream 中。这样,stream 就会包含序列化后的 JSON 字符串,这个字符串可以用于后续的网络传输或其他用途。

void UnixStreamSocket::Write(const uint8_t* data, size_t len)
{if (this->closed)return;if (len == 0)return;uv_buf_t buffer;int written;int err;// First try uv_try_write(). In case it can not directly send all the given data// then build a uv_req_t and use uv_write().buffer  = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);written = uv_try_write(reinterpret_cast<uv_stream_t*>(this->uvHandle), &buffer, 1);// All the data was written. Done.if (written == static_cast<int>(len)){return;}// Cannot write any data at first time. Use uv_write().else if (written == UV_EAGAIN || written == UV_ENOSYS){// Set written to 0 so pendingLen can be properly calculated.written = 0;}// Error. Should not happen.else if (written < 0){MS_ERROR_STD("uv_try_write() failed, closing the socket: %s", uv_strerror(written));Close();// Notify the subclass.UserOnUnixStreamSocketClosed(this->isClosedByPeer);return;}size_t pendingLen = len - written;// Allocate a special UvWriteData struct pointer.auto* writeData = static_cast<UvWriteData*>(std::malloc(sizeof(UvWriteData) + pendingLen));std::memcpy(writeData->store, data + written, pendingLen);writeData->req.data = (void*)writeData;buffer = uv_buf_init(reinterpret_cast<char*>(writeData->store), pendingLen);err = uv_write(&writeData->req,reinterpret_cast<uv_stream_t*>(this->uvHandle),&buffer,1,static_cast<uv_write_cb>(onWrite));if (err != 0)MS_ABORT("uv_write() failed: %s", uv_strerror(err));
}

这段代码使用了libuv库,这是一个跨平台的异步I/O库,用于网络编程。先uv_buf_init初始化,随后使用了uv_try_write和uv_write函数来尝试写入数据,如果一次性写入不成功,则使用异步写入的方式继续写入剩余的数据。此外,代码中还包含了错误处理和内存分配的逻辑。

tips:
关于libuv的介绍,可以看我写的网络编程-libuv介绍一文。


http://www.ppmy.cn/news/1470783.html

相关文章

利用LinkedHashMap实现一个LRU缓存

一、什么是 LRU LRU是 Least Recently Used 的缩写&#xff0c;即最近最少使用&#xff0c;是一种常用的页面置换算法&#xff0c;选择最近最久未使用的页面予以淘汰。 简单的说就是&#xff0c;对于一组数据&#xff0c;例如&#xff1a;int[] a {1,2,3,4,5,6}&#xff0c;…

selenium 处理网页上的弹窗

处理网页上的弹窗按钮&#xff0c;主要取决于弹窗的类型。在Web自动化测试中&#xff0c;常见的弹窗类型包括&#xff1a;JavaScript弹窗&#xff08;如alert、confirm和prompt弹窗&#xff09;和Web页面自定义弹窗&#xff08;通常是HTML元素实现的&#xff09;。以下是处理这…

超级数据查看器 教程pdf 1-31集 百度网盘

百度网盘链接 提取码1234https://pan.baidu.com/s/1s_2lbwZ2_Su83vDElv76ag?pwd1234 通过百度网盘分享的文件&#xff1a;超级数据查看器 … 链接:https://pan.baidu.com/s/1s_2lbwZ2_Su83vDElv76ag?pwd1234 提取码:1234 复制这段内容打开「百度网盘APP 即可获取」

Apple - Secure Coding Guide

本文翻译整理自&#xff1a;Secure Coding Guide https://developer.apple.com/library/archive/documentation/Security/Conceptual/SecureCodingGuide/Introduction.html#//apple_ref/doc/uid/TP40002477-SW1 文章目录 一、安全编码指南简介1、概览黑客和攻击者没有平台是免疫…

kafka 管理节点 Controller 角色分析

kafka 管理节点 Controller 角色分析 kafka controller 如何管理分区的创建、状态监测、故障切换、内容复制、如何管控分区副本的状态检测故障切换、数据同步、learder 选举?Kafka Controller 是 Kafka 集群中的一个关键组件,负责管理分区的创建、状态监测、故障切换、内容 …

java面试(企业场景)

设计模式 工厂方法模式 简单工厂模式 简单工厂包括以下角色&#xff1a; 抽象产品&#xff1a;定义了产品的规范&#xff0c;描述了产品的主要特性和功能具体产品&#xff1a;实现或者继承抽象产品的子类具体工厂&#xff1a;提供了创建产品的机会&#xff0c;调用者通过该…

工业用焦炉集气管压力控制状态远程预警方法

工业用焦炉集气管压力控制状态远程预警方法 一、项目提出前状况: 焦化厂焦炉集气管压力是炼焦生产过程中重要的工艺参数(其控制目标80~120Pa),焦炉集气管压力的稳定是焦炉正常生产的重要保证。集气管压力过高会造成焦炉炉体冒烟冒火,污染环境,对操作人员的人身安全构成…

Flink nc -l -p 监听端口测试

1、9999端口未占用 netstat -apn|grep 99992、消息发送端 nc -l -k -p 9999 {"user":"ming","url":"www.baidu1.com", "timestamp":1200L, "score":1} {"user":"xiaohu","url":…