29.Netty源码之服务端启动:创建EventLoop&Selector流程

2023年 8月 16日 110.6k 0

源码篇:从 Linux 出发深入剖析服务端启动流程

通过前几章课程的学习,我们已经对 Netty 的技术思想和基本原理有了初步的认识,从今天这节课开始我们将正式进入 Netty 核心源码学习的课程。希望能够通过源码解析的方式让你更加深入理解 Netty 的精髓,如 Netty 的设计思想、工程技巧等,为之后继续深入研究 Netty 打下坚实的基础。

在课程开始之前,我想分享一下关于源码学习的几点经验和建议。

第一,很多同学在开始学习源码时面临的第一个问题就是不知道从何下手,这个时候一定不能对着源码毫无意义地四处翻看。建议你可以通过 Hello World 或者 TestCase 作为源码学习的入口,然后再通过 Debug 断点的方式调试并跑通源码。

第二,阅读源码一定要有全局观。首先要把握源码的主流程,避免刚开始陷入代码细节的死胡同。

第三,源码一定要反复阅读,让自己每一次读都有不同的收获。我们可以通过画图、注释的方式帮助自己更容易理解源码的核心流程,方便后续的复习和回顾。

作为源码解析的第一节课,我们将深入分析 Netty 服务端的启动流程。启动服务的过程中我们可以了解到 Netty 各大核心组件的关系,这将是学习 Netty 源码一个非常好的切入点,让我们一起看看 Netty 的每个零件是如何运转起来的吧。

调试示例代码

位于netty-example工程下的handler工程的 io.netty.example.echo包下

​
package io.netty.example.echo;
​
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
​
public final class EchoClient {
​
    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
​
    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }
​
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     System.out.println("客户端bootstrap.handler()方法中指定的处理器被调用");
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                    // p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });
​
            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();
​
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}
​
​
package io.netty.example.echo;
​
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
​
/**
 * Handler implementation for the echo client.  It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
​
    private final ByteBuf firstMessage;
​
    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        firstMessage = Unpooled.buffer(EchoClient.SIZE);
       for (int i = 0; i < firstMessage.capacity(); i ++) {
            firstMessage.writeByte((byte) i);
        }
       firstMessage.writeByte((byte) 98);
    }
​
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("EchoClientHandler.channelActive");
        ctx.writeAndFlush(firstMessage);
    }
​
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("客户端EchoClientHandler#channelRead被调用");
        System.out.println("客户端收到数据" + msg);
        System.out.println("客户端发送数据" + msg);
        ctx.writeAndFlush(msg);
    }
​
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }
​
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
​
​
package io.netty.example.echo;
​
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.ClientLoggingHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.logging.ServerLoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
​
/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {
​
    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
​
    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }
​
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new ServerLoggingHandler(LogLevel.INFO))
              //两种设置keepalive风格
             .childOption(ChannelOption.SO_KEEPALIVE, true)
             .childOption(NioChannelOption.SO_KEEPALIVE, true)
​
              //切换到unpooled的方式之一
             .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)
                    .childHandler(new ChannelInitializer() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(new ClientLoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });
​
            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
​
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
​

package io.netty.example.echo;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

   @Override
   public void channelRead(final ChannelHandlerContext ctx, Object msg) {
       ctx.channel().eventLoop().schedule(new Runnable() {
           @Override
           public void run() {
               System.out.println(new Date().toString() + ":服务器收到消息");
               try {
                   Thread.sleep(2 * 1000);
                   ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论