Why is a TCP connect-failed retry needed?
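In a multi-replica Kubernetes deployment, if a service consumer's HTTP client hits an unhealthy pod of the service provider, the client throws an exception. That exception usually travels all the way back to the front end, which then shows a message such as "System busy, please try again later" or "Network error, please try again later".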
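Inside a Kubernetes cluster, pods usually talk to each other through a Service. If the consumer's HTTP client fails the TCP handshake, it can simply retry against the same Service address: a Service is backed by an Endpoints object that lists its pods, and connections to the Service are load-balanced across them, so a retry has a chance of landing on a healthy pod.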
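To put a rough number on "has a chance", here is a back-of-the-envelope sketch. It assumes every new connection is routed to a pod picked uniformly and independently at random, which is a simplification and not something the setup above guarantees; the class and method names are hypothetical.

```java
final class RetryOdds {

    /** Probability that every one of {@code attempts} connections lands on an unhealthy pod. */
    static double allAttemptsFail(int unhealthyPods, int totalPods, int attempts) {
        if (totalPods <= 0 || unhealthyPods < 0 || unhealthyPods > totalPods || attempts < 1) {
            throw new IllegalArgumentException("invalid arguments");
        }
        // Assumption: independent, uniform pod selection per connection.
        return Math.pow((double) unhealthyPods / totalPods, attempts);
    }

    public static void main(String[] args) {
        // Example: 1 unhealthy pod out of 3 replicas.
        System.out.printf("1 attempt : %.4f%n", allAttemptsFail(1, 3, 1));
        System.out.printf("5 attempts: %.4f%n", allAttemptsFail(1, 3, 5));
    }
}
```

Under that assumption, with one bad pod out of three, the chance that the caller sees a connect failure drops from about 33% for a single attempt to about 0.4% for five attempts.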
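Why retry only on TCP connect failures and not on HTTP read (or write) timeouts? Retrying after a read timeout is only safe if every provider endpoint is idempotent, because the request may already have been processed, and guaranteeing that is expensive. A failed handshake, by contrast, means the request never reached the provider, so a retry cannot repeat any work. As a compromise, only TCP connect failures are retried, and read-timeout exceptions are passed on to the front end.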
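The difference between the two failure modes can be reproduced with plain java.net sockets. The following is a minimal sketch, not part of the original setup: the class name is hypothetical, it only uses the loopback interface, and the exact exception messages depend on the JDK and operating system.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class SocketFailureDemo {

    /** Connects to a local port nobody listens on, so the handshake itself fails. */
    static IOException connectFailure() {
        try {
            int closedPort;
            try (ServerSocket probe = new ServerSocket(0)) {
                closedPort = probe.getLocalPort(); // grab a free port, then release it
            }
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), closedPort), 1000);
            }
            return null; // unexpected: something started listening in the meantime
        } catch (IOException e) {
            return e; // typically java.net.ConnectException
        }
    }

    /** Connects to a server that never replies: the handshake succeeds, the read times out. */
    static IOException readTimeout() {
        try (ServerSocket silentServer = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(),
                    silentServer.getLocalPort()), 1000);
            socket.setSoTimeout(200);
            socket.getInputStream().read(); // blocks until the 200 ms read timeout fires
            return null; // unexpected: the silent server sent data or closed the stream
        } catch (IOException e) {
            return e; // typically java.net.SocketTimeoutException
        }
    }

    public static void main(String[] args) {
        System.out.println("connect failure: " + connectFailure());
        System.out.println("read timeout   : " + readTimeout());
    }
}
```

In the first case no request bytes ever left the client, which is what makes a blind retry safe. In the second case the connection was established, so a real server may already be working on the request.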
Retrying in Feign Client after a TCP connection failure
Feign provides a Retryer interface with two built-in implementations, Default and NEVER_RETRY:
/**
 * Cloned for each invocation to {@link Client#execute(Request, feign.Request.Options)}.
 * Implementations may keep state to determine if retry operations should continue or not.
 */
public interface Retryer extends Cloneable {

    /**
     * if retry is permitted, return (possibly after sleeping). Otherwise propagate the exception.
     */
    void continueOrPropagate(RetryableException e);

    Retryer clone();

    class Default implements Retryer {

        private final int maxAttempts;
        private final long period;
        private final long maxPeriod;
        int attempt;
        long sleptForMillis;

        public Default() {
            this(100, SECONDS.toMillis(1), 5);
        }

        public Default(long period, long maxPeriod, int maxAttempts) {
            this.period = period;
            this.maxPeriod = maxPeriod;
            this.maxAttempts = maxAttempts;
            this.attempt = 1;
        }

        // visible for testing;
        protected long currentTimeMillis() {
            return System.currentTimeMillis();
        }

        public void continueOrPropagate(RetryableException e) {
            if (attempt++ >= maxAttempts) {
                throw e;
            }

            long interval;
            if (e.retryAfter() != null) {
                interval = e.retryAfter().getTime() - currentTimeMillis();
                if (interval > maxPeriod) {
                    interval = maxPeriod;
                }
                if (interval < 0) {
                    return;
                }
            } else {
                interval = nextMaxInterval();
            }
            try {
                Thread.sleep(interval);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
                throw e;
            }
            sleptForMillis += interval;
        }

        // The interval grows by a factor of 1.5 per attempt, capped at maxPeriod.
        long nextMaxInterval() {
            long interval = (long) (period * Math.pow(1.5, attempt - 1));
            return interval > maxPeriod ? maxPeriod : interval;
        }

        @Override
        public Retryer clone() {
            return new Default(period, maxPeriod, maxAttempts);
        }
    }

    /**
     * Implementation that never retries request. It propagates the RetryableException.
     */
    Retryer NEVER_RETRY = new Retryer() {

        @Override
        public void continueOrPropagate(RetryableException e) {
            throw e;
        }

        @Override
        public Retryer clone() {
            return this;
        }
    };
}
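To see what the Default parameters mean in practice, the following sketch replays the same backoff rule (period multiplied by 1.5 per attempt, capped at maxPeriod) without depending on Feign. The class and method names are hypothetical, and the Retry-After branch is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSchedule {

    /** Sleep intervals (ms) a Default-style retryer would use between attempts. */
    static List<Long> intervals(long period, long maxPeriod, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        int attempt = 1;
        // Mirrors continueOrPropagate(): stop once the attempt budget is used up.
        while (attempt++ < maxAttempts) {
            long interval = (long) (period * Math.pow(1.5, attempt - 1));
            result.add(Math.min(interval, maxPeriod));
        }
        return result;
    }

    public static void main(String[] args) {
        // Same values as the no-arg Default constructor: 100 ms, 1 s, 5 attempts.
        System.out.println(intervals(100, 1000, 5));
    }
}
```

With the default values this yields four sleeps for five attempts, starting at 150 ms and growing from there, so maxAttempts counts the first call as well as the retries.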
Spring Cloud OpenFeign's auto-configuration uses NEVER_RETRY by default. To switch to Default, either set it in the yml (nested classes are written with the $ separator) or register a Retryer bean.
feign:
  client:
    config:
      default:
        connectTimeout: 1000
        readTimeout: 5000
        retryer: feign.Retryer$Default
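The bean-based alternative could look roughly like the configuration fragment below. This is a sketch that assumes Spring Cloud OpenFeign is on the classpath and that the class sits in a package covered by component scanning; the class name FeignRetryConfig is hypothetical.

```java
import feign.Retryer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FeignRetryConfig {

    /** Replaces the default NEVER_RETRY bean for all Feign clients. */
    @Bean
    public Retryer feignRetryer() {
        // period 100 ms, maxPeriod 1000 ms, maxAttempts 5 (same as the no-arg constructor)
        return new Retryer.Default(100, 1000, 5);
    }
}
```

A single bean is enough because Feign clones the Retryer for each invocation, so attempt counters are not shared between requests.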
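Default is a good reference implementation, but it does not meet our needs: it retries on every RetryableException it is handed, whereas we only want to retry when the TCP handshake fails.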
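Experiments show that a failed TCP handshake surfaces in one of two ways, both of which the retryer below checks for: a java.net.ConnectException (for example, connection refused), or a java.net.SocketTimeoutException whose message is "connect timed out".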
Note: if a SocketTimeoutException is thrown with the message "Read timed out", an HTTP read timeout has occurred and the request must not be retried.
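The rule described above can be captured in one small predicate. This is a minimal sketch with hypothetical class and method names. Some JDK versions capitalise the message ("Connect timed out"), so the comparison ignores case; the exact wording is an implementation detail of the JDK rather than a contract.

```java
import java.net.ConnectException;
import java.net.SocketTimeoutException;

final class ConnectFailures {

    /** Returns true only if the cause says the TCP connection was never established. */
    static boolean isConnectFailure(Throwable cause) {
        if (cause instanceof ConnectException) {
            return true; // e.g. "Connection refused"
        }
        if (cause instanceof SocketTimeoutException) {
            // A connect timeout and a read timeout share the same exception type,
            // so the message is the only thing that tells them apart.
            return "connect timed out".equalsIgnoreCase(cause.getMessage());
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isConnectFailure(new ConnectException("Connection refused")));
        System.out.println(isConnectFailure(new SocketTimeoutException("connect timed out")));
        System.out.println(isConnectFailure(new SocketTimeoutException("Read timed out")));
    }
}
```

Only the first two calls in main report a connect failure; the read timeout is left for the caller to handle.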
ConnectFailedRetryer uses the decorator pattern: it holds a Retryer retryer field that performs the actual retry. ConnectFailedRetryer#continueOrPropagate() calls retryer#continueOrPropagate() only for the exceptions listed above and rethrows the exception in every other case.
import static java.util.concurrent.TimeUnit.SECONDS;

import java.net.ConnectException;
import java.net.SocketTimeoutException;

import feign.Request;
import feign.RetryableException;
import feign.Retryer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ConnectFailedRetryer implements Retryer {

    // The decorated retryer that does the actual counting and backoff.
    private final Retryer retryer;

    // A no-arg constructor is required when the class is referenced from the yml.
    public ConnectFailedRetryer() {
        this(100, SECONDS.toMillis(1), 5);
    }

    public ConnectFailedRetryer(long period, long maxPeriod, int maxAttempts) {
        this(new Retryer.Default(period, maxPeriod, maxAttempts));
    }

    public ConnectFailedRetryer(Retryer retryer) {
        this.retryer = retryer;
    }

    @Override
    public void continueOrPropagate(RetryableException e) {
        Request request = e.request();
        String url = request.url();
        Request.HttpMethod httpMethod = request.httpMethod();
        Throwable throwable = e.getCause();
        if (throwable instanceof ConnectException) {
            log.warn("url: {}, httpMethod: {}, connectException: {}", url, httpMethod, throwable.getMessage());
            retryer.continueOrPropagate(e);
        } else if (throwable instanceof SocketTimeoutException) {
            String timeoutExceptionMessage = throwable.getMessage();
            log.warn("url: {}, httpMethod: {}, socketTimeoutException: {}", url, httpMethod, timeoutExceptionMessage);
            // Retry only a connect timeout, never a read timeout. The comparison ignores
            // case because some JDK versions report "Connect timed out".
            if ("connect timed out".equalsIgnoreCase(timeoutExceptionMessage)) {
                retryer.continueOrPropagate(e);
            } else {
                throw e;
            }
        } else {
            throw e;
        }
    }

    @Override
    public Retryer clone() {
        // Feign clones the retryer per invocation, so the decorated state must be cloned too.
        return new ConnectFailedRetryer(retryer.clone());
    }
}
Configure the custom ConnectFailedRetryer in the yml:
feign:
  client:
    config:
      default:
        connectTimeout: 1000
        readTimeout: 5000
        retryer: com.oneby.common.feign.ConnectFailedRetryer
Retrying in OkHttp Client after a TCP connection failure
With OkHttp, the custom retry logic has to be implemented in Interceptor#intercept().
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;

import lombok.extern.slf4j.Slf4j;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;

// Register with OkHttpClient.Builder#addInterceptor(): an application interceptor may call
// chain.proceed() several times, whereas a network interceptor must call it exactly once.
@Slf4j
public class ConnectFailedRetryInterceptor implements Interceptor {

    // Total number of attempts, including the first call.
    private static final int MAX_RETRY = 5;

    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        // Never retry based on the response; retry only when the connection could not be established.
        return RetryUtils.retryWithCondition(() -> chain.proceed(request), response -> false, throwable -> {
            if (throwable instanceof ConnectException) {
                log.warn("url: {}, httpMethod: {}, connectException: {}", request.url(), request.method(),
                        throwable.getMessage());
                return true;
            } else if (throwable instanceof SocketTimeoutException) {
                String timeoutExceptionMessage = throwable.getMessage();
                log.warn("url: {}, httpMethod: {}, socketTimeoutException: {}", request.url(), request.method(),
                        timeoutExceptionMessage);
                // Ignore case because some JDK versions report "Connect timed out".
                return "connect timed out".equalsIgnoreCase(timeoutExceptionMessage);
            }
            return false;
        }, MAX_RETRY);
    }
}
RetryUtils here is a simple home-grown helper; for a mature solution, consider Spring Retry or guava-retrying.
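import java.util.concurrent.Callable;
import java.util.function.Predicate;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class RetryUtils {

    // Fixed delay between attempts, in milliseconds.
    private static final int RETRY_DELAY = 500;

    // Calls the callable until it returns an acceptable value, throws a non-retryable
    // exception, or has been attempted maxRetryCount times in total.
    @SneakyThrows
    public static <T> T retryWithCondition(Callable<T> callable, Predicate<T> retValRetryPredicate,
            Predicate<Throwable> throwableRetryPredicate, int maxRetryCount) {
        int retryCount = 0;
        while (true) {
            try {
                T ret = callable.call();
                if (!retValRetryPredicate.test(ret)) {
                    return ret;
                }
                retryCount++;
                log.warn("return value: {} does not match expected value, retry later", ret);
                if (retryCount >= maxRetryCount) {
                    return ret;
                }
                try {
                    Thread.sleep(RETRY_DELAY);
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                    return ret; // stop retrying once the thread has been interrupted
                }
            } catch (Throwable throwable) {
                retryCount++;
                // Rethrow at once if the exception is not retryable or the attempts are used up.
                if (!throwableRetryPredicate.test(throwable) || retryCount >= maxRetryCount) {
                    throw throwable;
                }
                log.warn("throwableRetryPredicate pass, retry later, throwable message: {}", throwable.getMessage());
                try {
                    Thread.sleep(RETRY_DELAY);
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                    throw throwable;
                }
            }
        }
    }
}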
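To check that predicate-based retrying behaves as intended, the sketch below uses a trimmed-down, Lombok-free stand-in for RetryUtils and a simulated call that fails a configurable number of times. All names are hypothetical, and the short delay is only there to keep the demo fast.

```java
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

final class RetryDemo {

    /** Retries only while the thrown exception matches retryOn and attempts remain. */
    static <T> T retry(Callable<T> task, Predicate<Throwable> retryOn, int maxAttempts, long delayMillis)
            throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                if (!retryOn.test(e) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(delayMillis);
            }
        }
    }

    /** Simulates a call that throws error for the first failures calls; returns the calls made. */
    static int countCalls(Exception error, int failures) {
        AtomicInteger calls = new AtomicInteger();
        try {
            retry(() -> {
                if (calls.incrementAndGet() <= failures) {
                    throw error;
                }
                return "ok";
            }, t -> t instanceof ConnectException, 5, 10);
        } catch (Exception propagated) {
            // Non-retryable, or out of attempts: a real caller would see this exception.
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(countCalls(new ConnectException("Connection refused"), 2));
        System.out.println(countCalls(new SocketTimeoutException("Read timed out"), 2));
        System.out.println(countCalls(new ConnectException("Connection refused"), 10));
    }
}
```

A connect failure that clears after two attempts costs three calls, a read timeout is propagated after the first call, and a provider that never comes back is given up on after five attempts.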