BIO & NIO & AIO
1、前言
根据操作系统相关知识:为了保证操作系统的稳定性和安全性,一个进程的地址空间划分为 用户空间(User space) 和 内核空间(Kernel space )。
我们平常运行的程序都是运行在用户空间,只有内核空间才能进行系统态资源的调度工作,比如文件读写、进程通信、内存管理等,也就是说,我们想要进行 IO 操作,一定是要依赖内核空间的能力!
总之,Java中的IO都是依赖操作系统内核进行的,我们程序中的IO读写其实调用的是操作系统内核中的read&write两大系统调用!
那么,内核是如何进行IO交互的呢?
- 网卡收到经过网线传来的网络数据,并将网络数据写到内存中
- 当网卡把数据写入到内存后,网卡向cpu发出一个中断信号,操作系统便能得知有新数据到来,再通过网卡中断程序去处理数据
- 将内存中的网络数据写入到对应socket的接收缓冲区中
- 当接收缓冲区的数据写好之后,应用程序开始进行数据处理
2、Java 中 3 种常见 IO 模型:BIO、NIO、AIO
BIO 方式适用于
连接数比较小且固定
的架构,这种方式对服务器资源要求比较高,编程比较简单,JDK1.4 之前的唯一选择。NIO 方式适用于
连接数目多且连接比较短
的架构,比如聊天服务器,弹幕系统,服务器间通讯等,编程比较复杂,JDK1.4 开始支持。AIO 方式适用于
连接数目多且连接比较长
的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。
2.1 BIO (Blocking I/O)
BIO,即同步阻塞 IO!
服务器实现模式为一个连接创建一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果客户端不作任何事情就会造成这个线程的白白浪费,只能等待客户端断开链接,这个线程占用的系统资源才会释放!
同步阻塞 IO 模型中,应用程序发起 read 调用后,会一直阻塞,直到内核把数据拷贝到用户空间。
演示代码
- 服务端
public class Server {
public static void main(String[] args) throws IOException {
//创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//创建serverSocket
ServerSocket serverSocket = new ServerSocket(6666);
while (true) {
System.out.println("等待连接中...");
//监听,等待客户端连接
Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
executorService.execute(() -> handler(socket));
}
}
//编写一个handler方法,和客户端通讯
public static void handler(Socket socket) {
byte[] bytes = new byte[1024];
System.out.println("当前线程信息: " + Thread.currentThread().getName());
try {
//通过socket获取输入流
InputStream inputStream = socket.getInputStream();
//循环读取客户端发送的数据
while (inputStream.read(bytes) != -1) {
System.out.println(Thread.currentThread().getName()+ " : 发送信息为 :"+ new String(bytes, 0, bytes.length));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("关闭连接");
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- 客户端
telnet 127.0.0.1 6666
- 运行结果
总结: 当客户端连接数量不高的情况下是没问题的。但是,当面对十万甚至百万级连接的时候,BIO 模型无能为力! 因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量!
2.2 NIO (Non-blocking I/O)
NIO,即同步非阻塞 IO !
Java 中的 NIO 于 Java 1.4 中引入,对应 java.nio
包,提供了 Channel
, Selector
,Buffer
等抽象。 对于高负载、高并发的(网络)应用,应使用 NIO !
当客户端发起io请求后,服务端立即返回(非阻塞io)。同步指的是,必须等待IO缓冲区内的数据就绪服务端才会为其服务;而非阻塞指的是,服务端线程不原地等待IO缓冲区,可以先做一些其他操作,但是要定时轮询检查IO缓冲区中的数据是否就绪。Java中的NIO 也有new IO的意思,是因为NIO使用了IO多路复用技术! 普通的NIO需要线程定时轮询查看一个IO缓冲区是否就绪,而Java中的new IO指的是线程轮询查看一堆IO缓冲区中哪些就绪,这是一种IO多路复用的思想!并且,IO多路复用模型中,将轮询检查IO数据是否就绪的任务,交给了系统级别的select或epoll模型,由系统进行监控,减轻服务端用户线程负担!
先来看看 同步非阻塞 IO 模型。
同步非阻塞 IO 模型中,服务端线程会一直发起 read 调用,等待数据从内核空间拷贝到用户空间的这段时间里,线程依然是阻塞的,直到在内核把数据拷贝到用户空间。
相比于同步阻塞 IO 模型,同步非阻塞 IO 模型确实有了很大改进,通过轮询操作,避免了一直阻塞!
但是,这种 IO 模型同样存在问题:服务端不断进行 I/O 系统调用,轮询数据是否已经准备好的过程是十分消耗 CPU 资源的!
这个时候,I/O 多路复用技术 就上场了!
IO 多路复用技术中,服务端线程首先发起 select 调用,询问内核数据是否准备就绪,等内核把数据准备好了,用户线程再发起 read 调用。read 调用的过程(数据从内核空间 -> 用户空间)还是阻塞的。
目前支持 IO 多路复用的系统调用,有 select,epoll 等。select 系统调用,目前几乎在所有的操作系统上都有支持!
- select :内核提供的系统调用,它支持一次查询多个内核数据缓冲区的可用状态,几乎所有的操作系统都支持。
- epoll :linux 2.6 内核,属于 select 调用的增强版本,优化了 IO 的执行效率。
IO 多路复用模型,通过减少无效的系统调用,减少了对 CPU 资源的消耗!
Java 中的 NIO ,有一个非常重要的选择器 ( Selector ) 的概念,也可以被称为 多路复用器。通过它,只需要一个线程便可以管理多个客户端连接。当客户端数据到了之后,才会为其服务!
说明:
- 每个 Channel 对应一个 Buffer
- Selector 对应一个线程,一个线程对应多个 Channel
- 该图反应了有三个 Channel 注册到该 Selector
- 程序切换到那个 Channel 是由
事件
决定的(Event) - Selector 会根据不同的事件,在各个通道上切换
- Buffer 就是一个内存块,底层是有一个数组。
- 数据的读取和写入是通过 Buffer,但是需要
flip()
切换读写模式;而 BIO 是单向的,要么输入流,要么输出流
演示代码
- 服务端
public class Server {
public static void main(String[] args) throws IOException {
//创建serverSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//得到Selector对象
try (Selector selector = Selector.open()) {
//把ServerSocketChannel注册到selector,事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//如果返回的>0,表示已经获取到关注的事件
while (selector.select() > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
//获得到一个事件
SelectionKey next = iterator.next();
//如果是OP_ACCEPT,表示有新的客户端连接
if (next.isAcceptable()) {
//给该客户端生成一个SocketChannel
SocketChannel accept = serverSocketChannel.accept();
accept.configureBlocking(false);
//将当前的socketChannel注册到selector,关注事件为读事件,同时给socket Channel关联一个buffer
accept.register(selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024));
System.out.println("获取到一个客户端连接");
//如果是读事件
} else if (next.isReadable()) {
//通过key 反向获取到对应的channel
SocketChannel channel = (SocketChannel) next.channel();
//获取到该channel关联的buffer
ByteBuffer buffer = (ByteBuffer) next.attachment();
while (channel.read(buffer) != -1) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
buffer.clear();
}
}
iterator.remove();
}
}
}
}
}
- 客户端
public class Client {
public static void main(String[] args) throws IOException {
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置为非阻塞
socketChannel.configureBlocking(false);
//提供服务器端的IP和端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("连接需要时间,客户端不会阻塞...先去吃个宵夜");
}
}
//连接成功,发送数据
String str = "hello,Java菜鸟程序员";
ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());
socketChannel.write(byteBuffer);
socketChannel.close();
System.out.println("客户端退出");
}
}
- 运行结果
总结: NIO主要由buffer、channel、selector三种技术的整合!通过零拷贝的buffer取得数据,每一个客户端通过channel在selector(多路复用器)上进行注册。服务端通过selector不断轮询channel来获取客户端的状态。channel上有 connect(链接)、accept(阻塞)、read(可读)、write(可写) 四种状态标识,selector根据标识来进行后续操作。所以一个服务端可接收无限多的channel,不需要新开一个线程,大大提升了性能!
2.3 AIO (Asynchronous I/O)
NIO,即异步非阻塞 IO!
异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。
AIO是真正意义上的异步非阻塞IO模型。 上述NIO实现中,需要用户线程定时轮询,去检查IO缓冲区数据是否就绪,占用应用程序线程资源,其实轮询相当于还是阻塞的,并非真正解放当前线程,因为它还是需要去查询哪些IO就绪。而真正的理想的异步非阻塞IO应该让内核系统完成,用户线程只需要告诉内核,当缓冲区就绪后,通知我或者执行我交给你的回调函数。
代码演示
- 服务端
public class Server {
public static void main(String[] args) {
try {
final int port = 5555;
//首先打开一个ServerSocket通道并获取AsynchronousServerSocketChannel实例:
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
//绑定需要监听的端口到serverSocketChannel:
serverSocketChannel.bind(new InetSocketAddress(port));
//实现一个CompletionHandler回调接口handler,
//之后需要在handler的实现中处理连接请求和监听下一个连接、数据收发,以及通信异常。
CompletionHandler<AsynchronousSocketChannel, Object> handler = new CompletionHandler<AsynchronousSocketChannel,
Object>() {
@Override
public void completed(final AsynchronousSocketChannel result, final Object attachment) {
// 继续监听下一个连接请求
serverSocketChannel.accept(attachment, this);
try {
System.out.println("接受了一个连接:" + result.getRemoteAddress()
.toString());
// 给客户端发送数据并等待发送完成
result.write(ByteBuffer.wrap("From Server: Hello, i am server".getBytes()))
.get();
ByteBuffer readBuffer = ByteBuffer.allocate(128);
// 阻塞等待客户端接收数据
result.read(readBuffer)
.get();
System.out.println(new String(readBuffer.array()));
} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
@Override
public void failed(final Throwable exc, final Object attachment) {
System.out.println("出错了:" + exc.getMessage());
}
};
serverSocketChannel.accept(null, handler);
// 由于serverSocketChannel.accept(null, handler);是一个异步方法,调用会直接返回,
// 为了让子线程能够有时间处理监听客户端的连接会话,
// 这里通过让主线程休眠一段时间(当然实际开发一般不会这么做)以确保应用程序不会立即退出。
TimeUnit.MINUTES.sleep(Integer.MAX_VALUE);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
- 客户端
public class Client {
public static void main(String[] args) {
try {
// 打开一个SocketChannel通道并获取AsynchronousSocketChannel实例
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
// 连接到服务器并处理连接结果
client.connect(new InetSocketAddress("127.0.0.1", 5555), null, new CompletionHandler<Void, Void>() {
@Override
public void completed(final Void result, final Void attachment) {
System.out.println("成功连接到服务器!");
try {
// 给服务器发送信息并等待发送完成
client.write(ByteBuffer.wrap("From client: Hello, i am client".getBytes()))
.get();
ByteBuffer readBuffer = ByteBuffer.allocate(128);
// 阻塞等待接收服务端数据
client.read(readBuffer)
.get();
System.out.println(new String(readBuffer.array()));
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
@Override
public void failed(final Throwable exc, final Void attachment) {
exc.printStackTrace();
}
});
TimeUnit.MINUTES.sleep(Integer.MAX_VALUE);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
- 运行结果
总结:目前来说 AIO 的应用还不是很广泛! 虽然AIO可以做到真正的异步的操作,但实现起来比较复杂,支持纯异步IO的操作系统非常少,目前也就windows是IOCP技术实现了,而在Linux上,底层还是是使用的epoll实现的。Netty 之前也尝试使用过 AIO,不过又放弃了。这是因为,Netty 使用了 AIO 之后,在 Linux 系统上的性能并没有多少提升!