Zookeeper Java 开发,自定义分布式锁示例

news/2025/1/23 17:45:47/

文章目录

    • 一、概述
    • 二、导入依赖包
    • 三、创建锁的过程
      • 3.1 通过 create 创建节点信息
      • 3.2 AsyncCallback.StringCallback 回调函数
      • 3.3 AsyncCallback.Children2Callback 的回调函数
      • 3.4 Watcher 的回调函数
    • 四、完整示例
      • 4.1 完整分布式锁代码
      • 4.2 测试类

如果您还没有安装Zookeeper请看ZooKeeper 安装说明,Zookeeper 命令使用方法和数据说明,Zookeeper Java 开发入门。

一、概述

  • 情景:假设有10个客户端(分散的10台主机)要执行一个任务,这个任务某些过程需要保持原子性。那么我们就需要一个分布式锁。

  • 原理:通过在Zookeeper中创建序列节点来实现获得锁,删除节点来释放锁。其实质是一个按先来后到的排序过程,实现过程如下:

    • 客户端发起请求,创建锁序列节点(/lock/xxxxxxxx)

    • 获取所有锁节点,判断自己是否为最小节点

      • 如果自己是最小序列节点,则立即获得锁
      • 否则不能获得锁,但要监控前一个序列节点的状态
    • 获得锁的客户端开始执行任务。

    • 执行完任务后释放锁。

      • 由于后一个节点监控了前一个节点,当前一个节点删除时,后一个客户端会收到回调。

      • 在这个回调中再获取所有锁节点,判断自己是否为最小节点。

      • 以此类推,直到全部结束。

  • 流程如下

在这里插入图片描述

  • 如果您对没有做过 Zookeeper 开发,强烈建立先看 Zookeeper Java 开发入门。

二、导入依赖包

  • 在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。

    <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.2</version>
    </dependency>
    

三、创建锁的过程

3.1 通过 create 创建节点信息

  • 通过 create 创建序列节点信息。他是异步方式,创建成功后会调用 AsyncCallback.StringCallback.processResult 回调函数。
    public void lock() throws InterruptedException, LockException {zooKeeper.create("/lock", "xxx".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);countDownLatch.await();if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException("创建锁失败");}System.out.println(this.appName + " 获得锁");}

3.2 AsyncCallback.StringCallback 回调函数

  • 在 AsyncCallback.StringCallback 的回调函数中通过 getChildren 方法获取 ZooKeeper 锁节点下的所有节点信息。这个方法是异步的,调用成功后会调用 AsyncCallback.Children2Callback.processResult 回调函数。
    // AsyncCallback.StringCallback@Overridepublic void processResult(int i, String s, Object o, String s1) {if(StringUtils.isEmpty(s1)){// 这里是创建锁失败的情况。this.countDownLatch.countDown();return;}System.out.println(this.appName + " create lock node="+s1);this.lockNodePath = s1;// 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。zooKeeper.getChildren("/", false, this, context);}

3.3 AsyncCallback.Children2Callback 的回调函数

  • 在 AsyncCallback.Children2Callback 的回调函数判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。监控前一个节点信息使用 exists 方法,这个方法设置了 Watcher 的 processResult 回调函数
    // AsyncCallback.Children2Callback@Overridepublic void processResult(int i, String s, Object o, List<String> list, Stat stat) {Collections.sort(list);//        for (String s1 : list) {
//            System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
//        }int index = list.indexOf(lockNodePath.substring(1));if(0 == index){// 如果我现在是第一个节点,则获得锁try {zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}this.countDownLatch.countDown();}else {// 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)String watchNodePath = "/" + list.get(index - 1);System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);zooKeeper.exists(watchNodePath, this, new StatCallback() {@Overridepublic void processResult(int i, String s, Object o, Stat stat) {}}, context);}}

3.4 Watcher 的回调函数

  • 在 Watcher 的回调函数,我们通过判断 watchedEvent.getType() 为 NodeDeleted 类型时,重新获取 ZooKeeper 锁节点下的所有节点信息,这使得消息回到了 “3.3”步,判断谁是第一个节点,然后获得得,完成整个流程。
    // Watcher@Overridepublic void process(WatchedEvent watchedEvent) {switch (watchedEvent.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zooKeeper.getChildren("/", false, this, context);break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}}

四、完整示例

4.1 完整分布式锁代码

package top.yiqifu.study.p131;import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class ZookeeperLock implements Watcher, AsyncCallback.StringCallback,AsyncCallback.Children2Callback {private String appName;private ZooKeeper zooKeeper;private Object context;private String lockNodePath;private CountDownLatch countDownLatch = new CountDownLatch(1);public ZookeeperLock(String name, ZooKeeper zk){this.appName = name;this.zooKeeper = zk;this.context = this;}public void lock() throws InterruptedException, LockException {zooKeeper.create("/lock", "xxx".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);countDownLatch.await();if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException("创建锁失败");}System.out.println(this.appName + " 获得锁");}public void unlock() throws KeeperException, InterruptedException, LockException {if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException("没有获得锁,无法释放锁");}zooKeeper.delete(lockNodePath, -1);System.out.println(this.appName + " 释放锁");}// AsyncCallback.StringCallback@Overridepublic void processResult(int i, String s, Object o, String s1) {if(StringUtils.isEmpty(s1)){// 这里是创建锁失败的情况。this.countDownLatch.countDown();return;}System.out.println(this.appName + " create lock node="+s1);this.lockNodePath = s1;// 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。zooKeeper.getChildren("/", false, this, context);}// AsyncCallback.Children2Callback@Overridepublic void processResult(int i, String s, Object o, List<String> list, Stat stat) {Collections.sort(list);//        for (String s1 : list) {
//            System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
//        }int index = list.indexOf(lockNodePath.substring(1));if(0 == index){// 如果我现在是第一个节点,则获得锁try {zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}this.countDownLatch.countDown();}else {// 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)String watchNodePath = "/" + list.get(index - 1);System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);zooKeeper.exists(watchNodePath, this, new StatCallback() {@Overridepublic void processResult(int i, String s, Object o, Stat stat) {}}, context);}}// Watcher@Overridepublic void process(WatchedEvent watchedEvent) {switch (watchedEvent.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zooKeeper.getChildren("/", false, this, context);break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}}public class LockException extends  Exception{public LockException(String message){super(message);}}
}

4.2 测试类

package top.yiqifu.study.p131;import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class Test06_ZookeeperLock {public static void main(String[] args) {try {// 创建 ZooKeeper 对象final ZooKeeper zooKeeper = testCreateZookeeper();int clientCount = 10;final CountDownLatch countDownLatch = new CountDownLatch(clientCount);for (int i = 0; i < clientCount; i++) {new Thread(new Runnable(){@Overridepublic void run() {TestLock(zooKeeper);countDownLatch.countDown();}}).start();}countDownLatch.await();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}private static void  TestLock(ZooKeeper zooKeeper){try {String appName = Thread.currentThread().getName();ZookeeperLock zookeeperLock = new ZookeeperLock(appName, zooKeeper);// 加锁(获得分布式锁)zookeeperLock.lock();System.out.println(appName + " 执行任务");Thread.sleep(1000);// 释放锁zookeeperLock.unlock();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();} catch (ZookeeperLock.LockException e) {e.printStackTrace();}}private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {final CountDownLatch countDownLatch = new CountDownLatch(1);// ZooKeeper 集群地址(没连接池的概念,是Session的概念)//String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181/aaa";// ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。Integer sessionTimeout = 3000;// ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state = watchedEvent.getState();Event.EventType type = watchedEvent.getType();String path = watchedEvent.getPath();switch (state) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:countDownLatch.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;case Closed:break;}switch (type) {case None:break;case NodeCreated:break;case NodeDeleted:break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}System.out.println("Session watch state=" + state);System.out.println("Session watch type=" + type);System.out.println("Session watch path=" + path);} catch (Exception e) {e.printStackTrace();}}});countDownLatch.await();ZooKeeper.States state = zooKeeper.getState();switch (state) {case CONNECTING:break;case ASSOCIATING:break;case CONNECTED:break;case CONNECTEDREADONLY:break;case CLOSED:break;case AUTH_FAILED:break;case NOT_CONNECTED:break;}System.out.println("ZooKeeper state=" + state);return zooKeeper;}}

http://www.ppmy.cn/news/1222319.html

相关文章

Ubuntu/Debian Hat 系 Linux 使用

目录 1. Ubuntu/Debian Hat 系 Linux 使用1.1. 包1.1.1. Install Package1.1.2. Convert .rpm package to .deb1.1.3. Install RPM Package Directly Onto the System on Ubuntu 1. Ubuntu/Debian Hat 系 Linux 使用 1.1. 包 1.1.1. Install Package dpkg -i <name of pa…

Nginx(四) absolute_redirect、server_name_in_redirect、port_in_redirect 请求重定向指令组合测试

本篇文章主要用来测试absolute_redirect、server_name_in_redirect和port_in_redirect三个指令对Nginx请求重定向的影响&#xff0c;Nginx配置详解请参考另一篇文章 Nginx(三) 配置文件详解 接下来&#xff0c;在Chrome无痕模式下进行测试。 测试1&#xff1a;absolute_redi…

基于SpringBoot+Vue的新能源汽车充电桩管理系统

基于SpringBootVue的新能源汽车充电桩管理系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBootMyBatisVue工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 主页 充电桩详情 管理员界面 摘要 本项目是基于Spring Boot 和 …

简单回顾矩阵的相乘(点乘)230101

[[1 0 1][1 1 0]] [[3 0 0 3][2 2 1 3][1 3 1 1]] [[4. 3. 1. 4.][5. 2. 1. 6.]]乘以 c11 a11*b11 a12*b21 a13*b31 1*3 0*2 1*1 4 c12 a11*b12 a12*b22 a13*b32 1*0 0*2 1*3 3 c13a11*b13 a12*b23a13*b33 c14a11*b14 a12*b24a13*b34 c21a21*b11 a22*b21 a23*b…

excel中的OFFSET函数

介绍 OFFSET函数是确定从基点出发移动后的引用区域。它有5个参数&#xff1a; 第1个参数是引用的参考基点区域第2个参数是移动的行数&#xff0c;正数代表向下移动的行数&#xff0c;负数代表向上移动的行数第3个参数是移动的列数&#xff0c;正数代表向右移动的列数&#xf…

【C++】数组中出现次数超过一半的数字

代码&#xff1a; class Solution { public:/*** 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返回方法规定的值即可** * param numbers int整型vector * return int整型*/int MoreThanHalfNum_Solution(vector<int>& numbers) {int …

2024清理mac苹果电脑内存免费工具CleanMyMac X4.15

当你使用苹果电脑时&#xff0c;内存的优化和清理变得至关重要。随着时间的推移&#xff0c;我们的电脑内存可能会变得拥挤&#xff0c;导致性能下降。清理内存可以提高电脑的速度和反应能力&#xff0c;并确保它始终在良好状态下运行。本文将向您介绍怎么清理苹果电脑内存的方…

vs2017打开工程提示若要解决此问题,请使用以下选择启动 Visual Studio 安装程序: 用于 x86 和 x64 的 Visual C++ MFC

下载 error MSB8036: 找不到 Windows SDK 版本8.1。请安装所需的版本的 Windows SDK 或者在项目属性页中或通过右键单击解决方案并选择“重定解决方案目标”来更改 SDK 版本。 error&#xff1a;D8016 “/ZI”和“/Gy-”命令行选项不兼容 ”问题解决