应用层自定义协议 + 序列化和反序列化
- 一.应用层
- 1.再谈 "协议"
- 2.序列化 和 反序列化
- 二. Jsoncpp
- 1.序列化
- 2.反序列化
- 三. Tcp全双工 + 面向字节流
- 四.自定义协议 + 保证报文的完整性
- 1.Makefile
- 2.Mutex.hpp
- 3.Cond.hpp
- 4.Log.hpp
- 5.Thread.hpp
- 6.ThreadPool.hpp
- 7.Common.hpp
- 8.InetAddr.hpp
- 9.Protocol.hpp
- 10.Calculator.hpp
- 11.Deamon.hpp
- 12.TcpServer.hpp
- 13.TcpServer.cc
- 14.TcpClient.cc
- 15.运行操作
一.应用层
我们程序员写的一个个解决我们实际问题,满足我们日常需求的网络程序,都是在应用层。
1.再谈 “协议”
- 协议是一种 “约定”。Socket API 的接口,在读写数据时,都是按 “字符串” 的方式来发送接收的。如果我们要传输一些 “结构化的数据” 怎么办呢?
- 其实,协议就是双方约定好的结构化的数据。
2.序列化 和 反序列化
问题:如何传输结构化数据?
- 序列化:是指将结构化信息转换为字符串的过程。简单来说,就是把内存中的对象变成一种可以保存到文件或者在网络上传输的格式。
- 反序列化:是序列化的逆过程,即把序列化后的字符串恢复为结构化信息。当接收方收到序列化的数据后,通过反序列化操作将其还原为原始的对象,以便在程序中继续使用。
- 网络传输:由于网络传输的是字节流,因此需要将对象序列化为字节序列进行传输,接收方再将其反序列化得到原始对象。
二. Jsoncpp
- Jsoncpp 是一个用于处理 JSON 数据的 C++ 库。它提供了将 JSON 数据序列化为字符串以及从字符串反序列化为 C++ 数据结构的功能。Jsoncpp 是开源的,广泛用于各种需要处理 JSON 数据的 C++ 项目中。
特性:
- 简单易用:Jsoncpp 提供了直观的 API,使得处理 JSON 数据变得简单。
- 高性能:Jsoncpp 的性能经过优化,能够高效地处理大量 JSON 数据。
- 全面支持:支持 JSON 标准中的所有数据类型,包括对象、数组、字符串、数字、布尔值和 null。
- 错误处理:在解析 JSON 数据时,Jsoncpp 提供了详细的错误信息和位置,方便开发者调试。
当使用 Jsoncpp 库进行 JSON 的序列化和反序列化时,确实存在不同的做法和工具类可供选择。以下是对 Jsoncpp 中序列化和反序列化操作的详细介绍:
安装:
# C++
Ubuntu: sudo apt-get install libjsoncpp-dev
Centos: sudo yum install jsoncpp-devel
1.序列化
序列化指的是将数据结构或对象转换为一种格式,以便在网络上传输或存储到文件中。
#include <iostream>
#include <string>
#include <sstream>
#include <memory>
#include <jsoncpp/json/json.h>int main()
{Json::Value root;root["name"] = "xzy";root["sex"] = "男";// 创建一个 Json::StreamWriterBuilder 对象Json::StreamWriterBuilder wb;// 设置禁止 Unicode 转义wb.settings_["emitUTF8"] = true;// 创建一个 Json::StreamWriter 对象std::unique_ptr<Json::StreamWriter> w(wb.newStreamWriter());// 将 JSON 数据写入流对象 ss 中std::stringstream ss;w->write(root, &ss);std::string str = ss.str();std::cout << str << std::endl;std::cout << str.size() << std::endl;return 0;
}
xzy@hcss-ecs-b3aa:~$ g++ test.cc -ljsoncpp
xzy@hcss-ecs-b3aa:~$ ./a.out
{"name" : "xzy","sex" : "男"
}
35
2.反序列化
反序列化指的是将序列化后的数据重新转换为原来的数据结构或对象。
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>int main()
{// JSON 字符串std::string json_string = "{\"name\":\"张三\",\"age\":30, \"city\":\"北京\"}";// {"name":"张三","age":30, "city":"北京"}// 解析 JSON 字符串Json::Reader reader;Json::Value root;// 从字符串中读取 JSON 数据bool parsingSuccessful = reader.parse(json_string, root);if (!parsingSuccessful){// 解析失败,输出错误信息std::cout << "Failed to parse JSON: " << reader.getFormattedErrorMessages() << std::endl;return 1;}// 访问 JSON 数据std::string name = root["name"].asString();int age = root["age"].asInt();std::string city = root["city"].asString();// 输出结果std::cout << "Name: " << name << std::endl;std::cout << "Age: " << age << std::endl;std::cout << "City: " << city << std::endl;return 0;
}
xzy@hcss-ecs-b3aa:~$ ./a.out
Name: 张三
Age: 30
City: 北京
三. Tcp全双工 + 面向字节流
- write本质是将用户缓冲区buffer数据拷贝到发送缓冲区中、read本质是将接受缓冲区数据拷贝到用户缓冲区buffer中。
- TCP全双工:在任何一台主机上,TCP 连接既有发送缓冲区,又有接受缓冲区,所以在内核中,可以在发消息的同时,也可以收消息。这就是为什么一个 TCP sockfd 读写都是它的原因!
- TCP传输控制协议:实际数据什么时候发,发多少,出错了怎么办,由 TCP 控制。
- TCP面向字节流:发送/接收的报文是不完整的,需要应用层保证报文的完整性!
- UDP面向数据报:发送/接收的报文是完整的。
在网络通信的过程中,操作系统内部可能存在大量的报文,操作系统需要管理报文:先描述再组织!下面是部分内核代码:
四.自定义协议 + 保证报文的完整性
期望的报文格式:
- 其中有效载荷的内容我们用 JSON 数据!
- 如何保证获取完整的请求报文,需要我们处理!
- 如何将服务器脱离终端,用户退出登入/注销时,服务器不受影响?将服务器设计为守护进程!
1.Makefile
.PHONY:all
all:server_tcp client_tcpclient_tcp:TcpClient.ccg++ -o $@ $^ -std=c++17 -ljsoncpp
server_tcp:TcpServer.ccg++ -o $@ $^ -std=c++17 -lpthread -ljsoncpp.PHONY:clean
clean:rm -f client_tcp server_tcp
2.Mutex.hpp
#pragma once#include <pthread.h>namespace MutexModule
{class Mutex{Mutex(const Mutex &m) = delete;const Mutex &operator=(const Mutex &m) = delete;public:Mutex(){::pthread_mutex_init(&_mutex, nullptr);}~Mutex(){::pthread_mutex_destroy(&_mutex);}void Lock(){::pthread_mutex_lock(&_mutex);}void Unlock(){::pthread_mutex_unlock(&_mutex);}pthread_mutex_t *LockAddr() { return &_mutex; }private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex &mutex): _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex; // 使用引用: 互斥锁不支持拷贝};
}
3.Cond.hpp
#pragma#include <pthread.h>
#include "Mutex.hpp"namespace CondModule
{using namespace MutexModule;class Cond{public:Cond() {::pthread_cond_init(&_cond, nullptr);}~Cond() {::pthread_cond_destroy(&_cond);}void Wait(Mutex &mutex) // 线程释放曾经持有的锁, 不能拷贝{::pthread_cond_wait(&_cond, mutex.LockAddr());}void Signal(){::pthread_cond_signal(&_cond);}void Broadcast(){::pthread_cond_broadcast(&_cond);}private:pthread_cond_t _cond;};
}
4.Log.hpp
#pragma once#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem>
#include <fstream>
#include <sstream>
#include <memory>
#include <unistd.h>
#include <time.h>
#include "Mutex.hpp"namespace LogModule
{using namespace MutexModule;// 获取系统时间std::string CurrentTime(){time_t time_stamp = ::time(nullptr); // 获取时间戳struct tm curr;localtime_r(&time_stamp, &curr); // 将时间戳转化为可读性强的信息char buffer[1024];snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d",curr.tm_year + 1900,curr.tm_mon + 1,curr.tm_mday,curr.tm_hour,curr.tm_min,curr.tm_sec);return buffer;}// 日志文件: 默认路径和默认文件名const std::string defaultlogpath = "./log/";const std::string defaultlogname = "log.txt";// 日志等级enum class LogLevel{DEBUG = 1,INFO,WARNING,ERROR,FATAL};std::string Level2String(LogLevel level){switch (level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNING:return "WARNING";case LogLevel::ERROR:return "ERROR";case LogLevel::FATAL:return "FATAL";default:return "NONE";}}// 3. 策略模式: 刷新策略class LogStrategy{public:virtual ~LogStrategy() = default;// 纯虚函数: 无法实例化对象, 派生类可以重载该函数, 实现不同的刷新方式virtual void SyncLog(const std::string &message) = 0;};// 3.1 控制台策略class ConsoleLogStrategy : public LogStrategy{public:ConsoleLogStrategy() {}~ConsoleLogStrategy() {}void SyncLog(const std::string &message) override{LockGuard lockguard(_mutex);std::cout << message << std::endl;}private:Mutex _mutex;};// 3.2 文件级(磁盘)策略class FileLogStrategy : public LogStrategy{public:FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname): _logpath(logpath), _logname(logname){// 判断_logpath目录是否存在if (std::filesystem::exists(_logpath)){return;}try{std::filesystem::create_directories(_logpath);}catch (std::filesystem::filesystem_error &e){std::cerr << e.what() << std::endl;}}~FileLogStrategy() {}void SyncLog(const std::string &message) override{LockGuard lockguard(_mutex);std::string log = _logpath + _logname;std::ofstream out(log, std::ios::app); // 以追加的方式打开文件if (!out.is_open()){return;}out << message << "\n"; // 将信息刷新到out流中out.close();}private:std::string _logpath;std::string _logname;Mutex _mutex;};// 4. 日志类: 构建日志字符串, 根据策略进行刷新class Logger{public:Logger(){// 默认往控制台上刷新_strategy = std::make_shared<ConsoleLogStrategy>();}~Logger() {}void EnableConsoleLog(){_strategy = std::make_shared<ConsoleLogStrategy>();}void EnableFileLog(){_strategy = std::make_shared<FileLogStrategy>();}// 内部类: 记录完整的日志信息class LogMessage{public:LogMessage(LogLevel level, const std::string &filename, int line, Logger &logger): _currtime(CurrentTime()), _level(level), _pid(::getpid()), _filename(filename), _line(line), _logger(logger){std::stringstream ssbuffer;ssbuffer << "[" << _currtime << "] "<< "[" << Level2String(_level) << "] "<< "[" << _pid << "] "<< "[" << _filename << "] "<< "[" << _line << "] - ";_loginfo = ssbuffer.str();}~LogMessage(){if(_logger._strategy){_logger._strategy->SyncLog(_loginfo);}}template <class T>LogMessage &operator<<(const T &info){std::stringstream ssbuffer;ssbuffer << info;_loginfo += ssbuffer.str();return *this;}private:std::string _currtime; // 当前日志时间LogLevel _level; // 日志水平pid_t _pid; // 进程pidstd::string _filename; // 文件名uint32_t _line; // 日志行号Logger &_logger; // 负责根据不同的策略进行刷新std::string _loginfo; // 日志信息};// 故意拷贝, 形成LogMessage临时对象, 后续在被<<时,会被持续引用,// 直到完成输入,才会自动析构临时LogMessage, 至此完成了日志的刷新,// 同时形成的临时对象内包含独立日志数据, 未来采用宏替换, 获取文件名和代码行数LogMessage operator()(LogLevel level, const std::string &filename, int line){return LogMessage(level, filename, line, *this);}private:// 纯虚类不能实例化对象, 但是可以定义指针std::shared_ptr<LogStrategy> _strategy; // 日志刷新策略方案};// 定义全局logger对象Logger logger;// 编译时进行宏替换: 方便随时获取行号和文件名
#define LOG(level) logger(level, __FILE__, __LINE__)// 提供选择使用何种日志策略的方法
#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()
#define ENABLE_FILE_LOG() logger.EnableFileLog()
}
5.Thread.hpp
#pragma once#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>namespace ThreadModule
{using func_t = std::function<void(std::string)>;static int number = 1;// 强类型枚举: 枚举的成员名称被限定在枚举类型的作用域内enum class TSTATUS{NEW,RUNNING,STOP};class Thread{private:// 成员方法: 需要加上static表示不需要this指针, 否则回调函数报错// 而要执行_func()函数又需要由this指针, 所以Routine函数传this指针static void *Routine(void *args){Thread *t = static_cast<Thread *>(args);t->_func(t->Name());return nullptr;}void EnableDetach() { _joinable = false; }public:Thread(func_t func): _func(func), _status(TSTATUS::NEW), _joinable(true){_name = "Thread-" + std::to_string(number++);_pid = getpid();}~Thread() {}// 线程创建bool Start(){if (_status != TSTATUS::RUNNING){int n = pthread_create(&_tid, nullptr, Routine, this);if (n != 0)return false;_status = TSTATUS::RUNNING;return true;}return false;}// 线程退出bool Stop(){if (_status == TSTATUS::RUNNING){int n = ::pthread_cancel(_tid);if (n != 0)return false;_status = TSTATUS::STOP;return true;}return false;}// 线程等待bool Join(){if (_joinable){int n = ::pthread_join(_tid, nullptr);if (n != 0)return false;_status = TSTATUS::STOP;return true;}return false;}// 线程分离bool Detach(){EnableDetach();int n = ::pthread_detach(_tid);if (n != 0)return false;return true;}// 线程是否分离bool IsJoinable() { return _joinable; }std::string Name() { return _name; }private:std::string _name;pthread_t _tid;pid_t _pid;bool _joinable; // 线程是否是分离的, 默认不是func_t _func;TSTATUS _status;};
}
6.ThreadPool.hpp
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <memory>
#include "Mutex.hpp"
#include "Cond.hpp"
#include "Thread.hpp"
#include "Log.hpp"namespace ThreadPoolModule
{using namespace MutexModule;using namespace CondModule;using namespace ThreadModule;using namespace LogModule;using thread_t = std::shared_ptr<Thread>;const static int defaultnum = 15;template <class T>class ThreadPool{private:bool IsEmpty() { return _taskq.empty(); }void HandlerTask(std::string name){LOG(LogLevel::INFO) << "线程: " << name << ", 进入HandlerTask执行逻辑";while (true){// 1. 拿任务: 访问共享资源, 需要加锁T task;{LockGuard lockguard(_mutex);while (IsEmpty() && _isrunning) // while替代if: 防止伪唤醒{_wait_num++;_cond.Wait(_mutex); // 没任务时: 线程在条件变量上阻塞等待_wait_num--;}// 2. 任务队列不为空 && 线程池退出if (IsEmpty() && !_isrunning)break;task = _taskq.front();_taskq.pop();}// 3. 处理任务: 并发处理, 不需要持有锁task();}LOG(LogLevel::INFO) << "线程: " << name << ", 退出";}ThreadPool(int num = defaultnum): _num(num), _wait_num(0), _isrunning(false){for (int i = 0; i < _num; i++){// 在类中: bind类的公有方法, 需要取地址 + 传入this指针// 在类外: bind类的公有方法, 需要取地址 + 传入类的匿名对象_threads.push_back(std::make_shared<Thread>(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1))); // push_back()会调用移动构造LOG(LogLevel::INFO) << "构建线程" << _threads.back()->Name() << "对象...成功";}}ThreadPool<T>(const ThreadPool<T> &) = delete;ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;public:~ThreadPool() {}// 获取单例对象static ThreadPool<T> *GetInstance(){// 若单例为空: 需要加锁创建单例对象if(instance == nullptr){LockGuard lockguard(_lock);if(instance == nullptr){LOG(LogLevel::INFO) << "单例首次被执行, 需要加载对象...";instance = new ThreadPool<T>();instance->Start();}}// 若单例不为空: 直接返回单例对象return instance;}void Equeue(T in){LockGuard lockguard(_mutex);if (!_isrunning) return;_taskq.push(in);if (_wait_num > 0){_cond.Signal(); // 唤醒线程}}void Start(){if (_isrunning) return;_isrunning = true;for (auto &thread_ptr : _threads){thread_ptr->Start();LOG(LogLevel::INFO) << "启动线程" << thread_ptr->Name() << "...成功";}}void Stop(){LockGuard lockguard(_mutex);if (_isrunning){// 1. 不能再新增任务了_isrunning = false;// 2. 让线程自己退出(唤醒所有的线程) && 历史任务被执行完if (_wait_num > 0){_cond.Broadcast();}}}void Wait(){for (auto &thread_ptr : _threads){thread_ptr->Join();LOG(LogLevel::INFO) << "回收线程" << thread_ptr->Name() << "...成功";}}private:int _num; // 线程的个数std::vector<thread_t> _threads; // 线程池std::queue<T> _taskq; // 共享资源: 任务队列int _wait_num; // 等待的线程数目bool _isrunning; // 线程池是否运行Mutex _mutex; // 锁Cond _cond; // 条件变量static ThreadPool<T> *instance; // 单例对象static Mutex _lock; // 用来保护单例};// 静态成员: 类内声明, 类外定义template<class T>ThreadPool<T> *ThreadPool<T>::instance = nullptr;template<class T>Mutex ThreadPool<T>::_lock;
}
7.Common.hpp
#pragma once#include <iostream>
#include <string>#define Die(code) \do \{ \exit(code); \} while (0)#define CONV(v) (struct sockaddr *)(v)enum
{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR
};bool ParseOneLine(std::string &str, std::string *out, const std::string &sep)
{auto pos = str.find(sep);if (pos == std::string::npos)return false;*out = str.substr(0, pos);str.erase(0, pos + sep.size());return true;
}// Connection: keep-alive
// 解析后: key = Connection; value = keep-alive
bool SplitString(const std::string &header, const std::string sep, std::string *key, std::string *value)
{auto pos = header.find(sep);if (pos == std::string::npos)return false;*key = header.substr(0, pos);*value = header.substr(pos + sep.size());return true;
}
8.InetAddr.hpp
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>#include "Common.hpp"class InetAddr
{
private:// 端口号: 网络序列->主机序列void PortNetToHost(){_port = ::ntohs(_net_addr.sin_port);}// IP: 网络序列->主机序列void IpNetToHost(){char ipbuffer[64];::inet_ntop(AF_INET, &_net_addr.sin_addr, ipbuffer, sizeof(ipbuffer));_ip = ipbuffer;}public:InetAddr() {}InetAddr(const struct sockaddr_in &addr): _net_addr(addr){PortNetToHost();IpNetToHost();}InetAddr(uint16_t port): _port(port), _ip(""){_net_addr.sin_family = AF_INET;_net_addr.sin_port = ::htons(_port);_net_addr.sin_addr.s_addr = INADDR_ANY;}~InetAddr() {}bool operator==(const InetAddr &addr) { return _ip == addr._ip && _port == addr._port; }struct sockaddr *NetAddr() { return CONV(&_net_addr); }socklen_t NetAddrLen() { return sizeof(_net_addr); }std::string Ip() { return _ip; }uint16_t Port() { return _port; }std::string Addr() { return Ip() + ":" + std::to_string(Port()); }void SetAddr(sockaddr_in &client){_net_addr = client;PortNetToHost();IpNetToHost();}private:struct sockaddr_in _net_addr;std::string _ip; // 主机序列: IPuint16_t _port; // 主机序列: 端口号
};
9.Protocol.hpp
#pragma once#include <iostream>
#include <string>
#include <memory>
#include <jsoncpp/json/json.h>const std::string Sep = "\r\n"; // 回车换行符// 封装: {json} -> len\r\n{json}\r\n
bool Encode(std::string &message)
{if (message.size() == 0)return false;// 添加报头, 形成完整的报文std::string package = std::to_string(message.size()) + Sep + message + Sep;message = package;return true;
}// 解包: len\r\n{json}\r\n -> {json}
// 同时需要保证报文的完整性!
// 123\r\n
// 123\r\n{json}
// 123\r\n{json}\r\n
// 123\r\n{json}\r\n123\r\n{json}\r\n
bool Decode(std::string &package, std::string *content)
{auto pos = package.find(Sep);if (pos == std::string::npos)return false;std::string content_len_str = package.substr(0, pos);int content_len = std::stoi(content_len_str);// 完整报文的长度int full_len = content_len_str.size() + content_len + 2 * Sep.size();if (package.size() < full_len)return false;// {json}字符串的长度*content = package.substr(pos + Sep.size(), content_len);package.erase(0, full_len);return true;
}class Request
{
public:Request(): _x(0), _y(0), _oper(0){}Request(int x, int y, char oper): _x(x), _y(y), _oper(oper){}// 序列化bool Serialize(std::string &out_string){Json::Value root;root["x"] = _x;root["y"] = _y;root["oper"] = _oper;Json::StreamWriterBuilder wb;std::unique_ptr<Json::StreamWriter> w(wb.newStreamWriter());std::stringstream ss;w->write(root, &ss);out_string = ss.str();return true;}// 反序列化bool Deserialize(std::string &in_string){Json::Value root;Json::Reader reader;bool parsingSuccessful = reader.parse(in_string, root);if (!parsingSuccessful){std::cout << "Failed to parse JSON: " << reader.getFormattedErrorMessages() << std::endl;return false;}_x = root["x"].asInt();_y = root["y"].asInt();_oper = root["oper"].asInt();return true;}void Print(){std::cout << "x: " << _x << std::endl;std::cout << "oper: " << _oper << std::endl;std::cout << "y: " << _y << std::endl;}int X() const { return _x; }int Y() const { return _y; }char Oper() const { return _oper; }private:int _x;int _y;char _oper;
};class Response
{
public:Response(): _result(0), _code(0){}Response(int result, int code): _result(result), _code(code){}// 序列化bool Serialize(std::string &out_string){Json::Value root;root["result"] = _result;root["code"] = _code;Json::StreamWriterBuilder wb;std::unique_ptr<Json::StreamWriter> w(wb.newStreamWriter());std::stringstream ss;w->write(root, &ss);out_string = ss.str();return true;}// 反序列化bool Deserialize(std::string &in_string){Json::Value root;Json::Reader reader;bool parsingSuccessful = reader.parse(in_string, root);if (!parsingSuccessful){std::cout << "Failed to parse JSON: " << reader.getFormattedErrorMessages() << std::endl;return false;}_result = root["result"].asInt();_code = root["code"].asInt();return true;}void Print(){std::cout << "result: " << _result << std::endl;std::cout << "code: " << _code << std::endl;}int Result() const { return _result; }int Code() const { return _code; }void SetResult(int result) { _result = result; }void SetCode(int code) { _code = code; }private:int _result; // 结果int _code; // 错误码
};
10.Calculator.hpp
#pragma once#include <iostream>
#include "Protocol.hpp"class Calculator
{
public:Calculator() {}~Calculator() {}Response Execute(const Request &req){// 拿到的是结构化的对象Response resp;switch (req.Oper()){case '+':resp.SetResult(req.X() + req.Y());break;case '-':resp.SetResult(req.X() - req.Y());break;case '*':resp.SetResult(req.X() * req.Y());break;case '/':{if (req.Y() == 0){resp.SetCode(1); // 1: 除零}else{resp.SetResult(req.X() / req.Y());}}break;case '%':{if (req.Y() == 0){resp.SetCode(2); // 2: 模零}else{resp.SetResult(req.X() % req.Y());}}break;default:resp.SetCode(3); // 3: 类型无法识别break;}return resp;}
};
11.Deamon.hpp
#pragma once#include <iostream>
#include <cstdlib>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>#define ROOT "/"
#define devnull "/dev/null"void Deamon(bool ischdir, bool isclose)
{// 1. 守护进程一般要屏蔽一些特定的信号signal(SIGCHLD, SIG_IGN);signal(SIGPIPE, SIG_IGN);// 2. 成为非组长进程: 创建子进程if(fork()) exit(0);// 3. 建立新会话setsid();// 4. 每一个进程都有自己的CWD, 是否将其修改为根目录if(ischdir) chdir(ROOT);// 5. 脱离终端: 将标准输入、输出重定向到字符文件"/dev/null"中if(isclose){::close(0);::close(1);::close(2);}else{// 建议这样!int fd = ::open(devnull, O_WRONLY);if(fd > 0){::dup2(fd, 0);::dup2(fd, 1);::dup2(fd, 2);::close(fd);}}
}
12.TcpServer.hpp
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <functional>#include <pthread.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>#include "Log.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"
#include "ThreadPool.hpp"using namespace LogModule;
using namespace ThreadPoolModule;using task_t = std::function<void()>;
using handler_t = std::function<std::string(std::string&)>;#define BACKLOG 8uint16_t gport = 8080;class TcpServer
{
public:TcpServer(handler_t handler, int port = gport): _handler(handler), _port(port), _isrunning(false){}~TcpServer() {}void InitServer(){// 1. 创建TCP套接字_listensockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (_listensockfd < 0){LOG(LogLevel::FATAL) << "socket error";Die(SOCKET_ERR);}LOG(LogLevel::INFO) << "socket create success, listensockfd is: " << _listensockfd;// 2. 绑定struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = ::htons(_port);local.sin_addr.s_addr = INADDR_ANY;int n = ::bind(_listensockfd, CONV(&local), sizeof(local));if (n < 0){LOG(LogLevel::FATAL) << "bind error";Die(BIND_ERR);}LOG(LogLevel::INFO) << "bind success";// 3. Tcp是面向连接的, 就要求Tcp随时随地等待被客户端连接, Tcp需要将socket设置为监听状态// 客户端请求连接时: 将客户端连接请求放入一个监听队列中n = ::listen(_listensockfd, BACKLOG);if (n < 0){LOG(LogLevel::FATAL) << "listen error";Die(LISTEN_ERR);}LOG(LogLevel::INFO) << "listen success";// ::signal(SIGCHLD, SIG_IGN); // 子进程退出, 父进程无需wait, 操作系统自动回收资源}// Tcp也是全双工的: 在同一个文件描述符中, 既可以读又可以写void HandlerRequest(int sockfd){LOG(LogLevel::INFO) << "开始处理客户端请求...";char inbuffer[4096];std::string package;while (true){// 约定: 客户端发过来的是一条完整的命令string// 1. 读取客户端发送来的消息ssize_t n = ::recv(sockfd, inbuffer, sizeof(inbuffer) - 1, 0); // 读取是不完善的if (n > 0) {inbuffer[n] = 0;package += inbuffer; // len\r\n{json}\r\n// 保证了报文的完整性std::string reault = _handler(package);if(reault.empty()) continue;// 2. 向客户端发送消息::send(sockfd, reault.c_str(), reault.size(), 0); // 写入也是不完善的}else if (n == 0){// read如果读取的返回值是0, 表示客户端退出LOG(LogLevel::INFO) << "客户端退出: " << sockfd;break;}else{// 读取失败break;}}::close(sockfd); // 关闭fd, 防止fd泄漏问题}void Start(){_isrunning = true;while (_isrunning){// 1. Tcp不能直接获取数据: 需要获取新连接// 阻塞等待, 直到有客户端连接请求进入监听队列, 然后从队列中取出一个请求, 为该客户端建立连接,// 并返回一个新的套接字描述符, 通过这个新的套接字描述符就可以与客户端进行数据的发送和接收struct sockaddr_in peer;socklen_t peerlen = sizeof(peer);int sockfd = ::accept(_listensockfd, CONV(&peer), &peerlen);if (sockfd < 0){LOG(LogLevel::WARNING) << "accept error: " << strerror(errno);continue;}LOG(LogLevel::INFO) << "accept success, sockfd is: " << sockfd;// 获取客户端的信息: IP + 端口号InetAddr addr(peer);LOG(LogLevel::INFO) << "client info: " << addr.Addr();// version1->单进程版本: 单客户端访问// HandlerRequest(sockfd);// version2->多进程版本: 多客户端访问// pid_t id = fork();// if (id == 0)// {// // 子进程: 继承父进程的文件描述符表, 有两张表// ::close(_listensockfd); // 关闭不需要的文件描述符: 监听套接字// if(fork() > 0) exit(0); // 子进程退出// // 孙子进程->孤儿进程: 不断与客户端的数据传输, 退出后被操作系统自动回收// HandlerRequest(sockfd);// exit(0);// }// // 父进程: 不断与客户端建立连接// ::close(sockfd); // 关闭不需要的文件描述符: socket// // waitpid不会阻塞// int rid = ::waitpid(id, nullptr, 0);// if(rid < 0)// {// LOG(LogLevel::WARNING) << "waitpid error";// }// version3->多线程版本: 多客户端访问// 主线程和新线程共享同一张文件描述符表// pthread_t tid;// ThreadData *data = new ThreadData();// data->sockfd = sockfd;// data->self = this;// pthread_create(&tid, nullptr, ThreadEntry, (void *)data);// version4->线程池版本: 多客户端访问(适合处理短任务)task_t task = std::bind(&TcpServer::HandlerRequest, this, sockfd); // 构建任务ThreadPool<task_t>::GetInstance()->Equeue(task);// ThreadPool<task_t>::GetInstance()->Equeue([this, &sockfd](){// this->HandlerRequest(sockfd);// });}}struct ThreadData{int sockfd;TcpServer *self;};// 类中ThreadEntry函数带有this指针, 需要加上static// 而没有this指针, 又无法调用HandlerReques函数// 解决方法: 封装ThreadData结构体static void *ThreadEntry(void *argc){pthread_detach(pthread_self()); // 线程分离: 线程退出时由操作系统自动回收, 防止类似僵尸进程的问题ThreadData *data = (ThreadData *)argc;data->self->HandlerRequest(data->sockfd);return nullptr;}void Stop(){_isrunning = false;}private:int _listensockfd; // 监听套接字uint16_t _port;bool _isrunning;handler_t _handler; // 处理客户端发来的任务
};
13.TcpServer.cc
#include <memory>#include "TcpServer.hpp"
#include "Protocol.hpp"
#include "Calculator.hpp"
#include "Deamon.hpp"using cal_t = std::function<Response(Request &req)>;// package不一定是完整的!
// 1. 不完整: 继续读取
// 2. 完整: 直接提取, 执行以下步骤
class Parse
{
public:Parse() {}Parse(cal_t cal): _cal(cal){}std::string Entry(std::string &package){std::string message;std::string respstr;// 1. 删除报头字段(连续处理多个报文)LOG(LogLevel::DEBUG) << "接收的报文: \n" << package;while (Decode(package, &message)){LOG(LogLevel::DEBUG) << "删除报头字段: \n" << message;if (message.empty())break;// 2. 反序列化Request req;req.Deserialize(message);LOG(LogLevel::DEBUG) << "反序列化: ";req.Print();// 3. 响应请求Response resp = _cal(req);LOG(LogLevel::DEBUG) << "计算: ";resp.Print();// 4. 序列化std::string res;resp.Serialize(res);LOG(LogLevel::DEBUG) << "序列化: \n" << res;// 5. 添加报头字段Encode(res);LOG(LogLevel::DEBUG) << "添加报头字段: \n" << res;// 6. 拼接应答respstr += res;}LOG(LogLevel::DEBUG) << "发送的报文: \n" << respstr;return respstr;}private:cal_t _cal;
};int main()
{ENABLE_FILE_LOG(); // 往文件中打印Deamon(false, false); // 守护进程// 1. 计算模块: 应用层Calculator cal;// 2. 解析模块: 表示层(序列化和反序列化)Parse parse([&cal](const Request &req){return cal.Execute(req);});// Parse parse(std::bind(&Calculator::Execute, std::ref(cal), std::placeholders::_1));// 3. 通信模块: 网络层std::shared_ptr<TcpServer> tsvr = std::make_shared<TcpServer>([&parse](std::string &package){return parse.Entry(package);});// std::shared_ptr<TcpServer> tsvr = std::make_shared<TcpServer>(std::bind(&Parse::Entry, std::ref(parse), std::placeholders::_1));tsvr->InitServer();tsvr->Start();return 0;
}
14.TcpClient.cc
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>#include "Common.hpp"
#include "Protocol.hpp"// ./client_tcp server_ip server_port
int main(int argc, char *argv[])
{if (argc != 3){std::cout << "Usage: ./client_tcp server_ip server_port" << std::endl;return 1;}std::string server_ip = argv[1];int server_port = std::stoi(argv[2]);// 1. 创建套接字int sockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){std::cout << "socket error" << std::endl;return 2;}// 2. 客户端不需要显示的进行绑定, 但是需要连接服务器, 在建立连接的过程由操作系统进行绑定IP和端口号struct sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = ::htons(server_port);server.sin_addr.s_addr = ::inet_addr(server_ip.c_str());int n = ::connect(sockfd, CONV(&server), sizeof(server));if (n < 0){std::cout << "connect error" << std::endl;return 3;}std::string message;while (true){int x, y;char oper;std::cout << "input x: ";std::cin >> x;std::cout << "input y: ";std::cin >> y;std::cout << "input oper: ";std::cin >> oper;Request req(x, y, oper);// 1. 序列化req.Serialize(message);// 2. 添加报头字段Encode(message);// 3. 向服务器发送报文n = ::send(sockfd, message.c_str(), message.size(), 0);if (n > 0){// 4. 获取服务器响应后的报文char inbuffer[1024];int m = ::recv(sockfd, inbuffer, sizeof(inbuffer) - 1, 0);if (m > 0){inbuffer[m] = 0;std::string package = inbuffer; // 认为该是报文完整// 5. 删除报头字段std::string content;Decode(package, &content);// 6. 反序列化Response resp;resp.Deserialize(content);// 7. 得到结构化数据std::cout << resp.Result() << "[" << resp.Code() << "]" << std::endl;}else break;}else break;}::close(sockfd);return 0;
}
15.运行操作
xzy@hcss-ecs-b3aa:~$ ./server_tcp xzy@hcss-ecs-b3aa:~$ ps -axj | head -1 && ps -axj | grep server_tcpPPID PID PGID SID TTY TPGID STAT UID TIME COMMAND1 1369094 1369094 1369094 ? -1 Ss 1000 0:00 ./server_tcp
1365157 1369098 1369097 1365157 pts/2 1369097 S+ 1000 0:00 grep --color=auto server_tcpxzy@hcss-ecs-b3aa:~$ ./client_tcp 127.0.0.1 8080
input x: 100
input y: 200
input oper: +
300[0]
...