kubeapiserver源码剖析与开发(四):KubeAPIServer创建(一)

在前面的文章中我讲了 genericConfig 配置的初始化,genericConfig 是 kube-apiserver 的核心配置,有了配置后,我们再来讲讲 kube-apiserver 三个 server 中 KubeAPIServer 的创建。

KubeAPIServer的创建只要有三个步骤:

  • 创建 genericServer
  • 安装 legacyAPI 路由
  • 安装 api 路由

这一篇我们说说 genericServer 的创建

我们在前面说过,kube-apiserver 包括三个 server,这三个 server 底层都是 genericServer ,这三个 server 都在 genericServer 上注册了路由,分别处理自己负责的请求。

// cmd/kube-apiserver/app/server.go

func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
    kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)

    ...

    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
    if err != nil {
        return nil, err
    }

    ...
}
// cmd/kube-apiserver/app/server.go

func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
    kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
    if err != nil {
        return nil, err
    }

    return kubeAPIServer, nil
}
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
    if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
        return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
    }
    ...
}

genericServer 的逻辑在 New 函数中,我们先看下 New 函数的参数:

func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error)
  • name: 上面说过,kube-apiserver 的底层都是 genericServer,所以 name 参数就是用来说明创建的 genericServer 是属于哪个 server 的,方便日志记录

  • delegationTarget:这个参数就比较有意思了,我们在前面的文章中说过,kube-apiserver 的三个 server:apiExtensionsServer、kubeAPIServer、aggregatorServer,按照该顺序创建,请求经过的 server 顺序跟创建顺序相反。所以所有请求首先都会来到 aggregatorServer,实际上 kube-apiserver 监听端口,就是由 aggregatorServer 监听的,如果 aggregatorServer 无法处理请求就委托给 kubeAPIServer,依此类推,但是要能够实现委托必须得知道下游 server 是谁,delegationTarget 这个参数就是用来告诉 server,他的下游 server 是谁。

下面我们来看下 New 的具体逻辑

// cmd/kube-apiserver/app/server.go
...
handlerChainBuilder := func(handler http.Handler) http.Handler {
    return c.BuildHandlerChainFunc(handler, c.Config)
}

apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
...

handlerChainBuilder 是一个关键的点,我们在前面文章中说过的请求的认证、鉴权、审计、流控执行是在一条链上执行的,这条链的构建就是由 handlerChainBuilder 完成的,这个构建器实际上是 DefaultBuildHandlerChain,我们看下这个方法

// vendor/k8s.io/apiserver/pkg/server/config.go

func NewConfig(codecs serializer.CodecFactory) *Config {
    ...
    return &Config{
        Serializer:                  codecs,
        BuildHandlerChainFunc:       DefaultBuildHandlerChain,
        ...
    }
// vendor/k8s.io/apiserver/pkg/server/config.go

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
    handler := filterlatency.TrackCompleted(apiHandler)
    handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
    handler = filterlatency.TrackStarted(handler, "authorization")

    if c.FlowControl != nil {
        workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
        requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
            c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg)
        handler = filterlatency.TrackCompleted(handler)
        handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
        handler = filterlatency.TrackStarted(handler, "priorityandfairness")
    } else {
        handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
    }

    handler = filterlatency.TrackCompleted(handler)
    handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
    handler = filterlatency.TrackStarted(handler, "impersonation")

    handler = filterlatency.TrackCompleted(handler)
    handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
    handler = filterlatency.TrackStarted(handler, "audit")

    failedHandler := genericapifilters.Unauthorized(c.Serializer)
    failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)

    failedHandler = filterlatency.TrackCompleted(failedHandler)
    handler = filterlatency.TrackCompleted(handler)
    handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
    handler = filterlatency.TrackStarted(handler, "authentication")

    handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")

    // WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
    // context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
    handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)

    handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
        c.LongRunningFunc, c.Serializer, c.RequestTimeout)
    handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
    if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
        handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
    }
    handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
    handler = genericapifilters.WithWarningRecorder(handler)
    handler = genericapifilters.WithCacheControl(handler)
    handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
    if c.ShutdownSendRetryAfter {
        handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
    }
    handler = genericfilters.WithHTTPLogging(handler)
    if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
        handler = genericapifilters.WithTracing(handler, c.TracerProvider)
    }
    handler = genericapifilters.WithLatencyTrackers(handler)
    handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
    handler = genericapifilters.WithRequestReceivedTimestamp(handler)
    handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
    handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
    handler = genericapifilters.WithAuditID(handler)
    return handler
}

这个模式有点像“套娃”,先创建的 handler 被后创建的 handler 包裹在里面,在执行这条链的时候,最外层的 handler 最先被执行,所以 WithAuditID(在请求头加审计ID) 第一个被执行,然后就像“剥洋葱”一样,一层层的剥去,依次执行 handler。被包裹在最里面的那层是 DefaultBuildHandlerChain 的入参 apiHandler 就是真正处理请求的 handler,下文细说。

我们再来看看 NewAPIServerHandler,这个函数非常关键,它创建了处理请求的 handler:

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
    nonGoRestfulMux := mux.NewPathRecorderMux(name)
    if notFoundHandler != nil {
        nonGoRestfulMux.NotFoundHandler(notFoundHandler)
    }

    gorestfulContainer := restful.NewContainer()
    gorestfulContainer.ServeMux = http.NewServeMux()
    gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
    gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
        logStackOnRecover(s, panicReason, httpWriter)
    })
    gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
        serviceErrorHandler(s, serviceErr, request, response)
    })
// vendor/k8s.io/apiserver/pkg/server/handler.go
// 在后面controller中往 gorestfulContainer 加ws
    director := director{
        name:               name,
        goRestfulContainer: gorestfulContainer,
        nonGoRestfulMux:    nonGoRestfulMux,
    }

    return &APIServerHandler{
        FullHandlerChain:   handlerChainBuilder(director),
        GoRestfulContainer: gorestfulContainer,
        NonGoRestfulMux:    nonGoRestfulMux,
        Director:           director,
    }
}

请求先被APIServerHandler.ServeHTTP处理,他实际调用了 FullHandlerChain.ServeHTTP,FullHandlerChain 我们在前面说过就是一条链,这条链上面会执行审计、认证、鉴权、限速等操作,在链的末端才开始执行真正处理请求的 handler(handlerChainBuilder的入参),这个 handler 就是 director,也就是执行 director.ServeHTTP。我们先大致看下 director.ServeHTTP

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    path := req.URL.Path

    // check to see if our webservices want to claim this path
    for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
        switch {
        case ws.RootPath() == "/apis":
            // if we are exactly /apis or /apis/, then we need special handling in loop.
            // normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly.
            // We can't rely on a prefix match since /apis matches everything (see the big comment on Director above)
            if path == "/apis" || path == "/apis/" {
                klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
                // don't use servemux here because gorestful servemuxes get messed up when removing webservices
                // TODO fix gorestful, remove TPRs, or stop using gorestful

                // func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error
                // 是在上面注释这个函数里注册的 webservice
                d.goRestfulContainer.Dispatch(w, req)
                return
            }

        case strings.HasPrefix(path, ws.RootPath()):
            // ensure an exact match or a path boundary match
            if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
                klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
                // don't use servemux here because gorestful servemuxes get messed up when removing webservices
                // TODO fix gorestful, remove TPRs, or stop using gorestful
                d.goRestfulContainer.Dispatch(w, req)
                return
            }
        }
    }
    // if we didn't find a match, then we just skip gorestful altogether
    klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
    d.nonGoRestfulMux.ServeHTTP(w, req)
}

director 中包含两种类型路由:restful 和 nonRestful,restful类型的路由处理注册在该 server 的 api,其他 api 由 nonRestful 处理。所以请求就是按照这个路径处理的:

FullHandlerChain -> Director -> {GoRestfulContainer,NonGoRestfulMux}

我们再回到 NewAPIServerHandler 函数

nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
    nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}

下游委托服务器(notFoundHandler)被传递给了 nonGoRestfulMux,当 nonRestful 类型路由 也无法处理请求的话,请求就会委派给 notFoundHandler :

func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}

// ServeHTTP makes it an http.Handler
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
        klog.V(5).Infof("%v: %q satisfied by exact match", h.muxName, r.URL.Path)
        exactHandler.ServeHTTP(w, r)
        return
    }

    for _, prefixHandler := range h.prefixHandlers {
        if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
            klog.V(5).Infof("%v: %q satisfied by prefix %v", h.muxName, r.URL.Path, prefixHandler.prefix)
            prefixHandler.handler.ServeHTTP(w, r)
            return
        }
    }
    klog.V(5).Infof("%v: %q satisfied by NotFoundHandler", h.muxName, r.URL.Path)
    h.notFoundHandler.ServeHTTP(w, r)
}

可以看到,在 nonRestful 类型路由处理器中,首先会对请求 url 做精确匹配,如果可以匹配上就处理,如果无法匹配就做前缀匹配,如果能够匹配也能处理,如果都不能匹配就使用 notFoundHandler 进行处理(也就是下游的委托 server)。

其中,这两种类型的路由的注册是比较复杂的,也是很关键的部分,我们在下一篇专门讲。

nonRestful 类型的路由存在于 aggregatorServer 的 genericServer 中。下图为请求流转图

pCgZD9H.png

我们再回到 New 函数,接下来就创建了 GenericAPIServer 类型的结构体,也就是我们的目标 genericAPIServer:

s := &GenericAPIServer{
        discoveryAddresses:         c.DiscoveryAddresses,
        LoopbackClientConfig:       c.LoopbackClientConfig,
        legacyAPIGroupPrefixes:     c.LegacyAPIGroupPrefixes,
        admissionControl:           c.AdmissionControl,
        Serializer:                 c.Serializer,
        AuditBackend:               c.AuditBackend,
        Authorizer:                 c.Authorization.Authorizer,
        delegationTarget:           delegationTarget,
        EquivalentResourceRegistry: c.EquivalentResourceRegistry,
        HandlerChainWaitGroup:      c.HandlerChainWaitGroup,
        Handler:                    apiServerHandler,

        listedPathProvider: apiServerHandler,

        minRequestTimeout:     time.Duration(c.MinRequestTimeout) * time.Second,
        ShutdownTimeout:       c.RequestTimeout,
        ShutdownDelayDuration: c.ShutdownDelayDuration,
        SecureServingInfo:     c.SecureServing,
        ExternalAddress:       c.ExternalAddress,

        ...
    }

主要做的事情,就是将前面初始化的配置赋值到这个结构体里面,如审计、认证、鉴权、流控、handler。

再往后主要就是启动流控、启动 informer等,这些不是关键流程,我们就不一一分析了。