Redis实战笔记

ops/2024/9/20 3:53:05/ 标签: redis, 笔记

黑马点评项目笔记

一:数据交互:

1.把String解析成Java对象集合并且存入Redis及Java对象集合转换成JSON。

 @Overridepublic Result queryTypeList() {String s = stringRedisTemplate.opsForValue().get("cache:list:");System.out.println("s = " + s);if(!StrUtil.isBlank(s)){List<ShopType> cachedList = JSONUtil.toList(s, ShopType.class);return Result.ok(cachedList);}LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();lambdaQueryWrapper.orderByAsc(ShopType::getSort);List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);System.out.println("list = " + list);stringRedisTemplate.opsForValue().set("cache:list:", JSONUtils.toJSONString(list));return Result.ok(list);}

2.List存储:

List <String> stringlist= stringRedisTemplate.opsForList().range("cache:list",0,-1);System.out.println("stringlist = " + stringlist);if (stringlist != null && !stringlist.isEmpty()) {List<ShopType> shopTypeList = new ArrayList<>();for (String jsonString : stringlist) {ShopType shopType = JSONUtil.toBean(jsonString, ShopType.class);shopTypeList.add(shopType);}return Result.ok(shopTypeList);}LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();lambdaQueryWrapper.orderByAsc(ShopType::getSort);List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);System.out.println("list = " + list);for (int i = 0; i < list.size(); i++) {stringRedisTemplate.opsForList().rightPush("cache:list", JSONUtil.toJsonStr(list.get(i)));System.out.println(list.get(i));}return Result.ok(list);}
当然,亦可以用Lambda表达式进一步简化:
List <String> stringlist= stringRedisTemplate.opsForList().range("cache:list",0,-1);System.out.println("stringlist = " + stringlist);if (stringlist != null && !stringlist.isEmpty()) {List<ShopType> shopTypeList = stringlist.stream()  .map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))  .collect(Collectors.toList());  return Result.ok(shopTypeList);}LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();lambdaQueryWrapper.orderByAsc(ShopType::getSort);List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);System.out.println("list = " + list);for (int i = 0; i < list.size(); i++) {stringRedisTemplate.opsForList().rightPush("cache:list", JSONUtil.toJsonStr(list.get(i)));System.out.println(list.get(i));}return Result.ok(list);}
注意,前端一般要的是Json数组,在这里,我们只需要一个对象集合就行,Springboot会默认JSON格式返回。
对于下面的代码:
List<ShopType> shopTypeList = stringlist.stream()  .map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))  .collect(Collectors.toList());  return Result.ok(shopTypeList)
  1. 声明并初始化 shopTypeList
List<ShopType> shopTypeList = ...;

这行代码声明了一个ShopType对象的列表shopTypeList。它会在后续代码中通过Stream API操作进行初始化。

  1. 使用Stream API处理 stringlist
stringlist.stream()

这行代码将stringlist(一个包含JSON字符串的列表)转换为一个Stream,以便进行流式处理。

3.映射每个JSON字符串到 ShopType 对象

.map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))
使用map操作,将Stream中的每个JSON字符串元素映射为对应的ShopType对象。这里JSONUtil.toBean是一个可以将JSON字符串转换为指定类型对象的方法。

​ 4. 收集结果到新的列表

.collect(Collectors.toList());

使用collect操作,将Stream中的元素收集到一个新的列表中。这里使用了Collectors.toList()作为收集器,它会创建一个新的列表来存储转换后的ShopType对象。

二:缓存策略:

2.1.更新:
image-20240422202913302
一般我们选用第一种。

image-20240422203251310

对于三,两种都存在问题:

image-20240422203736068

但是,第二种发生问题的概率极低。所以我们用第二种:
@Override
@Transactional
public Result updateshop(Shop shop) {Long id = shop.getId();if (id == null){return Result.fail("店铺id不能为空");}updateById(shop);stringRedisTemplate.delete("cache:shop:" + id);return Result.ok();
}

2.2.穿透:

image-20240422205639525
我们这里用第一种方法:
if(StrUtil.isNotBlank(s)){Shop shop = JSONUtil.toBean(s,Shop.class);return Result.ok(shop);
}
//注意,null不等于""
if(s != null){return Result.fail("店铺不存在。");
}//......if(shop == null){
//写空值
stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);return Result.fail("店铺不存在。");}

2.3.雪崩:

image-20240422211712419

2.4.击穿:

image-20240422212047122

对比:

image-20240422212634600

image-20240422212722940
注意,逻辑过期是假的过期,但是互斥锁是真的没有该数据的缓存了。

互斥锁:

image-20240422213224412
我们用setnx来实现该锁:
private Boolean tryLock(String key){Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(result);}private void unLock(String key){stringRedisTemplate.delete(key);}
防止大量缓存穿透及锁的使用代码:
public Shop queryWithLock(Long id){String s = stringRedisTemplate.opsForValue().get("cache:shop:" + id);log.info(s);if(StrUtil.isNotBlank(s)){Shop shop = JSONUtil.toBean(s,Shop.class);return shop;}if(s != null){return null;}//缓存重建String lockKey = "lock:shop:"+id;Shop shop = null;try {if(!tryLock(lockKey)){Thread.sleep(50);return queryWithLock(id);}shop = getById(id);log.info("shop = " +shop);if(shop == null){//写空值stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);return null;}String key = "cache:shop:" + id;stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {unLock(lockKey);}return shop;}
注意,try-catch-finally是保障锁必须被释放的关键。方法调用如下:
@Overridepublic Result queryById(Long id) {//缓存穿透//Shop shop =queryWithPassTrough(id);//互斥锁Shop shop = queryWithLock(id);if(shop == null){return Result.fail("店铺不存在。");}return Result.ok(shop);}

逻辑过期:

image-20240423125308260

首先是怎么实现设置逻辑过期:
@Data
public class RedisData {private LocalDateTime expireTime;private Object data;
}
redis_271">然后我们接下来模拟一下将热点事件提前写入redis
 public void saveShopRedis(Long id,Long expires){Shop shop = getById(id);RedisData redisData = new RedisData();redisData.setData(shop);redisData.setExpireTime(LocalDateTime.now().plusSeconds(expires));stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));}
先给出独立线程代码:
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
  1. ExecutorService: 这是Java的并发包java.util.concurrent中的一个接口,它代表了一个用于执行任务的线程池。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的ThreadFactory创建一个新线程。
  2. CACHE_REBUILD_EXECUTOR: 这是我们创建的ExecutorService对象的变量名。根据命名,它可能用于重建或刷新某种缓存。
  3. Executors.newFixedThreadPool(10): 这是创建线程池的方法。Executors是Java中的一个工具类,它提供了创建线程池的静态方法。newFixedThreadPool(10)创建了一个固定大小的线程池,其中包含10个线程。这意味着无论提交多少任务,线程池中的线程数量都不会超过10个。当提交的任务数超过线程数时,任务会在队列中等待,直到线程空闲并可以处理它们。

简而言之,这行代码定义了一个私有的、静态的、不可变的ExecutorService对象,该对象是一个包含10个线程的固定线程池,用于处理与缓存重建相关的任务。

接下来就是业务代码:
 public Shop queryWithExpire(Long id){String s = stringRedisTemplate.opsForValue().get("cache:shop:" + id);log.info(s);if(StrUtil.isBlank(s)){return null;}RedisData bean = JSONUtil.toBean(s, RedisData.class);JSONObject object = (JSONObject) bean.getData();Shop shop = JSONUtil.toBean(object, Shop.class);LocalDateTime expirationTime = bean.getExpireTime();if(expirationTime.isAfter(LocalDateTime.now())){return shop;}//缓存重建String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);if(isLock){CACHE_REBUILD_EXECUTOR.submit(()->{try {saveShopRedis(id,20L);} catch (Exception e) {throw new RuntimeException(e);} finally {unLock(lockKey);}});}return shop;}
记得在finally中释放锁。

2.5.自定义工具类:

@Slf4j
@Component
public class CacheClient {private final StringRedisTemplate stringRedisTemplate;private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);public CacheClient(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public void set(String key, Object value, Long time , TimeUnit unit) {stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);}private Boolean tryLock(String key){Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(result);}private void unLock(String key){stringRedisTemplate.delete(key);}public void setWithLogicalExpire (String key, Object value, Long time , TimeUnit unit) {RedisData redisData = new RedisData();redisData.setData(value);redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));}//穿透代码public <R,ID> R queryWithPassTrough (String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time,TimeUnit unit) {String s = stringRedisTemplate.opsForValue().get(keyPrefix + id);log.info(s);if(StrUtil.isNotBlank(s)){return JSONUtil.toBean(s,type);}if(s != null){return null;}R r = dbFallback.apply(id);if(r == null){//写空值stringRedisTemplate.opsForValue().set(keyPrefix + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);return null;}String key = keyPrefix + id;this.set(key,r,time,unit);return r;}public <R,ID>R queryWithExpire (String keyPrefix,ID id,Class<R> type,Function<ID,R>dbFallback,Long time,TimeUnit unit){String s = stringRedisTemplate.opsForValue().get(keyPrefix + id);log.info(s);if(StrUtil.isBlank(s)){return null;}RedisData bean = JSONUtil.toBean(s, RedisData.class);JSONObject object = (JSONObject) bean.getData();R r = JSONUtil.toBean(object, type);LocalDateTime expirationTime = bean.getExpireTime();if(expirationTime.isAfter(LocalDateTime.now())){return r;}//缓存重建String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);if(isLock){CACHE_REBUILD_EXECUTOR.submit(()->{try {R apply = dbFallback.apply(id);this.setWithLogicalExpire(keyPrefix + id,apply,time,unit);} catch (Exception e) {throw new RuntimeException(e);} finally {unLock(lockKey);}});}return r;}
}
补充:
这里的StringRedisTemplate没有调用set方法,也没有注解,但是依然实现了依赖注入,这是因为在spring中,如果注入对象,被注入对象都加入IOC容器,并且有一个单一的构造函数,那么就可以实现不需要注解与配置的依赖注入。
使用举例:
 @Overridepublic Result queryById(Long id) {Shop shop = cacheClient.queryWithPassTrough(CACHE_SHOP_KEY,id,Shop.class,this::getById,CACHE_SHOP_TTL,TimeUnit.MINUTES);if(shop == null){return Result.fail("店铺不存在。");}return Result.ok(shop);}
注意,如果是缓存击穿,一定要数据预热。

三:秒杀业务:

在这种业务中,我们用全局ID生成器来生成ID:

image-20240426175726793

redisredis_439">我们用redis来处理,可以满足要求。无论在哪里操作数据库,都是redis

image-20240426175938648

常见全局唯一ID策略:

image-20240426212805722

注意,这里的数据库自增并不是数据库单纯的自增,因为我们知道,在后面的学习中,是分布式的,到时候,不同的线程可能会导致乱序,重序,这里是单独的那另外一张表做自增,其他的线程都从这里取id。
redis_449">下面我们来看看基于redis自增的策略:
public class RedisIDWorker {public static final long BEGIN_TIMESTAMP = 1640995200L;private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIDWorker (StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId (String keyPrefix) {LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));long count = stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);return timestamp << COUNT_BITS | count;}
}

increment(String key): 这个方法是对指定的 key 进行自增操作。如果该 key 不存在,那么它的初始值会被设为 0,然后进行自增,结果为 1。如果 key 存在且其值可以转换为整数,则该值会进行自增。

3.2.超卖问题:

image-20240427204959568

image-20240427205407204

image-20240427205711299

但是,这里的票的变化是与版本同步的,所以我们没有必要用版本号,直接用票作为乐观锁。

image-20240427210009616

CAS是指"比较和交换"(Compare and Swap)的缩写,是一种原子操作,通常用于多线程编程中实现同步。CAS操作包括三个参数:需要进行操作的内存位置、旧的预期值和新的值。CAS操作会比较内存位置的当前值与旧的预期值,如果相同则将内存位置的值更新为新的值,否则不做任何操作。CAS操作是一种乐观锁的实现方式,可以避免使用传统的锁机制带来的性能开销和死锁问题。CAS操作在Java中的实现包括AtomicInteger、AtomicLong等类。
@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {SeckillVoucher byId = seckillVoucherService.getById(voucherId);if(byId.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("秒杀还没开始");}if(byId.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已经结束");}if(byId.getStock()<=0){return Result.fail("库存不足");}boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).eq("stock",byId.getStock()).update();if(!success){return Result.fail("库存不足");}VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);long orderId = redisIDWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());save(voucherOrder);return Result.ok(orderId);}
}
注意,乐观锁也有弊端,他在多个线程同时买票时只能允许一个线程成功,其他线程都会失败,所以我们要解决这个问题:
boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).gt("stock",0).update();
  1. .eq("voucher_id", voucherId)
    • 这是一个条件语句,表示只更新那些voucher_id字段值等于voucherId的记录。voucherId应该是一个变量,存储了要更新的优惠券的ID。
  2. .gt("stock",0)
    • 这是另一个条件语句,表示只更新那些stock字段值大于0的记录。gt是“greater than”的缩写,所以这个条件确保不会更新那些库存已经为0的记录。
  3. .update();
    • 最后,调用update方法执行实际的更新操作。
注意,这里事实上是利用了数据库自带的行锁,当某个事务尝试更新某一行记录时,数据库会为该行加上行锁,阻止其他事务同时修改该行,直到当前事务完成并释放锁。

3.3.一人一单:

Long userid = UserHolder.getUser().getId();
long count = query().eq("user_id",userid).eq("voucher_id",voucherId).count();if(count > 0){return Result.fail("您已经购买过一次!");}
但是依然有问题,用户多线程操作时,依然会存在都判断通过的问题,所以我们先把代码分成几个方法,然后加锁:
 @Transactionalpublic  Result createVoucher(Long voucherId){Long userid = UserHolder.getUser().getId();synchronized(userid.toString().intern()) {long count = query().eq("user_id", userid).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经购买过一次!");}VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);long orderId = redisIDWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());save(voucherOrder);return Result.ok(orderId);}}
这里有两个细节:
1.userid.toString().intern(),当处理一个用户的多个请求时,如果只用toString(),该方法在Java中是new一个新的对象,那么即使是同一个用户,可能他的多次请求的锁都不一样,这是有问题的,但是intern()是在字符串常量池中判断有没有这个这个,如果有,就用这个对象。
2.不能在方法上加锁:因为所有的线程都走一个方法,可能会导致其他问题。

但是这样还是有问题:

因为这里又有事务又有锁,锁在事务的内部,如果锁先释放,事务还没有提交,然后其他线程去查询,有可能查询不到,就会存在问题:
所以我们把锁加在外面:
@Transactional
public  Result createVoucher(Long voucherId){Long userid = UserHolder.getUser().getId();long count = query().eq("user_id", userid).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经购买过一次!");}VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);long orderId = redisIDWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());save(voucherOrder);return Result.ok(orderId);
}
Long userid = UserHolder.getUser().getId();
synchronized(userid.toString().intern()) {return createVoucher(voucherId);}
但是这样还是有问题:
在外部同步代码块中直接调用带有@Transactional注解的方法可能会导致事务管理不生效的原因,主要是因为Spring的AOP(面向切面编程)代理机制。@Transactional注解的事务管理是通过Spring的AOP代理来实现的,具体来说:

代理模式:**Spring使用代理(通常是JDK动态代理或者CGLIB代理)来拦截那些标有@Transactional注解的方法调用。**当一个被代理的对象调用这些方法时,Spring会在调用前后插入事务管理的逻辑,比如开始事务、提交或回滚事务。
自我调用问题:在同一个类内部,如果一个方法直接调用另一个带有@Transactional注解的方法,这种调用是“直接调用”,而非通过代理对象进行。因此,Spring的AOP代理无法拦截到这种内部调用,从而不会触发事务管理的行为。
在代码示例中,虽然createVoucher方法上有@Transactional注解,但如果它是直接在类内部通过同步代码块被调用,那么这个直接调用就绕过了Spring的AOP代理,事务管理因此可能不会生效。

为了解决这个问题,确保事务管理能够正常工作,可以考虑以下几种方案:
1.通过代理对象调用:如果必须在类内部调用事务方法,可以通过注入当前类的bean实例来间接调用,这样Spring的AOP代理就能生效。重构方法结构:将事务逻辑和同步逻辑整合到一个方法中,确保事务和同步都在同一个由Spring管理的方法体内执行。
2.使用AspectJ编织:Spring也支持使用AspectJ来实现更细粒度的切面编织,包括对类内部方法调用的拦截,但这需要额外的配置和可能的编译期织入。
综上所述,直接在同步代码块中调用事务方法的问题在于它规避了Spring AOP代理,从而可能忽略了事务管理的预期效果。
修改如下:
	 Long userid = UserHolder.getUser().getId();synchronized(userid.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucher(voucherId);}
AopContext.currentProxy(): 这是Spring提供的一个方法,用于在AOP代理方法内部获取当前代理对象。这一步很关键,因为它允许在类内部调用时仍然通过代理来执行,从而确保了@Transactional注解的事务管理能够生效。通过代理调用createVoucher: 通过强制类型转换得到的代理对象proxy来调用createVoucher方法,而不是直接调用。这样做使得事务管理逻辑能够被Spring的AOP代理捕获并正确执行,包括事务的开始、提交或回滚。
导入依赖:
 		<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>
暴露代理对象,在启动类上面加上注解:
@EnableAspectJAutoProxy(exposeProxy = true)

集群的并发问题:

image-20240428144744194
每个jvm内部共享一个synchronized锁监视器。如果在同一个tomcat,那肯定是没有问题的,但是如果是多个服务器,就有多个tomcat,JVM,就会出现问题。

3.4.分布式锁:

在这里插入图片描述

image-20240428145811718

image-20240428150237310

所以我们使用Redis。

为了避免释放错误,所以我们加上过期时间。注意,设置过期时间与setnx是两个语句,我们必须确保它们具有原子性。所以我们可以用set语句:
set lock k1 EX 10 NX
我们用非阻塞式实现这个操作。
public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;private static final String KEY_PREFIX = "lock:";public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic void unLock() {stringRedisTemplate.delete(KEY_PREFIX+name);}@Overridepublic boolean tryLock(long timeoutSec) {long id = Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,id+"",timeoutSec, TimeUnit.MINUTES);return Boolean.TRUE.equals(success);}
}
使用方法:
Long userid = UserHolder.getUser().getId();SimpleRedisLock lock= new SimpleRedisLock("order"+userid,stringRedisTemplate);boolean isLock = lock.tryLock(1200);if(!isLock){return Result.fail("不允许重复下单");}try {synchronized(userid.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucher(voucherId);}} finally {lock.unLock();}
可能存在的问题:

image-20240428160736009

所以,要判断这个锁是不是自己的,不要把别人的锁释放了。
image-20240428161935640
注意,不要用本身的线程Id作为判断标准,因为不同的JVM有不同的线程Id,代码改进如下:
 private static  final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";//......其他代码@Overridepublic void unLock() {String id = ID_PREFIX + Thread.currentThread().getId();String lock = stringRedisTemplate.opsForValue().get(KEY_PREFIX+name);if(id.equals(lock)) {stringRedisTemplate.delete(KEY_PREFIX + name);}}@Overridepublic boolean tryLock(long timeoutSec) {String id = ID_PREFIX + Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,id+"",timeoutSec, TimeUnit.MINUTES);return Boolean.TRUE.equals(success);}
但是事实上,还是有问题的(md,怎么这么多问题):
下面是一个极端情况:

image-20240428164447229

如果线程一判断相同通过了,但是删除锁的时候被阻塞了,还是会删除其他线程的锁。但是为什么这样的情况我们可以删掉呢:

注意这里存入redis的key,是前缀加用户ID,而前缀,后缀都是static修饰的,在同一个服务器内前缀后缀是一样的,而我们这里解决的是一人一单,也就是说,用户的ID也是一样的,因此在同一台服务器下的同一个用户的key就是固定的。其次,我们上面将后缀加上线程设置为key值,这是为了应对多台服务器下的问题,而我们用的redis,也是为了解决多台服务器下的并发问题,因此在这里,反而在同一台服务器下,在判断通过后删除之前的间隙中,删除动作迟迟不执行,然后老线程的key过期了,然后另外一个线程创建了一个相同的key,此时,老线程执行删除动作,就会删除掉新线程的key。注意,删除是按照key的!!!

所以我们引入Lua脚本:

image-20240428171308131

image-20240428172216314

image-20240428172818687

新建一个unlock.lua文件:
if(redis.call('get',KEYS[1]) == ARGV[1]) thenreturn redis.call('del',KEYS[1])
end
return 0
然后使用静态代码块加载脚本文件:
 	private static final DefaultRedisScript<Long>UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}
使用脚本:
 @Overridepublic void unLock() {stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX+name),ID_PREFIX+Thread.currentThread().getId());}

image-20240428174619419

3.5.分布式锁优化——Redisson:

image-20240428175026803

因为自己写实在是太烦琐了,所以我们引入了其他第三方服务:

image-20240428175113837

导入依赖:
		<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
配置客户端:
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://192.168.6.128").setPassword("kz32330981");return Redisson.create(config);}
}
使用分布式锁:

image-20240502194926872

举例:
 @Overridepublic Result seckillVoucher(Long voucherId) {SeckillVoucher byId = seckillVoucherService.getById(voucherId);if(byId.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("秒杀还没开始");}if(byId.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已经结束");}if(byId.getStock()<=0){return Result.fail("库存不足");}boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).gt("stock",byId.getStock()).update();if(!success){return Result.fail("库存不足");}Long userid = UserHolder.getUser().getId();//SimpleRedisLock lock= new SimpleRedisLock("order"+userid,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userid);boolean isLock = lock.tryLock();if(!isLock){return Result.fail("不允许重复下单");}try {synchronized(userid.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucher(voucherId);}} finally {lock.unlock();}}
这里boolean isLock = lock.tryLock();没有设置参数,默认失败一次就马上返回。

3.6.Redissson可重入锁原理:

为什么一般情况下不能实现可重入锁:

image-20240502213631856

string只能有一个标识,当我们实现可重入锁时,如果是当前的线程,我们把它的value++,释放锁的时候–,所以我们需要两个标识,所以我们使用hash:

image-20240503115517381

image-20240503115612799

Redisson内部也是这么实现的。

3.6.Redissson可重试与避免超时释放原理:

image-20240503122435943

image-20240503123318771

注意,这里为null说明获取锁成功了。为null说明没有存在这个锁,所以当然会成功了。如果失败,返回的是当前存在的锁的有效期。
重试机制:

在Redisson中,可重试机制通常与分布式锁的实现紧密相关。当客户端尝试获取分布式锁但失败时(如锁已被其他客户端持有),Redisson会利用Redis的发布/订阅机制来实现锁的可重试。下面我将简要解释Redisson是如何结合消息订阅来实现可重试机制的:

  1. 锁获取失败与等待
    • 当客户端尝试获取分布式锁但失败时(比如调用lock.tryLock(waitTime, leaseTime, TimeUnit.unit)方法),Redisson会记录当前线程并启动一个定时任务或监听器来等待锁的释放。
  2. 消息订阅
    • 为了实现等待锁的释放,Redisson会利用Redis的发布/订阅机制。具体来说,Redisson会为每个锁维护一个特定的频道(channel)或主题(topic)。
    • 当客户端尝试获取锁但失败时,它会订阅该锁的频道。这样,一旦锁的持有者释放了锁,Redisson就会在该频道上发布一个消息。
  3. 监听器与重试
    • 客户端在订阅了锁的频道后,会设置一个监听器来监听该频道上的消息。
    • 当锁的持有者释放锁并在频道上发布消息时,监听器会立即收到通知。
    • 监听器在收到通知后,会触发一个重试机制,即再次尝试获取锁。
避免超时释放:
  • Redis中的看门狗策略通常与Redisson库一起使用,用于自动检测并处理过期键的机制。

  • 应用程序使用Redisson库监视Redis服务器上的指定键。当这些键被修改时,看门狗策略会自动触发相应的操作,如更新本地缓存或重新计算依赖项。

  • 这有助于应用程序实时响应Redis中的数据变化,并防止在数据发生变化时出现问题,如死锁和其他并发问题。

  • 具体来说,leaseTime是锁的有效期,也就是锁的持有时间。当调用lock(leaseTime, unit)方法并传入一个非-1的leaseTime时,这个特定的锁将会在一个固定的时间后自动释放,即使锁的持有者仍然在运行。在这种情况下,Redisson不会启动看门狗机制,因为锁的过期时间是由应用程序明确指定的。然而,如果调用lock()方法(没有传入leaseTime参数),或者传入-1作为leaseTime的值,那么锁将没有固定的过期时间。此时,Redisson会启动看门狗机制。看门狗会定期检查锁的持有者是否仍然活跃,并在锁即将过期时自动延长锁的有效期,从而确保锁不会被意外释放,直到锁的持有者显式地释放锁或者持有者不再活跃。
注意,只用我们没有设置时间(或者-1)才会触发这个策略。
3.7.主从一致性:
为什么以前会出现问题:

image-20240506172825214

当主节点出现问题时,从从节点获取锁就会成功。
而Redisson是怎么解决这个问题的呢:

image-20240506173309320

以下是如何使用Redisson的MultiLock的基本步骤:

  1. 配置Redisson客户端
    首先,你需要配置Redisson客户端以连接到Redis集群或哨兵模式,如之前所述。

  2. 创建RLock对象
    对于每个Redis节点,你需要创建一个RLock对象。这些RLock对象将用于组成MultiLock

    RLock lock1 = redisson1.getLock("lock1"); // 假设redisson1连接到第一个Redis节点  
    RLock lock2 = redisson2.getLock("lock2"); // 假设redisson2连接到第二个Redis节点  
    // ... 为其他节点创建RLock对象 ...
    

    注意:虽然锁的名称(如"lock1""lock2")可以不同,但出于一致性和可维护性的考虑,最好使用相同的锁名称。

  3. 组合RLock对象为MultiLock
    使用这些RLock对象创建一个MultiLock

    List<RLock> locks = Arrays.asList(lock1, lock2, /* ... 其他locks ... */);  
    RLock multiLock = redisson.getMultiLock(locks);
    
  4. 使用MultiLock
    现在你可以像使用普通的RLock一样使用MultiLock了。

    multiLock.lock();  
    try {  // 临界区代码,只有获得锁的线程才能执行  
    } finally {  multiLock.unlock();  
    }
    

    注意:在finally块中解锁是非常重要的,以确保锁总是被释放,即使发生异常。

  5. 关闭Redisson客户端
    完成所有操作后,确保关闭所有Redisson客户端实例以释放资源。

    redisson1.shutdown();  
    redisson2.shutdown();  
    // ... 关闭其他Redisson客户端 ...
    
  6. 测试
    为了测试MultiLock在多节点环境中的行为,你可以模拟节点故障、网络分区等情况,并观察MultiLock是否仍然能够正确工作。

总结:

在这里插入图片描述

3.7.异步秒杀:

原来的串行执行:

image-20240506175926269

优化如下:

image-20240506180246721

完整的流程如下:

image-20240506184456708

redisredis_1008">Part 1:保存到redis:增加redis中的优惠券数量:
 	@Override@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);stringRedisTemplate.opsForValue().set("seckill:stock:" + voucher.getId(), voucher.getStock().toString());}
Part 2:编写Lua脚本:
image-20240506190359407
local voucherId = ARGV[1]
local userId = ARGV[2]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if(tonumber(redis.call('get',stockKey))<= 0) thenreturn 1
end
if(redis.call('sismember',orderKey,userId) == 1) thenreturn 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
return 0
  1. 参数定义:
    • ARGV[1]voucherId,表示优惠券ID。
    • ARGV[2]userId,表示用户的ID。
  2. 定义键名:
    • stockKey:这是优惠券库存的键名,格式为 'seckill:stock:' 加上 voucherId
    • orderKey:这是已购买(或已下单)该优惠券的用户的集合的键名,格式为 'seckill:order:' 加上 voucherId
  3. 检查库存:
    • 使用 redis.call('get', stockKey) 获取库存数量。
    • 使用 tonumber 函数将返回的字符串转换为数字。
    • 如果库存数量小于或等于0,则返回 1,表示库存不足。
  4. 检查用户是否已经购买:
    • 使用 redis.call('sismember', orderKey, userId) 检查用户是否已经在 orderKey 集合中。
    • 如果用户已经购买(即用户ID在集合中),则返回 2,表示用户已购买。
  5. 扣减库存并记录订单:
    • 如果上述两个条件都不满足(即库存充足且用户未购买),则执行以下操作:
      • 使用 redis.call('incrby', stockKey, -1) 扣减库存。
      • 使用 redis.call('sadd', orderKey, userId) 将用户ID添加到已购买集合中。
    • 这两个操作是原子性的,因为它们都在同一个Lua脚本中执行。
  6. 返回值:
    • 如果成功扣减库存并记录订单,则返回 0,表示操作成功。
    • 如果库存不足,返回 1
    • 如果用户已购买,返回 2
Part 3:基于阻塞队列的秒杀业务:
image-20240506204827037
private BlockingQueue<VoucherOrder >orderTasks = new ArrayBlockingQueue<>(1024*1024);
@Resource
private RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {Long resultT =stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),String.valueOf(UserHolder.getUser().getId()));int res = resultT.intValue();if(res!=0){return Result.fail(res==1?"库存不足":"不能重复下单");}long id = redisIDWorker.nextId("order");VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);voucherOrder.setId(id);voucherOrder.setUserId(UserHolder.getUser().getId());//在父线程就先获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();orderTasks.add(voucherOrder);return Result.ok(id);}
Part 4:异步下单业务:
private static final ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();private IVoucherOrderService proxy;private class VoucherOrderHandler implements Runnable{@PostConstructpublic void init(){seckill_order_executor.submit(new VoucherOrderHandler());}@Overridepublic void run() {while (true){try {VoucherOrder voucherOrder = orderTasks.take();handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常",e);}}}
}
  1. @PostConstruct注解:

    这个注解标记在init()方法上,表示该方法会在类的实例创建并注入到Spring容器后立即调用。这里是用来启动任务处理的逻辑。

  2. init()方法:

    seckill_order_executor是一个线程池,可能是ExecutorService类型的实例。
    方法中提交了一个新的VoucherOrderHandler实例到线程池执行。这意味着每次初始化VoucherOrderHandler,都会启动一个新的线程来处理订单任务,形成一个循环执行的任务队列。

  3. run()方法:

    作为Runnable接口的实现,run()方法包含实际的业务逻辑。使用一个无限循环while (true),确保线程将持续运行,直到程序停止或线程被显式中断。

  4. try-catch块:

    在循环中,从orderTasks队列(可能是BlockingQueue类型)中获取一个VoucherOrder对象并调用handleVoucherOrder(voucherOrder)进行处理。如果在处理过程中出现任何异常,捕获并记录错误日志,但不会中断循环。这样可以确保即使处理过程中出现问题,线程仍能继续尝试处理下一个订单。

private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if(!isLock){log.error("不允许重复下单");return;}try {proxy.createVoucher(voucherOrder);}finally {lock.unlock();}}
注意这里的两处细节,userId与createVoucher都与父线程有关系,而在这里是异步的子线程,所以代理对象要在父线程就创建好,而userid也不能通过线程本地直接获取。这里用Redisson重复验证是为了让线程更加安全。
 @Transactionalpublic void createVoucher (VoucherOrder voucherOrder){Long userid = voucherOrder.getId();long count = query().eq("user_id", userid).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.error("用户已经购买过一次!");return ;}boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0).update();if(!success){log.error("库存不足");return;}save(voucherOrder);}
重复验证道理同上。

3.8.阻塞队列优化——消息队列:

基于上面的阻塞队列存在两个问题:1.内存问题 2.服务宕机的安全问题,所以我们必须进行优化。

在这里插入图片描述

1.独立于JVM,解决内存问题。
2.内部做了数据的持久化,避免安全问题。
笔记.assets/image-20240507125920501.png" alt="image-20240507125920501" />
(1):基于List

image-20240507130119033

image-20240507130403241

(2):PubSub:

image-20240507134533877

image-20240507134641523

image-20240507135041817

image-20240507135131273

如果没有被任何频道订阅,会直接丢失。

(3):Stream:

发送消息:

image-20240507135521200

读取消息:

image-20240507135729110

image-20240507140014875

image-20240507140118244

3.9.基于Stream的消费者组:

image-20240507184608215

image-20240507184732468

image-20240507184942090

基于Java代码:

image-20240507192911659

image-20240507193426970

总结:

image-20240507193455216

3.10.代码改造:

image-20240507193619350

(1):创建消息队列:
XGROUP CREATE stream.orders g1 0 MKSTREAM
这是创建消费者组,自动创建了队列。
(2):修改Lua脚本:
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if(tonumber(redis.call('get',stockKey))<= 0) thenreturn 1
end
if(redis.call('sismember',orderKey,userId) == 1) thenreturn 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
redis.call('xadd','strem.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0
这行代码是使用 Redis 的 XADD 命令向名为 “stream.orders” 的 Stream 数据结构中添加一条新的消息。该消息包含了三个字段:“userId”、“voucherId” 和 “id”,分别对应传入的参数 userId、voucherId 和 orderId 的值。这样可以将订单相关的信息存储在 Redis 中,并且可以通过 Stream 数据结构的功能对这些信息进行处理和查询。这种方式可以方便地实现消息队列、事件日志等功能。
(3):修改发消息业务代码:
@Override
public Result seckillVoucher(Long voucherId) {
long id = redisIDWorker.nextId("order");
Long resultT =stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),String.valueOf(UserHolder.getUser().getId()),String.valueOf(id)
);
int res = resultT.intValue();
if(res!=0){return Result.fail(res==1?"库存不足":"不能重复下单");
}proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(id);
}
(4):收消息:
 private class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@PostConstructpublic void init(){seckill_order_executor.submit(new VoucherOrderHandler());}@Overridepublic void run() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));if(list == null || list.isEmpty()){continue;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);handlePendinglist();}}}private void handlePendinglist() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if(list == null || list.isEmpty()){break;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理pending异常",e);try {Thread.sleep(20);} catch (InterruptedException interruptedException) {e.printStackTrace();}}}}}
//增加订单业务不变
private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if(!isLock){log.error("不允许重复下单");return;}try {proxy.createVoucher(voucherOrder);}finally {lock.unlock();}}
我们分成三个部分来看:
4.1.正常接受处理消息:
String queueName = "stream.orders";@PostConstructpublic void init(){seckill_order_executor.submit(new VoucherOrderHandler());}@Overridepublic void run() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));if(list == null || list.isEmpty()){continue;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);handlePendinglist();}}}
  1. 参数详解:
    • Consumer.from(“g1”, “c1”)
      • 这表示从消费者组 “g1” 中的消费者 “c1” 读取数据。在 RedisStreams 中,消费者组用于允许多个消费者并发地读取同一个 Stream。注意,这里因为没有c1,所以先创建了c1。
    • StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2))
      • StreamReadOptions.empty() 创建一个空的读取选项对象。
      • .count(1) 设置读取的条目数量为 1。这意味着,无论 Stream 中有多少未读的数据,此方法仅返回一个条目。
      • .block(Duration.ofSeconds(2)) 设置读取操作的阻塞时间。如果 Stream 中没有新的数据可读,那么这个方法会阻塞最多 2 秒,然后返回一个空列表。
    • StreamOffset.create(queueName, ReadOffset.lastConsumed())
      • 这定义了从哪个位置开始读取 Stream。
      • queueName 是 Stream 的名称。
      • ReadOffset.lastConsumed() 表示从消费者 “c1” 最后消费的位置开始读取。如果消费者 “c1” 还没有消费过任何数据,那么这个位置可能是 Stream 的开始位置或任何其他预定义的位置(例如 Stream 的起始点)。
等价于:
XREADGROUP GROUP g1 c1 COUNT 1 STREAMS mystream >
4.2.处理因异常而位于pending-list的消息:
  private void handlePendinglist() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if(list == null || list.isEmpty()){break;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理pending异常",e);try {Thread.sleep(20);} catch (InterruptedException interruptedException) {e.printStackTrace();}}}}
与上面同理,只是这里是获取未确认的消息,这里的异常不做递归处理,因为外面本身就是true循环。
4.3.新增:
//增加订单业务不变
private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if(!isLock){log.error("不允许重复下单");return;}try {proxy.createVoucher(voucherOrder);}finally {lock.unlock();}}

四:其他业务:

4.1.不存在该表中字段处理方法如下,后续需要自己维护:

@TableField(exist = false)
private String icon;
/*** 用户姓名*/
@TableField(exist = false)
private String name;

4.2.点赞排序:

image-20240509175116540

改造前:
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
//......
stringRedisTemplate.opsForSet().add(key,userId.toString());
//......
stringRedisTemplate.opsForSet().remove(key,userId.toString());
改造后:
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//...
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
//......
stringRedisTemplate.opsForZSet().remove(key,userId.toString());
部分代码如下:
 @Overridepublic Result queryBlogLikes(Long id) {String key = "blog:liked:" + id;Set <String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);if(range == null || range.isEmpty()){return Result.ok(Collections.emptyList());}List<Long> ids =  range.stream().map(Long::valueOf).collect(Collectors.toList());List <UserDTO> users = userService.listByIds(ids).stream().map(user-> BeanUtil.copyProperties(user,UserDTO.class)).collect(Collectors.toList());return Result.ok(users);}
注意,这里有个问题,取出的结果是有序的,都是我们在进行userService.listByIds(ids)时,数据库内部用的是WHERE id IN(1,5),而它在数据库内部的索引中,就优化了,导致无论怎么样,都是先1再5。可以用下面的sql语句优化:

image-20240509194119419

在MP中,就可以这么写:
@Override
public Result queryBlogLikes(Long id) {String key = "blog:liked:" + id;Set <String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);if(range == null || range.isEmpty()){return Result.ok(Collections.emptyList());}List<Long> ids =  range.stream().map(Long::valueOf).collect(Collectors.toList());String idStr = StrUtil.join(",", ids);List <UserDTO> users = userService.query().in("id",ids).last("ORDER BY FIELD(id,"+idStr+")").list().stream().map(user-> BeanUtil.copyProperties(user,UserDTO.class)).collect(Collectors.toList());return Result.ok(users);
}

4.3.共同关注:

基于Set集合的交集:
 @Overridepublic Result followCommons(Long id) {Long userId = UserHolder.getUser().getId();Set<String> intersect = stringRedisTemplate.opsForSet().intersect("follows:" + userId, "follows:" + id);if(intersect==null||intersect.isEmpty()){return Result.ok(Collections.emptyList());}List<Long> collect = intersect.stream().map(Long::valueOf).collect(Collectors.toList());List<UserDTO> userDTOS = userService.listByIds(collect).stream().map(user-> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOS);}

4.4.关注推送——Feed流

image-20240509211548013

image-20240509212052545

image-20240509212325997

image-20240509213611387

image-20240509213803443

image-20240509213920882

以下是基于推模式:
image-20240509214104598
分页问题:

image-20240509214321964

image-20240509214603581

滚动查询:

image-20240512185712520

代码实现如下:
@Overridepublic Result queryBlogOfFollow(Long max, Integer offset) {Long userId = UserHolder.getUser().getId();String key = "feed:" + userId;Set <ZSetOperations.TypedTuple <String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);if(typedTuples == null || typedTuples.isEmpty()){return Result.ok();}List<Long>ids = new ArrayList<>(typedTuples.size());long minTime = 0;int os = 1;for(ZSetOperations.TypedTuple <String> tuple : typedTuples){ids.add(Long.valueOf(tuple.getValue()));long time = tuple.getScore().longValue();if(time == minTime){os++;}else{minTime = time;os = 1;}}List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();for (Blog blog : blogs) {queryBlogUser(blog);queryBlogLikes(blog);}ScoreResult r = new ScoreResult();r.setList(blogs);r.setMinTime(minTime);r.setOffset(os);return Result.ok(r);}
下面是关键代码解释:
 Set <ZSetOperations.TypedTuple <String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);

参数

  1. key: 这是 Redis 中有序集合的键名。在这个例子中,它是通过 "feed:" + userId 构建的,其中 userId 是从 UserHolder 获取的当前用户的 ID。
  2. 0: 分数范围查询的起始分数(包含)。在这个例子中,查询从分数 0 开始。
  3. max: 分数范围查询的结束分数(不包含)。这是你要查询的最高分数(不包括这个分数)。
  4. offset: 分页查询的偏移量。这表示从有序集合中跳过多少个元素后再开始返回结果。
  5. 2: 这是一个限制参数,表示最多返回 2 个元素。然而,在实际应用中,这个限制可能不是非常有用,因为它会限制返回结果的数量。通常,你可能会根据分页参数(如每页大小)来动态设置这个值。
在这个例子中,通过 reverseRangeByScoreWithScores(key, 0, max, offset, 2),你正在查询从分数 0 到 max(不包含)之间的博客,按分数从高到低排序,并跳过 offset 个元素,最后最多返回 2 个结果。每个结果都是一个 TypedTuple,它包含博客的 ID 和其发布时间(作为分数)。
注意,分布时间越前的,id越小,所以从零开始,而且这里的os,及偏移量,是上次发布时间最早的时间的重复个数,不是在所有结果中的个数,比如时间戳为4 4 4 3,每次两个,第一次查出4与4,4虽然有三个,但是这里记录的是上一次查询中重复的最小时间,及两个,下一次,就从0到最大时间4,并且跳过2个,即第三个4开始查询。

4.5.附近商铺:

在这里插入图片描述

image-20240512202043735

redis_1687">我们可以先做数据预热,先在test中存入redis
@Testpublic void test(){List<Shop> list = shopService.list();Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));for(Map.Entry<Long, List<Shop>> entry : map.entrySet()){Long typeId = entry.getKey();String key = "shop:geo:" + typeId;List<Shop> value = entry.getValue();for(Shop shop : value){stringRedisTemplate.opsForGeo().add(key,new org.springframework.data.geo.Point(shop.getX(),shop.getY()),shop.getId().toString());}}
}
或者可以这么写:

image-20240512203734547

代码如下:
 @Overridepublic Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {if (x == null || y == null){// 根据类型分页查询Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));// 返回数据return Result.ok(page.getRecords());}int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;int end = (current + 1) * SystemConstants.DEFAULT_PAGE_SIZE;String key = SHOP_GEO_KEY + typeId;GeoResults<RedisGeoCommands.GeoLocation<String> >search = stringRedisTemplate.opsForGeo().search(key,GeoReference.fromCoordinate(x, y),new Distance(5000),RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));if(search == null){return Result.ok();}List<GeoResult<RedisGeoCommands.GeoLocation<String>>>contents = search.getContent();if(contents.size()<=from){return Result.ok();}//截取List <Long> ids = new ArrayList<>(contents.size());Map<String,Distance>distanceMap = new HashMap<>(contents.size());contents.stream().skip(from).forEach(result->{String shopIdStr = result.getContent().getName();ids.add(Long.valueOf(shopIdStr));Distance distance = result.getDistance();distanceMap.put(shopIdStr,distance);});List<Shop> shops = query().in("id",ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();for (Shop shop : shops) {shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());}return Result.ok(shops);}

4.6.用户签到:

image-20240512215935791

image-20240512220014515

image-20240513163758603

  @Overridepublic Result sign() {Long userId = UserHolder.getUser().getId();LocalDateTime now = LocalDateTime.now();String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = USER_SIGN_KEY + userId + keySuffix;int day = now.getDayOfMonth() - 1;stringRedisTemplate.opsForValue().setBit(key, day, true);return Result.ok();}
String keySuffix = now.format(DateTimeFormatter.ofPattern(“:yyyyMM”));: 根据当前日期的年月(格式为"yyyyMM",例如"202302")创建一个字符串后缀,用于构建Redis键。
int day = now.getDayOfMonth() - 1;: 获取当前日期减1(因为数组或位数组通常从0开始计数,所以这里将日期减1来对应位数组的索引)。
stringRedisTemplate.opsForValue().setBit(key, day, true);: 使用Spring Data Redis的StringRedisTemplate操作位值。在Redis的键key对应的位数组中,设置第day位为true,表示用户在这一天签到了。
连续签到:

在这里插入图片描述

 @Overridepublic Result signCount() {Long userId = UserHolder.getUser().getId();LocalDateTime now = LocalDateTime.now();String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = USER_SIGN_KEY + userId + keySuffix;int day = now.getDayOfMonth();List <Long> result = stringRedisTemplate.opsForValue().bitField(key,BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(day)).valueAt(0));if(result == null || result.isEmpty()){return Result.ok(0);}Long num = result.get(0);if(num == null || num == 0){return Result.ok(0);}int count = 0;while (true){if((num & 1) == 0){break;}else {count++;}num >>>= 1;}return Result.ok(count);}
关键代码如下:
List <Long> result = stringRedisTemplate.opsForValue().bitField(key,BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(day)).valueAt(0));
这段代码使用 StringRedisTemplate 和 Redis 的 bitField 命令从一个 Redis 字符串键的指定偏移量开始,获取一个指定大小的位字段的无符号整数值,并将结果存储在一个 List<Long> 中。

4.7.UV统计:

image-20240513170528253

在这里插入图片描述

image-20240513171219913


http://www.ppmy.cn/ops/40829.html

相关文章

Docker 部署 Nginx 实现一个极简的 负载均衡

背景: Nginx是异步框架的网页服务器&#xff0c;其常用作反向代理(负载均衡器)。在一般的小项目中, 服务器不多, 如果不考虑使用服务注册与发现, 使用Nginx 可以容易实现负载均衡。 在特此写一个快速入门 Nginx 的技术贴, 使用 Docker 部署 Nginx, 实现一个极简的加权轮询负载均…

【go项目01_学习记录11】

操作数据库 1 文章列表2 删除文章 1 文章列表 &#xff08;1&#xff09;先保证文章已经有多篇&#xff0c;可以直接在数据库中添加&#xff0c;或者访问链接: localhost:3000/articles/create&#xff0c;增加几篇文章。 &#xff08;2&#xff09;之前设置好了articles.ind…

爬虫 Python将网页内容保存为PDF(url转pdf) 譬如下载某个专栏下的全部文章

我看到一个不错的教程&#xff0c;想下载教程下全部文章到本地&#xff0c;有时间看看&#xff0c;但是问了作者没有电子档&#xff0c;就想办法了。 PS: 我一天天的到底在干嘛&#xff01;唉… 需求: 爬取一个网页里全部文章且存为pdf 参考链接: 【已解决】Python将网页内容…

JS_监听dom变化触发,new MutationObserver

MutationObserver 是一个用于监测 DOM 变化的接口&#xff0c;它提供了一种机制来异步观察在特定元素或文档中发生的 DOM 变化。 MutationObserver 的作用包括&#xff1a; 1.监测 DOM 变化&#xff1a;你可以创建一个 MutationObserver 实例&#xff0c;并指定一个回调函数。…

【论文笔记】CNN2GNN: How to Bridge CNN with GNN

Abstract CNN在视觉任务上表现优异&#xff0c;通常堆叠大量卷积核来提高训练表现&#xff1b; GNN成功用几个图神经层探索了图数据之间的潜在拓扑关系。 由于缺乏图结构&#xff0c;在非图数据上无法使用GNN&#xff0c;在大规模场景下推理延迟较高。 提出问题&#xff1a;如…

blender 制作圆角立方体模型,倒角实现。cocos 使用。导出fbx

图片&#xff1a; 步骤&#xff1a; 1.首先创建一个立方体&#xff0c;这里可以使用默认的立方体。 2.在属性面板选择如“扳手”图标一样的修改器工具。 3.设置数量和段数实现圆角的圆滑效果&#xff0c;没有菱角。 保存导出相关的教程&#xff1a;

Linux防火墙iptalbes

1 iptalbes 1.1 概念 防火墙(Firewall)是一种隔离技术&#xff0c;用于安全管理与筛选的软件和硬件设备&#xff0c;使计算机内网和外网分开&#xff0c;可以防止外部网络用户以非法手段通过外部网络进入内部网络&#xff0c;保护内网免受外部非法用户的侵入。 1.2 SELinux …

景源畅信电商:经营抖店需要用电脑吗?

在数字时代的浪潮中&#xff0c;抖音小店(简称抖店)成为了众多创业者和品牌的新宠。它不仅提供了便捷的线上销售渠道&#xff0c;还通过短视频的形式拉近了卖家与买家的距离。对于这样一个新兴的电商平台&#xff0c;许多跃跃欲试的商家都关心同一个问题&#xff1a;经营抖店究…

虚拟机有线已连接但无法上网—·可能性之一

背景 VMware虚拟机&#xff0c;搭建了三台Linux服务器&#xff0c;组成Hadoop集群&#xff0c;由于在Hadoop102上有一些经常与Mysql数据库交互的任务&#xff0c;需要经常打开运行&#xff0c;而Hadoop103和104则经常处于关闭状态&#xff0c;一段时间后再次启动集群时候&…

如何进行并行执行的诊断与调优 —— 《OceanBase 并行执行》系列 6

在诊断并行执行问题时&#xff0c;我们可以从两个主要方面展开分析。首先&#xff0c;从整体系统层面进行考量&#xff0c;比如检查网络是否畅通、磁盘IO是否过载、CPU资源是否已用满&#xff1b;其次&#xff0c;针对具体的SQL语句进行深入剖析&#xff0c;定位问题SQL&#x…

【软考网络工程师】每日练题学知识

1.在EIGRP协议中&#xff0c;某个路由器收到了两条路径到达目标网络&#xff0c;路径1的带宽为100Mbps&#xff0c;延迟2ms&#xff0c;路径2的带宽为50Mbps&#xff0c;迟为4ms&#xff0c;如果EIGRP使用带宽和延迟的综合度量标准&#xff0c;那么该路由器选择的最佳路径是&am…

超级好用的C++实用库之日志类

&#x1f4a1; 需要该C实用库源码的大佬们&#xff0c;可搜索微信公众号“希望睿智”。添加关注后&#xff0c;输入消息“超级好用的C实用库”&#xff0c;即可获得源码的下载链接。 概述 日志类主要用于在程序运行过程中记录信息、错误、警告以及其他需要跟踪的数据&#xff0…

【专利】一种日志快速分析方法、设备、存储介质

公开号CN116560938A申请号CN202310311478.5申请日2023.03.28 是我在超音速人工智能科技股份有限公司(833753) 职务作品&#xff0c;第一发明人是董事长夫妇&#xff0c;第二发明人是我。 ** 注意** &#xff1a; 内容比较多&#xff0c;还有流程图、界面等。请到 专利指定页面…

清华团队开发首个AI医院小镇模拟系统;阿里云发布通义千问 2.5:超越GPT-4能力;Mistral AI估值飙升至60亿美元

&#x1f989; AI新闻 &#x1f680; 清华团队开发首个AI医院小镇模拟系统 摘要&#xff1a;来自清华的研究团队最近开发出了一种创新的模拟系统&#xff0c;名为"Agent Hospital"&#xff0c;该系统能够完全模拟医患看病的全流程&#xff0c;其中包括分诊、挂号、…

VC 编程开发中的 封装类 :log日志类 和SQL server 操作类 源代码

VC 编程开发中的 封装类 &#xff1a;日志类 和SQL server 操作类 源代码 在VC&#xff08;Visual C&#xff09;开发中&#xff0c;日志文件输出是一个至关重要的环节&#xff0c;它对于程序调试、问题排查以及系统监控等方面都具有不可替代的作用。以下是对日志文件输出在VC开…

【PB案例学习笔记】-01创建应用、窗口与控件

写在前面 这是PB案例学习笔记系列文章的第一篇&#xff0c;也是最基础的一篇。后续文章中【创建程序基本框架】部分操作都跟这篇文章一样&#xff0c; 将不再重复。该系列文章是针对具有一定PB基础的读者&#xff0c;通过一个个由浅入深的编程实战案例学习&#xff0c;提高编…

Vue的学习 —— <vue指令>

目录 前言 正文 内容渲染指令 内容渲染指令的使用方法 v-text v-html 属性绑定指令 双向数据绑定指令 事件绑定指令 条件渲染指令 循环列表渲染指令 侦听器 前言 在完成Vue开发环境的搭建后&#xff0c;若想将Vue应用于实际项目&#xff0c;首要任务是学习Vue的基…

Java入门基础学习笔记14——数据类型转换

类型转换&#xff1a; 1、存在某种类型的变量赋值给另一种类型的变量&#xff1b; 2、存在不同类型的数据一起运算。 自动类型转换&#xff1a; 类型范围小的变量&#xff0c;可以直接赋值给类型范围大的变量。 byte类型赋值给int类型&#xff0c;就是自动类型转换。 pack…

Hugging Muti Agent:第一章

Hugging Muti Agent系列文章目录 学习资料链接&#xff1a;Hugging Muti Agent&#xff08;二月学习&#xff09; 文章目录 Hugging Muti Agent系列文章目录第一章&#xff1a;前期准备1.1 获取MetaGPT1.2 配置MetaGPT1.2.1 调用 ChatGPT API 服务 1.3 首次尝试 第一章&#…

爬虫工作量由小到大的思维转变---<第七十四章 > Scrapy爬虫关闭方法(close)的机制及其在爬虫优化中的重要性

前言 Scrapy爬虫也有一个至关重要的功能——close方法&#xff0c;它控制着爬虫的“生命周期”。本论文旨在探讨Scrapy框架中close方法的核心作用和定义&#xff0c;以及它在爬虫管理与优化过程中的重要性。我们将深入探索如何通过这个强大的功能去优雅地结束一个爬取任务&…