kafkaTemplate.sendDefault(message).addCallback

devtools/2024/9/23 4:42:45/

在使用 kafkaTemplate.sendDefault(message).addCallback 时,你可以通过 addCallback 方法来处理发送消息后的成功和失败回调。

java">import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Callback;
import org.springframework.kafka.support.SendResult;public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String message) {kafkaTemplate.sendDefault(message).addCallback(new Callback() {@Overridepublic void onSuccess(SendResult<String, String> result) {// **处理成功的逻辑**System.out.println("Message sent successfully: " + result.getProducerRecord().value());}@Overridepublic void onFailure(org.apache.kafka.clients.producer.ProducerRecord<String, String> producerRecord, Exception ex) {// **处理失败的逻辑**System.err.println("Message failed to send: " + ex.getMessage());}});}
}

关键点:

  • 成功回调:在 onSuccess 方法中,你可以处理消息成功发送后的逻辑。
  • 失败回调:在 onFailure 方法中,你可以处理消息发送失败的情况。

@KafkaListenerkafkaTemplate.sendDefault(message).addCallback 是 Kafka 中用于不同目的的两个概念,具体区别如下:

1. 功能目的

  • @KafkaListener:

    • 用于消费消息。它是一个注解,用于标记一个方法,使其能够自动接收来自指定主题的消息。
  • kafkaTemplate.sendDefault(message).addCallback:

    • 用于发送消息。它是 KafkaTemplate 的一个方法,用于将消息发送到 Kafka 主题,并提供成功和失败的回调处理。

2. 使用场景

  • @KafkaListener:

    • 当你需要处理来自 Kafka 主题的消息时,使用 @KafkaListener 注解的方法会被自动调用。
  • kafkaTemplate.sendDefault(message).addCallback:

    • 当你需要将消息发送到 Kafka 主题时,使用 kafkaTemplate 发送消息,并可以通过回调处理发送结果。

3. 示例代码

  • @KafkaListener 示例:

    java">import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;@Service
    public class KafkaConsumer {@KafkaListener(topics = "your_topic", groupId = "your_group_id")public void listen(String message) {// **处理接收到的消息**System.out.println("Received message: " + message);}
    }
    
  • kafkaTemplate.sendDefault 示例:

    java">kafkaTemplate.sendDefault("your_topic", message).addCallback(new Callback() {@Overridepublic void onSuccess(SendResult<String, String> result) {// **处理成功的逻辑**}@Overridepublic void onFailure(ProducerRecord<String, String> producerRecord, Exception ex) {// **处理失败的逻辑**}
    });
    

总结

  • @KafkaListener 是用于消费消息的,而 kafkaTemplate.sendDefault 是用于发送消息的。

kafkaTemplate.sendDefault(message).addCallback 的成功回调中,包含的信息主要是 SendResult 对象。这个对象提供了关于发送消息的详细信息,包括:

  1. ProducerRecord:发送的消息记录。
  2. RecordMetadata:关于消息的元数据,例如主题、分区、偏移量等。

示例代码

以下是一个示例,展示了如何在成功回调中使用这些信息:

java">import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Callback;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String message) {kafkaTemplate.sendDefault(message).addCallback(new Callback() {@Overridepublic void onSuccess(SendResult<String, String> result) {// **获取发送的消息记录**String sentMessage = result.getProducerRecord().value();// **获取元数据**String topic = result.getRecordMetadata().topic();int partition = result.getRecordMetadata().partition();long offset = result.getRecordMetadata().offset();// **处理成功的逻辑**System.out.printf("Message sent successfully: %s, Topic: %s, Partition: %d, Offset: %d%n",sentMessage, topic, partition, offset);}@Overridepublic void onFailure(org.apache.kafka.clients.producer.ProducerRecord<String, String> producerRecord, Exception ex) {// **处理失败的逻辑**System.err.println("Message failed to send: " + ex.getMessage());}});}
}

关键点

  • result.getProducerRecord().value():获取发送的消息内容。
  • result.getRecordMetadata().topic():获取消息发送到的主题。
  • result.getRecordMetadata().partition():获取消息发送到的分区。
  • result.getRecordMetadata().offset():获取消息在分区中的偏移量。

总结

在成功回调中,你可以获取到关于发送消息的详细信息,这些信息对于后续的处理和日志记录非常有用。


http://www.ppmy.cn/devtools/115815.html

相关文章

IOS 24 实现歌单详情(UITableView)列表

歌单详情完整效果 歌单详情歌单列表效果 歌单详情列表页整体效果稍微有点复杂&#xff0c;我们进行分部实现&#xff0c;先实现歌单详情里面的歌单列表&#xff0c;使用UITableView来实现。UITableView的使用在之前的文章中多次使用&#xff0c;想来也比较熟悉了。不熟悉的可以…

零工市场小程序:推动零工市场建设

人力资源和社会保障部在2024年4月发布了标题为《地方推进零工市场建设经验做法》的文章。 零工市场小程序的功能 信息登记与发布 精准匹配、推送 在线沟通 权益保障 零工市场小程序作为一个找零工的渠道&#xff0c;在往后随着技术的发展和政策的支持下&#xff0c;功能必然…

linux下的日志编写

1、日志初始化创建 2、日志写入 3、日志关闭 log.c #include "log.h"static log_t LOG;//初始化日志文件&#xff0c;在当前目录创建日志文件 int log_init(char *pdirname) {time_t t;struct tm *ptm NULL;char filepath[64] {0};int ret 0;time(&t);ptm …

网络编程问题解答

TCP/IP是哪种模型的协议 TCP/IP 是一组通信协议的集合&#xff0c;它基于 TCP/IP 模型。TCP/IP 模型通常被认为是一种实用的网络通信模型&#xff0c;与 OSI 模型相比&#xff0c;TCP/IP 模型更加简洁和侧重于实际应用&#xff0c;被广泛应用于互联网和大多数计算机网络中。 T…

CertiK因发现Apple Vision Pro眼动追踪技术漏洞,第6次获苹果认可

​2024年9月20日&#xff0c;头部Web3.0安全机构CertiK自豪地宣布&#xff0c;CertiK的工程师因发现Apple Vision Pro MR&#xff08;混合现实&#xff09;头显设备中的关键漏洞而获得Apple公司认可&#xff0c;这已经是Apple公司第六次公开发布对CertiK的致谢&#xff0c;Cert…

24年 九月 刷题记录

1. leetcode997找到小镇的法官 小镇里有 n 个人&#xff0c;按从 1 到 n 的顺序编号。传言称&#xff0c;这些人中有一个暗地里是小镇法官。 如果小镇法官真的存在&#xff0c;那么&#xff1a; 小镇法官不会信任任何人。 每个人&#xff08;除了小镇法官&#xff09;都信任这…

Qt安卓开发连接手机调试(红米K60为例)

1.前置条件 本人默认您已经完成Qt安卓环境的配置&#xff0c;若还没配置请参考链接文章&#xff1a;【Qt】最详细教程&#xff0c;如何从零配置Qt Android安卓环境_qt_七夕先生-开放原子开发者工作坊。准备一台目前主流在用的手机&#xff0c;其实自己用的就行(只要你不是某些…

光控资本:沪指涨0.59%,酿酒板块大幅拉升,数字货币概念等活跃

19日早盘&#xff0c;两市首要指数全线拉升&#xff0c;深证成指、创业板指涨约1%&#xff1b;场内超4800股飘红。 截至午间收盘&#xff0c;沪指涨0.59%报2733.38点&#xff0c;深证成指涨1.25%&#xff0c;创业板指涨0.99%&#xff0c;两市估计成交4263亿元。 盘面上看&…