一文搞清RabbitMQ的部署运维及使用

news/2024/11/20 8:28:09/

1.通过docker-compose安装RabbitMQ

1.0 初始化yum和Docker

yum update
yum install epel-release -y
yum clean all
yum list
yum install docker-io -y

1.1 dockerfile

FROM rabbitmq:management
MAINTAINER LCJ
# 添加插件到指定目录 可按照此方式自行扩展其他插件
# ADD ./rabbitmq_delayed_message_exchange-3.11.1.ez /plugins
# 开启管理界面插件
RUN rabbitmq-plugins enable rabbitmq_management
# 开启延迟队列插件
#RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange ENTRYPOINT ["rabbitmq-server"]

1.2 docker-compose.yml(单节点)

version: '3.5' 
services:rabbitmq:image: rabbitmq:3.11-alpine #镜像版本hostname: rabbit_1container_name: rabbitmqrestart: alwaysbuild:context: .ports:- "15672:15672"- "5672:5672"volumes:- ./data:/var/lib/rabbitmq- ./log:/var/log/rabbitmqenvironment:- RABBITMQ_DEFAULT_USER=admin- RABBITMQ_DEFAULT_PASS=123456network_mode: "bridge"

1.3服务器允许挂载的文件写

chmod +x  /root/rabbitmq/data
chmod +x /root/rabbitmq/log

1.3 rabbitMQ起起来进入manage界面

在这里插入图片描述

2 RabbitMQ基础知识简介

2.1概念

  • Broker:服务端程序,一个mq节点就是一个broker
  • Producer生产者:创建一个Message,然后发布到RabbitMQ中
  • Consumer消费者:消费队列里的消息
  • Message消息:生产消费的内容,有消息体,消息头,也包括多个属性,比如rountingKey路由键
  • Queue队列:是RabbitMQ的内部对象,用于存储消息,消息智能存储在队列中
  • Channel信道:支持多路复用的通道,独立双向数据流通,可以发布,订阅,接收消息
  • Connection链接Lsocket链接,封装了socket协议逻辑,一个链接上可以有多个channel
  • Exchange交换器:生产者将消息发送到exchange,交换机将消息路由到队列中,队列和交换机是多对多的关系。DIrect Exchange(定向) Fanout Exchange(广播) Tocpic Exchange通配符
  • RoutingKey 路由键:生产者将消息发送给交换机时,会指定RoutingKey,用来指定消息的路由规则
  • Binding绑定:通过绑定交换机与队列关联起来,在绑定时,会指定一个BindingKey,可以正确的将消息路由到队列。
  • Virtual host虚拟主机:独立的exchange和queue

2.2 RabbitMQ工作模式

  • 简单模式(单生产,单消费)
  • 工作队列(单生产,多消费,可以有轮询和公平策略) channel.basicQos(1)
  • fanout交换机,fanout交换机直接转发到队列,不需要指定routingkey
  • 路由模式:direct交换机,交换机和队列绑定,指定路由键,生产者发送消息到交换机,交换机根据信息的路由key进行转发到队列,消息要指定routingkey路由键
  • 通配符模式:交换机与队列绑定,指定通配符路由键,生产者发送消息到交换机,交换机根据消息的路由key转发到队列,消息要指定routingkey路由键

3.SpringBoot2.X+SpringAMQP整合RabbitMQ实战

3.1 AMQP依赖

 <!--引入AMQP--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

server:port: 8080#消息队列
spring:rabbitmq:host: xx.xx.xxvirtual-host: /devpassword: 123456username: admin#开启消息二次确认,生产者到broker的交换机publisher-confirm-type: correlated#开启消息二次确认,交换机到队列的可靠性投递publisher-returns: true#为true,则交换机处理消息到路由失败,则会返回给生产者template:mandatory: true#消息手工确认ACKlistener:simple:acknowledge-mode: manual

3.2 主题模式开发

RabbitM@Config文件

@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_NAME = "order_exchange";public static final String QUEUE_NAME = "order_queue";/*** 交换机* @return*/@Beanpublic Exchange orderExchange() {return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();//return new TopicExchange(EXCHANGE_NAME, true, false);}/*** 队列* @return*/@Beanpublic Queue orderQueue() {return QueueBuilder.durable(QUEUE_NAME).build();//return new Queue(QUEUE_NAME, true, false, false, null);}/*** 交换机和队列绑定关系*/@Beanpublic Binding orderBinding(Queue queue, Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();//return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "order.#", null);}}

生产者;

@SpringBootTest
class DemoApplicationTests {@Autowiredprivate RabbitTemplate template;@Testvoid send() {template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦1");}
}

消费者

@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {/*** RabbitHandler 会自动匹配 消息类型(消息自动确认)* @param msg* @param message* @throws IOException*/@RabbitHandlerpublic void releaseCouponRecord(String msg, Message message) throws IOException {long msgTag = message.getMessageProperties().getDeliveryTag();System.out.println("msgTag="+msgTag);System.out.println("message="+message.toString());System.out.println("监听到消息:消息内容:"+message.getBody());}}

3.3 消息可靠性投递

3.3.1 什么是消息可靠性投递

  • 保证消息送到消息队列中
  • 保证mq节点接收消息
  • 消息发送端到mq服务端接收到消息的确认应答
  • 完善的消息补偿机制,发送失败二次处理

3.3.2 RabbitMQ的消息可靠性投递

生产者-> 交换机 -> 队列 -> 消费者
通过两个节点控制消息可靠性投递

  • 生产者->交换机:confirmCallBack
  • 交换机-> 队列 -> returnCallBack

在这里插入图片描述

@Autowiredprivate RabbitTemplate template;@Testvoid testConfirmCallback() {
​template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 配置* @param ack 交换机是否收到消息,true是成功,false是失败* @param cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm=====>");System.out.println("confirm==== ack="+ack);System.out.println("confirm==== cause="+cause);//根据ACK状态做对应的消息更新操作 TODO}});template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+,"order.new","新订单来啦1");}
 @Testvoid testReturnCallback() {//为true,则交换机处理消息到路由失败,则会返回给生产者//开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {int code = returned.getReplyCode();System.out.println("code="+code);System.out.println("returned="+returned.toString());}});template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"xxx.order.new","新订单来啦11");}

4 Rabbitmq的消息确认机制ACK实战+DeliveryTag介绍

spring:rabbitmq:#开启手动确认消息,如果消息重新入队,进行重试listener:simple:acknowledge-mode: manual
@RabbitHandler
public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException {long msgTag = message.getMessageProperties().getDeliveryTag();System.out.println("msgTag="+msgTag);System.out.println("message="+message.toString());System.out.println("body="+body);//成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除//channel.basicAck(msgTag,false);//channel.basicNack(msgTag,false,true);}

5 RabbitMQ延迟队列主题模式综合实战

需求:商家新建账户检查是否在规定时间内商家商品,否则对该账户做出处理
RabbitMQConfig

package net.xdclass.xdclasssp.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 小滴课堂,愿景:让技术不再难学**  新商家审核通过->new_merchant_queue -> 死信消息交换机 -> 死信队列** @Description* @Author 二当家小D* @Remark 有问题直接联系我,源码-笔记-技术交流群* @Version 1.0**/@Configurationpublic class RabbitMQConfig {/*** 死信队列*/public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";/*** 死信交换机*/public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";/*** 进入死信队列的路由key*/public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";/*** 创建死信队列* @return*/@Beanpublic Queue lockMerchantDeadQueue(){return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();}/*** 绑定死信交换机和死信队列* @return*/@Beanpublic Binding lockMerchantBinding(){return new Binding(LOCK_MERCHANT_DEAD_QUEUE,Binding.DestinationType.QUEUE,LOCK_MERCHANT_DEAD_EXCHANGE,LOCK_MERCHANT_ROUTING_KEY,null);}/*** 普通队列,绑定的个死信交换机*/public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";/*** 普通的topic交换机*/public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";/*** 路由key*/public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";/*** 创建普通交换机* @return*/@Beanpublic Exchange newMerchantExchange(){return new TopicExchange(NEW_MERCHANT_EXCHANGE,true,false);}/*** 创建普通队列* @return*/@Beanpublic Queue newMerchantQueue(){Map<String,Object> args = new HashMap<>(3);//消息过期后,进入到死信交换机args.put("x-dead-letter-exchange",LOCK_MERCHANT_DEAD_EXCHANGE);//消息过期后,进入到死信交换机的路由keyargs.put("x-dead-letter-routing-key",LOCK_MERCHANT_ROUTING_KEY);//过期时间,单位毫秒args.put("x-message-ttl",10000);return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();}/*** 绑定交换机和队列* @return*/@Beanpublic Binding newMerchantBinding(){return new Binding(NEW_MERCHANT_QUEUE,Binding.DestinationType.QUEUE,NEW_MERCHANT_EXCHANGE,NEW_MERCHANT_ROUTIING_KEY,null);}

生产者

 @GetMapping("check")public Object check(){//修改数据库的商家账号状态  TODOrabbitTemplate.convertAndSend(RabbitMQConfig.NEW_MERCHANT_EXCHANGE,RabbitMQConfig.NEW_MERCHANT_ROUTIING_KEY,"商家账号通过审核");Map<String,Object> map = new HashMap<>();map.put("code",0);map.put("msg","账号审核通过,请10秒内上传1个商品");return map;}

消费者

@Component
@RabbitListener(queues = "lock_merchant_dead_queue")
public class MerchantMQListener {@RabbitHandlerpublic void messageHandler(String body, Message message, Channel channel) throws IOException {long msgTag = message.getMessageProperties().getDeliveryTag();System.out.println("msgTag="+msgTag);System.out.println("body="+body);//做复杂业务逻辑  TODO 商家业务//告诉broker,消息已经被确认channel.basicAck(msgTag,false);}}

http://www.ppmy.cn/news/79403.html

相关文章

R1CS和relaxed R1CS(一)

符号说明 F &#xff1a;有限域 ∘ \circ ∘ : 内积 1. R1CS RlCS定义: ( A , B , C , m , n , l ) (A,B,C,m,n,l) (A,B,C,m,n,l)&#xff0c;其中 m 、 n 、 l m、n、l m、n、l为正整数&#xff0c;且 m > l m>l m>l, A , B , C ∈ F m m A,B,C \in F^{m \time…

在flask项目中添加日志记录功能

入口文件中添加以下代码&#xff1a; # 创建日志记录器 logger logging.getLogger(my_logger) logger.setLevel(logging.INFO) # 创建处理程序&#xff08;普通日志&#xff09; handler RotatingFileHandler(logs/app.log, encodingutf-8-sig, maxBytes10485760, backupCou…

Vue2+CSS实现一个瀑布流布局案例

在练习代码的时候&#xff0c;看到了携程的首页下方的布局还挺好看 就是一个瀑布流的布局效果&#xff0c;在携程上是一共两列布局&#xff0c;然后每个格子的高度都会根据图片的高度做排布 一开始是想使用flex进行布局&#xff0c;先让每个格子各占百分之49&#xff0c;然后贴…

【算法】算法学习五:加权图 | 狄克斯特拉算法

文章目录 一、加权图二、负权边三、狄克斯特拉算法3.1 理论知识3.2 案例说明3.3 Python代码实现 一、加权图 加权图是指在图的边上赋予了权重&#xff08;或距离&#xff09;的图。每条边都带有一个数值&#xff0c;表示该边的权重。这种权重可以表示不同的度量&#xff0c;如…

day18文件上传下载与三层架构思想

servlet文件上传 注意事项:在写了响应后,若后面还需要执行代码,需要添加return; apach的servlet3.0提供了文件上传的功能. **在客户端中的jsp如何上传文件:**使用form标签 使用input标签type的file属性 form表单中的的enctype必须加:使用二进制的方式进行传输,否则不能进行…

蓝精灵协会:如何将传统 IP 融入 Web3

作者&#xff1a;Cedric Hervet&#xff0c;联合创始人&#xff0c;创意总监 我和许多项目合作过&#xff0c;并且担任了近 30 年的艺术总监和创意总监。我的方法一直是创造同质化的宇宙&#xff0c;把观众带入并使他们产生梦想。但我也曾系统地寻找过那份额外的感动&#xff1…

NeRF-VAE:将场景看作一个分布【ICML‘2021】

文章目录 GQN网络介绍Amortized InferenceNeRF-VAE GQN网络介绍 论文标题&#xff1a;Neural scene representation and rendering 作者&#xff1a;S. M. Ali Eslami, Danilo Jimenez Rezende, et al. 期刊&#xff1a;Science 发表时间&#xff1a;2018/06/15 该文章提出…

map reduce实现累加器

需求&#xff1a;数组长度为100&#xff0c;每一项为对应下标&#xff0c;累加求和。 切题思路&#xff1a; 1.如何声明一个长度为100的数组&#xff1f;答&#xff1a;new Array(100) 2.数组每一项如何比前一项1 答&#xff1a;map(item,index)index为数组下标&#xff0c;…