十、基于I/O模型的网络开发
10.10 重叠I/O 模型
10.10.1 基本概念
在 Winsock 中,重叠 I/O(Overlapped I/O) 模型能达到更佳的系统性能,高于 select 模 型、异步选择和事件选择3种。重叠模型的基本设计原理便是让应用程序使用一个重叠的数据 结 构(WSAOVERLAPPED), 一次投递一个或多个 Winsock I/O 请求。针对这些提交的请求, 在它们完成之后,我们的应用程序会收到通知,于是我们就可以对数据进行处理了。
重叠I/O 这个概念来自文件I/O 操作。在Win32 文件I/O 操作中,当调用ReadFile 和 WriteFile时,如果最后一个参数lpOverlapped设置为NULL, 那么线程就阻塞在这里,直到读 写完指定的数据后才返回。这样在读写大文件的时候,很多时间都浪费在等待 ReadFile 和WriteFile的返回上面。如果ReadFile 和 WriteFile 是往管道里读写数据,那么有可能阻塞得更 久,导致程序性能下降。
为了解决这个问题,Windows 引进了重叠I/O的概念,它能够同时以多个线程处理多个I/O。 其实你自己开多个线程也可以处理多个I/O, 但是系统内部对I/O 的处理在性能上有很大的优 化。重叠I/O 是 Windows 下实现异步I/O 常用的方式。
Windows 为几乎全部类型的文件提供这个工具:磁盘文件、通信端口、命名管道和套接 字。通常,使用ReadFile 和 WriteFile 就可以很好地执行重叠I/O。
重叠模型的核心是一个重叠数据结构。若想以重叠方式使用文件,必须用 FILE_FLAG_OVERLAPPED 标志打开它,例如:
HANDLE hFile = CreateFile(lpFileName, GENERIC READ I GENERIC WRITE,FILE_SHARE_READ I FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL);
如果没有指定该标志,那么针对这个文件(句柄)而言,重叠I/O 是不可用的。如果设置 了该标志,当调用ReadFile 和 WriteFile 操作这个文件(句柄)时,必须为最后一个参数提供 OVERLAPPED 结构:
// WINBASE.H
typedef struct _OVERLAPPED
{ULONG_PTR Internal;ULONG_PTR InternalHigh;union{struct{DWORD Offset;DWORD OffsetHigh;} DUMMYSTRUCTNAME;PVOID Pointer;} DUMMYUNIONNAME;HANDLE hEvent;
} OVERLAPPED, *LPOVERLAPPED;
-
Internal:I/O 请求的状态代码。发出请求时,系统将此成员设置为状态“挂起”,以 指示操作尚未启动。请求完成后,系统将此成员设置为已完成请求的状态代码。该字 段由系统内部使用。
-
InternalHigh:被传输数据的长度。
-
DUMMYUNIONNAME.DUMMYSTRUCTNAME.Offset: 启动输入输出请求的文件位 置的低阶部分,由用户指定。只有在支持偏移(也称为文件指针机制)概念的查找设 备(如文件)上执行I/O 请求时此成员才为非零;否则,此成员必须为零。
-
DUMMYUNIONNAME.DUMMYSTRUCTNAME.OffsetHigh: 启动输入输出请求的文 件位置的高阶部分,由用户指定。只有在支持偏移(也称为文件指针机制)概念的查 找设备(如文件)上执行I/O 请求时此成员才为非零;否则,此成员必须为零。
-
DUMMYUNIONNAME.Pointer: 保留供系统使用。初始化为零后不要使用。
-
hEvent: 当操作完成时,由系统设置为信号状态的事件句柄。在将此结构传递给任何重叠函数之前,用户必须使用CreateEvent函数将此成员初始化为零或有效的事件句 柄。然后可以使用此事件同步设备的同时I/O 请求。函数(如ReadFile和 WriteFile) 在开始I/O操作之前将此句柄设置为非签名状态。操作完成后,句柄将设置为信号状 态。诸如GetOverlappedResult 和同步等待函数等函数将自动重置事件重置为非信号 状态。因此,应该使用手动重置事件。如果使用自动重置事件,等待操作完成,然后 使 用bWait 参数设置为TRUE 调 用GetOverlappedResult, 则应用程序可以停止响应。
-
在函数调用中使用该结构之前,应始终将该结构的任何未使用成员初始化为零;否则,函 数可能会失败并返回ERROR INVALID PARAMETER 。Offset 和 OffsetHigh 成员一起表示64 位文件位置。它是从文件或类似文件的设备开始的字节偏移量,由用户指定,系统不会修改这 些值。调用进程必须在将重叠结构传递给使用偏移量的函数(如ReadFile 、WriteFile 或其他相 关函数)之前设置此成员。
-
因 为I/O 异步发生,就不能确定操作是否按顺序完成。因此,这里没有当前位置的概念。 对于文件的操作,总是规定该偏移量。在数据流下(如COM 端口或socket), 没有寻找精确 偏移量的方法,所以在这些情况中系统忽略偏移量。这4个字段不应由应用程序直接进行处理 或使用,OVERLAPPED 结构的最后一个参数是可选的事件句柄。稍后会提到怎样使用这个参 数来设定事件通知完成 I/O 。现在,假定该句柄是NULL 。设 置 了OVERLAPPED 参数后, ReadFile/WriteFile 的调用会立即返回,这时你可以去做其他的事(所谓异步),系统会自动替 你完成ReadFile/WriteFile 相关的I/O 操作。你也可以同时发出几个ReadFile/WriteFile 的调用 (所谓重叠)。当系统完成I/O操作时,会将OVERLAPPED.hEvent 置信(置有信号状态), 我们可以通过调用WaitForSingleObject/WaitForMultipleObjects来等待这个I/O 完成通知,在得 到通知信号后,就可以调用GetOverlappedResult来查询I/O 操作的结果,并进行相关处理。由 此可以看出,OVERLAPPED 结构在一个重叠I/O 请求的初始化及其后续的完成之间提供了一 种沟通或通信机制。
-
以Win32 重 叠I/O 机制为基础,自Winsock2发布开始,重叠I/O 便已集成到新的Winsock 函数中,比如WSARecv/WSASend。这样一来,重叠I/O 模型便能适用于安装了Winsock 2的 所 有Windows 平台。可以一次投递一个或多个Winsock I/O请求。针对那些提交的请求,在它 们完成之后,应用程序可为它们提供服务(对I/O 的数据进行处理)。
-
比起阻塞、select 、WSAAsyncSelect 以 及WSAEventSelect 等 模 型 ,Winsock 的 重 叠I/O (Overlapped I/O) 模型使应用程序能达到更佳的系统性能。因为它和这4种模型不同的是, 使用重叠模型的应用程序通知缓冲区收发系统直接使用数据。也就是说,如果应用程序投递了 一个10KB大小的缓冲区来接收数据,且数据已经到达套接字,那么该数据将直接被复制到投递的缓冲区。而其他4种模型中,数据到达先复制到套接字自己的接收缓冲区中,此时应用程序会被系统通知有数据可读。当应用程序调用接收函数之后,数据才从套接字自己的缓冲区复 制到应用程序的缓冲区,这样就多了一次从套接字缓冲区到应用程序缓冲区的复制,性能差别 就在于此。
10.10.2 创建重叠I/O模型下的套接字
要想在一个套接字上使用重叠 I/O 模型来处理网络数据通信,首先必须使用 WSA_FLAG_OVERLAPPED标志来创建一个套接字:
sOCKET s = WSASocket(AF_INET, SOCK_STEAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
创建套接字的时候,假如使用的是socket 函数,而非WSASocket 函数,那么会默认设置 WSA_FLAG_OVERLAPPED 标志。成功创建好了一个套接字,将其与一个本地接口绑定到一 起后,便可开始进行这个套接字上的重叠I/O 操作。
为了要使用重叠结构,我们常用的 send、 recv 等收发数据的函数也都要被WSASend 、WSARecv 替换掉了。方法是调用以下Winsock 2函数,同时为它们指定一个WSAOVERLAPPED 结构参数(可选): WSASend、WSASendTo、
WSARecv、WSARecvFrom、WSAloctl、AcceptEx、TransmitFile。 若 随 WSAOVERLAPPED
结构一起调用这些函数,则函数会立即返回,无论套接字是否设为锁定模式。它们依赖于 WSAOVERLAPPED 结构来返回一个I/O 请求操作的结果。
在Windows NT和Windows 2000中,重叠I/O 模型也允许应用程序以一种重叠方式实现对套接字连接的处理。具体的做法是在监听套接字上调用AcceptEx 函数。AcceptEx 是一个特 殊的Winsock 1.1扩展函数,该函数最初的设计宗旨是在Windows NT与 Windows 2000操作 系统上使用Win32 的重叠I/O 机制。事实上,它也适用于Winsock 2中的重叠I/O 。AcceptEx 的声明如下:
BOOL AcceptEx(SOCKET SListenSocket,SOCKET SAcceptSocket,PVOID SAcceptSocket,DWORD dwReceiveDataLength,DWORD dwLocalAddressLength,DWORD dwRemoteAddressLength,LPOVERLAPPED lpOverlapped
)
- sListenSocket: 标识已用listen 函数调用过的套接字的描述符。服务器程序在此套接 字上等待连接。
- sAcceptSocket: 一种描述符,用于标识接受传入连接的套接字。
- lpOutputBuffer:指向缓冲区的指针。该缓冲区是一个特殊的缓冲区,因为它要负责3 种数据的接收:服务器的本地地址,客户机的远程地址,以及在新建连接上发送的第 一个数据块。
- dwReceiveDataLength:以字节为单位,指定了在lpOutputBuffer 缓冲区中,保留多大 的空间来接收数据。如果将这个参数设为0,那么在接受连接的过程中不会再一起接 收任何数据。
- dwLocalAddressLength: 为本地地址信息保留的字节数。此值必须至少比正在使用的 传输协议的最大地址长度多16字节。举个例子,假定正在使用的是TCP/IP 协议,那么这里的大小应设为“SOCKADDR IN 结构的长度+16字节”。
- dwRemoteAddressLength: 为远程地址信息保留的字节数。此值必须至少比正在使用 的传输协议的最大地址长度多16字节,不能为零。
- lpdwBytesReceived:用于返回接收到的实际数据量,以字节为单位。只有在操作以同 步方式完成的前提下才会设置这个参数。假如 AcceptEx 函数返回 ERROR_IO_PENDING, 那么这个参数永远都不会设置,我们必须利用完成事件通知 机制来获知实际读取的字节量。
- lpOverlapped: 它对应的是一个OVERLAPPED 结构,允许AcceptEx 以一种异步方式 工作。如我们早先所述,只有在一个重叠I/O应用中该函数才需要使用事件对象通知 机制(hEvent 字段),这是由于此时没有一个完成例程参数可供使用。
如果函数成功,就返回 TRUE 。 如果函数失败,就返回 FALSE, 此时可以调用 WSAGetLastError 函数返回错误码。若WSAGetLastError 返 回ERROR_IO_PENDING, 则操作 已成功启动,并且仍在进行中。若错误码为WSAECONNRESET, 则表示有一个传入连接,但 随后在接受呼叫之前被远程对等端终止。
10.10.3 获取重叠I/O操作完成结果
异步I/O 请求挂起后,最终要知道I/O操作是否完成。 一个重叠I/O 请求最终完成后,应 用程序要负责取回重叠I/O 操作的结果。对于读,直到I/O 完成,接收缓冲区才有效。对于写, 要知道写是否成功。有几种方法可以做到这一点,最直接的方法是调用 WSAGetOverlappedResult, 其函数原型如下:
BOOL WSAAPI WSAGetOverlappedResult(SOCKET s,LPWSAOVERLAPPED lpOverlapped,LPDWORD lpcbTransfer,BOOL fWait,LPDWORD lpdwFlags
);
- s: 套接字句柄。
- lpOverlapped: 关联的WSAOVERLAPPED 结构,在调用CreateFile 、WSASocket 或 AcceptEx 时指定。
- lpcbTransfer: 指向字节计数指针,负责接收一次重叠发送或接收操作实际传输的字 节 数 。
- fWait: 确定命令是否等待的标志,用于决定函数是否应该等待一次重叠操作完成。 若将该参数设为TRUE, 则直到操作完成函数才返回;若设为FALSE, 而且操作仍 然处于未完成状态,那么WSAGetOverlappedResult 函数会返回FALSE 值。
- lpdwFlags: 指向32位变量的指针,该变量将接收一个或多个补充完成状态的标志。 如果重叠操作是通过WSARecv 或 WSARecvFrom 启动的,那么此参数将包含lpFlags
- 参数的结果值。此参数不能是空指针。
如果函数成功,那么返回值为TRUE 。这意味着重叠操作已成功完成,并且lpcbTransfer 指向的值已更新。
如果函数回FALSE, 就意味着重叠操作尚未完成,或者重叠操作已完成但有错误,或者 由 于WSAGetOverlappedResult 的一个或多个参数中的错误而无法确定重叠操作的完成状态。 失败时,lpcbTransfer 指向的值将不会更新。使用WSAGetLastError 可以确定失败的原因。
下面介绍两种常用重叠I/O 完成通知的方法。
10.10.4 基于事件通知(有64个socket 的限制)
套接字重叠I/O 的事件通知方法要求事件对象与WSAOVERLAPPED 结构关联在一起。 当 I/O 操作完成后,该事件对象从未触发状态变为触发状态。在应用程序中先调用 WSAWaitForMultipleEvents 函数等待该事件的发生。获得该事件对象对应的 WSAOVERLAPPED 结构后可以根据 Internal 和 InternalHigh 字段(也可以调用 WSAGetOverlappedResult 函 数 ) 判 断I/O 完成的情况。
具体步骤如下:
- (1)创建具有WSAOVERLAPPED 标志的套接字。如果调用socket()函数,那么默认创 建具有 WSAOVERLAPPED 标志的套接字。如果调用WSASocket 函数,就需要明确指定 WSAOVERLAPPED 标志。
- (2)为套接字定义WSAOVERLAPPED 结构,并清零。
- (3)调用WSACreateEvent 函数创建事件对象,并将该事件句柄分配给WSAOVERLAPPED 结构的hEvent 字段。
- (4)调用接收或者发送函数。
- (5)调用WSAWaitForMultipleEvents 函数等待与重叠I/O 关联的事件变为已触发状态。
- (6)WSAWaitForMultipleEvents 返回后,调用WSAResetEvent 函数,将该事件对象恢复 为未触发态。
- (7) 调 用WSAGetOverlappedResult 函数判断重叠I/O 的完成状态。
下面的实例演示了使用socket 重 叠I/O 模型开发服务程序的步骤。该程序涉及两个线程: 接收线程用于接受客户端连接请求,初始化重叠I/O 操作;服务线程用于重叠I/O 处理。
【例10.5】利用事件通知实现重叠I/O模型
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <winsock2.h>
#include <Windows.h>
#include <iostream>
#pragma comment(lib,"Ws2_32.lib")using std::cout;
using std::cin;
using std::endl;
using std::ends;void WSAEventServerSocket()
{SOCKET server = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if (server == INVALID_SOCKET) {cout << "创建SOCKET失败!,错误代码:" << WSAGetLastError() << endl;return;}int error = 0;sockaddr_in addr_in;addr_in.sin_family = AF_INET;addr_in.sin_port = htons(6000);addr_in.sin_addr.s_addr = INADDR_ANY;error = ::bind(server, (sockaddr*)&addr_in, sizeof(sockaddr_in));if (error == SOCKET_ERROR) {cout << "绑定端口失败!,错误代码:" << WSAGetLastError() << endl;return;}listen(server, 5);if (error == SOCKET_ERROR) {cout << "监听失败!,错误代码:" << WSAGetLastError() << endl;return;}cout << "成功监听端口 :" << ntohs(addr_in.sin_port) << endl;WSAEVENT eventArray[WSA_MAXIMUM_WAIT_EVENTS]; // 事件对象数组SOCKET sockArray[WSA_MAXIMUM_WAIT_EVENTS]; // 事件对象数组对应的SOCKET句柄int nEvent = 0; // 事件对象数组的数量 WSAEVENT event0 = ::WSACreateEvent();::WSAEventSelect(server, event0, FD_ACCEPT | FD_CLOSE);eventArray[nEvent] = event0;sockArray[nEvent] = server;nEvent++;while (true) {int nIndex = ::WSAWaitForMultipleEvents(nEvent, eventArray, false, WSA_INFINITE, false);if (nIndex == WSA_WAIT_IO_COMPLETION || nIndex == WSA_WAIT_TIMEOUT) {cout << "等待时发生错误!错误代码:" << WSAGetLastError() << endl;break;}nIndex = nIndex - WSA_WAIT_EVENT_0;WSANETWORKEVENTS event;SOCKET sock = sockArray[nIndex];::WSAEnumNetworkEvents(sock, eventArray[nIndex], &event);if (event.lNetworkEvents & FD_ACCEPT) {if (event.iErrorCode[FD_ACCEPT_BIT] == 0) {if (nEvent >= WSA_MAXIMUM_WAIT_EVENTS) {cout << "事件对象太多,拒绝连接" << endl;continue;}sockaddr_in addr;int len = sizeof(sockaddr_in);SOCKET client = ::accept(sock, (sockaddr*)&addr, &len);if (client != INVALID_SOCKET) {cout << "接受了一个客户端连接 " << inet_ntoa(addr.sin_addr) << ":" << ntohs(addr.sin_port) << endl;WSAEVENT eventNew = ::WSACreateEvent();::WSAEventSelect(client, eventNew, FD_READ | FD_CLOSE | FD_WRITE);eventArray[nEvent] = eventNew;sockArray[nEvent] = client;nEvent++;}}}else if (event.lNetworkEvents & FD_READ) {if (event.iErrorCode[FD_READ_BIT] == 0) {char buf[2500];ZeroMemory(buf, 2500);int nRecv = ::recv(sock, buf, 2500, 0);if (nRecv > 0) {cout << "收到一个消息 :" << buf << endl;char strSend[] = "hi,client,I am server, I recvived your message.";::send(sock, strSend, strlen(strSend), 0);}}}else if (event.lNetworkEvents & FD_CLOSE) {::WSACloseEvent(eventArray[nIndex]);::closesocket(sockArray[nIndex]);cout << "一个客户端连接已经断开了连接" << endl;for (int j = nIndex; j < nEvent - 1; j++) {eventArray[j] = eventArray[j + 1];sockArray[j] = sockArray[j + 1];}nEvent--;}else if (event.lNetworkEvents & FD_WRITE) {cout << "一个客户端连接允许写入数据" << endl;}} // end while::closesocket(server);
}int main(){WSADATA wsaData;int error;WORD wVersionRequested;wVersionRequested = WINSOCK_VERSION;error = WSAStartup(wVersionRequested, &wsaData);if (error != 0) {WSACleanup();return 0;}WSAEventServerSocket();WSACleanup();return 0;
}
这个模型与其他模型不同的是它使用Winsock2 提供的异步I/O 函 数WSARecv。 在调用 WSARecv 时,指定一个WSAOVERLAPPED 结构,这个调用不是阻塞的,也就是说,它会立 刻返回。一旦有数据到达,被指定的WSAOVERLAPPED 结构中的hEvent被 Signaled。在 取 得接收的数据后,把数据原封不动地打印出来,或者同志们也可以调用一下 send 函数,把数 据发送回客户端(客户端那里有接收线程),然后重新激活一个WSARecv 异步操作。有一个 函数值得注意,那就是用于接收数据的函数WSARecv。该函数声明如下:
int WSAAPI WSARecv(SOCKET s,LPWSABUF lpBuffers,DWORD dwBufferCount,LPDWORD lpNumberOfBytesRecvd,LPDWORD lpFlags,LPWSAOVERLAPPED lpOverlapped,LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
- s: 标识一个已连接的套接字的描述符。
- lpBuffers: 指 向WSABUF 结构数组的指针。每个WSABUF 结构都包含一个指向缓 冲区的指针和缓冲区的长度(以字节为单位)。
- dwBufferCount:lpBuffers 数组中WSABUF 结构的个数。
- lpNumberOfBytesRecvd: 如果接收操作立即完成,就指向接收数据的字节数指针。
如果 lpOverlapped 参数不是空值,就对此参数使用空值,以避免潜在的错误结果。只有 lpOverlapped参数不为空时,此参数才能为空。
- lpFlags: 指向用于修改WSARecv 函数调用行为的标志的指针。
- lpOverlapped: 指 向 W SAOVERLAPPED 结构的指针(对于非重叠的套接字忽略)。
- lpCompletionRoutine: 当接收操作完成时调用的完成例程的指针(对于非重叠的套接 字忽略)。
如果没有发生错误,并且接收操作立即完成,则函数返回零。在这种情况下, 一旦调用线 程处于可警报状态,就已经计划调用完成例程。否则,将返回SOCKET_ERROR, 此时可以通 过调用WSAGetLastError 来检索特定的错误代码。错误代码WSA _IO_PENDING 表示重叠操 作已成功启动,稍后将指示完成。任何其他错误代码都表示重叠操作未成功启动,不会出现完 成指示 。
10.10.5 基于完成例程
10.10.5.1 基本概念
如果你想要使用重叠I/O 机制带来的高性能模型,又懊恼于基于事件通知的重叠模型要收到64个等待事件的限制,还有点畏惧完成端口稍显复杂的初始化过程,那么“完成例程”无 疑是最好的选择!因为完成例程摆脱了事件通知的限制,可以连入任意数量客户端而不用另开 线程,也就是说只用很简单的一些代码就可以利用Windows 内部的I/O 机制来获得网络服务 器的高性能。
在基于事件通知的重叠I/O 模型中,在你投递了一个请求(比如WSARecv) 以后,系统 在完成以后是用事件来通知你的,而在完成例程中,系统在网络操作完成以后会自动调用你提 供的回调函数,区别仅此而已。采用完成例程的服务器端,通信流程如图10-12所示。
从图10-12中可以看到,服务器端存在一个明显的异步过程,也就是说我们把客户端连入的 SOCKET 与一个重叠结构绑定之后,便可以将通信过程全权交给系统内部去帮我们调度处理,我 们在主线程中可以边做其他的事情边等候系统完成的通知。这就是完成例程高性能的原因所在。
如果还没有看明白,我们打个通俗易懂的比方:完成例程的处理过程,也就像我们告诉系 统,“我想要在网络上接收网络数据,你去帮我办一下”(投递WSARecv 操作),“不过我 并不知道网络数据何时到达,总之在接收到网络数据之后,你就直接调用我给你的这个函数(比 如 CompletionProess),把它们保存到内存或者显示到界面中等,全权交给你处理了”。于是 乎,系统在接收到网络数据之后, 一方面会给我们一个通知,另一方面系统也会自动调用我们 事先准备好的回调函数,就不需要我们自己操心了。
10.10.5.2 完成例程的优点
基于完成例程的重叠I/O 模型相对于事件选择I/O 模型的优越之处在于,重叠I/O 模型完 全解决了recv 的阻塞问题。在以前的模型当中,recv 只是接收数据到来的通知,之后依旧要己去内核复制数据到用户空间中,如今recv 过程全部交由操作系统完成,减去了数据等待 与将数据从内核复制到程序缓冲区的时间,并且其间不占用程序自身的时间片。
基于完成例程的重叠I/O 异步模型如图10-13所示。
事件选择网络模型的数据复制如图10-14所示。
此外,完成例程相比基于事件响应的重叠I/O 模型的优越之处在于,完成例程并没有64 个事件的上限,而是操作系统调用完成例程(也就是一个由操作系统调用的回调函数)对接收 到的数据进行处理。虽然都是基于重叠I/O,但是因为前两种模型都需要自己来管理任务的分 派,所以性能上没有区别。
10.10.5.3 完成例程的关键函数
完成例程方式和前面的事件通知方式最大的不同之处在于,我们需要提供一个回调函数供 系统收到网络数据后自动调用,回调函数的参数定义应该遵照如下函数原型:
void CALLBACK_CompletionRoutineFunc(DWORD dwError,DWORD cbTransferred,LPWSAOVERLAPPED lpOverlapped,DWORD dwFlags);
- 其中,参数dwError 标志咱们投递的重叠操作,比如WSARecv, 完成的状态是什么;
- 参 数 cbTransferred 指明了在重叠操作期间,实际传输的字节量是多大;
- 参数lpOverlapped参数 指明传递到最初的I/O 调用内的一个重叠结构;
- 参数dwFlags 返回操作结束时可能用的标志(一 般没用)。
注意:函数名字随便起,但是参数类型不能错。还有一点需要重点提一下,因为我们需要 给系统提供一个如上面定义的那样的回调函数,以便系统在完成了网络操作后自动调用,这里 就需要提一下究竟是如何把这个函数与系统内部绑定的。比如,在WSARecv 函数中是这样绑 定 的 :
int WSARecv(SOCKET s,LPWSABUF lpBuffers, DWORD dwBufferCount,LPDWORD lpNumberOfBytesRecvd, LPDWORD lpFlags,LPWSAOVERLAPPED lpOverlapped,LPWSAOVERLAPPED COMPLETION ROUTINE lpCompletionRoutine);
这个函数前面其实介绍过,因为比较重要,我们再次简单地讲述一下这个函数。
- 参数 s 投递这个操作的套接字;
- 参数IpBuffers表示接收缓冲区,与recv函数不同,这里需要一个由 WSABUF结构构成的数组;
- 参数dwBufferCount 表示数组中WSABUF 结构的数量,设置为1 即可;
- 参数lpNumberOfBytesRecvd 表示当接收操作完成时,会返回函数调用所接收到的字节 数 ;
- lpOverlapped 表示“绑定”的重叠结构;
- 最后一个参数lpCompletionRoutine 就是完成例程 函数的指针。
我们的回调函数 CompletionRoutineFunc和 WSARecv 操作关联起来了,系统一 完成接收数据,就回调我们的函数,之后我们就可以在里面处理数据了。
如果觉得有些抽象,我们可以看一段代码:
SOCKET s;
WSABUF DataBuf; // 定义WSABUF结构的缓冲区
#define DATA BUFSIZE 4096 // 初始化一下DataBufchar buffer[DATA BUFSIZE];
ZeroMemory(buffer, DATA BUFSIZE);
DataBuf.len = DATA BUFSIZE;
DataBuf.buf = buffer;
DWORD dwBufferCount = 1, dwRecvBytes = 0, Flags = 0;// 建立需要的重叠结构,每个连入的sOCKET上的每一个重叠操作都得绑定一个
WSAOVERLAPPED Acceptoverlapped; // 如果要处理多个操作,这里当然需要一个WSAOVERLAPPED 数组
ZeroMemory(&AcceptOverlapped, sizeof(WSAOVERLAPPED));// 做了这么多工作,终于可以使用WSARecv 来把我们的完成例程函数绑定上了
// 当然,假设我们的_CompletionRoutine 函数已经定义好了
WSARecv(s, &DataBuf, dwBufferCount, &dwRecvBytes, &Flags, &AcceptOverlapped, _CompletionRoutine);
其他参数我们可以先不用细看,重点关注最后一个。
10.10.5.4 完成例程的实现步骤
理论知识方面需要知道的就是这么多。下面我们配合代码, 一步步地讲解如何亲手实现一 个基于完成例程的重叠I/O 模型。
具体步骤如下:
- 第一步,创建一个套接字,开始在指定的端口上监听连接请求。
- 第一步很简单,和其他的SOCKET 初始化并无多大区别。需要注意的是,为了突出重点, 笔者去掉了错误处理,具体开发时可不能这样,尽管这里出错的概率比较小:
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData); // 创建TCP 套接字
ListenSocket = socket(AF INET, SOCK STREAM, IPPROTO TCP);
SOCKADDR_IN ServerAddr; // 分配端口及协议簇并绑定 ServerAddr.sin family=AF INET;
ServerAddr.sin addr.S un.S addr = htonl(INADDR ANY); // 在8888端口监听,端口号可以随意更改,但最好不要少于1024
ServerAddr.sin port = htons(8888);
bind(ListenSocket, (LPSOCKADDR)&ServerAddr, sizeof(ServerAddr)); // 绑定套接字
listen(ListenSocket, 5); // 开始监听
第二步,接受一个入站的连接请求。调用accept 函数即可:
AcceptSocket = accept(ListenSocket, NULL, NULL);
如果想要获得连入客户端的信息,则accept 的后两个参数不要用NULL, 而是这样:
SOCKADDR IN ClientAddr; // 定义一个客户端的地址结构作为参数
int addr length = sizeof(ClientAddr);
AcceptSocket = accept(ListenSocket, (SOCKADDR *)&ClientAddr, &addr length);
// 于是乎,我们就可以轻松得知连入客户端的信息了
LPCTSTR lpIP = inet ntoa(ClientAddr.sin addr); // 连入客户端的 IP
UINT nPort = ClientAddr.sin port; // 连入客户端的Port
- 第三步,准备好我们的重叠结构。有新的套接字连入以后,新建一个WSAOVERLAPPED 重叠结构(当然也可以提前建立 好),准备绑定到我们的重叠操作上去。这里也可以看到和基于事件的明显区别,就是不用再 为WSAOVERLAPPED 结构绑定一个hEvent 了 。这里只定义一个,实际上是每一个SOCKET 的每一个操作都需要绑定一个重叠结构,所 以在实际使用面对多个客户端的时候要定义为数组,详见示例代码:
WSAOVERLAPPED AcceptOverlapped;
ZeroMemory(&AcceptOverlapped, sizeof(WSAOVERLAPPED)); // 置 零
- 第四步,开始在套接字上投递WSARecv 请求。这一步需要将第三步准备的WSAOVERLAPPED 结构和我们定义的完成例程函数为参数。 各个变量都已经初始化完成以后,我们就可以开始进行具体的Socket 通信函数调用了,然后 让系统内部的重叠结构来替我们管理I/O 请求,我们只用等待网络通信完成后调用回调函数就 可 以 了 。这个步骤的重点是绑定一个Overlapped 变量和一个完成例程函数:
// 将WSAOVERLAPPED 结构指定为一个参数,在套接字上投递一个异步WSARecv() 请求 // 并提供下面的作为完成例程的_CompletionRoutine 回调函数(函数名字)
if (WSARecv(AcceptSocket, &DataBuf, 1,&dwRecvBytes, &Flags,&AcceptOverlapped,_CompletionRoutine) == SOCKET_ERR OR) // 注意我们传入的回调函数指针
{if (WSAGetLastError() != WSA_IO_PENDING){ReleaseSocket(nSockIndex);continue;}
}
- 第五步,调用WSAWaitForMultipleEvents 函数或者SleepEx 函数等待重叠操作返回的结 果。我们在前面提到过,投递完WSARecv 操作,并绑定了Overlapped 结构和完成例程函数之 后,基本就完事大吉了。等到系统自己去完成网络通信,并在接收到数据的时候,会自动调用 我们的完成例程函数。
我们在主线程中需要做的事情只有做别的事情,并且等待系统完成了完成例程调用后的返 回结果。就是说在WSARecv 调用发起完毕之后,我们不得不在后面紧跟上一些等待完成结果 的代码。有两种办法可以实现:
- (1)和上一节重叠I/O 中讲到的一样,我们可以使用WSAWaitForMultipleEvent 来等待 重叠操作的事件通知,演示如下:
/*因为WSAWaitForMultipleEvents()API 要求在一个或多个事件对象上等待,但是这个事件
数组已经不是和SOCKET 相关联的了,因此不得不创建一个伪事件对象*/
WSAEVENT EventArray[1];
EventArray[0] = WSACreateEvent(); // 建立一个事件
// 然后等待重叠请求完成就可以了。注意保存返回值,这个很重要
DWORD dwIndex = WSAWaitForMultipleEvents(1, EventArray, FALSE, WSA_INFINITE, TRUE);
WSAWaitForMultipleEvents参数的含义上一节中已经介绍过了。调用这个函数以后,线程 就会置于一个警觉的等待状态。注意,fAlertable 参数一定要设置为TRUE。
- (2)可以直接使用SleepEx 函数来完成等待,效果是一样的。SleepEx 函数调用起来就简 单得多了,它的函数原型定义是这样的:
DWORD SleepEx(DWORD dwMilliseconds, BOOL bAlertable);
其中,参数dwMilliseconds 为等待的超时时间,如果设置为INFINITE 就会一直等待下去; 参数bAlertable 表示是否置于警觉状态,如果为FALSE, 则一定要等待超时时间完毕之后才 会返回。如果指定的时间间隔已过期,则函数返回值为零。如果函数由于一个或多个I/O 完成 回调函数而返回,则返回值为WAIT_IO_COMPLETION, 只有当bAlertable为 TRUE, 并且调 用SleepEx函数的线程与调用扩展I/O函数的线程相同时才会发生这种情况。
这里我们希望重叠操作一完成就能返回,所以一定要设置为TRUE。调用这个函数的时候, 同样注意用一个DWORD 类型变量来保存它的返回值,后面会派上用场。
- 第六步,通过等待函数的返回值取得重叠操作的完成结果。这是我们最关心的事情,费了那么大劲投递的这个重叠操作究竟是什么结果呢?就是通过 上一步中我们调用的等待函数的DWORD 类型的返回值。正常情况下,在操作完成之后,应 该是返回 WAIT_IO_COMPLETION, 如果返回的是WAIT_TIMEOUT, 则表示等待设置的超时时间到了,但是重叠操作依旧没有完成,应该通过循环再继续等待。如果是其他返回值,就 坏事了,说明网络通信出现了其他异常,程序就可以报错退出了。
判断返回值的代码大致如下:
// 返 回WAIT IO COMPLETION 表示一个重叠请求完成例程代码的结束,继续为更多的完成例程服务
if (dwIndex == WAIT IO COMPLETION)
{TRACE(" 重叠操作完成...\n");
}
else if (dwIndex == WAIT TIMEOUT)
{TRACE(" 超时了,继续调用等待函数\n");
}
else
{TRACE(" 出错了.…\n");
}
操作完成了之后,就说明我们上一个操作已经成功了。成功了之后做什么?当然是继续投 递下一个重叠操作了。继续上面的循环。
- 第七步,继续回到第四步,在套接字上继续投递WSARecv 请求,重复第4~7步。
- 第八步,处理接收到的数据。
忙活了这么久,客户端传来的数据到底在哪里接收啊?怎么一点都没有提到呢?这个问题 提得好,我们写了这么多代码图什么呢?其实想要读取客户端的数据很简单,因为我们在 WSARecv 调用的时候传递了一个WSABUF 变量,用于保存网络数据,而在我们写的完成例 程回调函数里面就可以取到客户端传送来的网络数据了。系统在调用我们完成例程函数的时候 网络操作已经完成了,WSABUF 里面已经有我们需要的数据了,只是通过完成例程来进行后 期的处理。具体可以参考示例代码。其中,DataBuf.buf就是一个char*字符串指针。
下面我们来看两个例子,分别使用SleepEx 和 WSAWaitForMultipleEvents 函数。
【例10.6】基于完成例程的重叠I/O例 子(SleepEx 版 )
# define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <WINSOCK2.H>
#include <stdio.h>#define PORT 6000
#define MSGSIZE 1024#pragma comment(lib, "ws2_32.lib")typedef struct
{WSAOVERLAPPED overlap;WSABUF Buffer;char szMessage[MSGSIZE];DWORD NumberOfBytesRecvd;DWORD Flags;SOCKET sClient;
}PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;DWORD WINAPI WorkerThread(LPVOID);
void CALLBACK CompletionROUTINE(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);SOCKET g_sNewClientConnection;
BOOL g_bNewConnectionArrived = FALSE;int main()
{WSADATA wsaData;SOCKET sListen;SOCKADDR_IN local, client;DWORD dwThreadId;int iaddrSize = sizeof(SOCKADDR_IN);// Initialize Windows Socket libraryWSAStartup(0x0202, &wsaData);// Create listening socketsListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);// Bindlocal.sin_addr.S_un.S_addr = htonl(INADDR_ANY);local.sin_family = AF_INET;local.sin_port = htons(PORT);bind(sListen, (struct sockaddr*)&local, sizeof(SOCKADDR_IN));// Listenlisten(sListen, 3);puts("服务器已经启动。。。\n");// Create worker threadCreateThread(NULL, 0, WorkerThread, NULL, 0, &dwThreadId);while (TRUE){// Accept a connectiong_sNewClientConnection = accept(sListen, (struct sockaddr*)&client, &iaddrSize);g_bNewConnectionArrived = TRUE;printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));}
}DWORD WINAPI WorkerThread(LPVOID lpParam)
{LPPER_IO_OPERATION_DATA lpPerIOData = NULL;while (TRUE){if (g_bNewConnectionArrived){// Launch an asynchronous operation for new arrived connectionlpPerIOData = (LPPER_IO_OPERATION_DATA)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_IO_OPERATION_DATA));lpPerIOData->Buffer.len = MSGSIZE;lpPerIOData->Buffer.buf = lpPerIOData->szMessage;lpPerIOData->sClient = g_sNewClientConnection;WSARecv(lpPerIOData->sClient,&lpPerIOData->Buffer,1,&lpPerIOData->NumberOfBytesRecvd,&lpPerIOData->Flags,&lpPerIOData->overlap,CompletionROUTINE);g_bNewConnectionArrived = FALSE;}SleepEx(1000, TRUE);}return 0;
}void CALLBACK CompletionROUTINE(DWORD dwError,DWORD cbTransferred,LPWSAOVERLAPPED lpOverlapped,DWORD dwFlags)
{LPPER_IO_OPERATION_DATA lpPerIOData = (LPPER_IO_OPERATION_DATA)lpOverlapped;if (dwError != 0 || cbTransferred == 0){// Connection was closed by clientclosesocket(lpPerIOData->sClient);HeapFree(GetProcessHeap(), 0, lpPerIOData);}else{lpPerIOData->szMessage[cbTransferred] = '\0';puts(lpPerIOData->szMessage);//打印下收到的客户端信息send(lpPerIOData->sClient, lpPerIOData->szMessage, cbTransferred, 0);//再重新发回给客户端// Launch another asynchronous operationmemset(&lpPerIOData->overlap, 0, sizeof(WSAOVERLAPPED));lpPerIOData->Buffer.len = MSGSIZE;lpPerIOData->Buffer.buf = lpPerIOData->szMessage;WSARecv(lpPerIOData->sClient,&lpPerIOData->Buffer,1,&lpPerIOData->NumberOfBytesRecvd,&lpPerIOData->Flags,&lpPerIOData->overlap,CompletionROUTINE);}
}
用完成例程来实现重叠I/O 比用事件通知简单得多。在这个模型中,主线程只用不停地接 受连接即可;辅助线程判断有没有新的客户端连接被建立,如果有,就为那个客户端套接字激 活一个异步的WSARecv 操作,然后调用SleepEx使线程处于一种可警告的等待状态,以使得 I/O完成后CompletionROUTINE 可以被内核调用。如果辅助线程不调用SleepEx, 则内核在完 成一次I/O 操作后,无法调用完成例程(因为完成例程的运行应该和当初激活WSARecv 异步 操作的代码在同一个线程之内)。
完成例程内的实现代码比较简单,它取出接收到的数据,然后将数据原封不动地发送给客 户端,最后重新激活另一个WSARecv 异步操作。注意,在这里用到了“尾随数据”。我们在 调用 WSARecv 的时候,参数 lpOverlapped 实际上指向一个比它大得多的结构 PER_IO_OPERATION_DATA, 这个结构除了WSAOVERLAPPED 以外,还被我们附加了缓 冲区的结构信息,另外还包括客户端套接字等重要的信息。这样,在完成例程中通过参数 lpOverlapped 拿到的不仅仅是WSAOVERLAPPED 结构,还有后边尾随的包含客户端套接字和 接收数据缓冲区等重要信息。这样的C 语言技巧在后面介绍完成端口的时候还会使用到。
【例10.7】基于完成例程的重叠I/O例 子(WSAWaitForMultipleEvents 版 )
#include <WinSock2.h>
#include <process.h>
#include <stdio.h>
#pragma comment(lib,"ws2_32.lib")#define PORT 6000
#define MAXBUF 128//自定义一个存放socket信息的结构体,用于完成例程中对OVERLAPPED的转换
typedef struct _SOCKET_INFORMATION {OVERLAPPED Overlapped; //这个字段一定要放在第一个,否则转换的时候,数据的赋值会出错 SOCKET Socket; //后面的字段顺序可打乱并且不限制字段数,也就是说你还可以多定义几个字段CHAR Buffer[MAXBUF];WSABUF wsaBuf;
} SOCKET_INFORMATION, *LPSOCKET_INFORMATION;SOCKET g_sClient; //不断新加进来的client//打开服务器
BOOL OpenServer(SOCKET* sServer)
{BOOL bRet = FALSE;WSADATA wsaData = { 0 };SOCKADDR_IN addrServer = { 0 };addrServer.sin_family = AF_INET;addrServer.sin_port = htons(PORT);addrServer.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");do{if (!WSAStartup(MAKEWORD(2, 2), &wsaData)){if (LOBYTE(wsaData.wVersion) == 2 || HIBYTE(wsaData.wVersion) == 2){//在套接字上使用重叠I/O模型,必须使用WSA_FLAG_OVERLAPPED标志创建套接字//g_sServer = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP,NULL,0,WSA_FLAG_OVERLAPPED);*sServer = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if (*sServer != INVALID_SOCKET){if (SOCKET_ERROR != bind(*sServer, (SOCKADDR*)&addrServer, sizeof(addrServer))){if (SOCKET_ERROR != listen(*sServer, SOMAXCONN)){bRet = TRUE;break;}closesocket(*sServer);}closesocket(*sServer);}}}} while (FALSE);return bRet;
}//完成例程
void CALLBACK CompeletRoutine(DWORD dwError, DWORD dwBytesTransferred, LPWSAOVERLAPPED Overlapped, DWORD dwFlags)
{DWORD dwRecvBytes;DWORD dwFlag;//强制转换为我们自定义的结构,这里就解释了为什么第一个字段要是OVERLAPPED//因为转换后首地址肯定会相同,读取的数据一定会是Overlapped的数据//所以要先把Overlapped的数据保存下来,接下来内存中的数据再由系统分配到各个字段中LPSOCKET_INFORMATION pSi = (LPSOCKET_INFORMATION)Overlapped;if (dwError != 0) //错误显示printf("I/O operation failed with error %d\n", dwError);if (dwBytesTransferred == 0)printf("Closing socket %d\n\n", pSi->Socket);if (dwError != 0 || dwBytesTransferred == 0) //错误处理{closesocket(pSi->Socket);GlobalFree(pSi);return;}//如果已经发送完成了,接着投递下一个WSARecvprintf("Recv%d:%s\n", pSi->Socket, pSi->wsaBuf.buf);dwFlag = 0;ZeroMemory(&(pSi->Overlapped), sizeof(WSAOVERLAPPED));pSi->wsaBuf.len = MAXBUF;pSi->wsaBuf.buf = pSi->Buffer;if (WSARecv(pSi->Socket, &(pSi->wsaBuf), 1, &dwRecvBytes, &dwFlag, &(pSi->Overlapped), CompeletRoutine) == SOCKET_ERROR){if (WSAGetLastError() != WSA_IO_PENDING){printf("WSARecv() failed with error %d\n", WSAGetLastError());return;}}
}//把client和完成例程绑定起来
unsigned int __stdcall ThreadBind(void* lparam)
{DWORD dwFlags;LPSOCKET_INFORMATION pSi;DWORD dwIndex;DWORD dwRecvBytes;WSAEVENT eventArry[1];eventArry[0] = (WSAEVENT)lparam;while (1){//等待一个完成例程返回while (TRUE){dwIndex = WSAWaitForMultipleEvents(1, eventArry, FALSE, WSA_INFINITE, TRUE);if (dwIndex == WSA_WAIT_FAILED){printf("WSAWaitForMultipleEvents() failed with error %d\n", WSAGetLastError());return FALSE;}if (dwIndex != WAIT_IO_COMPLETION)break;}//重设事件WSAResetEvent(eventArry[0]);//为SOCKET_INFORMATION分配一个全局内存空间,相当于全局变量了//这里为什么要分配全局的呢?因为我们要在完成例程中引用socket的数据if ((pSi = (LPSOCKET_INFORMATION)GlobalAlloc(GPTR, sizeof(SOCKET_INFORMATION))) == NULL){printf("GlobalAlloc() failed with error %d\n", GetLastError());return 1;}//填充各个字段pSi->Socket = g_sClient;ZeroMemory(&(pSi->Overlapped), sizeof(WSAOVERLAPPED));pSi->wsaBuf.len = MAXBUF;pSi->wsaBuf.buf = pSi->Buffer;dwFlags = 0;//投递一个WSARecvif (WSARecv(pSi->Socket, &(pSi->wsaBuf), 1, &dwRecvBytes, &dwFlags,&(pSi->Overlapped), CompeletRoutine) == SOCKET_ERROR){if (WSAGetLastError() != WSA_IO_PENDING){printf("WSARecv() failed with error %d\n", WSAGetLastError());return 1;}}printf("Socket %d got connected...\n", g_sClient);}return 0;
}//接受client请求线程
unsigned int __stdcall ThreadAccept(void* lparam)
{SOCKET sServer = *(SOCKET*)lparam;WSAEVENT event = WSACreateEvent();_beginthreadex(NULL, 0, ThreadBind, event, 0, NULL);while (TRUE){g_sClient = accept(sServer, NULL, NULL);if (g_sClient != INVALID_SOCKET)WSASetEvent(event);}return 0;
}int main(int argc, char **argv)
{SOCKET sServer = INVALID_SOCKET;puts("服务器已经启动...\n");if (OpenServer(&sServer))_beginthreadex(NULL, 0, ThreadAccept, &sServer, 0, NULL);Sleep(10000000);return 0;
}
参考书籍《Visual C++2017网络编程实战》