深入理解序列化/反序列化与TCP通信协议
一、序列化与反序列化
1.1 基本概念
- 序列化(Serialization): 将数据结构或对象状态转换为可存储/传输格式的过程
- 反序列化(Deserialization): 将序列化后的数据恢复为原始数据结构的过程
示例:
// 原始数据结构
struct Person {string name;int age;double salary;
};// 序列化 -> {"name":"John","age":30,"salary":5000.0}
// 反序列化 -> 重建Person对象
1.2 存在意义
1.3 实现方式对比
格式 | 可读性 | 体积 | 解析速度 | 典型应用场景 |
---|---|---|---|---|
JSON | 好 | 中等 | 较快 | Web API、配置文件 |
XML | 好 | 大 | 慢 | 企业级系统交互 |
Protobuf | 无 | 小 | 极快 | 高性能RPC通信 |
MessagePack | 无 | 小 | 快 | 移动端数据传输 |
二、TCP通信关键特性
2.1 全双工通信
实现原理:
// 内核数据结构示意
struct sock {struct sk_buff_head receive_queue; // 接收缓冲区struct sk_buff_head write_queue; // 发送缓冲区// ...
};
- 每个socket维护两个独立缓冲区
- 发送/接收操作互不阻塞
tcp全双工实现原理" />
2.2 面向字节流的特点
关键问题:
客户端发送:"HelloWorld"
服务端可能分两次接收:"Hello" 和 "World"
解决方案:
自定义协议格式:
[4字节长度头][有效载荷]
示例:
0x0000000A{"name":"John"}
2.3 应用层协议设计
推荐格式:
#pragma pack(push, 1)
struct PacketHeader {uint32_t length; // 有效载荷长度uint16_t version; // 协议版本uint32_t checksum; // 数据校验码
};
#pragma pack(pop)
设计要点:
- 固定长度报文头
- 包含版本控制字段
- 添加数据校验机制
- 使用网络字节序(大端)
三、JSON序列化实战(jsoncpp)
3.1 基础示例
#include <json/json.h>// 序列化
Json::Value root;
root["name"] = "John";
root["age"] = 30;
root["salary"] = 5000.0;Json::StreamWriterBuilder builder;
string jsonStr = Json::writeString(builder, root);// 反序列化
Json::CharReaderBuilder readerBuilder;
Json::Value parsedRoot;
string errs;
istringstream iss(jsonStr);
Json::parseFromStream(readerBuilder, iss, &parsedRoot, &errs);
3.2 调试技巧
启用格式化输出:
builder["indentation"] = "\t"; // 设置缩进
cout << Json::writeString(builder, root);
输出结果:
{"name": "John","age": 30,"salary": 5000.0
}
四、关键注意事项
-
字节序问题:
- 网络传输应统一使用大端字节序
- 使用
htonl()
/ntohl()
进行转换
-
版本兼容:
- 协议字段需要向后兼容
- 建议添加版本号字段
-
安全考虑:
- 限制最大报文长度
- 校验数据合法性
- 防止缓冲区溢出攻击
-
性能优化:
// 预分配缓冲区 jsonStr.reserve(1024); // 复用解析器实例 static thread_local Json::CharReaderBuilder readerBuilder;
-
错误处理:
if (!Json::parseFromStream(readerBuilder, iss, &parsedRoot, &errs)) {cerr << "JSON解析失败: " << errs << endl;// 实现重试或降级逻辑 }
五、扩展知识
5.1 二进制协议优化
对于高频通信场景,可考虑:
// 使用内存对齐结构
#pragma pack(push, 1)
struct BinaryProtocol {uint32_t magic; // 魔数标识 0x5A5AA5A5uint16_t cmdType; // 命令类型uint32_t bodyLen; // 数据体长度byte checksum; // 校验和// 变长数据体...
};
#pragma pack(pop)
5.2 现代序列化方案
- FlatBuffers:零拷贝反序列化
- Cap’n Proto:直接内存映射
- Avro:Schema动态验证
通过合理选择序列化方案和设计通信协议,可以构建出高效可靠的分布式系统。理解底层原理有助于在性能与开发效率之间做出最佳权衡。
板书
NetCal
TcpServer.cc
#include "TcpServer.hpp"
// #include "CommandExec.hpp"
#include <functional>
#include <memory>
#include "Protocol.hpp"
#include "Calculator.hpp"
#include "Daemon.hpp"// using task_t = function<std::string (std::string)>;
using cal_fun = std::function<Response(const Request &req)>;
// package不一定有完成报文,
// if 不完整-》继续读
// else 完整-》提取 ——》 反序列化-》构建Request对象-》调用计算模块
// using namespace class Parse
{
public:Parse(cal_fun c):_cal(c){}std::string Entry(std::string &package){// 判断报文完整性std::string message;std::string respstr;while(Decode(package, &message)){LOG(LogLevel::DEBUG)<<"Content:\n"<<message;if(message.empty()) break;// 2. 反序列化, message是一个曾经被序列化的requestRequest req;if (!req.Deserialize(message))break;std::cout << "#############" << std::endl;req.Print();std::cout << "#############" << std::endl;// 3. 计算Response resp = _cal(req);// 4. 序列化std::string res;resp.Serialize(res);LOG(LogLevel::DEBUG) << "序列化: \n" << res;// 5. 添加长度报头字段!Encode(res);LOG(LogLevel::DEBUG) << "Encode: \n" << res;// 6. 拼接应答respstr += res;}LOG(LogLevel::DEBUG) << "respstr: \n" << respstr;return respstr;}private:cal_fun _cal;
};int main(int argc, char *argv[])
{if(argc != 2){std::cout<<"Usage: ./server port"<<std::endl;Die(USAGE_ERR);}uint16_t port = std::stoi(argv[1]);ENABLE_CONSOLE_LOG();// Command cmd;// task_t task = [&cmd](std::string cmdstr){// return cmd.Execute(cmdstr);// };// Daemon(false, false);// 计算模块Calculator mycal;// 解析对象// printf("服务器启动\n");Parse myparse([&mycal](const Request &req){return mycal.Execute(req);});// 通信模块// 只负责IOstd::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>([&myparse](std::string &package){return myparse.Entry(package);}, port);tsvr->InitServer();tsvr->Start();return 0;
}
TcpServer.hpp
#pragma once#include <iostream>
#include <cstring>
#include <string>
#include <cerrno>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/wait.h>
#include <signal.h>
#include <pthread.h>
#include <functional>#include "Log.hpp"
#include "Common.hpp"
#include "InetAddr.hpp"
#include "ThreadPool.hpp"#define BACKLOG 8// using namespace LogModule;
using namespace ThreadPoolModule;
static const uint16_t gport = 8080;
using handler_t = std::function<std::string(std::string&)>;class TcpServer
{using task_t = std::function<void()>;struct ThreadData{int sockfd;TcpServer *self;};
public:TcpServer(handler_t handler, int port = gport):_handler(handler), _port(port),_isrunning(false){}void InitServer(){// 先监听_listensockfd = ::socket(AF_INET, SOCK_STREAM, 0);if(_listensockfd < 0){LOG(LogLevel::FATAL)<<"socket error";Die(SOCKET_ERR);}LOG(LogLevel::INFO)<<"socket create success, _listensocked is "<<_listensockfd;// 后bindstruct sockaddr_in local;// 网络通信用sockaddr_in, 本地用sockaddr_unmemset(&local, 0, sizeof local);local.sin_family = AF_INET;local.sin_port = htons(_port);local.sin_addr.s_addr = INADDR_ANY;int n = ::bind(_listensockfd, CONV(&local), sizeof(local));if(n < 0){LOG(LogLevel::FATAL)<<"bind error";Die(BIND_ERR);}LOG(LogLevel::INFO) << "bind success, sockfd is: "<<_listensockfd;// 设置为监听状态n = listen(_listensockfd, BACKLOG);if(n < 0){LOG(LogLevel::FATAL)<<"listen error";Die(LISTEN_ERR);}LOG(LogLevel::INFO)<<"listen success, socked is:"<<_listensockfd;// 此处可使用::signal(SIGCHLD, SIG_IGN)来将父子进程解绑}void HandlerRequest(int sockfd){LOG(LogLevel::INFO)<<"HandlerRequest, sockfd is:"<<sockfd;char inbuffer[4096];while(true){ssize_t n = recv(sockfd, inbuffer, sizeof inbuffer, 0);inbuffer[n] = 0;LOG(LogLevel::INFO)<<"server recived:"<<inbuffer;if(n > 0){inbuffer[n] = 0;std::string str(inbuffer);std::string cmd_result = _handler(str);// 回调::send(sockfd, cmd_result.c_str(), cmd_result.size(), 0);LOG(LogLevel::INFO)<<"server sent:"<<cmd_result;}else if(n == 0){LOG(LogLevel::INFO)<<"client quit"<<sockfd;break;}else{break;}}::close(sockfd);// 防止fd泄露}static void *ThreadEntry(void *args)// 设为静态函数就不用传递this{// 当使用多线程(不是封装好的线程池), pthread_thread_create的函数只能接收一个参数pthread_detach(pthread_self());ThreadData *data = (ThreadData *)args;data->self->HandlerRequest(data->sockfd);return nullptr;}void Start(){_isrunning = true;while(_isrunning){struct sockaddr_in peer;socklen_t peerlen = sizeof(peer);LOG(LogLevel::DEBUG)<<"accepting...";int sockfd = ::accept(_listensockfd, CONV(&peer), &peerlen);if(sockfd < 0){LOG(LogLevel::WARNING)<<"accept error:"<<strerror(errno);continue;}// 连接成功LOG(LogLevel::INFO)<<"accept success, sockfd is:"<<sockfd;InetAddr addr(peer);LOG(LogLevel::INFO)<<"client info:"<<addr.Addr();// 使用线程池实现ThreadPool<task_t>::getInstance()->Equeue([this, sockfd](){this->HandlerRequest(sockfd);});} }~TcpServer(){}private:int _listensockfd;// 监听socketuint16_t _port;bool _isrunning;// 处理上层任务入口handler_t _handler;
};
TcpClient.cc
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <arpa/inet.h>#include "Protocol.hpp" // 形成约定// ./client_tcp server_ip server_port
int main(int argc, char *argv[])
{if (argc != 3){std::cout << "Usage:./client_tcp server_ip server_port" << std::endl;return 1;}std::string server_ip = argv[1]; // "192.168.1.1"int server_port = std::stoi(argv[2]);int sockfd = ::socket(AF_INET, SOCK_STREAM, 0);if (sockfd < 0){std::cout << "create socket failed" << std::endl;return 2;}struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_port = htons(server_port);server_addr.sin_addr.s_addr = inet_addr(server_ip.c_str());// client 不需要显示的进行bind, tcp是面向连接的, connect 底层会自动进行bindint n = ::connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr));if (n < 0){std::cout << "connect failed" << std::endl;return 3;}// echo clientstd::string message;while (true){int x, y;char oper;std::cout << "input x: ";std::cin >> x;std::cout << "input y: ";std::cin >> y;std::cout << "input oper: ";std::cin >> oper;Request req(x, y, oper);// 1. 序列化req.Serialize(message);// 2. EncodeEncode(message);// 3. 发送n = ::send(sockfd, message.c_str(), message.size(), 0);if (n > 0){char inbuffer[1024];// 4. 获得应答int m = ::recv(sockfd, inbuffer, sizeof(inbuffer), 0);if (m > 0){inbuffer[m] = 0;std::string package = inbuffer;//TODOstd::string content;// 4. 读到应答完整--暂定, decodeDecode(package, &content);// 5. 反序列化Response resp;resp.Deserialize(content);// 6. 得到结构化数据std::cout << resp.Result() << "[" << resp.Code() << "]" << std::endl;}elsebreak;}elsebreak;}::close(sockfd);return 0;
}
Calculator.hpp
#pragma once
#include <iostream>
#include "Protocol.hpp"class Calculator
{
public:Calculator(){}Response Execute(const Request &req){// 我们拿到的都是结构化的数据,拿到的不就是类对象吗!!!Response resp;switch (req.Oper()){case '+':resp.SetResult(req.X() + req.Y());break;case '-':resp.SetResult(req.X() - req.Y());break;case '*':resp.SetResult(req.X() * req.Y());break;case '/':{if (req.Y() == 0){resp.SetCode(1); // 1 就是除0}else{resp.SetResult(req.X() / req.Y());}}break;case '%':{if (req.Y() == 0){resp.SetCode(2); // 2 就是mod 0}else{resp.SetResult(req.X() % req.Y());}}break;default:resp.SetCode(3); // 3 用户发来的计算类型,无法识别break;}return resp;}~Calculator(){}
};
Daemon.hpp
#pragma once#include <iostream>
#include <cstdlib>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>#define ROOT "/"
#define devnull "/dev/null"void Daemon(bool ischdir, bool isclose)
{// 1. 守护进程一般要屏蔽到特定的异常信号signal(SIGCHLD, SIG_IGN);signal(SIGPIPE, SIG_IGN);// 2. 成为非组长if (fork() > 0)exit(0);// 3. 建立新会话setsid();// 4. 每一个进程都有自己的CWD,是否将当前进程的CWD更改成为 / 根目录if (ischdir)chdir(ROOT);// 5. 已经变成守护进程啦,不需要和用户的输入输出,错误进行关联了if (isclose){::close(0);::close(1);::close(2);}else{int fd = ::open(devnull, O_WRONLY);if (fd > 0){// 各种重定向dup2(fd, 0);dup2(fd, 1);dup2(fd, 2);close(fd);}}
}
Protocol.hpp
#pragma once#include <iostream>
#include <string>
#include <sstream>
#include <memory>
#include <jsoncpp/json/json.h>const std::string Sep = "\n\r";bool Encode(std::string &message)
{if(message.size() == 0) return false;std::string package = std::to_string(message.size()) + Sep + message + Sep;message = package;return true;
}bool Decode(std::string &package, std::string* content)
{auto pos = package.find(Sep);if(pos == std::string::npos) return false;std::string content_length_str = package.substr(0, pos);int content_length = std::stoi(content_length_str);int full_length = content_length_str.size() + content_length + 2 * Sep.size();if(package.size() < full_length) return false;*content = package.substr(pos + Sep.size(), content_length);package.erase(0, full_length);return true;
}class Request
{
public:Request(int x = 0, int y = 0, char oper = '+'):_x(x), _y(y), _oper(oper){}bool Serialize(std::string &out_string){Json::Value root;root["x"] = _x;root["y"] = _y;root["oper"] = _oper;Json::StreamWriterBuilder wb;std::unique_ptr<Json::StreamWriter> w(wb.newStreamWriter());std::stringstream ss;w->write(root, &ss);out_string = ss.str();return true;}bool Deserialize(std::string &in_string){Json::Value root;Json::Reader reader;bool parsingSuccessful = reader.parse(in_string, root);if(!parsingSuccessful){std::cout<<"Failed to parse JSON: "<< reader.getFormattedErrorMessages()<<std::endl;return false;}_x = root["x"].asInt();_y = root["y"].asInt();_oper = root["oper"].asInt();// char 也是 intreturn true;}void Print(){std::cout << _x << std::endl;std::cout << _oper << std::endl;std::cout << _y << std::endl;}int X() const { return _x; }int Y() const { return _y; }char Oper() const { return _oper; }
private:int _x;int _y;char _oper;
};class Response
{
public:Response():_result(0), _code(0){}Response(int result, int code):_result(result), _code(code){}bool Serialize(std::string &out_string){Json::Value root;root["result"] = _result;root["code"] = _code;Json::StreamWriterBuilder wb;std::unique_ptr<Json::StreamWriter> w(wb.newStreamWriter());std::stringstream ss;w->write(root, &ss);out_string = ss.str();return true;}bool Deserialize(std::string &in_string){Json::Value root;Json::Reader reader;bool parsingSuccessful = reader.parse(in_string, root);if(!parsingSuccessful){std::cout<<"Failed to parse JSON: "<<reader.getFormattedErrorMessages()<<std::endl;return false;}_result = root["result"].asInt();_code = root["code"].asInt();return true;}int Result() const { return _result; }int Code() const { return _code; }void SetResult(int res) { _result = res;}void SetCode(int c) {_code = c;}
private:int _result;int _code;
};