一、概述 1.1 概念 Netty是一个异步的基于事件驱动(即多路复用技术)的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。
1.2 地位 Netty在Java网络应用框架中的地位就好比,Spring框架在JavaEE开发中的地位。
以下的框架都使用了Netty,因为他们有网络通信需求。
Cassandra:非关系型数据库 Spark:大数据分布式计算框架 Hadoop:大数据分布式存储框架 RocketMQ:阿里开源的消息队列 ElasticSearch:搜索引擎 gRPC:RPC框架 Dubbo:RPC框架 Spring 5.x:flux api完全抛弃了tomcat,使用netty作为服务器端 Zookeeper:分布式协调框架 1.3 优势 Netty同样是基于java nio开发。如果自己使用nio开发,工作量大,bug 多,这是因为Netty已经做好了基础部分构建协议 解决 TCP 传输问题,如粘包、拆包 Linux多路复用的底层是epoll,会存在空轮询导致 CPU 100%(对应nio中Linux下不阻塞),Netty兼容并解决该问题 对 API 进行增强,使之更易用,如 ThreadLocal => FastThreadLocal ,ByteBuffer => ByteBuf Netty vs 其它网络应用框架Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀 久经考验,Netty 版本2.x 2004 3.x 2008 4.x 2013 5.x 已废弃(使用了AIO,但是Linux的是伪AIO,只有Win真正实现了AIO。实际没有明显的性能提升,却导致维护成本高) 二、入门 2.1 需求 首先需要引入依赖
1 2 3 4 5 <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > 4.1.39.Final</version > </dependency >
使用Netty开发一个简单的服务器端和客户端
客户端向服务器端发送 hello, world 服务器仅接收,不返回 2.2 实现 服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class HelloServer { public static void main (String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class ) .childHandler (//child 即Worker ,负责读写。决定了Worker 能执行哪些操作(Handler ) new ChannelInitializer <NioSocketChannel >() { @Override protected void initChannel (NioSocketChannel channel) throws Exception { channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } }); } }) .bind(8080 ); } }
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class HelloClient { public static void main (String[] args) throws Exception { new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class ) //添加处理器,连接建立后该初始化器被调用 .handler (new ChannelInitializer <NioSocketChannel >() { @Override protected void initChannel (NioSocketChannel channel) throws Exception { channel.pipeline() .addLast(new StringEncoder()); } }) .connect("localhost" , 8080 ) .sync() .channel() .writeAndFlush("hello world" ); } }
针对上述代码对Netty的流程理解
channel 是数据的通道,与 jdk nio 中 channel 作用一致 msg 是流动的数据。输入是 ByteBuf ,输出也是 ByteBuf 。但是中间会经过 pipeline 加工,变成其他的类型对象。 handler是数据的处理工序工序有多道,合在一起就是 pipeline 。 pipeline 负责发布事件(读、读完成等)传播给各个 handler , handler 对自己感兴趣的事件进行处理 handler分为Inbound(数据输入时走入站handler)和Outbound(数据输出时走出站handler)两类 eventLoop(底层就是一个线程)是处理数据的工人工人可以管理多个 channel 的 io 操作。并且工人和 channel 针对io操作是绑定的(这也是从线程安全的角度考虑,如果一个 channel 可以被多个线程管理,就会存在多个线程一起读写的情况,防止出问题可能还要做串行操作) 工人既可以执行 io 操作,也可以进行任务的处理。每位工人有任务队列,队列里可以存储该工人绑定的多个 channel 的待处理任务,任务分为普通任务、定时任务 工人按照 pipeline 顺序,依次按照 handler 的代码处理数据,可以为每道工序指定不同的工人(只适用非io操作) 。 三、组件 3.1 EventLoop EventLoop,事件循环对象
EventLoop 本质是一个单线程执行器 (同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
一般我们不会直接使用EventLoop,而是使用EventLoopGroup
3.2 EventLoopGroup EventLoopGroup,事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
继承自 io.netty.util.concurrent.EventExecutorGroup
实现了 Iterable 接口提供遍历 EventLoop 的能力 另有 next 方法获取集合中下一个 EventLoop, next 底层是轮询 使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Slf 4jpublic class TestEventLoop { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(2 ); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next()); group.next().execute(() -> { log.info("普通任务" ); }); group.next().scheduleAtFixedRate(() -> { log.info("定时任务" ); }, 0L , 2L , TimeUnit.SECONDS); } }
3.2.1 完善2.1 将2.1的需求,进一步完善,代码如下。
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 @Slf 4jpublic class EventLoopServer { public static void main (String[] args) { DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup(2 , new ThreadFactory() { private final AtomicInteger atomicInteger = new AtomicInteger(0 ); @Override public Thread newThread (Runnable r) { return new Thread(r, "calc-" + atomicInteger.incrementAndGet()); } }); new ServerBootstrap() .group(new NioEventLoopGroup(1 , new ThreadFactory() { private final AtomicInteger atomicInteger = new AtomicInteger(0 ); @Override public Thread newThread (Runnable r) { return new Thread(r, "boss-" + atomicInteger.incrementAndGet()); } }), new NioEventLoopGroup(2 , new ThreadFactory() { private final AtomicInteger atomicInteger = new AtomicInteger(0 ); @Override public Thread newThread (Runnable r) { return new Thread(r, "worker-" + atomicInteger.incrementAndGet()); } })) .channel(NioServerSocketChannel.class ) .childHandler (new ChannelInitializer <NioSocketChannel >() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline() .addLast("handler1" , new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.info(buf.toString(Charset.defaultCharset()) + "_" + ctx.channel().remoteAddress()); ctx.fireChannelRead(msg); } }) .addLast(defaultEventLoopGroup, "handler2" , new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; TimeUnit.SECONDS.sleep(2L ); log.info(buf.toString(Charset.defaultCharset()) + "_" + ctx.channel().remoteAddress()); } }); } }) .bind(8080 ); } }
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class EventLoopClient { public static void main (String[] args) throws Exception { ChannelFuture future = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class ) //添加处理器,连接建立后该初始化器被调用 .handler (new ChannelInitializer <NioSocketChannel >() { @Override protected void initChannel (NioSocketChannel channel) throws Exception { channel.pipeline() .addLast(new StringEncoder()); } }) .connect("localhost" , 8080 ); Channel channel = future .sync() .channel(); while (true ) { System.in.read(); channel.writeAndFlush("hello world" ); } } }
建立三个Client,每个Client发送一次消息。运行结果如图
由上图可知,channel第一次创建时,就与线程绑定了,不管是处理读写的worker,还是处理耗时的calc,都是绑定的。
h1与h2对应服务端的handler1与handler2
至于head与tail,后面的会提到
3.2.2 切换线程原理 查看源码io.netty.channel.AbstractChannelHandlerContext
中的invokeChannelRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static void invokeChannelRead (final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg" ), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run () { next.invokeChannelRead(m); } }); } }
3.3 Channel channel 的主要作用
close() 可以用来异步关闭 channel closeFuture() 用来执行 channel 关闭后的善后操作sync 方法作用是同步等待 channel 关闭 而 addListener 方法是异步等待 channel 关闭 pipeline() 方法添加处理器 write() 方法将数据写到缓冲区,但并不是立即写出可以直接将3.2.1的代码,修改成write方法,尝试即可 writeAndFlush() 方法立即将数据写出 3.3.1 ChannelFuture 保证获取到的channel是成功连接后的,两种方式
sync 阻塞,本线程阻塞,直到channel成功建立连接 addListener(回调对象) 添加回调,其他线程执行,channel监听到连接成功后执行回调对象 服务端代码保持不变,客户端代码修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 @Slf 4jpublic class EventLoopClient { public static void main (String[] args) throws Exception { ChannelFuture future = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class ) //添加处理器,连接建立后该初始化器被调用 .handler (new ChannelInitializer <NioSocketChannel >() { @Override protected void initChannel (NioSocketChannel channel) throws Exception { channel.pipeline() .addLast(new StringEncoder()); } }) .connect("localhost" , 8080 ); future.addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { Channel channel = future.channel(); for (int i = 0 ; i < 2 ; i++) { if (channel.isActive()) { log.info("已连接" ); channel.writeAndFlush("hello world" ); } else { log.info("未连接" ); } TimeUnit.SECONDS.sleep(1 ); } } }); } }
3.3.2 CloseFuture Channel通过 closeFuture() 来进行善后操作
sync 方法作用是同步等待 channel 关闭 而 addListener 方法是异步等待 channel 关闭 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Slf 4jpublic class CloseFutureClient { public static void main (String[] args) throws Exception { NioEventLoopGroup group = new NioEventLoopGroup(); ChannelFuture future = new Bootstrap() .group(group) .channel(NioSocketChannel.class ) .handler (new ChannelInitializer <NioSocketChannel >() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline() .addLast(new LoggingHandler(LogLevel.DEBUG)) .addLast(new StringEncoder()); } }) .connect("localhost" , 8080 ); Channel channel = future.sync().channel(); new Thread(() -> { Scanner scanner = new Scanner(System.in); while (true ) { String line = scanner.nextLine(); if ("q" .equals(line)) { channel.close(); log.info("调用close方法" ); break ; } channel.writeAndFlush(line); } }, "input" ).start(); channel.closeFuture().addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { log.info("成功关闭后的回调" ); group.shutdownGracefully(); } }); } }
Netty可以通过LoggingHandler打印日志,直观的查看Channel连接、收发、断开的过程
3.3.3 为何异步 疑问:为什么不在一个线程中去执行建立连接、去执行关闭 channel,那样不是也可以吗?非要用这么复杂的异步方式:比如一个线程发起建立连接,另一个线程去真正建立连接;一个线程去关闭连接,另一个线程真正去关闭连接。
这个问题也很简单,就比如多路复用的做法。我只有4个线程来发起长连接,如果一线程一长连接那种,撑死只能建立4个长连接。但是使用多路复用技术,就能处理更多的长连接了。这也就是Netty异步的核心思想了。
3.4 Future&&Promise 3.4.1 比较 在异步处理时,经常用到这 Future Promise 两个接口
首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果。比如get方法,就是只能同步等待获取结果。 netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果(比如CloseFuture的addListener) ,但都是要等任务结束 netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器 功能/名称 jdk Future netty Future Promise cancel 取消任务 - - isCanceled 任务是否取消 - - isDone 任务是否完成,不能区分成功失败 - - get 获取任务结果,阻塞等待 - - getNow - 获取任务结果,非阻塞,还未产生结果时返回 null - await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 - sync - 等待任务结束,如果任务失败,抛出异常 - isSuccess - 判断任务是否成功 - cause - 获取失败信息,非阻塞,如果没有失败,返回null - addLinstener - 添加回调,异步接收结果 - setSuccess - - 设置成功结果 setFailure - - 设置失败结果
3.4.2 示例 jdk Future
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Slf 4jpublic class TestJDKFuture { public static void main (String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(2 ); Future<Integer> future = executorService.submit(new Callable<Integer>() { @Override public Integer call () throws Exception { log.info("calc.." ); TimeUnit.SECONDS.sleep(2 ); return ThreadLocalRandom.current().nextInt(1 , 10 ); } }); log.info("waiting.." ); Integer integer = future.get(); log.info("received..{}" , integer); } }
netty Future
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf 4jpublic class TestNettyFuture { public static void main (String[] args) throws Exception { NioEventLoopGroup group = new NioEventLoopGroup(1 ); EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(new Callable<Integer>() { @Override public Integer call () throws Exception { log.info("calc.." ); TimeUnit.SECONDS.sleep(2 ); return ThreadLocalRandom.current().nextInt(1 , 10 ); } }); future.addListener(new GenericFutureListener<Future<Integer>>() { @Override public void operationComplete (Future<Integer> future) throws Exception { log.info("waiting.." ); Integer integer = future.getNow(); log.info("received..{}" , integer); } }); } }
netty Promise
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Slf 4jpublic class TestNettyPromise { public static void main (String[] args) throws Exception { EventLoop eventLoop = new NioEventLoopGroup(1 ).next(); Promise<Integer> promise = new DefaultPromise<>(eventLoop); new Thread(() -> { log.info("calc.." ); try { TimeUnit.SECONDS.sleep(2L ); promise.setSuccess(ThreadLocalRandom.current().nextInt(1 , 10 )); } catch (InterruptedException e) { e.printStackTrace(); promise.setFailure(e); } }).start(); log.info("waiting.." ); Integer integer = promise.get(); log.info("received..{}" , integer); } }
3.5 Handler&&Pipeline 3.5.1 Handler在Pipeline中执行顺序 ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据、写回结果 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工 打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 @Slf 4jpublic class TestPipeline { public static void main (String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class ) .childHandler (new ChannelInitializer <NioSocketChannel >() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("In_1" , new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.info("In_1" ); super .channelRead(ctx, msg); } }); pipeline.addLast("In_2" , new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.info("In_2" ); super .channelRead(ctx, msg); } }); pipeline.addLast("In_3" , new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.info("In_3" ); super .channelRead(ctx, msg); ctx.channel().writeAndFlush("hello world" ); } }); pipeline.addLast("Out_4" , new ChannelOutboundHandlerAdapter() { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("Out_4" ); super .write(ctx, msg, promise); } }); pipeline.addLast("Out_5" , new ChannelOutboundHandlerAdapter() { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("Out_5" ); super .write(ctx, msg, promise); } }); pipeline.addLast("Out_6" , new ChannelOutboundHandlerAdapter() { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("Out_6" ); super .write(ctx, msg, promise); } }); } }) .bind(8080 ); } } @Slf 4jclass TestPipelineClient { public static void main (String[] args) throws Exception { Channel channel = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class ) .handler (new ChannelInitializer <NioSocketChannel >() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline() .addLast(new StringEncoder()) .addLast(new StringDecoder()) .addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.info("received==>{}" , msg); } }) .addLast(new ChannelOutboundHandlerAdapter() { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("writed==>{}" , msg); super .write(ctx, msg, promise); } }); } }) .connect("localhost" , 8080 ) .sync().channel(); while (true ) { System.in.read(); channel.writeAndFlush("hello world" ); } } }
可以看到,ChannelInboundHandlerAdapter
是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter
是按照 addLast 的逆序执行的。
ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表
3.5.2 更方便地测试Handler执行顺序 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Slf 4jpublic class TestEmbeddedChannel { public static void main (String[] args) { ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.info("h1" ); super .channelRead(ctx, msg); } }; ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { log.info("h2" ); super .channelRead(ctx, msg); } }; ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("h3" ); super .write(ctx, msg, promise); } }; ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.info("h4" ); super .write(ctx, msg, promise); } }; EmbeddedChannel embeddedChannel = new EmbeddedChannel(h1, h2, h3, h4); log.info("测试入站" ); embeddedChannel.writeInbound("inbound" ); Object o = embeddedChannel.readInbound(); log.info("测试出站" ); embeddedChannel.writeOutbound("outbound" ); Object o1 = embeddedChannel.readOutbound(); } }
3.6 ByteBuf 3.6.1 优势 io.netty.buffer.ByteBuf
是对java.nio.ByteBuffer
的增强。
支持动态扩容。最大容量不超过Integer最大值。 池化思想。对直接内存影响最大,保证享受了直接内存的高读写的同时,又能有效避免重复开辟内存造成的性能损失。 读写指针分离。内部使用两套指针,标识读和写。与ByteBuffer相比,就能减少不必要的来回切换。 零拷贝。比如slice/duplicate/compositeByteBuf 方便开发者高效编写。比如链式调用。 3.6.2 组成 ByteBuf 由四部分组成
废弃字节 可读字节 可写字节 可扩容字节 该组成结构,使得ByteBuf在使用上,比ByteBuffer(如下图所示)方便许多,因为节省了人为频繁切换指针位置的操作。
3.6.3 使用 池化 VS 非池化 池化的最大意义在于可以重用 ByteBuf,优点有
没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率 高并发时,池化功能更节约内存,减少内存溢出的可能 池化功能是否开启,可以通过下面的系统环境变量来设置
1 -Dio.netty.allocator.type={unpooled|pooled}
4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现 4.1 之前,池化功能还不成熟,默认是非池化实现 直接内存 VS 堆内存 可以使用下面的代码来创建池化基于堆内存 的 ByteBuf
1 ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10 );
也可以使用下面的代码来创建池化基于直接内存 的 ByteBuf
1 ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10 );
直接内存 与 堆内存 的比较
直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放 调试工具类 首先创建一个调试工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import io.netty.buffer.ByteBuf;import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;import static io.netty.util.internal.StringUtil.NEWLINE;public class DebugByteBuf { public static void log (ByteBuf buffer) { log(buffer, false ); } public static void log (ByteBuf buffer, boolean pretty) { int length = buffer.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1 ) + 4 ; StringBuilder buf = new StringBuilder(rows * 80 * 2 ) .append("read index:" ).append(buffer.readerIndex()) .append(" write index:" ).append(buffer.writerIndex()) .append(" capacity:" ).append(buffer.capacity()) .append(NEWLINE); if (pretty) { appendPrettyHexDump(buf, buffer); } System.out.println(buf.toString()); } }
创建 ByteBuf能自动扩容,初始值256,最大值为Integer最大范围
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class TestByteBuf { public static void main (String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); System.out.println(buffer.getClass()); DebugByteBuf.log(buffer); StringBuilder sb = new StringBuilder(); for (int i = 0 ; i < 300 ; i++) { sb.append("a" ); } buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8)); DebugByteBuf.log(buffer); } }
写入 大小端存储 Big Endian(大端存储)
和Little Endian(小端存储)
是两种不同的字节存储方式,用于表示一个多字节数据类型在内存中的存储顺序。
不要将字节与位的关系混淆。
计算机中用来表示内存储器容量大小的基本单位是字节(Byte) ,此处是讲多字节数据类型的存储顺序。
Big Endian(大端存储)
是指内存的低地址,存储高位字节。
Little Endian(小端存储)
是指内存的低地址,存储低位字节。
操作系统都采用小端存储模式
通讯协议则采用大端存储模式
测试大小端存储时顺序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class DebugEndian { public static void debug (long num) { byte [] bigEndian = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN).putLong(num).array(); byte [] littleEndian = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN).putLong(num).array(); System.out.println("Big-endian dec: " + Arrays.toString(bigEndian)); System.out.println("Little-endian dec: " + Arrays.toString(littleEndian)); String[] bigEndianHex = new String[bigEndian.length]; for (int i = 0 ; i < bigEndian.length; i++) { bigEndianHex[i] = Integer.toHexString(bigEndian[i]); } String[] littleEndianHex = new String[littleEndian.length]; for (int i = 0 ; i < bigEndian.length; i++) { littleEndianHex[i] = Integer.toHexString(littleEndian[i]); } System.out.println("Big-endian hex: " + Arrays.toString(bigEndianHex)); System.out.println("Little-endian hex: " + Arrays.toString(littleEndianHex)); } public static void main (String[] args) { long num = 0x250L ; debug(num); } }
方法列表 方法列表,省略一些不重要的方法
方法 含义 备注 writeBoolean(boolean value) 写入 boolean 值,占1字节 非零为真 writeByte(int value) 写入 byte 值,占1字节 writeShort(int value) 写入 short 值,占2字节 writeInt(int value) 写入 int 值,占4字节 Big Endian(大端写入),如 250,写入后16进制表示 00 00 00 fa writeIntLE(int value) 写入 int 值,占4字节 Little Endian(小端写入),如 250,写入后16进制表示 fa 00 00 00 writeLong(long value) 写入 long 值,占8字节 writeChar(int value) 写入 char 值,占2字节 writeFloat(float value) 写入 float 值,占4字节 writeDouble(double value) 写入 double 值,占8字节 writeBytes(ByteBuf src) 写入 netty 的 ByteBuf writeBytes(byte[] src) 写入 byte[] writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串
注意
这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用 网络传输,默认习惯是 Big Endian (大端存储) CharSequence 是个接口,像 String/StringBuilder 都实现该接口 扩容 1 2 3 4 5 6 7 8 9 10 11 public class TestByteBufWrite { public static void main (String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(3 ); buffer.writeBytes(new byte []{1 ,2 }); DebugByteBuf.log(buffer,true ); buffer.writeInt(250 ); DebugByteBuf.log(buffer,true ); buffer.writeIntLE(250 ); DebugByteBuf.log(buffer,true ); } }
由上图,可验证大小端存储相关知识。
不过由上图,也发现进行了自动扩容, ByteBuf 的扩容规则是
若容量小于16,则扩容后16 若容量大于16小于64,则扩容后64 若容量大于64,则每次扩容为当前容量的2倍 扩容不能超过 max capacity 会报错 读取 方法列表
方法名 含义 备注 int readByte() 读取一个字节 会向后移动读指针 ByteBuf markReaderIndex() 将当前位置定义为读标记。默认是0 ByteBuf resetReaderIndex() 重置到读标记 getXXX 读取 不会改变读指针
3.6.4 内存释放 释放原理 由于 Netty 中有多种内存的 ByteBuf 实现,因此要灵活处理
UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可 UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存,手动释放 PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存,手动释放还给内存池 不过,Netty 为了方便开发者手动释放内存,采用了引用计数法 来控制回收内存,每个 ByteBuf 都实现了 io.netty.util.ReferenceCounted
接口
每个 ByteBuf 对象初始后,计数为 1 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用 计数为0后的释放逻辑,在io.netty.buffer.AbstractReferenceCountedByteBuf#deallocate
方法中
谁来负责 release 呢?
不是我们想象的(一般情况下)
1 2 3 4 5 6 ByteBuf buf = ... try { ... } finally { buf.release(); }
请思考,因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release 。
在pipeline中,head与tail两个处理器可以自动做收尾工作
入站msg,tail 对 ByteBuf 进行释放 出站msg,head 对 ByteBuf 进行释放 开发者不能完全依赖head与tail
如果用户在某个handler中,并没有将ByteBuf往后面的处理器传,这时候,收尾的head与tail就失去了作用,因为你根本没把资源传递给我,我咋释放啊
tail源码 tail只处理入站,写出都是通过outHandler执行的,所以跟tail也没啥关系。因此代码中也只实现入站处理器。
1 2 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {}
查看io.netty.channel.DefaultChannelPipeline.TailContext#channelRead
方法,往下跟,找到如下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 protected void onUnhandledInboundMessage (Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration." , msg); } finally { ReferenceCountUtil.release(msg); } } public static boolean release (Object msg) { if (msg instanceof ReferenceCounted) { return ((ReferenceCounted) msg).release(); } return false ; }
head源码 head既处理入站,也处理出站,因此两个处理器都实现
1 2 final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler , ChannelInboundHandler {}
查看io.netty.channel.DefaultChannelPipeline.HeadContext#write
方法,往下跟,找到如下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public final void write (Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this .outboundBuffer; if (outboundBuffer == null ) { safeSetFailure(promise, newClosedChannelException(initialCloseCause)); ReferenceCountUtil.release(msg); return ; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0 ) { size = 0 ; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return ; } outboundBuffer.addMessage(msg, size, promise); }
3.6.5 深拷贝 会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class TestCopy { public static void main (String[] args) { ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5 ); buf1.writeBytes(new byte []{ 1 , 2 , 3 , 4 , 5 }); ByteBuf buf2 = buf1.copy(); buf2.setByte(0 , 10 ); log(buf1, true ); log(buf2, true ); } }
3.6.6 零拷贝 零拷贝不进行内存复制,使用原有内存,其相关应用分为两大类
分 合 切片slice 【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read 和 write 指针
注意:
切片后的ByteBuf底层是SlicedByteBuf,再写内容,对原始数据有影响,因此,SlicedByteBuf禁止写入。
同样地,原内容发生变化,SlicedByteBuf也受到影响
所以,实际编写代码时,需要添加引用和手动释放
验证并没有发生数据复制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class TestSlice { public static void main (String[] args) { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10 ); buf.writeBytes(new byte []{ 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 }); log(buf, true ); ByteBuf buf1 = buf.slice(0 , 3 ); buf1.retain(); ByteBuf buf2 = buf.slice(3 , 3 ); buf2.retain(); ByteBuf buf3 = buf.slice(6 , 4 ); buf3.retain(); log(buf1, true ); log(buf2, true ); log(buf3, true ); buf1.setByte(0 , 10 ); log(buf1, true ); log(buf, true ); System.out.println("引用次数" +buf.refCnt()); } }
浅拷贝duplicate 【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的
实际编写代码时,需要添加引用和手动释放
组合compositeBuffer 【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class TestComposite { public static void main (String[] args) { ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5 ); buf1.writeBytes(new byte []{ 1 , 2 , 3 , 4 , 5 }); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5 ); buf2.writeBytes(new byte []{ 6 , 7 , 8 , 9 , 10 }); CompositeByteBuf buf = ByteBufAllocator.DEFAULT.compositeBuffer(10 ); buf.addComponents(true , buf1, buf2); buf1.retain(); buf2.retain(); log(buf, true ); buf1.setByte(0 , 10 ); log(buf, true ); System.out.println(buf1.refCnt()); System.out.println(buf2.refCnt()); } }
实际编写代码时,需要添加引用和手动释放
Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作
这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法(底层即CompositeByteBuf),可以用来包装 ByteBuf
1 2 3 4 5 6 7 8 ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5 ); buf1.writeBytes(new byte []{1 , 2 , 3 , 4 , 5 }); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5 ); buf2.writeBytes(new byte []{6 , 7 , 8 , 9 , 10 }); ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2); System.out.println(ByteBufUtil.prettyHexDump(buf3));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 01 02 03 04 05 06 07 08 09 0 a |.......... | +--------+-------------------------------------------------+----------------+
也可以用来包装普通字节数组,底层也不会有拷贝操作
1 2 3 ByteBuf buf4 = Unpooled.wrappedBuffer(new byte []{1 , 2 , 3 }, new byte []{4 , 5 , 6 }); System.out.println(buf4.getClass()); System.out.println(ByteBufUtil.prettyHexDump(buf4));
输出
1 2 3 4 5 6 class io.netty.buffer.CompositeByteBuf +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+ -------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 06 |...... | +--------+ -------------------------------------------------+----------------+