分布式锁—2.Redisson的可重入锁一

ops/2025/3/5 0:48:02/

大纲

1.Redisson可重入锁RedissonLock概述

2.可重入锁源码之创建RedissonClient实例

3.可重入锁源码之lua脚本加锁逻辑

4.可重入锁源码之WatchDog维持加锁逻辑

5.可重入锁源码之可重入加锁逻辑

6.可重入锁源码之锁的互斥阻塞逻辑

7.可重入锁源码之释放锁逻辑

8.可重入锁源码之获取锁超时与锁超时自动释放逻辑

9.可重入锁源码总结

1.Redisson可重入锁RedissonLock概述

(1)在pom.xml里引入依赖

(2)构建RedissonClient并使用Redisson

(3)Redisson可重入锁RedissonLock简单使用

(1)在pom.xml里引入依赖

<dependencies><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.16.8</version></dependency> 
</dependencies>

(2)构建RedissonClient并使用Redisson

参考官网中文文档,连接上3主3从的Redis Cluster。

//https://github.com/redisson/redisson/wiki/目录
public class Application {public static void main(String[] args) throws Exception {//连接3主3从的Redis CLusterConfig config = new Config();config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001").addNodeAddress("redis://192.168.1.110:7002").addNodeAddress("redis://192.168.1.110:7003").addNodeAddress("redis://192.168.1.111:7001").addNodeAddress("redis://192.168.1.111:7002").addNodeAddress("redis://192.168.1.111:7003");//创建RedissonClient实例RedissonClient redisson = Redisson.create(config);//获取可重入锁RLock lock = redisson.getLock("myLock");lock.lock();lock.unlock();RMap<String, Object> map = redisson.getMap("myMap");map.put("foo", "bar");  map = redisson.getMap("myMap");System.out.println(map.get("foo"));   }
}

(3)Redisson可重入锁RedissonLock简单使用

Redisson可重入锁RLock实现了java.util.concurrent.locks.Lock接口,同时还提供了异步(Async)、响应式(Reactive)和RxJava2标准的接口。

RLock lock = redisson.getLock("myLock");
//最常见的使用方法
lock.lock();

如果设置锁的超时时间不合理,导致超时时间已到时锁还没能主动释放,但实际上锁却被Redis节点通过过期时间释放了,这会有问题。

为了避免这种情况,Redisson内部提供了一个用来监控锁的WatchDog。WatchDog的作用是在Redisson实例被关闭前,不断地延长锁的有效期。

WatchDog检查锁的默认超时时间是30秒,可通过Config.lockWatchdogTimeout来指定。

RLock的tryLock方法提供了leaseTime参数来指定加锁的超时时间,超过这个时间后锁便自动被释放。

//如果没有主动释放锁的话,10秒后将会自动释放锁
lock.lock(10, TimeUnit.SECONDS);//加锁等待最多是100秒;加锁成功后如果没有主动释放锁的话,锁会在10秒后自动释放
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {try {...} finally {lock.unlock();}
}

RLock完全符合Java的Lock规范,即只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。如果需要其他进程也能解锁,那么可以使用分布式信号量Semaphore。

2.可重入锁源码之创建RedissonClient实例

(1)初始化与Redis的连接管理器ConnectionManager

(2)初始化Redis的命令执行器CommandExecutor

使用Redisson.create()方法可以根据配置创建一个RedissonClient实例,因为Redisson类会实现RedissonClient接口,而创建RedissonClient实例的主要工作其实就是:

一.初始化与Redis的连接管理器ConnectionManager

二.初始化Redis的命令执行器CommandExecutor

(1)初始化与Redis的连接管理器ConnectionManager

Redis的配置类Config会被封装在连接管理器ConnectionManager中,后续可以通过连接管理器ConnectionManager获取Redis的配置类Config。

public class Application {public static void main(String[] args) throws Exception {Config config = new Config();config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");//创建RedissonClient实例RedissonClient redisson = Redisson.create(config);...}
}//创建RedissonClient实例的源码
public class Redisson implements RedissonClient {protected final Config config;//Redis配置类protected final ConnectionManager connectionManager;//Redis的连接管理器protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器...public static RedissonClient create(Config config) {return new Redisson(config);}protected Redisson(Config config) {this.config = config;Config configCopy = new Config(config);//根据Redis配置类Config实例创建和Redis的连接管理器connectionManager = ConfigSupport.createConnectionManager(configCopy);RedissonObjectBuilder objectBuilder = null;if (config.isReferenceEnabled()) {objectBuilder = new RedissonObjectBuilder(this);}//创建Redis的命令执行器commandExecutor = new CommandSyncService(connectionManager, objectBuilder);evictionScheduler = new EvictionScheduler(commandExecutor);writeBehindService = new WriteBehindService(commandExecutor);}...
}public class ConfigSupport {...//创建Redis的连接管理器public static ConnectionManager createConnectionManager(Config configCopy) {//生成UUIDUUID id = UUID.randomUUID();...if (configCopy.getClusterServersConfig() != null) {validate(configCopy.getClusterServersConfig());//返回ClusterConnectionManager实例return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);}...}...
}public class ClusterConnectionManager extends MasterSlaveConnectionManager {public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {super(config, id);...this.natMapper = cfg.getNatMapper();//将Redis的配置类Config封装在ConnectionManager中this.config = create(cfg);initTimer(this.config);Throwable lastException = null;List<String> failedMasters = new ArrayList<String>();for (String address : cfg.getNodeAddresses()) {RedisURI addr = new RedisURI(address);//异步连接Redis节点CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());...//通过connectionFuture阻塞获取建立好的连接RedisConnection connection = connectionFuture.toCompletableFuture().join();...List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand);...CompletableFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);Collection<ClusterPartition> partitions = partitionsFuture.join();List<CompletableFuture<Void>> masterFutures = new ArrayList<>();for (ClusterPartition partition : partitions) {if (partition.isMasterFail()) {failedMasters.add(partition.getMasterAddress().toString());continue;}if (partition.getMasterAddress() == null) {throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have address.");}CompletableFuture<Void> masterFuture = addMasterEntry(partition, cfg);masterFutures.add(masterFuture);}CompletableFuture<Void> masterFuture = CompletableFuture.allOf(masterFutures.toArray(new CompletableFuture[0]));masterFuture.join();...}...}...
}public class MasterSlaveConnectionManager implements ConnectionManager {protected final String id;//初始化时为UUIDprivate final Map<RedisURI, RedisConnection> nodeConnections = new ConcurrentHashMap<>();...protected MasterSlaveConnectionManager(Config cfg, UUID id) {this.id = id.toString();//传入的是UUIDthis.cfg = cfg;...}protected final CompletionStage<RedisConnection> connectToNode(NodeType type, BaseConfig<?> cfg, RedisURI addr, String sslHostname) {RedisConnection conn = nodeConnections.get(addr);if (conn != null) {if (!conn.isActive()) {closeNodeConnection(conn);} else {return CompletableFuture.completedFuture(conn);}}//创建Redis客户端连接实例RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);//向Redis服务端发起异步连接请求,这个future会层层往外返回CompletionStage<RedisConnection> future = client.connectAsync();return future.thenCompose(connection -> {if (connection.isActive()) {if (!addr.isIP()) {RedisURI address = new RedisURI(addr.getScheme() + "://" + connection.getRedisClient().getAddr().getAddress().getHostAddress() + ":" + connection.getRedisClient().getAddr().getPort());nodeConnections.put(address, connection);}nodeConnections.put(addr, connection);return CompletableFuture.completedFuture(connection);} else {connection.closeAsync();CompletableFuture<RedisConnection> f = new CompletableFuture<>();f.completeExceptionally(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));return f;}});}//创建Redis客户端连接实例@Overridepublic RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);return RedisClient.create(redisConfig);}...
}//Redisson主要使用Netty去和Redis服务端建立连接
public final class RedisClient {private final Bootstrap bootstrap;private final Bootstrap pubSubBootstrap;...public static RedisClient create(RedisClientConfig config) {return new RedisClient(config);}private RedisClient(RedisClientConfig config) {...bootstrap = createBootstrap(copy, Type.PLAIN);pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);this.commandTimeout = copy.getCommandTimeout();}private Bootstrap createBootstrap(RedisClientConfig config, Type type) {Bootstrap bootstrap = new Bootstrap().resolver(config.getResolverGroup()).channel(config.getSocketChannelClass()).group(config.getGroup());bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());config.getNettyHook().afterBoostrapInitialization(bootstrap);return bootstrap;}//向Redis服务端发起异步连接请求public RFuture<RedisConnection> connectAsync() {CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {CompletableFuture<RedisConnection> r = new CompletableFuture<>();//Netty的Bootstrap发起连接ChannelFuture channelFuture = bootstrap.connect(res);channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(final ChannelFuture future) throws Exception {if (bootstrap.config().group().isShuttingDown()) {IllegalStateException cause = new IllegalStateException("RedisClient is shutdown");r.completeExceptionally(cause);return;}if (future.isSuccess()) {RedisConnection c = RedisConnection.getFrom(future.channel());c.getConnectionPromise().whenComplete((res, e) -> {bootstrap.config().group().execute(new Runnable() {@Overridepublic void run() {if (e == null) {if (!r.complete(c)) {c.closeAsync();}} else {r.completeExceptionally(e);c.closeAsync();}}});});} else {bootstrap.config().group().execute(new Runnable() {public void run() {r.completeExceptionally(future.cause());}});}}});return r;});return new CompletableFutureWrapper<>(f);}...
}

(2)初始化Redis的命令执行器CommandExecutor

首先,CommandSyncService继承自CommandAsyncService类。

而CommandAsyncService类实现了CommandExecutor接口。

然后,ConnectionManager连接管理器会封装在命令执行器CommandExecutor中。

所以,通过CommandExecutor命令执行器可以获取连接管理器ConnectionManager。

//Redis命令的同步执行器CommandSyncService
public class CommandSyncService extends CommandAsyncService implements CommandExecutor {//初始化CommandExecutorpublic CommandSyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT);}public <T, R> R read(String key, RedisCommand<T> command, Object... params) {return read(key, connectionManager.getCodec(), command, params);}public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {RFuture<R> res = readAsync(key, codec, command, params);return get(res);}public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);}public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {RFuture<R> res = evalReadAsync(key, codec, evalCommandType, script, keys, params);return get(res);}public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params);}public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {RFuture<R> res = evalWriteAsync(key, codec, evalCommandType, script, keys, params);return get(res);}
}//Redis命令的异步执行器CommandAsyncService
public class CommandAsyncService implements CommandAsyncExecutor {//Redis连接管理器final ConnectionManager connectionManager;final RedissonObjectBuilder objectBuilder;final RedissonObjectBuilder.ReferenceType referenceType;public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {this.connectionManager = connectionManager;this.objectBuilder = objectBuilder;this.referenceType = referenceType;}@Overridepublic <V> V getNow(CompletableFuture<V> future) {try {return future.getNow(null);} catch (Exception e) {return null;}}@Overridepublic <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {RFuture<R> res = readAsync(key, codec, command, params);return get(res);}@Overridepublic <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {NodeSource source = getNodeSource(key);return async(true, source, codec, command, params, false, false);}private NodeSource getNodeSource(String key) {int slot = connectionManager.calcSlot(key);return new NodeSource(slot);}public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {CompletableFuture<R> mainPromise = createPromise();RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);executor.execute();return new CompletableFutureWrapper<>(mainPromise);}@Overridepublic <V> V get(RFuture<V> future) {if (Thread.currentThread().getName().startsWith("redisson-netty")) {throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");}try {return future.toCompletableFuture().get();} catch (InterruptedException e) {future.cancel(true);Thread.currentThread().interrupt();throw new RedisException(e);} catch (ExecutionException e) {throw convertException(e);}}...
}

3.可重入锁源码之lua脚本加锁逻辑

(1)通过Redisson.getLock()方法获取一个RedissonLock实例

(2)加锁时的执行流程

(3)加锁时执行的lua脚本

(4)执行加锁lua脚本的命令执行器逻辑

(5)如何根据slot值获取对应的节点

(1)通过Redisson.getLock()方法获取一个RedissonLock实例

在Redisson.getLock()方法中,会传入命令执行器CommandExecutor来创建一个RedissonLock实例,而命令执行器CommandExecutor是在执行Redisson.create()方法时初始化好的,所以命令执行器CommandExecutor会被封装在RedissonLock实例中。

因此,通过RedissonLock实例可以获取一个命令执行器CommandExecutor,通过命令执行器CommandExecutor可获取连接管理器ConnectionManager,通过连接管理器ConnectionManager可获取Redis的配置信息类Config,通过Redis的配置信息类Config可以获取各种配置信息。

RedissonLock类继承自实现了RLock接口的RedissonBaseLock类。在RedissonLock的构造方法里面,有个internalLockLeaseTime变量,这个internalLockLeaseTime变量与WatchDog看门狗有关系。interlnalLockLeaseTime的默认值是30000毫秒,即30秒;

public class Application {public static void main(String[] args) throws Exception {Config config = new Config();config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");//创建RedissonClient实例RedissonClient redisson = Redisson.create(config);//获取可重入锁RLock lock = redisson.getLock("myLock");lock.lock();...}
}//创建Redisson实例
public class Redisson implements RedissonClient {protected final Config config;//Redis配置类protected final ConnectionManager connectionManager;//Redis的连接管理器protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器...public static RedissonClient create(Config config) {return new Redisson(config);}protected Redisson(Config config) {...//根据Redis配置类Config实例创建和Redis的连接管理器connectionManager = ConfigSupport.createConnectionManager(configCopy);//创建Redis的命令执行器commandExecutor = new CommandSyncService(connectionManager, objectBuilder);...}...@Overridepublic RLock getLock(String name) {return new RedissonLock(commandExecutor, name);}...
}//创建RedissonLock实例
//通过RedissonLock实例可以获取一个命令执行器CommandExecutor;
public class RedissonLock extends RedissonBaseLock {protected long internalLockLeaseTime;protected final LockPubSub pubSub;final CommandAsyncExecutor commandExecutor;public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;//与WatchDog有关的internalLockLeaseTime//通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager//通过连接管理器ConnectionManager可以获取Redis的配置信息类Config//通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}...
}//创建Redis的命令执行器
//通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
public class CommandAsyncService implements CommandAsyncExecutor {final ConnectionManager connectionManager;...public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {this.connectionManager = connectionManager;this.objectBuilder = objectBuilder;this.referenceType = referenceType;}@Overridepublic ConnectionManager getConnectionManager() {return connectionManager;}...
}//创建Redis的连接管理器
//通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
public class ClusterConnectionManager extends MasterSlaveConnectionManager {...public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {super(config, id);...}...
}//创建Redis的连接管理器
//通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
public class MasterSlaveConnectionManager implements ConnectionManager {private final Config cfg;protected final String id;//初始化时为UUID...protected MasterSlaveConnectionManager(Config cfg, UUID id) {this.id = id.toString();//传入的是UUIDthis.cfg = cfg;...}@Overridepublic Config getCfg() {return cfg;}...
}//配置信息类Config中的lockWatchdogTimeout变量初始化为30秒,该变量与WatchDog有关
public class Config {private long lockWatchdogTimeout = 30 * 1000;...//This parameter is only used if lock has been acquired without leaseTimeout parameter definition. //Lock expires after "lockWatchdogTimeout" if watchdog didn't extend it to next "lockWatchdogTimeout" time interval.//This prevents against infinity locked locks due to Redisson client crush or any other reason when lock can't be released in proper way.//Default is 30000 millisecondspublic Config setLockWatchdogTimeout(long lockWatchdogTimeout) {this.lockWatchdogTimeout = lockWatchdogTimeout;return this;}public long getLockWatchdogTimeout() {return lockWatchdogTimeout;}
}

默认情况下,调用RedissonLock.lock()方法加锁时,传入的leaseTime为-1。此时锁的超时时间会设为lockWatchdogTimeout默认的30秒,从而避免出现死锁的情况。

public class RedissonLock extends RedissonBaseLock {...//加锁@Overridepublic void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(-1, leaseTime, unit, threadId);...}//解锁@Overridepublic void unlock() {try {get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}}...
}

(2)加锁时的执行流程

首先会调用RedissonLock的tryAcquire()方法处理异步RFuture相关,然后调用RedissonLock的tryAcquireAsync()方法对执行脚本的结果进行处理,接着调用RedissonLock.tryLockInnerAsync方法执行加锁的lua脚本。

public class RedissonLock extends RedissonBaseLock {protected long internalLockLeaseTime;protected final LockPubSub pubSub;final CommandAsyncExecutor commandExecutor;public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;//与WatchDog有关的internalLockLeaseTime//通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager//通过连接管理器ConnectionManager可以获取Redis的配置信息类Config//通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}...//加锁@Overridepublic void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//线程ID,用来生成设置Hash的值long threadId = Thread.currentThread().getId();//尝试加锁,此时执行RedissonLock.lock()方法默认传入的leaseTime=-1Long ttl = tryAcquire(-1, leaseTime, unit, threadId);//ttl为null说明加锁成功if (ttl == null) {return;}//加锁失败时的处理CompletableFuture<RedissonLockEntry> future = subscribe(threadId);if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);} else {commandExecutor.syncSubscription(future);}try {while (true) {ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for messageif (ttl >= 0) {try {commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {if (interruptibly) {commandExecutor.getNow(future).getLatch().acquire();} else {commandExecutor.getNow(future).getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(commandExecutor.getNow(future), threadId);}}...private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {//默认下waitTime和leaseTime都是-1,下面调用的get方法是来自于RedissonObject的get()方法//可以理解为异步转同步:将异步的tryAcquireAsync通过get转同步return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime != -1) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//默认情况下,由于leaseTime=-1,所以会使用初始化RedissonLock实例时的internalLockLeaseTime//internalLockLeaseTime的默认值就是lockWatchdogTimeout的默认值,30秒ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {//加锁返回的ttlRemaining为null表示加锁成功if (ttlRemaining == null) {if (leaseTime != -1) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}//默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()),//锁的名字:KEYS[1]unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒getLockName(threadId)//ARGV[2],值为UUID + 线程ID);}...
}public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {final String id;final String entryName;final CommandAsyncExecutor commandExecutor;public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.id = commandExecutor.getConnectionManager().getId();this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.entryName = id + ":" + name;}protected String getLockName(long threadId) {return id + ":" + threadId;}...
}abstract class RedissonExpirable extends RedissonObject implements RExpirable {...RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {super(connectionManager, name);}...
}public abstract class RedissonObject implements RObject {protected final CommandAsyncExecutor commandExecutor;protected String name;protected final Codec codec;public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {this.codec = codec;this.commandExecutor = commandExecutor;if (name == null) {throw new NullPointerException("name can't be null");}setName(name);}...protected final <V> V get(RFuture<V> future) {//下面会调用CommandAsyncService.get()方法return commandExecutor.get(future);}...
}public class CommandAsyncService implements CommandAsyncExecutor {...@Overridepublic <V> V get(RFuture<V> future) {if (Thread.currentThread().getName().startsWith("redisson-netty")) {throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");}try {return future.toCompletableFuture().get();} catch (InterruptedException e) {future.cancel(true);Thread.currentThread().interrupt();throw new RedisException(e);} catch (ExecutionException e) {throw convertException(e);}}...
}

(3)加锁时执行的lua脚本

public class RedissonLock extends RedissonBaseLock {//默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()),//锁的名字:KEYS[1],比如"myLock"unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒getLockName(threadId)//ARGV[2],值为UUID + 线程ID);}...
}

首先执行Redis的exists命令,判断key为锁名的Hash值是否不存在,也就是判断key为锁名myLock的Hash值是否存在。

一.如果key为锁名的Hash值不存在,那么就进行如下加锁处理

首先通过Redis的hset命令设置一个key为锁名的Hash值。该Hash值的key为锁名,value是一个映射。也就是在value值中会有一个field为UUID + 线程ID,value为1的映射。比如:hset myLock UUID:ThreadID 1,lua脚本中的ARGV[2]就是由UUID + 线程ID组成的唯一值。

然后通过Redis的pexpire命令设置key为锁名的Hash值的过期时间,也就是设置key为锁名的Hash值的过期时间为30秒。比如:pexpire myLock 30000。所以默认情况下,myLock这个锁在30秒后就会自动过期。

二.如果key为锁名的Hash值存在,那么就执行如下判断处理

首先通过Redis的hexists命令判断在key为锁名的Hash值里,field为UUID + 线程ID的映射是否已经存在。

如果在key为锁名的Hash值里,field为UUID + 线程ID的映射存在,那么就通过Redis的hincrby命令,对field为UUID + 线程ID的value值进行递增1。比如:hincrby myLock UUID:ThreadID 1。也就是在key为myLock的Hash值里,把field为UUID:ThreadID的value值从1累加到2,发生这种情况的时候往往就是当前线程对锁进行了重入。接着执行:pexpire myLock 30000,再次将myLock的有效期设置为30秒。

如果在key为锁名的Hash值里,field为UUID + 线程ID的映射不存在,发生这种情况的时候往往就是其他线程获取不到这把锁而产生互斥。那么就通过Redis的pttl命令,返回key为锁名的Hash值的剩余存活时间,因为不同线程的ARGV[2]是不一样的,ARGV[2] = UUID + 线程ID。

(4)执行加锁lua脚本的命令执行器逻辑

在RedissonLock的tryLockInnerAsync()方法中,会通过RedissonBaseLock的evalWriteAsync()方法执行lua脚本,即通过CommandAsyncService的evalWriteAsync()方法执行lua脚本。

在CommandAsyncService的evalWriteAsync()方法中,首先会执行CommandAsyncService的getNodeSource()方法获取对应的节点。然后执行CommandAsyncService的evalAsync()方法来执行lua脚本。

在CommandAsyncService的getNodeSource()方法中,会根据key进行CRC16运算,然后再对16384取模,计算出key的slot值。然后根据这个slot值创建一个NodeSource实例进行返回。

在CommandAsyncService的evalAsync()方法中,会将获得的NodeSource实例封装到Redis执行器RedisExecutor里。然后执行RedisExecutor,实现将脚本请求发送给对应的Redis节点处理。

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {//从外部传入的:在创建实现了RedissonClient的Redisson实例时,初始化的命令执行器CommandExecutorfinal CommandAsyncExecutor commandExecutor;public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor = commandExecutor;this.id = commandExecutor.getConnectionManager().getId();this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.entryName = id + ":" + name;}...protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {//获取可用的节点,并继续封装一个命令执行器CommandBatchServiceMasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());int availableSlaves = entry.getAvailableSlaves();CommandBatchService executorService = createCommandBatchService(availableSlaves);//通过CommandAsyncService.evalWriteAsync方法执行lua脚本RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);if (commandExecutor instanceof CommandBatchService) {return result;}//异步执行然后获取结果RFuture<BatchResult<?>> future = executorService.executeAsync();CompletionStage<T> f = future.handle((res, ex) -> {if (ex == null && res.getSyncedSlaves() != availableSlaves) {throw new CompletionException(new IllegalStateException("Only " + res.getSyncedSlaves() + " of " + availableSlaves + " slaves were synced"));}return result.getNow();});return new CompletableFutureWrapper<>(f);}private CommandBatchService createCommandBatchService(int availableSlaves) {if (commandExecutor instanceof CommandBatchService) {return (CommandBatchService) commandExecutor;}BatchOptions options = BatchOptions.defaults().syncSlaves(availableSlaves, 1, TimeUnit.SECONDS);return new CommandBatchService(commandExecutor, options);}...
}public class CommandBatchService extends CommandAsyncService {...public CommandBatchService(CommandAsyncExecutor executor, BatchOptions options) {this(executor.getConnectionManager(), options, executor.getObjectBuilder(), RedissonObjectBuilder.ReferenceType.DEFAULT);}private CommandBatchService(ConnectionManager connectionManager, BatchOptions options, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {super(connectionManager, objectBuilder, referenceType);this.options = options;}...
}public class CommandAsyncService implements CommandAsyncExecutor {final ConnectionManager connectionManager;final RedissonObjectBuilder objectBuilder;final RedissonObjectBuilder.ReferenceType referenceType;public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {this.connectionManager = connectionManager;this.objectBuilder = objectBuilder;this.referenceType = referenceType;}...@Overridepublic <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {//获取key对应的节点NodeSource source = getNodeSource(key);//让对应的节点执行lua脚本请求return evalAsync(source, false, codec, evalCommandType, script, keys, false, params);}//获取key对应的Redis Cluster节点private NodeSource getNodeSource(String key) {//先计算key对应的slot值int slot = connectionManager.calcSlot(key);//返回节点实例return new NodeSource(slot);}//执行lua脚本private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, boolean noRetry, Object... params) {if (isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) {CompletableFuture<R> mainPromise = new CompletableFuture<>();Object[] pps = copy(params);CompletableFuture<R> promise = new CompletableFuture<>();String sha1 = calcSHA(script);RedisCommand cmd = new RedisCommand(evalCommandType, "EVALSHA");List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);args.add(sha1);args.add(keys.size());args.addAll(keys);args.addAll(Arrays.asList(params));//将根据key进行CRC16运算然后对16384取模获取到的NodeSource实例,封装到Redis执行器RedisExecutor中RedisExecutor<T, R> executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd, args.toArray(), promise, false, connectionManager, objectBuilder, referenceType, noRetry);//通过执行Redis执行器RedisExecutor,来实现将lua脚本请求发送给对应的Redis节点进行处理executor.execute();...}...}...
}public class ClusterConnectionManager extends MasterSlaveConnectionManager {public static final int MAX_SLOT = 16384;//Redis Cluster默认有16384个slot...//对key进行CRC16运算,然后再对16384取模@Overridepublic int calcSlot(String key) {if (key == null) {return 0;}int start = key.indexOf('{');if (start != -1) {int end = key.indexOf('}');if (end != -1 && start + 1 < end) {key = key.substring(start + 1, end);}}int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;return result;}...
}

(5)如何根据slot值获取对应的节点

因为最后会执行封装了NodeSource实例的RedisExecutor的excute()方法,而NodeSource实例中又会封装了锁名key对应的slot值,所以RedisExecutor的excute()方法可以通过getConnection()方法获取对应节点的连接。

其中RedisExecutor的getConnection()方法会调用到MasterSlaveConnectionManager的connectionWriteOp()方法,该方法又会通过调用ConnectionManager的getEntry()方法根据slot值获取节点,也就是由ClusterConnectionManager的getEntry()方法去获取Redis的主节点。

其实在初始化连接管理器ClusterConnectionManager时,就已经根据配置初始化好哪些slot映射到那个Redis主节点了。

public class RedisExecutor<V, R> {NodeSource source;...public void execute() {...//异步获取建立好的Redis连接CompletableFuture<RedisConnection> connectionFuture = getConnection().toCompletableFuture();...}protected CompletableFuture<RedisConnection> getConnection() {...connectionFuture = connectionManager.connectionWriteOp(source, command);return connectionFuture;}...
}public class MasterSlaveConnectionManager implements ConnectionManager {...@Overridepublic CompletableFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {MasterSlaveEntry entry = getEntry(source);...}private MasterSlaveEntry getEntry(NodeSource source) {if (source.getRedirect() != null) {return getEntry(source.getAddr());}MasterSlaveEntry entry = source.getEntry();if (source.getRedisClient() != null) {entry = getEntry(source.getRedisClient());}if (entry == null && source.getSlot() != null) {//根据slot获取Redis的主节点entry = getEntry(source.getSlot());}return entry;}...
}public class ClusterConnectionManager extends MasterSlaveConnectionManager {//slot和Redis主节点的原子映射数组private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<>(MAX_SLOT);//Redis客户端连接和Redis主节点的映射关系private final Map<RedisClient, MasterSlaveEntry> client2entry = new ConcurrentHashMap<>();...@Overridepublic MasterSlaveEntry getEntry(int slot) {//根据slot获取Redis的主节点return slot2entry.get(slot);}...//初始化连接管理器ClusterConnectionManager时//就已经根据配置初始化好那些slot映射到那个Redis主节点了public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {...for (String address : cfg.getNodeAddresses()) {...CompletableFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);Collection<ClusterPartition> partitions = partitionsFuture.join();List<CompletableFuture<Void>> masterFutures = new ArrayList<>();for (ClusterPartition partition : partitions) {...CompletableFuture<Void> masterFuture = addMasterEntry(partition, cfg);masterFutures.add(masterFuture);}...}...}private CompletableFuture<Void> addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {...CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName);connectionFuture.whenComplete((connection, ex1) -> {//成功连接时的处理if (ex1 != null) {log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());result.completeExceptionally(ex1);return;}MasterSlaveServersConfig config = create(cfg);config.setMasterAddress(partition.getMasterAddress().toString());//创建Redis的主节点MasterSlaveEntry entry;if (config.checkSkipSlavesInit()) {entry = new SingleEntry(ClusterConnectionManager.this, config);} else {Set<String> slaveAddresses = partition.getSlaveAddresses().stream().map(r -> r.toString()).collect(Collectors.toSet());config.setSlaveAddresses(slaveAddresses);entry = new MasterSlaveEntry(ClusterConnectionManager.this, config);}CompletableFuture<RedisClient> f = entry.setupMasterEntry(new RedisURI(config.getMasterAddress()), configEndpointHostName);f.whenComplete((masterClient, ex3) -> {if (ex3 != null) {log.error("Can't add master: " + partition.getMasterAddress() + " for slot ranges: " + partition.getSlotRanges(), ex3);result.completeExceptionally(ex3);return;}//为创建的Redis的主节点添加slot值for (Integer slot : partition.getSlots()) {addEntry(slot, entry);lastPartitions.put(slot, partition);}...});});...}//添加slot到对应节点的映射关系private void addEntry(Integer slot, MasterSlaveEntry entry) {MasterSlaveEntry oldEntry = slot2entry.getAndSet(slot, entry);if (oldEntry != entry) {entry.incReference();shutdownEntry(oldEntry);}client2entry.put(entry.getClient(), entry);}...
}


http://www.ppmy.cn/ops/163168.html

相关文章

本地部署 Traefik 的完整教程

Traefik 是一款现代化的反向代理和负载均衡工具,专为云原生环境设计。它支持自动服务发现、动态配置更新以及多种后端(如 Docker、Kubernetes、Consul 等)。本教程将指导你如何在本地部署 Traefik,并配置其作为反向代理和负载均衡器。 1. 准备工作 在开始之前,请确保你的…

常用的设计模式

设计模式是软件开发过程中针对反复出现的问题所总结归纳出的通用解决方案。以下为你介绍常见的设计模式&#xff0c;并结合常用框架给出相应示例。 创建型模式 创建型模式主要用于对象的创建过程&#xff0c;封装了对象创建的细节&#xff0c;提高了代码的灵活性和可维护性。…

PostgreSQL 创建表格

PostgreSQL 创建表格 在数据库管理中&#xff0c;表格&#xff08;Table&#xff09;是数据存储的基础。PostgreSQL作为一款强大的开源对象关系型数据库管理系统&#xff08;ORDBMS&#xff09;&#xff0c;创建表格是其最基本的功能之一。本文将详细讲解如何在PostgreSQL中创…

WebSocket相关技术

WebSocket 是一种网络通信协议&#xff0c;旨在通过单一的持久连接提供全双工、低延迟的通信。它与传统的 HTTP 协议不同&#xff0c;能够让客户端和服务器之间进行实时双向通信&#xff0c;而无需每次通信都重新建立连接。 WebSocket 的特点&#xff1a; 1. 全双工通信&#…

关于Hadoop集群部署打不开webUI界面问题

1.检查进程是否全部启动 在启动start-dfs.sh,start-yarn.sh等相关命令后&#xff0c;使用 jps 命令检查进程是否全部启动&#xff0c;比如&#xff1a; 确认进程全部启动后&#xff0c;看第2步 2.检查防火墙是否关闭&#xff0c;比如&#xff1a; 查看防火墙状态&#xff1…

扫描纸质文件转pdf---少页数+手机+电脑协作

针对手机上扫描软件扫描文件转pdf要收费的问题&#xff0c;提供一种在页数较少时的免费替代方案 。 实现方法&#xff1a;手机软件的免费功能将文件扫描并保存为图片电脑端在word中将图片拼成文档word转pdf 1.借助于“扫描全能王”APP可以免费扫描文件为图片的功能&#xff0…

Ubuntu 下 nginx-1.24.0 源码分析 - ngx_init_cycle 函数 - 详解(8)

详解&#xff08;8&#xff09; 初始化模块配置上下文&#xff08;conf_ctx&#xff09; cycle->conf_ctx ngx_pcalloc(pool, ngx_max_module * sizeof(void *));if (cycle->conf_ctx NULL) {ngx_destroy_pool(pool);return NULL;}1 分配模块配置上下文数组 cycle->…

在kali linux中kafka的配置和使用

官方文档 一、安装依赖 删除原有的jdk sudo apt remove --purge openjdk-\* sudo apt clean安装 Java (JDK 11) sudo apt install openjdk-11-jdk -y # 验证安装 java -version二、下载并解压 Kafka 下载 Kafka wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.t…