流处理 CompletableFuture

news/2025/2/3 14:02:00/

专栏系列文章地址:https://blog.csdn.net/qq_26437925/article/details/145290162

本文目标:

  1. 掌握:流处理 & CompletableFuture的使用

目录

    • 直接例子看同步和异步处理的区别
    • 普通流处理进行批量查询
      • 批量查询1:并行流操作
      • 批量查询2:使用`CompletableFuture`: 组合式异步编程
      • 更对的数据进行对比两种批处理
      • CompletableFuture 使用线程池定制
      • 使用`流`还是`CompletableFutures`?
    • CompletableFuture
      • 原理
      • 和future对比
      • CompletableFuture 组合使用例子
      • 异常处理

直接例子看同步和异步处理的区别

java">import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;class Shop {/*** 模拟1秒中延迟的方法*/public static void delay() {try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}private double calculatePrice(String product) {delay();Random random = new Random();return random.nextDouble() * product.charAt(0) + product.charAt(1);}private double calculatePriceException(String product) {delay();throw new RuntimeException(product + " not available");}/*** 同步API** @param product* @return*/public double getPrice(String product) {return calculatePrice(product);}/*** 异步API** @param product* @return*/public Future<Double> getPriceAsync(String product) {CompletableFuture<Double> futurePrice = new CompletableFuture<>();new Thread(() -> {double price = calculatePrice(product);futurePrice.complete(price);}).start();return futurePrice;}public void doSomethingElse() {System.out.println("doSomethingElse ......");}
}public class Main {public static void main(String[] args) {double price = 0;Shop shop = new Shop();// 同步获取:查询商店,试图取得商品的价格long start = System.nanoTime();price = shop.getPrice("apple");System.out.printf("Price is %.2f%n", price);long duration = (System.nanoTime() - start) / 1_000_000;System.out.println("sync get price returned after " + duration + " msecs");System.out.println("======================");start = System.nanoTime();// 异步任务:查询商店,试图取得商品的价格Future<Double> futurePrice = shop.getPriceAsync("apple");long invocationTime = ((System.nanoTime() - start) / 1_000_000);System.out.println("future Invocation returned after " + invocationTime + " msecs");// 执行更多任务,比如查询其他商店shop.doSomethingElse();try {// 通过该对象可以在将来的某个时刻取得的价格// 执行了这个操作后,要么获得Future中封装的值(如果异步任务已经完成),// 要么发生阻塞,直到该异步任务完成,期望的值能够访问。price = futurePrice.get();System.out.printf("get Price is %.2f%n", price);} catch (Exception e) {throw new RuntimeException(e);}long retrievalTime = ((System.nanoTime() - start) / 1_000_000);System.out.println("Price returned after " + retrievalTime + " msecs");}
}

异步可以让代码免受阻塞之苦;可以查询之后,后续获取,而期间可以干别的事情
在这里插入图片描述

普通流处理进行批量查询

如下查询一批商品的价格

java">import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;class Shop {private String name;public Shop(String name) {this.name = name;}public String getName() {return name;}/*** 模拟1秒中延迟的方法*/public static void delay() {try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}private double calculatePrice(String product) {delay();Random random = new Random();return random.nextDouble() * product.charAt(0) + product.charAt(1);}/*** 同步API** @param product* @return*/public double getPrice(String product) {return calculatePrice(product);}/*** 异步API** @param product* @return*/public Future<Double> getPriceAsync(String product) {CompletableFuture<Double> futurePrice = new CompletableFuture<>();new Thread(() -> {double price = calculatePrice(product);futurePrice.complete(price);}).start();return futurePrice;}public void doSomethingElse() {System.out.println("doSomethingElse ......");}
}public class Main {List<Shop> shops = Arrays.asList(new Shop("BestPrice"),new Shop("LetsSaveBig"),new Shop("MyFavoriteShop"),new Shop("BuyItAll"));public List<String> findPrices(String product) {return shops.stream().map(shop ->String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))).collect(Collectors.toList());}public static void main(String[] args) {Main mainTest = new Main();long start = System.nanoTime();List<String> shopPriceList = mainTest.findPrices("apple");shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );long duration = ((System.nanoTime() - start) / 1_000_000);System.out.println("Done in " + duration + " msecs");}
}

输出如下:耗时4秒多,因为一个商品查询就要1秒
在这里插入图片描述

批量查询1:并行流操作

java"> public List<String> parallelFindPrices(String product) {return shops.parallelStream().map(shop -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product))).collect(Collectors.toList());}public static void main(String[] args) {Main mainTest = new Main();long start = System.nanoTime();List<String> shopPriceList = mainTest.parallelFindPrices("apple");shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );long duration = ((System.nanoTime() - start) / 1_000_000);System.out.println("Done in " + duration + " msecs");}

输出如下:1秒多处理完成

批量查询2:使用CompletableFuture: 组合式异步编程

java">  private List<CompletableFuture<String>> parallelFindPricesAnyc(String product) {return shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product)))).collect(Collectors.toList());}public List<String> bestFindPrices(String product) {// 异步方式计算么个商店商品价格List<CompletableFuture<String>> priceFutures = parallelFindPricesAnyc(product);// 等待所有异步操作结束return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());}public static void main(String[] args) {Main mainTest = new Main();long start = System.nanoTime();List<String> shopPriceList = mainTest.bestFindPrices("apple");shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );long duration = ((System.nanoTime() - start) / 1_000_000);System.out.println("Done in " + duration + " msecs");}

输出如下:比流处理似乎慢点,但是肯定比同步处理快
在这里插入图片描述

更对的数据进行对比两种批处理

当有9个商品的时候,对比如下,看着2者差不多了
在这里插入图片描述

原因:

二者内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime. getRuntime().availableProcessors()的返回值(即CPU核心数)。

然而,CompletableFuture具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。让我们看看怎样利用这种配置上的灵活性带来实际应用程序性能上的提升。

CompletableFuture 使用线程池定制

java"> private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);return t;}});public List<CompletableFuture<String>> parallelFindPricesAsync(String product) {// supplyAsync工􏱣方法 指定线程池return shops.stream().map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product)), executor)).collect(Collectors.toList());}public List<String> bestFindPricesAsync(String product) {// 异步方式计算么个商店商品价格List<CompletableFuture<String>> priceFutures = parallelFindPricesAsync(product);// 等待所有异步操作结束return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());}public static void main(String[] args) {Main mainTest = new Main();long start = System.nanoTime();List<String> shopPriceList = mainTest.bestFindPricesAsync("apple");shopPriceList.forEach( shopPrice -> System.out.println(shopPrice) );long duration = ((System.nanoTime() - start) / 1_000_000);System.out.println("Done in " + duration + " msecs");}

同样处理9个商品现在是1秒多了,比之前默认的设置快多了
在这里插入图片描述

反问:创建一个配有线程池的执行器,线程池中线程的数目取决于你预计你的应用需要处理的负荷,但是你该如何选择合适的线程数目呢?

《Java 并发编程实战》: 如果线程池中的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:

线程池数目 = CPU核心数 * 期望的CPU利用率(介于0和1之间) *(1 + 等待时间和计算时间的比率)

不过实际也需要进行压测和系统观察。

使用还是CompletableFutures?

目前为止,对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作展开工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。

建议是:

对于计算密集型操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也是最高的(如果所有的线程都是计算密集型的),那就没有必要创建比处理器和数更多的线程。

如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture的灵活性更好。不使用并行流另一个原因是:处理流的流水线中如果发生I/O等待,流的延迟特效会让我们很难判断到底什么时候触发了等待。

CompletableFuture

原理

在这里插入图片描述

forkjoinpool可参考阅读:https://blog.csdn.net/qq_26437925/article/details/145417518

和future对比

在这里插入图片描述

CompletableFuture 组合使用例子

java">import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import java.util.stream.Stream;class Shop {private String name;public Shop(String name) {this.name = name;}public String getName() {return name;}/*** 模拟1秒中延迟的方法*/public static void delay() {try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}}private double calculatePrice(String productName) {delay();Random random = new Random();return random.nextDouble() * productName.charAt(0) + productName.charAt(1);}public String getPrice() {Random random = new Random();double price = calculatePrice(this.getName());Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];return String.format("%s:%.2f:%s", name, price, code);}
}/*** 折扣服务api*/
class Discount {public enum Code {NONE(0), SILVER(0), GOLD(10), PLATINUM(15), DIAMOND(20);private final int percentage;Code(int percentage) {this.percentage = percentage;}}public static String applyDiscount(Quote quote) {return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());}private static double apply(double price, Code code) {delay();return price * (100 - code.percentage) / 100;}/*** 模拟计算,查询数据库等耗时*/public static void delay() {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}
}/*** 商店返回消息实体,不可变对象模式 线程安全*/
final class Quote {private final String shopName;private final double price;private final Discount.Code discountCode;public Quote(String shopName, double price, Discount.Code discountCode) {this.shopName = shopName;this.price = price;this.discountCode = discountCode;}public static Quote parse(String s) {String[] split = s.split(":");String shopName = split[0];double price = Double.parseDouble(split[1]);Discount.Code discountCode = Discount.Code.valueOf(split[2]);return new Quote(shopName, price, discountCode);}public String getShopName() {return shopName;}public double getPrice() {return price;}public Discount.Code getDiscountCode() {return discountCode;}
}public class Main {List<Shop> shops = Arrays.asList(new Shop("BestPrice"),new Shop("LetsSaveBig"),new Shop("MyFavoriteShop"),new Shop("BuyItAll"),new Shop("five"),new Shop("six"),new Shop("seven"),new Shop("eight"),new Shop("nine"));private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);return t;}});// 同步处理public List<String> findprices(String product) {// 1. 取出商品的原始价格 -- 耗时1秒多// 2. 在Quote对象中对shop返回对字符串进行转换// 3. 联系Discount服务,为每个Quote申请折扣 -- 耗时1秒多return shops.stream().map(shop -> shop.getPrice()).map(Quote::parse).map(Discount::applyDiscount).collect(Collectors.toList());}// 异步组合处理public Stream<CompletableFuture<String>> findPricesStream() {return shops.stream()// 异步方式取得每个shop中指定产品的原始价格.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(), executor))//  在Quote对象中对shop返回对字符串进行转换.map(future -> future.thenApply(Quote::parse))// 另一个异步任务构建期望的Future,申请折扣 thenCompose 将多个future组合 一个一个执行.map(future -> future.thenCompose(quote ->CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));}public static void main(String[] args) {Main mainTest = new Main();long start = System.nanoTime();CompletableFuture[] futures = mainTest.findPricesStream().map(f -> f.thenAccept(s -> System.out.println(s + " (done in " +((System.nanoTime() - start) / 1_000_000) + " msecs)"))).toArray(size -> new CompletableFuture[size]);CompletableFuture.allOf(futures).join();System.out.println("All shops have now responded in "+ ((System.nanoTime() - start) / 1_000_000) + " msecs");}}

异常处理

CompletableFuture提供了非阻塞的方式来处理计算结果,无论是计算成功还是遇到异常。CompletableFuture 提供了多种方法来处理异常,包括使用 exceptionally、handle、whenComplete 等方法。

java"> static void testException1(){CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {// 模拟异常抛出throw new RuntimeException("Something went wrong");});completableFuture.exceptionally(ex -> {System.err.println("Exception caught: " + ex.getMessage());return "Default result"; // 返回一个默认结果});}static void testException2() {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {// 模拟异常抛出throw new RuntimeException("Something went wrong");});completableFuture.handle((result, ex) -> {if (ex != null) {System.err.println("Exception caught: " + ex.getMessage());return "Exception handled"; // 返回异常处理结果}else {return result; // 返回正常结果}});}static void testException3() {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {// 模拟异常抛出throw new RuntimeException("Something went wrong");});completableFuture.whenComplete((result, ex) -> {if (ex != null) {System.err.println("Exception caught: " + ex.getMessage());}else {System.out.println("Result: " + result); // 仅当没有异常时执行}});}public static void main(String[] args) {testException1();testException2();testException3();}

http://www.ppmy.cn/news/1568969.html

相关文章

Xposed-Hook

配置 Xposed 模块的 AndroidManifest.xml&#xff1a; <?xml version"1.0" encoding"utf-8"?> <manifest xmlns:android"http://schemas.android.com/apk/res/android"package"your.package.name"><applicationandr…

我的世界(Minecraft)计算器python源码

我的世界(Minecraft)计算器python源码 1.介绍 使用教程 博客&#xff1a;【Python】python实现我的世界(Minecraft)计算器视频&#xff1a;Python实现我的世界(Minecraft)计算器(附源码与教程) 2.源码 文件一 # CreateBigScreen.py (创建大屏幕并返回大屏幕坐标)from m…

javaweb实训:购物商城系统项目

包括各类需求文档&#xff0c;任务计划&#xff0c;ppt&#xff0c;项目源代码&#xff0c;数据库文件&#xff0c;包括网站前后台&#xff01;唯一缺憾是面向初学者的&#xff0c;没怎么用框架。购物商城系统项目 文件列表 112购物商城系统项目/(1)需求说明书/112购物商城系统…

高清种子资源获取指南 | ✈️@seedlinkbot

在如今的数字时代&#xff0c;高清影视、音乐、游戏等资源的获取方式不断丰富。对于追求高质量资源的用户而言&#xff0c;一个高效的资源分享平台至关重要。而 ✈️seedlinkbot 正是这样一个便捷的资源获取工具&#xff0c;为用户提供高质量的种子资源索引和下载信息。 1. ✈️…

单细胞分析基础-第一节 数据质控、降维聚类

scRNA_pipeline\1.Seurat 生物技能树 可进官网查询 添加链接描述 分析流程 准备:R包安装 options("repos"="https://mirrors.ustc.edu.cn/CRAN/") if(!require("BiocManager")) install.packages("BiocManager",update = F,ask =…

TCP/IP 协议:互联网通信的基石

TCP/IP 协议:互联网通信的基石 引言 TCP/IP协议,全称为传输控制协议/互联网协议,是互联网上应用最为广泛的通信协议。它定义了数据如何在网络上传输,是构建现代互联网的基础。本文将深入探讨TCP/IP协议的原理、结构、应用以及其在互联网通信中的重要性。 TCP/IP 协议概述…

DRM系列六:Drm之KMS

KMS&#xff08;Kernel Mode Setting&#xff09;是负责显示输出的核心组件&#xff0c;它处理与plane、crtc、encoder和connector相关的各项任务。简单来说&#xff0c;KMS就是结构体drm_mode_config、drm_mode_object和组件&#xff08;object&#xff09;的结合。 KMSdrm_m…

高阶C语言|深入理解字符串函数和内存函数

文章目录 前言1.求字符串长度1.1 字符串长度函数&#xff1a;strlen模拟实现 2.长度不受限制的字符串函数2.1 字符串拷贝函数&#xff1a;strcpy模拟实现 2.2 字符串连接函数&#xff1a;strcat模拟实现 2.3 字符串比较函数&#xff1a;strcmp模拟实现 3.长度受限制的字符串函数…