Dubbo源码解析-服务订阅与发现(八)

ops/2024/11/25 7:59:57/

一、服务订阅与发现

服务提供者暴漏服务的时候会向注册中心注册服务信息,当服务消费者引入服务的时候会去订阅服务提供者信息。RegistryDirectory#subscribe

public void subscribe(URL url) {setSubscribeUrl(url);consumerConfigurationListener.addNotifyListener(this);referenceConfigurationListener = new ReferenceConfigurationListener(url.getOrDefaultModuleModel(), this, url);registry.subscribe(url, this);
}

 RegistryDirectory本身实现了接口RegistryService,具备服务的注册与订阅功能。

public interface RegistryService {/*** Register data, such as : provider service, consumer address, route rule, override rule and other data.** @param url  Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin*/void register(URL url);/*** Unregister** @param url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin*/void unregister(URL url);/*** Subscribe to eligible registered data and automatically push when the registered data is changed.consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin* @param listener A listener of the change event, not allowed to be empty*/void subscribe(URL url, NotifyListener listener);/*** Unsubscribeconsumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin* @param listener A listener of the change event, not allowed to be empty*/void unsubscribe(URL url, NotifyListener listener);/*** Query the registered data that matches the conditions. Corresponding to the push mode of the subscription, this is the pull mode and returns only one result.** @param url Query condition, is not allowed to be empty, e.g. consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin* @return The registered information list, which may be empty, the meaning is the same as the parameters of {@link org.apache.dubbo.registry.NotifyListener#notify(List<URL>)}.* @see org.apache.dubbo.registry.NotifyListener#notify(List)*/List<URL> lookup(URL url);}

当订阅服务提供方url的时候,需要提供一个NotifyListener对象 。方法动作很好理解,register 方法就是将url 写入注册中心,subscribe 则将监听器注册到url 上,当服务url 有变化时,则触发监听器的notify 方法,重新生成invoker。

public void doSubscribe(final URL url, final NotifyListener listener) {try {//......省略部分代码CountDownLatch latch = new CountDownLatch(1);try {List<URL> urls = new ArrayList<>();for (String path : toCategoriesPath(url)) {ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));if (zkListener instanceof RegistryChildListenerImpl) {((RegistryChildListenerImpl) zkListener).setLatch(latch);}zkClient.create(path, false);List<String> children = zkClient.addChildListener(path, zkListener);if (children != null) {urls.addAll(toUrlsWithEmpty(url, path, children));}}notify(url, listener, urls);} finally {// tells the listener to run only after the sync notification of main thread finishes.latch.countDown();}} catch (Throwable e) {throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}
}
public interface NotifyListener {/*** Triggered when a service change notification is received.* @param urls The list of registered information , is always not empty. The meaning is the same as the return value of {@link org.apache.dubbo.registry.RegistryService#lookup(URL)}.*/void notify(List<URL> urls);default void addServiceListener(ServiceInstancesChangedListener instanceListener) {}default URL getConsumerUrl() {return null;}}

 以zookeeper为例,当某个服务发生变动,notify 触发回来的urls 信息也同样包含这些信息

 toCategoriesPath会根据消费者提供的url信息,将其转换为服务提供者的url,分别是服务提供者的【providers,configurators,routers】目录

private String[] toCategoriesPath(URL url) {String[] categories;if (ANY_VALUE.equals(url.getCategory())) {categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};} else {categories = url.getCategory(new String[]{DEFAULT_CATEGORY});}String[] paths = new String[categories.length];for (int i = 0; i < categories.length; i++) {paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];}return paths;
}

监听触发逻辑在notify 方法中,主要职责便是监听到的url 信息转化为invoker 实体,提供给Dubbo 使用。为了性能,在RegistryDirectory 中可以看到有很多的缓存容器,
urlInvokerMap/methodInvokerMap/cachedInvokerUrls 等用来缓存服务的信息。也就是说,notify 的作用是更改这些缓存信息,而Dubbo在rpc 过程中,则是直接使用缓存中的信息。

notify过程会调用到RegistryDirectory#toInvokers方法将url转换为invoker。

private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}String queryProtocols = this.queryMap.get(PROTOCOL_KEY);for (URL providerUrl : urls) {// ......省略部分代码URL url = mergeUrl(providerUrl);// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer againInvoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);if (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(DISABLED_KEY)) {enabled = !url.getParameter(DISABLED_KEY, false);} else {enabled = url.getParameter(ENABLED_KEY, true);}if (enabled) {invoker = protocol.refer(serviceType, url);}} catch (Throwable t) {logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(url, invoker);}} else {newUrlInvokerMap.put(url, invoker);}}return newUrlInvokerMap;
}

这里要强调一下,在Dubbo 中,URL 是整个服务发布和调用流程的串联信息,它包含了服务的基本信息(服务名、服务方法、版本、分组),注册中心配置,应用配置等等信息,还包括在dubbo 的消费端发挥作用的各种组件信息如:filter、loadbalance、cluster 等等。
在消费端notify 中收到这些url 信息时,意味着这个组件信息也已经得到了。Dubbo 此时便扩展逻辑,来加入这些组件功能了。

最后,总结下服务订阅与发现机制:

基于注册中心的事件通知(订阅与发布),一切支持事件订阅与发布的框架都可以作为Dubbo 注册中心的选型。
服务提供者在暴露服务时,会向注册中心注册自己,具体就是在${serviceinterface}/providers 目录下添加一个节点(临时),服务提供者需要与注册中心保持长连接,一旦连接断掉(重试连接)会话信息失效后,注册中心会认为该服务提供者不可用(提供者节点会被删除)。
消费者在启动时,首先也会向注册中心注册自己,具体在${interfaceinterface}/consumers 目录下创建一个节点。
消费者订阅${service interface}/ [ providers、configurators、routers ]三个目录,这些目录下的节点删除、新增事件都会通知消费者,根据通知的url信息,重新生成服务调用器(Invoker)。


http://www.ppmy.cn/ops/136515.html

相关文章

SQL注入--布尔盲注--理论

布尔盲注的原理 在SQL注入时&#xff0c;我们查询的数据不会直接回显在界面上&#xff0c;且界面对正确的查询和错误的查询有不同的回显&#xff0c;我们就可以使用布尔盲注。 比如sqli-labs的第八题&#xff0c;当我们输入的查询语句是正确的时候&#xff0c;回显就是you ar…

ftdi_sio应用学习笔记 5 - SPI

目录 1. 查找设备 2. 打开设备 3. 验证 3.1 遍历设备 3.2 打开关闭设备 3.3 读flash id SPI的SCK/MOSI/MISO分别对应&#xff08;A/B)D0/D1/D2&#xff0c;其他IO作为CS。和I2C一样&#xff0c;最大支持2路MPSSE通道&#xff0c;一路MPSSE最大13路SPI。 #define FTDI_DE…

Perforce《2024游戏技术现状报告》Part3:生成式AI、版本控制、CI/CD等游戏技术的未来趋势与应用

游戏开发者一直处于创新前沿。他们的实践、工具和技术受到各行各业的广泛关注&#xff0c;正在改变着组织进行数字创作的方式。 近期&#xff0c;Perforce发布了《2024游戏技术现状报告》&#xff0c;通过收集来自游戏、媒体与娱乐、汽车和制造业等高增长行业的从业者、管理人…

二.LoadBalancer负载均衡服务调用(1)

1.spring-cloud-loadbalancer概述 (1)官网 2.客户端负载VS服务器端负载 loadbalancer本地负载均衡客户端VSNginx服务端负载均衡区别 Nginx是服务器端负载均衡,客户端所有请求都会交给nignx,然后由nginx实现转发请求,即负载均衡是由服务端实现的 loadbalancer本地负载均衡,在…

使用var/let/const的选择

对于var的使用 1、需要明白一个事实&#xff0c;var所表现出来的特殊性:比如作用域提升、window全局对象、没有块级作用域等都是一些历史遗留问题&#xff1b; 2、其实是JavaScript在设计之初的一种语言缺陷&#xff1b; 3、在实际工作中&#xff0c;我们可以使用最新的规范来…

HashMap的寻址算法(源码分析)

建议先看完我这篇文章HashMap底层原理-CSDN博客 hashmap插入值的时候&#xff0c;是如何找到数组索引位置的呢&#xff1f; 例如下图左边四个连续红点&#xff0c;是如何在插入的时候定位到了数组下标为3的位置&#xff1f; 来看看put方法的源码&#xff0c;里面有个hash&…

洛谷 B2038:奇偶 ASCII 值判断

【题目来源】https://www.luogu.com.cn/problem/B2038http://shnoip.openjudge.cn/level1/39/【题目描述】 任意输入一个字符&#xff0c;判断其 ASCII 是否是奇数&#xff0c;若是&#xff0c;输出 YES&#xff0c;否则&#xff0c;输出 NO。 例如&#xff0c;字符 A 的 ASCII…

MQ重复消费与消息顺序

如何避免消息重复消费 RocketMQ&#xff1a;给每个消息分配了一个MessageID。这个MessageID就可以作为消费者判断幂等的依据。这种方式不太建议&#xff0c;原因是在高并发场景下这个MessageID不保证全局唯一性。 最好由业务方创建一个与业务相关的全局唯一的ID来区分消息&am…