前言
在mediasoup源码分析(三)channel创建及信令交互一文中介绍了mediasoup中channel的创建,接下来在本文中,会说明mediasoup c++层如何将处理后的数据返回到信令服务中。
在 Mediasoup 中,C++ 编写的媒体层(mediasoup-worker)与信令服务之间的通信通常是通过 JSON 格式的数据进行的。这种数据格式易于解析和生成,并且被广泛用于 Web 服务和客户端之间的通信。
一个有三个返回的接口
正常成功返回Accept
void Request::Accept(Json::Value& data){MS_TRACE();static Json::Value emptyData(Json::objectValue);static const Json::StaticString JsonStringId{ "id" };static const Json::StaticString JsonStringAccepted{ "accepted" };static const Json::StaticString JsonStringData{ "data" };MS_ASSERT(!this->replied, "Request already replied");this->replied = true;Json::Value json(Json::objectValue);json[JsonStringId] = Json::UInt{ this->id };json[JsonStringAccepted] = true;if (data.isObject() || data.isArray())json[JsonStringData] = data;elsejson[JsonStringData] = emptyData;this->channel->Send(json);}
数据格式eg:
- 无数据
createRouter返回
{"accepted":true,"data":{},"id":12610940}
- 有数据
createWebRtcTransport返回
{"accepted": true,"data": {"dtlsLocalParameters": {"fingerprints": [{"algorithm": "sha-1","value": "4F:6A:93:00:BF:8D:BD:C6:B5:18:64:65:35:83:7A:6F:2F:BF:FE:45"}, {"algorithm": "sha-224","value": "04:9D:E6:35:24:62:C6:1D:B3:34:63:DB:B0:28:8E:77:4D:DD:AD:6B:F7:7A:F9:3D:7D:44:F1:CC"}, {"algorithm": "sha-256","value": "15:8C:71:05:B6:F9:B8:CD:09:7A:BB:1A:D2:CC:64:53:D4:4B:8C:E3:1F:56:C3:2F:F5:C1:73:F8:06:86:5C:C2"}, {"algorithm": "sha-384","value": "FB:61:03:17:88:DE:A8:F7:B3:52:1F:7E:0B:4A:9D:0A:4F:E2:83:B2:F1:60:3E:B4:0F:42:48:AA:48:01:26:A5:82:57:82:33:33:77:9C:3F:4B:C0:C7:30:4F:D0:1C:E4"}, {"algorithm": "sha-512","value": "78:30:EA:FD:D9:2B:FB:9E:45:43:BE:FE:05:D2:33:DC:38:70:63:D7:0F:8E:DC:4F:C9:88:44:75:0C:31:38:70:64:E2:56:3E:78:71:0E:E8:CE:DA:28:97:05:0F:99:90:0A:E6:CE:B1:D0:33:F3:E6:B9:00:67:EC:19:BD:77:48"}],"role": "auto"},"dtlsState": "new","headerExtensionIds": {},"iceLocalCandidates": [{"family": "ipv4","foundation": "udpcandidate","ip": "0.0.0.0","port": 52000,"priority": 1078862079,"protocol": "udp","type": "host"}],"iceLocalParameters": {"iceLite": true,"password": "t6f2mqxz6d3cy46xe7cfr3ywruk9ftji","usernameFragment": "21256833097-1899696"},"iceRole": "controlled","iceState": "new","rtpListener": {"muxIdTable": {},"ridTable": {},"ssrcTable": {}},"transportId": 1899696},"id": 62863332
}
异常失败返回Reject
void Request::Reject(const char* reason){MS_TRACE();static const Json::StaticString JsonStringId{ "id" };static const Json::StaticString JsonStringRejected{ "rejected" };static const Json::StaticString JsonStringReason{ "reason" };MS_ASSERT(!this->replied, "Request already replied");this->replied = true;Json::Value json(Json::objectValue);json[JsonStringId] = Json::UInt{ this->id };json[JsonStringRejected] = true;if (reason != nullptr)json[JsonStringReason] = reason;this->channel->Send(json);}
数据格式eg
{"rejected":true,"reason":{},"id":12610945}
mediasoupEmit_122">mediasoup主动推送,Emit
void Notifier::Emit(uint32_t targetId, const std::string& event, Json::Value& data){MS_TRACE();static const Json::StaticString JsonStringTargetId{ "targetId" };static const Json::StaticString JsonStringEvent{ "event" };static const Json::StaticString JsonStringData{ "data" };Json::Value json(Json::objectValue);json[JsonStringTargetId] = Json::UInt{ targetId };json[JsonStringEvent] = event;json[JsonStringData] = data;this->channel->Send(json);}
数据格式eg
{"data":{"entries":[[87344059,-89],[42826186,-24]]},"event":"audiolevels","targetId":37286065}
mediasoup_148">mediasoup推送数据到信令服务的具体实现:
由上面三段代码可以看到,无论是Accept,还是Reject,亦或者是Emit,最终实际都调用的是UnixStreamSocket模块的send接口channel->Send接口
void UnixStreamSocket::Send(Json::Value& msg)
{if (IsClosed())return;// MS_TRACE_STD();std::ostringstream stream;std::string nsPayload;size_t nsPayloadLen;size_t nsNumLen;size_t nsLen;this->jsonWriter->write(msg, &stream);nsPayload = stream.str();nsPayloadLen = nsPayload.length();if (nsPayloadLen > MessageMaxSize){MS_ERROR_STD("mesage too big");return;}if (nsPayloadLen == 0){nsNumLen = 1;WriteBuffer[0] = '0';WriteBuffer[1] = ':';WriteBuffer[2] = ',';}else{nsNumLen = static_cast<size_t>(std::ceil(std::log10(static_cast<double>(nsPayloadLen) + 1)));std::sprintf(reinterpret_cast<char*>(WriteBuffer), "%zu:", nsPayloadLen);std::memcpy(WriteBuffer + nsNumLen + 1, nsPayload.c_str(), nsPayloadLen);WriteBuffer[nsNumLen + nsPayloadLen + 1] = ',';}nsLen = nsNumLen + nsPayloadLen + 2;Write(WriteBuffer, nsLen);
}
- this->jsonWriter:指的是当前 UnixStreamSocket 类实例的 jsonWriter 成员,这个成员是一个指向 JSON 写入器的指针。这个写入器负责将 Json::Value 对象转换成 JSON 格式的字符串。
- write(msg, &stream):是 jsonWriter 指针所指向对象的一个成员函数,它接受两个参数:
- msg:要序列化的 Json::Value 对象。
- &stream:一个引用,指向 std::ostringstream 类型的 stream 对象。这个对象是一个输出流,用于接收序列化后的 JSON 字符串。
当这个函数调用执行时,jsonWriter 会将 msg 对象中的数据转换成 JSON 格式,并将结果输出到 stream 中。这样,stream 就会包含序列化后的 JSON 字符串,这个字符串可以用于后续的网络传输或其他用途。
void UnixStreamSocket::Write(const uint8_t* data, size_t len)
{if (this->closed)return;if (len == 0)return;uv_buf_t buffer;int written;int err;// First try uv_try_write(). In case it can not directly send all the given data// then build a uv_req_t and use uv_write().buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);written = uv_try_write(reinterpret_cast<uv_stream_t*>(this->uvHandle), &buffer, 1);// All the data was written. Done.if (written == static_cast<int>(len)){return;}// Cannot write any data at first time. Use uv_write().else if (written == UV_EAGAIN || written == UV_ENOSYS){// Set written to 0 so pendingLen can be properly calculated.written = 0;}// Error. Should not happen.else if (written < 0){MS_ERROR_STD("uv_try_write() failed, closing the socket: %s", uv_strerror(written));Close();// Notify the subclass.UserOnUnixStreamSocketClosed(this->isClosedByPeer);return;}size_t pendingLen = len - written;// Allocate a special UvWriteData struct pointer.auto* writeData = static_cast<UvWriteData*>(std::malloc(sizeof(UvWriteData) + pendingLen));std::memcpy(writeData->store, data + written, pendingLen);writeData->req.data = (void*)writeData;buffer = uv_buf_init(reinterpret_cast<char*>(writeData->store), pendingLen);err = uv_write(&writeData->req,reinterpret_cast<uv_stream_t*>(this->uvHandle),&buffer,1,static_cast<uv_write_cb>(onWrite));if (err != 0)MS_ABORT("uv_write() failed: %s", uv_strerror(err));
}
这段代码使用了libuv库,这是一个跨平台的异步I/O库,用于网络编程。先uv_buf_init初始化,随后使用了uv_try_write和uv_write函数来尝试写入数据,如果一次性写入不成功,则使用异步写入的方式继续写入剩余的数据。此外,代码中还包含了错误处理和内存分配的逻辑。
tips:
关于libuv的介绍,可以看我写的网络编程-libuv介绍一文。