kafka_6">1、SpringBoot集成kafka接收消息
生产者
package com.power.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;public void sendEvent(){kafkaTemplate.send("helloTopic","hello kafka");}}
application.yml配置文件
spring:application:#应用名称name: spring-boot-02-kafka-base#kafka连接地址(ip+port)kafka:bootstrap-servers: 47.116.35.15:9092#配置生产者(有24个配置)
# producer:#配置消费者(有24个配置)
# consumer:
测试类
package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot02KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendInterceptor(){eventProducer.sendEvent();}}
2、@Payload注解接收消息体内容
消费者:
package com.power.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"helloTopic"},groupId="helloGroup")public void onEvent(@Payload String event){System.out.println("读取/消费到的事件:"+event);}
}
测试结果:
3、@Header注解接收消息头内容
注意,不太版本kafak使用@Header注解读取partition时不一样:
- kafka3.0以下版本使用KafkaHeaders.RECEIVED_PARTITION_ID获取分区
- kafka3.0以上版本使用KafkaHeaders.RECEIVED_PARTITION获取分区
消费者:
package com.power.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"helloTopic"},groupId="helloGroup")public void onEvent(@Payload String event,@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition){System.out.println("读取/消费到的事件:"+event+",topic:"+topic+",partition:"+partition);}
}
测试结果:
4、接收消息所有内容
消费者:
package com.power.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.function.Consumer;@Component
public class EventConsumer {//采用监听的方式接收事件(消息,数据)@KafkaListener(topics = {"helloTopic"},groupId="helloGroup")public void onEvent(@Payload String event,@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,ConsumerRecord<String,String> record){System.out.println("读取/消费到的事件:"+event+",topic:"+topic+",partition:"+partition);System.out.println("读取/消费到的事件:"+record.toString());}
}
测试打印所有消息: