Zookeeper学习笔记
- 四、客户端命令
- 4.1、新增节点
- 4.2、查询节点信息
- 4.3、节点类型
- 4.4、更新节点
- 4.5、删除节点
- 4.6、监听器
- 五、SpringBOOT整合Zookeeper
- 六、写数据流程
- 6.1、写流程之写入请求直接发送给Leader节点
- 6.2、写流程之写入请求发送给follower节点
- 七、服务器动态上下线监听
- 7.1、案例
- 八、分布式锁
- 8.1、分析
- 8.1、案例
- 九、其他
- 9.1、生产集群安装多少k合适
四、客户端命令
在bin目录下 集群客户端启动命令
./zkCli.sh -server 192.168.3.34:2181
4.1、新增节点
命令格式:
# -s 为有序节点, -e 为临时节点
create [-s] [-e] path data
创建持久化节点并写入数据:
# 创建 hadoop节点,内容为123456
create /hadoop "123456"
# 读取节点内容
get /hadoop
创建持久化有序节点。此时创建出来的节点名称为:指定的节点名+自增序号:
#创建出来的节点名称为:指定的节点名+自增序号:# 此时创建出来的节点名称为/a0000000001
create -s /a "a"# 再创建/b时,节点名称为/b0000000002
create -s /b "b"
创建临时节点:
create -e /tmp "tmp"# 创建完之后,通过get /tmp可以查到
get /tmp# 使用quit退出当前会话
quit# 重新打开zkCli,get /tmp 找不到该节点
get /tmp
创建临时有序节点,可用于分布式锁:
# 创建的临时节点:/t0000000004
create -s -e /t "tt"
4.2、查询节点信息
节点数据信息
ls -s /
节点属性说明:
- cZxid:数据节点创建时的事务ID
- ctime:数据节点创建时的时间
- mZxid:数据节点最后一次更新时的事务ID
- pZxid:数据节点最后一次更新时的时间
- cversion:子节点的更改次数
- dataVersion:节点数据的更改次数
- aclVersion:节点的ACL的更改次数
- ephemeralOwner:如果节点是临时节点,则表示创建该节点的会话sessionID。如果节点是持久节点,则该属性值为0
- dataLength:数据内容的长度
numChildren:数据节点当前的子节点个数
4.3、节点类型
持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
- 持久化目录节点
客户端与Zookeeper断开连接后,该节点依旧存在 - 持久化顺序编号目录节点
客户端与Zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号由父节点维护 - 临时目录节点
客户端与Zookeeper断开连接后,该节点被删除 - 临时顺序编号目录节点
客户端与Zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号。让天下漫有难学的技
4.4、更新节点
set /hadoop "1234"
4.5、删除节点
delete /hadoop
4.6、监听器
1、监听原理详解
- 首先要有一个main()线程
- 在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet) ,一个负责监听(listener) 。
- 通过connect线程将注册的监听事件发送给Zookeeper。
- 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
- Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
- listener线程内部调用了process()方法。
监听节点数据的变化
get path [watch]
# 注册一次,只能监听一次。想再次监听,需要再次注册。
监听子节点增减的变化
ls path [watch]
五、SpringBOOT整合Zookeeper
SpringBOOT整合Zookeeper
六、写数据流程
6.1、写流程之写入请求直接发送给Leader节点
- 客户端想服务器leader写入数据
- leader向slave1写入
- slave1写完向leader发送ack表示自己写入完成
- 只要超过半数,就可以应答了 leader向客户端发送ack
- leader向slave2写入
- slave2写完向leader发送ack表示自己写入完成
6.2、写流程之写入请求发送给follower节点
- 客户端想服务器slave1写入数据
- slave1没有写入权限、将数据转发给leader
- leader写入后再向slave1写入
- slave1写完向leader发送ack表示自己写入完成
- 只要超过半数,就可以应答了 leader向客户端发送ack
- leader向slave2写入
- slave2写完向leader发送ack表示自己写入完成
七、服务器动态上下线监听
7.1、案例
先在客户端创建一个结点
create /servers "servers"
Server端 (创建3个 在 server.regist(“zk1”)改成不同的名字zk1、zk2、zk3)
import org.apache.zookeeper.*;
import java.io.IOException;public class DistributeServer01 {//,分割不能有空格 在程序里写connectString不可使用ip,必须使用主机名private String connectString = "master:2181,slave1:2181,slave2:2181";//超时时间private int sessionTimeout = 2000;//zk客户端private ZooKeeper zkClient;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeServer01 server=new DistributeServer01();//1.获取zk连接server.getConnect();//2.注册服务器到zk集群server.regist("zk1");//3.启动业务逻辑(睡觉)server.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void regist(String hostname) throws InterruptedException, KeeperException {String s = zkClient.create("/servers/", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname+"已经上线");}private void getConnect() throws IOException {zkClient= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}
}
Client端
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class DistributeClient {//,分割不能有空格 在程序里写connectString不可使用ip,必须使用主机名private String connectString = "master:2181,slave1:2181,slave2:2181";//超时时间private int sessionTimeout = 2000;//zk客户端private ZooKeeper zkClient;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeClient client=new DistributeClient();//1.获取zk连接client.getConnect();//2.监听/servers下面子节点的增加和删除client.getServerList();//3.业务逻辑(睡觉)client.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void getServerList() throws InterruptedException, KeeperException {List<String> children = zkClient.getChildren("/servers", true);ArrayList<String> servers = new ArrayList<>();for (String s:children){try {byte[] data = zkClient.getData("/servers/" + s, false, null);servers.add(new String(data, "utf-8"));}catch (Exception e){}}System.out.println(servers);}private void getConnect() throws IOException {zkClient= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}
}
逐步启动DistributeServer01 、DistributeServer02、DistributeServer013 启动一个看一下DistributeClient 的控制台
八、分布式锁
8.1、分析
- 接收到请求后,在/locks节点下创建一个临时顺序节点
- 判断自己是不是当前节点下最小的节点:是,获取到锁;不是,对前一个节点进行监听
- 获取到锁,处理完业务后,delete节点释放锁,然后下面的节点将收到通知,重复第二步判断
8.1、案例
DistributeLock
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class DistributeLock {//,分割不能有空格 在程序里写connectString不可使用ip,必须使用主机名private String connectString = "master:2181,slave1:2181,slave2:2181";//超时时间private int sessionTimeout = 2000;//zk客户端private ZooKeeper zkClient;private CountDownLatch countDownLatch=new CountDownLatch(1);private CountDownLatch waitDownLatch=new CountDownLatch(1);//前一个结点的路径private String waitPath;private String currentMode;public DistributeLock() throws IOException, InterruptedException, KeeperException {//获取连接zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// connectLatch如果连接上zk可以释放if (watchedEvent.getState() == Event.KeeperState.SyncConnected){countDownLatch.countDown();}// waitLatch需要释放if (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){waitDownLatch.countDown();}}});//等待zk连接成功countDownLatch.await();//判断根节点locks是否存在Stat exists = zkClient.exists("/locks", false);if(exists==null){//创建根节点zkClient.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}}//对zk加锁public void zkLock(){//创建临时带序号结点try {currentMode = zkClient.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//判断创建的结点是否是最小的结点 是获取锁,如果不是监听前一个结点List<String> children = zkClient.getChildren("/locks", false);//如果children只有一个值,那就直接获取锁;如果有多个节点,需要判断,谁最小if (children.size() == 1){return;}else{Collections.sort(children);//获取节点名称seq-00000000String thisNode = currentMode.substring( "/locks/".length());//通过seq-00000000获取该节点在children集合的位置int index = children.indexOf(thisNode) ;//判断if (index == -1 ){System.out.println("数据异常");}else if(index == 0){//就一个节点,可以获取锁了return;}else {waitPath="/locks/"+children.get(index-1);zkClient.getData(waitPath,true,null);//等待监听waitDownLatch.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}//对zk解锁public void UnzkLock(){//删除结点try {zkClient.delete(currentMode,-1);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}
}
DistributeLockTest
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributeLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {final DistributeLock distributeLock1 = new DistributeLock();final DistributeLock distributeLock2 = new DistributeLock();new Thread(()->{try {distributeLock1.zkLock();System.out.println("aaa线程获取锁");Thread.sleep(5000);distributeLock1.UnzkLock();System.out.println("aaa线程释放锁");} catch (InterruptedException e) {e.printStackTrace();}},"aaa").start();new Thread(()->{try {distributeLock2.zkLock();System.out.println("bbb线程获取锁");Thread.sleep(5000);distributeLock2.UnzkLock();System.out.println("bbb线程释放锁");} catch (InterruptedException e) {e.printStackTrace();}},"bbb").start();}
}
九、其他
9.1、生产集群安装多少k合适
安装奇数台
生产经验:
- 10台服务器:3台zk;
- 20台服务器:5台zk;
- 100台服务器:11台zk
- 200台服务器:11台zk
服务器台数多:好处,提高可靠性;坏处:提高通信延时