RabbitMQ的五种工作模式

news/2024/11/29 4:40:43/

目录

前言介绍

(1)启动RabbitMQ

(2)账户管理

一、简单模式

(1)概念

(2)生产者代码

(3)消费者代码

二、工作队列模式

(1)概念

(1)生产者代码

(2)消费者代码

三、发布订阅模式

(1)概念

(2)生产者代码

(3)消费者代码

四、路由模式

(1)概念

(2)生产者代码

(3)消费者代码

五、通配符模式

(1)概念

(2)生产者代码

(3)消费者代码


前言介绍

前言:想要进行RabbitMQ的工作模式,首先你得先启动RabbitMQ并同时进行账号管理

(1)启动RabbitMQ

为了可以正常启动RabbitMQ,需要将防火墙关闭
#查看防火墙的运行状态
firewall-cmd --state
#关闭正在运行的防火墙
systemctl stop firewalld.service
#禁止防火墙自启动
systemctl disable firewalld.service
#开启管控台插件
rabbitmq-plugins enable rabbitmq_managment
#启动rabbitmq
rabbitmq-server -detached
#停止rabbitmq
rabbitmqctl stop
#重启rabbitmq
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app通过管控台访问rabbitmq,在网址栏输入http://IP地址:15672

(2)账户管理

1:创建账户
rabbitmqctl add_user 用户名 密码2:给用户创建管理员角色
rabbitmqctl set_user_tags 用户名 administrator3:给用户授权
"/" 表示虚拟机
"." "." "." 表示完整权限rabbitmqctl set_permissions -p "/" 用户名 "." "." "."

一、简单模式

(1)概念

 简单工作模式的特点:

(1)一个生产者对应一个消费者,通过队列进行消息的传递

(2)该模式使用的是direct交换机,也是默认的交换机

(2)生产者代码

package com.itbaizhan.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory connectionFactory=new ConnectionFactory();//自己的服务器IP地址connectionFactory.setHost("192.168.66.130");//rabbitmq的端口号connectionFactory.setPort(5672);//用户名和密码connectionFactory.setUsername("root");connectionFactory.setPassword("root");//虚拟主机路径connectionFactory.setVirtualHost("/");//建立连接Connection connection= connectionFactory.newConnection();//建立信道Channel channel=connection.createChannel();//创建队列//参数一是自己起的队列名//参数二是是否持久化,true表示重启rabbitmq时队列还在//参数三表示是否私有化,false表示所有的消费者都可以访问,true表示还有第一次拥有他的消费者可以访问//参数四表示是否自动删除,true表示不再使用队列时自动删除队列//参数五表示额外参数channel.queueDeclare("simple_queue",false,false,false,null);//发送信息String m="再一次的说,我是简单模式 ";//参数一表示交换机名,简单模式可不写//参数二表示路由键,简单模式就是队列名//参数三表示其他额外参数//参数四表示要传递的消息的字节数组channel.basicPublish("","simple_queue",null,m.getBytes(StandardCharsets.UTF_8));//关闭信道和连接channel.close();connection.close();System.out.println("====发送成功=====");}
}

(3)消费者代码

package com.itbaizhan.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接工厂ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//创建连接Connection connection=connectionFactory.newConnection();//创建信道Channel channel=connection.createChannel();//监听队列//参数一表示监听的队列名//参数二表示是否自动签收,false表示需要手动确认消息已经收到//参数三表示consumer的实现类,重写该方法表示接收到消息要做的事情channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message=new String(body,"UTF-8");System.out.println("接收到的消息="+message);}});}
}

二、工作队列模式

(1)概念

与简单模式相比,工作模式多了一些消费者,该模式使用的是direct
交换机,应用消息较多的情况

特点:
(1)一个队列对应多个消费者
(2)一条消息只会被一个消费者消费
(3)消息队列默认采用轮询的方式将消息平均发送给消费者,详细点说就是队列中的信息一个一个排队给消费者,其中的消息只有一次(不懂可以看代码展示)

 

(1)生产者代码

package com.itbaizhan.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setVirtualHost("/");connectionFactory.setPort(5672);//创建连接Connection connection=connectionFactory.newConnection();//建立信道Channel channel=connection.createChannel();//创建队列channel.queueDeclare("work_queue",true,false,false,null);//发送信息for(int i=1;i<=100;i++){//参数三表示的是持久化信息,即除了保存到内存中还会保存到磁盘中channel.basicPublish("","work_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,("现在是第"+i+"条信息").getBytes(StandardCharsets.UTF_8));}//关闭资源channel.close();connection.close();}
}

(2)消费者代码

package com.itbaizhan.work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//工作模式消费者1
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setPort(5672);connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setVirtualHost("/");//创建连接Connection connection=connectionFactory.newConnection();//创建信道Channel channel=connection.createChannel();//接受信息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1号接受="+new String(body,"UTF-8"));}});}
}//后面的消费者2和3与1一样

 

三、发布订阅模式

(1)概念

一些消息需要不同的消费者进行不同的处理,如电商网站中消息的发送有邮件,
短信,弹窗等等,这就用到了订阅模式

特点
(1)生产者将信息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中
(2)工作模式中的交换机(direct交换机)只能将交换机发送给一个
队列,而订阅模式的交换机可以发送给多个队列。
(3)该模式的交换机使用fanout交换机

(2)生产者代码

package com.itbaizhan.publish;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;//发布订阅模式生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setHost("192.168.66.130");connectionFactory.setPort(5672);//建立连接Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();//创建交换机//参数一是交换机名//参数二是交换机类型//参数三是交换机持久化channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);//创建队列channel.queueDeclare("send1",true,false,false,null);channel.queueDeclare("send2",true,false,false,null);channel.queueDeclare("send3",true,false,false,null);//交换机绑定队列//参数一是队列名//参数二是交换机名//参数三是路由关键字,订阅模式使用""channel.queueBind("send1","exchange_fanout","");channel.queueBind("send2","exchange_fanout","");channel.queueBind("send3","exchange_fanout","");//发送信息for(int i=1;i<=100;i++){//参数二是路由键channel.basicPublish("exchange_fanout"," ",null,("这是第"+i+"条信息").getBytes(StandardCharsets.UTF_8));}channel.close();connection.close();}
}

(3)消费者代码

package com.itbaizhan.publish;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_send1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setPort(5672);connectionFactory.setHost("192.168.66.130");//建立连接Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();channel.basicConsume("send1",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("send1的队列="+new String(body,"Utf-8"));}});}
}//send2和send3如上一致

 

四、路由模式

(1)概念

使用发布订阅模式时,所有消息都活发送到绑定的队列中,
但是我们并不是希望所有的消息都无差别的发送到所有队列中,
比如一个活动中,有些消息需要邮件发送,有些需要短信发送,
而路由模式便是如此。

特点:
(1)每个队列绑定路由关键字rountingkey
(2)生产者将带有rountingkey的消息发送给交换机,交换机再根据
rountingkey转发到指定队列。(该模式使用direct交换机)

(2)生产者代码

package com.itbaizhan.rounter;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setVirtualHost("/");connectionFactory.setPort(5672);Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();//创建交换机channel.exchangeDeclare("exchange_rounter", BuiltinExchangeType.DIRECT,true);//创建队列channel.queueDeclare("mail1",true,false,false,null);channel.queueDeclare("mail2",true,false,false,null);channel.queueDeclare("mail3",true,false,false,null);//交换机绑定队列//参数三表示路由关键字channel.queueBind("mail1","exchange_rounter","import");channel.queueBind("mail2","exchange_rounter","import");channel.queueBind("mail3","exchange_rounter","normal");//发送信息channel.basicPublish("exchange_rounter","import",null,"我是import".getBytes(StandardCharsets.UTF_8));channel.basicPublish("exchange_rounter","normal",null,"我是normal".getBytes(StandardCharsets.UTF_8));//关闭资源channel.close();connection.close();}
}

 

(3)消费者代码

package com.itbaizhan.rounter;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_mail1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setVirtualHost("/");connectionFactory.setPort(5672);//建立连接Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();//监听队列channel.basicConsume("mail1",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("mail1队列="+new String(body,"UTF-8"));}});}
}
//监听mail2和mail3与上基本一致

五、通配符模式

(1)概念

通配符模式是在路由模式的基础上,给队列绑定带通配符的
路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消
息转发到该队列。通配符模式比路由模式更灵活
(该模式使用topic交换机)


通配符规则:

消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。

 

注意:通配符和路由模式都是根据指定规则或者关键字绑定到对应队列,是队列!而不是消费者!

(2)生产者代码

package com.itbaizhan.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setVirtualHost("/");connectionFactory.setPort(5672);//建立连接Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();//创建交换机channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);//创建队列channel.queueDeclare("em1",true,false,false,null);channel.queueDeclare("em2",true,false,false,null);channel.queueDeclare("em3",true,false,false,null);//交换机绑定队列channel.queueBind("em1","exchange_topic","#.import.#");channel.queueBind("em2","exchange_topic","#.normal.#");channel.queueBind("em3","exchange_topic","#.low.#");//发送信息channel.basicPublish("exchange_topic","import.noraml",null,"我是一号".getBytes(StandardCharsets.UTF_8));channel.basicPublish("exchange_topic","import.low",null,"我是二号".getBytes(StandardCharsets.UTF_8));channel.basicPublish("exchange_topic","import.low.normal",null,"我是全部".getBytes(StandardCharsets.UTF_8));//关闭资源channel.close();connection.close();}
}

 

(3)消费者代码

package com.itbaizhan.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_em1{public static void main(String[] args) throws IOException, TimeoutException {//建立连接工厂ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//创建连接Connection connection=connectionFactory.newConnection();//创建信道Channel channel=connection.createChannel();//监听队列//参数一表示监听的队列名//参数二表示是否自动签收,false表示需要手动确认消息已经收到//参数三表示consumer的实现类,重写该方法表示接收到消息要做的事情channel.basicConsume("em1",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message=new String(body,"UTF-8");System.out.println("em1队列的消息="+message);}});}
}
//em2队列和em3队列与上基本一致

 


http://www.ppmy.cn/news/48740.html

相关文章

嵌入式【协议篇】CAN协议原理

一、CAN协议介绍 1、简介 CAN是控制器局域网络(Controller Area Network, CAN)的简称,是一种能够实现分布式实时控制的串行通信网络。 其实可以简单把CAN通信理解成开一场电话会议,当一个人讲话时其他人就听(广播),当多个人同时讲话时则根据一定规则来决定谁先讲话谁后讲…

leetcode551. 学生出勤记录 I

题目描述解题思路执行结果 leetcode551. 学生出勤记录 I . 题目描述 给你一个字符串 s 表示一个学生的出勤记录&#xff0c;其中的每个字符用来标记当天的出勤情况&#xff08;缺勤、迟到、到场&#xff09;。记录中只含下面三种字符&#xff1a; A&#xff1a;Absent&#xff…

OpenCV2 计算机视觉应用编程秘籍:1~5

原文&#xff1a;OpenCV2 Computer Vision Application Programming Cookbook 协议&#xff1a;CC BY-NC-SA 4.0 译者&#xff1a;飞龙 本文来自【ApacheCN 计算机视觉 译文集】&#xff0c;采用译后编辑&#xff08;MTPE&#xff09;流程来尽可能提升效率。 当别人说你没有底线…

JavaScript里实现继承的几种方式

JavaScript 中的继承可以通过以下几种方式来实现&#xff1a; 1、原型链继承&#xff1a;通过将子类的原型对象指向父类的实例来实现继承。这种方式的优点是实现简单&#xff0c;缺点是父类的私有属性和方法子类是不能访问的。 function Parent() {this.name parent;this.ag…

[算法总结] 关于字符串类型题你应该知道这些?精心汇总!!

&#x1f61a;一个不甘平凡的普通人&#xff0c;致力于为Golang社区和算法学习做出贡献&#xff0c;期待您的关注和认可&#xff0c;陪您一起学习打卡&#xff01;&#xff01;&#xff01;&#x1f618;&#x1f618;&#x1f618; &#x1f917;专栏&#xff1a;算法学习 &am…

OpenAI最新官方ChatGPT聊天插件接口《插件使用策略》全网最详细中英文实用指南和教程,助你零基础快速轻松掌握全新技术(七)(附源码)

Usage policies 使用策略 Introduction 导言Disallowed usage of our models 禁止使用我们的模型 Platform policy 平台策略Plugin policies 插件策略Changelog 更新日志其它资料下载 此插件使用策略 Updated 更新于 March 23,2023 2023年3月23日 Introduction 导言 We’ve r…

Onnx 转Ncnn

Onnx 转Ncnn 算法工程师给了onnx,需要转成ncnn才能用到安卓上去&#xff0c;步骤如下 简化onnx 算法给了.onnx后缀的文件&#xff0c;100多兆&#xff0c;太大&#xff0c;第一步&#xff0c;先简化&#xff1a; conda env list 查看conda环境&#xff08;前提是之前已经配…

学科类型-英文

标题 前言必学场景词汇及用法数学数字运算数学几何历史物理化学美术音乐语言学天文学生物学植物学生态学社会学情境常用单词数学数字运算数学几何周长历史物理化学二氧化碳美术音乐语言学天文学生物学植物学生态学