【dubbo triple provider 底层流转】

news/2024/12/22 1:12:29/

一、maven依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-codec-http2</artifactId><version>4.1.90.Final</version>
</dependency><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId><version>3.1.8</version><!--  <version>3.2.2</version>-->
</dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.0</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.1.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.3.0</version>
</dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>5.3.0</version>
</dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>4.0.0-rc-2</version>
</dependency>

二、demo代码

public interface TestTripleService {String sayHello(String name);
}public class TestTripleServiceImpl implements TestTripleService{@Overridepublic String sayHello(String name) {System.out.println(Thread.currentThread().getName() + " call :" + name);return "hello :" + name;}
}public class TripleConsumer {public static void main(String[] args) throws IOException {System.setProperty("dubbo.application.logger", "slf4j");ReferenceConfig<TestTripleService> ref = new ReferenceConfig<>();ref.setInterface(TestTripleService.class);//ref.setCheck(false);ref.setProtocol(CommonConstants.TRIPLE);// ref.setLazy(true);ref.setTimeout(100000);ref.setApplication(new ApplicationConfig("triple-consumer"));ref.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2188"));final TestTripleService tripleService = ref.get();System.out.println("dubbo ref started");String result =  tripleService.sayHello("123");System.out.println(Thread.currentThread().getName() + " result :" + result);}
}public class TripleProvider {public static void main(String[] args) throws InterruptedException {System.setProperty("dubbo.application.logger", "slf4j");ServiceConfig<TestTripleService> service = new ServiceConfig<>();service.setInterface(TestTripleService.class);service.setRef(new TestTripleServiceImpl());// 这里需要显示声明使用的协议为tripleservice.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));service.setApplication(new ApplicationConfig("triple-provider"));service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2188"));service.export();System.out.println("dubbo service started");new CountDownLatch(1).await();}
}

三、其他相关配置

1、 jdk使用1.8

2、zookeeper使用3.7.1 【适配java jar版本】

四、调试问题

1、 一开始使用dubbo3.2.2的版本 provider 可以 正常启动,但是consumer启动报缺少一个类,发现的是dubbo自已的maven依赖冲突了, 优先读取了相同签名但是有缺陷的类

10:40:02.675 [main] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@62628e78
10:40:02.772 [NettyClientWorker-6-1] WARN io.netty.channel.ChannelInitializer - Failed to initialize a channel. Closing: [id: 0x961f8cf7]
java.lang.NoSuchMethodError: io.netty.handler.codec.http2.Http2FrameCodecBuilder: method <init>()V not foundat org.apache.dubbo.rpc.protocol.tri.TripleHttp2FrameCodecBuilder.<init>(TripleHttp2FrameCodecBuilder.java:32)at org.apache.dubbo.rpc.protocol.tri.TripleHttp2FrameCodecBuilder.fromConnection(TripleHttp2FrameCodecBuilder.java:37)at org.apache.dubbo.rpc.protocol.tri.TripleHttp2FrameCodecBuilder.forClient(TripleHttp2FrameCodecBuilder.java:45)

2、 换成3.1.8 版本之后, consumer 和 provider 都正常启动了,但是consumer这边拿不到provider的结果,观察日志发现,provider接收到了consumer的请求,并进行了日志的打印,但是没有走到最后的invoker进行调用,追踪代码调用发现,调用过程中会有一个异常抛出 【找不到类 com.google.protobuf.Message】这个像是依赖进行下载的时候没下载到,而且日志里面没有打印出这个异常,后续加上相关的依赖后,可以拿到provider的相关结果。

<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>4.0.0-rc-2</version>
</dependency>

五、triple底层调用分析

1、入口类 TripleProtocol

2、服务暴露的相关流程

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {pathResolver.add(url.getServiceKey(), invoker);pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(), invoker);PortUnificationExchanger.bind(url, new DefaultPuHandler());
}
  • pathResolver 收集暴露的服务,后续进行使用
  • PortUnificationExchanger 开启端口进行监听 (和 dubbo协议类似)
org.apache.dubbo.remoting.transport.netty4.NettyPortUnificationServer#doOpen() {bootstrap.group(bossGroup, workerGroup).channel(NettyEventLoopFactory.serverSocketChannelClass()).option(ChannelOption.SO_REUSEADDR, Boolean.TRUE).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// Do not add idle state handler here, because it should be added in the protocol handler.final ChannelPipeline p = ch.pipeline();final NettyPortUnificationServerHandler puHandler;puHandler = new NettyPortUnificationServerHandler(getUrl(), sslContext, true, getProtocols(),NettyPortUnificationServer.this, NettyPortUnificationServer.this.dubboChannels,getSupportedUrls(), getSupportedHandlers());p.addLast("negotiation-protocol", puHandler);}}); 
}

这里依然使用netty进行请求处理后续大量的异步都是依赖 NettyPortUnificationServerHandler 这个类展开的。

3、provider处理请求的相应流程

1、 channelHandler配置

到达TripleHttp2FrameServerHandler之后,就开始准备进行service的调用了,trip使用的是 grpc 和 http2的协议 传过来的报文 分为两种类型 请求头,和请求体,所以这里相应的有两部部分的处理逻辑到这一步还是使用的 netty的 handler链是由netty进行的调用

2、请求头的报文数据

3、请求体的报文数据

4、调用实现

拿到请求请求头和请求体后,基本就可进行实际的方法调用了,这边在拿到请求头和请求体时会进行不同的操作,而且都是以异步的方式进行了,不在是netty链路上【有相关的配置应该可以指定使用哪部分线程池】

a、拿到请求头的操作
 public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) throws Exception {TripleServerStream tripleServerStream = new TripleServerStream(ctx.channel(),frameworkModel, executor,pathResolver, acceptEncoding, filters);ctx.channel().attr(SERVER_STREAM_KEY).set(tripleServerStream);tripleServerStream.transportObserver.onHeader(msg.headers(), msg.isEndStream());
}@Override
public void onHeader(Http2Headers headers, boolean endStream) {executor.execute(() -> processHeader(headers, endStream));
}private void processHeader(Http2Headers headers, boolean endStream) {
....Invoker<?> invoker = getInvoker(headers, serviceName);ServerStream.Listener  listener = new ReflectionAbstractServerCall(invoker, TripleServerStream.this,frameworkModel, acceptEncoding, serviceName, originalMethodName, filters,executor);// must before onHeaderDeframer deframer = new TriDecoder(deCompressor, new ServerDecoderListener(listener));listener.onHeader(requestMetadata);
}protected void startCall() {RpcInvocation invocation = buildInvocation(methodDescriptor);ServerCall.Listener listener = startInternalCall(invocation, methodDescriptor, invoker);
}protected ServerCall.Listener startInternalCall() {switch (methodDescriptor.getRpcType()) {case UNARY:listener = new UnaryServerCallListener(invocation, invoker, responseObserver);request(2);break;case SERVER_STREAM:listener = new ServerStreamServerCallListener(invocation, invoker,responseObserver);request(2);break;case BI_STREAM:case CLIENT_STREAM:listener = new BiStreamServerCallListener(invocation, invoker,responseObserver);request(1);break;default:throw new IllegalStateException("Can not reach here");
}
return listener;  
}

上面的一系列操作主要是进行服务调用的前的数据准备以及相关的类封装

异步调用 processHeader 方法主要完成 以下几件事
  • getInvoker(headers, serviceName) 找到实际要调用的invoker
  • ReflectionAbstractServerCall 这个实际上实现了两个接口 ServerStream.Listener ,ServerCall 这里是为了 构造 ServerDecoderListener 所以使用 ServerStream.Listener 进行承接
  •  创建 Deframer的实现类 TriDecoder

明确一下:

   ServerStream.Listener listener

    Deframer deframer

     这两个类实例都在 TripleServerStream 的实例里面 这里是准备好了后续调用所需的类数据了

ServerStream.Listener.onHeader方法调用

这个方法调用主要是为了生成 ServerCall.Listener 这个生成的实例是属于 ReflectionAbstractServerCall 这个类实例的

所以 进过上面方法的调用 类的从属关系如下:

b、 拿到请求体操作
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2FrameServerHandler#onDataReadorg.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream.ServerTransportObserver#onDataorg.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream.ServerTransportObserver#doOnDataorg.apache.dubbo.rpc.protocol.tri.frame.TriDecoder#deframeorg.apache.dubbo.rpc.protocol.tri.frame.TriDecoder#deliverorg.apache.dubbo.rpc.protocol.tri.frame.TriDecoder#processBodyorg.apache.dubbo.rpc.protocol.tri.frame.TriDecoder.Listener#onRawMessageorg.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream.ServerStream.Listener#onRawMessageorg.apache.dubbo.rpc.protocol.tri.stream.Stream.Listener#onMessageorg.apache.dubbo.rpc.protocol.tri.call.UnaryServerCallListener#onMessageorg.apache.dubbo.rpc.RpcInvocation#setArgumentsorg.apache.dubbo.rpc.protocol.tri.frame.TriDecoder.Listener#closeorg.apache.dubbo.rpc.protocol.tri.stream.ServerStream.Listener#onCompleteorg.apache.dubbo.rpc.protocol.tri.call.ServerCall.Listener#onCompleteorg.apache.dubbo.rpc.protocol.tri.call.AbstractServerCallListener#invokeorg.apache.dubbo.rpc.protocol.tri.call.AbstractServerCallListener#onReturn  返回数据

总之数据就是一层一层往进传


http://www.ppmy.cn/news/607262.html

相关文章

Pytorch学习记录-torchtext和Pytorch的实例( 使用神经网络训练Seq2Seq代码)

Pytorch学习记录-torchtext和Pytorch的实例1 0. PyTorch Seq2Seq项目介绍 1. 使用神经网络训练Seq2Seq 1.1 简介&#xff0c;对论文中公式的解读 1.2 数据预处理 我们将在PyTorch中编写模型并使用TorchText帮助我们完成所需的所有预处理。我们还将使用spaCy来协助数据的标记化。…

大数据Apache Druid(八):Druid JDBC API和其他OLAP分析框架对比

文章目录 Druid JDBC API和其他OLAP分析框架对比 一、Druid JDBC API 1、首先需要在maven项目中导入Druid的依赖

[JS]在ACM模式下获取输入

输入输出处理 核心代码模式处理 不需要处理任何输入输出&#xff0c;直接返回值即可。 ACM 模式 你的代码需要处理输入输出&#xff0c;请使用如下样例代码读取输入和打印输出&#xff1a; while (line readline()) {var lines line.split( );var a parseInt(lines[0])…

[JS][dfs]题解 | #迷宫问题#

题解 | #迷宫问题# 题目链接 迷宫问题 题目描述 定义一个二维数组 N*M &#xff0c;如 5 5 数组下所示&#xff1a; int maze[5][5] { 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 0, };它表示一个迷宫&#xff0c;其中的1表示墙壁&#x…

结巴分词原理介绍

20220331 jieba.cut与jieba.lcut的区别_stay_foolish12的博客-CSDN博客_jieba.lcut 20211130 ​​​​​​三种模式、两个函数掌握Python结巴分词重点功能 三种划分方法 1.结巴长度优先 2. 权重相同的时候 前面的先分 zhyl pk zh ylpk 结果是第一种 3.权重不相…

LLVM 编译器和工具链技术

LLVM 编译器和工具链技术 LLVM概述 LLVM项目是模块化和可重用的编译器和工具链技术的集合。尽管名称如此&#xff0c;LLVM与传统虚拟机几乎没有关系。LLVM本身不是首字母缩略词&#xff0c;项目的全名。 LLVM开始是作为伊利诺大学的一个研究项目&#xff0c;提供提一个现代化的…

【AUTOSAR】BMS开发实际项目讲解(一)----产品需求规格书

功能需求 信号 范围 精度要求 条件 备注 正常电压 9 &#xff5e;16 系统正常工作&#xff0c;满足所有功能与性能要求 通讯电压 6.5 &#xff5e;18 此电压范围内保证通信正常及信号有效性 低电压 0&#xff5e;6.5 不要求BMS工作 反向电压 -14 &#xff5e;0 …

大数据ClickHouse(十二):MergeTree系列表引擎之CollapsingMergeTree

文章目录 MergeTree系列表引擎之CollapsingMergeTree 一、CollapsingMergeTree基本讲解 二、测试实例