Kafka原生API使用Java代码-生产者-异步发送消息回调

ops/2024/10/18 14:18:29/

文章目录

  • 1、异步发送消息&回调
    • 1.1、pom.xml
    • 1.2、KafkaProducer1.java

1、异步发送消息&回调

回调就是接收kafka的响应

1.1、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

java_60">1.2、KafkaProducer1.java

java">package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducer1 {/*** 主函数用于演示如何向Kafka的特定主题发送消息。** @param args 命令行参数(未使用)*/public static void main(String[] args) {// 初始化Kafka生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.74.148:9092"); // 指定Kafka broker的地址和端口props.put("acks", "all"); // 确认消息写入策略props.put("retries", 0); // 消息发送失败时的重试次数props.put("linger.ms", 1); // 发送缓冲区等待时间props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定键的序列化器props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定值的序列化器// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息到主题"my_topic3"// 异步发送消息:不接收kafka的响应//producer.send(new ProducerRecord<String, String>("my_topic3",  "hello,1,2,3"));// 注释掉的循环代码块展示了如何批量发送消息//for (int i = 0; i < 100; i++)//    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));producer.send(new ProducerRecord<String, String>("my_topic3",  "hello_3"),new Callback() {//消息发送成功,kafka broker ack 以后回调@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {//exception:如果有异常代表消息未能正常发送到kafka,没有异常代表消息发送成功://此时kafka的消息不一定持久化成功(需要kafka生产者加配置)//RecordMetadata代表发送成功的消息的元数据System.out.println("recordMetadata.topic() = " + recordMetadata.topic());System.out.println("recordMetadata.partition() = " + recordMetadata.partition());System.out.println("msg offset = " + recordMetadata.offset());System.out.println("recordMetadata.timestamp() = " + recordMetadata.timestamp());}});// 关闭生产者实例producer.close();}
}
recordMetadata.topic() = my_topic3
recordMetadata.partition() = 1
msg offset = 1
recordMetadata.timestamp() = 1716976982243

在这里插入图片描述
在这里插入图片描述


http://www.ppmy.cn/ops/45042.html

相关文章

Go 控制协程(goroutine)的并发数量

在使用协程并发处理某些任务时, 其并发数量往往因为各种因素的限制不能无限的增大. 例如网络请求、数据库查询等等。 从运行效率角度考虑&#xff0c;在相关服务可以负载的前提下&#xff08;限制最大并发数&#xff09;&#xff0c;尽可能高的并发。 在Go语言中&#xff0c;…

搭建YOLOv10环境 训练+推理+模型评估

文章目录 前言一、环境搭建必要环境1. 创建yolov10虚拟环境2. 下载pytorch (pytorch版本>1.8)3. 下载YOLOv10源码4. 安装所需要的依赖包 二、推理测试1. 将如下代码复制到ultralytics文件夹同级目录下并运行 即可得到推理结果2. 关键参数 三、训练及评估1. 数据结构介绍2. 配…

Java_基础加强——日志、类加载器、单元测试、注解

1.日志&#xff1a; 日志的优势&#xff1a; 体系结构&#xff1a; 日志规范&#xff1a; 日志规范大多是一些接口&#xff0c;提供给实现框架去设计的 常见的规范是Commons Logging&#xff08;JCL&#xff09;、Simple Logging Facade for Java&#xff08;slf4j&#xff09…

第十三节:带你梳理Vue2 : watch侦听器

官方解释:> 观察 Vue 实例变化的一个表达式或计算属性函数。回调函数得到的参数为新值和旧值。表达式只接受监督的键路径。对于更复杂的表达式&#xff0c;用一个函数取代<br/>## 1. 侦听器的基本使用侦听器可以监听data对象属性或者计算属性的变化watch是观察属性的…

电脑同时配置两个版本mysql数据库常见问题

1.配置时&#xff0c;要把bin中的mysql.exe和mysqld.exe 改个名字&#xff0c;不然两个版本会重复&#xff0c;当然&#xff0c;在初始化数据库的时候&#xff0c;如果时57版本的&#xff0c;就用mysql57(已经改名的)和mysqld57 代替 mysql 和 mysqld 例如 mysql -u root -p …

Unix环境高级编程--8-进程控制---8.1-8.2进程标识-8.3fork函数-8.4 vfork函数

1、进程控制几个过程 创建进程--》执行进程---》终止进程 2、进程标识 &#xff08;1&#xff09;专用进程&#xff1a;ID为0的进程是调度进程&#xff0c;常常被称为交换进程&#xff0c;也称为系统进程&#xff1b; ID为1通常是init进程&#xff0c;在自举结束时由内核调用…

轧钢测径仪分析软件,四大图表带来产线新视角!

轧钢测径仪是智能化检测设备&#xff0c;除了测径仪主体外&#xff0c;还配有测控软件系统&#xff0c;从这里可对测径仪进行各种设置&#xff0c;亦可从此观测到测径仪获得的各种信息&#xff0c;如检测信息、分析图表、计算尺寸、历史数据等。而从测径仪获得的图表信息主要有…

leetcode-顺时针旋转矩阵-111

题目要求 思路 1.假设现在有一个矩阵 123 456 789 2.我们可以根据19这个对角线将数据进行交换&#xff0c;得到矩阵 147 258 369 3.然后将矩阵每一行的数据再翻转&#xff0c;得到矩阵 741 852 963 代码实现 class Solution { public:vector<vector<int> > rot…