【Netty「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程

2023年 7月 13日 49.0k 0

前言

本篇博文是《从0到1学习 Netty》中源码系列的第三篇博文,主要内容是深入分析连接超时的实现原理,包括了 connect 方法的源码解析和 ChannelFuture.sync() 执行过程的解析。,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;

介绍

在实际应用中,当客户端尝试连接服务器时,可能会面临多种原因导致连接失败的情况。为了避免无限等待,我们可以在客户端代码中设置一个超时连接时间 CONNECT_TIMEOUT_MILLIS,该时间表示客户端尝试连接服务器的最长时间限制,如果在指定的超时时间内未能成功建立连接,客户端应该主动抛出连接超时的异常。

.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)

上述代码的作用是设置连接超时时间为 1000 毫秒,这个选项用于指定连接建立的最大时间,如果超过该时间仍未建立连接,则会放弃连接尝试。

运行结果:

image.png

然而,当服务器没有启动时,且连接超时时间大于 2 秒钟时,则会抛出连接被拒绝的异常,运行结果如下所示:

image.png

这是 Java 底层的网络异常。

需要完整代码的读者请访问博主的 Github:TestConnectionTimeout;

接下来让我们探索 connect()ChannelFuture.sync() 的执行过程。

connect 源码解析

我们先来探究成功执行连接超时所进行的过程,核心方法 connect() 的部分源码如下所示:

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    try {
        ...
        // Schedule connect timeout.
        int connectTimeoutMillis = config().getConnectTimeoutMillis();
        if (connectTimeoutMillis > 0) {
            connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                    ConnectTimeoutException cause =
                            new ConnectTimeoutException("connection timed out: " + remoteAddress);
                    if (connectPromise != null && connectPromise.tryFailure(cause)) {
                        close(voidPromise());
                    }
                }
            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        ...
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

上述代码的主要内容是根据配置获取连接超时时间,并使用事件循环调度一个定时任务,在指定的时间内检查连接是否超时。如果连接超时,会触发一个 ConnectTimeoutException 异常,并尝试向 connectPromise 发送连接超时的失败信息;否则,连接超时任务被取消,通道关闭。

那主线程是如何知道消息的呢?其实是通过 connectPromise 进行传递消息,我们可以在主线程中标记一下 future,如下图所示:

image.png

然后切换至 NIO 线程,可以发现 connectPromise 也被标记了,说明他们共属于一个主体,如下图所示:

image.png

如果不是很了解 FuturePromise 之间的联系的话,可以阅读博主的另一篇文章:异步编程模型:利用 Future 和 Promise 提高性能与响应能力;

在上述事例中,我们设置了两秒钟的连接超时时间,由于两秒钟内客户端并没有与服务器建立连接,因此触发了定时任务,执行了 run() 方法,抛出了连接超时异常 ConnectTimeoutException

ChannelFuture.sync() 执行过程解析

下面是 ChannelFuture.sync() 方法的执行过程:

  • 调用 ChannelFuture.sync() 方法将当前线程阻塞,直到对应的操作完成或发生异常。

  • sync() 方法内部,会获取当前线程绑定的 EventLoop 对象,然后将当前任务包装成一个特殊的 Promise 对象。

  • Promise 对象会被注册到 EventLoop 中的任务队列中,等待执行。EventLoop 会按顺序从任务队列中取出任务并执行。

  • 一旦 Promise 执行完成,即异步操作完成或发生异常,sync() 方法会解除当前线程的阻塞状态,并返回操作的结果或抛出异常。

  • 操作成功完成,可以通过 ChannelFuture.isSuccess() 方法检查操作是否成功。如果成功,可以继续执行后续的操作;如果失败,可以通过 ChannelFuture.cause() 方法获取失败的原因。

  • sync 执行过程.png

    需要注意的是,由于 ChannelFuture.sync() 是一个同步阻塞方法,如果在事件循环线程中调用该方法,可能会导致死锁或性能问题。因此,通常建议在其他线程中使用 ChannelFuture.addListener() 方法注册监听器来处理异步操作的结果,而不是直接使用 sync() 方法。

    sync 源码解析

    首先使用 super.sync() 调用了父类的 sync() 方法,将当前对象作为结果返回。

    @Override  
    public ChannelPromise sync() throws InterruptedException {  
        super.sync();  
        return this;  
    }
    

    上述代码的目的是在执行特定的同步操作后,返回当前的 ChannelPromise 对象。在这种情况下,子类通过调用父类的 sync() 方法来实现同步操作,并在执行完成后返回当前对象,以便支持链式调用或其他需要获取该对象的操作。

    然后在父类的 sync() 方法中,调用 await()rethrowIfFailed() 来实现同步等待和异常检查,并返回当前对象。

    @Override  
    public Promise sync() throws InterruptedException {  
        await();  
        rethrowIfFailed();  
        return this;  
    }
    

    在之后的几个方法中,就不对子类做过多的介绍了。

    await 源码解析

    await 方法是一种等待机制的实现,它通过检查承诺是否已完成,处理中断异常以及使用同步块和等待机制来让线程等待承诺的完成。

    @Override  
    public Promise await() throws InterruptedException {  
        if (isDone()) {  
            return this;  
        }  
        // 处理线程中断
        if (Thread.interrupted()) {  
            throw new InterruptedException(toString());  
        }  
        // 检查死锁
        checkDeadLock();  
    
        synchronized (this) {  
            while (!isDone()) {  
                incWaiters();  
                try {  
                    wait();  
                } finally {  
                    decWaiters();  
                }  
            }  
        }  
        return this;  
    }
    

    在上述代码中,如果 isDone() 方法返回 true,说明该承诺已经完成,直接返回当前对象。

    Thread.interrupted() 用于检查当前线程是否被中断,如果是,则抛出 InterruptedException 异常,并将当前对象的字符串表示作为异常消息。

    checkDeadLock() 方法用于检查是否存在死锁情况。

    对于 synchronized (this) {...} 代码块,使用当前对象作为同步锁,确保在多线程环境下只有一个线程可以进入代码块。其中,该代码块核心为当承诺未完成时,一直执行循环。

    在循环内部,调用 incWaiters() 方法增加等待中的线程计数器。同时,调用 wait() 方法,使当前线程进入等待状态,直到其他线程调用该对象的 notify()notifyAll() 方法唤醒。但无论如何,最终都会执行 decWaiters() 方法来减少等待中的线程计数器。

    接下来,我们看看 isDone() 方法的具体实现。

    isDone 源码解析

    private static boolean isDone0(Object result) {  
        return result != null && result != UNCANCELLABLE;  
    }
    

    上述代码主要作用是判断给定的 result 是否满足完成的条件。

    后记

    我们深入分析了 ChannelFuture.sync() 方法的执行过程,通过对 connect 源码的解析,我们了解到它在超时连接设置中的作用。而在 ChannelFuture.sync() 的执行过程中,我们进一步解析了 syncawaitisDone 的源码。

    这些源码解析的过程帮助我们更好地理解了 ChannelFuture.sync() 方法的执行流程,并且使我们能够更好地降低意外情况的发生率,并提高系统的稳定性和可靠性。

    以上就是 设置连接超时:深入分析 ChannelFuture.sync() 的执行过程 的所有内容了,希望本篇博文对大家有所帮助!

    参考:

    • Netty API reference;
    • 黑马程序员Netty全套教程 ;

    📝 上篇精讲:「项目实战」(三)序列化算法选型对聊天室可扩展性的影响

    💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注,创作不易,请多多支持;

    👍 公众号:sidiot的技术驿站;

    🔥 系列专栏:探索 Netty:源码解析与应用案例分享

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论