SpringBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表

news/2025/1/3 5:07:32/

SpringBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表

 更多优秀文章,请扫码关注个人微信公众号或搜索“程序猿小杨”添加。

一、背景

        因业务发展需要,需要对接kafka,快速批量接收消息日志,避免消息日志累积过多,必须做到数据处理后,动态插入到库表(相同表结构,不同表名)下,并且还要支持批量事务提交,实现消息快速消费。(注意:源码文章最后有获取方式)

二、核心代码

2.1、开启批量、并发消费

kafka:bootstrap-servers: 10.1.*.*:9092     #服务器的ip及端口,可以写多个,服务器之间用“:”间隔producer: #生产者配置 key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: #消费者配置#指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名group-id: myGroup                 #设置消费者的组id default:Groupenable-auto-commit: true  #设置自动提交offsetauto-commit-interval: 2000  #默认值为5000key-deserializer: org.apache.kafka.common.serialization.StringDeserializer#值的反序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializerauto-offset-reset: latestmax-poll-records: 2000  #批量一次最大拉取数据量 默认500listener:# poll-timeout: 1000type: batch  # 开启批量消费concurrency: 3  #指定listener 容器中的线程数,用于提高并发量properties:session:timeout:ms: 120000  #默认10000max:poll:interval:ms: 600000  #默认300000(5分钟)

       说明:type: batch  # 开启批量消费, max-poll-records: 2000,批量消费每次最多消费记录数。这里设置 max-poll-records是2000,并不是说如果没有达到2000条消息,我们就一直等待。而是说一次poll最多返回的记录数为2000。concurrency: 3  #指定listener 容器中的线程数,用于提高并发量。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。例如:设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,还有1条线程分配到1个partition。

2.2、多线程异步配置

    具体配置参加前面文章:SpringBoot使用@Async实现多线程异步

    注意:在启动类上需要加上注解@EnableAsync,开启异步。

2.3、redis相关配置

1、yml相关配置:

spring:redis:# 地址host: 127.0.0.1# 端口,默认为6379port: 6379# 密码# 连接超时时间timeout: 10slettuce:pool:# 连接池中的最小空闲连接min-idle: 0# 连接池中的最大空闲连接max-idle: 8# 连接池的最大数据库连接数max-active: 8# #连接池最大阻塞等待时间(使用负值表示没有限制)max-wait: -1ms

2、RedisConfig配置

package com.wonders.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** 〈自定义redis序列化方式〉* @author yangyalin* @create 2018/11/1* @since 1.0.0*/
@Configuration
public class RedisConfig extends CachingConfigurerSupport {/*** @Author yangyalin* @Description redisTemplate序列化使用的jdkSerializeable, 存储二进制字节码(默认), 所以自定义序列化类* 用于存储可视化内容* @Date 15:07 2018/11/1* @Param [redisConnectionFactory]* @return org.springframework.data.redis.core.RedisTemplate<java.lang.Object,java.lang.Object>**/@Beanpublic RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){RedisTemplate<Object,Object> redisTemplate=new RedisTemplate();redisTemplate.setConnectionFactory(redisConnectionFactory);//使用jackson2JsonRedisSerializer替换默认序列化Jackson2JsonRedisSerializer jackson2JsonRedisSerializer=new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper=new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(objectMapper);//设置key和value的序列化规则redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);redisTemplate.afterPropertiesSet();return redisTemplate;}
}

2.4、动态表名

    <!--插入到kafka日志临时表中--><insert id="insertMsgInfoTemp" parameterType="com.wonders.entity.KafkaMsgConfig">INSERT INTO ${logTableName}("EVN_LOG_ID", "TABLE_NAME", "OPERATION", "PK_VALUE1", "PK_VALUE2","PK_VALUE3", "PK_VALUE4", "PK_VALUE5", "TRANS_FLAG", "PKS", "BASE_CODE", "PLA_BRANCH_CODE","CREATE_TIME","MSG_PRODUCE_TIME")VALUES (#{id,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{operation,jdbcType=VARCHAR},#{pk1,jdbcType=VARCHAR}, #{pk2,jdbcType=VARCHAR},#{pk3,jdbcType=VARCHAR},#{pk4,jdbcType=VARCHAR},#{pk5,jdbcType=VARCHAR}, 'Y',#{pks,jdbcType=VARCHAR}, #{baseCode,jdbcType=VARCHAR},#{plaBranchCode,jdbcType=VARCHAR},sysdate,#{msgProduceTime,jdbcType=VARCHAR})</insert>

    说明:1、#{} :会根据参数的类型进行处理,当传入String类型,则会为参数加上双引号(占位符);2、${} :将参数取出不做任何处理,直接放入语句中,就是简单的字符串替换(替换符)。

2.5、sql批量提交

public void batchInsert(List<KafkaMsgInfo> kafkaMsgInfoList) throws Exception{//如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交// 创建session实列SqlSessionFactory sqlSessionFactory = ApplicationContextUtils.getBean("sqlSessionFactory");// 开启批量处理模式 BATCH 、关闭自动提交事务 falseSqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false);KafkaMsgConfigMapper KafkaMsgMapper = sqlSession.getMapper(KafkaMsgConfigMapper.class);int BATCH = 1000;for (int i = 0,size=kafkaMsgInfoList.size(); i < size; i++) {//循环插入 + 开启批处理模式KafkaMsgMapper.insertKafkaMsgInfo(kafkaMsgInfoList.get(i));if (i != 0 && i % BATCH == 0) {sqlSession .commit();}}// 一次性提交事务sqlSession.commit();// 关闭资源sqlSession.close();}
2.6、业务代码
 @KafkaListener(topics = {"${mykafka.topics:mytopic}"})public void myMQConsumer(List<String> msgList){log.info("接收到的消息条数size:"+msgList.size());//计算程序耗时时间StopWatch stopWatch = new StopWatch();// 开始计时stopWatch.start();this.getKafkaMsgAndDel(msgList);  //2、接收kafka日志并解析stopWatch.stop();log.info("本次任务耗时(秒):" + stopWatch.getLastTaskTimeMillis()/1000 + "s");}

三、测试结果

序号kafka数量(万条)消耗(秒)
113
21013
3100120

 

更多详细资料,请关注个人微信公众号或搜索“程序猿小杨”添加。

回复:源码,可以获取该项目对应的源码及表结构,开箱即可使用。

推荐文章:

    1、SpringBoot使用@Async实现多线程异步;

    2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;

    3、SpringBoot用线程池ThreadPoolExecutor处理百万级数据。


http://www.ppmy.cn/news/466476.html

相关文章

操作系统的基本概念

目录 一.操作系统入门概述 二.操作系统特征 2.1并发 2.2共享 2.2.1互斥共享 2.2.2同时访问 2.3虚拟 2.4异步 3.操作系统应该提供的功能 3.1操作系统作为计算机系统资源的管理者 3.1.1处理机管理 3.1.2存储器管理 3.1.3文件管理 3.1.4设备管理 3.2操作系统作为用…

如何白嫖CSDN会员?

目录 一、白嫖攻略 二、如何免费领会员 三、插件重点功能介绍 一、白嫖攻略 本次福利活动是CSDN官方助手发起&#xff0c;这是一款专为程序猿设计的浏览器神器&#xff0c;主要通过万能工具和一键搜索等&#xff0c;提升工作效率。现在体验得永久免费去站内广告特权。 本次更…

Python制作动态桌面壁纸程序-摆脱付费-Mili_Wallpaper

目录 使用效果 部分源代码 开源 想必大家在使用动态壁纸程序的时会遇见一下情况。 实在是太难受了吧&#xff01; 这么好看的壁纸&#xff01;我想白嫖&#xff01;怎么办呢&#xff1f;作者也是遇见了和你们一样的情况。于是我去查看了很多资料。 找到了桌面壁纸原理&#x…

10大免费的白嫖网站

作者&#xff1a;pk哥 来源&#xff1a;Python知识圈 本次给大家分享下常用的 10个白嫖网站。 万能命令 https://wanneng.run/cn/在你浏览任意网页时&#xff0c;在网址前面输入这个万能命令 wn.run/ 就会展示出用于该网页的各种附加在线工具&#xff0c;方便快捷&#xff0c;一…

PPT如何在投影不显示放映备注,在电脑上可以显示备注

话不多说&#xff0c;直接切入正题&#xff1a; 1、使用快捷键WindowsP 选择扩展模式 2、打开PPT软件&#xff0c;工具栏窗口选择‘放映’&#xff0c;找到 ‘放映设置’ 3、放映设置勾选这几个选项 4、放映窗口找到 放映到 这个选项 选择 显示器2 【注意选择这个前提是要…

ppt上显示无法显示图片计算机可能,打不开电脑中的ppt文件并提示访问出错的解决方法...

‍ 有一位用户在网上下载了一些ppt文件&#xff0c;但当打开这些ppt文件时&#xff0c;却弹出一个窗口显示内容有问题&#xff0c;可尝试修复此演示文稿。但是当点击修复后又出现了访问出错的提示。对于以上出现的问题&#xff0c;导致无法打开电脑中的ppt文件&#xff0c;我们…

计算机如何打开office,win10电脑没有office软件如何打开ppt文档

‍ ‍  一些用户在网上下载ppt文档&#xff0c;但是自己win10 64位系统电脑并没有安装office办公软件&#xff0c;所以一直打不开ppt&#xff0c;怎么办呢&#xff1f;针对win10电脑没有office软件却打不开ppt文档的问题&#xff0c;小编这里告诉大家具体的解决方法。‍ 解决…

怎样用计算机做ppt,利用电脑如何做PPT 电脑做ppt详细步骤

1、首先&#xff0c;我们打开PPT&#xff0c;新建一些不同版式的界面。至少选择三种是必备的&#xff0c;【标题页】、【内容页】、【致谢页】即空白页。系统模板非常丰富&#xff0c;可以根据需要灵活选用&#xff1a;选择“文件”→“新建”&#xff0c;在打开的任务窗格中可…