-
添加kafka stream依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId> </dependency> <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
-
application.yml中添加配置
--- #stream config spring:cloud:stream:binders:myKafka1:type: kafkaenvironment:spring:kafka:bootstrap-servers: 127.0.0.1:9092bindings:helloFunc-in-0:destination: hello-topicgroup: hello-local-test-10binder: myKafka1consumer:batch-mode: truehelloFunc-out-0:destination: hello-topicgroup: hello-local-test-10binder: myKafka1consumer:batch-mode: true# 注意 function 节点与stream 同级,而非子节点function:definition: helloFunc;
-
编写消费者:
@Slf4j @Component @RequiredArgsConstructor public class HelloConsumer {@Beanpublic Consumer<Message<List<String>>> helloFunc() {return message -> {log.info("---------------------> ");List<String> list = message.getPayload();boolean result = this.handle(list);if (result) {Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);if (acknowledgment != null) {acknowledgment.acknowledge();}} else {throw new RuntimeException("消费数据出错!");}};}private boolean handle(List<String> list){log.info("list size : {}", list.size());if (!CollectionUtils.isEmpty(list)){log.info("group first message : {}", list.get(0));}return true ;} }