Reactor 第十篇 定制一个生产的WebClient

news/2024/12/21 22:31:20/

1 为什么要用 WebClient

刚开始尝试使用 Spring WebFlux 的时候,很多人都会使用 Mono.fromFuture() 将异步请求转成 Mono 对象,或者 Mono.fromSupplier() 将请求转成 MOno 对象,这两种方式在响应式编程中都是不建议的,都会阻塞当前线程。

1.1 Mono.fromFuture() VS WebClient

Mono.fromFuture()方法和使用 WebClient 调用第三方接口之间存在以下区别:

  • 异步 vs. 非阻塞

Mono.fromFuture()方法适用于接收一个 java.util.concurrent.Future 对象,并将其转换为响应式的 Mono。这是一个阻塞操作,因为它会等待 Future 对象完成。而使用 WebClient 调用第三方接口是异步和非阻塞的,它不会直接阻塞应用程序的执行,而是使用事件驱动的方式处理响应。

可扩展性和灵活性:使用 WebClient 可以更灵活地进行配置和处理,例如设置超时时间、请求头、重试机制等。WebClient 还可以与许多其他 Spring WebFlux 组件集成,如 WebSockets、Server-Sent Events 等。而 Mono.fromFuture() 是适用于单个 Future 对象转化为 Mono 的情况,可扩展性较差。

  • 错误处理

WebClient 提供了更丰富的错误处理机制,可以通过 onStatus、onError 等方法来处理不同的 HTTP 状态码或异常。同时,WebClient 还提供了更灵活的重试和回退策略。Mono.fromFuture() 方法只能将 Future 对象的结果包装在 Mono 中,不提供特定的错误处理机制。

  • 阻塞操作

Mono.fromFuture() 会阻塞。当调用 Mono.fromFuture() 方法将 Future 转换为 Mono 时,它会等待 Future 对象的结果返回。在这个等待的过程中,Mono.fromFuture()方法会阻塞当前的线程。这意味着,如果 Future 的结果在运行过程中没有返回,则当前线程会一直阻塞,直到 Future 对象返回结果或者超时。因此,在使用 Mono.fromFuture() 时需要注意潜在的阻塞风险。另外,需要确保F uture 的任务在后台线程中执行,以免阻塞应用程序的主线程。

1.2 Mono.fromFuture VS Mono.fromSupplier

Mono.fromSupplier() 和 Mono.fromFuture() 都是用于将异步执行的操作转换为响应式的 Mono 对象,但它们的区别在于:

Mono.fromSupplier() 适用于一个提供者/生产者,可以用来表示某个操作的结果,该操作是一些纯计算并且没有阻塞的方法。也就是说,Mono.fromSupplier() 将其参数 (Supplier) 所提供的操作异步执行,并将其结果打包成一个 Mono 对象。

Mono.fromFuture() 适用于一个 java.util.concurrent.Future 对象,将其封装成 Mono 对象。这意味着调用 Mono.fromFuture() 方法将阻塞当前线程,直到异步操作完成返回一个 Future 对象。

因此,Mono.fromSupplier() 与 Mono.fromFuture() 的主要区别在于:

Mono.fromSupplier() 是一个非阻塞的操作,不会阻塞当前线程。这个方法用于执行计算型的任务,返回一个封装了计算结果的 Mono 对象。
Mono.fromFuture() 是阻塞操作,会阻塞当前线程,直到异步操作完毕并返回看,它适用于处理 java.util.concurrent.Future 对象。

需要注意的是,如果 Supplier 提供的操作是阻塞的,则 Mono.fromSupplier() 方法本身也会阻塞线程。但通常情况下,Supplier 提供的操作是纯计算型的,不会阻塞线程。

因此,可以使用 Mono.fromSupplier() 方法将一个纯计算型的操作转换为 Mono 对象,而将一个异步返回结果的操作转换为 Mono 对象时,可以使用 Mono.fromFuture() 方法。

2 定制化自己的 WebClient

2.1 初始化 WebClient

WebClient 支持建造者模式,使用 WebClient 建造者模式支持开发自己的个性化 WebClient,比如支持设置接口调用统一耗时、自定义底层 Http 客户端、调用链路、打印接口返回日志、监控接口耗时等等。

WebClient builder 支持以下方法

interface Builder {/*** 配置请求基础的url,如:baseUrl = "https://abc.go.com/v1";和 uriBuilderFactory 冲突,如果有 uriBuilderFactory ,则忽略 baseUrl*/Builder baseUrl(String baseUrl);/*** URI 请求的默认变量。也和 uriBuilderFactory 冲突,如果有 uriBuilderFactory ,则忽略 defaultUriVariables*/Builder defaultUriVariables(Map<String, ?> defaultUriVariables);/*** 提供一个预配置的UriBuilderFactory实例*/Builder uriBuilderFactory(UriBuilderFactory uriBuilderFactory);/*** 默认 header*/Builder defaultHeader(String header, String... values);/*** 默认cookie*/Builder defaultCookie(String cookie, String... values);/*** 提供一个 consumer 来定制每个请求*/Builder defaultRequest(Consumer<RequestHeadersSpec<?>> defaultRequest);/*** 添加一个filter,可以添加多个*/Builder filter(ExchangeFilterFunction filter);/*** 配置要使用的 ClientHttpConnector。这对于插入或自定义底层HTTP 客户端库(例如SSL)的选项非常有用。*/Builder clientConnector(ClientHttpConnector connector);/*** Configure the codecs for the {@code WebClient} in the* {@link #exchangeStrategies(ExchangeStrategies) underlying}* {@code ExchangeStrategies}.* @param configurer the configurer to apply* @since 5.1.13*/Builder codecs(Consumer<ClientCodecConfigurer> configurer);/*** 提供一个预先配置了ClientHttpConnector和ExchangeStrategies的ExchangeFunction。
这是对 clientConnector 的一种替代,并且有效地覆盖了它们。*/Builder exchangeFunction(ExchangeFunction exchangeFunction);/*** Builder the {@link WebClient} instance.*/WebClient build();// 其他方法}

2.2 日志打印及监控

  • 打印参数、url、返回
  • 参数和返回需要转成json
  • 需要打印正常返回日志和异常
  • 正常监控、异常监控、总监控以及响应时间
.doOnSuccess(response-> {log.info("get.success, url={}, response={}, param={}", url, response);
})
.doOnError(error-> {log.info("get.error, url={}", url, error);// 监控
})
.doFinally(res-> {//监控
})

2.3 返回处理

retrieve() // 声明如何提取响应。例如,提取一个ResponseEntity的状态,头部和身体:

.bodyToMono(clazz) 将返回body内容转成clazz对象,clazz 对象可以自己指定类型。如果碰到有问题的无法转化的,也可以先转成String,然后自己实现一个工具类,将String转成 class 对象。

2.3.1 get

public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {
long start = System.currentTimeMillis();
return webClient.get().uri(url).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(clazz).doOnSuccess(response-> {log.info("get.success, url={}, response={}, param={}", url, response);}).doOnError(error-> {log.info("get.param.error, url={}", url, error);}).onErrorReturn(defaultClass).doFinally(res-> {}).publishOn(customScheduler);
}

2.3.2 get param 请求

public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {
long start = System.currentTimeMillis();
URI uri = UriComponentsBuilder.fromUriString(url).queryParams(param).build().toUri();return webClient.get().uri(uri).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(clazz).doOnSuccess(response-> {log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));}).doOnError(error-> {log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);}).onErrorReturn(defaultClass).doFinally(res-> {// 监控 or 打印日志 or 耗时}).publishOn(customScheduler);
}

2.3.3 post json 请求

public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {
final long start = System.currentTimeMillis();
return webClient.post().uri(url).contentType(MediaType.APPLICATION_JSON).cookies(cookies -> cookies.setAll(parameter.getCookies())).body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType())).headers(headers -> headers.setAll(parameter.getHeaders())).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(clazz).doOnSuccess(response-> {log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());}).doOnError(error-> {log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error);}).onErrorReturn(defaultClass).doFinally(res-> {}).publishOn(customScheduler);}

2.3.4 post form Data 请求

public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {final long start = System.currentTimeMillis();return webClient.post().uri(url).contentType(MediaType.APPLICATION_FORM_URLENCODED).cookies(cookies -> cookies.setAll(parameter.getCookies())).body(BodyInserters.fromFormData(parameter.getMultiValueMapParam())).headers(headers -> headers.setAll(parameter.getMapHeaders())).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(clazz).doOnSuccess(response-> {log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));}).doOnError(error-> {log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);}).onErrorReturn(defaultClass).doFinally(res-> {}).publishOn(customScheduler);
}

2.4 异常处理

2.4.1 异常返回兜底

onErrorReturn 发现异常返回兜底数据

2.4.2 异常处理

状态码转成异常抛出

.onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))

监控异常

.doOnError(error -> {// log and monitor
})

3 完整的 WebClient


package com.geniu.reactor.webclient;import com.geniu.utils.JsonUtil;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.SslProvider;
import reactor.netty.tcp.TcpClient;import java.net.URI;
import java.time.Duration;
import java.util.function.Function;/*** @Author: prepared* @Date: 2023/8/15 11:05*/
@Slf4j
public class CustomerWebClient {public static final CustomerWebClient instance = new CustomerWebClient();/*** 限制并发数 100*/Scheduler customScheduler = Schedulers.newParallel("CustomScheduler", 100);private final WebClient webClient;private CustomerWebClient() {final SslContextBuilder sslBuilder = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE);final SslProvider ssl = SslProvider.builder().sslContext(sslBuilder).defaultConfiguration(SslProvider.DefaultConfigurationType.TCP).build();final int cpuCores = Runtime.getRuntime().availableProcessors();final int selectorCount = Math.max(cpuCores / 2, 4);final int workerCount = Math.max(cpuCores * 2, 8);final LoopResources pool = LoopResources.create("HCofSWC", selectorCount, workerCount, true);final Function<? super TcpClient, ? extends TcpClient> tcpMapper = tcp -> tcp.option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000).option(ChannelOption.SO_TIMEOUT, 10000).secure(ssl).runOn(pool);ConnectionProvider.Builder httpClientOfSWC = ConnectionProvider.builder("HttpClientOfSWC").maxConnections(100_000).pendingAcquireTimeout(Duration.ofSeconds(6));final ConnectionProvider connectionProvider = httpClientOfSWC.build();final HttpClient hc = HttpClient.create(connectionProvider).tcpConfiguration(tcpMapper);final Function<HttpClient, HttpClient> hcMapper = rhc -> rhc.compress(true);final WebClient.Builder wcb = WebClient.builder().clientConnector(new ReactorClientHttpConnector(hcMapper.apply(hc)));
//				.filter(new TraceRequestFilter()); 可以通过Filter 增加trace追踪this.webClient = wcb.build();}public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {long start = System.currentTimeMillis();return webClient.get().uri(url).accept(MediaType.APPLICATION_JSON).retrieve().onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode()))).bodyToMono(clazz).doOnSuccess(response-> {log.info("get.success, url={}, response={}, param={}", url, response);}).doOnError(error-> {log.info("get.param.error, url={}", url, error);}).onErrorReturn(defaultClass).doFinally(res-> {}).publishOn(customScheduler);}public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {long start = System.currentTimeMillis();URI uri = UriComponentsBuilder.fromUriString(url).queryParams(param).build().toUri();return webClient.get().uri(uri).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(clazz).doOnSuccess(response-> {log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));}).doOnError(error-> {log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);}).onErrorReturn(defaultClass).doFinally(res-> {}).publishOn(customScheduler);}public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {final long start = System.currentTimeMillis();return webClient.post().uri(url).contentType(MediaType.APPLICATION_JSON).cookies(cookies -> cookies.setAll(parameter.getCookies())).body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType())).headers(headers -> headers.setAll(parameter.getHeaders())).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(clazz).doOnSuccess(response-> {log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());}).doOnError(error-> {log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error);}).onErrorReturn(defaultClass).doFinally(res-> {}).publishOn(customScheduler);}public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {final long start = System.currentTimeMillis();return webClient.post().uri(url).contentType(MediaType.APPLICATION_FORM_URLENCODED).cookies(cookies -> cookies.setAll(parameter.getCookies())).body(BodyInserters.fromFormData(parameter.getMultiValueMapParam())).headers(headers -> headers.setAll(parameter.getMapHeaders())).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(clazz).doOnSuccess(response-> {log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));}).doOnError(error-> {log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);}).onErrorReturn(defaultClass).doFinally(res-> {}).publishOn(customScheduler);}}

http://www.ppmy.cn/news/1050663.html

相关文章

使用vscode编写插件-php语言

https://blog.csdn.net/qq_45701130/article/details/125206645 一、环境搭建 1、安装 Visual Studio Code 2、安装 Node.js 3、安装 Git 4、安装生产插件代码的工具&#xff1a;npm install -g yo generator-code 二、创建工程 yo code 选择项解释&#xff1a; 选择编写扩…

linux下c控制光标

最近在写进度条的需求&#xff0c;需要控制光标&#xff0c;用到下面的相关函数 // 清除屏幕#define CLEAR() printf("\033[2J")// 上移光标#define MOVEUP(x) printf("\033[%dA", (x))// 下移光标#define MOVEDOWN(x) printf("\033[%dB", (x))…

分类预测 | MATLAB实现NGO-DBN北方苍鹰优化深度置信网络多特征输入分类预测

分类预测 | MATLAB实现NGO-DBN北方苍鹰优化深度置信网络多特征输入分类预测 目录 分类预测 | MATLAB实现NGO-DBN北方苍鹰优化深度置信网络多特征输入分类预测效果一览基本介绍模型描述程序设计参考资料 效果一览 基本介绍 MATLAB实现NGO-DBN北方苍鹰优化深度置信网络多特征输入…

【jsthreeJS】入门three,并实现3D汽车展示厅,附带全码

首先放个最终效果图&#xff1a; 三维&#xff08;3D&#xff09;概念&#xff1a; 三维&#xff08;3D&#xff09;是一个描述物体在三个空间坐标轴上的位置和形态的概念。相比于二维&#xff08;2D&#xff09;只有长度和宽度的平面&#xff0c;三维增加了高度或深度这一维度…

机器学习使用场景

在计算机系统中&#xff0c;“经验”通常以“数据”的形式存在。因此&#xff0c;机器学习的主要内容&#xff0c;是关于在计算机上从数据中产生Function的算法&#xff0c;这个Function的作用是将将输入映射成合理的输出。例如给Function输入猫的图片&#xff0c;Function能够…

利用屏幕水印学习英语单词,无打扰英语单词学习

1、利用屏幕水印学习英语单词&#xff0c;不影响任何鼠标键盘操作&#xff0c;不影响工作 2、利用系统热键快速隐藏&#xff08;ALT1键 隐藏与显示&#xff09; 3、日积月累单词会有进步 4、软件下载地址: 免安装&#xff0c;代码未加密&#xff0c;安全的屏幕水印学习英语…

git开发常用命令

版本回退 soft&#xff1a;git reset --soft HEAD^ 将版本库回退一个版本&#xff0c;且这次提交的所有文件都移动到暂存区 mixed&#xff08;默认&#xff09;&#xff1a;git reset HEAD^ 将版本库回退一个版本&#xff0c;且这次提交的所有文件都移动到工作区&#xff0c;会…

【Java开发】 Mybatis-Plus 07:创建时间、更新时间自动添加

Mybatis-Plus 可以通过配置实体类的注解来自动添加创建时间和更新时间&#xff0c;这可以减轻一定的开发量。 1 在实体类中添加注解 public class User {TableId(type IdType.AUTO)private Long id;private String username;private String password;TableField(fill FieldF…