一、异步请求
Spring MVC与Servlet异步请求 处理 有广泛的集成:
- controller 方法中的 DeferredResult 和 Callable 返回值为单个异步返回值提供了基本支持。
- controller 可以 流转(stream) 多个数值,包括 SSE 和 原始数据。
- controller 可以使用 reactive 客户端并返回 reactive 类型 来处理响应。
1. DeferredResult
一旦在Servlet容器中 启用异步请求处理功能,controller 方法可以用 DeferredResult
包裹任何支持的 controller 方法的返回值,如下例所示:
@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {DeferredResult<String> deferredResult = new DeferredResult<>();// Save the deferredResult somewhere..return deferredResult;
}// From some other thread...
deferredResult.setResult(result);
controller 可以从不同的线程异步地产生返回值—例如,响应外部事件(JMS消息)、计划任务(scheduled task)或其他事件。
2. Callable
一个控制器可以用 java.util.concurrent.Callable
来包装任何支持的返回值,正如下面的例子所示:
@PostMapping
public Callable<String> processUpload(final MultipartFile file) {return () -> "someView";
}
然后可以通过 配置的 TaskExecutor 运行给定的任务来获得返回值。
3. 处理
这里是对Servlet异步请求处理的一个非常简洁的概述:
-
ServletRequest
可以通过调用request.startAsync()
进入异步模式。这样做的主要效果是,Servlet(以及任何 filter)可以退出,但响应仍然是开放的,以便以后完成处理。 -
对
request.startAsync()
的调用返回AsyncContext
,你可以用它来进一步控制异步处理。例如,它提供了dispatch
方法,它类似于 Servlet API中的forward,只是它允许应用程序在 Servlet 容器线程上恢复请求处理。 -
ServletRequest
提供了对当前DispatcherType
的访问,你可以用它来区分处理初始请求、异步调度、转发和其他调度器类型。
DeferredResult
的处理工作如下:
-
控制器返回
DeferredResult
,并将其保存在一些可以访问的内存队列或列表中。 -
Spring MVC 调用
request.startAsync()
。 -
同时,
DispatcherServlet
和所有配置的 filter 退出请求处理线程,但响应仍然开放。 -
应用程序从某个线程设置
DeferredResult
,Spring MVC将请求调度回Servlet容器。 -
DispatcherServlet
被再次调用,并以异步产生的返回值恢复处理。
Callable
处理的工作方式如下:
-
控制器返回一个
Callable
。 -
Spring MVC 调用
request.startAsync()
,并将Callable
提交给TaskExecutor
,在一个单独的线程中进行处理。 -
同时,
DispatcherServlet
和所有的 filter 退出 Servlet 容器线程,但响应仍然开放。 -
最终,
Callable
产生了一个结果,Spring MVC将请求调度回Servlet容器以完成处理。 -
DispatcherServlet
被再次调用,并以来自Callable
的异步产生的返回值继续进行处理。
异常处理
当你使用 DeferredResult
时,你可以选择是否调用 setResult
或 setErrorResult
与一个异常。在这两种情况下,Spring MVC 都会将请求调度回Servlet容器以完成处理。然后,它被当作 controller 方法返回给定值或产生给定的异常来处理。然后,该异常将通过常规的异常处理机制(例如,调用 @ExceptionHandler
方法)。
当你使用 Callable
时,会发生类似的处理逻辑,主要区别在于结果是由 Callable
返回,还是由它引发异常
拦截
HandlerInterceptor 实例可以是 AsyncHandlerInterceptor 类型,以便在开始异步处理的初始请求上接收 afterConcurrentHandlingStarted 回调(而不是 postHandle 和 afterCompletion)。
HandlerInterceptor 的实现还可以注册一个 CallableProcessingInterceptor 或 DeferredResultProcessingInterceptor,以便更深入地与异步请求的生命周期相结合(例如,处理一个超时事件)
DeferredResult 提供 onTimeout(Runnable) 和 onCompletion(Runnable) 回调
异步 Spring MVC 与 WebFlux 的比较
Servlet API最初是为在 Filter-Servlet 链中进行单次传递而构建的。异步请求处理让应用程序退出Filter-Servlet链,但为进一步处理留下了 response。Spring MVC的异步支持是围绕这一机制建立的。当控制器返回一个 DeferredResult 时,Filter-Servlet链被退出,Servlet容器线程被释放。后来,当 DeferredResult 被设置,一个 ASYNC 调度(到相同的URL),在此期间,controller 再次被映射,但不是调用它,DeferredResult 值被使用(就像 controller 返回它一样),以恢复处理。
相比之下,Spring WebFlux 既没有建立在 Servlet API 上,也不需要这样的异步请求处理功能,因为它在设计上就是异步的。异步处理是建立在所有框架契约中的,并且通过请求处理的所有阶段得到内在支持。
从编程模型的角度来看,Spring MVC 和 Spring WebFlux 都支持异步和 响应式(Reactive)类型 作为 controller 方法的返回值。Spring MVC 甚至支持 stream,包括响应式背压。然而,对响应的单独写入仍然是阻塞的(并且在一个单独的线程上执行),这与 WebFlux 不同,WebFlux 依赖于非阻塞的I/O,并且不需要为每次写入增加一个线程。
另一个根本区别是,Spring MVC 不支持 controller 方法参数中的异步或响应式类型(例如,@RequestBody、@RequestPart 等),也没有明确支持异步和响应式类型作为 model attributes。Spring WebFlux 确实支持所有这些。
最后,从配置的角度来看,异步请求处理功能必须 在Servlet容器级别启用。
4、 HTTP Streaming
你可以使用 DeferredResult 和 Callable 来实现一个单一的异步返回值。如果你想产生多个异步值,并让这些值写入响应中,该怎么办?本节描述了如何做到这一点。
4.1.Objects
你可以使用 ResponseBodyEmitter 的返回值来产生一个对象流,其中每个对象都被 HttpMessageConverter 序列化并写入响应中,如下例所示:
@GetMapping("/events")
public ResponseBodyEmitter handle() {ResponseBodyEmitter emitter = new ResponseBodyEmitter();// Save the emitter somewhere..return emitter;
}// In some other thread
emitter.send("Hello once");// and again later on
emitter.send("Hello again");// and done at some point
emitter.complete();
你也可以使用 ResponseBodyEmitter 作为 ResponseEntity 中的 body,让你自定义响应的 status 和 header。
当 emitter 抛出一个 IOException 时(例如,如果远程客户端消失了),应用程序不负责清理连接,也不应该调用 emitter.complete 或 emitter.completeWithError。相反,servlet容器会自动发起一个 AsyncListener 错误通知,Spring MVC 在其中进行 completeWithError 调用。这个调用反过来又向应用程序执行最后的 ASYNC 调度,在此期间,Spring MVC 会调用配置的异常解析器并完成请求。
4.2.SSE
SseEmitter(ResponseBodyEmitter 的一个子类)提供了 Server-Sent Events 支持,从服务器发送的事件是按照W3C SSE规范格式化的。为了从 controller 中产生一个SSE流,返回 SseEmitter,如下面的例子所示:
@GetMapping(path="/events", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handle() {SseEmitter emitter = new SseEmitter();// Save the emitter somewhere..return emitter;
}// In some other thread
emitter.send("Hello once");// and again later on
emitter.send("Hello again");// and done at some point
emitter.complete();
虽然 SSE stream 是浏览器的主要选择,但请注意,Internet Explorer不支持 Server-Sent Events。可以考虑使用 Spring 的 WebSocket messaging 与 SockJS fallback transports(包括SSE),其目标是广泛的浏览器。
4.3.原始数据(Raw Data)
有时,绕过消息转换,直接流向响应的 OutputStream
是很有用的(例如,对于文件下载)。你可以使用 StreamingResponseBody
返回值类型来做到这一点,正如下面的例子所示:
@GetMapping("/download")
public StreamingResponseBody handle() {return new StreamingResponseBody() {@Overridepublic void writeTo(OutputStream outputStream) throws IOException {// write...}};
}
你可以在 ResponseEntity
中使用 StreamingResponseBody
作为 body,以定制响应的 status 和 header 信息。
5. 响应式(Reactive)类型
Spring MVC支持在 controller 中使用响应式客户端库。这包括来自 spring-webflux
的 WebClient
和其他,如Spring Data的响应式 data repository。在这种情况下,能够从 controller 方法中返回响应式类型是很方便的。
响应式返回值的处理方法如下:
-
一个单值 promise 被适配,类似于使用
DeferredResult
。例子包括Mono
(Reactor)或Single
(RxJava)。 -
一个具有流媒体类型(如
application/x-ndjson
或text/event-stream
)的多值流被适配,类似于使用ResponseBodyEmitter
或SseEmitter
。例子包括Flux
(Reactor)或Observable
(RxJava)。应用程序也可以返回Flux<ServerSentEvent>
或Observable<ServerSentEvent>
。
对于 stream 响应,支持响应式背压,但对响应的写入仍然是阻塞的,并通过 配置的 TaskExecutor 在一个单独的线程上运行,以避免阻塞上游(upstream)源(如从 WebClient 返回的 Flux)。默认情况下,SimpleAsyncTaskExecutor 被用于阻塞性写入,但这在负载(load)下并不合适。如果你打算用一个响应式的流,你应该使用 MVC配置 来配置一个任务执行器(task executor)。
6. 上下文(Context)的传播
通过 java.lang.ThreadLocal 来传播上下文是很常见的。这对于在同一线程上的处理来说是透明的,但对于跨多线程的异步处理来说需要额外的工作。 Micrometer 上下文传播 库简化了跨线程和跨上下文机制(如 ThreadLocal 值、Reactor context、GraphQL Java context和其他)的上下文传播。
如果classpath上存在 Micrometer Context Propagation,当一个控制器方法返回一个 响应式类型,如 Flux 或 Mono,所有的 ThreadLocal 值,对于有注册的 io.micrometer.ThreadLocalAccessor,将作为 key-value 对写入 Reactor Context,使用 ThreadLocalAccessor 分配的key。
对于其他异步处理场景,你可以直接使用 Context Propagation 库。比如说:
// Capture ThreadLocal values from the main thread ...
ContextSnapshot snapshot = ContextSnapshot.captureAll();// On a different thread: restore ThreadLocal values
try (ContextSnapshot.Scope scope = snapshot.setThreadLocals()) {// ...
}
7. Disconnects
当一个远程客户端消失时,Servlet API不提供任何通知。因此,在 stream 响应的时候,无论是通过 SseEmitter 还是 响应式类型,定期发送数据是很重要的,因为如果客户端已经断开了连接,写入就会失败。发送的形式可以是一个空的(只有comment的)SSE事件或其他任何数据,另一方必须解释为心跳而忽略。
8. 配置
异步请求处理功能必须在Servlet容器级别启用。MVC配置也为异步请求暴露了几个选项。
Servlet 容器
Filter 和 Servlet 的声明有一个 asyncSupported
标志,需要设置为 true
以启用异步请求处理。此外,Filter
映射(mappings)应被声明为处理 ASYNC
jakarta.servlet.DispatchType
。
在Java配置中,当你使用 AbstractAnnotationConfigDispatcherServletInitializer
来初始化Servlet容器时,这将自动完成。
在 web.xml
配置中,你可以在 DispatcherServlet
和 Filter
声明中添加 <async-supported>true</async-supported>
,在Filter映射(mappings)中添加 <dispatcher>ASYNC</dispatcher>
。
Spring MVC
MVC配置暴露了以下与异步请求处理有关的选项:
-
Java 配置:在
WebMvcConfigurer
上使用 configureAsyncSupport 回调。 -
XML 命名空间: 使用
<mvc:annotation-driven>
下的<async-support>
元素。
你可以配置以下内容:
-
异步请求的默认超时值,如果没有设置,则取决于底层Servlet容器。
-
AsyncTaskExecutor,用于 响应式(Reactive)类型 类型流时的阻塞写入,以及执行从 controller 方法返回的 Callable 实例。如果你使用响应式(Reactive)类型的流媒体或有返回 Callable 的 controller 方法,我们强烈建议你配置这个属性,因为在默认情况下,它是一个 SimpleAsyncTaskExecutor。
-
DeferredResultProcessingInterceptor 的实现和 CallableProcessingInterceptor 的实现。
注意,你也可以在 DeferredResult
、ResponseBodyEmitter
和 SseEmitter
上设置默认超时值。对于一个 Callable
,你可以使用 WebAsyncTask
来提供一个超时值。