如果不是有要求或者kafka生产者没有消费者群组,就不要用assign方式订阅,还是用subscribe订阅主题,我是被生产者坑了,开始给我说没有消费者群组,所有我只能用assign订阅指定分区,后来才给我说有消费者群组。
import com.alibaba.fastjson2.JSON; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component;import java.util.*;@Component public class KafkaConsumerAssign implementsCommandLineRunner {@Value("${ss.pubTopic}")private String pubTopic = "topic";@Value("${ss.kafkaAddress}")private String kafkaAddress = "xx.xx.xxx.xx:8093,xx.xxx.xxx.xx:8093,xx.xxx.xxx.xx:8093";public void autoCommit() {ConsumerDict consumerDict = new ConsumerDict();Properties properties = new Properties();// 指定key与value的反序列化器properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("enable.auto.commit", false);//手动提交提交properties.put("bootstrap.servers", kafkaAddress);//kafka连接地址//消费者群组,如果没有群组的话可以写通,若果有消费者组不写会,后面提交偏移量的时候会报错properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");//消费者组 // properties.put("max.poll.records",50);//单次最大记录数 // properties.put("session.timeout.ms","50000");//消费者连接的超时时间properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='用户名' password='密码';"); properties.put("security.protocol", "SASL_SSL");//安全协议properties.put("sasl.mechanism", "SCRAM-SHA-256");//加密方式//指定truststore文件properties.put("ssl.truststore.location", "D:/xxx/xx/xxx/xxxxxx.jks");//truststore文件密码properties.put("ssl.truststore.password", "aaaaaa");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);ArrayList<TopicPartition> topicPartitions = new ArrayList<>();//使用partitionsFor获取该topic下所有的分区List<PartitionInfo> partitionInfos = consumer.partitionsFor(pubTopic);for (PartitionInfo partitionInfo : partitionInfos) {topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));}//使用assign方式订阅kafkaconsumer.assign(topicPartitions);operationKafkaMessage(consumer);}//启动程序后自动启动此kafka客户端@Overridepublic void run(String... args) {new KafkaConsumerAssign().autoCommit();}private void operationKafkaMessage(KafkaConsumer<String, String> consumer) {while (true) {ConsumerRecords<String, String> records = consumer.poll(100);//100ms 自动获取一次数据,消费者主动发起请求//循环所有的分区for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);//获取每个分区中的所有数据for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value()); }//当前的消费到的位置long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();//处理完每个分区中的消息后,提交偏移量。consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}}public static void main(String[] args) {//new KafkaConsumerAssign().autoCommit();} }