SpringBoot集成kafka

devtools/2024/11/20 7:38:28/

kafka_0">SpringBoot集成kafka

kafka_2">集成kafka

pom引入

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version>
</dependency>

kafka_14">配置kafka

spring:kafka:bootstrap-servers: 127.0.0.1:9092producer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: 1consumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: latest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 设置默认消费者分组idgroup-id: devlistener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false

kafka_58">kafka生产者示例

@Component
@Slf4j
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;// 发送kafka消息public void send(String topic, Object data) {if (StringUtils.isEmpty(topic)) {log.error("Kafka topic不能为空");throw new IllegalArgumentException("Kafka topic不能为空");} else if (ObjectUtil.isEmpty(data)) {log.error("Kafka消息不能为空");throw new IllegalArgumentException("Kafka消息不能为空");} else {// 将数据转换为JSON字符串String jsonStr = JSONUtil.toJsonStr(data);// 向kafka发送消息ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, jsonStr);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {//发送失败的处理log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> stringObjectSendResult) {//成功的处理log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());}});}}
}

kafka_99">kafka消费者示例

@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = "testTopic")public void topicMessage(String message){log.info("Received Message in topic: {}", message);// 处理接收到的消息}@KafkaListener(topics = "testTopic2")public void topicMessage2(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){try {Optional message = Optional.ofNullable(record.value());if (message.isPresent()) {log.info("Received Message in topic: {}", message.get());}// 处理接收到的消息} catch (Exception e) {log.error("Error processing message", e);} finally {ack.acknowledge();}}}

kafka_131">kafka配置批量并发消费

配置批量消费

spring:kafka:consumer:# 一次 poll 最多返回的记录数max-poll-records: 3100 max-partition-fetch-bytes: 15728640                 #设置拉取数据的大小,15Mlistener:type: batch # 开启批量消费

配置并发消费

为了加快消费,我们可以提高并发数,比如下面配置我们将并发设置为 3。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态

spring:kafka:consumer:# 一次 poll 最多返回的记录数max-poll-records: 3100 max-partition-fetch-bytes: 15728640                 #设置拉取数据的大小,15Mlistener:type: batch # 开启批量消费concurrency: 3 # 设置并发数

批量消费示例

@KafkaListener(topics = "testTopic")
public void batchListener(List<ConsumerRecord<?,?>> records, Acknowledgment ack){try {records.forEach(record -> {// TODO 业务代码});} catch (Exception e) {log.error("Kafka监听异常"+e.getMessage(),e);} finally {ack.acknowledge();//手动提交偏移量}
}

除了通过配置文件配置外,还可以通过自定义配置类的方式实现批量消费,但是相对yml配置来说还是有点麻烦的:

/*** 消费者配置*/
@Configuration
public class KafkaConsumerConfig {/*** 消费者配置* @return*/public Map<String,Object> consumerConfigs(){Map<String,Object> props = new HashMap<>();props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory() {ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));//并发数量factory.setConcurrency(3);//开启批量监听factory.setBatchListener(true);return factory;}
}

同时监听器通过@KafkaListener注解的containerFactory 配置指定批量消费的工厂即可,如下:

@KafkaListener(topics = {"testTopic"},containerFactory = "batchFactory")
public void batchListener(List<ConsumerRecord<?,?>> records, Acknowledgment ack){try {records.forEach(record -> {// TODO 业务代码});} catch (Exception e) {log.error("Kafka监听异常"+e.getMessage(),e);} finally {ack.acknowledge();//手动提交偏移量}
}

http://www.ppmy.cn/devtools/135418.html

相关文章

数据库基础知识

什么是数据库, 数据库管理系统, 数据库系统, 数据库管理员? 数据库 : 数据库(DataBase 简称 DB)就是信息的集合或者说数据库是由数据库管理系统管理的数据的集合。数据库管理系统 : 数据库管理系统(Database Management System 简称 DBMS)是一种操纵和管理数据库的大型软件&a…

前端框架主要做些什么工作

前端框架在Web开发中扮演着至关重要的角色&#xff0c;它们主要做以下几方面的工作&#xff1a; 一、简化和加速开发过程 前端框架通过提供预定义的组件、模块和代码库&#xff0c;使开发人员能够快速创建网站和应用&#xff0c;而无需从零开始编写大量的代码。这些框架通常包…

力扣2298. 周末任务计数

一、来源 2298、周末任务计数 表: Tasks ------------------- | Column Name | Type | ------------------- | task_id | int | | assignee_id | int | | submit_date | date | ------------------- task_id 是该表的主键&#xff08;具有唯一值的列&#xff09;。 此…

Flink Lookup Join(维表 Join)

Lookup Join 定义&#xff08;支持 Batch\Streaming&#xff09; Lookup Join 其实就是维表 Join&#xff0c;比如拿离线数仓来说&#xff0c;常常会有用户画像&#xff0c;设备画像等数据&#xff0c;而对应到实时数仓场景中&#xff0c;这种实时获取外部缓存的 Join 就叫做维…

跨平台WPF框架Avalonia教程 十一

控件类型 如果您想创建自己的控件&#xff0c;Avalonia中有三个主要的控件类型。首先要做的是选择最适合您使用场景的控件类型。 用户控件(User Controls)​ UserControl是创建控件的最简单方法。这种类型的控件最适合特定于应用程序的“视图”或“页面”。UserControl的创建…

数造科技亮相第26届高交会并接受媒体采访,以数据智能赋能未来

11 月 14 日至 16 日&#xff0c;第二十六届中国国际高新技术成果交易会&#xff08;简称“高交会”&#xff09;在深圳成功举办。本届大会以“科技引领发展&#xff0c;产业融合聚变”为主题&#xff0c;汇聚了全球最新的科技成果&#xff0c;打造了一场科技界的盛大聚会。 在…

webpack配置

4-3vue-loader测试_哔哩哔哩_bilibili 一.新建文件夹vue_todo&#xff0c;vscode打开 二.ctrl打开终端&#xff0c;输入npm init -y&#xff0c;快速生成一个默认的package.json文件 之后左边出现项目初始化文件package.json 三.接下来需要webpack完成打包&#xff0c;所以安装…

学习笔记024——Ubuntu 安装 Redis遇到相关问题

目录 1、更新APT存储库缓存&#xff1a; 2、apt安装Redis&#xff1a; 3、如何查看检查 Redis版本&#xff1a; 4、配置文件相关设置&#xff1a; 5、重启服务&#xff0c;配置生效&#xff1a; 6、查看服务状态&#xff1a; 1、更新APT存储库缓存&#xff1a; sudo apt…