Kafka 是如何封装 Selector 多路复用器(中篇)

2023年 7月 10日 45.7k 0

Kafka 是如何封装 Selector 多路复用器(中篇)

来源丨华仔聊技术(ID:gh_97b8de4b5b34)

聊聊 Kafka 是如何封装 Selector 多路复用器的,本系列总共分为3篇,今天是中篇,主要剖析4、5两个问题:

  • 针对 Java NIO 的 SocketChannel,kafka 是如何封装统一的传输层来实现最基础的网络连接以及读写操作的?
  • 剖析 KafkaChannel 是如何对传输层、读写 buffer 操作进行封装的?
  • 剖析工业级 NIO 实战:如何基于位运算来控制事件的监听以及拆包、粘包是如何实现的?
  • 剖析 Kafka 是如何封装 Selector 多路复用器的?
  • 剖析 Kafka 封装的 Selector 是如何初始化并与 Broker 进行连接以及网络读写的?
  • 剖析 Kafka 网络发送消息和接收响应的整个过程是怎样的?
  • 认真读完这篇文章,我相信你会对 Kafka 封装 Java NIO 源码有更加深刻的理解。

    这篇文章干货很多,希望你可以耐心读完。

    Kafka 是如何封装 Selector 多路复用器(中篇)

    01 总体概述

    大家都知道在 Java NIO 有个三剑客,即「SocketChannel通道」、「Buffer读写」、「Selector多路复用器」,上篇已经讲解了前2个角色,今天我们来聊聊最后一个重要的角色。

    Kafka Selector 是对 Java NIO Selector 的二次封装,主要功能如下:

  • 提供网络连接以及读写操作
  • 对准备好的事件进行收集并进行网络操作
  • 为了方便大家理解,所有的源码只保留骨干。

    02 Selector 封装过程

    github 源码地址如下:

    https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/Selector.java

    org.apache.kafka.common.network.Selector,该类是 Kafka 网络层最重要最核心的实现,也是非常经典的工业级通信框架实现,为了简化,这里称为 Kselector, 接下来我们先来看看该类的重要属性字段:

    public class Selector implements Selectable, AutoCloseable {
        // 在 Java NIO 中用来监听网络I/O事件
        private final java.nio.channels.Selector nioSelector;
        // channels 管理
        private final Map channels; 
        // 发送完成的Send集合
        private final List completedSends;
        // 已经接收完毕的请求集合
        private final LinkedHashMap completedReceives;
        // 立即连接的集合
        private final Set immediatelyConnectedKeys;
        // 关闭连接的 channel 集合
        private final Map closingChannels;
        // 断开连接的节点集合
        private final Map disconnected;
        // 连接成功的节点集合
        private final List connected;
        // 发送失败的请求集合
        private final List failedSends;
        // 用来构建 KafkaChannel 的工具类
        private final ChannelBuilder channelBuilder;
        // 最大可以接收的数据量大小
        private final int maxReceiveSize;
        // 空闲超时到期连接管理器
        private final IdleExpiryManager idleExpiryManager;
        // 用来管理 ByteBuffer 的内存池
        private final MemoryPool memoryPool;
        // 初始化 Selector
        public Selector(int maxReceiveSize,
                long connectionMaxIdleMs,
                int failedAuthenticationDelayMs,
                Metrics metrics,
                Time time,
                String metricGrpPrefix,
                Map metricTags,
                boolean metricsPerConnection,
                boolean recordTimePerConnection,
                ChannelBuilder channelBuilder,
                MemoryPool memoryPool,
                LogContext logContext) {
            try {
                this.nioSelector = java.nio.channels.Selector.open();
            } catch (IOException e) {
                throw new KafkaException(e);
            }
            this.maxReceiveSize = maxReceiveSize;
            this.time = time;
            this.channels = new HashMap();
            this.explicitlyMutedChannels = new HashSet();
            this.outOfMemory = false;
            this.completedSends = new ArrayList();
            this.completedReceives = new LinkedHashMap();
            this.immediatelyConnectedKeys = new HashSet();
            this.closingChannels = new HashMap();
            this.keysWithBufferedRead = new HashSet();
            this.connected = new ArrayList();
            this.disconnected = new HashMap();
            this.failedSends = new ArrayList();
            this.log = logContext.logger(Selector.class);
            this.sensors = new SelectorMetrics(metrics, metricGrpPrefix, metricTags, metricsPerConnection);
            this.channelBuilder = channelBuilder;
            this.recordTimePerConnection = recordTimePerConnection;
            this.idleExpiryManager = connectionMaxIdleMs  NO_FAILED_AUTHENTICATION_DELAY) ? new LinkedHashMap() : null;
        }
    }
    

    重要字段如下所示:

  • nioSelector: 在 Java NIO 中用来监听网络I/O事件。
  • channels: 用来进行管理客户端到各个Node节点的网络连接,Map 集合类型
  • completedSends: 已经发送完成的请求对象 Send 集合,List 集合类型。
  • completedReceives: 已经接收完毕的网络请求集合,LinkedHashMap 集合类型 ,其中 value 都是已经接收完毕的 NetworkReceive 类对象。
  • immediatelyConnectedKeys: 立即连接key集合。
  • closingChannels: 关闭连接的 channel 集合。
  • disconnected: 断开连接的集合。Map 集合类型 ,value 是 KafkaChannel 的状态,可以在使用的时候可以通过这个 ChannelState 状态来判断处理逻辑。
  • connected: 成功连接的集合,List 集合类型,存储成功请求的 ChannelId。
  • failedSends: 发送失败的请求集合,List 集合类型, 存储失败请求的 ChannelId。
  • channelBuilder: 用来构建 KafkaChannel 的工具类。
  • maxReceiveSize: 最大可以接收的数据量大小。
  • idleExpiryManager: 空闲超时到期连接管理器。
  • memoryPool: 用来管理 ByteBuffer 的内存池,分配以及回收。
  • 介绍完字段后,我们来看看该类的方法。方法比较多,这里深度剖析下其中几个重要方法,通过学习这些方法的不仅可以复习下 Java NIO 底层组件,另外还可以学到 Kafka 封装这些底层组件的实现思想。

    NetworkClient 的请求一般都是交给 Kselector 去处理并完成的。而 Kselector 使用 NIO 异步非阻塞模式负责具体的连接、读写事件等操作。

    我们先看下连接过程,客户端在和节点连接的时候,会创建和服务端的 SocketChannel 连接通道。Kselector 维护了每个目标节点对应的 KafkaChannel。

    如下图所示:

    Kafka 是如何封装 Selector 多路复用器(中篇)

    02.1 connect()

    @Override
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        // 1.先确认是否已经被连接过  
        ensureNotRegistered(id);
        // 2.打开一个 SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        SelectionKey key = null;
        try {
            // 3.设置 socketChannel 信息 
            configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
            // 4.尝试发起连接
            boolean connected = doConnect(socketChannel, address);
            // 5. 将该 socketChannel 注册到 nioSelector 上,并关注 OP_CONNECT 事件
            key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
            // 6.如果立即连接成功了
            if (connected) {
                ...
                // 先将 key 放入 immediatelyConnectedKeys 集合
                immediatelyConnectedKeys.add(key);
                // 并取消对 OP_CONNECT 的监听
                key.interestOps(0);
            }
        } catch (IOException | RuntimeException e) {
            if (key != null)
                immediatelyConnectedKeys.remove(key);
            channels.remove(id);
            socketChannel.close();
            throw e;
        }
    }
    
    // 设置 socketChannel 信息 
    private void configureSocketChannel(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize) throws IOException {
        // 1. 设置非阻塞模式
        socketChannel.configureBlocking(false);
        // 2. 创建一个新的 Socket
        Socket socket = socketChannel.socket();
        // 3. 开启长连接 keepalive 探活机制
        socket.setKeepAlive(true);
        // 4. 设置 SocketOptions.SO_SNDBUF,默认12kb
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setSendBufferSize(sendBufferSize);
        // 5. 设置 SocketOptions.SO_RCVBUF,默认32kb
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setReceiveBufferSize(receiveBufferSize);
        // 6. 设置 TcpNoDelay 算法,默认为 false 即开启 Nagle 算法,true 为关闭 Nagle 算法
        socket.setTcpNoDelay(true);
    }
    
    // 发起连接
    protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws IOException {
        try {
            // 调用socketChannel的connect方法进行发起连接,该方法会向远端发起tcp请求
            // 因为是非阻塞的,返回时,连接不一定已经建立好(即完成3次握手)。连接如果已经建立好则返回true,否则返回false。
            //一般来说server和client在一台机器上,该方法可能返回true。在后面会通过 KSelector.finishConnect() 方法确认连接是否真正建立了。
            return channel.connect(address);
        } catch (UnresolvedAddressException e) {
            throw new IOException("Can't resolve address: " + address, e);
        }
    }
    

    该方法主要是用来发起网络连接,连接过程大致分为如下六步:

  • 先确认是否已经被连接过,即是否已经存在于连接成功集合或正在关闭连接的集合里,如果存在说明连接已经存在或者关闭了,就不应再次发起连接。
  • 打开一个 SocketChannel,创建一个连接。
  • 设置 SocketChannel 信息。其中包括设置「非阻塞模式」、「长链接探活机制」、「SocketOptions.SO_SNDBUF 大小」、「SocketOptions.SO_RCVBUF 大小」、「关闭 Nagle 算法」等,其中 SO_SNDBUF、SO_RCVBUF 表示内核发送和接收数据缓存的大小。
  • 尝试发起连接,由于是设置为非阻塞,调用完方法会直接返回,「此时连接不一定已经建立了」。当然也可能立即就连接上了,如果立即连接上返回值为true,没立即连接上返回值为false。
  • 将该 socketChannel 注册到 nioSelector 上,并关注 OP_CONNECT 事件,如果上一步没立即连接上,还需要继续监听 OP_CONNECT 事件,等连接上了再做处理。
  • 如果立即连接成功了,先将 key 放入 immediatelyConnectedKeys 集合,然后取消对 OP_CONNECT 的监听。此时已经连接成功了就没必要在监听 OP_CONNECT 事件了。
  • 这里需要注意下: 因为是非阻塞方式,所以 channel.connect() 发起连接,「可能在正式建立连接前就返回了」,为了确定连接是否建立,需要再调用 「finishConnect」 确认完全连接上了。

    02.2 registerChannel()

    // 将该 socketChannel 注册到 nioSelector 上,并关注感兴趣的事件
    protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
            // 1. 将该 socketChannel 注册到 nioSelector 上,并设置读事件监听
            SelectionKey key = socketChannel.register(nioSelector, interestedOps);
            // 2. 构建 KafkaChannel,将 key 与 KafkaChannel 做注册绑定
            KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
            // 3. 将nodeid,channel 绑定并放入到 channels 集合中
            this.channels.put(id, channel);
            if (idleExpiryManager != null)
                // 4. 更新连接到空闲超时到期连接管理器中,并记录活跃时间
                idleExpiryManager.update(channel.id(), time.nanoseconds());
            return key;
    }
    
    // 构建 KafkaChannel 并关联 key 和 Channel,方便查找
    private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key) throws IOException {
        try {
            // 1. 构建 KafkaChannel
            KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool,
                new SelectorChannelMetadataRegistry());
            // 2. 将 KafkaChannel 注册到 key 上,并做关联,方便查找
            key.attach(channel);
            // 3. 返回建立好的 KafkaChannel
            return channel;
        } catch (Exception e) {
            try {
                socketChannel.close();
            } finally {
                key.cancel();
            }
            throw new IOException("Channel could not be created for socket " + socketChannel, e);
        }
    }
    

    该方法主要用来注册和绑定连接的,过程如下:

  • 将该 socketChannel 注册到 nioSelector 上,并设置读事件监听。
  • 构建 KafkaChannel,以及将 key 与 KafkaChannel 做关联绑定,方便查找,既可以通过 key 找到 channel,也可以通过 channel 找到 key。
  • 讲解完建立连接后,我们来看看消息发送的相关方法。

    KSelector.send() 方法是将之前创建的 RequestSend 对象先缓存到 KafkaChannel 的 send 字段中,并关注此连接的 OP_WRITE 事件,并没有真正发生网络 I/O 操作。会在下次调用 KSelector.poll() 时,才会将 RequestSend 对象发送出去。

    如果此 KafkaChannel 的 send 字段上还保存着一个未完全发送成功的 RequestSend 请求,为了防止覆盖,会抛出异常。每个 KafkaChannel 一次 poll 过程中只能发送一个 Send 请求。

    客户端的请求 Send 会被设置到 KafkaChannel 中,KafkaChannel 的 TransportLayer 会为 SelectionKey 注册 OP_WRITE 事件。

    此时 Channel 的 SelectionKey 就有了 OP_CONNECT、OP_WRITE 事件,在 Kselector 的轮询过程中当发现这些事件准备就绪后,就开始执行真正的操作。

    基本流程就是:

    Kafka 是如何封装 Selector 多路复用器(中篇)

    02.3 send()

    /**
     * 消息预发送
     */
    public void send(Send send) {
        // 1. 从服务端获取 connectionId
        String connectionId = send.destination();
        // 2. 从数据包中获取对应连接
        KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
        // 3. 如果关闭连接集合中存在该连接
        if (closingChannels.containsKey(connectionId)) {
            // 把 connectionId 放入 failedSends 集合里
            this.failedSends.add(connectionId);
        } else {
            try {
                // 4. 暂存数据预发送,并没有真正的发送,一次只能发送一个
                channel.setSend(send);
            } catch (Exception e) {
                // 5. 更新 KafkaChannel 的状态为发送失败  
                channel.state(ChannelState.FAILED_SEND);
                // 6. 把 connectionId 放入 failedSends 集合里
                this.failedSends.add(connectionId);
                // 7. 关闭连接
                close(channel, CloseMode.DISCARD_NO_NOTIFY);
                ...
            }
        }
    }
    
    // 判断 channelid 是否存在
    private KafkaChannel openOrClosingChannelOrFail(String id) {
        // 通过 channelid 先从 channels 集合中获取
        KafkaChannel channel = this.channels.get(id);
        // 如果为空那么再从 closingChannels 集合中获取
        if (channel == null)
            channel = this.closingChannels.get(id);
        // 如果还为空则抛异常    
        if (channel == null)
            throw new IllegalStateException("Attempt to retrieve channel for which there is no connection. Connection id " + id + " existing connections " + channels.keySet());
        return channel;
    }
    

    该方法主要用来消息预发送,即在发送的时候把消息线暂存在 KafkaChannel 的 send 字段里,然后等着 poll() 执行真正的发送,过程如下:

  • 从服务端获取 connectionId。
  • 从 channels 或 closingChannels 集合中找对应的 KafkaChannel,如果都为空就抛异常。
  • 如果关闭连接 closingChannels 集合中存在该连接,说明连接还没有被建立,则把连接放到发送失败 failedSends 的集合中。
  • 否则即是连接建立成功,「就把要发送的数据先保存在 send 字段里暂存起来,等待后续 poll() 去调用真正的发送」。
  • 如果暂存异常后,则更新 KafkaChannel 的状态为发送失败。
  • 把 connectionId 放入 failedSends 集合里。
  • 最后关闭连接。
  • 讲完消息预发送,接下来我们来看看最核心的 poll 和 pollSelectionKeys 方法。

    在 Kselector 的轮询中可以操作连接事件、读写事件等,是真正执行网络I/O事件操作的地方,它会调用 nioSelector.select() 方法等待 I/O 事件就绪。

    当 Channel 可写时,发送 KafkaChannel.send 字段,「一次最多只发送一个 RequestSend,有时候一个 RequestSend 也发送不完,需要多次 poll 才能发送完成」。

    当 Channel 可读时,读取数据到 KafkaChannel.receive,「当读取一个完整的 NetworkReceive ,并在一次 pollSelectionKeys() 完成后会将 NetworkReceive 中的数据转移到 completedReceives 集合中」。

    最后调用 maybeCloseOldestConnection() 方法,根据 lruConnections 记录,设置 channel 状态为过期,并关闭长期空闲的连接。

    02.4 poll()

    @Override
    public void poll(long timeout) throws IOException {
        ...
        // 1. 先将上次的结果清理掉
        clear();
        boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
        ...
        /* check ready keys */
        long startSelect = time.nanoseconds();
        // 2. 调用nioSelector.select线程阻塞等待I/O事件并设置阻塞时间,等待I/O事件就绪发生,然后返回已经监控到了多少准备就绪的事件
        int numReadyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        // 记录耗时
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
        // 3. 监听到事件发生或立即连接集合不为空或存在缓存数据
        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
            // 4. 获取监听到的准备就绪事件集合 
            Set readyKeys = this.nioSelector.selectedKeys();
            // 在SSL连接才可能会存在缓存数据
            if (dataInBuffers) {
                // 清除所有的就绪事件
                keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
                Set toPoll = keysWithBufferedRead;
                // 重新初始化
                keysWithBufferedRead = new HashSet(); //poll() calls will repopulate if needed
                // 处理事件
                pollSelectionKeys(toPoll, false, endSelect);
            }
            // 5. 处理监听到的准备就绪事件
            pollSelectionKeys(readyKeys, false, endSelect);
            // 6. 就绪事件集合清理
            // Clear all selected keys so that they are included in the ready count for the next select
            readyKeys.clear();
            // 7. 处理立即连接集合
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
            // 8. 立即连接集合清理
            immediatelyConnectedKeys.clear();
            ...
            maybeCloseOldestConnection(endSelect);
        } else {
            ...
        }
        ...
    }
    
    // 调用nioselector.select进行阻塞监听就绪事件
    private int select(long timeoutMs) throws IOException {
      if (timeoutMs  0 || send != null) {
            long currentTimeMs = time.milliseconds();
            if (bytesSent > 0)
                // 记录发送字节 Metrics 信息
                this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
            // 发送完成
            if (send != null) {
                // 将 send 添加到 completedSends
                this.completedSends.add(send);
                //  记录发送完成 Metrics 信息
                this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
            }
        }
    }
    

    该方法用来真正执行写操作,数据就是上面send()方法被填充的send字段。具体过程如下:

  • 获取 channel 对应的节点id。
  • 将保存在 send 上的数据真正发送出去,但是「一次不一定能发送完」,会返回已经发出的字节数。
  • 判断是否发送完成
    • 如果未发送完成返回 null,「等待下次 poll 继续发送」,并继续关注这个 channel 的写事件。
    • 如果发送完成,则返回 send,并取消对写事件的关注。
  • 发送完成,将 send 添加到 completedSends 集合中。
  • 接下来我们来看看读操作过程。

    02.8 attemptRead()

    private void attemptRead(KafkaChannel channel) throws IOException {
        // 获取 channel 对应的节点 id
        String nodeId = channel.id();
        // 将从传输层中读取数据到NetworkReceive对象中
        long bytesReceived = channel.read();
        if (bytesReceived != 0) {
            ...
            // 判断 NetworkReceive 对象是否已经读完了
            NetworkReceive receive = channel.maybeCompleteReceive();
            // 当读完后把这个 NetworkReceive 对象添加到已经接收完毕网络请求集合里
            if (receive != null) {
                addToCompletedReceives(channel, receive, currentTimeMs);
            }
        }
        ...
    }
    
    // KafkaChannel 方法
    public long read() throws IOException {
        if (receive == null) {
            // 初始化 NetworkReceive 对象
            receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
        }
        // 尝试把 channel 的数据读到 NetworkReceive 对象中
        long bytesReceived = receive(this.receive);
        ...
        return bytesReceived;
    }
    
    /**
     * adds a receive to completed receives
     */
    private void addToCompletedReceives(KafkaChannel channel, NetworkReceive networkReceive, long currentTimeMs) {
        if (hasCompletedReceive(channel))
            throw new IllegalStateException("Attempting to add second completed receive to channel " + channel.id());
        // 将 networkReceive 添加到已经接收完毕网络请求集合里 
        this.completedReceives.put(channel.id(), networkReceive);
        ...
    }
    

    该方法主要用来尝试读取数据并添加已经接收完毕的集合中。

  • 先从 channel 中获取节点id。
  • 然后调用 channel.read() 方法从传输层中读取数据到 NetworkReceive 对象中。
  • 判断本次是否已经读完了即填满了 NetworkReceive 对象,如果没有读完,那么下次触发读事件的时候继续读取填充,如果读取完成后,则将其置为空,下次触发读事件时则创建新的  NetworkReceive 对象。
  • 当读完后把这个 NetworkReceive 对象添加到已经接收完毕网络请求集合里。
  • 接下来我们看看几个其他比较简单的方法。

    02.9 completedSends()

    @Override
    public List completedSends() {
        return this.completedSends;
    }
    

    该方法主要用来返回发送完成的Send集合数据。

    02.10 completedReceives()

    @Override
    public Collection completedReceives() {
        return this.completedReceives.values();
    }
    

    该方法主要用来返回已经接收完毕的请求集合数据。

    02.11 disconnected()

    @Override
    public Map disconnected() {
        return this.disconnected;
    }
    

    该方法主要用来返回断开连接的 broker 集合数据。

    02.12 connected()

    @Override
    public List connected() {
        return this.connected;
    }
    

    该方法主要用来返回连接成功的 broker 集合数据。

    02.13 isChannelReady()

    /**
     * check if channel is ready
     */
    @Override
    public boolean isChannelReady(String id) {
        // 从 Channels 集合中获取该id对应的 channel 
        KafkaChannel channel = this.channels.get(id);
        // 然后 channel 不为空 则判断是否准备好
        return channel != null && channel.ready();
    }
    
    // KafkaChannel 类方法
    public boolean ready() {
        // 判断传输层是否准备好,默认是 PlaintextTransportLayer
        return transportLayer.ready() && authenticator.complete();
    }
    

    该方法主要用来判断对应的 Channel 是否准备好,参数是 channel id。

    02.14 addToCompletedReceives()

    /**
     * adds a receive to completed receives
     */
    private void addToCompletedReceives(KafkaChannel channel, NetworkReceive networkReceive, long currentTimeMs) {
        if (hasCompletedReceive(channel))
            throw new IllegalStateException("Attempting to add second completed receive to channel " + channel.id());
        // 将 channel id 添加到已经接收完毕的网络请求集合中
        this.completedReceives.put(channel.id(), networkReceive);
        sensors.recordCompletedReceive(channel.id(), networkReceive.size(), currentTimeMs);
    }
    
    /**
     * Check if given channel has a completed receive
     */
    private boolean hasCompletedReceive(KafkaChannel channel) {
        // 判断已经接收完毕的网络集合中是否存在该 channel id
        return completedReceives.containsKey(channel.id());
    }
    

    该方法主要用来将某个 channel 添加到已经接收完毕的网络请求集合中。

  • 先判断该 Channel 对应的 id 是否已经存在于已经接收完毕的网络请求集合中。
  • 如果不存在的话再将该 Channel id 添加到已经存在于已经接收完毕的网络请求集合中。
  • 记录 Metrics 信息。
  • 03 空闲超时到期连接管理器

    为什么会有这个管理器,大家都知道对于 TCP 大量连接或者重连是会对 Kafka 造成性能影响的,而 Kafka 客户端又不能同时连接过多的节点。因此设计这样一个 LRU 算法,每隔9分钟就删除一个空闲过期的连接,以保证已有连接的有效。

    private static class IdleExpiryManager {
        // lru 连接集合
        private final Map lruConnections;
        // 连接最大的空闲时间 默认9分钟
        private final long connectionsMaxIdleNanos;
        private long nextIdleCloseCheckTime;
        // 初始化管理器
        public IdleExpiryManager(Time time, long connectionsMaxIdleMs) {
            this.connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000 * 1000;
            // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
            // 初始化lru连接集合,设置初始容量,扩容因子,是否排序
            this.lruConnections = new LinkedHashMap(16, .75F, true);
            this.nextIdleCloseCheckTime = time.nanoseconds() + this.connectionsMaxIdleNanos;
        }
        
        // 更新活跃时间
        public void update(String connectionId, long currentTimeNanos) {
            lruConnections.put(connectionId, currentTimeNanos);
        }
        ...
        // 删除连接
        public void remove(String connectionId) {
            lruConnections.remove(connectionId);
        }
    }
    

    该类通过「LinkedHashMap 结构来实现一个 lru 连接集合」,最核心的方法就是 update() 来更新链接的活跃时间,remove() 来删除连接。

    主要用在以下3个地方:

  • 在将 channel 注册到 nioSelector 的时候,即调用 registerChannel() 会第一次设置连接的活跃时间。
  • 在调用 pollSelectionKeys() 检查到准备就绪的网络事件时,更新连接对应的活跃时间。
  • 在调用 close() 关闭连接的时候会从 lru 连接集合中删除该连接。
  • 04 网络连接的全流程

    网络连接总共分为以下两个阶段:

  • 连接的初始化。
  • 完成连接。
  • Kafka 是如何封装 Selector 多路复用器(中篇)

    05 总结

    这里,我们一起来总结一下这篇文章的重点。

    1、带你先整体的梳理了 Kafka 对 Java NIO 三剑客中的 Selector 的功能介绍。

    2、又带你剖析了 Selector 的重要方法和具体的操作过程。

    3、介绍空闲超时到期连接管理器是什么,有什么作用?

    4、最后带你梳理了网络连接的全流程。

    相关文章

    服务器端口转发,带你了解服务器端口转发
    服务器开放端口,服务器开放端口的步骤
    产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
    如何使用 WinGet 下载 Microsoft Store 应用
    百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
    百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

    发布评论