Zookeeper:Zookeeper JavaAPI操作与分布式锁

server/2024/11/14 13:12:16/

文章目录

  • 一、Zookeeper JavaAPI操作
    • 1、Curator介绍
    • 2、创建、查询、修改、删除节点
    • 3、Watch事件监听
  • 二、Zookeeper分布式锁原理

一、Zookeeper JavaAPI操作

1、Curator介绍

  • Curator是Apache Zookeeper的Java客户端。
  • 常见的Zookeeper Java API:
    • 原生Java API。
    • ZkClient。
    • Curator。
  • Curator项目目标是简化Zookeeper客户端的使用。
  • Curator最初是Netfix研发的,后来捐献了Apache基金会,目前是Apache的顶级项目。
  • 官网:https://curator.apache.org/docs/about

2、创建、查询、修改、删除节点

public class CuratorTest {private CuratorFramework client;@Beforepublic void init() {// 1、方式一RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("localhost:2181", 60 * 1000, 15 * 1000, retryPolicy);// 2、方式二CuratorFramework client = CuratorFrameworkFactory.builder().connectString("localhost:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("test").build();// 开启连接client.start();this.client = client;}/*** 1、基本创建:client.create().forPath("/app1")* 2、创建节点,带有数据:client.create().forPath("/app1", data)* 3、设置节点的类型: client.create().withMode(CreateMode.EPHEMERAL).forPath("/app1")* 4、创建多级节点: client.create().creatingParentsIfNeeded().forPath("/app1/app2")*/@Testpublic void testCreate() {// 1、基本创建// 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储try {String path = client.create().forPath("/app1");System.out.println(path);} catch (Exception e) {throw new RuntimeException(e);}}/*** 查询节点:* 1、查询数据:get* 2、查询子节点: ls* 3、查询节点状态信息: ls -s*/@Testpublic void testQueryData() {// 1、查询数据: gettry {byte[] data = client.getData().forPath("/app1");System.out.println(new String(data));} catch (Exception e) {throw new RuntimeException(e);}}@Testpublic void testQueryChildren() {// 查询子节点: lstry {List<String> stringList = client.getChildren().forPath("/");System.out.println(stringList);} catch (Exception e) {throw new RuntimeException(e);}}@Testpublic void testQueryState() {// 查询节点状态信息: ls -sStat status = new Stat();try {client.getData().storingStatIn(status).forPath("/app1");System.out.println(status);} catch (Exception e) {throw new RuntimeException(e);}}/*** 修改数据:* 1、修改数据。* 2、根据版本修改*/@Testpublic void testSet() {// 修改数据try {client.setData().forPath("/app1", "haha".getBytes());} catch (Exception e) {throw new RuntimeException(e);}}@Testpublic void testSetForVersion() throws Exception {// 根据版本修改Stat stat = new Stat();// 查询节点状态信息: ls -sclient.getData().storingStatIn(stat).forPath("/app1");int version = stat.getVersion();client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());}/*** 删除节点:delete、deleteall* 1、删除单个节点* 2、删除带有子节点的节点* 3、必须成功的删除: 为了防止网络抖动。本质就是重试。* 4、回调*/@Testpublic void testSingleDelete() throws Exception {// 1、删除单个节点client.delete().forPath("/app1");// 2、删除带有子节点的节点client.delete().deletingChildrenIfNeeded().forPath("/app1");// 3、必须成功删除client.delete().guaranteed().forPath("/app1");//4、回调client.delete().guaranteed().inBackground((client, event) -> {System.out.println("我被删除了");System.out.println(event);}).forPath("/app1");}@Afterpublic void close() {if (client != null) {client.close();}}
}

3、Watch事件监听

  • Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,Zookeeper服务端会将事件通知到感兴趣的客户端上去,该机制是Zookeeper实现分布式协调服务的重要性。
  • Zookeeper中引入了Watcher机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
  • Zookeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐。
  • Curator引入了Cache来实现对Zookeeper服务端事件的监听。
  • Zookeeper提供了三种Watcher:
    • NodeCache:只是监听某一个特定的节点。
    • PathChildrenCache:监控一个ZNode的子节点。
    • TreeCache:可以监控整个树上所有节点,类似于PathChildrenCache和NodeCache的组合。

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;public class CuratorWatcherTest {private CuratorFramework client;@Beforepublic void init() {// 1、方式一RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("localhost:2181", 60 * 1000, 15 * 1000, retryPolicy);// 2、方式二CuratorFramework client = CuratorFrameworkFactory.builder().connectString("localhost:2181").sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace("test").build();// 开启连接client.start();this.client = client;}@Testpublic void testPathChildrenCache() {// 1、创建监听对象PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);// 2、绑定监听器pathChildrenCache.getListenable().addListener((curatorFramework, event) -> {System.out.println("- 子节点变化了 -");System.out.println(event);// 监听子节点的数据变更,并且拿到变更后的数据// 获取类型PathChildrenCacheEvent.Type type = event.getType();// 判断类型是否是updateif (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {System.out.println("数据变了");byte[] data = event.getData().getData();System.out.println(new String(data));}});}@Testpublic void testTreeCache() {// 创建监听器TreeCache treeCache = new TreeCache(client, "/app2");// 2、注册监听器treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {System.out.println("节点变化了");System.out.println(event);}});}@Afterpublic void close() {if (client != null) {client.close();}}}

二、Zookeeper分布式锁原理

1、 分布式

  • 在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。
  • 但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程间的锁解决同步问题。
  • 那么就需要一种更加高级的锁机制,来处理这种跨机器的进程之间的数据同步问题–分布式锁。
    在这里插入图片描述

2、Zookeeper分布式

(1)原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除节点。

  • 客户端获取锁时,在lock节点下创建临时顺序节点。
  • 然后获取lock下面所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁,使用完锁后,删除该节点。
  • 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
  • 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序列号最小的,如果是则获取锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。
    在这里插入图片描述

在这里插入图片描述

(2)Curator实现分布式锁API

在Curator中有五种锁方案:

  • InterProcessSemaphoreMutex:分布式排他锁(非重入锁)。
  • InterProcessMutex:分布式可重入排他锁。
  • InterProcessReadWriteLock:分布式读写锁。
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器。
  • InterProcessSemaphoreV2:共享信号量。

http://www.ppmy.cn/server/58205.html

相关文章

给您介绍工控CAN总线

CAN是什么 CAN&#xff0c;全称Controller Area Network&#xff0c;即控制器局域网&#xff0c;是一种由Bosch公司在1983年开发的通信协议。它主要用于汽车和工业环境中的电子设备之间的通信。CAN协议定义了物理层和数据链路层的通信机制&#xff0c;使得不同的设备能够通过CA…

redis运维:sentinel模式如何查看所有从节点

1. 连接到sentinel redis-cli -h sentinel_host -p sentinel_port如&#xff1a; redis-cli -h {域名} -p 200182. 发现Redis主服务器 连接到哨兵后&#xff0c;我们可以使用SENTINEL get-master-addr-by-name命令来获取当前的Redis主服务器的地址。 SENTINEL get-master-a…

Vulkan 学习(1)---- Vulkan 基本概念和发展历史

目录 Vulkan及其演化史Vulkan 基本概念基本术语 Vulkan 的原理Vulkan应用程序Vulkan的编程模型硬件初始化窗口展示表面资源设置流水线设置描述符和描述符缓冲池基于SPIR-V的着色器流水线管理指令的记录队列的提交 Vulkan及其演化史 目前主流的图形渲染API有OpenGL、OpenGL ES、…

JVM 堆内存分配过程

设置堆内存大小和 OOM Java 堆用于存储 Java 对象实例&#xff0c;那么堆的大小在 JVM 启动的时候就确定了&#xff0c;我们可以通过 -Xmx 和 -Xms 来设定 -Xms 用来表示堆的起始内存&#xff0c;等价于 -XX:InitialHeapSize-Xmx 用来表示堆的最大内存&#xff0c;等价于 -XX…

【selenium】元素等待

【selenium】元素等待 1、三种元素等待的区别2、sleep等待3、隐式等待4、显示等待4.1 WebDriverWait类4.2 expected_conditions类 1、三种元素等待的区别 特点sleep隐式等待显示等待原理不论网页/元素是否加载完成&#xff0c;都会强制等待x秒设置一个等待时间&#xff0c;等待…

ggplot2绘图点的形状不够用怎么办?

群里有这么一个问题&#xff1a; 请问老师&#xff0c;fviz_pca_ind 做pca&#xff0c;当设置geom.ind “point”&#xff0c;group>6时&#xff0c;就不能显示第7&#xff0c;8组的点&#xff0c;应该如何处理&#xff08;在不设置为文本的情况下&#xff09;&#xff0c;…

hdu物联网硬件实验3 按键和中断

学院 班级 学号 姓名 日期 成绩 实验题目 按键和中断 实验目的 实现闪灯功能转换 硬件原理 无 关键代码及注释 /* Button Turns on and off a light emitting diode(LED) connected to digital pin 13, when pressing a pushbutton attached…

Monaco 添加 CodeAction

Monaco 中的 CodeAction 会在当前代码行上方添加一个&#x1f4a1;&#xff0c;点击&#x1f4a1;时出现一个命令列表&#xff0c;名列列表可以点击&#xff0c;在 VSCode 中很常见的就是代码出现问题&#xff0c;点击会给出修改建议。 Monaco 中添加 CodeAction&#xff0c;…