十三. Redis 应用问题和解决方案思想
文章目录
1. 缓存穿透
问题描述-如下图:
上图图解:
缓存穿透:当一个用户(黑客)访问一个不存在的 id 的数据信息,一开始走缓存(比如 Redis 数据缓存),查找不到该数据,就去后台的 DB 数据库当中(I / O )查找,还是查找不到,因为该数据本身就是不存在的。这个黑客就是这样不停不断的访问这样一个不存在的数据内容,就一直没有,因为 Redis缓存数据库当中没有该数据,就会去后端的DB数据库进行 I/O 上的查询。
这个黑客故意这样,查找一个不存在的用户信息,让其不走缓存(某种意义上就是穿透了我们的缓存),而是直接访问我们的后端 DB 数据库,一旦重复的请求并发量达到了一定的级别量度,就将我们后端的DB 数据库给压垮了。从而造成了,系统的崩坏。
缓存穿透的原因:
- key 对应的数据在数据源并不存在,每次针对该key 的请求,从缓存当中获取不到,请求就会去找后端的DB,这样请求都会压到 DB 数据源里面,可能压垮我们的数据源。
- 比如:用一个不存在的用户 id 获取用户信息,无论缓存还是数据库都没有该数据,若黑客利用此漏洞进行攻击可能压垮数据库。
- 也就是说:如果从存储层查不到该数据,则不会写入缓存,这将导致这个不存在的数据每次请求都要走到存储层去查询,这样就失去了缓存的意义了。因为我们的缓存的作用就是为了减少请求去访问我们后端的DB数据库。
缓存穿透的现象/表象:
- 应用服务器压力变大(因为我们无论是后端DB数据库,还是缓存 Redis 数据库,都无法即使返回这样不存在的数据信息,当大量请求访问这样一个不存在的数据,缓存数据库和DB数据库无法做到一个及时返回信息,就会造成大量的请求压至在应用服务器当中,给应用服务器造成非常大的压力)。
- Redis 命中率降低(因为大量的请求查找的数据都是不存在的,都是去后端 DB 数据库查找了,当然也是查找不到大量(因为该数据本身就不存在。))
- 一直查后端DB数据库(Redis 缓存找不到数据,就一直访问后端的DB 数据库。)
解决方案/思路:
- 对空值也进行缓存。
如果一个查询返回的数据为空,我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间,应该短一些,最长不要超过 5 分钟。
- 设置可访问的名单(白名单)
定义一个可以访问的名单,每次访问和白名单的 id 进行比较,如果访问 id 不在白名单里面,进行一个拦截,不允许访问,比如使用 bitmaps 进行一个实现。
- 采用布隆过滤器
布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。
- 进行实时监控
当发现 Redis 的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务。
2. 缓存击穿
问题描述-如图:
上图解图:
当一个用户(黑客)不断不停大量的访问一个 key 已经被 Redis 过期的一个数据信息,因为这个 key 信息已经被 Redis 设置为了过期了,那么就不可以从 Redis 当中拿到这个数据了,这个请求就会去找后端的 DB 数据库去查找这个 key(过期)的数据信息。然后这个黑客发送大量的请求去我们后端DB 数据库当中查找。这样一种现象被称之为“缓存击穿”了。
在举一个例子:当一个信息,我们暂时成为这个信息是热点信息,这个热点信息,前几天被大量的用户访问,成为了一个热点信息,同时这个时候,这个热点信息还没有在 Redis 当中过期。然后过了几天,这个热点信息,不在是热点了,Redis 同时也将这个热点信息给设置为了过期。然后,过了几天后,有一个特别特别有影响力的网红,将这个热点进行了一个大量的炒作,这个热点被再次成为了热点,这时候,大量用户访问这个被之前Redis 设置为了过期的 key 信息,突然被大量访问,因为被过期了,这些大量的请求,就去找了我们后端的 DB 数据库了。
缓存击穿的原因:
- key 对应的数据存在,但在 Redis 中过期了,此时若有大量并发请求过来,这些请求发现缓存过期,会从后端 DB 加载数据并回设到缓存,这时大并发的请求可能回瞬间把后端 DB 压垮。
- 比如某个热点数据,可能会在某些时间点,被超高并发地访问,容易出现缓存击穿。
缓存击穿的现象/表象:
解决方案/思路:
- 预先设置热门数据 。
在 Redis 高峰访问之前,把一些热门数据提前存入到 Redis 里面,加大这些热门数据的 key 的时长
- 实时调整:
现场监控哪些数据热门,实时调整 key 的过期时长。
- 使用锁:
上述流程图:解图:
3. 缓存雪崩
下面是我们没有发生缓存雪崩的正常状态的图示:
如下则是发生了缓存雪崩的情况图示:
上述图示解图:
当我们的Redis 当中,存在同一时刻大量的 key 缓存数据过期了,而刚刚好,这个时候,又来了大量的的请求,而这些大量的请求,刚刚好又是去访问上面我们大量过期的 key 的数据信息。这时候就导致了,这些大量的请求跑去后端的 DB 数据库查找了。这种现象被称之为是 缓存雪崩 。
缓存雪崩的原因:
- key 对应的数据存在,但在 redis 中过期了,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存。
- 这个时候大并发的请求可能回瞬间把后端 DB 压垮。
- 缓存雪崩于缓存击穿的区别在于:这里针对很多过期的 key 缓存,而缓存击穿是针对某一个 key 过期的缓存。
缓存雪崩的现象/表象
- 数据库访问压力变大,服务器崩溃
- 在极短时间内,访问大量的 key ,而这些 key 集中过期的
解决方案:
- 构建多级缓存架构:
Nginx 缓存 + redis 缓存 + 其他缓存(ehcache 等),这种方式开发/维护成本较高。
- 使用锁或队列 :
用加锁或者队列的方式保证来,确保不会有大量的线程对数据库一次性进行读写,从而避免失效时,大量的并发请求落到底层存储系统上。不适用高并发情况。
- 设置过期标志更新缓存:
记录缓存数据是否过期,如果过期会触发通知另外的线程在后台去更新实际 key 的缓存。
- 将缓存失效时间分散开
比如:我们可以在原有的失效时间基础上增加一个随机值,比如 1- 5分钟(秒的时间)随机,这样每一个缓存的过期时间的重复率就降低,就很难引发集体失效的事件。
小总结:
无论是缓存穿透,还是缓存击穿,缓存雪崩,导致的原因就是缓存失效了,导致不走缓存了,而是直接访问我们的后端 DB 数据库了,导致大量的请求走后端 DB 数据库,让 DB 数据库压力过大,而导致的崩盘了 。所以想要解决的问题核心就是:让请求尽量都走缓存,不要直接访问后端 DB 数据库。
4. 分布式锁
- 单体单击部署的系统被演化成分布式集群系统后
- 由于分布式系统多线程,多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制策略失效。
- 单纯的 Java API 并不能提供分布式锁的能力。
- 为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。
- 示意图(说明:我们探讨的分布式锁是针对分布式项目/架构而言)
单体结构项目-锁机制示意图:
解读:
单体项目,锁是在某个项目
当多个请求来的时,获取到锁,在进行业务操作
这个锁可以控制的范围只是当前项目
分布式/集群项目-锁机制简图:
解读:
分布式项目,锁需要控制多个子项目/子模块
当多个请求来的时,获取到锁,再进行业务操作
这锁的范围需要控制整个分布式项目
4.1 分布式锁主流实现方案:
分布式锁主流的实现方案:
每一种分布式锁解决方案都有各自的优缺点:
-
性能: Redis 最高
-
可靠性:Zookeeper 最高
-
这里我们讲解基于 Redis 实现分布式锁
4.2 Redis 实现分布式锁-基本实现
- 指令:
setnx key value
- setnx: 可以理解为上锁/加锁指令
- key 是锁的键
- value 是锁的值
一旦这个 key使用了 setnx 创建的,就上锁了,在整个 key 没有删除前,不能执行相同 key 的上锁指令,也不能修改这个被 setnx 上锁上的 value内容的值。
127.0.0.1:6379> setnx lock_1 "100"
- 指令:
del key
就是删除 key,可以理解成就是释放锁 。
127.0.0.1:6379> del lock_1
- 指令:
expire key seconds
- 给锁 key,设置过期时间
- 目的是防止死锁,
- 默认单位是
秒
127.0.0.1:6379> expire lock_1 30
- 指令:
ttl key
查看某个 key 的过期时间
127.0.0.1:6379> ttl lock_1
(integer) 28
127.0.0.1:6379> ttl lock_1
(integer) 27
127.0.0.1:6379> ttl lock_1
(integer) 20
127.0.0.1:6379> ttl lock_1
(integer) 9
127.0.0.1:6379> ttl lock_1
(integer) 3
127.0.0.1:6379> ttl lock_1
(integer) -2
- 指令:
set key value nx ex seconds
- 设置锁的同时,指定该锁的过期时间,防止死锁
- 这个指令是原子性的,防止
sentnx key value / expire key seconds
两条指令,中间执行被打断。- 过期时间到后,会自动删除。
127.0.0.1:6379> set lock_2 "200" nx ex 30
4.3 Redis 实现分布式锁-Java代码实现
- 需求说明/图解,编写代码,实现如下功能:
- 在 SpringBoot + Redis 实现分布式锁的使用
- 获取锁,key 为 lock,示意图如下:
第1种情况:
- 如果获取到该分布式锁、
- 就获取 key 为 num 的值,并对 num + 1,再更新 num 的值,并释放锁(key 为 Lock)
- 如果获取不到 key 为 num 的值,就直接返回。
第2种情况:
- 如果没有获取到该分布式锁。
- 休眠 100 毫秒,再尝试获取
具体代码实现如下:
在 pom.xml 文件当中导入相关的依赖的 jar 包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.6</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.rainbowsea</groupId><artifactId>redis_springboot</artifactId><version>1.0-SNAPSHOT</version><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><!-- 说明: 如果这里是 spring-boot-start 就改成如下 spring-boot-start-web--><artifactId>spring-boot-starter-web</artifactId></dependency><!-- redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- spring2.X 集成 redis 所需 common-pool--><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><!--不要带版本号,防止冲突--></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- json 转换的 jar 包依赖--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.13.2.2</version></dependency></dependencies><!-- 插件--><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
在 Redis 当中设置好对应需要的 num 的值。
127.0.0.1:6379> set num 0
server.port=9090
#Redis 服务器地址
spring.redis.host=192.168.76.147
#Redis 服务器连接端口
spring.redis.port=6379
#Redis 如果有密码,需要配置, 没有密码就不要写
spring.redis.password=rainbowsea
#Redis 数据库索引(默认为 0)
spring.redis.database=0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
编写Java业务逻辑代码如下:
java">package com.rainbowsea.redis.controller;import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;@RestController
@RequestMapping("/redisTest")
public class RedisTestController {// 装配 RedisTemplate@Resourceprivate RedisTemplate redisTemplate;// 编写方法,使用 Redis 分布式锁,完成对 key 为 num 的 + 1操作@GetMapping("/lock")public void lock() {// 1. 获取锁/设置锁 key -> lock : setnxBoolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "ok");if(lock) { // true ,说明获取锁/设置锁成功Object value = redisTemplate.opsForValue().get("num");// 判断返回的 value 是否有值if(value == null | !StringUtils.hasText(value.toString())) {return;}// 2. 有值,就将其转成 intint num = Integer.parseInt(value.toString());// 3. 将 num + 1 ,再重新设置回去redisTemplate.opsForValue().set("num",++num);// 4. 释放锁-lockredisTemplate.delete("lock");} else { // 获取锁失败,休眠 100 毫秒,再重新获取锁/设置锁try {Thread.sleep(100);lock(); // 递归回去,休眠结束重新发送新的请求} catch (InterruptedException e) {e.printStackTrace();}}}
}
使用 ab 工具完成测试:
ab -n 1000 -c 100 http://192.168.76.1:9090/redisTest/lock
4.4 实例:优化-设置锁的过期时间,防止死锁
为了防止,当业务逻辑出现了问题/异常,什么的导致中断了后续的操作,导致锁没有被成功释放,这里我们设置一个锁的过期时间,无论业务在这个时间点是否将锁释放了,就会将锁释放掉,给其他的请求使用。注意的是: 这个时间点一定要使业务上充足完成执行完的时间,设置的过期时间太短了,业务还没来得及完成锁就过期了。会出问题。
这里我们设置过期时间为 3 秒钟
java">package com.rainbowsea.redis.controller;import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;@RestController
@RequestMapping("/redisTest")
public class RedisTestController {// 装配 RedisTemplate@Resourceprivate RedisTemplate redisTemplate;// 编写方法,使用 Redis 分布式锁,完成对 key 为 num 的 + 1操作@GetMapping("/lock")public void lock() {// 1. 获取锁/设置锁 key -> lock : setnxBoolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "ok", 3,TimeUnit.SECONDS);if(lock) { // true ,说明获取锁/设置锁成功Object value = redisTemplate.opsForValue().get("num");// 判断返回的 value 是否有值if(value == null | !StringUtils.hasText(value.toString())) {return;}// 2. 有值,就将其转成 intint num = Integer.parseInt(value.toString());// 3. 将 num + 1 ,再重新设置回去redisTemplate.opsForValue().set("num",++num);// 4. 释放锁-lockredisTemplate.delete("lock");} else { // 获取锁失败,休眠 100 毫秒,再重新获取锁/设置锁try {Thread.sleep(100);lock(); // 递归回去,休眠结束重新发送新的请求} catch (InterruptedException e) {e.printStackTrace();}}}
}
修改了程序,重新再次启动程序,执行 DB 测试。
[root@localhost ~]# ab -n 1000 -c 100 http://192.168.76.1:9090/redisTest/lock
4.5 实例:优化- UUID防止误删错误锁
问题分析:
当A用户业务出现了网络异常,然后超时时间了,key 锁被 Redis 释放了,在这个锁被释放的瞬间,就有新的B用户进入,生成了一个新的锁,准备执行业务,但是这个时候,A用户的网络又好了,并将业务完成了,就删除了 lock 锁了。但是这个锁已经不是(它本身自己的锁了)而是,我们新B用户的锁。然后,它就将这个新B用户生产的锁该删除释放了。删除/释放了不是自己生成的锁 。
解决方案:
- 在获取锁的时候,给锁设置的值是唯一的 UUID
- 在释放锁的时候,判断释放的锁是不是我们自己生成的同一把锁。是就删除,不是就不删除释放。
- 造成这个问题的本质原因,就是因为删除操作缺乏原子性。
修改代码如下:
java">package com.rainbowsea.redis.controller;import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;@RestController
@RequestMapping("/redisTest")
public class RedisTestController {// 装配 RedisTemplate@Resourceprivate RedisTemplate redisTemplate;// 编写方法,使用 Redis 分布式锁,完成对 key 为 num 的 + 1操作@GetMapping("/lock")public void lock() {// 得到一个 UUID的值,作为锁的值String uuid = UUID.randomUUID().toString();// 1. 获取锁/设置锁 key -> lock : setnxBoolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);if (lock) { // true ,说明获取锁/设置锁成功Object value = redisTemplate.opsForValue().get("num");// 判断返回的 value 是否有值if (value == null | !StringUtils.hasText(value.toString())) {return;}// 2. 有值,就将其转成 intint num = Integer.parseInt(value.toString());// 3. 将 num + 1 ,再重新设置回去redisTemplate.opsForValue().set("num", ++num);// 4. 释放锁-lock// 为了防止误删除其他用户的锁,先判断当前的锁是不是前面获取的锁,如果相同,再释放,不相同不可以释放if (uuid.equals((String) redisTemplate.opsForValue().get("lock"))) {redisTemplate.delete("lock");}} else { // 获取锁失败,休眠 100 毫秒,再重新获取锁/设置锁try {Thread.sleep(100);lock(); // 递归回去,休眠结束重新发送新的请求} catch (InterruptedException e) {e.printStackTrace();}}}}
修改了程序,重新再次启动程序,执行 DB 测试。
[root@localhost ~]# ab -n 1000 -c 100 http://192.168.76.1:9090/redisTest/lock
4.6 实例:优化-LUA 脚本保证删除原子性
问题分析:
解决方案:
- 删除操作缺乏原子性
- 使用 Lua 脚本保证删除原子性
具体代码实现如下:
具体的 LUA 脚本如下:
if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end;
修改了程序,重新再次启动程序,执行 DB 测试。
[root@localhost ~]# ab -n 1000 -c 100 http://192.168.76.1:9090/redisTest/lock
4.7 注意事项和细节
- 定义锁的 key ,key 可以根据业务,分别设置,比如操作某商品,key 应该是为每个 sku 定义的,也就是每个 sku 有一把锁。
- 为了确保分布式锁可用,要确保锁的实现同时满足以下四个条件:
- 互斥性,在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其它客户端能加锁。
- 加锁和解锁必须是同一个客户端,A客户端不能把B客户端加的锁给解了。
- 加锁和解锁必须具有原子性。
5. 最后:
“在这个最后的篇章中,我要表达我对每一位读者的感激之情。你们的关注和回复是我创作的动力源泉,我从你们身上吸取了无尽的灵感与勇气。我会将你们的鼓励留在心底,继续在其他的领域奋斗。感谢你们,我们总会在某个时刻再次相遇。”