ApiServer 启动分析
kube-apiserver 启动流程分析
kubernetes 版本:v1.16
首先分析 kube-apiserver 的启动方式,kube-apiserver 也是通过其 Run 方法启动主逻辑的,在Run 方法调用之前会进行解析命令行参数、设置默认值等。
启动流程
资源注册
Cobra命令行参数解析
创建APIServer通用配置
创建APIExtensionServer
创建KubeAPIServer
创建AggregatorServer
创建GenericAPIServer
启动HTTP服务
启动HTTPS服务
权限控制
认证
BasicAuth认证
ClientCA认证
TokenAuth认证
Bootstrap Token认证
RequestHeader认证
Webhook TokenAuth认证
Anonymous 认证
OIDC 认证
ServiceAccountAuth 认证
授权
AlwaysAllow 授权
AlwaysDeny 授权
ABAC 授权
Webhook 授权
RBAC 授权
Node 授权
准入控制器
AlwaysPullImages 准入控制器
PodNodeSelector 准入控制器
进程信号处理机制
常驻进程实现
进程的优雅关闭
向 systemd 报告进程状态
Run
Run 方法的主要逻辑为:
- 1、调用
CreateServerChain构建服务调用链并判断是否启动非安全的 http server,http server 链中包含 apiserver 要启动的三个 server,以及为每个 server 注册对应资源的路由; - 2、调用
server.PrepareRun进行服务运行前的准备,该方法主要完成了健康检查、存活检查和OpenAPI路由的注册工作; - 3、调用
prepared.Run启动 https server;
server 的初始化使用委托模式,通过 DelegationTarget 接口,把基本的 API Server、CustomResource、Aggregator 这三种服务采用链式结构串联起来,对外提供服务。
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:147
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
server, err := CreateServerChain(completeOptions, stopCh)
if err != nil {
return err
}
prepared, err := server.PrepareRun()
if err != nil {
return err
}
return prepared.Run(stopCh)
}
CreateServerChain
CreateServerChain 是完成 server 初始化的方法,里面包含 APIExtensionsServer、KubeAPIServer、AggregatorServer 初始化的所有流程,最终返回 aggregatorapiserver.APIAggregator 实例,初始化流程主要有:http filter chain 的配置、API Group 的注册、http path 与 handler 的关联以及 handler 后端存储 etcd 的配置。其主要逻辑为:
- 1、调用
CreateKubeAPIServerConfig创建 KubeAPIServer 所需要的配置,主要是创建master.Config,其中会调用buildGenericConfig生成 genericConfig,genericConfig 中包含 apiserver 的核心配置; - 2、判断是否启用了扩展的 API server 并调用
createAPIExtensionsConfig为其创建配置,apiExtensions server 是一个代理服务,用于代理 kubeapiserver 中的其他 server,比如 metric-server; - 3、调用
createAPIExtensionsServer创建 apiExtensionsServer 实例; - 4、调用
CreateKubeAPIServer初始化 kubeAPIServer; - 5、调用
createAggregatorConfig为 aggregatorServer 创建配置并调用createAggregatorServer初始化 aggregatorServer; - 6、配置并判断是否启动非安全的 http server;
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:165
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
if err != nil {
return nil, err
}
// 1、为 kubeAPIServer 创建配置
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil {
return nil, err
}
// 2、判断是否配置了 APIExtensionsServer,创建 apiExtensionsConfig
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
return nil, err
}
// 3、初始化 APIExtensionsServer
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
}
// 4、初始化 KubeAPIServer
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
if err != nil {
return nil, err
}
// 5、创建 AggregatorConfig
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig. ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// 6、初始化 AggregatorServer
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
return nil, err
}
// 7、判断是否启动非安全端口的 http server
if insecureServingInfo != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
return nil, err
}
}
return aggregatorServer, nil
}
CreateKubeAPIServerConfig
在 CreateKubeAPIServerConfig 中主要是调用 buildGenericConfig 创建 genericConfig 以及构建 master.Config 对象。
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:271
func CreateKubeAPIServerConfig(
s completedServerRunOptions,
nodeTunneler tunneler.Tunneler,
proxyTransport *http.Transport,
) (......) {
// 1、构建 genericConfig
genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, lastErr = buildGenericConfig(s.ServerRunOptions, proxyTransport)
if lastErr != nil {
return
}
......
// 2、初始化所支持的 capabilities
capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: s.AllowPrivileged,
PrivilegedSources: capabilities.PrivilegedSources{
HostNetworkSources: []string{},
HostPIDSources: []string{},
HostIPCSources: []string{},
},
PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
})
// 3、获取 service ip range 以及 api server service IP
serviceIPRange, apiServerServiceIP, lastErr := master.DefaultServiceIPRange(s.PrimaryServiceClusterIPRange)
if lastErr != nil {
return
}
......
// 4、构建 master.Config 对象
config = &master.Config{......}
if nodeTunneler != nil {
config.ExtraConfig.KubeletClientConfig.Dial = nodeTunneler.Dial
}
if config.GenericConfig.EgressSelector != nil {
config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
}
return
}
buildGenericConfig
主要逻辑为:
- 1、调用
genericapiserver.NewConfig生成默认的 genericConfig,genericConfig 中主要配置了DefaultBuildHandlerChain,DefaultBuildHandlerChain中包含了认证、鉴权等一系列 http filter chain; - 2、调用
master.DefaultAPIResourceConfigSource加载需要启用的 API Resource,集群中所有的 API Resource 可以在代码的k8s.io/api目录中看到,随着版本的迭代也会不断变化; - 3、为 genericConfig 中的部分字段设置默认值;
- 4、调用
completedStorageFactoryConfig.New创建 storageFactory,后面会使用 storageFactory 为每种API Resource 创建对应的 RESTStorage;
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:386
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
) (......) {
// 1、为 genericConfig 设置默认值
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()
if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
return
}
......
genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(......)
genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
sets.NewString("watch", "proxy"),
sets.NewString("attach", "exec", "proxy", "log", "portforward"),
)
kubeVersion := version.Get()
genericConfig.Version = &kubeVersion
storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
storageFactoryConfig.ApiResourceConfig = genericConfig.MergedResourceConfig
completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
if err != nil {
lastErr = err
return
}
// 初始化 storageFactory
storageFactory, lastErr = completedStorageFactoryConfig.New()
if lastErr != nil {
return
}
if genericConfig.EgressSelector != nil {
storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
// 2、初始化 RESTOptionsGetter,后期根据其获取操作 Etcd 的句柄,同时添加 etcd 的健康检查方法
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
}
// 3、设置使用 protobufs 用来内部交互,并且禁用压缩功能
genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
genericConfig.LoopbackClientConfig.DisableCompression = true
// 4、创建 clientset
kubeClientConfig := genericConfig.LoopbackClientConfig
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
if err != nil {
lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
return
}
versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)
// 5、创建认证实例,支持多种认证方式:请求 Header 认证、Auth 文件认证、CA 证书认证、Bearer token 认证、
// ServiceAccount 认证、BootstrapToken 认证、WebhookToken 认证等
genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, clientgoExternalClient, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authentication config: %v", err)
return
}
// 6、创建鉴权实例,包含:Node、RBAC、Webhook、ABAC、AlwaysAllow、AlwaysDeny
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, versionedInformers)
......
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.LoopbackClientConfig)
// 7、审计插件的初始化
lastErr = s.Audit.ApplyTo(......)
if lastErr != nil {
return
}
// 8、准入插件的初始化
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, serviceResolver)
if err != nil {
lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
return
}
err = s.Admission.ApplyTo(......)
if err != nil {
lastErr = fmt.Errorf("failed to initialize admission: %v", err)
}
return
}
以上主要分析 KubeAPIServerConfig 的初始化,其他两个 server config 的初始化暂且不详细分析,下面接着继续分析 server 的初始化。
createAPIExtensionsServer
APIExtensionsServer 是最先被初始化的,在 createAPIExtensionsServer 中调用 apiextensionsConfig.Complete().New 来完成 server 的初始化,其主要逻辑为:
- 1、首先调用
c.GenericConfig.New按照go-restful的模式初始化 Container,在c.GenericConfig.New中会调用NewAPIServerHandler初始化 handler,APIServerHandler 包含了 API Server 使用的多种http.Handler 类型,包括go-restful以及non-go-restful,以及在以上两者之间选择的 Director 对象,go-restful用于处理已经注册的 handler,non-go-restful用来处理不存在的 handler,API URI 处理的选择过程为:FullHandlerChain-> Director ->{GoRestfulContainer, NonGoRestfulMux}。在c.GenericConfig.New中还会调用installAPI来添加包括/、/debug/*、/metrics、/version等路由信息。三种 server 在初始化时首先都会调用c.GenericConfig.New来初始化一个 genericServer,然后进行 API 的注册; - 2、调用
s.GenericAPIServer.InstallAPIGroup在路由中注册 API Resources,此方法的调用链非常深,主要是为了将需要暴露的 API Resource 注册到 server 中,以便能通过 http 接口进行 resource 的 REST 操作,其他几种 server 在初始化时也都会执行对应的InstallAPI; - 3、初始化 server 中需要使用的 controller,主要有
openapiController、crdController、namingController、establishingController、nonStructuralSchemaController、apiApprovalController、finalizingController; - 4、将需要启动的 controller 以及 informer 添加到 PostStartHook 中;
k8s.io/kubernetes/cmd/kube-apiserver/app/apiextensions.go:94
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (* apiextensionsapiserver.CustomResourceDefinitions, error) {
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go:132
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
// 1、初始化 genericServer
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
if err != nil {
return nil, err
}
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
// 2、初始化 APIGroup Info,APIGroup 指该 server 需要暴露的 API
apiResourceConfig := c.GenericConfig.MergedResourceConfig
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
customResourceDefintionStorage := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
storage["customresourcedefinitions"] = customResourceDefintionStorage
storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)
apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
}
if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
......
}
// 3、注册 APIGroup
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
// 4、初始化需要使用的 controller
crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
if err != nil {
return nil, fmt.Errorf("failed to create clientset: %v", err)
}
s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
......
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion(). CustomResourceDefinitions(), crdClient.Apiextensions())
crdHandler, err := NewCustomResourceDefinitionHandler(......)
if err != nil {
return nil, err
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().InternalVersion(). CustomResourceDefinitions(), crdClient.Apiextensions())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions(). InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
finalizingController := finalizer.NewCRDFinalizer(
s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
crdClient.Apiextensions(),
crdHandler,
)
var openapiController *openapicontroller.Controller
if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions())
}
// 5、将 informer 以及 controller 添加到 PostStartHook 中
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
......
go crdController.Run(context.StopCh)
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil
}, context.StopCh)
})
return s, nil
}
以上是 APIExtensionsServer 的初始化流程,其中最核心方法是 s.GenericAPIServer.InstallAPIGroup,也就是 API 的注册过程,三种 server 中 API 的注册过程都是其核心。
CreateKubeAPIServer
本节继续分析 KubeAPIServer 的初始化,在CreateKubeAPIServer 中调用了 kubeAPIServerConfig.Complete().New 来完成相关的初始化操作。
kubeAPIServerConfig.Complete().New
主要逻辑为:
- 1、调用
c.GenericConfig.New初始化 GenericAPIServer,其主要实现在上文已经分析过; - 2、判断是否支持 logs 相关的路由,如果支持,则添加
/logs路由; - 3、调用
m.InstallLegacyAPI将核心 API Resource 添加到路由中,对应到 apiserver 就是以/api开头的 resource; - 4、调用
m.InstallAPIs将扩展的 API Resource 添加到路由中,在 apiserver 中即是以/apis开头的 resource;
k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:214
func CreateKubeAPIServer(......) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}
kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)
return kubeAPIServer, nil
}
k8s.io/kubernetes/pkg/master/master.go:325
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
......
// 1、初始化 GenericAPIServer
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
if err != nil {
return nil, err
}
// 2、注册 logs 相关的路由
if c.ExtraConfig.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
m := &Master{
GenericAPIServer: s,
}
// 3、安装 LegacyAPI
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
......
}
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
restStorageProviders := []RESTStorageProvider{
auditregistrationrest.RESTStorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig. Authentication.APIAudiences},
......
}
// 4、安装 APIs
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
if c.ExtraConfig.Tunneler != nil {
m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
}
m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)
return m, nil
}
m.InstallLegacyAPI
此方法的主要功能是将 core API 注册到路由中,是 apiserver 初始化流程中最核心的方法之一,不过其调用链非常深,下面会进行深入分析。将 API 注册到路由其最终的目的就是对外提供 RESTful API 来操作对应 resource,注册 API 主要分为两步,第一步是为 API 中的每个 resource 初始化 RESTStorage 以此操作后端存储中数据的变更,第二步是为每个 resource 根据其 verbs 构建对应的路由。m.InstallLegacyAPI 的主要逻辑为:
- 1、调用
legacyRESTStorageProvider.NewLegacyRESTStorage为 LegacyAPI 中各个资源创建 RESTStorage,RESTStorage 的目的是将每种资源的访问路径及其后端存储的操作对应起来; - 2、初始化
bootstrap-controller,并将其加入到 PostStartHook 中,bootstrap-controller是 apiserver 中的一个 controller,主要功能是创建系统所需要的一些 namespace 以及创建 kubernetes service 并定期触发对应的 sync 操作,apiserver 在启动后会通过调用 PostStartHook 来启动bootstrap-controller; - 3、在为资源创建完 RESTStorage 后,调用
m.GenericAPIServer.InstallLegacyAPIGroup为 APIGroup 注册路由信息,InstallLegacyAPIGroup方法的调用链非常深,主要为InstallLegacyAPIGroup--> installAPIResources --> InstallREST --> Install --> registerResourceHandlers,最终核心的路由构造在registerResourceHandlers方法内,该方法比较复杂,其主要功能是通过上一步骤构造的 REST Storage 判断该资源可以执行哪些操作(如 create、update等),将其对应的操作存入到 action 中,每一个 action 对应一个标准的 REST 操作,如 create 对应的 action 操作为 POST、update 对应的 action 操作为PUT。最终根据 actions 数组依次遍历,对每一个操作添加一个 handler 方法,注册到 route 中去,再将 route 注册到 webservice 中去,webservice 最终会注册到 container 中,遵循 go-restful 的设计模式;
关于 legacyRESTStorageProvider.NewLegacyRESTStorage 以及 m.GenericAPIServer.InstallLegacyAPIGroup 方法的详细说明在后文中会继续进行讲解。
k8s.io/kubernetes/pkg/master/master.go:406
func (m *Master) InstallLegacyAPI(......) error {
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
return fmt.Errorf("Error building core storage: %v", err)
}
controllerName := "bootstrap-controller"
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("Error in registering group versions: %v", err)
}
return nil
}
InstallAPIs 与 InstallLegacyAPI 的主要流程是类似的,限于篇幅此处不再深入分析。
createAggregatorServer
AggregatorServer 主要用于自定义的聚合控制器的,使 CRD 能够自动注册到集群中。
主要逻辑为:
- 1、调用
aggregatorConfig.Complete().NewWithDelegate创建 aggregatorServer; - 2、初始化
crdRegistrationController和autoRegistrationController,crdRegistrationController负责注册 CRD,autoRegistrationController负责将 CRD 对应的 APIServices 自动注册到 apiserver 中,CRD 创建后可通过$ kubectl get apiservices查看是否注册到 apiservices 中; - 3、将
autoRegistrationController和crdRegistrationController加入到 PostStartHook 中;
k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go:124
func createAggregatorServer(......) (*aggregatorapiserver.APIAggregator, error) {
// 1、初始化 aggregatorServer
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
if err != nil {
return nil, err
}
// 2、初始化 auto-registration controller
apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
autoRegistrationController := autoregister.NewAutoRegisterController(......)
apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
crdRegistrationController := crdregistration.NewCRDRegistrationController(......)
err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
go crdRegistrationController.Run(5, context.StopCh)
go func() {
if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
crdRegistrationController.WaitForInitialSync()
}
autoRegistrationController.Run(5, context.StopCh)
}()
return nil
})
if err != nil {
return nil, err
}
err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
makeAPIServiceAvailableHealthCheck(
"autoregister-completion",
apiServices,
aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
),
)
if err != nil {
return nil, err
}
return aggregatorServer, nil
}
aggregatorConfig.Complete().NewWithDelegate
aggregatorConfig.Complete().NewWithDelegate 是初始化 aggregatorServer 的方法,主要逻辑为:
- 1、调用
c.GenericConfig.New初始化 GenericAPIServer,其内部的主要功能在上文已经分析过; - 2、调用
apiservicerest.NewRESTStorage为 APIServices 资源创建 RESTStorage,RESTStorage 的目的是将每种资源的访问路径及其后端存储的操作对应起来; - 3、调用
s.GenericAPIServer.InstallAPIGroup为 APIGroup 注册路由信息;
k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:158
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
openAPIConfig := c.GenericConfig.OpenAPIConfig
c.GenericConfig.OpenAPIConfig = nil
// 1、初始化 genericServer
genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
if err != nil {
return nil, err
}
apiregistrationClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
}
informerFactory := informers.NewSharedInformerFactory(
apiregistrationClient,
5*time.Minute,
)
s := &APIAggregator{
GenericAPIServer: genericServer,
delegateHandler: delegationTarget.UnprotectedHandler(),
......
}
// 2、为 API 注册路由
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
// 3、初始化 apiserviceRegistrationController、availableController
apisHandler := &apisHandler{
codecs: aggregatorscheme.Codecs,
lister: s.lister,
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
availableController, err := statuscontrollers.NewAvailableConditionController(
......
)
if err != nil {
return nil, err
}
// 4、添加 PostStartHook
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(context.StopCh)
c.GenericConfig.SharedInformerFactory.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
go apiserviceRegistrationController.Run(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
go availableController.Run(5, context.StopCh)
return nil
})
return s, nil
}
以上是对 AggregatorServer 初始化流程的分析,可以看出,在创建 APIExtensionsServer、KubeAPIServer 以及 AggregatorServer 时,其模式都是类似的,首先调用 c.GenericConfig.New 按照go-restful的模式初始化 Container,然后为 server 中需要注册的资源创建 RESTStorage,最后将 resource 的 APIGroup 信息注册到路由中。
至此,CreateServerChain 中流程已经分析完,其中的调用链如下所示:
|--> CreateNodeDialer
|
|--> CreateKubeAPIServerConfig
|
CreateServerChain --|--> createAPIExtensionsConfig
|
| |--> c.GenericConfig.New
|--> createAPIExtensionsServer --> apiextensionsConfig.Complete().New --|
| |--> s.GenericAPIServer.InstallAPIGroup
|
| |--> c.GenericConfig.New --> legacyRESTStorageProvider.NewLegacyRESTStorage
| |
|--> CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --|--> m.InstallLegacyAPI
| |
| |--> m.InstallAPIs
|
|
|--> createAggregatorConfig
|
| |--> c.GenericConfig.New
| |
|--> createAggregatorServer --> aggregatorConfig.Complete().NewWithDelegate --|--> apiservicerest.NewRESTStorage
|
|--> s.GenericAPIServer.InstallAPIGroup
prepared.Run
在 Run 方法中首先调用 CreateServerChain 完成各 server 的初始化,然后调用 server.PrepareRun 完成服务启动前的准备工作,最后调用 prepared.Run 方法来启动安全的 http server。server.PrepareRun 主要完成了健康检查、存活检查和OpenAPI路由的注册工作,下面继续分析 prepared.Run 的流程,在 prepared.Run 中主要调用 s.NonBlockingRun 来完成启动工作。
k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go:269
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
return s.runnable.Run(stopCh)
}
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:316
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
delayedStopCh := make(chan struct{})
go func() {
defer close(delayedStopCh)
<-stopCh
time.Sleep(s.ShutdownDelayDuration)
}()
// 调用 s.NonBlockingRun 完成启动流程
err := s.NonBlockingRun(delayedStopCh)
if err != nil {
return err
}
// 当收到退出信号后完成一些收尾工作
<-stopCh
err = s.RunPreShutdownHooks()
if err != nil {
return err
}
<-delayedStopCh
s.HandlerChainWaitGroup.Wait()
return nil
}
s.NonBlockingRun
s.NonBlockingRun 的主要逻辑为:
- 1、判断是否要启动审计日志服务;
- 2、调用
s.SecureServingInfo.Serve配置并启动 https server; - 3、执行 postStartHooks;
- 4、向 systemd 发送 ready 信号;
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:351
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
auditStopCh := make(chan struct{})
// 1、判断是否要启动审计日志
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(auditStopCh); err != nil {
return fmt.Errorf("failed to run the audit backend: %v", err)
}
}
// 2、启动 https server
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh)
close(auditStopCh)
return err
}
}
go func() {
<-stopCh
close(s.readinessStopCh)
close(internalStopCh)
if stoppedCh != nil {
<-stoppedCh
}
s.HandlerChainWaitGroup.Wait()
close(auditStopCh)
}()
// 3、执行 postStartHooks
s.RunPostStartHooks(stopCh)
// 4、向 systemd 发送 ready 信号
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}
return nil
}
以上就是 server 的初始化以及启动流程过程的分析,上文已经提到各 server 初始化过程中最重要的就是 API Resource RESTStorage 的初始化以及路由的注册,由于该过程比较复杂,下文会单独进行讲述。
storageFactory 的构建
上文已经提到过,apiserver 最终实现的 handler 对应的后端数据是以 Store 的结构保存的,这里以 /api 开头的路由举例,通过NewLegacyRESTStorage方法创建各个资源的RESTStorage。RESTStorage 是一个结构体,具体的定义在k8s.io/apiserver/pkg/registry/generic/registry/store.go下,结构体内主要包含NewFunc返回特定资源信息、NewListFunc返回特定资源列表、CreateStrategy特定资源创建时的策略、UpdateStrategy更新时的策略以及DeleteStrategy删除时的策略等重要方法。在NewLegacyRESTStorage内部,可以看到创建了多种资源的 RESTStorage。
NewLegacyRESTStorage 的调用链为 CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New --> m.InstallLegacyAPI --> legacyRESTStorageProvider.NewLegacyRESTStorage。
NewLegacyRESTStorage
一个 API Group 下的资源都有其 REST 实现,k8s.io/kubernetes/pkg/registry下所有的 Group 都有一个rest目录,存储的就是对应资源的 RESTStorage。在NewLegacyRESTStorage方法中,通过NewREST或者NewStorage会生成各种资源对应的 Storage,此处以 pod 为例进行说明。
k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:102
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver. APIGroupInfo, error) {
apiGroupInfo := genericapiserver.APIGroupInfo{
PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
Scheme: legacyscheme.Scheme,
ParameterCodec: legacyscheme.ParameterCodec,
NegotiatedSerializer: legacyscheme.Codecs,
}
var podDisruptionClient policyclient.PodDisruptionBudgetsGetter
if policyGroupVersion := (schema.GroupVersion{Group: "policy", Version: "v1beta1"}); legacyscheme.Scheme. IsVersionRegistered(policyGroupVersion) {
var err error
podDisruptionClient, err = policyclient.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
}
// 1、LegacyAPI 下的 resource RESTStorage 的初始化
restStorage := LegacyRESTStorage{}
podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
......
endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
// 2、pod RESTStorage 的初始化
podStorage, err := podstore.NewStorage(......)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
......
serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator. Interface, error) {
......
})
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
}
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
var secondaryServiceClusterIPAllocator ipallocator.Interface
if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && c.SecondaryServiceIPRange.IP != nil {
......
}
var serviceNodePortRegistry rangeallocation.RangeRegistry
serviceNodePortAllocator, err := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
......
})
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
}
restStorage.ServiceNodePortAllocator = serviceNodePortRegistry
controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
serviceRest, serviceRestProxy := servicestore.NewREST(......)
// 3、restStorageMap 保存 resource http path 与 RESTStorage 对应关系
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
......
"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
......
}
podstore.NewStorage
podstore.NewStorage 是为 pod 生成 storage 的方法,该方法主要功能是为 pod 创建后端存储最终返回一个 RESTStorage 对象,其中调用 store.CompleteWithOptions 来创建后端存储的。
k8s.io/kubernetes/pkg/registry/core/pod/storage/storage.go:71
func NewStorage(......) (PodStorage, error) {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
......
}
options := &generic.StoreOptions{
RESTOptions: optsGetter,
AttrFunc: pod.GetAttrs,
TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": pod.NodeNameTriggerFunc},
}
// 调用 store.CompleteWithOptions
if err := store.CompleteWithOptions(options); err != nil {
return PodStorage{}, err
}
statusStore := *store
statusStore.UpdateStrategy = pod.StatusStrategy
ephemeralContainersStore := *store
ephemeralContainersStore.UpdateStrategy = pod.EphemeralContainersStrategy
bindingREST := &BindingREST{store: store}
// PodStorage 对象
return PodStorage{
Pod: &REST{store, proxyTransport},
Binding: &BindingREST{store: store},
LegacyBinding: &LegacyBindingREST{bindingREST},
Eviction: newEvictionStorage(store, podDisruptionBudgetClient),
Status: &StatusREST{store: &statusStore},
EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
Log: &podrest.LogREST{Store: store, KubeletConn: k},
Proxy: &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
Exec: &podrest.ExecREST{Store: store, KubeletConn: k},
Attach: &podrest.AttachREST{Store: store, KubeletConn: k},
PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
}, nil
}
可以看到最终返回的对象里对 pod 的不同操作都是一个 REST 对象,REST 中自动集成了 genericregistry.Store 对象,而 store.CompleteWithOptions 方法就是对 genericregistry.Store 对象中存储实例就行初始化的。
type REST struct {
*genericregistry.Store
proxyTransport http.RoundTripper
}
type BindingREST struct {
store *genericregistry.Store
}
......
store.CompleteWithOptions
store.CompleteWithOptions 主要功能是为 store 中的配置设置一些默认的值以及根据提供的 options 更新 store,其中最主要的就是初始化 store 的后端存储实例。
在CompleteWithOptions方法内,调用了options.RESTOptions.GetRESTOptions 方法,其最终返回generic.RESTOptions 对象,generic.RESTOptions 对象中包含对 etcd 初始化的一些配置、数据序列化方法以及对 etcd 操作的 storage.Interface 对象。其会依次调用StorageWithCacher-->NewRawStorage-->Create方法创建最终依赖的后端存储。
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1192
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
......
var isNamespaced bool
switch {
case e.CreateStrategy != nil:
isNamespaced = e.CreateStrategy.NamespaceScoped()
case e.UpdateStrategy != nil:
isNamespaced = e.UpdateStrategy.NamespaceScoped()
default:
return fmt.Errorf("store for %s must have CreateStrategy or UpdateStrategy set", e.DefaultQualifiedResource.String())
}
......
// 1、调用 options.RESTOptions.GetRESTOptions
opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
if err != nil {
return err
}
// 2、设置 ResourcePrefix
prefix := opts.ResourcePrefix
if !strings.HasPrefix(prefix, "/") {
prefix = "/" + prefix
}
if prefix == "/" {
return fmt.Errorf("store for %s has an invalid prefix %q", e.DefaultQualifiedResource.String(), opts.ResourcePrefix)
}
if e.KeyRootFunc == nil && e.KeyFunc == nil {
......
}
keyFunc := func(obj runtime.Object) (string, error) {
......
}
// 3、以下操作主要是将 opts 对象中的值赋值到 store 对象中
if e.DeleteCollectionWorkers == 0 {
e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
}
e.EnableGarbageCollection = opts.EnableGarbageCollection
if e.ObjectNameFunc == nil {
......
}
if e.Storage.Storage == nil {
e.Storage.Codec = opts.StorageConfig.Codec
var err error
e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
opts.StorageConfig,
prefix,
keyFunc,
e.NewFunc,
e.NewListFunc,
attrFunc,
options.TriggerFunc,
)
if err != nil {
return err
}
e.StorageVersioner = opts.StorageConfig.EncodeVersioner
if opts.CountMetricPollPeriod > 0 {
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
previousDestroy := e.DestroyFunc
e.DestroyFunc = func() {
stopFunc()
if previousDestroy != nil {
previousDestroy()
}
}
}
}
return nil
}
options.RESTOptions 是一个 interface,想要找到其 GetRESTOptions 方法的实现必须知道 options.RESTOptions 初始化时对应的实例,其初始化是在 CreateKubeAPIServerConfig --> buildGenericConfig --> s.Etcd.ApplyWithStorageFactoryTo 方法中进行初始化的,RESTOptions 对应的实例为 StorageFactoryRestOptionsFactory,所以 PodStorage 初始时构建的 store 对象中genericserver.Config.RESTOptionsGetter 实际的对象类型为 StorageFactoryRestOptionsFactory,其 GetRESTOptions 方法如下所示:
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go:253
func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig, err := f.StorageFactory.NewConfig(resource)
if err != nil {
return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
}
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
ResourcePrefix: f.StorageFactory.ResourcePrefix(resource),
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
cacheSize, ok := sizes[resource]
if !ok {
cacheSize = f.Options.DefaultWatchCacheSize
}
// 调用 generic.StorageDecorator
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
}
return ret, nil
}
在 genericregistry.StorageWithCacher 中又调用了不同的方法最终会调用 factory.Create 来初始化存储实例,其调用链为:genericregistry.StorageWithCacher --> generic.NewRawStorage --> factory.Create。
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go:30
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
// 目前 k8s 只支持使用 etcd v3
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
newETCD3Storage
在 newETCD3Storage 中,首先通过调用 newETCD3Client 创建 etcd 的 client,client 的创建最终是通过 etcd 官方提供的客户端工具 clientv3 进行创建的。
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go:209
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
if err != nil {
return nil, nil, err
}
client, err := newETCD3Client(c.Transport)
if err != nil {
stopCompactor()
return nil, nil, err
}
var once sync.Once
destroyFunc := func() {
once.Do(func() {
stopCompactor()
client.Close()
})
}
transformer := c.Transformer
if transformer == nil {
transformer = value.IdentityTransformer
}
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
至此对于 pod resource 中 store 的构建基本分析完成,不同 resource 对应一个 REST 对象,其中又引用了 genericregistry.Store 对象,最终是对 genericregistry.Store 的初始化。在分析完 store 的初始化后还有一个重要的步骤就是路由的注册,路由注册主要的流程是为 resource 根据不同 verbs 构建 http path 以及将 path 与对应 handler 进行绑定。
路由注册
上文 RESTStorage 的构建对应的是 InstallLegacyAPI 中的 legacyRESTStorageProvider.NewLegacyRESTStorage 方法,下面继续分析 InstallLegacyAPI 中的 m.GenericAPIServer.InstallLegacyAPIGroup 方法的实现。
k8s.io/kubernetes/pkg/master/master.go:406
func (m *Master) InstallLegacyAPI(......) error {
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
return fmt.Errorf("Error building core storage: %v", err)
}
......
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("Error in registering group versions: %v", err)
}
return nil
}
m.GenericAPIServer.InstallLegacyAPIGroup 的调用链非常深,最终是为 Group 下每一个 API resources 注册 handler 及路由信息,其调用链为:m.GenericAPIServer.InstallLegacyAPIGroup --> s.installAPIResources --> apiGroupVersion.InstallREST --> installer.Install --> a.registerResourceHandlers。其中几个方法的作用如下所示:
s.installAPIResources:为每一个 API resource 调用apiGroupVersion.InstallREST添加路由;apiGroupVersion.InstallREST:将restful.WebServic对象添加到 container 中;installer.Install:返回最终的restful.WebService对象
a.registerResourceHandlers
该方法实现了 rest.Storage 到 restful.Route 的转换,其首先会判断 API Resource 所支持的 REST 接口,然后为 REST 接口添加对应的 handler,最后将其注册到路由中。
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:181
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
admit := a.group.Admit
......
// 1、判断该 resource 实现了哪些 REST 操作接口,以此来判断其支持的 verbs 以便为其添加路由
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
if !isMetadata {
storageMeta = defaultStorageMetadata{}
}
exporter, isExporter := storage.(rest.Exporter)
if !isExporter {
exporter = nil
}
......
// 2、为 resource 添加对应的 actions 并根据是否支持 namespace
switch {
case !namespaceScoped:
......
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
if getSubpath {
actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
}
actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
default:
......
actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
......
}
// 3、根据 action 创建对应的 route
kubeVerbs := map[string]struct{}{}
reqScope := handlers.RequestScope{
Serializer: a.group.Serializer,
ParameterCodec: a.group.ParameterCodec,
Creater: a.group.Creater,
Convertor: a.group.Convertor,
......
}
......
// 4、从 rest.Storage 到 restful.Route 映射
// 为每个操作添加对应的 handler
for _, action := range actions {
......
verbOverrider, needOverride := storage.(StorageMetricsOverride)
switch action.Verb {
case "GET": ......
case "LIST":
case "PUT":
case "PATCH":
// 此处以 POST 操作进行说明
case "POST":
var handler restful.RouteFunction
// 5、初始化 handler
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
article := GetArticleForNoun(kind, " ")
doc := "create" + article + kind
if isSubresource {
doc = "create " + subresource + " of" + article + kind
}
// 6、route 与 handler 进行绑定
route := ws.POST(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Returns(http.StatusCreated, "Created", producedObject).
Returns(http.StatusAccepted, "Accepted", producedObject).
Reads(defaultVersionedObject).
Writes(producedObject)
if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
return nil, err
}
addParams(route, action.Params)
// 7、添加到路由中
routes = append(routes, route)
case "DELETE":
case "DELETECOLLECTION":
case "WATCH":
case "WATCHLIST":
case "CONNECT":
default:
}
......
return &apiResource, nil
}
restfulCreateNamedResource
restfulCreateNamedResource 是 POST 操作对应的 handler,最终会调用 createHandler 方法完成。
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go:1087
func restfulCreateNamedResource(r rest.NamedCreater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
handlers.CreateNamedResource(r, &scope, admit)(res.ResponseWriter, req.Request)
}
}
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(r, scope, admission, true)
}
createHandler
createHandler 是将数据写入到后端存储的方法,对于资源的操作都有相关的权限控制,在 createHandler 中首先会执行 decoder 和 admission 操作,然后调用 create 方法完成 resource 的创建,在 create 方法中会进行 validate 以及最终将数据保存到后端存储中。admit 操作即执行 kube-apiserver 中的 admission-plugins,admission-plugins 在 CreateKubeAPIServerConfig 中被初始化为了 admissionChain,其初始化的调用链为 CreateKubeAPIServerConfig --> buildGenericConfig --> s.Admission.ApplyTo --> a.GenericAdmission.ApplyTo --> a.Plugins.NewFromPlugins,最终在 a.Plugins.NewFromPlugins 中将所有已启用的 plugins 封装为 admissionChain,此处要执行的 admit 操作即执行 admission-plugins 中的 admit 操作。
createHandler 中调用的 create 方法是genericregistry.Store 对象的方法,在每个 resource 初始化 RESTStorage 都会引入 genericregistry.Store 对象。
createHandler 中所有的操作就是本文开头提到的请求流程,如下所示:
v1beta1 ⇒ internal ⇒ | ⇒ | ⇒ v1 ⇒ json/yaml ⇒ etcd
admission validation
k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/create.go:46
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
trace := utiltrace.New("Create", utiltrace.Field{"url", req.URL.Path})
defer trace.LogIfLong(500 * time.Millisecond)
......
gv := scope.Kind.GroupVersion()
// 1、得到合适的SerializerInfo
s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
if err != nil {
scope.err(err, w, req)
return
}
// 2、找到合适的 decoder
decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)
body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
if err != nil {
scope.err(err, w, req)
return
}
......
defaultGVK := scope.Kind
original := r.New()
trace.Step("About to convert to expected version")
// 3、decoder 解码
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
......
ae := request.AuditEventFrom(ctx)
admit = admission.WithAudit(admit, ae)
audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)
userInfo, _ := request.UserFrom(ctx)
if len(name) == 0 {
_, name, _ = scope.Namer.ObjectName(obj)
}
// 4、执行 admit 操作,即执行 kube-apiserver 启动时加载的 admission-plugins,
admissionAttributes := admission.NewAttributesRecord(......)
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
err = mutatingAdmission.Admit(ctx, admissionAttributes, scope)
if err != nil {
scope.err(err, w, req)
return
}
}
......
// 5、执行 create 操作
result, err := finishRequest(timeout, func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
})
......
}
}
总结
本文主要分析 kube-apiserver 的启动流程,kube-apiserver 中包含三个 server,分别为 KubeAPIServer、APIExtensionsServer 以及 AggregatorServer,三个 server 是通过委托模式连接在一起的,初始化过程都是类似的,首先为每个 server 创建对应的 config,然后初始化 http server,http server 的初始化过程为首先初始化 GoRestfulContainer,然后安装 server 所包含的 API,安装 API 时首先为每个 API Resource 创建对应的后端存储 RESTStorage,再为每个 API Resource 支持的 verbs 添加对应的 handler,并将 handler 注册到 route 中,最后将 route 注册到 webservice 中,启动流程中 RESTFul API 的实现流程是其核心,至于 kube-apiserver 中认证鉴权等 filter 的实现、多版本资源转换、kubernetes service 的实现等一些细节会在后面的文章中继续进行分析。
启动流程
在cmd/kube-apiserver/apiserver.go的main包中启动apiserver,使用options包中的NewServerRunOptions()函数初始化默认配置,并使用pflag包和AddFlags()方法通过命令行启动参数填充配置。
|
|
初始化完成后调用app包中的Run()函数启动实例,将创建的ServerRunOptions对象传入app.Run()中,并创建一个http server和一个https server。
|
|
该函数主要用于生成master实例对象,m, err := config.Complete().New() 用来创建master,Complete()完善Config的初始化,New()进行resources的初始化和RESTful-api的注册,各种api的请求最后都是通过master对象来处理的,在最后APIServer会通过启动Run(wait.NeverStop)的方法来启动HTTP/HTTPS服务。
|
|
在这个Run()中实际由s.serveSecurely(stopCh)和s.serveInsecurely(stopCh)分别运行了https和http server。
而实际上,s.serveSecurely(stopCh)和s.serveInsecurely(stopCh)中都会调用runServer()函数来运行http和https server,runServer()会监听传入的端口号,调用goroutine持续服务直到stopCH这个只读通道关闭。
|
|
所以整个apiserver的启动主体过程就是下图:
关键数据结构
type APIRegistrationManager struct
APIRegistrationManager负责对外提供已经注册并enable了的GroupVersions,将所有已经注册的,已经enable的,第三方的的GroupVersions进行了汇总,还包括了各个GroupVersion的GroupMeta
|
|
type RESTMapper interface
RESTMapper是一个接口,RESTMapper可以从GVR获取GVK,并生成一个RESTMapping来处理该GVR
|
|
type RESTMapping struct
RESTMapping包含一个Resource名称,及其对应的GVK,一个Scope(标明资源是否为root或者namespaced),一个Convertor用来转换该GVK对应的Object和一个MetadataAccessor用来提取Object的meta信息。
|
|
type RESTScope interface
RESTScope用于标识某个资源是处于Namespace下,还是全局资源
|
|
type ObjectConvertor interface
Convertor用来转换该GVK对应的Object
|
|
type MetadataAccessor interface
type MetadataAccessor interface可以让你在任何external version或者internal version中操作object和list这些metadata
|
|
多版本资源注册
初始化流程
在pkg/master/import_known_versions.go中会初始化所有group的install包
|
|
在所有的/install/install.go文件中,都会生成groupMeta,并向registered.DefaultAPIRegistrationManager注册。这个groupMeta中包含一个DefaultRESTMapper 以pkg/api/install/install.go中core group的install包为例:
|
|
最后一步的err := enableVersions(externalVersions)非常重要,完成了填充Scheme,初始化groupMeta的步骤:
|
|
- addVersionsToScheme:将所有的Versions添加到Scheme
- 生成一个groupMeta,即groupMeta的初始化
- registered.RegisterGroup(groupMeta),真正注册一个group
其中newRESTMapper(externalVersions)其实包含的是一种转换关系,resource到kind,kind到resource,kind到scope的转换。 RESTMapper映射是指GVR(GroupVersionResource)和GVK(GroupVersionKind)的关系,可以通过GVR找到合适的GVK
|
|
其中NewDefaultRESTMapper定义在pkg/api/mapper.go:
|
|
再调用NewDefaultRESTMapperFromScheme函数,它主要流程是:
- 先创建了一个空的
DefaultRESTMapper, - 然后根据"
/api/v1“的groupVersion(只举了其中的一个groupversion,所以可以依据defaultGroupVersions来区别DefaultRESTMapper), - 遍历Scheme中所有的kinds,
- 接着再调用
mapper.Add(gvk, scope)去填充这个mapper, - 最后返回该mapper。
|
|
其中的mapper.Add(gvk, scope)方法是把GVK(kind)和GVK对应的scope加入到DefaultRESTMapper对应的map属性中
type DefaultRESTMapper struct中字段的含义:
- defaultGroupVersions: 默认的GroupVersion,如v1,apps/v1beta1等,一般一个DefaultRESTMapper只设一个默认的GroupVersion
- resourceToKind:GVR(单数,复数)到GVK的map;
- kindToPluralResource:GVK到GVR(复数)的map;
- kindToScope:GVK到Scope的map;
- singularToPlural:GVR(单数)到GVR(复数)的map;
- interfacesFunc:用来产生Convertor和MetadataAccessor,具体实现为/pkg/api/install/install.go中的interfacesFor()函数。
- aliasToResource:用于将别名映射到资源
|
|
addVersionsToScheme:将所有的Versions添加到Scheme,Apiserver全局范围内,只有一个Scheme,即api.Scheme。 所有的GroupVersion受这个api.Scheme管理。所有的GroupVersion的Type都是往这个全局唯一的api.Scheme里面注册。
Scheme`定义在`pkg/api/register.go`,`NewScheme()`定义在`/pkg/runtime/scheme.go
var Scheme = runtime.NewScheme()
在Scheme的定义里面
- 一个Type,就是一个特定的Go Struct
- 一个Version,是该Type的特定表示的时间点标识符(通常向后兼容)
- 一个Kind,是一个Type在该Version中的唯一name
- 一个Group,标识了一组Versions, Kinds, and Types
- 一个Unversioned Type,是一种还没正式绑定到一个Type的Type,会被往后兼容
RESTMapper管理的是GVR和GVK的关系,Scheme管理的是GVK和Type的关系
在pkg/runtime/scheme.go中,定义了type Scheme struct
|
|
Kubernetes内部组件的流通的结构体值使用的是内部版本,所有的外部版本都要向内部版本进行转换; 内部版本必须转换成外部版本才能进行输出。 外部版本之间不能直接转换。 etcd中存储的是带有版本的数据
在addVersionsToScheme()函数中,主要就是向Scheme注册internal version和external version
|
|
func (c completedConfig) New()基于给定的配置生成一个新的Master实例,在这个方法中生成各版本资源对应的RESTful API。
func (c completedConfig) New()方法的流程如下:
- 调用
func (c completedConfig) New() (*GenericAPIServer, error),创建一type GenericAPIServer struct实例 - 判断是否enable了用于Watch的Cache,和etcd建立连接
- 调用
InstallLegacyAPI进行”/api“的API安装 - 调用
InstallAPIs进行”/apis“的API安装,如果其处于enabled状态
|
|
其中InstallLegacyAPI()进行了/api的安装
|
|
其中在NewLegacyRESTStorage方法中,会进行storage的创建,其流程如下:
- 生成一个type APIGroupInfo struct实例,这个和前面说的资源注册的
APIRegistrationManager、Scheme、GroupMeta...有关系。 - 初始化一个LegacyRESTStorage对象,即restStorage
- 创建各类Storage,如podStorage、nodeStorage..
- 把步骤3中创建的各种Storage保存到restStorageMap中,然后装在到APIGroupInfo中,APIGroupInfo.VersionedResourcesStorageMap[“v1”]。这是API映射map,这很重要,在后面的利用APIGroupInfo来生成APIGroupVersion的时候,就是依靠这个map映射关系来获取对应version的资源的rest strorage实现。
- return restStorage, APIGroupInfo
InstallLegacyAPIGroup()方法
|
|
其中installAPIResources()方法用于安装每个api groupversionresource的REST存储,基本流程如下:
- 遍历该Group的所有versions(一个Group调用一次本函数,亦即所有Group最后都是调用本函数来安装Restful API)
- 基于
apiGroupInfo,groupVersion,apiPrefix创建一个type APIGroupVersion struct对象 - 根据创建的
APIGroupVersion,然后安装restful API,apiGroupVersion.InstallREST
|
|
最后调用InstallREST()安装restful API,InstallREST()将REST handlers(storage, watch, proxy and redirect)注册到go-restful框架的Container中,流程如下:
- 创建了一个type APIInstaller struct对象
- 构造一个webservice
- 往webservice里面添加Route
- 往container中添加webservice
|
|
InstallAPIs()进行/apis的安装
|
|
s, err := c.Config.GenericConfig.SkipComplete().New()会根据config创建了一个GenericAPIServer实例
|
|
整个初始化流程:
initial.go中的初始化主要用internal version和external versions填充了Scheme,完成了APIRegistrationManager中GroupMeta的初始化。GroupMeta的主要用于后面的初始化APIGroupVersion- 初始化
groupMeta的时候会根据Scheme和externalVersions新建一个RESTMapper /pkg/registry/core/rest/storage_core.go中的NewLegacyRESTStorage基于上面的Scheme和GroupMeta生成了一个APIGroupInfo- 然后基于
APIGroupInfo生成一个APIGroupVersion - 基于
APIGroupVersion来生成Restful API
多版本资源的初始化调用图
资源注册成RESTful API调用图
kube-apiserver端List-Watch机制
apiserver针对集群中的每一类资源都会与etcd建立一个连接,获取该资源的opt,watch功能是其中一个opt。kubelet、kube-controller-manager、kube-scheduler需要监控各种资源的变化, 当这些对象发生变化时(add、delete、update),kube-apiserver能够主动通知这些组件。而apiserver端的Watch机制是建立在etcd的Watch基础上的。 etcd的watch是没有过滤功能的,而kube-apiserver增加了过滤功能,能将订阅方感兴趣的部分资源发给订阅方。
Event数据流向如下:
- 从etcd–>Cacher,是一个watchCache,存储apiserver从etcd那里watch到的对象。
- 结合etcd和Cacher的resourceVersion进行对比,形成一个WatchEvent,分发到各个观察者watcher中
在/pkg/storage/cacher.go中的func NewCacherFromConfig(),用来创建一个新的cacher,负责服务内部的watch-list缓存请求,并在后台更新缓存,流程如下:
- 新建一个watchCache,用来存储apiserver从etcd那里watch到的对象
- 新建一个listerWatcher
- 实例化一个type Cacher struct对象,其核心是reflector机制
- 启动dispatchEvents协程,分发event到各个订阅方
- cacher.startCaching(stopCh)
|
|
startCaching()方法流程如下:
- 首先会通过terminateAllWatchers注销所有的cachewatcher,因为这个时候apiserver还处于初始化阶段,因此不可能接受其他组件的watch,也就不可能有watcher。
- 然后调用c.reflector.ListAndWatch函数,完成前面说过的功能:reflector主要将apiserver组件从etcd中watch到的资源存储到watchCache中。
|
|
调用Reflector的ListAndWatch()
分析其流程,如下:
- 执行list操作
- 执行watch操作
- 调用
func (r *Reflector) watchHandler
|
|
其中的list, err := r.listerWatcher.List(options)和w, err := r.listerWatcher.Watch(options),真正调用的是etcdHelper的list watch方法
|
|
err := r.watchHandler是将event对象从channel outgoing中读取出来,而方法中调用的r.store.Add(event.Object)则是将event添加到cache中。
|
|
最后的processEvent()方法最终完成了Event从etcd流向Cache。
以上是完成了event的生产过程,最终event都要流向消费它的订阅方,在上面的代码中dispatchEvents()就是那个分发event的方法。
|
|
继续调用dispatchEvent()方法,将event分发给所有的watcher
|
|
其中的watcher.add(event, &timeout)方法,把event分发到一个type cacheWatcher struct
|
|
到这里Event已经分发到了各个订阅者的watcher中了,后续各个Watcher组件会从channel input中获取到event
kube-apiserver初始化时,建立对etcd的连接,并对etcd进行watch,将watch的结果存入watchCache。 当其他组件需要watch资源时,其他组件向apiserver发送一个watch请求,这个请求是可以带filter函数的。 apiserver针对这个请求会创建一个watcher,并基于watcher创建WatchServer。 watchCache watch的对象,首先会通过filter函数的过滤,假如过滤通过的话,则会通过WatcherServer发送给订阅组件。
整个list-watch过程的调用链
-
No backlinks found.