SpringBoot(三十九)SpringBoot集成RabbitMQ实现流量削峰添谷

server/2024/11/27 17:33:58/

前边我们有具体的学习过RabbitMQ的安装和基本使用的情况。

但是呢,没有演示具体应用到项目中的实例。

这里使用RabbitMQ来实现流量的削峰添谷。

一:添加pom依赖

<!--rabbitmq-需要的 AMQP 依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二:yml配置

spring:
#配置rabbitmq 服务器
rabbitmq:virtual-host: /host: 1.15.157.156port: 5672username: xxxxxpassword: xxxxx# 开启发布确认机制#SIMPLE,     // 使用 RabbitTemplate#waitForConfirms() 或 waitForConfirmsOrDie()#CORRELATED, // 使用 CorrelationData 关联确认与发送的消息#NONE        // 不启用发布确认publisher-confirm-type: correlated# publisher-confirms 消息的可靠投递, confirm 确认模式 默认为false#publisher-confirms: true# 添加发布确认返回, return 回退模式 默认为falsepublisher-returns: true### listenerlistener:# 每次从队列中预取5条消息prefetch: 20# 最小消费者数量concurrency: 1# 最大的消费者数量max-concurrency: 10simple:# 设置预取数量为1  每次取一个prefetch: 1# manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack,虽灵活但会提高编码复杂度。# auto:自动 ack,没有异常则返回 ack;抛出异常则返回 nack,消息重新入队,一直到没有异常为止,也可以设置最大重试次数,超过次数后发送到专门收集错误消息的队列进一步处理# none:关闭ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除(消息投递是不可靠的,可能丢失)acknowledge-mode: manual# 失败重试retry:# 开启消费者失败重试enabled: true# 初始的失败等待时长为1秒initial-interval: 1000# 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmultiplier: 3# 最大重试次数max-attempts: 4# true无状态;false有状态。如果业务中包含事务,这里改为falsestateless: true

具体的配置都有对应的注释,参照即可。

三:编写config配置类

package com.modules.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;@Configuration
public class RabbitMQConfig
{@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String userName;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.listener.prefetch}")private int prefetch;@Value("${spring.rabbitmq.listener.concurrency}")private int concurrentConsumers;@Value("${spring.rabbitmq.listener.max-concurrency}")private int maxConcurrentConsumers;/*** 链接RabbitMQ* @return*/@Beanpublic ConnectionFactory connectionDirectFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(userName);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(true); //必须要设置return connectionFactory;}/*** 配置RabbitMQ参数* @return*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitDirectListenerContainerFactory(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionDirectFactory());//设置最小并发的消费者数量factory.setConcurrentConsumers(concurrentConsumers);//设置最大并发的消费者数量factory.setMaxConcurrentConsumers(maxConcurrentConsumers);//限流,单位时间内消费多少条记录factory.setPrefetchCount(prefetch);// json转消息//factory.setMessageConverter(new Jackson2JsonMessageConverter());//设置rabbit 确认消息的模式,默认是自动确认//factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//设置rabbit 确认消息的模式,默认是自动确认factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}/*** 回调函数* @param connectionFactory* @return*/@Beanpublic RabbitTemplate createDirectRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Manatory,才能触发回调函数,无论消息推送结果怎么样都会强制调用回调函数rabbitTemplate.setMandatory(true);// 设置确认发送到交换机的回调函数 =》 消息推送到server,但是在server里找不到交换机 / 消息推送到sever,交换机和队列啥都没找到 / 消息推送到server,找到交换机了,但是没找到队列 / 消息推送成功rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(ack){System.out.println("发送者消息确认成功!");}else{System.out.println("发送者消息确认是呗,考虑重发:"+cause);}//System.out.println("相关数据:"+correlationData);//System.out.println("确认情况:"+ack);//System.out.println("原因:"+cause);//System.out.println("===============================");});//设置确认消息已发送到队列的回调  =》 消息推送到server,找到交换机了,但是没找到队列 触发这个回调函数rabbitTemplate.setReturnsCallback(returnedMessage -> {System.out.println("交换机为:"+returnedMessage.getExchange());System.out.println("返回消息为:"+returnedMessage.getMessage());System.out.println("路由键为:"+returnedMessage.getRoutingKey());System.out.println("回应消息为:"+returnedMessage.getReplyText());System.out.println("回应代码为:"+returnedMessage.getReplyCode());System.out.println("===============================");});return rabbitTemplate;}@BeanQueue trafficSpikedQueue(){return new Queue("trafficSpikedQueue", true);}@BeanDirectExchange trafficSpikedExchange(){return new DirectExchange("trafficSpikedExchange");}@BeanBinding binding(Queue trafficSpikedQueue, DirectExchange trafficSpikedExchange){return BindingBuilder.bind(trafficSpikedQueue).to(trafficSpikedExchange).with("trafficSpikedKey");}//*/
}

四:创建生产者

package com.modules.controller.rabbitmq;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TrafficController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/java/traffic")public String sendTrafficMessage(@RequestParam String message){for (int i = 1; i <= 100; i++){// 使用java多线程来模拟多用户并发请求final int temp = i;new Thread(()->{// 给RabbitMQ发送消息rabbitTemplate.convertAndSend("trafficExchange","trafficKey","hello world:"+temp,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException{// System.out.println("发送回调:"+temp);System.out.println(message);return message;}});}).start();}// rabbitTemplate.convertAndSend("trafficSpikedExchange", "trafficSpikedKey", message);return "Message sent";}
}

五:创建消费者

package com.modules.controller.rabbitmq;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.amqp.core.Message;
import com.rabbitmq.client.*;
import java.io.IOException;@Component
public class TrafficSpikedConsumer {@RabbitListener(queues = "trafficQueue")public void receiveMessage(Message message, Channel channel) throws InterruptedException, IOException{// 为了演示一个一个消费的情况,这里使用线程暂停来延迟控制台输出Thread.sleep(100);// =========================================// 处理消息,例如写入数据库或进行计算System.out.println("Received message: " + new String(message.getBody()));//System.out.println("channel: " + channel);// =========================================// 成功处理后手动确认消息long deliveryTag = message.getMessageProperties().getDeliveryTag();//System.out.println("deliveryTag:"+deliveryTag);channel.basicAck(deliveryTag, false);}
}

控制台输出的数据比较多。我这里就不做展示了。

PS:我这里测试的时候遇到一个小问题,发现消费者最后消费的数量跟生产者生产的数量对不上。我百思不得其解。这问题出在哪里呢?

后来,我才发现,我测试是在本地做的测试,对应的代码,我服务器端打包的jar里边也有一份,也就是说,我一个生产者,对应两个消费者(本地+服务器)这也是我本地消费者消费的数量跟生产数量不一致的原因。

以上大概就是Springboot集成RabbitMQ实现流量削峰添谷的一个小例子。

通过RabbitMQ的队列机制,可以有效地缓解高峰期的流量压力。

有好的建议,请在下方输入你的评论。


http://www.ppmy.cn/server/145394.html

相关文章

TCP IP协议和网络安全

传输层的两个协议&#xff1a; 可靠传输 TCP 分段传输 建立对话&#xff08;消耗系统资源&#xff09; 丢失重传netstat -n 不可靠传输 UDP 一个数据包就能表达完整的意思或屏幕广播 应用层协议&#xff08;默认端口&#xff09;&#xff1a; httpTCP80 网页 ftpTCP21验证用户身…

在Ubuntu2004中搭建基于ESP-IDF v5.1的ESP32-S3开发环境

在Ubuntu2004中搭建基于ESP-IDF v5.1的ESP32-S3开发环境 目录 1 基本资料 2 注意事项 2.1 子模块检出失败处理 2.2 选择 Espressif 下载服务器 2.3 自定义工具安装路径 2.4 导出环境变量 2.5 测试基础环境 3 创建自己的工程 3.1 创建基础应用工程 3.2 创建组件(…

Linux 虚拟机下安装RedisJSON

文章目录 一、安装 Redis二、安装RedisJSON 一、安装 Redis 安装地址 二、安装RedisJSON RedisJSON github 地址 选择版本&#xff0c;下载压缩包。 RedisJson 是根据 Rust 开发编译的&#xff0c;所以我们要在系统中安装 Rust。官网地址。 国内下载 Rust 下载较慢&#x…

Http 响应协议

HTTP的响应协议 响应数据格式 响应行 响应数据的第一行&#xff0c;包括协议、状态码、描述 响应头 从响应数据格式的第二行开始&#xff0c;也是以key:value的格式 响应体 和响应头之间有一个空行&#xff0c;是响应数据格式的最后一部分&#xff0c;用于存放响应的数据 常见响…

D2761 适合在个人电脑、便携式音响等系统中作音频限幅用。

概述&#xff1a; D2761是为保护扬声器所设计的音频限幅器&#xff0c;其限幅值可通过外接电阻来调节&#xff0c;适合在个人电脑、便携式音响等系统中作音频限幅用。D2761采用SSOP10、MSOP10、TSSOP14的封装形式封装。 主要特点&#xff1a;  工作电压范围宽&#xff1a;2.7…

idea怎么打开两个窗口,运行两个项目

今天在开发项目的时候&#xff0c;前端希望运行一下以前的项目&#xff0c;于是就需要开两个 idea 窗口&#xff0c;运行两个项目 这里记录一下如何设置&#xff1a;首先依次点击&#xff1a; File -> Settings -> Appearance & Behavior ->System Settings 看到如…

STM32中I2C总线中,允许从机控制SCL总线吗?

在I2C总线中&#xff0c;不允许从机控制SCL总线。关于I2C总线的控制&#xff0c;以下是详细解释&#xff1a; 一、I2C总线的基本构成 I2C&#xff08;Inter IC Bus&#xff09;是由Philips公司开发的一种通用数据总线&#xff0c;它只需要两根串行信号线&#xff1a;串行数据…

Windows安装Node.js

当出现这个的时候&#xff0c;是说明没有安装Node.js。 Node.js安装包及源码下载地址为&#xff1a;Node.js — Download Node.js Package Manager&#xff1a; 使用系统自带的包管理工具&#xff08;如 apt、yum、brew&#xff09;安装。自动管理依赖和更新&#xff0c;适合命…