TcpServer 服务器优化之后,加了多线程,对心跳包进行优化
TcpServer.h
#ifndef TCPSERVER_H
#define TCPSERVER_H

#include <winsock2.h>
#include <ws2tcpip.h>

#include <algorithm>
#include <cstring>
#include <ctime>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Link against the WinSock import library
#pragma comment(lib, "ws2_32.lib")
#define HEARTBEATTIME 1000class TcpServer {
public:TcpServer();~TcpServer();// 启动服务器,监听指定端口bool start(int port);// 停止服务器void stop();// 发送数据给指定客户端int sendData(SOCKET clientSocket, const char* data, int dataLength);// 处理服务器业务逻辑,通常在循环中调用void handle();//链接static DWORD WINAPI ThreadAccept(LPVOID lpParam);//接收数据static DWORD WINAPI ThreadRecvData(LPVOID lpParam);//心跳包static DWORD WINAPI ThreadHeartBeat(LPVOID lpParam);
public:std::vector<SOCKET> socketsToRemove;BOOL m_bExit;//程序是否关闭BOOL m_bHeartBeat;//是否启用心跳包int heartbeatInterval; // 心跳包间隔时间(秒)
private:SOCKET listenSocket;std::vector<SOCKET> clientSockets;std::map<SOCKET, std::time_t> clientLastHeartbeatTime;// 设置套接字为非阻塞模式bool setSocketNonBlocking(SOCKET socket);// 接受新的客户端连接void acceptNewClients();// 接收客户端数据void receiveClientData();// 发送心跳包给客户端,并检测客户端响应void sendHeartbeatsAndCheck();// 移除已断开连接的客户端void removeDisconnectedClients(std::vector<SOCKET> &socketsToRemove);
};#endif
TcpServer.cpp
#include "TcpServer.h"// 构造函数,初始化相关成员变量
// Default-constructs an idle server (no listening socket, heartbeat disabled,
// 5 second heartbeat interval) and initialises WinSock 2.2 for the process.
// A WSAStartup failure is only reported on stderr; construction continues.
TcpServer::TcpServer()
    : m_bExit(false),
      m_bHeartBeat(false),
      heartbeatInterval(5),
      listenSocket(INVALID_SOCKET)
{
    WSADATA winsockInfo;
    const int startupStatus = WSAStartup(MAKEWORD(2, 2), &winsockInfo);
    if (startupStatus == 0) {
        return;
    }
    std::cerr << "WSAStartup failed: " << startupStatus << std::endl;
}

// Destructor: closes the sockets and cleans up the WinSock environment.
// Tears the server down: asks the worker loops to exit, closes every socket
// and releases the WinSock reference taken in the constructor.
TcpServer::~TcpServer()
{
    // The worker threads poll m_bExit; raise it before the sockets and the
    // object itself go away so they stop touching this instance.
    // NOTE(review): the threads are not joined here (no thread handles are
    // kept by the class), so a worker that is mid-iteration can still run
    // briefly after this point -- confirm the owner outlives the workers.
    m_bExit = TRUE;

    stop();
    WSACleanup();
}

// Start the server and listen on the given port.
bool TcpServer::start(int port)
{listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if (listenSocket == INVALID_SOCKET){std::cerr << "Socket creation failed: " << WSAGetLastError() << std::endl;return false;}if (!setSocketNonBlocking(listenSocket)) {std::cerr << "Failed to set listen socket non-blocking" << std::endl;closesocket(listenSocket);return false;}sockaddr_in serverAddr;serverAddr.sin_family = AF_INET;serverAddr.sin_addr.s_addr = INADDR_ANY;serverAddr.sin_port = htons(port);int result = bind(listenSocket, (sockaddr*)&serverAddr, sizeof(serverAddr));if (result == SOCKET_ERROR){std::cerr << "Bind failed: " << WSAGetLastError() << std::endl;closesocket(listenSocket);return false;}result = listen(listenSocket, SOMAXCONN);if (result == SOCKET_ERROR){std::cerr << "Listen failed: " << WSAGetLastError() << std::endl;closesocket(listenSocket);return false;}return true;
}// 停止服务器
void TcpServer::stop()
{if (listenSocket != INVALID_SOCKET) {closesocket(listenSocket);listenSocket = INVALID_SOCKET;}for (SOCKET clientSocket : clientSockets) {closesocket(clientSocket);}clientSockets.clear();clientLastHeartbeatTime.clear();
}// 设置套接字为非阻塞模式
// Switches the given socket to non-blocking mode via ioctlsocket(FIONBIO).
// Returns true on success; on failure logs the WinSock error and returns false.
bool TcpServer::setSocketNonBlocking(SOCKET socket)
{
    u_long nonBlockingFlag = 1; // non-zero enables non-blocking mode
    if (ioctlsocket(socket, FIONBIO, &nonBlockingFlag) != SOCKET_ERROR) {
        return true;
    }

    // Fetch the error code first: the stream insertion below performs I/O
    // of its own before a later WSAGetLastError() call would be evaluated.
    const int ioctlError = WSAGetLastError();
    std::cerr << "ioctlsocket failed: " << ioctlError << std::endl;
    return false;
}

// Send data to the given client.
// Sends dataLength bytes from data to clientSocket, looping until everything
// has been handed to the socket.
//
// Returns the number of bytes sent (== dataLength, or 0 when dataLength is
// not positive), or SOCKET_ERROR when the socket is invalid, the buffer is
// null, the send fails, or shutdown (m_bExit) is requested while waiting.
int TcpServer::sendData(SOCKET clientSocket, const char* data, int dataLength)
{
    if (clientSocket == INVALID_SOCKET) {
        std::cerr << "Invalid client socket, cannot send data" << std::endl;
        return SOCKET_ERROR;
    }
    if (data == nullptr && dataLength > 0) {
        std::cerr << "Invalid send buffer, cannot send data" << std::endl;
        return SOCKET_ERROR;
    }

    int totalBytesSent = 0;
    while (totalBytesSent < dataLength) {
        const int bytesSent = ::send(clientSocket, data + totalBytesSent,
                                     dataLength - totalBytesSent, 0);
        if (bytesSent != SOCKET_ERROR) {
            totalBytesSent += bytesSent;
            continue;
        }
        if (WSAGetLastError() != WSAEWOULDBLOCK) {
            return SOCKET_ERROR;
        }

        // The send buffer is full. Give up if the server is shutting down,
        // otherwise wait briefly for the socket to become writable instead
        // of spinning on send() at full CPU.
        if (m_bExit) {
            return SOCKET_ERROR;
        }
        fd_set writable;
        FD_ZERO(&writable);
        FD_SET(clientSocket, &writable);
        timeval waitSlice;
        waitSlice.tv_sec = 0;
        waitSlice.tv_usec = 50 * 1000; // 50 ms per wait, then retry send()
        if (::select(0, nullptr, &writable, nullptr, &waitSlice) == SOCKET_ERROR) {
            return SOCKET_ERROR;
        }
        // Whether select() timed out or reported writability, retry send();
        // a broken connection surfaces there as a hard error.
    }
    return totalBytesSent;
}

// Accept a new client connection.
void TcpServer::acceptNewClients()
{SOCKET newClientSocket = accept(listenSocket, NULL, NULL);if (newClientSocket == INVALID_SOCKET){if (WSAGetLastError() != WSAEWOULDBLOCK) {std::cerr << "Accept failed: " << WSAGetLastError() << std::endl;}return;}else{std::cout << "Accept success: " << newClientSocket << std::endl;}if (!setSocketNonBlocking(newClientSocket)) {std::cerr << "Failed to set client socket non-blocking" << std::endl;closesocket(newClientSocket);return;}clientSockets.push_back(newClientSocket);clientLastHeartbeatTime[newClientSocket] = std::time(nullptr);
}// 接收客户端数据
void TcpServer::receiveClientData()
{for (size_t i = 0; i < clientSockets.size(); ++i){SOCKET clientSocket = clientSockets[i];char buffer[1024];int bytesReceived = ::recv(clientSocket, buffer, sizeof(buffer), 0);if (bytesReceived == SOCKET_ERROR) {if (WSAGetLastError() == WSAEWOULDBLOCK) {// 暂时无数据可读,继续检查下一个客户端continue;}}else{buffer[bytesReceived] = '\0';std::string receivedData(buffer);// 在这里可以根据接收到的数据进行具体业务逻辑处理,比如解析命令等std::cout << "Received from client " << clientSocket << ": " << receivedData << std::endl;clientLastHeartbeatTime[clientSocket] = std::time(nullptr);std::string heartbeatData = buffer;heartbeatData+=" recvok:";int sentBytes = sendData(clientSocket, heartbeatData.c_str(), heartbeatData.length());}}
}// 发送心跳包给客户端,并检测客户端响应
void TcpServer::sendHeartbeatsAndCheck()
{const char* heartbeatData = "HEARTBEAT"; // 简单的心跳包内容,可自定义int dataLength = strlen(heartbeatData);for (auto& clientPair : clientLastHeartbeatTime){SOCKET clientSocket = clientPair.first;std::time_t& lastHeartbeatTime = clientPair.second;std::time_t currentTime = std::time(nullptr);if (currentTime - lastHeartbeatTime > heartbeatInterval) {// 超过心跳间隔时间没收到心跳响应,认为客户端连接异常socketsToRemove.push_back(clientSocket);continue;}int sentBytes = sendData(clientSocket, heartbeatData, dataLength);if (sentBytes == SOCKET_ERROR){// 发送心跳包失败,认为客户端连接可能有问题socketsToRemove.push_back(clientSocket);continue;}}
}// 移除已断开连接的客户端(更新函数定义,无参数)
void TcpServer::removeDisconnectedClients(std::vector<SOCKET>&socketsToRemove)
{for (SOCKET socketToRemove : socketsToRemove){auto it = std::find(clientSockets.begin(), clientSockets.end(), socketToRemove);if (it != clientSockets.end()) {std::cout << "Remove :"<< * it << std::endl;clientSockets.erase(it);clientLastHeartbeatTime.erase(socketToRemove);}}
}//接收链接线程
// Thread entry point: repeatedly polls for new connections until m_bExit is
// set.
//
// lpParam: pointer to the owning TcpServer.
// Returns 0 on normal exit, 1 when lpParam is null.
DWORD WINAPI TcpServer::ThreadAccept(LPVOID lpParam)
{
    TcpServer* const server = static_cast<TcpServer*>(lpParam);
    if (server == nullptr) {
        return 1;
    }

    while (!server->m_bExit) {
        server->acceptNewClients();
        // acceptNewClients() does not wait when nothing is pending; yield
        // between polls so this loop does not occupy a whole core.
        Sleep(1);
    }
    return 0;
}
// Thread entry point: repeatedly polls all clients for incoming data until
// m_bExit is set.
//
// lpParam: pointer to the owning TcpServer.
// Returns 0 on normal exit, 1 when lpParam is null.
DWORD WINAPI TcpServer::ThreadRecvData(LPVOID lpParam)
{
    TcpServer* const server = static_cast<TcpServer*>(lpParam);
    if (server == nullptr) {
        return 1;
    }

    while (!server->m_bExit) {
        server->receiveClientData();
        // receiveClientData() does not wait when no client has data; yield
        // between polls so this loop does not occupy a whole core.
        Sleep(1);
    }
    return 0;
}

// Heartbeat thread.
// Thread entry point for the heartbeat worker. Until m_bExit is set it
// sleeps HEARTBEATTIME milliseconds, runs one heartbeat pass, and, when
// heartbeatInterval is positive, removes the clients queued in
// socketsToRemove.
//
// lpParam: pointer to the owning TcpServer. Always returns 0.
DWORD WINAPI TcpServer::ThreadHeartBeat(LPVOID lpParam)
{
    TcpServer* const self = static_cast<TcpServer*>(lpParam);

    for (;;) {
        if (self->m_bExit != false) {
            break;
        }

        Sleep(HEARTBEATTIME);
        self->sendHeartbeatsAndCheck();

        if (self->heartbeatInterval <= 0) {
            continue; // removal is skipped for a non-positive interval
        }
        self->removeDisconnectedClients(self->socketsToRemove);
    }
    return 0;
}
// 处理服务器业务逻辑,通常在循环中调用
void TcpServer::handle()
{//创建4个线程,分别进行接收链接 接收数据 发送数据 发送心跳包// 创建线程,传入当前对象指针作为参数,线程启动函数为 SendHeartbeatHANDLE acceptThreadHandle = CreateThread(NULL, 0, ThreadAccept, this, 0, NULL);if (acceptThreadHandle == NULL){std::cerr << "Create accept thread failed: " << GetLastError() << std::endl;}else{std::cerr << "Create accept thread success: " << acceptThreadHandle << std::endl;}HANDLE recvDatatThreadHandle = CreateThread(NULL, 0, ThreadRecvData, this, 0, NULL);if (acceptThreadHandle == NULL){std::cerr << "Create recvData thread failed: " << GetLastError() << std::endl;}else{std::cerr << "Create recvData thread success: " << recvDatatThreadHandle << std::endl;}if (m_bHeartBeat == true){HANDLE heartBeatThreadHandle = CreateThread(NULL, 0, ThreadHeartBeat, this, 0, NULL);if (acceptThreadHandle == NULL){std::cerr << "Create heartBeat thread failed: " << GetLastError() << std::endl;}else{std::cerr << "Create heartBeat thread success: " << heartBeatThreadHandle << std::endl;}}}
main.cpp
#include "TcpServer.h"int main()
{TcpServer server;server.heartbeatInterval = 30;server.m_bHeartBeat = true;if (server.start(8080)) {while (true) {server.handle();// 可以在这里添加适当的延时,避免过于频繁地循环处理,消耗过多CPU资源Sleep(100);break;}}else{std::cout << "server initiatefail" << std::endl;}Sleep(1000000);return 0;
}