让流媒体服务SRS支持P2P通信

news/2024/11/2 16:34:32/

#  srs支持p2p通信简介
流媒体服务srs是国内开发且开源的一款功能强大,性能强劲的优秀的流媒体服务器,目前正被越来越广泛的使用。srs一般被用户部署在公网上的云主机,这样方便用户进行推拉流等各种操作。但将srs部署在公网的云主机上也会带来一个问题,就是随着同时在线的推拉流用户比较多,会需要较多的带宽,消耗较多的带宽成本。

本文档用来阐述一种p2p解决方案,通过库快科技p2p技术可以将srs服务部署在您的局域网(比如工作局域网环境或者居家环境),此种方案不用修改srs的任何代码或者配置,就可以让srs支持p2p通信,为您极大的节约带宽成本。

#  系统架构

先上图如下所示:

               

如上图所示,我们用库快科技的sdk开发一个srs的p2p接入代理进程,srs仅和srs的p2p代理(srs p2p proxy)交互通信,由srs的p2p代理来和外部进行通信,srs和其代理可以部署在同一台主机上,或者同一个局域网机房内。

同时,外部的推拉流客户端或者app,也和p2p接入代理(client p2p proxy)直接进行通信,将推拉流请求发送给代理服务。由于client p2p proxy和srs p2p proxy均由支持p2p通信的sdk开发,所以在网络可以穿透情况下,可以直接进行p2p通信,流媒体数据不经过云端服务器中转,为您节约带宽成本;当代理之间网络无法穿透情况,会自动使用p2p的云端服务进行中转通信。根据统计在国内网络穿透率可以至少达到三分之二,所以这种部署可以为您减少三分之二的带宽成本。

库快科技( https://kkuai.com) 的p2p sdk提供类似于网络编程的sdk接口,极易掌握。client p2p proxy和srs p2p proxy的核心代码仅有100行左右,就可以支持强大的代理接入功能。您也可以将client p2p proxy的代码逻辑内置在您的业务程序代码里面(启动127.0.0.1 的接入代理,您的app直接访问127.0.0.1这个url)

#  流媒体服务srs p2p proxy源码

srs的p2p代理服务仅100行代码左右,这里给一个demo例子,一个连接启动一个线程,如果同时在线并发量比较高,读者可以自行改成epoll网络模型模式,以支持更多的在线用户。

该例子在Linux以及mac机器上编译验证通过。

#include <errno.h>
#include <stdio.h>
#include <signal.h>
#include <sys/syscall.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <string>
using namespace std;// 到kkuai.com获取最新的头文件和库
#include "kkp2p_sdk.h"char g_run_flag = 1;
kkp2p_engine_t* g_engine = NULL;// srs服务的rtmp的ip和端口
string g_srs_ip;
unsigned short g_srs_port;void set_exit_flag(int sig)
{g_run_flag = 0;
}int send_data(int fd, char* buff, int len)
{int sended = 0 ;while (sended < len){// 1秒超时int wl = kkp2p_write(fd, buff + sended, len - sended, 1000);if (wl < 0){printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len, errno, strerror(errno));return -1;}sended += wl;}return len;
}void* process_client(void* arg)
{kkp2p_channel_t* channel = (kkp2p_channel_t*)arg;// 连接srs服务器struct sockaddr_in addr;memset(&addr, 0, sizeof(sockaddr_in));addr.sin_family = AF_INET;addr.sin_addr.s_addr = inet_addr(g_srs_ip.c_str());addr.sin_port = htons(g_srs_port);int namelen = sizeof(addr);int sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);int ret = connect(sockFd, (struct sockaddr*)&addr, sizeof(sockaddr_in));if (ret < 0){printf("connect %s:%d error,thread exit.\n",g_srs_ip.c_str(), g_srs_port);close(sockFd);kkp2p_close_fd(channel->fd);kkp2p_close_channel(g_engine, channel->channel_id);free(channel);return NULL;} // 设置非阻塞模式int val = fcntl(sockFd, F_GETFL,0);fcntl(sockFd, F_SETFL, val|O_NONBLOCK);// 开始在srs和客户端之间中转数据struct pollfd waitFd[2];memset(waitFd,0,sizeof(waitFd));waitFd[0].fd = sockFd;waitFd[0].events = POLLIN;waitFd[1].fd = channel->fd;waitFd[1].events = POLLIN;printf("sockFd %d,channel fd:%d\n",sockFd,channel->fd);char szBuff[1024];int rl = 0 ;int wl = 0;int loop = 1;while(loop){int ret = poll(waitFd, 2, 1000);if (ret < 0){if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK){continue;}else{printf("poll error,errno:%d,desc:%s.\n",errno,strerror(errno));break;}}else if (ret == 0){continue;}for( int i =0 ; i<2; i++){int fd = waitFd[i].fd;if (waitFd[i].revents & POLLIN){rl = kkp2p_read(fd, szBuff, sizeof(szBuff), 0);      if (rl < 0){printf("kkp2p_read fd:%d error,errno:%d,desc:%s,sockFd:%d,channel fd:%d\n",fd,errno,strerror(errno),sockFd, channel->fd);loop = 0;break;}else if (rl == 0){continue;}else{int writeFd = 0;if (fd == sockFd) {writeFd = channel->fd;}else {writeFd = sockFd;}wl = send_data(writeFd, szBuff, rl);if (wl < 0){printf("kkp2p_write fd:%d error,errno:%d,desc:%s.\n",writeFd,errno,strerror(errno));loop = 0;break;}}}else if ((waitFd[i].revents & POLLHUP) || (waitFd[i].revents & POLLERR) || (waitFd[i].revents & POLLNVAL)) {printf("fd revents %d error,fd:%d,sockFd:%d,channel fd:%d.\n",waitFd[i].revents,waitFd[i].fd,sockFd,channel->fd);loop = 0 ;break;}}}close(sockFd);kkp2p_close_fd(channel->fd);printf("close channel,channelId:%u.\n",channel->channel_id);kkp2p_close_channel(g_engine, channel->channel_id);free(channel);return NULL;
}// 总共4个参数,p2p系统登录账号和密码,以及srs的ip和端口号
int main(int argc, char** argv)
{if (argc < 5){printf("usage:%s peer_id peer_key srs_ip srs_port\n", argv[0]);return -1;}// 利用usr1信号终止进程服务// kill -user1 pidstruct sigaction actions;memset(&actions, 0, sizeof(actions));sigemptyset(&actions.sa_mask);actions.sa_flags = 0;actions.sa_handler = set_exit_flag ;sigaction(SIGUSR1,&actions,NULL);kkp2p_engine_conf_t kkp2p_conf;// p2p云端服务的登录域名(ip)和端口号等信息// 根据实际部署情况填写,从kkuai.com下载云端服务自行部署kkp2p_conf.login_domain = (char*)"p2ptest.com";kkp2p_conf.login_port = 3080;kkp2p_conf.lan_search_port = 3549;kkp2p_conf.max_log_size = 1024*1024*10;kkp2p_conf.log_path = NULL;g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);// 日志级别调整为debug级别kkp2p_switch_log_level(g_engine, 4);// 将peer加入到p2p网络kkp2p_join_lan(g_engine, argv[1]);kkp2p_join_net(g_engine, argv[1], argv[2]);g_srs_ip = argv[3];g_srs_port = atoi(argv[4]);kkp2p_channel_t channel ;while(g_run_flag){// 循环接收外部连接请求int ret = kkp2p_accept(g_engine, 1000, &channel);if (ret < 0){// errorprintf("kkp2p_accept error,exit\n");break;}else if (ret == 0){// timeoutcontinue;}else{pthread_t ThreadId;// 接收到一个远程连接,transmit_mode为1表示p2p通信,为2表示中转通信// connect_desc是双方约定的连接描述信息,可以用于表示协议编号// demo这里不作判断,统一默认是rtmp推拉流协议printf("accept new channel,fd:%d,mode:%d,conn_desc:%d\n",channel.fd, channel.transmit_mode,channel.connect_desc);kkp2p_channel_t* newChannel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));memcpy(newChannel, &channel, sizeof(kkp2p_channel_t));// 启动线程处理pthread_create(&ThreadId, NULL, process_client,(void*)newChannel);}}kkp2p_engine_destroy(g_engine);return 0;
}

整个逻辑比较简单,就是循环accept连接,当有连接过来就启动一个线程处理,在srs和远程连接之间透传数据即可。读者可以改成epoll模型。

#  客户端p2p代理client p2p proxy源码

推拉流或者您的业务进程的p2p代理服务源码如下,原理也比较简单,就是启动一个ip和端口代理服务,外部服务推拉流请求先发给代理,由代理和远端的srs的p2p代理服务通信。该代理服务代码仅仅数行,非常方便您在业务程序源码里面直接使用。

该代码在windows平台下调试通过

#include <windows.h>
#include <process.h>
#include <iostream>
#include <stdio.h>
#include <stdint.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/timeb.h>
#include <signal.h>// 到kkuai.com获取最新的头文件和库
#include "kkp2p_sdk.h"#pragma comment(lib,"Ws2_32.lib")
#pragma comment(lib, "iphlpapi.lib")// 利用信号退出
char run_flag = 1;
void SignalHandler(int signal)
{printf("exit...\n");run_flag = 0;
}// 输入参数为代理ip和端口(组成推拉流的url),以及需要连接的流媒体srs的p2p账号
// connect_mode为连接模式,0为自动模式,1为p2p连接,2为中转连接
int main(int argc, char** argv) {if (argc < 5) {printf("usage:%s proxy_ip proxy_port peer_id connect_mode\n", argv[0]);return -1;}// 利用ctrl+z退出进程typedef void(*SignalHandlerPointer)(int);SignalHandlerPointer previousHandler;previousHandler = signal(SIGINT, SignalHandler);WSADATA wsadata;//注释2WSAStartup(MAKEWORD(2, 2), &wsadata);kkp2p_engine_conf_t kkp2p_conf;// p2p云端服务的登录域名(ip)和端口// 从kkuai.com下载云端服务自行部署kkp2p_conf.login_domain = (char*)"p2ptest.com";kkp2p_conf.login_port = 3080;kkp2p_conf.lan_search_port = 3549;kkp2p_conf.max_log_size = 1024 * 1024 * 10;kkp2p_conf.log_path = NULL;kkp2p_engine_t* g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);kkp2p_switch_log_level(g_engine, 4);// 建连参数kkp2p_connect_ctx_t ctx;memset(&ctx, 0, sizeof(kkp2p_connect_ctx_t));memcpy(ctx.peer_id, argv[3], strlen(argv[3]));// 连接超时时间ctx.timeout = 5000;// 建连模式ctx.connect_mode = atoi(argv[4]);// 启动代理服务,该代理服务会接收推拉流服务请求,并和远端流媒体服务的p2p代理通信uint32_t proxyId = 0;int ret = kkp2p_start_proxy(g_engine, argv[1], atoi(argv[2]), &ctx, &proxyId);if (ret < 0) {printf("create proxy(%s:%d) to peer %s error.\n", argv[1], atoi(argv[2]), argv[3]);return -1;}else {printf("create proxy(%s:%d) to peer %s success.\n", argv[1], atoi(argv[2]), argv[3]);}while (run_flag) {Sleep(1000);}kkp2p_stop_proxy(g_engine, proxyId);kkp2p_engine_destroy(g_engine);return 0;
}

#  效果演示,srs服务端

为了方便演示,我们用同一个局域网的两台机器进行演示,一台mac机器,一台windows机器。mac机器运行srs和srs p2p proxy服务。

启动srs服务,srs缺省的rtmp服务端口号是1935

启动srs的p2p代理服务srs p2p proxy

如上图所示,srs p2p proxy输入参数为登录p2p服务系统的登录账号和登录密码,以及srs服务的rtmp的侦听ip和端口,因为部署在同一台机器上,所以这里ip地址为127.0.0.1

#  效果演示,推流端

首先启动推流端代理服务,在windows机器上启动。

推流端代理服务启动

启动一个127.0.0.1:32915的p2p代理服务,该代理服务和test-00097进行通信,p2p建连模式为0,优先创建p2p连接,p2p不通则自动转中转连接。

在同一台windows机器上用ffmpeg进行推流,推流地址为127.0.0.1:32915,启动命令如下

ffmpeg -re -i spartacus.mkv -c copy -f flv rtmp://127.0.0.1:32915/live/a

将视频spartacus.mkv推流到127.0.0.1:32915

 #  效果演示,拉流端

在同一台windows机器上执行,首先启动拉流代理,端口号为32916

然后再启动vlc播放器输入流媒体地址进行播放,流媒体地址为

最后可以看到流畅的音视频播放画面

 #  总结

库快科技专注于p2p通信领域,提供的p2p通信中间件易用性,适用性极强,接口也极易使用,可以助您开发各种p2p应用的程序和工具,官网上可以下载的云端程序和sdk库,以及供测试使用的p2p登录账号和密码,非常方便您的试用。


http://www.ppmy.cn/news/132250.html

相关文章

P2P流媒体

作 者&#xff1a;王洪波 马轶慧 1 P2P流媒体系统 1.1P2P流媒体系统播送方式 P2P流媒体系统按照其播送方式可分为直播系统和点播系统&#xff0c;此外近期还出现了一些既可以提供直播服务也可以提供点播服务的P2P流媒体系统。 1.1.1直播 在流媒体直播服务中&#xff0c;用户只…

基于P2P的流媒体技术概述

摘 要&#xff1a;P2P流媒体技术已成为网络应用中热门的技术之一,本文主要介绍流媒体,P2P的相关概念,并着重对P2P流媒体的关键技术进行了研究,最后又提出了P2P流媒体技术应该面对的挑战。 关键词&#xff1a;P2P ;流媒体;应用层组播;激励机制 1 引言      随着互联网的…

P2P流媒体开源项目介绍

P2P流媒体开源项目介绍 1. PeerCast 2002年成立&#xff0c;最早的开源P2P流媒体项目。PeerCast把节点按树结构组织起来&#xff0c; 每个频道都是一个树&#xff0c; 直播源是根节点&#xff0c;父节点只给子节点提供数据。节点离根节点越远&#xff0c;传输时延就越大&#x…

流媒体:浅谈传统媒体—流媒体—加P2P的流媒体的演变之路

强烈推荐一个大神的人工智能的教程&#xff1a;http://www.captainbed.net/zhanghan 【前言】 今天发现二哥在搞流媒体&#xff0c;顿时来了兴趣&#xff08;之前在考试维护的时候经常听老师说P2P等&#xff09;&#xff0c;追问之下之前林哥搞成功过&#xff0c;而且写了一系…

【P2P】【转载】P2P流媒体开源项目介绍

大神的整理 P2P流媒体开源项目介绍 前言&#xff1a; 最近在做一个网站&#xff0c;发现p2p流媒体技术对于解决高流量高带宽问题真的很不错。 据说现在一些视频和直播公司在研究p2pcdn&#xff0c;证明了p2p永不过时。 先记录先来&#xff0c;有时间慢慢研究 PeerCast 2002…

流媒体协议之WebRTC实现p2p视频通话(二)

阿里P7移动互联网架构师进阶视频&#xff08;每日更新中&#xff09;免费学习请点击&#xff1a;https://space.bilibili.com/474380680 简介 目的 帮助自己了解webrtc 实现端对端通信 # 使用流程git clone https://gitee.com/wjj0720/webrtc.gitcd ./webRTCnpm inpm run dev…

P2P流媒体直播点播(带宽节约95%以上)技术分享

作者&#xff1a;key zhou QQ&#xff1a;215420465 邮箱&#xff1a;215420465qq.com 欢迎交流&#xff0c;共同进步。 ------------------------------------------------------------------------------------------------------- 介绍 从2011年接触P2P技术至今&#x…

钰泰ETA9030蓝牙耳机充电仓双向通讯,带NTC

双向通讯充电仓方案ETA9030:带充放电NTC&#xff0c;*低功耗3uA,带I2C功能详细内容 ETA9030特征 1.输入耐压高达28V 2.充电1.**电流可调节 3.*低功耗3uA 4.左右耳双通道*立控制 5.可实现充电仓和耳机双向通讯 6.输出1.**电流 7.带I2C功能 8.QFN3*3-16封装 9.带充电放电NTC