IOCP实现UDP Server

news/2024/10/19 9:41:36/

IOCP实现UDP Server

1、IOCP原理图

参考文献1:IOCP详解-阿里云开发者社区 (aliyun.com)

参考文献2:IOCP编程之基本原理 - 史D芬周 - 博客园 (cnblogs.com)

原理图

image.png

同步以及异步

image.png

image.png

2、UDP Server代码以及测试代码

// iocpudpdemo.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//// UDP Server
// RIOTest.cpp : Defines the entry point for the console application.
//
#pragma comment(lib, "ws2_32.lib")#include <WS2tcpip.h>
#include <map>
#include <memory>
#include <cstring>
#include <thread>
#include <iostream>using namespace std;SOCKET g_s;
HANDLE g_hIOCP = 0;
long g_workIterations = 0;
LARGE_INTEGER g_frequency;
LARGE_INTEGER g_startCounter;
LARGE_INTEGER g_stopCounter;volatile long g_packets = 0;static const DWORD EXPECTED_DATA_SIZE = 8192;
static const DWORD RIO_MAX_RESULTS = 1000;
static const DWORD TIMING_THREAD_AFFINITY_MASK = 1;
static const unsigned short PORT = 8081;struct EXTENDED_OVERLAPPED : public OVERLAPPED
{WSABUF buf;
};inline void ErrorExit(const char* pFunction,const DWORD lastError)
{cout << "Error: " << pFunction << " failed: " << lastError << endl;exit(0);
}inline void ErrorExit(const char* pFunction)
{const DWORD lastError = ::GetLastError();ErrorExit(pFunction, lastError);
}inline void SetupTiming(const char* pProgramName,const bool lockToThreadForTiming = true)
{cout << pProgramName << endl;cout << "Work load: " << g_workIterations << endl;cout << "Max results: " << RIO_MAX_RESULTS << endl;if (lockToThreadForTiming){HANDLE hThread = ::GetCurrentThread();if (0 == ::SetThreadAffinityMask(hThread, TIMING_THREAD_AFFINITY_MASK)){ErrorExit("SetThreadAffinityMask");}}if (!::QueryPerformanceFrequency(&g_frequency)){ErrorExit("QueryPerformanceFrequency");}
}inline void PrintTimings(const char* pDirection = "Received ")
{LARGE_INTEGER elapsed;elapsed.QuadPart = (g_stopCounter.QuadPart - g_startCounter.QuadPart) / (g_frequency.QuadPart / 1000);cout << "Complete in " << elapsed.QuadPart << "ms" << endl;cout << pDirection << g_packets << " datagrams" << endl;if (elapsed.QuadPart != 0){const double perSec = g_packets / elapsed.QuadPart * 1000.00;cout << perSec << " datagrams per second" << endl;}
}inline void InitialiseWinsock()
{WSADATA data;WORD wVersionRequested = 0x202;if (0 != ::WSAStartup(wVersionRequested, &data)){ErrorExit("WSAStartup");}
}inline SOCKET CreateSocket(const DWORD flags = 0)
{g_s = ::WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, flags);if (g_s == INVALID_SOCKET){ErrorExit("WSASocket");}return g_s;
}inline HANDLE CreateIOCP()
{g_hIOCP = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);if (0 == g_hIOCP){ErrorExit("CreateIoCompletionPort");}return g_hIOCP;
}inline void Bind(SOCKET s,const unsigned short port)
{sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = INADDR_ANY;if (SOCKET_ERROR == ::bind(s, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr))){ErrorExit("bind");}
}template <typename TV, typename TM>
inline TV RoundDown(TV Value, TM Multiple)
{return((Value / Multiple) * Multiple);
}template <typename TV, typename TM>
inline TV RoundUp(TV Value, TM Multiple)
{return(RoundDown(Value, Multiple) + (((Value % Multiple) > 0) ? Multiple : 0));
}inline void StartTiming()
{if (!::QueryPerformanceCounter(&g_startCounter)){ErrorExit("QueryPerformanceCounter");}cout << "Timing started" << endl;
}inline void StopTiming()
{if (!::QueryPerformanceCounter(&g_stopCounter)){ErrorExit("QueryPerformanceCounter");}cout << "Timing stopped" << endl;
}inline char* AllocateBufferSpace(const DWORD recvBufferSize,const DWORD pendingRecvs,DWORD& bufferSize,DWORD& receiveBuffersAllocated)
{const DWORD preferredNumaNode = 0;const SIZE_T largePageMinimum = 0;SYSTEM_INFO systemInfo;::GetSystemInfo(&systemInfo);systemInfo.dwAllocationGranularity;const unsigned __int64 granularity = (largePageMinimum == 0 ? systemInfo.dwAllocationGranularity : largePageMinimum);const unsigned __int64 desiredSize = recvBufferSize * pendingRecvs;unsigned __int64 actualSize = RoundUp(desiredSize, granularity);if (actualSize > (std::numeric_limits<DWORD>::max)()){actualSize = ((std::numeric_limits<DWORD>::max)() / granularity) * granularity;}receiveBuffersAllocated = std::min<DWORD>(pendingRecvs, static_cast<DWORD>(actualSize / recvBufferSize));bufferSize = static_cast<DWORD>(actualSize);char* pBuffer = reinterpret_cast<char*>(VirtualAllocExNuma(GetCurrentProcess(), 0, bufferSize, MEM_COMMIT | MEM_RESERVE | (largePageMinimum != 0 ? MEM_LARGE_PAGES : 0), PAGE_READWRITE, preferredNumaNode));if (pBuffer == 0){ErrorExit("VirtualAlloc");}return pBuffer;
}inline char* AllocateBufferSpace(const DWORD recvBufferSize,const DWORD pendingRecvs,DWORD& receiveBuffersAllocated)
{DWORD notUsed;return AllocateBufferSpace(recvBufferSize, pendingRecvs, notUsed, receiveBuffersAllocated);
}inline void PostIOCPRecvs(const DWORD recvBufferSize,const DWORD pendingRecvs)
{DWORD totalBuffersAllocated = 0;while (totalBuffersAllocated < pendingRecvs){DWORD receiveBuffersAllocated = 0;char* pBuffer = AllocateBufferSpace(recvBufferSize, pendingRecvs, receiveBuffersAllocated);totalBuffersAllocated += receiveBuffersAllocated;DWORD offset = 0;const DWORD recvFlags = 0;EXTENDED_OVERLAPPED* pBufs = new EXTENDED_OVERLAPPED[receiveBuffersAllocated];DWORD bytesRecvd = 0;DWORD flags = 0;for (DWORD i = 0; i < receiveBuffersAllocated; ++i){EXTENDED_OVERLAPPED* pOverlapped = pBufs + i;ZeroMemory(pOverlapped, sizeof(EXTENDED_OVERLAPPED));pOverlapped->buf.buf = pBuffer + offset;pOverlapped->buf.len = recvBufferSize;offset += recvBufferSize;if (SOCKET_ERROR == ::WSARecvFrom(g_s, &(pOverlapped->buf), 1, &bytesRecvd, &flags, NULL, NULL, pOverlapped, 0)){const DWORD lastError = ::GetLastError();if (lastError != ERROR_IO_PENDING){ErrorExit("WSARecv", lastError);}}}if (totalBuffersAllocated != pendingRecvs){cout << pendingRecvs << " receives pending" << endl;}}cout << totalBuffersAllocated << " total receives pending" << endl;
}int main(int argc, char* argv[])
{std::map<std::size_t, std::pair<std::size_t, std::shared_ptr<char>>> packets;SetupTiming("IOCP UDP");InitialiseWinsock();SOCKET s = CreateSocket(WSA_FLAG_OVERLAPPED);HANDLE hIOCP = CreateIOCP();Bind(s, PORT);if (0 == ::CreateIoCompletionPort(reinterpret_cast<HANDLE>(s), hIOCP, 0, 0)){ErrorExit("CreateIoCompletionPort");}struct sockaddr_in sname;int snamesize = sizeof(struct sockaddr_in);::getsockname(s, (struct sockaddr*)&sname, &snamesize);std::cout << sname.sin_port << std::endl;std::cout << ntohs(sname.sin_port) << std::endl;PostIOCPRecvs(8192, 2000);bool done = false;DWORD numberOfBytes = 0;ULONG_PTR completionKey = 0;OVERLAPPED* pOverlapped = 0;if (!::GetQueuedCompletionStatus(hIOCP, &numberOfBytes, &completionKey, &pOverlapped, INFINITE)){ErrorExit("GetQueuedCompletionStatus");}StartTiming();//std::thread killIOCP([&]() {//    std::cout << "iocp kill start" << std::endl;//    std::this_thread::sleep_for(std::chrono::seconds(5));//    std::cout << "kill iocp" << std::endl;//    CloseHandle(hIOCP);//    });//killIOCP.detach();DWORD bytesRecvd = 0;DWORD flags = 0;std::size_t times = 0;do{if (numberOfBytes == EXPECTED_DATA_SIZE || numberOfBytes == 100){g_packets++;EXTENDED_OVERLAPPED* pExtOverlapped = static_cast<EXTENDED_OVERLAPPED*>(pOverlapped);if (SOCKET_ERROR == ::WSARecvFrom(g_s, &(pExtOverlapped->buf), 1, &bytesRecvd, &flags, NULL, NULL, pExtOverlapped, 0)){const DWORD lastError = ::GetLastError();std::shared_ptr<char> packet(new char[numberOfBytes]);memmove(packet.get(), pExtOverlapped->buf.buf, numberOfBytes);if (numberOfBytes == 100) {std::cout << pExtOverlapped->buf.buf[2] << std::endl;}auto ppp = std::make_pair<std::size_t, std::shared_ptr<char>&>(numberOfBytes, packet);packets.insert({ g_packets,ppp });if (lastError != ERROR_IO_PENDING){ErrorExit("WSARecv", lastError);}}}else{g_packets++;EXTENDED_OVERLAPPED* pExtOverlapped = static_cast<EXTENDED_OVERLAPPED*>(pOverlapped);if (SOCKET_ERROR == ::WSARecvFrom(g_s, &(pExtOverlapped->buf), 1, &bytesRecvd, 
&flags, NULL, NULL, pExtOverlapped, 0)){const DWORD lastError = ::GetLastError();std::shared_ptr<char> packet(new char[numberOfBytes]);memmove(packet.get(), pExtOverlapped->buf.buf, numberOfBytes);auto ppp = std::make_pair<std::size_t, std::shared_ptr<char>&>(numberOfBytes, packet);packets.insert({ g_packets,ppp });std::cout << "use count:" << packet.use_count() << std::endl;if (lastError != ERROR_IO_PENDING){ErrorExit("WSARecv", lastError);}}std::cout << "packets size: " << packets.size() << std::endl;StopTiming();done = true;}if (!done){if (!::GetQueuedCompletionStatus(hIOCP, &numberOfBytes, &completionKey, &pOverlapped, INFINITE)){DWORD error = GetLastError();if (ERROR_ABANDONED_WAIT_0 == error || ERROR_INVALID_HANDLE == error) {StopTiming();std::cout << error << std::endl;break;}ErrorExit("GetQueuedCompletionStatus");}}} while (!done);PrintTimings();packets.clear();return 0;
}

测试代码

#include <boost/asio.hpp>

#include <algorithm>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <iostream>
#include <memory>
#include <thread>

// Allocates `size` bytes with malloc, fills them with '1' and overwrites the
// last 8296 bytes (or the whole buffer if it is smaller) with '2'.
// Returns nullptr if the allocation fails. The caller releases the buffer
// with free().
char* makeMem(size_t size)
{
    char* mem = static_cast<char*>(std::malloc(size));

    if (mem == nullptr)
    {
        return nullptr;
    }

    std::memset(mem, '1', size);

    // Clamp the marker length so `size - tail` cannot wrap for small buffers.
    const std::size_t tail = std::min<std::size_t>(size, 8296);

    std::memset(mem + (size - tail), '2', tail);

    return mem;
}

// Sends one header datagram followed by `size` bytes of `data`, split into
// datagrams of `payload` bytes plus one final shorter datagram for the
// remainder (if any). Prints the index of every full-size datagram sent.
static void sendBlob(boost::asio::ip::udp::socket& transmitter,
                     const boost::asio::ip::udp::endpoint& destEndpoint,
                     const char* headerData,
                     const std::size_t headerSize,
                     const char* data,
                     const std::size_t size,
                     const std::size_t payload)
{
    const std::size_t sendTimes = size / payload;
    const std::size_t leftLastSize = size % payload;

    transmitter.send_to(boost::asio::buffer(headerData, headerSize), destEndpoint);

    const char* sendData = data;

    for (std::size_t idx = 0; idx < sendTimes; ++idx, sendData += payload)
    {
        transmitter.send_to(boost::asio::buffer(sendData, payload), destEndpoint);

        std::cout << idx << std::endl;

        //std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    if (leftLastSize != 0)
    {
        transmitter.send_to(boost::asio::buffer(sendData, leftLastSize), destEndpoint);
    }
}

int main()
{
    try
    {
        boost::asio::io_context context;

        const boost::asio::ip::udp::endpoint destEndpoint(boost::asio::ip::make_address("10.10.1.40"), 8081);

        boost::asio::ip::udp::socket transmitter(
            context,
            boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 10120));

        const std::size_t size = 20 * 1024 * 1024 + 100;   // 20MB plus 100 bytes
        const std::size_t payload = 8192;

        // makeMem() allocates with malloc, so the buffer is released with
        // free(), not delete.
        const std::unique_ptr<char, decltype(&std::free)> mem(makeMem(size), &std::free);

        if (!mem)
        {
            std::cerr << "makeMem failed to allocate " << size << " bytes" << std::endl;

            return 1;
        }

        char headerData[100];

        std::memset(headerData, 3, sizeof(headerData));

        // First pass.
        sendBlob(transmitter, destEndpoint, headerData, sizeof(headerData), mem.get(), size, payload);

        std::cout << "------------" << std::endl;

        std::this_thread::sleep_for(std::chrono::seconds(10));

        // Second pass: the same data again.
        sendBlob(transmitter, destEndpoint, headerData, sizeof(headerData), mem.get(), size, payload);

        // 10-byte trailer datagram sent after the second pass.
        char tailerData[10] = {9};

        transmitter.send_to(boost::asio::buffer(tailerData, sizeof(tailerData)), destEndpoint);
    }
    catch (const std::exception& e)
    {
        std::cerr << "send failed: " << e.what() << std::endl;

        return 1;
    }

    return 0;
}

最后,推荐一个项目,上述代码基本来自于该项目

LenHolgate/RIO: Code that explores the Windows Registered I/O Networking Extensions (github.com)


http://www.ppmy.cn/news/1450385.html

相关文章

手机配置在线检测工具微信小程序源码

手机配置在线检测工具微信小程序源码，这是一款升级版检测工具，自动检测手机真伪,序列号等。另外还可以给手机检测各项功能是否正常。 支持多做流量主模式,还有外卖CPS,和友情小程序推荐等 源码免费下载地址 抄笔记(chaobiji.cn)

基于Spring Boot的校园疫情防控系统设计与实现

基于Spring Boot的校园疫情防控系统设计与实现 开发语言：Java 框架：springboot JDK版本：JDK1.8 数据库工具：Navicat11 开发软件：eclipse/myeclipse/idea 系统部分展示 管理员登录首页界面图，管理员进入校园疫…

Git使用指北

目录 创建一个Git仓库本地仓库添加文件文件提交到本地仓库缓冲区添加远程仓库地址本地仓库推送到远程仓库创建新的分支拉取代码同步删除缓冲区的文件，远程仓库的文件.gitignore文件 创建一个Git仓库 Git仓库分为远程和本地两种，远程仓库如GitHub上创建的…

uniapp0基础编写安卓原生插件和调用第三方jar包和编写语音播报插件之编写语音播放安卓插件

前言 如果你不会编写安卓插件,你可以先看看我之前零基础的文章(uniapp0基础编写安卓原生插件和调用第三方jar包和编写语音播报插件之零基础编写安卓插件), 我们使用android.speech.tts.TextToSpeech进行编写语言播放插件 开始 uniapp <template><view style=&q…

计算机毕业设计PHP+vue体检预约管理系统d1yu38

防止在使用不同数据库时，由于底层数据库技术不同造成接口程序紊乱的问题。通过本次系统设计可以提高自己的编程能力，强化对所学知识的理解和运用 本系统是一个服务于医院相关内容的网站，在用户打开网站的第一眼就要明白网站开发的目的…

css 居中方法

行内元素水平居中: 行内元素指的是：text、image、超链接等。 #id {text-align: center; } 块级元素水平居中 块级元素指的是：div、h1-h6、ul等，使用如下代码即可，必须指定宽度。 #id {margin: 0 auto;width: 100px; }

PHP SMTP 电子邮件错误‘354结束数据‘data: 354 Enter mail, end with .

荆轲刺秦王 公司的邮箱发送验证码功能 之前一直正常工作 但是最近停止了 查看代码发现邮件类使用的是codeigniter框架的Email class 原有的代码是： /application/config.php mail >[protocol > smtp,smtp_host > mail.baidu.com,smtp_user > roo…

HaLo-NeRF:利用视觉和语言模型对场景的精准定位和细粒度语义理解

包含大量摄影师拍摄的照片的互联网图像集有望实现对大型旅游地标的数字探索。然而，先前的工作主要集中在几何重建和可视化上，忽略了语言在为导航和细粒度理解提供语义界面方面的关键作用。 项目：HaLo-NeRF: Learning Geometry-Guided Semant…