手写RPC框架,与Spring整合,基于Netty作为网络框架,protobuf作为序列化协议。可以和实际项目相结合完美运行

devtools/2024/9/24 8:58:17/

注:由于RPC框架过于庞大所以本篇文章只是作为阅读RPC源码的一个指导,设计精巧之处还需要各位读者结合源码进行实践

RPC源码地址:https://github.com/xhpcd/rpc

git clone: https://github.com/xhpcd/rpc.git

如果觉得有收获麻烦留下一颗start

先看效果

项目分为两部分一部分时rpc包下的用来实现rpc服务端和客户端逻辑,一部分是rpc-demo作为框架的测试,api是公共接口,公用的部分

服务端

@RpcService(interfaceClass = OrderService.class)
public class OrderServiceImpl implements OrderService {@Overridepublic String getOrder(String userId, String orderId) {return "user: "+userId+"orderId:"+orderId;}
}

服务端就是见到那一个springboot项目引入了rpc-server

客户端

@RestController
@RequestMapping("/order")
public class OrderController {@RpcRemoteprivate OrderService orderService;@GetMapping("/getOrder")public String getOrder(String userId,String orderId){return orderService.getOrder(userId,orderId);}}

使用自定义注解然后整合springboot同时引入rpc-client,接着就可以远程调用这里的getOrder接口

效果

先从服务架构演变开始

单体架构是最初的软件设计模式,所有的功能模块都集中在一个单独的应用程序中。这种架构的优点包括:

 单体架构的设计和部署相对简单,易于理解和维护。

 所有的功能模块集中在一起,方便统一管理和测试。

 由于模块间调用的开销较小,单体架构通常具有较好的性能。

但是,单体架构也存在一些缺点:

当应用程序规模增大时,单体架构会变得难以扩展和维护。

 各功能模块之间存在较强的耦合,不利于技术栈的灵活选择。

 对单体应用的任何变更都需要重新部署整个应用程序。

就算是水平扩展依然解决不了的问题就是模块热点问题,一个模块比较热点会导致其它模块受到性能的影响,因为毕竟是部署到同一台服务器

紧接着出现了垂直架构

了解决单体架构的问题,垂直架构应运而生。垂直架构将应用程序划分为多个相对独立的服务,每个服务都专注于特定的功能领域。这种架构的优点包括:

 各个服务可以独立扩展,提高了应用程序的整体可扩展性。

 每个服务可以选择最合适的技术栈,提高了技术选型的灵活性。

 只需部署变更的服务,而不需要重新部署整个应用程序。

但是,垂直架构也存在一些缺点:

各个服务之间需要通过网络进行通信,引入了额外的复杂性。

 服务之间的依赖关系需要仔细管理,以确保应用程序的正常运行。

分布式环境下,监控和故障排查变得更加复杂。

为了进一步解决垂直架构的问题,分布式架构应运而生。分布式架构将应用程序拆分为更细粒度的服务,每个服务都可以独立部署和扩展。这种架构的优点包括:

每个服务可以独立扩展,提高了应用程序的整体可扩展性。

每个服务可以选择最合适的技术栈,提高了技术选型的灵活性。

只需部署变更的服务,而不需要重新部署整个应用程序。

服务之间的故障可以相互隔离,提高了应用程序的可靠性。

但是,分布式架构也存在一些缺点:

服务之间需要通过网络进行通信,引入了额外的复杂性。

服务之间的依赖关系需要仔细管理,以确保应用程序的正常运行。

分布式环境下,监控和故障排查变得更加复杂。

微服务架构

微服务架构是分布式架构的一种特殊形式,它将应用程序拆分为更小、更专注的服务。这种架构的优点包括:

各个服务之间高度解耦,易于独立开发、部署和扩展。

每个服务可以选择最合适的技术栈,提高了技术选型的灵活性。

各个服务可以独立扩展,提高了应用程序的整体可扩展性。

 服务之间的故障可以相互隔离,提高了应用程序的可靠性。

RPC 作为一种常见的服务间通信机制,在从单体架构到微服务架构的演化过程中发挥了重要作用。RPC 在从单体架构到微服务架构的演化过程中,一直在解决服务间通信、服务发现和服务调用透明性等关键问题,为不同架构模式下的服务治理提供了重要支持。

正文:

RPC是远程调用的缩写,它使得客户端像调用本地方法一样去调用远端的接口

其本质就是通过把参数调用的接口等信息通过网络传递给远端服务,然后接受响应。

既然是远程调用就需要保证调用可靠

在分布式环境下,服务提供者的地址和端口信息可能会发生变化,服务消费者需要能够动态发现可用的服务实例。Zookeeper 提供了一个服务注册中心,服务提供者可以将自己的信息注册到 Zookeeper 上,服务消费者可以通过查询 Zookeeper 来发现可用的服务。这解决了服务发现的问题。

当有多个服务提供者实例时,RPC 系统需要能够将请求合理地分配到不同的实例上,以实现负载均衡。Zookeeper 可以根据服务实例的负载情况,为服务消费者提供一个负载均衡的服务实例列表。

RPC 系统通常需要一些全局性的配置信息,如超时时间、重试次数等。Zookeeper 可以作为配置中心,存储这些配置信息,服务提供者和消费者可以动态地从 Zookeeper 获取配置,避免了配置信息的硬编码。

通过zookeeper我们可以实现服务的协调管理

接下来要解决的就是网络通信的问题,本例子采用的是基于TCP协议的通信而不是基于Http的,基于TCP的通信我们就需要自己实现通信过程。所以选用的是Java高性能网络框架Netty(之前文章对其高性能做了分析)。

我们解决了网络和服务协调的问题后,我们还要指定一些规则来解决tcp流式传输的问题粘包粘包的问题,以及通信信息的格式。粘包问题有许多种解决方法这里就采用比较通用简单的方式

消息长度头:在每个数据包的头部添加一个长度字段,用于表示数据包的长度,接收方可以根据长度字段来判断每个数据包的结束位置。

接下来就要解决java数据在网络传输的过程了,其实可以采用java自带的序列化方法,但是相比较来说还是有点冗余了,所以采用google的protobuf序列化。

通用部分

为了通用我们把RPC过程需要的请求响应进行封装

public class RpcRequest {private String requestId;private String className;private String methodName;private Class<?>[] parameterTypes;private Object[] parameters;
}

public class RpcResponse {private String requestId;private Object result;private Throwable cause;public boolean hasError(){return cause!=null;}
}

我们首先先来完成rpc服务端

服务端实现类

@RpcService(interfaceClass = OrderService.class)
public class OrderServiceImpl implements OrderService {@Overridepublic String getOrder(String userId, String orderId) {return "user: "+userId+"orderId:"+orderId;}
}

首先先要完成的就是服务的注册所,谓服务的注册就是来向zk提供自己用于远程服务的信息

RpcService是我们的注解,这个注解的特殊之处在于其上整合了@Component注解可以作为bean被spring管理,然后同时也指明了intefaceClass作为zk的服务注册信息

作为服务提供类的实现我们想把其注入的容器中,我们同时还需要能在调用时获得到此类

首先我们封装了一个spring工具类利用Aware机制注入spring容器

@Component("springBeanFactory")
public class SpringBeanFactory implements ApplicationContextAware {private static ApplicationContext context;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.context = applicationContext;}public static <T> T getBean(Class<T> cls){return context.getBean(cls);}public static Object getBean(String name){return context.getBean(name);}public static Map<String,Object> getBeanByAnnotation(Class<? extends Annotation> annotaionClass){return context.getBeansWithAnnotation(annotaionClass);}
}
public class ZkRegistry implements RpcRegistry {@Autowiredprivate ServerZKit serverZKit;@Autowiredprivate RpcServerConfiguration rpcServerConfiguration;@Autowiredprivate RpcServer rpcServer;@Overridepublic void serviceRegistry() {Map<String, Object> beanByAnnotation = SpringBeanFactory.getBeanByAnnotation(RpcService.class);if(beanByAnnotation!=null&&!beanByAnnotation.isEmpty()) {//根节点的创建serverZKit.createRootNode();//ip获取String serverIp = IpUtils.getRealIp();for (Map.Entry<String, Object> entry : beanByAnnotation.entrySet()) {RpcService annotation = entry.getValue().getClass().getAnnotation(RpcService.class);Class<?> interfaceClass = annotation.interfaceClass();//服务名称String name = interfaceClass.getName();serverZKit.createPersistentNode(name);String providerNode = serverIp+":"+rpcServerConfiguration.getRpcPort();serverZKit.createNode(name+"/"+providerNode);log.info("服务{}-{}完成了注册",name,providerNode);}rpcServer.start();}}
}

这里我们完成了向zk存储信息接下来我们就来看一下整合spring部分

首先通过工具类获通过自定义注解得到spring容器中的用于提供服务的实现类,紧接着获得其接口信息,并把接口信息和端口号等信息(来自配置文件)封装作为节点名称保存在zk中

紧接着我们来看服务端通信关键部分

   protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {RpcResponse rpcResponse = new RpcResponse();rpcResponse.setRequestId(rpcRequest.getRequestId());try {String className = rpcRequest.getClassName();String methodName = rpcRequest.getMethodName();Object[] parameters = rpcRequest.getParameters();Class<?>[] parameterTypes = rpcRequest.getParameterTypes();//通过spring容器获取实现类Object bean = SpringBeanFactory.getBean(Class.forName(className));Method method = bean.getClass().getMethod(methodName, parameterTypes);Object result = method.invoke(bean,parameters);rpcResponse.setResult(result);} catch (Exception e){rpcResponse.setCause(e);log.error("RpcRequestHandler service has error");}finally {channelHandlerContext.channel().writeAndFlush(rpcResponse);}}

通过请求传递的接口,参数,参数类型等信息获得到我们注入到spring容器中的服务提供bean,然后通过反射调用对应方法进而得到结果进行封装,至于和netty的结合源码中有详细代码本文只做辅助阅读。

接下来我们来看客户端代理

作为代理我们需要在其字段上加上注解

@Documented
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcRemote {String value() default "";Class<?> interfaceClass() default void.class;}

作为我们的注入依据,为了和spring整合我们利用了spring bean的生命周期

@Component
public class RpcAnnotationProcessor implements BeanPostProcessor {@Autowiredprivate ProxyFactory proxyFactory;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class<?> aClass = bean.getClass();Field[] declaredFields = aClass.getDeclaredFields();for (Field declaredField : declaredFields) {RpcRemote annotation = declaredField.getAnnotation(RpcRemote.class);if(annotation != null){declaredField.setAccessible(true);Class<?> type = declaredField.getType();Object o = proxyFactory.newProxyInstance(type);try {declaredField.set(bean,o);} catch (IllegalAccessException e) {log.error("filed {} inject field",declaredField);throw new RuntimeException(e);}}}return bean;}
}

我们通过遍历每一个bean 然后去遍历其所有的字段查找被RpcRemote标志的。然后通过cglib动态代理技术(之前文章有详细讲述)生成代理类封装了通信过程。然后利用反射进行了属性的注入

  public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {log.info("method:{} 执行代理调用",method.getName());RpcRequest request = RpcRequest.builder().parameters(objects).parameterTypes(method.getParameterTypes()).className(method.getDeclaringClass().getName()).methodName(method.getName()).requestId(UUID.randomUUID().toString()).build();RpcRequestManager rpcRequestManager = SpringBeanFactory.getBean(RpcRequestManager.class);RpcResponse response = rpcRequestManager.sendRequest(request);if(response.hasError()){throw response.getCause();}return response.getResult() ;}

把请求封装然后通过Netty进行传输。在进行传输之前我们要面对一个问题就是传输给谁。

我们此时可以利用zk获得到远程服务提供者的ip和端口。这时我们就需要进行服务的发现,同时还要监听节点,因为服务端可能出现一些服务器上下线的变化,然后我们把信息缓存起来

   public void rpcServerDiscovery() {List<String> service = clientZKit.getService();for (String s : service) {List<ServiceProvider> serviceInfos = clientZKit.getServiceInfos(s);serviceProviderCache.put(s,serviceInfos);clientZKit.subscribeZKNode(s);log.info("client subscribe {},services{}",s,serviceInfos);}}public void subscribeZKNode(String name){String node = configuration.getZkRoot() + "/" + name;zkClient.subscribeChildChanges(node, new IZkChildListener() {@Overridepublic void handleChildChange(String s, List<String> list) throws Exception {if(!CollectionUtils.isEmpty(list)){List<ServiceProvider> serviceProviders = convertToProviderService(s, list);serviceProviderCache.update(s,serviceProviders);}}});}

这时拿到了服务提供者,我们在面对众多提供者的时候又有问题了那就是如何选择有许多种方法比如说轮询,随机,hash等我在代码中都有实现可以对看一下。

接下来看一下Netty如何建立连接进行通讯

 Channel channel;if(!RpcHolder.channelExist(serviceProvider.getServerIp(),serviceProvider.getRcpPort())){NioEventLoopGroup group = new NioEventLoopGroup(NettyRuntime.availableProcessors()*2);Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast("FrameDecoder",new FrameDecoder());pipeline.addLast("RpcResponseDecoder",new RpcResponseDecoder());pipeline.addLast("FrameEncoder",new FrameEncoder());pipeline.addLast("RpcRequestEncoder",new RpcRequestEncoder());pipeline.addLast("RpcResponseHandler",new RpcResponseHandler());}});try {ChannelFuture future = bootstrap.connect(new InetSocketAddress(serviceProvider.getServerIp(), serviceProvider.getRcpPort())).sync();if (future.isSuccess()) {channel = future.channel();RpcHolder.setChannelMapping(new ChannelMapping(serviceProvider.getServerIp(), serviceProvider.getRcpPort(), channel));}}catch (Exception e){group.shutdownGracefully();}}try {ChannelMapping channelMapping = RpcHolder.getChannelMapping(serviceProvider.getServerIp(), serviceProvider.getRcpPort());channel = channelMapping.getChannel();channel.writeAndFlush(request);RequestPromise requestPromise = new RequestPromise(channel.eventLoop());RpcHolder.set(request.getRequestId(),requestPromise);RpcResponse rpcResponse = (RpcResponse) requestPromise.get();return rpcResponse;}catch (Exception e){e.printStackTrace();}return new RpcResponse();}

这里就涉及到一个线程间通信问题了,Netty有自己的线程,所以为了共享结果我们使用了promise,但是由于Netty可能有许多人调用所以我们根据请求Id和对应的promise进行了hash存储,这样就可以在众多并发中找到对应的promise。


http://www.ppmy.cn/devtools/87958.html

相关文章

学会网络安全:开启广阔职业与责任之旅

在数字化时代&#xff0c;网络安全已成为社会经济发展的重要基石。随着互联网的普及和技术的飞速发展&#xff0c;网络安全威胁日益复杂多变&#xff0c;对国家安全、社会稳定以及个人隐私构成了严峻挑战。因此&#xff0c;掌握网络安全技能不仅意味着拥有了一项高价值的职业技…

【小知识】站在前人的肩膀上写程序——STL库初阶算法函数的使用

【小知识】站在前人的肩膀上写程序——STL库初阶算法函数的使用 1.墨水瓶算法和swap函数2.打擂台算法和max&#xff0c;min函数3.排序——sort函数 1.墨水瓶算法和swap函数 如果想交换两个墨水瓶的墨水该怎么办呢&#xff1f;我们可以准备第三个墨水瓶。将第一个墨水瓶的墨水倒…

Docker镜像仓库

目录 前言 1. 常见的镜像仓库 2. 搭建私有镜像仓库 3. 私有库的推送、拉取镜像 4. 总结 前言 Docker镜像仓库简单来说就是存储和管理Docker镜像的平台或服务。它允许开发人员上传自己创建的镜像&#xff0c;并与团队成员共享和协作使用。 1. 常见的镜像仓库 镜像仓库有公…

PADS Router 扇出失败问题详细解决方法。

第一步&#xff1a;确定单位是一致的,我的单位是 “密尔”&#xff0c;不是“公制”。 第二步&#xff1a;进去pads router 右键选择特性&#xff0c;注意&#xff0c;是右键点击任意板框内空白位置的特性&#xff0c;这个是涵盖整体的设置&#xff0c;和单独点击一个元器件选…

基于docker的 nacos安装部署

一、拉取镜像 拉取nacos官方镜像&#xff0c;这里使用默认命令 docker pull nacos/nacos-server二、创建挂载目录 创建本地的映射文件application.properties mkdir -p /home/docker/nacos/conf /home/docker/nacos/logstouch /home/docker/nacos/conf/application.propert…

SQL Server数据库的黑匣子:实现自定义日志记录

SQL Server数据库的黑匣子&#xff1a;实现自定义日志记录 在数据库管理中&#xff0c;日志记录是监控和审计数据库活动的重要手段。SQL Server提供了多种日志记录机制&#xff0c;但有时这些默认的日志记录可能无法满足特定的业务需求。在这种情况下&#xff0c;实现数据库的…

【Docker项目实战】使用Docker部署轻量级Markdown文本编辑器

【【Docker项目实战】使用Docker部署轻量级Markdown文本编辑器 一、项目介绍1.1 项目简介1.2 使用方法二、本次实践介绍2.1 本地环境规划2.2 本次实践介绍三、本地环境检查3.1 安装Docker环境3.2 检查Docker服务状态3.3 检查Docker版本3.4 检查docker compose 版本四、拉取容器…

DUILib 创建自定义文本编辑控件

DUILib 是一个非常灵活的用户界面库&#xff0c;它允许我们轻松地创建和自定义各种控件。在这篇博客中&#xff0c;我们将详细解释一个自定义文本编辑控件的各个属性&#xff0c;并展示如何使用 DUILib 进行设置。 <!-- 文本编辑控件 --> <Edit pos"92,426,0,0&…