基于Consul的分布式锁实现

news/2024/12/22 15:13:42/

我们在构建分布式系统的时候,经常需要控制对共享资源的互斥访问。这个时候我们就涉及到分布式锁(也称为全局锁)的实现,基于目前的各种工具,我们已经有了大量的实现方式,比如:基于Redis的实现、基于Zookeeper的实现。本文将介绍一种基于Consul 的Key/Value存储来实现分布式锁以及信号量的方法。

分布式锁实现

基于Consul的分布式锁主要利用Key/Value存储API中的acquire和release操作来实现。acquire和release操作是类似Check-And-Set的操作:

- acquire操作只有当锁不存在持有者时才会返回true,并且set设置的Value值,同时执行操作的session会持有对该Key的锁,否则就返回false

- release操作则是使用指定的session来释放某个Key的锁,如果指定的session无效,那么会返回false,否则就会set设置Value值,并返回true

具体实现中主要使用了这几个Key/Value的API:

- create session:https://www.consul.io/api/session.html#session_create

- delete session:https://www.consul.io/api/session.html#delete-session

- KV acquire/release:https://www.consul.io/api/kv.html#create-update-key

基本流程

具体实现

public class Lock {private static final String prefix = "lock/";  // 同步锁参数前缀private ConsulClient consulClient;private String sessionName;private String sessionId = null;private String lockKey;/**** @param consulClient* @param sessionName   同步锁的session名称* @param lockKey       同步锁在consul的KV存储中的Key路径,会自动增加prefix前缀,方便归类查询*/public Lock(ConsulClient consulClient, String sessionName, String lockKey) {this.consulClient = consulClient;this.sessionName = sessionName;this.lockKey = prefix + lockKey;}/*** 获取同步锁** @param block     是否阻塞,直到获取到锁为止* @return*/public Boolean lock(boolean block) {if (sessionId != null) {throw new RuntimeException(sessionId + " - Already locked!");}sessionId = createSession(sessionName);while(true) {PutParams putParams = new PutParams();putParams.setAcquireSession(sessionId);if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) {return true;} else if(block) {continue;} else {return false;}}}/*** 释放同步锁** @return*/public Boolean unlock() {PutParams putParams = new PutParams();putParams.setReleaseSession(sessionId);boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue();consulClient.sessionDestroy(sessionId, null);return result;}/*** 创建session* @param sessionName* @return*/private String createSession(String sessionName) {NewSession newSession = new NewSession();newSession.setName(sessionName);return consulClient.sessionCreate(newSession, null).getValue();}
}

复制

单元测试

下面单元测试的逻辑:通过线程的方式来模拟不同的分布式服务来竞争锁。多个处理线程同时以阻塞方式来申请分布式锁,当处理线程获得锁之后,Sleep一段随机事件,以模拟处理业务逻辑,处理完毕之后释放锁。

public class TestLock {private Logger logger = Logger.getLogger(getClass());@Testpublic void testLock() throws Exception  {new Thread(new LockRunner(1)).start();new Thread(new LockRunner(2)).start();new Thread(new LockRunner(3)).start();new Thread(new LockRunner(4)).start();new Thread(new LockRunner(5)).start();Thread.sleep(200000L);}class LockRunner implements Runnable {private Logger logger = Logger.getLogger(getClass());private int flag;public LockRunner(int flag) {this.flag = flag;}@Overridepublic void run() {Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key");try {if (lock.lock(true)) {logger.info("Thread " + flag + " start!");Thread.sleep(new Random().nextInt(3000L));logger.info("Thread " + flag + " end!");}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}} 
}

复制

单元测试执行结果如下:

2017-04-12 21:28:09,698 INFO  [Thread-0] LockRunner - Thread 1 start!
2017-04-12 21:28:12,717 INFO  [Thread-0] LockRunner - Thread 1 end!
2017-04-12 21:28:13,219 INFO  [Thread-2] LockRunner - Thread 3 start!
2017-04-12 21:28:15,672 INFO  [Thread-2] LockRunner - Thread 3 end!
2017-04-12 21:28:15,735 INFO  [Thread-1] LockRunner - Thread 2 start!
2017-04-12 21:28:17,788 INFO  [Thread-1] LockRunner - Thread 2 end!
2017-04-12 21:28:18,249 INFO  [Thread-4] LockRunner - Thread 5 start!
2017-04-12 21:28:19,573 INFO  [Thread-4] LockRunner - Thread 5 end!
2017-04-12 21:28:19,757 INFO  [Thread-3] LockRunner - Thread 4 start!
2017-04-12 21:28:21,353 INFO  [Thread-3] LockRunner - Thread 4 end!

复制

从测试结果我们可以看到,通过分布式锁的形式来控制并发时,多个同步操作只会有一个操作能够被执行,其他操作只有在等锁释放之后才有机会去执行,所以通过这样的分布式锁,我们可以控制共享资源同时只能被一个操作进行执行,以保障数据处理时的分布式并发问题。

 


http://www.ppmy.cn/news/598451.html

相关文章

lol服务器维护8月7,LOL8月7日更新维护到几点 英雄联盟8.7更新了哪些内容

LOL8月7日维护到什么时候?今天将迎来一次很大的更新,全服停机,很多小伙伴已经等不及调查时间了,要知道不能玩自己心爱的游戏简直就是度日如年啊。 英雄联盟在8.7迎来一个全服停机公告,这让很多热心玩家都迫不急的上游戏…

lol11号服务器维护到几点,lol2月11日维护到几点 2月11日lol维护更新公告

lol将要周二进行一次更新,好多小伙伴们都在问这次的更新时间,会维护到几点呢?有兴趣的小伙伴们就赶紧来和小编一起看看lol维护更新公告介绍吧! lol2月11日维护到几点 2月11日更新维护时长为9个小时(3:-12:00),本次更新…

LOL弗雷尔卓德服务器维护,英雄联盟4月28日维护到几点 部分大区模式维护公告...

英雄联盟4月28日维护到几点?英雄联盟维护内容是什么?英雄联盟今天维护到什么时候?想必有不少的小伙伴们都想知道吧,下面是英雄联盟部分大区模式维护公告,感兴趣的小伙伴们一起来看看吧。 英雄联盟4月28日维护到几点? 不管是哪一款游戏,游…

lol服务器维护9月30,英雄联盟4月30日更新维护几点结束_4月30日LOL10.9版本停机维护结束时间_3DM网游...

英雄联盟4月30日更新维护几点结束?英雄联盟在2020年4月30日停机更新至10.9版本,本次版本更新后FPX冠军皮肤和相关的活动,而且云顶之弈模式有大规模平衡性更新,大家等待更新结束后即可体验,但具体什么时候结束呢?下面小编为大家带…

10月15日lol服务器维护,lol10月15日维护到几点 英雄联盟2020年10月15日10.21版本维护结束时间...

lol10月15日维护到几点?英雄联盟在2020年10月15日停机维护更新至10.21版本,本次维护后上线了很多新皮肤,还有一些英雄改动等内容。想知道本次维护结束时间的玩家,下面99单机网小编带来了lol10.21版本维护结束时间,一起来看看吧。…

英雄联盟7月23日维修服务器,lol维护到几点今天 英雄联盟7月23日停机维护多长时间...

英雄联盟维护到几点今天?英雄联盟2020最新维护公告是什么?lol最新维护内容是什么?在英雄联盟这一游戏中,游戏的官方每隔一段时间都将会进行一次维护更新。想必有不少的小伙伴们都想知道吧,下面是英雄联盟2020最新维护公告,感兴趣的小伙伴们…

英雄联盟7月23日维修服务器,英雄联盟7月23日更新维护到几点结束 7月23日lol10.15版本更新维护结束时间...

英雄联盟7月23日更新维护到几点结束?英雄联盟在2020年7月23日停机更新10.15版本内容,在lol10.15版本中不仅有新英雄莉莉娅和新系列皮肤灵魂莲华推出,极限闪击模式也回归了。想体验新版本的内容吗,那么就需要等待更新结束,下面小编…

lol服务器维护8月6日,lol8月6日维护到几点 英雄联盟2020年8月6日10.16版本维护结束时间...

lol8月6日维护到几点?英雄联盟在2020年8月6日停机更新10.16版本,因此全服进行维护,在lol10.16版本中带来了新英雄永恩、新皮肤灵魂莲华五款,还有大量的英雄改动,待更新维护结束后即可体验。不知道几点结束的玩家,下面…