Netty 网络编程

2023年 7月 14日 49.3k 0

Netty 入门

概述

什么是 Netty?

  • 异步:用的多线程,不是异步 IO
  • 基于事件驱动:表示用的 Selector

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

Netty 的地位

Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位

以下的框架都使用了 Netty,因为它们有网络通信需求!

  • Cassandra - nosql 数据库
  • Spark - 大数据分布式计算框架
  • Hadoop - 大数据分布式存储框架
  • RocketMQ - ali 开源的消息队列
  • ElasticSearch - 搜索引擎
  • gRPC - rpc 框架
  • Dubbo - rpc 框架
  • Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
  • Zookeeper - 分布式协调框架

Netty 的优势

Netty vs NIO,工作量大,bug 多

  • 需要自己构建协议
  • 解决 TCP 传输问题,如粘包、半包
  • epoll 空轮询导致 CPU 100%
  • 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer

Netty vs 其它网络应用框架

  • Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀

创建服务器 / 客户端

  • 首先创建启动器
  • 创建NioEventLoopGroup基于NIO服务端实现
  • childHandler表示添加的处理器都是给SocketChannel用的
  • ChannelInitializer 仅仅执行一次
  • 客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
  • new ServerBootstrap().bind 绑定监听端口
  • 服务端

    // 1. 启动器,负责组装 netty组件,启动服务器
    new ServerBootstrap()
            // BossEventLoop,WorkerEventLoop(selector, thread)
            // 创建一个事件循环线程组,适用于 NIO 实现,可以简单理解为 `线程池 + Selector`
            .group(new NioEventLoopGroup())
            // 指定要使用的服务器 Channel 实现,适用于 NIO 实现
            .channel(NioServerSocketChannel.class) // OIO BIO
            // BOSS 负责处理连接Work(child)负责处理读写,决定了work(child)能执行哪些(handler)
            // 配置服务器 Channel 处理器,即 ChannelPipeline 中的一组 ChannelHandler
            .childHandler(
                    // 5. channel代表客户端进行数据读写的通道Initializer初始化,负责添加到别的 handler
                    new ChannelInitializer() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            // 添加具体的 handler
                            // 将 bytebuf 转换为字符串
                            nioSocketChannel.pipeline().addLast(new StringDecoder());
                            // 自定义handler
                            nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override// 读事件
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    // 打印转好的字符串
                                    System.out.println(msg);
                                }
                            });
                        }
                    })
            // 监听端口
            .bind(8080);
    

    客户端

    // 启动类
    new Bootstrap()
            // 添加 EventLoop
            .group(new NioEventLoopGroup())
            // 选择客户端 channel 实现
            .channel(NioSocketChannel.class)
            // 添加处理器
            .handler(new ChannelInitializer() {
                @Override // 在建立连接后被调用
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    // 把字符串编码成字节
                    nioSocketChannel.pipeline().addLast(new StringEncoder());
                }
            })
            // 连接服务器
            .connect(new InetSocketAddress("localhost", 8080))
            .sync()
            .channel()
            // 向服务器发送数据
            .writeAndFlush("hello, world");
    

    流程分析

  • 先创建启动器类
  • 添加组件,eventloop(内部就有线程和选择器不断循环,查找事件)
  • 选择NIOServerSocket 实现
  • 添加处理器,只有连接事件发生之后才会执行 initChannel初始化方法
  • 绑定监听端口(服务端就到这里)
  • (客户端)创建启动器和eventloop
  • 客户端选择socket事件
  • 添加处理器,也是等连接建立才会执行初始化方法
  • 最后连接服务器
  • 服务器监听到accept事件之后
  • 最后找处理器处理这个事件
  • (我们看不懂事线),连接建立后调用初始化方法
  • 客户端 sync 只有连接之后才会继续执行
  • channel() 这个是连接对象
  • 最后就可以读写
  • 发数据就会走到处理器内部
  • 进行转为字节数组进行 bytebuf进行发送
  • 服务端的eventloop就会监听到读事件
  • 走到了服务器的处理器,进行处理
  • channel数据传输通道,可读出来可写进。和nio概念一致

    handel中的message流动数据,handel 是一个工序,对原始数据进行一道道工序进行处理

    pipeline就是流水线,一道工序,进行添加工序。

    handel 分为 inbound和outbound
    入站和出站 读入就走入站,写出就走出站

    eventloop就是线程,相当于工人
    一旦某一个工人负责一个事件,那就会负责到底。一个工人是可以管理多个事件的

    eventloop既可以执行io操作,也可以进行普通任务,每个工人都有自己的任务队列,依次处理。底层用的单线程的线程池。

    任务可以是定时任务也可以是普通任务

    组件

    EventLoop

    EventLoop是事件循环对象

    本质是一个单线程执行器,同时维护了Selector,里面有run方法处理 channel 上源源不断的 io 事件。

    继承关系:

    • 一条线是继承自 java.util.concurrent.ScheduledExecutorService
    • 因此包含了线程池的所有方法
    • 另一条是继承自 netty 自己的 OrderedEventExecutor
    • 提供了 Boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
    • 提供了 parent方法来看看自己属于哪个 EventLoopGroup
    // eventloop
    public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
        EventLoopGroup parent();
    }
    
    // 继承线程池接口
    public interface EventExecutorGroup extends ScheduledExecutorService, Iterable {
    

    EventLoopGroup 事件循环组,我们一般使用这个

    EventLoopGroup 是一组 EventLoop,channel 一般会调用 EventLoopGroup 的 register 方法俩绑定其中一个 EventLoop,后续这个 channel 上的 io 事件都由同一个 EventLoop 来处理(保证了 io 事件处理时的线程安全)

    • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

    我们平常使用的是下面两个循环组实现类

    // 可以实现普通任务,io事件,定时任务
    EventLoopGroup nioGroup = new NioEventLoopGroup();
    // 普通任务和定时任务
    EventLoopGroup defaultGroup = new DefaultEventLoopGroup();
    

    那么我们空参创建对象,线程数是多少呢

    下面这段代码,是用来初始化 EventLoopGroup对象的。

    如果没有指定线程数,会采用默认的,也就是当前系统的CPU核心数 * 2

    /**
     * 构造函数,创建MultithreadEventLoopGroup对象
     * @param nThreads 表示EventLoopGroup中EventLoop的数量,如果为0则使用默认的线程数
     * @param executor 用于执行任务的Executor对象
     * @param args 可选参数列表
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        // 调用父类的构造函数,初始化EventLoopGroup对象
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    

    获取当前CPU核数--我的是 16核

    NettyRuntime.availableProcessors();
    

    指定线程数量

    EventLoopGroup nioGroup = new NioEventLoopGroup(2);
    

    获取下一个线程

    // 创建两个线程的事件循环组
    EventLoopGroup nioGroup = new NioEventLoopGroup(2);
    // 获取下一个线程
    System.out.println(nioGroup.next()); // 第一次打印第一个
    System.out.println(nioGroup.next()); // 打印第二个
    System.out.println(nioGroup.next()); // 打印第一个,因为一共就两个
    System.out.println(nioGroup.next()); // 打印第二个
    // io.netty.channel.nio.NioEventLoop@737996a0
    // io.netty.channel.nio.NioEventLoop@61dc03ce
    // io.netty.channel.nio.NioEventLoop@737996a0
    // io.netty.channel.nio.NioEventLoop@61dc03ce
    

    执行普通任务

    • 这里的execute和submit都是一样的效果
    // 创建两个线程的事件循环组
    EventLoopGroup nioGroup = new NioEventLoopGroup(2);
    // 执行普通任务
    nioGroup.next().execute(() -> {
       log.debug("ok");
    });
    log.debug("main");
    

    定时任务

    // 创建两个线程的事件循环组
    EventLoopGroup nioGroup = new NioEventLoopGroup(2);
    // 执行定时任务 以一定的频率执行
    // 参数1:执行的方法
    // 参数2:表示第一次启动多久开始
    // 参数3:表示间隔多久再次触发
    // 参数4:单位
    nioGroup.next().scheduleAtFixedRate(() -> {
       log.debug("ok");
    }, 3, 5, TimeUnit.SECONDS);
    log.debug("main");
    

    IO 事件

    下面是服务端,客户端同上面。

    一个客户端绑定一个服务的 EventLoop线程。

    下面再编写客户端的时候,如果要打上断点实现阻塞,需要将idea的断点卡成单线程的

    image.png

    new ServerBootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        // 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buf = (ByteBuf) msg;
                            // 这个实际开发是要指定字符类型的,不要默认
                            log.debug(buf.toString(Charset.defaultCharset()));
                        }
                    });
                }
            })
            .bind(8080);
    

    我们来分工细化一下

  • 我们可以把 eventloop 划分为 boss 和 work
  • 将accept 和 read 分开处理
  • 那 第一个事件循环组 线程是否可以设置为 1 呢

    因为服务器这一有个,它也只会和里面一个 eventloop 进行绑定

    new ServerBootstrap()
            // 将 参数1:只处理accept 参数2:处理read
            .group(new NioEventLoopGroup(), new NioEventLoopGroup())
    

    继续分工细化--如果其中一个Nio线程执行中
    在读操作时执行太久,会影响其他 channel读操作
    最好不要让它占用 work nio线程,所以我们继续细分

  • 创建独立的事件循环对象,因为不需要进行io所以是普通的
  • 将下个处理器绑定上。
  • // 细分2:创建独立的 EventLoopGroup
    DefaultEventLoopGroup group = new DefaultEventLoopGroup();
    new ServerBootstrap()
            .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline()
                            .addLast("handler1", new ChannelInboundHandlerAdapter() {
                                @Override
                                // 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    // 这个实际开发是要指定字符类型的,不要默认
                                    log.debug(buf.toString(Charset.defaultCharset()));
                                    // 让消息传递给下一个handler
                                    ctx.fireChannelRead(msg);
                                }
                            })
                            // 指定事件循环组和名称
                            .addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
                                @Override
                                // 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    // 这个实际开发是要指定字符类型的,不要默认
                                    log.debug(buf.toString(Charset.defaultCharset()));
                                }
                            });
                }
            })
            .bind(8080);
    

    image.png

    那么是怎么实现切换事件循环组,也就是换人处理的呢?

    如果两个 handler 绑定的是同一个线程,那么直接调用,否则调用的代码封装为一个任务对象。由一下一个 handler的线程调用

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
        /**
        *    EventExecutor 是事件循环组
        */
        EventExecutor executor = next.executor();
        
        // 当前 handler 中的线程是否和 eventloop 是同一个线程
        // 是,直接调用
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } 
        // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
        else {
        // 使用runnable 执行一个事件
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    

    Channel & 连接 & 关闭问题 & 异步

    channel 的主要作用

    • close() 可以用来关闭 channel
    • closeFuture() 用来处理 channel 的关闭
      • sync 方法作用是同步等待 channel 关闭
      • 而 addListener 方法是异步等待 channel 关闭
    • pipeline() 方法添加处理器
    • write() 方法将数据写入,只是写进入缓冲区
    • writeAndFlush() 方法将数据写入并刷出

    connect 连接问题

    connect:是异步阻塞,main方法发起,执行是Nio的线程

    所以如果在发送数据前,一定要保证已经连接好了,否则数据是发送不出去的

    ChannelFuture:带有Future或者 promise都是和异步配套使用,用来处理结果

    ChannelFuture channelFuture = new Bootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringEncoder());
                }
            })
            // connect 是异步阻塞,main方法发起调用,真正执行连接的是Nio线程
            .connect(new InetSocketAddress("localhost", 8080));
    //        channelFuture.sync();
    // 会无阻塞向下执行获取channel,最后发送数据
    Channel channel = channelFuture.channel();
    //        log.debug("{}", channel); // 如果没有获取到打印的就是没有连接的 channel,所以需要执行 sync
    channel.writeAndFlush("123");
    

    解决连接问题,保证发送数据前一定是正确连接的

  • sync
  • // 阻塞当前线程,直到连接建立完毕
    channelFuture.sync();
    Channel channel = channelFuture.channel();
    log.debug("{}", channel);
    channel.writeAndFlush("123");
    
  • 将执行发送的代码,交给nio线程。
    • addListener(回调对象)方法异步处理结果
    channelFuture.addListener(new ChannelFutureListener() {
        @Override
        // 在nio 线程连接建立好之后,会调用operationComplete
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            Channel channel = channelFuture.channel();
            log.debug("{}", channel);
            channel.writeAndFlush("123");
        }
    });
    

    关闭问题

    我们看下面这段代码

    我们可以输入内容,发送给服务端,按 q 关闭传输通道

    但是我们关闭的时候,想做一些操作,怎么整
    直接在close下面写吗,那是不对的,因为close方法是异步的

    channelFuture.sync();
    Channel channel = channelFuture.channel();
    new Thread(new Runnable() {
        @Override
        public void run() {
            Scanner sc = new Scanner(System.in);
            while (true) {
                String line = sc.nextLine();
                if ("q".equals(line)) {
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
        }
    }, "input").start();
    

    解决异步问题

    添加日志调试

    nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
    
  • 同步解决---由主线程处理关闭操作
  • new Thread(() -> {
        Scanner sc = new Scanner(System.in);
        while (true) {
            String line = sc.nextLine();
            if ("q".equals(line)) {
                channel.close();
                break;
            }
            channel.writeAndFlush(line);
        }
    }, "input").start();
    // 添加关闭处理器
    ChannelFuture closeFuture = channel.closeFuture();
    closeFuture.sync();// 设置同步
    log.debug("处理关闭之后....");
    
  • 异步解决--由 nio 线程处理
  • new Thread(() -> {
        Scanner sc = new Scanner(System.in);
        while (true) {
            String line = sc.nextLine();
            if ("q".equals(line)) {
                channel.close();
                break;
            }
            channel.writeAndFlush(line);
        }
    }, "input").start();
    ChannelFuture closeFuture = channel.closeFuture();
    
    // 设置关闭回调函数
    closeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            log.debug("处理关闭之后....");
        }
    });
    

    优雅的关闭

    • 它会等待所有正在处理的任务完成后,再进行关闭。
    NioEventLoopGroup group = new NioEventLoopGroup();
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
    
    closeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            log.debug("处理关闭之后的操作");
            group.shutdownGracefully(); // 优雅的关闭
        }
    });
    

    异步

    异步:就是一个线程发起连接一个线程去建立连接

    思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96个病人

    image.png

    那我们就可以细分一下,四个医生分别处理四个事情

    只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12 效率几乎是原来的四倍

    image.png

    重点:

  • 单线程没法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势
  • 异步并没有缩短响应时间,反而有所增加
  • 合理进行任务拆分,也是利用异步的关键
  • Future & Promise

    处理异步时:经常用到的两个接口, futer 核 promise

    首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

    • jdk Future:只能同步等待任务结束(或成功、或失败)才能得到结果
    • netty Future:可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
    • netty Promise;不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
    功能/名称 jdk Future netty Future Promise
    cancel 取消任务 - -
    isCanceled 任务是否取消 - -
    isDone 任务是否完成,不能区分成功失败 - -
    get 获取任务结果,阻塞等待 - -
    getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
    await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
    sync - 等待任务结束,如果任务失败,抛出异常 -
    isSuccess - 判断任务是否成功 -
    cause - 获取失败信息,非阻塞,如果没有失败,返回null -
    addLinstener - 添加回调,异步接收结果 -
    setSuccess - - 设置成功结果
    setFailure - - 设置失败结果

    jdk future

    // 线程池
    ExecutorService pool = Executors.newFixedThreadPool(2);
    // 提交任务
    Future future = pool.submit(new Callable() {
        @Override
        public Integer call() throws Exception {
            log.debug("执行计算"); // poll 线程
            Thread.sleep(1000);
            return 50;
        }
    });
    // 主线程通过 future 获取结果
    log.debug("等待结果"); // 主线程
    log.debug("结果是 {}", future.get()); // 主线程
    

    nio future

    // 创建事件循环组
    NioEventLoopGroup eventGroup = new NioEventLoopGroup();
    // 获取执行事件
    EventLoop eventLoop = eventGroup.next();
    // 执行方法
    Future future = eventLoop.submit(new Callable() {
        @Override
        public Integer call() throws Exception {
            log.debug("执行计算"); // nio 线程
            Thread.sleep(1000);
            return 50;
        }
    });
    

    同步获取线程返回结果

    // 主线程通过 future 获取结果
    log.debug("等待结果"); // 主线程
    log.debug("结果是 {}", future.get()); // 主线程
    

    异步

    future.addListener(new GenericFutureListener

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论