Redis篇--常见问题篇3--缓存击穿(数据查询上锁,异步操作,熔断降级,三种缓存问题综合优化策略)

embedded/2024/12/27 2:58:47/

1、缓存击穿

(1)、概述

缓存击穿是指某个热点数据在缓存中过期后,大量并发请求同时访问该数据,导致这些请求全部穿透到数据库,形成瞬间的高负载,给数据库和服务带来巨大的压力,甚至会崩溃。
这种情况通常发生在高并发场景下,尤其是在某些热点数据的缓存过期时。

示意图:
在这里插入图片描述

(2)、解决方案

1、加锁机制

缓存失效时,可以使用分布式锁来确保只有一个线程能够去查询数据库并更新缓存,其他线程等待锁释放后再从缓存中获取数据。这样可以避免多个线程同时查询数据库

示例如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import java.util.concurrent.TimeUnit;@Service
public class CacheService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate DataService dataService; // 假设这是你的数据服务,负责从数据库获取数据private static final String LOCK_KEY = "cache:lock";private static final long LOCK_EXPIRE_TIME = 5; // 锁的过期时间(秒)/*** 获取数据,使用分布式锁防止缓存击穿* @param key 数据的唯一标识* @return 数据*/public String getData(String key) {// 1. 尝试从缓存中获取数据String cachedData = (String) redisTemplate.opsForValue().get(key);if (cachedData != null) {return cachedData;}// 2. 尝试获取分布式锁Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY, "locked", LOCK_EXPIRETime, TimeUnit.SECONDS);if (lockAcquired) {try {// 3. 锁定成功,去数据库中加载数据String data = dataService.getDataByKey(key);if (data != null) {// 4. 将数据存入缓存redisTemplate.opsForValue().set(key, data, 60, TimeUnit.SECONDS); // 设置60秒的TTL}return data;} finally {// 5. 释放锁redisTemplate.delete(LOCK_KEY);}} else {// 6. 锁定失败,等待一段时间后重试try {Thread.sleep(100); // 等待 100 毫秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}// 7. 重试获取数据return getData(key);}}
}
2、异步更新缓存

缓存失效时,后端可以返回给前端一个标识,告诉前端继续使用之前的旧缓存数据,并异步更新缓存。这样可以保证用户不会感知到缓存失效,同时减轻数据库的压力。

注意:这种方式往往需要搭配其他策略一起实现,如搭配限流或熔断机制。因为击穿往往在很大并发量下才会出现,异步操作就会导致启动大量子线程去执行任务,虽然这些子线程可以被分配到不同的CPU核心去处理,但对于我们业务而言,实际上只需要一个处理成功即可,其他的线程都是多余的,造成CPU资源浪费,所以此时结合熔断或限流机制就非常合适。

代码示例1:异步操作,不采用熔断和限流
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;@Service
public class CacheService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate DataService dataService; // 假设这是你的数据服务,负责从数据库获取数据/*** 获取数据,使用异步加载防止缓存击穿* @param key 数据的唯一标识* @return 数据*/public String getData(String key) {// 1. 尝试从缓存中获取数据String cachedData = (String) redisTemplate.opsForValue().get(key);if (cachedData != null) {return cachedData;}// 2. 异步加载新数据CompletableFuture.supplyAsync(() -> dataService.getDataByKey(key)).thenAccept(data -> {   // data这里是返回值if (data != null) {// 3. 将新数据data存入缓存redisTemplate.opsForValue().set(key, data, 60, TimeUnit.SECONDS); // 设置 60 秒的 TTL}});// 4. 返回旧的缓存数据(如果有)return cachedData;}
}
代码示例2:异步操作,结合限流

限流可以作用在接口上,也可以作用在数据库的查询上,示例如下

  import io.github.resilience4j.ratelimiter.RateLimiter;import io.github.resilience4j.ratelimiter.RateLimiterConfig;import io.github.resilience4j.ratelimiter.RateLimiterRegistry;// 配置限流器,每秒最多允许 10 个异步任务RateLimiter rateLimiter = RateLimiter.of("cacheUpdateLimiter", RateLimiterConfig.custom().limitForPeriod(10) // 每秒最多 10 个请求.limitRefreshPeriod(Duration.ofSeconds(1)) // 每秒刷新一次.timeoutDuration(Duration.ofMillis(500)) // 超时时间为 500 毫秒.build());// 在异步更新缓存时使用限流器CompletableFuture.supplyAsync(() -> {if (rateLimiter.acquirePermission()) {    // 校验是否限流// 未达到限流限制,执行读取数据库获取数据的任务return dataService.getDataByKey(key);} else {// 超过限流限制,直接返回null或其他默认值return null;}}).thenAccept(data -> {if (data != null) {   // 返回数据不为空,即查询数据库有返回结果// 更新本地缓存和 RedislocalCache.put(key, data);redisTemplate.opsForValue().set(key, data, getRandomTtl(), TimeUnit.SECONDS);}});
代码示例3:异步操作,结合熔断
  import io.github.resilience4j.circuitbreaker.CircuitBreaker;import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;// 配置熔断器,当错误率达到 50% 时跳闸,等待 10 秒后重试CircuitBreaker circuitBreaker = CircuitBreaker.of("cacheUpdateCircuit",CircuitBreakerConfig.custom().failureRateThreshold(50) // 错误率达到 50% 时跳闸.waitDurationInOpenState(Duration.ofSeconds(10)) // 等待 10 秒后重试.slidingWindowSize(10) // 使用 10 个请求的滑动窗口.build());// 在异步更新缓存时使用熔断器CompletableFuture.supplyAsync(() -> {try {return circuitBreaker.executeSupplier(() -> dataService.getDataByKey(key));     // 使用熔断器监听查询数据库的任务结果,如果执行失败次数超出熔断器配置,熔断器会主动跳闸(抛出异常),以保护当前的服务正常运行。} catch (Exception e) {// 熔断器跳闸,返回 nullreturn null;}}).thenAccept(data -> {if (data != null) {      // 执行成功// 更新本地缓存和 RedislocalCache.put(key, data);redisTemplate.opsForValue().set(key, data, getRandomTtl(), TimeUnit.SECONDS);}});
3、多级缓存策略

使用双缓存策略,即一个主缓存和一个影子缓存。当主缓存失效时,先从影子缓存中获取数据,影子缓存的过期时间比主缓存稍长。这样可以减少缓存失效时的冲击。如,使用本地缓缓+分布式缓存的方式,示例可以参考缓存雪崩篇的代码示例。

4、缓存不过期,定时更新

对于某些非常重要的热点数据,可以考虑将其设置为永不过期的缓存。通过定时任务或后台线程定期更新这些数据,确保缓存中的数据始终是最新的。

示例代码:

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;@Service
public class CacheService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate DataService dataService; // 假设这是你的数据服务,负责从数据库获取数据/*** 定时更新热点数据*/@Scheduled(fixedRate = 60000) // 每分钟执行一次public void updateHotData() {// 1. 获取热点数据的 key 列表List<String> hotKeys = dataService.getHotKeys();// 2. 更新每个热点数据for (String key : hotKeys) {String data = dataService.getDataByKey(key);if (data != null) {// 3. 将数据存入缓存,设置较长的 TTLredisTemplate.opsForValue().set(key, data, 3600, TimeUnit.SECONDS); // 设置 1 小时的 TTL}}}
}

2、三种缓存问题综合优化策略

为了同时应对缓存穿透、缓存雪崩和缓存击穿,我们可以结合多种技术手段,构建一个健壮的缓存系统。

(1)、综合策略概述

缓存穿透:

  • 对于不存在的数据,缓存空结果(如null),并设置较短的过期时间。(推荐)
  • 使用布隆过滤器快速判断数据是否存在。(不太推荐,过滤器准确度以及需要维护成本)

缓存雪崩:

  • 为每个缓存项设置随机的过期时间,避免所有缓存项在同一时间点失效。(推荐)
  • 实现缓存预热,在系统启动时或定期加载常用数据到缓存中。(推荐)

缓存击穿:

  • 使用分布式锁确保只有一个线程去查询数据库并更新缓存,其他线程等待锁释放后再从缓存中获取数据。(推荐)
  • 实现 双缓存策略,即主缓存和影子缓存,减少缓存失效时的冲击。
  • 异步更新缓存,确保用户不会感知到缓存失效。(可以在锁机制的基础上,适当添加限流或熔断机制)

(2)、代码示例

本例仅以结合以上4中优化方式为例。

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;@Service
public class ComprehensiveCacheService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;// Redisson 客户端用于分布式锁@Autowiredprivate RedissonClient redissonClient;// 主缓存前缀private static final String MAIN_CACHE_PREFIX = "main:";// 模拟数据库查询private Object queryFromDatabase(String key) {// 模拟数据库查询逻辑System.out.println("Querying from database for key: " + key);return "data-" + key; // 返回模拟数据}/*** 获取数据,综合处理缓存穿透、缓存雪崩和缓存击穿问题**/public Object getData(String key) {// 1、尝试从主缓存中获取数据Object mainCachedValue = redisTemplate.opsForValue().get(MAIN_CACHE_PREFIX + key);if (mainCachedValue != null) {return mainCachedValue;}// 2、使用分布式锁,确保只有一个线程去查询数据库,防止击穿RLock lock = redissonClient.getLock(key);try {boolean isLocked = lock.tryLock(10, 100, TimeUnit.MILLISECONDS);if (!isLocked) {// 如果获取锁失败,直接返回 null 或者旧的缓存数据return null;}// 3、再次检查缓存,防止其他线程已经更新了缓存mainCachedValue = redisTemplate.opsForValue().get(MAIN_CACHE_PREFIX + key);if (mainCachedValue != null) {return mainCachedValue;}// 4、如果缓存中没有数据,查询数据库Object dbValue = queryFromDatabase(key);// 5、更新缓存,设置相对随机的过期时间,防止雪崩int randomExpiration = ThreadLocalRandom.current().nextInt(500, 700); // 随机 500-700 秒redisTemplate.opsForValue().set(MAIN_CACHE_PREFIX + key, dbValue, randomExpiration, TimeUnit.SECONDS); // 6、如果数据库中没有数据,缓存空对象结果60秒,防止穿透if (dbValue == null) {redisTemplate.opsForValue().set(MAIN_CACHE_PREFIX + key, "NULL", 60, TimeUnit.SECONDS); // 缓存空结果60秒return null;}return dbValue;} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("Failed to acquire lock.", e);} finally {// 7、释放锁if (lock.isHeldByCurrentThread()) {lock.unlock();}}}/*** 缓存预热方法,可以通过一些项目初始化执行的方式执行,进行预热**/@PostConstructpublic void warmUpCache() {// 模拟预热一些常用的数据for (int i = 1; i <= 100; i++) {String key = "key-" + i;Object dbValue = queryFromDatabase(key);int randomExpiration = ThreadLocalRandom.current().nextInt(500, 700); // 随机 500-700 秒redisTemplate.opsForValue().set(MAIN_CACHE_PREFIX + key, dbValue, randomExpiration, TimeUnit.SECONDS); }System.out.println("Cache pre-warmed with 100 keys.");}
}

附录:

熔断器介绍:

1、概述

在高并发场景下,熔断器(Circuit Breaker)的作用是保护系统免受故障服务的影响。当后端服务出现故障时,熔断器会根据配置的规则自动跳闸,阻止更多的请求继续穿透到故障服务,从而避免系统过载或崩溃。随着故障的恢复,熔断器会逐渐从"打开"状态进入"半开"状态,尝试恢复正常的请求处理。

2、熔断器的状态

熔断器有三种状态:
- 关闭状态(Closed):正常工作状态,允许所有请求通过。
- 打开状态(Open):当错误率超过阈值时,熔断器会进入"打开"状态,阻止后续请求执行,直接返回默认值或抛出异常。
- 半开状态(Half-Open):在"打开"状态等待一段时间后,熔断器会进入"半开"状态,允许少量请求通过,以检测后端服务是否恢复正常。如果这些请求成功,熔断器会回到"关闭"状态;否则,它会再次进入"打开"状态。

3、熔断器行为解释
3.1、初始状态

我们定义了一个熔断器实例circuitBreaker,并指定了熔断器的名称为“cacheUpdateCircuit”,并配置了熔断器的行为如下。此时的熔断器处于关闭状态(Closed),会放行所有的请求通过。

CircuitBreaker circuitBreaker = CircuitBreaker.of("cacheUpdateCircuit",CircuitBreakerConfig.custom().failureRateThreshold(50) // 错误率达到 50% 时跳闸.waitDurationInOpenState(Duration.ofSeconds(10)) // 等待 10 秒后重试.slidingWindowSize(10) // 使用 10 个请求的滑动窗口.permittedNumberOfCallsInHalfOpenState(3) // 在半开状态下允许3个请求通过.minimumNumberOfCalls(3) // 在半开状态下至少需要3个请求才能评估.successRateThreshold(75) // 成功率达到 75% 时才恢复.build());
3.2、跳闸行为

定义的熔断器实例circuitBreaker用于检测dataService.getDataByKey(key)的执行结果。
因为我们上面配置的窗口大小是10个请求,这里以10个请求为例进行说明。

try {return circuitBreaker.executeSupplier(() -> dataService.getDataByKey(key));} catch (Exception e) {// 熔断器跳闸,返回 null 或其他默认值return null;}

情况1:如果连续的10次请求中,超过50%都请求都成功执行了任务,则熔断器会继续处于关闭状态,继续进行下一个窗口周期的10次统计。(注意:窗口是滚动的)
情况2:如果连续的10次请求中,大于等于50%都请求都失败了,那么熔断器会立即进入开启状态(即发生跳闸),直接拒绝接下来10秒(如上配置为10秒)内的所有请求。

3.3、重复跳闸与恢复

当熔断器进入"打开"状态后,它会等待10秒(配置中的waitDurationInOpenState的时间),这10秒内直接拒绝所有的请求。10秒后会自动进入"半开"状态下,此时熔断器会允许少量请求通过(默认为1个,如果配置了permittedNumberOfCallsInHalfOpenState,则以配置为准),以检测后端服务是否已经恢复正常。如上的示例中,我们配置为3个。
情况1:放行了3个请求后,这3次的成功率超过75%(successRateThreshold配置),则认为服务恢复,熔断器进入关闭状态,允许所有的请求通过。
情况2:放行了3个请求后,这3次的成功率低于75%,熔断器会认为服务仍然不可用,并再次进入"打开"状态,拒绝所有请求。10秒后,再次进入半开状态,以此往复。

4、熔断器总结
  • 跳闸条件:当滑动窗口内的错误率达到配置的阈值(例如 50%)时,熔断器会跳闸,进入 "打开"状态。此时,所有新的请求都会被拒绝,直接返回默认值,以防止更多的请求继续穿透到故障服务。

  • 半开状态:在"打开"状态等待一段时间后,熔断器会进入"半开"状态,允许少量请求通过,以检测后端服务是否恢复正常。如果这些请求成功,熔断器会回到"关闭"状态;如果失败,熔断器会再次进入"打开"状态。

  • 并发场景:在高并发场景下,熔断器能够快速响应错误率的变化,及时跳闸保护系统,并在后端服务恢复后逐步恢复正常请求处理。

熔断器可以在高并发场景下有效地保护系统免受故障服务的影响,确保系统的稳定性和可用性。


http://www.ppmy.cn/embedded/149072.html

相关文章

通用导出任何对象列表数据的excel工具类

在工作中经常会遇到列表数据的导出&#xff0c;每次需要的时候都要去开发一次&#xff0c;且数据不断在变化&#xff0c;于是就有了下述的工具类&#xff0c;可传入各种实体对象的List&#xff0c;最终以指定格式导出excel&#xff0c;废话不多说&#xff0c;上代码~ 控制层代…

全国硕士研究生入学考试(考研)备考要点之具体科目

考研备考要点详解&#xff1a;具体科目复习策略与关键建议 在中国&#xff0c;考研&#xff08;研究生入学考试&#xff09;是本科生提升学历、深化专业知识和拓展职业发展的重要途径。考研备考过程中&#xff0c;各科目的复习策略与方法至关重要&#xff0c;直接影响考生的考…

Tana 与 Notion 面向未来的 All-in-one 笔记工具

随着数字时代的快速发展&#xff0c;信息管理和知识组织工具成为了现代工作者不可或缺的一部分。在这篇文章中&#xff0c;我们将深入探讨两款备受瞩目的应用程序——Tana 和 Notion。它们不仅代表了当前市场上最先进的生产力解决方案之一&#xff0c;而且各自以独特的方式重新…

Spring常见问题

Spring常见问题 1.什么是Spring,对Spring的理解? Spring是一个轻量级的,IOC和AOP的一站式框架,为简化企业级开发而生的. Spring会管理对象,需要使用的时候直接注入即可,还可以对对象的功能进行增强,使得耦合度降低. 2.解释IOC和AOP IOC (控制反转)将生成对象控制权反转给…

使用复数类在C#中轻松绘制曼德布洛集分形

示例在 C# 中绘制曼德布洛特集分形解释了如何通过迭代以下方程来绘制曼德布洛特集&#xff1a; 其中 Z(n) 和 C 是复数。程序迭代此方程&#xff0c;直到 Z(n) 的大小至少为 2 或程序执行最大迭代次数。 该示例在单独的变量中跟踪数字的实部和虚部。此示例使用Complex类来更轻松…

Serverless集成和扩展性概述

Serverless架构的一个关键优势是其灵活性和可扩展性。开发者可以轻松集成各种第三方服务和平台,以扩展应用的功能。以下是一些关键的集成和扩展性策略: 第三方API集成:集成第三方API来增强应用功能,如支付处理、地图服务等。微服务架构:采用微服务架构来构建可扩展的Serve…

网络安全 | 云计算中的数据加密与访问控制

网络安全 | 云计算中的数据加密与访问控制 一、前言二、云计算概述2.1 云计算的定义与特点2.2 云计算的服务模式2.3 云计算的数据安全挑战 三、数据加密技术在云计算中的应用3.1 对称加密算法3.2 非对称加密算法3.3 混合加密算法 四、云计算中的访问控制模型4.1 基于角色的访问…

肿瘤电场仪疗法原理:科技之光,照亮抗癌之路

在医疗科技日新月异的今天&#xff0c;肿瘤电场仪疗法作为一种创新的无创治疗手段&#xff0c;正以其独特的物理机制和生物效应&#xff0c;为患者带来了新的治疗选择和希望。本文将深入探讨肿瘤电场仪疗法的原理&#xff0c;揭示其如何在不伤害正常组织的前提下&#xff0c;精…