ServerBootstrapAcceptor:接收连接的核心

2023年 10月 11日 69.5k 0

ServerBootstrapAcceptor是Netty服务端用来接收客户端连接的核心类,之前的文章在分析Netty服务端启动全流程的时候有提到过一嘴,今天这篇文章会详细分析一下。

1. 何时被添加到Pipeline?

服务端启动时,会调用ServerBootstrap.bind()绑定本地端口用来监听客户端的连接,这个方法会通过反射创建ServerSocketChannel并初始化,ServerBootstrap.init()会初始化ServerSocketChannel,将ServerBootstrapAcceptor添加到服务端Channel的Pipeline中,源码如下:

// 服务端Channel初始化
@Override
void init(Channel channel) {// 这里的channel是ServerSocketChannel
	// 设置options
	setChannelOptions(channel, newOptionsArray(), logger);
	// 设置attrs
	setAttributes(channel, newAttributesArray());


	// 初始化ServerSocketChannel的ChannelPipeline
	ChannelPipeline p = channel.pipeline();

	final EventLoopGroup currentChildGroup = childGroup;
	final ChannelHandler currentChildHandler = childHandler;
	// 和ServerSocketChannel建立连接的客户端SocketChannel需要设置的options和attrs
	final Entry, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

	/*
	往服务端Channel添加Handler:
		1.封装HandlerAdded回调任务,保存在PendingHandlerCallback
		2.后续的register()操作会触发回调:pipeline.invokeHandlerAddedIfNeeded();
	 */
	p.addLast(new ChannelInitializer() {
		/*
		initChannel()何时被调用?
			ChannelHandler被添加到Pipeline有一个对应的回调:handlerAdded()
			addLast()会提交一个任务,让EventLoop来触发这个回调
			ChannelInitializer在handlerAdded()回调里会执行该初始化方法。
		 */
		@Override
		public void initChannel(final Channel ch) {
			final ChannelPipeline pipeline = ch.pipeline();
			ChannelHandler handler = config.handler();//ServerBootstrap.handler()设置的
			if (handler != null) {
				pipeline.addLast(handler);
			}

			ch.eventLoop().execute(new Runnable() {
				@Override
				public void run() {
					// ServerBootstrapAcceptor是服务端接收客户端连接的核心
					pipeline.addLast(new ServerBootstrapAcceptor(
							ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
				}
			});
		}
	});
}

初始化时,会往ServerSocketChannel的Pipeline中添加了一个ChannelInitializer,这里有必要说一下ChannelInitializer。

ChannelInitializer是一个特殊的ChannelHandler,它本身不处理任何出站/入站事件,它的目的仅仅是完成Channel的初始化。我们知道,ChannelHandler被添加到Pipeline后会触发一个handlerAdded回调,ChannelInitializer在这个回调里会调用initChannel()进行初始化,初始化完成后会将自己从Pipeline中删除,源码如下:

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // 初始化Channel
        if (initChannel(ctx)) {
            // 将自己从Pipeline中移除
            removeState(ctx);
        }
    }
}

回到ServerBootstrapAcceptor,它在初始化ServerSocketChannel时会向它的Pipeline中添加ServerBootstrap.handler()用户设置的自定义ChannelHandler,当然你也可以不设置,它会判空校验。添加完用户自定义的ChannelHandler后,它会再追加一个ServerBootstrapAcceptor,这样ServerBootstrapAcceptor就可以处理入站事件了,即处理客户端的连接。

2. 源码分析

ServerBootstrapAcceptor的源码很简单,只有不到小一百行,它的职责也很简单,只负责处理客户端新的连接建立,并把连接注册到WorkerGroup中,仅此而已。

通过查看类结构,发现它只实现了ChannelInboundHandler接口,这代表它对入站事件感兴趣。本来嘛,它负责新连接的建立,肯定只对OP_ACCEPT事件感兴趣了。
ServerBootstrapAcceptor.png

再看下它的属性,也比较简单,因为它负责客户端Channel的建立和初始化,因此需要childChannel相关的配置信息。

private final EventLoopGroup childGroup;// Reactor模型中的WorkerGroup
private final ChannelHandler childHandler;// 客户端Channel的ChannelHandler
private final Entry, Object>[] childAttrs;// 客户端Channel的Attrs
private final Runnable enableAutoReadTask; // 启用自动读取的任务

构造函数就是基本的赋值操作,这里就不贴代码了。

它只重写了channelReadexceptionCaught方法,代表它只对这两个事件感兴趣。当有Selector有OP_ACCEPT事件到达时,NioEventLoop会接收客户端连接,创建SocketChannel,并触发channelRead回调,看下它是如何处理新的连接的。

/**
 * 有客户端连接时,触发.
 * NioEventLoop会监听Selector事件,OP_ACCEPT事件到达时,触发Unsafe.read()。
 * {@link AbstractNioMessageChannel.NioMessageUnsafe#read()}
 * 它会调用ServerSocketChannel.accept()获取客户端连接,并触发channelRead()回调。
 */
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
	final Channel child = (Channel) msg;// 这里的Channel是SocketChannel

	// 设置客户端Channel的Pipeline、Options、Attrs
	child.pipeline().addLast(childHandler);
	setChannelOptions(child, childOptions, logger);
	setAttributes(child, childAttrs);

	try {
		/*
		将客户端Channel注册到WorkerGroup:
			1.next()轮询出一个EventLoop.register()
			2.Channel.Unsafe.register(),Channel注册到Selector
			3.触发各种回调
		Channel一旦注册到EventLoop,就由该EventLoop负责处理它整个生命周期的所有事件。
		 */
		childGroup.register(child).addListener(new ChannelFutureListener() {
			@Override
			public void operationComplete(ChannelFuture future) throws Exception {
				// 如果注册失败,强制关闭连接
				if (!future.isSuccess()) {
					// 底层就是调用原生JDK的关闭方法:javaChannel().close();
					forceClose(child, future.cause());
				}
			}
		});
	} catch (Throwable t) {
		forceClose(child, t);
	}
}

大致步骤如下:

  • 设置SocketChannel的Pipeline。
  • 设置Options。
  • 设置Attrs。
  • 将SocketChannel注册到WorkerGroup中。
  • Netty采用的Reactor主从线程模型,BossGroup负责连接的建立,WorkerGroup负责后续连接的读写。所以ServerBootstrapAcceptor在客户端Channel连接建立后会将它注册到WorkerGroup中。

    当整个过程出现异常时,Netty会强制关闭连接,调用forceClose(),底层还是调用了JDK底层的SocketChannel.close()方法关闭连接。

    // 强制关闭连接
    private static void forceClose(Channel child, Throwable t) {
        /**
        * 底层还是调用了SocketChannel.close()
        * {@link io.netty.channel.socket.nio.NioSocketChannel#doClose()}
        */
        child.unsafe().closeForcibly();
        logger.warn("Failed to register an accepted channel: {}", child, t);
    }
    

    ChannelRead事件异常了,Pipeline还会传播异常事件,执行exceptionCaught回调。ServerBootstrapAcceptor面对异常时,会暂停1秒停止接受客户端连接,等待ServerSocketChannel恢复,并将异常事件传播出去。

    // 处理异常事件
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        final ChannelConfig config = ctx.channel().config();
        if (config.isAutoRead()) {
            // 1秒内停止接收新客户端
            config.setAutoRead(false);
            ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
        }
        // still let the exceptionCaught event flow through the pipeline to give the user
        // a chance to do something with it
        // 将异常事件传播出去
        ctx.fireExceptionCaught(cause);
    }
    

    至此,ServerBootstrapAcceptor所有的源码都分析完了,超级简单。

    3. 何时触发channelRead()?

    上面只对ServerBootstrapAcceptor的源码进行了分析,却没有说当有新的连接进来时,Netty是何时调用的channelRead()方法的,这节会给你答案。

    当调用ServerBootstrap.bind()方法时,Netty会创建ServerSocketChannel,并把它注册到BossGroup的NioEventLoop的Selector多路复用器,最后再绑定到本地端口。

    这样Netty就可以接收客户端的连接了,当有新的连接接入时,Selector会监听到并返回准备就绪的Channel,NioEventLoop会处理这些事件,详见NioEventLoop.processSelectedKey()方法。

    由于事件类型是OP_ACCEPT,因此会调用Unsafe.read()处理,源码如下:

     // 数据可读、有新的连接接入
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        // 对于ServerSocketChannel只关心OP_ACCEPT事件
        unsafe.read();
    }
    

    这个Unsafe接口有两大实现,分别是服务端Channel的Unsafe和客户端Channel的Unsafe。前者的read负责接收SocketChannel连接,后者的read负责读取对端发送的数据并封装成ByteBuf。

    对于服务端的Unsafe.read(),这里会执行io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read()方法,它会调用JDK底层的ServerSocketChannel.accept()接收到客户端的连接后,将其封装成Netty的NioSocketChannel,再通过Pipeline将ChannelRead事件传播出去,这样ServerBootstrapAcceptor就可以在ChannelRead回调里处理新的客户端连接了。

    /*
    NioEventLoop.processSelectedKey() 当Channel有 OP_READ | OP_ACCEPT 事件时调用该方法。
    对于服务端Channel来说,就是 OP_ACCEPT.
     */
    @Override
    public void read() {
    	assert eventLoop().inEventLoop();
    	final ChannelConfig config = config();
    	final ChannelPipeline pipeline = pipeline();
    	// 接收对端数据时,ByteBuf的分配策略,基于历史数据动态调整初始化大小,避免太大浪费空间,太小又会频繁扩容
    	final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    	allocHandle.reset(config);
    
    	boolean closed = false;
    	Throwable exception = null;
    	try {
    		try {
    			do {
    		/*
    		对于ServerSocketChannel来说,就是接收一个客户端Channel,添加到readBuf
    		 */
    				int localRead = doReadMessages(readBuf);
    				if (localRead == 0) {
    					break;
    				}
    				if (localRead < 0) {
    					closed = true;
    					break;
    				}
    				// 递增已读取的消息数量
    				allocHandle.incMessagesRead(localRead);
    			} while (continueReading(allocHandle));
    		} catch (Throwable t) {
    			exception = t;
    		}
    
    		int size = readBuf.size();
    		for (int i = 0; i < size; i++) {
    			readPending = false;
    			/**
    			 * 通过pipeline传播ChannelRead事件
    			 * {@link io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object)}
    			 */
    			pipeline.fireChannelRead(readBuf.get(i));
    		}
    		readBuf.clear();
    		// 读取完毕的回调,有的Handle会根据本次读取的总字节数,自适应调整下次应该分配的缓冲区大小
    		allocHandle.readComplete();
    		// 通过pipeline传播ChannelReadComplete事件
    		pipeline.fireChannelReadComplete();
    
    		if (exception != null) {// 事件处理异常了
    			// 是否需要关闭连接
    			closed = closeOnReadError(exception);
    
    			// 通过pipeline传播异常事件
    			pipeline.fireExceptionCaught(exception);
    		}
    
    		if (closed) {//如果需要关闭,那就关闭
    			inputShutdown = true;
    			if (isOpen()) {
    				close(voidPromise());
    			}
    		}
    	} finally {
    		if (!readPending && !config.isAutoRead()) {
    			removeReadOp();
    		}
    	}
    }
    

    4. 总结

    ServerBootstrapAcceptor是一个特殊的ChannelHandler,它是Netty服务端用来接收客户端连接的核心类。ServerBootstrap在初始化ServerSocketChannel时,会往它的Pipeline中添加ServerBootstrapAcceptor,ServerBootstrapAcceptor重写了ChannelRead回调,当NioEventLoop检测到有OP_ACCEPT事件到达时会执行NioMessageUnsafe.read()方法,它会调用JDK底层的API接收客户端连接,并把它作为msg触发ChannelRead回调,这样ServerBootstrapAcceptor就可以拿到客户端连接,帮助它进行初始化并注册到WorkerGroup中。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论