一、背景 1.1 铺垫知识 涉及到阻塞式IO、非阻塞式IO、事件驱动等相关知识,可以查看我以往文章。
1.2 瓶颈解决 我之前使用java封装了一个HTTP反向代理的工具http-proxy-boot ,主要是方便自己临时测试时使用的。
该工具存在一个问题,高并发的时候,整体响应就会滞后。原因是因为该工具采用bio实现的,即“一连接一线程”。
假如我这个工具,只开放了200个连接。此时有201个请求过来,顺序依次是,200个慢请求(10秒响应),1个快请求(1秒响应)。
实际出现的情况是这200个连接都被前面200个慢请求阻塞了,以至于后面的快请求一直等待,实际20+1秒后才会响应。而我希望的效果是即使我可支配的线程数不变,快请求仍然可以1秒响应。
题外话,PostgreSQL使用的是“一连接一进程”模型,MySQL使用的是“一连接一线程”模型。
计算机任务有很多种分类,以上属于其中IO密集型任务。尽管CPU参与任务处理,但大部分时间在等待IO完成。这中间的时间,CPU完全可以用来做更多的事。
我采用的做法是将bio切换成nio,也就是cpu不用一直等待,而是去做别的事,当传输完成后,通知cpu过来善后。
对于http来说,成熟的框架有很多。
其中,我对比了下两者。
WebFlux对Spring的生态支持相当友好,可以说是强耦合,健壮的同时也牺牲了些便捷性。 Vertx就相当轻量、简单了,于是我选择Vertx解决这个问题。 二、计算机任务分类 计算机任务可以根据其特性分为几种类型,主要包括:
CPU密集型 IO密集型 内存密集型 网络密集型 每种类型的任务都有不同的性能瓶颈,了解它们的特点可以帮助我们在设计和优化系统时做出更合适的选择。
2.1 CPU密集型 CPU密集型也叫做计算机密集型。
这类任务主要依赖CPU的处理能力,通常需要大量的计算和复杂的算法,因此CPU长时间处于繁忙状态。典型的例子包括视频编码、科学计算、数据分析等。
优化方式:并行处理,换用更高性能的CPU。
32线程的CPU同时刻只能跑32个线程。像操作系统里面会有很多线程,几十万个,那是因为其实有很多线程是空闲或者等待中的,CPU来回轮转,间接达到了上万线程同时运行的效果,比如像下面的这种IO密集型任务。
2.2 IO密集型 主要依赖于输入/输出操作,通常涉及大量的数据读写,如文件操作、网络请求等。
如果你正在上传/下载文件,性能主要取决于网络速度和磁盘写入速度,CPU大部分时间处于空闲等待状态,等待数据传输完成或外部资源的响应。
优化方式:减少IO操作次数、使用异步编程、换用更高性能硬盘。
2.3 内存密集型 这类任务对内存的使用量很大,例如处理大数据集、复杂的数据结构等。内存访问速度较快,但如果内存不足,可能会导致性能瓶颈。
优化方式:优化数据结构、减少内存占用、增加内存。
2.4 网络密集型 这类任务主要依赖网络带宽和延迟,通常涉及大量的数据在网络上传输,如Web服务器、大规模分布式系统等。
优化方式:优化网络协议、使用负载均衡、提高带宽等。
三、Vert.x 框架小记 3.1 介绍 Vert.x 是一个基于事件驱动的非阻塞框架,适合如下
这类IO密集型的高并发 场景。
像这种非阻塞的框架,不只有vertx,还有很多。
Netty:很多非阻塞框架都是基于Netty实现。 Spring WebFlux:类似于Vertx,并且也是基于Netty,与Spring框架强耦合。 Play Framework:轻量级的反应式 Web 框架,适合高并发 Web 应用。 Akka:基于 Actor 模型的高并发框架,适合分布式系统和容错系统。超重量级。 但是vertx各模块之间,没有强耦合,都可以灵活组装使用,这点就特别好,因此我选用vertx。
在了解vertx之前,一定要先阅读下官方这些文档。
3.2 示例 以下示例均基于Vert.x 4.5.10版本。
所有代码示例放置在meethigher/vertx-examples: learn to use vertx
3.2.1 基础知识 单机 vertx主要有两种线程池
WorkerPool:主要用于处理阻塞任务的工作线程池。默认20
个。 EventLoopPool:主要用于处理非阻塞的IO操作和定时器的事件循环线程池。默认2*core
线程。 下面放置一个基础示例,包含阻塞逻辑、事件回调逻辑、定时任务注册与取消逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 private static void basic () { Vertx tVertx = Vertx.vertx(); Vertx vertx = Vertx.vertx(new VertxOptions().setEventLoopPoolSize(5 ).setWorkerPoolSize(1 )); vertx.executeBlocking(() -> { log.info("simulate blocking task execution time..." ); TimeUnit.SECONDS.sleep(5 ); if (System.currentTimeMillis() % 2 == 0 ) { int i = 1 / 0 ; } return "Hello World" ; }) .onComplete(re -> { if (re.succeeded()) { log.info("success, result:{}" , re.result()); } else { log.error("failure, result:" , re.cause()); } }) ; vertx.setTimer(5000 , t -> { log.info("timer id:{}" , t); }); vertx.setPeriodic(5000 , t -> { log.info("periodic id:{}" , t); if (System.currentTimeMillis() % 2 == 0 ) { vertx.cancelTimer(t); log.info("cancel timer:{}" , t); } }); }
在vertx中,还有执行任务的单元verticle,一个vertx实例可以部署多个verticle。verticle的分类有如下
vertx支持部署任意语言编写的verticle,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private static void verticle () { Vertx vertx = Vertx.vertx(); vertx.deployVerticle(new AbstractVerticle() { @Override public void start () throws Exception { log.info("eventLoopVerticle start" ); } }).onFailure(e -> { log.error("error" , e); }); vertx.deployVerticle(new AbstractVerticle() { @Override public void start () throws Exception { log.info("workerVerticle start" ); } }, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)).onFailure(e -> { log.error("error" , e); }); }
在vertx中,还有事件总线。这要求事件的发布、消费,都注册在同一个vertx实例或者集群中。
1 2 3 4 5 6 7 8 private static void eventBus () { Vertx vertx = Vertx.vertx(); vertx.eventBus().consumer("test" ,t->{ Object body = t.body(); log.info("consumer: {}" ,body.toString()); }); vertx.eventBus().publish("test" ,"hello world" ); }
集群 vertx构建集群需要依赖一些分布式服务。比如hazelcast或者redis。下面给出示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 <dependency > <groupId > io.vertx</groupId > <artifactId > vertx-core</artifactId > <version > ${vertx.version}</version > </dependency > <dependency > <groupId > io.vertx</groupId > <artifactId > vertx-web</artifactId > <version > ${vertx.version}</version > </dependency > <dependency > <groupId > io.vertx</groupId > <artifactId > vertx-hazelcast</artifactId > <version > ${vertx.version}</version > <exclusions > <exclusion > <groupId > com.hazelcast</groupId > <artifactId > hazelcast</artifactId > </exclusion > </exclusions > </dependency > <dependency > <groupId > com.hazelcast</groupId > <artifactId > hazelcast</artifactId > <version > ${hazelcast.version}</version > </dependency >
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void main (String[] args) throws Exception { Config config = new Config(); config.setClusterName("test" ); HazelcastClusterManager clusterManager = new HazelcastClusterManager(config); Future<Vertx> vertxFuture = Vertx.builder() .with(new VertxOptions()) .withClusterManager(clusterManager) .buildClustered(); Vertx vertx = vertxFuture.toCompletionStage().toCompletableFuture().get(); vertx.eventBus().consumer("test" , t -> { log.info(t.body().toString()); }); vertx.setTimer(20000 , t -> { vertx.eventBus().publish("test" , "现在时间是:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" ).format(new Date())); }); }
当启动两个程序实例时,会发现接收到了两条消息。这说明事件总线发送的消息,已经被集群中所有节点消费了。
3.2.2 HttpServer 创建一个HTTP服务器,并且注册响应的接口
/test/test?test=xxx
:该接口在等待10秒后返回xxx/halo/*
:该接口匹配/halo/下面的目录及子目录,并返回html形式的helloworld。这点与SpringWeb不同,SpringWeb中/*
表示当前目录,/**
表示当前目录及子目录。 缺省值
:表示匹配其他所有路径,并返回text形式helloworld。示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public static void main (String[] args) { Vertx vertx = Vertx.vertx(); vertx.deployVerticle(new AbstractVerticle() { @Override public void start () throws Exception { int port = 4321 ; Router router = Router.router(vertx); router.route("/test/test" ).handler(t -> { String testParam = t.request().getParam("test" ); if (testParam == null ) { t.response().setStatusCode(400 ) .end("missing 'test' query parameter" ); } else { vertx.setTimer(10000 , tt -> { t.response().putHeader("Content-Type" , "text/plain" ) .end(testParam); }); } }); router.route("/halo/*" ).handler(t -> { String s = t.request().absoluteURI(); t.response().putHeader("Content-Type" , "text/html" ) .end("<h1>Hello World</h1> " + s); }); router.route().handler(t -> { t.response().end("Hello World" ); }); vertx.createHttpServer() .requestHandler(router) .listen(port).onComplete(re -> { if (re.succeeded()) { log.info("http server started on port {}" , port); } else { log.error("http server failed to start" , re.cause()); } }); } }); }
3.2.3 HttpClient 应用场景:
有一个接口,http://localhost:4321/test/test?test=参数
,该接口10秒后会返回参数
值。
我现在需要在10秒左右获取到2000次个接口的请求响应。
如果使用传统的做法,那么就得需要2000个线程,但如果使用vertx,只需要一个线程即可。
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 public static void main (String[] args) throws Exception { Vertx vertx = Vertx.vertx(new VertxOptions().setEventLoopPoolSize(1 ).setWorkerPoolSize(1 )); HttpClient httpClient = vertx.createHttpClient(new PoolOptions().setHttp2MaxSize(2000 ).setHttp1MaxSize(2000 ).setEventLoopSize(2000 )); HttpClient httpsClient = vertx.createHttpClient(new HttpClientOptions() .setProtocolVersion(HttpVersion.HTTP_2).setUseAlpn(true ) .setSsl(true ).setTrustAll(true ) .setConnectTimeout(60000 ), new PoolOptions().setHttp2MaxSize(2000 ).setHttp1MaxSize(2000 ).setEventLoopSize(2000 )); vertx.deployVerticle(new AbstractVerticle() { @Override public void start () throws Exception { vertx.setPeriodic(5000 , t -> { vertx.executeBlocking(() -> { ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); int i = threadGroup.activeCount(); Thread[] threads = new Thread[i]; threadGroup.enumerate(threads); List<String> list = new ArrayList<>(); for (Thread thread : threads) { list.add(thread.getName()); } try { log.info("calculating..." ); TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { throw new RuntimeException(e); } return String.join("," , list); }).onComplete(r -> { if (r.succeeded()) { log.info("threads:\n{}" , r.result()); } else { log.warn(r.cause().getMessage(), r.cause()); } }); }); } }); int total = 2000 ; CountDownLatch countDownLatch = new CountDownLatch(total); long start = System.currentTimeMillis(); for (int i = 0 ; i < total; i++) { int finalI = i; vertx.deployVerticle(new AbstractVerticle() { @Override public void start () throws Exception { httpClient.request(new RequestOptions().setMethod(HttpMethod.GET).setSsl(true ).setHost("reqres.in" ).setPort(443 ).setURI("/api/users?page=" + finalI)) .onComplete(r -> { if (r.succeeded()) { HttpClientRequest request = r.result(); log.info("{} send request {}" , this , request.absoluteURI()); request.putHeader("User-Agent" , "I am Vertx" ); request.send() .onComplete(r1 -> { if (r1.succeeded()) { HttpClientResponse result = r1.result(); log.info("{} received response with status code {}" , this , result.statusCode()); result.body().onComplete(re -> { if (re.succeeded()) { Buffer result1 = re.result(); log.info("result: {}" , result1); } else { log.warn("warn: " , re.cause()); } }); } else { log.error("{} send failed: {}" , this , r1.cause().getMessage(), r1.cause()); } countDownLatch.countDown(); }); } else { log.error("request failed: {}" , r.cause().getMessage(), r.cause()); countDownLatch.countDown(); } }); } }); } countDownLatch.await(); log.info("done {} ms" , System.currentTimeMillis() - start); }
3.2.4 文件系统 示例代码
vertx中的FileSystem提供了许多方法,并且都有阻塞版本和非阻塞版本的api。如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void fileCopy () { Vertx vertx = Vertx.vertx(); FileSystem fs = vertx.fileSystem(); fs.copy("D:/Downloads/yjwj_2024-06-05-10-50.zip" , "D:/Desktop/test.zip" ) .onComplete(re -> { if (re.succeeded()) { log.info("copy success" ); } else { log.error("copy failed, " , re.cause()); } }); fs.copyBlocking("D:/Downloads/yjwj_2024-06-05-10-50.zip" , "D:/Desktop/test1.zip" ); log.info("done" ); }
针对copy方法,底层是使用的java.nio.file.Files#copy
方法,这个拷贝起来特别快,因为他使用了零拷贝
机制。
传统拷贝:硬盘->系统缓冲区->程序缓冲区->系统缓冲区->硬盘
零拷贝:硬盘->系统缓冲区->硬盘
vertx还提供了open方法,实现异步拷贝文件。如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 fs.open(sourcePath, new OpenOptions(), ar -> { if (ar.succeeded()) { AsyncFile sourceFile = ar.result(); sourceFile.setReadBufferSize(bufferSize); sourceFile.pause(); fs.open(targetPath, new OpenOptions().setTruncateExisting(true ), ar1 -> { if (ar1.succeeded()) { AsyncFile targetFile = ar1.result(); targetFile.drainHandler(v -> { sourceFile.resume(); }); sourceFile.handler(buffer -> { targetFile.write(buffer); if (targetFile.writeQueueFull()) { sourceFile.pause(); } }); sourceFile.endHandler(v -> { sourceFile.close(); targetFile.close(); }); sourceFile.resume(); } else { log.error("open targetPath failed" , ar1.cause()); } }); } else { log.error("open sourcePath failed" , ar.cause()); } });
但是相比传统流式拷贝,该做法会占用更多jvm内存、耗费更多时间。此处我也在vertx的论坛里问过佬了,对方的回答是在执行连续工作时,不建议使用async或者reactive。
3.2.5 HTTP反向代理 添加依赖
1 2 3 4 5 <dependency > <groupId > io.vertx</groupId > <artifactId > vertx-web-proxy</artifactId > <version > ${vertx.version}</version > </dependency >
其实这个依赖提供的功能非常简陋,如果想要实现强大的HTTP反向代理,需要自己编写代理逻辑。如下给出示例代码
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public static void main (String[] args) throws Exception { int port = 8080 ; Vertx vertx = Vertx.vertx(); HttpServer httpServer = vertx.createHttpServer(); Router router = Router.router(vertx); HttpClient httpClient = vertx.createHttpClient(); HttpClient httpsClient = vertx.createHttpClient(new HttpClientOptions().setSsl(true ).setTrustAll(true )); HttpProxy httpProxy = HttpProxy.reverseProxy(httpClient); HttpProxy httpsProxy = HttpProxy.reverseProxy(httpsClient); router.route("/halo/*" ).handler(ProxyHandler.create(httpProxy, 4321 , "10.0.0.1" )); router.route().handler(ProxyHandler.create(httpsProxy, 443 , "meethigher.top" )); router.route("/api/*" ).order(Integer.MIN_VALUE).handler(ctx -> { HttpServerRequest request = ctx.request(); String uri = request.uri(); Route route = ctx.currentRoute(); String path; boolean whole = false ; if (whole) { if (route.getPath().endsWith("/" )) { int length = route.getPath().length() - 1 ; path = uri.substring(length); } else { int length = route.getPath().length(); path = uri.substring(length); } } else { path = request.uri(); } System.out.println(path); httpsClient.request(HttpMethod.valueOf(request.method().name()), 443 , "reqres.in" , path) .onSuccess(r -> { r.headers().setAll(request.headers()); r.putHeader("Host" , "reqres.in" ); r.send() .onSuccess(r1 -> { ctx.response() .setStatusCode(r1.statusCode()) .headers().setAll(r1.headers()); r1.handler(data -> { ctx.response().write(data); }); r1.endHandler(v -> ctx.response().end()); }) .onFailure(e1 -> { ctx.response().setStatusCode(500 ).end(e1.getMessage()); }); }) .onFailure(e -> { ctx.response().setStatusCode(500 ).end("Internal Server Error" ); }); }); httpServer.requestHandler(router).listen(port).onSuccess(t -> { log.info("http server started on port {}" , port); }); TimeUnit.MINUTES.sleep(10 ); List<Route> routes = router.getRoutes(); for (Route next : routes) { String path = next.isExactPath() ? next.getPath() : next.getPath() + "*" ; if ("/api/*" .equals(path)) { next.remove(); } } log.info("remove route /api/*" ); List<Route> routes1 = router.getRoutes(); System.out.println(); }
3.2.6 TCP反向代理 大家口中常说的反向代理,一般都是HTTP反向代理。
万物皆可TCP,我发现市面上居然没有一个轻量的TCP反代工具。
因此,我就简单实现了一个TCP反向代理,已可通打一切HTTP、FTP、SSH、Telnet、数据库连接等基于TCP的顶层协议。
最初我尝试使用传统bio 的形式实现TCP反向代理,源码可查看tcp反向代理
这种做法,反代1个TCP连接,需要使用2个线程,直到TCP连接释放。如果想要在有限的CPU上同时反代更多的TCP连接,这种做法是不可取的。而且这种做法有明显的缺陷,当TCP连接没有读写时,线程处于阻塞等待的状态,这段时间本可以做其他的事。
一个优化做法——事件驱动(多路复用) ,通俗解释就是,线程等待的时间,安排他去做其他类型的事。
有可连接事件时,去连接 有可读事件,去读取 有可写事件,去写入 Vertx就是基于事件驱动的,2个线程即可实现超多连接的处理。代码如下
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static void tcpReverse () { NetClient netClient = vertx.createNetClient(); NetServer netServer = vertx.createNetServer(); Handler<NetSocket> handler = sourceSocket -> { sourceSocket.pause(); netClient.connect(5432 , "10.0.0.9" ).onSuccess(targetSocket -> { targetSocket.pipeTo(sourceSocket); sourceSocket.pipeTo(targetSocket); sourceSocket.resume(); }); }; Handler<AsyncResult<NetServer>> successHandler = r -> System.out.println("Server listening on port " + port); Handler<Throwable> throwableHandler = Throwable::printStackTrace; netServer.connectHandler(handler).listen(port, host).onComplete(successHandler).onFailure(throwableHandler); }
3.2.7 TCP数据传输问题 当tcp向socket传输数据时,如果对面消费不过来或者接收不过来,那么数据会存在发送方的本地发送缓冲区(也称为发送队列,在vertx的socket中即writeQueue)中。
若对方一直不消费,那么就会出现本地内存崩了的情况。因此在开发过程中,要利用好相关api,如下。
在编程时,更建议使用pipeTo,内部已经实现了如下操作。
1 2 3 4 5 6 7 ws.drainHandler(v -> src.resume()); src.handler(item -> { ws.write(item); if (ws.writeQueueFull()) { src.pause(); } });
3.3 参考致谢