引入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>
编写生产者代码
RabbitMQ 默认的⽤于客户端连接的 TCP 端⼝号是 5672, 需要提前进⾏开放
代码及其注解
package rabbitmq.producer;/*** Created with IntelliJ IDEA.* Description:* User: wuyulin* Date: 2024-12-10* Time: 15:12*/import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者* */
public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接⼯⼚ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("192.168.66.129");//ip 默认值localhostfactory.setPort(5672); //默认值 5672factory.setVirtualHost("wuyulin");//虚拟机名称, 默认 /factory.setUsername("wuyulin");//⽤户名,默认guestfactory.setPassword("wuyulin");//密码, 默认guest//3. 创建连接ConnectionConnection connection = factory.newConnection();//4. 创建channel通道Channel channel = connection.createChannel();//5.声明交换机 使用内置的交换机//当⼀个新的RabbitMQ节点启动时,它会预声明(declare)⼏个内置的交换机,内置交换机名称是空字符串("").// ⽣产者发送的消息会根据队列名称直接路由到对应的队列.//6.声明队列/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)1.queue: 队列名称2.durable: 是否持久化.true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。3.exclusive:* 是否独占, 只能有⼀个消费者监听队列* 当Connection关闭时, 是否删除队列4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉5.arguments: ⼀些参数*/
//如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建channel.queueDeclare("hello",true,false,false,null);//7.通过channel发送消息到队列中/*
basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
1.exchange: 交换机名称, 简单模式下, 交换机会使⽤默认的""
2.routingKey: 路由名称, routingKey = 队列名称
3.props: 配置信息
4.body: 发送消息的数据*/String msg = "Hello RabbitMQ";channel.basicPublish("","hello",null,msg.getBytes());System.out.println(msg + "消息发送成功");//释放资源//显式地关闭Channel是个好习惯, 但这不是必须的, Connection关闭的时候,Channel也会⾃动关闭.channel.close();connection.close();}
}
注意事项
当⼀个新的 RabbitMQ 节点启动时,它会预声明(declare)⼏个内置的交换机,内置交换机名称是空字符串("").⽣产者发送的消息会根据队列名称直接路由到对应的队列.
例如:如果有⼀个名为 "hello" 的队列,⽣产者可以直接发送消息到 "hello" 队列,⽽消费者可以从 "hello" 队列中接收消息,⽽不需要关⼼交换机的存在.这种模式⾮常适合简单的应⽤场景,其中⽣产者和消费者之间的通信是⼀对⼀的.
运行代码,观察结果
运⾏之前
运⾏之后,队列中就已经有了 hello 这个队列的信息
可以点击 “hello” 这个队列名称,进入详情页,查看队列中的数据
如果在代码中注掉资源释放的代码,在 Connections 和 Channels 也可以看到相关信息
‘Queue 也可以配置显⽰ Consumer 相关信息
编写消费者代码
消费者代码和⽣产者前3步都是⼀样的,第4步改为消费当前队列
1. 创建连接
2. 创建 Channel
3. 声明⼀个队列 Queue(
为什么消费者要声明队列,因为如果要消费的队列不存在就会报错,所以为了避免队列还未创建,消费者可以事先声明 )
4. 消费消息
5. 释放资源(
消费者相当于是⼀个监听程序, 一般不需要关闭资源
)
代码及其注解
package rabbitmq.producer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;/*** Created with IntelliJ IDEA.* Description:* User: wuyulin* Date: 2024-12-13* Time: 10:46*/
public class ConsumeDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 1. 创建连接⼯⼚ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("192.168.66.129");//ip 默认值localhostfactory.setPort(5672); //默认值 5672factory.setVirtualHost("wuyulin");//虚拟机名称, 默认 /factory.setUsername("wuyulin");//⽤户名,默认guestfactory.setPassword("wuyulin");//密码, 默认guest//3. 创建连接ConnectionConnection connection = factory.newConnection();//4. 创建channel通道Channel channel = connection.createChannel();//5.声明队列(为什么消费者要声明队列,因为如果要消费的队列不存在就会报错,所以为了避免队列还未创建,消费者可以事先声明)/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)1.queue: 队列名称2.durable: 是否持久化.true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。3.exclusive:* 是否独占, 只能有⼀个消费者监听队列* 当Connection关闭时, 是否删除队列4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉5.arguments: ⼀些参数*/
//如果没有⼀个hello 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建channel.queueDeclare("hello",true,false,false,null);//6. 接收消息, 并消费/*basicConsume(String queue, boolean autoAck, Consumer callback)参数:1. queue: 队列名称2. autoAck: 是否⾃动确认, 消费者收到消息之后,⾃动和MQ确认3. callback: 回调对象Consumer ⽤于定义消息消费者的⾏为.当我们需要从RabbitMQ接收消息时,需要提供⼀个实现了Consumer 接⼝的对象.
DefaultConsumer 是RabbitMQ提供的⼀个默认消费者,实现了 Consumer 接⼝.*/DefaultConsumer consumer = new DefaultConsumer(channel) {/*回调⽅法, 当收到消息后, 会⾃动执⾏该⽅法在这个⽅法中,我们可以定义如何处理接收到的消息,例如打印消息内容,处理业务逻辑或者将消息存储到数据库等.1. consumerTag: 标识2. envelope: 获取⼀些信息, 交换机, 路由key3. properties:配置信息4. body:数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume("hello", true, consumer);//等待回调函数执⾏完毕之后, 关闭资源TimeUnit.SECONDS.sleep(2);//7. 释放资源 消费者相当于是⼀个监听程序, 不需要关闭资源//顺序不可改变
// channel.close();
// connection.close();}
}
运行代码,观察结果
运⾏程序,我们刚才发送的消息,就收到了
如果我们不释放资源,可以看到响应的 Connection,channel
并且可以看到 hello 队列中的数据被消费者全部取出