深入浅出指南:Netty开发【NIO核心组件

2023年 7月 31日 107.8k 0

image.png

关于我:我是山茶君nlefer,一个专注于技术的菜鸟。你懂的越多,就懂得不懂的越多。

1.NIO基础概念

NIO(Non-Blocking IO 也称为New IO),JDK提供的新API。从JDK1.4开始,Java提供了一系列改进的输入/输出的新特性,被统称为NIO(Non-Blocking IO 也称为New IO),是同步非阻塞的

2.NIO核心组件

2.1.Channel&&Buffer简介

Channel是一种管道,类似于stream,可以用来数据的传输。但Channel是基于Buffer缓冲区的异步的双向的数据读写通道,即可以从Buffer中读取数据,也可以向Buffer中写入数据。

常见的Channel有
	● FileChannel
	● DatagramChannel
	● SocketChannel
	● ServerSocketChannel
其中SocketChannel和ServerSocketChannel可以被使用在服务端与客户端的通信。

Buffer被用作为缓冲区存储数据,读写数据发生的位置,常见的有:
	● ByteBuffer
      ○ MappedByteBuffer
      ○ DirectByteBuffer
      ○ HeapByteBuffer
    ●  ShortBuffer
    ● IntBuffer
    ● LongBuffer
    ● FloatBuffer
    ● DoubleBuffer
    ● CharBuffer

2.2.Selector

  • Selector 为IO多路复用选择器,轮询检测注册在selector上的服务端状态,如果有事件发生,则获取对应的事件,针对事件的不同进行分类别的处理。

  • 也就是说可以使用一个线程管理多个Channel,也就是管理多个请求与连接。

  • Selector 维护了三个set集合,主要展现为整体步骤中的连接存储、获取连接通道信息、断开连接响应(也就是断开监控)

    • key set :通道注册时,使用register()方法将对应的cahnnel信息添加到SelectionKey中;
    • selected key set :在做轮询检测时,通过遍历Set selectionKeys = selector.selectedKeys(); 获取相应的key也就是存储的channel信息;
    • canneled key set:客户端主动断开连接或以外断开连接时,该事件已经不存在了,但在服务端的key集合中依旧存在,下次检测的时候会因此而出现问题,所以,需要通过判断客户端发送事件的返回值来确定是否将对应的key的SelectionKey从key set中移除, 将关联channel丢弃掉,不再进行监听;
  • 通过服务器的设计演变进化,可以进一步了解Selector的功能以及作用

  • 服务器的多线程版本

    多线程操作,既使用了一个连接,那就开一个线程去处理这个连接的所有事物,当连接没有事件发生的时候,对应的线程就等待着,一直等到这个连接有事情发生或者是连接断开。可以类比为餐厅服务员与顾客的关系,一个服务员专门服务于一个顾客。

    缺点与不足:

    • 当连接多了的时候,开的线程数量较多,会占用较大的内存
    • 线程的上下文切换成本比较高(一个线程被暂停,另一个线程被操作系统选择选中开始执行的过程就叫做上下文切换)
    • 场景使用仅适用于连接较少的情况

    服务器的线程池版本

    针对于上述多线程版本的缺点,做了进一步的改进。较多线程版本资源使用有所降低,但是随之而来的是性能的问题。(就好比一个餐厅服务员耳听八方,眼观四路一样,服务于多个顾客,但是当顾客同事发起请求的时候,服务生就只能处理一个请求事件,就会造成阻塞的情况发生。因此才会继续诞生selector模式,线程池中设定固定的线程数)。

    缺点与不足:

  • 阻塞模式下,线程仅能处理一个socket连接(处理scoket1的时候就不能处理scoket3,只有等代1处理完断开连接后才能处理3)

  • 仅适合短连接(避免线程处理一个socket,造成阻塞,导致下一个socket无法被处理)

  • 服务器的selector版本

    如图,依旧以餐厅为例,露天餐厅经历了前两次的失败,进化了成了一个小馆子。channel就是我们的多个顾客,selecor就好比一个监控视频头,能够通知thread服务员。当有channel发生事件时,seletcor会将对应事件通知到服务员(thread)去进行处理。处理完该顾客后,继续等待事件的请求,不会阻塞。与线程池版不同的是,channel是工作在非阻塞环境下的,因此selector的效率有较大提高。

    缺点与不足:

  • 如果当前请求和事务的量级较大,线程就会一直处理该事务,其他的channel请求也会排队等待处理。
  • 2.3.Buffer

    ByteBuffer为NIO中的字节缓冲区,相对于BIO的Stream流只支持写入或者读取单向操作,ByteBuffer是双向的,支持读和写。

    0.ByteBuffer的正确使用流程

  • 向buffer中写入数据

    • channel的read()方法
    • buffer自身的put方法
  • 调用filp()方法切换为读取数据模式

  • 从buffer中读取数据

    • channel的write()方法
    • buffer的get()方法
  • 调用clear()或compact()切换回写模式继续写入数据

  • 重复以上步骤

  • ByteBuffer byteBuffer = ByteBuffer.allocate(10);
    // 写入数据到buffer;两种方式均可以
    int readByte = channel.read(byteBuffer);
    byteBuffer.put((byte) 0x61);
    // 切换为读数据模式
    byteBuffer.filp();
    // 写入数据到buffer;两种方式均可以
    int writeByte = channel.write(byteBuffer);
    byte b = byteBuffer.get();
    

    1.ByteBuffer类型简介

    类型 简介 方法
    DirectByteBuffer 使用的是操作系统级别的内存,分配比较慢,但是数据的读写比较快,因为少了一次从系统内存到JVM内存的复制过程 初始化方法ByteBuffer.allocateDirect(1024 * 4);
    HeapByteBuffer 使用的是JVM的堆内存,对于JVM来说,分配比较快,但是读写比较慢,因为需要将操作系统内存里的数据复制到JVM内存,且java的gc会对其有影响,因为gc会对内存模块进行整理 初始化方法ByteBuffer.allocate(1024 * 4);

    2.ByteBuffer核心属性说明

    属性名称 属性说明
    capacity ByteBuffer的容量,这个值在ByteBuffer初始化的时候就确定下来了。不论是在读还是在写模式下,这个值都不变
    position 写模式:该值表示当前写到了ByteBuffer的哪个位置,ByteBuffer初始化时,这个值为0。position的最大值为capacity-1读模式:当从写模式切换到读模式,会将position重置为0,即从ByteBuffer的起始位置开始读取数据
    limit 写模式:limit为最大可写入的数据量,即ByteBuffer的最大容量,值为capacity读模式:当从写模式切换从读模式,limit将会被设置为读模式下的position值,即可读取的最大数据量

    3.ByteBuffer核心方法

    方法 详情
    flip() 将写模式切换为读模式,会触发的对核心属性的操作:- 将position设置为0,即从ByteBuffer起始位置开始读
    • 将limit设置为写模式下position的值,即最大可读取的数据量大小。 |
      | clear() | 在逻辑上清空ByteBuffer里的数据,实际上不清空数据会触发的动作:- 将limit设置为capacity
    • position指向起始位置0
    • 注意:实际上数据并未清理,只是下次是从0的位置开始写入数据,效果上像是数据清空了。如果ByteBuffer中的数据并未完全读完,调用这个方法将忽略那些未读取的数据。 |
      | mark() | 标记当前position位置【结合reset()使用】 |
      | reset() | 将position指向上一次mark()所指向的位置,可以从这个位置重复向下读取数据 |
      | compact() | 如果并未读取完ByteBuffer中的数据,调用compact()会将position~limit之间的数据拷贝到ByteBuffer的起始处,并且position为剩余数据量的大小,下次再往ByteBuffer中写入数据时,将在position位置继续往下写,不会覆盖历史数据。 |
      | hasRemaining() | 判断缓冲区中是否还有未读数据 |
      | 写入ByteBuffer | byteBuffer.put(x)channel.read(byteBuffer)- 从通道上读取数据,写入缓冲区 |
      | 读取ByteBuffer | byteBuffer.get()channel.write(bytebuffer)- 从缓冲区中读取数据,写入通道 |

    4.代码示例说明

    0.maven配置

    测试时需要创建一个maven项目,增加配置信息

    
            
                io.netty
                netty-all
                4.1.39.Final
            
            
                org.projectlombok
                lombok
                1.16.18
            
            
                com.google.code.gson
                gson
                2.8.5
            
            
                com.google.guava
                guava
                19.0
            
            
                ch.qos.logback
                logback-classic
                1.2.3
            
            
                com.google.protobuf
                protobuf-java
                3.11.3
            
        
    
    1.缓冲区初始化
    public static void main(String[] args) {
            // 1.創建ByteBuffer缓冲区,使用allocate()初始化操作,设定大小为10
            ByteBuffer byteBuffer = ByteBuffer.allocate(10);
            // 2.向缓冲区写入一个数字a,占一个字节       
            byteBuffer.put((byte) 0x61);
            // 3.打印结果
            debugAll(byteBuffer);
        }
    

    只进行了写入的操作,目前position的位置是处于写入数据的当前位置1,limit的位置是出于初始化容量大小的位置

    2.position、limit、flip()、clear()、mark()和reset()、compact()、hasRemaining() 方法详解

    在Bytebuffer流程中有一个读取数据,使用get()方法会使得position位置后移,limit位置不会发生变化

     public static void main(String[] args) {
            try {
                // 1.創建ByteBuffer缓冲区,使用allocate()初始化操作,设定大小为10
                ByteBuffer byteBuffer = ByteBuffer.allocate(10);
                // 2.向缓冲区写入一个数字a,占一个字节
                byteBuffer.put((byte) 0x61);
                byteBuffer.put((byte) 0x62);
                byteBuffer.put((byte) 0x63);
                byteBuffer.put((byte) 0x64);
                // 3.打印结果
                debugAll(byteBuffer);
    
                // 4.切换为读取操作
                byteBuffer.flip();
                // 5.读取数据
                while(byteBuffer.hasRemaining()){ // 6. 判断是否还有剩余未读取的数据
                    byte b = byteBuffer.get();
                    System.out.println("对应数据是:"+(char)b);
                }
                debugAll(byteBuffer);
    
                // 7. 数据切换为写模式,clear()方式
                byteBuffer.clear();
                debugAll(byteBuffer);
    
                // 8.写入数据到缓冲区,使用channel方法,然后重复切换操作
                FileChannel channel = new FileInputStream("data.txt").getChannel();
                channel.read(byteBuffer);
                byteBuffer.flip();
                while(byteBuffer.hasRemaining()){ // 9. 判断是否还有剩余未读取的数据
                    byte b1 = byteBuffer.get();
                    System.out.println("新的对应数据是:"+(char)b1);
                    debugAll(byteBuffer);
                }
                debugAll(byteBuffer);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    从图中可知,在向缓冲区塞入数据且未开始切换为读模式时,从第一步到第七步,position和limit的位置分别从开始未读取的4和10变化为读取后的4和4,再次切换为写入数据,直接将position和limit的位置切换为0和10,既覆盖模式,从零开始,将上一次的数据覆盖掉。

    为了避免出现未读完数据被覆盖的情况,使用compact()方法可以直接继续上一次的数据后面继续写入,如图所示,未读完的数据bcd被移至前面,然后继续写入数据

     public static void main(String[] args) {
            try {
                // 1.創建ByteBuffer缓冲区,使用allocate()初始化操作,设定大小为10
                ByteBuffer byteBuffer = ByteBuffer.allocate(10);
                // 2.向缓冲区写入一个数字a,占一个字节
                byteBuffer.put((byte) 0x61);
                byteBuffer.put((byte) 0x62);
                byteBuffer.put((byte) 0x63);
                byteBuffer.put((byte) 0x64);
                // 3.打印结果
                debugAll(byteBuffer);
    
                // 4.切换为读取操作
                byteBuffer.flip();
                // 5.0 单次读取数据
                System.out.println("单次读取数据:"+byteBuffer.get());
                debugAll(byteBuffer);
                // 6.避免数据出现覆盖的情况,使用compact()方法操作
                byteBuffer.compact();
                byteBuffer.put((byte) 0x66);
                byteBuffer.put((byte) 0x67);
                byteBuffer.put((byte) 0x68);
                debugAll(byteBuffer);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    mark和reset()方法联合使用的方法示例

    public static void main(String[] args) {
            try {
                // 1.創建ByteBuffer缓冲区,使用allocate()初始化操作,设定大小为10
                ByteBuffer byteBuffer = ByteBuffer.allocate(10);
                // 2.向缓冲区写入一个数字a,占一个字节
                byteBuffer.put((byte) 0x61);
                byteBuffer.put((byte) 0x62);
                byteBuffer.put((byte) 0x63);
                byteBuffer.put((byte) 0x64);
                // 3.打印结果
                debugAll(byteBuffer);
                // 4.切换为读取操作
                byteBuffer.flip();
                // 5.0 单次读取数据
                System.out.println("第一次读取数据:"+byteBuffer.get());
                byteBuffer.mark();
                debugAll(byteBuffer);
                System.out.println("第二次读取数据:"+byteBuffer.get());
                debugAll(byteBuffer);
                byteBuffer.reset();
                debugAll(byteBuffer);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    在字符a的做了标记,当读完b的时候,选择返回到a的位置,从新读取数据,实现了反复读取数据的需求与操作,具体看下图position和limit的位置

    5.数据的粘包与包

    5.1.什么是粘包和半包?

    粘包:在缓冲区有足够大的空间时,且存在多条数据同时存储到缓冲区内,且无按照一定的分割符进行分割,那么在读取数据时,多条数据就会粘连到一起,这种现象就较多粘包。比如分别传入“9000000”、“世界你好”,可能读取的数据就会变为9000000世界你好,

    半包:在缓冲区数据被读取时,无法解析成一句完整的数据会出现乱码等现象,这就是半包。例如,缓冲区是4个字节,我存储“中国”,在读取数据的时候读到的是“中*”,“中国”是6个字节,缓冲区较小存不下,国子无法被正常解析。

    网络上有多条数据发送给服务端,数据之间使用 n 进行分隔
    但由于某种原因这些数据在接收时,被进行了重新组合,例如原始数据有3条为
    
    Hello,worldn
    I'm zhangsann
    How are you?n
    
    变成了下面的两个 byteBuffer (粘包,半包)
    
    Hello,worldnI'm zhangsannHo
    w are you?n
    
    5.2.粘包和半包如何解决?

    通过对"1.0粘包、半包示例"解决来学习如何解决对应的粘包以及半包,用以下代码来解析内容

      public static void main(String[] args) {
    //      1.创建对应的数据
            ByteBuffer byteBuffer = ByteBuffer.allocate(32);
            byteBuffer.put("Hello,worldnI'm zhangsannHo".getBytes());
            splitbyteBuffer(byteBuffer);
    
            byteBuffer.put("w are you?n".getBytes());
            splitbyteBuffer(byteBuffer);
    
    
    
        }
        public static void splitbyteBuffer(ByteBuffer source){
    //      1.先写模式切换,既读模式和写模式
            source.flip();
    //      2.遍历数据
            for (int i = 0;i < source.limit();i++){
    //          3。找到完整的一条数据
                if (source.get(i) == 'n') {
    //               4. 根据数据长度开辟缓冲区空间大小
                    int length = i + 1 - source.position();
                    ByteBuffer byteBufferCatch = ByteBuffer.allocate(length);
    //              5.遍历获取的数据,写入到新的缓冲区中
                    for (int j = 0;j < length;j++){
                        byteBufferCatch.put(source.get());
                    }
    //                6.展示完整数据,既就是通过读物的数据
                    debugAll(byteBufferCatch);
                }
            }
            source.compact();
        }
    

    分散集中写的方式就不继续介绍,可以在网络上 查询相应的一些资料。

    小结

    ByteBuffer在编写过程中不仅仅只有以上的单个方法的使用,可能会涉及到多个场景的应用,及多个数据场景结合使用,因此在使用的时候,需要根据具体业务具体操作。

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论