# KKP2P SDK介绍
kkp2p sdk是库快科技(官网试用下载)研发的支持p2p通信的中间件,是一套与业务无关的通用的p2p sdk库, 可以免费下载试用版本使用。
一句话概括其特点:支持面向登录账号进行网络编程。
即只要传入对端的账号id,应用层就能得到一个socket句柄fd,应用层通过读写该socket fd句柄和对端进行通信。
优秀特性 | 说明 |
--- | --- |
跨平台 | kkp2p的sdk库是由c语言开发,在linux、windows、android、ios等平台编译出了静态库,以及其他一些嵌入式平台也编译出了静态库,大家可以直接下载进行使用。云端服务是由golang语言开发,也支持在各种平台下编译出直接可以运行的程序,配置也比较简单,大家下载之后就可以按照官网文档说明自行进行部署。 |
体积小 | kkp2p不依赖于任何第三方库,编译出来的库只有500KB左右大小。 |
性能强 | 在服务器上测试,P2P方式通信的速度可以超过10MB每秒;中转方式通信的速度取决于您云端服务器的带宽 |
易使用 | 提供了类似于socket编程接口的kkp2p_connect、kkp2p_listen、kkp2p_accept、kkp2p_read、kkp2p_write几个核心函数,使用起来非常简单方便。您只要指定对端的登录账号通过kkp2p_connect函数就能和对端创建一个虚拟的传输管道,然后通过kkp2p_read和kkp2p_write函数来读写数据和对端进行通信。您还可以通过参数指定是使用P2P方式通信还是使用中转(relay)方式通信,完全不用关心底层传输通道的创建和管理细节,一切由kkp2p的sdk库帮您解决。 |
高安全 | 支持加密通道传输,您只需要在kkp2p_connect的参数中指定需要加密数据即可。sdk会自动创建一个加密的虚拟通信管道出来;您写入明文,sdk会自动加密成密文传输;sdk收到密文,会自动解密成明文返回给您。通信双方的共同密钥是双方的sdk通过DH算法自动协商而成,外界无法获取;并且每次sdk的启动都会自动协商生成一个新的动态密钥,严格保障您的通信数据的安全。如果您为了提升数据传输的性能,不想对数据进行加解密,只需要在创建连接的函数kkp2p_connect参数中指定不需要加密数据即可。sdk的数据加解密功能只有在商业版本中才有,在个人试用版本中没有该功能。 |
通用性 | kkp2p是一套适用于各种场景的通用的通信中间件,会完全透传用户的数据,您可以灵活的自定义通信双方的协议,kkp2p不会解析您的业务数据。kkp2p的P2P通信是基于udp实现的,kkp2p会自动帮您解决丢包、乱序、重传问题,也会根据您的实际带宽做自适应的带宽流控,您使用起来具有tcp传输的效果,相当于是用udp模拟实现了tcp。中转(relay)通信是基于tcp原生实现的。 |
# 源码介绍
本例子用于举例说明,如何基于p2p使用rtmp协议。众所周知,rtmp传输协议广泛用于直播点播系统中,主要用于传输音视频数据。
本例子分两个程序配合演示,一个作为rtmp server的接入代理(peer client);一个作为服务端(rtmp server)。最后演示时候用ffmpeg推流,用vlc播放器播放。测试流程描述如下:
首先我们看peer client代码,该代码在windows平台下用visual studio编译测试通过。peer client利用接口kkp2p_start_proxy启动一个本地代理和rtmp server进行通信。源码讲解如下:
#include <windows.h>
#include <process.h>
#include <iostream>
#include <stdio.h>
#include <stdint.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/timeb.h>
#include <signal.h>// 去kkuai.com获取最新的头文件和库
#include "kkp2p_sdk.h"#pragma comment(lib,"Ws2_32.lib")
#pragma comment(lib, "iphlpapi.lib")// CTRL+c退出程序
// Cleared by the SIGINT handler (Ctrl+C) to tell main()'s wait loop to stop.
// Declared volatile sig_atomic_t because it is written from a signal handler
// and polled from the main flow.
volatile sig_atomic_t run_flag = 1;

// SIGINT handler: prints a notice and clears run_flag.
void SignalHandler(int signal)
{
    (void)signal;  // the signal number itself is not needed
    printf("exit...\n");
    run_flag = 0;
}

// main() below expects argc == 5: the program name plus four arguments
// (proxy ip, proxy port, peer id to talk to, connect mode between proxy and peer).
int main(int argc, char** argv) {if (argc < 5) {printf("usage:%s proxy_ip proxy_port peer_id connect_mode\n", argv[0]);return -1;}typedef void(*SignalHandlerPointer)(int);SignalHandlerPointer previousHandler;previousHandler = signal(SIGINT, SignalHandler);WSADATA wsadata;//注释2WSAStartup(MAKEWORD(2, 2), &wsadata);// 设置p2p服务端的登录域名和端口kkp2p_engine_conf_t kkp2p_conf;kkp2p_conf.login_domain = (char*)"124.71.217.198";kkp2p_conf.login_port = 3080;kkp2p_conf.lan_search_port = 3549;kkp2p_conf.max_log_size = 1024 * 1024 * 10;kkp2p_conf.log_path = NULL;kkp2p_engine_t* g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);kkp2p_switch_log_level(g_engine, 4);kkp2p_connect_ctx_t ctx;memset(&ctx, 0, sizeof(kkp2p_connect_ctx_t));strncpy(ctx.peer_id, argv[3], 32);ctx.timeout = 5000;// 和peer id建连模式,0为自动模式,1为仅p2p模式,2为仅relay模式ctx.connect_mode = atoi(argv[4]);uint32_t proxyId = 0;int ret = kkp2p_start_proxy(g_engine, argv[1], atoi(argv[2]), &ctx, &proxyId);if (ret < 0) {printf("create proxy(%s:%d) to peer %s error.\n", argv[1], atoi(argv[2]), argv[3]);return -1;}else {printf("create proxy(%s:%d) to peer %s success.\n", argv[1], atoi(argv[2]), argv[3]);}while (run_flag) {Sleep(1000);}kkp2p_stop_proxy(g_engine, proxyId);kkp2p_engine_destroy(g_engine);return 0;
}
peer server(rtmp server)端模式流程简介如下,主线程不断调用kkp2p_accept接入新的连接,如果有新的连接过来,会得到一个句柄fd,然后在该fd句柄上处理rtmp协议即可。服务端代码来自于互联网,在原有代码上做了p2p的使用适配,我们放在最后讲解,该服务端代码在linux平台下测试通过。由于仅是演示作用,有些细节考虑不完善,仅做参考使用。
我们现在看测试过程,首先在windows下启动peer client,即rtmp proxy代理, 地址为127.0.0.1:32915
然后在linux平台下启动peer server,即rtmp server模块
接着在windows下启动vlc播放器
然后在windows平台下启动ffmpeg进行推流
最后可以看到vlc的播放画面
从上面例子可以看到,我们可以基于库快科技的p2p库,用p2p(基于udp协议)传输方式,来传输rtmp协议数据,这样在一对一音视频传输场景下,会为您节约大量的带宽成本。
最后我们看peer server端(rtmp服务端)源码,该源码大部分都是rtmp协议相关,主要来自于互联网,仅做了p2p适配处理,我们可以看到库快科技的p2p库是极易使用的。
编译该源码除了链接库快科技的p2p库之外,还需要链接rtmp的库
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>
#include <fcntl.h>#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <string>
#include <vector>
#include <list>
#include <set>
#include <map>#include <librtmp/rtmp.h>
#include <librtmp/log.h>
#include "kkp2p_sdk.h"

// Engine handle: assigned in main() and used by the client threads when they
// close their channel.
kkp2p_engine_t* g_engine = NULL;

// Owns one pthread mutex: initialised in the constructor, destroyed in the destructor.
class CMutex
{
    pthread_mutex_t m_mutex;

public:
    CMutex() { pthread_mutex_init(&m_mutex, NULL); }
    ~CMutex() { pthread_mutex_destroy(&m_mutex); }

    // Copying would duplicate the underlying pthread_mutex_t, so it is disabled.
    CMutex(const CMutex&) = delete;
    CMutex& operator=(const CMutex&) = delete;

    void lock() { pthread_mutex_lock(&m_mutex); }
    void unlock() { pthread_mutex_unlock(&m_mutex); }
};

// Scoped lock: calls lock() on construction and unlock() on destruction.
// T only needs lock() and unlock() members.
template<typename T>
class CAutoLock
{
    T* m_pLock;

public:
    explicit CAutoLock(T* pLock) : m_pLock(pLock) { m_pLock->lock(); }
    ~CAutoLock() { m_pLock->unlock(); }

    // A copy would unlock the same mutex twice.
    CAutoLock(const CAutoLock&) = delete;
    CAutoLock& operator=(const CAutoLock&) = delete;
};

// State of one client connection: the librtmp session bound to the accepted
// socket, the stream ids handed out on it, the stream-id -> playpath bindings
// for publishing and playing, and the queue of media packets waiting to be
// sent to this client.
class CConnection
{
    uint32_t m_uConnID;
    uint32_t m_uNextStreamID;
    RTMP* m_pRtmp;
    std::string m_strApp;
    int m_nStreamType;
    std::set<uint32_t> m_setUsingStreamID;                         // created but not yet bound
    std::map<uint32_t, std::string> m_mapPublishStreamIDPlayPath;  // publish stream id -> playpath
    std::map<uint32_t, std::string> m_mapPlayStreamIDPlayPath;     // play stream id -> playpath
    std::list<RTMPPacket*> m_listpPacket;                          // owned, heap-allocated packets
    CMutex m_mutex;
    sem_t m_sem;                                                   // posted once per queued packet

    // Removes the first entry of `mapping` whose value equals strPlayPath.
    // Returns true when an entry was removed. Caller must hold m_mutex.
    static bool eraseFirstWithPath(std::map<uint32_t, std::string>& mapping, const std::string& strPlayPath)
    {
        for (auto iter = mapping.begin(); iter != mapping.end(); ++iter)
        {
            if (iter->second == strPlayPath)
            {
                mapping.erase(iter);
                return true;
            }
        }
        return false;
    }

public:
    // Stream role of the connection.
    enum EStreamType
    {
        Unkown = 0,
        Publish,
        Play
    };

    // Creates a librtmp session and attaches it to the already-connected nSocket.
    CConnection(uint32_t uConnID, int nSocket)
        : m_uConnID(uConnID), m_uNextStreamID(1), m_pRtmp(NULL), m_strApp(""), m_nStreamType(Unkown)
    {
        m_pRtmp = RTMP_Alloc();
        RTMP_Init(m_pRtmp);
        m_pRtmp->m_sb.sb_socket = nSocket;
        sem_init(&m_sem, 0, 0);
    }

    virtual ~CConnection()
    {
        // Packets still queued were allocated for this connection by
        // CPlayPath::cacheMediaPacket and would otherwise be leaked.
        // NOTE(review): other threads may still hold a pointer obtained from
        // CConnections::getConnection while this runs - the lifetime of
        // connections is not synchronised with their users.
        for (RTMPPacket* pPacket : m_listpPacket)
        {
            RTMPPacket_Free(pPacket);
            delete pPacket;
        }
        m_listpPacket.clear();

        RTMP_Close(m_pRtmp);
        RTMP_Free(m_pRtmp);
        sem_destroy(&m_sem);
    }

    // The object owns an RTMP session and a semaphore; copying is not meaningful.
    CConnection(const CConnection&) = delete;
    CConnection& operator=(const CConnection&) = delete;

    uint32_t ConnID() { return m_uConnID; }
    RTMP* Rtmp() { return m_pRtmp; }
    int Socket() { return m_pRtmp->m_sb.sb_socket; }

    void setAppName(const std::string& strApp) { m_strApp = strApp; }
    const std::string& getAppName() const { return m_strApp; }

    void setStreamType(EStreamType emType) { m_nStreamType = emType; }
    EStreamType getStreamType() { return (EStreamType)m_nStreamType; }

    // Hands out the next stream id and remembers it as created-but-unbound.
    uint32_t genStreamID()
    {
        uint32_t uStreamID = m_uNextStreamID++;
        m_setUsingStreamID.insert(uStreamID);
        return uStreamID;
    }

    // True when uStreamID was produced by genStreamID() and has not been bound yet.
    bool isValidStreamID(uint32_t uStreamID)
    {
        return m_setUsingStreamID.find(uStreamID) != m_setUsingStreamID.end();
    }

    // Registers publish stream id -> playpath (the id leaves the unbound set).
    void bindPublishPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_setUsingStreamID.erase(uStreamID);
        m_mapPublishStreamIDPlayPath[uStreamID] = strPlayPath;
    }

    // Removes the publish binding of uStreamID, if any.
    void unbindPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_mapPublishStreamIDPlayPath.erase(uStreamID);
    }

    // Returns the playpath published on uStreamID, or "" when there is none.
    std::string getPublishPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        auto iter = m_mapPublishStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPublishStreamIDPlayPath.end())
            return "";
        return iter->second;
    }

    // Appends every published playpath to vecPlayPath (used on disconnect).
    void getPublishPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        for (const auto& entry : m_mapPublishStreamIDPlayPath)
            vecPlayPath.push_back(entry.second);
    }

    // Drops all publish bindings (used on disconnect).
    void cleanPublishPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_mapPublishStreamIDPlayPath.clear();
    }

    // Registers play stream id -> playpath (the id leaves the unbound set).
    void bindPlayPlayPath(uint32_t uStreamID, const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_setUsingStreamID.erase(uStreamID);
        m_mapPlayStreamIDPlayPath[uStreamID] = strPlayPath;
    }

    // Removes the play binding of uStreamID, if any.
    void unbindPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_mapPlayStreamIDPlayPath.erase(uStreamID);
    }

    // Returns the playpath played on uStreamID, or "" when there is none.
    std::string getPlayPlayPath(uint32_t uStreamID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        auto iter = m_mapPlayStreamIDPlayPath.find(uStreamID);
        if (iter == m_mapPlayStreamIDPlayPath.end())
            return "";
        return iter->second;
    }

    // Appends every played playpath to vecPlayPath (used on disconnect).
    void getPlayPlayPaths(std::vector<std::string>& vecPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        for (const auto& entry : m_mapPlayStreamIDPlayPath)
            vecPlayPath.push_back(entry.second);
    }

    // Drops all play bindings (used on disconnect).
    void cleanPlayPlayPath()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_mapPlayStreamIDPlayPath.clear();
    }

    // Called when strPlayPath is about to be reset: removes one binding to it,
    // looking in the publish bindings first and in the play bindings only when
    // no publish binding matched.
    void tellResetPlayPath(const std::string& strPlayPath)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        if (!eraseFirstWithPath(m_mapPublishStreamIDPlayPath, strPlayPath))
            eraseFirstWithPath(m_mapPlayStreamIDPlayPath, strPlayPath);
    }

    // Waits up to 500 ms for a queued packet and returns it, or NULL when the
    // queue is still empty. The caller owns the returned packet.
    RTMPPacket* popPacket()
    {
        struct timeval tv;
        gettimeofday(&tv, NULL);

        // Absolute deadline = now + 500 ms, computed with integers and with
        // tv_nsec normalised below one second as sem_timedwait requires.
        struct timespec ts;
        long lUsec = (long)tv.tv_usec + 500000;
        ts.tv_sec = tv.tv_sec;
        if (lUsec >= 1000000)
        {
            ts.tv_sec += 1;
            lUsec -= 1000000;
        }
        ts.tv_nsec = lUsec * 1000;
        sem_timedwait(&m_sem, &ts);  // a timeout is fine: the queue is checked either way

        CAutoLock<CMutex> lock(&m_mutex);
        if (m_listpPacket.empty())
            return NULL;
        RTMPPacket* pPacket = m_listpPacket.front();
        m_listpPacket.pop_front();
        return pPacket;
    }

    // Queues the given packets for sending; ownership of every packet passes to
    // this connection. strPlayPath is currently unused.
    void copyPackets(const std::string& strPlayPath, const std::vector<RTMPPacket*>& vecpPacket)
    {
        (void)strPlayPath;
        CAutoLock<CMutex> lock(&m_mutex);
        for (RTMPPacket* pPacket : vecpPacket)
        {
            m_listpPacket.push_back(pPacket);
            sem_post(&m_sem);
        }
    }
};

// Client connection registry =>
// Creates, looks up and destroys CConnection objects by connection id.
class CConnections
{
    uint32_t m_uNextConnID;
    std::map<uint32_t, CConnection*> m_mapConnection;
    CMutex m_mutex;

public:
    CConnections() : m_uNextConnID(1) {}
    virtual ~CConnections() {}

    // Creates and registers a connection for nSocket; the registry owns it
    // until releaseConnection() is called with its id.
    CConnection* createConnection(int nSocket)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        CConnection* pConnection = new CConnection(m_uNextConnID++, nSocket);
        m_mapConnection[pConnection->ConnID()] = pConnection;
        return pConnection;
    }

    // Unregisters and deletes the connection with the given id, if present.
    void releaseConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        CConnection* pConn = findLocked(uConnID);
        if (pConn)
        {
            m_mapConnection.erase(uConnID);
            delete pConn;
        }
    }

    // Returns the connection with the given id, or NULL.
    CConnection* getConnection(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        return findLocked(uConnID);
    }

private:
    // Lookup without locking; caller must hold m_mutex.
    CConnection* findLocked(uint32_t uConnID)
    {
        auto iter = m_mapConnection.find(uConnID);
        if (iter == m_mapConnection.end())
            return NULL;
        return iter->second;
    }
};

CConnections g_Conns;

// Programme container =>
// One named stream (playpath): remembers which connection publishes it and
// which connections play it, and fans incoming media packets out to the players.
class CPlayPath
{
    std::string m_strPlayPath;
    bool m_bEOF;
    uint32_t m_uPublishConnID;
    std::set<uint32_t> m_setPlayConnID;
    CMutex m_mutex;

public:
    // Initialisers are listed in declaration order; a new playpath starts in
    // the "ended" state with no publisher.
    CPlayPath(const std::string& strPlayPath)
        : m_strPlayPath(strPlayPath), m_bEOF(true), m_uPublishConnID(0)
    {
    }
    virtual ~CPlayPath() {}

    const std::string& getName() const { return m_strPlayPath; }

    // End-of-stream flag.
    void setEOF() { m_bEOF = true; }
    bool isEOF() { return m_bEOF; }

    // Resets the playpath: clears the end flag and the publisher, optionally
    // clears the player set, and tells the affected connections to drop their
    // binding to this playpath.
    void reset(bool bCleanPlayer = false)
    {
        m_bEOF = false;

        uint32_t uPublishConnID = 0;
        std::set<uint32_t> setPlayConnID;
        {
            // Detach publisher/players under the lock, notify them outside of it.
            CAutoLock<CMutex> lock(&m_mutex);
            uPublishConnID = m_uPublishConnID;
            m_uPublishConnID = 0;
            if (bCleanPlayer)
                m_setPlayConnID.swap(setPlayConnID);
        }

        if (uPublishConnID > 0)
        {
            CConnection* pConn = g_Conns.getConnection(uPublishConnID);
            if (pConn)
                pConn->tellResetPlayPath(m_strPlayPath);
        }

        for (uint32_t uConnID : setPlayConnID)
        {
            CConnection* pConn = g_Conns.getConnection(uConnID);
            if (pConn)
                pConn->tellResetPlayPath(m_strPlayPath);
        }
    }

    // Registers the publishing connection.
    void setPublishConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_uPublishConnID = uConnID;
    }

    // Clears the publishing connection. Always returns true; the function is
    // declared bool, so it must return a value.
    bool unsetPublishConn()
    {
        CAutoLock<CMutex> lock(&m_mutex);
        m_uPublishConnID = 0;
        return true;
    }

    // Registers a playing connection; false when it was already registered.
    bool addPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        return m_setPlayConnID.insert(uConnID).second;
    }

    // Unregisters a playing connection; false when it was not registered.
    bool delPlayConn(uint32_t uConnID)
    {
        CAutoLock<CMutex> lock(&m_mutex);
        return m_setPlayConnID.erase(uConnID) > 0;
    }

    // Gives every playing connection its own heap copy of pPacket.
    // pPacket itself stays owned by the caller.
    void cacheMediaPacket(RTMPPacket* pPacket)
    {
        std::set<uint32_t> setPlayConnID;
        {
            CAutoLock<CMutex> lock(&m_mutex);
            setPlayConnID = m_setPlayConnID;
        }

        // Kept simple: the packet is copied straight into each player's queue.
        for (uint32_t uConnID : setPlayConnID)
        {
            CConnection* pConn = g_Conns.getConnection(uConnID);
            if (pConn == NULL)
                continue;

            RTMPPacket* pPacketCP = new RTMPPacket;
            RTMPPacket_Reset(pPacketCP);
            memcpy(pPacketCP, pPacket, sizeof(RTMPPacket));
            // The struct copy still points at the source body; give the copy
            // its own buffer and skip this player if that allocation fails.
            pPacketCP->m_body = NULL;
            if (!RTMPPacket_Alloc(pPacketCP, pPacket->m_nBodySize))
            {
                delete pPacketCP;
                continue;
            }
            memcpy(pPacketCP->m_body, pPacket->m_body, pPacket->m_nBodySize);
            pPacketCP->m_headerType = RTMP_PACKET_SIZE_MEDIUM;

            std::vector<RTMPPacket*> vecpPacket(1, pPacketCP);
            pConn->copyPackets(m_strPlayPath, vecpPacket);
        }
    }
};

// Application container =>
class CApp
{std::string m_strApp;std::map<std::string, CPlayPath*> m_mappPlayPath;CMutex m_mutex;public:CApp(const std::string& strApp) : m_strApp(strApp) {}virtual ~CApp() {}const std::string& getName() const{return m_strApp;}CPlayPath* getPlayPath(const std::string& strPlayPath, bool bCreate = true){CAutoLock<CMutex> lock(&m_mutex);auto iter = m_mappPlayPath.find(strPlayPath);if (iter != m_mappPlayPath.end())return iter->second;if (bCreate){CPlayPath* pPlayPath = new CPlayPath(strPlayPath);m_mappPlayPath[strPlayPath] = pPlayPath;return pPlayPath;}return NULL;}
};// 应用集合管理 =>class CApps
{std::map<std::string, CApp*> m_mapApp;CMutex m_mutex;public:CApps() {}virtual ~CApps() {}CApp* getApp(const std::string& strApp, bool bCreate = true){CAutoLock<CMutex> lock(&m_mutex);auto iter = m_mapApp.find(strApp);if (iter != m_mapApp.end())return iter->second;if (bCreate){CApp* pApp = new CApp(strApp);m_mapApp[strApp] = pApp;return pApp; }return NULL;}
};CApps g_Apps;// 程序逻辑 =>void* ClientThread(void* _lp);
bool MyHandshake(int nSocket);
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket);
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket);
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket);
bool sendWindowAckSize(CConnection* pConn);
bool sendPeerOutputBandWide(CConnection* pConn);
bool sendOutputChunkSize(CConnection* pConn);
bool sendConnectResult(CConnection* pConn, int nOperateID);
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID);
bool sendPublishStatus(CConnection* pConn, int nInputStreamID);
bool sendPublishError(CConnection* pConn, int nInputStreamID);
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID);
bool sendPlayStatus(CConnection* pConn, int nInputStreamID);int main(int argc, char* argv[])
{if (argc < 3) {printf("usage:%s peer_id peer_key\n", argv[0]);return -1;}RTMP_LogSetLevel(RTMP_LOGDEBUG);kkp2p_engine_conf_t kkp2p_conf;kkp2p_conf.login_domain = "124.71.217.198";kkp2p_conf.login_port = 3080;kkp2p_conf.lan_search_port = 3549;kkp2p_conf.max_log_size = 1024*1024*10;kkp2p_conf.log_path = NULL;g_engine = kkp2p_engine_init(&kkp2p_conf, 5000);kkp2p_switch_log_level(g_engine, 4);kkp2p_join_lan(g_engine, argv[1]);kkp2p_join_net(g_engine, argv[1], argv[2]);kkp2p_channel_t* channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));while(1) {int ret = kkp2p_accept(g_engine, 1000, channel);if (ret < 0) {// errorprintf("kkp2p_accept error,exit\n");free(channel);break;} else if (ret == 0) {// timeoutcontinue;} else {// successpthread_t ThreadId;printf("accept new connection,fd:%d, mode is %d,channel id:%u.\n",channel->fd, channel->transmit_mode, channel->channel_id);pthread_create(&ThreadId, NULL, ClientThread,(void*)channel);channel = (kkp2p_channel_t*)calloc(1, sizeof(kkp2p_channel_t));}}return 0;
}void* ClientThread(void* param)
{pthread_detach(pthread_self());kkp2p_channel_t* channel = (kkp2p_channel_t*)param;// use default blockint val = fcntl(channel->fd, F_GETFL, 0);fcntl(channel->fd, F_SETFL, val & (~O_NONBLOCK));g_Conns.createConnection(channel->fd);CConnection* pConn = g_Conns.createConnection(channel->fd);printf("connection:[%d] coming... \n", pConn->ConnID());// 握手bool b = MyHandshake(pConn->Socket());if (!b){printf("connection:[%d] handshake failed! \n", pConn->ConnID());g_Conns.releaseConnection(pConn->ConnID());return NULL;}while (true){RTMPPacket packet;packet.m_body = NULL;packet.m_chunk = NULL;RTMPPacket_Reset(&packet);// 读取报文if (!RTMP_ReadPacket(pConn->Rtmp(), &packet)){printf("connection:[%d] read error! \n", pConn->ConnID());break;}if (!RTMPPacket_IsReady(&packet))continue;//printf("connection:[%d] read headerType:[%d] packetType:[%d] CSID:[%d] StreamID:[%d] hasAbsTimestamp:[%d] nTimeStamp:[%d] m_nBodySize:[%d] \n",// pConn->ConnID(), packet.m_headerType, packet.m_packetType, packet.m_nChannel, packet.m_nInfoField2, packet.m_hasAbsTimestamp, packet.m_nTimeStamp, packet.m_nBodySize);// 报文分派交互bool b = Dispatch(pConn, &packet);RTMPPacket_Free(&packet);if (!b){printf("connection:[%d] Dispatch failed! \n", pConn->ConnID());break;}if (pConn->getStreamType() == CConnection::Play){printf("connection:[%d] now play... 
\n", pConn->ConnID());break;}}// 进入拉流状态struct timeval tv;gettimeofday(&tv, NULL);double fLastReadTime = tv.tv_sec + tv.tv_usec / (double)1000000;while (pConn->getStreamType() == CConnection::Play){RTMPPacket* pPacket = pConn->popPacket();struct timeval tvNow;gettimeofday(&tvNow, NULL);double fNowReadTime = tvNow.tv_sec + tvNow.tv_usec / (double)1000000;// 超时检查if (pPacket == NULL){if (fNowReadTime - fLastReadTime < 30)continue;printf("connection:[%d] too time no packet \n", pConn->ConnID());break;}fLastReadTime = fNowReadTime;// 下发媒体报文bool b = RTMP_SendPacket(pConn->Rtmp(), pPacket, FALSE);RTMPPacket_Free(pPacket);delete pPacket;if (!b){printf("connection:[%d] send failed! \n", pConn->ConnID());break;}}// 连接退出前关系解除switch (pConn->getStreamType()){case CConnection::Publish:{std::vector<std::string> vecPlayPath;pConn->getPublishPlayPaths(vecPlayPath);for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter){CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);if (pPlayPath){pPlayPath->setEOF();pPlayPath->unsetPublishConn();}}pConn->cleanPublishPlayPath();}break;case CConnection::Play:{std::vector<std::string> vecPlayPath;pConn->getPlayPlayPaths(vecPlayPath);for (auto iter = vecPlayPath.begin(); iter != vecPlayPath.end(); ++iter){CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath((*iter), false);if (pPlayPath){pPlayPath->delPlayConn(pConn->ConnID());}}pConn->cleanPlayPlayPath();}break;}printf("connection:[%d] exit! \n", pConn->ConnID());g_Conns.releaseConnection(pConn->ConnID());kkp2p_close_fd(channel->fd);kkp2p_close_channel(g_engine, channel->channel_id);free(channel);return NULL;
}int send_data(int fd, char* buff, int len) {int sended = 0 ;while (sended < len) {int wl = send(fd, buff + sended, len - sended, 0);if (wl < 0) {printf("SendData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len, errno, strerror(errno));return -1;}sended += wl;}return len;
}int recv_data(int fd, char* buff, int len) {int recved = 0 ;while (recved < len) {int wl = recv(fd, buff + recved, len - recved, 0);if (wl < 0) {printf("RecvData error,fd:%d,ret:%d,len:%d,errno:%d,desc:%s.\n",fd,wl, len,errno, strerror(errno));return -1;}recved += wl;}return len;
}// 握手操作
#define RTMP_SIG_SIZE 1536

/**
 * Server side of the plain (unencrypted, no digest) RTMP handshake on nSocket.
 *
 * Sequence:
 *   1. read C0 (one version byte, must be 3) and C1 (1536 bytes);
 *   2. send S0 + S1 (version byte 3 followed by 1536 zero bytes);
 *   3. send S2, which is an echo of C1;
 *   4. read C2 (1536 bytes; its content is not verified).
 *
 * S0/S1 are sent before S2 so the byte stream has the layout a client expects
 * (version byte first, echo of its own C1 last).
 *
 * @return true when every step completed, false on any I/O failure or when
 *         the client announced a version other than 3.
 */
bool MyHandshake(int nSocket)
{
    // C0
    char type = 0;
    if (recv_data(nSocket, (char*)&type, 1) != 1) {
        return false;
    }
    if (type != 3) {
        return false;
    }

    // C1
    char sClientSIG[RTMP_SIG_SIZE] = {0};
    if (recv_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }

    // S0 + S1
    char sServerSIG[1 + RTMP_SIG_SIZE] = {0};
    sServerSIG[0] = 3;
    if (send_data(nSocket, sServerSIG, 1 + RTMP_SIG_SIZE) != 1 + RTMP_SIG_SIZE) {
        return false;
    }

    // S2 (echo of C1)
    if (send_data(nSocket, sClientSIG, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }

    // C2: read into the S1 area of the buffer, which is no longer needed.
    if (recv_data(nSocket, sServerSIG + 1, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) {
        return false;
    }
    return true;
}

// Packet dispatch
/**
 * Routes one complete RTMP packet according to its message type.
 *
 *   0x01  set chunk size: stored as the session's inbound chunk size
 *   0x05  window ack size: logged only
 *   0x06  set peer bandwidth: value and limit-type byte logged only
 *   0x08 / 0x09  audio / video: passed to HandleMediaPacket
 *   0x14  AMF0 command: passed to HandleInvoke
 *   0x04, 0x12 and anything else: ignored
 *
 * @return false only when HandleInvoke reports a failure; true otherwise.
 */
bool Dispatch(CConnection* pConn, RTMPPacket* pPacket)
{
    const int nType = pPacket->m_packetType;

    if (nType == 0x14)
        return !(HandleInvoke(pConn, pPacket) < 0);

    if (nType == 0x08 || nType == 0x09)
    {
        HandleMediaPacket(pConn, pPacket);
        return true;
    }

    if (nType == 0x01)
    {
        if (pPacket->m_nBodySize >= 4)
        {
            pConn->Rtmp()->m_inChunkSize = AMF_DecodeInt32(pPacket->m_body);
            printf("connection:[%d] received: chunk size change to %d \n", pConn->ConnID(), pConn->Rtmp()->m_inChunkSize);
        }
        return true;
    }

    if (nType == 0x05)
    {
        if (pPacket->m_nBodySize >= 4)
        {
            int nAckWindow = AMF_DecodeInt32(pPacket->m_body);
            printf("connection:[%d] received: window ack size change to %d \n", pConn->ConnID(), nAckWindow);
        }
        return true;
    }

    if (nType == 0x06)
    {
        if (pPacket->m_nBodySize >= 4)
        {
            int nBandwidth = AMF_DecodeInt32(pPacket->m_body);
            printf("connection:[%d] received: output bw change to %d \n", pConn->ConnID(), nBandwidth);
        }
        if (pPacket->m_nBodySize >= 5)
        {
            int nLimitType = pPacket->m_body[4];
            printf("connection:[%d] received: output bw2 change to %d \n", pConn->ConnID(), nLimitType);
        }
        return true;
    }

    // 0x04, 0x12 and unknown types need no action.
    return true;
}

#define SAVC(x) static const AVal av_##x = AVC(#x)
// AVal constants (av_connect, av__result, ...) for the command names handled below.
SAVC(connect);
SAVC(_result);
SAVC(releaseStream);
SAVC(FCPublish);
SAVC(createStream);
SAVC(publish);
SAVC(onStatus);
SAVC(FCUnpublish);
SAVC(deleteStream);
SAVC(play);

// Wraps the NUL-terminated string pStr in an AVal without copying it: the AVal
// points at pStr itself, so pStr must stay alive while the AVal is in use.
AVal makeAVal(const char* pStr)
{
    AVal wrapped;
    wrapped.av_val = (char*)pStr;
    wrapped.av_len = (int)strlen(pStr);
    return wrapped;
}

// Remote call (invoke) handling
int HandleInvoke(CConnection* pConn, RTMPPacket* pPacket)
{if (pPacket->m_body[0] != 0x02){printf("connection:[%d] invalid invoke! \n", pConn->ConnID());return -1;}uint32_t nInputStreamID = pPacket->m_nInfoField2;AMFObject obj;int nSize = AMF_Decode(&obj, pPacket->m_body, pPacket->m_nBodySize, FALSE);if (nSize < 0){printf("connection:[%d] invalid packet! \n", pConn->ConnID());return -1;}AVal method;AMFProp_GetString(AMF_GetProp(&obj, NULL, 0), &method);int nOperateID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 1));printf("connection:[%d] server invoking <%s> %d \n", pConn->ConnID(), method.av_val, nOperateID);if (AVMATCH(&method, &av_connect)){AMFObject obj1;AMFProp_GetObject(AMF_GetProp(&obj, NULL, 2), &obj1);AVal appName = makeAVal("app");AVal app;AMFProp_GetString(AMF_GetProp(&obj1, &appName, -1), &app);std::string strApp(app.av_val, app.av_len);printf("connection:[%d] connect, app:[%s] \n", pConn->ConnID(), strApp.c_str());pConn->setAppName(strApp);if (!sendWindowAckSize(pConn))return -1;if (!sendPeerOutputBandWide(pConn))return -1;if (!sendOutputChunkSize(pConn))return -1;if (!sendConnectResult(pConn, nOperateID))return -1;}else if (AVMATCH(&method, &av_releaseStream)){AVal playpath;AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);std::string strPlayPath(playpath.av_val, playpath.av_len);printf("connection:[%d] releaseStream, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());// 检查该节目是否推流结束CPlayPath* pPlayPath = g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);if (!pPlayPath->isEOF()){if (!sendPublishError(pConn, nInputStreamID))return -1;return 0;}// 重置节目pPlayPath->reset(false);}else if (AVMATCH(&method, &av_FCPublish)){AVal playpath;AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);std::string strPlayPath(playpath.av_val, playpath.av_len);printf("connection:[%d] FCPublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());// 安全起见,初使化节目g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true);}else if (AVMATCH(&method, &av_createStream)){// 生成流IDuint32_t 
uStreamID = pConn->genStreamID();printf("connection:[%d] createStream, streamID:[%d] \n", pConn->ConnID(), uStreamID);if (!sendCreateStreamResult(pConn, nOperateID, uStreamID))return -1;}else if (AVMATCH(&method, &av_publish)){AVal playpath;AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);std::string strPlayPath(playpath.av_val, playpath.av_len);printf("connection:[%d] publish, streamID:[%d] playpath:[%s] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str());// 检查streamID的有效性if (!pConn->isValidStreamID(nInputStreamID)){printf("connection:[%d] publish, streamID:[%d] invalid! \n", pConn->ConnID(), nInputStreamID);return -1;}// 连接与节目 建立双向关联pConn->setStreamType(CConnection::Publish);pConn->bindPublishPlayPath(nInputStreamID, strPlayPath);g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->setPublishConn(pConn->ConnID());if (!sendPublishStatus(pConn, nInputStreamID))return -1;}else if (AVMATCH(&method, &av_play)){AVal playpath;AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);int time = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 4));std::string strPlayPath(playpath.av_val, playpath.av_len);printf("connection:[%d] play, streamID:[%d] playpath:[%s] time:[%d] \n", pConn->ConnID(), nInputStreamID, strPlayPath.c_str(), time);// 检查streamID的有效性if (!pConn->isValidStreamID(nInputStreamID)){printf("connection:[%d] play, streamID:[%d] invalid! 
\n", pConn->ConnID(), nInputStreamID);return -1;}// 连接与节目 建立双向关联pConn->setStreamType(CConnection::Play);pConn->bindPlayPlayPath(nInputStreamID, strPlayPath);g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->addPlayConn(pConn->ConnID());if (!sendPlayStreamBegin(pConn, nInputStreamID))return -1;if (!sendPlayStatus(pConn, nInputStreamID))return -1;}else if (AVMATCH(&method, &av_FCUnpublish)){AVal playpath;AMFProp_GetString(AMF_GetProp(&obj, NULL, 3), &playpath);std::string strPlayPath(playpath.av_val, playpath.av_len);printf("connection:[%d] FCUnpublish, playpath:[%s] \n", pConn->ConnID(), strPlayPath.c_str());g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->setEOF();}else if (AVMATCH(&method, &av_deleteStream)){int nStreamID = (int)AMFProp_GetNumber(AMF_GetProp(&obj, NULL, 3));printf("connection:[%d] deleteStream, streamID:[%d] \n", pConn->ConnID(), nStreamID);// 连接与节目 解除双向关联std::string strPlayPath = pConn->getPublishPlayPath(nStreamID);if (strPlayPath != ""){pConn->unbindPublishPlayPath(nStreamID);g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath, true)->unsetPublishConn();}strPlayPath = pConn->getPlayPlayPath(nStreamID);if (strPlayPath != ""){pConn->unbindPlayPlayPath(nStreamID);g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->delPlayConn(pConn->ConnID());}}AMF_Reset(&obj);return 0;
}// 处理媒体报文
/**
 * Handles an audio/video packet: forwards it to the programme that this
 * connection publishes on the packet's stream id.
 *
 * Packets on a stream id with no publish binding are dropped. Looking up the
 * empty name would otherwise create a programme called "" as a side effect.
 *
 * @return always 0.
 */
int HandleMediaPacket(CConnection* pConn, RTMPPacket* pPacket)
{
    uint32_t nInputStreamID = pPacket->m_nInfoField2;
    const std::string strPlayPath = pConn->getPublishPlayPath(nInputStreamID);
    if (strPlayPath.empty())
        return 0;

    g_Apps.getApp(pConn->getAppName())->getPlayPath(strPlayPath)->cacheMediaPacket(pPacket);
    return 0;
}

// Send the window-acknowledgement-size packet
/**
 * Sends a Window Acknowledgement Size message (type 0x05, chunk stream 2)
 * announcing a window of 5000000 bytes.
 * @return true when the packet was sent, false (after logging) otherwise.
 */
bool sendWindowAckSize(CConnection* pConn)
{
    // The body sits RTMP_MAX_HEADER_SIZE bytes into the buffer, leaving room in front of it.
    char storage[256] = {0};
    char* const pLimit = storage + sizeof(storage);

    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_packetType = 0x05;
    msg.m_nChannel = 0x02;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    msg.m_body = storage + RTMP_MAX_HEADER_SIZE;
    msg.m_nBodySize = 4;
    AMF_EncodeInt32(msg.m_body, pLimit, 5000000);

    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;

    printf("connection:[%d] send packet for set window ack size failed! \n", pConn->ConnID());
    return false;
}

// Send the set-peer-bandwidth packet
/**
 * Sends a Set Peer Bandwidth message (type 0x06, chunk stream 2) with a
 * bandwidth of 5000000 and limit-type byte 2.
 * @return true when the packet was sent, false (after logging) otherwise.
 */
bool sendPeerOutputBandWide(CConnection* pConn)
{
    char storage[256] = {0};
    char* const pLimit = storage + sizeof(storage);
    char* const pBody = storage + RTMP_MAX_HEADER_SIZE;

    // 4-byte bandwidth followed by the 1-byte limit type.
    AMF_EncodeInt32(pBody, pLimit, 5000000);
    pBody[4] = 2;

    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_packetType = 0x06;
    msg.m_nChannel = 0x02;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    msg.m_body = pBody;
    msg.m_nBodySize = 5;

    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;

    printf("connection:[%d] send packet for set peer output bandwide size failed! \n", pConn->ConnID());
    return false;
}

// Send the set-chunk-size packet
/**
 * Switches this session's outbound chunk size to 4096 and announces it to the
 * client with a Set Chunk Size message (type 0x01, chunk stream 2).
 * @return true when the packet was sent, false (after logging) otherwise.
 */
bool sendOutputChunkSize(CConnection* pConn)
{
    RTMP* const pSession = pConn->Rtmp();
    pSession->m_outChunkSize = 4096;

    char storage[256] = {0};
    char* const pLimit = storage + sizeof(storage);

    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_packetType = 0x01;
    msg.m_nChannel = 0x02;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    msg.m_body = storage + RTMP_MAX_HEADER_SIZE;
    msg.m_nBodySize = 4;
    AMF_EncodeInt32(msg.m_body, pLimit, 4096);

    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;

    printf("connection:[%d] send packet for set chunk size failed! \n", pConn->ConnID());
    return false;
}

// Send the connect response packet
/**
 * Answers a "connect" command with "_result".
 *
 * Body: "_result", the transaction id nOperateID, a properties object
 * {fmsVer, capabilities} and an information object {level, code} with code
 * "NetConnection.Connect.Success".
 *
 * @return true when the packet was sent, false (after logging) otherwise.
 */
bool sendConnectResult(CConnection* pConn, int nOperateID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x03;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = 0;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av__result);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, nOperateID);

    // Properties object. Properties are value-initialised so that no
    // indeterminate bytes get copied into the object by AMF_AddProp.
    AMFObject obj1 = {0, NULL};
    AMFObjectProperty fmsVer = AMFObjectProperty();
    fmsVer.p_name = makeAVal("fmsVer");
    fmsVer.p_type = AMF_STRING;
    fmsVer.p_vu.p_aval = makeAVal("FMS/3,0,1,123");
    AMF_AddProp(&obj1, &fmsVer);
    AMFObjectProperty capabilities = AMFObjectProperty();
    capabilities.p_name = makeAVal("capabilities");
    capabilities.p_type = AMF_NUMBER;
    capabilities.p_vu.p_number = 31;
    AMF_AddProp(&obj1, &capabilities);
    pEnc = AMF_Encode(&obj1, pEnc, pEnd);
    // AMF_AddProp allocated the property array; release it once encoded.
    AMF_Reset(&obj1);

    // Information object.
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level = AMFObjectProperty();
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code = AMFObjectProperty();
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetConnection.Connect.Success");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    AMF_Reset(&obj2);

    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for connect _result failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}

// Send the createStream response packet
/**
 * Answers a "createStream" command with "_result".
 * Body: "_result", the transaction id nOperateID, an AMF null, and the newly
 * allocated stream id nStreamID as a number.
 * @return true when the packet was sent, false (after logging) otherwise.
 */
bool sendCreateStreamResult(CConnection* pConn, int nOperateID, uint32_t nStreamID)
{
    char storage[256] = {0};
    char* const pLimit = storage + sizeof(storage);
    char* const pBody = storage + RTMP_MAX_HEADER_SIZE;

    // Encode the body first, then describe it in the packet header fields.
    char* pCursor = AMF_EncodeString(pBody, pLimit, &av__result);
    pCursor = AMF_EncodeNumber(pCursor, pLimit, nOperateID);
    *pCursor = AMF_NULL;
    ++pCursor;
    pCursor = AMF_EncodeNumber(pCursor, pLimit, nStreamID);

    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_packetType = 0x14;
    msg.m_nChannel = 0x03;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    msg.m_body = pBody;
    msg.m_nBodySize = pCursor - pBody;

    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;

    printf("connection:[%d] send packet for createStream _result failed! \n", pConn->ConnID());
    return false;
}

// Send the publish status response packet
/**
 * Tells a publisher that publishing has started.
 * Body: "onStatus", transaction id 0, an AMF null, and an information object
 * {level: "status", code: "NetStream.Publish.Start"}; sent on stream nInputStreamID.
 * @return true when the packet was sent, false (after logging) otherwise.
 */
bool sendPublishStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    // Properties are value-initialised so that no indeterminate bytes get
    // copied into the object by AMF_AddProp.
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level = AMFObjectProperty();
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code = AMFObjectProperty();
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.Start");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    // AMF_AddProp allocated the property array; release it once encoded.
    AMF_Reset(&obj2);

    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}

// Send the publish error response packet
/**
 * Tells a client that the requested stream name is already being published.
 * Body: "onStatus", transaction id 0, an AMF null, and an information object
 * {level: "error", code: "NetStream.Publish.BadName",
 *  description: "Already publishing"}; sent on stream nInputStreamID.
 * @return true when the packet was sent, false (after logging) otherwise.
 */
bool sendPublishError(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    // Properties are value-initialised so that no indeterminate bytes get
    // copied into the object by AMF_AddProp.
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level = AMFObjectProperty();
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("error");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code = AMFObjectProperty();
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Publish.BadName");
    AMF_AddProp(&obj2, &code);
    AMFObjectProperty description = AMFObjectProperty();
    description.p_name = makeAVal("description");
    description.p_type = AMF_STRING;
    description.p_vu.p_aval = makeAVal("Already publishing");
    AMF_AddProp(&obj2, &description);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    // AMF_AddProp allocated the property array; release it once encoded.
    AMF_Reset(&obj2);

    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for publish onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}

// Send the play event packet
/**
 * Sends a User Control message (type 0x04, chunk stream 2) whose body is the
 * 16-bit event type 0 followed by the 32-bit stream id nInputStreamID.
 * @return true when the packet was sent, false (after logging) otherwise.
 */
bool sendPlayStreamBegin(CConnection* pConn, int nInputStreamID)
{
    char storage[256] = {0};
    char* const pLimit = storage + sizeof(storage);
    char* const pBody = storage + RTMP_MAX_HEADER_SIZE;

    // Encode the body first, then describe it in the packet header fields.
    char* pCursor = AMF_EncodeInt16(pBody, pLimit, 0);
    pCursor = AMF_EncodeInt32(pCursor, pLimit, nInputStreamID);

    RTMPPacket msg;
    msg.m_headerType = RTMP_PACKET_SIZE_LARGE;
    msg.m_packetType = 0x04;
    msg.m_nChannel = 0x02;
    msg.m_hasAbsTimestamp = 0;
    msg.m_nTimeStamp = 0;
    msg.m_nInfoField2 = 0;
    msg.m_body = pBody;
    msg.m_nBodySize = pCursor - pBody;

    if (RTMP_SendPacket(pConn->Rtmp(), &msg, FALSE))
        return true;

    printf("connection:[%d] send packet for play event failed! \n", pConn->ConnID());
    return false;
}

// Send the play status response packet
/**
 * Tells a player that playback has started.
 * Body: "onStatus", transaction id 0, an AMF null, and an information object
 * {level: "status", code: "NetStream.Play.Start"}; sent on stream nInputStreamID.
 * @return true when the packet was sent, false (after logging) otherwise.
 */
bool sendPlayStatus(CConnection* pConn, int nInputStreamID)
{
    char sBuf[256] = {0};
    char* pEnd = sBuf + sizeof(sBuf);

    RTMPPacket packet;
    packet.m_nChannel = 0x05;
    packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
    packet.m_packetType = 0x14;
    packet.m_nTimeStamp = 0;
    packet.m_nInfoField2 = nInputStreamID;
    packet.m_hasAbsTimestamp = 0;
    packet.m_body = sBuf + RTMP_MAX_HEADER_SIZE;

    char* pEnc = packet.m_body;
    pEnc = AMF_EncodeString(pEnc, pEnd, &av_onStatus);
    pEnc = AMF_EncodeNumber(pEnc, pEnd, 0);
    *pEnc++ = AMF_NULL;

    // Properties are value-initialised so that no indeterminate bytes get
    // copied into the object by AMF_AddProp.
    AMFObject obj2 = {0, NULL};
    AMFObjectProperty level = AMFObjectProperty();
    level.p_name = makeAVal("level");
    level.p_type = AMF_STRING;
    level.p_vu.p_aval = makeAVal("status");
    AMF_AddProp(&obj2, &level);
    AMFObjectProperty code = AMFObjectProperty();
    code.p_name = makeAVal("code");
    code.p_type = AMF_STRING;
    code.p_vu.p_aval = makeAVal("NetStream.Play.Start");
    AMF_AddProp(&obj2, &code);
    pEnc = AMF_Encode(&obj2, pEnc, pEnd);
    // AMF_AddProp allocated the property array; release it once encoded.
    AMF_Reset(&obj2);

    packet.m_nBodySize = pEnc - packet.m_body;
    if (!RTMP_SendPacket(pConn->Rtmp(), &packet, FALSE))
    {
        printf("connection:[%d] send packet for play onStatus failed! \n", pConn->ConnID());
        return false;
    }
    return true;
}