Zookeeper Java SDK 开发入门

news/2025/2/13 15:21:48/

文章目录

    • 一、概述
    • 二、导入依赖包
    • 三、与 Zookeeper 建立连接
    • 四、判断 ZooKeeper 节点是否存在
    • 四、创建 ZooKeeper 节点数据
    • 五、获取 ZooKeeper 节点数据
    • 六、修改 ZooKeeper 节点数据
    • 七、异步获取 ZooKeeper 节点数据
    • 八、完整示例

如果您还没有安装Zookeeper请看ZooKeeper 安装说明,Zookeeper 命令使用方法和数据说明。

一、概述

  • ZooKeeper是一个开源的、分布式的协调服务,它主要用于分布式系统中的数据管理和协调任务。它提供了一个具有高可用性的分布式环境,用于存储和管理小规模数据,例如配置信息、命名服务、分布式锁等。

  • 本文主要介绍如何使用 Java 与 ZooKeeper 建立连接,进行数据创建、修改、读取、删除等操作。

  • 源码地址:https://github.com/apache/zookeeper

    在这里插入图片描述

二、导入依赖包

  • 在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。

    <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.2</version>
    </dependency>

三、与 Zookeeper 建立连接

  • 与ZooKeeper集群建立连接使用 ZooKeeper 类,传递三个参数,分别是
    • connectionString ,是ZooKeeper 集群地址(没连接池的概念,是Session的概念)
    • sessionTimeout , 是ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
    • watcher, ZooKeeper Session 级别监听器( Watcher),(Watch只发生在读方法上,如 get、exists等)
    private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {final CountDownLatch countDownLatch = new CountDownLatch(1);// ZooKeeper 集群地址(没连接池的概念,是Session的概念)String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";// ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。Integer sessionTimeout = 3000;// ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state = watchedEvent.getState();Event.EventType type = watchedEvent.getType();String path = watchedEvent.getPath();switch (state) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:countDownLatch.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;case Closed:break;}switch (type) {case None:break;case NodeCreated:break;case NodeDeleted:break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}System.out.println("Session watch state=" + state);System.out.println("Session watch type=" + type);System.out.println("Session watch path=" + path);} catch (Exception e) {e.printStackTrace();}}});// 由于建立连接是异步的,这里先阻塞等待连接结果countDownLatch.await();ZooKeeper.States state = zooKeeper.getState();switch (state) {case CONNECTING:break;case ASSOCIATING:break;case CONNECTED:break;case CONNECTEDREADONLY:break;case CLOSED:break;case AUTH_FAILED:break;case NOT_CONNECTED:break;}System.out.println("ZooKeeper state=" + state);return zooKeeper;}

四、判断 ZooKeeper 节点是否存在

  • 创建节点数据使用 exists 方法,传递四个参数
    • path , 表示节点目录名称
    • watch, 表示监听器(只对该路径有效)
    • stat, 判断结果回调函数
    • context, 自定义上下文对象
    private static void testExists(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 判断 ZooKeeper 节点是否存在Object context = new Object();zooKeeper.exists("/yiqifu", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}}, new AsyncCallback.StatCallback() {@Overridepublic void processResult(int i, String s, Object o, Stat stat) {if(null != stat){System.out.println("ZooKeeper /yiqifu 节点存在");}else {System.out.println("ZooKeeper /yiqifu 节点不存在");}}}, context);}

四、创建 ZooKeeper 节点数据

  • 创建节点数据使用 create 方法,传递四个参数
    • path , 表示节点目录名称
    • data , 表示节点数据
   private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException 	{// 在 ZooKeeper 中创建节点String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("ZooKeeper 创建节点成功:" + nodeName);}

五、获取 ZooKeeper 节点数据

  • 获取 ZooKeeper 节点数据使用 getData 方法,传递三个参数
    • path , 表示节点目录名称
    • watch, 表示路径级别的监听器,这个监听器只对该路径下的数据操作监听生效。
    private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 获取 ZooKeeper 节点数据,这里设置了Path级Watchfinal Stat stat = new Stat();byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state = watchedEvent.getState();Event.EventType type = watchedEvent.getType();String path = watchedEvent.getPath();System.out.println("Path watch state=" + state);System.out.println("Path watch type=" + type);System.out.println("Path watch path=" + path);//zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级WatchzooKeeper.getData("/yiqifu", this, stat);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}, stat);System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));}

六、修改 ZooKeeper 节点数据

  • 修改 ZooKeeper 节点数据使用 setData 方法,传递三个参数
    • path , 表示节点目录名称。
    • data, 表示新数据。
    • version, 表示数据版本。
    private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 更新 ZooKeeper 节点数据(修改数据会触发Watch)zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);}

七、异步获取 ZooKeeper 节点数据

  • 修改 ZooKeeper 节点数据使用 getData 方法,传递三个参数

    • path , 表示节点目录名称。

    • watch, 表示是否触发监听器。

    • dataCallback, 表示异步获取数据的回调函数。

 private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 获取 ZooKeeper 节点数据(使用异步回调方式)Object context = new Object();zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {@Overridepublic void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));}}, context);}

八、完整示例

package top.yiqifu.study.p131;import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.concurrent.CountDownLatch;public class Test01_Zookeeper {public static void main(String[] args) {try {// 创建 ZooKeeper 对象ZooKeeper zooKeeper = testCreateZookeeper();// 在 ZooKeeper 创建数据节点testCreateNode(zooKeeper);// 在 ZooKeeper 中同步获取节点数据testGetdata(zooKeeper);// 在 ZooKeeper 中更新节点数据testSetdata(zooKeeper);// 在 ZooKeeper 异步获取节点数据testAsyncGetdata(zooKeeper);Thread.sleep(3000);} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {final CountDownLatch countDownLatch = new CountDownLatch(1);// ZooKeeper 集群地址(没连接池的概念,是Session的概念)String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";// ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。Integer sessionTimeout = 3000;// ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state = watchedEvent.getState();Event.EventType type = watchedEvent.getType();String path = watchedEvent.getPath();switch (state) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:countDownLatch.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;case Closed:break;}switch (type) {case None:break;case NodeCreated:break;case NodeDeleted:break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}System.out.println("Session watch state=" + state);System.out.println("Session watch type=" + type);System.out.println("Session watch path=" + path);} catch (Exception e) {e.printStackTrace();}}});countDownLatch.await();ZooKeeper.States state = zooKeeper.getState();switch (state) {case CONNECTING:break;case ASSOCIATING:break;case CONNECTED:break;case CONNECTEDREADONLY:break;case CLOSED:break;case AUTH_FAILED:break;case NOT_CONNECTED:break;}System.out.println("ZooKeeper state=" + state);return zooKeeper;}private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 在 ZooKeeper 中创建节点String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("ZooKeeper 创建节点成功:" + nodeName);}private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 获取 ZooKeeper 节点数据,这里设置了Path级Watchfinal Stat stat = new Stat();byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state = watchedEvent.getState();Event.EventType type = watchedEvent.getType();String path = watchedEvent.getPath();System.out.println("Path watch state=" + state);System.out.println("Path watch type=" + type);System.out.println("Path watch path=" + path);//zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级WatchzooKeeper.getData("/yiqifu", this, stat);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}, stat);System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));}private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 更新 ZooKeeper 节点数据(修改数据会触发Watch)zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);}private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 获取 ZooKeeper 节点数据(使用异步回调方式)Object context = new Object();zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {@Overridepublic void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));}}, context);}
}

http://www.ppmy.cn/news/1221193.html

相关文章

[msg_msg] corCTF2021 -- fire_of_salvation

前言 msg_msg 是 kernel pwn 中经常用作堆喷的结构体. 其包含一个 0x30 大小的 header. 但 msg_msg 的威力远不如此, 利用 msg_msg 配合其他堆漏洞可以实现任意地址读写的功能. 程序分析 本题给了源码, 可以直接对着源码看. 并且题目给了编译配置文件, 所以可以直接编译一个…

Python常用的第三方库----requests

文章目录 1. 发送GET请求代码示例:运行结果:2. 发送POST请求代码示例:运行结果:3. 上传文件代码示例:运行结果:4. 处理Cookies代码示例:运行结果:5. 自定义请求头代码示例:运行结果:6. 错误处理代码示例:运行结果:

Kafka 集群如何实现数据同步?

哈喽大家好&#xff0c;我是咸鱼 最近这段时间比较忙&#xff0c;将近一周没更新文章&#xff0c;再不更新我那为数不多的粉丝量就要库库往下掉了 T﹏T 刚好最近在学 Kafka&#xff0c;于是决定写篇跟 Kafka 相关的文章&#xff08;文中有不对的地方欢迎大家指出&#xff09;…

计算机专业“潜规则”必坑指南,给计算机专业学生的忠告

首先恭喜你选择计算机专业&#xff0c;计算机专业无疑是近几年来最火热的专业之一&#xff0c;但是计算机专业也有很多坑&#xff0c;避过这些坑你的人生更精彩&#xff0c;下面给大家整理了一些大学的具体操作&#xff0c;可以看下哦。如果能帮助到你&#xff0c;请给个关注和…

Python地理数据处理 24:基于arcpy批量操作汇总(六)

arcpy批量处理 1、裁剪 1、裁剪 目的&#xff1a; 对指定路径下的所有子文件中的tif影像进行批量裁剪&#xff0c;并生成对应的文件夹&#xff0c;保存裁剪后的tif文件。 # -*- coding: cp936 -*- import arcpy import os arcpy.CheckOutExtension("spatial")# 创建…

【WPF系列】- Window详解

【WPF系列】- Window详解 文章目录 【WPF系列】- Window详解一、概述二、WPF中Window类三、Window类实现四、Window属性五、方法六、事件七、参考 一、概述 用户通过窗口与Windows Presentation Foundation&#xff08;WPF&#xff09;应用程序交互。窗口的主要用途是托管使用…

【极客时间-系列教程】深入剖析Kubernetes-预习篇 · 小鲸鱼大事记(二):崭露头角

文章目录 崭露头角机遇厮杀结论 崭露头角 机遇 机会就在能解决痛点&#xff0c;这不是PaaS平台存在的痛点&#xff0c;dotCloud缺能解决&#xff0c;选择开源了自家的一个容器项目 Docker&#xff0c;自此开启了"Docker"的全新时代 Docker的商标是"鲸鱼"…

用电子签章软件怎么给标书一键签章的小故事

在这个数字化时代&#xff0c;电子签章已经成为了商务往来的重要一环。作为国内电子签章软件的佼佼者&#xff0c;微签凭借其19年的电子签研发应用经验&#xff0c;为中小企业提供了安全可靠的电子签章软件服务。 从审批场景到合同签署&#xff0c;微签都展现出卓越的电子签章…