手写RPC框架04-过滤器模块实现

news/2024/10/27 19:29:33/

源代码地址:https://github.com/lhj502819/IRpc/tree/v5

系列文章:

  • 注册中心模块实现
  • 路由模块实现
  • 序列化模块实现
  • 过滤器模块实现
  • 自定义SPI机制增加框架的扩展性的设计与实现

为什么需要过滤器?

目前整个RPC框架的功能基本已经齐全了,但是在实际的开发过程中我们可能会有如下的需求:

  • 对client的请求做鉴权
  • 对服务进行分组管理
  • 记录请求日志
  • 基于IP的请求直连

首先我们先对这几个需求进行简单解释,只有弄清楚需求的来源才能更好的去理解、设计和实现。

对client的请求做鉴权

随着业务的不断发展,服务的种类变得越来越丰富,有些重要的操作可能安全性比较高,需要进行鉴权,因此需要在RPC框架中增加对鉴权的支持。
在这里插入图片描述

对服务进行分组管理

在进行团队协作或者服务升级的时候,可能会遇到需要对Service Provider进行分组,比如分为V1、V2,方便进行流量的划分,当V2版本出现问题时,我们只需要将所有的调用调整为V1即可,同时也可以进行迭代升级,以免出现ALL IN时升级的功能出现问题导致整个系统不可用。.
在这里插入图片描述

基于IP的请求直连

在测试和联调阶段比较常见,例如在服务部署之后,发现两个provider对相同的服务,相同的参数,返回的结果却不同,此时就可以通过指定IP进行直连,方便问题定位。
在这里插入图片描述

记录请求日志

在实际的业务中我们在进行服务调用的时候需要做一些日志埋点,对调用信息进行记录,方便进行问题的排查

以上的这些处理其实就是一个链条,我们仅需要将这些功能按照顺序插入到整个链条中,并在适当的位置执行整个链条即可,而这些一个个的功能,则类似于我们常见的过滤器一样。

如何实现?

过滤器的实现一般都会基于责任链设计模式去设计,在目前比较流行的API网关SprngCloudGateway中也有类似的实现。
在这里插入图片描述

我曾经对SCG的2.2.6版本源码进行过解析,感兴趣的小伙伴可前往查看,地址:https://www.yuque.com/lihongjian/gui608
在我们的RPC框架中也采用类似的方式,只不过进行了简单的变形。我们首先定义了过滤器标记接口:IFilter

public interface IFilter {
}

由于我们的过滤器分为Client和Server两端使用的,因此分别抽象出IClinetFilterIServerFilter

public interface IServerFilter extends IFilter {void doFilter(RpcInvocation rpcInvocation);}public interface IClientFilter extends IFilter {void doFilter(List<ChannelFutureWrapper> src, RpcInvocation rpcInvocation);}

服务分组过滤器

在Client发起调用时,我们将分组信息存储到了attachements中,是一个Map结构。

public class ClientGroupFilterImpl implements IClientFilter{@Overridepublic void doFilter(List<ChannelFutureWrapper> src, RpcInvocation rpcInvocation) {String group = (String) rpcInvocation.getAttachments().get("group");if (StrUtil.isBlank(group)){return;}Iterator<ChannelFutureWrapper> iterator = src.iterator();while (iterator.hasNext()) {ChannelFutureWrapper channelFutureWrapper = iterator.next();if (!channelFutureWrapper.getGroup().equals(group)){iterator.remove();}}if (CollectionUtil.isEmpty(src)){throw new RuntimeException("no provider match for group " + group);}}
}

IP直连过滤器

与“服务分组过滤器”一样,Client会在发起调用时将请求的ip存储到attachments中。

public class DirectInvokeFilterImpl implements IClientFilter {@Overridepublic void doFilter(List<ChannelFutureWrapper> src, RpcInvocation rpcInvocation) {String url = (String) rpcInvocation.getAttachments().get("url");if (StrUtil.isBlank(url)) {return;}Iterator<ChannelFutureWrapper> iterator = src.iterator();while (iterator.hasNext()) {ChannelFutureWrapper channelFutureWrapper = iterator.next();if (!(channelFutureWrapper.getHost() + ":" + channelFutureWrapper.getPort()).equals(url)) {iterator.remove();}if (CollectionUtil.isEmpty(src)) {throw new RuntimeException("no match for url:" + url);}}}
}

Token校验过滤器

Client发起调用时会将token放入attachments中,Server端在过滤器中会拿到本次请求的token和内存中对应服务的token进行比对。

public class ServerTokenFilterImpl implements IServerFilter {@Overridepublic void doFilter(RpcInvocation rpcInvocation) {String token = (String) rpcInvocation.getAttachments().get("token");ServiceWrapper serviceWrapper = PROVIDER_SERVICE_WRAPPER_MAP.get(rpcInvocation.getTargetServiceName());if (serviceWrapper == null) {return;}String matchToken = serviceWrapper.getServiceToken();if (StrUtil.isBlank(matchToken)) {return;}if (StrUtil.isNotBlank(token) && matchToken.equals(token)) {return;}throw new RuntimeException("token is " + token + " verify result is false!");}
}

过滤器链FilterChain

过滤器链主要是负责将所有的过滤器按照一定顺序串起来。与过滤器类似,过滤器链也需要分为Client和Server端。

ServerFilterChain

public class ServerFilterChain {private static List<IServerFilter> iServerFilters = new ArrayList<>();public void addServerFilter(IServerFilter serverFilter){iServerFilters.add(serverFilter);}public void doFilter(RpcInvocation rpcInvocation){for (IServerFilter iServerFilter : iServerFilters) {iServerFilter.doFilter(rpcInvocation);}}}

ClientFilterChain

public class ClientFilterChain {private static List<IClientFilter> iClientFilters = new ArrayList<>();public void addServerFilter(IClientFilter clientFilter) {iClientFilters.add(clientFilter);}public void doFilter(List<ChannelFutureWrapper> src, RpcInvocation rpcInvocation) {for (IClientFilter iClientFilter : iClientFilters) {iClientFilter.doFilter(src, rpcInvocation);}}}

RPC框架接入

Client端接入

由于像服务分组过滤器、IP直连过滤器都需要根据指定的规则选择出对应的Provider,因此我们将执行过滤链条的逻辑插入在cn.onenine.irpc.framework.core.client.ConnectionHandler#getChannelFuture中。

/*** 默认走随机策略获取ChannelFuture*/
public static ChannelFuture getChannelFuture(RpcInvocation rpcInvocation) {ChannelFutureWrapper[] channelFutureWrappers = SERVICE_ROUTER_MAP.get(rpcInvocation.getTargetServiceName());if (channelFutureWrappers == null || channelFutureWrappers.length == 0) {throw new RuntimeException("no provider exist for " + rpcInvocation.getTargetServiceName());}//doFilterCLIENT_FILTER_CHAIN.doFilter(Lists.newArrayList(channelFutureWrappers), rpcInvocation);Selector selector = new Selector();selector.setProviderServiceName(rpcInvocation.getTargetServiceName());selector.setChannelFutureWrappers(channelFutureWrappers);//通过指定的路由算法选择一个Provider ChannelFuturereturn IROUTER.select(selector).getChannelFuture();
}

Server端接入

在初始化时按照指定顺序将所有的Filter插入到FilterChain中,在接收到请求后执行整个链条即可,也就是ChannelInboundHandlerAdapter#channelRead

public class ServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//服务端接收数据的时候以RpcProtocol协议的格式接收RpcProtocol rpcProtocol = (RpcProtocol) msg;RpcInvocation rpcInvocation = SERVER_SERIALIZE_FACTORY.deserialize(rpcProtocol.getContent(),RpcInvocation.class);//doFilterSERVER_FILTER_CHAIN.doFilter(rpcInvocation);//省略部分代码......rpcInvocation.setResponse(result);RpcProtocol respRpcProtocol = new RpcProtocol(SERVER_SERIALIZE_FACTORY.serialize(rpcInvocation));ctx.writeAndFlush(respRpcProtocol);}}

总结

本版本我们基于责任链模式完成了对RPC框架中流程化功能的整合,这些零零散散的功能我们通过过滤器的方式进行了实现,比如服务的分组选择、IP直连、Token统一校验等,减少了各个模块间的耦合性,如果需要扩展新的过滤器,只需要实现Client或者Server对应的接口即可。

问题

到目前版本,我们的RPC框架实现了注册中心模块、路由选择模块、序列化模块、过滤器模块,我们基本都提供了对应的抽象接口,使用者可以进行扩展,但目前想要扩展的话,比如自定义一个过滤器接入到框架中,还需要对源码进行调整,重新编译才可以,不太方便,后续我们会通过SPI的方式来解决这个问题。


http://www.ppmy.cn/news/9797.html

相关文章

【蓝桥杯基础题】2021年省赛填空题—卡片

&#x1f451;专栏内容&#xff1a;蓝桥杯刷题⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录一、题目背景二、题目描述三、题目分析1.检查思路2.思路优化四、代码汇总1.C语言代码2. C代码3.运行结果五、总结1.枚举思…

美团开放平台SDK自动生成技术与实践

总第549篇2023年 第001篇美团开放平台为整个美团提供了20业务场景的开放API&#xff0c;为了使开发者能够快速且安全的接入美团开放平台&#xff0c;美团开放平台提供了多种语言的SDK来提高开发者的接入效率。本文介绍了美团开放平台如何自动生成SDK代码的相关技术实现方案&…

GO——函数(二)

函数匿名函数可变参数Deferred函数Panic异常Recover捕获异常匿名函数 拥有函数名的函数只能在包级语法块中被声明&#xff0c;通过函数字面量&#xff0c;我们可绕过这一限制&#xff0c;在任何表达式中表示一个函数值。 函数字面量的语法和函数声明相似&#xff0c;区别在于…

商用密码安全性评估

商用密码应用安全性评估&#xff08;简称“密评”&#xff09;指在采用商用密码技术、产品和服务集成建设的网络和信息系统中&#xff0c;对其密码应用的合规性、正确性和有效性等进行评估。01办理依据 GM/T0054-2018《信息系统密码应用基本要求》 《信息系统密码测评要求&…

阻塞式队列

文章目录一、阻塞队列阻塞队列的概念阻塞队列相关类和方法生产者消费者模型二、自定义实现阻塞队列自定义实现循环队列自定义实现阻塞队列生产者消费者模型体验一、阻塞队列 阻塞队列的概念 队列我们并不默认&#xff0c;一提起队列&#xff0c;我们立马就能想到 "先进先…

实验二十一 配置NAT

实验二十一 配置NAT实验要求&#xff1a; 静态NAT: 在Router的公网侧接口GE0/0/1下配置静态NAT&#xff0c;将私有 IP地址 192.168.0.2与公有IP地址202.10.1.3绑定起来。 NAT SERVER的配置 动态NAT和easy IP的配置网络拓扑图&#xff1a;操作步骤&#xff1a;一、静态NAT1、配置…

用纯C实现单链表

前言 什么是单链表&#xff1f;链表是一种物理存储结构上非连续、非顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接次序实现的 。链表的创建 需要创建一个小项目工程 创建三个文件 ⭐SListNode.h放单链表的头文件&#xff0c;函数声明 ⭐SListNode.c放单…

微信公众号调用扫一扫功能

手把手教你调用微信扫一扫&#xff0c;三分钟包会_前端人的博客-CSDN博客_调用微信扫一扫 第一次搞公众号&#xff0c;还以为跟上回调用企业微信扫一扫一样。。。调起扫一扫功能的过程自然是不同的&#xff0c;要注意的地方还挺多&#xff0c;记录一下 。 其实&#xff0c;在使…