kube-apiserver Source Code Analysis and Development (2): KubeAPIServer Initialization, Creating GenericConfig (Part 1)

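In the previous article we saw that kube-apiserver is made up of three servers, AggregatorServer, KubeAPIServer and APIExtensionsServer, chained in that order. If the current server cannot handle a request, it delegates to the next one; if the last server cannot handle it either, NotFound is returned. The core of each server's initialization is registering routes and building the handlers for them: in short, how an incoming request's URL is matched, and how the request is processed once it matches. Because the three servers are initialized in much the same way, this article analyzes only the initialization of KubeAPIServer, with a focus on route registration.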
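To make the delegation idea concrete, here is a minimal sketch of three chained handlers. It is not the kube-apiserver implementation: the delegatingServer type, the route tables and the example paths are all hypothetical, and only the "try locally, otherwise delegate, finally NotFound" behaviour is taken from the description above.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// delegatingServer is a hypothetical stand-in for one server in the chain:
// it serves the paths it knows about and hands everything else to next.
type delegatingServer struct {
	name   string
	routes map[string]bool
	next   http.Handler
}

func (s *delegatingServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if s.routes[r.URL.Path] {
		fmt.Fprintf(w, "handled by %s", s.name)
		return
	}
	// Not ours: delegate to the next server in the chain.
	s.next.ServeHTTP(w, r)
}

// newChain wires three servers in the order aggregator -> kube -> extensions.
// The last one delegates to a plain NotFound handler. Paths are made up.
func newChain() http.Handler {
	ext := &delegatingServer{
		name:   "apiextensions",
		routes: map[string]bool{"/apis/example.com/v1/widgets": true},
		next:   http.NotFoundHandler(),
	}
	kube := &delegatingServer{
		name:   "kube-apiserver",
		routes: map[string]bool{"/api/v1/pods": true},
		next:   ext,
	}
	return &delegatingServer{
		name:   "aggregator",
		routes: map[string]bool{"/apis/metrics.k8s.io/v1beta1/nodes": true},
		next:   kube,
	}
}

// status sends a GET for path through h and returns the status code and body.
func status(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := newChain()
	for _, p := range []string{"/api/v1/pods", "/apis/example.com/v1/widgets", "/no/such/path"} {
		code, body := status(chain, p)
		fmt.Printf("%s -> %d %q\n", p, code, body)
	}
}
```

Each server only needs to know its own routes and its delegate, which is why the three initializations can look so similar.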

The figure below shows the main things KubeAPIServer initialization does (the other two servers follow essentially the same flow).

[Figure: main stages of KubeAPIServer initialization (pCugZkR.png)]

Let's go through what each of these initialization stages does in detail.

Creating kubeAPIServerConfig

This step initializes the runtime configuration of kube-apiserver; all three servers share this one configuration. Let's see in the source what it does.

// cmd/kube-apiserver/app/server.go

func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
    kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
    if err != nil {
        return nil, err
    }

    ...
}

func CreateKubeAPIServerConfig(s completedServerRunOptions) (
    *controlplane.Config,
    aggregatorapiserver.ServiceResolver,
    []admission.PluginInitializer,
    error,
) {
    proxyTransport := CreateProxyTransport()

    genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
    if err != nil {
        return nil, nil, nil, err
    }

    config := &controlplane.Config{
        GenericConfig: genericConfig,
        ExtraConfig: controlplane.ExtraConfig{
            ...
        },
    }
    ...
    return config, serviceResolver, pluginInitializers, nil
}

kubeAPIServerConfig has type *controlplane.Config:

type Config struct {
    GenericConfig *genericapiserver.Config
    ExtraConfig   ExtraConfig
}

GenericConfig is the main part of kubeAPIServerConfig. Let's look at how GenericConfig is created.

func buildGenericConfig(
    s *options.ServerRunOptions,
    proxyTransport *http.Transport,
) (
    genericConfig *genericapiserver.Config,
    versionedInformers clientgoinformers.SharedInformerFactory,
    serviceResolver aggregatorapiserver.ServiceResolver,
    pluginInitializers []admission.PluginInitializer,
    admissionPostStartHook genericapiserver.PostStartHookFunc,
    storageFactory *serverstorage.DefaultStorageFactory,
    lastErr error,
) {

    genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
    ...

    if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
        return
    }

    // etcd configuration initialization
    storageFactory, lastErr = completedStorageFactoryConfig.New()
    if lastErr != nil {
        return
    }
    if genericConfig.EgressSelector != nil {
        storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
    }
    if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) && genericConfig.TracerProvider != nil {
        storageFactory.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
    }
    if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
        return
    }

    // Authentication initialization
    if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
        return
    }

    // Authorization initialization
    genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
    if err != nil {
        lastErr = fmt.Errorf("invalid authorization config: %v", err)
        return
    }

    // Audit initialization
    lastErr = s.Audit.ApplyTo(genericConfig)
    if lastErr != nil {
        return
    }

    // Admission initialization
    err = s.Admission.ApplyTo(
        genericConfig,
        versionedInformers,
        kubeClientConfig,
        utilfeature.DefaultFeatureGate,
        pluginInitializers...)
    if err != nil {
        lastErr = fmt.Errorf("failed to initialize admission: %v", err)
        return
    }

    // Flow control (API Priority and Fairness) initialization
    if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
        genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
    }

    return
}

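Creating and initializing genericConfig mainly covers etcd configuration, authentication, authorization, audit, admission, and flow control initialization. These are all key steps, so we will go through them one by one.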

etcd configuration initialization

First, a storageFactory is created. It is an instance of *serverstorage.DefaultStorageFactory, the type that holds the etcd configuration. Here is its definition:

type DefaultStorageFactory struct {
    StorageConfig storagebackend.Config
    Overrides map[schema.GroupResource]groupResourceOverrides
    DefaultResourcePrefixes map[schema.GroupResource]string
    DefaultMediaType string
    DefaultSerializer runtime.StorageSerializer
    ResourceEncodingConfig ResourceEncodingConfig
    APIResourceConfigSource APIResourceConfigSource
    newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, encodeVersioner runtime.GroupVersioner, err error)
}

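The StorageConfig member is where the etcd configuration ends up: the etcd server addresses, the CA certificate, and the certificate and private key that kube-apiserver presents to etcd, among others. All of these are parsed from command-line flags.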

type TransportConfig struct {
    ServerList []string
    KeyFile       string
    CertFile      string
    TrustedCAFile string
    EgressLookup egressselector.Lookup
    TracerProvider *trace.TracerProvider
}

type Config struct {
    Type string
    Prefix string
    Transport TransportConfig
    Paging bool
    Codec runtime.Codec
    EncodeVersioner runtime.GroupVersioner
    Transformer value.Transformer
    CompactionInterval time.Duration
    CountMetricPollPeriod time.Duration
    DBMetricPollInterval time.Duration
    HealthcheckTimeout time.Duration
    LeaseManagerConfig etcd3.LeaseManagerConfig
    StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
}

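Now look at another member of DefaultStorageFactory, Overrides. We rarely need it day to day, but for large clusters it is a useful optimization: it stores different resources in different etcd clusters, spreading the load across them. In a large cluster, high-volume resources that are off the critical path, such as events, are usually split out into their own etcd cluster. Adding the following flag to the command line achieves that: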

--etcd-servers-overrides="/events#http://host1:2379;http://host2:2379;http://host3:2379"

Let's look at the function that creates storageFactory.

// New returns a new storage factory created from the completed storage factory configuration.
func (c *completedStorageFactoryConfig) New() (*serverstorage.DefaultStorageFactory, error) {
    resourceEncodingConfig := resourceconfig.MergeResourceEncodingConfigs(c.DefaultResourceEncoding, c.ResourceEncodingOverrides)
    storageFactory := serverstorage.NewDefaultStorageFactory(
        c.StorageConfig,
        c.DefaultStorageMediaType,
        c.Serializer,
        resourceEncodingConfig,
        c.APIResourceConfig,
        SpecialDefaultResourcePrefixes)
    ....
    for _, override := range c.EtcdServersOverrides {
        tokens := strings.Split(override, "#")
        apiresource := strings.Split(tokens[0], "/")
        group := apiresource[0]
        resource := apiresource[1]
        groupResource := schema.GroupResource{Group: group, Resource: resource}

        servers := strings.Split(tokens[1], ";")
        storageFactory.SetEtcdLocation(groupResource, servers)
    }
    ...
    return storageFactory, nil
}

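As the code above shows, the storageFactory instance holds the etcd configuration, the serializer, and so on. The function also parses the value passed through the --etcd-servers-overrides flag described above and, for each resource parsed out of it, sets the etcd addresses given in the flag.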
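The parsing loop can be tried in isolation. The sketch below mirrors the two strings.Split calls from New: "#" separates the group/resource from the server list, "/" separates group from resource, and ";" separates servers. The override struct and parseOverride function are hypothetical names for this example, and the length checks are an addition that the excerpt above does not show.

```go
package main

import (
	"fmt"
	"strings"
)

// override is a hypothetical holder for one parsed override entry.
type override struct {
	Group    string
	Resource string
	Servers  []string
}

// parseOverride splits a single "group/resource#server1;server2" entry.
func parseOverride(s string) (override, error) {
	tokens := strings.Split(s, "#")
	if len(tokens) != 2 {
		return override{}, fmt.Errorf("expected group/resource#servers, got %q", s)
	}
	gr := strings.Split(tokens[0], "/")
	if len(gr) != 2 {
		return override{}, fmt.Errorf("expected group/resource, got %q", tokens[0])
	}
	return override{
		Group:    gr[0],
		Resource: gr[1],
		Servers:  strings.Split(tokens[1], ";"),
	}, nil
}

func main() {
	// events live in the core (empty) API group, hence the leading "/".
	o, err := parseOverride("/events#http://host1:2379;http://host2:2379")
	if err != nil {
		panic(err)
	}
	fmt.Printf("group=%q resource=%q servers=%v\n", o.Group, o.Resource, o.Servers)
}
```

Because the servers inside one entry are split on ";", a comma between server URLs would not be read as a server separator by this loop.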

Once storageFactory has been created, it has to be tied to genericConfig:

if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
    return
}

func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
    if err := s.addEtcdHealthEndpoint(c); err != nil {
        return err
    }

    // use the StorageObjectCountTracker interface instance from server.Config
    s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker

    c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
    return nil
}

The ultimate purpose of etcd configuration initialization is to assign genericConfig's RESTOptionsGetter. RESTOptionsGetter is an interface:

type RESTOptionsGetter interface {
    GetRESTOptions(resource schema.GroupResource) (RESTOptions, error)
}

In the code above, RESTOptionsGetter is assigned a value of type StorageFactoryRestOptionsFactory, which implements the RESTOptionsGetter interface.

RESTOptionsGetter is a very important member of genericConfig; it is called frequently during the InstallLegacyAPI and InstallAPIs steps. What is it for?

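For every API resource (Pod, Node, Deployment, and so on), InstallLegacyAPI or InstallAPIs installs routes and maps them to handler functions. For example, running kubectl get po requires finding the handler for the resource Pod and the method Get. That handler is set up during InstallLegacyAPI or InstallAPIs, and it ultimately comes down to an operation on etcd.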

While routes and handlers are being created for a resource, genericConfig.RESTOptionsGetter.GetRESTOptions is called. Since the concrete type behind RESTOptionsGetter is StorageFactoryRestOptionsFactory, the method that runs is its GetRESTOptions, shown below:

func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
    storageConfig, err := f.StorageFactory.NewConfig(resource)
    if err != nil {
        return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
    }

    ret := generic.RESTOptions{
        StorageConfig:             storageConfig,
        Decorator:                 generic.UndecoratedStorage,
        DeleteCollectionWorkers:   f.Options.DeleteCollectionWorkers,
        EnableGarbageCollection:   f.Options.EnableGarbageCollection,
        ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
        CountMetricPollPeriod:     f.Options.StorageConfig.CountMetricPollPeriod,
        StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
    }
    ...
    return ret, nil
}

GetRESTOptions returns an instance of RESTOptions, defined as follows:

type RESTOptions struct {
    StorageConfig *storagebackend.ConfigForResource
    Decorator     StorageDecorator

    EnableGarbageCollection   bool
    DeleteCollectionWorkers   int
    ResourcePrefix            string
    CountMetricPollPeriod     time.Duration
    StorageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker
}

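The StorageConfig member of this type is created by storageFactory.NewConfig and contains the etcd configuration together with resource information. StorageConfig is eventually used to initialize the genericregistry.Store of each resource. That struct implements Get, Create, Update and other methods; when a user sends a request, the corresponding method is called, and it in turn calls the etcd interface. The etcd information lives in genericregistry.Store.Storage, which is initialized from RESTOptions.StorageConfig.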
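The relationship between the getter, the per-resource options and the store can be sketched with heavily simplified types. Everything below (groupResource, restOptions, storageFactory, store, the etcd URLs) is hypothetical and stands in for the real Kubernetes types; the point is only that each resource asks the getter for its own options and builds its store from them.

```go
package main

import "fmt"

// groupResource mirrors the shape of schema.GroupResource for this sketch.
type groupResource struct{ Group, Resource string }

// restOptions is a trimmed-down, hypothetical version of RESTOptions.
type restOptions struct {
	Servers        []string
	ResourcePrefix string
}

// restOptionsGetter plays the role of the RESTOptionsGetter interface.
type restOptionsGetter interface {
	GetRESTOptions(gr groupResource) (restOptions, error)
}

// storageFactory holds a default etcd location plus per-resource overrides.
type storageFactory struct {
	defaultServers []string
	overrides      map[groupResource][]string
}

// GetRESTOptions picks the override for gr if one exists, else the default.
func (f *storageFactory) GetRESTOptions(gr groupResource) (restOptions, error) {
	if gr.Resource == "" {
		return restOptions{}, fmt.Errorf("unable to find storage destination for %v", gr)
	}
	servers := f.defaultServers
	if o, ok := f.overrides[gr]; ok {
		servers = o
	}
	return restOptions{Servers: servers, ResourcePrefix: "/" + gr.Resource}, nil
}

// store is a hypothetical per-resource store built from restOptions.
type store struct {
	resource groupResource
	opts     restOptions
}

// newStore is what a resource's route installation would call in this sketch.
func newStore(getter restOptionsGetter, gr groupResource) (*store, error) {
	opts, err := getter.GetRESTOptions(gr)
	if err != nil {
		return nil, err
	}
	return &store{resource: gr, opts: opts}, nil
}

// newFactory returns a factory where events use a separate etcd cluster.
func newFactory() *storageFactory {
	return &storageFactory{
		defaultServers: []string{"http://etcd-main:2379"},
		overrides: map[groupResource][]string{
			{Group: "", Resource: "events"}: {"http://etcd-events:2379"},
		},
	}
}

func main() {
	f := newFactory()
	for _, gr := range []groupResource{{Resource: "pods"}, {Resource: "events"}} {
		s, err := newStore(f, gr)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> servers=%v prefix=%s\n", gr.Resource, s.opts.Servers, s.opts.ResourcePrefix)
	}
}
```

Hiding the factory behind an interface means the code that installs routes never needs to know whether a resource has its own etcd cluster.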

Authentication initialization

kube-apiserver supports several authentication methods:

  • Anonymous
  • BootstrapToken
  • ClientCert
  • OIDC
  • RequestHeader
  • ServiceAccount
  • TokenFile
  • WebHook

Initializing authentication mainly means initializing the genericConfig.Authentication field, which is defined as follows:

type AuthenticationInfo struct {
    // APIAudiences is a list of identifier that the API identifies as. This is
    // used by some authenticators to validate audience bound credentials.
    APIAudiences authenticator.Audiences
    // Authenticator determines which subject is making the request
    Authenticator authenticator.Request
}

The Authenticator field is the authenticator. Before kube-apiserver handles a request, the request must pass through the DefaultBuildHandlerChain handler chain; only after that can it actually turn into a call to etcd. Authentication is one link in that chain:

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
    ...
    failedHandler = filterlatency.TrackCompleted(failedHandler)
    handler = filterlatency.TrackCompleted(handler)
    handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
    handler = filterlatency.TrackStarted(handler, "authentication")
    ...
    return handler
}

func WithAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler, apiAuds authenticator.Audiences) http.Handler {
    return withAuthentication(handler, auth, failed, apiAuds, recordAuthMetrics)
}

func withAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler, apiAuds authenticator.Audiences, metrics recordMetrics) http.Handler {
    if auth == nil {
        klog.Warning("Authentication is disabled")
        return handler
    }
    return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
        authenticationStart := time.Now()

        if len(apiAuds) > 0 {
            req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds))
        }
        resp, ok, err := auth.AuthenticateRequest(req)
        ...
    })
}

As we can see, when the authentication step of the DefaultBuildHandlerChain chain authenticates a request, it ends up calling genericConfig.Authentication.Authenticator.AuthenticateRequest(req).
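One detail worth seeing in isolation is how wrapping order relates to execution order. In the excerpts in this article, WithAuthorization is applied directly to apiHandler and WithAuthentication is applied later, so authentication ends up outside and runs first. The sketch below shows that effect with two placeholder filters; withStep, buildChain and run are hypothetical helpers, not part of k8s.io/apiserver.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// withStep returns a handler that records name and then calls next.
func withStep(name string, trace *[]string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		*trace = append(*trace, name)
		next.ServeHTTP(w, r)
	})
}

// buildChain wraps the API handler from the inside out: the filter that is
// applied last becomes the outermost one and therefore runs first.
func buildChain(trace *[]string) http.Handler {
	var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		*trace = append(*trace, "api")
	})
	handler = withStep("authorization", trace, handler)
	handler = withStep("authentication", trace, handler)
	return handler
}

// run sends one request through the chain and returns the execution order.
func run() []string {
	var trace []string
	req := httptest.NewRequest(http.MethodGet, "/api/v1/pods", nil)
	buildChain(&trace).ServeHTTP(httptest.NewRecorder(), req)
	return trace
}

func main() {
	fmt.Println(run()) // prints [authentication authorization api]
}
```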

So let's see how Authenticator is initialized.

// pkg/kubeapiserver/options/authentication.go

func (o *BuiltInAuthenticationOptions) ApplyTo(...) error {
    ...
    authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
    ...
}

// pkg/kubeapiserver/authenticator/config.go

func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
    var authenticators []authenticator.Request
    var tokenAuthenticators []authenticator.Token
    securityDefinitions := spec.SecurityDefinitions{}

    // front-proxy, BasicAuth methods, local first, then remote
    // Add the front proxy authenticator if requested
    if config.RequestHeaderConfig != nil {
        requestHeaderAuthenticator := headerrequest.NewDynamicVerifyOptionsSecure(
            config.RequestHeaderConfig.CAContentProvider.VerifyOptions,
            config.RequestHeaderConfig.AllowedClientNames,
            config.RequestHeaderConfig.UsernameHeaders,
            config.RequestHeaderConfig.GroupHeaders,
            config.RequestHeaderConfig.ExtraHeaderPrefixes,
        )
        authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
    }

    // X509 methods
    if config.ClientCAContentProvider != nil {
        certAuth := x509.NewDynamic(config.ClientCAContentProvider.VerifyOptions, x509.CommonNameUserConversion)
        authenticators = append(authenticators, certAuth)
    }

    // Bearer token methods, local first, then remote
    if len(config.TokenAuthFile) > 0 {
        tokenAuth, err := newAuthenticatorFromTokenFile(config.TokenAuthFile)
        if err != nil {
            return nil, nil, err
        }
        tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, tokenAuth))
    }
    if len(config.ServiceAccountKeyFiles) > 0 {
        serviceAccountAuth, err := newLegacyServiceAccountAuthenticator(config.ServiceAccountKeyFiles, config.ServiceAccountLookup, config.APIAudiences, config.ServiceAccountTokenGetter)
        if err != nil {
            return nil, nil, err
        }
        tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
    }
    if len(config.ServiceAccountIssuers) > 0 {
        serviceAccountAuth, err := newServiceAccountAuthenticator(config.ServiceAccountIssuers, config.ServiceAccountKeyFiles, config.APIAudiences, config.ServiceAccountTokenGetter)
        if err != nil {
            return nil, nil, err
        }
        tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
    }
    if config.BootstrapToken {
        if config.BootstrapTokenAuthenticator != nil {
            // TODO: This can sometimes be nil because of
            tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, config.BootstrapTokenAuthenticator))
        }
    }

    ...

    if len(config.WebhookTokenAuthnConfigFile) > 0 {
        webhookTokenAuth, err := newWebhookTokenAuthenticator(config)
        if err != nil {
            return nil, nil, err
        }

        tokenAuthenticators = append(tokenAuthenticators, webhookTokenAuth)
    }

    if len(tokenAuthenticators) > 0 {
        // Union the token authenticators
        tokenAuth := tokenunion.New(tokenAuthenticators...)
        // Optionally cache authentication results
        if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
            tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
        }
        authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
        securityDefinitions["BearerToken"] = &spec.SecurityScheme{
            SecuritySchemeProps: spec.SecuritySchemeProps{
                Type:        "apiKey",
                Name:        "authorization",
                In:          "header",
                Description: "Bearer Token authentication",
            },
        }
    }

    if len(authenticators) == 0 {
        if config.Anonymous {
            return anonymous.NewAuthenticator(), &securityDefinitions, nil
        }
        return nil, &securityDefinitions, nil
    }

    authenticator := union.New(authenticators...)

    authenticator = group.NewAuthenticatedGroupAdder(authenticator)

    if config.Anonymous {
        // If the authenticator chain returns an error, return an error (don't consider a bad bearer token
        // or invalid username/password combination anonymous).
        authenticator = union.NewFailOnError(authenticator, anonymous.NewAuthenticator())
    }

    return authenticator, &securityDefinitions, nil
}

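The New function creates an authenticator for each of the authentication methods mentioned above and collects them in the authenticators slice. That slice is then used to create an instance of unionAuthRequestHandler: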

// A line from the New function above
authenticator := union.New(authenticators...)

type unionAuthRequestHandler struct {
    // Handlers is a chain of request authenticators to delegate to
    Handlers []authenticator.Request
    // FailOnError determines whether an error returns short-circuits the chain
    FailOnError bool
}

func New(authRequestHandlers ...authenticator.Request) authenticator.Request {
    if len(authRequestHandlers) == 1 {
        return authRequestHandlers[0]
    }
    return &unionAuthRequestHandler{Handlers: authRequestHandlers, FailOnError: false}
}

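When a request reaches the authentication step, the Handlers slice of unionAuthRequestHandler is traversed: the authenticators created above are tried one after another until one succeeds or all of them have failed. Control then returns to the filter installed by DefaultBuildHandlerChain. If authentication succeeded, the request continues to be processed; otherwise an authentication failure is returned to the user.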

func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
    var errlist []error
    for _, currAuthRequestHandler := range authHandler.Handlers {
        resp, ok, err := currAuthRequestHandler.AuthenticateRequest(req)
        if err != nil {
            if authHandler.FailOnError {
                return resp, ok, err
            }
            errlist = append(errlist, err)
            continue
        }

        if ok {
            return resp, ok, err
        }
    }

    return nil, false, utilerrors.NewAggregate(errlist)
}
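The effect of FailOnError is easier to see with toy authenticators. The sketch below reimplements the same loop over simplified, hypothetical types (request, authenticator, authFunc, unionAuthenticator, brokenAuth, staticToken); it uses a plain string for the user and a formatted error instead of utilerrors.NewAggregate.

```go
package main

import (
	"errors"
	"fmt"
)

// request is a hypothetical stand-in for *http.Request carrying only a token.
type request struct{ Token string }

// authenticator mirrors the (response, ok, error) shape used above.
type authenticator interface {
	Authenticate(r request) (user string, ok bool, err error)
}

// authFunc adapts a plain function to the authenticator interface.
type authFunc func(r request) (string, bool, error)

func (f authFunc) Authenticate(r request) (string, bool, error) { return f(r) }

// unionAuthenticator tries each handler in order until one succeeds.
type unionAuthenticator struct {
	handlers    []authenticator
	failOnError bool
}

func (u unionAuthenticator) Authenticate(r request) (string, bool, error) {
	var errs []error
	for _, h := range u.handlers {
		user, ok, err := h.Authenticate(r)
		if err != nil {
			if u.failOnError {
				return user, ok, err // short-circuit on the first error
			}
			errs = append(errs, err)
			continue
		}
		if ok {
			return user, true, nil
		}
	}
	if len(errs) > 0 {
		return "", false, fmt.Errorf("authentication errors: %v", errs)
	}
	return "", false, nil
}

// brokenAuth always fails with an error, like an unreachable backend.
var brokenAuth = authFunc(func(request) (string, bool, error) {
	return "", false, errors.New("backend unavailable")
})

// staticToken accepts exactly one token and maps it to user.
func staticToken(token, user string) authenticator {
	return authFunc(func(r request) (string, bool, error) {
		if r.Token == token {
			return user, true, nil
		}
		return "", false, nil
	})
}

func main() {
	handlers := []authenticator{brokenAuth, staticToken("abc", "alice")}

	lenient := unionAuthenticator{handlers: handlers}
	fmt.Println(lenient.Authenticate(request{Token: "abc"}))

	strict := unionAuthenticator{handlers: handlers, failOnError: true}
	fmt.Println(strict.Authenticate(request{Token: "abc"}))
}
```

With failOnError unset, an error from one authenticator does not stop later ones from succeeding; with it set, the first error ends the attempt.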

Authorization initialization

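Once authentication succeeds, the system treats the requester as a legitimate user. The next step is to check whether that requester has permission to access the target resource, which is authorization.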

Which authorization modes are enabled is determined by kube-apiserver startup flags. For example, the following enables the Node and RBAC modes:

--authorization-mode=Node,RBAC

During authorization initialization, the corresponding authorizers are created from the flags passed in:

genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
if err != nil {
    lastErr = fmt.Errorf("invalid authorization config: %v", err)
    return
}
if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
    genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
}

The authorizers are created as follows; to save space, only the Node and RBAC cases are listed.

func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
    if len(config.AuthorizationModes) == 0 {
        return nil, nil, fmt.Errorf("at least one authorization mode must be passed")
    }

    var (
        authorizers   []authorizer.Authorizer
        ruleResolvers []authorizer.RuleResolver
    )

    for _, authorizationMode := range config.AuthorizationModes {
        // Keep cases in sync with constant list in k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes/modes.go.
        switch authorizationMode {
        case modes.ModeNode:
            node.RegisterMetrics()
            graph := node.NewGraph()
            node.AddGraphEventHandlers(
                graph,
                config.VersionedInformerFactory.Core().V1().Nodes(),
                config.VersionedInformerFactory.Core().V1().Pods(),
                config.VersionedInformerFactory.Core().V1().PersistentVolumes(),
                config.VersionedInformerFactory.Storage().V1().VolumeAttachments(),
            )
            nodeAuthorizer := node.NewAuthorizer(graph, nodeidentifier.NewDefaultNodeIdentifier(), bootstrappolicy.NodeRules())
            authorizers = append(authorizers, nodeAuthorizer)
            ruleResolvers = append(ruleResolvers, nodeAuthorizer)

        ....

        case modes.ModeRBAC:
            rbacAuthorizer := rbac.New(
                &rbac.RoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().Roles().Lister()},
                &rbac.RoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().RoleBindings().Lister()},
                &rbac.ClusterRoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoles().Lister()},
                &rbac.ClusterRoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoleBindings().Lister()},
            )
            authorizers = append(authorizers, rbacAuthorizer)
            ruleResolvers = append(ruleResolvers, rbacAuthorizer)
        default:
            return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
        }
    }

    return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}

Since RBAC is the mode we use most often, let's look at how its authorizer is created.


type RBACAuthorizer struct {
    authorizationRuleResolver RequestToRuleMapper
}

func New(roles rbacregistryvalidation.RoleGetter, roleBindings rbacregistryvalidation.RoleBindingLister, clusterRoles rbacregistryvalidation.ClusterRoleGetter, clusterRoleBindings rbacregistryvalidation.ClusterRoleBindingLister) *RBACAuthorizer {
    authorizer := &RBACAuthorizer{
        authorizationRuleResolver: rbacregistryvalidation.NewDefaultRuleResolver(
            roles, roleBindings, clusterRoles, clusterRoleBindings,
        ),
    }
    return authorizer
}

func (r *RBACAuthorizer) Authorize(ctx context.Context, requestAttributes authorizer.Attributes) (authorizer.Decision, string, error) {
    ruleCheckingVisitor := &authorizingVisitor{requestAttributes: requestAttributes}

    r.authorizationRuleResolver.VisitRulesFor(requestAttributes.GetUser(), requestAttributes.GetNamespace(), ruleCheckingVisitor.visit)
    if ruleCheckingVisitor.allowed {
        return authorizer.DecisionAllow, ruleCheckingVisitor.reason, nil
    }

    ...
    // No rule allowed the request: give no opinion so the next authorizer can decide.
    return authorizer.DecisionNoOpinion, reason, nil
}

The RBAC authorizer is an RBACAuthorizer struct, which implements the Authorize method of the Authorizer interface.

type Authorizer interface {
    Authorize(ctx context.Context, a Attributes) (authorized Decision, reason string, err error)
}

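Just like authentication, authorization is part of the DefaultBuildHandlerChain chain that every incoming request goes through. DefaultBuildHandlerChain passes genericConfig's Authorizer to WithAuthorization, whose Authorize call ends up in the authorizers we created earlier.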

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
    handler := filterlatency.TrackCompleted(apiHandler)
    handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
    handler = filterlatency.TrackStarted(handler, "authorization")
    ...
    return handler
}

func WithAuthorization(handler http.Handler, a authorizer.Authorizer, s runtime.NegotiatedSerializer) http.Handler {
    ...
    return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
        ctx := req.Context()

        attributes, err := GetAuthorizerAttributes(ctx)
        if err != nil {
            responsewriters.InternalError(w, req, err)
            return
        }
        authorized, reason, err := a.Authorize(ctx, attributes)
        ...
    })
}

genericConfig.Authorization.Authorizer is actually of type unionAuthzHandler:

// vendor/k8s.io/apiserver/pkg/authorization/union/union.go

type unionAuthzHandler []authorizer.Authorizer

This type implements the Authorizer interface. When the chain built by DefaultBuildHandlerChain calls the Authorize method of genericConfig.Authorization.Authorizer, it is really calling unionAuthzHandler's Authorize:

// vendor/k8s.io/apiserver/pkg/authorization/union/union.go

func (authzHandler unionAuthzHandler) Authorize(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
    var (
        errlist    []error
        reasonlist []string
    )

    for _, currAuthzHandler := range authzHandler {
        decision, reason, err := currAuthzHandler.Authorize(ctx, a)

        if err != nil {
            errlist = append(errlist, err)
        }
        if len(reason) != 0 {
            reasonlist = append(reasonlist, reason)
        }
        switch decision {
        case authorizer.DecisionAllow, authorizer.DecisionDeny:
            return decision, reason, err
        case authorizer.DecisionNoOpinion:
            // continue to the next authorizer
        }
    }

    return authorizer.DecisionNoOpinion, strings.Join(reasonlist, "\n"), utilerrors.NewAggregate(errlist)
}

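As with authentication, unionAuthzHandler.Authorize iterates over all the authorizers created during initialization and returns as soon as one of the following happens:

  • an authorizer allows the request
  • an authorizer denies the request
  • every authorizer has been consulted and none of them gave an opinion

Only if authorization succeeds do the remaining functions in the DefaultBuildHandlerChain chain run.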
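The three outcomes can be reproduced with a small union over toy authorizers. This is a sketch with simplified, hypothetical types (decision, attributes, authorizerFunc, unionAuthorizer and the two example policies); it drops the context and error return that the real interface has.

```go
package main

import (
	"fmt"
	"strings"
)

// decision mirrors the three possible authorizer outcomes.
type decision int

const (
	decisionDeny decision = iota
	decisionAllow
	decisionNoOpinion
)

// attributes is a hypothetical, minimal description of a request.
type attributes struct{ User, Verb, Resource string }

type authorizer interface {
	Authorize(a attributes) (decision, string)
}

// authorizerFunc adapts a plain function to the authorizer interface.
type authorizerFunc func(a attributes) (decision, string)

func (f authorizerFunc) Authorize(a attributes) (decision, string) { return f(a) }

// unionAuthorizer asks each authorizer in turn and stops at the first
// Allow or Deny; NoOpinion moves on to the next one.
type unionAuthorizer []authorizer

func (u unionAuthorizer) Authorize(a attributes) (decision, string) {
	var reasons []string
	for _, h := range u {
		d, reason := h.Authorize(a)
		if reason != "" {
			reasons = append(reasons, reason)
		}
		if d == decisionAllow || d == decisionDeny {
			return d, reason
		}
	}
	return decisionNoOpinion, strings.Join(reasons, "\n")
}

// denyMallory is a made-up policy that explicitly denies one user.
var denyMallory = authorizerFunc(func(a attributes) (decision, string) {
	if a.User == "mallory" {
		return decisionDeny, "user is blocked"
	}
	return decisionNoOpinion, ""
})

// allowAliceGetPods is a made-up policy standing in for a matching role rule.
var allowAliceGetPods = authorizerFunc(func(a attributes) (decision, string) {
	if a.User == "alice" && a.Verb == "get" && a.Resource == "pods" {
		return decisionAllow, "matched rule"
	}
	return decisionNoOpinion, "no matching rule"
})

func newUnion() unionAuthorizer {
	return unionAuthorizer{denyMallory, allowAliceGetPods}
}

func main() {
	u := newUnion()
	for _, user := range []string{"alice", "bob", "mallory"} {
		d, reason := u.Authorize(attributes{User: user, Verb: "get", Resource: "pods"})
		fmt.Printf("%s: decision=%d reason=%q\n", user, d, reason)
	}
}
```

A final NoOpinion is not an Allow, so the caller of the union still has to reject such a request.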

For reasons of space, the remaining audit, admission, and flow control initialization will be covered in the next article.