【黑马点评|项目】万字总结(下)

devtools/2025/3/16 23:55:57/

文章上半部分: 【黑马点评|项目】万字总结(上)

优惠卷秒杀

当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
id的规律性太明显,容易出现信息的泄露,被不怀好意的人伪造请求
受单表数据量的限制,MySQL中表能够存储的数据有限,会出现分库分表的情况,id不能够一直自增

分布式ID的实现

分布式ID的实现方式:
~~~~     UUID
~~~~     Redis自增
~~~~     数据库自增
~~~~     snowflake算法(雪花算法)
在这里插入图片描述

java">@Component
public class RedisIdWorker {@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200;/*** 序列化位数*/private static final int COUNT_BITS = 32;/*** 生成分布式ID* @param keyPrefix* @return*/public long nextId(String keyPrefix){// 1、生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2、生成序列号// 以当天的时间戳为key,防止一直自增下去导致超时,这样每天的极限都是 2^{31}String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));Long count = stringRedisTemplate.opsForValue().increment(ID_PREFIX + keyPrefix + ":" + date);// 3、拼接并返回return timestamp << COUNT_BITS | count;}public static void main(String[] args) {LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);long second = time.toEpochSecond(ZoneOffset.UTC);System.out.println("second = " + second);}
}

优惠卷秒杀接口

在这里插入图片描述

java">/*** 抢购秒杀券** @param voucherId* @return*/
@Transactional
@Override
public Result seckillVoucher(Long voucherId) {// 1、查询秒杀券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2、判断秒杀券是否合法if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 秒杀券的开始时间在当前时间之后return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 秒杀券的结束时间在当前时间之前return Result.fail("秒杀已结束");}if (voucher.getStock() < 1) {return Result.fail("秒杀券已抢空");}// 5、秒杀券合法,则秒杀券抢购成功,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).setSql("stock = stock -1"));if (!flag){throw new RuntimeException("秒杀券扣减失败");}// 6、秒杀成功,创建对应的订单,并保存到数据库VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);voucherOrder.setId(orderId);voucherOrder.setUserId(ThreadLocalUtls.getUser().getId());voucherOrder.setVoucherId(voucherOrder.getId());flag = this.save(voucherOrder);if (!flag){throw new RuntimeException("创建秒杀券订单失败");}// 返回订单idreturn Result.ok(orderId);
}

单体项目下一人多单的情况

~~~~     上一节我们通过分布式ID+事务成功完成了优惠券秒杀功能,并且在测试后发现逻辑跑通了,看上去已经成功的解决了秒杀优惠券功能。但是前面我们只是正常的测试,那如果换到高并发的场景下能否成功解决?现在就让我们使用 Jmeter 来进行压力测试看看吧!
~~~~     通过压力测试我们发现,在多次请求的情况下,可能出现数据库优惠卷数量为负数的情况,这是我们不愿意看到的,那么为什么会产生超卖的情况呢,如下图

在这里插入图片描述
线程1查询库存,发现库存充足,创建订单,然后准备对库存进行扣减,但此时线程2和线程3也进行查询,同样发现库存充足,然后线程1执行完扣减操作后,库存变为了0,线程2和线程3同样完成了库存扣减操作,最终导致库存变成了负数!这就是超卖问题的完整流程

那么我们该如何有效防止超卖问题的发生呢,以下提供几种常见的解决方案

悲观锁:认为线程安全问题一定会发生,因此操作数据库之前都需要先获取锁,确保线程串行执行。常见的悲观锁有:synchronized、lock
乐观锁: 认为线程安全问题发生事小概率的,因此并不会加锁,而是在修改数据库数据的时候判断一下是否有人修改过数据,如果没变,则说明是线程安全的,如果修改过,那么说明线程是不不安全的,直接抛异常或者等待重试。常见的实现方式有:版本号法、CAS操作、乐观锁算法

悲观锁和乐观锁的比较
~~~~     悲观锁比乐观锁的性能低:悲观锁需要先加锁再操作,而乐观锁不需要加锁,所以乐观锁通常具有更好的性能。
~~~~     悲观锁比乐观锁的冲突处理能力低:悲观锁在冲突发生时直接阻塞其他线程,乐观锁则是在提交阶段检查冲突并进行重试。
~~~~     悲观锁比乐观锁的并发度低:悲观锁存在锁粒度较大的问题,可能会限制并发性能;而乐观锁可以实现较高的并发度。
~~~~     应用场景:两者都是互斥锁,悲观锁适合写入操作较多、冲突频繁的场景;乐观锁适合读取操作较多、冲突较少的场景。

拓展:CAS
~~~~     CAS(Compare and Swap)是一种并发编程中常用的原子操作,用于解决多线程环境下的数据竞争问题。它是乐观锁算法的一种实现方式。
~~~~     CAS操作包含三个参数:内存地址V、旧的预期值A和新的值B。CAS的执行过程如下:
~~~~     比较(Compare):将内存地址V中的值与预期值A进行比较。
判断(Judgment):如果相等,则说明当前值和预期值相等,表示没有发生其他线程的修改。
交换(Swap):使用新的值B来更新内存地址V中的值。
~~~~     CAS操作是一个原子操作,意味着在执行过程中不会被其他线程中断,保证了线程安全性。如果CAS操作失败(即当前值与预期值不相等),通常会进行重试,直到CAS操作成功为止。
~~~~     CAS操作适用于精细粒度的并发控制,可以避免使用传统的加锁机制带来的性能开销和线程阻塞。然而,CAS操作也存在一些限制和注意事项:
~~~~     ABA问题:CAS操作无法感知到对象值从A变为B又变回A的情况,可能会导致数据不一致。为了解决ABA问题,可以引入版本号或标记位等机制。
自旋开销:当CAS操作失败时,需要不断地进行重试,会占用CPU资源。如果重试次数过多或者线程争用激烈,可能会引起性能问题。
并发性限制:如果多个线程同时对同一内存地址进行CAS操作,只有一个线程的CAS操作会成功,其他线程需要重试或放弃操作。
在Java中,提供了相关的CAS操作支持,如AtomicInteger、AtomicLong、AtomicReference等类,可以实现基于CAS操作的线程安全操作。

乐观锁解决一人多单超卖情况

  • 方法一:版本号法

在这里插入图片描述
首先我们要为 tb_seckill_voucher 表新增一个版本号字段 version ,线程1查询完库存,在进行库存扣减操作的同时将版本号+1,线程2在查询库存时,同时查询出当前的版本号,发现库存充足,也准备执行库存扣减操作,但是需要判断当前的版本号是否是之前查询时的版本号,结果发现版本号发生了改变,这就说明数据库中的数据已经发生了修改,需要进行重试(或者直接抛异常中断)

  • 方法二:CAS方法

在这里插入图片描述
CAS法类似与版本号法,但是不需要另外在添加一个 version 字段,而是直接使用库存替代版本号,线程1查询完库存后进行库存扣减操作,线程2在查询库存时,发现库存充足,也准备执行库存扣减操作,但是需要判断当前的库存是否是之前查询时的库存,结果发现库存数量发生了改变,这就说明数据库中的数据已经发生了修改,需要进行重试(或者直接抛异常中断)

综上所诉,CAS发比较简单,能够避免更多的内存开销

java">// 5、秒杀券合法,则秒杀券抢购成功,秒杀券库存数量减一
boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).eq(SeckillVoucher::getStock, voucher.getStock()).setSql("stock = stock -1"));

单体项目下一人一单的超卖情况

在这里插入图片描述

java">/*** 抢购秒杀券** @param voucherId* @return*/
@Transactional
@Override
public Result seckillVoucher(Long voucherId) {// 1、查询秒杀券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2、判断秒杀券是否合法if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 秒杀券的开始时间在当前时间之后return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 秒杀券的结束时间在当前时间之前return Result.fail("秒杀已结束");}if (voucher.getStock() < 1) {return Result.fail("秒杀券已抢空");}// 3、判断当前用户是否是第一单int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, ThreadLocalUtls.getUser().getId()));if (count >= 1) {// 当前用户不是第一单return Result.fail("用户已购买");}// 4、用户是第一单,可以下单,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0).setSql("stock = stock -1"));if (!flag) {throw new RuntimeException("秒杀券扣减失败");}// 5、创建对应的订单,并保存到数据库VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);voucherOrder.setId(orderId);voucherOrder.setUserId(ThreadLocalUtls.getUser().getId());voucherOrder.setVoucherId(voucherOrder.getId());flag = this.save(voucherOrder);if (!flag) {throw new RuntimeException("创建秒杀券订单失败");}// 6、返回订单idreturn Result.ok(orderId);
}

~~~~     我们再次通过压力测试来判断一个人是否买只能一单,最后经过测试我们发现并没有,一个人在压力测试下可以出现一个人买多单的情况,那么这是为什么呢。
~~~~     出现这个问题的原因和前面库存为负数数的情况是一样的,线程1查询当前用户是否有订单,当前用户没有订单准备下单,此时线程2也查询当前用户是否有订单,由于线程1还没有完成下单操作,线程2同样发现当前用户未下单,也准备下单,这样明明一个用户只能下一单,结果下了两单,也就出现了超卖问题

悲观锁解决一人一单超卖情况

乐观锁需要判断数据是否修改,而当前是判断当前是否存在,所以无法像解决库存超卖一样使用CAS机制,但是可以使用版本号法,但是版本号法需要新增一个字段,所以这里为了方便,就直接演示使用悲观锁解决超卖问题

在这里插入图片描述

java">/*** 抢购秒杀券** @param voucherId* @return*/
@Transactional
@Override
public Result seckillVoucher(Long voucherId) {// 1、查询秒杀券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2、判断秒杀券是否合法if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {// 秒杀券的开始时间在当前时间之后return Result.fail("秒杀尚未开始");}if (voucher.getEndTime().isBefore(LocalDateTime.now())) {// 秒杀券的结束时间在当前时间之前return Result.fail("秒杀已结束");}if (voucher.getStock() < 1) {return Result.fail("秒杀券已抢空");}// 3、创建订单Long userId = ThreadLocalUtls.getUser().getId();synchronized (userId.toString().intern()) {// 创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(userId, voucherId);}
}/**1. 创建订单2.  3. @param userId4. @param voucherId5. @return*/
@Transactional
public Result createVoucherOrder(Long userId, Long voucherId) {
//        synchronized (userId.toString().intern()) {// 1、判断当前用户是否是第一单int count = this.count(new LambdaQueryWrapper<VoucherOrder>().eq(VoucherOrder::getUserId, userId));if (count >= 1) {// 当前用户不是第一单return Result.fail("用户已购买");}// 2、用户是第一单,可以下单,秒杀券库存数量减一boolean flag = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>().eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0).setSql("stock = stock -1"));if (!flag) {throw new RuntimeException("秒杀券扣减失败");}// 3、创建对应的订单,并保存到数据库VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId(SECKILL_VOUCHER_ORDER);voucherOrder.setId(orderId);voucherOrder.setUserId(ThreadLocalUtls.getUser().getId());voucherOrder.setVoucherId(voucherOrder.getId());flag = this.save(voucherOrder);if (!flag) {throw new RuntimeException("创建秒杀券订单失败");}// 4、返回订单idreturn Result.ok(orderId);
//        }
}

这时候我们需要注意几个问题:

  1. 锁的范围尽量小。synchronized尽量锁代码块,而不是方法,锁的范围越大性能越低
  2. 锁的对象一定要是一个不变的值。我们不能直接锁 Long 类型的 userId,每请求一次都会创建一个新的 userId 对象,synchronized 要锁不变的值,所以我们要将 Long 类型的 userId 通过 toString()方法转成 String 类型的 userId,toString()方法底层(可以点击去看源码)是直接 new 一个新的String对象,显然还是在变,所以我们要使用 intern() 方法从常量池中寻找与当前 字符串值一致的字符串对象,这就能够保障一个用户 发送多次请求,每次请求的 userId 都是不变的,从而能够完成锁的效果(并行变串行)
  3. 我们要锁住整个事务,而不是锁住事务内部的代码。如果我们锁住事务内部的代码会导致其它线程能够进入事务,当我们事务还未提交,锁一旦释放,仍然会存在超卖问题,例如下面这种情况,我们锁住的仅仅是代码,而并没有锁住数据库事务提交这部分,一旦代码执行完毕,锁就立即释放了,但是这个时候我们的事务并没有提交,这就仍然会出现超卖的问题。

在这里插入图片描述

  1. Spring的@Transactional注解要想事务生效,必须使用动态代理。Service中一个方法中调用另一个方法,另一个方法使用了事务,此时会导致@Transactional失效,所以我们需要创建一个代理对象,使用代理对象来调用方法。

事务失效的几种情况我们可以看这一篇文章 :spring 事务失效的 12 种场景_spring 截获duplicatekeyexception 不抛异常-CSDN博客

集群下的一人一单超卖问题

首先,在IDEA中启动两个SpringBoot程序,一个端口号是8081,另一个端口是8082:
在这里插入图片描述
打开nginx的负载均衡
在这里插入图片描述

j经过压力测试我们发现,两个服务器有两把锁,这个synchronized锁形同虚设,这是由于synchronized是本地锁,只能提供线程级别的同步,每个JVM中都有一把synchronized锁,不能跨 JVM 进行上锁,当一个线程进入被 synchronized 关键字修饰的方法或代码块时,它会尝试获取对象的内置锁(也称为监视器锁)。如果该锁没有被其他线程占用,则当前线程获得锁,可以继续执行代码;否则,当前线程将进入阻塞状态,直到获取到锁为止。而现在我们是创建了两个节点,也就意味着有两个JVM,所以synchronized会失效!

分布式锁

前面sychronized锁失效的原因是由于每一个JVM都有一个独立的锁监视器,用于监视当前JVM中的sychronized锁,所以无法保障多个集群下只有一个线程访问一个代码块。所以我们直接将使用一个分布锁,在整个系统的全局中设置一个锁监视器,从而保障不同节点的JVM都能够识别,从而实现集群下只允许一个线程访问一个代码块

在这里插入图片描述
分布式锁的特点:

  • 多线程可见。
  • 互斥。分布式锁必须能够确保在任何时刻只有一个节点能够获得锁,其他节点需要等待。
  • 高可用。分布式锁应该具备高可用性,即使在网络分区或节点故障的情况下,仍然能够正常工作。(容错性)当持有锁的节点发生故障或宕机时,系统需要能够自动释放该锁,以确保其他节点能够继续获取锁。
  • 高性能。分布式锁需要具备良好的性能,尽可能减少对共享资源的访问等待时间,以及减少锁竞争带来的开销。
  • 安全性。(可重入性)如果一个节点已经获得了锁,那么它可以继续请求获取该锁而不会造成死锁。(锁超时机制)为了避免某个节点因故障或其他原因无限期持有锁而影响系统正常运行,分布式锁通常应该设置超时机制,确保锁的自动释放。

分布式锁常见的实现方式:
在这里插入图片描述

  • 基于关系数据库:可以利用数据库的事务特性和唯一索引来实现分布式锁。通过向数据库插入一条具有唯一约束的记录作为锁,其他进程在获取锁时会受到数据库的并发控制机制限制。
  • 基于缓存(如Redis):使用分布式缓存服务(如Redis)提供的原子操作来实现分布式锁。通过将锁信息存储在缓存中,其他进程可以通过检查缓存中的锁状态来判断是否可以获取锁。
  • 基于ZooKeeper:ZooKeeper是一个分布式协调服务,可以用于实现分布式锁。通过创建临时有序节点,每个请求都会尝试创建一个唯一的节点,并检查自己是否是最小节点,如果是,则表示获取到了锁。
  • 基于分布式算法:还可以利用一些分布式算法来实现分布式锁,例如Chubby、DLM(Distributed Lock Manager)等。这些算法通过在分布式系统中协调进程之间的通信和状态变化,实现分布式锁的功能

获取锁:

方案1:

# 添加锁
setnx [key] [value]
# 为锁设置过期时间,超时释放,避免死锁
expire [key] [time]

方案2 (这种方案吧设置锁和设置超时时间放在一起保证了操作的原子性)

# 添加锁
set [key] [value] ex [time] nx

分布式锁解决超卖问题

在这里插入图片描述
创建分布式锁

java">public class SimpleRedisLock implements Lock {/*** RedisTemplate*/private StringRedisTemplate stringRedisTemplate;/*** 锁的名称*/private String name;public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}/*** 获取锁** @param timeoutSec 超时时间* @return*/@Overridepublic boolean tryLock(long timeoutSec) {String id = Thread.currentThread().getId() + "";// SET lock:name id EX timeoutSec NXBoolean result = stringRedisTemplate.opsForValue().setIfAbsent("lock:" + name, id, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(result);}/*** 释放锁*/@Overridepublic void unlock() {stringRedisTemplate.delete("lock:" + name);}
}

使用锁

java">// 3、创建订单(使用分布式锁)
Long userId = ThreadLocalUtls.getUser().getId();
SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
boolean isLock = lock.tryLock(1200);
if (!isLock) {// 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)return Result.fail("一人只能下一单");
}
try {// 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(userId, voucherId);
} finally {lock.unlock();
}

分布锁的优化1

上一节,我们实现了一个简单的分布式锁,但是会存在一个问题:当线程1获取锁后,由于业务阻塞,线程1的锁超时释放了,这时候线程2趁虚而入拿到了锁,然后此时线程1业务完成了,然后把线程2刚刚获取的锁给释放了,这时候线程3又趁虚而入拿到了锁,这就导致又出现了超卖问题!(但是这个在小项目(并发数不高)中出现的概率比较低,在大型项目(并发数高)情况下是有一定概率的)

在这里插入图片描述
那么该如何解决这个问题呢,其实我们可以在释放锁的时候判断一下这个锁是否是自己的,如果是自己的那么就释放掉
在这里插入图片描述

java">package com.hmdp.utils.lock.impl;import cn.hutool.core.lang.UUID;
import com.hmdp.utils.lock.Lock;
import org.springframework.data.redis.core.StringRedisTemplate;import java.util.concurrent.TimeUnit;/*** @author ghp* @title* @description*/
public class SimpleRedisLock implements Lock {/*** RedisTemplate*/private StringRedisTemplate stringRedisTemplate;/*** 锁的名称*/private String name;/*** key前缀*/public static final String KEY_PREFIX = "lock:";/*** ID前缀*/public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}/*** 获取锁** @param timeoutSec 超时时间* @return*/@Overridepublic boolean tryLock(long timeoutSec) {String threadId = ID_PREFIX + Thread.currentThread().getId() + "";// SET lock:name id EX timeoutSec NXBoolean result = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(result);}/*** 释放锁*/@Overridepublic void unlock() {// 判断 锁的线程标识 是否与 当前线程一致String currentThreadFlag = ID_PREFIX + Thread.currentThread().getId();String redisThreadFlag = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);if (currentThreadFlag != null || currentThreadFlag.equals(redisThreadFlag)) {// 一致,说明当前的锁就是当前线程的锁,可以直接释放stringRedisTemplate.delete(KEY_PREFIX + name);}// 不一致,不能释放}
}

分布式锁优化2

在上一节中,我们通过给锁添加一个线程标识,并且在释放锁时添加一个判断,从而防止锁超时释放产生的超卖问题,一定程度上解决了超卖问题,但是仍有可能发生超卖问题(出现超卖概率更低了):当线程1获取锁,执行完业务然后并且判断完当前锁是自己的锁时,但就在此时发生了阻塞,结果锁被超时释放了,线程2立马就趁虚而入了,获得锁执行业务,但就在此时线程1阻塞完成,由于已经判断过锁,已经确定锁是自己的锁了,于是直接就删除了锁,结果删的是线程2的锁,这就又导致线程3趁虚而入了,从而继续发生超卖问题
备注:我们可以在判断删除锁的那行代码上打一个断点,然后user1发送一个请求,获取锁,手动把锁删了,模拟锁超时释放,然后使用user2发送一个请求,成功获取锁,从而模拟上诉过程,检验超卖问题
这就是没有保证确认是否是自己的锁和删除锁的原子性一致问题

在这里插入图片描述
PS:虽然这个情况发生的概率较低,但是根据墨菲定律,我们最好不要抱有侥幸心理,不然最终我们会在这个细微的问题上付诸沉重的代价!你可能还会想,判断锁和释放锁在同一个方法中,并且两者之间没有别的代码,为什么会发生阻塞呢?JVM的垃圾回收机制会导致短暂的阻塞

那么我们该如何保障 判断锁 和 释放锁 这连段代码的原子性呢?答案是使用Lua脚本,关于Lua脚本相关知识可以参考这篇文章:Lua脚本快速入门

在这里插入图片描述
下面是释放锁的流程
在这里插入图片描述
Lua脚本

---
--- Generated by EmmyLua(https://github.com/EmmyLua)
--- Created by ghp.
--- DateTime: 2023/7/13 16:19
---
-- 比较缓存中的线程标识与当前线程标识是否一致
if (redis.call('get', KEYS[1]) == ARGV[1]) then-- 一致,直接删除return redis.call('del', KEYS[1])
end
-- 不一致,返回0
return 0

代码实现

java">package com.hmdp.utils.lock.impl;import cn.hutool.core.lang.UUID;
import com.hmdp.utils.lock.Lock;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Collections;
import java.util.concurrent.TimeUnit;/*** @author ghp* @title* @description*/
public class SimpleRedisLock implements Lock {/*** RedisTemplate*/private StringRedisTemplate stringRedisTemplate;/*** 锁的名称*/private String name;/*** key前缀*/private static final String KEY_PREFIX = "lock:";/*** ID前缀*/private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {this.stringRedisTemplate = stringRedisTemplate;this.name = name;}/*** 获取锁** @param timeoutSec 超时时间* @return*/@Overridepublic boolean tryLock(long timeoutSec) {String threadId = ID_PREFIX + Thread.currentThread().getId() + "";// SET lock:name id EX timeoutSec NXBoolean result = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(result);}/*** 加载Lua脚本*/private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}/*** 释放锁*/@Overridepublic void unlock() {// 执行lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());}
}

这里我们使用静态代码块的方法来保证只加载一次lua脚本

现在我们的分布式锁满足了:
~~~~     多线程可见,将锁放到Redis中,所有的JVM都可以同时看到互斥,set ex nx指令互斥
~~~~     高可用,层层优化,即使是特别极端的情况下照样可以防止超卖
~~~~     高性能,Redis的IO速度很快,Lua脚本的性能也很快
~~~~     安全性,这个不用多说了,通过给锁夹线程标识+Lua封装Redis指令充分保障了线程安全,不那么容易出现并发安全问题,同时采用超时释放避免死锁

Redisson

经过优化1和优化2,我们实现的分布式锁已经达到生产可用级别了,但是还不够完善,比如:
~~~~     分布式锁不可重入:不可重入是指同一线程不能重复获取同一把锁。比如,方法A中调用方法B,方法A需要获取分布式锁,方法B同样需要获取分布式锁,线程1进入方法A获取了一次锁,进入方法B又获取一次锁,由于锁不可重入,所以就会导致死锁
~~~~     分布式锁不可重试:获取锁只尝试一次就返回false,没有重试机制,这会导致数据丢失,比如线程1获取锁,然后要将数据写入数据库,但是当前的锁被线程2占用了,线程1直接就结束了而不去重试,这就导致数据发生了丢失
~~~~     分布式锁超时释放:超时释放机机制虽然一定程度避免了死锁发生的概率,但是如果业务执行耗时过长,期间锁就释放了,这样存在安全隐患。锁的有效期过短,容易出现业务没执行完就被释放,锁的有效期过长,容易出现死锁,所以这是一个大难题!
我们可以设置一个较短的有效期,但是加上一个 心跳机制 和 自动续期:在锁被获取后,可以使用心跳机制并自动续期锁的持有时间。通过定期发送心跳请求,显示地告知其他线程或系统锁还在使用中,同时更新锁的过期时间。如果某个线程持有锁的时间超过了预设的有效时间,其他线程可以尝试重新获取锁。
~~~~     主从一致性问题:如果Redis提供了主从集群,主从同步存在延迟,线程1在主节点获取了锁,但是尚未同步给从节点的时候,主节点宕机了,这时候选择一个从节点当主,那么这时候其他线程就会趁虚而入拿到锁

我们如果想要更进一步优化分布式锁,当然是可以的,但是没必要,除非是迫不得已,我们完全可以直接使用已经造好的轮子,比如:Redisson。Redssion是一个十分成熟的Redis框架,功能也很多,比如:分布式锁和同步器、分布式对象、分布式集合、分布式服务,各种Redis实现分布式的解决方案。简而言之Redisson就是一个使用Redis解决分布式问题的方案的集合,当然它不仅仅是解决分布式相关问题,还包含其它的一些问题。

所以说分布式锁的究极优化就是使用别人造好的轮子🤣

Maven坐标

 <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>

配置Redission

java">@Configuration
public class RedisConfig {@Beanpublic RedissonClient redisson() {Config config = new Config();config.useSingleServer().setAddress(RedisConstants.REDIS + "://" + RedisConstants.REDIS_HOST + ":" + RedisConstants.REDIS_PORT).setDatabase(2);return Redisson.create(config);}}

简单使用

java">// 3、创建订单(使用分布式锁)
Long userId = ThreadLocalUtls.getUser().getId();
RLock lock = redissonClient.getLock(RedisConstants.LOCK_ORDER_KEY + userId);
boolean isLock = lock.tryLock();

tryLock方法介绍
~~~~     tryLock():它会使用默认的超时时间和等待机制。具体的超时时间是由 Redisson 配置文件或者自定义配置决定的。
~~~~     tryLock(long time, TimeUnit unit):它会在指定的时间内尝试获取锁(等待time后重试),如果获取成功则返回 true,表示获取到了锁;如果在指定时间内(Redisson内部默认指定的)未能获取到锁,则返回 false。
~~~~     tryLock(long waitTime, long leaseTime, TimeUnit unit):指定等待时间为watiTime,如果超过 leaseTime 后还没有获取锁就直接返回失败

总的来讲自上而下,tryLock的灵活性逐渐提高,无参tryLock时,waitTime的默认值是-1,代表不等待,leaseTime的默认值是30,unit默认值是 seconds ,也就是锁超过30秒还没有释放就自动释放

原理可以去看具体的视频

秒杀下单优化

最开始我们的遇到自增ID问题,我们通过实现分布式ID解决了问题;后面我们在单体系统下遇到了一人多单超卖问题,我们通过乐观锁解决了;我们对业务进行了变更,将一人多单变成了一人一单,结果在高并发场景下同一用户发送相同请求仍然出现了超卖问题,我们通过悲观锁解决了;由于用户量的激增,我们将单体系统升级成了集群,结果由于锁只能在一个JVM中可见导致又出现了,在高并发场景下同一用户发送下单请求出现超卖问题,我们通过实现分布式锁成功解决集群下的超卖问题;由于我们最开始实现的分布式锁比较简单,会出现超时释放导致超卖问题,我们通过给锁添加线程标识成功解决了;但是释放锁时,判断锁是否是当前线程 和 删除锁两个操作不是原子性的,可能导致超卖问题,我们通过将两个操作封装到一个Lua脚本成功解决了;为了解决锁的不可重入性,我们通过将锁以hash结构的形式存储,每次释放锁都value-1,获取锁value+1,从而实现锁的可重入性,并且将释放锁和获取锁的操作封装到Lua脚本中以确保原子性。最最后,我们发现可以直接使用现有比较成熟的方案Redisson来解决上诉出现的所有问题🤣,什么不可重试、不可重入、超市释放、原子性等问题Redisson都提供相对应的解决方法(。^▽^)

异步秒杀优化

同步(Synchronous)是指程序按照顺序依次执行,每一步操作完成后再进行下一步。在同步模式下,当一个任务开始执行时,程序会一直等待该任务完成后才会继续执行下一个任务。
异步(Asynchronous)是指程序在执行任务时,不需要等待当前任务完成,而是在任务执行的同时继续执行其他任务。在异步模式下,任务的执行顺序是不确定的,程序通过回调、事件通知等方式来获取任务执行的结果。

显然异步的性能是要高于同步的,但是会牺牲掉一定的数据一致性,所以也不是无脑用异步,要根据具体业务进行分析,这里的下单是可以使用异步的,因为下单操作比较耗时,后端操作步骤多,可以进行拆分

通过准备200多个token对这个接口进行压力测试我们发现,这个接口的响应速度很慢,可以看到 qps1000、平均值1921、异常率37.2%、吞吐量315.3。

在这里插入图片描述

可以看到这个流程是同步执行的,同步是比较耗费时间的,我们直接将同步变成异步,从而大幅提高秒杀业务的性能,具体如何做呢?我们可以将一部分的工作交给Redis,并且不能直接去调用Redis,而是通过开启一个独立的子线程去异步执行,从而大大提高效率

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

这里Redis执行lua脚本是单线程的,并且具有原子性一致,我们继续对这个接口进行压力测试,发现响应速度大大降低

消息队列优化

分析
~~~~     前面我们使用 Java 自带的阻塞队列 BlockingQueue 实现消息队列,这种方式存在以下几个严重的弊端:
~~~~     信息可靠性没有保障,BlockingQueue 的消息是存储在内存中的,无法进行持久化,一旦程序宕机或者发生异常,会直接导致消息丢失
消息容量有限,BlockingQueue 的容量有限,无法进行有效扩容,一旦达到最大容量限制,就会抛出OOM异常
~~~~     所以这里我们可以选择采用其它成熟的的(和之前分布式锁一样)MQ,比如:RabbitMQ、RocketMQ、Kafka等,但是本项目是为了学习Redis而设计的,所以这里我们将要学习如何使用Redis实现一个相对可靠的消息队列(自己实现的肯定没法和别人成熟的产品相比)

那么我们该如何实现呢?首先我们需要了解MQ的特点(这里不再赘述了,MQ详情看上面那篇文章),根据MQ的特点选取相对应的数据结构,Redis中能够实现MQ效果的主要由以下三种方式:
在这里插入图片描述

  • list结构:基于List结构模拟消息队列(BRPOP+BLPOP实现阻塞队列)

    • 生产消息:BRPUSH key value [value …] 将一个或多个元素推入到指定列表的头部。如果列表不存在,BRPUSH命令会自动创建一个新的列表

    • 消费消息:BRPOP key [key …] timeout 从指定的一个或多个列表中弹出最后一个元素。如果 list 列表为空,BRPOP命令会导致客户端阻塞,直到有数据可用或超过指定的超时时间

  • 优点:不会内存超限、可以持久化、消息有序性;缺点:无法避免数据丢失、只支持单消费者

  • pubsub:发布订阅模式,基本的点对点消息模型(redis2.0引入)

    • 生产消息
      # 用于向指定频道发布一条消息
      PUBLISH channel message 
      
    • 消费消息
      # 订阅一个或多个频道
      SUBSCRIBE channel [channel]# 用于取消订阅一个或多个频道
      UNSUBSCRIBE [channel [channel ...]]# 用于订阅一个或多个符合给定模式的频道,接收消息
      PSUBSCRIBE pattern [pattern ...]# 用于取消订阅一个或多个符合给定模式的频道
      PUNSUBSCRIBE [pattern [pattern ...]]
      
  • 优点:支持多生产、多消费者;缺点:不支持持久化、无法比避免数据丢失,消息堆积有上限(消费者会缓存消息),超出会丢失消息

  • stream:比较完善的消息队列模型(redis5.0引入,我的是redis6.0😄)

  • stream是一种数据类型,专门为消息队列设计的,相较于前面两种方式能够更加完美实现一个消息队列

    • 生产消息:用于向指定的Stream流中添加一个消息

      XADD key *|ID value [value ...]
      # 创建名为 users 的队列,并向其中发送一个消息,内容是{name=jack,age=21},并且使用Redis自动生成ID
      127.0.0.1:6379> XADD users * name jack age 21
      "1644805700523-0"
      

      key就是消息队列,key不存(*)在会自动创建(默认),ID是消息表示,value是消息的内容

    • 消费消息

      XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID ID
      # 读取XREAD中的第一条消息
      XREAD COUNT 1 STREAMS users 0
      # 阻塞1秒钟后从XREAD中读取的最新消息
      XREAD COUNT 1 BLOCK 1000 STREAMS users $
      

注意:当我们指定起始ID为$时代表读取最后一条消息(读取最新的消息)ID为0时代表读最开始的一条消息(读取最旧的消息),如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

优点:消息可回溯、一个消息可以被多个消费者消费、可以阻塞读取;缺点:有消息漏读的风险

~~~~     上面我们介绍的消费方式都是单消费方式,容易发生消息堆积导致消息丢失,所以我们需要改用消费者组的模式
~~~~     消费者组(Consumer Group):将多个消息划分到一个组中,监听同一队列
~~~~     消费者组的特点:
~~~~     消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
~~~~     消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
~~~~     消息确认:消费者获取消息后,消息处于pending(待处理)状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除

# 创建消费者组
XGROUP CREATE key groupName ID
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
# 删除消费者组中指定消费者
XGROUP DELCONSUMER key groupName consumerName
# 从消费者组中读取消息
XREADGROUP GROUP

在这里插入图片描述
在这里插入图片描述
案例
在这里插入图片描述
首先需要我们创建队列

# 创建队列(消费者组模式)
XGROUP CREATE stream.orders g1 0 MKSTREAM

案例代码

java">@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate RedisWork redisWork;@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;private IVoucherOrderService iVoucherOrderService;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private static final String queueName = "stream.orders";private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {log.info("开始处理异步任务:秒杀卷订单");while (true) {try {List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(Consumer.from("g1","c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));if(read == null || read.isEmpty()){continue;}MapRecord<String,Object,Object> record = read.get(0);Map<Object, Object> value = record.getValue();System.out.println(value);VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);System.out.println(voucherOrder);
//                    VoucherOrder take = orderTasks.take();handleVoucherOrder(voucherOrder);stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("订单异常:{}", e.getMessage());handlePendingList();}}}}private void handlePendingList() {while (true) {try{List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if(read == null || read.isEmpty()){break;}MapRecord<String,Object,Object> record = read.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());}catch (Exception e){log.error("异常信息{}", e.getMessage());}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order" + userId);boolean isLock = lock.tryLock();if (!isLock) {log.error("重复下单:优惠卷id{},用户id{}",voucherOrder.getVoucherId(), userId);return;}try{iVoucherOrderService.createVoucherOrder(voucherOrder);} catch (Exception e) {log.error("订单异常信息:{}", e.getMessage());throw new RuntimeException(e);}finally {lock.unlock();}}@Overridepublic Result seckillVoucher(Long voucherId) {long orderId = redisWork.nextId("order");Long userId = UserHolder.getUser().getId();// 1 执行lua脚本int result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(),String.valueOf(orderId)).intValue();if (result != 0) {return Result.fail(result == 1 ? "库存不足" : "不能重复下单");}iVoucherOrderService = (IVoucherOrderService) AopContext.currentProxy();// 2 判断是否为0// 为0 表示有购买资格,添加到阻塞队列当中// 返回订单idVoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(orderId);voucherOrder.setVoucherId(voucherId);voucherOrder.setUserId(userId);
//        orderTasks.add(voucherOrder);return Result.ok(orderId);}@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();Integer count = query().eq("user_id",userId).eq("voucher_id",voucherOrder.getVoucherId()).count();if(count > 0){return;}boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0).update();if (!success){return;}save(voucherOrder);}
}

这里我们需要注意一下代码,获取到record中的String是消息的ID,在Lua脚本当中发送消息的时候我们,需要注意字段名一直,在将map对象转换成java对象的时候才能保证正确转换

java">MapRecord<String,Object,Object> record = read.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);

在消息队列中获取消息出现异常的时候,我们需要从pending-list中获取消息,重新处理

java">try{
......
} catch (Exception e) {log.error("订单异常:{}", e.getMessage());handlePendingList();
}private void handlePendingList() {while (true) {try{List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if(read == null || read.isEmpty()){break;}MapRecord<String,Object,Object> record = read.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());}catch (Exception e){log.error("异常信息{}", e.getMessage());}}
}

达人探店

Set实现点赞功能

现在存在一个问题,一个用户可以无限点赞,这显然是不合理的,所以我们需要对点赞功能进行一个优化,实现一人只能点赞一次。

对于点赞这种高频变化的数据,如果我们使用MySQL是十分不理智的,因为MySQL慢、并且并发请求MySQL会影响其它重要业务,容易影响整个系统的性能,继而降低了用户体验。那么如何我们要使用Redis,那么我们又该选择哪种数据结构才更加合理呢?

这里我推荐使用Set,因为Set类型的数据结构具有
~~~~     不重复,符合业务的特点,一个用户只能点赞一次
~~~~     高性能,Set集合内部实现了高效的数据结构(Hash表)
~~~~     灵活性,Set集合可以实现一对多,一个用户可以点赞多个博客,符合实际的业务逻辑
在这里插入图片描述

java">@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {@Resourceprivate IUserService userService;@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 根据id查询博客** @param id* @return*/@Overridepublic Result queryBlogById(Long id) {// 查询博客信息Blog blog = this.getById(id);if (Objects.isNull(blog)) {return Result.fail("笔记不存在");}// 查询blog相关的用户信息queryUserByBlog(blog);// 判断当前用户是否点赞该博客isBlogLiked(blog);return Result.ok(blog);}/*** 判断当前用户是否点赞该博客*/private void isBlogLiked(Blog blog) {Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + blog.getId();Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());blog.setIsLike(BooleanUtil.isTrue(isMember));}/*** 查询热门博客** @param current* @return*/@Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = this.query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(blog -> {this.queryUserByBlog(blog);this.isBlogLiked(blog);});return Result.ok(records);}/*** 点赞** @param id* @return*/@Overridepublic Result likeBlog(Long id) {// 判断用户是否点赞Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + blog.getId();// sismember key valueBoolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());boolean result;if (BooleanUtil.isFalse(isMember)) {// 用户未点赞,点赞数+1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked + 1"));if (result) {// 数据库更新成功,更新缓存  sadd key valuestringRedisTemplate.opsForSet().add(key, userId.toString());}} else {// 用户已点赞,点赞数-1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked - 1"));if (result) {// 数据更新成功,更新缓存 srem key valuestringRedisTemplate.opsForSet().remove(key, userId.toString());}}return Result.ok();}/*** 查询博客相关用户信息** @param blog*/private void queryUserByBlog(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}
}

在Blog这个对象当中就有这样一个字段,判断当前用户是否点赞该文章

java"> @TableField(exist = false)private Boolean isLike;

SortedSet实现点赞排行榜

在这里插入图片描述
~~~~     平常我们所使用的软件中(比如微信、QQ、抖音)的点赞功能都会默认按照时间顺序对点赞的用户进行一个排序,后点赞的用户会排在最前面,而Set是无需的,无法满足这个需求,虽然 List有序,但是不唯一,查找效率也比较低,所以也不推荐使用,此时我们就可以选择使用SortedSet这个数据结构,它完美的满足了我们所有的需求:唯一、有序、查找效率高。

相较于Set集合,SortedList有以下不同之处:

对于Set集合我们可以使用 isMember方法判断用户是否存在,对于SortedList我们可以使用ZSCORE方法判断用户是否存在
Set集合没有提供范围查询,无法获排行榜前几名的数据,SortedList可以使用ZRANGE方法实现范围查询

java">@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {@Resourceprivate IUserService userService;@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 根据id查询博客** @param id* @return*/@Overridepublic Result queryBlogById(Long id) {// 查询博客信息Blog blog = this.getById(id);if (Objects.isNull(blog)) {return Result.fail("笔记不存在");}// 查询blog相关的用户信息queryUserByBlog(blog);// 判断当前用户是否点赞该博客isBlogLiked(blog);return Result.ok(blog);}/*** 判断当前用户是否点赞该博客*/private void isBlogLiked(Blog blog) {UserDTO user = ThreadLocalUtls.getUser();if (Objects.isNull(user)){// 当前用户未登录,无需查询点赞return;}Long userId = user.getId();String key = BLOG_LIKED_KEY + blog.getId();Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());blog.setIsLike(Objects.nonNull(score));}/*** 查询热门博客** @param current* @return*/@Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询Page<Blog> page = this.query().orderByDesc("liked").page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据List<Blog> records = page.getRecords();// 查询用户records.forEach(blog -> {this.queryUserByBlog(blog);this.isBlogLiked(blog);});return Result.ok(records);}/*** 点赞** @param id* @return*/@Overridepublic Result likeBlog(Long id) {// 1、判断用户是否点赞Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + id;// zscore key valueDouble score = stringRedisTemplate.opsForZSet().score(key, userId.toString());boolean result;if (score == null) {// 1.1 用户未点赞,点赞数+1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked + 1"));if (result) {// 数据库更新成功,更新缓存 zadd key value scorestringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());}} else {// 1.2 用户已点赞,点赞数-1result = this.update(new LambdaUpdateWrapper<Blog>().eq(Blog::getId, id).setSql("liked = liked - 1"));if (result) {// 数据更新成功,更新缓存 zrem key valuestringRedisTemplate.opsForZSet().remove(key, userId.toString());}}return Result.ok();}/*** 查询所有点赞博客的用户** @param id* @return*/@Overridepublic Result queryBlogLikes(Long id) {// 查询Top5的点赞用户 zrange key 0 4Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + id;Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);if (top5 == null || top5.isEmpty()) {return Result.ok(Collections.emptyList());}List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());List<UserDTO> userDTOList = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOList);}/*** 查询博客相关用户信息** @param blog*/private void queryUserByBlog(Blog blog) {Long userId = blog.getUserId();User user = userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}
}

但是这里我们发现先点赞的被排在最后面,那这是我们不愿意看到的,这是为什么呢,如下图
在这里插入图片描述
这就是由于Mysql默认查出来的数据是按照id自增的,解决方法如下,根据字段的顺序排名

select id, phone,password,nick_name,icon,create_time,update_time
from tb_user
where id in(1, 5)
order by field(id, 5, 1)
java">/*** 查询所有点赞博客的用户** @param id* @return*/
@Override
public Result queryBlogLikes(Long id) {// 查询Top5的点赞用户 zrange key 0 4Long userId = ThreadLocalUtls.getUser().getId();String key = BLOG_LIKED_KEY + id;Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);if (top5 == null || top5.isEmpty()) {return Result.ok(Collections.emptyList());}List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());String idStr = StrUtil.join(",", ids);// 根据id降序排序 select * from tb_user where id in(1,5) order by field(id, 1, 5)List<UserDTO> userDTOList = userService.list(new LambdaQueryWrapper<User>().in(User::getId, ids).last("order by field (id," + idStr + ")")).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOList);
}

好友关注

关注与取关

java">@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {/*** 关注用户** @param followUserId 关注用户的id* @param isFollow     是否已关注* @return*/@Overridepublic Result follow(Long followUserId, Boolean isFollow) {Long userId = ThreadLocalUtls.getUser().getId();if (isFollow) {// 用户为关注,则关注Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);this.save(follow);} else {// 用户已关注,删除关注信息this.remove(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));}return Result.ok();}/*** 是否关注用户** @param followUserId 关注用户的id* @return*/@Overridepublic Result isFollow(Long followUserId) {Long userId = ThreadLocalUtls.getUser().getId();int count = this.count(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));return Result.ok(count > 0);}
}

Set实现共同关注

我们可以关注的信息存入set集合当中,这样就可以利用set集合求交集的特性找到共同关注的用户

java">@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate IUserService userService;/*** 关注用户** @param followUserId 关注用户的id* @param isFollow     是否已关注* @return*/@Overridepublic Result follow(Long followUserId, Boolean isFollow) {Long userId = ThreadLocalUtls.getUser().getId();String key = FOLLOW_KEY + userId;if (isFollow) {// 用户为关注,则关注Follow follow = new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);boolean isSuccess = this.save(follow);if (isSuccess) {// 用户关注信息保存成功,把关注的用户id放入Redis的Set集合中,stringRedisTemplate.opsForSet().add(key, followUserId.toString());}} else {// 用户已关注,删除关注信息boolean isSuccess = this.remove(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));if (isSuccess) {stringRedisTemplate.opsForSet().remove(key, followUserId.toString());}}return Result.ok();}/*** 是否关注用户** @param followUserId 关注用户的id* @return*/@Overridepublic Result isFollow(Long followUserId) {Long userId = ThreadLocalUtls.getUser().getId();int count = this.count(new LambdaQueryWrapper<Follow>().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));return Result.ok(count > 0);}/*** 查询共同关注** @param id* @return*/@Overridepublic Result followCommons(Long id) {Long userId = ThreadLocalUtls.getUser().getId();String key1 = FOLLOW_KEY + userId;String key2 = FOLLOW_KEY + id;// 查询当前用户与目标用户的共同关注对象Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);if (Objects.isNull(intersect) || intersect.isEmpty()) {return Result.ok(Collections.emptyList());}List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());// 查询共同关注的用户信息List<UserDTO> userDTOList = userService.listByIds(ids).stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOList);}
}

Feed流关注推送

什么是Feed流

关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。Feed流是一种基于用户个性化需求和兴趣的信息流推送方式,常见于社交媒体、新闻应用、音乐应用等互联网平台。Feed流通过算法和用户行为数据分析,动态地将用户感兴趣的内容以流式方式呈现在用户的界面上。

在这里插入图片描述

Feed流产品有两种常见模式:
~~~~     时间排序(Timeline):不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
~~~~     优点:信息全面,不会有缺失。并且实现也相对简单
~~~~     缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
~~~~     优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
~~~~     缺点:如果算法不精准,可能起到反作用

本项目是基于关注的好友来进行投喂的所以选择用时间排序的方式在这里插入图片描述

  1. 拉模式:也叫做读扩散。在拉模式中,终端用户或应用程序主动发送请求来获取最新的数据流。它是一种按需获取数据的方式,用户可以在需要时发出请求来获取新数据。在Feed流中,数据提供方将数据发布到实时数据源中,而终端用户或应用程序通过订阅或请求来获取新数据。
    优点:节约空间,可以减少不必要的数据传输,只需要获取自己感兴趣的数据,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清楚。
    缺点:延迟较高,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。
    在这里插入图片描述
  2. 推模式:也叫做写扩散。在推模式中,数据提供方主动将最新的数据推送给终端用户或应用程序。数据提供方会实时地将数据推送到终端用户或应用程序,而无需等待请求。
    优点:数据延迟低,不用临时拉取
    缺点:内存耗费大,假设一个大V写信息,很多人关注他, 就会写很多份数据到粉丝那边去
    在这里插入图片描述
  3. 推拉结合:也叫做读写混合,兼具推和拉两种模式的优点。在推拉结合模式中,数据提供方会主动将最新的数据推送给终端用户或应用程序,同时也支持用户通过拉取的方式来获取数据。这样可以实现实时的数据更新,并且用户也具有按需获取数据的能力。推拉模式是一个折中的方案,站在发件人这一段,如果是个普通的人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力,如果是大V,那么他是直接将数据先写入到一份到发件箱里边去,然后再直接写一份到活跃粉丝收件箱里边去,现在站在收件人这端来看,如果是活跃粉丝,那么大V和普通的人发的都会直接写入到自己收件箱里边来,而如果是普通的粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息
    在这里插入图片描述
    案例
    在这里插入图片描述
    这里我们的粉丝数量比较小,因此选择用推的方式来实现

由于我们需要实现分页查询功能,这里我们可以选择 list 或者 SortedSet,而不能使用Set,因为Set是无需的, list是有索引的,SortedSet 是有序的,那么我们该如何选择呢?

如果我们选择 list 会存在索引漂移现象(这个在Vue中也存在),从而导致读取重复数据,所以我们不能选择使用 list(当我们正查询第二页的数据的时候,突然关注的一个人发送一了一条信息,这时候使用list,它的角标就会偏移,导致读出来的数据重复)
在这里插入图片描述
我们可以选择使用滚动分页,我们使用SortedSet,如果使用排名和使用角标是一样的,但是SortedSet可以按照Score排序(Score默认按照时间戳生成,所以是固定的),每次我们可以选择比之前Score较小的,这样就能够实现滚动排序,从而防止出现问题
在这里插入图片描述
发布笔记推送给所有粉丝

java">/*** 保存探店笔记** @param blog* @return*/
@Override
public Result saveBlog(Blog blog) {Long userId = ThreadLocalUtls.getUser().getId();blog.setUserId(userId);// 保存探店笔记boolean isSuccess = this.save(blog);if (!isSuccess){return Result.fail("笔记保存失败");}// 查询笔记作者的所有粉丝List<Follow> follows = followService.list(new LambdaQueryWrapper<Follow>().eq(Follow::getFollowUserId, userId));// 将笔记推送给所有的粉丝for (Follow follow : follows) {// 获取粉丝的idLong id = follow.getUserId();// 推送笔记String key = FEED_KEY + id;stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());}return Result.ok(blog.getId());
}
java">/*** 关注推送页面的笔记分页** @param max* @param offset* @return*/
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {// 1、查询收件箱Long userId = ThreadLocalUtls.getUser().getId();String key = FEED_KEY + userId;// ZREVRANGEBYSCORE key Max Min LIMIT offset countSet<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);// 2、判断收件箱中是否有数据if (typedTuples == null || typedTuples.isEmpty()) {return Result.ok();}// 3、收件箱中有数据,则解析数据: blogId、minTime(时间戳)、offsetList<Long> ids = new ArrayList<>(typedTuples.size());long minTime = 0; // 记录当前最小值int os = 1; // 偏移量offset,用来计数for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { // 5 4 4 2 2// 获取idids.add(Long.valueOf(tuple.getValue()));// 获取分数(时间戳)long time = tuple.getScore().longValue();if (time == minTime) {// 当前时间等于最小时间,偏移量+1os++;} else {// 当前时间不等于最小时间,重置minTime = time;os = 1;}}// 4、根据id查询blog(使用in查询的数据是默认按照id升序排序的,这里需要使用我们自己指定的顺序排序)String idStr = StrUtil.join(",", ids);List<Blog> blogs = this.list(new LambdaQueryWrapper<Blog>().in(Blog::getId, ids).last("ORDER BY FIELD(id," + idStr + ")"));// 设置blog相关的用户数据,是否被点赞等属性值for (Blog blog : blogs) {// 查询blog有关的用户queryUserByBlog(blog);// 查询blog是否被点赞isBlogLiked(blog);}// 5、封装并返回ScrollResult scrollResult = new ScrollResult();scrollResult.setList(blogs);scrollResult.setOffset(os);scrollResult.setMinTime(minTime);return Result.ok(scrollResult);
}

这里我们在存入文章的时候,存入了时间戳,我们第一次获取的时候根据当前时间来规定为最大值,0为最小值,偏移量为1,查询个数为3,然后循环遍历出最小的时间戳当做下一次最大的时间戳,然后找出最小时间戳相同的有几个,当做下一次的偏移量,封装为结果进行返回

附近商铺搜索

GEO数据结构

GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:
GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
GEODIST:计算指定的两个点之间的距离并返回
GEOHASH:将指定member的坐标转为hash字符串形式并返回
GEOPOS:返回指定member的坐标
GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.2以后已废弃
GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能

主播我到最后才发现我是用的数据库版本是5点多,由于到最后了不想更换了,后面用的API是比较老的

预热数据

java">/*** 预热店铺数据,按照typeId进行分组,用于实现附近商户搜索功能*/
@Test
public void loadShopListToCache() {// 1、获取店铺数据List<Shop> shopList = shopService.list();// 2、根据 typeId 进行分类
//        Map<Long, List<Shop>> shopMap = new HashMap<>();
//        for (Shop shop : shopList) {
//            Long shopId = shop.getId();
//            if (shopMap.containsKey(shopId)){
//                // 已存在,添加到已有的集合中
//                shopMap.get(shopId).add(shop);
//            }else{
//                // 不存在,直接添加
//                shopMap.put(shopId, Arrays.asList(shop));
//            }
//        }// 使用 Lambda 表达式,更加优雅(优雅永不过时)Map<Long, List<Shop>> shopMap = shopList.stream().collect(Collectors.groupingBy(Shop::getTypeId));// 3、将分好类的店铺数据写入redisfor (Map.Entry<Long, List<Shop>> shopMapEntry : shopMap.entrySet()) {// 3.1 获取 typeIdLong typeId = shopMapEntry.getKey();List<Shop> values = shopMapEntry.getValue();// 3.2 将同类型的店铺的写入同一个GEO ( GEOADD key 经度 维度 member )String key = SHOP_GEO_KEY + typeId;// 方式一:单个写入(这种方式,一个请求一个请求的发送,十分耗费资源,我们可以进行批量操作)
//            for (Shop shop : values) {
//                stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()),
//                shop.getId().toString());
//            }// 方式二:批量写入List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>();for (Shop shop : values) {locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),new Point(shop.getX(), shop.getY())));}stringRedisTemplate.opsForGeo().add(key, locations);}
}

案例代码

java">@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {if (x == null || y == null){// 根据类型分页查询Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));// 返回数据return Result.ok(page.getRecords());}int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;int end = current * SystemConstants.DEFAULT_PAGE_SIZE;String key = RedisConstants.SHOP_GEO_KEY + typeId;GeoResults<RedisGeoCommands.GeoLocation<String>> search = stringRedisTemplate.opsForGeo().radius(key,new Circle(new Point(x,y),5000),RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().limit(end));if(search == null){return Result.ok(Collections.emptyList());}List<Long> ids = new ArrayList<>();Map<String,Distance> distanceMap = new HashMap<>();List<GeoResult<RedisGeoCommands.GeoLocation<String>>> content = search.getContent();content.stream().skip(from).forEach(item -> {String shopId = item.getContent().getName();ids.add(Long.valueOf(shopId));Distance distance = item.getDistance();distanceMap.put(shopId, distance);});if(ids.isEmpty()){return Result.ok();}String idsStr = StrUtil.join(",",ids);List<Shop> shopList = query().in("id", ids).last("ORDER BY FIELD(id," + idsStr + ")").list();for (Shop shop : shopList) {shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());}return Result.ok(shopList);
}

用户签到与连续签到

BitMap的基本使用

BitMap的操作命令有:
SETBIT:向指定位置(offset)存入一个0或1
GETBIT :获取指定位置(offset)的bit值
BITCOUNT :统计BitMap中值为1的bit位的数量
BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
BITOP :将多个BitMap的结果做位运算(与 、或、异或)
BITPOS :查找bit数组中指定范围内第一个0或1出现的位置

用户签到

java">@Override
public Result userSign() {Long userId = UserHolder.getUser().getId();LocalDateTime now = LocalDateTime.now();String YM = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = RedisConstants.USER_SIGN_KEY + userId + YM;int dayofMonth = now.getDayOfMonth();Boolean b = stringRedisTemplate.opsForValue().setBit(key, dayofMonth - 1, true);if(Boolean.TRUE.equals(b)){return Result.ok();}return Result.fail("签到失败");
}

统计连续签到次数

问题1:什么叫做连续签到天数?
从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。
问题2:如何得到本月到今天为止的所有签到数据?
BITFIELD key GET u[dayOfMonth] 0
问题3:如何从后向前遍历每个bit位?
与 1 做与运算,就能得到最后一个bit位。随后右移1位,下一个bit位就成为了最后一个bit位。

java">@Override
public Result userSingCount() {Long userId = UserHolder.getUser().getId();LocalDateTime now = LocalDateTime.now();String YM = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = RedisConstants.USER_SIGN_KEY + userId + YM;int dayofMonth = now.getDayOfMonth();List<Long> result = stringRedisTemplate.opsForValue().bitField(key, BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayofMonth)).valueAt(0));if (result == null || result.isEmpty()){return Result.ok(0);}Long num = result.get(0);int count = 0;if (num == null || num == 0){return Result.ok(0);}while (true){if ((num & 1) == 0) {break;}else {count++;}num >>>= 1;}return Result.ok(count);
}

http://www.ppmy.cn/devtools/167672.html

相关文章

如何在androidstudio开发环境中查看sqlite数据库(按新版本Android Studio Giraffe提供详细步骤和操作说明,附截图,代码)

如何在androidstudio开发环境中查看sqlite数据库&#xff08;按新版本Android Studio Giraffe提供详细步骤和操作说明&#xff0c;附截图&#xff0c;代码&#xff09;鹿溪IT工作室提供_android studio查看数据库-CSDN博客

使用 ConfigMaps 可以优化 Spring Boot应用

基本概念 ConfigMaps&#xff1a;Kubernetes 中的一种资源对象&#xff0c;用于存储非敏感的配置数据&#xff0c;如应用程序属性、环境变量等。 主要优势 简化部署&#xff1a;无需在容器镜像中嵌入配置&#xff0c;减少镜像大小&#xff0c;加快部署速度。 动态更新&#…

如何在AVL树中高效插入并保持平衡:一步步掌握旋转与平衡因子 —— 平衡因子以及AVL结构篇

文章目录 AVL树的概念AVL树的结构AVL树的插入平衡因子更新终止条件插入以及平衡因子的保持AVL树的查找 AVL树的概念 AVL树&#xff08;Adelson-Velsky and Landis Tree&#xff09;是一种自平衡二叉查找树&#xff0c;它的特点是每个节点的左子树和右子树的高度差不能超过1。这…

grunt构建工具:scss转css

Grunt 是一个基于 JavaScript 的任务运行工具&#xff0c;通常用于自动化重复性任务&#xff0c;例如代码编译、文件压缩、单元测试等。它通过配置文件 Gruntfile.js 来定义任务和插件。 完整项目地址&#xff1a;https://github.com/ylpxzx/grunt-scss-to-css 以下是 Grunt 的…

爬虫基础之爬取豆瓣同城信息(保存为csv excel 数据库)

网站:长沙最近一周戏剧活动_豆瓣 温馨提示: 本案例仅供学习交流使用 本案例所使用的模块 requests(发送HTTP请求)pandas(数据保存模块)lxml(用于解析数据模块)csv(用于保存为csv文件)pymysql(用于操作数据库)parsel(解析数据的模块) 确定爬取的信息内容&#xff1a; 戏剧的名称…

贪心算法(6)(java)优势洗牌

题目: 给定两个长度相等的数组nums1和nums2&#xff0c;nums1相对于nums2的优势可以满足nums1【1】>nums[2]的索引的数目来描述。 返回nums1的任意排列&#xff0c;使其相对于nums2的透视最大化呀。 原理&#xff08;贪心策略&#xff09;&#xff1a;田忌赛马 1.如果比不…

数据类设计_图片类设计之6_混合图形类设计(前端架构)

前言 学的东西多了,要想办法用出来.C和C是偏向底层的语言,直接与数据打交道.尝试做一些和数据方面相关的内容 引入 接续上一篇,讨论混合图形类设计 方法论-现在能做什么 这段属于聊天内容---有句话是这么说的&#xff1a;不要只埋头拉车&#xff0c;还要抬头看路。写代码也是…

蓝桥杯_LED模块

一 前言 还有四十多天将要进行蓝桥杯的比赛&#xff0c;接下来一个多月我将进行我的知识点的复习&#xff0c;争取在蓝桥杯提交一个满意的答卷 二 锁存器M74HC753M1R 在我这一年并没有进行在csdn上发布任何文章&#xff0c;这一年我学了stm32、51&#xff0c;还有部分理论知…