摘要
我之前封装了一个 HTTP 反向代理的工具 http-proxy-boot ,主要是方便自己临时测试时使用的。但在测试高并发的转发服务时,遇到了瓶颈。该文主要针对该瓶颈问题,记录一下相关知识。
正文
一、背景 1.1 铺垫知识 涉及到阻塞式 IO、非阻塞式 IO、事件驱动等相关知识,可以查看我以往文章。
1.2 瓶颈解决 我之前使用 java 封装了一个 HTTP 反向代理的工具 meethigher/proxy-servlet: 基于 OkHttp 和 javax.servlet-api 实现的 HTTP 反向代理 ,主要是方便自己临时测试时使用的。
该工具存在一个问题,高并发的时候,整体响应就会滞后。原因是因为该工具采用 bio 实现的,即“一连接一线程”。
假如我这个工具,只开放了 200 个线程。此时有 201 个请求过来,顺序依次是,200 个慢请求 (10 秒响应),1 个快请求 (1 秒响应)。
实际出现的情况是这 200 个连接 (线程) 都被前面 200 个慢请求阻塞了,以至于后面的快请求一直等待,实际 20+1 秒后才会响应。而我希望的效果是即使我可支配的线程数不变,快请求仍然可以 1 秒响应。
题外话,PostgreSQL 使用的是“一连接一进程”模型,MySQL 使用的是“一连接一线程”模型。
计算机任务有很多种分类,以上属于其中 IO 密集型任务。尽管 CPU 参与任务处理,但大部分时间在等待 IO 完成。这中间的时间,CPU 完全可以用来做更多的事。
我采用的做法是将 bio 切换成 nio,也就是 cpu 不用一直等待,而是去做别的事,当传输完成后,通知 cpu 过来善后。
对于 http 来说,成熟的框架有很多。
其中,我对比了下两者。
WebFlux 对 Spring 的生态支持相当友好,可以说是强耦合,健壮的同时也牺牲了些便捷性。 Vertx 就相当轻量、简单了。 于是我选择 Vertx 解决这个问题。
二、计算机任务分类 计算机任务可以根据其特性分为几种类型,主要包括:
CPU 密集型 IO 密集型 内存密集型 网络密集型 2.1 CPU 密集型 CPU 密集型也叫做计算密集型。
这类任务主要依赖 CPU 的处理能力,通常需要大量的计算和复杂的算法,因此 CPU 长时间处于繁忙状态。典型的例子包括视频编码、科学计算、数据分析等。
优化方式:并行处理,换用更高性能的 CPU。
32 线程的 CPU 同时刻只能跑 32 个线程。像我之前 32 核 64G 的 Win11 可以达到几十万个线程,那是因为其实有很多线程是空闲或者等待中的,CPU 来回轮转,间接达到了上万线程同时运行的效果,比如像下面的这种 IO 密集型任务。
2.2 IO 密集型 主要依赖于输入/输出操作,通常涉及大量的数据读写,如文件操作、网络请求等。
如果你正在上传/下载文件,性能主要取决于网络速度和磁盘写入速度,CPU 大部分时间处于空闲等待状态,等待数据传输完成或外部资源的响应。
该类任务,我们在编程时,可以使线程数远大于 CPU 线程数。
优化方式:减少 IO 操作次数、使用异步编程、换用更高性能硬盘。
2.3 内存密集型 这类任务对内存的使用量很大,例如处理大数据集、复杂的数据结构等。内存访问速度较快,但如果内存不足,可能会导致性能瓶颈。
优化方式:优化数据结构、减少内存占用、增加内存。
2.4 网络密集型 这类任务主要依赖网络带宽和延迟,通常涉及大量的数据在网络上传输,如 Web 服务器、大规模分布式系统等。
优化方式:优化网络协议、使用负载均衡、提高带宽等。
三、Vert.x 框架小记 3.1 介绍 Vert.x 是一个基于事件驱动的非阻塞框架,适合如下
这类 **IO 密集型的高并发 ** 场景。
像这种非阻塞的框架,不只有 vertx,还有很多。
Netty:很多非阻塞框架都是基于 Netty 实现。 Spring WebFlux:类似于 Vertx,也是基于 Netty,与 Spring 框架强耦合。 Play Framework:轻量级的反应式 Web 框架,适合高并发 Web 应用。 Akka:基于 Actor 模型的高并发框架,适合分布式系统和容错系统。超重量级。 但是 vertx 各模块之间,没有强耦合,都可以灵活组装使用,这点就特别好,因此我选用 vertx。
在了解 vertx 之前,一定要先阅读下官方这些文档。
3.2 示例 以下示例均基于 Vert.x 4.5.10 版本。
所有代码示例放置在 meethigher/vertx-examples: learn to use vertx
3.2.1 基础知识 单机 vertx 主要有两种线程池
WorkerPool:主要用于处理阻塞任务的工作线程池。默认 20 个。 EventLoopPool:主要用于处理非阻塞的 IO 操作和定时器的事件循环线程池。默认 2*core 线程。 下面放置一个基础示例,包含阻塞逻辑、事件回调逻辑、定时任务注册与取消逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private static void basic () {
/**
* workerPoolSize 指定用于处理阻塞任务的工作线程池的大小。默认 20 个
* eventLoopPoolSize 用于处理事件回调时执行的逻辑。默认 CPU 实际线程的 2 倍。因此在注册事件回调时的逻辑不要阻塞,如果必须要执行阻塞逻辑,就丢给 workerPool
*/
Vertx tVertx = Vertx . vertx ();
Vertx vertx = Vertx . vertx ( new VertxOptions (). setEventLoopPoolSize ( 5 ). setWorkerPoolSize ( 1 ));
/**
* 执行过程中,注意线程的名称。
* 使用 WorkerPool 执行阻塞任务,并使用 EventLoopPool 执行阻塞完成后的回调。
*/
vertx . executeBlocking (() -> {
// 模拟阻塞任务的耗时
log . info ( "simulate blocking task execution time..." );
TimeUnit . SECONDS . sleep ( 5 );
if ( System . currentTimeMillis () % 2 == 0 ) {
int i = 1 / 0 ;
}
return "Hello World" ;
})
. onComplete ( re -> {
if ( re . succeeded ()) {
log . info ( "success, result:{}" , re . result ());
} else {
log . error ( "failure, result:" , re . cause ());
}
})
// 以下写法相同
//onSuccess 和 onFailure 是对 onComplete 更精确的封装,本质还是基于 onComplete。
//.onSuccess(r -> {
// log.info("success, result:{}", r);
//}).onFailure(e -> {
// log.error("failure, result:", e);
//})
;
// 一次性定时任务
vertx . setTimer ( 5000 , t -> {
log . info ( "timer id:{}" , t );
});
// 周期性定时任务,并注册取消定时任务的逻辑
vertx . setPeriodic ( 5000 , t -> {
log . info ( "periodic id:{}" , t );
if ( System . currentTimeMillis () % 2 == 0 ) {
vertx . cancelTimer ( t );
log . info ( "cancel timer:{}" , t );
}
});
}
在 vertx 中,还有执行任务的单元 verticle,一个 vertx 实例可以部署多个 verticle。
注意,一个 verticle 由一个 eventloop 执行。如果直接在主线程执行,也是一个 eventloop 执行。一个 eventloop 即一个线程,若需要多线程执行,则需要部署多个 verticle 实例。
如果是 httpserver 部署多个 verticle 实例,每个实例都会注册端口,但是内部采用了端口复用,因此不会出现端口占用问题。可以简单理解成 vertx 是一个代理,负责分发请求,而 verticle 是实际的后端节点。
verticle 的分类有如下
标准 Verticle:使用 EventLoopPool 执行的 Verticle,这也是默认的 Verticle 类型。一个 Verticle 中的所有代码都会在一个事件循环 eventloop (底层对应线程) 中执行。
工作者 Verticle:使用 WorkerPool 执行的 Verticle,这类 Verticle 被设计用来执行阻塞任务,不过阻塞任务也可以不通过工作者 Verticle,直接 executeBlocking 也可。
vertx 支持部署任意语言编写的 verticle,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static void verticle () {
Vertx vertx = Vertx . vertx ();
// 部署标准 Verticle
vertx . deployVerticle ( new AbstractVerticle () {
@Override
public void start () throws Exception {
log . info ( "eventLoopVerticle start" );
}
}). onFailure ( e -> {
log . error ( "error" , e );
});
// 部署工作者 Verticle
vertx . deployVerticle ( new AbstractVerticle () {
@Override
public void start () throws Exception {
log . info ( "workerVerticle start" );
}
}, new DeploymentOptions (). setThreadingModel ( ThreadingModel . WORKER )). onFailure ( e -> {
log . error ( "error" , e );
});
}
在 vertx 中,还有事件总线。这要求事件的发布、消费,都注册在同一个 vertx 实例或者集群中。
1
2
3
4
5
6
7
8
private static void eventBus () {
Vertx vertx = Vertx . vertx ();
vertx . eventBus (). consumer ( "test" , t -> {
Object body = t . body ();
log . info ( "consumer: {}" , body . toString ());
});
vertx . eventBus (). publish ( "test" , "hello world" );
}
集群 vertx 构建集群需要依赖一些分布式服务。比如 hazelcast 或者 redis。下面给出示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<dependency>
<groupId> io.vertx</groupId>
<artifactId> vertx-core</artifactId>
<version> ${vertx.version}</version>
</dependency>
<dependency>
<groupId> io.vertx</groupId>
<artifactId> vertx-web</artifactId>
<version> ${vertx.version}</version>
</dependency>
<dependency>
<groupId> io.vertx</groupId>
<artifactId> vertx-hazelcast</artifactId>
<version> ${vertx.version}</version>
<exclusions>
<exclusion>
<groupId> com.hazelcast</groupId>
<artifactId> hazelcast</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId> com.hazelcast</groupId>
<artifactId> hazelcast</artifactId>
<version> ${hazelcast.version}</version>
</dependency>
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main ( String [] args ) throws Exception {
Config config = new Config ();
config . setClusterName ( "test" );
HazelcastClusterManager clusterManager = new HazelcastClusterManager ( config );
Future < Vertx > vertxFuture = Vertx . builder ()
. with ( new VertxOptions ())
. withClusterManager ( clusterManager )
//.build()// 构建单节点
. buildClustered ();
// 阻塞直到获取集群中的 vertx 实例
Vertx vertx = vertxFuture . toCompletionStage (). toCompletableFuture (). get ();
vertx . eventBus (). consumer ( "test" , t -> {
log . info ( t . body (). toString ());
});
vertx . setTimer ( 20000 , t -> {
vertx . eventBus (). publish ( "test" , "现在时间是:" + new SimpleDateFormat ( "yyyy-MM-dd HH:mm:ss" ). format ( new Date ()));
});
}
当启动两个程序实例时,会发现接收到了两条消息。这说明事件总线发送的消息,已经被集群中所有节点消费了。
展开
3.2.2 HttpServer 创建一个 HTTP 服务器,并且注册相应的接口。
结合如下代码理解三个路由
/test/test?test=xxx:该接口在等待 10 秒后返回 xxx/halo/*:该接口匹配 /halo/ 下面的目录及子目录,并返回 html 形式的 helloworld。这点与 SpringWeb 不同,SpringWeb 中 /* 表示当前目录,/** 表示当前目录及子目录。 空:表示匹配其他所有路径,相当于 /*,并返回 text 形式 helloworld。/static/test.html:表示返回 D:/3Develop/www/test.html 的内容,若该内容不存在,则回转到 /* 路由。示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public static void main ( String [] args ) {
Vertx vertx = Vertx . vertx ();
vertx . deployVerticle ( new AbstractVerticle () {
@Override
public void start () throws Exception {
int port = 4321 ;
Router router = Router . router ( vertx );
// order值越小,优先级越高
//注册接口
router . route ( "/test/test" ). order ( 1 ). handler ( t -> {
//接口逻辑
String testParam = t . request (). getParam ( "test" );
if ( testParam == null ) {
t . response (). setStatusCode ( 400 )
. end ( "missing 'test' query parameter" );
} else {
vertx . setTimer ( 10000 , tt -> {
t . response (). putHeader ( "Content-Type" , "text/plain" )
. end ( testParam );
});
}
});
//`/*`表示匹配当前目录及子目录。注意与SpringWeb的区别。
router . route ( "/halo/*" ). order ( 2 ). handler ( t -> {
// 开启chunked分片传输。content-length与transfer-encoding是矛盾的。
// content-length需要服务器在内存中计算好内容长度,适用于数据较少时传输;而transfer-encoding则是分块流式传输,适用于大文件传输。
// nginx自身不能直接开启chunked。需要借助插件或者实际后端服务。
t . response (). setChunked ( true );
t . response (). end ( "hello world " + new SimpleDateFormat ( "yyyy-MM-dd HH:mm:ss" ). format ( new Date ()) + "\n"
+ t . request (). absoluteURI ());
});
//不传值表示匹配下面的所有内容
router . route (). order ( 3 ). handler ( t -> {
t . response (). end ( "Hello World" );
});
// 注册静态资源路径,若访问的内容是404,则会回转到/*的路由上
StaticHandler staticHandler = StaticHandler . create ( "D:/3Develop/www" )
. setDirectoryListing ( true )
. setAlwaysAsyncFS ( true )
. setIndexPage ( "index.html" );
router . route ( "/static/*" ). order ( Integer . MIN_VALUE ). handler ( staticHandler );
vertx . createHttpServer ( new HttpServerOptions ()
. setSsl ( true )
. setKeyCertOptions ( new PemKeyCertOptions () //使用自签名证书开启ssl
. addCertPath ( "/usr/local/nginx/conf/cert/certificate.pem" )
. addKeyPath ( "/usr/local/nginx/conf/cert/private.key" )
)
)
//注册路由
. requestHandler ( router )
. listen ( port ). onComplete ( re -> {
if ( re . succeeded ()) {
log . info ( "http server started on port {}" , port );
} else {
log . error ( "http server failed to start" , re . cause ());
}
});
}
});
}
3.2.3 HttpClient 应用场景:
有一个接口,http://localhost:4321/test/test?test= 参数 ,该接口 10 秒后会返回 参数 值。
我现在需要在 10 秒左右获取到 2000 次接口的请求响应。
如果使用传统的做法,那么就得需要 2000 个线程,但如果使用 vertx,只需要一个线程即可。
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public static void main ( String [] args ) throws Exception {
Vertx vertx = Vertx . vertx ( new VertxOptions (). setEventLoopPoolSize ( 1 ). setWorkerPoolSize ( 1 ));
/**
* 当你想要一个既能发送 http、又能发送 https,既能 followRedirect 也能不 followRedirect
* 那么就需要在发送时,使用 io.vertx.core.http.HttpClient#request(io.vertx.core.http.RequestOptions)
* 否则,建议就使用多个池
*/
// 创建 HttpClient 时指定的 PoolOptions 里面的 EventLoopSize 不会生效。以 Vertx 的 EventLoopSize 为主。默认 http/1 为 5 并发,http/2 为 1 并发
HttpClient httpClient = vertx . createHttpClient ( new PoolOptions (). setHttp2MaxSize ( 2000 ). setHttp1MaxSize ( 2000 ). setEventLoopSize ( 2000 ));
HttpClient httpsClient = vertx . createHttpClient ( new HttpClientOptions ()
. setProtocolVersion ( HttpVersion . HTTP_2 ). setUseAlpn ( true ) // 若服务器支持 http2,则发送 http2 请求
. setSsl ( true ). setTrustAll ( true ) // 发送 https 请求
. setConnectTimeout ( 60000 ), new PoolOptions (). setHttp2MaxSize ( 2000 ). setHttp1MaxSize ( 2000 ). setEventLoopSize ( 2000 ));
/**
* 输出当前活着的线程
*/
vertx . deployVerticle ( new AbstractVerticle () {
@Override
public void start () throws Exception {
vertx . setPeriodic ( 5000 , t -> {
vertx . executeBlocking (() -> {
ThreadGroup threadGroup = Thread . currentThread (). getThreadGroup ();
int i = threadGroup . activeCount ();
Thread [] threads = new Thread [ i ] ;
threadGroup . enumerate ( threads );
List < String > list = new ArrayList <> ();
for ( Thread thread : threads ) {
list . add ( thread . getName ());
}
// 模拟耗时操作
try {
log . info ( "calculating..." );
TimeUnit . SECONDS . sleep ( 5 );
} catch ( InterruptedException e ) {
throw new RuntimeException ( e );
}
return String . join ( "," , list );
}). onComplete ( r -> {
if ( r . succeeded ()) {
log . info ( "threads:\n{}" , r . result ());
} else {
log . warn ( r . cause (). getMessage (), r . cause ());
}
});
});
}
});
int total = 2000 ;
CountDownLatch countDownLatch = new CountDownLatch ( total );
long start = System . currentTimeMillis ();
for ( int i = 0 ; i < total ; i ++ ) {
int finalI = i ;
vertx . deployVerticle ( new AbstractVerticle () {
@Override
public void start () throws Exception {
httpClient . request ( new RequestOptions (). setMethod ( HttpMethod . GET ). setSsl ( true ). setHost ( "reqres.in" ). setPort ( 443 ). setURI ( "/api/users?page=" + finalI ))
//httpClient.request(HttpMethod.GET, 4321, "localhost", "/test/test?test=" + finalI)
//httpsClient.request(HttpMethod.GET, 443, "reqres.in", "/api/users?page=" + finalI)
. onComplete ( r -> {
if ( r . succeeded ()) {
HttpClientRequest request = r . result ();
log . info ( "{} send request {}" , this , request . absoluteURI ());
request . putHeader ( "User-Agent" , "I am Vertx" );
request . send ()
. onComplete ( r1 -> {
if ( r1 . succeeded ()) {
HttpClientResponse result = r1 . result ();
log . info ( "{} received response with status code {}" , this , result . statusCode ());
// 这种做法其实对内存要求较大,相当于是一次性将内容写到 buffer 里了
result . body (). onComplete ( re -> {
if ( re . succeeded ()) {
Buffer result1 = re . result ();
log . info ( "result: {}" , result1 );
} else {
log . warn ( "warn:" , re . cause ());
}
});
} else {
log . error ( "{} send failed: {}" , this , r1 . cause (). getMessage (), r1 . cause ());
}
countDownLatch . countDown ();
});
} else {
log . error ( "request failed: {}" , r . cause (). getMessage (), r . cause ());
countDownLatch . countDown ();
}
});
}
});
}
countDownLatch . await ();
log . info ( "done {} ms" , System . currentTimeMillis () - start );
}
3.2.4 文件系统 示例代码
vertx 中的 FileSystem 提供了许多方法,并且都有阻塞版本和非阻塞版本的 api。如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void fileCopy () {
Vertx vertx = Vertx . vertx ();
FileSystem fs = vertx . fileSystem ();
/**
* 非阻塞方法
*/
fs . copy ( "D:/Downloads/yjwj_2024-06-05-10-50.zip" , "D:/Desktop/test.zip" )
. onComplete ( re -> {
if ( re . succeeded ()) {
log . info ( "copy success" );
} else {
log . error ( "copy failed," , re . cause ());
}
});
/**
* 对应的阻塞方法
*/
fs . copyBlocking ( "D:/Downloads/yjwj_2024-06-05-10-50.zip" , "D:/Desktop/test1.zip" );
log . info ( "done" );
}
针对 copy 方法,底层是使用的 java.nio.file.Files#copy 方法,这个拷贝起来特别快,因为他使用了 零拷贝 机制。
传统拷贝:硬盘 -> 系统缓冲区 -> 程序缓冲区 -> 系统缓冲区 -> 硬盘
零拷贝:硬盘 -> 系统缓冲区 -> 硬盘
vertx 还提供了 open 方法,实现异步拷贝文件。如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fs . open ( sourcePath , new OpenOptions (), ar -> {
if ( ar . succeeded ()) {
AsyncFile sourceFile = ar . result ();
sourceFile . setReadBufferSize ( bufferSize );
sourceFile . pause ();
fs . open ( targetPath , new OpenOptions (). setTruncateExisting ( true ), ar1 -> {
if ( ar1 . succeeded ()) {
AsyncFile targetFile = ar1 . result ();
// 缓冲区空闲回调
targetFile . drainHandler ( v -> {
sourceFile . resume ();
});
sourceFile . handler ( buffer -> {
targetFile . write ( buffer );
if ( targetFile . writeQueueFull ()) {
sourceFile . pause ();
}
});
sourceFile . endHandler ( v -> {
sourceFile . close ();
targetFile . close ();
});
sourceFile . resume ();
} else {
log . error ( "open targetPath failed" , ar1 . cause ());
}
});
} else {
log . error ( "open sourcePath failed" , ar . cause ());
}
});
但是相比传统流式拷贝,该做法会占用更多 jvm 内存、耗费更多时间。此处我也在 vertx 的论坛里问过佬了,对方的回答是在执行连续工作时,不建议使用 async 或者 reactive。
展开
3.2.5 HTTP 反向代理 添加依赖
1
2
3
4
5
<dependency>
<groupId> io.vertx</groupId>
<artifactId> vertx-web-proxy</artifactId>
<version> ${vertx.version}</version>
</dependency>
其实这个依赖提供的功能非常简陋,如果想要实现强大的 HTTP 反向代理,需要自己编写代理逻辑。如下给出示例代码
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public static void main ( String [] args ) throws Exception {
int port = 8080 ;
Vertx vertx = Vertx . vertx ();
HttpServer httpServer = vertx . createHttpServer ();
Router router = Router . router ( vertx );
HttpClient httpClient = vertx . createHttpClient ();
HttpClient httpsClient = vertx . createHttpClient ( new HttpClientOptions (). setSsl ( true ). setTrustAll ( true ));
HttpProxy httpProxy = HttpProxy . reverseProxy ( httpClient );
HttpProxy httpsProxy = HttpProxy . reverseProxy ( httpsClient );
// 默认处理逻辑 ProxyHandler,这个是由 vertx-web-proxy 扩展提供
router . route ( "/halo/*" ). handler ( ProxyHandler . create ( httpProxy , 4321 , "10.0.0.1" ));
router . route (). handler ( ProxyHandler . create ( httpsProxy , 443 , "meethigher.top" ));
// 自定义逻辑,自己实现。order 越小,优先级越高
// /api/* --> https://reqres.in/api/*
router . route ( "/api/*" ). order ( Integer . MIN_VALUE ). handler ( ctx -> {
HttpServerRequest request = ctx . request ();
String uri = request . uri ();
Route route = ctx . currentRoute ();
String path ;
/**
* false 表示 /api/* --> https://reqres.in/api/*
* true 表示 /api/* --> https://requres.in/*
*/
boolean whole = false ;
if ( whole ) {
if ( route . getPath (). endsWith ( "/" )) {
int length = route . getPath (). length () - 1 ;
path = uri . substring ( length );
} else {
int length = route . getPath (). length ();
path = uri . substring ( length );
}
} else {
path = request . uri ();
}
System . out . println ( path );
httpsClient . request ( HttpMethod . valueOf ( request . method (). name ()), 443 , "reqres.in" , path )
. onSuccess ( r -> {
r . headers (). setAll ( request . headers ());
r . putHeader ( "Host" , "reqres.in" );
r . send ()
. onSuccess ( r1 -> {
ctx . response ()
. setStatusCode ( r1 . statusCode ())
. headers (). setAll ( r1 . headers ());
r1 . handler ( data -> {
ctx . response (). write ( data );
});
r1 . endHandler ( v -> ctx . response (). end ());
})
. onFailure ( e1 -> {
ctx . response (). setStatusCode ( 500 ). end ( e1 . getMessage ());
});
})
. onFailure ( e -> {
ctx . response (). setStatusCode ( 500 ). end ( "Internal Server Error" );
});
});
httpServer . requestHandler ( router ). listen ( port ). onSuccess ( t -> {
log . info ( "http server started on port {}" , port );
});
TimeUnit . MINUTES . sleep ( 10 );
// 可以在运行时,动态移除路由
List < Route > routes = router . getRoutes ();
for ( Route next : routes ) {
// 首先判断是否是精准匹配地址
String path = next . isExactPath () ? next . getPath () : next . getPath () + "*" ;
if ( "/api/*" . equals ( path )) {
next . remove ();
}
}
log . info ( "remove route /api/*" );
List < Route > routes1 = router . getRoutes ();
System . out . println ();
}
3.2.6 TCP 反向代理 大家口中常说的反向代理,一般都是 HTTP 反向代理。
我简单实现了一个 TCP 反向代理,源码可查看 tcp 反向代理 top.meethigher.proxy.tcp.SimpleReverseTcpProxy 。我是使用传统 bio 的形式,这种做法,反代 1 个 TCP 连接,需要使用 2 个线程,直到 TCP 连接释放。如果想要在有限的 CPU 上同时反代更多的 TCP 连接,这种做法是不可取的。而且这种做法有明显的缺陷,当 TCP 连接没有读写时,线程处于阻塞等待的状态,这段时间本可以做其他的事。
一个优化做法——事件驱动 (多路复用),通俗解释就是,线程等待的时间,安排他去做其他类型的事。
有可连接事件时,去连接 有可读事件,去读取 有可写事件,去写入 Vertx 就是基于事件驱动的,2 个线程即可实现超多连接的处理。代码如下
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void tcpReverse () {
NetClient netClient = vertx . createNetClient ();
NetServer netServer = vertx . createNetServer ();
Handler < NetSocket > handler = sourceSocket -> {
sourceSocket . pause ();
netClient . connect ( 5432 , "10.0.0.9" ). onSuccess ( targetSocket -> {
targetSocket . pipeTo ( sourceSocket );
sourceSocket . pipeTo ( targetSocket );
sourceSocket . resume ();
});
};
Handler < AsyncResult < NetServer >> successHandler = r -> System . out . println ( "Server listening on port" + port );
Handler < Throwable > throwableHandler = Throwable :: printStackTrace ;
netServer . connectHandler ( handler ). listen ( port , host ). onComplete ( successHandler ). onFailure ( throwableHandler );
}
3.2.7 TCP 数据传输问题 当 tcp 向 socket 传输数据时,如果对面消费不过来或者接收不过来,那么数据会存储在发送方的本地发送缓冲区(也称为发送队列,在 vertx 的 socket 中即 writeQueue)中。
若对方一直不消费,那么就会出现本地内存崩了的情况。因此在开发过程中,要利用好相关 api,如下。
在编程时,更建议使用 pipeTo,内部已经实现了如下操作。
这其实就是一种背压/反压(Backpressure)机制。当下游处理能力不足 ,来不及处理上游发送过来的数据时,下游会以某种方式通知或限制上游减速或暂停发送数据 ,这个机制就叫做“背压”。
1
2
3
4
5
6
7
ws . drainHandler ( v -> src . resume ());
src . handler ( item -> {
ws . write ( item );
if ( ws . writeQueueFull ()) {
src . pause ();
}
});
3.2.8 TCP 通信-自定义协议 当我们直接使用 TCP 进行通信时,如果双方没有一个明确通信协议的话,就会出现粘包/半包 的问题。
下面记录一个简单的自定义协议。
1
2
| 4 字节(消息长度) | 2 字节(消息类型) | 变长(消息体) |
| 0x0010 | 0x0001 | { "id" :1, "msg" :"Hello" } |
序列化和反序列化就使用probotuf 。protobuf-javalite 保留了大部分常见的基本功能,如序列化、反序列化、消息对象的构建等。然而,去除了一些 Protobuf 的高级特性,如流式解析、扩展字段、未知字段、反射支持等,适用于不需要这些特性的应用场景。以下两个依赖,按需引入其中一个即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId> com.google.protobuf</groupId>
<artifactId> protobuf-java</artifactId>
<version> 4.30.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-javalite -->
<dependency>
<groupId> com.google.protobuf</groupId>
<artifactId> protobuf-javalite</artifactId>
<version> 4.30.2</version>
</dependency>
另外,需要在编译前,将 .proto 转为 .java,我建议使用命令,轻量便捷。
1
2
3
4
5
6
7
# 在任意目录下
protoc -I= $SRC_DIR --java_out= $DST_DIR $SRC_DIR /addressbook.proto
# 在当前应用目录下
protoc --java_out= ${ OUTPUT_DIR } path/to/your/proto/file
# 注意,如果你使用的是javalite,那么在生成时,命令需要更换为如下
# 参考 https://github.com/protocolbuffers/protobuf/blob/main/java/lite.md
protoc --java_out= lite:${ OUTPUT_DIR } path/to/your/proto/file
示例代码参考meethigher/vertx-examples
3.2.9 TCP 通信-自定义解码逻辑 在上面的自定义协议中,规范了编码逻辑。但是在数据接收时,就需要编写解码逻辑实现解码。
此处我们可以使用RecordParser来实现解码。RecordParser支持定长消息、固定分隔符消息。
针对预设长度的消息,我使用的自定义解码逻辑。代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import io.vertx.core.Handler ;
import io.vertx.core.buffer.Buffer ;
/**
* 自定义消息结构解析器
* [4字节长度+2字节类型+protobuf变长消息体]
*/
public class ProtoParser implements Handler < Buffer > {
private Buffer buf = Buffer . buffer ();
/**
* 预设长度起始位置
*/
private final int lengthFieldOffset = 0 ;
/**
* 预设长度占用的字节数
*/
private final int lengthFieldLength = 4 ;
/**
* 消息类型起始位置
*/
private final int typeFieldOffset = 4 ;
/**
* 消息类型占用的字节数
*/
private final int typeFieldLength = 2 ;
/**
* 消息体起始位置
*/
private final int bodyFieldOffset = 6 ;
private final Handler < Buffer > outputHandler ;
public ProtoParser ( Handler < Buffer > outputHandler ) {
this . outputHandler = outputHandler ;
}
@Override
public void handle ( Buffer buffer ) {
buf . appendBuffer ( buffer );
if ( buf . length () < lengthFieldLength ) {
return ;
} else {
int totalLength = buf . getInt ( lengthFieldOffset );
if ( buf . length () < totalLength ) {
return ;
} else {
outputHandler . handle ( buf . getBuffer ( 4 , totalLength ));
buf = buf . getBuffer ( totalLength , buf . length ());
}
}
}
}
运行示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import io.vertx.core.Handler ;
import io.vertx.core.buffer.Buffer ;
import io.vertx.core.parsetools.RecordParser ;
import top.meethigher.example13.ProtoParser ;
import java.util.concurrent.ThreadLocalRandom ;
public class Example13 {
public static void main ( String [] args ) {
diyRecord ();
}
/**
* 使用换行符作为分隔符,表示消息的结尾
* 示例消息内容
* <pre>
* [...\n][...\n]
* </pre>
*/
public static void lineBasedRecord () {
Handler < Buffer > output = h -> {
System . out . println ( h . toString ());
};
final RecordParser parser = RecordParser . newDelimited ( "\n" , output );
// parser.handle(Buffer.buffer("HELLO\nHOW ARE Y"));
// parser.handle(Buffer.buffer("OU?\nI AM"));
// parser.handle(Buffer.buffer("DOING OK"));
// parser.handle(Buffer.buffer("\n"));
Buffer buffer = Buffer . buffer ( "Hello!\nHow are you?\nI am fine! Thank you!\n" );
// 模拟网络传输中的粘包半包问题
int tStart = 0 , tEnd = 0 ;
for ( int i = 0 ; i < buffer . length (); i ++ ) {
// 前闭后开
tEnd = ThreadLocalRandom . current (). nextInt ( tStart + 1 , buffer . length () + 1 );
if ( tEnd > buffer . length ()) {
tEnd = buffer . length ();
}
// 前闭后开
Buffer tb = buffer . getBuffer ( tStart , tEnd );
parser . handle ( tb );
tStart = tEnd ;
if ( tStart >= buffer . length ()) {
break ;
}
}
}
/**
* 定长消息结构。假如我固定长度为3(一个中文刚好字节占3)。示例消息内容
* 你好啊世界
*/
public static void fixedLengthRecord () {
RecordParser parser = RecordParser . newFixed ( 3 , b -> {
System . out . println ( b );
});
parser . handle ( Buffer . buffer ( "你好啊世界" ));
}
/**
* 自定义消息结构。示例消息内容
* <pre>
* [4字节长度+2字节类型+protobuf变长消息体][4字节长度+2字节类型+protobuf变长消息体]
* </pre>
*/
public static void diyRecord () {
ProtoParser parser = new ProtoParser ( b -> {
short type = b . getShort ( 0 );
Buffer buffer = b . getBuffer ( 2 , b . length ());
System . out . println ( "消息类型=" + type + ", 消息内容=" + buffer . toString ());
});
// 模拟发送的消息
Buffer buffer = Buffer . buffer ();
Buffer body = Buffer . buffer ( "你好,世界!" );
// 网络传输都使用大端
// 模拟第一条消息
buffer . appendInt ( 4 + 2 + body . length ());
buffer . appendShort (( short ) 1 );
buffer . appendBytes ( body . getBytes ());
// 模拟第二条消息
Buffer body1 = Buffer . buffer ( "hello, world! " );
buffer . appendInt ( 4 + 2 + body1 . length ());
buffer . appendShort (( short ) 2 );
buffer . appendBytes ( body1 . getBytes ());
buffer . appendBytes ( body1 . getBytes ());
// 模拟网络传输中的粘包半包问题
int tStart = 0 , tEnd = 0 ;
for ( int i = 0 ; i < buffer . length (); i ++ ) {
// 前闭后开
tEnd = ThreadLocalRandom . current (). nextInt ( tStart + 1 , buffer . length () + 1 );
if ( tEnd > buffer . length ()) {
tEnd = buffer . length ();
}
// 前闭后开
Buffer tb = buffer . getBuffer ( tStart , tEnd );
parser . handle ( tb );
tStart = tEnd ;
if ( tStart >= buffer . length ()) {
break ;
}
}
}
}
3.2.10 TCP 通信-Client自动重连 我需要让 NetClient 一直与服务端保持一个活跃的连接。除了使用心跳外,针对一些被关闭的情况,需要添加自动重连机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import io.vertx.core.Vertx ;
import io.vertx.core.net.NetClient ;
import io.vertx.core.net.NetSocket ;
public class Example14 {
private static void startServer () {
Vertx vertx = Vertx . vertx ();
vertx . createNetServer (). connectHandler ( socket -> {
socket . close ();
}). listen ( 8080 ). onFailure ( e -> {
e . printStackTrace ();
System . exit ( 1 );
});
}
/**
* 实现client的重连机制
*/
private static void startClient () {
Vertx vertx = Vertx . vertx ();
NetClient netClient = vertx . createNetClient ();
connect ( vertx , netClient );
}
private static void connect ( Vertx vertx , NetClient netClient ) {
netClient . connect ( 8080 , "127.0.0.1" )
. onComplete ( ar -> {
if ( ar . succeeded ()) {
NetSocket socket = ar . result ();
socket . pause ();
reconnectDelay = MIN_DELAY ;
System . out . println ( "重连成功" );
socket . closeHandler ( t -> {
System . out . println ( "连接被关闭咯" );
reconnect ( vertx , netClient );
});
socket . resume ();
} else {
ar . cause (). printStackTrace ();
reconnect ( vertx , netClient );
}
});
}
private static final long MIN_DELAY = 3000 ;
private static final long MAX_DELAY = 60000 ;
private static long reconnectDelay = MIN_DELAY ;
/**
* 自动重连: 指数退避策略
*/
private static void reconnect ( Vertx vertx , NetClient netClient ) {
vertx . setTimer ( reconnectDelay , id -> {
System . out . println ( "开始重连" );
connect ( vertx , netClient );
reconnectDelay = Math . min ( reconnectDelay * 2 , MAX_DELAY );
});
}
public static void main ( String [] args ) {
startServer ();
startClient ();
}
}
以上只是一个简单的例子,其实还可以封装的更优雅。可以参考这个的底层源码 。
3.2.11 手撕内网穿透FRP 时序图
展开
源码地址手撕FRP
3.3 参考致谢