一.redisson配置注意点
redis集群配置,三主三从,以下是redisson的示例配置
Config config = new Config();BaseConfig baseConfig;baseConfig = config.setCodec(JsonJacksonCodec.INSTANCE).useClusterServers().addNodeAddress("redis://172.21.75.166:6379","redis://172.21.75.172:6379","redis://172.21.75.164:6379","redis://172.21.75.148:6379","redis://172.21.75.26:6379","redis://172.21.75.173:6379").setMasterConnectionPoolSize(64) //主节点连接池大小,默认为64.setMasterConnectionMinimumIdleSize(24) //主节点最小空闲连接数,默认24.setSlaveConnectionPoolSize(64) //从节点连接池大小,默认为64.setSlaveConnectionMinimumIdleSize(24) //从节点最小空闲连接数,默认24.setSubscriptionConnectionPoolSize(50) //发布和订阅连接池大小,默认50.setSubscriptionConnectionMinimumIdleSize(1) //发布和订阅连接的最小空闲连接数,默认为1.setReadMode(ReadMode.MASTER)//读取操作的负载均衡模式.setScanInterval(2000);//对主节点变化节点状态扫描的时间间隔,单位毫秒String password = "123232111";baseConfig.setPassword(password);baseConfig.setTimeout(3000)//命令等待超时,单位毫秒.setRetryAttempts(3)//命令失败重试次数.setRetryInterval(1000)//命令重试发送时间间隔,单位毫秒//**此项务必设置.为解决redisson bug(timeout问题)的关键*****.setPingConnectionInterval(1000);//得到redisson对象redisson = (Redisson) Redisson.create(config);
设置只对redis主节点读写,不在从节点读,避免从节点不稳定影响。
主从变化,正常情况redisson连接能感知并根据连接池配置重新配置连接数。
如果发生连接错误,不要只通过reids命令看集群状态,建议去看看redis-server的日志,日志能看到集群的历史变动信息,如某个节点超时、不正常、主从切换信息。
二.锁使用API
trylock
trylock尝试获取锁,加锁成功返回true,失败false。
waitTime尝试时长,waitTime内无法加锁则返回false。如果为0,则是不等待。
leaseTime,锁有效期,有效期过后,其他线程就能加锁。为-1是一直持有,不自动释放。
RLock myLock = redisson.getLock(key);
Boolean flag = myLock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
lock
lock方法会阻塞线程,一直等待加锁成功。
lock有个watchdog机制,本身锁的有效期是30s,每10s检测一次任务是否执行完,没执行完会重新设置锁有效期为30s。所以watchdog有两个作用:1.不像trylock设置leasetime,万一任务没执行完就释放锁,造成线程不安全。2.如果线程异常退出,锁在30s后会自动释放,不会造成无法释放的情况。
RLock myLock = redisson.getLock(key);
myLock.lock();
锁重入
redisson的锁支持重入。什么情况下用到重入?
举例场景:封装了方法a,其中加锁了,方法b通过调用a完成业务,方法c也调用a,但方法c在调用a前已经加锁。方法b就没有重入,方法c相当于加锁两次就是重入。
重入在方法封装、方法嵌套调用很常见。为啥要这么用?
举例:方法a加锁用的是trylock,即可能加锁失败,方法b的业务也就可能失败,但如果方法c的业务一定要成功,那方法c就可以提前加锁用 lock方法,再调用方法a时加锁肯定成功,这样保证了方法c和a一定能成功。
锁中断
trylock在等待加锁时,可以被中断,只要当前线程设置为中断状态,trylock就会抛出中断异常。
举例场景:使用ThreadPoolTaskScheduler做定时任务,定时任务中使用trylock,现在把定时任务停止,就把正在执行的线程设置为中断状态(注意:正在执行的任务不会立刻停止、而是正常执行完只是不会开启下一次任务)。 这种情况下正在执行的线程执行到trylock就会抛出中断异常。
很多方法都可以被中断,比如sleep、uture.get、io流,有的方法被中断后会清除线程中断状态,有的则不会。如sleep方法被中断后线程会清除中断状态,Futrue.get也会清除中断状态,trylock则不会清除中断状态。
公平锁
公平锁让先等待锁的线程优先加锁,但原理更复杂,涉及在redis存的数据结构不止一个hash,还有两个队列。这里不讲了。
三.根据业务情况选择api
1.一定要执行成功的业务,可以使用lock,一直等待锁
2.非必须业务,如定时任务,反正会有下一次定时,可以使用trylock,只等待waitTime参数的时间、甚至不等待设置为0。
3.谨慎使用leaseTime参数,设置后watchdog不生效,到了leaseTime后、即使业务没执行结束,锁也会释放,如果此时其他线程加锁,可能造成线程不安全。
4.正常情况下记得解锁。
不解锁的话,过了有效期会自动释放。这里记录一个不解锁的例子,使用ThreadPoolTaskScheduler线程做了定时任务,多实例情况下任务无法按照规定间隔执行,在不采用分布式定时任务框架下,要完成分布式定时目标。这里给定时任务加了锁,锁有效期10s,但不主动解锁,这样保证即使多个实例,在10s内也只有一个实例会执行定时任务。
四.锁数据结构
redisson用的不是setnx这类指令,很明显利用单条指令的原子性,不能实现重入、线程标识等功能。
锁数据结构是redis的hash结构,<锁名称,<线程标识,锁次数>>。
由于redis单线程,执行一段lua脚本也是原子性,redisson执行的是一段lua脚本。以下是截取的代码。
- 先执行exists命令查询锁是否存在,不存在则使用hincrby加锁,然后pexpire指令设置时间
- 如果exists查询锁存在,则使用hexists查询线程标识是否是本线程(ARGV[2]),是则说明是锁重入,继续用hincrby给锁次数+1
- 如果查询锁存在,并且非本线程加锁,说明其他线程占用了锁,则执行pttl查看锁有效期。再往后有等待锁等逻辑
if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +
"return redis.call('pttl', KEYS[1]);"
五.锁拆分
1.不同的业务用不同的锁,这样可以避免锁竞争激烈,如果都用一把锁,则并发时相当于串行工作,无法发挥多线程作用。
2.涉及到map类型结构存储时,可以考虑用redis的hash结构存储,redisson的RMap可以单独对key加锁,大幅度减小锁竞争。
六.死锁
当业务拆分锁,方法嵌套调用,容易出现死锁情况。
举例场景:方法a和b加不同锁,接口1先调用a再调用b,接口2先调用b在调用a,就可能死锁。
1.约定锁顺序,比如必须先a锁再b锁,不能违反约定,调整代码解决。只要锁顺序相同,就不会有死锁。
2.将锁合并,锁粒度粗,死锁概率就小,这样锁竞争会变大。