RabbitMQ Work Queues (工作队列模式) 使用案例

ops/2024/12/18 22:19:07/

Hi~!这里是奋斗的明志,很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~~
🌱🌱个人主页:奋斗的明志
🌱🌱所属专栏:RabbitMQ

📚本系列文章为个人学习笔记,在这里撰写成文一为巩固知识,二为展示我的学习过程及理解。文笔、排版拙劣,望见谅。

在这里插入图片描述

Work Queues 工作队列

  • 前言
  • Work Queues (工作队列)
    • 1、引入依赖
    • 2、编写生产者代码
    • 3、编写消费者代码
    • 4、运行程序, 观察结果

前言

在前面学习了简单模式的写法, 接下来学习另外几种工作模式的写法
简单模式
快速入门程序就是简单模式.
在这里插入图片描述

默契之舞 之 生产者消费者模式(RabbitMQ)

Work Queues (工作队列)

简单模式的增强版, 和简单模式的区别就是: 简单模式有一个消费者, 工作队列模式支持多个消费者接收消息, 消费者之间是竞争关系, 每个消息只能被一个消费者接收

在这里插入图片描述

1、引入依赖


<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

2、编写生产者代码

工作队列模式和简单模式区别是有多个消费者, 所以生产者消费者代码差异不大
相比简单模式, 生产者的代码基本⼀样, 为了能看到多个消费者竞争的关系, 我们一次发送10条消息
我们把发送消息的地方, 改为一发送10条消息


for (int i = 0; i < 10; i++) {String mag = "hello work queue......" + i;channel.basicPublish("", Constants.WORK_QUEUE, null, mag.getBytes());
}

Constant 包下 Constants 类

package rabbitmq.constant;public class Constants {public static final String HOST = "123.57.16.61";public static final Integer PORT = 5672;public static final String USERNAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "bite";//声明一个工作队列public static final String WORK_QUEUE = "work.queue";
}

完整代码:

package rabbitmq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 工作队列模式*/
public class Prooducer {public static void main(String[] args) throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);//创建一个新的连接Connection connection = connectionFactory.newConnection();//开启一个通信Channel channel = connection.createChannel();//声明交换机,使用内置的交换机//声明一个队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//发送消息for (int i = 0; i < 10; i++) {String mag = "hello work queue......" + i;channel.basicPublish("", Constants.WORK_QUEUE, null, mag.getBytes());}System.out.println("消息发送成功~");//资源释放channel.close();connection.close();}
}

在这里插入图片描述

3、编写消费者代码

消费者代码和简单模式⼀样, 只是复制两份. 两个消费者代码可以是⼀样的

消费者1:

package rabbitmq.work;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);//创建一个新的连接Connection connection = connectionFactory.newConnection();//开启一个通信Channel channel = connection.createChannel();//声明一个队列,如果队列不存在,则创建,如果队列存在,则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//消费消息//消费的逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受到消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
//        channel.close();
//        connection.close();}
}

在这里插入图片描述

消费者2:

package rabbitmq.work;import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);//创建一个新的连接Connection connection = connectionFactory.newConnection();//开启一个通信Channel channel = connection.createChannel();//声明一个队列,如果队列不存在,则创建,如果队列存在,则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//消费消息//消费的逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受到消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
//        channel.close();
//        connection.close();}
}

4、运行程序, 观察结果

先启动两个消费者运行, 再启动生产者
如果先启动⽣产者, 在启动消费者, 由于消息较少, 处理较快, 那么第⼀个启动的消费者就会瞬间把10条
消息消费掉, 所以我们先启动两个消费者, 再启动生产者

在这里插入图片描述


在这里插入图片描述


观察RabbitMQ客户端:

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

启动生产者:

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


http://www.ppmy.cn/ops/143011.html

相关文章

【图像处理lec3、4】空间域的图像增强

目录 1. 空间域图像增强的背景与目标 2. 空间域处理的数学描述 3. 灰度级变换 4. 幂律变换&#xff08;Power-Law Transformation&#xff09; 5、 分段线性变换 Case 1: 对比度拉伸 Case 2: 灰度切片 Case 3: 按位切片 6、对数变换&#xff08;Logarithmic Transform…

selenium自动爬虫工具

一、介绍selenium爬虫工具 selenium 是一个自动化测试工具&#xff0c;可以用来进行 web 自动化测试、爬虫 selenium 本质是通过驱动浏览器&#xff0c;完全模拟浏览器的操作&#xff0c;比如跳转、输入、点击、下拉等&#xff0c;来拿到网页渲染之后的结果&#xff0c;可支持…

SSE(Server-Sent Events)主动推送消息

说明 使用Java开发web应用&#xff0c;大多数时候我们提供的接口返回数据都是一次性完整返回。有些时候&#xff0c;我们也需要提供流式接口持续写出数据&#xff0c;以下提供一种简单的方式。 SSE&#xff08;Server-Sent Events&#xff09; SSE 是一种允许服务器单向发送事…

Chinese-Clip实现以文搜图和以图搜图

本文不生产技术&#xff0c;只做技术的搬运工&#xff01; 前言 目前网上能够找到的资料有限&#xff0c;要么收费&#xff0c;要么配置复杂&#xff0c;作者主打一个一毛不拔&#xff0c;决定自己动手实现一个&#xff0c;功能清单受启发于Nidia AI lab实验室的nanodb项目&am…

无人机飞行服务技术详解

无人机飞行服务技术涵盖了多个方面&#xff0c;以下是对其关键技术的详细解析&#xff1a; 一、无人机类型与特点 无人机根据其设计和功能的不同&#xff0c;主要分为以下几种类型&#xff1a; 1. 多旋翼无人机&#xff1a;最常见的无人机类型&#xff0c;通过多个电机和螺旋…

有没有办法让爬虫更加高效,比如多线程处理?

要让Python爬虫更加高效&#xff0c;确实可以采用多线程处理。多线程可以显著提高爬虫的效率&#xff0c;因为它允许程序同时执行多个任务&#xff0c;从而减少等待时间。以下是一些提高爬虫效率的方法&#xff0c;特别是通过多线程技术&#xff1a; 1. 多线程爬虫 多线程爬虫…

[【C++算法】43.分治_快排_颜色分类(过渡)

文章目录 题目链接&#xff1a;题目描述&#xff1a;解法C 算法代码&#xff1a; 题目链接&#xff1a; 75. 颜色分类 题目描述&#xff1a; 解法 分治&#xff1a;就是分而治之 这题作为过渡&#xff0c;不使用分治。 解法&#xff1a;三指针 C 算法代码&#xff1a; class …

wazuh-modules-sca-scan

sca模块主函数wm_sca_main -> wm_sca_start 检查policy文件中的每一个项目wm_sca_check_policy static int wm_sca_check_policy(const cJSON * const policy, const cJSON * const checks, OSHash *global_check_list) {if(!policy) {return 1;}const cJSON * const id c…