使用kafka-clients操作数据(java)

news/2025/1/12 23:14:15/

一、添加依赖

     <!--    kafka-clients--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.5.1</version></dependency>

二、生产者

自定义分区,可忽略

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPatitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String msgStr = value.toString();if(msgStr.contains("a")){return 1;}return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

1、普通消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {//配置Properties properties = new Properties();//连接参数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");//序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//关联自定义分区器 可选properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");//优化参数 可选//缓冲器大小 32Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);//批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);//Linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 5);//压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");//acksproperties.put(ProducerConfig.ACKS_CONFIG, "-1");//重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);//创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//异步发送数据for (int i = 0; i < 10; i++) {//给first主题发消息kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i));//回调异步发送kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());}}});kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + "分区" + recordMetadata.partition() + "a");}}});Thread.sleep(500);}//同步for (int i = 0; i < 10; i++) {//给first主题发消息kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get();}//关闭资源kafkaProducer.close();}
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
a0
hello0
hello20
a1
hello1
hello21
a2
hello2
hello22
a3
hello3
hello23
a4
hello4
hello24
a5
hello5
hello25
a6
hello6
hello26
a7
hello7
hello27
a8
hello8
hello28
a9
hello9
hello29
sync_hello0
sync_hello1
sync_hello2
sync_hello3
sync_hello4
sync_hello5
sync_hello6
sync_hello7
sync_hello8
sync_hello9

2、事务消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {//配置Properties properties = new Properties();//连接参数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");//序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//关联自定义分区器 可选properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");//优化参数 可选//缓冲器大小 32Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);//批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);//Linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 5);//压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");//acksproperties.put(ProducerConfig.ACKS_CONFIG, "-1");//重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);//指定事务IDproperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");properties.put("enable.idempotence", "true");//创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//事务消息 初始化kafkaProducer.initTransactions();//开始事务kafkaProducer.beginTransaction();try {kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get();//提交事务kafkaProducer.commitTransaction();} catch (Exception e) {//终止事务kafkaProducer.abortTransaction();} finally {//关闭资源kafkaProducer.close();}}
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Transactions


http://www.ppmy.cn/news/990189.html

相关文章

绕过TLS/akamai指纹护盾

文章目录 前言TLS指纹什么是TLS指纹测试TLS指纹绕过TLS指纹使用原生urllib使用其他成熟库&#xff01;&#xff01;修改requests底层代码 Akamai指纹相关&#xff08;HTTP/2指纹&#xff09;什么是Akamai指纹测试Akamai指纹绕过Akamai指纹使用其他成熟库 实操参考 前言 有道是…

Linux中的进程和线程的概念、区别和实现的细节

在Linux中&#xff0c;进程和线程是操作系统中两个重要的执行单位。它们是对程序运行时的抽象概念&#xff0c;用于管理和调度计算机资源。 文章目录 进程&#xff1a;线程&#xff1a;进程和线程的区别&#xff1a;实现的细节进程的实现细节&#xff1a;线程的实现细节&#x…

行为型:发布订阅模式

定义   发布订阅模式是基于一个事件&#xff08;主题&#xff09;通道&#xff0c;希望接收通知的对象Subscriber&#xff08;订阅者&#xff09;通过自定义事件订阅主题&#xff0c;被激活事件的对象 Publisher &#xff08;发布者&#xff09;通过发布主题事件的方式通知订…

Qt C++实现Excel表格的公式计算

用Qt的QTableViewQStandardItemModelQStyledItemDelegate实现类似Excel表格的界面&#xff0c;在parser 模块中提供解析表格单元格输入的公式。单元格编辑结束后按回车进行计算和更新显示。 效果如下&#xff1a; 支持的公式计算可以深度嵌套&#xff0c;目前parser模块中仅提…

lucene、solr、es的区别以及应用场景

目录 1. Lucene:2. Solr:3. Elasticsearch: Lucene、Solr 和 Elasticsearch(ES) 都是基于 Lucene 引擎的搜索引擎&#xff0c;它们之间有相似之处&#xff0c;但也有一些不同之处。 Lucene 是一个低级别的搜索引擎库&#xff0c;它提供了一种用于创建和维护全文索引的 API&…

电脑重启后VScode快捷方式失效,找不到Code.exe

问题描述 下班回家关了部分程序就直接关机了&#xff0c;回家后重启电脑发现vscode的快捷方式就失效了&#xff0c;提示Code.exe已被移动或删除。 解决方法 查看你的vscode安装目录&#xff0c;Microsoft VS Code目录下大概率会存在一个名为_的文件夹&#xff0c;然后会发现…

数据结构:树的存储结构

学习树之前&#xff0c;我们已经了解了二叉树的顺序存储和链式存储&#xff0c;哪么我们如何来存储普通型的树结构的数据&#xff1f;如下图1&#xff1a; 如图1所示&#xff0c;这是一颗普通的树&#xff0c;我们要如何来存储呢&#xff1f;通常&#xff0c;存储这种树结构的数…

WPF实战学习笔记29-登录数据绑定,编写登录服务

添加登录绑定字段、命令、方法 修改对象&#xff1a;Mytodo.ViewModels.ViewModels using Mytodo.Service; using Prism.Commands; using Prism.Events; using Prism.Mvvm; using Prism.Services.Dialogs; using System; using System.CodeDom.Compiler; using System.Collec…