一、服务订阅与发现
服务提供者暴漏服务的时候会向注册中心注册服务信息,当服务消费者引入服务的时候会去订阅服务提供者信息。RegistryDirectory#subscribe
public void subscribe(URL url) {setSubscribeUrl(url);consumerConfigurationListener.addNotifyListener(this);referenceConfigurationListener = new ReferenceConfigurationListener(url.getOrDefaultModuleModel(), this, url);registry.subscribe(url, this);
}
RegistryDirectory本身实现了接口RegistryService,具备服务的注册与订阅功能。
public interface RegistryService {/*** Register data, such as : provider service, consumer address, route rule, override rule and other data.** @param url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin*/void register(URL url);/*** Unregister** @param url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin*/void unregister(URL url);/*** Subscribe to eligible registered data and automatically push when the registered data is changed.consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin* @param listener A listener of the change event, not allowed to be empty*/void subscribe(URL url, NotifyListener listener);/*** Unsubscribeconsumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin* @param listener A listener of the change event, not allowed to be empty*/void unsubscribe(URL url, NotifyListener listener);/*** Query the registered data that matches the conditions. Corresponding to the push mode of the subscription, this is the pull mode and returns only one result.** @param url Query condition, is not allowed to be empty, e.g. consumer://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin* @return The registered information list, which may be empty, the meaning is the same as the parameters of {@link org.apache.dubbo.registry.NotifyListener#notify(List<URL>)}.* @see org.apache.dubbo.registry.NotifyListener#notify(List)*/List<URL> lookup(URL url);}
当订阅服务提供方url的时候,需要提供一个NotifyListener对象 。方法动作很好理解,register 方法就是将url 写入注册中心,subscribe 则将监听器注册到url 上,当服务url 有变化时,则触发监听器的notify 方法,重新生成invoker。
public void doSubscribe(final URL url, final NotifyListener listener) {try {//......省略部分代码CountDownLatch latch = new CountDownLatch(1);try {List<URL> urls = new ArrayList<>();for (String path : toCategoriesPath(url)) {ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));if (zkListener instanceof RegistryChildListenerImpl) {((RegistryChildListenerImpl) zkListener).setLatch(latch);}zkClient.create(path, false);List<String> children = zkClient.addChildListener(path, zkListener);if (children != null) {urls.addAll(toUrlsWithEmpty(url, path, children));}}notify(url, listener, urls);} finally {// tells the listener to run only after the sync notification of main thread finishes.latch.countDown();}} catch (Throwable e) {throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}
}
public interface NotifyListener {/*** Triggered when a service change notification is received.* @param urls The list of registered information , is always not empty. The meaning is the same as the return value of {@link org.apache.dubbo.registry.RegistryService#lookup(URL)}.*/void notify(List<URL> urls);default void addServiceListener(ServiceInstancesChangedListener instanceListener) {}default URL getConsumerUrl() {return null;}}
以zookeeper为例,当某个服务发生变动,notify 触发回来的urls 信息也同样包含这些信息
toCategoriesPath会根据消费者提供的url信息,将其转换为服务提供者的url,分别是服务提供者的【providers,configurators,routers】目录
private String[] toCategoriesPath(URL url) {String[] categories;if (ANY_VALUE.equals(url.getCategory())) {categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};} else {categories = url.getCategory(new String[]{DEFAULT_CATEGORY});}String[] paths = new String[categories.length];for (int i = 0; i < categories.length; i++) {paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];}return paths;
}
监听触发逻辑在notify 方法中,主要职责便是监听到的url 信息转化为invoker 实体,提供给Dubbo 使用。为了性能,在RegistryDirectory 中可以看到有很多的缓存容器,
urlInvokerMap/methodInvokerMap/cachedInvokerUrls 等用来缓存服务的信息。也就是说,notify 的作用是更改这些缓存信息,而Dubbo在rpc 过程中,则是直接使用缓存中的信息。
notify过程会调用到RegistryDirectory#toInvokers方法将url转换为invoker。
private Map<URL, Invoker<T>> toInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, List<URL> urls) {Map<URL, Invoker<T>> newUrlInvokerMap = new ConcurrentHashMap<>(urls == null ? 1 : (int) (urls.size() / 0.75f + 1));if (urls == null || urls.isEmpty()) {return newUrlInvokerMap;}String queryProtocols = this.queryMap.get(PROTOCOL_KEY);for (URL providerUrl : urls) {// ......省略部分代码URL url = mergeUrl(providerUrl);// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer againInvoker<T> invoker = oldUrlInvokerMap == null ? null : oldUrlInvokerMap.remove(url);if (invoker == null) { // Not in the cache, refer againtry {boolean enabled = true;if (url.hasParameter(DISABLED_KEY)) {enabled = !url.getParameter(DISABLED_KEY, false);} else {enabled = url.getParameter(ENABLED_KEY, true);}if (enabled) {invoker = protocol.refer(serviceType, url);}} catch (Throwable t) {logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);}if (invoker != null) { // Put new invoker in cachenewUrlInvokerMap.put(url, invoker);}} else {newUrlInvokerMap.put(url, invoker);}}return newUrlInvokerMap;
}
这里要强调一下,在Dubbo 中,URL 是整个服务发布和调用流程的串联信息,它包含了服务的基本信息(服务名、服务方法、版本、分组),注册中心配置,应用配置等等信息,还包括在dubbo 的消费端发挥作用的各种组件信息如:filter、loadbalance、cluster 等等。
在消费端notify 中收到这些url 信息时,意味着这个组件信息也已经得到了。Dubbo 此时便扩展逻辑,来加入这些组件功能了。
最后,总结下服务订阅与发现机制:
基于注册中心的事件通知(订阅与发布),一切支持事件订阅与发布的框架都可以作为Dubbo 注册中心的选型。
服务提供者在暴露服务时,会向注册中心注册自己,具体就是在${serviceinterface}/providers 目录下添加一个节点(临时),服务提供者需要与注册中心保持长连接,一旦连接断掉(重试连接)会话信息失效后,注册中心会认为该服务提供者不可用(提供者节点会被删除)。
消费者在启动时,首先也会向注册中心注册自己,具体在${interfaceinterface}/consumers 目录下创建一个节点。
消费者订阅${service interface}/ [ providers、configurators、routers ]三个目录,这些目录下的节点删除、新增事件都会通知消费者,根据通知的url信息,重新生成服务调用器(Invoker)。