【RabbitMQ】Java操作RabbitMQ之入门Demo

news/2024/11/28 5:40:30/

目录

一、项目创建

二、生产者

三、消费者

一、项目创建

我们先在idea里创建两个Maven项目一个项目作为生产者,另一个作为消费者。创建完成后,在各自的pom.xml文件里引入Java使用RabbitMQ的依赖

        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version></dependency>

然后在对应的项目里创建 消费者类与生产者类

二、生产者

由之前文章里我们了解到RabbitMQ的通信如下图,我们要想将生产者生产的消息存入队列,我们就一个先获得Connection(连接)然后通过连接获取到channel,然后选择虚拟机交换机以及队列等最后关闭连接【RabbitMQ】RabbitMQ的简介_1373i的博客-CSDN博客icon-default.png?t=N2N8https://blog.csdn.net/qq_61903414/article/details/130139970?spm=1001.2014.3001.5501

 我们获取连接是通过连接工厂进行获取的所以此时我们先要去创建连接工厂并给他配置相应的信息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.获取配置连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/DemoVirtualHost");factory.setUsername("guest");factory.setPassword("guest");}
}

此时我们可以通过连接工厂获取到连接connection,然后获取到channel

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.获取配置连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/DemoVirtualHost");factory.setUsername("guest");factory.setPassword("guest");// 2.建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();}
}

此时我们需要去在MQ里创建队列,通过channel里的queueDelclare方法来创建该方法有以下参数

参数说明
queue要创建队列的名称
durable该队列里的消息是否持久化
exclusive1》是否只允许一个消费者监听消费2》当连接关闭时是否销毁该队列
autoDelete没有consumer是否自动删除
arguments在后续文章里会详细讲到
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.获取配置连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/DemoVirtualHost");factory.setUsername("guest");factory.setPassword("guest");// 2.建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 3.创建队列channel.queueDeclare("EmailQueue",true,false,false,null);}
}

此时我们就可以通过channel里的basicPublish方法进行发送消息,该方法有以下参数

参数说明
exchange交换机(后续详细讲到,此处使用默认交换机)
routingKey路由名称(后续讲到,由于此处使用默认交换机所以路由名称为队列名)
props相关的配置信息
body要发送的消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.获取配置连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/DemoVirtualHost");factory.setUsername("guest");factory.setPassword("guest");// 2.建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 3.创建队列channel.queueDeclare("EmailQueue",true,false,false,null);// 4.发送消息channel.basicPublish("","EmailQueue",null,"hello mq".getBytes());// 5.关闭连接channel.close();connection.close();}
}

 发送完成后关闭连接即可,运行程序后此时mq队列里存在了一条消息

 

三、消费者

在实现消费者时,我们也需要先建立连接所以前个步骤与生产者相同

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1.获取配置连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/DemoVirtualHost");factory.setUsername("guest");factory.setPassword("guest");// 2.建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();}
}

此时建立连接后我们通过basicConsume该方法进行获取消息,参数如下

参数说明
queue从哪个队列获取消息
autoAck获取到信息后是否自动给MQ服务器发送确认收到
callback收到消息后的回调对象,需要手动实现
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1.获取配置连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/DemoVirtualHost");factory.setUsername("guest");factory.setPassword("guest");// 2.建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 3.获取消息channel.basicConsume("EmailQueue",true,consumer);}
}

我们此时还需要去手动实现回调对象,通过匿名内部类实现回调对象的回调方法

import com.rabbitmq.client.*;
import com.rabbitmq.client.Consumer;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1.获取配置连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/DemoVirtualHost");factory.setUsername("guest");factory.setPassword("guest");// 2.建立连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 3.获取消息Consumer consumer = new DefaultConsumer(channel) {/*** 回调方法 收到信息后自动执行该方法* @param consumerTag 消息唯一标识* @param envelope    获取信息* @param properties  获取配置信息* @param body        消息* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag :" + consumerTag);System.out.println("exchange :" + envelope.getExchange());System.out.println("routingKey ;" + envelope.getRoutingKey());System.out.println("properties :"+ properties);System.out.println("consumer 消费消息 :" + new String(body));}};channel.basicConsume("EmailQueue",true,consumer);}
}

运行代码此时就可以获得存入的消息

 


http://www.ppmy.cn/news/46344.html

相关文章

记录一下verilog重复例化的两种方式

文章目录 0 前言1 for循环方式例化方法2 数组的方式例化4 一些其他的技巧 0 前言 这段时间例化了挺多mem&#xff0c;过程中也了解到了一些新的东西&#xff0c;在这里记录一下 1 for循环方式例化方法 先给出 sub_module module sub(input [7:0] din,output logic [7:0] do…

自定义OAuth2组件实现对授权码登录模式的封装

文章目录 一、OAuth2简介二、授权码模式执行流程1、网站接入开放平台2、设置开放平台登录按钮3、请求令牌和用户信息 三、存在问题1、攻击流程2、漏洞分析 四、组件封装1、AuthUrls2、AuthRequest3、AuthPlatformConfig4、DefaultAuthRequest 一、OAuth2简介 所谓OAuth2其实就…

代码随想录算法训练营第四十九天-动态规划10|121. 买卖股票的最佳时机 , 122.买卖股票的最佳时机II

买卖股票的最佳时机这两道题&#xff0c;对应着两者不同的处理过程&#xff0c;第一种是从头到尾只能交易一次&#xff0c;也就是买入一次和卖出一次&#xff0c;第二种是可以多次买卖。121买卖股票的最佳时机是只交易一次的。这时候需要用二维数组进行定义&#xff0c;dp[i][0…

开心档之C++ 信号处理

C 信号处理 目录 C 信号处理 signal() 函数 实例 raise() 函数 实例 信号是由操作系统传给进程的中断&#xff0c;会提早终止一个程序。在 UNIX、LINUX、Mac OS X 或 Windows 系统上&#xff0c;可以通过按 CtrlC 产生中断。 有些信号不能被程序捕获&#xff0c;但是下表…

我学习网络安全的心得

我的学习心得&#xff0c;我认为能不能自学成功的要素有两点。 第一点就是自身的问题&#xff0c;虽然想要转行学习安全的人很多&#xff0c;但是非常强烈的想要转行学好的人是小部分。而大部分人只是抱着试试的心态来学习安全&#xff0c;这是完全不可能的。 所以能不能学成并…

进制基础知识

进制概述&#xff1a; 指进位制&#xff0c;是一种计数方式&#xff0c;也称为进位计数法或位值计数法。 十进制&#xff1a;0&#xff0c;1&#xff0c;2&#xff0c;3&#xff0c;4&#xff0c;5&#xff0c;6&#xff0c;7&#xff0c;8&#xff0c;9 R进制&#xff1a;由…

RT-Thread GD32F4xx I2C之硬件I2C驱动(eeprom)

目录 1、I2C的驱动框架1.1 I2C的驱动框架层介绍1.2 I2C的两种驱动方法1.3 I2C总线设备结构2、硬件I2C驱动开发2.1 实现操作方法rt_i2c_bus_device_ops2.1.1 I2C设备结构定义2.1.2 i2c设备定义2.1.3 GD32F4xx i2c eeprom write函数2.1.4 GD32F4xx i2c eeprom read函数2.1.5 rt_i…

Cesium实践(3)——坐标系与相机系统

文章目录 前言Cesium 坐标系屏幕坐标系空间直角坐标系WGS-84坐标系 坐标系转换角度与弧度互转84坐标转笛卡尔空间坐标笛卡尔空间坐标转84坐标屏幕坐标转笛卡尔空间坐标笛卡尔空间直角坐标转屏幕坐标 Cesium相机默认交互相机姿态参数常用方法DEFAULT_VIEW_RECTANGLEsetViewflyTo…