项目应用
应用1
redis分布式锁实现两个操作的原子性
需求:实现一人一单业务逻辑时(如果能走到这个逻辑,代表库存是充足的),我们需要
-
先查询订单
-
如果订单不存在即没有买过则创建订单
这两个步骤我们要保证是原子性的。在高并发情况下,如果多个线程都查询订单,发现订单都不存在,那么就会出现一人多单的问题。
@Transactional
public Result createVoucherOrder(Long voucherId) {// TODO 5.一人一单Long userId = UserHolder.getUser().getId();// TODO 5.1.查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// TODO 5.2.判断是否买过if (count > 0) {return Result.fail("用户已经购买过一次了");}// TODO 6.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {return Result.fail("库存不足");}// TODO 7.创建订单VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);save(voucherOrder);// TODO 8.返回订单idreturn Result.ok(orderId);
}
解决方案2:用lua脚本来保证多个步骤的原子性。
local voucherId = argv[1]
local userId = argv[2]
local stockKey = 'seckill:stock:'..voucherId
local orderKey = 'seckill:order:'..voucherId
-- 查询库存量
if(tonumber(redis.call('get',stockKey))<=0) thenreturn 1
end
-- 一人一单
if( redis.call('sismember',orderKey,userId)==1) thenreturn 2
end
-- 扣减库存、订单key中新增用户id
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
当redis执行这一段lua脚本时,是单线程的,保证了原子性。可以看到lua脚本里保证了查询缓存+操作缓存这两个步骤的原子性。
应用2
redis分布式锁实现互斥执行
缓存雪崩:大量缓存数据在同一时间过期,或者 Redis 故障宕机时,如果此时有大量的用户请求,都无法在 Redis 中处理,于是全部请求都直接访问数据库,从而导致数据库的压力骤增。
缓存击穿:如果缓存中的某个热点数据过期了,此时大量的请求访问了该热点数据,就无法从缓存中读取,直接访问数据库,数据库很容易就被高并发的请求冲垮。
以上两种情况都存在相似的问题:
-
大量请求查询缓存发现失效
-
接着查询数据库
-
将数据写入缓存
我们希望查询数据库是互斥执行的,因此我们使用redis分布式锁,保证只有一个请求查询数据库,而其他请求未获取到锁的,循环重试。但在获取锁之前要判断缓存是否已经有数据了,如果已经有数据了则不用再查询数据库了。
但是仔细分析,缓存雪崩是大量缓存失效,而缓存击穿仅仅是一个缓存失效。如果用redis分布式锁解决缓存雪崩的,那速度应该会很慢吧。
应用3
redis分布式锁防止同一任务重复执行
需求:一个被@Scheduled标识的定时任务,在每天的凌晨2点都会执行一次。如果在集群的环境下,比如两台机器都部署了当前的项目,那么就会出现这个定时任务在同一个时间点被执行了两次。
解决方法:采用redis分布锁,如果一台机器抢到了分布式锁,那么就执行定时任务,如果没抢到,不作处理。抢到分布式锁的机器执行完定时任务后,我们【不应该直接释放锁】,因为可能出现服务器时间不一致的问题,可能你释放锁之后,另外一个服务器到了自己该执行这个定时任务的时间后抢了锁,然后执行定时任务,这不是就又出现了重复执行的问题了吗?因此我们可以设置锁自动释放时间,比如我们设置两小时后再释放锁,服务器误差不可能超过两小时吧?总之思路就是【采用自动释放锁】。
在CacheService类(一个专门用来装着操作redis的各种方法)中定义加锁逻辑:
/*** 加锁*/
public boolean tryLock(String name,long expire){String key = name + "_lock";String token = UUID.randomUUID().toString();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, token, expire, TimeUnit.MINUTES);return BooleanUtils.isTrue(success);
}
@Override
@Scheduled(cron = "0 */1 * * * ?")//每一分钟执行一次
public void refreshToList() {boolean success = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);//锁自动释放时间// 如果抢到锁,则执行定时任务;如果未抢到锁,不作处理if(success){// TODO 1.获取zset中的所有keySet<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// TODO 2.根据key查出满足执行时间在【0-当前时间】的任务for (String futureKey : futureKeys) {Set<String> task = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
if(!task.isEmpty()){// TODO 3.将任务同步到list中,并把任务在zset中删除String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
cacheService.refreshWithPipeline(futureKey,topicKey,task);log.info("成功的将"+futureKey+"任务同步到"+topicKey+new Date(System.currentTimeMillis()));}}}
}