kubeapiserver源码剖析与开发(六):创建一个 Pod 在 kukbeapiserver 经历了什么

2023年 7月 19日 102.5k 0

在前面的文章里我们说过,kube-apiserver 由 aggregatorServer、kubeAPIServer、apiExtensionsServer 三个 server 组成,他们各司其职,当自己处理不了请求时,就向下传递(委派)请求,处理的顺序如下:

pCZEG4A.png

虽然说 kube-apiserver 由三个 server 组成,但是只有 aggregatorServer 监听了端口,是请求的入口。底层用的是 golang 的 net/http 实现的,是一个标准的 tcp server

net/http/server.go

func (srv *Server) Serve(l net.Listener) error {
    ...
	for {
		rw, err := l.Accept()
		if err != nil {
			select {
			case  max {
					tempDelay = max
				}
				srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return err
		}
		connCtx := ctx
		if cc := srv.ConnContext; cc != nil {
			connCtx = cc(connCtx, rw)
			if connCtx == nil {
				panic("ConnContext returned nil")
			}
		}
		tempDelay = 0
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew, runHooks) // before Serve can return
		go c.serve(connCtx)
	}
}

主线程中 accept 请求,如果有请求过来另起一个协程进行处理,该协程会执行下面代码

net/http/server.go

func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
	handler := sh.srv.Handler
    
    ...
    
	// APIServerHandler
	// vendor/k8s.io/apiserver/pkg/server/handler.go
	handler.ServeHTTP(rw, req)
}

而这里的 handler 就是前面创建的 aggregatorServer 中 genericServer 的 Handler,它在如下代码中创建

// vendor/k8s.io/apiserver/pkg/server/handler.go

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}

	gorestfulContainer := restful.NewContainer()
	gorestfulContainer.ServeMux = http.NewServeMux()
	gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
	gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(s, panicReason, httpWriter)
	})
	gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
		serviceErrorHandler(s, serviceErr, request, response)
	})
// vendor/k8s.io/apiserver/pkg/server/handler.go
// 在后面controller中往 gorestfulContainer 加ws
	director := director{
		name:               name,
		goRestfulContainer: gorestfulContainer,
		nonGoRestfulMux:    nonGoRestfulMux,
	}

	return &APIServerHandler{
		FullHandlerChain:   handlerChainBuilder(director),
		GoRestfulContainer: gorestfulContainer,
		NonGoRestfulMux:    nonGoRestfulMux,
		Director:           director,
	}
}

所以请求实际上是被 APIServerHandler 的 ServeHTTP 方法处理,如下:

// vendor/k8s.io/apiserver/pkg/server/handler.go

func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	a.FullHandlerChain.ServeHTTP(w, r)
}

请求就来到了 FullHandlerChain.ServeHTT 方法中。我们在前面的文章中说过,所有请求都会经过一条链,这条路链会在下面的函数中创建:

// vendor/k8s.io/apiserver/pkg/server/config.go

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := filterlatency.TrackCompleted(apiHandler)
	handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, "authorization")

	if c.FlowControl != nil {
		workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
		requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
			c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg)
		handler = filterlatency.TrackCompleted(handler)
		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
		handler = filterlatency.TrackStarted(handler, "priorityandfairness")
	} else {
		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
	}

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, "impersonation")

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
	handler = filterlatency.TrackStarted(handler, "audit")

	failedHandler := genericapifilters.Unauthorized(c.Serializer)
	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)

	failedHandler = filterlatency.TrackCompleted(failedHandler)
	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
	handler = filterlatency.TrackStarted(handler, "authentication")

	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")

	// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
	// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)

	handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
		c.LongRunningFunc, c.Serializer, c.RequestTimeout)
	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
	if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
		handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
	}
	handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
	handler = genericapifilters.WithWarningRecorder(handler)
	handler = genericapifilters.WithCacheControl(handler)
	handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
	if c.ShutdownSendRetryAfter {
		handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
	}
	handler = genericfilters.WithHTTPLogging(handler)
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
		handler = genericapifilters.WithTracing(handler, c.TracerProvider)
	}
	handler = genericapifilters.WithLatencyTrackers(handler)
	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
	handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
	handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithAuditID(handler)
	return handler
}

这个函数我们在前面的文章说过,它将所有需要执行的操作串联在一起,函数最后的的 handler 最先执行,开头的 handler 最后执行。根据上面代码,我们能看到创建一个 Pod 在这条链上经历了如下关键步骤:

  • Authentication:认证
  • Audit:审计
  • FlowControl:限速检测
  • Authorization:鉴权
  • apiHandler:如果上述步骤都顺利执行没有错误,则在链的末端开始执行请求的处理步骤

而 apiHandler 被传入的是下面的对象(director):

// vendor/k8s.io/apiserver/pkg/server/handler.go

director := director{
	name:               name,
	goRestfulContainer: gorestfulContainer,
	nonGoRestfulMux:    nonGoRestfulMux,
}

return &APIServerHandler{
	FullHandlerChain:   handlerChainBuilder(director),
	GoRestfulContainer: gorestfulContainer,
	NonGoRestfulMux:    nonGoRestfulMux,
	Director:           director,
}

所以接下来就是执行 director 的 ServeHTTP 方法:


func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	path := req.URL.Path

	// check to see if our webservices want to claim this path
	for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
		switch {
		case ws.RootPath() == "/apis":
			// if we are exactly /apis or /apis/, then we need special handling in loop.
			// normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly.
			// We can't rely on a prefix match since /apis matches everything (see the big comment on Director above)
			if path == "/apis" || path == "/apis/" {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful

				// func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error
				d.goRestfulContainer.Dispatch(w, req)
				return
			}

		case strings.HasPrefix(path, ws.RootPath()):
			// ensure an exact match or a path boundary match
			if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful
				d.goRestfulContainer.Dispatch(w, req)
				return
			}
		}
	}
	// if we didn't find a match, then we just skip gorestful altogether
	klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
	d.nonGoRestfulMux.ServeHTTP(w, req)
}

这个方法内,首先会遍历注册在 aggregatorServer 内的路由,也就是 goRestFul 类型的路由,如果有匹配到 url,那么就在这里处理请求。我们本文讨论的是创建 Pod 的请求,它属于 legacyAPI(/api 为前缀,group 为空),路由是在 kubeAPIServer 中注册的,所以不会在 aggregatorServer 中处理,所以请求会走到最后的 nonGoRestfulMux.ServeHTTP 方法,nonGoRestfulMux 也是在 NewAPIServerHandler 中创建的

// vendor/k8s.io/apiserver/pkg/server/handler.go

func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}
	...
}
// vendor/k8s.io/apiserver/pkg/server/mux/pathrecorder.go

func NewPathRecorderMux(name string) *PathRecorderMux {
	ret := &PathRecorderMux{
		name:            name,
		pathToHandler:   map[string]http.Handler{},
		prefixToHandler: map[string]http.Handler{},
		mux:             atomic.Value{},
		exposedPaths:    []string{},
		pathStacks:      map[string]string{},
	}

	ret.mux.Store(&pathHandler{notFoundHandler: http.NotFoundHandler()})
	return ret
}

nonGoRestfulMux 的类型是 PathRecorderMux,它实现了 ServeHTTP 方法,所以接下来就是执行它的 ServeHTTP 方法。nonGoRestfulMux 是处理没有注册在 goRestFul 类型路由中的请求

// jetbrains://goland/navigate/reference?project=kubernetes&path=vendor/k8s.io/apiserver/pkg/server/mux/pathrecorder.go

func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}

// ServeHTTP makes it an http.Handler
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
		klog.V(5).Infof("%v: %q satisfied by exact match", h.muxName, r.URL.Path)
		exactHandler.ServeHTTP(w, r)
		return
	}

	for _, prefixHandler := range h.prefixHandlers {
		if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
			klog.V(5).Infof("%v: %q satisfied by prefix %v", h.muxName, r.URL.Path, prefixHandler.prefix)
			prefixHandler.handler.ServeHTTP(w, r)
			return
		}
	}
	klog.V(5).Infof("%v: %q satisfied by NotFoundHandler", h.muxName, r.URL.Path)
	h.notFoundHandler.ServeHTTP(w, r)
}

这里我们要看下,nonGoRestfulMux 的路由是在哪里注册的,只有知道了里面注册了什么,才能知道请求在这里怎么处理

nonGoRestfulMux 是在创建 aggregatorServer 时创建的,并注册路由

// vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

func (c completedConfig) NewWithDelegate(...) {
...

apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)
select {
case

相关文章

KubeSphere 部署向量数据库 Milvus 实战指南
探索 Kubernetes 持久化存储之 Longhorn 初窥门径
征服 Docker 镜像访问限制!KubeSphere v3.4.1 成功部署全攻略
那些年在 Terraform 上吃到的糖和踩过的坑
无需 Kubernetes 测试 Kubernetes 网络实现
Kubernetes v1.31 中的移除和主要变更

发布评论