liunx网络套接字 | 实现基于tcp协议的echo服务

ops/2024/11/1 4:19:53/

        前言:本节讲述linux网络下的tcp协议套接字相关内容。博主以实现tcp服务为主线,穿插一些小知识点。以先粗略实现,后精雕细琢为思路讲述实现服务的过程。下面开始我们的学习吧。

        ps:本节内容建议了解网络端口号的友友们观看哦。

目录

实现内容

线程池版本整体代码

准备文件

makefile

tcpserver.hpp

main.cc

tcpclient

version1运行结果

version2版本

version3版本

version4版本


实现内容

        本篇内容将要实现一个服务端, 一个客户端。 然后客户端用来链接服务端, 向服务端发送消息, 然后服务端能够接收到消息并将消息返回给客户端。 

        实现的版本有四个:

  •         version1:实现单执行流的客户服务echo服务, 就是服务端只为一个服务端进行服务。 
  •         version2:在version1的版本上, 添加进程, 实现多进程的客户服务echo服务, 就是服务端为多个客户端进行服务, 但是因为是多进程,所以开销大。
  •         version3:改进version2版本, 将多进程改成多线程。实现多线程的echo服务。 但是当用户很多的时候, 线程量太大, 无法控制。
  •         version4:终极版本, 改进version3, 以线程池为基础, 实现可控的多线程echo服务。 控制线程个数, 既保证了并发性, 又防止了用户太多,线程爆满的问题。

        博主先实现version1, 然后在version1的基础上进行改版。下面开始实现: 

线程池版本整体代码

tcpserver


#pragma once
#include "Log.hpp"
#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "ThreadPool.h"
#include "Task.h"
#include <sys/wait.h>
#include <unistd.h>
using namespace std;const int defaultfd = -1;
const int defaultport = 8080;
const string defaultip = "0.0.0.0";
const int backlog = 10; // 直接用,一般不要设置的太大。class TcpServer;  //声明class ThreadData
{
public:ThreadData(int fd, string ip, uint16_t port, TcpServer* const t): sockfd_(fd), clientip_(ip), clientport_(port), t_(t){}public:int sockfd_;string clientip_;uint16_t clientport_;
public:TcpServer* const t_;
};Log lg;enum
{SockError = 2,BindError,ListenError
};class TcpServer
{
public:TcpServer(int port = defaultport, string ip = defaultip, int sockfd = defaultfd): listensockfd_(sockfd), ip_(ip), port_(port){}void InitServer(){listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);if (listensockfd_ < 0){lg(Fatal, "create socket, errno: %d, strerror: %s", errno, strerror(errno));exit(SockError);}//lg(Info, "create socket success, sockfd: %d", listensockfd_);struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port_);inet_aton(ip_.c_str(), &(local.sin_addr)); // 主机序列转网络学列。 inet_aton是一个线程安全的函数。// 绑定if (bind(listensockfd_, (sockaddr *)&local, sizeof(local)) < 0){lg(Fatal, "bind error, errno: %d, strerror: %s", errno, strerror(errno));exit(BindError);}// tcp面向字节流, 是被动的, 所以要将对应的socket设置为监听状态。if (listen(listensockfd_, backlog) < 0) // backlock表示的是底层全连接队列的长度。 这个参数对意思, 不做解释。{lg(Fatal, "Listen error, errno: %d, strerror: %s", errno, strerror(errno));exit(ListenError);}lg(Info, "Listen has success");}void Start(){ThreadPool<Task>::GetInstance()->Start();lg(Info, "tcpServer is running...");for (;;) // tcp协议也是一种一直处于运行的服务{// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置// 1、获取新连接,struct sockaddr_in client; // 获取的是客户端的addrsocklen_t len = sizeof(client);int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。{lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));continue;}uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列char clientip[32];inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));// 2、根据新连接进行通信lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);//version--4线程池版本Task task_(sockfd, clientip, clientport);ThreadPool<Task>::GetInstance()->Push(task_);}}~TcpServer(){}private:int listensockfd_; // 监听套接字, 只用来升起服务器, 接收链接uint16_t port_;string ip_;
};

 main.cc

#include"tcpserver.hpp"
#include<memory>int main(int argc, char* argv[])
{if (argc != 2){cout << "has return" << endl;return 1;}//uint16_t port = stoi(argv[1]);unique_ptr<TcpServer> tcpsvr(new TcpServer(port));tcpsvr->InitServer();tcpsvr->Start();return 0;
}

 tcpclient

#include<iostream>
using namespace std;
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<string.h>
#include<netinet/in.h>int main(int argc, char* argv[])
{    //处理argc, argv[]if (argc != 3){cout << "has return " << endl;return 1;}//uint16_t serverport = stoi(argv[2]);string serverip = argv[1];//创建addr结构体, 设置端口号ip地址int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){cout << "socket error" << endl;return 1;}sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));//1、客户端要绑定端口号, 但是不需要显示的绑定, 而是系统进行随机端口的绑定。 int n = connect(sockfd,(sockaddr*)&server, sizeof(server));if (n < 0) {cerr << "connect error..." << endl;return 2;}//2、发送信息, 接收信息。string message;while (true){cout << "Please Enter# ";getline(cin, message);//write(sockfd, message.c_str(), message.size());char inbuffer[4096];int n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);if (n > 0){inbuffer[n] = 0;cout << inbuffer << endl;}}return 0;
}

ThreadPool


#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{pthread_t tid_;string name_;   
};template<class T>
class ThreadPool
{static const int defaultnum = 5;  //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)private://加锁解锁void Lock(){pthread_mutex_lock(&mutex_);}void Unlock(){pthread_mutex_unlock(&mutex_);}//唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒void Wakeup(){pthread_cond_signal(&cond_);}void ThreadSleep(){pthread_cond_wait(&cond_, &mutex_);}bool IsQueueEmpty(){return tasks_.empty();}public://线程要执行的函数static void* HandlerTask(void* args){ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);while(true) {tp->Lock();while (tp->tasks_.empty()){tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。}//否则就去拿到tasks里面的任务T t = tp->tasks_.front();tp->tasks_.pop();//tp->Unlock();t();  //每一个线程先对任务进行消费, 消费完成之后处理任务。    }}//运行这个线程池, 也就是先将线程创建出来。 然后去运行线程void Start(){int num = threads_.size();for (int i = 0; i < num; i++){threads_[i].name_ = "thread-";threads_[i].name_ += to_string(i);   pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);}}//主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理void Push(const T& t){Lock();tasks_.push(t);Wakeup();Unlock();}//获取单例//改编成单例的步骤里面只有这里要说一下, 就是为什么我们要套双层判断。 其实这里的最外面的一层的判断是我们另外加上去的。 为什么//要加这个判断呢? 就是如果我们不加最外层这一层判断。 那么每一个线程获取单例都要申请所,加锁。 不就是相当于所有的线程都在串行执行? 这就有效率问题。 //解决方案就是这个再加一层判断。 这样假如有四个线程。 那么一开始四个线程都在判断, 那么它们四个线程都进入了if里面。 然后就都申请锁, 但是只有第一个线程能够//进入第二层里面, 其他的进入不了。 那么当这一轮的四个线程都申请一次锁候就都退出了函数, 然后就都去做自己的事情了。 问题是, 当下次它们再来申请单例对象的时候它们连//第一层判断都成功不了了, 也就都不用加锁解锁了, 这就大大提高了效率!!!static ThreadPool<T>* GetInstance(int num = defaultnum){if (tp_ == nullptr){pthread_mutex_lock(&tp_->lock_);if (tp_ == nullptr) {tp_ = new ThreadPool<T>(num);}pthread_mutex_unlock(&tp_->lock_);}return tp_;}private://构造函数私有化, 只有Getinstance里面才能创建。 ThreadPool(int num = defaultnum):threads_(num){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&cond_, nullptr);}//单例模式只有一个对象, 所以要将拷贝构造和拷贝赋值封住, 为了防止有人在外部重新拷贝一个对象。 ThreadPool(const ThreadPool<T>& tp) = delete;const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete;~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}private:vector<ThreadInfo> threads_;   //线程都维护在vector当中, 这个就是线程池里面的线程的个数,queue<T> tasks_ ;              //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。 pthread_mutex_t mutex_;        //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用 pthread_cond_t cond_;          //条件变量, 用来没有任务的时候,消费者要挂起。 static pthread_mutex_t lock_;         //锁, 这个锁是为了在获取单例的时候能够让线程原子性的访问if (tp_ == nullptr)。static ThreadPool<T>* tp_;    //tp指针, 这就是唯一个单例对象。 };template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;

Task

#include <iostream>
using namespace std;
#include"Log.hpp"
#include <vector>
#include <string>
extern Log lg;// Task.h文件里面包含了任务类, 这个是我们线程池要执行的任务
class Task
{
public:// 构造函数, 第一个参数data1, 第二个参数data2, 第三个参数是加减乘除的符号。 这个任务就是进行四则运算Task(int sockfd, string clientip, uint16_t clientport):sockfd_(sockfd), clientip_(clientip), clientport_(clientport){}~Task() {}// 执行任务的接口run(), 这个方法对三个变量进行判断, 然后进行运算。void run(){char buffer[4096];string temp = clientip_;while (true){ssize_t n = read(sockfd_, buffer, sizeof(buffer));if (n > 0){buffer[n] = 0;cout << "client say#: " << buffer << endl;string echo_string("tcpserver echo#  " + (string)buffer);write(sockfd_, echo_string.c_str(), echo_string.size());}else if (n == 0){lg(Info, "quit sockfd:%d ", sockfd_);exit(1);}else{lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd_, clientport_, temp.c_str());}//}}// 仿函数, 为了方便我们的对象能够像函数一样使用。void operator()(){run();}private:// 每一个任务对象里面都有三个参数, 一个data1, 一个data2, 最后一个op_int sockfd_;string clientip_;uint16_t clientport_;
};

 

准备文件

先准备好文件。 tcpserver.hpp实现服务的接口, main.cc运行服务端。 然后tcpclient.cc运行客户端。 

 

makefile

makefile不解释, 直接上代码(这里加上-g是为了后续方便调试, 也可以不加)

.PHONY:all
all: tcpserver.exe tcpclient.exetcpserver.exe:main.ccg++ -o $@ $^ -std=c++11 -lpthread -g
tcpclient.exe:tcpclient.ccg++ -o $@ $^ -std=c++11 -lpthread -g.PHONY:clean
clean:rm -rf tcpserver.exe tcpclient.exe

 tcpserver.hpp

先来看框架

#pragma once
#include "Log.hpp"
#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include"ThreadPool.h"
#include"Task.h"
#include <sys/wait.h>
#include <unistd.h>
using namespace std;const int defaultfd = -1;
const int defaultport = 8080;
const string defaultip = "0.0.0.0";
const int backlog = 10; // 直接用,一般不要设置的太大。Log lg;enum
{SockError = 2,BindError,ListenError
};class TcpServer
{
public://构造函数, 将服务端的端口号, ip地址传过来TcpServer(int port = defaultport, string ip = defaultip, int sockfd = defaultfd): listensockfd_(sockfd), ip_(ip), port_(port){}//初始化服务端,分两步:绑定和监听void InitServer(){//绑定//监听}//运行服务端void Start(){}//执行相应的服务void Service(int sockfd, const string &clientip, uint16_t clientport){} //析构函数~TcpServer(){}private:int listensockfd_; // 监听套接字, 只用来升起服务器, 接收链接uint16_t port_;string ip_;
};

然后初始化就是创建sockaddr结构体, 创建套接字, 然后绑定。 因为tcp是面向字节流的。 所以还要对网卡进行监听。下面是初始化服务。

//对服务端进行初始化void InitServer(){listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);if (listensockfd_ < 0){lg(Fatal, "create socket, errno: %d, strerror: %s", errno, strerror(errno));exit(SockError);}//lg(Info, "create socket success, sockfd: %d", listensockfd_);struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port_);inet_aton(ip_.c_str(), &(local.sin_addr)); // 主机序列转网络学列。 inet_aton是一个线程安全的函数。// 绑定if (bind(listensockfd_, (sockaddr *)&local, sizeof(local)) < 0){lg(Fatal, "bind error, errno: %d, strerror: %s", errno, strerror(errno));exit(BindError);}// tcp面向字节流, 是被动的, 所以要将对应的socket设置为监听状态。if (listen(listensockfd_, backlog) < 0) // backlock表示的是底层全连接队列的长度。 这个参数对意思, 不做解释。{lg(Fatal, "Listen error, errno: %d, strerror: %s", errno, strerror(errno));exit(ListenError);}lg(Info, "Listen has success");}

        然后是运行服务, 运行服务也是分两步: accept与客户端建立连接。 然后执行服务。

    void Start(){lg(Info, "tcpServer is running...");for (;;) // tcp协议也是一种一直处于运行的服务{// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置// 1、获取新连接,struct sockaddr_in client; // 获取的是客户端的addrsocklen_t len = sizeof(client);int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。{lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));continue;}uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列char clientip[32];inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));// 2、根据新连接进行通信lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);//version--1 单进程版Service(sockfd, clientip, clientport);close(sockfd);}}

        然后执行的服务是echo服务, 就是先接受客户端发来的信息, 然后将信息加工一下发回去。

void Service(int sockfd, const string &clientip, uint16_t clientport){char buffer[4096];while (true){ssize_t n = read(sockfd, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;cout << "client say#: " << buffer << endl;string message("tcpserver echo#  " + string(buffer));//write(sockfd, message.c_str(), message.size());}else if (n == 0){lg(Info, "quit sockfd:%d ", sockfd);exit(1);}else{lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip.c_str());}}}

         上面就是整个的代码。

        我们这里说一下accept这个代码 

        accept代码的第一个参数是sockfd, 就是网卡的文件fd。 然后第二个参数和第三个参数都是输出型参数。 能够将对方也就是客户端的sockaddr带出来。 

main.cc

        主函数就是接收到传来的端口号, 创建服务端然后初始化并运行起来。

#include"tcpserver.hpp"
#include<memory>int main(int argc, char* argv[])
{if (argc != 2){cout << "has return" << endl;return 1;}//uint16_t port = stoi(argv[1]);unique_ptr<TcpServer> tcpsvr(new TcpServer(port));tcpsvr->InitServer();tcpsvr->Start();return 0;
}   

 tcpclient

        先看客户端的框架, 就是先链接服务端。 然后就给服务端发信息,接收信息。 (接收到的这个信息是被服务端处理过的)

#include<iostream>
using namespace std;
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<string.h>
#include<netinet/in.h>int main(int argc, char* argv[])
{//处理argc, argv[]//先连接//再发数据接受数据return 0;
}

处理argc的时候, 因为我们的参数一定是三个。即, 一个程序名一个端口号, 一个IP地址。 所以如果argc不是等于3的话直接返回。 为3的话就将端口号以及ip地址保存一下。 其中ip地址是要连接到的服务端的ip地址, 端口号是要连接到服务端的端口号。

    //处理argc, argv[]if (argc != 3){cout << "has return " << endl;return 1;}//uint16_t serverport = stoi(argv[2]);string serverip = argv[1];

然后创建addr结构,连接到服务端。 

    //创建addr结构体, 设置端口号ip地址int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){cout << "socket error" << endl;return 1;}sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));//1、客户端要绑定端口号, 但是不需要显示的绑定, 而是系统进行随机端口的绑定。 int n = connect(sockfd,(sockaddr*)&server, sizeof(server));if (n < 0) {cerr << "connect error..." << endl;return 2;}

最后就是收发消息, 这里创建循环, 让我们可以执行多次服务, 多次收发消息。

    //2、发送信息, 接收信息。string message;while (true){cout << "Please Enter# ";getline(cin, message);//write(sockfd, message.c_str(), message.size());char inbuffer[4096];int n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);if (n > 0){inbuffer[n] = 0;cout << inbuffer << endl;}}

然后我们就能够运行了。下面是运行结果。

version1运行结果

可以看到已经可以收发消息。

version2版本

        version2版本是创建多进程。 但是, 要知道, 我们的父进程创建子进程后要进行等待。 否则会造成内存泄漏。 但是我们父进程等待, 父进程又不能向下运行代码了, 就不能继续创建子进程了。 所以, 为了解决这个问题。 我们就可以创建一个孤儿进程。 让子进程创建好孙子进程后直接退出, 将孙子进程托孤。 父进程等待子进程后就继续向下执行。 这样就能创建一批孙子进程并发访问!!!

void Start(){lg(Info, "tcpServer is running...");for (;;) // tcp协议也是一种一直处于运行的服务{// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置// 1、获取新连接,struct sockaddr_in client; // 获取的是客户端的addrsocklen_t len = sizeof(client);int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。{lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));continue;}uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列char clientip[32];inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));// 2、根据新连接进行通信lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);// // version--1 单进程版// Service(sockfd, clientip, clientport);// close(sockfd);//version--2 多进程版pid_t id = fork();if (id == 0){close(listensockfd_);if (fork() > 0){exit(0);}Service(sockfd, clientip, clientport);close(sockfd);exit(0);//child}close(sockfd);pid_t rid = waitpid(id, nullptr, 0);}}

 version3版本

        第三版本是多线程版本, 多线程同样有主线程等待的问题,我们的主线程一旦要等待子线程, 那么就不能向后执行了。 所以为了能够并发, 就要对子线程进行分离。 

        下面是代码的改动:

        第一个改动地方就是创建一个ThreadData类:


class TcpServer;  //声明//创建这个类是为了能够将服务端对象传给线程去执行
class ThreadData
{
public:ThreadData(int fd, string ip, uint16_t port, TcpServer* const t): sockfd_(fd), clientip_(ip), clientport_(port), t_(t){}public:int sockfd_;string clientip_;uint16_t clientport_;
public:TcpServer* const t_;
};

        然后第二个改动的地方就是线程要执行的动作:

static void *pthrun(void *args){pthread_detach(pthread_self()); // 子线程直接分离// 一个进程打开的所有的文件描述符表, 其他进程能看到呢?ThreadData* td = static_cast<ThreadData*>(args);td->t_->Service(td->sockfd_, td->clientip_, td->clientport_);}

        第三个改动就是start函数里面执行服务的代码部分:

    void Start(){lg(Info, "tcpServer is running...");for (;;) // tcp协议也是一种一直处于运行的服务{// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置// 1、获取新连接,struct sockaddr_in client; // 获取的是客户端的addrsocklen_t len = sizeof(client);int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。{lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));continue;}uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列char clientip[32];inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));// 2、根据新连接进行通信lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);// // version--1 单进程版// Service(sockfd, clientip, clientport);// close(sockfd);//version--2 多进程版// pid_t id = fork();// if (id == 0)// {//     close(listensockfd_);//     if (fork() > 0)//     {//         exit(0);//     }//     Service(sockfd, clientip, clientport);//     close(sockfd);//     exit(0);//     //child// }// close(sockfd);// pid_t rid = waitpid(id, nullptr, 0);// version--3多线程版本ThreadData* td = new ThreadData(sockfd, clientip, clientport, this);pthread_t tid;pthread_create(&tid, nullptr, pthrun, td);}}

运行结果

一开始一个server服务, 然后我们添加一个client就增加一个server服务, 增加一个client就增加一个服务。

version4版本

        version4线程池版本, version4的线程池版本就是我们创建好线程池。 然后创建一个任务类, 将我们要执行的服务作为任务类的一个结构。 这就要求我们的这个任务类里面必须有我们的ip, port, sockfd这样的。 字段。 并且我们还要引入几个头文件, 下面为代码:

首先要有两个新文件, 一个包含task类, 一个包含线程池类。

我们让task类里面包含服务的方法。就是run方法。


#include <iostream>
using namespace std;
#include"Log.hpp"
#include <vector>
#include <string>
extern Log lg;// Task.h文件里面包含了任务类, 这个是我们线程池要执行的任务
class Task
{
public:// 构造函数, 第一个参数data1, 第二个参数data2, 第三个参数是加减乘除的符号。 这个任务就是进行四则运算Task(int sockfd, string clientip, uint16_t clientport):sockfd_(sockfd), clientip_(clientip), clientport_(clientport){}~Task() {}// 执行任务的接口run(), 这个方法对三个变量进行判断, 然后进行运算。void run(){char buffer[4096];string temp = clientip_;while (true){ssize_t n = read(sockfd_, buffer, sizeof(buffer));if (n > 0){buffer[n] = 0;cout << "client say#: " << buffer << endl;string echo_string("tcpserver echo#  " + (string)buffer);write(sockfd_, echo_string.c_str(), echo_string.size());}else if (n == 0){lg(Info, "quit sockfd:%d ", sockfd_);exit(1);}else{lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd_, clientport_, temp.c_str());}//}}// 仿函数, 为了方便我们的对象能够像函数一样使用。void operator()(){run();}private:// 每一个任务对象里面都有三个参数, 一个data1, 一个data2, 最后一个op_int sockfd_;string clientip_;uint16_t clientport_;
};

 然后线程池代友友们如果没写过可以自己实现一个,或者直接用博主的, 如下:


#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{pthread_t tid_;string name_;   
};template<class T>
class ThreadPool
{static const int defaultnum = 5;  //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)private://加锁解锁void Lock(){pthread_mutex_lock(&mutex_);}void Unlock(){pthread_mutex_unlock(&mutex_);}//唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒void Wakeup(){pthread_cond_signal(&cond_);}void ThreadSleep(){pthread_cond_wait(&cond_, &mutex_);}bool IsQueueEmpty(){return tasks_.empty();}public://线程要执行的函数static void* HandlerTask(void* args){ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);while(true) {tp->Lock();while (tp->tasks_.empty()){tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。}//否则就去拿到tasks里面的任务T t = tp->tasks_.front();tp->tasks_.pop();//tp->Unlock();t();  //每一个线程先对任务进行消费, 消费完成之后处理任务。    }}//运行这个线程池, 也就是先将线程创建出来。 然后去运行线程void Start(){int num = threads_.size();for (int i = 0; i < num; i++){threads_[i].name_ = "thread-";threads_[i].name_ += to_string(i);   pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);}}//主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理void Push(const T& t){Lock();tasks_.push(t);Wakeup();Unlock();}//获取单例//改编成单例的步骤里面只有这里要说一下, 就是为什么我们要套双层判断。 其实这里的最外面的一层的判断是我们另外加上去的。 为什么//要加这个判断呢? 就是如果我们不加最外层这一层判断。 那么每一个线程获取单例都要申请所,加锁。 不就是相当于所有的线程都在串行执行? 这就有效率问题。 //解决方案就是这个再加一层判断。 这样假如有四个线程。 那么一开始四个线程都在判断, 那么它们四个线程都进入了if里面。 然后就都申请锁, 但是只有第一个线程能够//进入第二层里面, 其他的进入不了。 那么当这一轮的四个线程都申请一次锁候就都退出了函数, 然后就都去做自己的事情了。 问题是, 当下次它们再来申请单例对象的时候它们连//第一层判断都成功不了了, 也就都不用加锁解锁了, 这就大大提高了效率!!!static ThreadPool<T>* GetInstance(int num = defaultnum){if (tp_ == nullptr){pthread_mutex_lock(&tp_->lock_);if (tp_ == nullptr) {tp_ = new ThreadPool<T>(num);}pthread_mutex_unlock(&tp_->lock_);}return tp_;}private://构造函数私有化, 只有Getinstance里面才能创建。 ThreadPool(int num = defaultnum):threads_(num){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&cond_, nullptr);}//单例模式只有一个对象, 所以要将拷贝构造和拷贝赋值封住, 为了防止有人在外部重新拷贝一个对象。 ThreadPool(const ThreadPool<T>& tp) = delete;const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete;~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}private:vector<ThreadInfo> threads_;   //线程都维护在vector当中, 这个就是线程池里面的线程的个数,queue<T> tasks_ ;              //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。 pthread_mutex_t mutex_;        //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用 pthread_cond_t cond_;          //条件变量, 用来没有任务的时候,消费者要挂起。 static pthread_mutex_t lock_;         //锁, 这个锁是为了在获取单例的时候能够让线程原子性的访问if (tp_ == nullptr)。static ThreadPool<T>* tp_;    //tp指针, 这就是唯一个单例对象。 };template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;

        然后就是start运行函数

void Start(){ThreadPool<Task>::GetInstance()->Start();lg(Info, "tcpServer is running...");for (;;) // tcp协议也是一种一直处于运行的服务{// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置// 1、获取新连接,struct sockaddr_in client; // 获取的是客户端的addrsocklen_t len = sizeof(client);int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。if (sockfd < 0)                                                // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。{lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));continue;}uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列char clientip[32];inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));// 2、根据新连接进行通信lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);// // version--1 单进程版// Service(sockfd, clientip, clientport);// close(sockfd);//version--2 多进程版// pid_t id = fork();// if (id == 0)// {//     close(listensockfd_);//     if (fork() > 0)//     {//         exit(0);//     }//     Service(sockfd, clientip, clientport);//     close(sockfd);//     exit(0);//     //child// }// close(sockfd);// pid_t rid = waitpid(id, nullptr, 0);// // version--3多线程版本// ThreadData* td = new ThreadData(sockfd, clientip, clientport, this);// pthread_t tid;// pthread_create(&tid, nullptr, pthrun, td);//version--4线程池版本Task task_(sockfd, clientip, clientport);ThreadPool<Task>::GetInstance()->Push(task_);}}

 运行结果:

我们可以看到, 只要已启动服务端的瞬间, 就能创建出6个线程(一个主线程, 五个分线程)

 

 ——————以上就是本节全部内容哦, 如果对友友们有帮助的话可以关注博主, 方便学习更多知识哦!!!   


http://www.ppmy.cn/ops/130032.html

相关文章

海外著名门户媒体发稿之科技时报Tech Times - 大舍传媒

海外著名门户媒体发稿之科技时报Tech Times - 大舍传媒 在当今全球化的信息时代&#xff0c;企业和个人的声音想要在广阔的市场中脱颖而出&#xff0c;新闻媒体软文发稿、海外媒体发稿、海外媒体宣发以及境外媒体报道等手段显得尤为重要。而在众多海外媒体中&#xff0c;科技时…

【Python爬虫实战】多进程结合 BeautifulSoup 与 Scrapy 构建爬虫项目

#1024程序员节&#xff5c;征文# &#x1f308;个人主页&#xff1a;易辰君-CSDN博客 &#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/2401_86688088/category_12797772.html ​ 前言 在大数据时代&#xff0c;爬虫技术是获取和处理网络数据的利器。面对需要处理大…

RabbitMQ几大应用问题

目录 1.幂等性保障 2.顺序性保障 3.消息积压 1.幂等性保障 &#xff08;1&#xff09;介绍幂等性 幂等性&#xff0c;最早期是数学和计算机科学中某些运算的性质&#xff0c;它们可以被多次应用&#xff0c;而不会改变初始应用的结果 比如说&#xff0c;重复多次调用同一…

深度学习:交叉熵损失(Cross Entropy Loss)

交叉熵损失&#xff08;Cross Entropy Loss&#xff09; 定义和数学表达 交叉熵损失是一种常用于评估概率分类模型性能的损失函数。它衡量的是模型预测的概率分布与真实分布之间的差异。交叉熵损失特别适用于分类任务中&#xff0c;尤其是多类分类问题。 数学上&#xff0c;…

【K8S】kubernetes-dashboard.yaml

https://raw.githubusercontent.com/kubernetes/dashboard/v3.0.0-alpha0/charts/kubernetes-dashboard.yaml 以下链接的内容&#xff1a; 由于国内访问不了&#xff0c;找到一些方法下载了这个文件内容&#xff0c; 部署是mages 对象的镜像 WEB docker.io/kubernetesui/dash…

智能护栏碰撞监测终端:内蒙古高速的安全守护者

​ ​一、引言 ​ ​在内蒙古那辽阔的大地上&#xff0c;高速公路如动脉般纵横交错&#xff0c;是经济发展与人员流动的重要通道。而保障这些公路安全运行的关键因素之一&#xff0c;便是智能护栏碰撞监测终端。它以其卓越的性能&#xff0c;实时为公路安全保驾护航&…

vue封装信号强度

图标下载链接: https://pan.baidu.com/s/1828AidkCKU1KTkw1SvBwQg?pwd4k7n 共五格信号 信号5为绿色&#xff0c;信号4为绿色&#xff0c;信号3为黄色&#xff0c;信号2为黄色&#xff0c;信号1为红色&#xff0c;信号0为灰色。 子组件 /components/SignalStrength/index.vu…

SpringMVC实战(3):拓展

四、RESTFul风格设计和实战 4.1 RESTFul风格概述 4.1.1 RESTFul风格简介 RESTful&#xff08;Representational State Transfer&#xff09;是一种软件架构风格&#xff0c;用于设计网络应用程序和服务之间的通信。它是一种基于标准 HTTP 方法的简单和轻量级的通信协议&…