【黑马点评】 使用RabbitMQ实现消息队列——2.使用RabbitMQ监听秒杀下单

ops/2024/10/18 16:53:58/

2 使用RabbitMQ实现消息队列

2.1 修改\hm-dianping\pom.xmlpom.xml文件

添加RabbitMQ的环境

<!-- RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 修改Resource下的application.yaml文件

添加RabbitMQ的配置信息

spring:rabbitmq:host: 127.0.0.1 # IP地址port: 5672 # 端口号username: hmdianping # 用户名password: 123456 # 密码listener:simple:concurrency: 1max-concurrency: 1acknowledge-mode: manualprefetch: 1

主启动类标注@EnableRabbit开启消息队列的监听功能

在这里插入图片描述

2.3 配置RabbitMQ,创建交换机和消息队列,将二者绑定

创建direct类型的交换机以及一个名为seckill.order.queue的消息队列,然后将二者绑定.之后发往该交换机且路由键为seckill.order的消息都会转发seckill.order.queue

具体而言,在hm-dianping.src.main.java.com.hmdp.config下新建RabbitMQConfig文件

文件内容如下

package com.hmdp.config;import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.IVoucherOrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.io.IOException;@Configuration
public class RabbitMQConfig {@ResourceIVoucherOrderService voucherOrderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.seckill.queue"),key = "direct.seckill",exchange = @Exchange(name = "hmdianping.direct", type = ExchangeTypes.DIRECT)))public void recieveMessage(Object message){System.out.println("监听到了"+message);}@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}

2.3.1 测试监听消息

在Test中添加发送消息的方法,指定交换机hmdianping.direct 为和路由键 direct.seckill

    @ResourceRabbitTemplate rabbitTemplate;@Testpublic void testSendMessage(){rabbitTemplate.convertAndSend("hmdianping.direct","direct.seckill","测试发送消息");}

先运行Test方法

在这里插入图片描述

之后运行启动类HmDianPingApplication

发现监听到了,说明连接成功。

在这里插入图片描述

2.3.2 修改秒杀下单业务(VoucherOrderServiceImpl中的seckillVoucher方法)

  • 注入RabbitTemplate类

  • 在认定有抢购资格后,直接向seckill.direct交换机发送消息,内容包含voucherId、userId、orderId

    @ResourceRabbitTemplate rabbitTemplate;@Overridepublic Result seckillVoucher(Long voucherId) {//1.执行lua脚本,判断当前用户的购买资格Long userId = UserHolder.getUser().getId();Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());if (result != 0) {//2.不为0说明没有购买资格return Result.fail(result==1?"库存不足":"不能重复下单");}//3.走到这一步说明有购买资格,将订单信息存到消息队列VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());voucherOrder.setVoucherId(voucherId);//存入消息队列等待异步消费rabbitTemplate.convertAndSend("hmdianping.direct","direct.seckill",voucherOrder);return Result.ok(orderId);}

此时VoucherOrderServiceImpl文件内容如下

package com.hmdp.service.impl;import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;/*** <p>*  服务实现类* </p>** @author 虎哥* @since 2021-12-22*/
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate IVoucherOrderService iVoucherOrderService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);//异步处理线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@Transactionalpublic void handleVoucherOrder(VoucherOrder voucherOrder) {//1.所有信息从当前消息实体中拿Long voucherId = voucherOrder.getVoucherId();//2.扣减库存boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherId)//======判断当前库存是否大于0就可以决定是否能抢池子中的券了.gt("stock", 0).update();//3.创建订单if(success) save(voucherOrder);}@ResourceRabbitTemplate rabbitTemplate;@Overridepublic Result seckillVoucher(Long voucherId) {//1.执行lua脚本,判断当前用户的购买资格Long userId = UserHolder.getUser().getId();Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());if (result != 0) {//2.不为0说明没有购买资格return Result.fail(result==1?"库存不足":"不能重复下单");}//3.走到这一步说明有购买资格,将订单信息存到消息队列VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());voucherOrder.setVoucherId(voucherId);//存入消息队列等待异步消费rabbitTemplate.convertAndSend("hmdianping.direct","direct.seckill",voucherOrder);return Result.ok(orderId);}@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder){// 5.一人一单逻辑// 5.1.用户idLong userId = voucherOrder.getId();int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder).count();// 5.2.判断是否存在if (count > 0) {// 用户已经购买过了log.error("用户已经购买过一次!");return ;}//6.扣减库存boolean success = seckillVoucherService.update().setSql("stock= stock -1") // set stock = stock -1.eq("voucher_id", voucherOrder).gt("stock",0)// where id = ? and stock > 0.update();if (!success) {//扣减库存log.error("库存不足!");return ;}save(voucherOrder);}
}

2.3.3 监听秒杀成功订单

  • 监听seckill.order.queue队列的信息并且创建订单到数据库,当创建完成时手动ack

RabbitMQConfig中的recieveMessage修改为

    public void recieveMessage(Message message, Channel channel, VoucherOrder voucherOrder){try {voucherOrderService.handleVoucherOrder(voucherOrder);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {throw new RuntimeException(e);}System.out.println("监听到了"+message);}

此时,文件内容为。

package com.hmdp.config;import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.IVoucherOrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.io.IOException;@Configuration
public class RabbitMQConfig {@ResourceIVoucherOrderService voucherOrderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.seckill.queue"),key = "direct.seckill",exchange = @Exchange(name = "hmdianping.direct", type = ExchangeTypes.DIRECT)))
/*    public void recieveMessage(Object message){System.out.println("监听到了"+message);}*/public void recieveMessage(Message message, Channel channel, VoucherOrder voucherOrder){try {voucherOrderService.handleVoucherOrder(voucherOrder);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {throw new RuntimeException(e);}System.out.println("监听到了"+message);}@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}

2.3.4 测试

使用Apifox测试

在这里插入图片描述

成功

在这里插入图片描述


http://www.ppmy.cn/ops/122292.html

相关文章

BGP路由原理详解

&#x1f423;个人主页 可惜已不在 &#x1f424;这篇在这个专栏 华为_可惜已不在的博客-CSDN博客 &#x1f425;有用的话就留下一个三连吧&#x1f63c; 目录 一. BGP简介: 二. BGP报文中的角色 BGP的报文 BGP处理过程 BGP有限状态机 BGP属性 三. BGP作用 四. BGP选路 ​…

通用版本升级规范

目录 文章说明文档版本升级规则三段式版本主版本号&#xff08;Major&#xff09;次版本号&#xff08;Minor&#xff09;修订号&#xff08;Patch&#xff09; 变更日志&#xff08;Changelog&#xff09; 软件版本升级规则三段式版本主版本号&#xff08;Major&#xff09;次…

【华为HCIP实战课程六】OSPF邻居关系排错网络子网掩码问题,网络工程师

一、链路上网络和掩码引发的OSPF邻居问题 R3和R4已经建立正常的ospf邻居关系 更改IP地址前R3接口IP地址 interface Serial2/0/0 link-protocol ppp ip address 10.1.34.3 255.255.255.240 [R3-Serial2/0/0]ip address 10.1.88.2 255.255.255.240 更改为10.1.88.2 R3和R4虽…

spark-sql建表数据同步到hive

1、基础环境 组件版本备注hadoop3.4.0官方下载hive3.1.3自编译sparkspark-3.5.3-bin-hadoop3官方下载&#xff0c;需要内置hive的jar相关内容paimon0.9.0Maven官方下载jdk1.8.0_41maven3.9.6固定版本 2、停止服务、清理日志 先停止&#xff0c;清理数据 sudo kill -9 $(ps -ef…

MySQL多表查询:列子查询

先看我的表数据 dept表 emp表 列子查询&#xff0c;也就是多列作为子查询去寻找一些问题 常用操作符&#xff1a;IN, NOT IN, ANY, SOME, ALL 1.查询 "销售部" 和 "市场部" 的所有员工的信息&#xff08;拆分成以下两个问题&#xff09; a. 查询"销…

【C/C++】错题记录(五)

题目一 题目二 在 16 位机器上&#xff0c;通常以 2 字节为边界对齐。 首先看 char a&#xff0c;它占用 1 个字节。接着是 int b&#xff0c;占用 2 个字节。由于要满足边界对齐&#xff0c;在 char a后面会填充 1 个字节&#xff0c;使得 int b从 2 字节边界开始存储。最后是…

【CSS/HTML】左侧固定,右侧自适应的布局方式理解margin负值理论

文章目录 一.浮动布局二.margin的负值(3个div)三.calc()计算属性四.flex布局 一.浮动布局 1.先让固定宽度的div浮动&#xff01;使其脱离文档流。 2.margin-left的值等于固定div的宽度相等。 .aside{float: left;width: 200px;background-color: red;}.content{margin-left: 2…

Golang | Leetcode Golang题解之第446题等差数列划分II-子序列

题目&#xff1a; 题解&#xff1a; func numberOfArithmeticSlices(nums []int) (ans int) {f : make([]map[int]int, len(nums))for i, x : range nums {f[i] map[int]int{}for j, y : range nums[:i] {d : x - ycnt : f[j][d]ans cntf[i][d] cnt 1}}return }