rabbitMq举例

server/2024/12/16 8:28:05/

新来个技术总监,把 RabbitMQ 讲的那叫一个透彻,佩服!

生产者

代码举例

public String sendMsg(final String exchangeName,final String routingKey,final String msg) {}

java">/*** 发送消息* @param exchangeName	exchangeName* @param routingKey	routingKey* @param msg	mq message* @return msgId*/public String sendMsg(final String exchangeName,final String routingKey,final String msg) {final CorrelationData correlationDataInfo = new CorrelationData();final String msgId = String.join(":", "yl:dps", exchangeName, routingKey,String.valueOf(IdUtil.getSnowflake(1, 3).nextId()));correlationDataInfo.setId(msgId);//send over callback lograbbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(correlationData!=null) {final String id = correlationData.getId();if (ack) {log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData));} else {log.error("消息投递失败,消息Id[{}] [{}]", id, cause);}}else {log.error("消息投递失败,correlationData为null!");}});rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);

分析代码

这段代码的功能是向 RabbitMQ 发送消息,并且在发送消息后通过回调函数来确认消息的投递状态。下面是对代码的详细分析:

1. 方法签名

java

public String sendMsg(final String exchangeName, final String routingKey, final String msg)

  • 该方法接收三个参数:
    • exchangeName:消息发送的交换机名称。
    • routingKey:消息的路由键,用来确定消息将被路由到哪个队列。
    • msg:待发送的消息内容。

2. CorrelationData 创建与消息 ID 设置

java

final CorrelationData correlationDataInfo = new CorrelationData(); final String msgId = String.join(":", "yl:dps", exchangeName, routingKey, String.valueOf(IdUtil.getSnowflake(1, 3).nextId())); correlationDataInfo.setId(msgId);

  • CorrelationData:RabbitMQ 提供的一个类,用于设置和传递消息的相关信息,主要用来在消息确认回调中标识消息。
  • 生成 msgIdmsgId 是通过拼接一个前缀 "yl:dps"、交换机名 exchangeName、路由键 routingKey,以及通过 IdUtil.getSnowflake(1, 3).nextId() 生成的雪花算法(Snowflake)ID来创建的唯一标识符。这个 ID 用来唯一标识这条消息。
  • setId(msgId):将生成的 msgId 设置到 correlationDataInfo 对象中。

3. 设置回调函数

java

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if(correlationData != null) { final String id = correlationData.getId(); if (ack) { log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData)); } else { log.error("消息投递失败,消息Id[{}] [{}]", id, cause); } } else { log.error("消息投递失败,correlationData为null!"); } });

  • setConfirmCallback:通过 rabbitTemplate.setConfirmCallback() 方法设置消息确认回调函数。在消息发送之后,RabbitMQ 会调用该回调函数来通知消息是否成功投递到交换机。
  • 参数说明
    • correlationData:携带消息相关信息的对象(这里是 correlationDataInfo),包含了消息的 ID 等。
    • ack:布尔值,表示消息是否成功投递(true 表示成功,false 表示失败)。
    • cause:如果 ack 为 falsecause 会包含失败的原因。
  • 回调内容
    • 如果 correlationData 不为空:
      • 如果 ack 为 true,则表示消息投递成功,记录日志。
      • 如果 ack 为 false,则表示消息投递失败,记录失败日志,并包含失败原因。
    • 如果 correlationData 为 null,记录错误日志。

4. 发送消息

 

java

rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);

  • convertAndSend:该方法用于将消息发送到指定的交换机(exchangeName)、路由键(routingKey)和消息内容(msg)。同时,将 correlationDataInfo(包含 msgId)附带发送,以便在回调中进行确认和记录。
  • rabbitTemplate 是 RabbitTemplate 对象,用来发送和接收消息。它是 Spring AMQP 的核心组件,封装了与 RabbitMQ 的交互。

5. 返回消息 ID

return msgId;

  • 返回生成的消息 ID。这个 ID 可以在外部被用于追踪消息投递的状态。

总结:

这段代码的主要作用是:

  • 生成一个唯一的消息 ID,包含了交换机名、路由键和通过 Snowflake 算法生成的 ID。
  • 设置消息确认回调,当消息发送后,RabbitMQ 会通过回调通知消息是否成功投递。
  • 调用 rabbitTemplate.convertAndSend() 方法向指定的交换机发送消息,同时附带 CorrelationData(包含消息 ID)。
  • 返回生成的消息 ID,以便后续追踪或处理。

关键点:

  • 消息确认机制:使用 ConfirmCallback 确保消息是否成功投递,并且通过日志记录投递状态。
  • 消息 ID:通过 Snowflake 算法生成一个唯一的消息 ID,可以用于追踪和日志记录。
  • 回调机制:确保生产者能在消息投递过程中获得详细的确认信息。

消费者

代码举例

java">@RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${rabbitmq.yl.x.queue}", durable = "true"),exchange = @Exchange(value = "${rabbitmq.yl.x.exchange}"),key = "${rabbitmq.yl.x.routing.key}"))public void stateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {log.info("MQ-收到【x】状态变更消息:{} ", payload);final String objDeliveryTag = String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG));if (NumberUtil.isNumber(objDeliveryTag)) {final long deliveryTag= Long.parseLong(objDeliveryTag);log.info("MQ-收到【x】MQ消息进行消费【执行ack】[{}]", deliveryTag);channel.basicAck(deliveryTag,true);}if(CharSequenceUtil.isNotBlank(payload)) {}}

在这个 Java 方法 leaseStateChange 中,通常是在使用 Spring AMQP 框架与 RabbitMQ 集成时出现的消费消息的处理方法。下面逐个解析这个方法中的各个部分:

方法签名解析

public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException

1. public void leaseStateChange
  • public: 表明这个方法是公开的,可以被外部调用。
  • void: 表明这个方法没有返回值。
  • leaseStateChange: 这是方法的名称。可以推测这个方法是用来处理与“租赁状态变化”相关的消息的。
2. @Payload String payload
  • @Payload: 这是 Spring AMQP 框架中的一个注解,用于将消息体的内容绑定到方法参数中。在这个例子中,payload 是一个 String 类型的参数,代表从 RabbitMQ 队列中接收到的消息内容。通常,这个消息体是通过 JSON 或其他格式的字符串传递的。

    • 注解的作用: @Payload 使得 Spring 能够自动将消息的主体部分注入到方法参数 payload 中。比如,如果消息体是一个 JSON 格式的字符串,Spring 会将其直接赋值给 payload 参数。

    • 示例: 假设接收到的消息体是 "{"state": "active", "leaseId": "12345"}"payload 将会是该字符串。

3. @Headers Map<String, Object> headers
  • @Headers: 这是另一个 Spring AMQP 注解,用来将消息的头部信息注入到方法参数中。RabbitMQ 消息不仅有消息体(payload),还可能包含一些头信息(比如消息的发送时间、路由信息等)。

    • 注解的作用: @Headers 会将消息头部的内容绑定到 headers 参数,这个参数是一个 Map<String, Object> 类型,其中键是头部的名称,值是相应的值。头部信息常常用于传递一些附加信息(例如消息的优先级、发送者标识等)。

    • 示例: 如果消息头包含如下信息:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}

      那么 headers 将会是一个 Map,其内容是:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}
4. Channel channel
  • Channel: 这是 RabbitMQ 的核心概念之一。Channel 代表一个与 RabbitMQ 服务的连接通道,允许你在该通道上进行消息的消费、确认等操作。

    • 作用: 在 Spring AMQP 中,Channel 通常用来进行消息的确认(acknowledge)操作,或者处理消息处理失败时的重新排队等任务。你可以使用 Channel 来手动确认消息,或者控制消息是否成功消费。

    • 示例: 如果在消息处理过程中出现异常,消费者可能需要通过 channel.basicNack() 方法来拒绝该消息并可能重新入队。

5. throws IOException
  • throws IOException: 表明这个方法可能会抛出 IOException 异常。RabbitMQ 的消息操作可能会遇到 I/O 错误,因此需要在方法签名中声明可能抛出此异常。通常,这类异常会发生在与 RabbitMQ 的连接中断、消息传输过程失败时等。

Spring AMQP 消费者代码示例

假设这是一个处理来自某个队列的消息的方法,下面是该方法的使用场景和完整代码示例:

java">import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.MessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;import java.io.IOException;
import java.util.Map;@Component
public class LeaseStateChangeListener {// 监听指定队列的消息@RabbitListener(queues = "leaseStateQueue")public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {try {// 处理消息体System.out.println("Received message: " + payload);// 获取消息头部信息String correlationId = (String) headers.get("correlationId");String messageType = (String) headers.get("messageType");System.out.println("CorrelationId: " + correlationId + ", MessageType: " + messageType);// 模拟处理业务逻辑processLeaseStateChange(payload);// 确认消息已成功消费channel.basicAck(headers.hashCode(), false);  // 手动确认消息} catch (Exception e) {// 异常处理,拒绝消息并重新入队System.err.println("Error processing message: " + e.getMessage());channel.basicNack(headers.hashCode(), false, true); // 拒绝并重新入队}}private void processLeaseStateChange(String payload) {// 假设这里是处理租赁状态更新的业务逻辑// 比如将消息解析为对象,进行租赁状态更新System.out.println("Processing lease state change for payload: " + payload);}
}

解析

  • @RabbitListener: 注解的作用是声明这个方法是一个 RabbitMQ 消息的消费者,并且该方法监听 leaseStateQueue 队列。当有消息到达这个队列时,这个方法会被调用。

  • 消息体 (payload): 这个方法会接收到一个消息体,@Payload 注解将该消息的内容(通常是 JSON 格式的字符串)自动绑定到方法参数 payload 上。

  • 消息头 (headers): 使用 @Headers 注解将消息的头部信息绑定到 headers 参数上,Map<String, Object> 类型。你可以从中获取如 correlationIdmessageType 等附加信息。

  • Channel: 这个参数用于消息的确认、拒绝等操作。在成功处理完消息后,调用 channel.basicAck() 来确认消息,表示该消息已经被成功消费。如果处理失败,调用 channel.basicNack() 拒绝该消息,并可以选择是否重新入队。

总结

  • 该方法是一个 RabbitMQ 消费者,用于从指定的队列中消费消息。
  • 通过 @Payload 获取消息体内容,使用 @Headers 获取消息头信息。
  • 使用 Channel 来确认消息的处理状态。
  • 使用 @RabbitListener 注解自动监听队列,并处理消费的消息。

这种方式非常适合处理队列中的业务逻辑,并能够灵活处理消息的确认、拒绝等操作。

消息怎么知道发给哪一个队列

先看队列与交换机怎么绑定的

先创建队列,然后绑定到交换机

RabbitMQ系列-6.如何通过控制台创建交换机、队列、死信队列、延迟队列 - 简书


http://www.ppmy.cn/server/150575.html

相关文章

Oracle plsqldev1106 安装及TNS配置

Oracle plsqldev1106 安装及TNS配置 下载好安装包&#xff0c;直接双击安装 点击 I Agree 默认是C盘的&#xff0c;我改了D盘&#xff0c;根据自己实际情况修改 这里用默认的for current user 也可以&#xff0c;我选了for all user 点Finish&#xff0c;等待安装完成即可 …

JavaScript ES6+ 语法速通

一、ES6 基础语法 1. let 和 const 声明变量 let&#xff1a;块级作用域&#xff0c;可以重新赋值。const&#xff1a;块级作用域&#xff0c;声明常量&#xff0c;不能重新赋值。 let name Li Hua; name Li Ming; // 可修改const age 21; // age 22; // 报错&#xff0…

jmeter CLI Mode 传参实现动态设置用户数

一.需求 CLI 运行模式下每次运行想要传入不同的用户数&#xff0c;比如寻找瓶颈值的场景&#xff0c;需要运行多次设置不同的用户数。 二.解决思路 查看官方API Apache JMeter - Users Manual: Getting Started api CLI Mode 一节中提到可以使用如下参数做属性的替换&#…

c++总复习

C 中多态性在实际项目中的应用场景 图形绘制系统 描述&#xff1a;在一个图形绘制软件中&#xff0c;可能有多种图形&#xff0c;如圆形、矩形、三角形等。这些图形都有一个共同的操作&#xff0c;比如绘制&#xff08;draw&#xff09;。通过多态性&#xff0c;可以定义一个基…

一个初始化bitmap的小算法

一个初始化bitmap小算法 根据长度&#xff0c;创建bitmap初始化bitmap 根据长度&#xff0c;创建bitmap 看到一个开源项目&#xff0c;利用bitmap存储数据&#xff0c;其中创建和初始化过程&#xff0c;比较经典。这里摘录出来&#xff0c;以备后续使用。代码采用的是golang …

Sentinel一分钟

前置 Qps&#xff1a;每秒查询率 吞吐量&#xff1a;指系统在单位时间内处理请求的数量 资源&#xff1a;我们代码中的 Java 方法,一段代码&#xff0c;或者一个接口 限流 核心&#xff1a; 对资源(url或其他)进行限流可对资源和来源进行限流可调用openapi自动生成规则或平…

国内CentOS使用yum安装docker和docker-compose

安装docker 安装需要的软件包&#xff0c; yum-util 提供yum-config-manager功能&#xff0c;另两个是devicemapper驱动依赖 yum install -y yum-utils device-mapper-persistent-data lvm2下载yum源采用阿里云的镜像源 wget -O /etc/yum.repos.d/docker-ce.repo https://mi…

RabbitMQ快速入门 - 生产者和消费者的简单实现

引入依赖 <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version> </dependency> 编写生产者代码 RabbitMQ 默认的⽤于客户端连接的 TCP 端⼝号是 5672, 需要提前进⾏开…