单生产单消费模型,即基本消息模型或简单消费模型,即完成基本的一对一消息转发。
RabbitMQ 单生产单消费模型主要有以下三个角色构成:
- 生产者(producer/ publisher):一个发送消息的用户应用程序。
- 消费者(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
- 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区!
~
本篇内容包括:RabbitMQ 基本(单生产单消费)消息模型、RabbitMQ 单生产消费模型实现、RabbitMQ 单生产消费模型实现(连接工具类封装)
文章目录
- 一、RabbitMQ 基本(单生产单消费)消息模型
- 1、单生产单消费模型(Hello World)
- 2、单生产单消费模型组成
- 3、参数细节
- 二、RabbitMQ 单生产消费模型实现
- 1、添加 Maven 依赖
- 2、生产者 实现
- 3、消费者 实现
- 三、RabbitMQ 单生产消费模型实现(连接工具类封装)
- 1、定义封装工具类 ConnectionUtil
- 2、生产者
- 3、消费者
一、RabbitMQ 基本(单生产单消费)消息模型
1、单生产单消费模型(Hello World)
单生产单消费模型,即基本消息模型或简单消费模型,即完成基本的一对一消息转发。
2、单生产单消费模型组成
RabbitMQ 单生产单消费模型主要有以下三个角色构成:
- 生产者(producer/ publisher):一个发送消息的用户应用程序。
- 消费者(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
- 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区!
3、参数细节
channel.queueDeclare("HelloWorld",false,false,false,null);
- 参数1:队列名称,如果不存在则自动创建;
- 参数2:队列特性-是否持久化,只是持久化队列,如果持久化消息需要在
channel.basicPublish
参数3进行额外设置:MessageProperties.PERSISTENT_TEXT_PLAIN
; - 参数3:队列特性-是否独占队列,队列只能被同一个连接绑定;
- 参数4:队列特性-是否队列完成后自动删除队列,消费者完成消息后自动删除(消费者断开连接);
- 参数5:额外附加参数
二、RabbitMQ 单生产消费模型实现
1、添加 Maven 依赖
# 在 pom.xml 文件中添加以下依赖
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency>...
</dependencies>
2、生产者 实现
# Rabbit 基本消息模型 生产者-Producer
package com.lizhengi.hello;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.testng.annotations.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author liziheng* @version 1.0.0* @description Rabbit 基本消息模型 生产者* @date 2022-12-18 5:09 下午**/
public class Producer {/*** 消息生产** @throws IOException IOException* @throws TimeoutException TimeoutException*/@Testpublic void testSendMessage() throws IOException, TimeoutException {// 创建MQ连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();// 设置连接MQ主机connectionFactory.setHost("127.0.0.1");// 设置连接MQ端口号connectionFactory.setPort(5672);// 设置连接MQ虚拟主机connectionFactory.setVirtualHost("/test");// 设置连接MQ虚拟主机的用户名密码connectionFactory.setUsername("admin");connectionFactory.setPassword("123456");// 获取连接对象Connection connection = new ConnectionFactory().newConnection();// 获取连接通道Channel channel = connection.createChannel();/* 通道绑定对应消息队列* 参数1:队列名称,如果不存在则自动创建;* 参数2:队列特性-是否持久化;* 参数3:队列特性-是否独占队列;* 参数4:队列特性-是否队列完成后自动删除队列;* 参数5:额外附加参数*/channel.queueDeclare("HelloWorld", false, false, false, null);/* 发布消息* 参数1:交换机名称;参数2:队列名称;参数3:传递消息额外值设置;参数4:具体消息内容*/channel.basicPublish("", "HelloWorld", null, "Hello RabbitMQ".getBytes());// 连接关闭channel.close();connection.close();}
}
3、消费者 实现
# Rabbit 基本消息模型 消费者-Customer
package com.lizhengi.hello;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author liziheng* @version 1.0.0* @description Rabbit 基本消息模型 消费者* @date 2022-12-18 5:19 下午**/
public class Customer {public static void main(String[] msg) throws IOException, TimeoutException {// 创建MQ连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/test");connectionFactory.setUsername("admin");connectionFactory.setPassword("123456");Connection connection = new ConnectionFactory().newConnection();Channel channel = connection.createChannel();channel.queueDeclare("HelloWorld", false, false, false, null);/* 消费消息* 参数1:消费队列名称;参数2:开始消费的自动确认机制;参数3:消费时的回调接口*/channel.basicConsume("HelloWorld", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("The Body:" + new String(body));}});// 关闭连接channel.close();connection.close();}
}
三、RabbitMQ 单生产消费模型实现(连接工具类封装)
1、定义封装工具类 ConnectionUtil
# 连接工具类封装-ConnectionUtil
package com.lizhengi.hello.demo2;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @author liziheng* @version 1.0.0* @description 连接工具类封装* @date 2022-12-18 5:24 下午**/
public class ConnectionUtil {/*** 创建 MQ 连接工厂对象*/private static final ConnectionFactory CONNECTION_FACTORY;// 类加载时,重量级资源加载static {CONNECTION_FACTORY = new ConnectionFactory();//设置连接MQ主机CONNECTION_FACTORY.setHost("127.0.0.1");//设置连接MQ端口号CONNECTION_FACTORY.setPort(5672);//设置连接MQ虚拟主机CONNECTION_FACTORY.setVirtualHost("/test");//设置连接MQ虚拟主机的用户名密码CONNECTION_FACTORY.setUsername("admin");CONNECTION_FACTORY.setPassword("123456");}/*** 建立与RabbitMQ连接 工具方法** @return Connection*/public static Connection getConnection() {try {//返回连接对象return CONNECTION_FACTORY.newConnection();} catch (Exception e) {e.printStackTrace();}return null;}/*** 关闭通道和连接 工具方法** @param channel Channel* @param connection Connection*/public static void closeConnectionAndChanel(Channel channel, Connection connection) {try {if (channel != null) {channel.close();}if (connection != null) {connection.close();}} catch (Exception e) {e.printStackTrace();}}
}
2、生产者
# Rabbit 基本消息模型 生产者-Producer
package com.lizhengi.hello.demo2;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.testng.annotations.Test;import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description Rabbit 基本消息模型 生产者* @date 2022-12-18 5:26 下午**/
public class Producer {/*** 消息生产* @throws IOException IOException*/@Testpublic void testSendMessage() throws IOException {Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();channel.queueDeclare("HelloWorld",false,false,false,null);channel.basicPublish("","HelloWorld",null,"Hello RabbitMQ".getBytes());//资源关闭ConnectionUtil.closeConnectionAndChanel(channel,connection);}
}
3、消费者
# Rabbit 基本消息模型 消费者-Customer
package com.lizhengi.hello.demo2;import com.rabbitmq.client.*;
import java.io.IOException;/*** @author liziheng* @version 1.0.0* @description Rabbit 基本消息模型 消费者* @date 2022-12-18 5:26 下午**/
public class Customer {public static void main(String[] msg) throws IOException {Connection connection = ConnectionUtil.getConnection();assert connection != null;Channel channel = connection.createChannel();channel.queueDeclare("HelloWorld",false,false,false,null);channel.basicConsume("HelloWorld",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {System.out.println("The Body:" + new String(body));}});}
}