BIO,NIO,AIO精讲
IO模型 IO模型就是说用什么样的通道进行数据的发送和接收,Java共支持3种网络编程IO模式:BIO,NIO,AIO
BIO(Blocking IO) 同步阻塞模型,一个客户端连接对应一个处理线程
BIO代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class SocketServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(9000); while (true) { System.out.println("等待连接。。"); //阻塞方法 Socket clientSocket = serverSocket.accept(); System.out.println("有客户端连接了。。"); handler(clientSocket); /*new Thread(new Runnable() { @Override public void run() { try { handler(clientSocket); } catch (IOException e) { e.printStackTrace(); } } }).start();*/ } } private static void handler(Socket clientSocket) throws IOException { byte[] bytes = new byte[1024]; System.out.println("准备read。。"); //接收客户端的数据,阻塞方法,没有数据可读时就阻塞 int read = clientSocket.getInputStream().read(bytes); System.out.println("read完毕。。"); if (read != -1) { System.out.println("接收到客户端的数据:" + new String(bytes, 0, read)); } clientSocket.getOutputStream().write("HelloClient".getBytes()); clientSocket.getOutputStream().flush(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 //客户端代码 public class SocketClient { public static void main(String[] args) throws IOException { Socket socket = new Socket("localhost", 9000); //向服务端发送数据 socket.getOutputStream().write("HelloServer".getBytes()); socket.getOutputStream().flush(); System.out.println("向服务端发送数据结束"); byte[] bytes = new byte[1024]; //接收服务端回传的数据 socket.getInputStream().read(bytes); System.out.println("接收到服务端的数据:" + new String(bytes)); socket.close(); } }
缺点:
1、IO代码里read操作是阻塞操作,如果连接不做数据读写操作会导致线程阻塞,浪费资源
2、如果线程很多,会导致服务器线程太多,压力太大,比如C10K问题
应用场景:
BIO 方式适用于连接数目比较小且固定的架构, 这种方式对服务器资源要求比较高, 但程序简单易理解。
NIO(Non Blocking IO) 同步非阻塞,服务器实现模式为一个线程可以处理多个请求(连接)**,客户端发送的连接请求都会注册到 多路复用器selector**上,多路复用器轮询到连接有IO请求就进行处理,JDK1.4开始引入。
应用场景:
NIO方式适用于连接数目多且连接比较短(轻操作) 的架构, 比如聊天服务器, 弹幕系统, 服务器间通讯,编程比较复杂
NIO非阻塞代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class NioServer { // 保存客户端连接 static List<SocketChannel> channelList = new ArrayList<>(); public static void main(String[] args) throws IOException, InterruptedException { // 创建NIO ServerSocketChannel,与BIO的serverSocket类似 ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(9000)); // 设置ServerSocketChannel为非阻塞 serverSocket.configureBlocking(false); System.out.println("服务启动成功"); while (true) { // 非阻塞模式accept方法不会阻塞,否则会阻塞 // NIO的非阻塞是由操作系统内部实现的,底层调用了linux内核的accept函数 SocketChannel socketChannel = serverSocket.accept(); if (socketChannel != null) { // 如果有客户端进行连接 System.out.println("连接成功"); // 设置SocketChannel为非阻塞 socketChannel.configureBlocking(false); // 保存客户端连接在List中 channelList.add(socketChannel); } // 遍历连接进行数据读取 Iterator<SocketChannel> iterator = channelList.iterator(); while (iterator.hasNext()) { SocketChannel sc = iterator.next(); ByteBuffer byteBuffer = ByteBuffer.allocate(128); // 非阻塞模式read方法不会阻塞,否则会阻塞 int len = sc.read(byteBuffer); // 如果有数据,把数据打印出来 if (len > 0) { System.out.println("接收到消息:" + new String(byteBuffer.array())); } else if (len == -1) { // 如果客户端断开,把socket从集合中去掉 iterator.remove(); System.out.println("客户端断开连接"); } } } } }
总结:如果连接数太多的话,会有大量的无效遍历,假如有10000个连接,其中只有1000个连接有写数据,但是由于其他9000个连接并没有断开,我们还是要每次轮询遍历一万次,其中有十分之九的遍历都是无效的,这显然不是一个让人很满意的状态。
NIO引入多路复用器代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class NioSelectorServer { public static void main(String[] args) throws IOException, InterruptedException { // 创建NIO ServerSocketChannel ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(9000)); // 设置ServerSocketChannel为非阻塞 serverSocket.configureBlocking(false); // 打开Selector处理Channel,即创建epoll Selector selector = Selector.open(); // 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣 serverSocket.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务启动成功"); while (true) { // 阻塞等待需要处理的事件发生 selector.select(); // 获取selector中注册的全部事件的 SelectionKey 实例 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); // 遍历SelectionKey对事件进行处理 while (iterator.hasNext()) { SelectionKey key = iterator.next(); // 如果是OP_ACCEPT事件,则进行连接获取和事件注册 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = server.accept(); socketChannel.configureBlocking(false); // 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件 socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("客户端连接成功"); } else if (key.isReadable()) { // 如果是OP_READ事件,则进行读取和打印 SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(128); int len = socketChannel.read(byteBuffer); // 如果有数据,把数据打印出来 if (len > 0) { System.out.println("接收到消息:" + new String(byteBuffer.array())); } else if (len == -1) { // 如果客户端断开连接,关闭Socket System.out.println("客户端断开连接"); socketChannel.close(); } } //从事件集合里删除本次处理的key,防止下次select重复处理 iterator.remove(); } } } }
NIO 有三大核心组件: Channel(通道), Buffer(缓冲区),Selector(多路复用器)
1、channel 类似于流,每个 channel 对应一个 buffer缓冲区,buffer 底层就是个数组
2、channel 会注册到 selector 上,由 selector 根据 channel 读写事件的发生将其交由某个空闲的线程处理
3、NIO 的 Buffer 和 channel 都是既可以读也可以写
NIO底层在JDK1.4版本是用linux的内核函数select()或poll()来实现,跟上面的NioServer代码类似,selector每次都会轮询所有的sockchannel看下哪个channel有读写事件,有的话就处理,没有就继续遍历,JDK1.5开始引入了epoll基于事件响应机制来优化NIO。
NioSelectorServer 代码里如下几个方法非常重要,我们从Hotspot与Linux内核函数级别来理解下
1 2 3 Selector.open() //创建多路复用器 socketChannel.register(selector, SelectionKey.OP_READ) //将channel注册到多路复用器上 selector.select() //阻塞等待需要处理的事件发生
总结:NIO整个调用流程就是Java调用了操作系统的内核函数来创建Socket,获取到Socket的文件描述符,再创建一个Selector对象,对应操作系统的Epoll描述符,将获取到的Socket连接的文件描述符的事件绑定到Selector对应的Epoll文件描述符上,进行事件的异步通知,这样就实现了使用一条线程,并且不需要太多的无效的遍历,将事件处理交给了操作系统内核(操作系统中断程序实现),大大提高了效率。
Epoll函数详解
1 int epoll_create(int size);
创建一个epoll实例,并返回一个非负数作为文件描述符,用于对epoll接口的所有后续调用。参数size代表可能会容纳size个描述符,但size不是一个最大值,只是提示操作系统它的数量级,现在这个参数基本上已经弃用了。
1 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
使用文件描述符epfd引用的epoll实例,对目标文件描述符fd执行op操作。
参数epfd表示epoll对应的文件描述符,参数fd表示socket对应的文件描述符。
参数op有以下几个值:
EPOLL_CTL_ADD:注册新的fd到epfd中,并关联事件event;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中移除fd,并且忽略掉绑定的event,这时event可以为null;
参数event是一个结构体
1 2 3 4 5 6 7 8 9 10 11 struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;
events有很多可选值,这里只举例最常见的几个:
EPOLLIN :表示对应的文件描述符是可读的;
EPOLLOUT:表示对应的文件描述符是可写的;
EPOLLERR:表示对应的文件描述符发生了错误;
成功则返回0,失败返回-1
1 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
等待文件描述符epfd上的事件。
epfd是Epoll对应的文件描述符,events表示调用者所有可用事件的集合,maxevents表示最多等到多少个事件就返回,timeout是超时时间。
I/O多路复用底层主要用的Linux 内核·函数(select,poll,epoll)来实现,windows不支持epoll实现,windows底层是基于winsock2的select函数实现的(不开源)
Redis线程模型
Redis就是典型的基于epoll的NIO线程模型(nginx也是),epoll实例收集所有事件(连接与读写事件),由一个服务端线程连续处理所有事件命令。
Redis底层关于epoll的源码实现在redis的src源码目录的ae_epoll.c文件里
AIO(NIO 2.0) 异步非阻塞, 由操作系统完成后回调通知服务端程序启动线程去处理, 一般适用于连接数较多且连接时间较长的应用
应用场景:
AIO方式适用于连接数目多且连接比较长(重操作)的架构,JDK7 开始支持
AIO代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class AIOServer { public static void main(String[] args) throws Exception { final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(9000)); serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { @Override public void completed(AsynchronousSocketChannel socketChannel, Object attachment) { try { System.out.println("2--"+Thread.currentThread().getName()); // 再此接收客户端连接,如果不写这行代码后面的客户端连接连不上服务端 serverChannel.accept(attachment, this); System.out.println(socketChannel.getRemoteAddress()); ByteBuffer buffer = ByteBuffer.allocate(1024); socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { System.out.println("3--"+Thread.currentThread().getName()); buffer.flip(); System.out.println(new String(buffer.array(), 0, result)); socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes())); } @Override public void failed(Throwable exc, ByteBuffer buffer) { exc.printStackTrace(); } }); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } }); System.out.println("1--"+Thread.currentThread().getName()); Thread.sleep(Integer.MAX_VALUE); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 public class AIOClient { public static void main(String... args) throws Exception { AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(); socketChannel.connect(new InetSocketAddress("127.0.0.1", 9000)).get(); socketChannel.write(ByteBuffer.wrap("HelloServer".getBytes())); ByteBuffer buffer = ByteBuffer.allocate(512); Integer len = socketChannel.read(buffer).get(); if (len != -1) { System.out.println("客户端收到信息:" + new String(buffer.array(), 0, len)); } } }
BIO、 NIO、 AIO 对比:
为什么Netty使用NIO而不是AIO?
在Linux系统上,AIO的底层实现仍使用Epoll,没有很好实现AIO,因此在性能上没有明显的优势,而且被JDK封装了一层不容易深度优化,Linux上AIO还不够成熟。Netty是异步非阻塞 框架,Netty在NIO上做了很多异步的封装。
Netty 核心功能 Netty通讯示例 Netty的maven依赖:
1 2 3 4 5 <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.35.Final</version> </dependency>
服务端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 public class NettyServer { public static void main(String[] args) throws Exception { //创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍 // bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务器端的启动对象 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来配置参数 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现 // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。 // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理 .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数 @Override protected void initChannel(SocketChannel ch) throws Exception { //对workerGroup的SocketChannel设置处理器 ch.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("netty server start。。"); //绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况 //启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕 ChannelFuture cf = bootstrap.bind(9000).sync(); //给cf注册监听器,监听我们关心的事件 /*cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口9000成功"); } else { System.out.println("监听端口9000失败"); } } });*/ //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭 // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成 cf.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范) */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 读取客户端发送的数据 * * @param ctx 上下文对象, 含有通道channel,管道pipeline * @param msg 就是客户端发送的数据 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程 " + Thread.currentThread().getName()); //Channel channel = ctx.channel(); //ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站 //将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); } /** * 数据读取完毕处理方法 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8); ctx.writeAndFlush(buf); } /** * 处理异常, 一般是需要关闭通道 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public class NettyClient { public static void main(String[] args) throws Exception { //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap Bootstrap bootstrap = new Bootstrap(); //设置相关参数 bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { //加入处理器 channel.pipeline().addLast(new NettyClientHandler()); } }); System.out.println("netty client start"); //启动客户端去连接服务器端 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync(); //对关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } } public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 当客户端连接服务器完成就会触发该方法 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8); ctx.writeAndFlush(buf); } //当通道有读取事件时会触发,即服务端发送数据给客户端 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务端的地址: " + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Netty线程模型 可以先理解下《Scalable IO in Java》这篇文章里说的一些IO处理模式,Netty的线程模型如下图所示:
模型解释:
Netty 抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写
BossGroup和WorkerGroup类型都是NioEventLoopGroup
NioEventLoopGroup 相当于一个事件循环线程组 , 这个组中含有多个事件循环线程 , 每一个事件循环线程是NioEventLoop
每个NioEventLoop都有一个selector , 用于监听注册在其上的socketChannel的网络通讯
每个Boss NioEventLoop线程内部循环执行的步骤有 3 步
处理accept事件 , 与client 建立连接 , 生成 NioSocketChannel
将NioSocketChannel注册到某个worker NIOEventLoop上的selector
处理任务队列的任务 , 即runAllTasks
每个worker NIOEventLoop线程循环执行的步骤
轮询注册到自己selector上的所有NioSocketChannel 的read, write事件
处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
runAllTasks处理任务队列TaskQueue的任务 ,一些耗时的业务处理一般可以放入TaskQueue中慢慢处理,这样不影响数据在 pipeline 中的流动处理
每个worker NIOEventLoop处理NioSocketChannel业务时,会使用 pipeline (管道),管道中维护了很多 handler 处理器用来处理 channel 中的数据
Netty模块组件 【Bootstrap、ServerBootstrap】:
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
【Future、ChannelFuture】:
正如前面介绍,在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。
但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。
【Channel】:
Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:
1)当前网络连接的通道的状态(例如是否打开?是否已连接?)
2)网络连接的配置参数 (例如接收缓冲区大小)
3)提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。
4)调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。
5)支持关联 I/O 操作与对应的处理程序。
不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。
下面是一些常用的 Channel 类型:
1 2 3 4 5 6 NioSocketChannel,异步的客户端 TCP Socket 连接。 NioServerSocketChannel,异步的服务器端 TCP Socket 连接。 NioDatagramChannel,异步的 UDP 连接。 NioSctpChannel,异步的客户端 Sctp 连接。 NioSctpServerChannel,异步的 Sctp 服务器端连接。 这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。
【Selector】:
Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。
【NioEventLoop】:
NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:
I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。
【NioEventLoopGroup】:
NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。
【ChannelHandler】:
ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:
1 2 ChannelInboundHandler 用于处理入站 I/O 事件。 ChannelOutboundHandler 用于处理出站 I/O 操作。
或者使用以下适配器类:
1 2 ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。 ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。
【ChannelHandlerContext】:
保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。
【ChannelPipline】:
保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作。
ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。
在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:
一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。
read事件(入站事件)和write事件(出站事件)在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。
ByteBuf详解 从结构上来说,ByteBuf 由一串字节数组构成。数组中每个字节用来存放信息。
ByteBuf 提供了两个索引,一个用于读取数据,一个用于写入数据。这两个索引通过在字节数组中移动,来定位需要读或者写信息的位置。
当从 ByteBuf 读取时,它的 readerIndex(读索引)将会根据读取的字节数递增。
同样,当写 ByteBuf 时,它的 writerIndex 也会根据写入的字节数进行递增。
需要注意的是极限的情况是 readerIndex 刚好读到了 writerIndex 写入的地方。
如果 readerIndex 超过了 writerIndex 的时候,Netty 会抛出 IndexOutOf-BoundsException 异常。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; public class NettyByteBuf { public static void main(String[] args) { // 创建byteBuf对象,该对象内部包含一个字节数组byte[10] // 通过readerindex和writerIndex和capacity,将buffer分成三个区域 // 已经读取的区域:[0,readerindex) // 可读取的区域:[readerindex,writerIndex) // 可写的区域: [writerIndex,capacity) ByteBuf byteBuf = Unpooled.buffer(10); System.out.println("byteBuf=" + byteBuf); for (int i = 0; i < 8; i++) { byteBuf.writeByte(i); } System.out.println("byteBuf=" + byteBuf); for (int i = 0; i < 5; i++) { System.out.println(byteBuf.getByte(i)); } System.out.println("byteBuf=" + byteBuf); for (int i = 0; i < 5; i++) { System.out.println(byteBuf.readByte()); } System.out.println("byteBuf=" + byteBuf); //用Unpooled工具类创建ByteBuf ByteBuf byteBuf2 = Unpooled.copiedBuffer("hello,zhuge!", CharsetUtil.UTF_8); //使用相关的方法 if (byteBuf2.hasArray()) { byte[] content = byteBuf2.array(); //将 content 转成字符串 System.out.println(new String(content, CharsetUtil.UTF_8)); System.out.println("byteBuf2=" + byteBuf2); System.out.println(byteBuf2.getByte(0)); // 获取数组0这个位置的字符h的ascii码,h=104 int len = byteBuf2.readableBytes(); //可读的字节数 12 System.out.println("len=" + len); //使用for取出各个字节 for (int i = 0; i < len; i++) { System.out.println((char) byteBuf2.getByte(i)); } //范围读取 System.out.println(byteBuf2.getCharSequence(0, 6, CharsetUtil.UTF_8)); System.out.println(byteBuf2.getCharSequence(6, 6, CharsetUtil.UTF_8)); } } }
Netty编解码 Netty涉及到编解码的组件有Channel、ChannelHandler、ChannelPipe等,先大概了解下这几个组件的作用。
ChannelHandler
ChannelHandler充当了处理入站和出站数据的应用程序逻辑容器。例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据随后会被你的应用程序的业务逻辑处理。当你要给连接的客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。你的业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的。
ChannelPipeline
ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例 ,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的 ,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler(ChannelOutboundHandler调用是从tail到head方向逐个调用每个handler的逻辑),并被这些Handler处理,反之则称为入站的, 入站只调用pipeline里的ChannelInboundHandler 逻辑(ChannelInboundHandler调用是从head到tail方向逐个调用每个handler的逻辑)。
编码解码器
当你通过Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码 :从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节 。
Netty提供了一系列实用的编码解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由已知解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。
Netty提供了很多编解码器,比如编解码字符串的StringEncoder和StringDecoder,编解码对象的ObjectEncoder和ObjectDecoder等。
如果要实现高效的编解码可以用protobuf,但是protobuf需要维护大量的proto文件比较麻烦,现在一般可以使用protostuff。
protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用我们写.proto文件来实现序列化。使用它也非常简单,代码如下:
引入依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-api</artifactId> <version>1.0.10</version> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.0.10</version> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>1.0.10</version> </dependency>
protostuff使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 import com.dyuproject.protostuff.LinkedBuffer; import com.dyuproject.protostuff.ProtostuffIOUtil; import com.dyuproject.protostuff.Schema; import com.dyuproject.protostuff.runtime.RuntimeSchema; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * protostuff 序列化工具类,基于protobuf封装 */ public class ProtostuffUtil { private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>(); private static <T> Schema<T> getSchema(Class<T> clazz) { @SuppressWarnings("unchecked") Schema<T> schema = (Schema<T>) cachedSchema.get(clazz); if (schema == null) { schema = RuntimeSchema.getSchema(clazz); if (schema != null) { cachedSchema.put(clazz, schema); } } return schema; } /** * 序列化 * * @param obj * @return */ public static <T> byte[] serializer(T obj) { @SuppressWarnings("unchecked") Class<T> clazz = (Class<T>) obj.getClass(); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try { Schema<T> schema = getSchema(clazz); return ProtostuffIOUtil.toByteArray(obj, schema, buffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } } /** * 反序列化 * * @param data * @param clazz * @return */ public static <T> T deserializer(byte[] data, Class<T> clazz) { try { T obj = clazz.newInstance(); Schema<T> schema = getSchema(clazz); ProtostuffIOUtil.mergeFrom(data, obj, schema); return obj; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } public static void main(String[] args) { byte[] userBytes = ProtostuffUtil.serializer(new User(1, "zhuge")); User user = ProtostuffUtil.deserializer(userBytes, User.class); System.out.println(user); } }
Netty粘包拆包 TCP是一个流协议,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。面向流的通信是无消息保护边界的。
如下图所示,client发了两个数据包D1和D2,但是server端可能会收到如下几种情况的数据。
解决方案
1)消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格
2)在数据包尾部添加特殊分隔符,比如下划线,中划线等,这种方法简单易行,但选择分隔符的时候一定要注意每条数据的内部一定不能出现分隔符。
3)发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。
Netty提供了多个解码器,可以进行分包的操作,如下:
LineBasedFrameDecoder (回车换行分包)
DelimiterBasedFrameDecoder(特殊分隔符分包)
FixedLengthFrameDecoder(固定长度报文来分包)
Netty心跳检测机制 所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.
在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 看下它的构造器:
1 2 3 public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS); }
这里解释下三个参数的含义:
readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
allIdleTimeSeconds: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:
1 IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer中加入如下的代码:
1 pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
Netty心跳检测代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 //服务端代码 public class HeartBeatServer { public static void main(String[] args) throws Exception { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); //IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接, //会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须 //实现userEventTriggered方法处理对应事件 pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS)); pipeline.addLast(new HeartBeatHandler()); } }); System.out.println("netty server start。。"); ChannelFuture future = bootstrap.bind(9000).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { worker.shutdownGracefully(); boss.shutdownGracefully(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 //服务端处理handler public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> { int readIdleTimes = 0; @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { System.out.println(" ====== > [server] message received : " + s); if ("Heartbeat Packet".equals(s)) { ctx.channel().writeAndFlush("ok"); } else { System.out.println(" 其他信息处理 ... "); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; String eventType = null; switch (event.state()) { case READER_IDLE: eventType = "读空闲"; readIdleTimes++; // 读空闲的计数加1 break; case WRITER_IDLE: eventType = "写空闲"; // 不处理 break; case ALL_IDLE: eventType = "读写空闲"; // 不处理 break; } System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType); if (readIdleTimes > 3) { System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源"); ctx.channel().writeAndFlush("idle close"); ctx.channel().close(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.err.println("=== " + ctx.channel().remoteAddress() + " is active ==="); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 //客户端代码 public class HeartBeatClient { public static void main(String[] args) throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new HeartBeatClientHandler()); } }); System.out.println("netty client start。。"); Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel(); String text = "Heartbeat Packet"; Random random = new Random(); while (channel.isActive()) { int num = random.nextInt(10); Thread.sleep(num * 1000); channel.writeAndFlush(text); } } catch (Exception e) { e.printStackTrace(); } finally { eventLoopGroup.shutdownGracefully(); } } static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(" client received :" + msg); if (msg != null && msg.equals("idle close")) { System.out.println(" 服务端关闭连接,客户端也关闭"); ctx.channel().closeFuture(); } } } }
Netty线程模型图
Netty线程模型源码剖析图
图链接:https://www.processon.com/view/link/5dee0943e4b079080a26c2ac
Netty高并发高性能架构设计精髓
主从Reactor线程模型
NIO多路复用非阻塞
无锁串行化 设计思想
支持高性能序列化协议
零拷贝(直接内存的使用)
ByteBuf内存池设计
灵活的TCP参数配置能力
并发优化
无锁串行化设计思想 在大多数场景下,并行多线程处理可以提升系统的并发性能。但是,如果对于共享资源的并发访问处理不当,会带来严重的锁竞争,这最终会导致性能的下降。为了尽可能的避免锁竞争带来的性能损耗,可以通过串行化设计,即消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。NIO的多路复用就是一种无锁串行化的设计思想(理解下Redis和Netty的线程模型)
为了尽可能提升性能,Netty采用了串行无锁化设计,在IO线程内部进行串行操作,避免多线程竞争导致的性能下降。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列-多个工作线程模型性能更优。
Netty的NioEventLoop读取到消息之后,直接调用ChannelPipeline的fireChannelRead(Object msg),只要用户不主动切换线程,一直会由NioEventLoop调用到用户的Handler,期间不进行线程切换,这种串行化处理方式避免了多线程操作导致的锁的竞争,从性能角度看是最优的。
直接内存 直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域,某些情况下这部分内存也会被频繁地使用,而且也可能导致OutOfMemoryError异常出现。Java里用DirectByteBuffer可以分配一块直接内存(堆外内存),元空间对应的内存也叫作直接内存,它们对应的都是机器的物理内存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 /** * 直接内存与堆内存的区别 */ public class DirectMemoryTest { public static void heapAccess() { long startTime = System.currentTimeMillis(); //分配堆内存 ByteBuffer buffer = ByteBuffer.allocate(1000); for (int i = 0; i < 100000; i++) { for (int j = 0; j < 200; j++) { buffer.putInt(j); } buffer.flip(); for (int j = 0; j < 200; j++) { buffer.getInt(); } buffer.clear(); } long endTime = System.currentTimeMillis(); System.out.println("堆内存访问:" + (endTime - startTime) + "ms"); } public static void directAccess() { long startTime = System.currentTimeMillis(); //分配直接内存 ByteBuffer buffer = ByteBuffer.allocateDirect(1000); for (int i = 0; i < 100000; i++) { for (int j = 0; j < 200; j++) { buffer.putInt(j); } buffer.flip(); for (int j = 0; j < 200; j++) { buffer.getInt(); } buffer.clear(); } long endTime = System.currentTimeMillis(); System.out.println("直接内存访问:" + (endTime - startTime) + "ms"); } public static void heapAllocate() { long startTime = System.currentTimeMillis(); for (int i = 0; i < 100000; i++) { ByteBuffer.allocate(100); } long endTime = System.currentTimeMillis(); System.out.println("堆内存申请:" + (endTime - startTime) + "ms"); } public static void directAllocate() { long startTime = System.currentTimeMillis(); for (int i = 0; i < 100000; i++) { ByteBuffer.allocateDirect(100); } long endTime = System.currentTimeMillis(); System.out.println("直接内存申请:" + (endTime - startTime) + "ms"); } public static void main(String args[]) { for (int i = 0; i < 10; i++) { heapAccess(); directAccess(); } System.out.println(); for (int i = 0; i < 10; i++) { heapAllocate(); directAllocate(); } } } 运行结果: 堆内存访问:44ms 直接内存访问:29ms 堆内存访问:33ms 直接内存访问:19ms 堆内存访问:55ms 直接内存访问:38ms 堆内存访问:39ms 直接内存访问:20ms 堆内存访问:38ms 直接内存访问:18ms 堆内存访问:36ms 直接内存访问:19ms 堆内存访问:34ms 直接内存访问:19ms 堆内存访问:40ms 直接内存访问:20ms 堆内存访问:37ms 直接内存访问:24ms 堆内存访问:59ms 直接内存访问:25ms 堆内存申请:11ms 直接内存申请:36ms 堆内存申请:13ms 直接内存申请:52ms 堆内存申请:62ms 直接内存申请:40ms 堆内存申请:2ms 直接内存申请:37ms 堆内存申请:1ms 直接内存申请:81ms 堆内存申请:2ms 直接内存申请:23ms 堆内存申请:1ms 直接内存申请:31ms 堆内存申请:2ms 直接内存申请:32ms 堆内存申请:7ms 直接内存申请:41ms 堆内存申请:8ms 直接内存申请:142ms
直接内存分配源码分析 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); } DirectByteBuffer(int cap) { // package-private super(-1, 0, cap, cap); boolean pa = VM.isDirectMemoryPageAligned(); int ps = Bits.pageSize(); long size = Math.max(1L, (long)cap + (pa ? ps : 0)); //判断是否有足够的直接内存空间分配,可通过-XX:MaxDirectMemorySize=<size>参数指定直接内存最大可分配空间,如果不指定默认为最大堆内存大小, //在分配直接内存时如果发现空间不够会显示调用System.gc()触发一次full gc回收掉一部分无用的直接内存的引用对象,同时直接内存也会被释放掉 //如果释放完分配空间还是不够会抛出异常java.lang.OutOfMemoryError Bits.reserveMemory(size, cap); long base = 0; try { // 调用unsafe本地方法分配直接内存 base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { // 分配失败,释放内存 Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0); if (pa && (base % ps != 0)) { // Round up to page boundary address = base + ps - (base & (ps - 1)); } else { address = base; } // 使用Cleaner机制注册内存回收处理函数,当直接内存引用对象被GC清理掉时, // 会提前调用这里注册的释放直接内存的Deallocator线程对象的run方法 cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; } // 申请一块本地内存。内存空间是未初始化的,其内容是无法预期的。 // 使用freeMemory释放内存,使用reallocateMemory修改内存大小 public native long allocateMemory(long bytes); // openjdk8/hotspot/src/share/vm/prims/unsafe.cpp UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size)) UnsafeWrapper("Unsafe_AllocateMemory"); size_t sz = (size_t)size; if (sz != (julong)size || size < 0) { THROW_0(vmSymbols::java_lang_IllegalArgumentException()); } if (sz == 0) { return 0; } sz = round_to(sz, HeapWordSize); // 调用os::malloc申请内存,内部使用malloc这个C标准库的函数申请内存 void* x = os::malloc(sz, mtInternal); if (x == NULL) { THROW_0(vmSymbols::java_lang_OutOfMemoryError()); } //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize); return addr_to_java(x); UNSAFE_END
使用直接内存的优缺点:
优点:
不占用堆内存空间,减少了发生GC的可能
java虚拟机实现上,本地IO会直接操作直接内存(直接内存=>系统调用=>硬盘/网卡),而非直接内存则需要二次拷贝(堆内存=>直接内存=>系统调用=>硬盘/网卡)
缺点:
初始分配较慢
没有JVM直接帮助管理内存,容易发生内存溢出。为了避免一直没有FULL GC,最终导致直接内存把物理内存耗完。我们可以指定直接内存的最大值,通过-XX:MaxDirectMemorySize来指定,当达到阈值的时候,调用system.gc来进行一次FULL GC,间接把那些没有被使用的直接内存回收掉。
Netty零拷贝
Netty的接收和发送ByteBuf采用DIRECT BUFFERS,使用堆外直接内存 进行Socket读写,不需要进行字节缓冲区的二次拷贝。
如果使用传统的JVM堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才能写入Socket中。JVM堆内存的数据是不能直接写入Socket中的。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
可以看下netty的读写源码,比如read源码NioByteUnsafe.read()
ByteBuf内存池设计 随着JVM虚拟机和JIT即时编译技术的发展,对象的分配和回收是个非常轻量级的工作。但是对于缓冲区Buffer(相当于一个内存块),情况却稍有不同,特别是对于堆外直接内存的分配和回收,是一件耗时的操作。为了尽量重用缓冲区,Netty提供了基于ByteBuf内存池的缓冲区重用机制。需要的时候直接从池子里获取ByteBuf使用即可,使用完毕之后就重新放回到池子里去。下面我们一起看下Netty ByteBuf的实现:
可以看下netty的读写源码里面用到的ByteBuf内存池,比如read源码NioByteUnsafe.read()
继续看newDirectBuffer方法,我们发现它是一个抽象方法,由AbstractByteBufAllocator的子类负责具体实现,代码如下:
代码跳转到PooledByteBufAllocator的newDirectBuffer方法,从Cache中获取内存区域PoolArena,调用它的allocate方法进行内存分配:
PoolArena的allocate方法如下:
我们重点分析newByteBuf的实现,它同样是个抽象方法,由子类DirectArena和HeapArena来实现不同类型的缓冲区分配
我们这里使用的是直接内存,因此重点分析DirectArena的实现
最终执行了PooledUnsafeDirectByteBuf的newInstance方法,代码如下:
灵活的TCP参数配置能力 合理设置TCP参数在某些场景下对于性能的提升可以起到显著的效果,例如接收缓冲区SO_RCVBUF和发送缓冲区SO_SNDBUF。如果设置不当,对性能的影响是非常大的。通常建议值为128K或者256K。
Netty在启动辅助类ChannelOption中可以灵活的配置TCP参数,满足不同的用户场景。
handler的生命周期回调接口调用顺序 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 /** * 在channel的pipeline里如下handler:ch.pipeline().addLast(new LifeCycleInBoundHandler()); * handler的生命周期回调接口调用顺序: * handlerAdded -> channelRegistered -> channelActive -> channelRead -> channelReadComplete * -> channelInactive -> channelUnRegistered -> handlerRemoved * * handlerAdded: 新建立的连接会按照初始化策略,把handler添加到该channel的pipeline里面,也就是channel.pipeline.addLast(new LifeCycleInBoundHandler)执行完成后的回调; * channelRegistered: 当该连接分配到具体的worker线程后,该回调会被调用。 * channelActive:channel的准备工作已经完成,所有的pipeline添加完成,并分配到具体的线上上,说明该channel准备就绪,可以使用了。 * channelRead:客户端向服务端发来数据,每次都会回调此方法,表示有数据可读; * channelReadComplete:服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕; * channelInactive:当连接断开时,该回调会被调用,说明这时候底层的TCP连接已经被断开了。 * channelUnRegistered: 对应channelRegistered,当连接关闭后,释放绑定的workder线程; * handlerRemoved: 对应handlerAdded,将handler从该channel的pipeline移除后的回调方法。 */ public class LifeCycleInBoundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered: channel注册到NioEventLoop"); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelUnregistered: channel取消和NioEventLoop的绑定"); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive: channel准备就绪"); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive: channel被关闭"); super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channelRead: channel中有可读的数据" ); super.channelRead(ctx, msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete: channel读数据完成"); super.channelReadComplete(ctx); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded: handler被添加到channel的pipeline"); super.handlerAdded(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved: handler从channel的pipeline中移除"); super.handlerRemoved(ctx); } }
参考链接
参考链接 参考链接