Spring Boot分段处理List集合多线程批量插入数据

server/2024/11/15 0:30:16/

项目场景:

大数据量的List集合,需要把List集合中的数据批量插入数据库中。


解决方案:

拆分list集合后,然后使用多线程批量插入数据库

1.实体类

java">package com.test.entity;import lombok.Data;@Data
public class TestEntity {private String id;private String name;
}

2.Mapper

如果数据量不大,用foreach标签就足够了。如果数据量很大,建议使用batch模式。

java">package com.test.mapper;import java.util.List;import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;import com.test.entity.TestEntity;public interface TestMapper {/*** 1.用于使用batch模式,ExecutorType.BATCH开启批处理模式* 数据量很大,推荐这种方式*/@Insert("insert into test(id, name) "+ " values"+ " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")void testInsert(TestEntity testEntity);/*** 2.使用foreach标签,批量保存* 数据量少可以使用这种方式*/@Insert("insert into test(id, name) "+ " values"+ " <foreach collection='list' item='item' index='index' separator=','>"+ " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})"+ " </foreach>")void testBatchInsert(@Param("list") List<TestEntity> list);
}

3.spring容器注入线程池bean对象

java">package com.test.config;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configuration
@EnableAsync
public class ExecutorConfig {/*** 异步任务自定义线程池*/@Bean(name = "asyncServiceExecutor")public Executor asyncServiceExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//配置核心线程数executor.setCorePoolSize(50);//配置最大线程数executor.setMaxPoolSize(500);//配置队列大小executor.setQueueCapacity(300);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix("testExecutor-");// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//调用shutdown()方法时等待所有的任务完成后再关闭executor.setWaitForTasksToCompleteOnShutdown(true);//等待所有任务完成后的最大等待时间executor.setAwaitTerminationSeconds(60);return executor;}
}

4.创建异步线程业务类

java">package com.test.service;import java.util.List;
import java.util.concurrent.CountDownLatch;import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import com.test.entity.TestEntity;
import com.test.mapper.TestMapper;@Service
public class AsyncService {@Autowiredprivate SqlSessionFactory sqlSessionFactory;@Async("asyncServiceExecutor")public void executeAsync(List<String> logOutputResults, CountDownLatch countDownLatch) {try{//获取session,打开批处理,因为是多线程,所以每个线程都要开启一个事务SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);TestMapper mapper = session.getMapper(TestMapper.class);//异步线程要做的事情for (int i = 0; i < logOutputResults.size(); i++) {System.out.println(Thread.currentThread().getName() + "线程:" + logOutputResults.get(i));TestEntity test = new TestEntity();//test.set()//.............//批量保存mapper.testInsert(test);//每1000条提交一次防止内存溢出if(i%1000==0){session.flushStatements();}}//提交剩下未处理的事务session.flushStatements();}finally {countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放}}
}

5.拆分list调用异步的业务方法

java">package com.test.service;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;import javax.annotation.Resource;import org.springframework.stereotype.Service;@Service
public class TestService {@Resourceprivate AsyncService asyncService;public int testMultiThread() {List<String> logOutputResults = getTestData();//按线程数拆分后的listList<List<String>> lists = splitList(logOutputResults);CountDownLatch countDownLatch = new CountDownLatch(lists.size());for (List<String> listSub:lists) {asyncService.executeAsync(listSub, countDownLatch);}try {countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;// 这样就可以在下面拿到所有线程执行完的集合结果} catch (Exception e) {e.printStackTrace();}return logOutputResults.size();}public List<String> getTestData() {List<String> logOutputResults = new ArrayList<String>();for (int i = 0; i < 3000; i++) {logOutputResults.add("测试数据"+i);}return logOutputResults;}public List<List<String>> splitList(List<String> logOutputResults) {List<List<String>> results = new ArrayList<List<String>>();/*动态线程数方式*/// 每500条数据开启一条线程int threadSize = 500;// 总数据条数int dataSize = logOutputResults.size();// 线程数,动态生成int threadNum = dataSize / threadSize + 1;/*固定线程数方式// 线程数int threadNum = 6;// 总数据条数int dataSize = logOutputResults.size();// 每一条线程处理多少条数据int threadSize = dataSize / (threadNum - 1);*/// 定义标记,过滤threadNum为整数boolean special = dataSize % threadSize == 0;List<String> cutList = null;// 确定每条线程的数据for (int i = 0; i < threadNum; i++) {if (i == threadNum - 1) {if (special) {break;}cutList = logOutputResults.subList(threadSize * i, dataSize);} else {cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1));}results.add(cutList);}return results;}
}

5.Controller测试

java">@RestController
public class TestController {@Resourceprivate TestService testService;@RequestMapping(value = "/log", method = RequestMethod.GET)@ApiOperation(value = "测试")public String test() {testService.testMultiThread();return "success";}
}

总结:

注意这里执行插入的数据是无序的。

Java多线程分段处理List集合_java 每个list分配一个线程-CSDN博客 


http://www.ppmy.cn/server/10326.html

相关文章

Java 网络编程之TCP(一):基于BIO

环境&#xff1a; jdk 17 IntelliJ IDEA 2023.1.1 (Ultimate Edition) Windows 10 专业版 22H2 TCP&#xff1a;面向连接的&#xff0c;可靠的数据传送协议 Java中的TCP网络编程&#xff0c;其实就是基于常用的BIO和NIO来实现的&#xff0c;本文先讨论BIO&#xff1b; BIO…

java知识点---内部类与外部类

Java 中的内部类&#xff08;Inner Class&#xff09;是指定义在另一个类&#xff08;称为外部类或宿主类&#xff09;内部的类。内部类与外部类之间存在着特殊的关系和访问权限&#xff0c;这种设计允许更紧密的封装和更灵活的代码组织。下面详细介绍内部类的种类、特点、与外…

【GD32】_时钟架构及系统时钟频率配置

文章目录 一、有关时钟源二、系统时钟架构三、时钟树分析四、修改参数步骤1、设置外部晶振2、选择外部时钟源。3、 设置系统主频率大小4、修改PLL分频倍频系数 学习系统时钟架构和时钟树&#xff0c;验证及学习笔记如下&#xff0c;如有错误&#xff0c;欢迎指正。主要记录了总…

ElasticSearch实战之项目搜索高亮

文章目录 1. 前情配置2、数据操作2.1 操作API2.2 数据入库 3. 高亮搜索3.1 方法封装3.2 高亮搜索 1. 前情配置 为满足ElasticSearch可在项目中实现搜索高亮&#xff0c;我们需要先做一些前情配置 导入ElasticSearch依赖 <dependency><groupId>org.springframewor…

计算机网络-IS-IS链路状态数据库同步

在建立IS-IS邻接关系之后&#xff0c;路由器开始发送LSP报文进行链路状态数据库进行同步。 一、链路状态数据库同步 LSP&#xff08; Link State PDU&#xff0c;链路状态报文&#xff09; 用于交换链路状态信息。LSP分为两种&#xff1a;Level–1 LSP和Level–2 LSP。Level–1…

OpenCV轻松入门(九)——使用第三方库imgaug自定义数据增强器

安装命令&#xff1a;pip install imgaug 代码实现&#xff1a; import cv2 import random import matplotlib.pyplot as pltfrom imgaug import augmenters as iaa # 数据增强——缩放效果 def zoom_img(img):# 获取一个1-1.3倍的线性图像处理器&#xff0c;scale参数是缩放范…

在数字化转型过程中,企业的资产管理需要做出哪些调整?

在数字化转型过程中&#xff0c;企业的资产管理做出调整的常见于以下几个方面&#xff1a; 1、提高工作效率&#xff1a;数字化转型能够让员工在部门与部门之间的沟通更加顺畅&#xff0c;节省时间&#xff0c;提高效率。这要求企业在资产管理中采用数字化工具和流程&#xff…

Okhttp 403 Forbidden

android App 在使用okhttp下载全国中小企业股份转让系统的pdf文件时候,下载完成后使用MuPDF无法解析,提示文件损坏或者不是PDF文件,查看Okhttp的下载请求,发现报403 Forbidden错误: {protocol=http/1.1, code=403, message=Forbidden, url=https://www.neeq.com