kube-apiserver Source Code Analysis and Development (4): Creating KubeAPIServer (Part 1)

July 12, 2023

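In the previous articles I covered the initialization of genericConfig, the core configuration of kube-apiserver. With the configuration in place, we now turn to the creation of KubeAPIServer, one of the three servers inside kube-apiserver.

Creating KubeAPIServer involves three main steps:

  • Create the genericServer
  • Install the legacyAPI routes
  • Install the API routes

This article covers the creation of the genericServer.

As mentioned earlier, kube-apiserver consists of three servers. All three are built on top of a genericServer, and each registers its own routes on its genericServer to handle the requests it is responsible for.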

// cmd/kube-apiserver/app/server.go

func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
	kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
    
    ...

	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}

    ...
}

// cmd/kube-apiserver/app/server.go

func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
	if err != nil {
		return nil, err
	}

	return kubeAPIServer, nil
}

// pkg/controlplane/instance.go

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
		return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
	}
	...
}

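The genericServer itself is built in another New function, the one on the generic server's completedConfig (vendor/k8s.io/apiserver/pkg/server/config.go), which the controlplane New above calls. Let's first look at its parameters: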

func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error)
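  • name: as noted above, every server in kube-apiserver is backed by a genericServer, so name identifies which server this genericServer belongs to, which is useful for logging.

  • delegationTarget: this parameter is more interesting. As discussed in earlier articles, kube-apiserver's three servers (apiExtensionsServer, kubeAPIServer, aggregatorServer) are created in that order, and a request passes through them in the reverse order. Every request therefore reaches aggregatorServer first; in fact, the port kube-apiserver listens on is served by aggregatorServer. If aggregatorServer cannot handle a request, it delegates to kubeAPIServer, and so on down the chain. Delegation only works if a server knows which server is downstream of it, and delegationTarget is what tells it.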
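
The delegation order described above can be illustrated with a small, self-contained sketch. It is not the kube-apiserver implementation: newServer, servedBy, and the paths each server owns are hypothetical names chosen for illustration, and a plain map lookup stands in for real route registration. It only shows that servers created in one order are traversed in the reverse order, each falling back to its delegate.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newServer (hypothetical helper) returns a handler that answers the paths it
// owns with its own name and hands every other request to its delegate.
func newServer(name string, paths []string, delegate http.Handler) http.Handler {
	owned := make(map[string]bool, len(paths))
	for _, p := range paths {
		owned[p] = true
	}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if owned[r.URL.Path] {
			fmt.Fprint(w, name)
			return
		}
		delegate.ServeHTTP(w, r)
	})
}

// servedBy builds the three servers in creation order and reports which one
// ends up answering a request for path.
func servedBy(path string) string {
	// Creation order: apiExtensions, then kubeAPI, then aggregator.
	// Each new server receives the previously created one as its delegate.
	apiExtensions := newServer("apiExtensionsServer", []string{"/apis/apiextensions.k8s.io"}, http.NotFoundHandler())
	kubeAPI := newServer("kubeAPIServer", []string{"/api/v1"}, apiExtensions)
	aggregator := newServer("aggregatorServer", []string{"/apis/apiregistration.k8s.io"}, kubeAPI)

	// Requests always enter through the server created last.
	rec := httptest.NewRecorder()
	aggregator.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	if rec.Code == http.StatusNotFound {
		return "not found"
	}
	return rec.Body.String()
}

func main() {
	for _, p := range []string{"/apis/apiregistration.k8s.io", "/api/v1", "/apis/apiextensions.k8s.io", "/unknown"} {
		fmt.Printf("%s -> %s\n", p, servedBy(p))
	}
}
```

In this sketch the last delegate is a plain 404 handler, so a path that no server owns falls all the way through the chain.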

Now let's look at what New actually does:

// vendor/k8s.io/apiserver/pkg/server/config.go
...
handlerChainBuilder := func(handler http.Handler) http.Handler {
	return c.BuildHandlerChainFunc(handler, c.Config)
}

apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
...

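handlerChainBuilder is a key piece. As described in earlier articles, authentication, authorization, auditing, and flow control for a request all run along a single chain, and handlerChainBuilder is what builds that chain. The builder is actually DefaultBuildHandlerChain; let's look at it: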

// vendor/k8s.io/apiserver/pkg/server/config.go

func NewConfig(codecs serializer.CodecFactory) *Config {
    ...
	return &Config{
		Serializer:                  codecs,
		BuildHandlerChainFunc:       DefaultBuildHandlerChain,
		...
	}
}

// vendor/k8s.io/apiserver/pkg/server/config.go

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := filterlatency.TrackCompleted(apiHandler)
	handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, "authorization")

	if c.FlowControl != nil {
		workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
		requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
			c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg)
		handler = filterlatency.TrackCompleted(handler)
		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
		handler = filterlatency.TrackStarted(handler, "priorityandfairness")
	} else {
		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
	}

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, "impersonation")

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
	handler = filterlatency.TrackStarted(handler, "audit")

	failedHandler := genericapifilters.Unauthorized(c.Serializer)
	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)

	failedHandler = filterlatency.TrackCompleted(failedHandler)
	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
	handler = filterlatency.TrackStarted(handler, "authentication")

	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")

	// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
	// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)

	handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
		c.LongRunningFunc, c.Serializer, c.RequestTimeout)
	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
	if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
		handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
	}
	handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
	handler = genericapifilters.WithWarningRecorder(handler)
	handler = genericapifilters.WithCacheControl(handler)
	handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
	if c.ShutdownSendRetryAfter {
		handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
	}
	handler = genericfilters.WithHTTPLogging(handler)
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
		handler = genericapifilters.WithTracing(handler, c.TracerProvider)
	}
	handler = genericapifilters.WithLatencyTrackers(handler)
	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
	handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
	handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithAuditID(handler)
	return handler
}

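This pattern resembles nesting dolls: each handler created earlier is wrapped inside the one created after it. When the chain runs, the outermost handler executes first, so WithAuditID (which attaches an audit ID to the request) runs first, and then the layers are peeled away one by one like an onion, each handler running in turn. The innermost layer is apiHandler, the argument to DefaultBuildHandlerChain, which is the handler that actually processes the request; more on it below.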
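
The wrapping order can be checked with a minimal sketch that uses only the standard library. The helper wrap and the three layer names are stand-ins chosen for illustration, not the real filters; the point is only that the handler wrapped last runs first.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// wrap (hypothetical helper) returns a handler that records its own name and
// then calls the handler it wraps.
func wrap(name string, trace *[]string, inner http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		*trace = append(*trace, name)
		inner.ServeHTTP(w, r)
	})
}

// runChain builds a three-layer chain in the same style as
// DefaultBuildHandlerChain and returns the order in which the layers ran.
func runChain() string {
	var trace []string

	// Innermost handler: stands in for apiHandler.
	var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		trace = append(trace, "apiHandler")
	})

	// Wrapped first, so it ends up closest to apiHandler.
	handler = wrap("authorization", &trace, handler)
	handler = wrap("authentication", &trace, handler)
	// Wrapped last, so it is the outermost layer and runs first.
	handler = wrap("auditID", &trace, handler)

	handler.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/api/v1/pods", nil))
	return strings.Join(trace, " -> ")
}

func main() {
	fmt.Println(runChain()) // auditID -> authentication -> authorization -> apiHandler
}
```

This is why the source of DefaultBuildHandlerChain reads "backwards": authorization appears near the top of the function but runs after authentication, which appears further down.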

Next, NewAPIServerHandler. This function is crucial: it creates the handler that processes requests:

// vendor/k8s.io/apiserver/pkg/server/handler.go

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}

	gorestfulContainer := restful.NewContainer()
	gorestfulContainer.ServeMux = http.NewServeMux()
	gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
	gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(s, panicReason, httpWriter)
	})
	gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
		serviceErrorHandler(s, serviceErr, request, response)
	})

	// WebServices (ws) are added to gorestfulContainer later on
	director := director{
		name:               name,
		goRestfulContainer: gorestfulContainer,
		nonGoRestfulMux:    nonGoRestfulMux,
	}

	return &APIServerHandler{
		FullHandlerChain:   handlerChainBuilder(director),
		GoRestfulContainer: gorestfulContainer,
		NonGoRestfulMux:    nonGoRestfulMux,
		Director:           director,
	}
}

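A request is first handled by APIServerHandler.ServeHTTP, which simply calls FullHandlerChain.ServeHTTP. As noted above, FullHandlerChain is a chain that performs auditing, authentication, authorization, rate limiting, and so on. Only at the end of the chain does the handler that actually processes the request run (the argument passed to handlerChainBuilder). That handler is director, so what runs is director.ServeHTTP. Here is a quick look at it: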

func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	path := req.URL.Path

	// check to see if our webservices want to claim this path
	for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
		switch {
		case ws.RootPath() == "/apis":
			// if we are exactly /apis or /apis/, then we need special handling in loop.
			// normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly.
			// We can't rely on a prefix match since /apis matches everything (see the big comment on Director above)
			if path == "/apis" || path == "/apis/" {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful

				// Note: the webservices are registered by
				// func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error
				d.goRestfulContainer.Dispatch(w, req)
				return
			}

		case strings.HasPrefix(path, ws.RootPath()):
			// ensure an exact match or a path boundary match
			if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful
				d.goRestfulContainer.Dispatch(w, req)
				return
			}
		}
	}
	// if we didn't find a match, then we just skip gorestful altogether
	klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
	d.nonGoRestfulMux.ServeHTTP(w, req)
}

director holds two kinds of routes: restful and nonRestful. The restful routes handle the APIs registered on this server, and all other requests are handled by nonRestful. A request therefore follows this path:

FullHandlerChain -> Director -> {GoRestfulContainer,NonGoRestfulMux}
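
The second case in director.ServeHTTP requires either an exact match or a match at a path boundary, not just a shared prefix. The sketch below isolates that check in a hypothetical function named claims; it deliberately leaves out the special handling of the "/apis" root shown in the first case.

```go
package main

import (
	"fmt"
	"strings"
)

// claims (hypothetical name) reports whether a webservice rooted at rootPath
// should take a request for path. It mirrors the boundary check in
// director.ServeHTTP: the prefix must be followed by '/' or by nothing.
func claims(rootPath, path string) bool {
	if !strings.HasPrefix(path, rootPath) {
		return false
	}
	return len(path) == len(rootPath) || path[len(rootPath)] == '/'
}

func main() {
	fmt.Println(claims("/api", "/api"))          // exact match
	fmt.Println(claims("/api", "/api/v1/pods"))  // boundary match
	fmt.Println(claims("/api", "/apis/apps/v1")) // shared prefix only, not a boundary
}
```

Without the boundary check, a webservice rooted at /api would also claim every /apis/... request, because "/apis" starts with "/api".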

Back to NewAPIServerHandler:

nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
	nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}

The downstream delegate server (notFoundHandler) is handed to nonGoRestfulMux. When the nonRestful routes cannot handle a request either, the request is delegated to notFoundHandler:

func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}

// ServeHTTP makes it an http.Handler
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
		klog.V(5).Infof("%v: %q satisfied by exact match", h.muxName, r.URL.Path)
		exactHandler.ServeHTTP(w, r)
		return
	}

	for _, prefixHandler := range h.prefixHandlers {
		if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
			klog.V(5).Infof("%v: %q satisfied by prefix %v", h.muxName, r.URL.Path, prefixHandler.prefix)
			prefixHandler.handler.ServeHTTP(w, r)
			return
		}
	}
	klog.V(5).Infof("%v: %q satisfied by NotFoundHandler", h.muxName, r.URL.Path)
	h.notFoundHandler.ServeHTTP(w, r)
}

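As shown, the nonRestful route handler first tries an exact match on the request URL path and handles the request if one is found. Otherwise it tries a prefix match and handles the request if that succeeds. If neither matches, it falls back to notFoundHandler (that is, the downstream delegate server).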
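
The three-step lookup can be reduced to a small sketch. The types pathTable and prefixEntry and the example routes are hypothetical, and handlers are replaced by plain strings so the result is easy to inspect; only the order of the checks follows pathHandler.ServeHTTP.

```go
package main

import (
	"fmt"
	"strings"
)

// prefixEntry pairs a path prefix with the name of the handler serving it.
type prefixEntry struct {
	prefix string
	target string
}

// pathTable is a simplified stand-in for pathHandler: strings replace
// http.Handler values.
type pathTable struct {
	exact    map[string]string
	prefixes []prefixEntry
	notFound string
}

// resolve applies the same order as pathHandler.ServeHTTP:
// exact match, then the first matching prefix, then the not-found fallback.
func (t pathTable) resolve(path string) string {
	if target, ok := t.exact[path]; ok {
		return target
	}
	for _, e := range t.prefixes {
		if strings.HasPrefix(path, e.prefix) {
			return e.target
		}
	}
	return t.notFound
}

func main() {
	table := pathTable{
		exact:    map[string]string{"/healthz": "healthz handler"},
		prefixes: []prefixEntry{{prefix: "/debug/", target: "debug handler"}},
		notFound: "delegate server",
	}
	for _, p := range []string{"/healthz", "/debug/pprof", "/apis/example"} {
		fmt.Printf("%s -> %s\n", p, table.resolve(p))
	}
}
```

Because the fallback is the downstream server's handler rather than a plain 404, an unmatched request continues down the delegation chain.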

Registering these two kinds of routes is fairly involved and is also a key part of the design; the next article is devoted to it.

nonRestful routes exist in aggregatorServer's genericServer. The figure below shows the request flow.

[Figure: request flow diagram (pCgZD9H.png)]

Back in New, the next step creates a struct of type GenericAPIServer, which is the genericAPIServer we set out to build:

s := &GenericAPIServer{
		discoveryAddresses:         c.DiscoveryAddresses,
		LoopbackClientConfig:       c.LoopbackClientConfig,
		legacyAPIGroupPrefixes:     c.LegacyAPIGroupPrefixes,
		admissionControl:           c.AdmissionControl,
		Serializer:                 c.Serializer,
		AuditBackend:               c.AuditBackend,
		Authorizer:                 c.Authorization.Authorizer,
		delegationTarget:           delegationTarget,
		EquivalentResourceRegistry: c.EquivalentResourceRegistry,
		HandlerChainWaitGroup:      c.HandlerChainWaitGroup,
		Handler:                    apiServerHandler,

		listedPathProvider: apiServerHandler,

		minRequestTimeout:     time.Duration(c.MinRequestTimeout) * time.Second,
		ShutdownTimeout:       c.RequestTimeout,
		ShutdownDelayDuration: c.ShutdownDelayDuration,
		SecureServingInfo:     c.SecureServing,
		ExternalAddress:       c.ExternalAddress,

        ...
	}

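What this mainly does is copy the previously initialized configuration into the struct: auditing, authentication, authorization, flow control, the handler, and so on.

What follows mostly starts flow control, starts the informers, and so on. These are not part of the critical path, so we won't go through them one by one.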
