RabbitMQ:基本消息模型

news/2025/2/12 2:30:33/

单生产单消费模型,即基本消息模型或简单消费模型,即完成基本的一对一消息转发。

RabbitMQ 单生产单消费模型主要有以下三个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 消费者(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区!

~

本篇内容包括:RabbitMQ 基本(单生产单消费)消息模型、RabbitMQ 单生产消费模型实现、RabbitMQ 单生产消费模型实现(连接工具类封装)


文章目录

    • 一、RabbitMQ 基本(单生产单消费)消息模型
        • 1、单生产单消费模型(Hello World)
        • 2、单生产单消费模型组成
        • 3、参数细节
    • 二、RabbitMQ 单生产消费模型实现
        • 1、添加 Maven 依赖
        • 2、生产者 实现
        • 3、消费者 实现
    • 三、RabbitMQ 单生产消费模型实现(连接工具类封装)
        • 1、定义封装工具类 ConnectionUtil
        • 2、生产者
        • 3、消费者


一、RabbitMQ 基本(单生产单消费)消息模型

1、单生产单消费模型(Hello World)

单生产单消费模型,即基本消息模型或简单消费模型,即完成基本的一对一消息转发。

image-20221201012211684

2、单生产单消费模型组成

RabbitMQ 单生产单消费模型主要有以下三个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 消费者(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区!

3、参数细节

channel.queueDeclare("HelloWorld",false,false,false,null);
  • 参数1:队列名称,如果不存在则自动创建;
  • 参数2:队列特性-是否持久化,只是持久化队列,如果持久化消息需要在 channel.basicPublish 参数3进行额外设置:MessageProperties.PERSISTENT_TEXT_PLAIN
  • 参数3:队列特性-是否独占队列,队列只能被同一个连接绑定;
  • 参数4:队列特性-是否队列完成后自动删除队列,消费者完成消息后自动删除(消费者断开连接);
  • 参数5:额外附加参数

二、RabbitMQ 单生产消费模型实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency>...
</dependencies>

2、生产者 实现

# Rabbit 基本消息模型 生产者-Producer

package com.lizhengi.hello;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.testng.annotations.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author liziheng* @version 1.0.0* @description Rabbit 基本消息模型 生产者* @date 2022-12-18 5:09 下午**/
public class Producer {/*** 消息生产** @throws IOException      IOException* @throws TimeoutException TimeoutException*/@Testpublic void testSendMessage() throws IOException, TimeoutException {// 创建MQ连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();// 设置连接MQ主机connectionFactory.setHost("127.0.0.1");// 设置连接MQ端口号connectionFactory.setPort(5672);// 设置连接MQ虚拟主机connectionFactory.setVirtualHost("/test");// 设置连接MQ虚拟主机的用户名密码connectionFactory.setUsername("admin");connectionFactory.setPassword("123456");// 获取连接对象Connection connection = new ConnectionFactory().newConnection();// 获取连接通道Channel channel = connection.createChannel();/*  通道绑定对应消息队列*  参数1:队列名称,如果不存在则自动创建;*  参数2:队列特性-是否持久化;*  参数3:队列特性-是否独占队列;*  参数4:队列特性-是否队列完成后自动删除队列;*  参数5:额外附加参数*/channel.queueDeclare("HelloWorld", false, false, false, null);/*  发布消息*  参数1:交换机名称;参数2:队列名称;参数3:传递消息额外值设置;参数4:具体消息内容*/channel.basicPublish("", "HelloWorld", null, "Hello RabbitMQ".getBytes());// 连接关闭channel.close();connection.close();}
}

3、消费者 实现

# Rabbit 基本消息模型 消费者-Customer

package com.lizhengi.hello;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author liziheng* @version 1.0.0* @description Rabbit 基本消息模型 消费者* @date 2022-12-18 5:19 下午**/
public class Customer {public static void main(String[] msg) throws IOException, TimeoutException {// 创建MQ连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/test");connectionFactory.setUsername("admin");connectionFactory.setPassword("123456");Connection connection = new ConnectionFactory().newConnection();Channel channel = connection.createChannel();channel.queueDeclare("HelloWorld", false, false, false, null);/*  消费消息*  参数1:消费队列名称;参数2:开始消费的自动确认机制;参数3:消费时的回调接口*/channel.basicConsume("HelloWorld", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("The Body:" + new String(body));}});// 关闭连接channel.close();connection.close();}
}

三、RabbitMQ 单生产消费模型实现(连接工具类封装)

1、定义封装工具类 ConnectionUtil

# 连接工具类封装-ConnectionUtil

package com.lizhengi.hello.demo2;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @author liziheng* @version 1.0.0* @description 连接工具类封装* @date 2022-12-18 5:24 下午**/
public class ConnectionUtil {/*** 创建 MQ 连接工厂对象*/private static final ConnectionFactory CONNECTION_FACTORY;// 类加载时,重量级资源加载static {CONNECTION_FACTORY = new ConnectionFactory();//设置连接MQ主机CONNECTION_FACTORY.setHost("127.0.0.1");//设置连接MQ端口号CONNECTION_FACTORY.setPort(5672);//设置连接MQ虚拟主机CONNECTION_FACTORY.setVirtualHost("/test");//设置连接MQ虚拟主机的用户名密码CONNECTION_FACTORY.setUsername("admin");CONNECTION_FACTORY.setPassword("123456");}/*** 建立与RabbitMQ连接 工具方法** @return Connection*/public static Connection getConnection() {try {//返回连接对象return CONNECTION_FACTORY.newConnection();} catch (Exception e) {e.printStackTrace();}return null;}/*** 关闭通道和连接 工具方法** @param channel    Channel* @param connection Connection*/public static void closeConnectionAndChanel(Channel channel, Connection connection) {try {if (channel != null) {channel.close();}if (connection != null) {connection.close();}} catch (Exception e) {e.printStackTrace();}}
}

2、生产者

# Rabbit 基本消息模型 生产者-Producer

package com.lizhengi.hello.demo2;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.testng.annotations.Test;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description Rabbit 基本消息模型 生产者* @date 2022-12-18 5:26 下午**/
public class Producer {/*** 消息生产* @throws IOException IOException*/@Testpublic void testSendMessage() throws IOException {Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();channel.queueDeclare("HelloWorld",false,false,false,null);channel.basicPublish("","HelloWorld",null,"Hello RabbitMQ".getBytes());//资源关闭ConnectionUtil.closeConnectionAndChanel(channel,connection);}
}

3、消费者

# Rabbit 基本消息模型 消费者-Customer

package com.lizhengi.hello.demo2;import com.rabbitmq.client.*;
import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description Rabbit 基本消息模型 消费者* @date 2022-12-18 5:26 下午**/
public class Customer {public static void main(String[] msg) throws IOException {Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();channel.queueDeclare("HelloWorld",false,false,false,null);channel.basicConsume("HelloWorld",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("The Body:" + new String(body));}});}
}

http://www.ppmy.cn/news/6590.html

相关文章

Allegro如何设置格点操作指导

Allegro如何设置格点操作指导 在PCB设计的时候,格点是一个很好用的工具,Allegro可以设置格点 具体操作如下 选择Setup-GirdsNon-Etch和All Etch中的X和Y都输入5,代表都是5的格点

Linux内核死锁检测工具——Lockdep

文章目录前言配置内核简单的AB-BA死锁案例实际项目中的死锁前言 死锁是指两个或多个进程因争夺资源而造成的互相等待的现象&#xff0c;如进程A需要资源X&#xff0c;进程B需要资源Y&#xff0c;而双方都掌握对方所需要的资源&#xff0c;且都不释放&#xff0c;这会导致死锁。…

前端css样式小知识点(2)

文章目录前言图文实操讲解1、微信小程序之页面跳转方法页面跳转有很多种方式&#xff0c;先简单说一下它们的区别吧简单实现2、微信小程序this.setData修改对象、数组中的值3、微信小程序-获取input值的方法4、微信小程序-常用的三种弹窗5、dataset 简单小知识 误区6、that.set…

【Linux】Linux编译器gcc/g++的使用

今天不学习&#xff0c;明天变垃圾。 文章目录一、程序的翻译过程1.预处理&#xff08;1.2.3把你的代码编译成二进制代码&#xff09;2.编译&#xff08;C语言 > 汇编语言&#xff09;3.汇编&#xff08;无法被执行的二进制文件&#xff0c;为什么捏&#xff1f;&#xff09…

云原生之部署wordpress博客及设置圣诞主题风格

2022年圣诞节到来啦&#xff0c;很高兴这次我们又能一起度过~ CSDN诚邀各位技术er分享关于圣诞节的各种技术创意&#xff0c;展现你与众不同的精彩&#xff01;参与本次投稿即可获得【话题达人】勋章【圣诞快乐】定制勋章&#xff08;1年1次&#xff0c;错过要等下一年喔&#…

C语言的一些基本概念

数据类型&#xff1a; 基本类型&#xff1a;整数&#xff08;unsigned/signed char、unsigned/signed int、unsigned/signed short、unsigned/signed long、unsigned/signed long long&#xff09;、浮点数&#xff08;float、double、long double&#xff09;枚举类型&#x…

vue实现随机生成分享海报(内容动态)

大家好&#xff0c;我是雄雄。 前言 昨天写了篇文章&#xff1a;自己整理的vue实现生成分享海报&#xff08;含二维码&#xff09;&#xff0c;看着网上的没实现 主要是介绍了如何使用vue实现&#xff0c;动态分享内容为海报&#xff0c;且附带二维码&#xff0c;扫描二维码能…

【算法与数据结构】排序详解(C语言)

目录 前言 插入排序 希尔排序 选择排序 堆排序 冒泡排序 快速排序 hoare版本 ​编辑 挖坑法 前后指针版本 优化 非递归实现 归并排序 非递归实现 前言 &#x1f384;在生活中我们必不可少的就是对一组数据进行排序&#xff0c;所谓排序&#xff0c;就是使一串…