Controller Manager
As the cluster's management and control center, the Controller Manager hosts all of the cluster's controllers and plays a key role in keeping the cluster stable and self-healing, and in providing high availability, replica control, and so on.
Internal structure diagram
Key call chain
Source code walkthrough
Component startup entry point
|
|
Read the configuration file and initialize the default configuration
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go -> NewControllerManagerCommand
- Initialize the controller-manager's option structure: NewKubeControllerManagerOptions()
- Build the command to execute, including Use, Long, and Run: cmd := &cobra.Command{
- Parse the configuration via s.AddFlags: 1. KnownControllers() collects all controllers; 2. options from the configuration file are injected into the config object; 3. the parameters each controller needs are written in as well.
|
|
Component startup and execution
From command.Execute() in main we reach the Run constructed above. Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go (loads all controllers and injects the corresponding parameters into each)
|
|
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go — NewControllerInitializers, called from KnownControllers(), initializes all of the controllers
|
|
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go — where execution actually begins
- Start the controller-manager's HTTP servers and their handlers, both secure and insecure: BuildHandlerChain
- Build the run closure
- If leader election is required, run executes once leadership is acquired; otherwise run executes directly, followed by a panic (unreachable, since run blocks internally)
// Run runs the KubeControllerManagerOptions. This should never exit.
func Run(c *config.CompletedConfig) error {
	// To help debugging, immediately log version
	glog.Infof("Version: %+v", version.Get())

	if cfgz, err := configz.New("componentconfig"); err == nil {
		cfgz.Set(c.ComponentConfig)
	} else {
		glog.Errorf("unable to register configz: %v", err)
	}

	// Start the controller manager HTTP server
	stopCh := make(chan struct{})
	if c.SecureServing != nil {
		handler := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging)
		handler = genericcontrollermanager.BuildHandlerChain(handler, &c.Authorization, &c.Authentication)
		if err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
			return err
		}
	}
	if c.InsecureServing != nil {
		handler := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Debugging)
		handler = genericcontrollermanager.BuildHandlerChain(handler, &c.Authorization, &c.Authentication)
		if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
			return err
		}
	}

	run := func(stop <-chan struct{}) {
		rootClientBuilder := controller.SimpleControllerClientBuilder{
			ClientConfig: c.Kubeconfig,
		}
		var clientBuilder controller.ControllerClientBuilder
		if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
			if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
				// It's possible another controller process is creating the tokens for us.
				// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
				glog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
			}
			clientBuilder = controller.SAControllerClientBuilder{
				ClientConfig:         restclient.AnonymousClientConfig(c.Kubeconfig),
				CoreClient:           c.Client.CoreV1(),
				AuthenticationClient: c.Client.AuthenticationV1(),
				Namespace:            "kube-system",
			}
		} else {
			clientBuilder = rootClientBuilder
		}
		ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop)
		if err != nil {
			glog.Fatalf("error building controller context: %v", err)
		}
		saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

		// start the controllers
		if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
			glog.Fatalf("error starting controllers: %v", err)
		}

		ctx.InformerFactory.Start(ctx.Stop)
		close(ctx.InformersStarted)

		select {}
	}

	// Note: if leader election is disabled (e.g. a single-node setup), run starts
	// directly; execution never gets past it because run blocks on select {},
	// so the panic below is unreachable.
	if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
		run(wait.NeverStop)
		panic("unreachable")
	}

	id, err := os.Hostname()
	if err != nil {
		return err
	}

	// add a uniquifier so that two processes on the same host don't accidentally both become active
	// (the unique ID acts as this process's lock identity)
	id = id + "_" + string(uuid.NewUUID())
	rl, err := resourcelock.New(c.ComponentConfig.GenericComponent.LeaderElection.ResourceLock,
		"kube-system",
		"kube-controller-manager",
		c.LeaderElectionClient.CoreV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: c.EventRecorder,
		})
	if err != nil {
		glog.Fatalf("error creating lock: %v", err)
	}

	// run leader election, and execute run once this instance is elected leader
	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   c.ComponentConfig.GenericComponent.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			// executed once this instance becomes the leader
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				glog.Fatalf("leaderelection lost")
			},
		},
	})
	panic("unreachable")
}
Now to the three core actions inside run: CreateControllerContext, StartControllers, and ctx.InformerFactory.Start.
CreateControllerContext
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
- Obtains the handles for operating on resources in the kube-apiserver
- Confirms the kube-apiserver is healthy (waiting up to 10s), then obtains the connection
- Creates the controller context
|
|
StartControllers
Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go — starts all of the initialized controllers
|
|
ctx.InformerFactory.Start
The informers in the controller-manager start up and watch for resource events, placing each event onto their own rate-limited queues; worker routines then pull items off the queues and process them.
Enqueueing a newly created ReplicaSet:
// obj could be an *apps.ReplicaSet, or a DeletionFinalStateUnknown marker item.
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}
	rsc.queue.Add(key)
}
Pulling an object off the queue for processing (details below):
func (rsc *ReplicaSetController) processNextWorkItem() bool {
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)

	err := rsc.syncHandler(key.(string))
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
	rsc.queue.AddRateLimited(key)
	return true
}
Example: startReplicaSetController
The initFn invoked inside StartControllers is the per-controller init function defined in NewControllerInitializers; here we follow startReplicaSetController. Location: k8s.io/kubernetes/cmd/kube-controller-manager/app/apps.go. NewReplicaSetController mainly initializes the ReplicaSetController structure, including the apiserver client, the informer callbacks, and so on: NewReplicaSetController -> NewBaseController.
|
|
**Key function Run:** k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go — Run executes rsc.worker.
|
|
rsc.worker is rsc.syncHandler, and syncHandler is assigned rsc.syncReplicaSet at construction time (see NewBaseController), so we turn to syncReplicaSet. **Location:** k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go. **updateReplicaSetStatus:** updates the ReplicaSet's status when pods die or are created.
|
|
**On to updateReplicaSetStatus:** k8s.io/kubernetes/pkg/controller/replicaset/replica_set_utils.go — calls UpdateStatus, which pushes the update through the apiserver.
|
|
PodGCController
1. GC terminated pods once they exceed the configured threshold, oldest first (ordered by creation time)
|
|
2. GC orphaned pods: pods whose node no longer appears among the schedulable nodes, i.e. pods not bound to any valid node
|
|
3. GC pods that never scheduled successfully: their NodeName is empty, usually because resource or other constraints were not met
|
|