nacos:提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。
nacos架构如下(图片来源)
依赖包:
<dependency><groupId>com.alibaba.nacos</groupId><artifactId>nacos-client</artifactId><version>1.3.0</version></dependency>
使用如下:
//创建命名服务
NamingService namingService = NamingFactory.createNamingService("127.0.0.1:10058");
//服务提供方:注册服务
namingService.registerInstance(serviceName, address.getHostName(), address.getPort());
//服务消费方:获取服务
namingService.getAllInstances(serviceName);
整体架构
过程详解
服务注册和服务发现
调用封装在NacosUtil工具类中,该变量定义了静态域来获取namingService(服务端和消费端同步),实现了注册服务和服务发现以及注销服务三个静态方法。
NacosServiceDiscovery类提供了服务发现,以及根据负载均衡算法选择服务接口
ServiceProvider:默认的服务注册表,保存服务端本地服务,一个Map<serviceName,service>,一个Set<serviceName>
+++±—addServiceProvider()
+++±—getServiceProvider()
Server启动
- 创建接口实现类
- 创建server类,host+port+serializer
- 发布服务:1. 将服务,host和port注册到nacos。2. 将服务和服务类保存到本地注册表ServiceProvoder中。
- 启动netty的channel监听端口:1. 序列化和反序列化handler使用自己定义的协议:魔数+请求码(request/response)+序列化码+序列化数据长度和byte数据。2. NettyServerHandler得到request对象,并在ServiceProvoder注册表中查找调用服务,通过ChannelHandlerContext.writeAndFlush(response)返回结果。3. response经过序列化返回
Client启动
- 创建客户端client,指定序列化算法,负载均衡策略,
- 创建代理对象,参数为client(在invoke中调用client的sendRequest方法)
- 拿到服务接口的代理对象,调用方法hello,执行invoke方法(封装request),执行sendRequest,等待response返回。
- 通过namingService.getAllInstances(serviceName)拿到nacos对应服务host+port获取Channel连接server,发送请求数据:1. NettyClientHandler,保存response。
优化
心跳
参考连接
相比于上个版本,本版使用netty提供的IdleStateHandler心跳机制,Netty在应用层实现心跳机制(TCP协议层也有Keeplive机制,Netty中添加ChannelOption.SO_KEEPALIVE, true)的option。
- 服务端
服务端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;
IdleStateHandler 心跳检测机制,30秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法
ChannelPipeline pipeline = ch.pipeline();//IdleStateHandler 心跳检测机制,30秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)).addLast(new CommonEncoder(serializer)).addLast(new CommonDecoder()).addLast(new NettyServerHandler());
@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.READER_IDLE) {logger.info("长时间未收到心跳包,断开连接...");ctx.close();}} else {super.userEventTriggered(ctx, evt);}}
- 客户端
客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;
设定IdleStateHandler心跳检测每5秒进行一次写检测,如果四秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每四秒向服务端发送一次消息;
ch.pipeline().addLast(new CommonEncoder(serializer)).addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS)).addLast(new CommonDecoder()).addLast(new NettyClientHandler());
@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.WRITER_IDLE) {logger.info("发送心跳包 [{}]", ctx.channel().remoteAddress());Channel channel = ChannelProvider.get((InetSocketAddress) ctx.channel().remoteAddress(), CommonSerializer.getByCode(CommonSerializer.DEFAULT_SERIALIZER));RpcRequest rpcRequest = new RpcRequest();rpcRequest.setHeartBeat(true);channel.writeAndFlush(rpcRequest).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}} else {super.userEventTriggered(ctx, evt);}}
服用Channel
消费端如采用 Netty 方式,会复用 Channel 避免多次连接
封装ChannelProvider类,使用map保存已经创建的channel,key为inetSocketAddress.toString() + serializer.getCode(),value为channel。
同一个channel的异步
客户端拿到服务端的结果,最开始的时候是通过 AttributeMap 绑定到Channel上实现的,客户端发送数据之后就阻塞等待。
使用 CompletableFuture 进行优化
创建一个unprocessedRequests的map来保存未处理的request,key为requestId,value为future。
在请求发出之前,先将对应的future入map,然后在客户端接受handler更新数据(complete(response)),然后sendRequest返回该future,此时future已经有值,调用get方法即可拿到。这样就不用阻塞等一个请求结果返回之后,再发出下一个请求了。