RabbitMQ中点对点(Point-to-Point)通讯方式的Java实现

server/2024/12/14 19:06:37/

RabbitMQ是一个广泛使用的开源消息代理软件,它实现了高级消息队列协议(AMQP)。RabbitMQ支持多种消息传递模式,其中最基本的是点对点(Point-to-Point)通讯方式。在这种模式下,消息生产者将消息发送到一个队列,而消息消费者从该队列中接收消息。每个消息只会被一个消费者消费一次。

下面将通过一个简单的Java代码案例,详细介绍如何在RabbitMQ中实现点对点通讯。

1. 原理分析

  • 一个生产者,一个默认的交换机,一个队列,一个消费者
  • 看起来是生产者直接发送到队列,实际上是发送到了默认交换机

在这里插入图片描述

结构图:

在这里插入图片描述

2. 环境准备

在开始之前,请确保你已经安装了以下环境:

  • Java Development Kit (JDK) 8 或更高版本
  • Apache Maven
  • RabbitMQ 服务器

你可以通过以下命令检查Java和Maven的安装情况:

java -version
mvn -version

3. 添加RabbitMQ依赖

首先,我们需要在Maven项目中添加RabbitMQ的Java客户端依赖。在你的pom.xml文件中添加以下内容:

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
</dependencies>

4. 创建消息生产者

接下来,我们创建一个消息生产者,它将消息发送到RabbitMQ的队列中。

java">import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MessageProducer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");// 创建连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 发送消息String message = "Hello World!aaa";// 发布消息到exchange,同时指定路由的规则// 参数1:指定exchange,使用""。代表默认交换机// 参数2:指定路由的规则,使用具体的队列名称。// 参数3:指定传递的消息所携带的properties,使用null。// 参数4:指定发布的具体消息,byte[]类型channel.basicPublish("", QUEUE_NAME, null, message.getBytes());// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。System.out.println(" [x] Sent '" + message + "'");}}
}

代码解析:

  1. ConnectionFactory: 用于创建到RabbitMQ服务器的连接。
  2. Connection: 代表与RabbitMQ服务器的物理连接。
  3. Channel: 用于发送和接收消息的通道。
  4. queueDeclare: 声明一个队列,如果队列不存在,则会创建它。
  5. basicPublish: 将消息发送到指定的队列。

默认交换机上可以看到生产者发送的消息:

在这里插入图片描述

5. 创建消息消费者

接下来,我们创建一个消息消费者,它将从RabbitMQ的队列中接收消息。

java">import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class MessageConsumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列//参数1:queue - 指定队列的名称//参数2:durable - 当前队列是否需要持久化(true)//参数3:exclusive:是否排外的,有两个作用://      一:当连接关闭时connection.close()该队列是否会自动删除;//      二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,//          如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常://          com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)//          一般等于true的话用于一个队列只能有一个消费者来消费的场景//参数4:autoDelete - 如果这个队列没有消费者在消费,并且所有消息都消费完,队列自动删除//参数5:arguments - 指定当前队列的其他信息channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 设置消息回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 开始消费消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

代码解析:

  1. DeliverCallback: 用于处理接收到的消息的回调函数。
  2. basicConsume: 开始消费队列中的消息。

6. 运行代码

  1. 首先启动RabbitMQ服务器。
  2. 运行MessageProducer类,发送消息。
  3. 运行MessageConsumer类,接收消息。

你应该会看到类似以下的输出:

Producer Output:

 [x] Sent 'Hello World!'

Consumer Output:

 [*] Waiting for messages. To exit press CTRL+C[x] Received 'Hello World!'

7. 总结

通过以上步骤,我们成功地在RabbitMQ中实现了点对点通讯。消息生产者将消息发送到队列,而消息消费者从队列中接收消息。这种模式非常适合需要确保消息只被一个消费者处理一次的场景。


http://www.ppmy.cn/server/150163.html

相关文章

java配置环境变量 jdk配置环境变量 linux环境

一. 使用yum安装jdk centos系统中可以使用yum来安装jdk, 执行以下命令 查询yum仓库信息 yum list java* java-1.8.0-openjdk.x86_64 1:1.8.0.422.b05-1.1.al7 updates java-1.8.0-openjdk-devel.x86_64 …

使用Vue.js的步骤

使用Vue.js开发一个应用的详细流程和代码示例如下&#xff1a; 1. 环境准备和项目初始化 使用Vue CLI创建项目 Vue CLI是一个全局命令行工具&#xff0c;用于快速搭建Vue项目。首先&#xff0c;你需要安装Vue CLI&#xff1a; bash npm install -g vue/cli # 或者 yarn glo…

【解决】k8s使用flannel网络插件的问题整理

问题1: 拉取镜像失败ImagePullBackOff 问题排查 1、查看所有pod的状态 kubectl get pods --all-namespaceskube-flannel的两个pod状态为ImagePullBackOff&#xff0c;由于镜像拉取异常导致 2、查看pod启动日志&#xff0c;获取更详细的信息 kubectl logs -n 命名空间namesp…

以ATTCK为例构建网络安全知识图

ATT&CK&#xff08;Adversarial Tactics, Techniques, and Common Knowledge &#xff09;是一个攻击行为知识库和模型&#xff0c;主要应用于评估攻防能力覆盖、APT情报分析、威胁狩猎及攻击模拟等领域。本文简单介绍ATT&CK相关的背景概念&#xff0c;并探讨通过ATT&a…

Qt之截图存为图片或PDF打印(七)

Qt开发 系列文章 - Screenshot-To-PicOrPDF&#xff08;七&#xff09; 目录 前言 一、截图 二、功能实现 1.创建项目 2.新建类 3.编写类 1.定义头文件 2.相关功能函数 3.使用类 总结 前言 本文利用Qt平台&#xff0c;实现屏幕截图功能&#xff0c;并将截图复制、保…

Ubuntu中iptables默认是开启的吗

不&#xff0c;Ubuntu 中 iptables 默认不是开启的。 虽然 Ubuntu 系统默认安装了 iptables 软件包&#xff08;你可以通过 dpkg -l iptables 或 which iptables 命令来验证&#xff09;&#xff0c;但这并不意味着 iptables 规则已经生效。实际上&#xff0c;iptables 的规则…

《从零开始:轻松入门数据结构的世界》

一、为什么数据结构如此重要&#xff1f; 数据结构就像是程序的骨架&#xff0c;它决定了数据在内存中的存储方式&#xff0c;以及我们如何对这些数据进行操作。一个好的数据结构可以大大提高程序的运行效率&#xff0c;减少内存消耗。我们将通过一个简单的比喻来理解这一点&a…

《Django 5 By Example》阅读笔记:p493-p520

《Django 5 By Example》学习第 17 天&#xff0c;p493-p520 总结&#xff0c;总计 28 页。 一、技术总结 1.internationalization(国际化) vs localization(本地化) (1)18n&#xff0c;L10n&#xff0c;g11n 以前总觉得这两个缩写好难记&#xff0c;今天仔细看了下维基百科…