言成言成啊 | Kit Chen's Blog

网络编程NIO

发布于2023-03-16 21:30:59,更新于2023-05-03 22:15:21,标签:java  文章会持续修订,转载请注明来源地址:https://meethigher.top/blog

上学时到目前,只会阻塞io。结合最近工作实践,发现阻塞io在某些场景下是存在瓶颈的。就展开了对非阻塞io的了解。

一、背景

避开应用场景谈技术,全是耍流氓。粗略记一下,最近由应用场景瓶颈,所展开的对新技术的学习并实践。

最近要压测服务长连接瓶颈。测试他们使用常规压测工具(一连接一线程)来模拟客户端,一个线程一个连接,其实发的请求如果响应慢的时候,线程就阻塞在那了,是施加不了多少压力的(比如我想实现每个连接每5秒发送消息,如果光响应就超过5秒了,这时候哪有啥压力),再者,机器的线程数量本身也是个瓶颈。

由此,使用Netty简单封装了一个压测client。严格做到模拟每个长连接每5秒发送消息,5000长连接处理收发也只是12线程而已,压力大大上去。

不过也越发觉得,对nio的知识掌握过于薄弱。

本文非原创,指路原作者

不过以下内容都添加了本人学习过程的思路,部分内容与原作者有出入。

二、NIO 基础

non-blocking io 非阻塞 IO

2.1 三大组件

2.1.1 Channel & Buffer

channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 将数据读入 buffer,也可以将 buffer 的数据写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层

常见的 Channel 有

  • FileChannel:文件传输通道
  • DatagramChannel:UDP传输通道
  • SocketChannel:TCP传输通道,一般用于服务端或客户端
  • ServerSocketChannel:TCP传输通道,适用于服务端

buffer 则用来缓冲读写数据,常见的 buffer 有

  • ByteBuffer:常用,抽象类,以下是实现类
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

2.1.2 Selector选择器

selector 单从字面意思不好理解,需要结合服务器的设计演化来理解它的用途

多线程版设计

⚠️ 多线程版缺点

  • 内存占用高(线程数量与CPU有关,存在瓶颈。比如16核,一般同时只有16个线程在跑,单核超线程技术除外)
  • 线程上下文切换成本高
  • 只适合连接数少的场景

线程池版设计

⚠️ 线程池版缺点

  • 阻塞模式下,线程仅能处理一个 socket 连接
  • 仅适合短连接场景(比如Tomcat就是一个请求一个线程)

selector 版设计

selector 的作用就是配合一个线程来管理多个 channel,获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。适合连接数特别多,但流量低的场景(low traffic)

调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理

2.2 ByteBuffer

有一普通文本文件 data.txt,内容为

1
1234567890abcd

使用 FileChannel 来读取文件内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class TestByteBuffer {
public static void main(String[] args) {
/**
* 获取FileChannel
* 1. 通过输入输出流
* 2. 通过RandomAccessFile
*/
try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {//data.txt里面有13个字符
//准备缓冲区10个字节
ByteBuffer buffer = ByteBuffer.allocate(10);
//要分多次读取,因为文件可能很大,但是缓冲区不能无限大
while (true) {
//从channel读取数据,向buffer写入
int len = channel.read(buffer);
log.info("读取到的字节数:{}",len);
if (len == 0 || len == -1) {
break;
}
//打印buffer内容
buffer.flip();//切换成读模式
while (buffer.hasRemaining()) {//检查是否有剩余未读数据
byte b = buffer.get();//一次读一个字节
System.out.println((char) b);//转为字符打印
}
buffer.clear();//切换为写模式
}
} catch (IOException e) {
}
}
}

输出

2.2.1 三个重要属性

  1. buffer初始是写入模式

  2. 向 buffer 写入数据,例如调用 channel.read(buffer)

  3. 调用 flip() 切换至读模式

  4. 从 buffer 读取数据,例如调用 buffer.get()

  5. 调用 clear() 或 compact() 切换至写模式

  6. 重复 1~4 步骤

ByteBuffer 有以下重要属性

  • capacity
  • position
  • limit

一开始

写模式下,position 是写入位置,limit 等于容量,下图表示写入了 4 个字节后的状态

flip 动作发生后,切换为读模式。position 切换为读取位置,limit 切换为读取限制

读取 4 个字节后,状态

clear 动作发生后,切换为写模式,状态

compact 方法,是把未读完的部分向前压缩,然后切换至写模式

2.2.2 调试工具类

该工具类,实现源码参考自io.netty.handler.logging.LoggingHandler#formatByteBufio.netty.buffer.ByteBufUtil.HexUtil#appendPrettyHexDump

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import io.netty.util.internal.StringUtil;
import java.nio.ByteBuffer;

import static io.netty.util.internal.StringUtil.NEWLINE;
import static io.netty.util.internal.MathUtil.isOutOfBounds;

public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];

static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}

int i;

// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}

// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}

// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}

// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}

// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}

/**
* 打印所有内容
*
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}

/**
* 打印可读取内容
*
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}

private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
NEWLINE + "+--------+-------------------------------------------------+----------------+");

final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;

// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;

// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");

// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}

// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");

// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}

dump.append(NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}

private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}

public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}

2.2.3 常见方法

分配空间

可以使用 allocate 方法为 ByteBuffer 分配空间,其它 buffer 类也有该方法

1
2
Bytebuffer buf = ByteBuffer.allocate(16);
ByteBuffer buf2 = ByteBuffer.allocateDirect(16);

两种分配空间的方式对比

1
2
3
4
5
6
7
8
9
10
11
public class TestByteBufferAllocate {

public static void main(String[] args) {
System.out.println(ByteBuffer.allocate(10).getClass());//java.nio.HeapByteBuffer java堆内存
System.out.println(ByteBuffer.allocateDirect(10).getClass());//java.nio.DirectByteBuffer 直接内存
/**
* java堆内存: 读写效率较低,受到GC的影响(其他普通对象也是一样,在堆内存中。在GC时,内存分配地址会进行变更也就是进行了拷贝)
* 直接内存:读写效率高(少一次数据拷贝),不会受GC的影响(使用的系统内存,缺点是需要调用系统函数,分配的会比较慢,使用不当会造成内存泄漏)。
*/
}
}

向 buffer 写入数据

有两种办法

  • 调用 channel 的 read 方法。从channel读取数据, 向buffer写入
  • 调用 buffer 自己的 put 方法
1
int readBytes = channel.read(buf);

1
buf.put((byte)127);

从 buffer 读取数据

同样有两种办法

  • 调用 channel 的 write 方法。从buffer读取数据,写入到channel
  • 调用 buffer 自己的 get 方法
1
int writeBytes = channel.write(buf);

1
byte b = buf.get();

get 方法会让 position 读指针向后走,如果想重复读取数据

  • 可以调用 rewind 方法将 position 重新置为 0
  • 或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针

mark 和 reset

mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的位置

注意

rewind 和 flip 都会清除 mark 位置

字符串与 ByteBuffer 互转

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class TestByteBufferString {
public static void main(String[] args) {
//字符串转为ByteBuffer

//方法一:字节数组,写模式
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.put("hello world".getBytes(StandardCharsets.UTF_8));
debugAll(buffer);
System.out.println((char) buffer.get());


//方法二:直接通过编码转,读模式
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("hello world");
// ByteBuffer buffer1 = Charset.forName("UTF-8").encode("hello world");
debugAll(buffer1);
// buffer1.flip();//查看TestByteBufferFlip源码, 其实严格来说flip切换读写模式是不对的,他只做了个位置更新。limit=currentPosition;currentPosition=0
System.out.println((char) buffer1.get());

//方法三: wrap,读模式
ByteBuffer wrap = ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8));
debugAll(wrap);
System.out.println((char) wrap.get());

debugAll(wrap);
//ByteBuffer转为字符串

//方法一:使用编码
wrap.rewind();
String s = StandardCharsets.UTF_8.decode(wrap).toString();
System.out.println(s);

//方法二:使用字节数组
wrap.rewind();
String s1 = new String(wrap.array(), StandardCharsets.UTF_8);
System.out.println(s1);
}
}

Buffer 是非线程安全

Buffer 是非线程安全的

2.2.4 分散读

场景:文件里有三个已知长度的单词,我要分别读取成3个字符串。

常规做法是,一次性读出来后,再次根据长度分割成3个字符串。这会导致在内存中多复制了一份,性能略低。更好的做法是分散读

分散读取,有一个文本文件 words.txt

1
onetwothree

使用如下方式读取,可以将数据填充至多个 buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TestScatteringReads {
public static void main(String[] args) {
try (FileChannel channel = new RandomAccessFile("words.txt", "r").getChannel()) {
//准备多个buffer
ByteBuffer b1 = ByteBuffer.allocate(3);
ByteBuffer b2 = ByteBuffer.allocate(3);
ByteBuffer b3 = ByteBuffer.allocate(5);
channel.read(new ByteBuffer[]{b1, b2, b3});
//此时刚把值写到ByteBuffer,此时三个buffer的position与limit分别都到了3、3、5。进行翻转。将limit置为当前position,position置为0
b1.flip();
b2.flip();
b3.flip();
debugAll(b1);
debugAll(b2);
debugAll(b3);
} catch (IOException e) {
}
}
}

结果

2.2.5 集中写

由分散读可以引出集中写的思路。

使用如下方式写入,可以将多个 buffer 的数据填充至 channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class TestGatheringWrites {

public static void main(String[] args) {
//准备多个buffer
ByteBuffer b1 = StandardCharsets.UTF_8.encode("one");
ByteBuffer b2 = StandardCharsets.UTF_8.encode("two");
//utf-8中,中文占3个字节
ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");

try (FileChannel channel = new RandomAccessFile("words2.txt", "rw").getChannel()) {
channel.write(new ByteBuffer[]{b1, b2, b3});
} catch (IOException e) {
}

}
}

文件内容

1
onetwo你好

2.2.6 粘包拆包示例

网络上有多条数据发送给服务端,数据之间使用 \n 进行分隔,但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为

  • Hello,world\n
  • I’m zhangsan\n
  • How are you?\n

变成了下面的两个 byteBuffer (粘包、拆包)

  • Hello,world\nI’m zhangsan\nHo
  • w are you?\n

粘包与拆包只有在 TCP 传输的时候才会有,像 UDP 是不会有这种情况的,原因是因为 TCP 是面向流的,数据之间没有界限的,而 UDP 是有界限的。

粘包:服务端为了提高传输效率,会将多条短消息合并后一起发送。接收端就获得了一个大的消息,出现了粘包现象。

拆包:服务端发送的消息,超过了接收端的接收能力,不能一次性拿到包里所有数据,出现了拆包现象。

现在要求你编写程序,将错乱的数据恢复成原始的按 \n 分隔的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class TestByteBufferExample {

public static void main(String[] args) {
/**
* 网络上有多条数据发送给服务端,数据之间使用 `\n` 进行分隔,但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
*
* Hello,world\n
* I'm zhangsan\n
* How are you?\n
*
* 变成了下面的两个 byteBuffer (粘包、拆包)
*
* Hello,world\nI'm zhangsan\nHo
* w are you?\n
*/
//用来模拟服务端接收到的消息
ByteBuffer source = ByteBuffer.allocate(32);
source.put("Hello,world\nI'm zhangsan\nHo".getBytes(StandardCharsets.UTF_8));
handle(source);
source.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
handle(source);
}

private static void handle(ByteBuffer buffer) {
buffer.flip();
for (int i = 0; i < buffer.limit(); i++) {
//找到一条完整消息
if (buffer.get(i) == '\n') {
int length = i + 1 - buffer.position();
//把这条消息存入新的ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(length);
//从buffer读,像byteBuffer写
for (int j = 0; j < length; j++) {
byteBuffer.put(buffer.get());
}
byteBuffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(byteBuffer));
}
}
//移动到未读的位置
buffer.compact();
}
}

2.3 文件编程

2.3.1 FileChannel

工作模式

FileChannel 只能工作在阻塞模式下,不能配合selector

只有SocketChannel才能配合selector工作在非阻塞模式下

获取

不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法

  • 通过 FileInputStream 获取的 channel 只能读
  • 通过 FileOutputStream 获取的 channel 只能写
  • 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定

读取

会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1 表示到达了文件的末尾

1
int readBytes = channel.read(buffer);

写入

写入的正确姿势如下

1
2
3
4
5
6
7
8
ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip(); // 切换读模式

//channel写入能力是有上限的,因此不能一次性写入。而要循环写入
while(buffer.hasRemaining()) {
channel.write(buffer);
}

在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel

关闭

channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法

位置

获取当前位置

1
long pos = channel.position();

设置当前位置

1
2
long newPos = ...;
channel.position(newPos);

设置当前位置时,如果设置为文件的末尾

  • 这时读取会返回 -1
  • 这时写入,会追加内容,但要注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)

大小

使用 size 方法获取文件的大小

强制写入

操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘。可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

2.3.2 两个 Channel 传输数据

filechannel的transfer每次只能传输2g数据。

比如我有一个3g的文件,想一次性传输,只能传2g,所以要做循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
public class TestFileChannelTransferTo {
public static void main(String[] args) {
log.info("start");
String from = "B:\\cepheus_images_V11.0.9.0.QFACNXM_20200408.0000.00_10.0_cn_c587631ef4.tgz";
String to = "C:\\Users\\meethigher\\Desktop\\to.tgz";
try (FileChannel fromChannel = new FileInputStream(from).getChannel();
FileChannel toChannel = new FileOutputStream(to).getChannel()) {
//效率比文件输入输出流效率要高。只要jdk中带transferTo的,都采用了操作系统的零拷贝技术
//缺点:一次只能传输2g的数据
//下面是一次性传输大于2g的数据示例
long left = fromChannel.size();
log.info("file size: {} B", left);
while (left > 0) {
// idea快捷键 ctrl+alt+v,可以快速将变量提取出来
long realTransfer = fromChannel.transferTo(fromChannel.size() - left, left, toChannel);
log.info("real transfer: {} B", realTransfer);
left -= realTransfer;
}
} catch (IOException e) {
e.printStackTrace();
}
log.info("end");
}
}

2.3.3 Path

jdk7 引入了 Path 和 Paths 类

  • Path 用来表示文件路径

  • Paths 是工具类,用来获取 Path 实例

  • . 代表了当前路径

  • .. 代表了上一级路径

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestPath {
public static void main(String[] args) {
System.out.println(System.getProperty("user.dir"));
Path source = Paths.get("1.txt"); // 相对路径 相当于使用 user.dir 环境变量来定位 1.txt
// System.out.println(source.toAbsolutePath());
source = Paths.get("d:\\1.txt"); // 绝对路径 代表了 d:\1.txt
source = Paths.get("d:/1.txt"); // 绝对路径 同样代表了 d:\1.txt
Path projects = Paths.get("d:\\data", "projects"); // 代表了 d:\data\projects
source=Paths.get("C:\\Users\\meethigher\\Desktop\\netty\\java-nio\\src\\..\\target");
System.out.println(source.toAbsolutePath());// C:\Users\meethigher\Desktop\netty\java-nio\src\..\target
System.out.println(source.normalize());//正常化后路径 C:\Users\meethigher\Desktop\netty\java-nio\target
}
}

2.3.4 Files

jdk7引入了Files

检查文件是否存在

1
2
Path path = Paths.get("helloword/data.txt");
System.out.println(Files.exists(path));

创建一级目录

1
2
Path path = Paths.get("helloword/d1");
Files.createDirectory(path);
  • 如果目录已存在,会抛异常 FileAlreadyExistsException
  • 不能一次创建多级目录,否则会抛异常 NoSuchFileException

创建多级目录用

1
2
Path path = Paths.get("helloword/d1/d2");
Files.createDirectories(path);

拷贝文件

1
2
3
4
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/target.txt");

Files.copy(source, target);
  • 如果文件已存在,会抛异常 FileAlreadyExistsException

如果希望用 source 覆盖掉 target,需要用 StandardCopyOption 来控制

1
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);

移动文件

1
2
3
4
Path source = Paths.get("helloword/data.txt");
Path target = Paths.get("helloword/data.txt");

Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
  • StandardCopyOption.ATOMIC_MOVE 保证文件移动的原子性

删除文件

1
2
3
Path target = Paths.get("helloword/target.txt");

Files.delete(target);
  • 如果文件不存在,会抛异常 NoSuchFileException

删除目录

1
2
3
Path target = Paths.get("helloword/d1");

Files.delete(target);
  • 如果目录还有内容,会抛异常 DirectoryNotEmptyException

遍历目录文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class TestFilesWalkFileTree {
public static void main(String[] args) throws IOException {
AtomicInteger dirCount = new AtomicInteger(0);
AtomicInteger fileCount = new AtomicInteger(0);
//遍历文件,访问者模式
Files.walkFileTree(Paths.get(System.getProperty("user.dir")), new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
log.info("==>" + dir);
dirCount.incrementAndGet();
return super.preVisitDirectory(dir, attrs);
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
log.info(file.toString());
fileCount.incrementAndGet();
return super.visitFile(file, attrs);
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
return super.postVisitDirectory(dir, exc);
}
});


log.info("文件夹数量:{}", dirCount.get());
log.info("文件数量:{}", fileCount.get());
}
}

统计 class 的数目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class TestFilesWalkFileTree {
public static void main(String[] args) throws IOException {
AtomicInteger classCount = new AtomicInteger(0);
//遍历文件,访问者模式
Files.walkFileTree(Paths.get(System.getProperty("user.dir")), new SimpleFileVisitor<Path>() {

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (file.getFileName().toString().endsWith(".class")) {
log.info(file.toString());
classCount.incrementAndGet();
}
return super.visitFile(file, attrs);
}

});

log.info("字节码数量:{}", classCount.get());
}
}

删除多级目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
public class TestFilesWalkFileTree {
public static void main(String[] args) throws IOException {
Files.walkFileTree(Paths.get("C:\\Users\\meethigher\\Desktop\\web-filemanager"), new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
log.info("==> 进入{}", dir);
return super.preVisitDirectory(dir, attrs);
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
log.info("删除文件:{}", file);
return super.visitFile(file, attrs);
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
log.info("<== 退出{}", dir);
Files.delete(dir);
return super.postVisitDirectory(dir, exc);
}
});
}
}

拷贝多级目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class TestFilesCopy {
public static void main(String[] args) throws Exception {
String sourceStr = "D:\\开发手册";
Path source = Paths.get(sourceStr);
String targetStr = "C:\\Users\\meethigher\\Desktop\\web-filemanager";
Files.walk(source).forEach(path -> {
try {
String replace = path.toString().replace(sourceStr, targetStr);
System.out.println(replace);
if (Files.isDirectory(path)) {
Files.createDirectories(Paths.get(replace));
} else {
Files.copy(path, Paths.get(replace), StandardCopyOption.REPLACE_EXISTING);
}
} catch (IOException e) {

}
});
}
}

2.4 TCP网络编程

以下是使用nio实现的网络编程

传统的bio网络编程,参考

2.4.1 非阻塞 vs 阻塞

阻塞

  • 阻塞模式下,相关方法都会导致线程暂停
    • ServerSocketChannel.accept 会在没有连接建立时让线程暂停,直到有连接
    • SocketChannel.read 会在没有数据可读时让线程暂停,直到收到数据
    • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置
  • 单线程下,阻塞方法之间相互影响,比如等待连接与等待消息,这两个是矛盾的,具体参照下面代码。
  • 多线程阻塞方式参考,但多线程下,有新的问题,体现在以下方面
    • 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class Server {
public static void main(String[] args) throws Exception {
//使用nio实现来理解阻塞模式, 单线程处理

ByteBuffer byteBuffer = ByteBuffer.allocate(16);
//1. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
//2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
//3 建立连接的集合
List<SocketChannel> channels = new LinkedList<>();
while (true) {
//4 accept 建立与客户端的连接 SocketChannel用来与客户端通信
log.info("connecting...");
SocketChannel channel = ssc.accept();//阻塞方法
log.info("connected... {}", channel);
channels.add(channel);

for (SocketChannel socketChannel : channels) {
//5 接收客户端发送的数据
log.info("before read... {}", socketChannel);
//阻塞方法
//如果要按照阻塞模式开发,此处的read应该要新开一个线程
//阻塞模式的Server端可参照https://github.com/meethigher/chat-room/tree/master/server
socketChannel.read(byteBuffer);
byteBuffer.flip();
debugRead(byteBuffer);
byteBuffer.clear();
log.info("after read... {}", socketChannel);
}
}

}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
public class Client {
public static void main(String[] args) throws Exception {
SocketChannel sc=SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1",8080));
sc.write(StandardCharsets.UTF_8.encode("hello"));

Thread.sleep(5000);
//使用单线程阻塞模式,并不能及时的监听到消息
sc.write(StandardCharsets.UTF_8.encode("world"));

while(true) {
}
}
}

非阻塞

  • 非阻塞模式下,相关方法都会不会让线程暂停
    • 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行
    • SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept
    • 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去
  • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu,实际中也并不会这么用。
  • 数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)

服务器端,客户端代码不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Slf4j
public class Server {
public static void main(String[] args) throws Exception {
//使用nio实现来理解阻塞模式, 单线程处理

ByteBuffer byteBuffer = ByteBuffer.allocate(16);
//1. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
//1.1 设置ServerSocketChannel为非阻塞(默认是阻塞)。这会导致accept方法非阻塞
ssc.configureBlocking(false);
//2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
//3 建立连接的集合
List<SocketChannel> channels = new LinkedList<>();
while (true) {
//4 accept 建立与客户端的连接 SocketChannel用来与客户端通信
//log.info("connecting...");
//上面设置了非阻塞,那么这个就不等待,直接返回。
//如果没有连接建立,返回的就是空
SocketChannel channel = ssc.accept();
if (channel != null) {
log.info("connected... {}", channel);
//SocketChannel设置为非阻塞(默认是阻塞),这会导致read方法非阻塞
channel.configureBlocking(false);
channels.add(channel);
}
for (SocketChannel socketChannel : channels) {
//上面设置了非阻塞,如果没有读到数据,就返回
int len = socketChannel.read(byteBuffer);
if (len > 0) {
//5 接收客户端发送的数据
log.info("before read... {}", socketChannel);
byteBuffer.flip();
debugRead(byteBuffer);
byteBuffer.clear();
log.info("after read... {}", socketChannel);
}
}
}
}
}

缺点:即使没有连接或者消息,线程同样在跑,会影响实际性能。

多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用
  • 如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证
    • 有可连接事件时才去连接
    • 有可读事件才去读取
    • 有可写事件才去写入
      • 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件

2.4.2 Selector

好处

  • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

创建

1
Selector selector = Selector.open();

绑定 Channel 事件

也称之为注册事件,绑定的事件 selector 才会关心

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件, null);
  • channel 必须工作在非阻塞模式
  • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
  • 绑定的事件类型可以有
    • connect - 客户端连接成功时触发
    • accept - 服务器端成功接受连接时触发
    • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
    • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

监听 Channel 事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件

方法1,阻塞直到绑定事件发生

1
int count = selector.select();

方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)

1
int count = selector.select(long timeout);

方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件

1
int count = selector.selectNow();

💡 select 何时不阻塞

  • 事件发生时
    • 客户端发起连接请求,会触发 accept 事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
    • channel 可写,会触发 write 事件
    • 在 linux 下存在nio的bug,不论怎么处理都是不阻塞。这边netty中,从另一个角度解决了该问题。
  • 调用 selector.wakeup()
  • 调用 selector.close()
  • selector 所在线程 interrupt

2.4.3 处理 accept 事件

客户端代码为

1
2
3
4
5
6
7
8
9
10
11
public class Client {
public static void main(String[] args) {
try (Socket socket = new Socket("localhost", 8080)) {
System.out.println(socket);
socket.getOutputStream().write("world".getBytes());
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}

服务器端代码为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }

// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();

// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
log.debug("{}", sc);
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

💡 事件发生后能否不处理

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

2.4.4 处理 read 事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
//1. 创建Selector, 用来管理多个Channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
//2. 建立Selector与ServerSocketChannel之间的联系(将Channel注册到Selector)
// SelectionKey就是将来事件发生后, 通过他可以知道事件和哪个channel的事件
// 0表示不监听事件
SelectionKey sscKey = ssc.register(selector, 0, null);
//测试往selector上面注册多个ssc
// ServerSocketChannel tempSsc = ServerSocketChannel.open();
// tempSsc.configureBlocking(false);
// tempSsc.register(selector, 0, null);
log.info("register key: {}", sscKey);
// 配置监听的事件, 只监听连接事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);

ssc.bind(new InetSocketAddress(8080));
while (true) {
//3. 调用Selector的select方法, select是阻塞方法。没有事件发生,线程阻塞(cpu闲置)
// select 在事件未处理时(调用accept或者cancel表示已处理), 它不会阻塞
selector.select();
//4. 处理事件, selectedKeys内部包含了所有发生的事件
Set<SelectionKey> keys = selector.keys();
log.info("all keys size: {}", keys.size());
Set<SelectionKey> selectionKeys = selector.selectedKeys();
log.info("keys size: {}", selectionKeys.size());
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
log.info("key: {}", key);
iterator.remove();//不执行删除,会存在下一次别的key触发事件时,上一个key还存在里面
//5. 区分事件类型
if (key.isAcceptable()) {//建立连接accept
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
//非阻塞下返回为null表示没有连接建立,这时候八成是keys里面没有移除。
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
log.info("{}", sc);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
} else if (key.isReadable()) {//接收消息 read
try {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
int read = channel.read(byteBuffer);//如果是正常断开,返回值是-1
if (read == -1) {
key.cancel();
} else {
// 打印
byteBuffer.flip();
debugRead(byteBuffer);
}

} catch (IOException e) {
e.printStackTrace();
key.cancel();//如果客户端关闭了连接,要取消注册,会在keys中彻底删除
}
}
}
}
}
}

为何要 iter.remove()

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

不处理边界的问题

以前有同学写过这样的代码,思考注释中两个问题,以 bio 为例,其实 nio 道理是一样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Server {
public static void main(String[] args) throws IOException {
ServerSocket ss=new ServerSocket(9000);
while (true) {
Socket s = ss.accept();
InputStream in = s.getInputStream();
// 这里这么写,有没有问题
byte[] arr = new byte[4];
while(true) {
int read = in.read(arr);
// 这里这么写,有没有问题
if(read == -1) {
break;
}
System.out.println(new String(arr, 0, read));
}
}
}
}

客户端

1
2
3
4
5
6
7
8
9
10
public class Client {
public static void main(String[] args) throws IOException {
Socket max = new Socket("localhost", 9000);
OutputStream out = max.getOutputStream();
out.write("hello".getBytes());
out.write("world".getBytes());
out.write("你好".getBytes());
max.close();
}
}

输出

1
2
3
4
hell
owor
ld
�好

为什么?

一个中文在utf-8编码下,占3个字节。

用4个字节接收,肯定会乱码

使用附件处理消息边界

  • 固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
  • 按分隔符拆分,缺点是效率低
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式

HTTP中TLV或者LTV格式,比较直观的表示就是Content-Type与Content-Length

举个场景,我的ByteBuffer容量为16,结果我发过来的消息长度是33。那么我需要接收三次,才能获取到完整包。

而且,这三次数据的ByteBuffer肯定是不能与其他Channel共用的,而应该是Channel自身拥有的。

类似于SpringMVC中每个请求的线程绑定实现,Channel提供了在注册时的附件参数,只供自己使用。

下面展示使用按分隔符(\n)拆分附件的方式实现的Server端代码

按分隔符拆分的逻辑,参照2.2.6

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@Slf4j
public class ServerPro {

private static void handle(ByteBuffer buffer) {
buffer.flip();
for (int i = 0; i < buffer.limit(); i++) {
//找到一条完整消息
if (buffer.get(i) == '\n') {
int length = i + 1 - buffer.position();
//把这条消息存入新的ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(length);
//从buffer读,像byteBuffer写
for (int j = 0; j < length; j++) {
byteBuffer.put(buffer.get());
}
byteBuffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(byteBuffer));
}
}
//移动到未读的位置,如果都没读,缓冲区就是满的,就不能读入了。这也是能够扩容的前提。
//执行到position、limit相等时,就应该扩容。
buffer.compact();
}

public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
//第三个参数,表示注册的附件。每个channel独有的附件。
SelectionKey scKey = sc.register(selector, 0, ByteBuffer.allocate(5));
scKey.interestOps(SelectionKey.OP_READ);
} else if (key.isReadable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
int read = channel.read(byteBuffer);
if (read == -1) {//正常断开
key.cancel();
} else {
handle(byteBuffer);
//执行到position、limit相等时,就应该扩容。
if (byteBuffer.position() == byteBuffer.limit()) {
ByteBuffer newByteBuffer = ByteBuffer.allocate(byteBuffer.capacity() + 1);
log.info("缓冲区扩容到{}", newByteBuffer.capacity());
byteBuffer.flip();
//将旧的数据copy到新的buffer
newByteBuffer.put(byteBuffer);
key.attach(newByteBuffer);
}
}
} catch (IOException e) {//异常断开
e.printStackTrace();
key.cancel();
}
}
}
}
}
}

以上代码,只是一个特别简易的例子,但会导致缓冲区容量越来越大,浪费空间。

在netty中,不仅有扩大容量,还有自动缩小容量。

ByteBuffer 大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能 ,参考实现源码
    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

2.4.5 处理 write 事件

多次才能写完例子

  • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册,如果不取消,会每次可写均会触发 write 事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Slf4j
public class WriteServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
//((ServerSocketChannel) key.channel()).accept();
//写法作用同上
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
//向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 30001002; i++) {
sb.append("a");
}
ByteBuffer buffer = StandardCharsets.UTF_8.encode(sb.toString());
int write = sc.write(buffer);
log.info("写出字节数{}", write);
if (buffer.hasRemaining()) {
//获取到已经关注的事件,再额外加上写事件。
//读1 写4 连接成功8 接收连接16
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);//注意越加越多
//scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);//效果同上
//未写完的数据,挂到scKey
scKey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
log.info("写出字节数{}", write);
//清理
if (!buffer.hasRemaining()) {
key.attach(null);
//不再关注可写事件
//channel关注了可写事件,那么只要缓冲区可写就会触发,所以应该在写完之后,取消关注write
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
} else if (key.isReadable()) {
log.info("可读");
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer attachment = (ByteBuffer) key.attachment();
if (attachment == null) {
attachment = ByteBuffer.allocate(5);
key.attach(attachment);
}
try {
int read = sc.read(attachment);
if (read == -1) {
key.cancel();
} else {
attachment.clear();
}
} catch (Exception e) {
key.cancel();
}
}
}
}
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class WriteClient {
public static void main(String[] args) throws Exception {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress(8080));
int count = 0;
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
System.out.println(count);
buffer.clear();
if(count==30001002) {
break;
}
}
ByteBuffer 一拳打爆江诗彼岸 = StandardCharsets.UTF_8.encode("一拳打爆江诗彼岸");
System.out.println(一拳打爆江诗彼岸.capacity());
sc.write(一拳打爆江诗彼岸);
System.in.read();
}
}

write 为何要取消

只要向 channel 发送数据时,socket 缓冲可写,这个事件会频繁触发。

因此应当只在 socket 缓冲区写不下时再关注可写事件,数据写完之后再取消关注

2.4.6 利用多线程优化

现在都是多核 cpu,设计时要充分考虑别让 cpu 的力量被白白浪费

前面的代码只有一个线程,配合一个选择器。没有充分利用多核 cpu,比如某个事件比较耗时,就会影响其他的事件执行效率。

如何改进呢?

分两组选择器

  • boss:专门处理accept事件,分配一个单线程
  • worker:专门处理读写事件,分配多个线程,每个线程配一个选择器

方式一:使用wakeUp和队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@Slf4j
public class MultiThreadServer {

public static void main(String[] args) throws Exception {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
ssc.register(boss, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
//创建固定数量的worker
Worker worker = new Worker("worker-0");
while (true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
//关联给worker的selector,监听read write
log.info("connected...{}", sc.getRemoteAddress());
log.info("before registered...{}", sc.getRemoteAddress());
worker.register(sc);
log.info("after registered...{}", sc.getRemoteAddress());
}
}
}
}

static class Worker implements Runnable {
private Thread thread;

private Selector selector;

private String name;

private volatile boolean start = false;

private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) {
this.name = name;
}

/**
* 初始化线程和selector
*/
public void register(SocketChannel sc) throws IOException {
log.info("register");
if (!start) {
this.selector = Selector.open();
this.start = true;
this.thread = new Thread(this, name);
this.thread.start();
}
//向队列添加了任务,但这个任务并没有立即执行
queue.add(() -> {
try {
sc.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
//唤醒select
selector.wakeup();
}

@Override
public void run() {
while (true) {
try {
this.selector.select();
Runnable runnable = queue.poll();
if (runnable != null) {
runnable.run();
}
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
SocketChannel sc = (SocketChannel) key.channel();
if (key.isReadable()) {
log.info("read...{}", sc.getRemoteAddress());
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
sc.read(byteBuffer);
byteBuffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(byteBuffer));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

方式二:仅使用wakeUp

不使用队列,仅使用wakeUp

这个方式可能会存在线程竞争,导致的执行顺序无法保证。执行顺序可能存在三种情况。

wakeUp

可以让上一个阻塞的select立即返回

如果没有阻塞的select,可以让下一个要阻塞的select立即返回。

这也是仅使用wakeUp就能实现队列的效果的原因了。

执行顺序一:下次循环方能监听事件

1
2
3
this.selector.select();//worker线程
selector.wakeup();//boss线程
sc.register(selector, SelectionKey.OP_READ);//boss线程

执行顺序二:下次循环方能监听事件

1
2
3
selector.wakeup();//boss线程
this.selector.select();//worker线程
sc.register(selector, SelectionKey.OP_READ);//boss线程

执行顺序三:本次循环即可监听事件,实现效果与使用队列效果相同

1
2
3
selector.wakeup();//boss线程
sc.register(selector, SelectionKey.OP_READ);//boss线程
this.selector.select();//worker线程

总而言之,该方式效率不如使用队列效果好,但是队列这种做法也只是空间换时间,实际上,也不会存在大量连接重复连入注册事件,所以实际而言,反而是方式二,更能节约系统资源。

优化最终版

如何分配CPU资源,可以参考阿姆达尔定律

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@Slf4j
public class MultiThreadServer {

public static void main(String[] args) throws Exception {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
ssc.register(boss, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
//创建固定数量的worker
//下面这种获取方式,像我的cpu是8核16线程,那么同一时刻可以跑16个线程,获取到的可用处理器数量就是16
//但是针对于docker,我分配了1个处理器,他还是会拿到16个。在jdk10之后,可以使用jvm参数UseContainerSupport配置,方能正确获取到1个处理器
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger index = new AtomicInteger(0);
while (true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
//关联给worker的selector,监听read write
log.info("connected...{}", sc.getRemoteAddress());
log.info("before registered...{}", sc.getRemoteAddress());
//使用轮询算法
workers[index.getAndIncrement() % workers.length].register(sc);
log.info("after registered...{}", sc.getRemoteAddress());
}
}
}
}

static class Worker implements Runnable {
private Thread thread;

private Selector selector;

private String name;

private volatile boolean start = false;

public Worker(String name) {
this.name = name;
}

/**
* 初始化线程和selector
*/
public void register(SocketChannel sc) throws IOException {
log.info("register");
if (!start) {
this.selector = Selector.open();
this.start = true;
this.thread = new Thread(this, name);
this.thread.start();
}
//唤醒select
selector.wakeup();
sc.register(selector, SelectionKey.OP_READ);
}

@Override
public void run() {
while (true) {
try {
this.selector.select();
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
SocketChannel sc = (SocketChannel) key.channel();
if (key.isReadable()) {
log.info("read...{}", sc.getRemoteAddress());
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
sc.read(byteBuffer);
byteBuffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(byteBuffer));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

2.5 UDP网络编程

  • UDP 是无连接的,client 发送数据不会管 server 是否开启
  • server 这边的 receive 方法会将接收到的数据存入 byte buffer,但如果数据报文超过 buffer 大小,多出来的数据会被默默抛弃

首先启动服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class UdpServer {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
channel.socket().bind(new InetSocketAddress(9999));
System.out.println("waiting...");
ByteBuffer buffer = ByteBuffer.allocate(32);
channel.receive(buffer);
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
} catch (IOException e) {
e.printStackTrace();
}
}
}

运行客户端

1
2
3
4
5
6
7
8
9
10
11
public class UdpClient {
public static void main(String[] args) {
try (DatagramChannel channel = DatagramChannel.open()) {
ByteBuffer buffer = StandardCharsets.UTF_8.encode("hello");
InetSocketAddress address = new InetSocketAddress("localhost", 9999);
channel.send(buffer, address);
} catch (Exception e) {
e.printStackTrace();
}
}
}

三、总结

3.1 stream vs channel

  • stream 不会自动缓冲数据,是更高层的API。channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用(文件 channel 不可多路复用)
  • 二者均为全双工,即读写可以同时进行

3.2 IO 模型

以下概念参考自UNIX网络编程卷1

同步阻塞、同步非阻塞、同步多路复用、异步阻塞(没有此情况)、异步非阻塞

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其它线程送结果(至少两个线程)

当调用一次 channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

  • 等待数据阶段
  • 复制数据阶段

3.2.1 同步阻塞 IO

3.2.2 同步非阻塞 IO

由图可知,只是在等待数据的过程中,实现了非阻塞,一直去问有无数据,并立即返回。参考2.4.1的非阻塞

但实际复制数据阶段,还是阻塞的。

非阻塞IO并没有比阻塞IO有性能提升。

3.2.3 同步多路复用

多路复用相当于在非阻塞IO基础上,解决了非阻塞IO的弊端。参考2.4.1的非阻塞

3.2.4 异步非阻塞

3.2.5 零拷贝

传统 IO 问题

传统的 IO 将一个文件通过 socket 写出

1
2
3
4
5
6
7
8
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);

内部工作流程是这样的:

  1. java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu

    DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO

  2. 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA

  3. 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝

  4. 接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次

NIO 优化

通过 DirectByteBuf

  • ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存

大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用

  • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写

  • java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步

    1. DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列

    2. 通过专门线程访问引用队列,根据虚引用释放堆外内存

减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了 3 次

进一步优化(linux 2.4)

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。

所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合频繁传输、小文件传输(内核缓冲区如果传输过大的内容,会导致其他文件读写就会受到影响)

3.3 AIO

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

Netty 5.0 就引入了异步IO,结果性能没优势,且引入了没必要的复杂性,维护成本高。所以在后来Netty 5.0废弃。

3.3.1 文件 AIO

文件不支持多路复用,但是支持异步IO

先来看看 AsynchronousFileChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class AioFileChannel {
public static void main(String[] args) throws Exception {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ);
//参数1 存储结果的Buffer
//参数2 读取的起始位置
//参数3 附件,一次读不完,存储下一次的结果
//参数4 回调对象
ByteBuffer byteBuffer = ByteBuffer.allocate(16);

log.info("read start...");
channel.read(byteBuffer, 0, ByteBuffer.allocate(4), new CompletionHandler<Integer, ByteBuffer>() {
//read成功
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.info("read completed...");
byteBuffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(byteBuffer));
}

//read出现异常
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
log.info("read end...");
//处理数据的是个守护线程
//主线程结束,守护线程即使没执行完,也结束了,因此加个阻塞
System.in.read();
}
}

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束

可以看到

  • 响应文件读取成功的是另一个线程 Thread-16
  • 主线程并没有 IO 操作阻塞

3.3.2 网络 AIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

public class AioServer {
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.accept(null, new AcceptHandler(ssc));
System.in.read();
}

private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(StandardCharsets.UTF_8.decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}

private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}

private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;

public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}

@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
ByteBuffer buffer = ByteBuffer.allocate(16);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(StandardCharsets.UTF_8.encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}

@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}
发布:2023-03-16 21:30:59
修改:2023-05-03 22:15:21
链接:https://meethigher.top/blog/2023/java-nio/
标签:java 
付款码 打赏 分享
Shift+Ctrl+1 可控制工具栏