RabbitMQ02-RebbitMQ简介及交换器

news/2024/9/23 19:24:15/

一. AMQP协议

什么是AMQP协议

AMQP(Advanced Message Queuing Protocol,高级消息队列协议):它是进程之间传递异步消息的网络协议

AMQP工作过程

发布者通过发布消息,通过交换机,交换机根据路由规则将收到的消息分发交换机绑定的下消息队列,最后AMQP代理将消息推送给订阅了此队列的消费者
或消费者按照需求自行获取。

二. RabbitMQ简介

RabbitMQ是通过Erlang语言基于AMQP协议编写的消息中间件,它在分布式系统中可以解应用耦合、流量削峰、异步消息等问题。它有两个特性
队列排队和异步

  1. 应用解耦:多个个应用程序之间可通过RabbitMQ作为媒介,两个应用不再粘连,实现解耦;
  2. 异步消息:多个应用可通过RabbitMQ进行消息传递;
  3. 流量削峰:在高并发情况下,可以通过RabbitMQ的队列特性实现流量削峰;
  4. 应用场景:
    1. 应用到队列特性的应用场景: 排序算法、秒杀活动。
    2. 应用到异步特性的应用场景: 消息分发、异步处理、数据同步、处理耗时任务。

三.springBoot整合RabbitMQ

生产者端发送消息

pom文件

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.6.3</version></dependency>

yml文件

spring:application:name: producerrabbitmq:host: xxxusername: adminpassword: admin

配置类,需要返回一个Queue,org.springframework.amqp.core.Queue下的Queue对象

@Configuration
public class RabbitMqConfig {@Beanprotected Queue queue(){return new Queue("myQueue");}
}

使用RabbitMQ发送消息,注入AmqpTemplate,调用convertAndSend()方法

class ProducerApplicationTests {@Autowiredprivate AmqpTemplate amqpTemplate;@Testvoid send() {for (int i = 0; i < 10; i++) {amqpTemplate.convertAndSend("myQueue","这是发送的消息");System.out.println("发送成功!");}}}

消费端接收消息

配置同生产端,不需要配置RabbitMqConfig,接收消息时只需要使用注解RabbitMqConfig,queues属性绑定相应的队列即可。

@Component
public class ReceiveService {@RabbitListener(queues = "myQueue")public void test01(String msg){System.out.println("接收到消息1" + msg);}@RabbitListener(queues = "myQueue")public void test02(String msg){System.out.println("接收到消息2" + msg);}@RabbitListener(queues = "myQueue")public void test03(String msg){System.out.println("接收到消息3" + msg);}
}

四.交换器(四种)

Direct Exchange:直连交换器

它是RabbitMQ的默认交换器,给指定队列发消息,绑定该消息队列的消费者一次获取消息

实战:

/** 生产者发送消息,发送10个消息*/
@SpringBootTest
class ProducerApplicationTests {@Autowiredprivate AmqpTemplate amqpTemplate;@Testvoid send() {for (int i = 0; i < 10; i++) {amqpTemplate.convertAndSend("myQueue","这是发送的消息");System.out.println("发送成功!");}}}
/** 接收消息*/
@Component
public class ReceiveService {@RabbitListener(queues = "myQueue")public void test01(String msg){System.out.println("接收到消息1" + msg);}@RabbitListener(queues = "myQueue")public void test02(String msg){System.out.println("接收到消息2" + msg);}@RabbitListener(queues = "myQueue")public void test03(String msg){System.out.println("接收到消息3" + msg);}
}

结果:可以看到1、2、3依次接收消息

接收到消息1这是发送的消息
接收到消息2这是发送的消息
接收到消息3这是发送的消息
接收到消息2这是发送的消息
接收到消息3这是发送的消息
接收到消息1这是发送的消息
接收到消息3这是发送的消息
接收到消息1这是发送的消息
接收到消息2这是发送的消息
接收到消息1这是发送的消息

Fanout Exchange:扇形交换器

绑定该交换器的所有队列都可以接收到消息,扇形交换机将消息广播到所有与之绑定的队列。无论消息的路由键是什么,扇形交换机都会将消息发送到所有绑定的队列中。这种类型的交换机常用于实现发布-订阅模式,将消息广播给多个消费者。

实战

/** 绑定*/
/** Fanout Exchange*/
@Bean
public Queue FanoutExchangeQueue1(){return new Queue("fanoutExchangeQueue1");}
@Bean
public Queue FanoutExchangeQueue2(){return new Queue("fanoutExchangeQueue2");}
@Bean
public FanoutExchange fanoutExchange(){return new FanoutExchange("amq.fanout");}
@Bean
public Binding  FanoutExchangeBinding1(Queue FanoutExchangeQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(FanoutExchangeQueue1).to(fanoutExchange);}
@Bean
public Binding  FanoutExchangeBinding2(Queue FanoutExchangeQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(FanoutExchangeQueue2).to(fanoutExchange);}
/** 生产者发送消息*/@Testvoid sendByFanoutExchange() {amqpTemplate.convertAndSend("amq.fanout","key","这是发送到的消息");System.out.println("发送成功!");}
    /** 消费者 Direct Exchange*/@RabbitListener(queues = "fanoutExchangeQueue1")public void test04(String msg){System.out.println("接收到消息4" + msg);}@RabbitListener(queues = "fanoutExchangeQueue2")public void test05(String msg){System.out.println("接收到消息5" + msg);}

结果:每一个绑定到Fanout Exchange上的队列都可以接收到消息

接收到消息4这是发送到的消息
接收到消息5这是发送到的消息

Topic Exchange:主题交换器

允许在路由键中设置匹配规则:'*‘代表一个字母两个’.'之间的内容;‘#’代表0或多个字符;

实战

    /** 绑定*/@Beanpublic Queue topicExchangeQueue1(){return new Queue("topicExchangeQueue1");}@Beanpublic Queue topicExchangeQueue2(){return new Queue("topicExchangeQueue2");}@Beanpublic TopicExchange topicExchange(){return new TopicExchange("amq.topic");}@Beanpublic Binding TopicExchangeToQueue1(Queue topicExchangeQueue1,TopicExchange topicExchange){return BindingBuilder.bind(topicExchangeQueue1).to(topicExchange).with("com.shaoby.*");}@Beanpublic Binding TopicExchangeToQueue2(Queue topicExchangeQueue2,TopicExchange topicExchange){return BindingBuilder.bind(topicExchangeQueue2).to(topicExchange).with("com.shaoby.test.#");}
    /**生产者发送消息*//** key为com.shaoby.test*/@Testvoid sendByTopicExchange() {amqpTemplate.convertAndSend("amq.topic","com.shaoby.test","这是发送到的消息");System.out.println("发送成功!");}/** key为com.shaoby.test.a*/@Testvoid sendByTopicExchange() {amqpTemplate.convertAndSend("amq.topic","com.shaoby.test.a.b","这是发送到的消息");System.out.println("发送成功!");}
    /**消费者接收消息*//**Topic Exchange*/@RabbitListener(queues = "topicExchangeQueue1")public void test06(String msg){System.out.println("接收到消息6" + msg);}@RabbitListener(queues = "topicExchangeQueue2")public void test07(String msg){System.out.println("接收到消息7" + msg);}

结果:

路由key为com.shaoby.test都能接收到消息,com.shaoby.test.a.b只有topicExchangeQueue2能接收到消息

Header Exchange:首部交换器

绑定:

/** Header Exchange*/
@Bean
public Queue headerExchangeQueue1(){return new Queue("headerExchangeQueue1");}@Bean
public Queue headerExchangeQueue2(){return new Queue("headerExchangeQueue2");}
@Bean
public HeadersExchange headersExchange(){return new HeadersExchange("amp.header");}
@Bean
public Binding headExchangeToQueue1(Queue headerExchangeQueue1,HeadersExchange headersExchange){HashMap<String, Object> map = new HashMap<>();map.put("type","OK");map.put("status","200");return BindingBuilder.bind(headerExchangeQueue1).to(headersExchange).whereAll(map).match();}
@Bean
public Binding headExchangeToQueue2(Queue headerExchangeQueue2,HeadersExchange headersExchange){HashMap<String, Object> map = new HashMap<>();map.put("type","error");map.put("status","500");return BindingBuilder.bind(headerExchangeQueue2).to(headersExchange).whereAll(map).match();}
/** 生产者发送消息*/
@Testvoid sendByHeadExchange() {Map<String, Object> headers = new HashMap<>();headers.put("type","OK");headers.put("status","200");String message = "这是发送到的消息";MessageProperties messageProperties = new MessageProperties();headers.forEach(messageProperties::setHeader);Message msg = new Message(message.getBytes(), messageProperties);amqpTemplate.convertAndSend("amp.header",null, msg);System.out.println("发送成功!");}
    @RabbitListener(queues = "headerExchangeQueue1")public void test08(Message msg){System.out.println("接收到消息8:" + msg.toString());}@RabbitListener(queues = "headerExchangeQueue2")public void test09(Message msg){System.out.println("接收到消息9:" + msg.toString());}

结果:只有匹配上header才能收到消息

接收到消息8:(Body:'[B@a7b38a8(byte[24])' MessageProperties [headers={type=OK, status=200}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=amp.header, receivedRoutingKey=, deliveryTag=2, consumerTag=amq.ctag-1WTdKW4n_rAEdJUosQD7bg, consumerQueue=headerExchangeQueue1])

http://www.ppmy.cn/news/1463898.html

相关文章

【Spring Boot】深度复盘在开发搜索引擎项目中重难点的整理,以及遇到的困难和总结

&#x1f493; 博客主页&#xff1a;从零开始的-CodeNinja之路 ⏩ 收录文章&#xff1a;【Spring Boot】深度复盘在开发搜索引擎项目中重难点的整理&#xff0c;以及遇到的困难和总结 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 什么是搜索引…

爬虫案例-亚马逊反爬流程分析梳理(验证码突破)(x-amz-captcha)

总体概览&#xff1a;核心主要是需要突破该网站的验证码&#xff0c;成功后会返回我们需要的参数后再去请求一个中间页&#xff08;类似在后台注册一个session&#xff09;&#xff0c;最后需要注意一下 IP 是不能随意切换的 主要难点&#xff1a; 1、梳理整体反爬流程 2、验证…

第53期|GPTSecurity周报

GPTSecurity是一个涵盖了前沿学术研究和实践经验分享的社区&#xff0c;集成了生成预训练Transformer&#xff08;GPT&#xff09;、人工智能生成内容&#xff08;AIGC&#xff09;以及大语言模型&#xff08;LLM&#xff09;等安全领域应用的知识。在这里&#xff0c;您可以找…

oracle.jdbc.OracleDatabaseException: ORA-00911: 无效字符

先吐槽一句&#xff0c;oracle 真坑啊&#xff01; 一个很正常的sql 语句一直报 ORA-00911: 无效字符 &#xff0c;拿到数据库去执行一点问题没有&#xff0c;一运行代码就报错&#xff0c;然后一个字符一个字符的对比&#xff0c;竟然是因为sql 结尾的一个 ";" 导致…

rk3568_mutex

文章目录 前言1、什么是mutex?1.1mutex互斥体API函数二、实验2.1实验目的2.2源码2.3结果图前言 本文记录的是rk3568开发板基础上做的mutex实验 1、什么是mutex? mutex是互斥体,它是比信号量semaphore更加专业的机制。 在我们编写Linux驱动的时候遇到需要互斥的地方建议使用…

在VS Code中进行Java的单元测试

在VS Code中可以使用 Test Runner for Java扩展进行Java的测试执行和调试。 Test Runner for Java的功能 Test Runner for Java 结合 Language Support for Java by Red Hat 和 Debugger for Java这两个插件提供如下功能&#xff1a; 运行测试&#xff1a; Test Runner for …

微信小程序自定义头部

1.在对应界面的json文件&#xff0c;将navigationStyle属性设置为“custom” "navigationStyle":"custom" 2. 状态栏的高度可以通过 wx.getSystemInfo() 获取。 胶囊按钮的信息可以通过 wx.getMenuButtonBoundingClientRect() 获取。 导航栏高度状态栏…

python使用多种方法计算列表元素平方的技巧

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、使用列表推导式进行元素平方 二、使用map函数进行元素平方 三、循环遍历列表进行元素平…