文章目录:
- 简单的TCP网络程序
- 服务端创建套接字
- 服务端绑定
- 服务端监听
- 服务端获取连接
- 服务端处理请求
- TCP客户端实现
- 服务器测试
- TCP 网络程序(多进程版本)
- TCP 网络程序(多线程版本)
- TCP 网络程序(线程池版本)
简单的TCP网络程序
TCP 服务端的实现过程如下:
- 使用 socket() 创建 TCP 套接字。
- 使用 bind() 将套接字绑定到服务器地址。
- 使用 listen(),将服务器套接字置于被动模式,等待与客户端建立连接。
- 此时,客户端和服务端之间建立了连接,并且已经准备好传输数据。
服务端创建套接字
在 TCP 网络程序中,使用 socket 函数创建服务器端套接字。socket 函数的定义如下:
#include <sys/types.h>
#include <sys/socket.h>int socket(int domain, int type, int protocol);
TCP 服务器端调用 socket 函数创建套接字时,参数设置如下:
- 协议族(domain)选择
AF_INET
,表示使用 IPv4 地址族进行网络通信。 - 服务类型(type)选择
SOCK_STREAM
,表示创建一个面向连接的流套接字,用于提供可靠的、有序的、全双工的传输服务。 - 协议类型(protocol)设置为0,表示使用默认的传输协议。
下列代码段用于创建 TCP 服务器的套接字,它包含一个成员变量 listenSocket_ 表示服务器监听套接字。
#define SOCKET_ERR 1
#define BIND_ERR 2
#define LISTEN_ERR 3
#define USAGE_ERR 4
#define CONN_ERR 5class ServerTcp
{
public:ServerTcp() :listenSocket_(-1) {}~ServerTcp() {if(listenSocket_>0)close(listenSocket_);}void init(){// 1.创建socketlistenSocket_ = socket(PF_INET, SOCK_STREAM, 0);if (listenSocket_ < 0){std::cerr << "socket create failed : " << strerror(errno) << std::endl;exit(SOCKET_ERR);}}
private:int listenSocket_;
};
服务端绑定
套接字创建成功之后,我们需要使用 bind 函数将套接字与特定的IP地址和端口号进行绑定,以便在网络上唯一标识该套接字。这样,其它的计算机就可以通过指定的IP地址和端口号与该套接字进行通信。
我们创建一个 sockaddr_in
结构体,填写相应的字段(参考代码),然后使用 bind
函数将套接字与该地址结构进行绑定:
class ServerTcp
{
public:ServerTcp(uint16_t port, const std::string &ip = "") : port_(port), listenSocket_(-1) {}~ServerTcp() {if(listenSocket_>0)close(listenSocket_);}void init(){// 1.创建socketlistenSocket_ = socket(PF_INET, SOCK_STREAM, 0);if (listenSocket_ < 0){std::cerr << "socket create failed : " << strerror(errno) << std::endl;exit(SOCKET_ERR);}// 2.bind// 2.1 填充服务器信息struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = PF_INET;local.sin_port = htons(port_);ip_.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(ip_.c_str(), &local.sin_addr));// 2.2 本地socket信息,写入sock_对应的内核区域if (bind(listenSocket_, (const struct sockaddr *)&local, sizeof local) < 0){std::cerr << "bind failed : " << strerror(errno) << std::endl;exit(BIND_ERR);}}private:int listenSocket_;uint16_t port_;
};
使用 htons
将端口号从主机字节序转化为网络字节序(大端字节序),以确保在不同的系统上都能够正确的解析该端口号。
服务端监听
TCP 服务器的上述两个操作与 UDP 基本是相同的,但是 TCP 服务器是面向连接的,客户端在正式向服务器发送数据时,需要先与 TCP 服务器建立连接,然后才能与服务器进行通信。
因此 TCP 服务器需要客户端的连接请求,此时需要将 TCP 服务器的套接字设置为监听状态。在这里,我们使用 listen
函数将套接字设置为监听状态,并指定最大连接数。对于每个连接请求,TCP 服务器都会创建一个新的套接字来与客户端进行通信。
listen
函数用于将套接字设置为监听状态,以准备接收客户端的连接请求,它的函数定义如下:
#include <sys/types.h>
#include <sys/socket.h>int listen(int sockfd, int backlog);
参数解释:
sockfd
:要进行监听的套接字描述符。backlog
:全连接队列的最大长度,即可以等待处理的连接请求的数量。若同时又多个客户端的连接请求,此时没有被服务器处理的连接就放入该连接队列中。
返回值:
如果 listen
函数调用成功,则返回0。如果调用失败,则返回-1,同时错误码被设置。
TCP 服务器的监听套接字代码如下:
class ServerTcp
{
public:ServerTcp(uint16_t port, const std::string &ip = "") : port_(port), listenSocket_(-1) {}~ServerTcp() {if(listenSocket_>0)close(listenSocket_);}void init(){// 1.创建socketlistenSocket_ = socket(PF_INET, SOCK_STREAM, 0);if (listenSocket_ < 0){std::cerr << "socket create failed : " << strerror(errno) << std::endl;exit(SOCKET_ERR);}// 2.bind// 2.1 填充服务器信息struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = PF_INET;local.sin_port = htons(port_);ip_.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(ip_.c_str(), &local.sin_addr));// 2.2 本地socket信息,写入sock_对应的内核区域if (bind(listenSocket_, (const struct sockaddr *)&local, sizeof local) < 0){std::cerr << "bind failed : " << strerror(errno) << std::endl;exit(BIND_ERR);}// 3.监听socket,为何要监听呢?tcp是面向连接的!if (listen(listenSocket_, 5) < 0){std::cerr << "listen failed : " << strerror(errno) << std::endl;exit(LISTEN_ERR);}}private:int listenSocket_;uint16_t port_;
};
可以根据实际的需求设置合适的 backlog 的值。该值决定了全连接队列的最大长度,即服务器可以等待处理的连接请求的数量。如果连接请求超过了队列长度,后续的连接请求可能会被丢弃或被客户端拒绝。
服务端获取连接
TCP 服务器在初始化完成之后,需要调用 accept
函数来获取客户端的连接请求。accept
函数会阻塞程序的执行,知道有客户端连接请求的到达。
accept
函数用于接收客户端的连接请求,并创建一个新的套接字与客户端进行通信。它的函数定义如下:
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
参数解释:
sockfd
:服务器的监听套接字描述符。addr
:指向 sockaddr 的结构体指针,用于存储客户端的地址信息。addrlen
:指向 socklen_t 类型的函数指针,表示 addr 结构体的长度。
返回值:
获取连接成功,则返回新的套接字描述符。调用失败,则返回 -1 ,并且错误码被设置。
当 accept 函数成功接收客户端的连接请求时,它会返回一个新的套接字描述符。这个新的套接字描述符与之前的监听套接字描述符之间的区别和作用?
- 监听套接字用于侦听客户端的连接请求,它保持在监听状态,等待客户端连接。
- 监听套接字只负责接收连接请求,不进行实际的数据传输,一旦有连接请求到达,它会创建一个新的套接字用于与该客户端进行通信。
- 通过返回新的套接字描述符,服务器可以使用该套接字与对应的客户端进行数据传输。这样,服务器就可以同时处理多个客户端的请求,每个客户端都有自己专属的套接字与服务器进行通信。
TCP 服务器获取连接的代码如下:
class ServerTcp
{
public:ServerTcp(uint16_t port, const std::string &ip = "") : port_(port), ip_(ip), listenSocket_(-1) {}void loop(){while (true){struct sockaddr_in peer;socklen_t len = sizeof(peer);// 4.获取连接,accept的返回值是一个新的socket fdint serviceSock = accept(listenSocket_, (struct sockaddr *)&peer, &len);if (serviceSock < 0){std::cerr << "accept failed : " << strerror(errno) << std::endl;continue; // 服务器不可能因为一个客户端的连接失败退出,因此继续连接}// 4.1 获取客户端基本信息uint16_t peerPort = ntohs(peer.sin_port);std::string peerIp = inet_ntoa(peer.sin_addr);std::cout << "获取连接成功,sock -> " << serviceSock << std::endl;// 提供服务// ...}}private:int listenSocket_;uint16_t port_;std::string ip_;
};
接下来,对 TCP 服务端的连接进行测试,查看当前的服务器是否能成功接收连接请求,测试代码如下:
static void Usage(std::string proc)
{std::cerr << "Usage:\n\t" << proc << " prot ip" << std::endl;std::cerr << "Usage:\n\t" << proc << " 8080 127.0.0.1\n"<< std::endl;
}// ./serverTcp local_port local_ip
int main(int argc, char *argv[])
{if (argc != 2 && argc != 3){Usage(argv[0]);exit(USAGE_ERR);}uint16_t port = atoi(argv[1]);std::string ip;if (argc == 3)ip = argv[2];ServerTcp svr(port, ip);svr.init();svr.loop();return 0;
}
运行代码并测试:
服务端运行之后,可以使用 netstat
命令查看当前的网络连接状态,该服务器的本地地址为 0.0.0.0
,表示该 TCP 服务器可以接收到任意的IP地址连接请求。且当前的服务器处于 LISTEN
状态。
目前,我们还没有编写客户端的相应代码。在这里,可以使用 telnet
来连接到服务器。使用 telnet
来连接服务器,可以看到每多一个连接,套接字的数值就+1,每一次的第一个连接的套接字都是4。这是因为 0、1、2 分别是标志输入、标志输出和标志错误。而3号描述符被分配给了监听套接字。
服务端处理请求
为了能让通信双方都能看到对应的现象,这里实现一个简单的字符串中字母小写转大写的服务,服务器端读取到客户端发来的请求,完成对应的转换之后,将转换后的数据发送给客户端。客户端对转换后的字符串进行输出。
代码如下:
class ServerTcp
{
public:// 字符串中小写字符转换为大写字符的服务void transService(int socket, const std::string &clientIp, uint16_t clientPort){assert(socket >= 0);assert(!clientIp.empty());assert(clientPort >= 1024);#define BUFFER_SIZE 1024char inbuffer[BUFFER_SIZE];while (true){ssize_t s = read(socket, inbuffer, sizeof(inbuffer) - 1);if (s > 0){inbuffer[s] = '\0';// 将小写字符转换为大写字符for (int i = 0; i < s; i++){if (isalpha(inbuffer[i]) && islower(inbuffer[i]))inbuffer[i] = toupper(inbuffer[i]);}write(socket, inbuffer, strlen(inbuffer));}else if (s == 0){std::cout << "client quit -- " << clientIp.c_str() << "->" << clientPort << std::endl;break;}else{std::cout << "read failed : " << strerror(errno) << " " << clientIp.c_str() << "->" << clientPort << std::endl;break;}}// 程序执行到此处时,一定是客户端退出了,服务结束。close(socket);}private:int listenSocket_;uint16_t port_;std::string ip_;
};
TCP客户端实现
在客户端代码中,connect 函数用于建立与服务器的连接。它用于向服务器发起连接请求。
TCP 客户端的代码实现如下:
class ClientTcp
{
public:ClientTcp(const std::string &ip, uint16_t port): ip_(ip), port_(port), socket_(-1){}~ClientTcp(){if (socket_ > 0)close(socket_);}void init(){socket_ = socket(AF_INET, SOCK_STREAM, 0);if (socket_ < 0){std::cerr << "socket call failed : " << strerror(errno) << std::endl;exit(SOCKET_ERR);}}void start(){// connect,向服务器发起请求// 首先填充需要连接的远端主机的基本信息struct sockaddr_in server;bzero(&server, sizeof server);server.sin_family = AF_INET;server.sin_port = htons(port_);inet_aton(ip_.c_str(), &server.sin_addr);// 发送连接请求,connect会自动帮我们进行bindif (connect(socket_, (const sockaddr *)&server, sizeof(server)) != 0){std::cerr << "connect call failed : " << strerror(errno) << std::endl;exit(CONN_ERR);}std::cout << "connect success!!!" << std::endl;}void trans(){std::string message;while (true){message.clear();std::cout << "Please Enter# ";std::getline(std::cin, message);ssize_t s = write(socket_, message.c_str(), message.size());if (s > 0){message.resize(1024);ssize_t s = read(socket_, (char *)(message.c_str()), 1024);if (s > 0)message[s] = '\0';std::cout << "Server Echo# " << message << std::endl;}elsebreak;}}private:int socket_;std::string ip_;uint16_t port_;
};static void Usage(std::string proc)
{std::cerr << "Usage:\n\t" << proc << " prot ip" << std::endl;std::cerr << "Example:\n\t" << proc << " 127.0.0.1 8080\n"<< std::endl;
}int main(int argc, char *argv[])
{if (argc != 3){Usage(argv[0]);exit(USAGE_ERR);}uint16_t serverPort = atoi(argv[2]);std::string serverIp = argv[1];ClientTcp client(serverIp, serverPort);client.init();client.start();client.trans();return 0;
}
由于客户端不需要固定的端口号,因此不必调用 bind() ,客户端的端口号由内核自动分配。
注意:
- 客户端不是不允许调用 bind(),只是没有必要 bind() 固定一个端口号。否则如果在同一台机器上启动多个客户端,就会出现端口号被占用导致不能正确建立连接;
- 服务端也不是必须调用 bind(),但如果服务器不调用 bind(),内核会自动给服务器分配监听端口,每次启动服务器时端口号都不一样,客户端要连接服务器就会遇到麻烦。
服务器测试
运行服务端与客户端程序,然后使用 netstat
命令查看连接情况。此时就可以看到我们写得服务端进程,该进程当前处于监听状态。
接下来测试服务器端与客户端的功能,客户端输入字符串,服务端将该字符串进行相应的转换之后,将转换后的数据给到客户端,客户端对完成转换的字符串进行输出打印。当客户端退出后,服务端调用 read 函数的返回值为0,此时服务器便知道客户端退出了,于是终止对该客户端提供服务。
上面的测试是针对一个客户端的,接下来测试多个客户端连接服务器。
如下所示,单执行流服务器无法同时处理多个请求。在这种服务器模型中,服务器一次只能处理一个客户端的请求,而其它客户端的请求则需要等待前一个客户端的处理完成。
我们 accept 了一个请求之后,就一直在 while 循环尝试 read,没有继续调用到 accept,导致不能接收到新的请求。
这样会导致如下问题:当一个客户端连接到服务器并发送请求时,服务器会处理该请求,但其它客户端的请求会被阻塞,知道前一个客户端处理完成请求。这回导致其它客户端的响应时间延迟。
为了解决以上问题,可以使用多线程或多进程来实现并发处理。这样可以允许服务器同时处理多个客户端的请求,提高系统的并发性和响应性能。每个客户端连接通常会分配一个独立的线程或进程来处理请求,这样不同的客户端之间可以并行处理,互不干扰。
TCP 网络程序(多进程版本)
单执行流服务器改为多进程版本的服务器。
在这种多进程模型中,当服务器通过 accept 函数获取到新的客户端连接时,主进程会调用 fork 函数创建一个子进程。子进程将继承父进程的文件描述符和其它的相关资源,包括套接字。
接下来,父进程和子进程会成为两个独立的执行流。父进程继续监听新的连接请求,而子进程负责为父进程获取到的连接提供服务。通过这种方式,父进程可以继续接收并处理新的连接,而不用等待子进程完成当前连接的服务。这样实现了并发处理多个客户端的能力。
如何等待子进程退出?
在多进程 TCP 网络程序中,父进程创建子进程后,通常需要对子进程进行等待,以避免僵尸进程的产生。导致子进程资源无法完全释放,从而引发内存泄漏等问题。
阻塞等待
:父进程可以调用 wait/waitpid 函数进行阻塞等待,直到子进程退出之后才可以继续执行。这意味着父进程需要等待当前客户端的服务完成,才能继续接收下一个连接请求,这回导致服务器以串行化的方式为客户端提供服务。非阻塞等待
:非阻塞式的等待子进程意味着父进程在调用等待函数时不会被阻塞,可以立即返回并执行后续代码。虽然这样父进程可以继续接收新的连接请求,不会被当前子进程的服务阻塞。但是,父进程需要保持子进程的 PID,并定期检测子进程是否退出,以便及时回收子进程资源。这种方式需要额外的处理逻辑,需要不断的轮询检测子进程的状态,会引入一定的复杂度和更多的开销。
无论使用阻塞式等待还是非阻塞式等待,都不是很好,此时我们需要采取其它的方法。
方法一:使用 signal 函数捕捉 SIGCHLD 信号,将其处理动作设置为忽略。
方法二:父进程创建子进程,子进程再次创建子进程,让孙子进程为客户端提供服务。
捕捉 SIGCHLD 信号并将其处理动作设置为忽略
通过调用 signal 函数,设置 SIGCHLD 的处理方式为忽略。这样,子进程退出时,操作系统会自动回收子进程的资源,父进程不需要关系子进程。
class ServerTcp
{
public:void loop(){signal(SIGCHLD,SIG_IGN); // only linuxwhile (true){struct sockaddr_in peer;socklen_t len = sizeof(peer);// 4.获取连接,accept的返回值是一个新的socket fdint serviceSock = accept(listenSocket_, (struct sockaddr *)&peer, &len);if (serviceSock < 0){std::cerr << "accept failed : " << strerror(errno) << std::endl;continue;}// 4.1 获取客户端基本信息uint16_t peerPort = ntohs(peer.sin_port);std::string peerIp = inet_ntoa(peer.sin_addr);// 多进程版本 - 父进程打开的文件描述符会被子进程继承pid_t id=fork();assert(id!=-1);if(id==0){close(listenSocket_);transService(serviceSock,peerIp,peerPort);exit(0);}close(serviceSock);}}
private:int listenSocket_;uint16_t port_;std::string ip_;
};
运行并测试:
我们运行服务器,然后用两个客户端去连接该服务器。使用脚本 while :; do ps axj | head -1 && ps axj | grep serverTcp | grep -v grep; echo "--------------------------------";sleep 2;done
去对服务器程序进行监控。如下所示,当还没有客户端连接时,只有一个进程。当增加一个客户端连接服务器,就多了一个进程,多出来的这个进程就是父进程创建的子进程,一对一的为客户端提供服务。
当客户端退出时,对应的进程也就退出了 。客户端全部退出之后,就只有一个服务端进程了。该进程用于获取新的连接。当这个服务器也退出的时候,进程也就全部退出了。
让孙子进程提供服务
在 TCP 服务器程序中,可以使用多级进程派生的方法,让子进程再次进行 fork,创建孙子进程为客户端提供服务。这种方法可以避免等待孙子进程退出的情况。
在多级派生的方法中,父进程创建了一个子进程,然后子进程再次创建了一个孙子进程。在这个过程中,子进程在创建孙子进程后就立即退出,成为一个僵尸进程。而孙子进程由于没有父进程可以等待它的退出,所以成为一个孤儿进程。它的退出状态将由操作系统处理,不再需要父进程等待。
代码如下:
class ServerTcp
{
public:void loop(){signal(SIGCHLD,SIG_IGN); // only linuxwhile (true){struct sockaddr_in peer;socklen_t len = sizeof(peer);// 4.获取连接,accept的返回值是一个新的socket fdint serviceSock = accept(listenSocket_, (struct sockaddr *)&peer, &len);if (serviceSock < 0){std::cerr << "accept failed : " << strerror(errno) << std::endl;continue;}// 4.1 获取客户端基本信息uint16_t peerPort = ntohs(peer.sin_port);std::string peerIp = inet_ntoa(peer.sin_addr);pid_t id = fork();if (id == 0){close(listenSocket_);if (fork() > 0)exit(0);// 孤儿进程,被系统领养transService(serviceSock, peerIp, peerPort);exit(0);}close(serviceSock);pid_t ret = waitpid(id, nullptr, 0);assert(ret > 0);(void)ret;}}
private:int listenSocket_;uint16_t port_;std::string ip_;
};
多级派生的方式对并发处理多个客户端连接有什么优势?
- 多级派生的方式允许每个客户端连接都有一个独立的孙子进程来处理,因此可以同时处理多个客户端请求,实现并发处理。每个孙子进程都可以独立的处理客户端的请求,不会相互干扰或阻塞。
- 它们拥有自己的资源空间。可以避免多个客户端之间的资源冲突,提高系统的稳定性和安全性。
- 多级派生方式可以快速创建并启动孙子进程来处理客户端请求,而无需等待孙子进程退出。可以减少客户端连接的等待时间,提高系统的响应速度。
运行并测试:
重新编译程序并运行服务端和客户端。使用监控脚本 while :; do ps axj | head -1 && ps axj | grep serverTcp | grep -v grep; echo "--------------------------------";sleep 2;done
对服务端进程进行监控。如下所示,运行服务端,此时还没有客户端连接时,此时只有一个服务进程。
此时我们让多个客户端连接服务器,可以看到每多一个客户端连接服务器,就多一个 PPID 为1的进程,该进程就是由操作系统管理的孤儿进程。每个客户端都有一个对应的孤儿进程为它提供服务。
随着客户端的退出,对应的孤儿进程也跟着退出了,这些孤儿进程被系统回收了。
TCP 网络程序(多线程版本)
在上述的多进程版本的 TCP 网络中,我们通过创建进程来实现多个客户端的并发访问,每一个客户端对应一个进程,而创建进程的成本相对而言是较高的。而创建线程的成本较低,因为线程共享了进程的大部分资源,如地址空间、文件描述符等。因此,在实现多执行流的服务器时,使用多线程相较于使用多进程通常是更好的选择。
当服务进程调用 accept
函数接收新连接时,可以创建一个新线程来为对应的客户端提供服务。这样可以充分利用多核处理器的并行性,同时提供服务器的响应性能。对应创建后的线程,可以使用 pthread_detach
函数进行线程分离。这样创建出来的线程在退出时会自动回收所占用的资源,而不需要主线程进行等待。这样主线程可以继续处理新的连接请求,并将服务对应的客户端交给新线程处理。
使用 ThreadData 结构体来传递参数
class ThreadData
{
public:uint16_t clientPort_;std::string clientIp_;int sock_;ServerTcp *this_;ThreadData(uint16_t port, std::string ip, int sock, ServerTcp *ts): clientPort_(port), clientIp_(ip), sock_(sock), this_(ts){}
};
- ThreadData 类封装了线程所需要的参数,包括客户端的IP地址、端口号、socket 文件描述符以及指向 ServerTcp 的指针。通过将这些参数封装在一个类中,便于传递参数给线程函数,并且可以将数据组织在一起,提高代码的可读性和可维护性。
- 线程函数需要在堆上动态分配 ThreadData 对象,以确保线程函数执行完毕后能够正确释放内存。通过使用 new 运算符在堆上创建 ThreadData 对象,可以确保对象的生命周期延长至线程结束。
- 线程函数需要访问 ServerTcp 对象的成员函数 transService() 来处理客户端请求。ThreadData 类中包含一个指向 ServerTcp 对象的指针,线程可以通过该指针访问 ServerTcp 对象的成员函数和成员变量。
- 在线程函数中,通过调用 delete 运算符来删除 ThreadData 对象,释放分配的内存。
线程执行函数 threadRoutine 应设置为静态函数(static)
在使用 pthread_create 函数创建新线程时,新线程的执行函数需要是一个参数为 void* ,返回值为 void* 的函数。如果将线程执行函数定义为类的成员函数,由于隐含的 this 指针,无法直接匹配的正确的函数。因此,我们需要将线程执行函数定义为静态成员函数。静态成员函数不依赖于类的实例。
多线程版 TCP 网络程序服务端更改代码如下:
class ServerTcp;
class ThreadData
{
public:uint16_t clientPort_;std::string clientIp_;int sock_;ServerTcp *this_;ThreadData(uint16_t port, std::string ip, int sock, ServerTcp *ts): clientPort_(port), clientIp_(ip), sock_(sock), this_(ts){}
};class ServerTcp
{
public:void loop(){while (true){struct sockaddr_in peer;socklen_t len = sizeof(peer);// 4.获取连接,accept的返回值是一个新的socket fdint serviceSock = accept(listenSocket_, (struct sockaddr *)&peer, &len);if (serviceSock < 0){std::cerr << "accept failed : " << strerror(errno) << std::endl;continue;}// 4.1 获取客户端基本信息uint16_t peerPort = ntohs(peer.sin_port);std::string peerIp = inet_ntoa(peer.sin_addr);// 多线程版本// 多线程不需要关闭文件描述符,因为多线程会共享文件描述符表!ThreadData *td = new ThreadData(peerPort, peerIp, serviceSock, this);pthread_t tid;pthread_create(&tid, nullptr, threadRoutine, (void *)td);}}static void *threadRoutine(void *args){pthread_detach(pthread_self());ThreadData *td = static_cast<ThreadData *>(args);td->this_->transService(td->sock_, td->clientIp_, td->clientPort_);delete td;}
private:int listenSocket_;uint16_t port_;std::string ip_;
};
运行程序并测试:
重新编译程序并运行服务端和客户端。使用监控脚本 while :; do ps -aL | head -1&&ps -aL | grep serverTcp; echo "------------------------------------";sleep 1;done
对服务端进程进行监控。如下所示,运行服务端,此时还没有客户端连接时,此时只有一个服务进程。
运行多个客户端连接服务器,此时每一个客户端分配一个线程为其提供服务,如下,每个线程的 PID 是一样的,因为它们属于同一个进程的不同线程。
当客户端依次退出时,为其提供服务的线程也会相应退出,最后只有服务器的进程继续执行等待连接请求。
TCP 网络程序(线程池版本)
仅仅是使用多线程版的服务器,它还是存在一些问题的:
- 每当有新的连接到来时,服务器主线程都会为该客户端创建一个新的线程来提供服务,服务结束之后该线程就会被销毁。这种方式既繁琐又低效。
- 如果有大量的客户端连接请求,线程数量将会增多,增加 CPU 的负担,因为 CPU 需要不断在这些线程之间切换。线程切换的成本变高,线程调度周期变长,导致客户端的响应时间延长,影响客户端的体验。
因此,我们需要采取其它的方法来解决这个问题,如下:
- 在服务端预先创建一批线程,形成线程池。当有客户端连接请求时,从线程池中选择一个空闲的线程为该客户端提供服务。这样,客户端一来就有线程可以用,避免了动态创建线程的开销。
- 当某一个线程为客户端提供完成服务之后,不立即退出线程,而是继续为下一个客户端提供服务。如果当前没有客户端连接请求,则让该线程进入休眠状态,等待新的连接来时才唤醒该线程。这样可以避免线程频繁的创建和销毁,提高线程的复用率和效率。
- 线程池中的线程数量需要适中。这样可以保持 CPU 的负载在合理范围内。如果有客户端连接到来,但是线程池中的线程都在为其它客户端提供服务,此时服务端不应再创建新的线程,而是让新的连接请求进入全连接队列排队。当线程池中有空闲线程时,再从队列中获取连接请求并为其提供服务。
上面我们提到了用线程池来解决此问题,这里我们就要引入线程池了,关于线程池的详细介绍在 Linux线程池 中有详细的介绍。线程池实现的代码如下:
class Mutex
{
public:Mutex() { pthread_mutex_init(&_lock, nullptr); }void lock() { pthread_mutex_lock(&_lock); }void unlock() { pthread_mutex_unlock(&_lock); }~Mutex() { pthread_mutex_destroy(&_lock); }private:pthread_mutex_t _lock;
};class LockGuard
{
public:LockGuard(Mutex *mutex) : _mutex(mutex) { _mutex->lock(); }~LockGuard() { _mutex->unlock(); }private:Mutex *_mutex;
};int gThreadNum = 5;template <class T>
class ThreadPool
{
private:ThreadPool(int threadNum = gThreadNum) : isStart_(false), threadNum_(threadNum){assert(threadNum_ > 0);pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&cond_, nullptr);}ThreadPool(const ThreadPool<T> &) = delete;void operator=(const ThreadPool<T> &) = delete;private:// 类内成员,成员函数,都有默认参数thisstatic void *threadRoutine(void *args){pthread_detach(pthread_self());ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);prctl(PR_SET_NAME, "follower");while (1){tp->lockQueue();while (!tp->haveTask()){tp->waitForTask();}// 这个任务就被拿到了线程的上下文中了T t = tp->pop();tp->unlockQueue();t(); // 让指定的线程处理这个任务}}T pop(){T temp = taskQueue_.front();taskQueue_.pop();return temp;}void lockQueue() { pthread_mutex_lock(&mutex_); }void unlockQueue() { pthread_mutex_unlock(&mutex_); }bool haveTask() { return !taskQueue_.empty(); } // 检测当前是否有任务void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }void choiceThreadForHandler() { pthread_cond_signal(&cond_); }public:static ThreadPool<T> *getInstance(){static Mutex mutex;if (nullptr == instance) // 仅仅是过滤重复的判断{LockGuard lockguard(&mutex); // 进入代码块,加锁,退出代码块,自动解锁if (instance == nullptr){instance = new ThreadPool<T>();}}return instance;}void start(){assert(!isStart_);for (int i = 0; i < threadNum_; ++i){pthread_t temp;pthread_create(&temp, nullptr, threadRoutine, this);}isStart_ = true;}void push(const T &in){lockQueue();taskQueue_.push(in);choiceThreadForHandler();unlockQueue();}~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}int threadNum(){return threadNum_;}private:bool isStart_; // 判断线程池是否启动int threadNum_;queue<T> taskQueue_;pthread_mutex_t mutex_;pthread_cond_t cond_;static ThreadPool<T> *instance;
};template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
我们向服务端引入了一个线程池,需要在服务类中新增一个指向线程池的指针成员变量。在服务器实例化服务器对象时,可以先将线程池指针初始化为空。
服务器初始化完成之后,可以构造出实际的线程池对象。在线程池代码中,我们可以指定线程池中线程的数量,也可以使用代码中给出的默认值。
在启动服务器之前,需要对线程池进行初始化。这时创建并启动线程池中的所有线程,这些线程不断的从任务队列中获取任务并进行处理。
当服务进程调用 accept 函数获取到一个新的连接请求后,可以根据该客户端的套接字、IP地址以及端口号构建一个任务。然后,调用线程池中的 push 接口将该任务放入任务队列中。
即服务端将任务交给线程池处理,线程池中的线程负责从任务队列中获取任务并执行。服务类代码如下:
class ServerTcp;// 字符串中小写字符转换为大写字符的服务
void transService(int socket, const std::string &clientIp, uint16_t clientPort)
{assert(socket >= 0);assert(!clientIp.empty());assert(clientPort >= 1024);#define BUFFER_SIZE 1024char inbuffer[BUFFER_SIZE];while (true){ssize_t s = read(socket, inbuffer, sizeof(inbuffer) - 1);if (s > 0){inbuffer[s] = '\0';// 将小写字符转换为大写字符for (int i = 0; i < s; i++){if (isalpha(inbuffer[i]) && islower(inbuffer[i]))inbuffer[i] = toupper(inbuffer[i]);}write(socket, inbuffer, strlen(inbuffer));}else if (s == 0){std::cout << "client quit -- " << clientIp.c_str() << "->" << clientPort << std::endl;break;}else{std::cout << "read failed : " << strerror(errno) << " " << clientIp.c_str() << "->" << clientPort << std::endl;break;}}// 程序执行到此处时,一定是客户端退出了,服务结束。close(socket);
}class ThreadData
{
public:uint16_t clientPort_;std::string clientIp_;int sock_;ServerTcp *this_;ThreadData(uint16_t port, std::string ip, int sock, ServerTcp *ts): clientPort_(port), clientIp_(ip), sock_(sock), this_(ts){}
};class ServerTcp
{
public:ServerTcp(uint16_t port, const std::string &ip = "") : port_(port), ip_(ip), listenSocket_(-1),tp_(nullptr) {}~ServerTcp(){if (listenSocket_ > 0)close(listenSocket_);}void init(){// 1.创建socketlistenSocket_ = socket(PF_INET, SOCK_STREAM, 0);if (listenSocket_ < 0){std::cerr << "socket create failed : " << strerror(errno) << std::endl;exit(SOCKET_ERR);}// 2.bind// 2.1 填充服务器信息struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = PF_INET;local.sin_port = htons(port_);ip_.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(ip_.c_str(), &local.sin_addr));// 2.2 本地socket信息,写入sock_对应的内核区域if (bind(listenSocket_, (const struct sockaddr *)&local, sizeof local) < 0){std::cerr << "bind failed : " << strerror(errno) << std::endl;exit(BIND_ERR);}// 3.监听socket,为何要监听呢?tcp是面向连接的!if (listen(listenSocket_, 5) < 0){std::cerr << "listen failed : " << strerror(errno) << std::endl;exit(LISTEN_ERR);}// 4.加载线程池tp_=ThreadPool<Task>::getInstance();}void loop(){tp_->start();while (true){struct sockaddr_in peer;socklen_t len = sizeof(peer);// 4.获取连接,accept的返回值是一个新的socket fdint serviceSock = accept(listenSocket_, (struct sockaddr *)&peer, &len);if (serviceSock < 0){std::cerr << "accept failed : " << strerror(errno) << std::endl;continue;}// 4.1 获取客户端基本信息uint16_t peerPort = ntohs(peer.sin_port);std::string peerIp = inet_ntoa(peer.sin_addr);// 线程池版本Task t(serviceSock,peerIp,peerPort,transService);tp_->push(t);}}
private:int listenSocket_;uint16_t port_;std::string ip_;ThreadPool<Task> *tp_;
};
在上面代码中,我们直接把需要执行的任务(transService)写在了服务器代码中,也可以将它拿出来写在单独的类中,这里我们就做简单的处理了。
设计一个任务类Task,该任务类的代码如下:
class Task
{
public:// typedef std::function<void(int, std::string, uint16_t)> callback_t;using callback_t = std::function<void(int, std::string, uint16_t)>;private:int sock_; // 给用户提供IO服务的sockuint16_t port_; // client portstd::string ip_; // client ipcallback_t func_; // 任务的回调方法
public:Task() : sock_(-1), port_(-1) {}Task(int sock, std::string ip, uint16_t port, callback_t func): sock_(sock), ip_(ip), port_(port), func_(func) {}~Task() {}void operator()(){ func_(sock_, ip_, port_);}
};
在上面的任务类中,包含成员变量 sock_
、port_
和 ip_
,分别表示给用户提供 IO 服务的套接字、客户端的端口号和IP地址。成员变量 func_
是一个回调函数,用于处理任务。任务类重载了函数调用运算符 operator()
,当任务被对象调用时,会执行回调函数,并传递套接字、IP地址和端口号作为参数。
当任务队列中有任务时,线程池中的线程会先定义出一个 Task 对象,该对象调用任务队列中的 pop 函数,从任务队列中获取任务。
通过这种方式,可以将任务的处理逻辑与任务类的定义分离,使得任务类更加灵活和可复用。在使用任务类时,可以通过传递不同的回调函数来实现不同的任务处理逻辑。
运行程序并测试:
多个客户端连接服务器时,可以同时为其提供服务,如下所示: