RabbitMQ应用

ops/2024/12/29 10:10:08/

1. 7种工作模式介绍

1.1  Simple(简单模式)

  • P: ⽣产者,也就是要发送消息的程序
  • C: 消费者,消息的接收者
  • Queue: 消息队列(图中⻩⾊背景部分)类似⼀个邮箱,可以缓存消息;⽣产者向其中投递消息,消费者从 其中取出消息

特点:

  • ⼀个⽣产者P,⼀个消费者C, 消息只能被消费⼀次.
  • 也称为点对点(Point-to-Point)模式.

适⽤场景:消息只能被单个消费者处理

1.2 WorkQueue(⼯作队列)

 

  • ⼀个⽣产者P,多个消费者C1,C2.
  • 在多个消息的情况下,WorkQueue会将消息分派给不同的消费者,每个消费者都会接收到不同的消息.

特点:

消息不会重复,分配给不同的消费者.

适⽤场景:集群环境中做异步处理

1.3 Publish/Subscribe(发布/订阅)

 

  • X表⽰交换机
  • ⼀个⽣产者P,多个消费者C1,C2,X代表交换机消息复制多份,每个消费者接收相同的消息
  • ⽣产者发送⼀条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者

适合场景:消息需要被多个消费者同时接收的场景.如:实时通知或者⼴播消息

Exchange:

作⽤:

⽣产者将消息发送到Exchange,由交换机将消息按⼀定规则路由到⼀个或多个队列中(上图中⽣产者将消息投递到队列中,实际上这个在RabbitMQ中不会发⽣.)

RabbitMQ交换机有四种类型:fanout,direct,topic,headers,不同类型有着不同的路由策略:

①Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)

②Direct:定向,把消息交给符合指定routingkey的队列(Routing模式)

③Topic:通配符,把消息交给符合routingpattern(路由模式)的队列(Topics模式)

④headers类型的交换器不依赖于路由键的匹配规则来路由消息,⽽是根据发送的消息内容中的 headers属性进⾏匹配.headers类型的交换器性能会很差,⽽且也不实⽤,基本上不会看到它的存在.

  • Exchange(交换机)只负责转发消息,不具备存储消息的能⼒,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失
  • RoutingKey: 路由键.⽣产者将消息发给交换器时,指定的⼀个字符串,⽤来告诉交换机应该如何处理这个消息.
  • BindingKey:绑定.RabbitMQ中通过Binding(绑定)将交换器与队列关联起来,在绑定的时候⼀般会指定⼀个BindingKey, 这样RabbitMQ就知道如何正确地将消息路由到队列了.
  • 在使⽤绑定的时候,需要的路由键是BindingKey.
  • 在发送消息的时候,需要的路由键是RoutingKey.

1.4 Routing(路由模式)

 

  • 路由模式是发布订阅模式的变种,在发布订阅基础上,增加路由key
  • Exchange根据RoutingKey的规则, 将数据筛选后发给对应的消费者队列

适合场景:需要根据特定规则分发消息的场景.

1.5 Topics(通配符模式)

 

Topics和Routing的基本原理相同

即:⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列

不同之处是:routingKey的匹配⽅式不同,Routing模式是相等匹配,topics模式是通配符匹配.

适合场景:需要灵活匹配和过滤消息的场景

在topic类型的交换机在匹配规则上,有些要求:

①RoutingKey是⼀系列由点( . )分隔的单词,⽐如" " q uick.orange.rabbit " stock.usd.nyse ","

②BindingKey 和RoutingKey⼀样,也是点( . )分割的字符串.

③Binding Key中可以存在两种特殊字符串,⽤于模糊匹配

  •  *表⽰⼀个单词
  •  #表⽰多个单词(0-N个)

比如:

• BindingKey为"d.a.b"会同时路由到Q1和Q2

• BindingKey为"d.a.f"会路由到Q1

1.6 RPC(RPC通信)

 

  • 客⼾端发送消息到⼀个指定的队列,并在消息属性中设置replyTo字段,这个字段指定了⼀个回调队 列,⽤于接收服务端的响应.
  • 服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列
  • 客⼾端在回调队列上等待响应消息.⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应.

1.7 Publisher Confirms(发布确认)

 

Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。

在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.

  • ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后,发布的每⼀条消 息都会获得⼀个唯⼀的ID,⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.
  • 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者 (包含消息的唯⼀ID),表明消息已经送达.

通过PublisherConfirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收,从⽽避免消息丢失 的问题.

适⽤场景:对数据安全性要求较⾼的场景.⽐如⾦融交易,订单处理

 

2. 使用案例

java">public class Constants {public static final String HOST = "119.91.154.99";public static final int PORT = 5672;public static final String USER_NAME = "xuexue";public static final String PASSWORD = "xuexue";public static final String VIRTUAL_HOST = "bit";//工作队列模式public static final String WORK_QUEUE = "work.queue";//发布订阅模式public static final String FANOUT_EXCHANGE = "fanout.exchange";public static final String FANOUT_QUEUE1 = "fanout.queue1";public static final String FANOUT_QUEUE2 = "fanout.queue2";//路由模式public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String DIRECT_QUEUE1 = "direct.queue1";public static final String DIRECT_QUEUE2 = "direct.queue2";//通配符模式public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE1 = "topic_queue1";public static final String TOPIC_QUEUE2 = "topic_queue2";//rpc 模式public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";//publisher confirmspublic static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3";//推拉模式public static final String MESSAGE_QUEUE = "message.queue";}

2.1 简单模式

无 

2.2 WorkQueues(⼯作队列)

生产者:

java">public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列   使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//4. 发送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...."+i;channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息发送成功~");//5. 资源释放channel.close();connection.close();}
}

消费者(两个一样): 

java">public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列   使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};//默认交换机,RoutingKey=队列名称channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//5. 资源释放channel.close();connection.close();}
}

2.3 Publish/Subscribe(发布/订阅)

生产者: 

java">public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3.声明交换机channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);//4. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//5.交换机和队列绑定channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");//6. 发布消息String msg = "hello fanout....";channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}

消费者:

java">public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);}
}
java">public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);}
}

2.4 Routing(路由模式)

生产者:

java">public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//4. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);//5. 绑定交换机和队列channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");//6. 发送消息String msg = "hello direct, my routingkey is a....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());String msg_b = "hello direct, my routingkey is b....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());String msg_c = "hello direct, my routingkey is c....";channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}

消费者: 

java">public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = ((Connection) connection).createChannel();//3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}

java">public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = ((Connection) connection).createChannel();//3. 声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);}
}

2.5 Topics(通配符模式)

生产者:

java">public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = ((Connection) connection).createChannel();//3. 声明交换机channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);//4. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);//5. 绑定交换机和队列channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");//6. 发送消息String msg = "hello topic, my routingkey is ae.a.f....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes());  //转发到Q1String msg_b = "hello topic, my routingkey is ef.a.b....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //转发到Q1和Q2String msg_c = "hello topic, my routingkey is c.ef.d....";channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//转发Q2System.out.println("消息发送成功");//7. 释放资源channel.close();connection.close();}
}

消费者: 

java">public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);}
}
java">public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);}
}

2.6 RPC(RPC通信)

客户端:

java">/*** 1.发送请求* 2.接收响应*/
public class Client {public  static void main(String[] args) throws IOException, TimeoutException, InterruptedException, IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3.声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//4. 发送请求String msg = "hello rpc...";//设置请求的唯一标识String correlationID = UUID.randomUUID().toString();//设置请求的相关属性AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE)//指定了⼀个回调队列,服务端处理后,会把响应结果发送到这个队列.build();//默认交换机,RoutingKey=队列名称//props属性channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());//4. 接收响应(校验ID)//使用阻塞队列, 来存储响应信息//不适用阻塞队列,很快就接收响应,但是响应还没有传过来final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String respMsg = new String(body);System.out.println("接收到回调消息:"+respMsg);if (correlationID.equals(properties.getCorrelationId())){//如果唯⼀标识正确, 放到阻塞队列中response.offer(respMsg);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);//获取回调的结果String result = response.take();System.out.println("[RPC Client 响应结果]:"+ result);}}

服务端:

java">/*** 1.接收请求* 2.发送响应*/public class Server {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 接收请求//设置服务端同时最多只能获取⼀个消息channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//将请求转化为stringString request = new String(body, "UTF-8");System.out.println("接收到请求:" + request);//响应String response = "针对request:" + request + ", 响应成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());//消息应答channel.basicAck(envelope.getDeliveryTag(), false);}};//false:手动确认channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);}
}

RabbitMQ消息确定机制

在RabbitMQ中,basicConsume⽅法的autoAck参数⽤于指定消费者是否应该⾃动向消息队列确认 消息

  • ⾃动确认(autoAck=true):消息队列在将消息发送给消费者后,会⽴即从内存中删除该消息.这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
  • ⼿动确认(autoAck=false):消息队列在将消息发送给消费者后,需要消费者显式地调⽤basicAck ⽅法来确认消息.⼿动确认提供了更⾼的可靠性,确保消息不会被意外丢失,适⽤于消息处理重要且需 要确保每个消息都被正确处理的场景

2.7 Publisher Confirms(发布确认)

消息丢失⼤概分为三种情况:

1. ⽣产者问题.因为应⽤程序故障,⽹络抖动等各种原因,⽣产者没有成功向broker发送消息.

2. 消息中间件⾃⾝问题.⽣产者成功发送给了Broker,但是Broker没有把消息保存好,导致消息丢失.

3. 消费者问题.Broker发送消息到消费者,消费者在消费消息时,因为没有处理好,导致broker将消费 失败的消息从队列中删除了.


针对问题1,可以采⽤发布确认(PublisherConfirms)机制实现:

⽣产者将信道设置成confirm(确认)模式,⼀旦信道进⼊confirm模式,所有在该信道上⾯发布的消息都 会被指派⼀个唯⼀的ID(从1开始),⼀旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID)

这就使得⽣产者知道消息已经正确到达⽬的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写⼊磁盘之后发出

broker回传给⽣产者的确认消息中deliveryTag 包含了确认消息的序号,此外broker也可以设置channel.basicAck⽅法中的multiple参数,表⽰到这个序号之前的所有消息都已经得到了处理

发送⽅确认机制最⼤的好处在于它是异步的,⽣产者可以同时发布消息和等待信道返回确认消息.

当消息最终得到确认之后,⽣产者可以通过回调⽅法来处理该确认消息.

如果RabbitMQ因为⾃⾝内部错误导致消息丢失,就会发送⼀条nack(Basic.Nack)命令,⽣产者同样可以在回调⽅法中处理该nack命令. 

java">public class PublisherConfirms {private static final Integer MESSAGE_COUNT = 100;static Connection createConnection() throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机return connectionFactory.newConnection();}public static void main(String[] args) throws Exception {//Strategy #1: Publishing Messages Individually//单独确认//publishingMessagesIndividually();//Strategy #2: Publishing Messages in Batches//批量确认//publishingMessagesInBatches();//Strategy #3: Handling Publisher Confirms Asynchronously//异步确认handlingPublisherConfirmsAsynchronously();}/*** 异步确认*/private static void handlingPublisherConfirmsAsynchronously() throws Exception {try (Connection connection = createConnection()){//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);//4. 监听confirm//创建有序集合.中存储的是未确认的消息IDlong start = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple){//<=都清除//headSet返回<n的集合,但是这条也要被删除confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}//业务需要根据实际场景进行处理, 比如重发, 此处代码省略}});//5.发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;//获取要发送消息的序号long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());//放confirmSeqNo.add(seqNo);}while (!confirmSeqNo.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}/*** 批量确认*/private static void publishingMessagesInBatches() throws Exception{try(Connection connection = createConnection()) {//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//4. 发送消息, 并进行确认long start = System.currentTimeMillis();int batchSize = 100;int outstandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());outstandingMessageCount++;if (outstandingMessageCount==batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if (outstandingMessageCount>0){channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}/*** 单独确认*/private static void publishingMessagesIndividually() throws Exception {try (Connection connection = createConnection()){//1.开启新道Channel channel = connection.createChannel();//2.设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);//4.发送信息,并等待确认long start = System.currentTimeMillis();for(int i =0; i< MESSAGE_COUNT;i++){String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());//等待确认channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}}

异步确认:

Channel接⼝提供了⼀个⽅法addConfirmListener.这个⽅法可以添加ConfirmListener回调接⼝

deliveryTag 表⽰发送消息的序号

multiple 表⽰是否批量确认

我们需要为每⼀个Channel维护⼀个已发送消息的序号集合.当收到RabbitMQ的confirm回调时,从集 合中删除对应的消息.当Channel开启confirm模式后,channel上发送消息都会附带⼀个从1开始递增的 deliveryTag序号. 可以使⽤SortedSet的有序性来维护这个已发消息的集合.

  • 当收到ack时,从序列中删除该消息的序号.如果为批量确认消息,表⽰⼩于等于当前序号 deliveryTag的消息都收到了,则清除对应集合
  • 当收到nack时,处理逻辑类似,不过需要结合具体的业务情况,进⾏消息重发等操作.

对比:

消息数越多,异步确认的优势越明显

 

3. Spring Boot整合RabbitMQ

 创建项⽬时, 加⼊依赖:

 

添加配置:

详情代码看idea

 

 

3.1 工作队列模式

 

声明队列(@Bean交给spring进行管理):

生产者:

 

消费者:

① Message message

②String message 

 

此处返回的是message的具体内容

channel:

 

3.2  Publish/Subscribe(发布订阅模式)

 

声明队列,交换机:

绑定:

这里@Qualifier指定绑定的队列和交换机

 

发送消息:

 

 

 

3.3 Routing(路由模式)

 

此时绑定需要指定routingkey 

 

注意:放到路径里,需要使用注解@PathVariable

 

 

3.4 Topics(通配符模式)

 

 

 

 

4. 基于SpringBoot+RabbitMQ完成应⽤通信

需求:

⽤⼾下单成功之后, 通知物流系统, 进⾏发货

 

订单系统:生产者

物流系统:消费者

创建项目

创建⼀个空的项⽬ rabbitmq-communication(其实就是⼀个空的⽂件夹),将两个项⽬放在⼀个项⽬中

生产者

配置 

java">spring:rabbitmq:addresses: amqp://xuexue:xuexue@119.91.154.99:5672/order
server:port: 9090

声明队列

java">@Configuration
public class RabbitMQConfig {@Bean("orderQueue")public Queue orderQueue(){return QueueBuilder.durable("order.create").build();}
}

发送订单消息

 

java">@RequestMapping("/order")
@RestController
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/create")public String create(){//发送消息String orderId = UUID.randomUUID().toString();rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderId);return "下单成功";}}

启动服务,观察结果 

119.91.154.99:15672

 

 

 

 

消费者

配置

java">spring:rabbitmq:addresses: amqp://xuexue:xuexue@119.91.154.99:5672/order
server:port: 8080

 

监听队列 

java">@Component
public class OrderListener {@RabbitListener(queues = "order.create")public void handMessage(String orderInfo){System.out.println("接收到订单消息:"+orderInfo);}}

 结果

  

发送消息格式为对象

如果通过 RabbitTemplate 发送⼀个对象作为消息, 我们需要对该对象进⾏序列化.

Spring AMQP推荐使⽤JSON序列化,Spring AMQP提供了 Jackson2JsonMessageConverter,我们需要把⼀个 MessageConverter 设置 到 RabbitTemplate 中

JSON序列化(生产者和消费者都要添加)

java">@Configuration
public class RabbitMQConfig {@Bean("orderQueue")public Queue orderQueue(){return QueueBuilder.durable("order.create").build();}@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter jackson2JsonMessageConverter){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());return rabbitTemplate;}
}

定义对象

java">@Data
public class OrderInfo  { private String orderId;private String name;
}

生产者

java">@RequestMapping("/order")
@RestController
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/create")public String create(){//发送消息String orderId = UUID.randomUUID().toString();rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderId);return "下单成功";}@RequestMapping("/create2")public String create2(){//发送消息OrderInfo orderInfo  = new OrderInfo();orderInfo.setOrderId( UUID.randomUUID().toString());orderInfo.setName("价格"+new Random().nextInt(100));rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderInfo);return "下单成功";}
}

查看消息: 

消费者 

java">@Component
@RabbitListener(queues = "order.create")
public class OrderListener {@RabbitHandlerpublic void handMessage(String orderInfo){System.out.println("接收到订单消息String:"+orderInfo);}@RabbitHandlerpublic void handMessage(OrderInfo orderInfo){System.out.println("接收到订单消息OrderInfo:"+orderInfo);}
}

@RabbitListener(queues="order.create")可以加在类上,也可以加在方法上,⽤于定于⼀个类或者方法作为消息的监听器

@RabbitHandler是一个方法级别的注解,使用它该方法被调用处理特定的消息


http://www.ppmy.cn/ops/130867.html

相关文章

macOS开发环境配置与应用开发教程

macOS开发环境配置与应用开发教程 引言 macOS是一个强大的操作系统&#xff0c;广泛应用于软件开发&#xff0c;尤其是iOS和macOS应用开发。本文将详细介绍如何配置macOS开发环境&#xff0c;并通过实例演示如何进行应用开发。希望通过这篇文章&#xff0c;帮助读者快速上手m…

使用Jupyter Notebook进行数据科学项目

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 使用Jupyter Notebook进行数据科学项目 Jupyter Notebook 简介 安装 Jupyter Notebook 创建和管理 Notebook 编写和运行代码 示例…

线性代数(第一章:行列式)

一、行列式的概念 1. 行列式的定义 其中: j1j2…jn 是 n 级排列:n 个自然数按照一定的次序排成的无重复数字的有序数组,如 2314 。 τ(j1j2…jn) 是逆序数。逆序:一个排列中,若大数在小数前,则称这两个数构成一个逆序。逆序数即一个排列中的逆序总数。 行列式是一个数,…

Manus在虚拟现实仿真模拟中的应用案例分享

Manus虚拟现实手套作为一种高精度的人机交互设备&#xff0c;在仿真模拟领域展现出了巨大的应用潜力。通过提供实时、准确的手指动作捕捉数据&#xff0c;Manus手套为多个行业带来了前所未有的仿真体验&#xff0c;推动了技术发展和应用创新。 技术特点 1. 高精度手指跟踪 Manu…

面试准备第一版ssm spring-springmvc

请写出spring中常用的依赖注入方法&#xff1a; 1、setter 2、构造方法注入 3、字段注入 Setter 注入&#xff1a; 通过公共的 setter 方法进行依赖注入。优点&#xff1a;可选依赖&#xff0c;能更清晰地看到依赖关系。缺点&#xff1a;依赖在构造时不可用&#xff0c;可能导…

解决edge浏览器无法同步问题

有时候电脑没带&#xff0c;但是浏览器没有同步很烦恼。chrome浏览器的同步很及时在多设备之间能很好使用。但是edge浏览器同步没反应。 在这里插入图片描述 解决方法&#xff1a; 一、进入edge浏览器点击图像会显示未同步。点击“管理个人资料”&#xff0c;进入后点击同步&…

微信小程序生成二维码

目前是在开发小程序端 --> 微信小程序。然后接到需求&#xff1a;根据 form 表单填写内容生成二维码&#xff08;第一版&#xff1a;表单目前需要客户进行自己输入&#xff0c;然后点击生成按钮实时生成二维码&#xff0c;不需要向后端请求&#xff0c;不存如数据库&#xf…

利用JoySSL的免费SSL证书,为您的网站安全保驾护航

JoySSL的优势 一年期免费SSL证书 JoySSL深知用户需求&#xff0c;因此我们提供一年期免费SSL证书&#xff0c;让您在享受高品质服务的同时&#xff0c;无需担忧成本问题。只需填写注册码230907&#xff0c;即可轻松获得。无限制证书申请数量 JoySSL承诺&#xff0c;无论您是个…