C++使用ZeroMQ和MessagePack实现简单又轻量级的RPC框架

embedded/2025/3/17 2:53:34/

在现代的分布式系统中,远程过程调用(RPC)是一个非常重要的机制,它允许不同的服务或组件之间的通信,就像调用本地函数一样。本文将介绍如何使用ZeroMQ和MessagePack来构建一个轻量级的RPC框架,并提供一个简单的使用示例。

ZeroMQ简介

ZeroMQ(也称为0MQ)是一个高性能的异步消息库,旨在使用标准的、对等的传输协议实现消息的发送与接收。ZeroMQ的核心是提供一个消息队列,使得消息发送和接收更加高效和可靠。ZeroMQ支持多种消息模式,如请求/响应(Request/Reply)、发布/订阅(Publish/Subscribe)等。在上述代码中,ZMQ_REP表示响应模式,用于服务器端,而ZMQ_REQ则用于客户端,发起请求并等待响应。

MessagePack简介

MessagePack是一种高效的二进制序列化库,类似于JSON,但更紧凑、更快。它使用二进制格式来表示数据,使得数据传输更加高效。MessagePack可以用于多种编程语言,包括C++,并且不需要额外的依赖,如Boost。

安装MessagePack和ZeroMQ

安装MessagePack
MessagePack的C++版本可以通过多种方式进行安装,例如使用包管理器或从源代码编译安装。以下是通过CMake安装MessagePack的方法:

  1. 下载MessagePack源码:

    git clone https://github.com/msgpack/msgpack-c.git
    cd msgpack-c
    
  2. 创建构建目录并编译安装:

    mkdir build
    cd build
    cmake ..
    make
    sudo make install
    

安装ZeroMQ

下载ZeroMQ源码

可以从ZeroMQ的官方GitHub仓库下载最新的源码。使用git clone命令进行下载:

git clone https://github.com/zeromq/libzmq.git
cd libzmq
编译和安装

进入源码目录后,可以按照以下步骤进行编译和安装:

  1. 生成配置文件:使用autogen.sh脚本生成配置文件。

    ./autogen.sh
    
  2. 配置编译选项:使用configure脚本进行配置。可以添加各种选项来定制编译过程,例如指定安装路径等。

    ./configure
    

    如果需要指定安装路径(例如/usr/local),可以添加--prefix选项:

    ./configure --prefix=/usr/local
    
  3. 编译源码:使用make命令进行编译。

   make
  1. 安装库文件:使用make install命令安装编译好的库文件到系统中。
   sudo make install

如果需要运行在嵌入式linux上则需要交叉编译,如下:

# 下载源码
git clone https://github.com/zeromq/libzmq.git
cd libzmq# 配置交叉编译环境(示例:ARM架构)
export CC=arm-linux-gnueabihf-gcc
export CXX=arm-linux-gnueabihf-g++# 配置编译选项(禁用非必要功能)
./configure --host=arm-linux-gnueabihf \--prefix=/opt/zmq-embedded \--without-libsodium \      # 禁用加密--without-docs# 编译并安装
make -j4 && make install
验证安装

安装完成后,可以通过编写一个简单的ZeroMQ程序来验证安装是否成功。以下是一个简单的示例程序:

服务器端代码 (server.cpp)

#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>int main() {zmq::context_t context(1);zmq::socket_t socket(context, ZMQ_REP);socket.bind("tcp://*:5555");while (true) {zmq::message_t request;// 等待客户端请求socket.recv(request, zmq::recv_flags::none);// 处理请求std::cout << "Received Hello" << std::endl;// 发送响应zmq::message_t reply(5);memcpy(reply.data(), "World", 5);socket.send(reply, zmq::send_flags::none);std::this_thread::sleep_for(std::chrono::seconds(1));}return 0;
}

客户端代码 (client.cpp)

#include <zmq.hpp>
#include <iostream>int main() {zmq::context_t context(1);zmq::socket_t socket(context, ZMQ_REQ);socket.connect("tcp://localhost:5555");for (int request_nbr = 0; request_nbr != 10; request_nbr++) {zmq::message_t request(5);memcpy(request.data(), "Hello", 5);socket.send(request, zmq::send_flags::none);zmq::message_t reply;socket.recv(reply, zmq::recv_flags::none);std::cout << "Received " << reply.to_string() << std::endl;}return 0;
}
编译示例程序

使用g++编译上述两个示例程序。确保编译时链接ZeroMQ库:

g++ -o server server.cpp -lzmq
g++ -o client client.cpp -lzmq
运行示例程序

首先运行服务器端程序,然后运行客户端程序。客户端将向服务器发送请求,服务器收到请求后将返回响应。

./server
# 在另一个终端中
./client
实现简单的RPC框架

下面是一个基于ZeroMQ和MessagePack的简单RPC框架的实现示例。该框架支持注册、调用和异步调用远程方法。

定义RpcHandler类型

using RpcHandler = std::function<msgpack::object(const msgpack::object&)>;

初始化服务器和客户端

bool LightweightRPC::initServer(const std::string& serverAddress) {try {// 创建服务器套接字m_serverSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REP);// 绑定地址m_serverSocket->bind(serverAddress);return true;} catch (const zmq::error_t& e) {std::cerr << "ZeroMQ error: " << e.what() << std::endl;return false;}
}bool LightweightRPC::initClient(const std::string& serverAddress) {try {// 创建客户端套接字m_clientSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REQ);// 连接到服务器m_clientSocket->connect(serverAddress);return true;} catch (const zmq::error_t& e) {std::cerr << "ZeroMQ error: " << e.what() << std::endl;return false;}
}

注册和调用方法

void LightweightRPC::registerMethod(const std::string& methodName, RpcHandler handler) {std::lock_guard<std::mutex> lock(m_methodsMutex);m_methods[methodName] = handler;
}msgpack::object LightweightRPC::callMethod(const std::string& methodName, const msgpack::object& params, int timeout_ms) {if (!m_clientSocket) {throw std::runtime_error("RPC client not initialized");}// 序列化请求zmq::message_t request = serializeRequest(methodName, params);// 发送请求m_clientSocket->send(request, zmq::send_flags::none);// 设置接收超时m_clientSocket->set(zmq::sockopt::rcvtimeo, timeout_ms);// 接收响应zmq::message_t reply;auto result = m_clientSocket->recv(reply);// 反序列化响应return deserializeResponse(reply);
}

异步调用方法

std::future<msgpack::object> LightweightRPC::callMethodAsync(const std::string& methodName, const msgpack::object& params) {// 创建promise和futureauto promise = std::make_shared<std::promise<msgpack::object>>();auto future = promise->get_future();// 在新线程中调用方法std::thread([this, methodName, params, promise]() {try {msgpack::object result = this->callMethod(methodName, params);promise->set_value(result);} catch (const std::exception& e) {promise->set_exception(std::make_exception_ptr(e));}}).detach();return future;
}

启动和停止服务器

void LightweightRPC::startServer(int threadCount) {if (!m_serverSocket) {throw std::runtime_error("RPC server not initialized");}// 启动服务器m_running = true;// 创建工作线程for (int i = 0; i < threadCount; ++i) {m_workerThreads.emplace_back(&LightweightRPC::workerThread, this);}
}void LightweightRPC::stopServer() {// 停止服务器m_running = false;// 等待所有工作线程结束for (auto& thread : m_workerThreads) {if (thread.joinable()) {thread.join();}}// 清空工作线程m_workerThreads.clear();
}

服务器处理请求的逻辑

void LightweightRPC::handleRequest(zmq::message_t& request, zmq::message_t& reply) {try {// 反序列化请求auto [methodName, params] = deserializeRequest(request);// 查找方法并调用RpcHandler handler;{std::lock_guard<std::mutex> lock(m_methodsMutex);auto it = m_methods.find(methodName);if (it == m_methods.end()) {reply = serializeError("Method not found: " + methodName);return;}handler = it->second;}// 调用方法处理函数msgpack::object result = handler(params);// 序列化响应reply = serializeResponse(result);} catch (const std::exception& e) {reply = serializeError(std::string("RPC Error: ") + e.what());}
}

序列化和反序列化消息

zmq::message_t LightweightRPC::serializeRequest(const std::string& methodName, const msgpack::object& params) {msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);packer.pack_map(2);packer.pack("method");packer.pack(methodName);packer.pack("params");packer.pack(params);return zmq::message_t(sbuf.data(), sbuf.size());
}std::pair<std::string, msgpack::object> LightweightRPC::deserializeRequest(const zmq::message_t& request) {msgpack::object_handle oh = msgpack::unpack(static_cast<const char*>(request.data()), request.size());msgpack::object obj = oh.get();std::string methodName;msgpack::object params;if (obj.type != msgpack::type::MAP || obj.via.map.size != 2) {throw std::runtime_error("Invalid RPC request format");}for (uint32_t i = 0; i < obj.via.map.size; ++i) {auto key = obj.via.map.ptr[i].key;auto val = obj.via.map.ptr[i].val;if (key.type == msgpack::type::STR) {std::string keyStr(key.via.str.ptr, key.via.str.size);if (keyStr == "method") {methodName = std::string(val.via.str.ptr, val.via.str.size);} else if (keyStr == "params") {params = val;}}}if (methodName.empty()) {throw std::runtime_error("Method name not found in RPC request");}return std::make_pair(methodName, params);
}zmq::message_t LightweightRPC::serializeResponse(const msgpack::object& result) {msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);packer.pack_map(1);packer.pack("result");packer.pack(result);return zmq::message_t(sbuf.data(), sbuf.size());
}msgpack::object LightweightRPC::deserializeResponse(const zmq::message_t& response) {msgpack::object_handle oh = msgpack::unpack(static_cast<const char*>(response.data()), response.size());msgpack::object obj = oh.get();if (obj.type != msgpack::type::MAP || obj.via.map.size != 1) {throw std::runtime_error("Invalid RPC response format");}auto key = obj.via.map.ptr[0].key;auto val = obj.via.map.ptr[0].val;if (key.type == msgpack::type::STR) {std::string keyStr(key.via.str.ptr, key.via.str.size);if (keyStr == "result") {return val;} else if (keyStr == "error") {std::string errorMsg(val.via.str.ptr, val.via.str.size);throw std::runtime_error(errorMsg);}}throw std::runtime_error("Invalid RPC response format");
}zmq::message_t LightweightRPC::serializeError(const std::string& errorMessage) {msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);packer.pack_map(1);packer.pack("error");packer.pack(errorMessage);return zmq::message_t(sbuf.data(), sbuf.size());
}

使用示例

#include "lightweight_rpc.h"
#include <iostream>
#include <string>
#include <thread>
#include <chrono>// 示例RPC方法:加法
int add(int a, int b) {std::cout << "RPC Server: Calculating " << a << " + " << b << std::endl;return a + b;
}// 示例RPC方法:字符串连接
std::string concat(const std::string& a, const std::string& b) {std::cout << "RPC Server: Concatenating \"" << a << "\" and \"" << b << "\"" << std::endl;return a + b;
}// 服务器示例
void runServer() {try {std::cout << "Starting RPC server..." << std::endl;// 创建RPC服务器hub::RpcServer server("tcp://*:5555");// 注册RPC方法server.registerMethod("add", add);server.registerMethod("concat", concat);// 启动服务器server.start();std::cout << "RPC server started. Press Ctrl+C to stop." << std::endl;while (true) {std::this_thread::sleep_for(std::chrono::seconds(1));}} catch (const std::exception& e) {std::cerr << "Server error: " << e.what() << std::endl;}
}// 客户端示例
void runClient() {try {std::cout << "Starting RPC client..." << std::endl;// 创建RPC客户端hub::RpcClient client("tcp://localhost:5555");// 调用加法方法auto result1 = client.call<int, int>("add", 2000, 10, 20);int sum;result1.convert(sum);std::cout << "RPC Client: 10 + 20 = " << sum << std::endl;// 调用字符串连接方法auto result2 = client.call<std::string, std::string>("concat", 2000, "Hello, ", "World!");std::string concatResult;result2.convert(concatResult);std::cout << "RPC Client: concat result = " << concatResult << std::endl;// 异步调用示例auto future = client.callAsync<int, int>("add", 30, 40);std::cout << "RPC Client: Async call made, waiting for result..." << std::endl;// 等待结果auto result5 = future.get();int asyncSum;result5.convert(asyncSum);std::cout << "RPC Client: Async result 30 + 40 = " << asyncSum << std::endl;} catch (const std::exception& e) {std::cerr << "Client error: " << e.what() << std::endl;}
}int main(int argc, char* argv[]) {if (argc < 2) {std::cerr << "Usage: " << argv[0] << " [server|client]" << std::endl;return 1;}std::string mode = argv[1];if (mode == "server") {runServer();} else if (mode == "client") {runClient();} else {std::cerr << "Invalid mode. Use 'server' or 'client'." << std::endl;return 1;}return 0;
}
完整源码
#ifndef LIGHTWEIGHT_RPC_H
#define LIGHTWEIGHT_RPC_H#include <string>
#include <functional>
#include <unordered_map>
#include <mutex>
#include <memory>
#include <vector>
#include <future>
#include <thread>
#include <atomic>
#include <zmq.hpp>
#include <msgpack.hpp>namespace hub {/*** @brief 轻量级RPC框架,基于ZeroMQ和MessagePack*/
class LightweightRPC {
public:// RPC方法处理函数类型using RpcHandler = std::function<msgpack::object(const msgpack::object&)>;/*** @brief 获取LightweightRPC的单例实例* * @return LightweightRPC& 单例实例*/static LightweightRPC& getInstance();/*** @brief 初始化RPC服务器* * @param serverAddress 服务器地址,例如"tcp://*:5555"* @return true 如果初始化成功,false 否则*/bool initServer(const std::string& serverAddress);/*** @brief 初始化RPC客户端* * @param serverAddress 服务器地址,例如"tcp://localhost:5555"* @return true 如果初始化成功,false 否则*/bool initClient(const std::string& serverAddress);/*** @brief 注册RPC方法* * @param methodName 方法名* @param handler 处理函数*/void registerMethod(const std::string& methodName, RpcHandler handler);/*** @brief 调用远程方法* * @param methodName 方法名* @param params 参数* @param timeout_ms 超时时间(毫秒)* @return msgpack::object 返回结果*/msgpack::object callMethod(const std::string& methodName, const msgpack::object& params, int timeout_ms = 1000);/*** @brief 异步调用远程方法* * @param methodName 方法名* @param params 参数* @return std::future<msgpack::object> 返回结果的future*/std::future<msgpack::object> callMethodAsync(const std::string& methodName, const msgpack::object& params);/*** @brief 启动RPC服务器* * @param threadCount 工作线程数量*/void startServer(int threadCount = 1);/*** @brief 停止RPC服务器*/void stopServer();/*** @brief 关闭RPC客户端*/void closeClient();private:LightweightRPC();~LightweightRPC();LightweightRPC(const LightweightRPC&) = delete;LightweightRPC& operator=(const LightweightRPC&) = delete;// 服务器相关std::unique_ptr<zmq::context_t> m_context;std::unique_ptr<zmq::socket_t> m_serverSocket;std::unique_ptr<zmq::socket_t> m_clientSocket;std::unordered_map<std::string, RpcHandler> m_methods;std::mutex m_methodsMutex;std::vector<std::thread> m_workerThreads;std::atomic<bool> m_running;// 工作线程函数void workerThread();// 处理RPC请求void handleRequest(zmq::message_t& request, zmq::message_t& reply);// 序列化RPC请求zmq::message_t serializeRequest(const std::string& methodName, const msgpack::object& params);// 反序列化RPC请求std::pair<std::string, msgpack::object> deserializeRequest(const zmq::message_t& request);// 序列化RPC响应zmq::message_t serializeResponse(const msgpack::object& result);// 反序列化RPC响应msgpack::object deserializeResponse(const zmq::message_t& response);// 序列化RPC错误zmq::message_t serializeError(const std::string& errorMessage);
};// 便捷的RPC客户端类
class RpcClient {
public:/*** @brief 构造函数* * @param serverAddress 服务器地址*/RpcClient(const std::string& serverAddress);/*** @brief 析构函数*/~RpcClient();/*** @brief 调用远程方法* * @param methodName 方法名* @param params 参数* @param timeout_ms 超时时间(毫秒)* @return msgpack::object 返回结果*/template<typename... Args>msgpack::object call(const std::string& methodName, int timeout_ms = 1000, Args... args) {// 打包参数msgpack::type::tuple<Args...> params(args...);msgpack::sbuffer sbuf;msgpack::pack(sbuf, params);msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());msgpack::object obj = oh.get();// 调用方法return LightweightRPC::getInstance().callMethod(methodName, obj, timeout_ms);}/*** @brief 异步调用远程方法* * @param methodName 方法名* @param params 参数* @return std::future<msgpack::object> 返回结果的future*/template<typename... Args>std::future<msgpack::object> callAsync(const std::string& methodName, Args... args) {// 打包参数msgpack::type::tuple<Args...> params(args...);msgpack::sbuffer sbuf;msgpack::pack(sbuf, params);msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());msgpack::object obj = oh.get();// 异步调用方法return LightweightRPC::getInstance().callMethodAsync(methodName, obj);}
};// 便捷的RPC服务器类
class RpcServer {
public:/*** @brief 构造函数* * @param serverAddress 服务器地址* @param threadCount 工作线程数量*/RpcServer(const std::string& serverAddress, int threadCount = 1);/*** @brief 析构函数*/~RpcServer();/*** @brief 注册RPC方法* * @param methodName 方法名* @param handler 处理函数*/template<typename Func>void registerMethod(const std::string& methodName, Func handler) {LightweightRPC::getInstance().registerMethod(methodName, [handler](const msgpack::object& params) -> msgpack::object {// 调用处理函数return invokeHandler(handler, params);});}/*** @brief 启动RPC服务器*/void start();/*** @brief 停止RPC服务器*/void stop();private:int m_threadCount;// 调用处理函数并返回结果template<typename Func, typename... Args>static msgpack::object invokeHandler(Func handler, const msgpack::object& params) {try {// 解包参数msgpack::type::tuple<Args...> args;params.convert(args);// 调用处理函数auto result = callHandlerWithTuple(handler, args);// 打包结果msgpack::sbuffer sbuf;msgpack::pack(sbuf, result);msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());return oh.get();} catch (const std::exception& e) {// 处理异常msgpack::sbuffer sbuf;msgpack::pack(sbuf, std::string("RPC Error: ") + e.what());msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());return oh.get();}}// 使用tuple调用处理函数template<typename Func, typename Tuple, std::size_t... I>static auto callHandlerWithTupleImpl(Func handler, Tuple&& tuple, std::index_sequence<I...>) {return handler(std::get<I>(std::forward<Tuple>(tuple))...);}template<typename Func, typename Tuple>static auto callHandlerWithTuple(Func handler, Tuple&& tuple) {constexpr auto size = std::tuple_size<typename std::decay<Tuple>::type>::value;return callHandlerWithTupleImpl(handler, std::forward<Tuple>(tuple), std::make_index_sequence<size>{});}
};} // namespace hub#endif // LIGHTWEIGHT_RPC_H 
#include "lightweight_rpc.h"
#include <iostream>
#include <chrono>namespace hub {// 单例实例
LightweightRPC& LightweightRPC::getInstance() {static LightweightRPC instance;return instance;
}LightweightRPC::LightweightRPC() : m_running(false) {// 创建ZeroMQ上下文m_context = std::make_unique<zmq::context_t>(1);
}LightweightRPC::~LightweightRPC() {// 停止服务器stopServer();// 关闭客户端closeClient();
}bool LightweightRPC::initServer(const std::string& serverAddress) {try {// 创建服务器套接字m_serverSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REP);// 绑定地址m_serverSocket->bind(serverAddress);return true;} catch (const zmq::error_t& e) {std::cerr << "ZeroMQ error: " << e.what() << std::endl;return false;}
}bool LightweightRPC::initClient(const std::string& serverAddress) {try {// 创建客户端套接字m_clientSocket = std::make_unique<zmq::socket_t>(*m_context, ZMQ_REQ);// 连接到服务器m_clientSocket->connect(serverAddress);return true;} catch (const zmq::error_t& e) {std::cerr << "ZeroMQ error: " << e.what() << std::endl;return false;}
}void LightweightRPC::registerMethod(const std::string& methodName, RpcHandler handler) {std::lock_guard<std::mutex> lock(m_methodsMutex);m_methods[methodName] = handler;
}msgpack::object LightweightRPC::callMethod(const std::string& methodName, const msgpack::object& params, int timeout_ms) {if (!m_clientSocket) {throw std::runtime_error("RPC client not initialized");}try {// 序列化请求zmq::message_t request = serializeRequest(methodName, params);// 发送请求m_clientSocket->send(request, zmq::send_flags::none);// 设置接收超时m_clientSocket->set(zmq::sockopt::rcvtimeo, timeout_ms);// 接收响应zmq::message_t reply;auto result = m_clientSocket->recv(reply);if (!result.has_value() || result.value() == 0) {throw std::runtime_error("RPC call timeout");}// 反序列化响应return deserializeResponse(reply);} catch (const std::exception& e) {std::cerr << "RPC call error: " << e.what() << std::endl;// 返回错误msgpack::sbuffer sbuf;msgpack::pack(sbuf, std::string("RPC Error: ") + e.what());msgpack::object_handle oh = msgpack::unpack(sbuf.data(), sbuf.size());return oh.get();}
}std::future<msgpack::object> LightweightRPC::callMethodAsync(const std::string& methodName, const msgpack::object& params) {// 创建promise和futureauto promise = std::make_shared<std::promise<msgpack::object>>();auto future = promise->get_future();// 在新线程中调用方法std::thread([this, methodName, params, promise]() {try {// 调用方法msgpack::object result = this->callMethod(methodName, params);// 设置结果promise->set_value(result);} catch (const std::exception& e) {// 设置异常promise->set_exception(std::make_exception_ptr(e));}}).detach();return future;
}void LightweightRPC::startServer(int threadCount) {if (!m_serverSocket) {throw std::runtime_error("RPC server not initialized");}// 停止现有的服务器stopServer();// 启动服务器m_running = true;// 创建工作线程for (int i = 0; i < threadCount; ++i) {m_workerThreads.emplace_back(&LightweightRPC::workerThread, this);}
}void LightweightRPC::stopServer() {// 停止服务器m_running = false;// 等待所有工作线程结束for (auto& thread : m_workerThreads) {if (thread.joinable()) {thread.join();}}// 清空工作线程m_workerThreads.clear();
}void LightweightRPC::closeClient() {// 关闭客户端套接字m_clientSocket.reset();
}void LightweightRPC::workerThread() {while (m_running) {try {// 接收请求zmq::message_t request;auto result = m_serverSocket->recv(request, zmq::recv_flags::dontwait);if (result.has_value() && result.value() > 0) {// 处理请求zmq::message_t reply;handleRequest(request, reply);// 发送响应m_serverSocket->send(reply, zmq::send_flags::none);} else {// 没有请求,休眠一段时间std::this_thread::sleep_for(std::chrono::milliseconds(10));}} catch (const std::exception& e) {std::cerr << "RPC server error: " << e.what() << std::endl;}}
}void LightweightRPC::handleRequest(zmq::message_t& request, zmq::message_t& reply) {try {// 反序列化请求auto [methodName, params] = deserializeRequest(request);// 查找方法处理函数RpcHandler handler;{std::lock_guard<std::mutex> lock(m_methodsMutex);auto it = m_methods.find(methodName);if (it == m_methods.end()) {// 方法不存在reply = serializeError("Method not found: " + methodName);return;}handler = it->second;}// 调用方法处理函数msgpack::object result = handler(params);// 序列化响应reply = serializeResponse(result);} catch (const std::exception& e) {// 处理异常reply = serializeError(std::string("RPC Error: ") + e.what());}
}zmq::message_t LightweightRPC::serializeRequest(const std::string& methodName, const msgpack::object& params) {// 创建请求对象msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);// 打包方法名和参数packer.pack_map(2);packer.pack("method");packer.pack(methodName);packer.pack("params");packer.pack(params);// 创建ZeroMQ消息return zmq::message_t(sbuf.data(), sbuf.size());
}std::pair<std::string, msgpack::object> LightweightRPC::deserializeRequest(const zmq::message_t& request) {// 解包请求msgpack::object_handle oh = msgpack::unpack(static_cast<const char*>(request.data()), request.size());msgpack::object obj = oh.get();// 提取方法名和参数std::string methodName;msgpack::object params;if (obj.type != msgpack::type::MAP || obj.via.map.size != 2) {throw std::runtime_error("Invalid RPC request format");}for (uint32_t i = 0; i < obj.via.map.size; ++i) {auto key = obj.via.map.ptr[i].key;auto val = obj.via.map.ptr[i].val;if (key.type == msgpack::type::STR) {std::string keyStr(key.via.str.ptr, key.via.str.size);if (keyStr == "method") {if (val.type != msgpack::type::STR) {throw std::runtime_error("Method name must be a string");}methodName = std::string(val.via.str.ptr, val.via.str.size);} else if (keyStr == "params") {params = val;}}}if (methodName.empty()) {throw std::runtime_error("Method name not found in RPC request");}return std::make_pair(methodName, params);
}zmq::message_t LightweightRPC::serializeResponse(const msgpack::object& result) {// 创建响应对象msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);// 打包结果packer.pack_map(1);packer.pack("result");packer.pack(result);// 创建ZeroMQ消息return zmq::message_t(sbuf.data(), sbuf.size());
}msgpack::object LightweightRPC::deserializeResponse(const zmq::message_t& response) {// 解包响应msgpack::object_handle oh = msgpack::unpack(static_cast<const char*>(response.data()), response.size());msgpack::object obj = oh.get();// 提取结果if (obj.type != msgpack::type::MAP || obj.via.map.size != 1) {throw std::runtime_error("Invalid RPC response format");}auto key = obj.via.map.ptr[0].key;auto val = obj.via.map.ptr[0].val;if (key.type == msgpack::type::STR) {std::string keyStr(key.via.str.ptr, key.via.str.size);if (keyStr == "result") {return val;} else if (keyStr == "error") {if (val.type == msgpack::type::STR) {std::string errorMsg(val.via.str.ptr, val.via.str.size);throw std::runtime_error(errorMsg);} else {throw std::runtime_error("Unknown RPC error");}}}throw std::runtime_error("Invalid RPC response format");
}zmq::message_t LightweightRPC::serializeError(const std::string& errorMessage) {// 创建错误对象msgpack::sbuffer sbuf;msgpack::packer<msgpack::sbuffer> packer(sbuf);// 打包错误packer.pack_map(1);packer.pack("error");packer.pack(errorMessage);// 创建ZeroMQ消息return zmq::message_t(sbuf.data(), sbuf.size());
}// RpcClient实现
RpcClient::RpcClient(const std::string& serverAddress) {if (!LightweightRPC::getInstance().initClient(serverAddress)) {throw std::runtime_error("Failed to initialize RPC client");}
}RpcClient::~RpcClient() {LightweightRPC::getInstance().closeClient();
}// RpcServer实现
RpcServer::RpcServer(const std::string& serverAddress, int threadCount) : m_threadCount(threadCount) {if (!LightweightRPC::getInstance().initServer(serverAddress)) {throw std::runtime_error("Failed to initialize RPC server");}
}RpcServer::~RpcServer() {stop();
}void RpcServer::start() {LightweightRPC::getInstance().startServer(m_threadCount);
}void RpcServer::stop() {LightweightRPC::getInstance().stopServer();
}} // namespace hub 
总结

通过上述步骤,我们可以构建一个简单的、基于ZeroMQ和MessagePack的RPC框架。该框架支持注册、调用和异步调用远程方法,具备较高的性能和可靠性。希望本文能帮助大家更好地理解和使用ZeroMQ与MessagePack来实现高效的分布式通信。


http://www.ppmy.cn/embedded/173218.html

相关文章

Qt程序基于共享内存读写CodeSys的变量

文章目录 1.背景2.结构体从CodeSys导出后导入到C2.1.将结构体从CodeSys中导出2.2.将结构体从m4文件提取翻译成c格式 3.添加RTTR注册信息4.读取PLC变量值5.更改PLC变量值 1.背景 在文章【基于RTTR在C中实现结构体数据的多层级动态读写】中&#xff0c;我们实现了通过字符串读写…

Vue3 + Vite + Yarn + Fabricjs构建的开源演示系统

Next-Slides 本项目灵感来源于 Prezi&#xff0c;旨在提供一个现代化的在线演示工具&#xff0c;可以作为传统PPT的替代方案。项目采用 TypeScript Vue3 Vite Yarn 技术栈构建&#xff0c;专注于在线教育和会议演示场景&#xff0c;提供交互式课件和智能课件功能。 主仓库…

嵌入式学习L6网络编程D5UDP编程

网络编程 UDPclient端 /*udp demo */ /* usage:* ./client serv_ip serv_port */ #include "net.h" void usage(char *s) {printf("\nThis is udp demo!\n");printf("\nUsage:\n\t %s serv_ip serv_port",s);printf("\n\t serv_ip: udp …

周志华机器学习西瓜书 第九章 聚类-学习笔记

一、聚类任务 聚类是无监督学习中非常典型的任务&#xff0c;聚类的目的是将数据样本划分为若干个通常不相交的子集&#xff0c;每一个子集成为"簇-cluster"&#xff0c;其即可以作为一个单独过程&#xff0c;用于找寻数据内在的分布结构&#xff0c;也可作为分类等其…

【数据分析大屏】基于Django+Vue汽车销售数据分析可视化大屏(完整系统源码+数据库+开发笔记+详细部署教程+虚拟机分布式启动教程)✅

目录 一、项目背景 二、项目创新点 三、项目功能 四、开发技术介绍 五、项目功能展示 六、权威视频链接 一、项目背景 汽车行业数字化转型加速&#xff0c;销售数据多维分析需求激增。本项目针对传统报表系统交互性弱、实时性差等痛点&#xff0c;基于DjangoVue架构构建…

鸿蒙系统liteos_m开发环境配置

在工作中开发基于HC32F4A0的鸿蒙liteos_m的操作系统移植时&#xff0c;开发环境选的命令行模式&#xff0c;官方的参考请看链接《快速入门概述》 在ubuntu18.04环境中安装时&#xff0c;安装库和工具集时官方提供的安装库的指令无法进行安装&#xff0c;部分库应该是有安装顺序…

IIS EXPRESS 虚拟目录经验谈!

最近在给客户开发一个事件提醒软件&#xff0c;用的是c# 版本是vs2022&#xff0c;在运行调试程序时&#xff0c;电脑会自动启动IIS Express,电脑右小角出现两个虚拟目录&#xff0c;对应两个端口&#xff0c;图示如下&#xff1a; 只能点击选择http://localhost:52726&#xf…

在群晖DS923+手动安装我Wordpress最新版

1.准备好群晖环境。 2.打开数据库&#xff0c;新建数据库。 数据库名&#xff0c;wordpress 3.下载最新版wordpress&#xff0c;并安装WordPress主程序 访问WordPress官方网站,下载最新版.在群晖Web文件夹下创建博客主目录,命名为wordpress(或其他任意文件名).将下载并解压的…