kube-apiserver Startup Flow Analysis

Kubernetes version: v1.16

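We start with how kube-apiserver is launched. Like the other components, kube-apiserver runs its main logic through its Run method. Before Run is called, it parses the command-line flags, sets default values, and so on.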

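Startup flow

Resource registration

Cobra command-line flag parsing

Creating the generic APIServer configuration

Creating APIExtensionServer

Creating KubeAPIServer

Creating AggregatorServer

Creating GenericAPIServer

Starting the HTTP service

Starting the HTTPS service

Access control

Authentication

BasicAuth authentication

ClientCA authentication

TokenAuth authentication

Bootstrap Token authentication

RequestHeader authentication

Webhook TokenAuth authentication

Anonymous authentication

OIDC authentication

ServiceAccountAuth authentication

Authorization

AlwaysAllow authorization

AlwaysDeny authorization

ABAC authorization

Webhook authorization

RBAC authorization

Node authorization

Admission controllers

AlwaysPullImages admission controller

PodNodeSelector admission controller

Process signal handling

Implementing a long-running process

Graceful process shutdown

Reporting process status to systemd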


Run

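The main logic of Run is:

  • 1. Call CreateServerChain to build the server chain and decide whether to start the insecure HTTP server. The chain contains the three servers the apiserver runs, and each server registers the routes for its own resources;
  • 2. Call server.PrepareRun to prepare the server for running; this method mainly registers the health check, liveness check, and OpenAPI routes;
  • 3. Call prepared.Run to start the HTTPS server;

Server initialization uses the delegation pattern: through the DelegationTarget interface, the core API Server, the CustomResource server, and the Aggregator are connected into a chain and serve requests together. A request that one server cannot handle is passed on to its delegate.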

k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:147
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
    server, err := CreateServerChain(completeOptions, stopCh)
    if err != nil {
        return err
    }

    prepared, err := server.PrepareRun()
    if err != nil {
        return err
    }

    return prepared.Run(stopCh)
}
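The delegation chain described above can be modeled with plain net/http handlers. The sketch below is a simplified illustration, not the real DelegationTarget code: toyServer, buildChain, route, the X-Served-By header, and the path prefixes each server owns are all hypothetical. Each server answers the paths it owns and forwards everything else to its delegate, and the last link behaves like the empty delegate by returning 404.

```go
package main

import (
	"net/http"
	"net/http/httptest"
	"strings"
)

// toyServer is a hypothetical stand-in for a GenericAPIServer: it answers
// requests under the path prefixes it owns and hands everything else to its
// delegate, which is the essence of the DelegationTarget chain.
type toyServer struct {
	name     string
	prefixes []string
	delegate http.Handler
}

func (s *toyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	for _, p := range s.prefixes {
		if strings.HasPrefix(r.URL.Path, p) {
			w.Header().Set("X-Served-By", s.name)
			w.WriteHeader(http.StatusOK)
			return
		}
	}
	// Not ours: pass the request down the chain.
	s.delegate.ServeHTTP(w, r)
}

// buildChain wires the servers in the same order as CreateServerChain:
// aggregator -> kube-apiserver -> apiextensions server -> empty delegate (404).
// The prefixes are illustrative only.
func buildChain() http.Handler {
	empty := http.NotFoundHandler() // plays the role of NewEmptyDelegate()
	apiExtensions := &toyServer{"apiextensions-apiserver", []string{"/apis/apiextensions.k8s.io/"}, empty}
	kubeAPIServer := &toyServer{"kube-apiserver", []string{"/api/", "/apis/apps/"}, apiExtensions}
	return &toyServer{"kube-aggregator", []string{"/apis/apiregistration.k8s.io/"}, kubeAPIServer}
}

// route sends a GET for path through h and reports the status code and the
// name of the server that answered ("" if none did).
func route(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Header().Get("X-Served-By")
}
```

Note that the chain is built back to front: the innermost server has to exist before it can be handed to the next one as a delegate, which is why CreateServerChain creates the APIExtensionsServer first.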

CreateServerChain

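CreateServerChain initializes the servers. It contains the whole initialization flow of APIExtensionsServer, KubeAPIServer, and AggregatorServer, and finally returns an aggregatorapiserver.APIAggregator instance. Initialization mainly covers configuring the HTTP filter chain, registering the API groups, associating HTTP paths with handlers, and configuring etcd as the backend storage behind the handlers. Its main logic is:

  • 1. Call CreateKubeAPIServerConfig to create the configuration KubeAPIServer needs, mainly a master.Config; along the way it calls buildGenericConfig to generate genericConfig, which holds the core apiserver configuration;
  • 2. Call createAPIExtensionsConfig to create the configuration for the APIExtensionsServer. The APIExtensionsServer serves CustomResourceDefinition (CRD) objects and the custom resources they define; proxying requests to separately deployed extension API servers such as metrics-server is the job of the AggregatorServer;
  • 3. Call createAPIExtensionsServer to create the apiExtensionsServer instance;
  • 4. Call CreateKubeAPIServer to initialize kubeAPIServer;
  • 5. Call createAggregatorConfig to create the configuration for aggregatorServer, then call createAggregatorServer to initialize it;
  • 6. Configure the insecure HTTP server and decide whether to start it;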
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:165
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
    nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
    if err != nil {
        return nil, err
    }
    // 1. Create the configuration for kubeAPIServer
    kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
    if err != nil {
        return nil, err
    }

    // 2. Create apiExtensionsConfig for the APIExtensionsServer
    apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
        serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
    if err != nil {
        return nil, err
    }
    
    // 3. Initialize APIExtensionsServer; its delegate is an empty delegate
    apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
    if err != nil {
        return nil, err
    }

    // 4. Initialize KubeAPIServer
    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
    if err != nil {
        return nil, err
    }
    
    // 5. Create AggregatorConfig
    aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
    if err != nil {
        return nil, err
    }
    
    // 6. Initialize AggregatorServer
    aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
    if err != nil {
        return nil, err
    }

    // 7. Decide whether to start the HTTP server on the insecure port
    if insecureServingInfo != nil {
        insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
        if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
            return nil, err
        }
    }
    return aggregatorServer, nil
}
CreateKubeAPIServerConfig

CreateKubeAPIServerConfig mainly calls buildGenericConfig to create genericConfig and then builds the master.Config object.

k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:271
func CreateKubeAPIServerConfig(
    s completedServerRunOptions,
    nodeTunneler tunneler.Tunneler,
    proxyTransport *http.Transport,
) (......) {

    // 1. Build genericConfig
    genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, lastErr = buildGenericConfig(s.ServerRunOptions, proxyTransport)
    if lastErr != nil {
        return
    }

    ......

    // 2. Initialize the supported capabilities
    capabilities.Initialize(capabilities.Capabilities{
        AllowPrivileged: s.AllowPrivileged,
        PrivilegedSources: capabilities.PrivilegedSources{
            HostNetworkSources: []string{},
            HostPIDSources:     []string{},
            HostIPCSources:     []string{},
        },
        PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
    })

    // 3. Get the service IP range and the apiserver service IP
    serviceIPRange, apiServerServiceIP, lastErr := master.DefaultServiceIPRange(s.PrimaryServiceClusterIPRange)
    if lastErr != nil {
        return
    }

    ......

    // 4. Build the master.Config object
    config = &master.Config{......}
    
    if nodeTunneler != nil {
        config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial
    }
    if config.GenericConfig.EgressSelector != nil {
        config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
    }

    return
}
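Step 3 above derives the apiserver service IP from the service CIDR. To my understanding, master.DefaultServiceIPRange picks the address at index 1 of the range (network address + 1) as the ClusterIP of the kubernetes service. The following sketch reproduces only that arithmetic for IPv4 with the standard library; firstServiceIP is a hypothetical name, and defaulting and IPv6 handling are left out.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

// firstServiceIP is a hypothetical helper: it returns the address at index 1
// of an IPv4 CIDR, i.e. network address + 1.
func firstServiceIP(cidr string) (net.IP, error) {
	_, ipNet, err := net.ParseCIDR(cidr)
	if err != nil {
		return nil, err
	}
	base := ipNet.IP.To4()
	if base == nil {
		return nil, fmt.Errorf("%s is not an IPv4 range", cidr)
	}
	// Need at least a network address plus one usable address.
	ones, bits := ipNet.Mask.Size()
	if bits-ones < 2 {
		return nil, fmt.Errorf("%s is too small to hold a service IP", cidr)
	}
	n := binary.BigEndian.Uint32(base) + 1
	ip := make(net.IP, 4)
	binary.BigEndian.PutUint32(ip, n)
	return ip, nil
}
```

For example, with a service CIDR of 10.96.0.0/12 this yields 10.96.0.1.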
buildGenericConfig

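Main logic:

  • 1. Call genericapiserver.NewConfig to generate a default genericConfig. Its most important default is BuildHandlerChainFunc, which is set to DefaultBuildHandlerChain; DefaultBuildHandlerChain assembles the HTTP filter chain, including authentication, authorization, and other filters;
  • 2. Call master.DefaultAPIResourceConfigSource to load the API resources to enable. All API resources in the cluster can be found under the k8s.io/api directory of the code base, and the set keeps changing as versions evolve;
  • 3. Set default values for some fields of genericConfig;
  • 4. Call completedStorageFactoryConfig.New to create storageFactory, which is later used to create the RESTStorage for each API resource;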
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:386
func buildGenericConfig(
    s *options.ServerRunOptions,
    proxyTransport *http.Transport,
) (......) {
    // 1. Set default values for genericConfig
    genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
    genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()

    if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
        return
    }
    ......

    genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(......)
    genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
    genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
        sets.NewString("watch", "proxy"),
        sets.NewString("attach", "exec", "proxy", "log", "portforward"),
    )

    kubeVersion := version.Get()
    genericConfig.Version = &kubeVersion

    storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
    storageFactoryConfig.ApiResourceConfig = genericConfig.MergedResourceConfig
    completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
    if err != nil {
        lastErr = err
        return
    }
    // Initialize storageFactory
    storageFactory, lastErr = completedStorageFactoryConfig.New()
    if lastErr != nil {
        return
    }
    if genericConfig.EgressSelector != nil {
        storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
    }
    
    // 2. Initialize RESTOptionsGetter, which is later used to obtain handles for operating on etcd; also add the etcd health check
    if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
        return
    }

    // 3. Use protobuf for loopback (internal) communication and disable compression
    genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"

    genericConfig.LoopbackClientConfig.DisableCompression = true

    // 4. Create the clientset
    kubeClientConfig := genericConfig.LoopbackClientConfig
    clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
    if err != nil {
        lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
        return
    }
    versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)

    // 5. Create the authenticator; multiple authentication methods are supported: request header, auth file, CA certificate, bearer token,
    // ServiceAccount, BootstrapToken, WebhookToken, and so on
    genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, clientgoExternalClient, versionedInformers)
    if err != nil {
        lastErr = fmt.Errorf("invalid authentication config: %v", err)
        return
    }

    // 6. Create the authorizer, supporting Node, RBAC, Webhook, ABAC, AlwaysAllow, and AlwaysDeny
    genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, versionedInformers)
    ......

    serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)

    authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.LoopbackClientConfig)

    // 7. Initialize the audit plugins
    lastErr = s.Audit.ApplyTo(......)
    if lastErr != nil {
        return
    }

    // 8. Initialize the admission plugins
    pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, serviceResolver)
    if err != nil {
        lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
        return
    }
    err = s.Admission.ApplyTo(......)
    if err != nil {
        lastErr = fmt.Errorf("failed to initialize admission: %v", err)
    }

    return
}
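The LongRunningFunc set above comes from filters.BasicLongRunningRequestCheck, which takes a set of long-running verbs (watch, proxy) and a set of long-running subresources (attach, exec, proxy, log, portforward); requests it matches are exempted from the normal request timeout. A sketch of the predicate, assuming the verb and subresource have already been parsed from the URL (the real code reads them from a RequestInfo and, I believe, also treats /debug/pprof/ paths as long-running); requestInfo, setOf, and longRunningCheck are hypothetical names.

```go
package main

// requestInfo is a hypothetical, trimmed-down version of the parsed request
// metadata the apiserver attaches to each request.
type requestInfo struct {
	IsResourceRequest bool
	Verb              string
	Subresource       string
}

// setOf builds a simple string set.
func setOf(items ...string) map[string]bool {
	s := make(map[string]bool, len(items))
	for _, it := range items {
		s[it] = true
	}
	return s
}

// longRunningCheck mirrors the shape of BasicLongRunningRequestCheck: a request
// is long-running if its verb is in one set, or if it is a resource request
// whose subresource is in the other set.
func longRunningCheck(verbs, subresources map[string]bool) func(requestInfo) bool {
	return func(ri requestInfo) bool {
		if verbs[ri.Verb] {
			return true
		}
		return ri.IsResourceRequest && subresources[ri.Subresource]
	}
}
```

Usage mirrors the call in buildGenericConfig: `longRunningCheck(setOf("watch", "proxy"), setOf("attach", "exec", "proxy", "log", "portforward"))`.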

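The above mainly covers the initialization of KubeAPIServerConfig; the initialization of the other two server configs is not analyzed in detail here. Next we continue with the initialization of the servers themselves.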

createAPIExtensionsServer

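APIExtensionsServer is initialized first. createAPIExtensionsServer calls apiextensionsConfig.Complete().New to initialize the server. Its main logic is:

  • 1. First call c.GenericConfig.New to initialize a Container in the go-restful style. Inside c.GenericConfig.New, NewAPIServerHandler is called to initialize the handler. APIServerHandler holds the several http.Handler types the API server uses: a go-restful container (GoRestfulContainer), a non-go-restful mux (NonGoRestfulMux), and a Director that chooses between the two. Paths that match a WebService registered in go-restful are handled by GoRestfulContainer; all other paths fall through to NonGoRestfulMux. An API request therefore travels FullHandlerChain -> Director -> {GoRestfulContainer, NonGoRestfulMux}. c.GenericConfig.New also calls installAPI to add routes such as /, /debug/*, /metrics, and /version. All three servers first call c.GenericConfig.New to create a genericServer and then register their APIs;
  • 2. Call s.GenericAPIServer.InstallAPIGroup to register the API resources in the routes. The call chain of this method is very deep; its purpose is to register the API resources to be exposed into the server so that REST operations on them can be performed over HTTP. The other servers likewise call their corresponding InstallAPI methods during initialization;
  • 3. Initialize the controllers the server uses, mainly openapiController, crdController, namingController, establishingController, nonStructuralSchemaController, apiApprovalController, and finalizingController;
  • 4. Register the controllers and informers that need to be started as PostStartHooks;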
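The Director's choice between the two handlers can be sketched as follows. This is a simplified model written from my reading of the v1.16 code, which iterates over the WebServices registered in GoRestfulContainer; director and rootPaths are hypothetical names. A path goes to go-restful on an exact match or a match at a path-segment boundary of a registered root path, /apis is special-cased so that it does not swallow every named group, and everything else falls through to NonGoRestfulMux.

```go
package main

import "strings"

// director is a simplified model of how the apiserver's Director picks a
// handler. rootPaths stands in for the root paths of the WebServices
// registered in the go-restful container.
func director(path string, rootPaths []string) string {
	for _, root := range rootPaths {
		if root == "/apis" {
			// "/apis" would prefix-match every named group, so only the exact
			// discovery paths are routed to go-restful.
			if path == "/apis" || path == "/apis/" {
				return "GoRestfulContainer"
			}
			continue
		}
		// Exact match, or a prefix match that ends on a path-segment boundary.
		if strings.HasPrefix(path, root) && (len(path) == len(root) || path[len(root)] == '/') {
			return "GoRestfulContainer"
		}
	}
	return "NonGoRestfulMux"
}
```

The boundary check is what keeps a path such as /apiserver from being claimed by the /api WebService, and the /apis special case is why CRD groups, which have no go-restful WebService of their own, end up at the crdHandler registered on NonGoRestfulMux.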
k8s.io/kubernetes/cmd/kube-apiserver/app/apiextensions.go:94
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
    return apiextensionsConfig.Complete().New(delegateAPIServer)
}
k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go:132
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
    // 1. Initialize genericServer
    genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
    if err != nil {
        return nil, err
    }

    s := &CustomResourceDefinitions{
        GenericAPIServer: genericServer,
    }

    // 2. Initialize APIGroupInfo; the APIGroup here is the API this server exposes
    apiResourceConfig := c.GenericConfig.MergedResourceConfig
    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
    if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
        storage := map[string]rest.Storage{}
        customResourceDefintionStorage := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
        storage["customresourcedefinitions"] = customResourceDefintionStorage
        storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)

        apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
    }
    if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
        ......
    }

    // 3. Register the APIGroup
    if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
        return nil, err
    }

    // 4. Initialize the controllers that will be used
    crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
    if err != nil {
        return nil, fmt.Errorf("failed to create clientset: %v", err)
    }
    s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)

    ......
    establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
    crdHandler, err := NewCustomResourceDefinitionHandler(......)
    if err != nil {
        return nil, err
    }
    s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
    s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

    crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
    namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
    nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
    apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
    finalizingController := finalizer.NewCRDFinalizer(
        s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
        crdClient.Apiextensions(),
        crdHandler,
    )
    var openapiController *openapicontroller.Controller
    if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
        openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions())
    }

    // 5. Register the informers and controllers as PostStartHooks
    s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
        s.Informers.Start(context.StopCh)
        return nil
    })
    s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
        ......
        go crdController.Run(context.StopCh)
        go namingController.Run(context.StopCh)
        go establishingController.Run(context.StopCh)
        go nonStructuralSchemaController.Run(5, context.StopCh)
        go apiApprovalController.Run(5, context.StopCh)
        go finalizingController.Run(5, context.StopCh)
        return nil
    })

    s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
        return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
            return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil
        }, context.StopCh)
    })

    return s, nil
}

That is the initialization flow of APIExtensionsServer. Its core method is s.GenericAPIServer.InstallAPIGroup, i.e. the API registration process; API registration is the core step in all three servers.

CreateKubeAPIServer

This section continues with the initialization of KubeAPIServer. CreateKubeAPIServer calls kubeAPIServerConfig.Complete().New to perform the initialization.

kubeAPIServerConfig.Complete().New

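Main logic:

  • 1. Call c.GenericConfig.New to initialize GenericAPIServer; its main implementation was analyzed above;
  • 2. Check whether log-related routes are enabled; if so, add the /logs route;
  • 3. Call m.InstallLegacyAPI to add the core API resources to the routes; in the apiserver these are the resources under /api;
  • 4. Call m.InstallAPIs to add the other (named-group) API resources to the routes; in the apiserver these are the resources under /apis;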
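The difference between steps 3 and 4 shows up in the URL prefix: the core group has an empty group name and is served under /api/<version>, while named groups are served under /apis/<group>/<version>. The sketch below is a hypothetical helper that derives top-level resource URLs from a storage map shaped like APIGroupInfo.VersionedResourcesStorageMap (version -> resource -> storage); it ignores namespaced paths and subresources, and the storage values are replaced by empty structs.

```go
package main

import "sort"

// resourcePaths lists the top-level collection URLs a storage map would
// expose. The legacy core group (empty name) lives under /api; named groups
// live under /apis/<group>. storage maps version -> resource -> storage.
func resourcePaths(group string, storage map[string]map[string]struct{}) []string {
	prefix := "/apis/" + group
	if group == "" {
		prefix = "/api"
	}
	var paths []string
	for version, resources := range storage {
		for resource := range resources {
			paths = append(paths, prefix+"/"+version+"/"+resource)
		}
	}
	sort.Strings(paths) // map iteration order is random
	return paths
}
```

For example, `resourcePaths("apps", ...)` with a v1 entry for deployments yields /apis/apps/v1/deployments, while the core group's pods end up at /api/v1/pods.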
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:214
func CreateKubeAPIServer(......) (*master.Master, error) {
    kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
    if err != nil {
        return nil, err
    }

    kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)

    return kubeAPIServer, nil
}
k8s.io/kubernetes/pkg/master/master.go:325
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
    ......
    // 1. Initialize GenericAPIServer
    s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
    if err != nil {
        return nil, err
    }

    // 2. Register log-related routes
    if c.ExtraConfig.EnableLogsSupport {
        routes.Logs{}.Install(s.Handler.GoRestfulContainer)
    }

    m := &Master{
        GenericAPIServer: s,
    }
    
    // 3. Install LegacyAPI
    if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
        legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
            StorageFactory:              c.ExtraConfig.StorageFactory,
            ProxyTransport:              c.ExtraConfig.ProxyTransport,
            ......
        }
        if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
            return nil, err
        }
    }
    restStorageProviders := []RESTStorageProvider{
        auditregistrationrest.RESTStorageProvider{},
        authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
        ......
    }
    // 4. Install APIs
    if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
        return nil, err
    }

    if c.ExtraConfig.Tunneler != nil {
        m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
    }

    m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)

    return m, nil
}
m.InstallLegacyAPI

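This method registers the core API in the routes and is one of the most central methods in apiserver initialization. Its call chain is very deep and is analyzed in depth below. The ultimate purpose of registering an API in the routes is to expose a RESTful API for operating on the corresponding resources. Registration takes two steps: first, initialize a RESTStorage for each resource in the API, through which changes to data in the backend storage are made; second, build routes for each resource according to its verbs. The main logic of m.InstallLegacyAPI is:

  • 1. Call legacyRESTStorageProvider.NewLegacyRESTStorage to create a RESTStorage for each resource in LegacyAPI; a RESTStorage ties a resource's access path to the operations on its backend storage;
  • 2. Initialize bootstrap-controller and register it as a PostStartHook (and a PreShutdownHook). bootstrap-controller is a controller inside the apiserver whose main job is to create the namespaces the system needs and the kubernetes service, and to trigger the corresponding sync periodically. After the apiserver starts, it starts bootstrap-controller by running the PostStartHooks;
  • 3. After creating the RESTStorage for the resources, call m.GenericAPIServer.InstallLegacyAPIGroup to register the routes for the APIGroup. The call chain of InstallLegacyAPIGroup is very deep, mainly InstallLegacyAPIGroup --> installAPIResources --> InstallREST --> Install --> registerResourceHandlers, and the core route construction happens in registerResourceHandlers. That method is fairly complex: it checks which rest interfaces (such as rest.Creater and rest.Updater) the storage built in the previous step implements to decide which operations the resource supports (create, update, and so on), and records each supported operation as an action. Each action corresponds to a standard REST operation; for example, create maps to POST and update maps to PUT. Finally it iterates over the actions array, adds a handler for each operation and registers it as a route; the routes are added to a WebService, and the WebService is finally added to the Container, following the go-restful design;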
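The interface probing done in registerResourceHandlers can be illustrated with Go type assertions. In the sketch below the interfaces are hypothetical, trimmed-down stand-ins for rest.Getter, rest.Creater, rest.Updater, and rest.GracefulDeleter (with simplified signatures), and actionsFor is an invented name; it only shows how a storage's method set decides which verbs, and therefore which HTTP methods, get routes.

```go
package main

// Hypothetical, trimmed-down stand-ins for the rest.* storage interfaces.
type Getter interface{ Get(name string) (string, error) }
type Creater interface{ Create(obj string) error }
type Updater interface{ Update(name, obj string) error }
type Deleter interface{ Delete(name string) error }

type action struct {
	Verb   string // REST verb
	Method string // HTTP method the route is registered with
}

// actionsFor probes the storage with type assertions and emits one action
// per supported operation, in the spirit of registerResourceHandlers.
func actionsFor(storage interface{}) []action {
	var actions []action
	if _, ok := storage.(Getter); ok {
		actions = append(actions, action{"get", "GET"})
	}
	if _, ok := storage.(Creater); ok {
		actions = append(actions, action{"create", "POST"})
	}
	if _, ok := storage.(Updater); ok {
		actions = append(actions, action{"update", "PUT"})
	}
	if _, ok := storage.(Deleter); ok {
		actions = append(actions, action{"delete", "DELETE"})
	}
	return actions
}

// readOnlyStore supports only Get.
type readOnlyStore struct{}

func (readOnlyStore) Get(name string) (string, error) { return name, nil }

// fullStore gets Get from readOnlyStore and adds Create, Update and Delete.
type fullStore struct{ readOnlyStore }

func (fullStore) Create(obj string) error       { return nil }
func (fullStore) Update(name, obj string) error { return nil }
func (fullStore) Delete(name string) error      { return nil }
```

The design keeps storages small: a read-only resource simply does not implement Creater, and no POST route is ever registered for it.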

legacyRESTStorageProvider.NewLegacyRESTStorage and m.GenericAPIServer.InstallLegacyAPIGroup are explained in detail later.

k8s.io/kubernetes/pkg/master/master.go:406
func (m *Master) InstallLegacyAPI(......) error {
    legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    if err != nil {
        return fmt.Errorf("Error building core storage: %v", err)
    }

    controllerName := "bootstrap-controller"
    coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
    bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
    m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
    m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)

    if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
        return fmt.Errorf("Error in registering group versions: %v", err)
    }
    return nil
}

The main flows of InstallAPIs and InstallLegacyAPI are similar, so for reasons of space they are not analyzed further here.

createAggregatorServer

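AggregatorServer aggregates extension API servers: requests for the API groups declared by APIService objects are proxied to the backend services those objects point to. It also runs controllers that automatically register APIService objects for the built-in API groups and for the groups defined by CRDs.

Main logic:

  • 1. Call aggregatorConfig.Complete().NewWithDelegate to create aggregatorServer;
  • 2. Initialize crdRegistrationController and autoRegistrationController. crdRegistrationController watches CRDs and passes the group/versions they define to autoRegistrationController, which creates and maintains the corresponding APIService objects in the apiserver. After a CRD is created, you can check whether it has been registered with $ kubectl get apiservices;
  • 3. Register autoRegistrationController and crdRegistrationController as a PostStartHook;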
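Conceptually, every group/version served by CRDs needs one APIService object. The sketch below shows that mapping, assuming the <version>.<group> naming visible in kubectl get apiservices (for example v1beta1.apiextensions.k8s.io); crd and apiServiceNames are hypothetical names, and the real controllers also set fields such as group and version priorities that are omitted here.

```go
package main

import "sort"

// crd is a hypothetical stand-in carrying only the group and versions.
type crd struct {
	Group    string
	Versions []string
}

// apiServiceNames returns the APIService names needed for the given CRDs.
// Several CRDs in the same group/version share a single APIService.
func apiServiceNames(crds []crd) []string {
	seen := map[string]bool{}
	for _, c := range crds {
		for _, v := range c.Versions {
			seen[v+"."+c.Group] = true
		}
	}
	names := make([]string, 0, len(seen))
	for n := range seen {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}
```

Deduplication matters because a group usually holds many CRDs, yet the aggregator only needs to know once that the whole group/version is served locally.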
k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go:124
func createAggregatorServer(......) (*aggregatorapiserver.APIAggregator, error) {
    // 1. Initialize aggregatorServer
    aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
    if err != nil {
        return nil, err
    }

    // 2. Initialize the auto-registration controllers
    apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
    if err != nil {
        return nil, err
    }
    autoRegistrationController := autoregister.NewAutoRegisterController(......)
    apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
    crdRegistrationController := crdregistration.NewCRDRegistrationController(......)
    err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
        go crdRegistrationController.Run(5, context.StopCh)
        go func() {
            if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
                crdRegistrationController.WaitForInitialSync()
            }
            autoRegistrationController.Run(5, context.StopCh)
        }()
        return nil
    })
    if err != nil {
        return nil, err
    }

    err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
        makeAPIServiceAvailableHealthCheck(
            "autoregister-completion",
            apiServices,
            aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
        ),
    )
    if err != nil {
        return nil, err
    }

    return aggregatorServer, nil
}
aggregatorConfig.Complete().NewWithDelegate

aggregatorConfig.Complete().NewWithDelegate is the method that initializes aggregatorServer. Its main logic is:

  • 1. Call c.GenericConfig.New to initialize GenericAPIServer; its main internals were analyzed above;
  • 2. Call apiservicerest.NewRESTStorage to create the RESTStorage for APIService resources, tying each resource's access path to the operations on its backend storage;
  • 3. Call s.GenericAPIServer.InstallAPIGroup to register the routes for the APIGroup;
k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:158
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
    openAPIConfig := c.GenericConfig.OpenAPIConfig
    c.GenericConfig.OpenAPIConfig = nil
    // 1. Initialize genericServer
    genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
    if err != nil {
        return nil, err
    }

    apiregistrationClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)
    if err != nil {
        return nil, err
    }
    informerFactory := informers.NewSharedInformerFactory(
        apiregistrationClient,
        5*time.Minute, 
    )
    s := &APIAggregator{
        GenericAPIServer: genericServer,
        delegateHandler: delegationTarget.UnprotectedHandler(),
        ......
    }

    // 2. Register routes for the API
    apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
    if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
        return nil, err
    }

    // 3. Initialize apiserviceRegistrationController and availableController
    apisHandler := &apisHandler{
        codecs: aggregatorscheme.Codecs,
        lister: s.lister,
    }
    s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
    s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
    apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
    availableController, err := statuscontrollers.NewAvailableConditionController(
       ......
    )
    if err != nil {
        return nil, err
    }

    // 4. Add PostStartHooks
    s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
        informerFactory.Start(context.StopCh)
        c.GenericConfig.SharedInformerFactory.Start(context.StopCh)
        return nil
    })
    s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
        go apiserviceRegistrationController.Run(context.StopCh)
        return nil
    })
    s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
        go availableController.Run(5, context.StopCh)
        return nil
    })

    return s, nil
}

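That concludes the analysis of AggregatorServer initialization. APIExtensionsServer, KubeAPIServer, and AggregatorServer all follow the same pattern: first call c.GenericConfig.New to initialize a Container in the go-restful style, then create RESTStorage for the resources the server needs to register, and finally register the resources' APIGroup information in the routes.

This completes the walkthrough of CreateServerChain. Its call chain is: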

                    |--> CreateNodeDialer
                    |
                    |--> CreateKubeAPIServerConfig
                    |
CreateServerChain --|--> createAPIExtensionsConfig
                    |
                    |                                                                       |--> c.GenericConfig.New
                    |--> createAPIExtensionsServer --> apiextensionsConfig.Complete().New --|
                    |                                                                       |--> s.GenericAPIServer.InstallAPIGroup
                    |
                    |                                                                 |--> c.GenericConfig.New
                    |                                                                 |
                    |--> CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --|--> m.InstallLegacyAPI --> legacyRESTStorageProvider.NewLegacyRESTStorage
                    |                                                                 |
                    |                                                                 |--> m.InstallAPIs
                    |
                    |
                    |--> createAggregatorConfig
                    |
                    |                                                                             |--> c.GenericConfig.New
                    |                                                                             |
                    |--> createAggregatorServer --> aggregatorConfig.Complete().NewWithDelegate --|--> apiservicerest.NewRESTStorage
                                                                                                  |
                                                                                                  |--> s.GenericAPIServer.InstallAPIGroup

prepared.Run

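Run first calls CreateServerChain to initialize the servers, then calls server.PrepareRun to prepare for startup, and finally calls prepared.Run to start the secure (HTTPS) server. server.PrepareRun mainly registers the health check, liveness check, and OpenAPI routes. Below we continue with prepared.Run, which mainly calls s.NonBlockingRun to do the startup work.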

k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:269
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
    return s.runnable.Run(stopCh)
}
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:316
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
    delayedStopCh := make(chan struct{})

    go func() {
        defer close(delayedStopCh)
        <-stopCh

        time.Sleep(s.ShutdownDelayDuration)
    }()

    // Call s.NonBlockingRun to perform the startup
    err := s.NonBlockingRun(delayedStopCh)
    if err != nil {
        return err
    }

    // After the stop signal is received, do the cleanup work
    <-stopCh
    err = s.RunPreShutdownHooks()
    if err != nil {
        return err
    }

    <-delayedStopCh
    s.HandlerChainWaitGroup.Wait()
    return nil
}
s.NonBlockingRun

The main logic of s.NonBlockingRun is:

  • 1. Decide whether to start the audit log backend;
  • 2. Call s.SecureServingInfo.Serve to configure and start the HTTPS server;
  • 3. Run the postStartHooks;
  • 4. Send the ready signal to systemd;
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:351
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
    auditStopCh := make(chan struct{})

    // 1. Decide whether to start the audit backend
    if s.AuditBackend != nil {
        if err := s.AuditBackend.Run(auditStopCh); err != nil {
            return fmt.Errorf("failed to run the audit backend: %v", err)
        }
    }

    // 2. Start the https server
    internalStopCh := make(chan struct{})
    var stoppedCh <-chan struct{}
    if s.SecureServingInfo != nil && s.Handler != nil {
        var err error
        stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
        if err != nil {
            close(internalStopCh)
            close(auditStopCh)
            return err
        }
    }

    go func() {
        <-stopCh
        close(s.readinessStopCh)
        close(internalStopCh)
        if stoppedCh != nil {
            <-stoppedCh
        }
        s.HandlerChainWaitGroup.Wait()
        close(auditStopCh)
    }()

    // 3. Run the postStartHooks
    s.RunPostStartHooks(stopCh)

    // 4. Send the ready signal to systemd
    if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
        klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
    }

    return nil
}

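That concludes the analysis of server initialization and startup. As noted above, the most important parts of each server's initialization are creating the RESTStorage for the API resources and registering the routes; because that process is fairly complex, it is covered separately below.

Building the storageFactory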

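As mentioned above, the backend data behind the handlers the apiserver implements is accessed through the Store structure. Taking the routes under /api as an example, the NewLegacyRESTStorage method creates the RESTStorage for each resource. For most resources, the RESTStorage wraps a genericregistry.Store, a struct defined in k8s.io/apiserver/pkg/registry/generic/registry/store.go. Its important fields include NewFunc (returns a new object of the resource), NewListFunc (returns a new list of the resource), CreateStrategy (the strategy applied on create), UpdateStrategy (the strategy applied on update), and DeleteStrategy (the strategy applied on delete). Inside NewLegacyRESTStorage you can see RESTStorage being created for many resources.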
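The division of labor between the generic Store and the per-resource strategies can be illustrated with an in-memory toy. In the sketch below a map replaces etcd, and object, createStrategy, store, and podStrategy are hypothetical, heavily trimmed stand-ins for the real types: the store owns the generic create/get logic, while the strategy supplies resource-specific preparation and validation, similar in spirit to how a Store calls its CreateStrategy.

```go
package main

import (
	"errors"
	"fmt"
)

// object is a hypothetical minimal API object.
type object struct {
	Name   string
	Status string
}

// createStrategy is a trimmed-down stand-in for a per-resource create strategy.
type createStrategy interface {
	PrepareForCreate(obj *object)
	Validate(obj *object) error
}

// store models the idea of genericregistry.Store: generic CRUD logic plus
// resource-specific hooks. A map replaces etcd here.
type store struct {
	NewFunc        func() *object
	CreateStrategy createStrategy
	data           map[string]object
}

// Create runs the strategy hooks, then persists the object.
func (s *store) Create(obj *object) error {
	s.CreateStrategy.PrepareForCreate(obj)
	if err := s.CreateStrategy.Validate(obj); err != nil {
		return fmt.Errorf("validation failed: %w", err)
	}
	if _, exists := s.data[obj.Name]; exists {
		return fmt.Errorf("%q already exists", obj.Name)
	}
	s.data[obj.Name] = *obj
	return nil
}

// Get returns a fresh object built with NewFunc and filled from storage.
func (s *store) Get(name string) (*object, error) {
	stored, ok := s.data[name]
	if !ok {
		return nil, fmt.Errorf("%q not found", name)
	}
	out := s.NewFunc()
	*out = stored
	return out, nil
}

// podStrategy is a toy strategy: status set by the client is reset on create,
// and a name is required.
type podStrategy struct{}

func (podStrategy) PrepareForCreate(obj *object) { obj.Status = "" }
func (podStrategy) Validate(obj *object) error {
	if obj.Name == "" {
		return errors.New("name is required")
	}
	return nil
}
```

Swapping podStrategy for another strategy changes the resource-specific rules without touching the generic Create/Get code, which is why one Store type can back so many resources.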

The call chain of NewLegacyRESTStorage is CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --> m.InstallLegacyAPI --> legacyRESTStorageProvider.NewLegacyRESTStorage.

NewLegacyRESTStorage

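Every resource in an API group has a REST implementation. Each group under k8s.io/kubernetes/pkg/registry has a rest directory that contains the code building the RESTStorage for that group's resources. In NewLegacyRESTStorage, NewREST or NewStorage produces the Storage for each resource. Pod is used as the example here.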

k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:102
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
    apiGroupInfo := genericapiserver.APIGroupInfo{
        PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
        VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
        Scheme:                       legacyscheme.Scheme,
        ParameterCodec:               legacyscheme.ParameterCodec,
        NegotiatedSerializer:         legacyscheme.Codecs,
    }

    var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
    if policyGroupVersion := (schema.GroupVersion{Group: "policy", Version: "v1beta1"}); legacyscheme.Scheme.IsVersionRegistered(policyGroupVersion) {
        var err error
        podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
        if err != nil {
            return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
        }
    }
    // 1. Initialize the RESTStorage for the resources under the legacy API
    restStorage := LegacyRESTStorage{}

    podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    ......

    endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }

    // 2. Initialize the pod RESTStorage
    podStorage, err := podstore.NewStorage(......)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    ......
		
    serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator.Interface, error) {
        ......
    })
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
    }
    restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry

    var secondaryServiceClusterIPAllocator ipallocator.Interface
    if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && c.SecondaryServiceIPRange.IP != nil {
        ......
    }

    var serviceNodePortRegistry rangeallocation.RangeRegistry
    serviceNodePortAllocator, err := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
        ......
    })
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
    }
    restStorage.ServiceNodePortAllocator = serviceNodePortRegistry

    controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
    if err != nil {
        return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
    }
    
    serviceRest, serviceRestProxy := servicestore.NewREST(......)
    
    // 3. restStorageMap maps each resource's http path to its RESTStorage
    restStorageMap := map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        ......
        "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
    }
    ......
}
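Keys in restStorageMap are either a plain resource (`pods`) or a `resource/subresource` pair (`pods/status`), and route installation has to tell them apart. The sketch below shows one way to split such keys and list a resource's subresources. `splitKey`, `subresourcesOf` and the `storage` type are hypothetical helpers, not functions from the Kubernetes tree.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// storage is a placeholder for rest.Storage.
type storage struct{ name string }

// splitKey splits a restStorageMap key into resource and optional subresource.
func splitKey(key string) (resource, subresource string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return parts[0], "", nil
	case 2:
		return parts[0], parts[1], nil
	default:
		return "", "", fmt.Errorf("key %q has more than two segments", key)
	}
}

// subresourcesOf lists, in sorted order, the subresources registered for resource.
func subresourcesOf(m map[string]storage, resource string) []string {
	var subs []string
	for key := range m {
		r, sub, err := splitKey(key)
		if err == nil && r == resource && sub != "" {
			subs = append(subs, sub)
		}
	}
	sort.Strings(subs) // map iteration order is random
	return subs
}

func main() {
	restStorageMap := map[string]storage{
		"pods":        {"pod"},
		"pods/status": {"podStatus"},
		"pods/log":    {"podLog"},
		"nodes":       {"node"},
	}
	fmt.Println(subresourcesOf(restStorageMap, "pods")) // [log status]
}
```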
podstore.NewStorage

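podstore.NewStorage builds the storage for pods. Its main job is to create the pod's backend storage and return a PodStorage whose fields are REST storage objects. The backend storage itself is created by calling store.CompleteWithOptions.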

k8s.io/kubernetes/pkg/registry/core/pod/storage/storage.go:71
func NewStorage(......) (PodStorage, error) {
    store := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &api.Pod{} },
        NewListFunc:              func() runtime.Object { return &api.PodList{} },
        ......
    }
    options := &generic.StoreOptions{
        RESTOptions: optsGetter,
        AttrFunc:    pod.GetAttrs,
        TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": pod.NodeNameTriggerFunc},
    }
    
    // Call store.CompleteWithOptions
    if err := store.CompleteWithOptions(options); err != nil {
        return PodStorage{}, err
    }
    statusStore := *store
    statusStore.UpdateStrategy = pod.StatusStrategy
    ephemeralContainersStore := *store
    ephemeralContainersStore.UpdateStrategy = pod.EphemeralContainersStrategy

    bindingREST := &BindingREST{store: store}
    
    // Return the PodStorage object
    return PodStorage{
        Pod:                 &REST{store, proxyTransport},
        Binding:             &BindingREST{store: store},
        LegacyBinding:       &LegacyBindingREST{bindingREST},
        Eviction:            newEvictionStorage(store, podDisruptionBudgetClient),
        Status:              &StatusREST{store: &statusStore},
        EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
        Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
        Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
        Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
        Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
        PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
    }, nil
}

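As the returned object shows, each kind of pod operation is served by its own REST object. Each of these types embeds or holds a *genericregistry.Store; REST, for instance, embeds it directly. store.CompleteWithOptions is the method that initializes the storage instance inside that genericregistry.Store.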

type REST struct {
    *genericregistry.Store
    proxyTransport http.RoundTripper
}

type BindingREST struct {
    store *genericregistry.Store
}
......
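The REST types rely on two plain Go mechanisms:

  • Embedding *genericregistry.Store in REST promotes all of the store's methods.
  • `statusStore := *store` in NewStorage is a struct copy, so its UpdateStrategy can be swapped without affecting the original store.

The sketch below isolates both. `Store`, `REST`, `StatusREST` and the strategy types here are hypothetical miniatures, not the Kubernetes definitions.

```go
package main

import "fmt"

// Strategy is a stand-in for an update strategy.
type Strategy interface{ Name() string }

type specStrategy struct{}

func (specStrategy) Name() string { return "spec" }

type statusStrategy struct{}

func (statusStrategy) Name() string { return "status" }

// Store is a stand-in for genericregistry.Store.
type Store struct {
	Prefix         string
	UpdateStrategy Strategy
}

// Update reports which strategy would govern an update.
func (s *Store) Update(name string) string {
	return s.Prefix + "/" + name + " via " + s.UpdateStrategy.Name()
}

// REST embeds *Store, so REST.Update is promoted from Store.
type REST struct {
	*Store
}

// StatusREST holds its own store and exposes only Update.
type StatusREST struct {
	store *Store
}

func (r *StatusREST) Update(name string) string { return r.store.Update(name) }

// newStorage mirrors the statusStore := *store pattern in podstore.NewStorage.
func newStorage() (*REST, *StatusREST) {
	store := &Store{Prefix: "/pods", UpdateStrategy: specStrategy{}}
	statusStore := *store // copies every field; changing the copy leaves store untouched
	statusStore.UpdateStrategy = statusStrategy{}
	return &REST{store}, &StatusREST{store: &statusStore}
}

func main() {
	pod, status := newStorage()
	fmt.Println(pod.Update("nginx"))    // /pods/nginx via spec
	fmt.Println(status.Update("nginx")) // /pods/nginx via status
}
```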
store.CompleteWithOptions

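store.CompleteWithOptions fills in default values for the store's configuration and updates the store from the supplied options. Its most important job is initializing the store's backend storage instance.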

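Inside CompleteWithOptions, options.RESTOptions.GetRESTOptions is called. It returns a generic.RESTOptions object that carries the etcd storage configuration, the serialization codec, and a Decorator that produces the storage.Interface used to operate on etcd. The final backend storage is then created through the chain StorageWithCacher --> NewRawStorage --> Create.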

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1192
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
    ......

    var isNamespaced bool
    switch {
    case e.CreateStrategy != nil:
        isNamespaced = e.CreateStrategy.NamespaceScoped()
    case e.UpdateStrategy != nil:
        isNamespaced = e.UpdateStrategy.NamespaceScoped()
    default:
        return fmt.Errorf("store for %s must have CreateStrategy or UpdateStrategy set", e.DefaultQualifiedResource.String())
    }
    ......

    // 1. Call options.RESTOptions.GetRESTOptions
    opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
    if err != nil {
        return err
    }

    // 2. Normalize the ResourcePrefix
    prefix := opts.ResourcePrefix
    if !strings.HasPrefix(prefix, "/") {
        prefix = "/" + prefix
    }
    
    if prefix == "/" {
        return fmt.Errorf("store for %s has an invalid prefix %q", e.DefaultQualifiedResource.String(), opts.ResourcePrefix)
    }
    
    if e.KeyRootFunc == nil && e.KeyFunc == nil {
        ......
    }

    keyFunc := func(obj runtime.Object) (string, error) {
        ......
    }

    // 3. The following mainly copies values from opts into the store
    if e.DeleteCollectionWorkers == 0 {
        e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
    }

    e.EnableGarbageCollection = opts.EnableGarbageCollection
    if e.ObjectNameFunc == nil {
        ......
    }

    if e.Storage.Storage == nil {
        e.Storage.Codec = opts.StorageConfig.Codec
        var err error
        e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
            opts.StorageConfig,
            prefix,
            keyFunc,
            e.NewFunc,
            e.NewListFunc,
            attrFunc,
            options.TriggerFunc,
        )
        if err != nil {
            return err
        }
        e.StorageVersioner = opts.StorageConfig.EncodeVersioner

        if opts.CountMetricPollPeriod > 0 {
            stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
            previousDestroy := e.DestroyFunc
            e.DestroyFunc = func() {
                stopFunc()
                if previousDestroy != nil {
                    previousDestroy()
                }
            }
        }
    }

    return nil
}
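Step 2 of CompleteWithOptions normalizes the resource prefix and rejects a bare `/`. Below, that rule is pulled out as a standalone function. The name `normalizePrefix` is hypothetical, but the checks and the error message follow the code above.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizePrefix applies the prefix rules from step 2 of CompleteWithOptions:
// add a leading "/" if missing, and reject a prefix that is only "/".
func normalizePrefix(resource, prefix string) (string, error) {
	p := prefix
	if !strings.HasPrefix(p, "/") {
		p = "/" + p
	}
	if p == "/" {
		return "", fmt.Errorf("store for %s has an invalid prefix %q", resource, prefix)
	}
	return p, nil
}

func main() {
	for _, in := range []string{"pods", "/pods", ""} {
		p, err := normalizePrefix("pods", in)
		fmt.Printf("%q -> %q, err=%v\n", in, p, err)
	}
}
```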

options.RESTOptions is an interface, so to find its GetRESTOptions implementation you need to know which concrete type it was initialized with. It is initialized in CreateKubeAPIServerConfig --> buildGenericConfig --> s.Etcd.ApplyWithStorageFactoryTo, and the concrete type is StorageFactoryRestOptionsFactory. The store built for PodStorage gets its options from genericserver.Config.RESTOptionsGetter, so that getter is really a StorageFactoryRestOptionsFactory. Its GetRESTOptions method is shown below:

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go:253
func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
    storageConfig, err := f.StorageFactory.NewConfig(resource)
    if err != nil {
        return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
    }

    ret := generic.RESTOptions{
        StorageConfig:           storageConfig,
        Decorator:               generic.UndecoratedStorage,
        DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
        EnableGarbageCollection: f.Options.EnableGarbageCollection,
        ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
        CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
    }
    if f.Options.EnableWatchCache {
        sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
        if err != nil {
            return generic.RESTOptions{}, err
        }
        cacheSize, ok := sizes[resource]
        if !ok {
            cacheSize = f.Options.DefaultWatchCacheSize
        }
        // Use a generic.StorageDecorator that wraps the storage with the watch cache (cacher)
        ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
    }

    return ret, nil
}
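GetRESTOptions picks a storage decorator for each resource:

  • If the watch cache is off, the storage stays undecorated.
  • If it is on, the storage gets a cacher. The cacher is sized from a per-resource override, or from the default size when there is no override.

The sketch below models that decision with hypothetical types (`decorator`, `chooseDecorator`) instead of the real generic.StorageDecorator signature. It also uses plain string keys where the real code uses schema.GroupResource.

```go
package main

import "fmt"

// decorator records which storage wrapper a resource would get.
type decorator struct {
	cached    bool
	cacheSize int
}

// chooseDecorator mirrors the branch in GetRESTOptions: without the watch
// cache the storage stays undecorated; with it, a per-resource size wins over
// the default size.
func chooseDecorator(enableWatchCache bool, sizes map[string]int, defaultSize int, resource string) decorator {
	if !enableWatchCache {
		return decorator{}
	}
	size, ok := sizes[resource]
	if !ok {
		size = defaultSize
	}
	return decorator{cached: true, cacheSize: size}
}

func main() {
	sizes := map[string]int{"pods": 1000} // hypothetical override
	fmt.Println(chooseDecorator(true, sizes, 100, "pods"))     // {true 1000}
	fmt.Println(chooseDecorator(true, sizes, 100, "services")) // {true 100}
	fmt.Println(chooseDecorator(false, sizes, 100, "pods"))    // {false 0}
}
```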

genericregistry.StorageWithCacher goes through several more functions and eventually calls factory.Create to initialize the storage instance. The call chain is genericregistry.StorageWithCacher --> generic.NewRawStorage --> factory.Create

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go:30
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    switch c.Type {
    case "etcd2":
        return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
    // Kubernetes currently supports only etcd v3
    case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
        return newETCD3Storage(c)
    default:
        return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
    }
}
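factory.Create dispatches on the configured backend type as follows:

  • "etcd2" is rejected outright.
  • An unset type falls through to etcd3.
  • Anything else is an error.

The sketch below reproduces that dispatch and returns a string instead of a real storage.Interface. It assumes the unset type is the empty string and the etcd3 type is "etcd3"; the lowercase constants are hypothetical local names.

```go
package main

import "fmt"

// Hypothetical local constants standing in for storagebackend.StorageTypeUnset
// and storagebackend.StorageTypeETCD3 (values assumed).
const (
	storageTypeUnset = ""
	storageTypeETCD3 = "etcd3"
)

// createBackend reproduces the dispatch in factory.Create and returns a
// description of the backend instead of a real storage.Interface.
func createBackend(storageType string) (string, error) {
	switch storageType {
	case "etcd2":
		return "", fmt.Errorf("%v is no longer a supported storage backend", storageType)
	case storageTypeUnset, storageTypeETCD3:
		return "etcd3 backend", nil
	default:
		return "", fmt.Errorf("unknown storage type: %s", storageType)
	}
}

func main() {
	for _, t := range []string{"", "etcd3", "etcd2", "consul"} {
		b, err := createBackend(t)
		fmt.Printf("%q -> %q, err=%v\n", t, b, err)
	}
}
```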
newETCD3Storage

newETCD3Storage first calls newETCD3Client to create the etcd client, which is ultimately built with clientv3, the official etcd Go client.

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go:209
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
    if err != nil {
        return nil, nil, err
    }

    client, err := newETCD3Client(c.Transport)
    if err != nil {
        stopCompactor()
        return nil, nil, err
    }

    var once sync.Once
    destroyFunc := func() {
        once.Do(func() {
            stopCompactor()
            client.Close()
        })
    }
    transformer := c.Transformer
    if transformer == nil {
        transformer = value.IdentityTransformer
    }
    return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
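destroyFunc wraps its cleanup in sync.Once, so the compactor is stopped and the client closed exactly once, no matter how many times destroyFunc is called. The standalone sketch below shows the same pattern. `fakeClient` is a hypothetical stand-in for the clientv3 client, and `newDestroyFunc` is an illustrative helper.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeClient counts Close calls in place of a clientv3 client.
type fakeClient struct{ closes int }

func (c *fakeClient) Close() error {
	c.closes++
	return nil
}

// newDestroyFunc returns a cleanup function that is safe to call repeatedly:
// sync.Once guarantees the compactor stop and client close happen only once.
func newDestroyFunc(stopCompactor func(), client *fakeClient) func() {
	var once sync.Once
	return func() {
		once.Do(func() {
			stopCompactor()
			client.Close()
		})
	}
}

func main() {
	stops := 0
	client := &fakeClient{}
	destroy := newDestroyFunc(func() { stops++ }, client)
	destroy()
	destroy()
	fmt.Println(stops, client.closes) // 1 1
}
```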

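This largely completes the analysis of how the store for the pod resource is built. Each resource corresponds to a REST object that references a genericregistry.Store, and building the storage ultimately means initializing that genericregistry.Store. After the store is initialized, the other important step is route registration. It builds http paths for each resource according to the verbs the resource supports, and binds each path to its handler.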

Route registration

The RESTStorage construction above corresponds to the legacyRESTStorageProvider.NewLegacyRESTStorage call in InstallLegacyAPI. Next we look at the other call in InstallLegacyAPI, m.GenericAPIServer.InstallLegacyAPIGroup.

k8s.io/kubernetes/pkg/master/master.go:406
func (m *Master) InstallLegacyAPI(......) error {
    legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
    if err != nil {
        return fmt.Errorf("Error building core storage: %v", err)
    }
    ......

    if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
        return fmt.Errorf("Error in registering group versions: %v", err)
    }
    return nil
}

m.GenericAPIServer.InstallLegacyAPIGroup has a very deep call chain. In the end it registers a handler and route for every API resource in the group. The chain is m.GenericAPIServer.InstallLegacyAPIGroup --> s.installAPIResources --> apiGroupVersion.InstallREST --> installer.Install --> a.registerResourceHandlers. The roles of these methods are:

  • s.installAPIResources: calls apiGroupVersion.InstallREST for each GroupVersion of the group to add its routes;
  • apiGroupVersion.InstallREST: adds the restful.WebService object to the container;
  • installer.Install: returns the final restful.WebService object.
a.registerResourceHandlers

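This method converts a rest.Storage into restful.Routes. It first determines which REST interfaces the API resource supports, then adds the corresponding handler for each, and finally registers them as routes.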

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:181
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {       
    admit := a.group.Admit

    ......
   
    // 1. Determine which REST operation interfaces the resource implements, i.e. which verbs it supports, so that routes can be added for them
    creater, isCreater := storage.(rest.Creater)
    namedCreater, isNamedCreater := storage.(rest.NamedCreater)
    lister, isLister := storage.(rest.Lister)
    getter, isGetter := storage.(rest.Getter)
    getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
    gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
    collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
    updater, isUpdater := storage.(rest.Updater)
    patcher, isPatcher := storage.(rest.Patcher)
    watcher, isWatcher := storage.(rest.Watcher)
    connecter, isConnecter := storage.(rest.Connecter)
    storageMeta, isMetadata := storage.(rest.StorageMetadata)
    storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
    if !isMetadata {
        storageMeta = defaultStorageMetadata{}
    }
    exporter, isExporter := storage.(rest.Exporter)
    if !isExporter {
        exporter = nil
    }

    ......
    
    // 2. Add the actions for the resource, depending on whether it is namespace-scoped
    switch {
    case !namespaceScoped:
        ......

        actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
        actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
        actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
        actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

        actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
        if getSubpath {
            actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
        }
        actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
        actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
        actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
        actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
        actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
        actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
    default:
        ......
        actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
        actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
        actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
        actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

        actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
        ......
    }

    // 3. Create the route for each action
    kubeVerbs := map[string]struct{}{}
    reqScope := handlers.RequestScope{
        Serializer:      a.group.Serializer,
        ParameterCodec:  a.group.ParameterCodec,
        Creater:         a.group.Creater,
        Convertor:       a.group.Convertor,
        ......
    }
    ......
    // 4. Map rest.Storage to restful.Route:
    // add the handler for each operation
    for _, action := range actions {
        ......
        verbOverrider, needOverride := storage.(StorageMetricsOverride)
        switch action.Verb {
        case "GET": ......
        case "LIST":
        case "PUT":
        case "PATCH":
        // POST is used as the example here
        case "POST": 
            var handler restful.RouteFunction
            // 5. Initialize the handler
            if isNamedCreater {
                handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
            } else {
                handler = restfulCreateResource(creater, reqScope, admit)
            }
            handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
            article := GetArticleForNoun(kind, " ")
            doc := "create" + article + kind
            if isSubresource {
                doc = "create " + subresource + " of" + article + kind
            }
            // 6. Bind the route to the handler
            route := ws.POST(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Returns(http.StatusOK, "OK", producedObject).
                Returns(http.StatusCreated, "Created", producedObject).
                Returns(http.StatusAccepted, "Accepted", producedObject).
                Reads(defaultVersionedObject).
                Writes(producedObject)
            if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
                return nil, err
            }
            addParams(route, action.Params)
            // 7. Add it to the routes
            routes = append(routes, route)
        case "DELETE": 
        case "DELETECOLLECTION":
        case "WATCH":
        case "WATCHLIST":
        case "CONNECT":
        default:
        }
    }
    ......
    return &apiResource, nil
}
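The core trick in registerResourceHandlers is that a storage declares its verbs implicitly, through the interfaces it implements. The installer discovers them with type assertions and `appendIf`. Below is a reduced, hypothetical version with two interfaces (`Getter`, `Creater`). They are named after the rest package's interfaces but are much smaller.

```go
package main

import "fmt"

// Getter and Creater are hypothetical one-method versions of rest.Getter and rest.Creater.
type Getter interface{ Get(name string) string }
type Creater interface{ Create(obj string) string }

type action struct {
	Verb string
	Path string
}

// appendIf appends a only when shouldAppend is true.
func appendIf(actions []action, a action, shouldAppend bool) []action {
	if shouldAppend {
		actions = append(actions, a)
	}
	return actions
}

// actionsFor derives routes from the interfaces the storage implements.
func actionsFor(resource string, storage interface{}) []action {
	_, isGetter := storage.(Getter)
	_, isCreater := storage.(Creater)
	resourcePath := resource
	itemPath := resource + "/{name}"

	var actions []action
	actions = appendIf(actions, action{"POST", resourcePath}, isCreater)
	actions = appendIf(actions, action{"GET", itemPath}, isGetter)
	return actions
}

// readOnly only supports GET; readWrite adds POST on top of it.
type readOnly struct{}

func (readOnly) Get(name string) string { return name }

type readWrite struct{ readOnly }

func (readWrite) Create(obj string) string { return obj }

func main() {
	fmt.Println(actionsFor("componentstatuses", readOnly{})) // [{GET componentstatuses/{name}}]
	fmt.Println(actionsFor("pods", readWrite{}))             // [{POST pods} {GET pods/{name}}]
}
```

This design means adding a verb to a resource only requires implementing another interface on its storage. The installer code itself does not change.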
restfulCreateNamedResource

restfulCreateNamedResource is the handler for the POST operation; it ultimately delegates to createHandler.

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:1087
func restfulCreateNamedResource(r rest.NamedCreater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
    return func(req *restful.Request, res *restful.Response) {
        handlers.CreateNamedResource(r, &scope, admit)(res.ResponseWriter, req.Request)
    }
}

func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
    return createHandler(r, scope, admission, true)
}
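restfulCreateNamedResource is a thin adapter. go-restful calls a RouteFunction with its own request and response wrappers, and the adapter unwraps them into a plain http.HandlerFunc. The sketch below imitates that shape so it can run with the standard library only. `Request`, `Response` and `RouteFunction` are hypothetical stand-ins for the go-restful types, and `fakeCreate`, `toRoute` and `serve` are illustrative helpers.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// Hypothetical stand-ins for go-restful's Request, Response and RouteFunction.
type Request struct{ Request *http.Request }
type Response struct{ http.ResponseWriter }
type RouteFunction func(*Request, *Response)

// fakeCreate plays the part of the http.HandlerFunc returned by CreateNamedResource.
func fakeCreate(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusCreated)
	io.WriteString(w, "created "+r.URL.Path)
}

// toRoute unwraps the framework types and delegates to the plain handler,
// the same shape as restfulCreateNamedResource.
func toRoute(h http.HandlerFunc) RouteFunction {
	return func(req *Request, res *Response) {
		h(res.ResponseWriter, req.Request)
	}
}

// serve invokes a RouteFunction against an in-memory recorder.
func serve(rf RouteFunction, path string) (int, string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, path, nil)
	rf(&Request{Request: req}, &Response{ResponseWriter: rec})
	return rec.Code, rec.Body.String()
}

func main() {
	fmt.Println(serve(toRoute(fakeCreate), "/api/v1/namespaces/default/pods"))
}
```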
createHandler

createHandler is the method that writes data to the backend storage. Every operation on a resource is subject to access control, so createHandler first runs the decoder and admission. It then calls the create method to create the resource, and create performs validation before finally saving the data to the backend storage.

The admit step runs the admission-plugins configured in kube-apiserver. These plugins are initialized into an admissionChain in CreateKubeAPIServerConfig, via the call chain CreateKubeAPIServerConfig --> buildGenericConfig --> s.Admission.ApplyTo --> a.GenericAdmission.ApplyTo --> a.Plugins.NewFromPlugins. a.Plugins.NewFromPlugins wraps all enabled plugins into the admissionChain, and the admit step here executes the admit operation of those admission-plugins.

The create method called from createHandler is a method of genericregistry.Store; every resource's RESTStorage brings in a genericregistry.Store when it is initialized.

Taken together, the operations in createHandler make up the request flow mentioned at the beginning of this article:

v1beta1 ⇒ internal ⇒ admission ⇒ validation ⇒ v1 ⇒ json/yaml ⇒ etcd
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go:46
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
    return func(w http.ResponseWriter, req *http.Request) {
        trace := utiltrace.New("Create", utiltrace.Field{"url", req.URL.Path})
        defer trace.LogIfLong(500 * time.Millisecond)
        ......

        gv := scope.Kind.GroupVersion()
        // 1. Pick the appropriate SerializerInfo
        s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
        if err != nil {
            scope.err(err, w, req)
            return
        }
        // 2. Find the appropriate decoder
        decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)

        body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
        if err != nil {
            scope.err(err, w, req)
            return
        }

        ......

        defaultGVK := scope.Kind
        original := r.New()
        trace.Step("About to convert to expected version")
        // 3. Decode the body with the decoder
        obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
        ......

        ae := request.AuditEventFrom(ctx)
        admit = admission.WithAudit(admit, ae)
        audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)

        userInfo, _ := request.UserFrom(ctx)


        if len(name) == 0 {
            _, name, _ = scope.Namer.ObjectName(obj)
        }
        // 4. Run admit, i.e. the admission-plugins loaded when kube-apiserver started
        admissionAttributes := admission.NewAttributesRecord(......)
        if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
            err = mutatingAdmission.Admit(ctx, admissionAttributes, scope)
            if err != nil {
                scope.err(err, w, req)
                return
            }
        }

        ......
        // 5. Perform the create
        result, err := finishRequest(timeout, func() (runtime.Object, error) {
            return r.Create(
                ctx,
                name,
                obj,
                rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
                options,
            )
        })
        ......
    }
}
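In createHandler, mutating admission runs before Create is called. Validation runs inside Create, together with validating admission through AdmissionToValidateObjectFunc. The following is a much-reduced, hypothetical model of that order. `mutator`, `validator` and `chain` are not the admission package's types, and the mutator only loosely imitates the AlwaysPullImages admission controller.

```go
package main

import (
	"errors"
	"fmt"
)

type Pod struct {
	Name            string
	Image           string
	ImagePullPolicy string
}

// Hypothetical plugin shapes; the real admission package uses
// MutationInterface / ValidationInterface with much richer attributes.
type mutator func(*Pod) error
type validator func(*Pod) error

type chain struct {
	mutators   []mutator
	validators []validator
}

// create runs every mutator, then every validator, and only then "persists"
// the object; the first error aborts the request.
func (c chain) create(p *Pod, store map[string]Pod) error {
	for _, m := range c.mutators {
		if err := m(p); err != nil {
			return err
		}
	}
	for _, v := range c.validators {
		if err := v(p); err != nil {
			return err
		}
	}
	store[p.Name] = *p // stands in for the write to etcd
	return nil
}

func newChain() chain {
	return chain{
		// Loosely imitates AlwaysPullImages by forcing the pull policy.
		mutators: []mutator{func(p *Pod) error {
			p.ImagePullPolicy = "Always"
			return nil
		}},
		validators: []validator{func(p *Pod) error {
			if p.Image == "" {
				return errors.New("image is required")
			}
			return nil
		}},
	}
}

func main() {
	store := map[string]Pod{}
	c := newChain()
	fmt.Println(c.create(&Pod{Name: "web", Image: "nginx"}, store), store["web"].ImagePullPolicy)
	fmt.Println(c.create(&Pod{Name: "bad"}, store))
}
```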

Summary

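This article analyzed the startup flow of kube-apiserver. kube-apiserver contains three servers, KubeAPIServer, APIExtensionsServer and AggregatorServer. They are chained together through the delegation pattern, and all three are initialized in a similar way:

  • A config is created for each server, then its http server is initialized.
  • Initializing the http server means first initializing the GoRestfulContainer, then installing the APIs the server contains.
  • Installing an API means creating the backend storage (RESTStorage) for each API resource and adding a handler for each verb the resource supports. The handlers are registered as routes, and the routes are finally registered in a WebService.

The implementation of the RESTful API is the core of the startup flow. Other details will be analyzed in later articles, such as the authentication and authorization filters in kube-apiserver, multi-version resource conversion, and the implementation of the kubernetes service.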

Startup flow

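The apiserver is started from the main package in cmd/kube-apiserver/apiserver.go. The NewServerRunOptions() function of the options package initializes the default configuration. The pflag package, together with the AddFlags() method, then fills in the configuration from the command-line arguments.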

func main() {
	rand.Seed(time.Now().UTC().UnixNano())

	s := options.NewServerRunOptions()
	s.AddFlags(pflag.CommandLine)

	flag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	verflag.PrintAndExitIfRequested()

	if err := app.Run(s); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
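Splitting the work between NewServerRunOptions and AddFlags means defaults live in one constructor, and flags only overwrite fields. The sketch below shows that pattern using the standard library's flag package, whereas the real code uses spf13/pflag. `ServerRunOptions` here is a hypothetical two-field version, and the default port is an assumed value for illustration.

```go
package main

import (
	"flag"
	"fmt"
)

// ServerRunOptions is a hypothetical two-field version of the real options struct.
type ServerRunOptions struct {
	SecurePort  int
	EtcdServers string
}

// NewServerRunOptions sets defaults before any flag is parsed.
func NewServerRunOptions() *ServerRunOptions {
	return &ServerRunOptions{SecurePort: 6443}
}

// AddFlags binds each flag to an option field, using the field's current
// value as the flag default so NewServerRunOptions stays the single source of defaults.
func (s *ServerRunOptions) AddFlags(fs *flag.FlagSet) {
	fs.IntVar(&s.SecurePort, "secure-port", s.SecurePort, "port for HTTPS")
	fs.StringVar(&s.EtcdServers, "etcd-servers", s.EtcdServers, "comma-separated etcd endpoints")
}

// parse wires the two steps together the way main() does.
func parse(args []string) (*ServerRunOptions, error) {
	s := NewServerRunOptions()
	fs := flag.NewFlagSet("kube-apiserver", flag.ContinueOnError)
	s.AddFlags(fs)
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return s, nil
}

func main() {
	s, err := parse([]string{"--etcd-servers=http://127.0.0.1:2379"})
	if err != nil {
		panic(err)
	}
	fmt.Println(s.SecurePort, s.EtcdServers) // 6443 http://127.0.0.1:2379
}
```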

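Once initialization is complete, the Run() function in the app package is called to start the instance. It receives the newly created ServerRunOptions object and creates an http server and an https server.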

// Run runs the apiserver and never exits.
func Run(s *options.ServerRunOptions) error {
	genericvalidation.VerifyEtcdServersList(s.GenericServerRunOptions)
	genericapiserver.DefaultAndValidateRunOptions(s.GenericServerRunOptions)
	genericConfig := genericapiserver.NewConfig(). // create the new config
		ApplyOptions(s.GenericServerRunOptions). // apply the options selected
		Complete()                               // set default values based on the known values

 ...
  
	m, err := config.Complete().New()
	if err != nil {
		return err
	}

	sharedInformers.Start(wait.NeverStop)
	m.GenericAPIServer.PrepareRun().Run(wait.NeverStop)
	return nil
}

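This function mainly produces the master instance. m, err := config.Complete().New() creates the master: Complete() finishes initializing the Config, and New() initializes the resources and registers the RESTful APIs. All API requests are ultimately handled by the master object. Finally, the APIServer starts its HTTP/HTTPS services by calling Run(wait.NeverStop).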
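The `config.Complete().New()` chain works like a small typestate. Complete fills in defaults and returns a distinct completed type, and only that type has New. Outside the package, this means an incomplete config cannot be used to build a server. Below is a hypothetical sketch of the idiom. The field names, default values and `Master` type are invented for illustration.

```go
package main

import "fmt"

// Config is a hypothetical, two-field stand-in for the master Config.
type Config struct {
	ServiceClusterIPRange string
	SecurePort            int
}

// completedConfig wraps a Config that has had its defaults filled in.
// Only completedConfig has New, so callers must go through Complete first.
type completedConfig struct {
	*Config
}

// Complete fills in defaults for anything left unset.
func (c *Config) Complete() completedConfig {
	if c.ServiceClusterIPRange == "" {
		c.ServiceClusterIPRange = "10.0.0.0/24"
	}
	if c.SecurePort == 0 {
		c.SecurePort = 6443
	}
	return completedConfig{c}
}

// Master is a placeholder for the real master object.
type Master struct {
	Addr string
}

// New builds the server from a completed config.
func (c completedConfig) New() (*Master, error) {
	return &Master{Addr: fmt.Sprintf(":%d", c.SecurePort)}, nil
}

func main() {
	m, err := (&Config{}).Complete().New()
	if err != nil {
		panic(err)
	}
	fmt.Println(m.Addr) // :6443
}
```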

func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) {
	if s.SecureServingInfo != nil && s.Handler != nil {
		if err := s.serveSecurely(stopCh); err != nil {
			glog.Fatal(err)
		}
	}

	if s.InsecureServingInfo != nil && s.InsecureHandler != nil {
		if err := s.serveInsecurely(stopCh); err != nil {
			glog.Fatal(err)
		}
	}

	s.RunPostStartHooks()

	// err == systemd.SdNotifyNoSocket when not running on a systemd system
	if err := systemd.SdNotify("READY=1\n"); err != nil && err != systemd.SdNotifyNoSocket {
		glog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
	}

	<-stopCh
}

In this Run(), the https and http servers are actually run by s.serveSecurely(stopCh) and s.serveInsecurely(stopCh) respectively.

Both s.serveSecurely(stopCh) and s.serveInsecurely(stopCh) in turn call runServer() to run the server. runServer() listens on the given address and serves in a goroutine until the read-only channel stopCh is closed.

func runServer(server *http.Server, network string, stopCh <-chan struct{}) (int, error) {
	if len(server.Addr) == 0 {
		return 0, errors.New("address cannot be empty")
	}

	if len(network) == 0 {
		network = "tcp"
	}

	// first listen is synchronous (fail early!)
	ln, err := net.Listen(network, server.Addr)
	if err != nil {
		return 0, fmt.Errorf("failed to listen on %v: %v", server.Addr, err)
	}

	// get port
	tcpAddr, ok := ln.Addr().(*net.TCPAddr)
	if !ok {
		ln.Close()
		return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String())
	}

	lock := sync.Mutex{} // to avoid we close an old listener during a listen retry
	go func() {
		<-stopCh
		lock.Lock()
		defer lock.Unlock()
		ln.Close()
	}()

	go func() {
		defer utilruntime.HandleCrash()

		for {
			var listener net.Listener
			listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
			if server.TLSConfig != nil {
				listener = tls.NewListener(listener, server.TLSConfig)
			}

			err := server.Serve(listener)
			glog.Errorf("Error serving %v (%v); will try again.", server.Addr, err)

			// listen again
			func() {
				lock.Lock()
				defer lock.Unlock()
				for {
					time.Sleep(15 * time.Second)

					ln, err = net.Listen("tcp", server.Addr)
					if err == nil {
						return
					}
					select {
					case <-stopCh:
						return
					default:
					}
					glog.Errorf("Error listening on %v (%v); will try again.", server.Addr, err)
				}
			}()

			select {
			case <-stopCh:
				return
			default:
			}
		}
	}()

	return tcpAddr.Port, nil
}
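runServer rests on three steps:

  • It binds the listener synchronously, so errors surface immediately.
  • It serves in a goroutine.
  • It closes the listener when stopCh closes.

The trimmed rewrite below uses only net and net/http and leaves out TLS and the re-listen loop. It is a simplified illustration, not the upstream function. `newOKServer` and `get` are hypothetical helpers for exercising it.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
)

// runServer binds first so errors surface synchronously, serves in a
// goroutine, and closes the listener when stopCh closes.
func runServer(server *http.Server, stopCh <-chan struct{}) (int, error) {
	ln, err := net.Listen("tcp", server.Addr)
	if err != nil {
		return 0, fmt.Errorf("failed to listen on %v: %v", server.Addr, err)
	}
	go func() {
		<-stopCh
		ln.Close() // makes server.Serve return
	}()
	go func() {
		_ = server.Serve(ln) // returns an error once the listener is closed
	}()
	return ln.Addr().(*net.TCPAddr).Port, nil
}

// newOKServer returns a server on an OS-chosen loopback port that always answers "ok".
func newOKServer() *http.Server {
	return &http.Server{
		Addr: "127.0.0.1:0",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			io.WriteString(w, "ok")
		}),
	}
}

// get fetches /healthz from the given local port and returns the body.
func get(port int) (string, error) {
	resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/healthz", port))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	stopCh := make(chan struct{})
	port, err := runServer(newOKServer(), stopCh)
	if err != nil {
		panic(err)
	}
	body, err := get(port)
	fmt.Println(port > 0, body, err) // true ok <nil>
	close(stopCh)
}
```

Listening on port 0 lets the OS pick a free port, which is why runServer returns the actual port, as the upstream function does.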

So the main startup flow of the apiserver is shown in the figure below:

Figure: apiserver start

Key data structures

type APIRegistrationManager struct

APIRegistrationManager exposes the GroupVersions that have been registered and enabled. It aggregates all registered, enabled, and third-party GroupVersions, and also holds each group's GroupMeta.

type APIRegistrationManager struct {
	// registeredGroupVersions stores all API group versions for which RegisterGroup is called.
	// Every registered GroupVersion is added by calling RegisterVersions().
	registeredVersions map[unversioned.GroupVersion]struct{}

	// thirdPartyGroupVersions are API versions which are dynamically
	// registered (and unregistered) via API calls to the apiserver.
	// They are registered with AddThirdPartyAPIGroupVersions().
	thirdPartyGroupVersions []unversioned.GroupVersion

	// enabledVersions represents all enabled API versions. It should be a
	// subset of registeredVersions. Please call EnableVersions() to add
	// enabled versions.
	// A GroupVersion can only be used once it has been enabled.
	enabledVersions map[unversioned.GroupVersion]struct{}

	// map of group meta for all groups.
	groupMetaMap map[string]*apimachinery.GroupMeta

	// envRequestedVersions represents the versions requested via the
	// KUBE_API_VERSIONS environment variable. The install package of each group
	// checks this list before add their versions to the latest package and
	// Scheme.  This list is small and order matters, so represent as a slice
	// If KUBE_API_VERSIONS is not set, this list is empty.
	envRequestedVersions []unversioned.GroupVersion
}
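The comments above describe an invariant: enabledVersions must be a subset of registeredVersions. The hypothetical sketch below enforces that invariant with a simplified GroupVersion struct instead of unversioned.GroupVersion. The type and method bodies are illustrative, not the upstream implementation.

```go
package main

import "fmt"

// GroupVersion is a stand-in for unversioned.GroupVersion.
type GroupVersion struct {
	Group   string
	Version string
}

// registrationManager keeps only the registered/enabled sets described above.
type registrationManager struct {
	registered map[GroupVersion]struct{}
	enabled    map[GroupVersion]struct{}
}

func newRegistrationManager() *registrationManager {
	return &registrationManager{
		registered: map[GroupVersion]struct{}{},
		enabled:    map[GroupVersion]struct{}{},
	}
}

func (m *registrationManager) RegisterVersions(gvs ...GroupVersion) {
	for _, gv := range gvs {
		m.registered[gv] = struct{}{}
	}
}

// EnableVersions keeps enabled a subset of registered: it rejects the whole
// call if any version was never registered.
func (m *registrationManager) EnableVersions(gvs ...GroupVersion) error {
	for _, gv := range gvs {
		if _, ok := m.registered[gv]; !ok {
			return fmt.Errorf("attempted to enable unregistered version %v", gv)
		}
	}
	for _, gv := range gvs {
		m.enabled[gv] = struct{}{}
	}
	return nil
}

func (m *registrationManager) IsEnabled(gv GroupVersion) bool {
	_, ok := m.enabled[gv]
	return ok
}

func main() {
	m := newRegistrationManager()
	v1 := GroupVersion{Group: "", Version: "v1"}
	m.RegisterVersions(v1)
	fmt.Println(m.EnableVersions(v1), m.IsEnabled(v1)) // <nil> true
}
```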

type RESTMapper interface

RESTMapper is an interface. It maps a GVR (GroupVersionResource) to the corresponding GVK (GroupVersionKind), and produces a RESTMapping for a given GroupKind.

// RESTMapper allows clients to map resources to kind, and map kind and version
// to interfaces for manipulating those objects. It is primarily intended for
// consumers of Kubernetes compatible REST APIs as defined in docs/devel/api-conventions.md.
//
// The Kubernetes API provides versioned resources and object kinds which are scoped
// to API groups. In other words, kinds and resources should not be assumed to be
// unique across groups.

type RESTMapper interface {
	// KindFor takes a partial resource and returns the single match.  Returns an error if there are multiple matches
	KindFor(resource unversioned.GroupVersionResource) (unversioned.GroupVersionKind, error)

	// KindsFor takes a partial resource and returns the list of potential kinds in priority order
	KindsFor(resource unversioned.GroupVersionResource) ([]unversioned.GroupVersionKind, error)

	// ResourceFor takes a partial resource and returns the single match.  Returns an error if there are multiple matches
	ResourceFor(input unversioned.GroupVersionResource) (unversioned.GroupVersionResource, error)

	// ResourcesFor takes a partial resource and returns the list of potential resource in priority order
	ResourcesFor(input unversioned.GroupVersionResource) ([]unversioned.GroupVersionResource, error)

	// RESTMapping identifies a preferred resource mapping for the provided group kind.
	RESTMapping(gk unversioned.GroupKind, versions ...string) (*RESTMapping, error)
	// RESTMappings returns all resource mappings for the provided group kind.
	RESTMappings(gk unversioned.GroupKind) ([]*RESTMapping, error)

	AliasesForResource(resource string) ([]string, bool)
	ResourceSingularizer(resource string) (singular string, err error)
}

type RESTMapping struct

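A RESTMapping contains the following:
- a Resource name and its corresponding GVK;
- a Scope, which indicates whether the resource is root-scoped or namespaced;
- an ObjectConvertor, which converts the Object that corresponds to that GVK;
- a MetadataAccessor, which extracts the Object's metadata.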

type RESTMapping struct {
	// Resource is a string representing the name of this resource as a REST client would see it
	Resource string

	GroupVersionKind unversioned.GroupVersionKind

	// Scope contains the information needed to deal with REST Resources that are in a resource hierarchy
	Scope RESTScope

	runtime.ObjectConvertor
	MetadataAccessor
}

type RESTScope interface

RESTScope identifies whether a resource lives inside a Namespace or is cluster-scoped (global).

const (

	RESTScopeNameNamespace RESTScopeName = "namespace"
	RESTScopeNameRoot      RESTScopeName = "root"
)

// RESTScope contains the information needed to deal with REST resources that are in a resource hierarchy
type RESTScope interface {
	// Name of the scope
	Name() RESTScopeName
	// ParamName is the optional name of the parameter that should be inserted in the resource url
	// If empty, no param will be inserted
	ParamName() string
	// ArgumentName is the optional name that should be used for the variable holding the value.
	ArgumentName() string
	// ParamDescription is the optional description to use to document the parameter in api documentation
	ParamDescription() string
}
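The sketch below makes the namespace/root distinction concrete by showing how a scope could shape an object's URL. It is an illustration, not the apiserver's installer code. The `scope` struct and the `resourcePath` helper are hypothetical. The only borrowed detail is the `"namespaces"` parameter name, which is intended to mirror what `RESTScopeNamespace.ParamName()` returns in pkg/api/meta/restmapper.go.

```go
package main

import (
	"fmt"
	"strings"
)

// scope is a hypothetical, simplified stand-in for meta.RESTScope.
type scope struct {
	name      string // "namespace" or "root"
	paramName string // URL segment placed before the namespace value; empty for root
}

var (
	scopeNamespace = scope{name: "namespace", paramName: "namespaces"}
	scopeRoot      = scope{name: "root"}
)

// resourcePath builds an object URL. A namespaced scope inserts
// "<paramName>/<namespace>" before the resource; a root scope inserts nothing.
func resourcePath(prefix, resource, namespace, name string, s scope) string {
	parts := []string{prefix}
	if s.paramName != "" {
		parts = append(parts, s.paramName, namespace)
	}
	parts = append(parts, resource, name)
	return strings.Join(parts, "/")
}

func main() {
	fmt.Println(resourcePath("/api/v1", "pods", "default", "web", scopeNamespace))
	fmt.Println(resourcePath("/api/v1", "nodes", "", "node-1", scopeRoot))
}
```

A root-scoped kind such as Node therefore has no namespace segment in its path. For that reason, root-scoped kinds have to be listed explicitly when the RESTMapper is built.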

type ObjectConvertor interface

The ObjectConvertor converts the Object that corresponds to a GVK.

// ObjectConvertor converts an object to a different version.
type ObjectConvertor interface {
	// Convert attempts to convert one object into another, or returns an error. This method does
	// not guarantee the in object is not mutated. The context argument will be passed to
	// all nested conversions.
	Convert(in, out, context interface{}) error
	// ConvertToVersion takes the provided object and converts it the provided version. This
	// method does not guarantee that the in object is not mutated. This method is similar to
	// Convert() but handles specific details of choosing the correct output version.
	ConvertToVersion(in Object, gv GroupVersioner) (out Object, err error)
	ConvertFieldLabel(version, kind, label, value string) (string, string, error)
}

type MetadataAccessor interface

type MetadataAccessor interface lets you manipulate the metadata of objects and lists in any external or internal version.

type MetadataAccessor interface {
	APIVersion(obj runtime.Object) (string, error)
	SetAPIVersion(obj runtime.Object, version string) error

	Kind(obj runtime.Object) (string, error)
	SetKind(obj runtime.Object, kind string) error

	Namespace(obj runtime.Object) (string, error)
	SetNamespace(obj runtime.Object, namespace string) error

	Name(obj runtime.Object) (string, error)
	SetName(obj runtime.Object, name string) error

	GenerateName(obj runtime.Object) (string, error)
	SetGenerateName(obj runtime.Object, name string) error

	UID(obj runtime.Object) (types.UID, error)
	SetUID(obj runtime.Object, uid types.UID) error

	SelfLink(obj runtime.Object) (string, error)
	SetSelfLink(obj runtime.Object, selfLink string) error

	Labels(obj runtime.Object) (map[string]string, error)
	SetLabels(obj runtime.Object, labels map[string]string) error

	Annotations(obj runtime.Object) (map[string]string, error)
	SetAnnotations(obj runtime.Object, annotations map[string]string) error

	runtime.ResourceVersioner
}

// runtime.Object is defined in /pkg/runtime/interfaces.go

// All API types registered with Scheme must support the Object interface. Since objects in a scheme are
// expected to be serialized to the wire, the interface an Object must provide to the Scheme allows
// serializers to set the kind, version, and group the object is represented as. An Object may choose
// to return a no-op ObjectKindAccessor in cases where it is not expected to be serialized.
type Object interface {
	GetObjectKind() unversioned.ObjectKind
}

// ResourceVersioner provides methods for setting and retrieving
// the resource version from an API object.
type ResourceVersioner interface {
	SetResourceVersion(obj Object, version string) error
	ResourceVersion(obj Object) (string, error)
}

Multi-version resource registration

Initialization flow

pkg/master/import_known_versions.go imports, and thereby initializes, the install package of every API group:

package master

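/*
	At this stage the Kubernetes API is divided into 13 groups:
		Core,
		apps, authentication, authorization,
		autoscaling, batch, certificates, componentconfig,
		extensions, imagepolicy, policy, rbac, storage.
	The Core group has an empty group name. It contains the most fundamental APIs, such as Pod and Service, and its code lives under pkg/api.
	The code for the other 12 groups lives under pkg/apis.
	Each group directory has an install subdirectory containing install.go, whose init() function performs the initialization.
	All install packages are pulled in by the blank imports below.
*/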

// These imports are the API groups the API server will support.
import (
	"fmt"

	_ "k8s.io/kubernetes/pkg/api/install"
	"k8s.io/kubernetes/pkg/apimachinery/registered"
	_ "k8s.io/kubernetes/pkg/apis/apps/install"
	_ "k8s.io/kubernetes/pkg/apis/authentication/install"
	_ "k8s.io/kubernetes/pkg/apis/authorization/install"
	_ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
	_ "k8s.io/kubernetes/pkg/apis/batch/install"
	_ "k8s.io/kubernetes/pkg/apis/certificates/install"
	_ "k8s.io/kubernetes/pkg/apis/componentconfig/install"
	_ "k8s.io/kubernetes/pkg/apis/extensions/install"
	_ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
	_ "k8s.io/kubernetes/pkg/apis/policy/install"
	_ "k8s.io/kubernetes/pkg/apis/rbac/install"
	_ "k8s.io/kubernetes/pkg/apis/storage/install"
)

func init() {
	if missingVersions := registered.ValidateEnvRequestedVersions(); len(missingVersions) != 0 {
		panic(fmt.Sprintf("KUBE_API_VERSIONS contains versions that are not installed: %q.", missingVersions))
	}
}
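The blank imports work because importing a package runs its init() functions before main starts. The sketch below imitates that mechanism in a single file. The `registry` map, the `registerGroup` helper and the version lists are hypothetical. They only stand in for the real install packages and registered.DefaultAPIRegistrationManager.

```go
package main

import (
	"fmt"
	"sort"
)

// registry is a hypothetical global map standing in for the group registry
// that each install package populates from its init() function.
var registry = map[string][]string{}

// registerGroup records the versions a group provides and rejects duplicates.
func registerGroup(group string, versions ...string) {
	if _, dup := registry[group]; dup {
		panic(fmt.Sprintf("group %q registered twice", group))
	}
	registry[group] = versions
}

// In Kubernetes each init() lives in its own install package and runs only
// because of a blank import such as _ "k8s.io/kubernetes/pkg/apis/apps/install".
// Go also permits several init() functions in one file, which is enough to
// show the mechanism: all of them run before main, in source order.
func init() { registerGroup("", "v1") } // core group: empty group name
func init() { registerGroup("apps", "v1beta1") }
func init() { registerGroup("batch", "v1", "v2alpha1") }

// registeredGroups returns the group names in sorted order.
func registeredGroups() []string {
	names := make([]string, 0, len(registry))
	for g := range registry {
		names = append(names, g)
	}
	sort.Strings(names)
	return names
}

func main() {
	for _, g := range registeredGroups() {
		fmt.Printf("%q -> %v\n", g, registry[g])
	}
}
```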

Every /install/install.go file builds a groupMeta and registers it with registered.DefaultAPIRegistrationManager. Each groupMeta contains a DefaultRESTMapper. Take the install package of the core group, pkg/api/install/install.go, as an example:

func init() {
	// Register all versions of the core group with DefaultAPIRegistrationManager via registered.RegisterVersions
	registered.RegisterVersions(availableVersions)
	externalVersions := []unversioned.GroupVersion{}
	for _, v := range availableVersions {
		// Keep only the GroupVersions allowed by the KUBE_API_VERSIONS environment variable.
		// KUBE_API_VERSIONS is unset by default, so for the core group externalVersions ends up as [v1].
		if registered.IsAllowedVersion(v) {
			externalVersions = append(externalVersions, v)
		}
	}
	if len(externalVersions) == 0 {
		glog.V(4).Infof("No version is registered for group %v", api.GroupName)
		return
	}
	// Enable these GroupVersions, which stores them in APIRegistrationManager.enabledVersions
	if err := registered.EnableVersions(externalVersions...); err != nil {
		glog.V(4).Infof("%v", err)
		return
	}
	if err := enableVersions(externalVersions); err != nil {
		glog.V(4).Infof("%v", err)
		return
	}
}
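The register, filter and enable sequence in this init() can be modeled as in the sketch below. It is simplified and rests on stated assumptions. `manager`, `installGroup` and the plain-string form of group versions are hypothetical. Only one behavior is taken from the text: an unset KUBE_API_VERSIONS allows every version, while a set value restricts each group to the listed versions.

```go
package main

import (
	"fmt"
	"strings"
)

// manager is a hypothetical, trimmed-down APIRegistrationManager that works on
// "group/version" strings instead of unversioned.GroupVersion values.
type manager struct {
	registered   map[string]bool // versions some install package has registered
	enabled      map[string]bool // versions that passed the filter and were enabled
	envRequested []string        // parsed from KUBE_API_VERSIONS; empty means "allow all"
}

// newManager parses a comma-separated KUBE_API_VERSIONS value such as "v1,apps/v1beta1".
func newManager(kubeAPIVersions string) *manager {
	m := &manager{registered: map[string]bool{}, enabled: map[string]bool{}}
	for _, gv := range strings.Split(kubeAPIVersions, ",") {
		if gv = strings.TrimSpace(gv); gv != "" {
			m.envRequested = append(m.envRequested, gv)
		}
	}
	return m
}

func (m *manager) registerVersions(gvs ...string) {
	for _, gv := range gvs {
		m.registered[gv] = true
	}
}

// isAllowedVersion allows everything when KUBE_API_VERSIONS is empty,
// otherwise only the listed versions.
func (m *manager) isAllowedVersion(gv string) bool {
	if len(m.envRequested) == 0 {
		return true
	}
	for _, want := range m.envRequested {
		if want == gv {
			return true
		}
	}
	return false
}

func (m *manager) enableVersions(gvs ...string) error {
	for _, gv := range gvs {
		if !m.registered[gv] {
			return fmt.Errorf("version %q has not been registered", gv)
		}
		m.enabled[gv] = true
	}
	return nil
}

// installGroup follows the same register -> filter -> enable order as init() above.
func installGroup(m *manager, available ...string) []string {
	m.registerVersions(available...)
	var external []string
	for _, v := range available {
		if m.isAllowedVersion(v) {
			external = append(external, v)
		}
	}
	if len(external) == 0 {
		return nil // nothing allowed for this group
	}
	if err := m.enableVersions(external...); err != nil {
		return nil
	}
	return external
}

func main() {
	fmt.Println(installGroup(newManager(""), "v1"))             // KUBE_API_VERSIONS unset
	fmt.Println(installGroup(newManager("apps/v1beta1"), "v1")) // core v1 filtered out
}
```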

The final step, err := enableVersions(externalVersions), is the important one. It populates the Scheme and initializes the groupMeta:

func enableVersions(externalVersions []unversioned.GroupVersion) error {
	// Add all GroupVersions to the Scheme
	addVersionsToScheme(externalVersions...)

	// Use the first GroupVersion as the preferred (default) version: ""/v1
	preferredExternalVersion := externalVersions[0]

	// Initialize the GroupMeta, filling it and its RESTMapper member from the external versions.
	// GroupMeta is mainly used later to initialize APIGroupVersion.
	groupMeta := apimachinery.GroupMeta{
		GroupVersion:  preferredExternalVersion,
		GroupVersions: externalVersions,
		RESTMapper:    newRESTMapper(externalVersions),
		SelfLinker:    runtime.SelfLinker(accessor),
		InterfacesFor: interfacesFor,
	}

	// registered.RegisterGroup(groupMeta) registers the groupMeta with APIRegistrationManager,
	// keyed by the group name of the first GroupVersion (here, the v1 groupMeta of the core group).
	// Every group's /install/install.go builds a groupMeta and registers it with
	// registered.DefaultAPIRegistrationManager in the same way.
	if err := registered.RegisterGroup(groupMeta); err != nil {
		return err
	}
	return nil
}
  • addVersionsToScheme: adds all versions to the Scheme
  • builds a groupMeta, i.e. initializes the groupMeta
  • registered.RegisterGroup(groupMeta): actually registers the group

newRESTMapper(externalVersions) builds a set of mappings: resource to kind, kind to resource, and kind to scope. A RESTMapper maps between GVR (GroupVersionResource) and GVK (GroupVersionKind), so the matching GVK can be found from a GVR.

func newRESTMapper(externalVersions []unversioned.GroupVersion) meta.RESTMapper {
	// the list of kinds that are scoped at the root of the api hierarchy
	// if a kind is not enumerated here, it is assumed to have a namespace scope
	// rootScoped lists the top-level kinds of the API, i.e. objects that have no namespace.
	rootScoped := sets.NewString(
		"Node",
		"Namespace",
		"PersistentVolume",
		"ComponentStatus",
	)

	// these kinds should be excluded from the list of resources
	// ignoredKinds is passed to the call below; these kinds are skipped when walking the Scheme.
	ignoredKinds := sets.NewString(
		"ListOptions",
		"DeleteOptions",
		"Status",
		"PodLogOptions",
		"PodExecOptions",
		"PodAttachOptions",
		"PodProxyOptions",
		"NodeProxyOptions",
		"ServiceProxyOptions",
		"ThirdPartyResource",
		"ThirdPartyResourceData",
		"ThirdPartyResourceList")

	/*
		Calls api.NewDefaultRESTMapper(),
		==> defined in pkg/api/mapper.go
			==> func NewDefaultRESTMapper
		importPrefix is "k8s.io/kubernetes/pkg/api",
		externalVersions: [v1]
		interfacesFor is a function: func interfacesFor(version unversioned.GroupVersion)
	*/
	mapper := api.NewDefaultRESTMapper(externalVersions, interfacesFor, importPrefix, ignoredKinds, rootScoped)

	return mapper
}

NewDefaultRESTMapper is defined in pkg/api/mapper.go:

// Instantiates a DefaultRESTMapper based on the types registered in api.Scheme
func NewDefaultRESTMapper(defaultGroupVersions []unversioned.GroupVersion, interfacesFunc meta.VersionInterfacesFunc,
	importPathPrefix string, ignoredKinds, rootScoped sets.String) *meta.DefaultRESTMapper {
	// Pass api.Scheme explicitly and delegate to the function below
	return NewDefaultRESTMapperFromScheme(defaultGroupVersions, interfacesFunc, importPathPrefix, ignoredKinds, rootScoped, Scheme)
}

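It then calls NewDefaultRESTMapperFromScheme, whose main steps are:

  1. Create an empty DefaultRESTMapper.
  2. Take each groupVersion in defaultGroupVersions. For the core group this is only "/api/v1", so DefaultRESTMappers can be told apart by their defaultGroupVersions.
  3. Iterate over all kinds the Scheme knows for that groupVersion.
  4. Call mapper.Add(gvk, scope) to populate the mapper.
  5. Return the mapper.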
/*
	Instantiates a DefaultRESTMapper based on the "types" registered in the given Scheme.

	scope = RESTScopeNamespace or RESTScopeRoot

NewDefaultRESTMapperFromScheme() builds a mapper from the defaultGroupVersions and interfacesFunc arguments,
then registers into it the resources found in the Scheme under defaultGroupVersions.
The Scheme here is api.Scheme, into which all types are registered,
so DefaultRESTMappers can be distinguished by their defaultGroupVersions.
*/
func NewDefaultRESTMapperFromScheme(defaultGroupVersions []unversioned.GroupVersion, interfacesFunc meta.VersionInterfacesFunc,
	importPathPrefix string, ignoredKinds, rootScoped sets.String, scheme *runtime.Scheme) *meta.DefaultRESTMapper {

	/*
		Initialize a DefaultRESTMapper object.
		meta.NewDefaultRESTMapper is defined in /pkg/api/meta/restmapper.go
	*/
	mapper := meta.NewDefaultRESTMapper(defaultGroupVersions, interfacesFunc)
	fmt.Println("defaultGroupVersions is: ", reflect.ValueOf(defaultGroupVersions))
	fmt.Println("initial time, mapper is: ", reflect.ValueOf(mapper))
	// enumerate all supported versions, get the kinds, and register with the mapper how to address
	// our resources.
	/*
		For the given defaultGroupVersions (e.g. "/api/v1"),
		iterate over all kinds in the Scheme
		and Add each one to the mapper.
	*/
	for _, gv := range defaultGroupVersions {
		// Get all types registered in the scheme for the given GV
		for kind, oType := range scheme.KnownTypes(gv) {
			fmt.Println("gv, kind is:", gv, kind)
			gvk := gv.WithKind(kind)
			// TODO: Remove import path check.
			// We check the import path because we currently stuff both "api" and "extensions" objects
			// into the same group within Scheme since Scheme has no notion of groups yet.
            
			if !strings.Contains(oType.PkgPath(), importPathPrefix) || ignoredKinds.Has(kind) {
				continue
			}
			// Decide whether this kind is namespaced or root-scoped
			scope := meta.RESTScopeNamespace
			if rootScoped.Has(kind) {
				scope = meta.RESTScopeRoot
			}
			/*
				Then add the gvk to the mapper.
				Add is defined in /pkg/api/meta/restmapper.go
				==> func (m *DefaultRESTMapper) Add(kind unversioned.GroupVersionKind, scope RESTScope)
			*/
			mapper.Add(gvk, scope)
		}
	}
	return mapper
}
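The loop above reduces to three actions per version:
- take every kind known to the Scheme;
- drop the ignored kinds;
- choose root or namespace scope.

The sketch below follows that loop under stated assumptions. The `gvk` type, the `buildScopes` function and the sample kinds are hypothetical, and the import-path check is omitted.

```go
package main

import (
	"fmt"
	"sort"
)

// gvk is a hypothetical stand-in for unversioned.GroupVersionKind.
type gvk struct{ Group, Version, Kind string }

// knownKinds is made-up sample data playing the role of scheme.KnownTypes(gv).
var knownKinds = map[string][]string{
	"v1": {"Pod", "Service", "Node", "Namespace", "ListOptions", "Status"},
}

// buildScopes walks every kind of every version, skips ignored kinds and
// assigns "root" to rootScoped kinds and "namespace" to everything else.
func buildScopes(versions []string, ignored, rootScoped map[string]bool) map[gvk]string {
	scopes := map[gvk]string{}
	for _, v := range versions {
		for _, kind := range knownKinds[v] {
			if ignored[kind] {
				continue
			}
			scope := "namespace"
			if rootScoped[kind] {
				scope = "root"
			}
			scopes[gvk{Group: "", Version: v, Kind: kind}] = scope
		}
	}
	return scopes
}

func main() {
	ignored := map[string]bool{"ListOptions": true, "Status": true}
	rootScoped := map[string]bool{"Node": true, "Namespace": true}
	scopes := buildScopes([]string{"v1"}, ignored, rootScoped)

	kinds := make([]string, 0, len(scopes))
	for k, s := range scopes {
		kinds = append(kinds, k.Kind+"="+s)
	}
	sort.Strings(kinds)
	fmt.Println(kinds)
}
```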

mapper.Add(gvk, scope) inserts the GVK and its scope into the corresponding map fields of the DefaultRESTMapper.

Meaning of the fields of type DefaultRESTMapper struct:

  • defaultGroupVersions: the default GroupVersions, such as v1 or apps/v1beta1; a DefaultRESTMapper usually has only one default GroupVersion
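  • resourceToKind: map from GVR (both singular and plural) to GVK;
  • kindToPluralResource: map from GVK to GVR (plural);
  • kindToScope: map from GVK to Scope;
  • singularToPlural: map from GVR (singular) to GVR (plural);
  • pluralToSingular: map from GVR (plural) to GVR (singular);
  • interfacesFunc: produces the Convertor and MetadataAccessor; the concrete implementation is interfacesFor() in /pkg/api/install/install.go;
  • aliasToResource: maps aliases to resources.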
type DefaultRESTMapper struct {

	defaultGroupVersions []unversioned.GroupVersion

	resourceToKind       map[unversioned.GroupVersionResource]unversioned.GroupVersionKind
	kindToPluralResource map[unversioned.GroupVersionKind]unversioned.GroupVersionResource
	kindToScope          map[unversioned.GroupVersionKind]RESTScope
	singularToPlural     map[unversioned.GroupVersionResource]unversioned.GroupVersionResource
	pluralToSingular     map[unversioned.GroupVersionResource]unversioned.GroupVersionResource

	interfacesFunc VersionInterfacesFunc

	// aliasToResource is used for mapping aliases to resources
	aliasToResource map[string][]string
}
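The simplified DefaultRESTMapper below shows how Add fills these maps. The type and method names are illustrative. The pluralization rules are modeled on the heuristic in pkg/api/meta/restmapper.go and should be treated as an approximation:
- lowercase the kind;
- append "s", "es" or "ies";
- leave "endpoints" unchanged.

This KindFor resolves only fully specified resources; the real interface also resolves partial ones.

```go
package main

import (
	"fmt"
	"strings"
)

type GVK struct{ Group, Version, Kind string }
type GVR struct{ Group, Version, Resource string }

// restMapper keeps the same maps as DefaultRESTMapper, minus aliases and interfacesFunc.
type restMapper struct {
	resourceToKind       map[GVR]GVK
	kindToPluralResource map[GVK]GVR
	kindToScope          map[GVK]string
	singularToPlural     map[GVR]GVR
	pluralToSingular     map[GVR]GVR
}

func newRESTMapper() *restMapper {
	return &restMapper{
		resourceToKind:       map[GVR]GVK{},
		kindToPluralResource: map[GVK]GVR{},
		kindToScope:          map[GVK]string{},
		singularToPlural:     map[GVR]GVR{},
		pluralToSingular:     map[GVR]GVR{},
	}
}

// kindToResource lowercases the kind for the singular name and guesses the
// plural with simple English rules ("endpoints" is left unpluralized).
func kindToResource(k GVK) (plural, singular GVR) {
	s := strings.ToLower(k.Kind)
	singular = GVR{k.Group, k.Version, s}
	switch {
	case strings.HasSuffix(s, "endpoints"):
		return singular, singular
	case strings.HasSuffix(s, "s"):
		return GVR{k.Group, k.Version, s + "es"}, singular
	case strings.HasSuffix(s, "y"):
		return GVR{k.Group, k.Version, strings.TrimSuffix(s, "y") + "ies"}, singular
	default:
		return GVR{k.Group, k.Version, s + "s"}, singular
	}
}

// Add fills every map for one kind, the same job mapper.Add(gvk, scope) does.
func (m *restMapper) Add(k GVK, scope string) {
	plural, singular := kindToResource(k)
	m.singularToPlural[singular] = plural
	m.pluralToSingular[plural] = singular
	m.resourceToKind[singular] = k
	m.resourceToKind[plural] = k
	m.kindToPluralResource[k] = plural
	m.kindToScope[k] = scope
}

// KindFor resolves a fully specified GVR (singular or plural) to its GVK.
func (m *restMapper) KindFor(r GVR) (GVK, error) {
	r.Resource = strings.ToLower(r.Resource)
	if k, ok := m.resourceToKind[r]; ok {
		return k, nil
	}
	return GVK{}, fmt.Errorf("no kind is registered for %v", r)
}

func main() {
	m := newRESTMapper()
	m.Add(GVK{"", "v1", "Pod"}, "namespace")
	m.Add(GVK{"", "v1", "Endpoints"}, "namespace")
	k, err := m.KindFor(GVR{"", "v1", "pods"})
	fmt.Println(k, err, m.kindToPluralResource[GVK{"", "v1", "Endpoints"}])
}
```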

addVersionsToScheme adds all versions to the Scheme. The whole apiserver has only one Scheme, api.Scheme, and every GroupVersion is managed by it. The types of every GroupVersion are registered into this single global api.Scheme.

Scheme is defined in pkg/api/register.go, and NewScheme() is defined in /pkg/runtime/scheme.go:
var Scheme = runtime.NewScheme()

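In the Scheme's definition:

  • A Type is a specific Go struct
  • A Version is a point-in-time identifier for a particular representation of that Type (usually backwards compatible)
  • A Kind is the unique name of a Type within that Version
  • A Group identifies a set of Versions, Kinds, and Types
  • An Unversioned Type is a Type that has not yet been formally bound to a type and is promised to remain backwards compatible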

The RESTMapper manages the relationship between GVR and GVK, while the Scheme manages the relationship between GVK and Type.

type Scheme struct is defined in pkg/runtime/scheme.go:

type Scheme struct {
	// versionMap allows one to figure out the go type of an object with
	// the given version and name.
	// Finds the Type for a gvk; each gvk maps to exactly one Type.
	gvkToType map[unversioned.GroupVersionKind]reflect.Type

	// typeToGroupVersion allows one to find metadata for a given go object.
	// The reflect.Type we index by should *not* be a pointer.
	// Stores the Type-to-GVK relation; one Type may map to several GVKs.
	typeToGVK map[reflect.Type][]unversioned.GroupVersionKind

	// unversionedTypes are transformed without conversion in ConvertToVersion.
	// Records the relation between unversioned Types and their GVK; unversioned types need no version conversion.
	unversionedTypes map[reflect.Type]unversioned.GroupVersionKind

	// unversionedKinds are the names of kinds that can be created in the context of any group
	// or version
	// Records the relation between unversioned kinds and their Type.
	unversionedKinds map[string]reflect.Type

	// Map from version and resource to the corresponding func to convert
	// resource field labels in that version to internal version.
	fieldLabelConversionFuncs map[string]map[string]FieldLabelConversionFunc

	// defaulterFuncs is an array of interfaces to be called with an object to provide defaulting
	// the provided object must be a pointer.
	defaulterFuncs map[reflect.Type]func(interface{})

	// converter stores all registered conversion functions. It also has
	// default converting behavior.
	// Used to convert struct values between different versions.
	converter *conversion.Converter

	// cloner stores all registered copy functions. It also has default
	// deep copy behavior.
	// Used to obtain deep copies of struct values.
	cloner *conversion.Cloner
}
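The core idea of the Scheme fits in a few lines: it maps a GVK to a Go type and a Go type back to its GVKs, using reflection. `miniScheme` below is a hypothetical reduction that keeps only gvkToType and typeToGVK. Two rules follow the behavior of runtime.Scheme's AddKnownTypes: the struct's Go name becomes the Kind, and registered values must be pointers to structs.

```go
package main

import (
	"fmt"
	"reflect"
)

type GVK struct{ Group, Version, Kind string }

// miniScheme is a hypothetical Scheme that keeps only two of its maps.
type miniScheme struct {
	gvkToType map[GVK]reflect.Type
	typeToGVK map[reflect.Type][]GVK
}

func newMiniScheme() *miniScheme {
	return &miniScheme{gvkToType: map[GVK]reflect.Type{}, typeToGVK: map[reflect.Type][]GVK{}}
}

// AddKnownTypes registers pointer-to-struct values; the Kind is the struct's Go name.
func (s *miniScheme) AddKnownTypes(group, version string, objs ...interface{}) {
	for _, obj := range objs {
		t := reflect.TypeOf(obj)
		if t.Kind() != reflect.Ptr || t.Elem().Kind() != reflect.Struct {
			panic("all types must be pointers to structs")
		}
		t = t.Elem() // index by the struct type, not the pointer
		k := GVK{group, version, t.Name()}
		s.gvkToType[k] = t
		s.typeToGVK[t] = append(s.typeToGVK[t], k)
	}
}

// New returns a pointer to a zero value of the Type registered for k.
func (s *miniScheme) New(k GVK) (interface{}, error) {
	t, ok := s.gvkToType[k]
	if !ok {
		return nil, fmt.Errorf("no type registered for %v", k)
	}
	return reflect.New(t).Interface(), nil
}

// ObjectKinds returns every GVK a Go value is registered under.
func (s *miniScheme) ObjectKinds(obj interface{}) []GVK {
	t := reflect.TypeOf(obj)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return s.typeToGVK[t]
}

// Pod is a hypothetical API type.
type Pod struct{ Name string }

func main() {
	s := newMiniScheme()
	s.AddKnownTypes("", "v1", &Pod{})
	obj, err := s.New(GVK{"", "v1", "Pod"})
	fmt.Printf("%T %v %v\n", obj, err, s.ObjectKinds(obj))
}
```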

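Struct values passed between Kubernetes internal components use the internal version. Every external version is converted to the internal version, and the internal version must be converted to an external version before it is output. External versions are never converted directly into one another. Data stored in etcd is versioned.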
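The hub-and-spoke rule can be shown with plain functions. The widget types and conversion functions below are hypothetical. Real Kubernetes conversions are generated and registered with the Scheme's converter rather than written by hand like this.

```go
package main

import "fmt"

// internalWidget is the hypothetical internal (hub) representation.
type internalWidget struct {
	Name     string
	Replicas int
}

// v1Widget is a hypothetical external version that matches the hub.
type v1Widget struct {
	Name     string
	Replicas int
}

// v2Widget renamed and resized its fields, as external versions sometimes do.
type v2Widget struct {
	FullName string
	Size     int32
}

func v1ToInternal(in v1Widget) internalWidget {
	return internalWidget{Name: in.Name, Replicas: in.Replicas}
}

func internalToV1(in internalWidget) v1Widget {
	return v1Widget{Name: in.Name, Replicas: in.Replicas}
}

func v2ToInternal(in v2Widget) internalWidget {
	return internalWidget{Name: in.FullName, Replicas: int(in.Size)}
}

func internalToV2(in internalWidget) v2Widget {
	return v2Widget{FullName: in.Name, Size: int32(in.Replicas)}
}

// External versions are never converted directly; every path goes through the
// internal hub, so N external versions need 2N functions instead of N*(N-1).
func v1ToV2(in v1Widget) v2Widget { return internalToV2(v1ToInternal(in)) }
func v2ToV1(in v2Widget) v1Widget { return internalToV1(v2ToInternal(in)) }

func main() {
	stored := v1Widget{Name: "web", Replicas: 3} // e.g. a v1 object decoded from storage
	out := v1ToV2(stored)                        // what a v2 client would receive
	fmt.Printf("%+v -> %+v\n", stored, out)
}
```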

addVersionsToScheme()函数中,主要就是向Scheme注册internal version和external version

func addVersionsToScheme(externalVersions ...unversioned.GroupVersion) {
	// add the internal version to Scheme
	/* Add the internal version to api.Scheme.
		The Scheme is initialized in pkg/api/register.go:
		var Scheme = runtime.NewScheme()
	*/
	if err := api.AddToScheme(api.Scheme); err != nil {
		// Programmer error, detect immediately
		panic(err)
	}
	// add the enabled external versions to Scheme
	for _, v := range externalVersions {
		if !registered.IsEnabledVersion(v) {
			glog.Errorf("Version %s is not enabled, so it will not be added to the Scheme.", v)
			continue
		}
		switch v {
		case v1.SchemeGroupVersion:
			if err := v1.AddToScheme(api.Scheme); err != nil {
				// Programmer error, detect immediately
				panic(err)
			}
		}
	}
}

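func (c completedConfig) New() creates a new Master instance from the given configuration. This is where the RESTful APIs for each versioned resource are generated.

The flow of func (c completedConfig) New() is as follows:

  1. Call func (c completedConfig) New() (*GenericAPIServer, error) to create a type GenericAPIServer struct instance
  2. Check whether the watch cache is enabled, and set up the connection to etcd
  3. Call InstallLegacyAPI to install the "/api" APIs
  4. Call InstallAPIs to install the "/apis" APIs for the groups that are enabled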
func (c completedConfig) New() (*Master, error) {
	if reflect.DeepEqual(c.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
		return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
	}
	/*
		The returned s contains s.HandlerContainer, i.e. s.HandlerContainer is initialized here.
		This call also creates a WebService that lists all versions of a group (it registers only simple routes)
		and adds it to s.HandlerContainer. The core registration happens in
		func (g *APIGroupVersion) InstallREST in pkg/apiserver/apiserver.go.
	*/
	s, err := c.Config.GenericConfig.SkipComplete().New() // completion is done in Complete, no need for a second time
	if err != nil {
		return nil, err
	}

	if c.EnableUISupport {
		routes.UIRedirect{}.Install(s.HandlerContainer)
	}
	if c.EnableLogsSupport {
		routes.Logs{}.Install(s.HandlerContainer)
	}

	m := &Master{
		GenericAPIServer: s,
	}

	restOptionsFactory := restOptionsFactory{
		deleteCollectionWorkers: c.DeleteCollectionWorkers,
		enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
		storageFactory:          c.StorageFactory,
	}
	
	/*
		Check whether the watch cache is enabled; a different storage decorator is used in each case:
			with the cache, the resulting storage operates on the cache;
			without the cache, it operates on etcd directly.
	*/

	if c.EnableWatchCache {
		restOptionsFactory.storageDecorator = registry.StorageWithCacher
	} else {
		restOptionsFactory.storageDecorator = generic.UndecoratedStorage
	}

	// install legacy rest storage
	// Install only if the /api/v1 group is registered and enabled
	if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:       c.StorageFactory,
			ProxyTransport:       c.ProxyTransport,
			KubeletClientConfig:  c.KubeletClientConfig,
			EventTTL:             c.EventTTL,
			ServiceIPRange:       c.ServiceIPRange,
			ServiceNodePortRange: c.ServiceNodePortRange,
			LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
		}
        
		// Install the "/api" APIs
        
		m.InstallLegacyAPI(c.Config, restOptionsFactory.NewFor, legacyRESTStorageProvider)
	}

	restStorageProviders := []genericapiserver.RESTStorageProvider{
		appsrest.RESTStorageProvider{},
		authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator},
		authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer},
		autoscalingrest.RESTStorageProvider{},
		batchrest.RESTStorageProvider{},
		certificatesrest.RESTStorageProvider{},
		extensionsrest.RESTStorageProvider{ResourceInterface: thirdparty.NewThirdPartyResourceServer(s, c.StorageFactory)},
		policyrest.RESTStorageProvider{},
		rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser},
		storagerest.RESTStorageProvider{},
	}
    
	// Install the "/apis" APIs
    
	m.InstallAPIs(c.Config.GenericConfig.APIResourceConfigSource, restOptionsFactory.NewFor, restStorageProviders...)

	if c.Tunneler != nil {
		m.installTunneler(c.Tunneler, coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
	}

	return m, nil
}

InstallLegacyAPI() installs /api:

func (m *Master) InstallLegacyAPI(c *Config, restOptionsGetter genericapiserver.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) {

	glog.Infof("building apiGroupInfo, which carries restStorageMap")
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
	if err != nil {
		glog.Fatalf("Error building core storage: %v", err)
	}


	if c.EnableCoreControllers {
		serviceClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)

		bootstrapController := c.NewBootstrapController(legacyRESTStorage, serviceClient)
		if err := m.GenericAPIServer.AddPostStartHook("bootstrap-controller", bootstrapController.PostStartHook); err != nil {
			glog.Fatalf("Error registering PostStartHook %q: %v", "bootstrap-controller", err)
		}
	}

	// install core Group's API
	/*
		Calls InstallLegacyAPIGroup, defined in
		==>/pkg/genericapiserver/genericapiserver.go
			==>func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo)
	*/
	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
		glog.Fatalf("Error in registering group versions: %v", err)
	}
}

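NewLegacyRESTStorage creates the storage objects. Its flow is:

  1. Create a type APIGroupInfo struct instance. It is tied to the APIRegistrationManager, Scheme, GroupMeta, etc. described earlier under resource registration.
  2. Initialize a LegacyRESTStorage object, restStorage.
  3. Create the individual storages, such as podStorage and nodeStorage.
  4. Save the storages created in step 3 into restStorageMap and load it into APIGroupInfo as APIGroupInfo.VersionedResourcesStorageMap["v1"]. This API mapping is important: when APIGroupVersion is later built from APIGroupInfo, this map is what supplies the REST storage implementation for each resource in the given version.
  5. return restStorage, APIGroupInfo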
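VersionedResourcesStorageMap is a two-level map: version, then resource name, then storage. The sketch is hypothetical. `storage`, `podStorage` and `nodeStorage` are stand-ins for rest.Storage implementations, and apiGroupInfo keeps only this one field.

```go
package main

import (
	"fmt"
	"sort"
)

// storage is a hypothetical stand-in for rest.Storage: New returns the kind it serves.
type storage interface{ New() string }

type podStorage struct{}

func (podStorage) New() string { return "Pod" }

type nodeStorage struct{}

func (nodeStorage) New() string { return "Node" }

// apiGroupInfo keeps only the field discussed above.
type apiGroupInfo struct {
	VersionedResourcesStorageMap map[string]map[string]storage
}

// newLegacyRESTStorage builds restStorageMap and files it under "v1".
func newLegacyRESTStorage() apiGroupInfo {
	restStorageMap := map[string]storage{
		"pods":  podStorage{},
		"nodes": nodeStorage{},
	}
	return apiGroupInfo{
		VersionedResourcesStorageMap: map[string]map[string]storage{"v1": restStorageMap},
	}
}

// storageFor finds the storage behind version/resource; a missing version or
// resource yields ok == false (indexing a nil inner map is safe in Go).
func (g apiGroupInfo) storageFor(version, resource string) (storage, bool) {
	s, ok := g.VersionedResourcesStorageMap[version][resource]
	return s, ok
}

func main() {
	info := newLegacyRESTStorage()
	resources := make([]string, 0)
	for r := range info.VersionedResourcesStorageMap["v1"] {
		resources = append(resources, r)
	}
	sort.Strings(resources)
	for _, r := range resources {
		s, _ := info.storageFor("v1", r)
		fmt.Printf("/api/v1/%s -> %s\n", r, s.New())
	}
}
```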

The InstallLegacyAPIGroup() method:

func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	// Check that the prefix argument is valid
	/*
		s.legacyAPIGroupPrefixes is: map[/api:{}]
	*/
	if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
		return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
	}
	/*
		Key call: actually installs the APIs (turns them into RESTful APIs)
	*/
	if err := s.installAPIResources(apiPrefix, apiGroupInfo); err != nil {
		return err
	}

	// setup discovery
	/*
		Collect the version information of this group
		and add a WebService whose route path is /api
	*/
	apiVersions := []string{}
	for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
		apiVersions = append(apiVersions, groupVersion.Version)
	}
	// Install the version handler.
	// Add a handler at /<apiPrefix> to enumerate the supported api versions.
	apiserver.AddApiWebService(s.Serializer, s.HandlerContainer.Container, apiPrefix, func(req *restful.Request) *unversioned.APIVersions {
		clientIP := utilnet.GetClientIP(req.Request)

		apiVersionsForDiscovery := unversioned.APIVersions{
			ServerAddressByClientCIDRs: s.discoveryAddresses.ServerAddressByClientCIDRs(clientIP),
			Versions:                   apiVersions,
		}
		return &apiVersionsForDiscovery
	})
	return nil
}

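installAPIResources() installs the REST storage for each API group version resource. Its basic flow is:

  1. Iterate over all versions of the group. The function is called once per group, and every group ultimately installs its RESTful API through it.
  2. Create a type APIGroupVersion struct object from apiGroupInfo, groupVersion and apiPrefix.
  3. Install the RESTful API from that APIGroupVersion via apiGroupVersion.InstallREST.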
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	// Iterate over all GroupVersions of this group
	for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
		/*
			Create the APIGroupVersion by calling
			func (s *GenericAPIServer) getAPIGroupVersion
		*/
		apiGroupVersion, err := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
		if err != nil {
			return err
		}
		if apiGroupInfo.OptionsExternalVersion != nil {
			apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
		}

		/*
			Install the RESTful API from the APIGroupVersion created above.
			s.HandlerContainer.Container is the go-restful Container.
			InstallREST is defined in pkg/apiserver/apiserver.go
				==> func (g *APIGroupVersion) InstallREST(container *restful.Container)
		*/
		if err := apiGroupVersion.InstallREST(s.HandlerContainer.Container); err != nil {
			return fmt.Errorf("Unable to setup API %v: %v", apiGroupInfo, err)
		}
	}

	return nil
}

Finally, InstallREST() installs the RESTful API. It registers the REST handlers (storage, watch, proxy and redirect) into the go-restful framework's Container. The flow is:

  1. Create a type APIInstaller struct object
  2. Build a WebService
  3. Add Routes to the WebService
  4. Add the WebService to the Container
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
	/*
		newInstaller() assembles the path "Prefix/Group/Version",
		then fills in and returns an APIInstaller object.
	*/
	installer := g.newInstaller()
	// Create a WebService and set its path
	ws := installer.NewWebService()
	/*
		Call Install. This is the key step that registers the various URLs.
		During registration InstallREST ends up calling registerResourceHandlers(),
		which turns a rest.Storage object into concrete getter, lister, etc. handler functions
		and binds them to the actual URLs.
	*/
	apiResources, registrationErrors := installer.Install(ws)
	/*
		A list-style API: add a Route for the path "/".
		Accessing a root path of the form "Prefix/Group/Version" returns the resources
		supported by this GroupVersion, e.g.
		curl http://192.168.56.101:8080/api/v1
	*/
	lister := g.ResourceLister
	if lister == nil {
		lister = staticLister{apiResources}
	}
	AddSupportedResourcesWebService(g.Serializer, ws, g.GroupVersion, lister)
	// Add the WebService to the Container
	container.Add(ws)
	return utilerrors.NewAggregate(registrationErrors)
}

InstallAPIs() installs /apis:

func (m *Master) InstallAPIs(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter genericapiserver.RESTOptionsGetter, restStorageProviders ...genericapiserver.RESTStorageProvider) {
	apiGroupsInfo := []genericapiserver.APIGroupInfo{}

	for _, restStorageBuilder := range restStorageProviders {
		groupName := restStorageBuilder.GroupName()
		if !apiResourceConfigSource.AnyResourcesForGroupEnabled(groupName) {
			glog.V(1).Infof("Skipping disabled API group %q.", groupName)
			continue
		}
		apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
		if !enabled {
			glog.Warningf("Problem initializing API group %q, skipping.", groupName)
			continue
		}
		glog.V(1).Infof("Enabling API group %q.", groupName)

		if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
			name, hook, err := postHookProvider.PostStartHook()
			if err != nil {
				glog.Fatalf("Error building PostStartHook: %v", err)
			}
			if err := m.GenericAPIServer.AddPostStartHook(name, hook); err != nil {
				glog.Fatalf("Error registering PostStartHook %q: %v", name, err)
			}
		}

		apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
	}

	for i := range apiGroupsInfo {
		/*
			Calls func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo)
			==> defined in /pkg/genericapiserver/genericapiserver.go
		*/
		if err := m.GenericAPIServer.InstallAPIGroup(&apiGroupsInfo[i]); err != nil {
			glog.Fatalf("Error in registering group versions: %v", err)
		}
	}
}

s, err := c.Config.GenericConfig.SkipComplete().New() creates a GenericAPIServer instance from the config:

func (c completedConfig) New() (*GenericAPIServer, error) {
	if c.Serializer == nil {
		return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil")
	}

	s := &GenericAPIServer{
		discoveryAddresses:     c.DiscoveryAddresses,
		LoopbackClientConfig:   c.LoopbackClientConfig,
		/*
			c.LegacyAPIGroupPrefixes contains /api, taken from
				==>/pkg/genericapiserver/config.go
					==>DefaultLegacyAPIPrefix = "/api"
		*/
		legacyAPIGroupPrefixes: c.LegacyAPIGroupPrefixes,
		admissionControl:       c.AdmissionControl,
		requestContextMapper:   c.RequestContextMapper,
		Serializer:             c.Serializer,

		minRequestTimeout:    time.Duration(c.MinRequestTimeout) * time.Second,
		enableSwaggerSupport: c.EnableSwaggerSupport,

		SecureServingInfo:   c.SecureServingInfo,
		InsecureServingInfo: c.InsecureServingInfo,
		ExternalAddress:     c.ExternalAddress,

		apiGroupsForDiscovery: map[string]unversioned.APIGroup{},

		enableOpenAPISupport: c.EnableOpenAPISupport,
		openAPIConfig:        c.OpenAPIConfig,

		postStartHooks: map[string]postStartHookEntry{},
	}

	/*
		HandlerContainer is initialized here.
		NewAPIContainer is defined in /pkg/genericapiserver/mux/container.go
			==>func NewAPIContainer(mux *http.ServeMux, s runtime.NegotiatedSerializer) *APIContainer

		The two arguments passed in:
				http.NewServeMux() creates a new http ServeMux;
				c.Serializer is the object that implements encoding/decoding (serialization and deserialization)
	*/
	s.HandlerContainer = mux.NewAPIContainer(http.NewServeMux(), c.Serializer)
	/*
		The Container has now been created and initialized; next, add the WebServices.
	*/
	s.installAPI(c.Config)

	s.Handler, s.InsecureHandler = c.BuildHandlerChainsFunc(s.HandlerContainer.ServeMux, c.Config)

	return s, nil
}

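The overall initialization flow:

  1. The init() functions in each group's install.go fill the Scheme with the internal version and the external versions, and initialize APIRegistrationManager and GroupMeta. GroupMeta is mainly used later to initialize APIGroupVersion.
  2. When groupMeta is initialized, a RESTMapper is built from the Scheme and externalVersions.
  3. NewLegacyRESTStorage in /pkg/registry/core/rest/storage_core.go builds an APIGroupInfo from the Scheme and GroupMeta above.
  4. An APIGroupVersion is then generated from the APIGroupInfo.
  5. The RESTful API is generated from the APIGroupVersion.

Call graph of multi-version resource initialization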

(Figure: init call graph)

Call graph of registering resources as RESTful APIs

(Figure: RESTful API registration call graph)

The List-Watch mechanism on the kube-apiserver side

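For every type of resource in the cluster, the apiserver sets up a connection to etcd and obtains that resource's operations; watch is one of them. kubelet, kube-controller-manager and kube-scheduler need to monitor changes to many resources. When those objects change (add, delete, update), kube-apiserver proactively notifies these components. The apiserver-side Watch mechanism is built on top of etcd's Watch. etcd's watch has no filtering, whereas kube-apiserver adds filtering so that each subscriber receives only the resources it is interested in.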
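The key point is that etcd delivers every change to the apiserver, and the apiserver applies each subscriber's filter before forwarding. A minimal fan-out sketch follows. `broadcaster`, `event` and the filter signature are hypothetical. The real Cacher also handles resource versions, slow watchers and per-watcher goroutines, none of which is modeled here.

```go
package main

import "fmt"

// event is a hypothetical watch event as it might arrive from the unfiltered etcd watch.
type event struct {
	Type      string // ADDED, MODIFIED or DELETED
	Namespace string
	Name      string
}

// watcher receives only the events its filter accepts.
type watcher struct {
	filter func(event) bool
	ch     chan event
}

// broadcaster fans one event stream out to many filtered watchers.
type broadcaster struct{ watchers []*watcher }

// watch subscribes with a filter; buf sizes the channel so dispatch does not
// block in this single-goroutine demo.
func (b *broadcaster) watch(filter func(event) bool, buf int) <-chan event {
	w := &watcher{filter: filter, ch: make(chan event, buf)}
	b.watchers = append(b.watchers, w)
	return w.ch
}

// dispatch delivers e to every watcher whose filter matches.
func (b *broadcaster) dispatch(e event) {
	for _, w := range b.watchers {
		if w.filter(e) {
			w.ch <- e
		}
	}
}

// stop closes all watcher channels.
func (b *broadcaster) stop() {
	for _, w := range b.watchers {
		close(w.ch)
	}
}

// drain collects everything left on a closed channel.
func drain(ch <-chan event) []event {
	var out []event
	for e := range ch {
		out = append(out, e)
	}
	return out
}

func main() {
	b := &broadcaster{}
	all := b.watch(func(event) bool { return true }, 10)
	system := b.watch(func(e event) bool { return e.Namespace == "kube-system" }, 10)

	b.dispatch(event{"ADDED", "default", "web"})
	b.dispatch(event{"ADDED", "kube-system", "dns"})
	b.dispatch(event{"DELETED", "default", "web"})
	b.stop()

	fmt.Println(len(drain(all)), drain(system))
}
```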

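Events flow as follows:

  1. From etcd to the Cacher: the Cacher holds a watchCache that stores the objects the apiserver has watched from etcd.
  2. The resourceVersions of etcd and the Cacher are compared to form a WatchEvent, which is dispatched to each watcher.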

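func NewCacherFromConfig() in /pkg/storage/cacher.go creates a new Cacher. The Cacher serves WATCH and LIST requests from its in-memory cache and updates that cache in the background. Its flow:

  1. Create a watchCache to store the objects the apiserver watches from etcd
  2. Create a listerWatcher
  3. Instantiate a type Cacher struct object, whose core is the reflector mechanism
  4. Start the dispatchEvents goroutine to dispatch events to each subscriber
  5. cacher.startCaching(stopCh)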
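The watchCache can be pictured as a fixed-size sliding window of recent events. The window lets a watch resume from a resourceVersion without going back to etcd. The sketch below is a hypothetical ring buffer, and `capacity` plays the role of config.CacheCapacity. Rejecting a resourceVersion that falls before the window is modeled on the "too old resource version" behavior; after that error a client is expected to re-list.

```go
package main

import (
	"errors"
	"fmt"
)

// watchCacheEvent is a hypothetical, trimmed event: just a key and its resourceVersion.
type watchCacheEvent struct {
	ResourceVersion uint64
	Key             string
}

// watchCache keeps the most recent len(cache) events in a ring buffer.
// startIndex and endIndex grow monotonically; index i lives at cache[i%len(cache)].
type watchCache struct {
	cache      []watchCacheEvent
	startIndex int // oldest event still held
	endIndex   int // one past the newest event
}

var errTooOld = errors.New("too old resource version")

func newWatchCache(capacity int) *watchCache {
	return &watchCache{cache: make([]watchCacheEvent, capacity)}
}

// add appends an event, evicting the oldest one when the buffer is full.
func (w *watchCache) add(e watchCacheEvent) {
	if w.endIndex-w.startIndex == len(w.cache) {
		w.startIndex++
	}
	w.cache[w.endIndex%len(w.cache)] = e
	w.endIndex++
}

// eventsSince returns the buffered events newer than rv. If events right after
// rv may already have been evicted, the caller gets errTooOld and must re-list.
func (w *watchCache) eventsSince(rv uint64) ([]watchCacheEvent, error) {
	if w.startIndex < w.endIndex {
		oldest := w.cache[w.startIndex%len(w.cache)].ResourceVersion
		if rv+1 < oldest {
			return nil, errTooOld
		}
	}
	var out []watchCacheEvent
	for i := w.startIndex; i < w.endIndex; i++ {
		if e := w.cache[i%len(w.cache)]; e.ResourceVersion > rv {
			out = append(out, e)
		}
	}
	return out, nil
}

func main() {
	wc := newWatchCache(3)
	for rv := uint64(10); rv <= 13; rv++ {
		wc.add(watchCacheEvent{ResourceVersion: rv, Key: fmt.Sprintf("/pods/default/p%d", rv)})
	}
	fresh, err := wc.eventsSince(11)
	fmt.Println(len(fresh), err)
	_, err = wc.eventsSince(5)
	fmt.Println(err)
}
```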
func NewCacherFromConfig(config CacherConfig) *Cacher {
	/*
		Create a watchCache to store the objects the apiserver watches from etcd
	*/
	watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
	/*
		List and watch config.Storage.
		config.Storage is the data source (roughly, etcd or etcd with a cache): the etcd handler for one resource
	*/
	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)

	// Give this error when it is constructed rather than when you get the
	// first watch item, because it's much easier to track down that way.
	/*
		Check that the codec matches the object type
	*/
	if obj, ok := config.Type.(runtime.Object); ok {
		if err := runtime.CheckCodec(config.Codec, obj); err != nil {
			panic("storage codec doesn't seem to match given type: " + err.Error())
		}
	}

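	/*
		Cacher also implements the methods required by the storage.Interface interface.
		Because this Cacher only serves WATCH and LIST requests,
		all of its methods other than the WATCH- and LIST-related ones simply call the API of the storage created earlier.

		Four important members: storage, watchCache, reflector, watchers
	*/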
	cacher := &Cacher{
		ready: newReady(),
		// config.Storage is the handler for this resource returned after connecting to etcd
		storage:    config.Storage,
		objectType: reflect.TypeOf(config.Type),
		// watchCache stores the objects the apiserver watches from etcd
		watchCache: watchCache,
		/*
			The reflector has two important data members, listerWatcher and watchCache;
			listerWatcher wraps config.Storage and lists and watches it.
			The reflector's job is to store the watched objects of type config.Type in the watchCache.
			==>defined in /pkg/client/cache/reflector.go
				==>func NewReflector
		*/
		reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
		// Versioner manages resource versions
		versioner:   config.Versioner,
		triggerFunc: config.TriggerPublisherFunc,
		watcherIdx:  0,
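		/*
			allWatchers maps an index to a *cacheWatcher, and valueWatchers maps a trigger value
			to a set of cacheWatchers (watchersMap).
			When kubelet or kube-scheduler needs to watch a type of resource,
			it sends a watch request to kube-apiserver, and kube-apiserver creates a cacheWatcher,
			which delivers the watched resources over HTTP from the apiserver to kubelet or kube-scheduler.
				==>event dispatching is done by go cacher.dispatchEvents() below

			The watchers are the hub between the publisher and the subscribers of kube-apiserver watch.
			Where are watchers added? They are created with newCacheWatcher when a watch request arrives (in Cacher.Watch):
				==>func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCacheEvent, filter filterObjectFunc, forget func(bool)) *cacheWatcher {
		*/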
		watchers: indexedWatchers{
			allWatchers:   make(map[int]*cacheWatcher),
			valueWatchers: make(map[string]watchersMap),
		},
		// TODO: Figure out the correct value for the buffer size.
		/*
			Events on incoming are dispatched to the watchers.

			Do not confuse it with the etcdIncoming channel in /pkg/storage/etcd/etcd_watcher.go; they are different channels.
		*/
		incoming: make(chan watchCacheEvent, 100),
		// We need to (potentially) stop both:
		// - wait.Until go-routine
		// - reflector.ListAndWatch
		// and there are no guarantees on the order that they will stop.
		// So we will be simply closing the channel, and synchronizing on the WaitGroup.
		stopCh: make(chan struct{}),
	}
	/*
		Set the watchCache's onEvent handler.
		cacher.processEvent is the producer for the incoming chan watchCacheEvent.
	*/
	watchCache.SetOnEvent(cacher.processEvent)
	/*
		Dispatch events to the corresponding watchers.
		This is the consumer of the incoming chan watchCacheEvent.
	*/
	go cacher.dispatchEvents()

	stopCh := cacher.stopCh
	cacher.stopWg.Add(1)
	go func() {
		defer cacher.stopWg.Done()
		wait.Until(
			func() {
				if !cacher.isStopped() {
					/*
						apiserver-side list-watch mechanism, V1.0
					*/
					cacher.startCaching(stopCh)
				}
			}, time.Second, stopCh,
		)
	}()
	return cacher
}

The startCaching() method works as follows:

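  1. It first calls terminateAllWatchers to close all cacheWatchers. On the first call the apiserver is still initializing and has not accepted watch requests from other components, so there are no watchers yet. wait.Until reruns startCaching after every disconnection, so on later calls this closes the existing watchers and their clients re-establish their watches.
  2. It then calls c.reflector.ListAndWatch, which does what was described earlier: the reflector stores the resources the apiserver watches from etcd into the watchCache.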
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
	// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
	// It is safe to use the cache after a successful list until a disconnection.
	// We start with usable (write) locked. The below OnReplace function will
	// unlock it after a successful list. The below defer will then re-lock
	// it when this function exits (always due to disconnection), only if
	// we actually got a successful list. This cycle will repeat as needed.
	/*
		After a successful list and until a disconnection, it is safe to use the cache
	*/
	successfulList := false
	c.watchCache.SetOnReplace(func() {
		successfulList = true
		c.ready.set(true)
	})
	defer func() {
		if successfulList {
			c.ready.set(false)
		}
	}()

	// Terminate all watchers
	c.terminateAllWatchers()
	// Note that since onReplace may be not called due to errors, we explicitly
	// need to retry it on errors under lock.
	// Also note that startCaching is called in a loop, so there's no need
	// to have another loop here.
	/*
		apiserver-side list-watch mechanism
		func (c *Cacher) startCaching is already called in a loop, so there is no loop here
		ListAndWatch(stopChannel) is defined in /pkg/client/cache/reflector.go
			==>func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error
	*/
	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
		glog.Errorf("unexpected ListAndWatch error: %v", err)
	}
}

Reflector's ListAndWatch()

Its flow is as follows:

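  1. Perform a list with ResourceVersion "0", sync the returned items into the store, and record the list's resourceVersion.
  2. Perform a watch starting from that resourceVersion, with a randomized timeout so that hanging watches are eventually closed.
  3. Call func (r *Reflector) watchHandler to process the events from the watch.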
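/*
	ListAndWatch first lists all items and gets the resource version at that moment,
	then uses that resource version to watch.
	It returns an error if ListAndWatch didn't even try to initialize the watch.

	Note that func (r *Reflector) ListAndWatch is reused by several components, such as apiserver and kubelet.
	The difference: apiserver watches etcd, whereas kubelet watches apiserver.
*/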
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
	var resourceVersion string
	resyncCh, cleanup := r.resyncChan()
	defer cleanup()

	// Explicitly set "0" as resource version - it's fine for the List()
	// to be served from cache and potentially be delayed relative to
	// etcd contents. Reflector framework will catch up via Watch() eventually.
	options := api.ListOptions{ResourceVersion: "0"}
	/*
		apiserver-side list-watch mechanism, V3.0: the List operation

		List with resource version "0";
		r.listerWatcher.List is defined in /pkg/storage/cacher.go
			==>func (lw *cacherListerWatcher) List(options api.ListOptions)
	*/
	list, err := r.listerWatcher.List(options)
	if err != nil {
		return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
	}
	/*
		Get the List accessor for this type, defined in
		==>/pkg/api/meta/meta.go
			==>func ListAccessor(obj interface{}) (List, error)
	*/
	listMetaInterface, err := meta.ListAccessor(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
	}
	resourceVersion = listMetaInterface.GetResourceVersion()
	items, err := meta.ExtractList(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
	}
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
	}
	r.setLastSyncResourceVersion(resourceVersion)

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
			glog.V(4).Infof("%s: forcing resync", r.name)
			if err := r.store.Resync(); err != nil {
				resyncerrc <- err
				return
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()

	for {
		timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = api.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any watchers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timemoutseconds,
		}

		/*
			apiserver-side list-watch mechanism, V3.0: the Watch operation
			defined in /pkg/storage/cacher.go
			==>func (lw *cacherListerWatcher) Watch(options api.ListOptions) (watch.Interface, error)

			Creates a watcher that implements watch.Interface (a shared interface, so kubelet and apiserver can reuse this code)
		*/
		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
			default:
				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
			}
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case wait and resend watch request.
			if urlError, ok := err.(*url.Error); ok {
				if opError, ok := urlError.Err.(*net.OpError); ok {
					if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
						time.Sleep(time.Second)
						continue
					}
				}
			}
			return nil
		}

		/*
			apiserver-side list-watch mechanism, V4.0
			Pass the watcher w created above to
			func (r *Reflector) watchHandler
		*/
		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
			}
			return nil
		}
	}
}

The calls list, err := r.listerWatcher.List(options) and w, err := r.listerWatcher.Watch(options) are served by cacherListerWatcher, which in turn calls the List and WatchList methods of etcdHelper.

func (lw *cacherListerWatcher) List(options api.ListOptions) (runtime.Object, error) {
	list := lw.newListFunc()
	/*
		Call the storage's List method, defined in
		==>/pkg/storage/etcd/etcd_helper.go
			==>func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error
	*/
	if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
		return nil, err
	}
	return list, nil
}

// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options api.ListOptions) (watch.Interface, error) {
	/*
		Call the storage's WatchList method, defined in
		==>/pkg/storage/etcd/etcd_helper.go
			==>func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate)
	*/
	return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
}

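r.watchHandler reads event objects from the watcher's result channel (outgoing). The r.store.Add(event.Object) call inside it adds the event to the cache; modified and deleted events go through r.store.Update and r.store.Delete instead. For the Cacher, the reflector's store is the watchCache passed to NewReflector, so the call lands in watchCache.Add.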

func (w *watchCache) Add(obj interface{}) error {
	object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
	if err != nil {
		return err
	}
	/*
		Wrap the input object in a watch.Event
	*/
	event := watch.Event{Type: watch.Added, Object: object}

	/*
		Define the function literal f; w.store.Add(elem) is defined in pkg/client/cache/store.go
			==>func (c *cache) Add(obj interface{}) error
	*/
	f := func(elem *storeElement) error { return w.store.Add(elem) }
	/*
		Call func (w *watchCache) processEvent
	*/
	return w.processEvent(event, resourceVersion, f)
}

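Finally, watchCache.processEvent completes the flow of the event from etcd into the cache. It records the object's previous version, appends the event to the watchCache's cyclic buffer, and applies the update function f to the store. It also calls the onEvent handler set earlier (cacher.processEvent), which pushes the event onto the Cacher's incoming channel.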

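That completes the production side of events. Every event must then reach the subscribers that consume it, and in the code above dispatchEvents() is the method that distributes them.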

func (c *Cacher) dispatchEvents() {
	for {
		select {
		/*
			Consumer of the incoming channel of type Cacher struct
		*/
		case event, ok := <-c.incoming:
			if !ok {
				return
			}
			c.dispatchEvent(&event)
		case <-c.stopCh:
			return
		}
	}
}

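dispatchEvents() then calls dispatchEvent(). dispatchEvent() sends the event to every watcher in allWatchers, and to the watchers in valueWatchers that are registered under the event's trigger values.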

func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
	/*
		Get the trigger values of the event's object and of the previous version of the object
	*/
	triggerValues, supported := c.triggerValues(event)

	// TODO: For now we assume we have a given <timeout> budget for dispatching
	// a single event. We should consider changing to the approach with:
	// - budget has upper bound at <max_timeout>
	// - we add <portion> to current timeout every second
	timeout := time.Duration(250) * time.Millisecond

	/*
	   RWMutex provides four methods:
	   func (*RWMutex) Lock    acquires the write lock
	   func (*RWMutex) Unlock  releases the write lock
	   func (*RWMutex) RLock   acquires a read lock
	   func (*RWMutex) RUnlock releases a read lock
	*/
	c.Lock()
	defer c.Unlock()
	// Iterate over "allWatchers" no matter what the trigger function is.
	/*
		Iterate over the Cacher's watchers.allWatchers
		and send the event to every watcher in it
	*/
	for _, watcher := range c.watchers.allWatchers {
		watcher.add(event, &timeout)
	}
	if supported {
		// Iterate over watchers interested in the given values of the trigger.
		for _, triggerValue := range triggerValues {
			for _, watcher := range c.watchers.valueWatchers[triggerValue] {
				watcher.add(event, &timeout)
			}
		}
	} else {
		// supported equal to false generally means that trigger function
		// is not defined (or not aware of any indexes). In this case,
		// watchers filters should generally also don't generate any
		// trigger values, but can cause problems in case of some
		// misconfiguration. Thus we paranoidly leave this branch.

		// Iterate over watchers interested in exact values for all values.
		for _, watchers := range c.watchers.valueWatchers {
			for _, watcher := range watchers {
				watcher.add(event, &timeout)
			}
		}
	}
}

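watcher.add(event, &timeout) delivers the event to a single watcher of type cacheWatcher struct. Because timeout is passed by pointer, all watchers share one 250 ms budget per event: each blocking send subtracts the time it waited, and a watcher that cannot accept the event within the remaining budget is terminated.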

func (c *cacheWatcher) add(event *watchCacheEvent, timeout *time.Duration) {
	// Try to send the event immediately, without blocking.
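	/*
		If c.input cannot accept the event right away (its buffer is full), the default
		branch is taken, the select ends, and execution continues below.
		If the send succeeds, return immediately.
		This is where the event is dispatched: it is the producer for the input channel;
		the corresponding consumer is in type cacheWatcher struct
		==>/pkg/storage/cacher.go
			==>func newCacheWatcher
				==>func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64)
	*/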
	select {
	case c.input <- *event:
		return
	default:
	}

	// OK, block sending, but only for up to <timeout>.
	// cacheWatcher.add is called very often, so arrange
	// to reuse timers instead of constantly allocating.
	/*
		func (c *cacheWatcher) add is called very frequently, so timers are taken from timerPool and reused
	*/
	startTime := time.Now()

	t, ok := timerPool.Get().(*time.Timer)
	if ok {
		t.Reset(*timeout)
	} else {
		t = time.NewTimer(*timeout)
	}
	defer timerPool.Put(t)

	select {
	case c.input <- *event:
		stopped := t.Stop()
		if !stopped {
			// Consume triggered (but not yet received) timer event
			// so that future reuse does not get a spurious timeout.
			<-t.C
		}
	case <-t.C:
		// This means that we couldn't send event to that watcher.
		// Since we don't want to block on it infinitely,
		// we simply terminate it.
		c.forget(false)
		c.stop()
	}

	if *timeout = *timeout - time.Since(startTime); *timeout < 0 {
		*timeout = 0
	}
}

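At this point the event has been delivered to each subscriber's watcher. Each cacheWatcher then reads the event from its input channel in its process method and forwards it to the subscribing component.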

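When kube-apiserver initializes, it connects to etcd, watches it, and stores the results in the watchCache. When another component needs to watch a resource, it sends a watch request to the apiserver, and that request can carry a filter (for example a label or field selector). For this request the apiserver creates a watcher and builds a WatchServer on top of it. Objects from the watchCache are first passed through the filter function, and those that pass are sent to the subscribing component through the WatchServer.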
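Filtering is not just a yes/no decision per object. The watcher also has to tell its subscriber when an object enters or leaves the filtered set. The sketch below assumes that the filter is applied to both the new and the previous object, and that the event type is rewritten accordingly. To my knowledge the upstream cacheWatcher behaves this way, but treat the exact rules here as an assumption. Pod, cacheEvent, watchEvent and filterEvent are hypothetical names.

```go
package main

import "fmt"

type EventType string

const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
)

type Pod struct {
	Name   string
	Labels map[string]string
}

// cacheEvent is what the watchCache produces: the new object and, if the
// object existed before, its previous version.
type cacheEvent struct {
	Type       EventType
	Object     *Pod
	PrevObject *Pod
}

// watchEvent is what one subscriber receives.
type watchEvent struct {
	Type   EventType
	Object *Pod
}

// filterEvent decides what a watcher with the given filter should see:
// an object that starts matching is reported as Added, one that stops
// matching as Deleted, and one that keeps matching as Modified.
func filterEvent(e cacheEvent, filter func(*Pod) bool) (watchEvent, bool) {
	curPasses := e.Type != Deleted && filter(e.Object)
	oldPasses := e.PrevObject != nil && filter(e.PrevObject)
	switch {
	case curPasses && oldPasses:
		return watchEvent{Modified, e.Object}, true
	case curPasses:
		return watchEvent{Added, e.Object}, true
	case oldPasses:
		return watchEvent{Deleted, e.PrevObject}, true
	}
	return watchEvent{}, false
}

func main() {
	isWeb := func(p *Pod) bool { return p.Labels["app"] == "web" }
	v1 := &Pod{Name: "p", Labels: map[string]string{"app": "web"}}
	v2 := &Pod{Name: "p", Labels: map[string]string{"app": "db"}}

	events := []cacheEvent{
		{Added, v1, nil},   // created matching: Added
		{Modified, v2, v1}, // relabelled away: Deleted for this watcher
		{Modified, v2, v2}, // still not matching: nothing sent
	}
	for _, e := range events {
		if we, ok := filterEvent(e, isWeb); ok {
			fmt.Println(we.Type, we.Object.Labels["app"])
		} else {
			fmt.Println("skipped")
		}
	}
	// Output:
	// ADDED web
	// DELETED web
	// skipped
}
```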

Call chain of the entire list-watch process