bprc二次封装

news/2024/9/23 22:24:52/

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 一、封装的思想
  • 二、封装单个服务的信道管理类
    • 1.成员变量
    • 2.成员函数
  • 三、封装总体的服务信道管理类
    • 1.成员变量
    • 2.成员函数
  • 四.etcd和brpc联合测试
    • 1.服务注册客户端
    • 2.服务发现客户端


一、封装的思想

brpc的二次封装:
brpc本质上来说–进行rpc调用的,但是向谁调用什么服务得管理起来 – 搭配etcd实现的注册中心管理
原因:通过注册中心,能够获知谁能提供什么服务,进而能够连接它发起这个服务调用。
封装思想:
主要是管理起来网络通信的信道----将不同服务节点主机的通信信道管理起来
封装的是服务节点信道的管理,而不是rpc调用的管理。
封装:
1.指定服务的信道管理类:
一个服务可能会有多个节点提供服务,每个节点都有自己的channe
建立服务与信道的映射关系,并且关系是一对多,采用RR轮转策略进行获取
2.总体的服务信道管理类
将多个服务的信道管理对象管理起来

二、封装单个服务的信道管理类

1.成员变量

1.需要一个string来表视当前服务的信道管理类的服务名称因为我们是把一个服务的信道管理起来,所以需要知道该服务的名称。
2.一个服务可能会有多个主机,也就是一个服务可能会有多个信道。所以我么需要一个数组来存储当前服务节点的信道。
3.当一个节点主机下线后,我们需要删除释放这个channel,所以我们需要一个哈希表来进行主机地址与channel的映射关系,方便我们进行删除。
4.需要一个rr轮转下标,当需要对该服务进行rpc调用时,可以使用rr轮转负载均衡式的返回channel.
5.需要一个互斥锁,保证容器的线程安全。

 std::mutex _mutex;int32_t _index; //rr轮转下标std::string _service_name;  //服务名称std::vector<ChannelPtr> _channels;  //当前服务对应的通信集合std::unordered_map<std::string,ChannelPtr> _hosts;  //主机地址与信道映射关系

2.成员函数

构造函数中,只需要初始化服务名称,然后轮转下标初始化为0.

ServiceChannel(const std::string& service_name):_index(0),_service_name(service_name){}

当服务上线一台主机时,需要创建一个channel进行管理,那么该函数中需要有一个参数,就是主机的地址。

void append(const std::string& host){//创建一个channel,并进行连接ChannelPtr channel = std::make_shared<brpc::Channel>();brpc::ChannelOptions options;options.connect_timeout_ms = -1;options.timeout_ms = -1;options.max_retry = 3;options.protocol = "baidu_std";int ret = channel->Init(host.c_str(), &options);if (ret == -1) {LOG_ERROR("初始化{}-{}信道失败!", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_hosts.insert(std::make_pair(host, channel));_channels.push_back(channel);}

对应的,当主机下线时,需要将该主机的channel删除并释放

 void remove(const std::string& host){std::unique_lock<std::mutex> lock(_mutex);auto it = _hosts.find(host);if (it == _hosts.end()) {LOG_WARN("{}-{}节点删除信道时,没有找到信道信息!", _service_name, host);return;}for (auto vit = _channels.begin(); vit != _channels.end(); ++vit) {if (*vit == it->second) {_channels.erase(vit);break;}}_hosts.erase(it);}

当需要对该服务发起rpc调用时,我们可以返回该服务的channel。通过rr轮转方式实现一个负载均衡。

ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.size() == 0) {LOG_ERROR("当前没有能够提供 {} 服务的节点!", _service_name);return ChannelPtr();}int32_t idx = _index++ % _channels.size();return _channels[idx];}

三、封装总体的服务信道管理类

我们的服务有多个,而每一个服务也会有多个主机节点。因此我们需要把这些服务总的管理起来。

1.成员变量

1.需要一个哈希表来实现服务名称和该服务的信道管理类的映射关系。
2.项目有多个服务,而我们并不是对所有的服务都要关心的。我们需要使用一个unordered_set来记录我们关心的服务。
3.一个互斥锁,保证容器的线程安全。

2.成员函数

添加一个关心的服务,只需要把服务名称传递进来,进行一个插入即可

 void declared(const std::string& service_name){std::unique_lock<std::mutex> lock(_mutex);_follow_services.insert(service_name);}

当一个对应的服务上线一个主机节点时,为该服务节点主机创建一个channel,并管理起来。这里直接调用上面的服务的信道管理类就行。

void onServiceOnline(const std::string &service_instance,const std::string& host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()) {LOG_DEBUG("{}-{} 服务上线了,但是当前并不关心!", service_name, host);return;}//先获取管理对象,没有则创建,有则添加节点auto sit = _services.find(service_name);if (sit == _services.end()) {service = std::make_shared<ServiceChannel>(service_name);_services.insert(std::make_pair(service_name, service));}else {service = sit->second;}}if (!service) {LOG_ERROR("新增 {} 服务管理节点失败!", service_name);return ;}service->append(host);LOG_DEBUG("{}-{} 服务上线新节点,进行添加管理!", service_name, host);}

服务主机节点下线时,需要找到对应服务的信道管理类,来进行一个对应主机节点的channel的删除和释放。

void onServiceOffline(const std::string &service_instance,const std::string& host){std::string service_name = getServiceName(service_instance);ServiceChannel::ptr service;{std::unique_lock<std::mutex> lock(_mutex);auto fit = _follow_services.find(service_name);if (fit == _follow_services.end()) {LOG_DEBUG("{}-{} 服务下线了,但是当前并不关心!", service_name, host);return;}//先获取管理对象,没有则创建,有则添加节点auto sit = _services.find(service_name);if (sit == _services.end()) {LOG_WARN("删除{}服务节点时,没有找到管理对象", service_name);return;}service = sit->second;}service->remove(host);LOG_DEBUG("{}-{} 服务下线节点,进行删除管理!", service_name, host);}

上面的两个接口,是当服务的主机节点上线和下线时调用的,形参中的两个参数就是服务名称和主机节点。
那么什么时候会调用这两个函数呢?我们怎么知道什么时候主机上线下线呢?在前面我们封装了一个etcd,在服务发现客户端中,我们是同watcher来监控一个服务。当服务的数据发送改变时,就会调用我们传入的两个回调函数,那么这里的两个函数就可以作为俩个回调函数传入。也就是当服务新增节点和删除节点时调用。

获取指定服务的节点信道,传入指定服务,返回一个服务的信道。

//获取指定服务的节点信道ServiceChannel::ChannelPtr choose(const std::string& service_name){std::unique_lock<std::mutex> lock(_mutex);auto sit = _services.find(service_name);if (sit == _services.end()) {LOG_ERROR("当前没有能够提供 {} 服务的节点!", service_name);return ServiceChannel::ChannelPtr();}return sit->second->choose();}

四.etcd和brpc联合测试

前面我们封装了etcd,有一个服务注册客户端和一个服务发现客户端。那么我们可以用刚刚封装的brpc和etcd进行一个连接测试。

我们是可以用brpc进行rpc调用的,但是向谁进行rpc调用呢?我们可以使用服务发现客户端对服务进行一个监控,当服务注册客户端注册一个服务后,服务发现客户端就会触发一个回调函数,在这个回调函数就会对触发的事件进行一个判断,如果是新增事件,则调用用户设置的put_cb,如果是删除事件,则调用用户的del_cb;那么我们是不是可以把我们刚刚封装的bprc中的onServiceOnline和onServiceOffline作为对应的回调函数设置进去呢?

大体思路:

  • 服务注册客户端这边,首先创建一个brpc服务器,然后新增EchoService服务,并启动服务器。最后向etcd中注册一个/service/echo/instance服务,并指明自己的主机地址,表明自己的主机可以提供echo服务。
  • 服务发现客户端这边,首先创建一个总的信道管理类对象,然后构造一个服务发现对象,对/service/echo服务进行监控。接着通过总的信道管理类对象获取到一个channel,通过这个channel,就可以创建stub进行服务调用。

1.服务注册客户端

首先创建一个服务器对象,因为我们要提供rpc服务。

    brpc::Server server;

接着新增服务。

//3.向服务器对象中,新增EchoService服务EchoServiceImpl echo_service;int ret = server.AddService(&echo_service,brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if (ret == -1) {std::cout << "添加Rpc服务失败!\n";return -1;}

启动服务器,监听8085端口。

    brpc::ServerOptions options;options.idle_timeout_sec = -1;options.num_threads = 1;ret = server.Start(8085,&options);if (ret == -1) {std::cout << "启动服务器失败!\n";return -1;}

向etcd注册中心注册一个/service/echo/instance服务,设置val为自己的主机地址。代表当前主机可以提供echo服务。
此时etcd注册中心就会有一个{key:/service/echo/instance ; val:127.0.0.1:8085};

//5.注册服务Registry::ptr rClient = std::make_shared<Registry>("127.0.0.1:2379");rClient->registry("/service/echo/instance","127.0.0.1:8085");

2.服务发现客户端

先构建以恶个总的信道管理对象,需要通过这个对象来"知道"哪个主机可以提供服务。但具体是怎么发现的呢?可以看到,我们调用了declared这个函数代表我们关心/service/echo这个服务。此时这个字符串就会被插入到一个unordered_set中。其次我们将这个对象中的两个成员函数获取到了。这两个函数后续我们要设置进服务发现客户端中。

 //1.先构建Rpc信道管理对象auto sm = std::make_shared<ServiceManager>();sm->declared("/service/echo");auto put_cb = std::bind(&ServiceManager::onServiceOnline,sm.get(),std::placeholders::_1,std::placeholders::_2);auto del_cb = std::bind(&ServiceManager::onServiceOffline,sm.get(),std::placeholders::_1,std::placeholders::_2);

构造服务发现对象,这里的构造函数我们传入了四个参数,第一个参数是etcd注册中心的地址,因为我们要和etcd通信.
第二个参数是我们要监控的服务。我们的服务发现对象中有一个成员watcher,他会对指定的服务进行监控,如服务数据有变化,就会进行处理。而这里的第二个参数就是watcher进行监控的服务。另外这个监控它是会进行递归的,/service/下的所有目录发现变化,他都可以感知到。第三个第四个函数就是我们设置进的回调函数,当watcher感知到服务发送变化,会获取到发送变化的服务的key和val,也就是服务名称和主机地址。这是就可以调用我们设置的两个回调函数,我们的回调函数刚好可以接受两个参数。

而这两个参数一个是/service/echo/instance,一个是127.0.0.1:8085。我们的服务信道管理类是管理 服务的信道的。我们服务发现客户端关心的是/service/echo服务。而这里的/service/echo/instance本质上是一个/service/echo服务。所以在onServiceOnline和onServiceOffline中我们对这个参数进行了一个劫取子串,只需要/service/echo这一部分。然后为这一部分创建一个服务信道管理类对象。

//2. 构造服务发现对象Discovery::ptr dclient = std::make_shared<Discovery>("127.0.0.1:2379","/service", put_cb, del_cb);

接着就是获取服务的channel,我们关心哪个服务就获取什么服务的channel.

 //3. 通过Rpc信道管理对象,获取提供Echo服务的信道ServiceChannel::ChannelPtr channel = sm->choose("/service/echo");if(channel == nullptr){return -1;}

通过channel来发起rpc调用

//4.发起rpc调用example::EchoService_Stub stub(channel.get());example::EchoRequest req;req.set_message("你好,Lkm");brpc::Controller *cntl = new brpc::Controller();example::EchoResponse *rsp = new example::EchoResponse();stub.Echo(cntl, &req, rsp, nullptr);if(cntl->Failed() == true){std::cout << "Rpc调用失败:" << cntl->ErrorText() << std::endl;return;}std::cout << "收到响应: " << rsp->message() << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));

http://www.ppmy.cn/news/1529518.html

相关文章

线程同步:消费者模型(非常重要的模型)

一.线程同步的概念 线程同步&#xff1a;是指在互斥的基础上&#xff0c;通过其它机制实现访问者对 资源的有序访问。条件变量&#xff1a;线程库提供的专门针对线程同步的机制线程同步比较典型的应用场合就是 生产者与消费者 二、生产者与消费者模型原理 在这个模型中&…

10月23-27日六西格玛绿带公开课即将在雄安新区开课

在金秋送爽、硕果累累的季节里&#xff0c;天行健管理咨询公司宣布了一项重要决定——定于10月23日至27日&#xff0c;在充满未来气息的河北雄安新区&#xff0c;举办一场旨在提升企业质量管理水平、培养精英人才的六西格玛绿带公开课。此次课程的举办&#xff0c;不仅是对当前…

Android MediaPlayer + GLSurfaceView 播放视频

Android使用OpenGL 播放视频 概述TextureView的优缺点OpenGL的优缺点 实现复杂图形效果的场景参考 概述 在Android开发中&#xff0c;使用OpenGL ES来渲染视频是一种常见的需求&#xff0c;尤其是在需要实现自定义的视频播放界面或者视频特效时。结合MediaPlayer&#xff0c;我…

【RTT-Studio】详细使用教程十六:DAC7311外部DAC使用

文章目录 一、简介二、驱动程序三、DAC设置注册四、完整代码五、测试验证 一、简介 8 位 DAC5311、10 位 DAC6311 和 12 位 DAC7311 (DACx311) 是低功耗、单通道、电压输出数模转换器 (DAC)。DACx311 在正常工作状态下具有低功耗&#xff08;5V 时为 0.55mW&#xff0c;断电模式…

哪个快?用300万个图斑测试ArcGIS Pro的成对叠加与经典叠加

​​​ 点击下方全系列课程学习 点击学习—>ArcGIS全系列实战视频教程——9个单一课程组合系列直播回放 点击学习——>遥感影像综合处理4大遥感软件ArcGISENVIErdaseCognition 在使用ArcGIS Pro的过程中&#xff0c;很多朋友发现&#xff0c;Pro有个成对叠加工具集。很多…

是德科技Keysight N4433D ECal模块 26.5GHz 4端口3.5毫米

是德科技Keysight N4433D ECal模块 26.5GHz 4端口3.5毫米 Keysight N4433D 射频电子校准 (ECal) 模块使是德科技矢量网络分析仪的校准变得快速、简单和准确。N4433D 是一款精密 4 端口电子校准件模块&#xff0c;支持选择 3.5 毫米连接器&#xff0c;最高可混频至 26.5 GHz。选…

数据结构:时间复杂度与空间复杂度

目录 算法效率时间复杂度大O渐进表示法时间复杂度计算案例 空间复杂度空间复杂度案例 复杂度算法题 算法效率 算法在编写成可执行程序后&#xff0c;运⾏时需要耗费时间资源和空间(内存)资源 。因此衡量⼀个算法的好坏&#xff0c;⼀般是从时间和空间两个维度来衡量的&#xf…

Spring 全局与局部异常处理解析 + 实战案例

Spring 全局与局部异常处理解析 实战案例 在 Java 开发中&#xff0c;异常处理是非常重要的一环。它不仅能够提升代码的健壮性&#xff0c;还能为用户提供有意义的错误提示。在 Spring Framework 中&#xff0c;我们可以通过全局和局部异常处理来优雅地管理异常。本文将结合 …