select,poll,epoll理论与实战
IO多路复用是什么?
- IO:在操作系统中,数据在用户态和内核态之间的读写操作
- 多路:多个客户端TCP连接
- 复用:一个或者多个线程资源、
BIO、NIO、多路复用IO是什么?
要了解IO多路复用那么我们得先知道BIO、NIO、AIO是什么。
BIO阻塞IO
BIO是一个传统的IO模型。我们调用Socket accept = socket.accept();
之后就会一直等待客户端的输入,之后的连接都会阻塞。
BIO 实战
public class BIOTest {
public static void main(String[] args) throws IOException {
//服务端监听9000端口
ServerSocket socket = new ServerSocket(9000);
while(true){
System.out.println("等待连接");
Socket accept = socket.accept();
System.out.println("有客户端连接了");
handler(accept);
}
}
private static void handler(Socket accept) {
while(true){
byte buffer[] = new byte[1024];
System.out.println("ready reading");
int read = 0;
try {
read = accept.getInputStream().read(buffer);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("read finish");
if(read!=-1){
System.out.println(new String(buffer,0,read));
}
}
}
}
先启动我们的项目,再我们打开网络调试助手,连接9000端口。
可以看到此时有客户端连接了,我们可以发消息了
BIO的缺点
阻塞的IO模型,如果没有连接的客户端一直不发送消息,则会一直阻塞,影响之后的客户端连接。对于优化,我们可以用多个线程去处理,每一个线程监听一个客户端,大家可以自行去尝试哦。
NIO 同步非阻塞
因为BIO的种种不足,我们引入了NIO,同步非阻塞IO模型。
public class NIOTest {
private static List list = new ArrayList();
public static void main(String[] args) throws IOException {
//新建连接并且绑定127.0.0.1:9000
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
//设定为非阻塞
serverSocket.configureBlocking(false);
System.out.println("服务启动成功");
while (true){
SocketChannel accept = serverSocket.accept();
//配置为非阻塞
if(accept!=null){
System.out.println("连接成功");
accept.configureBlocking(false);
//将已经连接的加入到channel集合中
list.add(accept);
}
//每一次都要去找是哪一个channel发送了消息
list.forEach(socketChannel -> {
ByteBuffer buffer = ByteBuffer.allocate(128);
try {
long read = socketChannel.read(buffer);
if (read>0){
System.out.println(new String(buffer.array()));
}else if(read ==-1){
list.remove(socketChannel);
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
}
可以看到能同时连接多个请求。
NIO对于BIO做了很大的改进.有以下几点
- 采取单线程,将
socket
存入list,每次遍历就绪的socket
- 解决了BIO单线程阻塞的问题
缺点我们还是要每一次都轮询channel检查,会产生上下文切换很浪费资源。
在真实NIO中,并不会在Java层上来进行一个轮询,而是将轮询的这个步骤交给我们的操作系统来进行,他将轮询的那部分代码改为操作系统级别的系统调用(select函数,在linux环境中为epoll),在操作系统级别上调用select函数,主动地去感知有数据的socket。
BIO,NIO 总结
BIO是同步阻塞IO,NIO是同步非阻塞IO。NIO针对BIO进行了改进,不再阻塞等待客户端的数据,而是采取轮询遍历的方式去感知已经就绪的客户端。
IO多路复用
在Java中的Selector和LInux的epoll都是基于IO多路复用的。接来下我们讲一讲select、poll,epoll。
Select
基本原理
采用轮询加遍历的方式,先将用户态的fd拷贝到内核态,通过selector.select()
阻塞服务器,当有就绪的fd,就会返回。
cpu是怎么知道我们有数据来了呢?客户端通过网卡发送数据给我们的服务端,服务器网卡会将这个数据写入到指定内存中,然后通过中断信号告诉cpu有数据到达
缺点:每次select调用都需要拷贝fd到内核态。而且fd的数量有限制1024,返回的只是就绪fd的数量
poll
改进了用数组存储fd,改为链表
epoll
用户态fd拷贝到内核态只需要一次。而且fd数量没有限制,返回的是就绪的fd,不需要再遍历查找
epoll_create 函数
- 会创建一个红黑树的结构,返回的就是epoll的文件描述符
epoll_ctl 函数
- 事件注册
- 函数的主要功能就是将我们这一次想要监听的fd和事件(比如读,写,异常)注册到epoll
epoll_wait 函数
- 获取就绪事件,返回就绪fd的数量,将就绪的fd拷贝到用户空间,
- 如果是ET模式,会将就绪队列中的fd从链表中移除
- LT移除之后,会判断fd中是否有数据未读,有的话会重新将fd添加回就绪链表
事件通知机制
-
LT:当FD可读时,会重复通知多次,是默认模式(每一次调用epoll_wait都会通知你就绪)
- 重复通知效率有影响。
- 惊群现象:通俗一点就是多个进程监听了一个fd,并且调用了epoll_wait看是否有就绪的fd。而fd会重复唤醒他们
-
ET:当FD有数据可读,只会通知一次。
- 发现还有数据未读完,可以手动的将fd添加回红黑树
epoll_ctl
- 使用非阻塞IO一直读取
- 发现还有数据未读完,可以手动的将fd添加回红黑树
整体流程
- 首先就是通过epoll_create 创建一个eventpoll的数据结构。
- 每次调用epoll_ctl 的时候将fd封装成epitem加入到红黑树。
- 调用epoll_wait的时候会判断是否就绪队列有fd,有直接返回就绪的fd。没有的话会阻塞。当我们客户端通过网卡将数据发送到服务端的时候,服务端处理完数据会给cpu发出一个中断信息,cpu收到中断信号会根据数据找到对应的ip,端口,socket,然后根据socket找到回调函数,回调函数就会将我们当前的socket添加到就绪队列里面去,阻塞进程发现就绪队列里面有fd了就会返回。
代码实战
public class NIOEpollTest {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9000));
serverSocket.configureBlocking(false);
//创建多路复用器
Selector selector = Selector.open();
//注册多路复用器到serverSocket
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器启动");
while (true){
//阻塞等待需要发生的事件
selector.select();
System.out.println("有客户端连接");
Set selectionKeys = selector.selectedKeys();
selectionKeys.forEach(selectionKey -> {
if (selectionKey.isAcceptable()) {
try {
System.out.println("是连接请求");
//获取对应的连接服务
ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ);
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}else if(selectionKey.isReadable()){
System.out.println("是IO请求");
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = 0;
try {
read = socketChannel.read(buffer);
if(read>0){
System.out.println(new String(buffer.array()));
}else if(read == -1){
System.out.println("客户端断开连接啦");
socketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
selectionKeys.remove(selectionKey);
});
}
}
}