IOCP实现UDP Server

ops/2024/10/19 13:20:07/

IOCP实现UDP Server

1、IOCP原理图

参考文献1:IOCP详解-阿里云开发者社区 (aliyun.com)

参考文献2:IOCP编程之基本原理 - 史D芬周 - 博客园 (cnblogs.com)

原理图

image.png

同步以及异步

image.png

image.png

2、UDP Server代码以及测试代码

// iocpudpdemo.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//// UDP Server
// RIOTest.cpp : Defines the entry point for the console application.
//
#pragma comment(lib, "ws2_32.lib")#include <WS2tcpip.h>
#include <map>
#include <memory>
#include <cstring>
#include <thread>
#include <iostream>using namespace std;SOCKET g_s;
HANDLE g_hIOCP = 0;
long g_workIterations = 0;
LARGE_INTEGER g_frequency;
LARGE_INTEGER g_startCounter;
LARGE_INTEGER g_stopCounter;volatile long g_packets = 0;static const DWORD EXPECTED_DATA_SIZE = 8192;
static const DWORD RIO_MAX_RESULTS = 1000;
static const DWORD TIMING_THREAD_AFFINITY_MASK = 1;
static const unsigned short PORT = 8081;struct EXTENDED_OVERLAPPED : public OVERLAPPED
{WSABUF buf;
};inline void ErrorExit(const char* pFunction,const DWORD lastError)
{cout << "Error: " << pFunction << " failed: " << lastError << endl;exit(0);
}inline void ErrorExit(const char* pFunction)
{const DWORD lastError = ::GetLastError();ErrorExit(pFunction, lastError);
}inline void SetupTiming(const char* pProgramName,const bool lockToThreadForTiming = true)
{cout << pProgramName << endl;cout << "Work load: " << g_workIterations << endl;cout << "Max results: " << RIO_MAX_RESULTS << endl;if (lockToThreadForTiming){HANDLE hThread = ::GetCurrentThread();if (0 == ::SetThreadAffinityMask(hThread, TIMING_THREAD_AFFINITY_MASK)){ErrorExit("SetThreadAffinityMask");}}if (!::QueryPerformanceFrequency(&g_frequency)){ErrorExit("QueryPerformanceFrequency");}
}inline void PrintTimings(const char* pDirection = "Received ")
{LARGE_INTEGER elapsed;elapsed.QuadPart = (g_stopCounter.QuadPart - g_startCounter.QuadPart) / (g_frequency.QuadPart / 1000);cout << "Complete in " << elapsed.QuadPart << "ms" << endl;cout << pDirection << g_packets << " datagrams" << endl;if (elapsed.QuadPart != 0){const double perSec = g_packets / elapsed.QuadPart * 1000.00;cout << perSec << " datagrams per second" << endl;}
}inline void InitialiseWinsock()
{WSADATA data;WORD wVersionRequested = 0x202;if (0 != ::WSAStartup(wVersionRequested, &data)){ErrorExit("WSAStartup");}
}inline SOCKET CreateSocket(const DWORD flags = 0)
{g_s = ::WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, flags);if (g_s == INVALID_SOCKET){ErrorExit("WSASocket");}return g_s;
}inline HANDLE CreateIOCP()
{g_hIOCP = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);if (0 == g_hIOCP){ErrorExit("CreateIoCompletionPort");}return g_hIOCP;
}inline void Bind(SOCKET s,const unsigned short port)
{sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = INADDR_ANY;if (SOCKET_ERROR == ::bind(s, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr))){ErrorExit("bind");}
}template <typename TV, typename TM>
inline TV RoundDown(TV Value, TM Multiple)
{return((Value / Multiple) * Multiple);
}template <typename TV, typename TM>
inline TV RoundUp(TV Value, TM Multiple)
{return(RoundDown(Value, Multiple) + (((Value % Multiple) > 0) ? Multiple : 0));
}inline void StartTiming()
{if (!::QueryPerformanceCounter(&g_startCounter)){ErrorExit("QueryPerformanceCounter");}cout << "Timing started" << endl;
}inline void StopTiming()
{if (!::QueryPerformanceCounter(&g_stopCounter)){ErrorExit("QueryPerformanceCounter");}cout << "Timing stopped" << endl;
}inline char* AllocateBufferSpace(const DWORD recvBufferSize,const DWORD pendingRecvs,DWORD& bufferSize,DWORD& receiveBuffersAllocated)
{const DWORD preferredNumaNode = 0;const SIZE_T largePageMinimum = 0;SYSTEM_INFO systemInfo;::GetSystemInfo(&systemInfo);systemInfo.dwAllocationGranularity;const unsigned __int64 granularity = (largePageMinimum == 0 ? systemInfo.dwAllocationGranularity : largePageMinimum);const unsigned __int64 desiredSize = recvBufferSize * pendingRecvs;unsigned __int64 actualSize = RoundUp(desiredSize, granularity);if (actualSize > (std::numeric_limits<DWORD>::max)()){actualSize = ((std::numeric_limits<DWORD>::max)() / granularity) * granularity;}receiveBuffersAllocated = std::min<DWORD>(pendingRecvs, static_cast<DWORD>(actualSize / recvBufferSize));bufferSize = static_cast<DWORD>(actualSize);char* pBuffer = reinterpret_cast<char*>(VirtualAllocExNuma(GetCurrentProcess(), 0, bufferSize, MEM_COMMIT | MEM_RESERVE | (largePageMinimum != 0 ? MEM_LARGE_PAGES : 0), PAGE_READWRITE, preferredNumaNode));if (pBuffer == 0){ErrorExit("VirtualAlloc");}return pBuffer;
}inline char* AllocateBufferSpace(const DWORD recvBufferSize,const DWORD pendingRecvs,DWORD& receiveBuffersAllocated)
{DWORD notUsed;return AllocateBufferSpace(recvBufferSize, pendingRecvs, notUsed, receiveBuffersAllocated);
}inline void PostIOCPRecvs(const DWORD recvBufferSize,const DWORD pendingRecvs)
{DWORD totalBuffersAllocated = 0;while (totalBuffersAllocated < pendingRecvs){DWORD receiveBuffersAllocated = 0;char* pBuffer = AllocateBufferSpace(recvBufferSize, pendingRecvs, receiveBuffersAllocated);totalBuffersAllocated += receiveBuffersAllocated;DWORD offset = 0;const DWORD recvFlags = 0;EXTENDED_OVERLAPPED* pBufs = new EXTENDED_OVERLAPPED[receiveBuffersAllocated];DWORD bytesRecvd = 0;DWORD flags = 0;for (DWORD i = 0; i < receiveBuffersAllocated; ++i){EXTENDED_OVERLAPPED* pOverlapped = pBufs + i;ZeroMemory(pOverlapped, sizeof(EXTENDED_OVERLAPPED));pOverlapped->buf.buf = pBuffer + offset;pOverlapped->buf.len = recvBufferSize;offset += recvBufferSize;if (SOCKET_ERROR == ::WSARecvFrom(g_s, &(pOverlapped->buf), 1, &bytesRecvd, &flags, NULL, NULL, pOverlapped, 0)){const DWORD lastError = ::GetLastError();if (lastError != ERROR_IO_PENDING){ErrorExit("WSARecv", lastError);}}}if (totalBuffersAllocated != pendingRecvs){cout << pendingRecvs << " receives pending" << endl;}}cout << totalBuffersAllocated << " total receives pending" << endl;
}int main(int argc, char* argv[])
{std::map<std::size_t, std::pair<std::size_t, std::shared_ptr<char>>> packets;SetupTiming("IOCP UDP");InitialiseWinsock();SOCKET s = CreateSocket(WSA_FLAG_OVERLAPPED);HANDLE hIOCP = CreateIOCP();Bind(s, PORT);if (0 == ::CreateIoCompletionPort(reinterpret_cast<HANDLE>(s), hIOCP, 0, 0)){ErrorExit("CreateIoCompletionPort");}struct sockaddr_in sname;int snamesize = sizeof(struct sockaddr_in);::getsockname(s, (struct sockaddr*)&sname, &snamesize);std::cout << sname.sin_port << std::endl;std::cout << ntohs(sname.sin_port) << std::endl;PostIOCPRecvs(8192, 2000);bool done = false;DWORD numberOfBytes = 0;ULONG_PTR completionKey = 0;OVERLAPPED* pOverlapped = 0;if (!::GetQueuedCompletionStatus(hIOCP, &numberOfBytes, &completionKey, &pOverlapped, INFINITE)){ErrorExit("GetQueuedCompletionStatus");}StartTiming();//std::thread killIOCP([&]() {//    std::cout << "iocp kill start" << std::endl;//    std::this_thread::sleep_for(std::chrono::seconds(5));//    std::cout << "kill iocp" << std::endl;//    CloseHandle(hIOCP);//    });//killIOCP.detach();DWORD bytesRecvd = 0;DWORD flags = 0;std::size_t times = 0;do{if (numberOfBytes == EXPECTED_DATA_SIZE || numberOfBytes == 100){g_packets++;EXTENDED_OVERLAPPED* pExtOverlapped = static_cast<EXTENDED_OVERLAPPED*>(pOverlapped);if (SOCKET_ERROR == ::WSARecvFrom(g_s, &(pExtOverlapped->buf), 1, &bytesRecvd, &flags, NULL, NULL, pExtOverlapped, 0)){const DWORD lastError = ::GetLastError();std::shared_ptr<char> packet(new char[numberOfBytes]);memmove(packet.get(), pExtOverlapped->buf.buf, numberOfBytes);if (numberOfBytes == 100) {std::cout << pExtOverlapped->buf.buf[2] << std::endl;}auto ppp = std::make_pair<std::size_t, std::shared_ptr<char>&>(numberOfBytes, packet);packets.insert({ g_packets,ppp });if (lastError != ERROR_IO_PENDING){ErrorExit("WSARecv", lastError);}}}else{g_packets++;EXTENDED_OVERLAPPED* pExtOverlapped = static_cast<EXTENDED_OVERLAPPED*>(pOverlapped);if (SOCKET_ERROR == ::WSARecvFrom(g_s, &(pExtOverlapped->buf), 1, &bytesRecvd, &flags, NULL, NULL, pExtOverlapped, 0)){const DWORD lastError = ::GetLastError();std::shared_ptr<char> packet(new char[numberOfBytes]);memmove(packet.get(), pExtOverlapped->buf.buf, numberOfBytes);auto ppp = std::make_pair<std::size_t, std::shared_ptr<char>&>(numberOfBytes, packet);packets.insert({ g_packets,ppp });std::cout << "use count:" << packet.use_count() << std::endl;if (lastError != ERROR_IO_PENDING){ErrorExit("WSARecv", lastError);}}std::cout << "packets size: " << packets.size() << std::endl;StopTiming();done = true;}if (!done){if (!::GetQueuedCompletionStatus(hIOCP, &numberOfBytes, &completionKey, &pOverlapped, INFINITE)){DWORD error = GetLastError();if (ERROR_ABANDONED_WAIT_0 == error || ERROR_INVALID_HANDLE == error) {StopTiming();std::cout << error << std::endl;break;}ErrorExit("GetQueuedCompletionStatus");}}} while (!done);PrintTimings();packets.clear();return 0;
}

测试代码

#include <boost/asio.hpp>
#include <cstring>
#include <iostream>char* makeMem(size_t size){char* mem = (char*)malloc(size);memset(mem,'1',size);memset(mem+(size-8296),'2',8296);return mem;
}int main(){boost::asio::io_context context;boost::asio::ip::udp::endpoint destEndpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::make_address("10.10.1.40"),8081);boost::asio::ip::udp::socket transmitter = boost::asio::ip::udp::socket(context,boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 10120));size_t size = 20*1024*1024 + 100;// 20MBchar* mem = makeMem(size);char* sendData = mem;std::size_t payload = 8192;std::size_t leftLastSize = size %payload;std::size_t sendTimes = size/payload;char headerData[100] = {3};memset(headerData,3,sizeof(headerData));transmitter.send_to(boost::asio::buffer(headerData, sizeof(headerData)), destEndpoint);for(size_t idx =0;idx<sendTimes;++idx,sendData+=payload){transmitter.send_to(boost::asio::buffer(sendData, payload), destEndpoint);std::cout << idx<<std::endl;//std::this_thread::sleep_for(std::chrono::milliseconds(1));}transmitter.send_to(boost::asio::buffer(sendData, leftLastSize), destEndpoint);std::cout << "------------"<<std::endl;std::this_thread::sleep_for(std::chrono::seconds(10));sendData = mem;transmitter.send_to(boost::asio::buffer(headerData, sizeof(headerData)), destEndpoint);for(size_t idx =0;idx<sendTimes;++idx,sendData+=payload){transmitter.send_to(boost::asio::buffer(sendData, payload), destEndpoint);std::cout << idx<<std::endl;//std::this_thread::sleep_for(std::chrono::milliseconds(1));}transmitter.send_to(boost::asio::buffer(sendData, leftLastSize), destEndpoint);char tailerData[10] = {9};transmitter.send_to(boost::asio::buffer(tailerData, sizeof(tailerData)), destEndpoint);delete mem;return 0;
}

最后,推荐一个项目,上述代码基本来自于该项目

LenHolgate/RIO: Code that explores the Windows Registered I/O Networking Extensions (github.com)


http://www.ppmy.cn/ops/29022.html

相关文章

Python中的类(Class)详解——新手指南

在Python编程中&#xff0c;类&#xff08;Class&#xff09;是一个非常重要的概念&#xff0c;它允许程序员创建自己的对象类型。这些对象类型可以包含数据&#xff08;称为属性&#xff09;和函数&#xff08;称为方法&#xff09;&#xff0c;它们定义了这些对象的行为。本文…

GPT3 探索指南(一)

原文&#xff1a;zh.annas-archive.org/md5/e19ec4b9c1d08c12abd2983dace7ff20 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 序言 如果这本书是由人工智能写的呢&#xff1f;你会阅读吗&#xff1f;我希望会&#xff0c;因为其中的部分确实是由人工智能写的。是的&…

单例模式及其应用

单例模式介绍&#xff1a; 单例模式是一种常见的设计模式&#xff0c;其目的是确保某个类只有一个实例存在&#xff0c;并提供一个全局访问点。 在实现单例模式时&#xff0c;一般需要注意以下几点&#xff1a; 私有化构造函数&#xff1a;防止外部直接实例化对象。私有静态…

截取视频第一帧当做封面

看了好多处理视频的框架 比如ffmpeg&#xff0c;很多都需要依赖安装第三方插件&#xff0c;比较麻烦&#xff0c;找到一个内嵌进去不需要额外安装的&#xff1a;jcodec 一 首先代码中添加依赖 <!--视频生成预览图用--><dependency><groupId>org.jcodec</…

Linux搭建mysql环境

搭建 MySQL 环境 1、使用 wget 下载安装包&#xff0c;下载到 opt 目录中 wget http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm2、安装 MySQL 公钥 rpm -i mysql57-community-release-el7-10.noarch.rpmrpm --import https://repo.mysql.com/RPM-GP…

【Linux】理解 Ubuntu 中的 kill 和 killall 命令

我把我唱给你听 把你纯真无邪的笑容给我吧 我们应该有快乐的 幸福的晴朗的时光 我把我唱给你听 用我炙热的感情感动你好吗 岁月是值得怀念的留恋的 害羞的红色脸庞 谁能够代替你呀 趁年轻尽情的爱吧 最最亲爱的人啊 路途遥远我们在一起吧 &#x1f3b5; 叶…

AI智能体的未来:引领科技创新潮流

Agent AI智能体的未来 随着Agent AI智能体的智能化水平不断提高&#xff0c;它们在未来社会中的角色、发展路径以及可能带来的挑战也引起了广泛关注。快来分享一下你的看法吧~ 技术进步与创新 随着科技的发展&#xff0c;新的产业和商业模式不断涌现&#xff0c;创造了新的收…

CSS之动画

前言&#xff1a; 一个好的事物应该是有生命&#xff0c;而不是死板的&#xff0c;我们在制作网页时也是如此&#xff0c;为了让画面变得更加生动有趣&#xff0c;我们会给它加上动画的效果&#xff0c;例如过度、变形、旋转、放缩。 动画&#xff1a; 什么是动画&#xff1a;…