springboot集成zookeeper的增删改查、节点监听、分布式读写锁、分布式计数器
ZooKeeper 是一个开源的分布式协调服务,由 Apache 软件基金会维护。它主要用于解决分布式系统中的一致性问题,提供高可用性和高性能的分布式数据管理服务。ZooKeeper 的设计目标是简化分布式应用的开发,帮助开发者处理分布式环境中的复杂问题,如配置管理、命名服务、分布式同步和组服务等。
ZooKeeper 的核心概念
1、ZNode
-
ZooKeeper 的数据模型类似于文件系统,数据存储在节点(ZNode)中。
-
ZNode 可以存储数据,并且可以有子节点,形成一个树状结构。
-
ZNode 分为持久节点(Persistent)和临时节点(Ephemeral),临时节点在客户端会话结束后会自动删除。
2、Watcher
-
Watcher 是 ZooKeeper 的一种通知机制,客户端可以在 ZNode 上设置 Watcher,当 ZNode 发生变化时,ZooKeeper 会通知客户端。
-
Watcher 是一次性的,触发后需要重新设置。
3、Session
-
客户端与 ZooKeeper 服务器之间的连接称为会话(Session)。
-
会话有超时时间,如果客户端在超时时间内没有与服务器通信,会话将失效。
4、集群(Ensemble)
-
ZooKeeper 通常以集群形式部署,称为 Ensemble。
-
集群中的服务器通过 Zab 协议(ZooKeeper Atomic Broadcast)保持数据一致性。
ZooKeeper 的主要功能
1、配置管理
ZooKeeper 可以用于集中管理分布式系统的配置信息,所有节点共享同一份配置数据。
2、命名服务
ZooKeeper 可以用于实现分布式系统中的命名服务,帮助客户端查找资源。
3、分布式锁
ZooKeeper 提供了分布式锁的实现,用于控制多个节点对共享资源的访问。
4、领导者选举
ZooKeeper 可以用于实现分布式系统中的领导者选举,确保系统中只有一个节点负责协调任务。
5、分布式队列
ZooKeeper 提供了分布式队列的实现,用于在多个节点之间共享任务。
ZooKeeper 的架构
ZooKeeper 的架构通常包括以下几个组件:
1、客户端(Client)
客户端通过 ZooKeeper 提供的 API 与服务器进行交互。
2、服务器(Server)
ZooKeeper 服务器负责处理客户端的请求,维护数据的一致性。
服务器分为 Leader 和 Follower,Leader 负责处理写请求,Follower 负责处理读请求。
3、Zab 协议
Zab 协议是 ZooKeeper 的核心协议,用于保证数据的一致性和顺序性。
使用 ZooKeeper 的优势
1、高可用性:ZooKeeper 以集群形式部署,具有高可用性。
2、一致性:ZooKeeper 通过 Zab 协议保证数据的一致性。
3、简单易用:ZooKeeper 提供了简单的 API,易于集成到分布式系统中。
Curator Recipes 是 Apache Curator 框架中的一组高级 API,旨在简化分布式系统中常见模式的实现。Apache Curator 是一个用于 Apache ZooKeeper 的客户端库,而 ZooKeeper 是一个分布式协调服务,广泛用于分布式系统中的配置管理、领导者选举、分布式锁等场景。
Curator Recipes 提供了一系列现成的解决方案,帮助开发者快速实现复杂的分布式系统模式,包括:
1、分布式锁(Distributed Lock)
-
提供可重入锁和不可重入锁的实现。
-
适用于需要跨多个节点同步资源的场景。
2、领导者选举(Leader Election)
-
实现分布式系统中的领导者选举。
-
适用于需要单一节点负责协调任务的场景。
3、分布式屏障(Barrier)
-
实现分布式系统中的屏障同步。
-
适用于需要多个节点同时开始或结束任务的场景。
4、分布式计数器(Distributed Counter)
-
提供分布式环境下的计数器功能。
-
适用于需要跨节点共享计数器的场景。
5、分布式队列(Distributed Queue)
-
实现分布式环境下的队列功能。
-
适用于需要跨节点共享任务的场景。
6、分布式缓存(Distributed Cache)
-
提供分布式环境下的缓存功能。
-
适用于需要跨节点共享数据的场景。
7、分布式信号量(Distributed Semaphore)
-
实现分布式环境下的信号量功能。
-
适用于需要控制资源访问的场景。
8、节点监听(Node Watcher)
-
提供对 ZooKeeper 节点的监听功能。
-
适用于需要实时监控节点变化的场景。
使用 Curator Recipes 的优势
1、简化开发:Curator Recipes 提供了高级 API,开发者无需从头实现复杂的分布式模式。
2、可靠性:Curator 封装了 ZooKeeper 的复杂性,提供了更稳定和可靠的实现。
3、灵活性:支持多种分布式模式,适用于不同的应用场景。
zk 常用的功能
添加依赖
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>1.8</java.version><zookeeper.version>3.4.8</zookeeper.version><curator.version>2.11.1</curator.version></properties><dependencies><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>${zookeeper.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>${curator.version}</version></dependency></dependencies>
ZK 工具类
/*** zookeeper客户端*/
@Data
@Slf4j
public class ZkClient {private final Logger logger = LoggerFactory.getLogger(this.getClass());private CuratorFramework client;public TreeCache cache;private ZookeeperProperties zookeeperProperties;public ZkClient(ZookeeperProperties zookeeperProperties){this.zookeeperProperties = zookeeperProperties;}/*** 初始化zookeeper客户端*/public void init() {try{RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(),zookeeperProperties.getMaxRetries());Builder builder = CuratorFrameworkFactory.builder().connectString(zookeeperProperties.getServer()).retryPolicy(retryPolicy).sessionTimeoutMs( zookeeperProperties.getSessionTimeoutMs()).connectionTimeoutMs( zookeeperProperties.getConnectionTimeoutMs()).namespace( zookeeperProperties.getNamespace());if(StringUtils.isNotEmpty( zookeeperProperties.getDigest())){builder.authorization("digest", zookeeperProperties.getDigest().getBytes("UTF-8"));builder.aclProvider(new ACLProvider() {@Overridepublic List<ACL> getDefaultAcl() {return ZooDefs.Ids.CREATOR_ALL_ACL;}@Overridepublic List<ACL> getAclForPath(final String path) {return ZooDefs.Ids.CREATOR_ALL_ACL;}});}client = builder.build();client.start();initLocalCache("/test");// addConnectionStateListener();client.getConnectionStateListenable().addListener(new ConnectionStateListener() {public void stateChanged(CuratorFramework client, ConnectionState state) {if (state == ConnectionState.LOST) {//连接丢失logger.info("lost session with zookeeper");} else if (state == ConnectionState.CONNECTED) {//连接新建logger.info("connected with zookeeper");} else if (state == ConnectionState.RECONNECTED) {logger.info("reconnected with zookeeper");}}});}catch(Exception e){e.printStackTrace();}}/*** 初始化本地缓存* @param watchRootPath* @throws Exception*/private void initLocalCache(String watchRootPath) throws Exception {cache = new TreeCache(client, watchRootPath);TreeCacheListener listener = (client1, event) ->{log.info("event:" + event.getType() +" |path:" + (null != event.getData() ? event.getData().getPath() : null));if(event.getData()!=null && event.getData().getData()!=null){log.info("发生变化的节点内容为:" + new String(event.getData().getData()));}// client1.getData().};cache.getListenable().addListener(listener);cache.start();}public void stop() {client.close();}public CuratorFramework getClient() {return client;}/*** 创建节点* @param mode 节点类型* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除*4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。* @param path 节点名称* @param nodeData 节点数据*/public void createNode(CreateMode mode, String path , String nodeData) {try {//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8"));} catch (Exception e) {logger.error("注册出错", e);}}/*** 创建节点* @param mode 节点类型* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除* 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。* @param path 节点名称*/public void createNode(CreateMode mode,String path ) {try {//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);} catch (Exception e) {logger.error("注册出错", e);}}/*** 删除节点数据** @param path*/public void deleteNode(final String path) {try {deleteNode(path,true);} catch (Exception ex) {log.error("{}",ex);}}/*** 删除节点数据* @param path* @param deleteChildre 是否删除子节点*/public void deleteNode(final String path,Boolean deleteChildre){try {if(deleteChildre){//guaranteed()删除一个节点,强制保证删除,// 只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);}else{client.delete().guaranteed().forPath(path);}} catch (Exception e) {e.printStackTrace();}}/*** 设置指定节点的数据* @param path* @param datas*/public void setNodeData(String path, byte[] datas){try {client.setData().forPath(path, datas);}catch (Exception ex) {log.error("{}",ex);}}/*** 获取指定节点的数据* @param path* @return*/public byte[] getNodeData(String path){Byte[] bytes = null;try {if(cache != null){ChildData data = cache.getCurrentData(path);if(data != null){return data.getData();}}client.getData().forPath(path);return client.getData().forPath(path);}catch (Exception ex) {log.error("{}",ex);}return null;}/*** 获取数据时先同步* @param path* @return*/public byte[] synNodeData(String path){client.sync();return getNodeData( path);}/*** 判断路径是否存在** @param path* @return*/public boolean isExistNode(final String path) {client.sync();try {return null != client.checkExists().forPath(path);} catch (Exception ex) {return false;}}/*** 获取节点的子节点* @param path* @return*/public List<String> getChildren(String path) {List<String> childrenList = new ArrayList<>();try {childrenList = client.getChildren().forPath(path);} catch (Exception e) {logger.error("获取子节点出错", e);}return childrenList;}/*** 随机读取一个path子路径, "/"为根节点对应该namespace* 先从cache中读取,如果没有,再从zookeeper中查询* @param path* @return* @throws Exception*/public String getRandomData(String path) {try{Map<String,ChildData> cacheMap = cache.getCurrentChildren(path);if(cacheMap != null && cacheMap.size() > 0) {logger.debug("get random value from cache,path="+path);Collection<ChildData> values = cacheMap.values();List<ChildData> list = new ArrayList<>(values);Random rand = new Random();byte[] b = list.get(rand.nextInt(list.size())).getData();return new String(b,"utf-8");}if(isExistNode(path)) {logger.debug("path [{}] is not exists,return null",path);return null;} else {logger.debug("read random from zookeeper,path="+path);List<String> list = client.getChildren().forPath(path);if(list == null || list.size() == 0) {logger.debug("path [{}] has no children return null",path);return null;}Random rand = new Random();String child = list.get(rand.nextInt(list.size()));path = path + "/" + child;byte[] b = client.getData().forPath(path);String value = new String(b,"utf-8");return value;}}catch(Exception e){log.error("{}",e);}return null;}/*** 可重入共享锁 -- Shared Reentrant Lock* @param lockPath* @param time* @param dealWork 获取* @return*/public Object getSRLock(String lockPath,long time, SRLockDealCallback<?> dealWork){InterProcessMutex lock = new InterProcessMutex(client, lockPath);try {if (!lock.acquire(time, TimeUnit.SECONDS)) {log.error("get lock fail:{}", " could not acquire the lock");return null;}log.debug("{} get the lock",lockPath);Object b = dealWork.deal();return b;}catch(Exception e){log.error("{}", e);}finally{try {lock.release();} catch (Exception e) {//log.error("{}",e);}}return null;}/*** 获取读写锁* @param path* @return*/public InterProcessReadWriteLock getReadWriteLock(String path){InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);return readWriteLock;}/*** 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理*/ExecutorService pool = Executors.newFixedThreadPool(2);/*** 监听数据节点的变化情况* @param watchPath* @param listener*/public void watchPath(String watchPath,TreeCacheListener listener){// NodeCache nodeCache = new NodeCache(client, watchPath, false);TreeCache cache = new TreeCache(client, watchPath);cache.getListenable().addListener(listener,pool);try {cache.start();} catch (Exception e) {e.printStackTrace();}}
}
配置文件
zookeeper.enabled: true
#zookeeper.server: 47.106.106.53:9036,47.106.106.53:9037,47.106.106.53:9038
zookeeper.server: 10.10.2.137:2181,10.10.2.138:2181,10.10.2.139:2181
zookeeper.namespace: demo
zookeeper.digest: rt:rt #zkCli.sh acl 命令 addauth digest mpush
zookeeper.sessionTimeoutMs: 1000 #会话超时时间,单位为毫秒,默认60000ms,连接断开后,其它客户端还能请到临时节点的时间
zookeeper.connectionTimeoutMs: 6000 #连接创建超时时间,单位为毫秒
zookeeper.maxRetries: 3 #最大重试次数
zookeeper.baseSleepTimeMs: 1000 #初始sleep时间 ,毫秒
controller
@Api(tags="zookeeper基本操作")
@RequestMapping("/zk")
@RestController
@Slf4j
public class ZookeeperController {@Autowiredprivate ZkClient zkClient;@Autowiredprivate ZkClient zkClientTest;/*** 创建节点* @param type* @param znode* @return*/@ApiOperation(value = "创建节点",notes = "在命名空间下创建节点")@ApiImplicitParams({@ApiImplicitParam(name ="type",value = "节点类型:<br> 0 持久化节点<br> 1 临时节点<br> 2 持久顺序节点<br> 3 临时顺序节点",allowableValues = "0,1,2,3",defaultValue="3",paramType = "path",required = true,dataType = "Long"),@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "path",required = true,dataType = "String"),@ApiImplicitParam(name ="nodeData",value = "节点数据",paramType = "body",dataType = "String")})@RequestMapping(value = "/create/{type}/{znode}",method=RequestMethod.POST)private String create(@PathVariable Integer type,@PathVariable String znode,@RequestBody String nodeData){znode = "/" + znode;try {zkClient.createNode(CreateMode.fromFlag(type),znode,nodeData);} catch (KeeperException e) {e.printStackTrace();}return znode;}/*** 设置节点数据* @param znode* @return*/@ApiOperation(value = "设置节点数据",notes = "设置节点数据")@ApiImplicitParams({@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String"),@ApiImplicitParam(name ="nodeData",value = "节点数据",paramType = "query",required = true,dataType = "String")})@RequestMapping(value = "/update",method=RequestMethod.POST)public String update(@RequestBody String znode,@RequestParam String nodeData){znode = "/" + znode;zkClient.setNodeData(znode,nodeData.getBytes());return "sucess";}@ApiOperation(value = "删除节点",notes = "删除节点")@ApiImplicitParams({@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "query",required = true,dataType = "String")})@RequestMapping(value = "/delete",method=RequestMethod.GET)public String delete(@RequestParam String znode){znode = "/" + znode;zkClient.deleteNode(znode);return "success";}@ApiOperation(value = "查找节点的内容",notes = "查找节点的内容")@ApiImplicitParams({@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String")})@RequestMapping(value = "/find",method=RequestMethod.POST)public String find(@RequestBody String znode){znode = "/" + znode;byte[] b = zkClient.getNodeData(znode);return new String(b);}/*** 给节点添加读写锁* @param znode* @return*/@ApiOperation(value = "添加读写锁",notes = "写锁跟读锁互斥,读锁跟读锁共享")@ApiImplicitParams({@ApiImplicitParam(name ="lockType",value = "锁类型:<br> 0 写锁<br> 1 读锁",allowableValues = "0,1",defaultValue="0",paramType = "query",required = true,dataType = "Long"),@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "query",required = true,dataType = "String")})@RequestMapping(value = "/writeLock",method=RequestMethod.GET)public String readLock(@RequestParam Integer lockType,@RequestParam String znode){znode = "/" + znode;InterProcessReadWriteLock readWriteLock = zkClient.getReadWriteLock(znode);InterProcessMutex writeLock = readWriteLock.writeLock();InterProcessMutex readLock = readWriteLock.readLock();Runnable writeRunnable = ()->{try {System.out.println("------write lock-----------");writeLock.acquire();System.out.println("write acquire");Thread.sleep(10_000);System.out.println("write release");writeLock.release();} catch (Exception e) {e.printStackTrace();}};Runnable readRunnable = ()->{try {System.out.println("-------read lock----------");readLock.acquire();System.out.println("read acquire");Thread.sleep(20_000);System.out.println("read release");readLock.release();} catch (Exception e) {e.printStackTrace();}};if(lockType == 0 ){new Thread(writeRunnable).start();}else if(lockType == 1){new Thread(readRunnable).start();}return "success";}/*** 监听节点* @param znode* @return*/@ApiOperation(value = "监听节点",notes = "监控整个树上的所有节点")@ApiImplicitParams(@ApiImplicitParam(name ="znode",value = "节点名称",paramType = "body",required = true,dataType = "String"))@RequestMapping(value="/watchPath",method=RequestMethod.POST)public String watchPath(@RequestBody String znode){znode = "/" + znode;zkClient.watchPath(znode,(client1, event) ->{log.info("event:" + event.getType() +" |path:" + (null != event.getData() ? event.getData().getPath() : null));if(event.getData()!=null && event.getData().getData()!=null){log.info("发生变化的节点内容为:" + new String(event.getData().getData()));}});return "success";}/*** 测试计算器* 并发越高耗时越长* 要自己实现获取锁失败重试* @return*/@ApiOperation(value = "模拟分布式计数器",notes = "模拟分布式计数器")@RequestMapping(value="/counter",method=RequestMethod.POST)public String counter(@RequestBody String znode){SharedCount baseCount = new SharedCount(zkClientTest.getClient(), znode, 0);try {baseCount.start();//生成线程池ExecutorService executor = Executors.newCachedThreadPool();Consumer<SharedCount> consumer = (SharedCount count) -> {try {List<Callable<Boolean>> callList = new ArrayList<>();Callable<Boolean> call = () -> {boolean result = false;try {Long time = System.currentTimeMillis();while(!result){VersionedValue<Integer> oldVersion = baseCount.getVersionedValue();int newCnt = oldVersion.getValue() + 1;result = baseCount.trySetCount(oldVersion, newCnt);if(System.currentTimeMillis()-time>10_000||result){break;}TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100)+1);}} catch (Exception e) {}return result;};//5个线程for (int i = 0; i < 100; i++) {callList.add(call);}List<Future<Boolean>> futures = executor.invokeAll(callList);} catch (Exception e) {}};//测试分布式int类型的计数器consumer.accept(baseCount);System.out.println("final cnt : " + baseCount.getCount());} catch (Exception e) {e.printStackTrace();}return "success:"+baseCount.getCount();}/*** DistributedAtomicLong计数器可以自己设置重试的次数与间隔* 并发越高耗时越长* 要自己实现获取锁失败重试*/@ApiOperation(value = "模拟分布式计数器2",notes = "模拟分布式计数器2")@RequestMapping(value="/counter2",method=RequestMethod.POST)public String distributedCount(@RequestBody String znode) throws Exception {DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong(zkClientTest.getClient(), znode, new RetryNTimes(10, 30));//生成线程池ExecutorService executor = Executors.newCachedThreadPool();Consumer<DistributedAtomicLong> consumer = (DistributedAtomicLong count) -> {try {List<Callable<Boolean>> callList = new ArrayList<>();Callable<Boolean> call = () -> {boolean result = false;try {AtomicValue<Long> val = count.increment();System.out.println("old cnt: "+val.preValue()+" new cnt : "+ val.postValue()+" result:"+val.succeeded());result = val.succeeded();} catch (Exception e) {} finally {}return result;};//5个线程for (int i = 0; i < 500; i++) {callList.add(call);}List<Future<Boolean>> futures = executor.invokeAll(callList);} catch (Exception e) {}};consumer.accept(distributedAtomicLong);return "success:"+distributedAtomicLong.get().postValue();}/**** @return* @throws KeeperException*/@ApiOperation(value = "模拟服务注册和随机获取服务",notes = "模拟服务注册和随机获取服务")@RequestMapping(value="/serviceRegistry",method=RequestMethod.POST)public String serviceRegistry() throws KeeperException {//服务注册zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service1","http://1270.0.1:8001/");zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service2","http://1270.0.1:8002/");zkClient.createNode(CreateMode.fromFlag(1),"/test/hello_service3","http://1270.0.1:8003/");return zkClient.getRandomData("/test");}
}