Http Client Tcp Connect Failed Retry

2023年 10月 8日 67.2k 0

为什么需要 Tcp Connect Failed Retry ?

K8S 多副本环境下,如果服务消费方使用 http client 访问不健康的 pod(服务提供方),http client 会抛出异常,一般这个异常会带回到前端页面,显示【系统繁忙,请稍候重试】或者【网络异常,请稍候重试】等信息。

K8S 集群内部,一般使用 Service 进行 Pod 间通信。如果服务消费方使用 http client 进行 tcp 握手失败后,可以继续使用 Service 地址进行重试,因为 Service 底层由 Endpoint 组件实现,自带负载均衡效果,重试就有机会将 tcp 连接打到健康的 pod 上去。

为什么只做 Tcp Connect Failed Retry,不做 http read(write) timeout retry ?如果要做 http read timeout retry,需要保证服务提供方的接口均为幂等接口,这个成本比较高,折中,只做 Tcp Connect Failed Retry,剩下的 read timeout 异常,往前端抛。

Feign Client Tcp 连接失败后进行 Retry

Feign Client 提供 Retryer 接口,并提供了两个默认实现:

  • NEVER_RETRY:空实现,不重试。
  • Default:默认实现,重试指定次数。
  • /**
     * Cloned for each invocation to {@link Client#execute(Request, feign.Request.Options)}.
     * Implementations may keep state to determine if retry operations should continue or not.
     */
    public interface Retryer extends Cloneable {
    
      /**
       * if retry is permitted, return (possibly after sleeping). Otherwise propagate the exception.
       */
      void continueOrPropagate(RetryableException e);
    
      Retryer clone();
    
      class Default implements Retryer {
    
        private final int maxAttempts;
        private final long period;
        private final long maxPeriod;
        int attempt;
        long sleptForMillis;
    
        public Default() {
          this(100, SECONDS.toMillis(1), 5);
        }
    
        public Default(long period, long maxPeriod, int maxAttempts) {
          this.period = period;
          this.maxPeriod = maxPeriod;
          this.maxAttempts = maxAttempts;
          this.attempt = 1;
        }
    
        // visible for testing;
        protected long currentTimeMillis() {
          return System.currentTimeMillis();
        }
    
        public void continueOrPropagate(RetryableException e) {
          if (attempt++ >= maxAttempts) {
            throw e;
          }
    
          long interval;
          if (e.retryAfter() != null) {
            interval = e.retryAfter().getTime() - currentTimeMillis();
            if (interval > maxPeriod) {
              interval = maxPeriod;
            }
            if (interval  maxPeriod ? maxPeriod : interval;
        }
    
        @Override
        public Retryer clone() {
          return new Default(period, maxPeriod, maxAttempts);
        }
      }
    
      /**
       * Implementation that never retries request. It propagates the RetryableException.
       */
      Retryer NEVER_RETRY = new Retryer() {
    
        @Override
        public void continueOrPropagate(RetryableException e) {
          throw e;
        }
    
        @Override
        public Retryer clone() {
          return this;
        }
      };
    }
    

    SpringBoot OpenFeign 自动装配默认使用 NEVER_RETRY 实现,如果想要切换为 Default 实现,可以在 yml 中配置(内部类使用 $ 分隔符),也可使用注入 Bean 的方式进行配置。

    feign:
      client:
        config:
          default:
            connectTimeout: 1000
            readTimeout: 5000
            retryer: feign.Retryer$Default
    

    Default 是一个很好的实现范例,但是不满足我们的要求,该实现发生任何异常均会进行重试,而我们只想在 tcp 握手失败时进行重试。

    经过实验,发现 tcp 握手失败有两种情况:

  • 服务器未监听端口,客户端与服务器 TCP 握手会立即失败,抛出 ConnectException,异常消息一般为 Connection Refused,说明 TCP 连接被拒绝。
  • TCP 握手超时,超过 connectTimeout 时间后,抛出 SocketTimeoutException,异常 Message 为 connect timed out,说明 TCP 连接超时。
  • 注意:如果抛出 SocketTimeoutException,异常消息为 Read time out,说明发生了 http read time out,不能进行重试。

  • ConnectException:Signals that an error occurred while attempting to connect a socket to a remote address and port. Typically, the connection was refused remotely (e.g., no process is listening on the remote address/port).
  • SocketTimeoutException: Signals that a timeout has occurred on a socket read or accept.
  • ConnectFailedRetryer 使用了装饰器模式,内部有一个成员变量 Retryer retryer 执行真正的 retry 操作,在 ConnectFailedRetryer#continueOrPropagate() 方法中,仅仅只是针对指定的异常,调用 retryer#continueOrPropagate() 方法执行 retry,其他情况则直接抛出异常。

    @Slf4j
    public class ConnectFailedRetryer implements Retryer {
    
        private final Retryer retryer;
    
        public ConnectFailedRetryer() {
            this(100, SECONDS.toMillis(1), 5);
        }
    
        public ConnectFailedRetryer(long period, long maxPeriod, int maxAttempts) {
            this(new Default(period, maxPeriod, maxAttempts));
        }
    
        public ConnectFailedRetryer(Retryer retryer) {
            this.retryer = retryer;
        }
    
        @Override
        public void continueOrPropagate(RetryableException e) {
            Request request = e.request();
            String url = request.url();
            Request.HttpMethod httpMethod = request.httpMethod();
            Throwable throwable = e.getCause();
            if (throwable instanceof ConnectException) {
                ConnectException connectException = (ConnectException) throwable;
                log.warn("url: {}, httpMethod: {}, connectException: {}", url, httpMethod, connectException.getMessage());
                retryer.continueOrPropagate(e);
            } else if (throwable instanceof SocketTimeoutException) {
                SocketTimeoutException socketTimeoutException = (SocketTimeoutException) throwable;
                String timeoutExceptionMessage = socketTimeoutException.getMessage();
                log.warn("url: {}, httpMethod: {}, socketTimeoutException: {}", url, httpMethod, timeoutExceptionMessage);
                if (StrUtil.equals(timeoutExceptionMessage, "connect timed out")) {
                    retryer.continueOrPropagate(e);
                } else {
                    throw e;
                }
            } else {
                throw e;
            }
        }
    
        @Override
        public Retryer clone() {
            Retryer cloneRetryer = retryer.clone();
            return new ConnectFailedRetryer(cloneRetryer);
        }
    
    }
    

    在 yml 中配置自定义的 ConnectFailedRetryer:

    feign:
      client:
        config:
          default:
            connectTimeout: 1000
            readTimeout: 5000
            retryer: com.oneby.common.feign.ConnectFailedRetryer
    

    OkHttp Client Tcp 连接失败后进行 Retry

    OkHttp 需要在 Interceptor#intercept() 方法中实现自定义 retry 逻辑。

    @Slf4j
    public class ConnectFailedRetryInterceptor implements Interceptor {
    
        private final static int MAX_RETRY = 5;
    
        @Override
        public Response intercept(Chain chain) {
            Request request = chain.request();
            return RetryUtils.retryWithCondition(() -> chain.proceed(request), response -> false, throwable -> {
                if (throwable instanceof ConnectException) {
                    ConnectException connectException = (ConnectException) throwable;
                    log.warn("url: {}, httpMethod: {}, connectException: {}", request.url(), request.method(),
                            connectException.getMessage());
                    return true;
                } else if (throwable instanceof SocketTimeoutException) {
                    SocketTimeoutException socketTimeoutException = (SocketTimeoutException) throwable;
                    String timeoutExceptionMessage = socketTimeoutException.getMessage();
                    log.warn("url: {}, httpMethod: {}, connectException: {}", request.url(), request.method(),
                            timeoutExceptionMessage);
                    return StrUtil.equals(timeoutExceptionMessage, "connect timed out");
                }
                return false;
            }, MAX_RETRY);
        }
    
    }
    

    这里简单封装了一个 RetryUtils 工具类,成熟的解决方案可以使用 spring retry 或者 guava retry。

    @Slf4j
    public abstract class RetryUtils {
    
        private static final int RETRY_DELAY = 500;
    
        @SneakyThrows
        public static  T retryWithCondition(Callable callable, Predicate retValRetryPredicate,
                Predicate throwableRetryPredicate, int maxRetryCount) {
            int retryCount = 0;
            while (true) {
                try {
                    T ret = callable.call();
                    if (!retValRetryPredicate.test(ret)) {
                        return ret;
                    }
                    log.warn("return value: {} does not match expected value, retry later", ret);
                    if (retryCount >= maxRetryCount) {
                        return ret;
                    }
                    try {
                        Thread.sleep(RETRY_DELAY);
                    } catch (InterruptedException ignored) {
                        Thread.currentThread().interrupt();
                    }
                } catch (Throwable throwable) {
                    retryCount++;
                    if (throwableRetryPredicate.test(throwable)) {
                        if (retryCount >= maxRetryCount) {
                            throw throwable;
                        }
                    }
                    log.warn("throwableRetryPredicate pass, retry later, throwable message: {}", throwable.getMessage());
                    try {
                        Thread.sleep(RETRY_DELAY);
                    } catch (InterruptedException ignored) {
                        Thread.currentThread().interrupt();
                        throw throwable;
                    }
                }
            }
        }
    
    }
    

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论