Zookeeper官网Java示例代码解读(一)

news/2024/9/17 23:16:50/ 标签: java-zookeeper, zookeeper, java

2024-08-22

1. 基本信息

  • 官网地址:
    https://zookeeper.apache.org/doc/r3.8.4/javaExample.html

  • 示例设计思路

Conventionally, ZooKeeper applications are broken into two units, one which maintains the connection, and the other which monitors data. In this application, the class called the Executor maintains the ZooKeeper connection, and the class called the DataMonitor monitors the data in the ZooKeeper tree. Also, Executor contains the main thread and contains the execution logic. It is responsible for what little user interaction there is, as well as interaction with the executable program you pass in as an argument and which the sample (per the requirements) shuts down and restarts, according to the state of the znode.

  • Demo的功能
    借助Zookeeper实现分布式环境中的配置文件实时更新

2. 环境准备

  • 准备一台虚拟机(也可以在本机启动ZooKeeper)
  • 安装ZooKeeper、JDK
  • 启动ZooKeeper Server
  • 启动客户端,创建znode,用于测试

3. 示例代码

3.1 Executor

java">package com.agileluo.zookeeperdemo.simple_watch;  /**  * A simple example program to use DataMonitor to start and * stop executables based on a znode. The program watches the * specified znode and saves the data that corresponds to the * znode in the filesystem. It also starts the specified program * with the specified arguments when the znode exists and kills * the program if the znode goes away. */import java.io.FileOutputStream;  
import java.io.IOException;  
import java.io.InputStream;  
import java.io.OutputStream;  import org.apache.zookeeper.KeeperException;  
import org.apache.zookeeper.WatchedEvent;  
import org.apache.zookeeper.Watcher;  
import org.apache.zookeeper.ZooKeeper;  public class Executor  implements Watcher, Runnable, DataMonitor.DataMonitorListener  
{  String znode;  DataMonitor dm;  ZooKeeper zk;  String filename;  String exec[];  Process child;  static{  System.setProperty("zookeeper.sasl.client", "false");  }  public Executor(String hostPort, String znode, String filename,  String exec[]) throws KeeperException, IOException {  this.filename = filename;  this.exec = exec;  zk = new ZooKeeper(hostPort, 3000, this);  dm = new DataMonitor(zk, znode, null, this);  }  /**  * @param args  */  public static void main(String[] args) {  if (args.length < 4) {  System.err  .println("USAGE: Executor hostPort znode filename program [args ...]");  System.exit(2);  }  String hostPort = args[0];  String znode = args[1];  String filename = args[2];  String exec[] = new String[args.length - 3];  System.arraycopy(args, 3, exec, 0, exec.length);  try {  new Executor(hostPort, znode, filename, exec).run();  } catch (Exception e) {  e.printStackTrace();  }  }  /***************************************************************************  * We do process any events ourselves, we just need to forward them on.     *     * @see org.apache.zookeeper.Watcher #process(org.apache.zookeeper.proto.WatcherEvent)  */    public void process(WatchedEvent event) {  dm.process(event);  }  public void run() {  try {  synchronized (this) {  while (!dm.dead) {  wait();  }  }  } catch (InterruptedException e) {  }  }  public void closing(int rc) {  synchronized (this) {  notifyAll();  }  }  static class StreamWriter extends Thread {  OutputStream os;  InputStream is;  StreamWriter(InputStream is, OutputStream os) {  this.is = is;  this.os = os;  start();  }  public void run() {  byte b[] = new byte[80];  int rc;  try {  while ((rc = is.read(b)) > 0) {  os.write(b, 0, rc);  }  } catch (IOException e) {  }  }  }  /**  * DataMonitor.DataMonitorListener 接口方法exists()的实现  * @param data  */  public void exists(byte[] data) {  if (data == null) { //zooKeeper客户端操作(delete /my_test)时触发  if (child != null) {  System.out.println("Killing process");  child.destroy();  try {  child.waitFor();  } catch (InterruptedException e) {  }  }  child = null;  } else {  //zooKeeper客户端操作(set /my_test test_data)时触发  if (child != null) {  System.out.println("Stopping child");  child.destroy();  try {  child.waitFor();  } catch (InterruptedException e) {  e.printStackTrace();  }  }  try { //将变化的配置写入文件,默认路径为项目源文件的根目录  FileOutputStream fos = new FileOutputStream(filename);  fos.write(data);  fos.close();  } catch (IOException e) {  e.printStackTrace();  }  try {  System.out.println("Starting child");  //从控制台读取命令行,并执行命令  child = Runtime.getRuntime().exec(exec);  new StreamWriter(child.getInputStream(), System.out);  new StreamWriter(child.getErrorStream(), System.err);  } catch (IOException e) {  e.printStackTrace();  }  }  }  
}

3.2 DataMonitor

java">package com.agileluo.zookeeperdemo.simple_watch;  /**  * A simple class that monitors the data and existence of a ZooKeeper * node. It uses asynchronous ZooKeeper APIs. */import java.util.Arrays;  import org.apache.zookeeper.KeeperException;  
import org.apache.zookeeper.WatchedEvent;  
import org.apache.zookeeper.Watcher;  
import org.apache.zookeeper.ZooKeeper;  
import org.apache.zookeeper.AsyncCallback.StatCallback;  
import org.apache.zookeeper.KeeperException.Code;  
import org.apache.zookeeper.data.Stat;  public class DataMonitor implements Watcher, StatCallback {  ZooKeeper zk;  String znode;  Watcher chainedWatcher;  boolean dead;  DataMonitorListener listener;  byte prevData[];  public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,  DataMonitorListener listener) {  this.zk = zk;  this.znode = znode;  this.chainedWatcher = chainedWatcher;  this.listener = listener;  // Get things started by checking if the node exists. We are going  // to be completely event driven        zk.exists(znode, true, this, null);  }  /**  * Other classes use the DataMonitor by implementing this method     */    public interface DataMonitorListener {  /**  * The existence status of the node has changed.         */        void exists(byte data[]);  /**  * The ZooKeeper session is no longer valid.         *         * @param rc  *                the ZooKeeper reason code  */        void closing(int rc);  }  public void process(WatchedEvent event) {  String path = event.getPath();  if (event.getType() == Event.EventType.None) {  // We are are being told that the state of the  // connection has changed            switch (event.getState()) {  case SyncConnected:  // In this particular example we don't need to do anything  // here - watches are automatically re-registered with                    // server and any watches triggered while the client was                    // disconnected will be delivered (in order of course)                    break;  case Expired:  // It's all over  dead = true;  listener.closing(KeeperException.Code.SessionExpired);  break;  }  } else {  if (path != null && path.equals(znode)) {  // Something has changed on the node, let's find out  zk.exists(znode, true, this, null);  }  }  if (chainedWatcher != null) {  chainedWatcher.process(event);  }  }  public void processResult(int rc, String path, Object ctx, Stat stat) {  boolean exists;  switch (rc) {  case Code.Ok:  exists = true;  break;  case Code.NoNode:  exists = false;  break;  case Code.SessionExpired:  case Code.NoAuth:  dead = true;  listener.closing(rc);  return;  default:  // Retry errors  zk.exists(znode, true, this, null);  return;  }  byte b[] = null;  if (exists) {  try {  b = zk.getData(znode, false, null);  } catch (KeeperException e) {  // We don't need to worry about recovering now. The watch  // callbacks will kick off any exception handling                e.printStackTrace();  } catch (InterruptedException e) {  return;  }  }  if ((b == null && b != prevData)  || (b != null && !Arrays.equals(prevData, b))) {  listener.exists(b);  prevData = b;  }  }  
}

4. 测试

运行Executor,参数传入: 192.168.206.100:2181 /my_test filename calc

其中192.168.206.100:2181为ZooKeeper的访问串;
/my_test 是预先创建的Znode
filename 是变动的Znode数据写入的文件,只保留最后的数据,
calc 指定执行完成后,此例为打开计算器(因为是在Windows下跑,所以可以有cmd,run,calc可以用来做测试)

5 注意点

5.1 防火墙

查看防火墙的状态
systemctl status firewalld.service

 firewalld.service - firewalld - dynamic firewall daemonLoaded: loaded (/usr/lib/systemd/system/firewalld.service; enabled; vendor preset: enabled)Active: active (running) since Tue 2024-08-27 19:41:00 PDT; 2s agoDocs: man:firewalld(1)Main PID: 2967 (firewalld)Tasks: 2CGroup: /system.slice/firewalld.service└─2967 /usr/bin/python2 -Es /usr/sbin/firewalld --nofork --nopid

关闭/开启VM的防火墙
systemctl stop|start firewalld.service

5.2 关闭SASL安全验证

Executor类中增加代码:

java">static{  System.setProperty("zookeeper.sasl.client", "false");  
}

http://www.ppmy.cn/news/1518513.html

相关文章

c-数据结构(顺序表、链表)

概念 对于n各元素的线性表&#xff0c;严格数学定义&#xff1a;其中任意一个数据元素a[i]&#xff0c;有且仅有一个前驱a[i-1]&#xff0c;有且仅有一个后继a[i1]&#xff1b;首元素a[0]无前驱&#xff0c;尾元素a[n-1]无后继。 顺序表 属于线性表&#xff0c;数据之间的空…

在 Java 中使用泛型时遇到的问题,,无法正确将响应数据映射为需要的数据

public <T> List<T> getOrderList(String shopId, Class<T> tClass) {// --- 省略一些中间过程----ParameterizedTypeReference<KeRuYunCommonResultVO<KPOSPageResultVO<T>>> responseType new ParameterizedTypeReference<KeRuYunCom…

Python日志,按日期分割日志文件(每天一个新的日志文件)

为了创建一个Python类来管理日志&#xff0c;并使其支持按日期分割日志文件&#xff08;每天一个新的日志文件&#xff09;&#xff0c;你可以使用Python标准库中的logging模块和logging.handlers.TimedRotatingFileHandler。下面是一个简单的示例&#xff0c;展示了如何实现这…

linux怎么安装Android Studio

方法一 下载安装包到linux系统解压 tar.gz文件的解压方式为 tar -zxvf 文件名&#xff08;tar -zxvf filename.tar.gz 命令的作用是&#xff0c;使用gzip解压缩&#xff08;-z&#xff09;&#xff0c;解包&#xff08;-x&#xff09;名为filename.tar.gz的归档文件&#xf…

使用PostgreSQL的CLI客户端查询数据不显示问题

问题 今天在使用PostgreSQL的命令行工具&#xff08;CLI&#xff09;查询数据时&#xff0c;数据不显示问题。 解决 使用CLI客户端登录数据库后&#xff0c;需要设置打印&#xff0c;设置边框为2: peterlocalhost testdb> \pset border 2或者&#xff0c;使用元组方式显…

clerk中authenticateWithRedirect方法讲解

clerk.authenticateWithRedirect 主要用于处理 Clerk 的 OAuth 登录过程&#xff0c;其工作流程大致如下&#xff1a; 1、用户发起登录请求&#xff1a; 用户点击登录按钮&#xff0c;触发 OAuth 登录流程。 2、重定向到 OAuth 提供商&#xff1a; clerk.authenticateWithRed…

回溯法-0/1背包问题

什么是回溯法&#xff1f; 回溯法是一种搜索算法&#xff0c;它通过深度优先搜索的方式来解决决策问题。它从根节点开始&#xff0c;逐步扩展节点&#xff0c;直到找到所有可能的解。 回溯法的基本思想 开始节点&#xff1a;从根节点出发&#xff0c;这个节点是解空间的起点…

word文档转html(只支持段落和表格)

maven依赖<dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-ooxml</artifactId> <version>5.2.3</version> </dependency> import org.apache.poi.xwpf.usermodel.*;import java.io.*;public class Wor…

基于Java+SpringMvc+Vue求职招聘系统详细设计实现

基于JavaSpringMvcVue求职招聘系统详细设计实现 &#x1f345; 作者主页 网顺技术团队 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; &#x1f345; 查看下方微信号获取联系方式 承接各种定制系统 &…

【60天备战软考高级系统架构设计师——第三天:软件工程原则与常用方法】

开篇 软件工程的原则和方法指导开发团队在项目中组织和管理代码及架构。这些原则和方法可以帮助团队提高软件的可维护性和可扩展性。今天&#xff0c;我将重点介绍软件工程中的一些基本原则以及常用方法和工具&#xff0c;帮助大家更好地应对实际开发中的挑战。 软件工程的基…

【Spring Boot 3】【Web】配置HTTPS

【Spring Boot 3】【Web】配置HTTPS 背景介绍开发环境开发步骤及源码工程目录结构背景 软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花费或多或…

【Android】UIMode

要修改 Android 设备的 UiMode&#xff08;用户界面模式&#xff09;&#xff0c;可以使用 UiModeManager 类进行设置。不同的 UI 模式适用于不同的使用场景&#xff0c;比如夜间模式、汽车模式等。下面是一些常见的修改方法&#xff1a; 1. 修改夜间模式 夜间模式可以通过 U…

Ubuntu/Linux 配置 locale

文章目录 Ubuntu/Linux 配置 locale1 概述2 locale2.1 locale 规则命令规则环境变量优先级 2.2 查看当前 locale 设置2.3 查看当前系统所有可用的 locale2.4 安装中文 locale 语言环境/字符集2.5 安装 locales 包2.6 使用 locale-gen 命令生成语言支持2.7 设置当前默认字符集 3…

基于机器学习的工业制造缺陷分析预测系统

B站视频及代码下载&#xff1a;基于机器学习的工业制造缺陷分析预测系统-视频-代码 1. 项目简介 制造缺陷是工业生产过程中面临的重大挑战之一&#xff0c;对产品质量和生产效率产生直接影响。准确预测和分析制造缺陷的发生&#xff0c;可以帮助企业提高生产质量、降低成本&…

【Linux】:文件IO

目录 1.C文件接口 1.1 当前路径是什么&#xff1f; 1.2 "w"和"a"​编辑 2.系统文件I/O 2.1 "比特宏"标识符的实现: 2.2 open 1.系统默认创建文件的权限只写 2.设置新建文件的权限 3. 覆盖写/清空写/追加写 3.访问文件的本质 3.1 文件…

Java 入门指南:Java Socket 网络通信编程

Socket Socket&#xff08;套接字&#xff09;是用于网络通信的编程接口、网络通信的基础&#xff0c;通过它可以实现不同计算机之间的数据传输&#xff0c;应用程序可以通过它发送或接收数据&#xff1b;就像操作文件那样可以打开、读写和关闭。它提供了一种机制&#xff0c;…

Scrcpy手机投屏投屏到电脑上(windows/mac)

项目场景&#xff1a; 在开发app程序时&#xff0c;需要进行投屏演示程序。市面上有很多可用软件&#xff0c;但是体验感不友好&#xff08;1收费、2操作繁琐&#xff0c;3手机端PC端都需要下载对应的软件&#xff09;很头痛&#xff01;&#xff01;&#xff01; Scrcpy是真的…

需方软件供应链安全保障要求及开源场景对照自评表(上)

国标《信息安全技术 软件供应链安全要求》确立了软件供应链安全目标&#xff0c;规定了软件供应链安全风险管理要求和供需双方的组织管理和供应活动管理安全要求。 开源软件供应链作为软件供应链的一种特殊形式&#xff0c;该国标亦适用于指导开源软件供应链中的供需双方开展组…

灵神算法题单——不定长滑动窗口(求最长最大)

1208. 尽可能使字符串相等 简单的滑动窗口三部曲&#xff1a;移入窗口、是否移出、更新结果。 算差值这里采用abs()函数来实现 class Solution { public:int equalSubstring(string s, string t, int maxCost) {int ls.size();int ant0,miINT_MIN;for(int i0,j0;i<l;i){a…

什么是IP地域封锁?有什么作用?

在互联网的广阔世界里&#xff0c;信息流通无界&#xff0c;但出于安全、管理或特定业务需求&#xff0c;对访问者的地域进行限制成为了一种常见的做法。这就是所谓的“IP地域封锁”。本文将深入探讨IP地域封锁的定义、实施方式以及其在实际应用中的作用。 一、IP地域封锁的定…