Spring Boot配置多个Kafka数据源

news/2025/3/13 8:35:11/

一、配置文件

application.properties配置文件如下

#kafka多数据源配置
#kafka数据源一,日志审计推送
spring.kafka.one.bootstrap-servers=172.19.12.109:32182
spring.kafka.one.producer.retries=0
spring.kafka.one.producer.properties.max.block.ms=5000
#kafka数据源二,动环数据消费
spring.kafka.two.bootstrap-servers=172.19.12.109:32182
spring.kafka.two.producer.retries=0
spring.kafka.two.producer.properties.max.block.ms=5000
spring.kafka.two.consumer.group-id=bw-convert-data
spring.kafka.two.consumer.enable-auto-commit=true

二、pom依赖

		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

三、生产者、消费者配置

1.第一个kakfa

package com.gstanzer.convert.config;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaOneConfig {@Value("${spring.kafka.one.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.one.producer.retries}")private String retries;@Value("${spring.kafka.one.producer.properties.max.block.ms}")private String maxBlockMs;@Beanpublic KafkaTemplate<String, String> kafkaOneTemplate() {return new KafkaTemplate<>(producerFactory());}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}
}

2.第二个kakfa

package com.gstanzer.convert.config;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaTwoConfig {@Value("${spring.kafka.two.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.two.producer.retries}")private String retries;@Value("${spring.kafka.two.producer.properties.max.block.ms}")private String maxBlockMs;@Value("${spring.kafka.two.consumer.group-id}")private String groupId;@Value("${spring.kafka.two.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Beanpublic KafkaTemplate<String, String> kafkaTwoTemplate() {return new KafkaTemplate<>(producerFactory());}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}

四.生产者

@Controller
public class TestController {@Autowiredprivate KafkaTemplate kafkaOneTemplate;@Autowiredprivate KafkaTemplate kafkaTwoTemplate;@RequestMapping("/send")@ResponseBodypublic String send() {final String TOPIC = "TOPIC_1";kafkaOneTemplate.send(TOPIC, "kafka one");kafkaTwoTemplate.send(TOPIC, "kafka two");return "success";}
}

五.消费者

@Component
public class KafkaConsumer {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);final String TOPIC = "TOPIC_1";// containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")public void listenerOne(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka one 接收到消息:{}", record.value());}@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")public void listenerTwo(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka two 接收到消息:{}", record.value());}
}

备注:

生产者消费者代码参考链接,开发同学需要以实际情况按要求自己变更下代码即可:

Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘_纪录片的博客-CSDN博客


http://www.ppmy.cn/news/1167602.html

相关文章

再谈配置maven镜像

之前写过两篇 2016年写的 maven的settings.xml配置maven的settings.xml配置-CSDN博客 2018年写的 配置同济的maven镜像 配置同济的maven镜像_maven gsls200808-CSDN博客 为什么要再谈&#xff0c;因为又出幺蛾子了。spring初始化工具会默认生成带Maven Wrapper的项目结构。让…

微信小程序开发之自定义组件(会议OA项目其他页面搭建)

目录 前言 一、WeChat中的自定义组件 1. 基本概述 2. 包含文件及作用 3. 自定义组件的作用 4.使用步骤&#xff1a; 二、tabs组件及会议管理布局 tabs组件 1. 创建组件 准备 创建 使用组件 会议管理布局 tabs.wxml指定组件模版 tabs.wxss完成样式设计 tabs.js定义属…

ELK之LogStash插件grok和geoip的配置使用

本文针对LogStash常用插件grok和geoip的使用进行说明&#xff1a; 一、使用grok输出结构化数据 编辑 first-pipeline.conf 文件&#xff0c;修改为如下内容&#xff1a; input{#stdin{type > stdin}file {# 读取文件的路径path > ["/tmp/access.log"]start_…

微服务负载均衡实践

概述 本文介绍微服务的服务调用和负载均衡&#xff0c;使用spring cloud的loadbalancer及openfeign两种技术来实现。 本文的操作是在微服务的初步使用的基础上进行。 环境说明 jdk1.8 maven3.6.3 mysql8 spring cloud2021.0.8 spring boot2.7.12 idea2022 步骤 改造Eu…

@Cleanup() 使用注意事项

前端时间用lombok 的Cleanup() 想实现线程池的自动关闭&#xff0c;因为使用不当&#xff0c;查bug查了好久&#xff0c;因此写篇博客纪念下&#xff0c;同时也希望读者可以跳过这个坑。 Cleanup修饰的对象&#xff0c;可以在对象资源使用结束后&#xff0c;自动关闭。 1、错…

如何解决香港服务器使用的常见问题

​  站长们在选择香港服务器租用时会考虑到它的各种性能以及稳定性&#xff0c;这是必须的。但是使用过程中还有些问题也不容忽视&#xff0c;比如&#xff1a;带宽资源是否短缺&#xff0c;是否存在安全漏洞&#xff0c;连接是否正常等这些问题也要考虑到。 香港服务器使用中…

PageHelper基础知识

使用场景 便用mybatis&#xff0c;可以用 pagehelper 分页 。 maven依赖 <dependency><groupId>com.github.pagehelper</groupId><artifactId>pagehelper</artifactId><version>4.1.6</version> </dependency>PageHelper配…

安达发|人工智能在APS高级计划与排程中的应用

随着人工智能&#xff08;AI&#xff09;技术的发展&#xff0c;其在生产计划与排程&#xff08;APS&#xff09;领域的应用也日益广泛。APS是一种复杂的系统工程&#xff0c;它需要处理大量的数据&#xff0c;包括需求预测、资源优化、路径规划等。AI技术的应用可以帮助企业更…