在前面的文章里我们说过,kube-apiserver 由 aggregatorServer、kubeAPIServer、apiExtensionsServer 三个 server 组成,他们各司其职,当自己处理不了请求时,就向下传递(委派)请求,处理的顺序如下:
虽然说 kube-apiserver 由三个 server 组成,但是只有 aggregatorServer 监听了端口,是请求的入口。底层用的是 golang 的 net/http 实现的,是一个标准的 tcp server
net/http/server.go
func (srv *Server) Serve(l net.Listener) error {
...
for {
rw, err := l.Accept()
if err != nil {
select {
case max {
tempDelay = max
}
srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
time.Sleep(tempDelay)
continue
}
return err
}
connCtx := ctx
if cc := srv.ConnContext; cc != nil {
connCtx = cc(connCtx, rw)
if connCtx == nil {
panic("ConnContext returned nil")
}
}
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
}
}
主线程中 accept 请求,如果有请求过来另起一个协程进行处理,该协程会执行下面代码
net/http/server.go
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
...
// APIServerHandler
// vendor/k8s.io/apiserver/pkg/server/handler.go
handler.ServeHTTP(rw, req)
}
而这里的 handler 就是前面创建的 aggregatorServer 中 genericServer 的 Handler,它在如下代码中创建
// vendor/k8s.io/apiserver/pkg/server/handler.go
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
// vendor/k8s.io/apiserver/pkg/server/handler.go
// 在后面controller中往 gorestfulContainer 加ws
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
所以请求实际上是被 APIServerHandler 的 ServeHTTP 方法处理,如下:
// vendor/k8s.io/apiserver/pkg/server/handler.go
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
a.FullHandlerChain.ServeHTTP(w, r)
}
请求就来到了 FullHandlerChain.ServeHTT 方法中。我们在前面的文章中说过,所有请求都会经过一条链,这条路链会在下面的函数中创建:
// vendor/k8s.io/apiserver/pkg/server/config.go
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := filterlatency.TrackCompleted(apiHandler)
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
handler = filterlatency.TrackStarted(handler, "authorization")
if c.FlowControl != nil {
workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg)
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness")
} else {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
}
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = filterlatency.TrackStarted(handler, "impersonation")
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
handler = filterlatency.TrackStarted(handler, "audit")
failedHandler := genericapifilters.Unauthorized(c.Serializer)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
failedHandler = filterlatency.TrackCompleted(failedHandler)
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = filterlatency.TrackStarted(handler, "authentication")
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
}
handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
}
handler = genericfilters.WithHTTPLogging(handler)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
handler = genericapifilters.WithTracing(handler, c.TracerProvider)
}
handler = genericapifilters.WithLatencyTrackers(handler)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
handler = genericapifilters.WithAuditID(handler)
return handler
}
这个函数我们在前面的文章说过,它将所有需要执行的操作串联在一起,函数最后的的 handler 最先执行,开头的 handler 最后执行。根据上面代码,我们能看到创建一个 Pod 在这条链上经历了如下关键步骤:
- Authentication:认证
- Audit:审计
- FlowControl:限速检测
- Authorization:鉴权
- apiHandler:如果上述步骤都顺利执行没有错误,则在链的末端开始执行请求的处理步骤
而 apiHandler 被传入的是下面的对象(director):
// vendor/k8s.io/apiserver/pkg/server/handler.go
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
所以接下来就是执行 director 的 ServeHTTP 方法:
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
// check to see if our webservices want to claim this path
for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
switch {
case ws.RootPath() == "/apis":
// if we are exactly /apis or /apis/, then we need special handling in loop.
// normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly.
// We can't rely on a prefix match since /apis matches everything (see the big comment on Director above)
if path == "/apis" || path == "/apis/" {
klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
// don't use servemux here because gorestful servemuxes get messed up when removing webservices
// TODO fix gorestful, remove TPRs, or stop using gorestful
// func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error
d.goRestfulContainer.Dispatch(w, req)
return
}
case strings.HasPrefix(path, ws.RootPath()):
// ensure an exact match or a path boundary match
if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
// don't use servemux here because gorestful servemuxes get messed up when removing webservices
// TODO fix gorestful, remove TPRs, or stop using gorestful
d.goRestfulContainer.Dispatch(w, req)
return
}
}
}
// if we didn't find a match, then we just skip gorestful altogether
klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
d.nonGoRestfulMux.ServeHTTP(w, req)
}
这个方法内,首先会遍历注册在 aggregatorServer 内的路由,也就是 goRestFul 类型的路由,如果有匹配到 url,那么就在这里处理请求。我们本文讨论的是创建 Pod 的请求,它属于 legacyAPI(/api 为前缀,group 为空),路由是在 kubeAPIServer 中注册的,所以不会在 aggregatorServer 中处理,所以请求会走到最后的 nonGoRestfulMux.ServeHTTP 方法,nonGoRestfulMux 也是在 NewAPIServerHandler 中创建的
// vendor/k8s.io/apiserver/pkg/server/handler.go
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
...
}
// vendor/k8s.io/apiserver/pkg/server/mux/pathrecorder.go
func NewPathRecorderMux(name string) *PathRecorderMux {
ret := &PathRecorderMux{
name: name,
pathToHandler: map[string]http.Handler{},
prefixToHandler: map[string]http.Handler{},
mux: atomic.Value{},
exposedPaths: []string{},
pathStacks: map[string]string{},
}
ret.mux.Store(&pathHandler{notFoundHandler: http.NotFoundHandler()})
return ret
}
nonGoRestfulMux 的类型是 PathRecorderMux,它实现了 ServeHTTP 方法,所以接下来就是执行它的 ServeHTTP 方法。nonGoRestfulMux 是处理没有注册在 goRestFul 类型路由中的请求
// jetbrains://goland/navigate/reference?project=kubernetes&path=vendor/k8s.io/apiserver/pkg/server/mux/pathrecorder.go
func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}
// ServeHTTP makes it an http.Handler
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
klog.V(5).Infof("%v: %q satisfied by exact match", h.muxName, r.URL.Path)
exactHandler.ServeHTTP(w, r)
return
}
for _, prefixHandler := range h.prefixHandlers {
if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
klog.V(5).Infof("%v: %q satisfied by prefix %v", h.muxName, r.URL.Path, prefixHandler.prefix)
prefixHandler.handler.ServeHTTP(w, r)
return
}
}
klog.V(5).Infof("%v: %q satisfied by NotFoundHandler", h.muxName, r.URL.Path)
h.notFoundHandler.ServeHTTP(w, r)
}
这里我们要看下,nonGoRestfulMux 的路由是在哪里注册的,只有知道了里面注册了什么,才能知道请求在这里怎么处理
nonGoRestfulMux 是在创建 aggregatorServer 时创建的,并注册路由
// vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
func (c completedConfig) NewWithDelegate(...) {
...
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)
select {
case