Zookeeper的监听机制及原理解析

devtools/2024/10/18 16:52:38/

系列文章目录

手把手教你安装Zookeeper 及可视化插件ZooInspector、ZKUI
Zookeeper入门篇,了解ZK存储特点

使用Zookeeper的监听及原理解析

  • 系列文章目录
  • 前言
  • 一、监听机制的基本概念
  • 二、Zookeeper监听原理
    • 1. 事件类型
    • 2. 监听模式与监听器类型
    • 3. 监听原理
      • (1)基础概念
      • (2)监听触发处理
  • 三、Zookeeper监听的使用Demo


前言

在这里插入图片描述

ZK在现在之所以能非常好用,它便捷的监听功能是很重要的,本次我们就以监听为题,分析一下ZK的监听是怎么设计和管理的,并在文末写了个demo验证我们的所学

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 Zookeeper 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙 mysql Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


一、监听机制的基本概念

其实对于监听,我们并不陌生,我们曽在 Spring专栏 中提到过 《Spring监听器用法与原理详解》,其主要是基于观察者模式,如下图就是一个经典的观察者模型

在这里插入图片描述

Zookeeper的监听机制其实也是基于观察者模式,这种模式允许客户端在数据节点发生变化时得到通知。

而且最通俗的解释就是,当我想监听某个主题的变动时,就会向该主题登记一个观察者。最后当主题真的触发时,就遍历观察者列表,向每个观察者通知该事件。

二、Zookeeper监听原理

1. 事件类型

不管什么监听器,肯定都有自己想监听的内容,也即监听事件。只有当我想看的事件被触发时,才会让我的监听器有所反应。而ZK则提供了以下几种事件类型

  • NodeCreated:节点创建
  • NodeDeleted:节点被删除
  • NodeDataChanged:节点数据变更
  • NodeChildrenChanged:子节点变更
  • DataWatchRemoved:数据监听器被移除
  • ChildWatchRemoved:子节点监听器被移除
  • PersistentWatchRemoved 永久化监听器被移除

需要注意的是,事件操作并不是一回事。比如我们新增一个节点。它其实会触发当前节点的节点创建 和其父节点的子节点变更 两个事件。

同样,我们也不难发现,前4个事件是针对节点进行变更的事件,也是我们最常用的。而后面3种其实是监听器移除事件

2. 监听模式与监听器类型

(1)监听模式

明白了事件,我们再来看一下针对这些事件,我们能用怎样的方式来监听,也即监听模式

  • STANDARD 标准监听
  • PERSISTENT 永久监听
  • PERSISTENT_RECURSIVE 永久递归监听

所谓标准监听,其实就是某个节点的监听器一旦被触发了,这个监听器就会被删除,也就是所谓“一次性”监听。
永久监听就是永久存在,不会被删,可以一直触发。而永久递归监听则代表这个监听器不仅可以监听这个节点的事件,还能监听到该节点的所有子节点的事件,而且可以永久存在。

需要注意的是,监听模式可以叠加出不同的监听状态,比如说一个永久递归的监听器,可以再给他加一个标准监听,此时如果再删除永久递归监听器,那么还能够剩下一个标准监听器在工作,具体原理在源码的 WatchStats 部分

public final class WatchStats {private static final WatchStats[] WATCH_STATS = new WatchStats[] {new WatchStats(0), // NONEnew WatchStats(1), // STANDARDnew WatchStats(2), // PERSISTENTnew WatchStats(3), // STANDARD + PERSISTENTnew WatchStats(4), // PERSISTENT_RECURSIVEnew WatchStats(5), // STANDARD + PERSISTENT_RECURSIVEnew WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVEnew WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE};/*** Stats that have no watchers attached.** <p>This could be used as start point to compute new stats using {@link #addMode(WatcherMode)}.*/public static final WatchStats NONE = WATCH_STATS[0];private final int flags;private WatchStats(int flags) {this.flags = flags;}private static int modeToFlag(WatcherMode mode) {return 1 << mode.ordinal();}/*** Compute stats after given mode attached to node.** @param mode watcher mode* @return a new stats if given mode is not attached to this node before, otherwise old stats*/public WatchStats addMode(WatcherMode mode) {int flags = this.flags | modeToFlag(mode);return WATCH_STATS[flags];}public WatchStats removeMode(WatcherMode mode) {int mask = ~modeToFlag(mode);int flags = this.flags & mask;if (flags == 0) {return NONE;}return WATCH_STATS[flags];}/*** Check whether given mode is attached to this node.** @param mode watcher mode* @return true if given mode is attached to this node.*/public boolean hasMode(WatcherMode mode) {int flags = modeToFlag(mode);return (this.flags & flags) != 0;}
}

(2)监听器类型

需要注意的是,知道了所有的事件类型,以及能选择的监听的模式。其实监听器怎么弄,完全取决于你,理论上能做出 m * n 种类型,但ZK在源码中其实做了归纳,只提供了五种类型

其中 ChildrenDataAny 是最开始提供的,也是非常容易理解,因为对事件我们也能归纳为 子节点事件数据事件,所以监听器归纳成 子节点监听器数据监听器 很合理。而PersistentPersistentRecursive 监听器则是在在后续加上的。主要是因为只归纳成这几类的话,如果想要单独删除永久化的监听器就没法做了。加入了这样的枚举后,就能指定更具体的监听器类型进行删除了。

在这里插入图片描述

3. 监听原理

(1)基础概念

知晓了事件类型 与 监听器类型 后,我们再来讲讲监听原理,其实监听整理起来主要就是两个结构和三个步骤。

因为节点和监听器是多对多的关系,一个节点能被多个监听器监听,一个监听器也能监听多个节点。所以两个结构就分别从节点、监听器的角度来对监听关系进行归纳,在源码中就是两个 HashMap:watchTablewatch2Paths

// key 为某个节点的具体路径, value 为该节点的所有监听器集合
private final Map<String, Set<Watcher>> watchTable = new HashMap<>();// key 为某个监听器,value 值为该监听器在不同路径下的监听状态
private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();

而三个步骤其实也很简单:

  1. 客户端注册监听器: 客户端通过创建一个监听器(Watcher)并将其注册到Zookeeper服务器上的指定节点上。
    在这里插入图片描述

  2. 节点变更通知: 当节点发生变化时(如节点数据被修改、节点被创建或删除等),Zookeeper服务器会将变更通知发送给所有对该节点注册了监听器的客户端。同时处理该监听器,如下图,标准监听器watch 1被触发后会在该节点上被删除,而永久监听器watch还能继续留存。

在这里插入图片描述

  1. 客户端处理节点变更: 客户端在收到节点变更通知后,会调用注册的监听器进行处理。客户端可以根据具体的业务需求,对节点变更进行相应的处理逻辑,如重新读取节点数据、重新注册监听器等。Watch 接口如下:

在这里插入图片描述

不难看出,Zookeeper监听机制的核心是Watcher接口通知机制。从整个流程来说,我们可以细分为3个步骤:

Watcher接口:Watcher接口是Zookeeper提供的一个回调接口,在客户端注册监听器时需要实现该接口。该接口中只有一个process方法,当节点发生变化时,Zookeeper会调用该方法通知客户端。

通知机制:Zookeeper的通知机制是基于事件触发的。当注册了Watcher的节点发生变化时,Zookeeper会生成一个事件,并将该事件放入事件队列中。客户端线程会从事件队列中获取事件并进行处理。

(2)监听触发处理

我们看一下,当某个节点发生变动后,它是怎么找到该节点的监听器,并触发它的。我们直接看源码并配上注释

    // WatchManager.java
/*** 触发watch事件** @param path    节点路径* @param type    事件类型* @param zxid    事务ID* @param acl     节点的ACL列表* @param supress 指定不触发的Watcher* @return 返回触发事件的Watcher或者BitSet*/
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, List<ACL> acl, WatcherOrBitSet supress) {// 创建WatchedEvent对象WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);// 创建Watcher集合Set<Watcher> watchers = new HashSet<>();synchronized (this) {// 遍历节点路径的父路径,因为父路径上可能有递归的监听器,也需要监听到此事件PathParentIterator pathParentIterator = getPathParentIterator(path);for (String localPath : pathParentIterator.asIterable()) {// 获取对应路径的Watcher集合Set<Watcher> thisWatchers = watchTable.get(localPath);// 如果Watcher集合为空,直接跳过if (thisWatchers == null || thisWatchers.isEmpty()) {continue;}// 遍历Watcher集合Iterator<Watcher> iterator = thisWatchers.iterator();while (iterator.hasNext()) {Watcher watcher = iterator.next();// 获取Watcher对应的路径映射Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());// 获取Watcher对应路径的状态WatchStats stats = paths.get(localPath);// 如果状态为空,输出警告日志并跳过if (stats == null) {LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath);continue;}// 如果不是在父路径上,则添加Watcherif (!pathParentIterator.atParentPath()) {watchers.add(watcher);// 【【【重要】】】:移除STANDARD模式的状态,而不是触发就删除,可见 2(1)的监听模式WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);// 如果新的状态为空,则移除Watcher和路径映射if (newStats == WatchStats.NONE) {iterator.remove();paths.remove(localPath);} else if (newStats != stats) {paths.put(localPath, newStats);}} else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {// 父路径当前只会有永久递归监听器能响应子节点事件,且响应完不会删除监听器,所以直接把该监听器触发,不用做其他操作watchers.add(watcher);}}// 如果Watcher集合为空,从watchTable中移除该路径if (thisWatchers.isEmpty()) {watchTable.remove(localPath);}}}// 如果Watcher集合为空,返回nullif (watchers.isEmpty()) {if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);}return null;}// 触发Watcher的事件处理方法for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}if (w instanceof ServerWatcher) {((ServerWatcher) w).process(e, acl);} else {w.process(e);}}// 根据事件类型更新服务器指标switch (type) {case NodeCreated:ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());break;case NodeDeleted:ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());break;case NodeDataChanged:ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());break;case NodeChildrenChanged:ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());break;default:// Other types not logged.break;}// 返回Watcher或BitSetreturn new WatcherOrBitSet(watchers);
}

概括一下上述代码的逻辑:

    1. 首先,根据提供的节点路径,遍历该节点的所有父路径。
    1. 对于本路径及其每个父路径,获取与之关联的Watcher集合。如果Watcher集合为空,直接跳过。
    1. 逐个遍历Watcher集合中的Watcher对象,并根据其关联的路径状态来判断是否应该触发监听事件。
      – 如果当前不在父路径上,则添加Watcher到触发集合中,并更新该路径的状态。
      – 如果是父路径上的递归Persistent_Watcher模式,则添加Watcher到触发集合中。
    1. 如果触发集合中的Watcher为空,则结束。
    1. 逐个触发Watcher集合中的Watcher对象的事件处理方法,传入相应的参数。
    1. 根据事件的类型,更新服务器指标(例如,节点被创建的监听器数量)。
    1. 返回触发事件的Watcher集合或者BitSet对象

这里还有一点细节是比较有趣的,就是监听器虽然依附于节点,但并不意味着如果我们删除了节点,该节点的监听器就一定会消失。比如你在某节点设置了一个永久监听器,即使这个结点被删除了,它在 watchTablewatch2Paths 中也不会被删除,即节点与监听器的关联还在,所以当节点后续又被创建的时候,这个监听器仍然可以使用

三、Zookeeper监听的使用Demo

现在我们使用一个Demo来使用一下ZK的监听功能,我们打算使用一个永久监听器监听多个节点

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.List;public class ZooKeeperListenerDemo implements Watcher {private static final String ZK_SERVER = "localhost:2181";private static final int SESSION_TIMEOUT = 3000;private static final String[] PATHS = {"/zhanfu", "/zhanfu2"}; // 需要监听的路径private ZooKeeper zooKeeper;public static void main(String[] args) {ZooKeeperListenerDemo listenerDemo = new ZooKeeperListenerDemo();listenerDemo.connectZooKeeper();listenerDemo.registerWatchers();// 测试监听器,程序会一直运行,直到被中断try {Thread.sleep(Long.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}listenerDemo.close();}// 连接ZooKeeper服务器public void connectZooKeeper() {try {zooKeeper = new ZooKeeper(ZK_SERVER, SESSION_TIMEOUT, this);System.out.println("Connected to ZooKeeper server: " + ZK_SERVER);} catch (IOException e) {e.printStackTrace();}}// 注册监听器public void registerWatchers() {for (String path : PATHS) {try {// AddWatchMode.PERSISTENT 即为永久监听器zooKeeper.addWatch(path, this, AddWatchMode.PERSISTENT);System.out.println("Registered watcher for path: " + path);} catch (KeeperException | InterruptedException e) {e.printStackTrace();}}}// 处理ZooKeeper事件public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {if (event.getType() == Event.EventType.None && event.getPath() == null) {System.out.println("Connected to ZooKeeper server");} else {// 一般监听器只会处理部分事件,这里就不做限制,只打印日志System.out.println("Event received: " + event.getType() + ", path: " + event.getPath());// 在此处理收到的事件}}}// 关闭ZooKeeper连接public void close() {try {zooKeeper.close();System.out.println("ZooKeeper connection closed.");} catch (InterruptedException e) {e.printStackTrace();}}
}

最后我们运行一下,并修改节点内容、删除再重新添加节点,可以看到该监听器为永久的,一直在生效

在这里插入图片描述


http://www.ppmy.cn/devtools/91053.html

相关文章

qt的项目结构

目录 创建新的项目 第一个hell0程序&#xff0c;qt的项目结构 main函数 Widget头文件: pro文件 命名规范 QtCreator 常用快捷键 Qt里边绝大部分的类都是继承自QObject是一个顶层类 父子关系 Qt坐标系 QT常用API函数 对象树 信号和槽机制 自定义信号和槽 自定义信号…

【C++】初识面向对象:类与对象详解

C语法相关知识点可以通过点击以下链接进行学习一起加油&#xff01;命名空间缺省参数与函数重载C相关特性 本章将介绍C中一个重要的概念——类。通过类&#xff0c;我们可以类中定义成员变量和成员函数&#xff0c;实现模块化封装&#xff0c;从而构建更加抽象和复杂的工程。 &…

使用python爬取今日头条热搜

今天无意间找到了今日头条热搜的接口链接&#xff0c;顺手写了一个爬取今日头条热搜的爬虫&#xff0c;并保存到excel中 今日头条热搜接口 https://www.toutiao.com/hot-event/hot-board/?origintoutiao_pc&_signature_02B4Z6wo00f01yG9tdQAAIDCQrd1vxaJp9chmbFAAKpR4Dqk0…

JavaScript秒值转换为年月日时间字符串

当前效果&#xff1a; 因为后端传递过来的是秒值&#xff0c;显示的时候也是秒值。 但是这种不太友好&#xff0c;所以需要转换为 “xxxx年xx月xx日 xx:xx:xx” 的格式。 参考代码&#xff1a; formatDate (now) {const date new Date(now)var y date.getFullYear() // 年…

嵌入式面试准备

信号量 在Linux中&#xff0c;根据是否有唯一的名称&#xff0c;分为有名信号量和无名信号量。 无名信号量 无名信号量不是通过名称标识&#xff0c;而是直接通过sem_t结构的内存位置标识。 有名信号 sem_close()&#xff1a;关闭对应sem指向的有名信号量的引用&#xff0c;…

批发行业进销存-登录适配 android 横竖屏幕 源码CyberWinApp-SAAS 本地化及未来之窗行业应用跨平台架构

一、横竖屏切换的意义 以下是移动端横屏竖屏可切换在进销存中的一些重要应用&#xff1a; a、数据录入与查看 在录入商品信息、库存数量等大量数据时&#xff0c;横屏模式可以提供更宽阔的输入区域&#xff0c;减少输入错误。例如&#xff0c;在输入长串的商品编码或详细的商…

【算法设计题】基于front、rear和count的循环队列初始化、入队和出队操作,第6题(C/C++)

目录 第3题 基于front、rear和count的循环队列初始化、入队和出队操作 得分点&#xff08;必背&#xff09; 题解&#xff1a;基于front、rear和count的循环队列初始化、入队和出队操作 数据结构定义 代码解答 详细解释 1. 循环队列初始化 2. 循环队列入队 3. 循环队列…

简单的后端生成令牌,前端获取,然后ajax设置header标头,后端进行对比

在 Web 应用中实现令牌失效&#xff08;Token Expiration&#xff09;通常涉及到两个方面&#xff1a;客户端的令牌使用和服务器端的令牌验证 <html><meta http-equiv"content-type" content"text/html;charsetUTF-8"/><title>javascri…