前言
本篇博文是《从0到1学习 Netty》中源码系列的第三篇博文,主要内容是深入分析连接超时的实现原理,包括了 connect
方法的源码解析和 ChannelFuture.sync()
执行过程的解析。,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;
介绍
在实际应用中,当客户端尝试连接服务器时,可能会面临多种原因导致连接失败的情况。为了避免无限等待,我们可以在客户端代码中设置一个超时连接时间 CONNECT_TIMEOUT_MILLIS
,该时间表示客户端尝试连接服务器的最长时间限制,如果在指定的超时时间内未能成功建立连接,客户端应该主动抛出连接超时的异常。
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
上述代码的作用是设置连接超时时间为 1000 毫秒,这个选项用于指定连接建立的最大时间,如果超过该时间仍未建立连接,则会放弃连接尝试。
运行结果:
然而,当服务器没有启动时,且连接超时时间大于 2 秒钟时,则会抛出连接被拒绝的异常,运行结果如下所示:
这是 Java 底层的网络异常。
需要完整代码的读者请访问博主的 Github:TestConnectionTimeout;
接下来让我们探索 connect()
和 ChannelFuture.sync()
的执行过程。
connect 源码解析
我们先来探究成功执行连接超时所进行的过程,核心方法 connect()
的部分源码如下所示:
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
...
try {
...
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
...
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
上述代码的主要内容是根据配置获取连接超时时间,并使用事件循环调度一个定时任务,在指定的时间内检查连接是否超时。如果连接超时,会触发一个 ConnectTimeoutException
异常,并尝试向 connectPromise
发送连接超时的失败信息;否则,连接超时任务被取消,通道关闭。
那主线程是如何知道消息的呢?其实是通过 connectPromise
进行传递消息,我们可以在主线程中标记一下 future
,如下图所示:
然后切换至 NIO 线程,可以发现 connectPromise
也被标记了,说明他们共属于一个主体,如下图所示:
如果不是很了解 Future
和 Promise
之间的联系的话,可以阅读博主的另一篇文章:异步编程模型:利用 Future 和 Promise 提高性能与响应能力;
在上述事例中,我们设置了两秒钟的连接超时时间,由于两秒钟内客户端并没有与服务器建立连接,因此触发了定时任务,执行了 run()
方法,抛出了连接超时异常 ConnectTimeoutException
;
ChannelFuture.sync() 执行过程解析
下面是 ChannelFuture.sync()
方法的执行过程:
调用 ChannelFuture.sync()
方法将当前线程阻塞,直到对应的操作完成或发生异常。
在 sync()
方法内部,会获取当前线程绑定的 EventLoop
对象,然后将当前任务包装成一个特殊的 Promise
对象。
Promise
对象会被注册到 EventLoop
中的任务队列中,等待执行。EventLoop
会按顺序从任务队列中取出任务并执行。
一旦 Promise
执行完成,即异步操作完成或发生异常,sync()
方法会解除当前线程的阻塞状态,并返回操作的结果或抛出异常。
操作成功完成,可以通过 ChannelFuture.isSuccess()
方法检查操作是否成功。如果成功,可以继续执行后续的操作;如果失败,可以通过 ChannelFuture.cause()
方法获取失败的原因。
需要注意的是,由于 ChannelFuture.sync()
是一个同步阻塞方法,如果在事件循环线程中调用该方法,可能会导致死锁或性能问题。因此,通常建议在其他线程中使用 ChannelFuture.addListener()
方法注册监听器来处理异步操作的结果,而不是直接使用 sync()
方法。
sync 源码解析
首先使用 super.sync()
调用了父类的 sync()
方法,将当前对象作为结果返回。
@Override
public ChannelPromise sync() throws InterruptedException {
super.sync();
return this;
}
上述代码的目的是在执行特定的同步操作后,返回当前的 ChannelPromise
对象。在这种情况下,子类通过调用父类的 sync()
方法来实现同步操作,并在执行完成后返回当前对象,以便支持链式调用或其他需要获取该对象的操作。
然后在父类的 sync()
方法中,调用 await()
和 rethrowIfFailed()
来实现同步等待和异常检查,并返回当前对象。
@Override
public Promise sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
在之后的几个方法中,就不对子类做过多的介绍了。
await 源码解析
await
方法是一种等待机制的实现,它通过检查承诺是否已完成,处理中断异常以及使用同步块和等待机制来让线程等待承诺的完成。
@Override
public Promise await() throws InterruptedException {
if (isDone()) {
return this;
}
// 处理线程中断
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 检查死锁
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
在上述代码中,如果 isDone()
方法返回 true
,说明该承诺已经完成,直接返回当前对象。
Thread.interrupted()
用于检查当前线程是否被中断,如果是,则抛出 InterruptedException
异常,并将当前对象的字符串表示作为异常消息。
checkDeadLock()
方法用于检查是否存在死锁情况。
对于 synchronized (this) {...}
代码块,使用当前对象作为同步锁,确保在多线程环境下只有一个线程可以进入代码块。其中,该代码块核心为当承诺未完成时,一直执行循环。
在循环内部,调用 incWaiters()
方法增加等待中的线程计数器。同时,调用 wait()
方法,使当前线程进入等待状态,直到其他线程调用该对象的 notify()
或 notifyAll()
方法唤醒。但无论如何,最终都会执行 decWaiters()
方法来减少等待中的线程计数器。
接下来,我们看看 isDone()
方法的具体实现。
isDone 源码解析
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}
上述代码主要作用是判断给定的 result
是否满足完成的条件。
后记
我们深入分析了 ChannelFuture.sync()
方法的执行过程,通过对 connect
源码的解析,我们了解到它在超时连接设置中的作用。而在 ChannelFuture.sync()
的执行过程中,我们进一步解析了 sync
、await
和 isDone
的源码。
这些源码解析的过程帮助我们更好地理解了 ChannelFuture.sync()
方法的执行流程,并且使我们能够更好地降低意外情况的发生率,并提高系统的稳定性和可靠性。
以上就是 设置连接超时:深入分析 ChannelFuture.sync() 的执行过程 的所有内容了,希望本篇博文对大家有所帮助!
参考:
- Netty API reference;
- 黑马程序员Netty全套教程 ;
📝 上篇精讲:「项目实战」(三)序列化算法选型对聊天室可扩展性的影响
💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注,创作不易,请多多支持;
👍 公众号:sidiot的技术驿站;
🔥 系列专栏:探索 Netty:源码解析与应用案例分享