假设我们开发了一个必须与其他HTTP服务来交互的服务。不幸的是,这些HTTP服务速度慢且是阻塞的。
它可能是一个非常慢的遗留HTTP服务或我们必须使用的一些阻塞 API。无论如何,我们无法控制它。在这里,我们将调用两个HTTP API。其中一个将阻塞2秒钟,另一个将阻塞5秒钟。
一旦两个响应都可用,我们还需要打印响应状态代码。如果我们以老的、非异步反应性方式执行此操作,我们将阻塞调用线程5秒钟。阻塞线程 5 秒效率不高,不是吗?现在就叫你如何使用 Vert.x 异步发送HTTP长阻塞请求来提高并发响应!
服务
我使用“httpstat.us”作为网络服务。这是一个简单的服务,用于生成不同的HTTP代码来测试Web客户端。可以提供额外的参数,在本例中为 sleep
,在规定的时间内阻塞 HTTP 请求。
我将使用“httpie”来测试这两种服务。
服务 1 将阻塞5秒钟,并返回状态代码为 200 的响应:
http://httpstat.us/200?sleep=5000
_____________________________________________HTTP/1.1 200 OK
Content-Length: 6
Content-Type: text/plain
Date: Tue, 08 Mar 2022 17:05:08 GMT
Request-Context: appId=cid-v1:1e93d241-20e4-4513-bbd7-f452a16a5d69
Server: Kestrel
Set-Cookie: ARRAffinity=e2c17206c539113795daf64bd958d003f2b29b9f62da53617beea05468875ba5;Path=/;HttpOnly;Domain=httpstat.us200 OK
服务 2 与前一个相同,只是它阻塞了2秒而不是5秒:
http://httpstat.us/200?sleep=2000
_____________________________________________HTTP/1.1 200 OK
Content-Length: 6
Content-Type: text/plain
Date: Tue, 08 Mar 2022 17:11:53 GMT
Request-Context: appId=cid-v1:1e93d241-20e4-4513-bbd7-f452a16a5d69
Server: Kestrel
Set-Cookie: ARRAffinity=e2c17206c539113795daf64bd958d003f2b29b9f62da53617beea05468875ba5;Path=/;HttpOnly;Domain=httpstat.us200 OK
Web 客户端
我们已经了解了服务。现在,让我们讨论 Web 客户端。现在,我将使用 Vert.x Web 客户端。它是一个异步的,易于使用的 HTTP
和 HTTP/2
客户端.
private static Future<Integer> service1(WebClient webClient) {return webClient.getAbs("http://httpstat.us/200?sleep=5000").send().onSuccess(response -> System.out.println(MessageFormat.format("[{0}] service 1: response received", Thread.currentThread().getName()))).compose(response -> Future.succeededFuture(response.statusCode()));}private static Future<Integer> service2(WebClient webClient) {return webClient.getAbs("http://httpstat.us/200?sleep=2000").send().onSuccess(response -> System.out.println(MessageFormat.format("[{0}] service 2 response received", Thread.currentThread().getName()))).compose(response -> Future.succeededFuture(response.statusCode()));}
这两种方法非常相似。它们将 WebClient
作为参数并发送返回 Future<Integer>
的 HTTP 请求。其中整数是 HTTP 响应代码。返回的 Future<Integer>
向我们保证结果是异步的。状态代码在稍后可用时将给出回调。
我们需要组合两个Future
。使用 Vert.x的 CompositeFuture
可以实现多个future的协调. 它支持并发组合(并行运行多个异步操作)和顺序组合(链式异步操作).
Future<Integer> service1Code = service1(webClient);Future<Integer> service2Code = service2(webClient);CompositeFuture.all(service1Code, service2Code).onSuccess(ar -> {printResult(ar);countDownLatch.countDown();});
合在一起
最后,我们可以将所有的点点滴滴放在一起,如下所示:
import java.text.MessageFormat;
import java.util.concurrent.CountDownLatch;import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.ext.web.client.WebClient;public class Services {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);// Vertx instance and web clientVertx vertx = Vertx.vertx();WebClient webClient = WebClient.create(vertx);Future<Integer> service1Code = service1(webClient);Future<Integer> service2Code = service2(webClient);CompositeFuture.all(service1Code, service2Code).onSuccess(ar -> {printResult(ar);countDownLatch.countDown();});vertx.setPeriodic(1000, l -> System.out.println("[" + Thread.currentThread().getName() + "] is released"));countDownLatch.await();vertx.close();}private static Future<Integer> service1(WebClient webClient) {return webClient.getAbs("http://httpstat.us/200?sleep=5000").send().onSuccess(response -> System.out.println(MessageFormat.format("[{0}] service 1: response received", Thread.currentThread().getName()))).compose(response -> Future.succeededFuture(response.statusCode()));}private static Future<Integer> service2(WebClient webClient) {return webClient.getAbs("http://httpstat.us/200?sleep=2000").send().onSuccess(response -> System.out.println(MessageFormat.format("[{0}] service 2 response received", Thread.currentThread().getName()))).compose(response -> Future.succeededFuture(response.statusCode()));}private static void printResult(CompositeFuture compositeFuture) {System.out.println(Thread.currentThread().getName() + " Result: service1:" + compositeFuture.resultAt(0) + " service2:" + compositeFuture.resultAt(1));}
}
下面是运行代码后打印在控制台上的结果。这两个请求都是从同一个 vertx
事件循环线程调度的。该程序还会每秒打印线程未被阻止的消息。最后,它将打印两个状态代码作为最终结果。如您所见,一切都发生在同一个线程上:
[vert.x-eventloop-thread-1] is released
[vert.x-eventloop-thread-1] is released
[vert.x-eventloop-thread-1] service 2 response received
[vert.x-eventloop-thread-1] is released
[vert.x-eventloop-thread-1] is released
[vert.x-eventloop-thread-1] is released
[vert.x-eventloop-thread-1] service 1: response received
[vert.x-eventloop-thread-1] Result: service1:200 service2:200
总结
这就是我目前所要讨论的全部内容。我希望这篇文章能帮助您现在对如何使用 Vert.x 异步发送长阻塞请求有了更好的理解。