一、配置文件
application.properties配置文件如下
#kafka多数据源配置
#kafka数据源一,日志审计推送
spring.kafka.one.bootstrap-servers=172.19.12.109:32182
spring.kafka.one.producer.retries=0
spring.kafka.one.producer.properties.max.block.ms=5000
#kafka数据源二,动环数据消费
spring.kafka.two.bootstrap-servers=172.19.12.109:32182
spring.kafka.two.producer.retries=0
spring.kafka.two.producer.properties.max.block.ms=5000
spring.kafka.two.consumer.group-id=bw-convert-data
spring.kafka.two.consumer.enable-auto-commit=true
二、pom依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
三、生产者、消费者配置
1.第一个kakfa
package com.gstanzer.convert.config;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaOneConfig {@Value("${spring.kafka.one.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.one.producer.retries}")private String retries;@Value("${spring.kafka.one.producer.properties.max.block.ms}")private String maxBlockMs;@Beanpublic KafkaTemplate<String, String> kafkaOneTemplate() {return new KafkaTemplate<>(producerFactory());}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}
}
2.第二个kakfa
package com.gstanzer.convert.config;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaTwoConfig {@Value("${spring.kafka.two.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.two.producer.retries}")private String retries;@Value("${spring.kafka.two.producer.properties.max.block.ms}")private String maxBlockMs;@Value("${spring.kafka.two.consumer.group-id}")private String groupId;@Value("${spring.kafka.two.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Beanpublic KafkaTemplate<String, String> kafkaTwoTemplate() {return new KafkaTemplate<>(producerFactory());}@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}public ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}private Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}}
四.生产者
@Controller
public class TestController {@Autowiredprivate KafkaTemplate kafkaOneTemplate;@Autowiredprivate KafkaTemplate kafkaTwoTemplate;@RequestMapping("/send")@ResponseBodypublic String send() {final String TOPIC = "TOPIC_1";kafkaOneTemplate.send(TOPIC, "kafka one");kafkaTwoTemplate.send(TOPIC, "kafka two");return "success";}
}
五.消费者
@Component
public class KafkaConsumer {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);final String TOPIC = "TOPIC_1";// containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaOneContainerFactory")public void listenerOne(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka one 接收到消息:{}", record.value());}@KafkaListener(topics = {TOPIC}, containerFactory = "kafkaTwoContainerFactory")public void listenerTwo(ConsumerRecord<?, ?> record) {LOGGER.info(" kafka two 接收到消息:{}", record.value());}
}
备注:
生产者消费者代码参考链接,开发同学需要以实际情况按要求自己变更下代码即可:
Spring Boot 集成多个 Kafka_springboot集成多个kafka_//承续缘_纪录片的博客-CSDN博客