【并发编程】线程池多线程异步去分页调用其他服务接口获取海量数据

news/2025/1/9 12:47:19/

文章目录

  • 场景:
  • 解决方案

场景:

前段时间在做一个数据同步工具,其中一个服务的任务是调用A服务的接口,将数据库中指定数据请求过来,交给kafka去判断哪些数据是需要新增,哪些数据是需要修改的。

刚开始的设计思路是,,我创建多个服务同时去请求A服务的接口,每个服务都请求到全量数据,由于这些服务都注册在xxl-job上,而且采用的是分片广播的路由策略,那么每个服务就可以只处理请求到的所有数据中id%服务总数==分片索引的部分数据,然后交给kafka,由kafka决定这条数据应该放到哪个分区上。

解决方案

最近学了线程池后,回过头来思考,认为之前的方案还有很大的优化空间。

  • 1.当数据量很大时,一次性查询所有数据会导致数据库的负载过大,而使用分页查询,每次只查询部分数据,可以减轻数据库的负担,从而提高数据库的性能和响应速度,所以请求数据方每次分页查询少量数据,这样可以整体降低请求数据的时间。
  • 第一次优化.之前是每个服务都要把全量数据请求过来,假设全量数据1000w条,一个服务请求数据需要100s,我开5个服务,那请求数据的总时长就是500s。现在把1000w条数据均分给5个服务,那1个服务就只需要请求200w条数据,耗时20s,那所有服务的请求总时长就是100s。总体耗时缩小了5倍。上面说的分页查询就可以实现:页面大小假设10w(也就是将1000w/10w,逻辑上分成了100页),每个服务自己的分片索引作为页号,每次请求完,都给索引加上分片总数(例如:当前注册了五个服务,那分片总数=5,对于分片索引为1的服务来说,请求的页号为1,6,11,16,21。。。,对于分片索引为2的服务来说,请求的页号为2,7,12,17。。。,对于分片索引为3的服务来说,请求的页号为3,8,13,18,。。。。,对于分片索引为4的服务来说,请求的页号为4,9,14,19。。。。,对于分片索引为5的服务来说,请求的页号为5,10,15,20.。。)这样1000w条数据就均分到每个服务上了。对于每个服务都是单线程去请求数据,就可以将请求操作以及(页号+总服务数)的操作写在一个while循环里,一直请求数据,直到请求的数据为空时(也就是页号超过100了),退出while。
        //单线程情况下while(true){String body = HttpUtil.get(remoteURL+"?pageSize=100000&pageNum="+shardIndex);
//        logger.info("body:{}",body);//2.获取返回结果的messageJSONObject jsonObject = new JSONObject();
//        if (StrUtil.isNotBlank(body)) {jsonObject = JSONUtil.parseObj(body);
//            logger.info("name:{}",Thread.currentThread().getName());
//        }
//        logger.info("jsonObject:{}",jsonObject);//3.从body中获取dataList<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);if(CollectionUtil.isEmpty(tests)){break;}shardIndex+=shardTotal;}
  • 第二次优化: 了解了线程池后,还可以再优化。之前是一个服务单线程循环请求需要20s(假设),每次请求10w条,需要请求200w/10w,也就是20次,那一次请求就需要1s。如果使用线程池的话,那么耗时还会更小,因为当你将任务都交给线程池去执行时,多个线程会同时(并行)去请求各自页的数据,假如你只设置了4个线程,那这4个线程会同时发起请求获取数据,1s会完成4次请求,那分给服务的200w,5s就请求完了。那5个服务从总耗时500s,降到了总耗时5s*5=25s。
    这次优化,第一版代码(只展示了请求数据的代码,其他业务代码没有展示)
    一直向线程池里扔请求数据的任务,当某个任务请求到的数据是空的时候,意味着要请求的数据已经没了,那就结束循环,不再扔请求数据的任务。
    //线程共享变量static volatile boolean flag = true;@XxlJob(value = "fenpian")public void fenpian() {int shardIndex = XxlJobHelper.getShardIndex();
//        int shardTotal = XxlJobHelper.getShardTotal();//分片总数int shardTotal = 4;AtomicInteger pageNum = new AtomicInteger(shardIndex);//多线程情况下
//        List<CompletableFuture>completableFutureList=new ArrayList<>();while (flag){CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {String body = HttpUtil.get(remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal));JSONObject jsonObject = new JSONObject();jsonObject = JSONUtil.parseObj(body);List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);logger.info("tests的size:{}",tests.size());if(CollectionUtil.isEmpty(tests)){flag=false;}},executorService);completableFutureList.add(future);}CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);CompletableFuture.allOf(completableFutures).join();logger.info("任务结束");executorService.shutdown();

上面代码会有一个问题,就是while循环往线程池里扔任务,所有线程在执行时,会在请求数据那里”停留“一段时间,“停留期间”还会一直循环向线程池扔任务,当线程执行完某次请求得到空数据结束循环时,等待队列中还排着大堆任务等着去请求数据。

为了解决这个问题,我改用了for循环提交任务,提前根据请求数据总量、每次读取的条数,以及服务总数得到每个服务需要执行的任务数。
第二版代码

@XxlJob(value = "fenpian")public void fenpian() {int shardIndex = XxlJobHelper.getShardIndex()+1;int shardTotal = XxlJobHelper.getShardTotal();//分片总数
//        int shardTotal = 4;AtomicInteger pageNum = new AtomicInteger(shardIndex);//多线程情况下List<CompletableFuture>completableFutureList=new ArrayList<>();//总条数double total = 10000000;//读取的条数double pageSize=1000;double tasks = Math.ceil( total / (double) shardTotal / pageSize);logger.info("任务数{}",tasks);for(double i=0;i<tasks;i++){CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {String url = remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal);logger.info("url:{},threadName:{}",url,Thread.currentThread().getName());String body = HttpUtil.get(url);JSONObject jsonObject = new JSONObject();jsonObject = JSONUtil.parseObj(body);List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);logger.info("tests的size:{}",tests.size());},executorService);completableFutureList.add(future);}CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);CompletableFuture.allOf(completableFutures).join();logger.info("任务结束");

如有问题,请求指正(^^ゞ


http://www.ppmy.cn/news/1028922.html

相关文章

LeetCode150道面试经典题-- 有效的字母异位词(简单)

1.题目 给定两个字符串 s 和 t &#xff0c;编写一个函数来判断 t 是否是 s 的字母异位词。 注意&#xff1a;若 s 和 t 中每个字符出现的次数都相同&#xff0c;则称 s 和 t 互为字母异位词。 2.示例 s"adasd" t"daads" 返回true s"addad" t &q…

【java面向对象中static关键字】

提纲 static修饰成员变量static修饰成员变量的应用场景static修饰成员方法static修饰成员方法的应用场景static的注意事项static的应用知识&#xff1a;代码块static的应用知识&#xff1a;单例设计模式 static静态的意思&#xff0c;可以修饰成员变量&#xff0c;成员方法&a…

【数据结构】二叉树篇| 纲领思路02+刷题

博主简介&#xff1a;努力学习的22级计算机科学与技术本科生一枚&#x1f338;博主主页&#xff1a; 是瑶瑶子啦每日一言&#x1f33c;: 所谓自由&#xff0c;不是随心所欲&#xff0c;而是自我主宰。——康德 目录 一、前言二、刷题1、翻转二叉树 2、二叉树的层序遍历✨3、 二…

掌握Python的X篇_38_类的常见特殊方法让对象更好看

python中所有的特殊类方法&#xff0c;都是为了让类或者说对象更加好用。 文章目录 1. 让对象更好看_Python类中的特殊方法 1. 让对象更好看_Python类中的特殊方法 我们之前实现的类&#xff0c;实例化对象&#xff0c;打印的结果可读性不好。 In [3]: class Dog:...: de…

前端杂项-个人总结八股文的背诵方案

个人总结八股文的背诵方案 URL到显示网页的过程 浏览器解析URL&#xff0c;获取协议&#xff0c;主机名&#xff0c;端口号&#xff0c;路径等信息&#xff0c;并通过DNS查询将主机名转换为对应的IP地址浏览器与服务器建立TCP&#xff0c;进行三次握手。浏览器向服务器发送HT…

力扣刷题(C++)知识点

一&#xff0c;找到数组的中间位置 这个是错的&#xff0c;不能分开来 C vector<int>& nums 用法 创建一维数组vector&#xff1a; vector<int> nums; //不指定长度vector<int> nums(n); //指定长度为n c &#xff1c;numeric&#xff1e; accumul…

linux系列基本介绍

虽然我们常说Linux操作系统&#xff0c;这种叫法是不正确的&#xff0c;严格意义上讲&#xff0c;Linux并不是操作系统&#xff0c;而是属于操作系统的一个内核&#xff0c;inux内核提供了操作系统的核心功能&#xff0c;如进程管理、内存管理、文件系统等。 Linux有很多不同的…

电路综合原理与实践---T衰减与PI衰减的详细计算理论与设计仿真

电路综合原理与实践—T衰减与PI衰减的详细计算理论与设计仿真 最近要找工作在刷笔试题目&#xff0c;会刷到关于T衰减的理论计算问题&#xff0c;一直搞不明白怎么算的&#xff0c;搞明白之后给大家伙来分享一下。 基础理论可以参考&#xff1a;电阻衰减网络计算&#xff08;P…