kafka是一个分布式流媒体平台,类似于消息队列或企业消息传递系统
案例一:生产者--消费者
1.导入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>
2.生产者发送消息
/*** 生产者*/public class ProducerQuickStart {public static void main(String[] args) throws ExecutionException,InterruptedException {//1.kafka链接配置信息Properties prop = new Properties();//kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.创建kafka生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);//3.发送消息/*** 第一个参数 :topic* 第二个参数:消息的key* 第三个参数:消息的value*/ProducerRecord<String,String> kvProducerRecord = newProducerRecord<String,String>("topic-first","key-001","hello kafka");//同步发送消息(3)消费者接收消息RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();System.out.println(recordMetadata.offset());//4.关闭消息通道 必须要关闭,否则消息发送不成功producer.close();}}
3.消费者接收消息
/*** 消费者*/public class ConsumerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties prop = new Properties();//链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//设置消费者组//组一样,则属于生产者--消费者(一对一)prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//组不一样,则属于生产者--消费者(一对多)//prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//2.创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);//3.订阅主题consumer.subscribe(Collections.singletonList("topic-first"));//4.拉取消息while (true) {ConsumerRecords<String, String> consumerRecords =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}
总结
-
生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息 (一对一)
-
生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息 (一对多)