Reactive Spring实战 -- WebFlux使用教程

news/2024/11/15 1:54:09/

ebFlux是Spring 5提供的响应式Web应用框架。
它是完全非阻塞的,可以在Netty,Undertow和Servlet 3.1+等非阻塞服务器上运行。
本文主要介绍WebFlux的使用。

FluxWeb vs noFluxWeb

WebFlux是完全非阻塞的。
在FluxWeb前,我们可以使用DeferredResult和AsyncRestTemplate等方式实现非阻塞的Web通信。
我们先来比较一下这两者。

注意:关于同步阻塞与异步非阻塞的性能差异,本文不再阐述。
阻塞即浪费。我们通过异步实现非阻塞。只有存在阻塞时,异步才能提高性能。如果不存在阻塞,使用异步反而可能由于线程调度等开销导致性能下降。

下面例子模拟一种业务场景。
订单服务提供接口查找订单信息,同时,该接口实现还需要调用仓库服务查询仓库信息,商品服务查询商品信息,并过滤,取前5个商品数据。

OrderService提供如下方法

public void getOrderByRest(DeferredResult<Order> rs, long orderId) {// [1]Order order = mockOrder(orderId);// [2]ListenableFuture<ResponseEntity<User>> userLister = asyncRestTemplate.getForEntity("http://user-service/user/mock/" + 1, User.class);ListenableFuture<ResponseEntity<List<Goods>>> goodsLister =asyncRestTemplate.exchange("http://goods-service/goods/mock/list?ids=" + StringUtils.join(order.getGoodsIds(), ","),HttpMethod.GET,  null, new ParameterizedTypeReference<List<Goods>>(){});// [3]CompletableFuture<ResponseEntity<User>> userFuture = userLister.completable().exceptionally(err -> {logger.warn("get user err", err);return new ResponseEntity(new User(), HttpStatus.OK);});CompletableFuture<ResponseEntity<List<Goods>>> goodsFuture = goodsLister.completable().exceptionally(err -> {logger.warn("get goods err", err);return new ResponseEntity(new ArrayList<>(), HttpStatus.OK);});// [4]warehouseFuture.thenCombineAsync(goodsFuture, (warehouseRes, goodsRes)-> {order.setWarehouse(warehouseRes.getBody());List<Goods> goods = goodsRes.getBody().stream().filter(g -> g.getPrice() > 10).limit(5).collect(Collectors.toList());order.setGoods(goods);return order;}).whenCompleteAsync((o, err)-> {// [5]if(err != null) {logger.warn("err happen:", err);}rs.setResult(o);});
}
  1. 加载订单数据,这里mack了一个数据。
  2. 通过asyncRestTemplate获取仓库,产品信息,得到ListenableFuture。
  3. 设置ListenableFuture异常处理,避免因为某个请求报错导致接口失败。
  4. 合并仓库,产品请求结果,组装订单数据
  5. 通过DeferredResult设置接口返回数据。

可以看到,代码较繁琐,通过DeferredResult返回数据的方式也与我们同步接口通过方法返回值返回数据的方式大相径庭。

这里实际存在两处非阻塞

  1. 使用AsyncRestTemplate实现发送异步Http请求,也就是说通过其他线程调用仓库服务和产品服务,并返回CompletableFuture,所以不阻塞getOrderByRest方法线程。
  2. DeferredResult负责异步返回Http响应。

getOrderByRest方法中并不阻塞等待AsyncRestTemplate返回,而是直接返回,等到AsyncRestTemplate返回后通过回调函数设置DeferredResult的值将数据返回给Http,可对比以下阻塞等待的代码

ResponseEntity<Warehouse> warehouseRes = warehouseFuture.get();
ResponseEntity<List<Goods>> goodsRes = goodsFuture.get();
order.setWarehouse(warehouseRes.getBody());
order.setGoods(goodsRes.getBody());
return order;

下面我们使用WebFlux实现。
pom引入依赖

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>

服务启动类OrderServiceReactive

@EnableDiscoveryClient
@SpringBootApplication
public class OrderServiceReactive
{public static void main( String[] args ){new SpringApplicationBuilder(OrderServiceReactive.class).web(WebApplicationType.REACTIVE).run(args);}
}

WebApplicationType.REACTIVE启动WebFlux。

OrderController实现如下

@GetMapping("/{id}")
public Mono<Order> getById(@PathVariable long id) {return service.getOrder(id);
}

注意返回一个Mono数据,Mono与Flux是Spring Reactor提供的异步数据流。
WebFlux中通常使用Mono,Flux作为数据输入,输出值。
当接口返回Mono,Flux,Spring知道这是一个异步请求结果。
关于Spring Reactor,可参考理解Reactor的设计与实现

OrderService实现如下

public Mono<Order> getOrder(long orderId) {// [1]Mono<Order> orderMono = mockOrder(orderId);// [2]return orderMono.flatMap(o -> {// [3]Mono<User> userMono =  getMono("http://user-service/user/mock/" + o.getUserId(), User.class).onErrorReturn(new User());Flux<Goods> goodsFlux = getFlux("http://goods-service/goods/mock/list?ids=" +StringUtils.join(o.getGoodsIds(), ","), Goods.class).filter(g -> g.getPrice() > 10).take(5).onErrorReturn(new Goods());// [4]return userMono.zipWith(goodsFlux.collectList(), (u, gs) -> {o.setUser(u);o.setGoods(gs);return o;});});
}private <T> Mono<T> getMono(String url, Class<T> resType) {return webClient.get().uri(url).retrieve().bodyToMono(resType);
}// getFlux
  1. 加载订单数据,这里mock了一个Mono数据
  2. flatMap方法可以将Mono中的数据转化类型,这里转化后的结果还是Order。
  3. 获取仓库,产品数据。这里可以看到,对产品过滤,取前5个的操作可以直接添加到Flux<Goods>上。
  4. zipWith方法可以组合两个Mono,并返回新的Mono类型,这里组合仓库、产品数据,最后返回Mono<Order>。

可以看到,代码整洁不少,并且接口返回Mono<Order>,与我们在同步接口中直接数据的做法类似,不需要借助DeferredResult这样的工具类。
我们通过WebClient发起异步请求,WebClient返回Mono结果,虽然它并不是真正的数据(它是一个数据发布者,等请求数据返回后,它才把数据送过来),但我们可以通过操作符方法对他添加逻辑,如过滤,排序,组合,就好像同步操作时已经拿到数据那样。
而在AsyncRestTemplate,则所有的逻辑都要写到回调函数中。

WebFlux是完全非阻塞的。
Mono、Flux的组合函数非常有用。
上面方法中先获取订单数据,再同时获取仓库,产品数据,
如果接口参数同时传入了订单id,仓库id,产品id,我们也可以同时获取这三个数据,再组装起来

springboot - Reactive Spring实战 -- WebFlux使用教程 - binecy - SegmentFault 思否


http://www.ppmy.cn/news/731201.html

相关文章

0124 计算机网络体系结构

目录 1.计算机网络体系结构 1.1计算机网络概述 计算机网络的组成 计算机网络的功能 计算机网络的分类 计算机网络的性能指标 1.1部分习题 1.2计算机网络体系结构与参考模型 计算机网络分层结构 计算机网络协议、接口与服务 ISO/OSI参考模型和TCP/IP模型 OSI参考模型…

曙光服务器怎么进入bios_怎么进入bios,教您怎么进入bios

小伙伴们&#xff0c;小编知道你们最近在找进入bios的操作方法。所以小编我就偷偷的帮助小伙伴们寻找&#xff0c;经过我的一番搜索&#xff0c;终于找到了方法。那么现在呢我就来给你们说说进入bios的操作方法&#xff0c; 什么时候会需要进入bios呢&#xff1f;其实在我们要重…

GTC 2019:没有新架构,没有大核弹,黄仁勋打造了一个巨大的朋友圈

北京时间 3 月 19 日凌晨 5 点钟&#xff0c;在美国加州圣何塞的圣何塞大学活动中心&#xff0c;一年一度的英伟达 GTC&#xff08;GPU Technology Conference&#xff09;大会迎来了本年度最重要的 Keynote 环节&#xff0c;也就是英伟达创始人兼 CEO 黄仁勋的重磅演讲。这是英…

GTC 2019:没有新架构,没有大核弹,黄仁勋打造了一个巨大的朋友圈...

北京时间 3 月 19 日凌晨 5 点钟&#xff0c;在美国加州圣何塞的圣何塞大学活动中心&#xff0c;一年一度的英伟达 GTC&#xff08;GPU Technology Conference&#xff09;大会迎来了本年度最重要的 Keynote 环节&#xff0c;也就是英伟达创始人兼 CEO 黄仁勋的重磅演讲。这是英…

Linux运维架构:基础知识扫盲,使用CentOS,Linux目录结构,linux用户用户组,普通用户和管理员的区别,用户的删除

目录 0x00 基础知识扫盲 什么是公有云&#xff0c;私有云&#xff0c;混合云&#xff1f;云计算&#xff1f; 国内的几大云服务商&#xff1a; 世界服务器品牌排行榜&#xff1a; 服务器&#xff1a; 进程和线程&#xff1a; 内存&#xff1a; 硬盘&#xff1a; 0x01 …

oracle 清理资源池,资源池裸金属实施管理手册

概述 文档包括 “上线实施” —— 含布线规则、BIOS设置、部署流程&#xff0c;三个阶段针对资源池各机型的不同实施方法&#xff1b; “故障类型及处理办法” —— 含整个生命周期中会出现的 8 类问题、5 种异常状态&#xff0c;对应 28 种故障类型和处理办法 布线规则 千兆网…

服务器命名方式大全

服务器命名方式大全 Dell 第一位字母代表服务器类型&#xff1a;M代表模块&#xff0c;指刀片&#xff0c;而R则表示机架&#xff0c;T代表塔式&#xff1b; 所以R710就是机架式服务器&#xff0c;T110则是塔式服务器。 第二位的数字代表是几路服务器&#xff1a;9及以上表示…

SpringBoot运行中动态修改logback日志级别

SpringBoot运行中动态修改logback日志级别 思路&#xff1a;写一个api接口&#xff0c;通过api接口调用的方式动态修改logback的log日志打印级别 这里提供2个接口&#xff0c;分别是修改logback全局日志级别 &#xff0c;和单独修改某个package包的日志级别 package cn.demo…