前言
Netty 作为 java 中最成功的网络框架,里面包含了很多的设计理念和方式。后续希望针对 Netty 学习下如何实现高性能、易扩展、使用简单的网络底层框架。我开始看 kafka 的还觉得很奇怪,为什么 kafka 不使用 netty 而是自己实现了一套底层网络。后面发现开始其实是使用了 netty 的,但是后面将他移除了。官方解释的原因是 jar 包冲突。这个问题在 quora 上还有人提问。希望自己在看完 netty 和 kafka 能够看到除了 jar 包冲突以外的更好的优化或者性能。本文不会对 Netty 的使用做一个解释,而是专注于 netty 中关于 epoll 的实现
Netty 的 epoll
java 在 NIO 中提供了 Selector 的类。底层封装了 linux 中的 epoll 或者 poll,windows 则是 select。一个小小的监听 8888 端口的 demo
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class SelectorDemo {
public static void main(String[] args) {
try {
// 创建一个 Selector
Selector selector = Selector.open();
// 创建一个 ServerSocketChannel 并绑定到指定端口
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress("localhost", 8888));
serverChannel.configureBlocking(false);
// 将 ServerSocketChannel 注册到 Selector,并监听 OP_ACCEPT 事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器已启动,正在监听端口 8888...");
while (true) {
// 阻塞直到有事件发生
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
// 获取已经准备就绪的事件集合
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isAcceptable()) {
// 处理连接请求
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = serverSocketChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("收到来自 " + clientChannel.getRemoteAddress() + " 的连接");
} else if (key.isReadable()) {
// 处理读事件
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = clientChannel.read(buffer);
if (bytesRead == -1) {
// 客户端关闭了连接
System.out.println("客户端 " + clientChannel.getRemoteAddress() + " 关闭了连接");
clientChannel.close();
key.cancel();
} else if (bytesRead > 0) {
buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println("收到来自 " + clientChannel.getRemoteAddress() + " 的数据:" + new String(data));
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
demo 只展示了创建客户端的连接和读连接里的数据。也就是说,select 底层其实已经使用了 epoll 了,但是 netty 仍然实现了自己的 epoll。我没有找到具体为什么需要自己实现 epoll 的原因。但是 java 的 select 是不支持边缘触发的。这里在解释下边缘触发和水平触发:
- 边缘触发 仅会在数据到达的时候触发一次,需要应用程序一次性将所有的数据从通道中读取完,否则后续可能会错过事件通知和丢数据
- 水平触发 会在新数据到达的时候持续触发,也就是说如果当前的通道上还有没有被处理的数据,会一直触发通知。
从底层涉及来看的话,读写都是缓冲区的数据,也就是说如果采用边缘触发,只要当前感兴趣事件对应的缓冲区里面的数据发生变化就会触发,而不关心当前缓冲区是否有没有处理完毕的数据,而水平触发则是对应感兴趣的缓冲区数据是否为空,比如读缓冲区非空,就会一直触发读事件。
相比较而言,边缘触发和水平触发各有优劣(从读缓冲区来看):
- 实时性对比
- 边缘触发更能准确的获取到事件变化,实时性比较高,因为只有事件真正发生的时候才会触发。
- 水平触发无法准确感知到当前的事件是上次没有读完还是本次新的读取事件发生。
- 通知次数
- 通知次数肯定是边缘触发相对少很多,而且边缘触发没有水平触发的*惊群效应*
- 代码难度
- 相对而言,边缘触发可能出现没有及时处理完数据,比如某个连接写入了 2kb 的数据,在读取的过程中,如果出现了新的连接,此时缓冲区仍然处于可读状态,即没有新的事件发生,所以在边缘触发的场景下是不会触发事件的,导致该部分可能存在饥饿甚至假死的现象,如果当前的连接处理完,也可能存在立马被唤醒进行下一次操作的情况,导致其他的线程一直在旁边没有事情,一个线程非常忙的情况。但是水平触发则不会出现这种情况,他会新连接进来就唤醒所有等待的线程,然后资源抢占。相对而言,水平触发的编码难度较低些。
无论是边缘触发和水平触发都是先 accept 然后进行下一步处理,那么就可能存在 accept 惊群的概念。linux 中,使用了 prepare_to_wait_exclusive 避免了惊群,(1)当等待队列入口设置了 WQ_FLAG_EXCLUSEVE 标志,它被添加到等待队列的尾部;否则,添加到头部。(2)当 wake_up 被在一个等待队列上调用, 它在唤醒第一个有 WQ_FLAG_EXCLUSIVE 标志的进程后停止唤醒.但内核仍然每次唤醒所有的非独占等待。
Nginx 就是使用的边缘触发的模式,这样可以提高性能,在数据量小的情况下,可以更加的压榨机器性能,也减少了请求的重复处理。边缘触发模式也比较适合高并发的情况,因为状态机发送情况本来就快,去掉了额外的事件通知能够进一步的提高性能。类似的 demo
#include
#include
#include
#include
#include
#include
#include
#define MAX_EVENTS 10
#define PORT 8888
int main() {
// 创建套接字并设置为非阻塞模式
int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (listen_fd < 0) {
perror("socket");
exit(EXIT_FAILURE);
}
// 绑定并监听套接字
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(PORT);
if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
perror("bind");
exit(EXIT_FAILURE);
}
if (listen(listen_fd, 5) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
// 创建 epoll 实例并将监听套接字添加到集合中
int epoll_fd = epoll_create1(0);
if (epoll_fd < 0) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
// 将新客户端套接字添加到 epoll 集合中,并使用边缘触发模式
struct epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = listen_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event) < 0) {
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
struct epoll_event events[MAX_EVENTS];
char buffer[1024];
printf("服务器监听端口 %d...\n", PORT);
while (1) {
int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (num_events < 0) {
perror("epoll_wait");
break;
}
for (int i = 0; i < num_events; ++i) {
if (events[i].data.fd == listen_fd) {
// 新连接到达
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len);
if (client_fd < 0) {
perror("accept");
continue;
}
// 将新客户端套接字添加到 epoll 集合中
struct epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = client_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event) 0) {
final long ioStartTime = System.nanoTime();
try {
if (processReady(events, strategy)) {
prevDeadlineNanos = NONE;
}
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
runAllTasks(0); // This will run the minimum number of tasks
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// 有删减
}
}
我对上面的 ioRatio 有一定的困惑,开始认为是必须保证按照这个比例执行 IO 和非 IO 的,后面感觉这个只是作为一个平衡而已,并不是强制性的。主要是为了平衡 IO 和非 IO 的处理时间,如果想要提高客户端连接即吞吐,估计需要将这个值调大。如果设置为 100,那么就不会根据时间进行一个 Io 和非 Io 的平衡。
上文还有一个 epollBusyWait0 ,这个方法的实现就是和一直循环查询是否有事件,如果有或者报错则返回:
do {
result = epoll_wait(efd, ev, len, 0);
if (result == 0) {
// Since we're always polling epoll_wait with no timeout,
// signal CPU that we're in a busy loop
cpu_relax();
}
if (result >= 0) {
return result;
}
} while((err = errno) == EINTR);
上文中删减了部分代码,也就是 case SelectStrategy.SELECT: 后面的处理逻辑:
private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;
// nextWakeupNanos is:
// AWAKE when EL is awake
// NONE when EL is waiting with no wakeup scheduled
// other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
private boolean pendingWakeup;
if (pendingWakeup) {
// We are going to be immediately woken so no need to reset wakenUp
// or check for timerfd adjustment.
strategy = epollWaitTimeboxed();
if (strategy != 0) {
break;
}
// We timed out so assume that we missed the write event due to an
// abnormally failed syscall (the write itself or a prior epoll_wait)
logger.warn("Missed eventfd write (not seen after > 1 second)");
pendingWakeup = false;
if (hasTasks()) {
break;
}
// fall-through
}
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
if (curDeadlineNanos == prevDeadlineNanos) {
// No timer activity needed
strategy = epollWaitNoTimerChange();
} else {
// Timerfd needs to be re-armed or disarmed
prevDeadlineNanos = curDeadlineNanos;
strategy = epollWait(curDeadlineNanos);
}
}
} finally {
// Try get() first to avoid much more expensive CAS in the case we
// were woken via the wakeup() method (submitted task)
if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
pendingWakeup = true;
}
}
这里面涉及到了 nextWakeupNanos 和 pendingWakeUp。为什么需要这么判断下呢?该段代码是在#9586中添加的。之所以需要等待,是因为可能存在#9388。该问题的核心原因是 wakeup 方法被调用和 close 被调用可能是同时的,也就是说,可能 wakeup 正在写入 eventFd 的时候,close 执行了,close 执行会关闭 eventFd,如果此时这个文件描述符又被重用了,可能就会导致数据被写道其他的 channel 里,造成未知的问题,最开始的解决方案是新增一个 wakenup 的 volatile 类型的 int,然后原子的改变,如果当前有写入,则将其从 0 修改为 1 ,然后在进行唤醒操作,也就是写入 eventFd。核心就是等待写完后在关闭。
最后在 processReady 里面处理事件,最后执行的都是在 c 语言实现的 epoll 出来的数据。
if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
// Force flush of data as the epoll is writable again
unsafe.epollOutReady();
}
// Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
// See https://github.com/netty/netty/issues/4317.
//
// If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
// try to read from the underlying file descriptor and so notify the user about the error.
if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
// The Channel is still open and there is something to read. Do it now.
unsafe.epollInReady();
}
// Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
// we may close the channel directly or try to read more data depending on the state of the
// Channel and als depending on the AbstractEpollChannel subtype.
if ((ev & Native.EPOLLRDHUP) != 0) {
unsafe.epollRdHupReady();
}
- EPOLLERR
- 在进行读取时,如果目标文件描述符已经被关闭或发生错误,会触发 EPOLLERR 事件。
- 在进行写入时,如果目标文件描述符已经被关闭或发生错误,会触发 EPOLLERR 事件。
- 如果出现特定类型的错误(例如连接重置、连接超时等),也可能会触发 EPOLLERR 事件
- EPOLLOUT
- 当文件描述符连接到远程主机并且可以发送数据时,会触发 EPOLLOUT 事件。
- 当文件描述符为非阻塞模式,并且在发送数据时不会阻塞时,会触发 EPOLLOUT 事件。
- 如果之前的写操作由于缓冲区已满而被暂停,并且现在文件描述符的缓冲区有足够的空间可以继续发送数据时,会触发 EPOLLOUT 事件
- EPOLLIN
- 当文件描述符接收到数据,并且数据可以被读取时,会触发 EPOLLIN 事件。
- 当文件描述符连接到远程主机并且可以接收数据时,会触发 EPOLLIN 事件。
- 当文件描述符处于非阻塞模式,并且有可用的数据可以被读取时,会触发 EPOLLIN 事件。
- EPOLLRDHUP
- 当连接的对端关闭了连接,即对端执行了关闭操作,会触发 EPOLLRDHUP 事件。
- 在使用非阻塞套接字时,如果连接的对端在发送 FIN 包或执行了 shutdown 操作,表示关闭连接,会触发 EPOLLRDHUP 事件。
- 当连接的对端进程崩溃或网络故障,导致连接关闭,会触发 EPOLLRDHUP 事件。
在 netty 初始化的时候会将 ServerBootstrapAcceptor 也放进去,这个 group 会将当前的 channel 注册到 childGroup 中,也就是主从 reactor 设计模式。Epoll 模式下的 accept 的 channel 是 EpollServerSocketChannel,最后会在 AbstractEpollServerChannel 的 unsafe 中执行的,这也是上面提到的 unsafe 主要用于和底层做交互,包括写和 readbegin,该类下的 epollInReady 主要接受为:
do {
// lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
// EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
// enabled.
allocHandle.lastBytesRead(socket.accept(acceptedAddress));
if (allocHandle.lastBytesRead() == -1) {
// this means everything was handled for now
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(newChildChannel(allocHandle.lastBytesRead(), acceptedAddress, 1, acceptedAddress[0]));
} while (allocHandle.continueReading());
在 linux 中,socket 获取到一个连接后,会创建新的 socket 对象,具体步骤:
也就是说,accept 会返回一个成功连接好的 fd 作为读写的 socket,这里其实比较奇怪,这样这两个 socket 对象岂不是共享了一个端口?因为这两个 socket 对象是并不是独立的两个进程,可以理解为相同进程里共享了相同的端口。
当 accept 返回-1 的时候,要么是连接失败,要么就是没有新的连接,所以退出了循环。此时在 newChildChannel 这个方法中会创建一个 EpollSocketChannel
,新创建的对象会在触发 EpollServerSocketChannel 中的 read,此时他的 read 方法会被负责 accepter 的 group 注册到 childGroup 中,然后进行真正的读操作。
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
channel 被注册到 child 的 group 后会首先注册一个 EPOLLET 的事件,该事件仅仅是注册了是使用边缘触发,没有传入实际的感兴趣的时间,等到注册后面的 beginRead 方法里才将注册为读事件。此时读事件已经被传递给了子数据,然后就可以开始读取数据了。
最后的读事件是在 AbstractEpollStreamChannel 中完成的。该类中支持一个 spliceTo 的方法,该方法就是直接连接两个套接字的数据。该方法最后调用的是 native.c 中的
res = splice(fd, p_off_in, fdOut, p_off_out, (size_t) len, SPLICE_F_NONBLOCK |SPLICE_F_MOVE);
使用 man 查看解释该方法是不需要内核拷贝的:
splice() moves data between two file descriptors without copying between kernel address space and user address space. It transfers up to len bytes of data from the file descriptor fd_in to the file descriptor fd_out, where one of the file descriptors must refer to a pipe.
数据就被传送到用户自己实现的 handler 中了。当数据处理完毕了,如果需要返回数据,就在当前的 handler 里 writeAndFlush 就行了。
需要注意的是,handler 是可以自己绑定 Executor 的。如 rocketmq 中就将处理的线程模型一步步扩大,1 + N + M1 + M2 的方式,即有一个 accepter,然后 N 个作为 accepter 的子 group 执行数据的读取,然后 M1 个做做一些公共的处理,比如解析解码,验证连接等等,最后真正处理数据的地方则放在 M2 中进行进一步的处理。但是数据的读取还是在 child 的 group 来做的。
数据写入就相对简单,核心就是将数据写入到缓冲区。如果是水平触发模式,那么一旦缓冲区可以写就会不断触发,也就是说可以通过监听事件进行写入,没写完下次仍然会触发。但是边缘触发只会在可写的时候触发一次,也就是说如果后续缓冲区可写了,但是触发过一次了就不会在触发了,所以如果没有写完需要继续注册写事件。Netty 的实现就是上诉边缘触发模式下的写入:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
final int msgCount = in.size();
// Do gathering write if the outbound buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
writeSpinCount -= doWriteMultiple(in);
} else if (msgCount == 0) {
// Wrote all messages.
clearFlag(Native.EPOLLOUT);
// Return here so we not set the EPOLLOUT flag.
return;
} else { // msgCount == 1
writeSpinCount -= doWriteSingle(in);
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} while (writeSpinCount > 0);
if (writeSpinCount == 0) {
// It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
// our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the EPOLLOUT if necessary.
clearFlag(Native.EPOLLOUT);
// We used our writeSpin quantum, and should try to write again later.
eventLoop().execute(flushTask);
} else {
// Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
// when it can accept more data.
setFlag(Native.EPOLLOUT);
}
}
如果当前的写入 msg 大于 1,则会使用一次性写入多个,这里的自旋次数是 15 次。如果发送过程缓冲区满了,则会返回一个最大的 int 值,然后让循环退出,然后注册写事件。
总结
本文感觉自己的思路有点乱,只能算是个关于 epoll 的小笔记。如果有什么不对的,欢迎订正