目录
前言介绍
(1)启动RabbitMQ
(2)账户管理
一、简单模式
(1)概念
(2)生产者代码
(3)消费者代码
二、工作队列模式
(1)概念
(1)生产者代码
(2)消费者代码
三、发布订阅模式
(1)概念
(2)生产者代码
(3)消费者代码
四、路由模式
(1)概念
(2)生产者代码
(3)消费者代码
五、通配符模式
(1)概念
(2)生产者代码
(3)消费者代码
前言介绍
前言:想要进行RabbitMQ的工作模式,首先你得先启动RabbitMQ并同时进行账号管理
(1)启动RabbitMQ
为了可以正常启动RabbitMQ,需要将防火墙关闭 #查看防火墙的运行状态 firewall-cmd --state #关闭正在运行的防火墙 systemctl stop firewalld.service #禁止防火墙自启动 systemctl disable firewalld.service #开启管控台插件 rabbitmq-plugins enable rabbitmq_managment #启动rabbitmq rabbitmq-server -detached #停止rabbitmq rabbitmqctl stop #重启rabbitmq rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app通过管控台访问rabbitmq,在网址栏输入http://IP地址:15672
(2)账户管理
1:创建账户 rabbitmqctl add_user 用户名 密码2:给用户创建管理员角色 rabbitmqctl set_user_tags 用户名 administrator3:给用户授权 "/" 表示虚拟机 "." "." "." 表示完整权限rabbitmqctl set_permissions -p "/" 用户名 "." "." "."
一、简单模式
(1)概念
简单工作模式的特点:
(1)一个生产者对应一个消费者,通过队列进行消息的传递
(2)该模式使用的是direct交换机,也是默认的交换机
(2)生产者代码
package com.itbaizhan.simple;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory connectionFactory=new ConnectionFactory();//自己的服务器IP地址connectionFactory.setHost("192.168.66.130");//rabbitmq的端口号connectionFactory.setPort(5672);//用户名和密码connectionFactory.setUsername("root");connectionFactory.setPassword("root");//虚拟主机路径connectionFactory.setVirtualHost("/");//建立连接Connection connection= connectionFactory.newConnection();//建立信道Channel channel=connection.createChannel();//创建队列//参数一是自己起的队列名//参数二是是否持久化,true表示重启rabbitmq时队列还在//参数三表示是否私有化,false表示所有的消费者都可以访问,true表示还有第一次拥有他的消费者可以访问//参数四表示是否自动删除,true表示不再使用队列时自动删除队列//参数五表示额外参数channel.queueDeclare("simple_queue",false,false,false,null);//发送信息String m="再一次的说,我是简单模式 ";//参数一表示交换机名,简单模式可不写//参数二表示路由键,简单模式就是队列名//参数三表示其他额外参数//参数四表示要传递的消息的字节数组channel.basicPublish("","simple_queue",null,m.getBytes(StandardCharsets.UTF_8));//关闭信道和连接channel.close();connection.close();System.out.println("====发送成功=====");} }
(3)消费者代码
package com.itbaizhan.simple;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接工厂ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//创建连接Connection connection=connectionFactory.newConnection();//创建信道Channel channel=connection.createChannel();//监听队列//参数一表示监听的队列名//参数二表示是否自动签收,false表示需要手动确认消息已经收到//参数三表示consumer的实现类,重写该方法表示接收到消息要做的事情channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message=new String(body,"UTF-8");System.out.println("接收到的消息="+message);}});} }
二、工作队列模式
(1)概念
与简单模式相比,工作模式多了一些消费者,该模式使用的是direct
交换机,应用消息较多的情况特点:
(1)一个队列对应多个消费者
(2)一条消息只会被一个消费者消费
(3)消息队列默认采用轮询的方式将消息平均发送给消费者,详细点说就是队列中的信息一个一个排队给消费者,其中的消息只有一次(不懂可以看代码展示)
(1)生产者代码
package com.itbaizhan.work;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setVirtualHost("/");connectionFactory.setPort(5672);//创建连接Connection connection=connectionFactory.newConnection();//建立信道Channel channel=connection.createChannel();//创建队列channel.queueDeclare("work_queue",true,false,false,null);//发送信息for(int i=1;i<=100;i++){//参数三表示的是持久化信息,即除了保存到内存中还会保存到磁盘中channel.basicPublish("","work_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,("现在是第"+i+"条信息").getBytes(StandardCharsets.UTF_8));}//关闭资源channel.close();connection.close();} }
(2)消费者代码
package com.itbaizhan.work;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;//工作模式消费者1 public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setPort(5672);connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setVirtualHost("/");//创建连接Connection connection=connectionFactory.newConnection();//创建信道Channel channel=connection.createChannel();//接受信息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1号接受="+new String(body,"UTF-8"));}});} }//后面的消费者2和3与1一样
三、发布订阅模式
(1)概念
一些消息需要不同的消费者进行不同的处理,如电商网站中消息的发送有邮件,
短信,弹窗等等,这就用到了订阅模式特点
(1)生产者将信息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中
(2)工作模式中的交换机(direct交换机)只能将交换机发送给一个
队列,而订阅模式的交换机可以发送给多个队列。
(3)该模式的交换机使用fanout交换机
(2)生产者代码
package com.itbaizhan.publish;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;//发布订阅模式生产者 public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setHost("192.168.66.130");connectionFactory.setPort(5672);//建立连接Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();//创建交换机//参数一是交换机名//参数二是交换机类型//参数三是交换机持久化channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);//创建队列channel.queueDeclare("send1",true,false,false,null);channel.queueDeclare("send2",true,false,false,null);channel.queueDeclare("send3",true,false,false,null);//交换机绑定队列//参数一是队列名//参数二是交换机名//参数三是路由关键字,订阅模式使用""channel.queueBind("send1","exchange_fanout","");channel.queueBind("send2","exchange_fanout","");channel.queueBind("send3","exchange_fanout","");//发送信息for(int i=1;i<=100;i++){//参数二是路由键channel.basicPublish("exchange_fanout"," ",null,("这是第"+i+"条信息").getBytes(StandardCharsets.UTF_8));}channel.close();connection.close();} }
(3)消费者代码
package com.itbaizhan.publish;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer_send1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setPort(5672);connectionFactory.setHost("192.168.66.130");//建立连接Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();channel.basicConsume("send1",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("send1的队列="+new String(body,"Utf-8"));}});} }//send2和send3如上一致
四、路由模式
(1)概念
使用发布订阅模式时,所有消息都活发送到绑定的队列中,
但是我们并不是希望所有的消息都无差别的发送到所有队列中,
比如一个活动中,有些消息需要邮件发送,有些需要短信发送,
而路由模式便是如此。特点:
(1)每个队列绑定路由关键字rountingkey
(2)生产者将带有rountingkey的消息发送给交换机,交换机再根据
rountingkey转发到指定队列。(该模式使用direct交换机)
(2)生产者代码
package com.itbaizhan.rounter;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setVirtualHost("/");connectionFactory.setPort(5672);Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();//创建交换机channel.exchangeDeclare("exchange_rounter", BuiltinExchangeType.DIRECT,true);//创建队列channel.queueDeclare("mail1",true,false,false,null);channel.queueDeclare("mail2",true,false,false,null);channel.queueDeclare("mail3",true,false,false,null);//交换机绑定队列//参数三表示路由关键字channel.queueBind("mail1","exchange_rounter","import");channel.queueBind("mail2","exchange_rounter","import");channel.queueBind("mail3","exchange_rounter","normal");//发送信息channel.basicPublish("exchange_rounter","import",null,"我是import".getBytes(StandardCharsets.UTF_8));channel.basicPublish("exchange_rounter","normal",null,"我是normal".getBytes(StandardCharsets.UTF_8));//关闭资源channel.close();connection.close();} }
(3)消费者代码
package com.itbaizhan.rounter;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer_mail1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setVirtualHost("/");connectionFactory.setPort(5672);//建立连接Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();//监听队列channel.basicConsume("mail1",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("mail1队列="+new String(body,"UTF-8"));}});} } //监听mail2和mail3与上基本一致
五、通配符模式
(1)概念
通配符模式是在路由模式的基础上,给队列绑定带通配符的
路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消
息转发到该队列。通配符模式比路由模式更灵活
(该模式使用topic交换机)
通配符规则:消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。
注意:通配符和路由模式都是根据指定规则或者关键字绑定到对应队列,是队列!而不是消费者!
(2)生产者代码
package com.itbaizhan.topic;import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setVirtualHost("/");connectionFactory.setPort(5672);//建立连接Connection connection=connectionFactory.newConnection();Channel channel=connection.createChannel();//创建交换机channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);//创建队列channel.queueDeclare("em1",true,false,false,null);channel.queueDeclare("em2",true,false,false,null);channel.queueDeclare("em3",true,false,false,null);//交换机绑定队列channel.queueBind("em1","exchange_topic","#.import.#");channel.queueBind("em2","exchange_topic","#.normal.#");channel.queueBind("em3","exchange_topic","#.low.#");//发送信息channel.basicPublish("exchange_topic","import.noraml",null,"我是一号".getBytes(StandardCharsets.UTF_8));channel.basicPublish("exchange_topic","import.low",null,"我是二号".getBytes(StandardCharsets.UTF_8));channel.basicPublish("exchange_topic","import.low.normal",null,"我是全部".getBytes(StandardCharsets.UTF_8));//关闭资源channel.close();connection.close();} }
(3)消费者代码
package com.itbaizhan.topic;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer_em1{public static void main(String[] args) throws IOException, TimeoutException {//建立连接工厂ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("192.168.66.130");connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//创建连接Connection connection=connectionFactory.newConnection();//创建信道Channel channel=connection.createChannel();//监听队列//参数一表示监听的队列名//参数二表示是否自动签收,false表示需要手动确认消息已经收到//参数三表示consumer的实现类,重写该方法表示接收到消息要做的事情channel.basicConsume("em1",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message=new String(body,"UTF-8");System.out.println("em1队列的消息="+message);}});} } //em2队列和em3队列与上基本一致