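In the previous article I covered how genericConfig is initialized. genericConfig is the core configuration of kube-apiserver. With the configuration in place, we can now look at how KubeAPIServer, one of kube-apiserver's three servers, is created.
Creating KubeAPIServer involves three main steps:
- Create the genericServer
- Install the legacyAPI routes
- Install the api routes
This article covers the creation of the genericServer.
As mentioned earlier, kube-apiserver consists of three servers, all of which are built on top of genericServer. Each of them registers routes on its genericServer and handles the requests it is responsible for.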
// cmd/kube-apiserver/app/server.go
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
	kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
	...
	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}
	...
}

// cmd/kube-apiserver/app/server.go
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
	if err != nil {
		return nil, err
	}
	return kubeAPIServer, nil
}

// pkg/controlplane/instance.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
		return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
	}
	...
}
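The genericServer logic lives in the New function of the generic config (a different New from the one shown above). Let's first look at its parameters:
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error)
- name: as noted above, every server in kube-apiserver is built on genericServer, so name identifies which server the genericServer being created belongs to. This makes log output easier to follow.
- delegationTarget: this parameter is more interesting. As described in earlier articles, kube-apiserver's three servers (apiExtensionsServer, kubeAPIServer, aggregatorServer) are created in that order, and a request passes through them in the reverse order. Every request therefore reaches aggregatorServer first; in fact, the port kube-apiserver listens on is served by aggregatorServer. If aggregatorServer cannot handle a request, it delegates the request to kubeAPIServer, and so on down the chain. For delegation to work, each server has to know which server is downstream of it, and delegationTarget is the parameter that tells it.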
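The delegation idea can be sketched in a few lines of Go. This is only a toy model, not the kube-apiserver implementation: the toyServer type, the buildChain and serve helpers, and the paths /metrics-api, /pods and /crd are all hypothetical names chosen for illustration. Each server answers the paths it owns and passes everything else to its delegation target.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// toyServer is a hypothetical stand-in for one server in the chain.
type toyServer struct {
	name  string
	owned map[string]bool // paths this server handles itself
	next  http.Handler    // delegation target; nil for the last server
}

func (s *toyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if s.owned[r.URL.Path] {
		fmt.Fprintf(w, "%s handled %s", s.name, r.URL.Path)
		return
	}
	if s.next != nil {
		// Not ours: hand the request to the downstream server.
		s.next.ServeHTTP(w, r)
		return
	}
	http.NotFound(w, r)
}

// buildChain creates the servers in the order apiExtensionsServer,
// kubeAPIServer, aggregatorServer and returns the last one created,
// which is the entry point for every request.
func buildChain() http.Handler {
	apiExtensions := &toyServer{name: "apiExtensionsServer", owned: map[string]bool{"/crd": true}}
	kubeAPI := &toyServer{name: "kubeAPIServer", owned: map[string]bool{"/pods": true}, next: apiExtensions}
	aggregator := &toyServer{name: "aggregatorServer", owned: map[string]bool{"/metrics-api": true}, next: kubeAPI}
	return aggregator
}

// serve sends a GET for path through h and returns the response body.
func serve(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	chain := buildChain()
	for _, p := range []string{"/metrics-api", "/pods", "/crd"} {
		fmt.Printf("%-13s -> %s\n", p, serve(chain, p))
	}
}
```

Creation order and request order are opposite here for the same reason as in the text: a server can only be given a delegation target that already exists.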
Now let's look at what New actually does:
// vendor/k8s.io/apiserver/pkg/server/config.go
...
handlerChainBuilder := func(handler http.Handler) http.Handler {
	return c.BuildHandlerChainFunc(handler, c.Config)
}
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
...
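handlerChainBuilder is a key piece. As described in earlier articles, authentication, authorization, auditing, and flow control for a request all run on a single chain, and handlerChainBuilder is what builds that chain. The builder is actually DefaultBuildHandlerChain, so let's look at that function: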
// vendor/k8s.io/apiserver/pkg/server/config.go
func NewConfig(codecs serializer.CodecFactory) *Config {
	...
	return &Config{
		Serializer: codecs,
		BuildHandlerChainFunc: DefaultBuildHandlerChain,
		...
	}
}

// vendor/k8s.io/apiserver/pkg/server/config.go
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := filterlatency.TrackCompleted(apiHandler)
	handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, "authorization")

	if c.FlowControl != nil {
		workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
		requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
			c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg)
		handler = filterlatency.TrackCompleted(handler)
		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
		handler = filterlatency.TrackStarted(handler, "priorityandfairness")
	} else {
		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
	}

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, "impersonation")

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
	handler = filterlatency.TrackStarted(handler, "audit")

	failedHandler := genericapifilters.Unauthorized(c.Serializer)
	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)

	failedHandler = filterlatency.TrackCompleted(failedHandler)
	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
	handler = filterlatency.TrackStarted(handler, "authentication")

	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")

	// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
	// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)

	handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
		c.LongRunningFunc, c.Serializer, c.RequestTimeout)
	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
	if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
		handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
	}
	handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
	handler = genericapifilters.WithWarningRecorder(handler)
	handler = genericapifilters.WithCacheControl(handler)
	handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
	if c.ShutdownSendRetryAfter {
		handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
	}
	handler = genericfilters.WithHTTPLogging(handler)
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
		handler = genericapifilters.WithTracing(handler, c.TracerProvider)
	}
	handler = genericapifilters.WithLatencyTrackers(handler)
	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
	handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
	handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithAuditID(handler)
	return handler
}
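This pattern works a bit like nesting dolls: a handler created earlier is wrapped inside the handlers created after it. When the chain runs, the outermost handler executes first, so WithAuditID (which attaches an audit ID to the request) is the first to run. The layers are then peeled away one by one, like an onion, with each handler running in turn. The innermost layer is apiHandler, the argument passed to DefaultBuildHandlerChain. It is the handler that actually processes the request, and it is covered in detail below.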
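To make the execution order concrete, here is a minimal sketch of the same wrapping pattern. It is not the real filter code: the with helper, buildChain and runOrder are hypothetical, and only three filter names are borrowed from the chain above. Each toy filter just records its name before calling the handler it wraps.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// with returns a toy filter that appends its name to *trace and then
// calls the handler it wraps.
func with(name string, trace *[]string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		*trace = append(*trace, name)
		next.ServeHTTP(w, r)
	})
}

// buildChain wraps apiHandler from the inside out, in the same style as
// DefaultBuildHandlerChain: the filter applied last ends up outermost.
func buildChain(apiHandler http.Handler, trace *[]string) http.Handler {
	handler := with("authorization", trace, apiHandler)
	handler = with("authentication", trace, handler)
	handler = with("auditID", trace, handler) // wrapped last, runs first
	return handler
}

// runOrder sends one request through the chain and returns the order in
// which the layers ran.
func runOrder() string {
	var trace []string
	apiHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		trace = append(trace, "apiHandler")
	})
	chain := buildChain(apiHandler, &trace)
	chain.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/", nil))
	return strings.Join(trace, " -> ")
}

func main() {
	fmt.Println(runOrder()) // auditID -> authentication -> authorization -> apiHandler
}
```

Reading DefaultBuildHandlerChain from bottom to top therefore gives the order in which a request meets the filters.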
Next, let's look at NewAPIServerHandler. This function is critical because it creates the handler that processes requests:
// vendor/k8s.io/apiserver/pkg/server/handler.go
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}

	gorestfulContainer := restful.NewContainer()
	gorestfulContainer.ServeMux = http.NewServeMux()
	gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
	gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(s, panicReason, httpWriter)
	})
	gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
		serviceErrorHandler(s, serviceErr, request, response)
	})

	// web services are added to gorestfulContainer later, in the controller
	director := director{
		name: name,
		goRestfulContainer: gorestfulContainer,
		nonGoRestfulMux: nonGoRestfulMux,
	}

	return &APIServerHandler{
		FullHandlerChain: handlerChainBuilder(director),
		GoRestfulContainer: gorestfulContainer,
		NonGoRestfulMux: nonGoRestfulMux,
		Director: director,
	}
}
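A request is first handled by APIServerHandler.ServeHTTP, which simply calls FullHandlerChain.ServeHTTP. As described above, FullHandlerChain is a chain that performs auditing, authentication, authorization, rate limiting, and so on. Only at the end of the chain does the handler that really processes the request run (the argument passed to handlerChainBuilder). That handler is director, so what runs is director.ServeHTTP. Here is a first look at director.ServeHTTP: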
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	path := req.URL.Path

	// check to see if our webservices want to claim this path
	for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
		switch {
		case ws.RootPath() == "/apis":
			// if we are exactly /apis or /apis/, then we need special handling in loop.
			// normally these are passed to the nonGoRestfulMux, but if discovery is enabled, it will go directly.
			// We can't rely on a prefix match since /apis matches everything (see the big comment on Director above)
			if path == "/apis" || path == "/apis/" {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful
				// func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error
				// (author's note) the webservice is registered in the function named in the comment above
				d.goRestfulContainer.Dispatch(w, req)
				return
			}

		case strings.HasPrefix(path, ws.RootPath()):
			// ensure an exact match or a path boundary match
			if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
				klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
				// don't use servemux here because gorestful servemuxes get messed up when removing webservices
				// TODO fix gorestful, remove TPRs, or stop using gorestful
				d.goRestfulContainer.Dispatch(w, req)
				return
			}
		}
	}

	// if we didn't find a match, then we just skip gorestful altogether
	klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
	d.nonGoRestfulMux.ServeHTTP(w, req)
}
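director contains two kinds of routes: restful and nonRestful. The restful routes handle the APIs registered with this server, and all other APIs are handled by nonRestful. A request therefore follows this path: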
FullHandlerChain -> Director -> {GoRestfulContainer,NonGoRestfulMux}
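The routing decision made by director can be reduced to a small pure function. The sketch below is an approximation for illustration only: route is a hypothetical helper, it returns a label instead of dispatching, and the root paths used in main are example values, not a list taken from a running server.

```go
package main

import (
	"fmt"
	"strings"
)

// route reports which side of the director would receive path, given the
// root paths of the registered web services.
func route(roots []string, path string) string {
	for _, root := range roots {
		switch {
		case root == "/apis":
			// "/apis" is a prefix of every group path, so only the bare
			// discovery path is claimed by this web service.
			if path == "/apis" || path == "/apis/" {
				return "goRestfulContainer"
			}
		case strings.HasPrefix(path, root):
			// Require an exact match or a match ending at a '/' boundary,
			// so that "/api" does not claim "/apiserver".
			if len(path) == len(root) || path[len(root)] == '/' {
				return "goRestfulContainer"
			}
		}
	}
	// No web service claimed the path.
	return "nonGoRestfulMux"
}

func main() {
	roots := []string{"/api", "/apis", "/apis/apps/v1"}
	paths := []string{
		"/api/v1/pods",
		"/apis",
		"/apis/apps/v1/deployments",
		"/apis/example.com/v1/foos",
		"/apiserver",
	}
	for _, p := range paths {
		fmt.Printf("%-28s -> %s\n", p, route(roots, p))
	}
}
```

The boundary check is the detail that is easy to miss: a plain prefix test would send /apiserver to the web service rooted at /api.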
Let's go back to the NewAPIServerHandler function:
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
	nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
The downstream delegate server (notFoundHandler) is passed to nonGoRestfulMux. When the nonRestful routes cannot handle a request either, the request is delegated to notFoundHandler:
func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}

// ServeHTTP makes it an http.Handler
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
		klog.V(5).Infof("%v: %q satisfied by exact match", h.muxName, r.URL.Path)
		exactHandler.ServeHTTP(w, r)
		return
	}

	for _, prefixHandler := range h.prefixHandlers {
		if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
			klog.V(5).Infof("%v: %q satisfied by prefix %v", h.muxName, r.URL.Path, prefixHandler.prefix)
			prefixHandler.handler.ServeHTTP(w, r)
			return
		}
	}

	klog.V(5).Infof("%v: %q satisfied by NotFoundHandler", h.muxName, r.URL.Path)
	h.notFoundHandler.ServeHTTP(w, r)
}
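As you can see, the nonRestful route handler first tries an exact match on the request URL and handles the request if it matches. If not, it tries a prefix match and handles the request if that succeeds. If neither matches, the request goes to notFoundHandler (that is, the downstream delegate server).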
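Registering these two kinds of routes is fairly involved and is also a key part of the design, so the next article is devoted to it.
Routes of the nonRestful type are present in aggregatorServer's genericServer. The figure below shows the request flow.
(Figure: request flow diagram)
Back in New, the next step creates a struct of type GenericAPIServer, which is the genericAPIServer we are after: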
s := &GenericAPIServer{
	discoveryAddresses: c.DiscoveryAddresses,
	LoopbackClientConfig: c.LoopbackClientConfig,
	legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes,
	admissionControl: c.AdmissionControl,
	Serializer: c.Serializer,
	AuditBackend: c.AuditBackend,
	Authorizer: c.Authorization.Authorizer,
	delegationTarget: delegationTarget,
	EquivalentResourceRegistry: c.EquivalentResourceRegistry,
	HandlerChainWaitGroup: c.HandlerChainWaitGroup,
	Handler: apiServerHandler,
	listedPathProvider: apiServerHandler,
	minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
	ShutdownTimeout: c.RequestTimeout,
	ShutdownDelayDuration: c.ShutdownDelayDuration,
	SecureServingInfo: c.SecureServing,
	ExternalAddress: c.ExternalAddress,
	...
}
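This step mostly copies the previously initialized configuration into the struct: auditing, authentication, authorization, flow control, the handler, and so on.
What follows in New mainly starts flow control, informers, and the like. These are not part of the core flow, so we will not go through them one by one.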