Zookeeper 官方示例2-SyncPrimitive 代码解读(二)

server/2024/10/18 12:32:33/

测试命令
java jar .\ZookeeperDemo-0.0.1-SNAPSHOT.jar bTest 192.168.206.100:2181 2

1. Barrier(阻塞原语)

1.1 概念

[!quote] A barrier is a primitive that enables a group of processes to synchronize the beginning and the end of a computation. The general idea of this implementation is to have a barrier node that serves the purpose of being a parent for individual process nodes. Suppose that we call the barrier node "/b1". Each process "p" then creates a node "/b1/p". Once enough processes have created their corresponding nodes, joined processes can start the computation.

  • 阻塞是一个原语,它使一组进程能够同时开始计算。此实现的总体思想是拥有一个屏障节点,用于作为单个流程节点的父节点。
  • 假设我们将障碍节点称为“/b1”。然后,每个进程“ p”创建一个节点“/b1/p”。一旦有足够多的进程创建了相应的节点,联合进程就可以开始计算了。
  • 场景:当有些操作需要所有参与者全部准备好之后才能开始执行,并且对每个参与者来说必须等待所有参与者全部执行完毕,才算执行完毕。于是就需要一个屏障,来控制所有参与者同时开始,并等待所有参与者全部结束。

1.2 设计

  • 创建一个/b1的znode的持久化节点。
  • enter() 模拟往阻塞里增加执行进程(Join barrier)。往znode下增加子节点,并判断子节点数是否满足指定的个数n。若未满足条件则继续等待;反之则返回true。
  • leave() 模拟进程执行完毕后的离开(Wait until all reach barrier)。删除znode的子节点,并判断子节点是否大于0,若大于0则表示还有子进程没有执行完。

源码:

package com.agileluo.zookeeperdemo.barriers;  
import java.io.IOException;  
import java.net.InetAddress;  
import java.net.UnknownHostException;  
import java.nio.ByteBuffer;  
import java.util.List;  
import java.util.Random;  
import java.lang.Integer;  
import org.apache.commons.lang3.RandomStringUtils;  
import org.apache.zookeeper.CreateMode;  
import org.apache.zookeeper.KeeperException;  
import org.apache.zookeeper.WatchedEvent;  
import org.apache.zookeeper.Watcher;  
import org.apache.zookeeper.ZooKeeper;  
import org.apache.zookeeper.ZooDefs.Ids;  
import org.apache.zookeeper.data.Stat;  /**  * 1. Queue test * 1.1 Start a producer to create 100 elements *    java SyncPrimitive qTest localhost 100 p * 1.2 Start a consumer to consume 100 elements *    java SyncPrimitive qTest localhost 100 c * * 2.Barrier test * Start a barrier with 2 participants (start as many times as many participants you'd like to enter) *    java SyncPrimitive bTest localhost 2 */public class SyncPrimitive implements Watcher {  static ZooKeeper zk = null;  static Integer mutex;  String root;  static{  System.setProperty("zookeeper.sasl.client", "false");  }  SyncPrimitive(String address) {  if(zk == null){  try {  System.out.println("Starting ZK:");  zk = new ZooKeeper(address, 3000, this);  mutex = Integer.parseInt("-1");  System.out.println("Finished starting ZK: " + zk);  } catch (IOException e) {  System.out.println(e.toString());  zk = null;  }  }  //else mutex = new Integer(-1);  }  synchronized public void process(WatchedEvent event) {  synchronized (mutex) {  //System.out.println("Process: " + event.getType());  mutex.notify();  }  }  /**  * Barrier(阻塞原语)  *  A barrier is a primitive that enables a group of processes to synchronize the beginning and the end of a computation. The general idea of this implementation is to  *  have a barrier node that serves the purpose of being a parent for individual process nodes. Suppose that we call the barrier node &quot;/b1&quot;. Each process  *  &quot;p&quot; then creates a node &quot;/b1/p&quot;. Once enough processes have created their corresponding nodes, joined processes can start the computation.  *  阻塞是一个原语,它使一组进程能够同时开始计算。此实现的总体思想是拥有一个屏障节点,用于作为单个流程节点的父节点。  *  假设我们将障碍节点称为“/b1”。然后,每个进程“ p”创建一个节点“/b1/p”。一旦有足够多的进程创建了相应的节点,联合进程就可以开始计算了。  *  场景:当有些操作需要所有参与者全部准备好之后才能开始执行,并且对每个参与者来说必须等待所有参与者全部执行完毕,才算执行完毕。于是就需要一个屏障,来控制所有参与者同时开始,并等待所有参与者全部结束。  */  static public class Barrier extends SyncPrimitive {  //需要并行等待的子进程个数  int size;  /**  *  本参与者对应的子节点path  */        String name;  /**  * Barrier constructor         *         * @param address  * @param root  * @param size  */  Barrier(String address, String root, int size) {  super(address);  this.root = root;  this.size = size;  // Create barrier node(障碍节点必须是持久节点 CreateMode.PERSISTENT)  if (zk != null) {  try {  Stat s = zk.exists(root, false);  if (s == null) { // 如果根节点不存在,则创建  /**  *  zk.create(String path, byte[] data, List<ACL> acl, CreateMode createMode)                         *  第1个参数: barrier节点的path  *  第2个参数: barrier节点的data  *  第3个参数: barrier节点的权限  *  第4个参数: barrier 节点的类型,持久节点 CreateMode.PERSISTENT,子节点必须是临时节点。  */  zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT);  }  } catch (KeeperException e) {  System.out  .println("Keeper exception when instantiating queue: "  + e.toString());  } catch (InterruptedException e) {  System.out.println("Interrupted exception");  }  }  // My node name  try {  name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()+ ":"+ RandomStringUtils.randomAlphabetic(4));  } catch (UnknownHostException e) {  System.out.println(e.toString());  }  }  /**  * Join barrier         *         * @return         * @throws KeeperException  * @throws InterruptedException  */  boolean enter() throws KeeperException, InterruptedException{  zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,  CreateMode.EPHEMERAL); // EPHEMERAL 临时节点  while (true) {  synchronized (mutex) {  List<String> list = zk.getChildren(root, true);  if (list.size() < size) { //判断当前根下子节点的数量,若数量小于设定的进程数,则等待。  mutex.wait();  } else {  return true;  }  }  }  }  /**  * Wait until all reach barrier         *         * @return         * @throws KeeperException  * @throws InterruptedException  */  boolean leave() throws KeeperException, InterruptedException{  zk.delete(root + "/" + name, 0); //模拟进程完成任务,删除子节点。  while (true) {  synchronized (mutex) {  List<String> list = zk.getChildren(root, true);  if (list.size() > 0) { //只要还存在子节点,就说明还有任务没有完成。  mutex.wait();  } else {  return true;  }  }  }  }  }  /**  * Producer-Consumer queue     */    static public class Queue extends SyncPrimitive {  /**  * Constructor of producer-consumer queue         *         * @param address  * @param name  */  Queue(String address, String name) {  super(address);  this.root = name;  // Create ZK node name  if (zk != null) {  try {  Stat s = zk.exists(root, false);  if (s == null) {  zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT);  }  } catch (KeeperException e) {  System.out  .println("Keeper exception when instantiating queue: "  + e.toString());  } catch (InterruptedException e) {  System.out.println("Interrupted exception");  }  }  }  /**  * Add element to the queue.         *         * @param i  * @return  */  boolean produce(int i) throws KeeperException, InterruptedException{  ByteBuffer b = ByteBuffer.allocate(4);  byte[] value;  // Add child with value i  b.putInt(i);  value = b.array();  zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT_SEQUENTIAL);  return true;  }  /**  * Remove first element from the queue.         *         * @return         * @throws KeeperException  * @throws InterruptedException  */  int consume() throws KeeperException, InterruptedException{  int retvalue = -1;  Stat stat = null;  // Get the first element available  while (true) {  synchronized (mutex) {  List<String> list = zk.getChildren(root, true);  if (list.size() == 0) {  System.out.println("Going to wait");  mutex.wait();  } else {  Integer min = Integer.parseInt((list.get(0).substring(7)));  String minNode = list.get(0);  for(String s : list){  Integer tempValue = Integer.parseInt(s.substring(7));  //System.out.println("Temporary value: " + tempValue);  if(tempValue < min) {  min = tempValue;  minNode = s;  }  }  System.out.println("Temporary value: " + root + "/" + minNode);  byte[] b = zk.getData(root + "/" + minNode,  false, stat);  zk.delete(root + "/" + minNode, 0);  ByteBuffer buffer = ByteBuffer.wrap(b);  retvalue = buffer.getInt();  return retvalue;  }  }  }  }  }  public static void main(String args[]) {  if (args[0].equals("qTest"))  queueTest(args);  else  barrierTest(args);  }  public static void queueTest(String args[]) {  Queue q = new Queue(args[1], "/app1");  System.out.println("Input: " + args[1]);  int i;  Integer max = Integer.parseInt(args[2]+"");  if (args[3].equals("p")) {  System.out.println("Producer");  for (i = 0; i < max; i++)  try{  q.produce(10 + i);  } catch (KeeperException e){  } catch (InterruptedException e){  }  } else {  System.out.println("Consumer");  for (i = 0; i < max; i++) {  try{  int r = q.consume();  System.out.println("Item: " + r);  } catch (KeeperException e){  i--;  } catch (InterruptedException e){  }  }  }  }  public static void barrierTest(String args[]) {  Barrier b = new Barrier(args[1], "/b1", Integer.parseInt(args[2]+""));  try{  boolean flag = b.enter();  System.out.println("Entered barrier: " + args[2]);  if(!flag) System.out.println("Error when entering the barrier");  } catch (KeeperException e){  } catch (InterruptedException e){  }  // Generate random integer  Random rand = new Random();  int r = rand.nextInt(100);  // Loop for rand iterations  for (int i = 0; i < r; i++) {  try {  Thread.sleep(100);  } catch (InterruptedException e) {  }  }  try{  b.leave();  } catch (KeeperException e){  } catch (InterruptedException e){  }  System.out.println("Left barrier");  }  
}

1.3 测试步骤

  • 第1步,打包 ZookeeperDemo-0.0.1-SNAPSHOT.jar
<build>  <plugins>  <plugin>  <groupId>org.apache.maven.plugins</groupId>  <artifactId>maven-jar-plugin</artifactId>  <configuration>  <archive>  <manifest>  <addClasspath>true</addClasspath>  <mainClass>com.xx.zookeeperdemo.barriers.SyncPrimitive</mainClass> </manifest>  </archive>  </configuration>  </plugin>  </plugins>  
</build>
  • 第2步,jar包目录下打开命令窗口,并执行 java -jar .\ZookeeperDemo-0.0.1-SNAPSHOT.jar bTest 192.168.206.100:2181 3
    控制台输出:

执行后,查看zookeeper的znode情况:

  • 第3步,复制第2步操作,模拟启动第2个进程
    执行后,查看zookeeper的znode情况:

  • 第4步,复制第2步操作,模拟启动第3个进程
    执行后,第1个控制台输出:

第2个控制台输出:

第3个控制台输出:

然后所有进程在随机的整数时间后输出 Left barrier

查看zookeeper的znode情况: 所有子进程创建的临时子节点都已delete

1.4 结果

能实现多个进程之间的并行协同。

1.5 注意事项

  • 为了方便在同一台IP上模拟不同的进程,在官方提供的代码基础上增加了4位长度的随机字符串。

// 官方示例:
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());// 新增后的示例
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()+ ":"+ RandomStringUtils.randomAlphabetic(4));
  • 关闭SASL安全验证
static{  System.setProperty("zookeeper.sasl.client", "false");  
}

2. 队列

2.1 概念

模拟向同一队列生产/消费消息。

2.2 设计

生产消息: 往znode新增子节点。
消费消息: 往znode中取first子节点,然后删除子节点。

2.3 源码

/**  * Producer-Consumer queue */static public class Queue extends SyncPrimitive {  /**  * Constructor of producer-consumer queue     *     * @param address  * @param name  */  Queue(String address, String name) {  super(address);  this.root = name;  // Create ZK node name  if (zk != null) {  try {  Stat s = zk.exists(root, false);  if (s == null) {  zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT);  }  } catch (KeeperException e) {  System.out  .println("Keeper exception when instantiating queue: "  + e.toString());  } catch (InterruptedException e) {  System.out.println("Interrupted exception");  }  }  }  /**  * Add element to the queue.     *     * @param i  * @return  */  boolean produce(int i) throws KeeperException, InterruptedException{  ByteBuffer b = ByteBuffer.allocate(4);  byte[] value;  // Add child with value i  b.putInt(i);  value = b.array();  zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,  CreateMode.PERSISTENT_SEQUENTIAL);  return true;  }  /**  * Remove first element from the queue.     *     * @return     * @throws KeeperException  * @throws InterruptedException  */  int consume() throws KeeperException, InterruptedException{  int retvalue = -1;  Stat stat = null;  // Get the first element available  while (true) {  synchronized (mutex) {  List<String> list = zk.getChildren(root, true);  if (list.size() == 0) {  System.out.println("Going to wait");  mutex.wait();  } else {  Integer min = Integer.parseInt((list.get(0).substring(7)));  String minNode = list.get(0);  for(String s : list){  Integer tempValue = Integer.parseInt(s.substring(7));  //System.out.println("Temporary value: " + tempValue);  if(tempValue < min) {  min = tempValue;  minNode = s;  }  }  System.out.println("Temporary value: " + root + "/" + minNode);  byte[] b = zk.getData(root + "/" + minNode,  false, stat);  zk.delete(root + "/" + minNode, 0);  ByteBuffer buffer = ByteBuffer.wrap(b);  retvalue = buffer.getInt();  return retvalue;  }  }  }  }  
}

2.4 测试

生产消息: java SyncPrimitive qTest 192.168.206.100:2181 100 p
消费消息: java SyncPrimitive qTest 192.168.206.100:2181 100 c

2.5 结论

借助zookeeper实现消息队列的模拟。


http://www.ppmy.cn/server/110680.html

相关文章

jQuery基础——事件

写在前面 参考文献&#xff1a;莫振杰《从0到1&#xff1a;jQuery快速上手》 这期主讲事件。 事件基础 什么是事件&#xff1f; 有动作&#xff08;事件类型&#xff09;&#xff0c;有结果&#xff08;函数&#xff09;。 事件的组成&#xff1f; 事件主体 事件类型 事件过…

C_11_位段,共同体,枚举

位段 位段也称 位域 ​ 1 字节 8 位域 概述&#xff1a; 特殊的结构体 大小按位分配 示例1&#xff1a; struct packed_data {unsigned int a : 2; // 占2 位unsigned int a : 4; // 占4 位unsigned int a : 6; // 占6 位unsigned int i; // 占4字节 32位 1b8位 } data…

VLM 系列——phi3.5-Vision——论文解读

一、概述 1、是什么 论文全称《Phi-3 Technical Report: A Highly Capable Language Model Locally on Your Phone》 是一系列大型语言模型(LLM) & 多模态大型语言模型(MLLM)。其中LLM包括phi-3-mini 3.8B、phi-3-small 7B、phi-3-medium 14B,phi-3-mini可以轻松地在…

通过自定义注解、反射和AOP在Spring Boot中动态修改请求参数

在Spring Boot中&#xff0c;通过自定义注解、反射以及AOP&#xff08;面向切面编程&#xff09;来动态修改请求参数是一种高级且强大的技术组合&#xff0c;它允许开发者在不修改原始方法实现的情况下&#xff0c;对方法的执行过程进行干预和定制。这种技术通常用于日志记录、…

51单片机——模块化编程

1、模块化编程介绍 传统方式编程&#xff1a;所有的函数均放在main.c里&#xff0c;若使用的模块比较多&#xff0c;则一个文件内会有很多的代码&#xff0c;不利于代码的组织和管理&#xff0c;而且很影响编程者的思路。 模块化编程&#xff1a;把各个模块的代码放在不同的.…

【LeetCode】321. 拼接最大数(贪心+单调栈类型题)

在做帆软笔试的时候&#xff0c;最后一道题一直没想出来。 题目&#xff1a;在两个数组中选取 k 个元素&#xff0c;拼接一个最小数&#xff0c;并且要保证来自同一数组的元素顺序不发生改变。 搜索后发现是 LeetCode 321 拼接最大数的变形题&#xff0c;借此题学习一下。 4…

相似图像、相似商品检索的流程具体是什么样的?

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ 1. 数据收集和预处理&#xff1a; 首先&#xff0c;我们需要构建一个包含丰富图像或商品信息的数据库。针对每个图像或商品&#xff0c;我们需确保以下几点&#xff1a; 高质量的图像数据。相关的元数…

docker实战基础三(Docker基础命令)

Docker 实战案例:构建镜像、查看容器运行信息、查看镜像构建信息 在这个实战案例中,我们将详细介绍如何构建Docker镜像、查看容器运行信息以及查看镜像构建信息。这些知识点非常实用,可以帮助你在实际工作中更好地利用Docker进行开发和运维。 一、构建Docker镜像 1. 创建…