一、背景
1.1 铺垫知识
涉及到阻塞式 IO、非阻塞式 IO、事件驱动等相关知识,可以查看我以往文章。
1.2 瓶颈解决
我之前使用 java 封装了一个 HTTP 反向代理的工具 meethigher/proxy-servlet: 基于 OkHttp 和 javax.servlet-api 实现的 HTTP 反向代理,主要是方便自己临时测试时使用的。
该工具存在一个问题,高并发的时候,整体响应就会滞后。原因是因为该工具采用 bio 实现的,即“一连接一线程”。
假如我这个工具,只开放了 200 个线程。此时有 201 个请求过来,顺序依次是,200 个慢请求 (10 秒响应),1 个快请求 (1 秒响应)。
实际出现的情况是这 200 个连接 (线程) 都被前面 200 个慢请求阻塞了,以至于后面的快请求一直等待,实际 20+1 秒后才会响应。** 而我希望的效果是即使我可支配的线程数不变,快请求仍然可以 1 秒响应。**
题外话,PostgreSQL 使用的是“一连接一进程”模型,MySQL 使用的是“一连接一线程”模型。
计算机任务有很多种分类,以上属于其中 IO 密集型任务。尽管 CPU 参与任务处理,但大部分时间在等待 IO 完成。这中间的时间,CPU 完全可以用来做更多的事。
我采用的做法是将 bio 切换成 nio,也就是 cpu 不用一直等待,而是去做别的事,当传输完成后,通知 cpu 过来善后。
对于 http 来说,成熟的框架有很多。
其中,我对比了下两者。
- WebFlux 对 Spring 的生态支持相当友好,可以说是强耦合,健壮的同时也牺牲了些便捷性。
- Vertx 就相当轻量、简单了。
于是我选择 Vertx 解决这个问题。
二、计算机任务分类
计算机任务可以根据其特性分为几种类型,主要包括:
- CPU 密集型
- IO 密集型
- 内存密集型
- 网络密集型
2.1 CPU 密集型
CPU 密集型也叫做计算密集型。
这类任务主要依赖 CPU 的处理能力,通常需要大量的计算和复杂的算法,因此 CPU 长时间处于繁忙状态。典型的例子包括视频编码、科学计算、数据分析等。
优化方式:并行处理,换用更高性能的 CPU。
32 线程的 CPU 同时刻只能跑 32 个线程。像我之前 32 核 64G 的 Win11 可以达到几十万个线程,那是因为其实有很多线程是空闲或者等待中的,CPU 来回轮转,间接达到了上万线程同时运行的效果,比如像下面的这种 IO 密集型任务。
2.2 IO 密集型
主要依赖于输入/输出操作,通常涉及大量的数据读写,如文件操作、网络请求等。
如果你正在上传/下载文件,性能主要取决于网络速度和磁盘写入速度,CPU 大部分时间处于空闲等待状态,等待数据传输完成或外部资源的响应。
该类任务,我们在编程时,可以使线程数远大于 CPU 线程数。
优化方式:减少 IO 操作次数、使用异步编程、换用更高性能硬盘。
2.3 内存密集型
这类任务对内存的使用量很大,例如处理大数据集、复杂的数据结构等。内存访问速度较快,但如果内存不足,可能会导致性能瓶颈。
优化方式:优化数据结构、减少内存占用、增加内存。
2.4 网络密集型
这类任务主要依赖网络带宽和延迟,通常涉及大量的数据在网络上传输,如 Web 服务器、大规模分布式系统等。
优化方式:优化网络协议、使用负载均衡、提高带宽等。
三、Vert.x 框架小记
3.1 介绍
Vert.x 是一个基于事件驱动的非阻塞框架,适合如下
这类 *IO 密集型的高并发 * 场景。
像这种非阻塞的框架,不只有 vertx,还有很多。
- Netty:很多非阻塞框架都是基于 Netty 实现。
- Spring WebFlux:类似于 Vertx,也是基于 Netty,与 Spring 框架强耦合。
- Play Framework:轻量级的反应式 Web 框架,适合高并发 Web 应用。
- Akka:基于 Actor 模型的高并发框架,适合分布式系统和容错系统。超重量级。
但是 vertx 各模块之间,没有强耦合,都可以灵活组装使用,这点就特别好,因此我选用 vertx。
在了解 vertx 之前,一定要先阅读下官方这些文档。
3.2 示例
以下示例均基于 Vert.x 4.5.10 版本。
所有代码示例放置在 meethigher/vertx-examples: learn to use vertx
3.2.1 基础知识
单机
vertx 主要有两种线程池
- WorkerPool:主要用于处理阻塞任务的工作线程池。默认
20
个。 - EventLoopPool:主要用于处理非阻塞的 IO 操作和定时器的事件循环线程池。默认
2*core
线程。
下面放置一个基础示例,包含阻塞逻辑、事件回调逻辑、定时任务注册与取消逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| private static void basic() {
Vertx tVertx = Vertx.vertx(); Vertx vertx = Vertx.vertx(new VertxOptions().setEventLoopPoolSize(5).setWorkerPoolSize(1));
vertx.executeBlocking(() -> { log.info("simulate blocking task execution time..."); TimeUnit.SECONDS.sleep(5); if (System.currentTimeMillis() % 2 == 0) { int i = 1 / 0; } return "Hello World"; }) .onComplete(re -> { if (re.succeeded()) { log.info("success, result:{}", re.result()); } else { log.error("failure, result:", re.cause()); } }) ; vertx.setTimer(5000, t -> { log.info("timer id:{}", t); }); vertx.setPeriodic(5000, t -> { log.info("periodic id:{}", t); if (System.currentTimeMillis() % 2 == 0) { vertx.cancelTimer(t); log.info("cancel timer:{}", t); } }); }
|
在 vertx 中,还有执行任务的单元 verticle,一个 vertx 实例可以部署多个 verticle。
注意,一个 verticle 由一个 eventloop 执行。如果直接在主线程执行,也是一个 eventloop 执行。一个 eventloop 即一个线程,若需要多线程执行,则需要部署多个 verticle 实例。
如果是 httpserver 部署多个 verticle 实例,每个实例都会注册端口,但是内部采用了端口复用,因此不会出现端口占用问题。可以简单理解成 vertx 是一个代理,负责分发请求,而 verticle 是实际的后端节点。
verticle 的分类有如下
标准 Verticle:使用 EventLoopPool 执行的 Verticle,这也是默认的 Verticle 类型。一个 Verticle 中的所有代码都会在一个事件循环 eventloop (底层对应线程) 中执行。
工作者 Verticle:使用 WorkerPool 执行的 Verticle,这类 Verticle 被设计用来执行阻塞任务,不过阻塞任务也可以不通过工作者 Verticle,直接 executeBlocking 也可。
vertx 支持部署任意语言编写的 verticle,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| private static void verticle() { Vertx vertx = Vertx.vertx(); vertx.deployVerticle(new AbstractVerticle() { @Override public void start() throws Exception { log.info("eventLoopVerticle start"); }
}).onFailure(e -> { log.error("error", e); }); vertx.deployVerticle(new AbstractVerticle() { @Override public void start() throws Exception { log.info("workerVerticle start"); } }, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)).onFailure(e -> { log.error("error", e); }); }
|
在 vertx 中,还有事件总线。这要求事件的发布、消费,都注册在同一个 vertx 实例或者集群中。
1 2 3 4 5 6 7 8
| private static void eventBus() { Vertx vertx = Vertx.vertx(); vertx.eventBus().consumer("test",t->{ Object body = t.body(); log.info("consumer: {}",body.toString()); }); vertx.eventBus().publish("test","hello world"); }
|
集群
vertx 构建集群需要依赖一些分布式服务。比如 hazelcast 或者 redis。下面给出示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-core</artifactId> <version>${vertx.version}</version> </dependency> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-web</artifactId> <version>${vertx.version}</version> </dependency> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-hazelcast</artifactId> <version>${vertx.version}</version> <exclusions> <exclusion> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> <version>${hazelcast.version}</version> </dependency>
|
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) throws Exception { Config config = new Config(); config.setClusterName("test"); HazelcastClusterManager clusterManager = new HazelcastClusterManager(config); Future<Vertx> vertxFuture = Vertx.builder() .with(new VertxOptions()) .withClusterManager(clusterManager) .buildClustered(); Vertx vertx = vertxFuture.toCompletionStage().toCompletableFuture().get(); vertx.eventBus().consumer("test", t -> { log.info(t.body().toString()); }); vertx.setTimer(20000, t -> { vertx.eventBus().publish("test", "现在时间是:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); }); }
|
当启动两个程序实例时,会发现接收到了两条消息。这说明事件总线发送的消息,已经被集群中所有节点消费了。

3.2.2 HttpServer
创建一个 HTTP 服务器,并且注册相应的接口。
结合如下代码理解三个路由
/test/test?test=xxx
:该接口在等待 10 秒后返回 xxx/halo/*
:该接口匹配 /halo/ 下面的目录及子目录,并返回 html 形式的 helloworld。- 这点与 SpringWeb 不同,SpringWeb 中
/*
表示当前目录,/**
表示当前目录及子目录。
空
:表示匹配其他所有路径,相当于 /*
,并返回 text 形式 helloworld。/static/test.html
:表示返回 D:/3Develop/www/test.html
的内容,若该内容不存在,则回转到 /*
路由。
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| public static void main(String[] args) { Vertx vertx = Vertx.vertx(); vertx.deployVerticle(new AbstractVerticle() { @Override public void start() throws Exception { int port = 4321; Router router = Router.router(vertx);
router.route("/test/test").order(1).handler(t -> { String testParam = t.request().getParam("test"); if (testParam == null) { t.response().setStatusCode(400) .end("missing 'test' query parameter"); } else { vertx.setTimer(10000, tt -> { t.response().putHeader("Content-Type", "text/plain") .end(testParam); });
} }); router.route("/halo/*").order(2).handler(t -> { t.response().setChunked(true); t.response().end("hello world " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "\n" + t.request().absoluteURI()); }); router.route().order(3).handler(t -> { t.response().end("Hello World"); });
StaticHandler staticHandler = StaticHandler.create("D:/3Develop/www") .setDirectoryListing(true) .setAlwaysAsyncFS(true) .setIndexPage("index.html"); router.route("/static/*").order(Integer.MIN_VALUE).handler(staticHandler);
vertx.createHttpServer(new HttpServerOptions() .setSsl(true) .setKeyCertOptions(new PemKeyCertOptions() .addCertPath("/usr/local/nginx/conf/cert/certificate.pem") .addKeyPath("/usr/local/nginx/conf/cert/private.key") ) ) .requestHandler(router) .listen(port).onComplete(re -> { if (re.succeeded()) { log.info("http server started on port {}", port); } else { log.error("http server failed to start", re.cause()); } }); } }); }
|
3.2.3 HttpClient
应用场景:
有一个接口,http://localhost:4321/test/test?test= 参数
,该接口 10 秒后会返回 参数
值。
我现在需要在 10 秒左右获取到 2000 次接口的请求响应。
如果使用传统的做法,那么就得需要 2000 个线程,但如果使用 vertx,只需要一个线程即可。
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| public static void main(String[] args) throws Exception {
Vertx vertx = Vertx.vertx(new VertxOptions().setEventLoopPoolSize(1).setWorkerPoolSize(1));
HttpClient httpClient = vertx.createHttpClient(new PoolOptions().setHttp2MaxSize(2000).setHttp1MaxSize(2000).setEventLoopSize(2000)); HttpClient httpsClient = vertx.createHttpClient(new HttpClientOptions() .setProtocolVersion(HttpVersion.HTTP_2).setUseAlpn(true) .setSsl(true).setTrustAll(true) .setConnectTimeout(60000), new PoolOptions().setHttp2MaxSize(2000).setHttp1MaxSize(2000).setEventLoopSize(2000));
vertx.deployVerticle(new AbstractVerticle() { @Override public void start() throws Exception { vertx.setPeriodic(5000, t -> { vertx.executeBlocking(() -> { ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); int i = threadGroup.activeCount(); Thread[] threads = new Thread[i]; threadGroup.enumerate(threads); List<String> list = new ArrayList<>(); for (Thread thread : threads) { list.add(thread.getName()); } try { log.info("calculating..."); TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { throw new RuntimeException(e); } return String.join(",", list); }).onComplete(r -> { if (r.succeeded()) { log.info("threads:\n{}", r.result()); } else { log.warn(r.cause().getMessage(), r.cause()); } });
}); } }); int total = 2000; CountDownLatch countDownLatch = new CountDownLatch(total); long start = System.currentTimeMillis(); for (int i = 0; i < total; i++) { int finalI = i; vertx.deployVerticle(new AbstractVerticle() { @Override public void start() throws Exception { httpClient.request(new RequestOptions().setMethod(HttpMethod.GET).setSsl(true).setHost("reqres.in").setPort(443).setURI("/api/users?page=" + finalI)) .onComplete(r -> { if (r.succeeded()) { HttpClientRequest request = r.result(); log.info("{} send request {}", this, request.absoluteURI()); request.putHeader("User-Agent", "I am Vertx"); request.send() .onComplete(r1 -> { if (r1.succeeded()) { HttpClientResponse result = r1.result(); log.info("{} received response with status code {}", this, result.statusCode()); result.body().onComplete(re -> { if (re.succeeded()) { Buffer result1 = re.result(); log.info("result: {}", result1); } else { log.warn("warn:", re.cause()); } }); } else { log.error("{} send failed: {}", this, r1.cause().getMessage(), r1.cause()); }
countDownLatch.countDown(); }); } else { log.error("request failed: {}", r.cause().getMessage(), r.cause()); countDownLatch.countDown(); } }); } }); } countDownLatch.await(); log.info("done {} ms", System.currentTimeMillis()- start); }
|
3.2.4 文件系统
示例代码
vertx 中的 FileSystem 提供了许多方法,并且都有阻塞版本和非阻塞版本的 api。如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void fileCopy() { Vertx vertx = Vertx.vertx(); FileSystem fs = vertx.fileSystem();
fs.copy("D:/Downloads/yjwj_2024-06-05-10-50.zip", "D:/Desktop/test.zip") .onComplete(re -> { if (re.succeeded()) { log.info("copy success"); } else { log.error("copy failed,", re.cause()); } });
fs.copyBlocking("D:/Downloads/yjwj_2024-06-05-10-50.zip", "D:/Desktop/test1.zip"); log.info("done"); }
|
针对 copy 方法,底层是使用的 java.nio.file.Files#copy
方法,这个拷贝起来特别快,因为他使用了 零拷贝
机制。
传统拷贝:硬盘 -> 系统缓冲区 -> 程序缓冲区 -> 系统缓冲区 -> 硬盘
零拷贝:硬盘 -> 系统缓冲区 -> 硬盘
vertx 还提供了 open 方法,实现异步拷贝文件。如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| fs.open(sourcePath, new OpenOptions(), ar -> { if (ar.succeeded()) { AsyncFile sourceFile = ar.result(); sourceFile.setReadBufferSize(bufferSize); sourceFile.pause(); fs.open(targetPath, new OpenOptions().setTruncateExisting(true), ar1 -> { if (ar1.succeeded()) { AsyncFile targetFile = ar1.result(); targetFile.drainHandler(v -> { sourceFile.resume(); }); sourceFile.handler(buffer -> { targetFile.write(buffer); if (targetFile.writeQueueFull()) { sourceFile.pause(); } }); sourceFile.endHandler(v -> { sourceFile.close(); targetFile.close(); }); sourceFile.resume(); } else { log.error("open targetPath failed", ar1.cause()); } }); } else { log.error("open sourcePath failed", ar.cause()); } });
|
但是相比传统流式拷贝,该做法会占用更多 jvm 内存、耗费更多时间。此处我也在 vertx 的论坛里问过佬了,对方的回答是在执行连续工作时,不建议使用 async 或者 reactive。

3.2.5 HTTP 反向代理
添加依赖
1 2 3 4 5
| <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-web-proxy</artifactId> <version>${vertx.version}</version> </dependency>
|
其实这个依赖提供的功能非常简陋,如果想要实现强大的 HTTP 反向代理,需要自己编写代理逻辑。如下给出示例代码
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| public static void main(String[] args) throws Exception { int port = 8080; Vertx vertx = Vertx.vertx(); HttpServer httpServer = vertx.createHttpServer(); Router router = Router.router(vertx); HttpClient httpClient = vertx.createHttpClient(); HttpClient httpsClient = vertx.createHttpClient(new HttpClientOptions().setSsl(true).setTrustAll(true));
HttpProxy httpProxy = HttpProxy.reverseProxy(httpClient); HttpProxy httpsProxy = HttpProxy.reverseProxy(httpsClient);
router.route("/halo/*").handler(ProxyHandler.create(httpProxy, 4321, "10.0.0.1")); router.route().handler(ProxyHandler.create(httpsProxy, 443, "meethigher.top"));
router.route("/api/*").order(Integer.MIN_VALUE).handler(ctx -> { HttpServerRequest request = ctx.request(); String uri = request.uri(); Route route = ctx.currentRoute(); String path;
boolean whole = false; if (whole) { if (route.getPath().endsWith("/")) { int length = route.getPath().length() - 1; path = uri.substring(length); } else { int length = route.getPath().length(); path = uri.substring(length); } } else { path = request.uri(); } System.out.println(path); httpsClient.request(HttpMethod.valueOf(request.method().name()), 443, "reqres.in", path) .onSuccess(r -> { r.headers().setAll(request.headers()); r.putHeader("Host", "reqres.in"); r.send() .onSuccess(r1 -> { ctx.response() .setStatusCode(r1.statusCode()) .headers().setAll(r1.headers()); r1.handler(data -> { ctx.response().write(data); }); r1.endHandler(v -> ctx.response().end());
}) .onFailure(e1 -> { ctx.response().setStatusCode(500).end(e1.getMessage()); }); }) .onFailure(e -> { ctx.response().setStatusCode(500).end("Internal Server Error"); }); });
httpServer.requestHandler(router).listen(port).onSuccess(t -> { log.info("http server started on port {}", port); });
TimeUnit.MINUTES.sleep(10); List<Route> routes = router.getRoutes(); for (Route next : routes) { String path = next.isExactPath()? next.getPath() : next.getPath() + "*"; if ("/api/*".equals(path)) { next.remove(); } } log.info("remove route /api/*"); List<Route> routes1 = router.getRoutes(); System.out.println(); }
|
3.2.6 TCP 反向代理
大家口中常说的反向代理,一般都是 HTTP 反向代理。
我简单实现了一个 TCP 反向代理,源码可查看 tcp 反向代理 top.meethigher.proxy.tcp.SimpleReverseTcpProxy。我是使用传统 bio 的形式,这种做法,反代 1 个 TCP 连接,需要使用 2 个线程,直到 TCP 连接释放。如果想要在有限的 CPU 上同时反代更多的 TCP 连接,这种做法是不可取的。而且这种做法有明显的缺陷,当 TCP 连接没有读写时,线程处于阻塞等待的状态,这段时间本可以做其他的事。
一个优化做法——事件驱动 (多路复用),通俗解释就是,线程等待的时间,安排他去做其他类型的事。
- 有可连接事件时,去连接
- 有可读事件,去读取
- 有可写事件,去写入
Vertx 就是基于事件驱动的,2 个线程即可实现超多连接的处理。代码如下
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private static void tcpReverse() { NetClient netClient = vertx.createNetClient(); NetServer netServer = vertx.createNetServer(); Handler<NetSocket> handler = sourceSocket -> { sourceSocket.pause(); netClient.connect(5432, "10.0.0.9").onSuccess(targetSocket -> { targetSocket.pipeTo(sourceSocket); sourceSocket.pipeTo(targetSocket); sourceSocket.resume(); }); }; Handler<AsyncResult<NetServer>> successHandler = r -> System.out.println("Server listening on port" + port); Handler<Throwable> throwableHandler = Throwable::printStackTrace;
netServer.connectHandler(handler).listen(port, host).onComplete(successHandler).onFailure(throwableHandler); }
|
3.2.7 TCP 数据传输问题
当 tcp 向 socket 传输数据时,如果对面消费不过来或者接收不过来,那么数据会存储在发送方的本地发送缓冲区(也称为发送队列,在 vertx 的 socket 中即 writeQueue)中。
若对方一直不消费,那么就会出现本地内存崩了的情况。因此在开发过程中,要利用好相关 api,如下。
在编程时,更建议使用 pipeTo,内部已经实现了如下操作。
这其实就是一种背压/反压(Backpressure)机制。当下游处理能力不足,来不及处理上游发送过来的数据时,下游会以某种方式通知或限制上游减速或暂停发送数据,这个机制就叫做“背压”。
1 2 3 4 5 6 7
| ws.drainHandler(v -> src.resume()); src.handler(item -> { ws.write(item); if (ws.writeQueueFull()) { src.pause(); } });
|
3.3 参考致谢