配置:server.properties
绑定Kafka服务器 生产者配置 生产者发送消息 消费配置 消费者接收消息 消费提交 springboot 集成 Kafka事务
配置:server.properties
#broker.id属性在kafka 集群中必须要是唯一
broker.id=0
#kafka 部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.65.60:9092
#kafka 的消息存储文件
log.dir=/usr/local/data/kafka-logs
#kafka 连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
绑定Kafka服务器
// Client settings container; in this snippet only the broker list is set.
Properties props = new Properties ( ) ;
// Comma-separated host:port list of brokers to connect to (three ports on the same host here).
props. put ( ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094" ) ;
// Producer client with String keys and String values, built from props.
Producer < String , String > producer = new KafkaProducer < String , String > ( props) ;
// Consumer client with String keys and String values, built from the same props object.
// NOTE(review): the serializer / deserializer / group-id entries shown in the later configuration
// snippets are presumably added to props before these constructors run in real code - confirm.
KafkaConsumer < String , String > consumer = new KafkaConsumer < String , String > ( props) ;
生产者配置
// acks=1: per the Kafka producer configuration docs, the partition leader acknowledges after
// writing the record to its own log, without waiting for followers.
props. put ( ProducerConfig . ACKS_CONFIG , "1" ) ;
// Retry a failed send up to 3 times ...
props. put ( ProducerConfig . RETRIES_CONFIG , 3 ) ;
// ... waiting 300 ms between retries.
props. put ( ProducerConfig . RETRY_BACKOFF_MS_CONFIG , 300 ) ;
// Producer-side buffer: 33554432 bytes = 32 MiB.
props. put ( ProducerConfig . BUFFER_MEMORY_CONFIG , 33554432 ) ;
// Batch size: 16384 bytes = 16 KiB.
props. put ( ProducerConfig . BATCH_SIZE_CONFIG , 16384 ) ;
// Linger time of 10 ms.
props. put ( ProducerConfig . LINGER_MS_CONFIG , 10 ) ;
// Keys and values are both serialized as Strings.
props. put ( ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , StringSerializer . class . getName ( ) ) ;
props. put ( ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer . class . getName ( ) ) ;
生产者发送消息
// --- Building the record: two alternatives ---
// Alternative 1: target an explicit partition (partition 0 of TOPIC_NAME).
var partitionedRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0, key_json, value_json);
// Alternative 2: no partition given; the producer chooses one for the record.
var producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, key_json, value_json);

// --- Sending: two alternatives ---
// Synchronous send: get() blocks on the returned Future until the send completes
// (it can throw InterruptedException / ExecutionException, which the caller must handle).
RecordMetadata metadata = producer.send(producerRecord).get();

// Asynchronous send: the callback is invoked when the send completes.
producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // A non-null exception means the send failed; do not drop it silently.
        if (exception != null) {
            System.err.println("Kafka send failed: " + exception);
        }
    }
});

// Release the producer's resources once all sends are done.
producer.close();
消费配置
// Consumer group this consumer joins.
props. put ( ConsumerConfig . GROUP_ID_CONFIG , CONSUMER_GROUP_NAME ) ;
// Offsets are committed automatically ...
props. put ( ConsumerConfig . ENABLE_AUTO_COMMIT_CONFIG , "true" ) ;
// ... every 1000 ms.
props. put ( ConsumerConfig . AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000" ) ;
// auto.offset.reset=earliest: per the Kafka consumer configuration docs, start from the earliest
// available offset when the group has no committed offset for a partition.
props. put ( ConsumerConfig . AUTO_OFFSET_RESET_CONFIG , "earliest" ) ;
// Heartbeat interval: 1000 ms.
props. put ( ConsumerConfig . HEARTBEAT_INTERVAL_MS_CONFIG , 1000 ) ;
// Session timeout: 10 s.
props. put ( ConsumerConfig . SESSION_TIMEOUT_MS_CONFIG , 10 * 1000 ) ;
// At most 500 records are returned by a single poll().
props. put ( ConsumerConfig . MAX_POLL_RECORDS_CONFIG , 500 ) ;
// Maximum allowed gap between two poll() calls: 30 s.
props. put ( ConsumerConfig . MAX_POLL_INTERVAL_MS_CONFIG , 30 * 1000 ) ;
// Keys and values are both deserialized as Strings.
props. put ( ConsumerConfig . KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer . class . getName ( ) ) ;
props. put ( ConsumerConfig . VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer . class . getName ( ) ) ;
消费者接收消息
消费者接收消息(topic):指定分区;回溯(从头,指定offset);拉取集合
// --- Choosing what to consume: use ONE of the alternatives below. ---
// Per the KafkaConsumer javadoc, subscribe(...) and assign(...) are mutually exclusive on the
// same consumer instance.

// Alternative 1: subscribe to the whole topic.
consumer.subscribe(Arrays.asList(TOPIC_NAME));

// Alternative 2: consume only partition 0 of the topic.
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

// Alternative 3: replay partition 0 from its beginning.
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

// Alternative 4: replay partition 0 starting at offset 10.
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);

// Alternative 5: look up, for every partition of the topic, the offset that corresponds to
// "one hour ago".
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
long fetchDataTime = System.currentTimeMillis() - 1000 * 60 * 60; // now minus one hour, in ms
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
    map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
// Result maps each partition to the offset/timestamp found for the requested time.
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);

// --- Pulling records: wait up to 1000 ms for a batch. ---
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
消费提交
// Alternative 1: synchronous offset commit.
consumer. commitSync ( ) ;
// Alternative 2: asynchronous offset commit; onComplete receives the offsets and an exception
// argument. The callback body is left empty in this snippet.
// NOTE(review): real code should presumably check ex for a failed commit - confirm.
consumer. commitAsync ( new OffsetCommitCallback ( ) { @Override public void onComplete ( Map < TopicPartition , OffsetAndMetadata > offsets, Exception ex) { }
} ) ;
springboot 集成
springboot配置application.yml
spring:
  kafka:
    bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094
    producer:
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual_immediate
ack‐mode
ack-mode 取值:
RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
BATCH:当每一批poll()的数据被消费者监听器处理之后提交
TIME:当每一批poll()的数据被消费者监听器处理之后,距离上次提交时间大于TIME时提交
COUNT:当每一批poll()的数据被消费者监听器处理之后,被处理record数量大于等于COUNT时提交
COUNT_TIME:TIME 或 COUNT 有一个条件满足时提交
MANUAL:当每一批poll()的数据被消费者监听器处理之后,手动调用Acknowledgment.acknowledge()后提交
MANUAL_IMMEDIATE:手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种(一次提交一条消息)
生产者 & 消费者
@Autowired
private KafkaTemplate < String , String > kafka Template;
kafka Template. send ( TOPIC_NAME , 0 , "key" , "this is a msg" ) ;
/**
 * Listens on "my-replicated-topic" as consumer group "zhugeGroup" and acknowledges each
 * record manually.
 *
 * @param record the consumed record (String key, String value)
 * @param ack    handle used to acknowledge the record
 */
@KafkaListener(topics = "my-replicated-topic", groupId = "zhugeGroup")
public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
    // Payload of the record; business processing would go here.
    String value = record.value();
    // Manually acknowledge the record.
    ack.acknowledge();
}
// Example: a second consumer group ("tulingGroup") listening on the same topic.
@KafkaListener(topics = "my-replicated-topic", groupId = "tulingGroup")
// Example: bind a listener to explicit partitions of several topics.
//   topic1: partitions 0 and 1
//   topic2: partition 0, plus partition 1 starting at initial offset 100
// concurrency = "6" is the listener's concurrency setting.
@KafkaListener(groupId = "testGroup", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
        @TopicPartition(topic = "topic2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "6")
Kafka事务
// Transactional producer example (follows the pattern in the KafkaProducer javadoc).
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// Setting transactional.id is what makes this producer transactional.
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer =
        new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

// Must be called once before the first transaction.
producer.initTransactions();
try {
    producer.beginTransaction();
    // Every send between beginTransaction() and commitTransaction() belongs to the transaction.
    // Example record only - replace with the real records to publish.
    producer.send(new ProducerRecord<>("my-replicated-topic", "key", "this is a msg"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Not recoverable: the only option is to close the producer.
    producer.close();
} catch (KafkaException e) {
    // Any other Kafka error: abort the transaction.
    producer.abortTransaction();
}
producer.close();