rabbitMq举例

news/2024/12/18 21:06:05/

新来个技术总监,把 RabbitMQ 讲的那叫一个透彻,佩服!

生产者

代码举例

public String sendMsg(final String exchangeName,final String routingKey,final String msg) {}

java">/*** 发送消息* @param exchangeName	exchangeName* @param routingKey	routingKey* @param msg	mq message* @return msgId*/public String sendMsg(final String exchangeName,final String routingKey,final String msg) {final CorrelationData correlationDataInfo = new CorrelationData();final String msgId = String.join(":", "yl:dps", exchangeName, routingKey,String.valueOf(IdUtil.getSnowflake(1, 3).nextId()));correlationDataInfo.setId(msgId);//send over callback lograbbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(correlationData!=null) {final String id = correlationData.getId();if (ack) {log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData));} else {log.error("消息投递失败,消息Id[{}] [{}]", id, cause);}}else {log.error("消息投递失败,correlationData为null!");}});rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);

分析代码

这段代码的功能是向 RabbitMQ 发送消息,并且在发送消息后通过回调函数来确认消息的投递状态。下面是对代码的详细分析:

1. 方法签名

java

public String sendMsg(final String exchangeName, final String routingKey, final String msg)

  • 该方法接收三个参数:
    • exchangeName:消息发送的交换机名称。
    • routingKey:消息的路由键,用来确定消息将被路由到哪个队列。
    • msg:待发送的消息内容。

2. CorrelationData 创建与消息 ID 设置

java

final CorrelationData correlationDataInfo = new CorrelationData(); final String msgId = String.join(":", "yl:dps", exchangeName, routingKey, String.valueOf(IdUtil.getSnowflake(1, 3).nextId())); correlationDataInfo.setId(msgId);

  • CorrelationData:RabbitMQ 提供的一个类,用于设置和传递消息的相关信息,主要用来在消息确认回调中标识消息。
  • 生成 msgIdmsgId 是通过拼接一个前缀 "yl:dps"、交换机名 exchangeName、路由键 routingKey,以及通过 IdUtil.getSnowflake(1, 3).nextId() 生成的雪花算法(Snowflake)ID来创建的唯一标识符。这个 ID 用来唯一标识这条消息。
  • setId(msgId):将生成的 msgId 设置到 correlationDataInfo 对象中。

3. 设置回调函数

java

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if(correlationData != null) { final String id = correlationData.getId(); if (ack) { log.info("消息投递成功,消息Id[{}]【{}】", id, JSON.toJSONString(correlationData)); } else { log.error("消息投递失败,消息Id[{}] [{}]", id, cause); } } else { log.error("消息投递失败,correlationData为null!"); } });

  • setConfirmCallback:通过 rabbitTemplate.setConfirmCallback() 方法设置消息确认回调函数。在消息发送之后,RabbitMQ 会调用该回调函数来通知消息是否成功投递到交换机。
  • 参数说明
    • correlationData:携带消息相关信息的对象(这里是 correlationDataInfo),包含了消息的 ID 等。
    • ack:布尔值,表示消息是否成功投递(true 表示成功,false 表示失败)。
    • cause:如果 ack 为 falsecause 会包含失败的原因。
  • 回调内容
    • 如果 correlationData 不为空:
      • 如果 ack 为 true,则表示消息投递成功,记录日志。
      • 如果 ack 为 false,则表示消息投递失败,记录失败日志,并包含失败原因。
    • 如果 correlationData 为 null,记录错误日志。

4. 发送消息

 

java

rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, correlationDataInfo);

  • convertAndSend:该方法用于将消息发送到指定的交换机(exchangeName)、路由键(routingKey)和消息内容(msg)。同时,将 correlationDataInfo(包含 msgId)附带发送,以便在回调中进行确认和记录。
  • rabbitTemplate 是 RabbitTemplate 对象,用来发送和接收消息。它是 Spring AMQP 的核心组件,封装了与 RabbitMQ 的交互。

5. 返回消息 ID

return msgId;

  • 返回生成的消息 ID。这个 ID 可以在外部被用于追踪消息投递的状态。

总结:

这段代码的主要作用是:

  • 生成一个唯一的消息 ID,包含了交换机名、路由键和通过 Snowflake 算法生成的 ID。
  • 设置消息确认回调,当消息发送后,RabbitMQ 会通过回调通知消息是否成功投递。
  • 调用 rabbitTemplate.convertAndSend() 方法向指定的交换机发送消息,同时附带 CorrelationData(包含消息 ID)。
  • 返回生成的消息 ID,以便后续追踪或处理。

关键点:

  • 消息确认机制:使用 ConfirmCallback 确保消息是否成功投递,并且通过日志记录投递状态。
  • 消息 ID:通过 Snowflake 算法生成一个唯一的消息 ID,可以用于追踪和日志记录。
  • 回调机制:确保生产者能在消息投递过程中获得详细的确认信息。

消费者

代码举例

java">@RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${rabbitmq.yl.x.queue}", durable = "true"),exchange = @Exchange(value = "${rabbitmq.yl.x.exchange}"),key = "${rabbitmq.yl.x.routing.key}"))public void stateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {log.info("MQ-收到【x】状态变更消息:{} ", payload);final String objDeliveryTag = String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG));if (NumberUtil.isNumber(objDeliveryTag)) {final long deliveryTag= Long.parseLong(objDeliveryTag);log.info("MQ-收到【x】MQ消息进行消费【执行ack】[{}]", deliveryTag);channel.basicAck(deliveryTag,true);}if(CharSequenceUtil.isNotBlank(payload)) {}}

在这个 Java 方法 leaseStateChange 中,通常是在使用 Spring AMQP 框架与 RabbitMQ 集成时出现的消费消息的处理方法。下面逐个解析这个方法中的各个部分:

方法签名解析

public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException

1. public void leaseStateChange
  • public: 表明这个方法是公开的,可以被外部调用。
  • void: 表明这个方法没有返回值。
  • leaseStateChange: 这是方法的名称。可以推测这个方法是用来处理与“租赁状态变化”相关的消息的。
2. @Payload String payload
  • @Payload: 这是 Spring AMQP 框架中的一个注解,用于将消息体的内容绑定到方法参数中。在这个例子中,payload 是一个 String 类型的参数,代表从 RabbitMQ 队列中接收到的消息内容。通常,这个消息体是通过 JSON 或其他格式的字符串传递的。

    • 注解的作用: @Payload 使得 Spring 能够自动将消息的主体部分注入到方法参数 payload 中。比如,如果消息体是一个 JSON 格式的字符串,Spring 会将其直接赋值给 payload 参数。

    • 示例: 假设接收到的消息体是 "{"state": "active", "leaseId": "12345"}"payload 将会是该字符串。

3. @Headers Map<String, Object> headers
  • @Headers: 这是另一个 Spring AMQP 注解,用来将消息的头部信息注入到方法参数中。RabbitMQ 消息不仅有消息体(payload),还可能包含一些头信息(比如消息的发送时间、路由信息等)。

    • 注解的作用: @Headers 会将消息头部的内容绑定到 headers 参数,这个参数是一个 Map<String, Object> 类型,其中键是头部的名称,值是相应的值。头部信息常常用于传递一些附加信息(例如消息的优先级、发送者标识等)。

    • 示例: 如果消息头包含如下信息:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}

      那么 headers 将会是一个 Map,其内容是:

      {"correlationId": "abc123", "messageType": "leaseUpdate"}
4. Channel channel
  • Channel: 这是 RabbitMQ 的核心概念之一。Channel 代表一个与 RabbitMQ 服务的连接通道,允许你在该通道上进行消息的消费、确认等操作。

    • 作用: 在 Spring AMQP 中,Channel 通常用来进行消息的确认(acknowledge)操作,或者处理消息处理失败时的重新排队等任务。你可以使用 Channel 来手动确认消息,或者控制消息是否成功消费。

    • 示例: 如果在消息处理过程中出现异常,消费者可能需要通过 channel.basicNack() 方法来拒绝该消息并可能重新入队。

5. throws IOException
  • throws IOException: 表明这个方法可能会抛出 IOException 异常。RabbitMQ 的消息操作可能会遇到 I/O 错误,因此需要在方法签名中声明可能抛出此异常。通常,这类异常会发生在与 RabbitMQ 的连接中断、消息传输过程失败时等。

Spring AMQP 消费者代码示例

假设这是一个处理来自某个队列的消息的方法,下面是该方法的使用场景和完整代码示例:

java">import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.MessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;import java.io.IOException;
import java.util.Map;@Component
public class LeaseStateChangeListener {// 监听指定队列的消息@RabbitListener(queues = "leaseStateQueue")public void leaseStateChange(@Payload String payload, @Headers Map<String, Object> headers, Channel channel) throws IOException {try {// 处理消息体System.out.println("Received message: " + payload);// 获取消息头部信息String correlationId = (String) headers.get("correlationId");String messageType = (String) headers.get("messageType");System.out.println("CorrelationId: " + correlationId + ", MessageType: " + messageType);// 模拟处理业务逻辑processLeaseStateChange(payload);// 确认消息已成功消费channel.basicAck(headers.hashCode(), false);  // 手动确认消息} catch (Exception e) {// 异常处理,拒绝消息并重新入队System.err.println("Error processing message: " + e.getMessage());channel.basicNack(headers.hashCode(), false, true); // 拒绝并重新入队}}private void processLeaseStateChange(String payload) {// 假设这里是处理租赁状态更新的业务逻辑// 比如将消息解析为对象,进行租赁状态更新System.out.println("Processing lease state change for payload: " + payload);}
}

解析

  • @RabbitListener: 注解的作用是声明这个方法是一个 RabbitMQ 消息的消费者,并且该方法监听 leaseStateQueue 队列。当有消息到达这个队列时,这个方法会被调用。

  • 消息体 (payload): 这个方法会接收到一个消息体,@Payload 注解将该消息的内容(通常是 JSON 格式的字符串)自动绑定到方法参数 payload 上。

  • 消息头 (headers): 使用 @Headers 注解将消息的头部信息绑定到 headers 参数上,Map<String, Object> 类型。你可以从中获取如 correlationIdmessageType 等附加信息。

  • Channel: 这个参数用于消息的确认、拒绝等操作。在成功处理完消息后,调用 channel.basicAck() 来确认消息,表示该消息已经被成功消费。如果处理失败,调用 channel.basicNack() 拒绝该消息,并可以选择是否重新入队。

总结

  • 该方法是一个 RabbitMQ 消费者,用于从指定的队列中消费消息。
  • 通过 @Payload 获取消息体内容,使用 @Headers 获取消息头信息。
  • 使用 Channel 来确认消息的处理状态。
  • 使用 @RabbitListener 注解自动监听队列,并处理消费的消息。

这种方式非常适合处理队列中的业务逻辑,并能够灵活处理消息的确认、拒绝等操作。

消息怎么知道发给哪一个队列

先看队列与交换机怎么绑定的

先创建队列,然后绑定到交换机

RabbitMQ系列-6.如何通过控制台创建交换机、队列、死信队列、延迟队列 - 简书


http://www.ppmy.cn/news/1556204.html

相关文章

【网络安全设备系列】7、流量监控设备

0x00 定义: 网络流量控制是一种利用软件或硬件方式来实现对电脑网络流量的控制。它的最主要方法&#xff0c;是引入QoS的概念&#xff0c;从通过为不同类型的 网络数据包标记&#xff0c;从而决定数据包通行的优先次序。 0x01 类型: 流控技术分为两种&#xff1a; 一种是…

【Rust自学】3.2. 数据类型:标量类型

3.2.0. 写在正文之前 欢迎来到Rust自学的第三章&#xff0c;一共有6个小节&#xff0c;分别是: 变量与可变性数据类型&#xff1a;标量类型&#xff08;本文&#xff09;数据类型&#xff1a;复合类型函数和注释控制流&#xff1a;if else控制流&#xff1a;循环 通过第二章…

探索Django:从项目创建到图片上传的全方位指南

Django是什么 Django 是一个流行的 Python Web 开发框架&#xff0c;它提供了一系列工具和库&#xff0c;用于帮助开发人员构建高效、可扩展的 Web 应用程序。Django 的目标是让开发者能够以快速和简单的方式构建复杂的 Web 应用&#xff0c;通过提供许多预构建的组件和功能&a…

xshell连接虚拟机,更换网络模式:NAT->桥接模式

NAT模式&#xff1a;虚拟机通过宿主机的网络访问外网。优点在于不需要手动配置IP地址和子网掩码&#xff0c;只要宿主机能够访问网络&#xff0c;虚拟机也能够访问。对外部网络而言&#xff0c;它看到的是宿主机的IP地址&#xff0c;而不是虚拟机的IP。但是&#xff0c;宿主机可…

鸿蒙NEXT开发案例:经纬度距离计算

【引言】 在鸿蒙NEXT平台上&#xff0c;我们可以轻松地开发出一个经纬度距离计算器&#xff0c;帮助用户快速计算两点之间的距离。本文将详细介绍如何在鸿蒙NEXT中实现这一功能&#xff0c;通过简单的用户界面和高效的计算逻辑&#xff0c;为用户提供便捷的服务。 【环境准备…

400G/800G光模块崛起:AI时代的网络基础设施革命

随着AI技术的不断成熟&#xff0c;各行各业都在大规模投入AI。医疗行业通过AI技术实现了更精准的诊断和治疗&#xff1b;金融行业通过AI技术提高了风险管理能力&#xff1b;制造行业通过AI技术优化了生产流程&#xff1b;娱乐行业通过AI技术创造了更加丰富的用户体验。AI在医疗…

在 Ubuntu 下通过 Docker 部署 FTP 服务器

在今天的技术探险中&#xff0c;我们将使用 Docker 在 Ubuntu 上部署一个 FTP 服务器。这不仅能提升文件传输的效率&#xff0c;还能让你在管理上游刃有余。Docker 的灵活性和 vsftpd 的安全性让这一切变得简单有趣。准备好了吗&#xff1f;让我们开始吧&#xff01; Docker 和…

android 控制主板串口

import com.fazecast.jSerialComm.SerialPort; import java.nio.charset.StandardCharsets; public class SendAndReceiveFromAllPorts { public static void main(String[] args) { SerialPort[] ports SerialPort.getCommPorts(); // 统一的发送指令&#xff08;16 进制&…