webFlux自定义多kafka监听

news/2025/2/21 8:16:37/

Maven 依赖（Jar 包）

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
            <version>1.3.11</version>
        </dependency>

配置项

kafka:
  enabled:
    listen: #监听器是否启用
      root: ${KAFKA_ENABLED_LISTEN_ROOT:false} #kafka是否启用监听的总开关
      consumer1: ${KAFKA_ENABLED_LISTEN_CONSUMER_1:true} #kafka是否启用监听器1的开关
      consumer2: ${KAFKA_ENABLED_LISTEN_CONSUMER_2:true} #kafka是否启用监听器2的开关
  reactive: #{@link com.kittlen.boot.distribute.kafka.config.properties.ReactiveKafkaConsumerProperties}
    #    配置消费者信息
    consumer1: #第一个消费者
      canConsume: ${KAFKA_CONSUMER_1_CAN_CONSUME:true} #是否消费消息
      bootstrap-servers: ${KAFKA_C1_CONSUMER_ADDR:192.168.1.52:9092} #kafka服务地址
#      bootstrap-servers: ${KAFKA_CONSUMER_1_CONSUMER_ADDR:127.0.0.1:9092} #kafka服务地址
      key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: ${KAFKA_CONSUMER_1_CONSUMER_MAX_POLL_RECORDS:100}
      auto-offset-reset: ${KAFKA_CONSUMER_1_CONSUMER_AUTO_OFFSET_RESET:latest} #earliest 最早未被消费的offset ; latest 消费最新的offset
      enable-auto-commit: false
      group-id: ${KAFKA_CONSUMER_1_CONSUMER_GROUP_ID:mmcd}
      topics: ${KAFKA_CONSUMER_1_CONSUMER_TOPIC:t1}
    consumer2: #消费者2
      canConsume: ${KAFKA_CONSUMER_2_CAN_CONSUME:true} #是否消费消息
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-256
          jaas:
            config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_CONSUMER_2_CONSUMER_USERNAME:user}" password="${KAFKA_CONSUMER_2_CONSUMER_PASSWORD:pwd}";'
      bootstrap-servers: ${KAFKA_CONSUMER_2_CONSUMER_ADDR:192.168.1.55:9092} #kafka服务地址
      key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: ${KAFKA_CONSUMER_1_CONSUMER_MAX_POLL_RECORDS:100}
      auto-offset-reset: ${KAFKA_CONSUMER_2_CONSUMER_AUTO_OFFSET_RESET:latest}
      group-id: ${KAFKA_CONSUMER_2_CONSUMER_GROUP_ID:mmcd}
      topics: ${KAFKA_CONSUMER_2_CONSUMER_TOPIC:t2}

代码

配置配置项实体

package com.kittlen.boot.distribute.kafka.ext;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

/**
 * Kafka consumer settings extended with the two fields this project needs on top of
 * Spring Boot's {@link KafkaProperties.Consumer}: the topic list to subscribe to and
 * a switch controlling whether received messages are actually handled.
 *
 * @author kittlen
 * @version 1.0
 */
@Getter
@Setter
@ApiModel("kafka消费者")
public class KafkaConsumerExt extends KafkaProperties.Consumer {

    /** Topics to listen to; multiple topics are separated by commas. */
    @ApiModelProperty("监听的topic,多个用逗号隔开")
    private String topics;

    /** Whether received messages should be handled (defaults to true). */
    @ApiModelProperty("是否消费消息")
    private boolean canConsume = true;
}

package com.kittlen.boot.distribute.kafka.config.properties;

import com.kittlen.boot.distribute.kafka.ext.KafkaConsumerExt;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Binds the {@code kafka.reactive} configuration subtree: one
 * {@link KafkaConsumerExt} per configured listener.
 *
 * @author kittlen
 * @version 1.0
 */
@Getter
@Setter
@Component
@ApiModel("kafka消费者配置参数")
@ConfigurationProperties(ReactiveKafkaConsumerProperties.PROPERTIES)
public class ReactiveKafkaConsumerProperties {

    /** Prefix of the configuration subtree bound to this class. */
    public static final String PROPERTIES = "kafka.reactive";

    /** Settings for listener 1 (kafka.reactive.consumer1.*). */
    @ApiModelProperty("消费者1参数")
    private KafkaConsumerExt consumer1 = new KafkaConsumerExt();

    /** Settings for listener 2 (kafka.reactive.consumer2.*). */
    @ApiModelProperty("消费者2参数")
    private KafkaConsumerExt consumer2 = new KafkaConsumerExt();
}

config

package com.kittlen.boot.distribute.kafka.config;

import com.kittlen.boot.distribute.kafka.config.properties.ReactiveKafkaConsumerProperties;
import com.kittlen.boot.distribute.kafka.ext.KafkaConsumerExt;
import com.kittlen.boot.distribute.kafka.ext.ReactiveKafkaConsumerTemplateExt;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Arrays;
import java.util.List;

/**
 * Declares one reactive Kafka consumer template bean per configured listener.
 * The whole configuration is gated by {@link #CONSUMER_ROOT_ENABLED_PROPERTY};
 * each template is additionally gated by its own enable flag.
 *
 * @author kittlen
 * @version 1.0
 */
@Slf4j
@Component
@ConditionalOnProperty(value = ReactiveConsumerConfig.CONSUMER_ROOT_ENABLED_PROPERTY, havingValue = "true")
public class ReactiveConsumerConfig {

    /** Master switch enabling/disabling all Kafka consumers. */
    public static final String CONSUMER_ROOT_ENABLED_PROPERTY = "kafka.enabled.listen.root";

    /** Switch enabling/disabling listener 1. */
    public static final String CONSUMER_CONSUMER1_ENABLED_PROPERTY = "kafka.enabled.listen.consumer1";

    /** Switch enabling/disabling listener 2. */
    public static final String CONSUMER_CONSUMER2_ENABLED_PROPERTY = "kafka.enabled.listen.consumer2";

    /**
     * Listener 1.
     *
     * @param reactiveKafkaConsumerProperties bound {@code kafka.reactive.*} configuration
     * @return template subscribed to consumer1's topics
     */
    @Bean
    @ConditionalOnProperty(value = ReactiveConsumerConfig.CONSUMER_CONSUMER1_ENABLED_PROPERTY, havingValue = "true")
    public ReactiveKafkaConsumerTemplateExt<String, String> consumer1ReactiveKafkaConsumerTemplate(ReactiveKafkaConsumerProperties reactiveKafkaConsumerProperties) {
        return buildTemplate(reactiveKafkaConsumerProperties.getConsumer1(), "consumer1");
    }

    /**
     * Listener 2.
     *
     * @param reactiveKafkaConsumerProperties bound {@code kafka.reactive.*} configuration
     * @return template subscribed to consumer2's topics
     */
    @Bean
    @ConditionalOnProperty(value = ReactiveConsumerConfig.CONSUMER_CONSUMER2_ENABLED_PROPERTY, havingValue = "true")
    public ReactiveKafkaConsumerTemplateExt<String, String> consumer2ReactiveKafkaConsumerTemplate(ReactiveKafkaConsumerProperties reactiveKafkaConsumerProperties) {
        return buildTemplate(reactiveKafkaConsumerProperties.getConsumer2(), "consumer2");
    }

    /**
     * Shared factory for the two bean methods above (they were copy-paste duplicates):
     * builds receiver options from the consumer's Kafka properties, subscribes to its
     * comma-separated topic list, and wraps everything in the template.
     *
     * @param consumer     the listener's configuration
     * @param consumerName logical listener name ("consumer1"/"consumer2")
     * @return a template ready for {@code receiveAutoAck()}
     */
    private ReactiveKafkaConsumerTemplateExt<String, String> buildTemplate(KafkaConsumerExt consumer, String consumerName) {
        // "topics" is a comma-separated list in configuration
        List<String> topics = Arrays.asList(consumer.getTopics().split(","));
        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions.<String, String>create(consumer.buildProperties()).subscription(topics);
        ReactiveKafkaConsumerTemplateExt<String, String> template =
                new ReactiveKafkaConsumerTemplateExt<>(receiverOptions, consumer, consumerName);
        log.info("loading {} reactive kafka consumer completed", consumerName);
        return template;
    }
}
package com.kittlen.boot.distribute.kafka.consumer;

import com.kittlen.boot.comm.exceptions.BizException;
import com.kittlen.boot.distribute.kafka.config.ReactiveConsumerConfig;
import com.kittlen.boot.distribute.kafka.consumer.handlers.AbstractConsumerHandler;
import com.kittlen.boot.distribute.kafka.ext.ReactiveKafkaConsumerTemplateExt;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Wires each configured reactive Kafka listener to the {@link AbstractConsumerHandler}
 * registered under the same consumer name and starts consumption in the constructor.
 *
 * @author kittlen
 * @version 1.0
 */
@Slf4j
@Component
@ConditionalOnBean(ReactiveConsumerConfig.class)
public class KafkaReactiveConsumer {

    /** Handlers indexed by the listener name they serve; names must be unique. */
    private final Map<String, AbstractConsumerHandler> handlerMap;

    /** Upper bound for one handler invocation; on timeout the record is treated as failed (false). */
    private final Duration defTimeOut = Duration.ofSeconds(240);

    public KafkaReactiveConsumer(List<ReactiveKafkaConsumerTemplateExt<String, String>> reactiveKafkaConsumerTemplateExtList,
                                 List<AbstractConsumerHandler> consumerHandlers) {
        handlerMap = consumerHandlers.stream()
                .collect(Collectors.toMap(AbstractConsumerHandler::consumerName, a -> a, (a, b) -> {
                    // two handlers claiming the same consumer name is a configuration error
                    throw new BizException("consumerName: " + b.consumerName() + " 重复");
                }));
        for (ReactiveKafkaConsumerTemplateExt<String, String> template : reactiveKafkaConsumerTemplateExtList) {
            this.consumerRun(template);
        }
    }

    /**
     * Starts the receive loop of one listener.
     *
     * @param reactiveKafkaConsumerTemplateExt the listener template; ignored when null
     */
    private void consumerRun(ReactiveKafkaConsumerTemplateExt<String, String> reactiveKafkaConsumerTemplateExt) {
        if (reactiveKafkaConsumerTemplateExt == null) {
            return;
        }
        log.info("consumer:{}监听services:{},topic:{}", reactiveKafkaConsumerTemplateExt.getTemplateConsumerName(),
                reactiveKafkaConsumerTemplateExt.getKafkaConsumerExt().getBootstrapServers(),
                reactiveKafkaConsumerTemplateExt.getKafkaConsumerExt().getTopics());
        reactiveKafkaConsumerTemplateExt.receiveAutoAck()
                // BUGFIX: the original used .map(record -> Mono) followed by
                // .subscribe(Mono::subscribe), which subscribed each handler Mono as an
                // independent fire-and-forget subscription: no backpressure, and handler
                // errors escaped the pipeline. concatMap processes records in order
                // within the single subscription instead.
                .concatMap(c -> handleRecord(reactiveKafkaConsumerTemplateExt, c))
                .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()))
                .subscribe();
    }

    /**
     * Dispatches a single record to its handler. Never emits an error, so one bad
     * record cannot terminate the whole receive loop.
     *
     * @param template the listener the record came from
     * @param c        the received record
     * @return the handler result, or an empty Mono when the record is skipped or fails
     */
    private Mono<Boolean> handleRecord(ReactiveKafkaConsumerTemplateExt<String, String> template, ConsumerRecord<String, String> c) {
        String templateConsumerName = template.getTemplateConsumerName();
        if (!template.isCanConsume()) {
            // consumption disabled for this listener: drop the record
            log.debug("consumer:{} not consume topic:{} value:{}", templateConsumerName, c.topic(), c.value());
            return Mono.empty();
        }
        log.debug("consumer:{} handler consume topic:{} value:{}", templateConsumerName, c.topic(), c.value());
        AbstractConsumerHandler abstractConsumerHandler = handlerMap.get(templateConsumerName);
        if (abstractConsumerHandler == null) {
            log.info("未知consumerHandler:{}", templateConsumerName);
            return Mono.empty();
        }
        try {
            return abstractConsumerHandler.handler(c)
                    .timeout(defTimeOut, Mono.just(false))
                    // BUGFIX: asynchronous handler failures were previously unhandled — the
                    // try/catch below only covers the synchronous call; log and swallow here
                    .onErrorResume(e -> {
                        log.error("consumer:{} 处理topic:{}实体信息:{}时出现异常为:{}", templateConsumerName, c.topic(), c.value(), e.getMessage(), e);
                        return Mono.empty();
                    });
        } catch (Exception e) {
            log.error("consumer:{} 处理topic:{}实体信息:{}时出现异常为:{}", templateConsumerName, c.topic(), c.value(), e.getMessage(), e);
            return Mono.empty();
        }
    }
}

消息处理

通过实现下列接口来处理消息


package com.kittlen.boot.distribute.kafka.consumer.handlers;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import reactor.core.publisher.Mono;

/**
 * Contract for processing records received by a reactive Kafka listener.
 * Implement this interface and register the bean to handle messages.
 *
 * @author kittlen
 * @version 1.0
 */
public interface AbstractConsumerHandler {

    /**
     * Name of the listener this handler serves. When it equals a listener's
     * templateConsumerName, this handler is used for that listener's records.
     *
     * @return the listener name this handler is bound to
     */
    String consumerName();

    /**
     * Processes one record received by the listener identified by {@link #consumerName()}.
     *
     * @param consumerRecord the record to process
     * @return a Mono signalling the handling result
     */
    Mono<Boolean> handler(ConsumerRecord<String, String> consumerRecord);
}

http://www.ppmy.cn/news/1051385.html

相关文章

【高级IO】- 多路转接之 poll | epoll

目录 I/O多路转接 - poll poll 函数 poll 服务器 poll 服务器 poll 的优点 poll 的缺点 I/O 多路转接 - epoll epoll 的相关系统调用 epoll_create 函数 epoll_ctl 函数 epoll_wait 函数 epoll 工作原理 epoll 服务器 epoll 的优点（与 sele…

使用贝塞尔曲线算法制作曲线

一阶贝塞尔曲线 使用两个点绘制线段 p3 = p1 + (p2 - p1)*t p1:起点;p2:终点;t:0-1;p3:线段L12上的点 两个点和t的变化(0-1)可得到一条线段 二阶贝塞尔曲线 使用三个点绘制曲线 p12 = p1 + (p2 - p1)*t p23 = p2 + (p3 - p2)*t p123 = p12 + (p23 - p12)*t p12是线段L12上的点，p23是线段L23上的…

Python爬取斗罗大陆全集

打开网址http://www.luoxu.cc/dmplay/C888H-1-265.html F12打开Fetch/XHR，看到m3u8，ts，一眼顶真，打开index.m3u8 由第一个包含第二个index.m3u8的地址，Ctrl+F在源代码中一查index，果然有，不过/…

JAVA笔试基础知识-final/static+wait/sleep+tcp/udp

1、final关键字和static关键字的区别 /*** final修饰类：* 使用final修饰类的目的简单明确，表明这个类不能被继承。* 当程序中有永远不会被继承的类时，可以使用final关键字修饰。* 被final修饰的类所有成员方法都将被隐式修饰为final方法。**…

前馈神经网络解密:深入理解人工智能的基石

目录 一、前馈神经网络概述什么是前馈神经网络前馈神经网络的工作原理应用场景及优缺点 二、前馈神经网络的基本结构输入层、隐藏层和输出层激活函数的选择与作用网络权重和偏置 三、前馈神经网络的训练方法损失函数与优化算法反向传播算法详解避免过拟合的策略 四、使用Python…

4.14.媒体协商

那今天呢？我们来看一下是如何进行媒体协商的。开始之前呢，我们再来回顾一下媒体协商的过程。这张图呢，展示的就是媒体协商的过程，那通过这张图，我们可以看到那第一步呢，它首先要调用create offer。创建offe…

竞态条件?如何设计一个抢红包的程序? 说说你的思路

笔者在前两天参加面试的时候被问到了一个场景问题，觉得自己之前准备的确实不妥当，在此抛砖引玉分享思路。 背景了解 首先明确问题，100个人的群组里让你去发一个红包，可以被88个人抢。那么1. 你怎么解决他们争抢的问题。2. 你怎么…

CTFhub-sql注入-绕过空格过滤

常用绕过空格过滤的方法：/**/、()、%0a 1.判断是否存在sqli注入 1 1/**/union/**/select/**/11 1/**/union/**/select/**/12 如果1/**/union/**/select/**/11的显示结果与1/**/union/**/select/**/12的显示结果不一样，与1的结果一样说明存在注入…