一、添加依赖
<!-- kafka-clients: the official Kafka Java client library -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>
二、生产者
自定义分区器(实现 Partitioner 接口,按消息内容决定分区),可选步骤,可忽略
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPatitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String msgStr = value.toString();if(msgStr.contains("a")){return 1;}return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
1、普通消息
public static void main(String[] args) throws ExecutionException, InterruptedException {//配置Properties properties = new Properties();//连接参数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");//序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//关联自定义分区器 可选properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");//优化参数 可选//缓冲器大小 32Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);//批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);//Linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 5);//压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");//acksproperties.put(ProducerConfig.ACKS_CONFIG, "-1");//重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);//创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//异步发送数据for (int i = 0; i < 10; i++) {//给first主题发消息kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i));//回调异步发送kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());}}});kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("主题:" + recordMetadata.topic() + "分区" + recordMetadata.partition() + "a");}}});Thread.sleep(500);}//同步for (int i = 0; i < 10; i++) {//给first主题发消息kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get();}//关闭资源kafkaProducer.close();}
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
a0
hello0
hello20
a1
hello1
hello21
a2
hello2
hello22
a3
hello3
hello23
a4
hello4
hello24
a5
hello5
hello25
a6
hello6
hello26
a7
hello7
hello27
a8
hello8
hello28
a9
hello9
hello29
sync_hello0
sync_hello1
sync_hello2
sync_hello3
sync_hello4
sync_hello5
sync_hello6
sync_hello7
sync_hello8
sync_hello9
2、事务消息
public static void main(String[] args) throws ExecutionException, InterruptedException {//配置Properties properties = new Properties();//连接参数properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");//序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//关联自定义分区器 可选properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");//优化参数 可选//缓冲器大小 32Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);//批次大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);//Linger.msproperties.put(ProducerConfig.LINGER_MS_CONFIG, 5);//压缩properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");//acksproperties.put(ProducerConfig.ACKS_CONFIG, "-1");//重试次数properties.put(ProducerConfig.RETRIES_CONFIG, 3);//指定事务IDproperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");properties.put("enable.idempotence", "true");//创建生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//事务消息 初始化kafkaProducer.initTransactions();//开始事务kafkaProducer.beginTransaction();try {kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get();//提交事务kafkaProducer.commitTransaction();} catch (Exception e) {//终止事务kafkaProducer.abortTransaction();} finally {//关闭资源kafkaProducer.close();}}
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Transactions