目录
- 一、回调函数的异步发送消息概述
- 二、生产者带回调函数的异步发送消息(API代码示例)
- 2.1、pom文件导入依赖
- 2.2、API代码
- 2.3、在 kafka集群服务器上开启 Kafka 消费者
一、回调函数的异步发送消息概述
- 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
二、生产者带回调函数的异步发送消息(API代码示例)
2.1、pom文件导入依赖
-
依赖包
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
2.2、API代码
-
代码示例
-
注:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties;/*** @author: xz* @since: 2023/4/2 14:10* @description: 生产者带回调函数的异步发送消息*/ public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("news", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}//6、关闭资源kafkaProducer.close();} }
2.3、在 kafka集群服务器上开启 Kafka 消费者
-
在kafka集群某一台服务器上开启 Kafka 消费者
[root@localhost kafka-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.136.27:9092 --topic news
-
观察开启 Kafka 消费者的服务器中是否接收到消息。如下图所示:
-
在 IDEA 控制台观察回调信息,控制台如下: