摘要

我之前封装了一个 HTTP 反向代理的工具 http-proxy-boot,主要是方便自己临时测试时使用的。但在测试高并发的转发服务时,遇到了瓶颈。该文主要针对该瓶颈问题,记录一下相关知识。

正文

一、背景

1.1 铺垫知识

涉及到阻塞式 IO、非阻塞式 IO、事件驱动等相关知识,可以查看我以往文章。

1.2 瓶颈解决

我之前使用 java 封装了一个 HTTP 反向代理的工具 meethigher/proxy-servlet: 基于 OkHttp 和 javax.servlet-api 实现的 HTTP 反向代理,主要是方便自己临时测试时使用的。

该工具存在一个问题,高并发的时候,整体响应就会滞后。原因是因为该工具采用 bio 实现的,即“一连接一线程”。

假如我这个工具,只开放了 200 个线程。此时有 201 个请求过来,顺序依次是,200 个慢请求 (10 秒响应),1 个快请求 (1 秒响应)。

实际出现的情况是这 200 个连接 (线程) 都被前面 200 个慢请求阻塞了,以至于后面的快请求一直等待,实际 20+1 秒后才会响应。而我希望的效果是即使我可支配的线程数不变,快请求仍然可以 1 秒响应。

题外话,PostgreSQL 使用的是“一连接一进程”模型,MySQL 使用的是“一连接一线程”模型。

计算机任务有很多种分类,以上属于其中 IO 密集型任务。尽管 CPU 参与任务处理,但大部分时间在等待 IO 完成。这中间的时间,CPU 完全可以用来做更多的事。

我采用的做法是将 bio 切换成 nio,也就是 cpu 不用一直等待,而是去做别的事,当传输完成后,通知 cpu 过来善后。

对于 http 来说,成熟的框架有很多。

其中,我对比了下两者。

  • WebFlux 对 Spring 的生态支持相当友好,可以说是强耦合,健壮的同时也牺牲了些便捷性。
  • Vertx 就相当轻量、简单了。

于是我选择 Vertx 解决这个问题。

二、计算机任务分类

计算机任务可以根据其特性分为几种类型,主要包括:

  1. CPU 密集型
  2. IO 密集型
  3. 内存密集型
  4. 网络密集型

2.1 CPU 密集型

CPU 密集型也叫做计算密集型。

这类任务主要依赖 CPU 的处理能力,通常需要大量的计算和复杂的算法,因此 CPU 长时间处于繁忙状态。典型的例子包括视频编码、科学计算、数据分析等。

优化方式:并行处理,换用更高性能的 CPU。

32 线程的 CPU 同时刻只能跑 32 个线程。像我之前 32 核 64G 的 Win11 可以达到几十万个线程,那是因为其实有很多线程是空闲或者等待中的,CPU 来回轮转,间接达到了上万线程同时运行的效果,比如像下面的这种 IO 密集型任务。

2.2 IO 密集型

主要依赖于输入/输出操作,通常涉及大量的数据读写,如文件操作、网络请求等。

如果你正在上传/下载文件,性能主要取决于网络速度和磁盘写入速度,CPU 大部分时间处于空闲等待状态,等待数据传输完成或外部资源的响应。

该类任务,我们在编程时,可以使线程数远大于 CPU 线程数。

优化方式:减少 IO 操作次数、使用异步编程、换用更高性能硬盘。

2.3 内存密集型

这类任务对内存的使用量很大,例如处理大数据集、复杂的数据结构等。内存访问速度较快,但如果内存不足,可能会导致性能瓶颈。

优化方式:优化数据结构、减少内存占用、增加内存。

2.4 网络密集型

这类任务主要依赖网络带宽和延迟,通常涉及大量的数据在网络上传输,如 Web 服务器、大规模分布式系统等。

优化方式:优化网络协议、使用负载均衡、提高带宽等。

三、Vert.x 框架小记

3.1 介绍

Vert.x 是一个基于事件驱动的非阻塞框架,适合如下

  • HTTP/TCP 等服务器
  • 文件上传/下载

这类 **IO 密集型的高并发 ** 场景。

像这种非阻塞的框架,不只有 vertx,还有很多。

  • Netty:很多非阻塞框架都是基于 Netty 实现。
  • Spring WebFlux:类似于 Vertx,也是基于 Netty,与 Spring 框架强耦合。
  • Play Framework:轻量级的反应式 Web 框架,适合高并发 Web 应用。
  • Akka:基于 Actor 模型的高并发框架,适合分布式系统和容错系统。超重量级。

但是 vertx 各模块之间,没有强耦合,都可以灵活组装使用,这点就特别好,因此我选用 vertx。

在了解 vertx 之前,一定要先阅读下官方这些文档。

3.2 示例

以下示例均基于 Vert.x 4.5.10 版本。

所有代码示例放置在 meethigher/vertx-examples: learn to use vertx

3.2.1 基础知识

单机

vertx 主要有两种线程池

  • WorkerPool:主要用于处理阻塞任务的工作线程池。默认 20 个。
  • EventLoopPool:主要用于处理非阻塞的 IO 操作和定时器的事件循环线程池。默认 2*core 线程。

下面放置一个基础示例,包含阻塞逻辑、事件回调逻辑、定时任务注册与取消逻辑。

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private static void basic() {
    /**
     * workerPoolSize 指定用于处理阻塞任务的工作线程池的大小。默认 20 个
     * eventLoopPoolSize 用于处理事件回调时执行的逻辑。默认 CPU 实际线程的 2 倍。因此在注册事件回调时的逻辑不要阻塞,如果必须要执行阻塞逻辑,就丢给 workerPool
     */
    Vertx tVertx = Vertx.vertx();
    Vertx vertx = Vertx.vertx(new VertxOptions().setEventLoopPoolSize(5).setWorkerPoolSize(1));
    /**
     * 执行过程中,注意线程的名称。
     * 使用 WorkerPool 执行阻塞任务,并使用 EventLoopPool 执行阻塞完成后的回调。
     */
    vertx.executeBlocking(() -> {
                // 模拟阻塞任务的耗时
                log.info("simulate blocking task execution time...");
                TimeUnit.SECONDS.sleep(5);
                if (System.currentTimeMillis() % 2 == 0) {
                    int i = 1 / 0;
                }
                return "Hello World";
            })
            .onComplete(re -> {
                if (re.succeeded()) {
                    log.info("success, result:{}", re.result());
                } else {
                    log.error("failure, result:", re.cause());
                }
            })
    // 以下写法相同
    //onSuccess 和 onFailure 是对 onComplete 更精确的封装,本质还是基于 onComplete。
    //.onSuccess(r -> {
    //    log.info("success, result:{}", r);
    //}).onFailure(e -> {
    //    log.error("failure, result:", e);
    //})
    ;
    // 一次性定时任务
    vertx.setTimer(5000, t -> {
        log.info("timer id:{}", t);
    });
    // 周期性定时任务,并注册取消定时任务的逻辑
    vertx.setPeriodic(5000, t -> {
        log.info("periodic id:{}", t);
        if (System.currentTimeMillis() % 2 == 0) {
            vertx.cancelTimer(t);
            log.info("cancel timer:{}", t);
        }
    });
}

在 vertx 中,还有执行任务的单元 verticle,一个 vertx 实例可以部署多个 verticle。

注意,一个 verticle 由一个 eventloop 执行。如果直接在主线程执行,也是一个 eventloop 执行。一个 eventloop 即一个线程,若需要多线程执行,则需要部署多个 verticle 实例。

如果是 httpserver 部署多个 verticle 实例,每个实例都会注册端口,但是内部采用了端口复用,因此不会出现端口占用问题。可以简单理解成 vertx 是一个代理,负责分发请求,而 verticle 是实际的后端节点。

verticle 的分类有如下

  • 标准 Verticle:使用 EventLoopPool 执行的 Verticle,这也是默认的 Verticle 类型。一个 Verticle 中的所有代码都会在一个事件循环 eventloop (底层对应线程) 中执行。

  • 工作者 Verticle:使用 WorkerPool 执行的 Verticle,这类 Verticle 被设计用来执行阻塞任务,不过阻塞任务也可以不通过工作者 Verticle,直接 executeBlocking 也可。

vertx 支持部署任意语言编写的 verticle,

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static void verticle() {
    Vertx vertx = Vertx.vertx();
    // 部署标准 Verticle
    vertx.deployVerticle(new AbstractVerticle() {
        @Override
        public void start() throws Exception {
            log.info("eventLoopVerticle start");
        }

    }).onFailure(e -> {
        log.error("error", e);
    });
    // 部署工作者 Verticle
    vertx.deployVerticle(new AbstractVerticle() {
        @Override
        public void start() throws Exception {
            log.info("workerVerticle start");
        }
    }, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)).onFailure(e -> {
        log.error("error", e);
    });
}

在 vertx 中,还有事件总线。这要求事件的发布、消费,都注册在同一个 vertx 实例或者集群中。

java
1
2
3
4
5
6
7
8
private static void eventBus() {
    Vertx vertx = Vertx.vertx();
    vertx.eventBus().consumer("test",t->{
        Object body = t.body();
        log.info("consumer: {}",body.toString());
    });
    vertx.eventBus().publish("test","hello world");
}

集群

vertx 构建集群需要依赖一些分布式服务。比如 hazelcast 或者 redis。下面给出示例

xml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-core</artifactId>
    <version>${vertx.version}</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web</artifactId>
    <version>${vertx.version}</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-hazelcast</artifactId>
    <version>${vertx.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast</artifactId>
    <version>${hazelcast.version}</version>
</dependency>

示例代码

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws Exception {
    Config config = new Config();
    config.setClusterName("test");
    HazelcastClusterManager clusterManager = new HazelcastClusterManager(config);
    Future<Vertx> vertxFuture = Vertx.builder()
            .with(new VertxOptions())
            .withClusterManager(clusterManager)
            //.build()// 构建单节点
            .buildClustered();
    // 阻塞直到获取集群中的 vertx 实例
    Vertx vertx = vertxFuture.toCompletionStage().toCompletableFuture().get();
    vertx.eventBus().consumer("test", t -> {
        log.info(t.body().toString());
    });
    vertx.setTimer(20000, t -> {
        vertx.eventBus().publish("test", "现在时间是:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    });
}

当启动两个程序实例时,会发现接收到了两条消息。这说明事件总线发送的消息,已经被集群中所有节点消费了。

image-20241005141030999.png

3.2.2 HttpServer

创建一个 HTTP 服务器,并且注册相应的接口。

结合如下代码理解三个路由

  • /test/test?test=xxx:该接口在等待 10 秒后返回 xxx
  • /halo/*:该接口匹配 /halo/ 下面的目录及子目录,并返回 html 形式的 helloworld。
    • 这点与 SpringWeb 不同,SpringWeb 中 /* 表示当前目录,/** 表示当前目录及子目录。
  • :表示匹配其他所有路径,相当于 /*,并返回 text 形式 helloworld。
  • /static/test.html:表示返回 D:/3Develop/www/test.html 的内容,若该内容不存在,则回转到 /* 路由。

示例代码

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    vertx.deployVerticle(new AbstractVerticle() {
        @Override
        public void start() throws Exception {
            int port = 4321;
            Router router = Router.router(vertx);

            // order值越小,优先级越高

            //注册接口
            router.route("/test/test").order(1).handler(t -> {
                //接口逻辑
                String testParam = t.request().getParam("test");
                if (testParam == null) {
                    t.response().setStatusCode(400)
                            .end("missing 'test' query parameter");
                } else {
                    vertx.setTimer(10000, tt -> {
                        t.response().putHeader("Content-Type", "text/plain")
                                .end(testParam);
                    });

                }
            });
            //`/*`表示匹配当前目录及子目录。注意与SpringWeb的区别。
            router.route("/halo/*").order(2).handler(t -> {
                // 开启chunked分片传输。content-length与transfer-encoding是矛盾的。
                // content-length需要服务器在内存中计算好内容长度,适用于数据较少时传输;而transfer-encoding则是分块流式传输,适用于大文件传输。
                // nginx自身不能直接开启chunked。需要借助插件或者实际后端服务。
                t.response().setChunked(true);
                t.response().end("hello world " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "\n"
                        + t.request().absoluteURI());
            });
            //不传值表示匹配下面的所有内容
            router.route().order(3).handler(t -> {
                t.response().end("Hello World");
            });

            // 注册静态资源路径,若访问的内容是404,则会回转到/*的路由上
            StaticHandler staticHandler = StaticHandler.create("D:/3Develop/www")
                    .setDirectoryListing(true)
                    .setAlwaysAsyncFS(true)
                    .setIndexPage("index.html");
            router.route("/static/*").order(Integer.MIN_VALUE).handler(staticHandler);

            vertx.createHttpServer(new HttpServerOptions()
                              .setSsl(true)
                              .setKeyCertOptions(new PemKeyCertOptions()//使用自签名证书开启ssl
                                      .addCertPath("/usr/local/nginx/conf/cert/certificate.pem")
                                      .addKeyPath("/usr/local/nginx/conf/cert/private.key")
                              )
                    )
                    //注册路由
                    .requestHandler(router)
                    .listen(port).onComplete(re -> {
                        if (re.succeeded()) {
                            log.info("http server started on port {}", port);
                        } else {
                            log.error("http server failed to start", re.cause());
                        }
                    });
        }
    });
}

3.2.3 HttpClient

应用场景:

有一个接口,http://localhost:4321/test/test?test= 参数 ,该接口 10 秒后会返回 参数 值。

我现在需要在 10 秒左右获取到 2000 次接口的请求响应。

如果使用传统的做法,那么就得需要 2000 个线程,但如果使用 vertx,只需要一个线程即可。

示例代码

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public static void main(String[] args) throws Exception {

    Vertx vertx = Vertx.vertx(new VertxOptions().setEventLoopPoolSize(1).setWorkerPoolSize(1));
    /**
     * 当你想要一个既能发送 http、又能发送 https,既能 followRedirect 也能不 followRedirect
     * 那么就需要在发送时,使用 io.vertx.core.http.HttpClient#request(io.vertx.core.http.RequestOptions)
     * 否则,建议就使用多个池
     */
    // 创建 HttpClient 时指定的 PoolOptions 里面的 EventLoopSize 不会生效。以 Vertx 的 EventLoopSize 为主。默认 http/1 为 5 并发,http/2 为 1 并发
    HttpClient httpClient = vertx.createHttpClient(new PoolOptions().setHttp2MaxSize(2000).setHttp1MaxSize(2000).setEventLoopSize(2000));
    HttpClient httpsClient = vertx.createHttpClient(new HttpClientOptions()
            .setProtocolVersion(HttpVersion.HTTP_2).setUseAlpn(true)// 若服务器支持 http2,则发送 http2 请求
            .setSsl(true).setTrustAll(true)// 发送 https 请求
            .setConnectTimeout(60000), new PoolOptions().setHttp2MaxSize(2000).setHttp1MaxSize(2000).setEventLoopSize(2000));

    /**
     * 输出当前活着的线程
     */
    vertx.deployVerticle(new AbstractVerticle() {
        @Override
        public void start() throws Exception {
            vertx.setPeriodic(5000, t -> {
                vertx.executeBlocking(() -> {
                    ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
                    int i = threadGroup.activeCount();
                    Thread[] threads = new Thread[i];
                    threadGroup.enumerate(threads);
                    List<String> list = new ArrayList<>();
                    for (Thread thread : threads) {
                        list.add(thread.getName());
                    }
                    // 模拟耗时操作
                    try {
                        log.info("calculating...");
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return String.join(",", list);
                }).onComplete(r -> {
                    if (r.succeeded()) {
                        log.info("threads:\n{}", r.result());
                    } else {
                        log.warn(r.cause().getMessage(), r.cause());
                    }
                });

            });
        }
    });
    int total = 2000;
    CountDownLatch countDownLatch = new CountDownLatch(total);
    long start = System.currentTimeMillis();
    for (int i = 0; i < total; i++) {
        int finalI = i;
        vertx.deployVerticle(new AbstractVerticle() {
            @Override
            public void start() throws Exception {
                httpClient.request(new RequestOptions().setMethod(HttpMethod.GET).setSsl(true).setHost("reqres.in").setPort(443).setURI("/api/users?page=" + finalI))
                //httpClient.request(HttpMethod.GET, 4321, "localhost", "/test/test?test=" + finalI)
                        //httpsClient.request(HttpMethod.GET, 443, "reqres.in", "/api/users?page=" + finalI)
                        .onComplete(r -> {
                            if (r.succeeded()) {
                                HttpClientRequest request = r.result();
                                log.info("{} send request {}", this, request.absoluteURI());
                                request.putHeader("User-Agent", "I am Vertx");
                                request.send()
                                        .onComplete(r1 -> {
                                            if (r1.succeeded()) {
                                                HttpClientResponse result = r1.result();
                                                log.info("{} received response with status code {}", this, result.statusCode());
                                                // 这种做法其实对内存要求较大,相当于是一次性将内容写到 buffer 里了
                                                result.body().onComplete(re -> {
                                                    if (re.succeeded()) {
                                                        Buffer result1 = re.result();
                                                        log.info("result: {}", result1);
                                                    } else {
                                                        log.warn("warn:", re.cause());
                                                    }
                                                });
                                            } else {
                                                log.error("{} send failed: {}", this, r1.cause().getMessage(), r1.cause());
                                            }

                                            countDownLatch.countDown();
                                        });
                            } else {
                                log.error("request failed: {}", r.cause().getMessage(), r.cause());
                                countDownLatch.countDown();
                            }
                        });
            }
        });
    }
    countDownLatch.await();
    log.info("done {} ms", System.currentTimeMillis()- start);
}

3.2.4 文件系统

示例代码

vertx 中的 FileSystem 提供了许多方法,并且都有阻塞版本和非阻塞版本的 api。如下

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public static void fileCopy() {
    Vertx vertx = Vertx.vertx();
    FileSystem fs = vertx.fileSystem();
    /**
     * 非阻塞方法
     */
    fs.copy("D:/Downloads/yjwj_2024-06-05-10-50.zip", "D:/Desktop/test.zip")
            .onComplete(re -> {
                if (re.succeeded()) {
                    log.info("copy success");
                } else {
                    log.error("copy failed,", re.cause());
                }
            });
    /**
     * 对应的阻塞方法
     */
    fs.copyBlocking("D:/Downloads/yjwj_2024-06-05-10-50.zip", "D:/Desktop/test1.zip");
    log.info("done");
}

针对 copy 方法,底层是使用的 java.nio.file.Files#copy 方法,这个拷贝起来特别快,因为他使用了 零拷贝 机制。

传统拷贝:硬盘 -> 系统缓冲区 -> 程序缓冲区 -> 系统缓冲区 -> 硬盘

零拷贝:硬盘 -> 系统缓冲区 -> 硬盘

vertx 还提供了 open 方法,实现异步拷贝文件。如下

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fs.open(sourcePath, new OpenOptions(), ar -> {
    if (ar.succeeded()) {
        AsyncFile sourceFile = ar.result();
        sourceFile.setReadBufferSize(bufferSize);
        sourceFile.pause();
        fs.open(targetPath, new OpenOptions().setTruncateExisting(true), ar1 -> {
            if (ar1.succeeded()) {
                AsyncFile targetFile = ar1.result();
                // 缓冲区空闲回调
                targetFile.drainHandler(v -> {
                    sourceFile.resume();
                });
                sourceFile.handler(buffer -> {
                    targetFile.write(buffer);
                    if (targetFile.writeQueueFull()) {
                        sourceFile.pause();
                    }
                });
                sourceFile.endHandler(v -> {
                    sourceFile.close();
                    targetFile.close();
                });
                sourceFile.resume();
            } else {
                log.error("open targetPath failed", ar1.cause());
            }
        });
    } else {
        log.error("open sourcePath failed", ar.cause());
    }
});

但是相比传统流式拷贝,该做法会占用更多 jvm 内存、耗费更多时间。此处我也在 vertx 的论坛里问过佬了,对方的回答是在执行连续工作时,不建议使用 async 或者 reactive。

image-20241023214114510.jpg

3.2.5 HTTP 反向代理

添加依赖

xml
1
2
3
4
5
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-web-proxy</artifactId>
    <version>${vertx.version}</version>
</dependency>

其实这个依赖提供的功能非常简陋,如果想要实现强大的 HTTP 反向代理,需要自己编写代理逻辑。如下给出示例代码

示例代码

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public static void main(String[] args) throws Exception {
    int port = 8080;
    Vertx vertx = Vertx.vertx();
    HttpServer httpServer = vertx.createHttpServer();
    Router router = Router.router(vertx);
    HttpClient httpClient = vertx.createHttpClient();
    HttpClient httpsClient = vertx.createHttpClient(new HttpClientOptions().setSsl(true).setTrustAll(true));

    HttpProxy httpProxy = HttpProxy.reverseProxy(httpClient);
    HttpProxy httpsProxy = HttpProxy.reverseProxy(httpsClient);

    // 默认处理逻辑 ProxyHandler,这个是由 vertx-web-proxy 扩展提供
    router.route("/halo/*").handler(ProxyHandler.create(httpProxy, 4321, "10.0.0.1"));
    router.route().handler(ProxyHandler.create(httpsProxy, 443, "meethigher.top"));

    // 自定义逻辑,自己实现。order 越小,优先级越高
    // /api/* --> https://reqres.in/api/*
    router.route("/api/*").order(Integer.MIN_VALUE).handler(ctx -> {
        HttpServerRequest request = ctx.request();
        String uri = request.uri();
        Route route = ctx.currentRoute();
        String path;
        /**
         * false 表示 /api/* --> https://reqres.in/api/*
         * true 表示 /api/* --> https://requres.in/*
         */
        boolean whole = false;
        if (whole) {
            if (route.getPath().endsWith("/")) {
                int length = route.getPath().length() - 1;
                path = uri.substring(length);
            } else {
                int length = route.getPath().length();
                path = uri.substring(length);
            }
        } else {
            path = request.uri();
        }
        System.out.println(path);
        httpsClient.request(HttpMethod.valueOf(request.method().name()), 443, "reqres.in", path)
                .onSuccess(r -> {
                    r.headers().setAll(request.headers());
                    r.putHeader("Host", "reqres.in");
                    r.send()
                            .onSuccess(r1 -> {
                                ctx.response()
                                        .setStatusCode(r1.statusCode())
                                        .headers().setAll(r1.headers());
                                r1.handler(data -> {
                                    ctx.response().write(data);
                                });
                                r1.endHandler(v -> ctx.response().end());

                            })
                            .onFailure(e1 -> {
                                ctx.response().setStatusCode(500).end(e1.getMessage());
                            });
                })
                .onFailure(e -> {
                    ctx.response().setStatusCode(500).end("Internal Server Error");
                });
    });

    httpServer.requestHandler(router).listen(port).onSuccess(t -> {
        log.info("http server started on port {}", port);
    });

    TimeUnit.MINUTES.sleep(10);
    // 可以在运行时,动态移除路由
    List<Route> routes = router.getRoutes();
    for (Route next : routes) {
        // 首先判断是否是精准匹配地址
        String path = next.isExactPath()? next.getPath() : next.getPath() + "*";
        if ("/api/*".equals(path)) {
            next.remove();
        }
    }
    log.info("remove route /api/*");
    List<Route> routes1 = router.getRoutes();
    System.out.println();
}

3.2.6 TCP 反向代理

大家口中常说的反向代理,一般都是 HTTP 反向代理。

我简单实现了一个 TCP 反向代理,源码可查看 tcp 反向代理 top.meethigher.proxy.tcp.SimpleReverseTcpProxy。我是使用传统 bio 的形式,这种做法,反代 1 个 TCP 连接,需要使用 2 个线程,直到 TCP 连接释放。如果想要在有限的 CPU 上同时反代更多的 TCP 连接,这种做法是不可取的。而且这种做法有明显的缺陷,当 TCP 连接没有读写时,线程处于阻塞等待的状态,这段时间本可以做其他的事。

一个优化做法——事件驱动 (多路复用),通俗解释就是,线程等待的时间,安排他去做其他类型的事。

  • 有可连接事件时,去连接
  • 有可读事件,去读取
  • 有可写事件,去写入

Vertx 就是基于事件驱动的,2 个线程即可实现超多连接的处理。代码如下

示例代码

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
private static void tcpReverse() {
    NetClient netClient = vertx.createNetClient();
    NetServer netServer = vertx.createNetServer();
    Handler<NetSocket> handler = sourceSocket -> {
        sourceSocket.pause();
        netClient.connect(5432, "10.0.0.9").onSuccess(targetSocket -> {
            targetSocket.pipeTo(sourceSocket);
            sourceSocket.pipeTo(targetSocket);
            sourceSocket.resume();
        });
    };
    Handler<AsyncResult<NetServer>> successHandler = r -> System.out.println("Server listening on port" + port);
    Handler<Throwable> throwableHandler = Throwable::printStackTrace;

    netServer.connectHandler(handler).listen(port, host).onComplete(successHandler).onFailure(throwableHandler);
}

3.2.7 TCP 数据传输问题

当 tcp 向 socket 传输数据时,如果对面消费不过来或者接收不过来,那么数据会存储在发送方的本地发送缓冲区(也称为发送队列,在 vertx 的 socket 中即 writeQueue)中。

若对方一直不消费,那么就会出现本地内存崩了的情况。因此在开发过程中,要利用好相关 api,如下。

在编程时,更建议使用 pipeTo,内部已经实现了如下操作。

这其实就是一种背压/反压(Backpressure)机制。当下游处理能力不足,来不及处理上游发送过来的数据时,下游会以某种方式通知或限制上游减速或暂停发送数据,这个机制就叫做“背压”。

java
1
2
3
4
5
6
7
ws.drainHandler(v -> src.resume());
src.handler(item -> {
    ws.write(item);
    if (ws.writeQueueFull()) {
        src.pause();
    }
});

3.2.8 TCP 通信-自定义协议

当我们直接使用 TCP 进行通信时,如果双方没有一个明确通信协议的话,就会出现粘包/半包的问题。

下面记录一个简单的自定义协议。

sh
1
2
|  4 字节(消息长度) |  2 字节(消息类型) |  变长(消息体)  |
|      0x0010      |      0x0001       |   {"id":1, "msg":"Hello"}  |

序列化和反序列化就使用probotufprotobuf-javalite 保留了大部分常见的基本功能,如序列化、反序列化、消息对象的构建等。然而,去除了一些 Protobuf 的高级特性,如流式解析、扩展字段、未知字段、反射支持等,适用于不需要这些特性的应用场景。以下两个依赖,按需引入其中一个即可。

xml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>4.30.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-javalite -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-javalite</artifactId>
    <version>4.30.2</version>
</dependency>

另外,需要在编译前,将 .proto 转为 .java,我建议使用命令,轻量便捷。

sh
1
2
3
4
5
6
7
# 在任意目录下
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/addressbook.proto
# 在当前应用目录下
protoc --java_out=${OUTPUT_DIR} path/to/your/proto/file
# 注意,如果你使用的是javalite,那么在生成时,命令需要更换为如下
# 参考 https://github.com/protocolbuffers/protobuf/blob/main/java/lite.md
protoc --java_out=lite:${OUTPUT_DIR} path/to/your/proto/file

示例代码参考meethigher/vertx-examples

3.2.9 TCP 通信-自定义解码逻辑

在上面的自定义协议中,规范了编码逻辑。但是在数据接收时,就需要编写解码逻辑实现解码。

此处我们可以使用RecordParser来实现解码。RecordParser支持定长消息、固定分隔符消息。

针对预设长度的消息,我使用的自定义解码逻辑。代码如下

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;

/**
 * 自定义消息结构解析器
 * [4字节长度+2字节类型+protobuf变长消息体]
 */
public class ProtoParser implements Handler<Buffer> {

    private Buffer buf = Buffer.buffer();

    /**
     * 预设长度起始位置
     */
    private final int lengthFieldOffset = 0;

    /**
     * 预设长度占用的字节数
     */
    private final int lengthFieldLength = 4;
    /**
     * 消息类型起始位置
     */
    private final int typeFieldOffset = 4;

    /**
     * 消息类型占用的字节数
     */
    private final int typeFieldLength = 2;

    /**
     * 消息体起始位置
     */
    private final int bodyFieldOffset = 6;

    private final Handler<Buffer> outputHandler;

    public ProtoParser(Handler<Buffer> outputHandler) {
        this.outputHandler = outputHandler;
    }

    @Override
    public void handle(Buffer buffer) {
        buf.appendBuffer(buffer);
        if (buf.length() < lengthFieldLength) {
            return;
        } else {
            int totalLength = buf.getInt(lengthFieldOffset);
            if (buf.length() < totalLength) {
                return;
            } else {
                outputHandler.handle(buf.getBuffer(4, totalLength));
                buf = buf.getBuffer(totalLength, buf.length());
            }
        }

    }
}

运行示例

java
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import top.meethigher.example13.ProtoParser;

import java.util.concurrent.ThreadLocalRandom;

public class Example13 {

    public static void main(String[] args) {
        diyRecord();
    }

    /**
     * 使用换行符作为分隔符,表示消息的结尾
     * 示例消息内容
     * <pre>
     * [...\n][...\n]
     * </pre>
     */
    public static void lineBasedRecord() {
        Handler<Buffer> output = h -> {
            System.out.println(h.toString());
        };
        final RecordParser parser = RecordParser.newDelimited("\n", output);

        // parser.handle(Buffer.buffer("HELLO\nHOW ARE Y"));
        // parser.handle(Buffer.buffer("OU?\nI AM"));
        // parser.handle(Buffer.buffer("DOING OK"));
        // parser.handle(Buffer.buffer("\n"));

        Buffer buffer = Buffer.buffer("Hello!\nHow are you?\nI am fine! Thank you!\n");
        // 模拟网络传输中的粘包半包问题
        int tStart = 0, tEnd = 0;
        for (int i = 0; i < buffer.length(); i++) {
            // 前闭后开
            tEnd = ThreadLocalRandom.current().nextInt(tStart + 1, buffer.length() + 1);
            if (tEnd > buffer.length()) {
                tEnd = buffer.length();
            }
            // 前闭后开
            Buffer tb = buffer.getBuffer(tStart, tEnd);
            parser.handle(tb);
            tStart = tEnd;

            if (tStart >= buffer.length()) {
                break;
            }
        }
    }


    /**
     * 定长消息结构。假如我固定长度为3(一个中文刚好字节占3)。示例消息内容
     * 你好啊世界
     */
    public static void fixedLengthRecord() {
        RecordParser parser = RecordParser.newFixed(3, b -> {
            System.out.println(b);
        });
        parser.handle(Buffer.buffer("你好啊世界"));
    }


    /**
     * 自定义消息结构。示例消息内容
     * <pre>
     * [4字节长度+2字节类型+protobuf变长消息体][4字节长度+2字节类型+protobuf变长消息体]
     * </pre>
     */
    public static void diyRecord() {
        ProtoParser parser = new ProtoParser(b -> {
            short type = b.getShort(0);
            Buffer buffer = b.getBuffer(2, b.length());
            System.out.println("消息类型=" + type + ", 消息内容=" + buffer.toString());
        });

        // 模拟发送的消息
        Buffer buffer = Buffer.buffer();
        Buffer body = Buffer.buffer("你好,世界!");
        // 网络传输都使用大端
        // 模拟第一条消息
        buffer.appendInt(4 + 2 + body.length());
        buffer.appendShort((short) 1);
        buffer.appendBytes(body.getBytes());
        // 模拟第二条消息
        Buffer body1 = Buffer.buffer("hello, world! ");
        buffer.appendInt(4 + 2 + body1.length());
        buffer.appendShort((short) 2);
        buffer.appendBytes(body1.getBytes());
        buffer.appendBytes(body1.getBytes());


        // 模拟网络传输中的粘包半包问题
        int tStart = 0, tEnd = 0;
        for (int i = 0; i < buffer.length(); i++) {
            // 前闭后开
            tEnd = ThreadLocalRandom.current().nextInt(tStart + 1, buffer.length() + 1);
            if (tEnd > buffer.length()) {
                tEnd = buffer.length();
            }
            // 前闭后开
            Buffer tb = buffer.getBuffer(tStart, tEnd);
            parser.handle(tb);
            tStart = tEnd;

            if (tStart >= buffer.length()) {
                break;
            }
        }

    }
}

3.2.10 TCP 通信-Client自动重连

我需要让 NetClient 一直与服务端保持一个活跃的连接。除了使用心跳外,针对一些被关闭的情况,需要添加自动重连机制。

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import io.vertx.core.Vertx;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;

public class Example14 {


    private static void startServer() {
        Vertx vertx = Vertx.vertx();
        vertx.createNetServer().connectHandler(socket -> {
            socket.close();
        }).listen(8080).onFailure(e -> {
            e.printStackTrace();
            System.exit(1);
        });
    }

    /**
     * 实现client的重连机制
     */
    private static void startClient() {
        Vertx vertx = Vertx.vertx();
        NetClient netClient = vertx.createNetClient();
        connect(vertx, netClient);
    }

    private static void connect(Vertx vertx, NetClient netClient) {
        netClient.connect(8080, "127.0.0.1")
                .onComplete(ar -> {
                    if (ar.succeeded()) {
                        NetSocket socket = ar.result();
                        socket.pause();

                        reconnectDelay = MIN_DELAY;
                        System.out.println("重连成功");

                        socket.closeHandler(t -> {
                            System.out.println("连接被关闭咯");
                            reconnect(vertx, netClient);
                        });
                        socket.resume();
                    } else {
                        ar.cause().printStackTrace();
                        reconnect(vertx, netClient);
                    }
                });
    }


    private static final long MIN_DELAY = 3000;
    private static final long MAX_DELAY = 60000;
    private static long reconnectDelay = MIN_DELAY;

    /**
     * 自动重连: 指数退避策略
     */
    private static void reconnect(Vertx vertx, NetClient netClient) {
        vertx.setTimer(reconnectDelay, id -> {
            System.out.println("开始重连");
            connect(vertx, netClient);
            reconnectDelay = Math.min(reconnectDelay * 2, MAX_DELAY);
        });
    }


    public static void main(String[] args) {
        startServer();
        startClient();
    }
}

以上只是一个简单的例子,其实还可以封装的更优雅。可以参考这个的底层源码

3.2.11 手撕内网穿透FRP

时序图

image-20250405152713664.jpg

源码地址手撕FRP

3.3 参考致谢