RPC(3)--基于 Nacos 的服务发现与负载均衡版

news/2024/11/14 15:41:02/

nacos:提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。

nacos架构如下(图片来源)
在这里插入图片描述
依赖包:

<dependency><groupId>com.alibaba.nacos</groupId><artifactId>nacos-client</artifactId><version>1.3.0</version></dependency>

使用如下:

//创建命名服务
NamingService namingService = NamingFactory.createNamingService("127.0.0.1:10058");
//服务提供方:注册服务
namingService.registerInstance(serviceName, address.getHostName(), address.getPort());
//服务消费方:获取服务
namingService.getAllInstances(serviceName);

整体架构

在这里插入图片描述

过程详解

服务注册和服务发现

调用封装在NacosUtil工具类中,该变量定义了静态域来获取namingService(服务端和消费端同步),实现了注册服务和服务发现以及注销服务三个静态方法。
NacosServiceDiscovery类提供了服务发现,以及根据负载均衡算法选择服务接口
ServiceProvider:默认的服务注册表,保存服务端本地服务,一个Map<serviceName,service>,一个Set<serviceName>
+++±—addServiceProvider()
+++±—getServiceProvider()

Server启动

  1. 创建接口实现类
  2. 创建server类,host+port+serializer
  3. 发布服务:1. 将服务,host和port注册到nacos。2. 将服务和服务类保存到本地注册表ServiceProvoder中。
  4. 启动netty的channel监听端口:1. 序列化和反序列化handler使用自己定义的协议:魔数+请求码(request/response)+序列化码+序列化数据长度和byte数据。2. NettyServerHandler得到request对象,并在ServiceProvoder注册表中查找调用服务,通过ChannelHandlerContext.writeAndFlush(response)返回结果。3. response经过序列化返回

Client启动

  1. 创建客户端client,指定序列化算法,负载均衡策略,
  2. 创建代理对象,参数为client(在invoke中调用client的sendRequest方法)
  3. 拿到服务接口的代理对象,调用方法hello,执行invoke方法(封装request),执行sendRequest,等待response返回。
  4. 通过namingService.getAllInstances(serviceName)拿到nacos对应服务host+port获取Channel连接server,发送请求数据:1. NettyClientHandler,保存response。

优化

心跳

参考连接
相比于上个版本,本版使用netty提供的IdleStateHandler心跳机制,Netty在应用层实现心跳机制(TCP协议层也有Keeplive机制,Netty中添加ChannelOption.SO_KEEPALIVE, true)的option。

  1. 服务端
    服务端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;

IdleStateHandler 心跳检测机制,30秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法

ChannelPipeline pipeline = ch.pipeline();//IdleStateHandler 心跳检测机制,30秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)).addLast(new CommonEncoder(serializer)).addLast(new CommonDecoder()).addLast(new NettyServerHandler());
@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.READER_IDLE) {logger.info("长时间未收到心跳包,断开连接...");ctx.close();}} else {super.userEventTriggered(ctx, evt);}}
  1. 客户端

客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;

设定IdleStateHandler心跳检测每5秒进行一次写检测,如果四秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每四秒向服务端发送一次消息;

ch.pipeline().addLast(new CommonEncoder(serializer)).addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)).addLast(new CommonDecoder()).addLast(new NettyClientHandler());
@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.WRITER_IDLE) {logger.info("发送心跳包 [{}]", ctx.channel().remoteAddress());Channel channel = ChannelProvider.get((InetSocketAddress) ctx.channel().remoteAddress(), CommonSerializer.getByCode(CommonSerializer.DEFAULT_SERIALIZER));RpcRequest rpcRequest = new RpcRequest();rpcRequest.setHeartBeat(true);channel.writeAndFlush(rpcRequest).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}} else {super.userEventTriggered(ctx, evt);}}

服用Channel

消费端如采用 Netty 方式,会复用 Channel 避免多次连接

封装ChannelProvider类,使用map保存已经创建的channel,key为inetSocketAddress.toString() + serializer.getCode(),value为channel。

同一个channel的异步

客户端拿到服务端的结果,最开始的时候是通过 AttributeMap 绑定到Channel上实现的,客户端发送数据之后就阻塞等待。
在这里插入图片描述

使用 CompletableFuture 进行优化
创建一个unprocessedRequests的map来保存未处理的request,key为requestId,value为future。

在请求发出之前,先将对应的future入map,然后在客户端接受handler更新数据(complete(response)),然后sendRequest返回该future,此时future已经有值,调用get方法即可拿到。这样就不用阻塞等一个请求结果返回之后,再发出下一个请求了。


http://www.ppmy.cn/news/27493.html

相关文章

力扣-文章浏览

大家好&#xff0c;我是空空star&#xff0c;本篇带大家了解一道简单的力扣sql练习题。 文章目录前言一、题目&#xff1a;1148. 文章浏览二、解题1.正确示范①提交SQL运行结果2.正确示范②提交SQL运行结果3.正确示范③提交SQL运行结果4.其他总结前言 一、题目&#xff1a;1148…

扬帆优配|反弹涨超70%,昨收三连板,稀土行业或迎大事件

本年第一批稀土挖掘锻炼目标行将发放。 2月22日晚&#xff0c;东易日盛公告称&#xff0c;公司收到董事、副总经理兼财务总监李双侠出具的《关于未严格执行股份减持方案的致歉函》&#xff0c;其此次减持方案已施行结束&#xff0c;但在施行减持方案时&#xff0c;因操作失误&a…

【性能测试】loadrunner(一)知识准备

【性能测试】loadrunner&#xff08;一&#xff09;知识准备 目录&#xff1a;导读 1.0. 前言 1.1 性能测试术语介绍 1.2 性能测试分类 1.3 HTTP我们需要知道的 1.4 Loadrunner 12.55安装 1.0. 前言 ​ 在性能测试中&#xff0c;牵扯到了许多比较杂的知识点&#xff0c;…

【python】考前复习,python基础语法知识点整理

文章目录1.常量与表达式2.变量和数据类型创建变量数据类型动态类型数据类型的转换3.注释4.字符串字符串的定义方式字符串的拼接字符串的格式化①字符串格式化的精度控制字符串的格式化②对表达式进行格式化5.从控制台输入(input)6.运算符算术运算符赋值运算符布尔类型和比较运算…

深入浅出 MySQL 索引(一)

MySQL 索引&#xff08;基础篇&#xff09; 你好&#xff0c;我是悟空。 本文目录如下&#xff1a; 一、前言 最近在梳理 MySQL 核心知识&#xff0c;刚好梳理到了 MySQL 索引相关的知识&#xff0c;我的文章风格很多都是原理 实战的方式带你去了解知识点&#xff0c;所以…

Nordic nRF芯片FDS模块学习

FDS系统学习 文章目录FDS系统学习一、ROM&#xff0c;RAM&#xff0c;FLASH作用二、ROM,RAM和FLASH在单片中的运作原理三、Flash访问模块FDS用法1. FDS在sdk_config.h中的配置2. fds_register()注册3. fds_record_write()写记录4. fds_record_find()查找5. fds_record_open()读…

秒杀服务------技术点及亮点

大技术使用Redisson使用Redisson在秒杀服务中有两个作用&#xff0c;一个是作为分布式锁来确保多个秒杀服务同时在线时同时上架秒杀商品&#xff0c;只允许有一个秒杀服务成功上架秒杀商品&#xff0c;其他的上架失败。第二个作用是作为分布式信号量&#xff0c;每个秒杀商品在…

通过一张照片来定位拍摄地点和网站的域名 LA CTF 2023

简介 这次打ctf遇到了一个比较经典的osint类题目&#xff0c;在这里分享一下如何做此类题目 题目链接&#xff1a; https://platform.lac.tf/challs题目简介&#xff1a; 你能猜出这个猫天堂的名字吗&#xff1f;答案是此位置的网站域。例如&#xff0c;如果答案是 ucla&…