Kubelet 作为 k8s 集群中 Node 上的关键组件,在每个 Node 上以 Agent 进程的形式运行,负责管理 Pod 和其中容器的生命周期。Kubelet 主要功能是定时从某个地方获取节点上 Pod/container 的期望状态,并调用容器平台接口达到这个状态。本文将作为 Kubelet 分析的开篇,介绍 Kubelet 的主要功能和实现原理。

Overview

下图展示了 kubelet 内部组件结构,Kubelet 由许多内部组件构成

  • Kubelet API,包括 10250 端口的认证 API、4194 端口的 cAdvisor API、10255 端口的只读 API 以及 10248 端口的健康检查 API
  • syncLoop:从 API 或者 manifest 目录接收 Pod 更新,发送到 PodWorkers 处理,大量使用 channel 处理来处理异步请求
  • 辅助的 manager,如 cAdvisor、kubelet-pleg、Volume Manager 等,处理 syncLoop 以外的其他工作
  • CRI:容器执行引擎接口,负责与 container runtime shim 通信,关于 CRI 的更多内容可以参考 CRI
  • 容器执行引擎,如 dockershim、rkt 等
  • 网络插件,目前支持 CNI 和 kubenet

Source

Kubelet 基于 PodSpec 来工作,它需要确保这些 PodSpec 中描述的容器处于运行状态且运行状况良好,因此 Kubelet 不管理那些不是由 k8s 创建的容器。PodSpec 的来源有以下四种:

  • API Server:通过 API Server 监听 etcd 目录获取数据,这也是最主要的来源
  • File:利用命令行参数传递路径,kubelet 周期性地监视此路径下的文件是否有更新,监视周期默认为 20s
  • HTTP Endpoint:利用命令行参数指定 HTTP 端点。 此端点的监视周期默认为 20 秒
  • HTTP Server:kubelet 还可以侦听 HTTP 并响应简单的 API (目前没有完整规范)来提交新的清单

所有以非 API Server 方式创建的 Pod 都叫 Static Pod。Kubelet 将 Static Pod 的状态汇报给 API Server,API Server 为该 Static Pod 创建一个 Mirror Pod 和其相匹配。Mirror Pod 的状态将真实反映 Static Pod 的状态。当 Static Pod 被删除时,与之相对应的 Mirror Pod 也会被删除。

监听 API Server

Kubelet 通过 API Server Client 使用 List/Watch 的方式监听 /registry/Nodes/<NodeName>/registry/Pods 路径,将获取的信息同步到本地缓存中。Kubelet 监听 etcd,所有针对 Pod 的操作都将会被 Kubelet 监听到:

  • 如果发现有新的绑定到本节点的 Pod,则按照 PodSpec 的要求创建该 Pod。
  • 如果发现本地的 Pod 被修改,则 Kubelet 会做出相应的修改,比如删除 Pod 中某个容器时,则通过 ContainerRuntime Client 删除该容器。
  • 如果发现删除本节点的 Pod,则删除相应的 Pod,并通过 ContainerRuntime Client 删除 Pod 中的容器。

Kubelet 读取监听到的信息,如果是创建和修改 Pod 任务,则执行如下处理:

  • 为该 Pod 创建一个数据目录
  • 从 API Server 读取该 PodSpec
  • 为该 Pod 挂载外部卷
  • 下载 Pod 用到的 Secret
  • 检查已经在节点上运行的 Pod,如果该 Pod 没有容器或 Pause 容器没有启动,则先停止 Pod 里所有容器的进程。如果在 Pod 中有需要删除的容器,则删除这些容器
  • kubernetes/pause 镜像为每个 Pod 创建一个容器。Pause 容器用于接管 Pod 中所有其他容器的网络。每创建一个新的 Pod,Kubelet 都会先创建一个 Pause 容器,然后创建其他容器。
  • 为 Pod 中的每个容器做如下处理:
    1. 为容器计算一个 hash 值,然后用容器的名字去 ContainerRuntime 查询对应容器的 hash 值。若查找到容器,且两者 hash 值不同,则停止 Docker 中容器的进程,并停止与之关联的 Pause 容器的进程;若两者相同,则不做任何处理;
    2. 如果容器被终止了,且容器没有指定的 restartPolicy,则不做任何处理;
    3. 调用 ContainerRuntime Client 下载容器镜像,调用 ContainerRuntime Client 运行容器。

SyncLoop 工作原理

Kubelet 的工作核心是 SyncLoop 这个控制循环,驱动整个控制循环的事件包括:

  • kubetypes.PodUpdate:Pod更新事件
  • pleg.PodLifeEvent:Pod生命周期变化
  • periodic sync events:kubelet本身设置的执行周期
  • housekeeping events:定时清理事件

在 SyncLoop 循环上有很多 Manager

kubelet-pleg

Pod Lifecycle Event Generator ,字面意思 Pod 生命周期事件生成器,是 kubelet 的核心模块。PLEG 会一直调用 ContainerRuntime 获取本节点 containers/sandboxes 的信息,并与自身维护的 Pods cache 信息进行对比,生成对应的 PodLifecycleEvent,然后输出到 eventChannel 中,通过 eventChannel 发送到 kubelet syncLoop 进行消费,然后由 kubelet syncPod 来触发 Pod 同步处理过程,最终达到用户的期望状态。

Pod Lifecycle Event:

  • ContainerStarted
  • ContainerDied
  • ContainerRemoved
  • ContainerChanged

PodWorkers

处理事件中 Pod 的同步。核心方法 managePodLoop() 间接调用 kubelet.syncPod() 完成 Pod 的同步:

  • 如果 Pod 正在被创建,记录其延迟
  • 生成 Pod 的 API Status,即 v1.PodStatus:从运行时的 status 转换成 api status
  • 记录 Pod 从 pendingrunning 的耗时
  • StatusManager 中更新 Pod 的状态
  • 杀掉不应该运行的 Pod
  • 如果网络插件未就绪,只启动使用了主机网络(host network)的 Pod
  • 如果 static Pod 不存在,为其创建镜像(Mirror)Pod
  • 为 Pod 创建文件系统目录:Pod 目录、卷目录、插件目录
  • 使用 VolumeManager 为 Pod 挂载卷
  • 获取 image pull secrets
  • 调用容器运行时(container runtime)的 #SyncPod() 方法

PodManager

提供了接口来存储和访问 Pod 的信息,维持 static Pod 和 mirror Pods 的关系,PodManager 会被 statusManager/volumeManager/runtimeManager 所调用,PodManager 的接口处理流程里面会调用 secretManager 以及 configMapManager

ContainerManager

负责 Node 节点上运行的容器的 cgroup 配置信息,kubelet 启动参数如果指定 --cgroups-per-qos 的时候,kubelet 会启动 goroutine 来周期性的更新 Pod 的 cgroup 信息,维护其正确性,该参数默认为 true,实现了 Pod 的Guaranteed/BestEffort/Burstable 三种级别的 Qos。

StatusManager

负责维护状态信息,并把 Pod 状态更新到 apiserver,但是它并不负责监控 Pod 状态的变化,而是提供对应的接口供其他组件调用,比如 probeManager

StatsProvider

提供节点和容器的统计信息,有 cAdvisorCRI 两种实现。

ContainerRuntime

ContainerRuntime 负责 kubelet 与遵循 CRI 规范的不同 runtime 实现进行对接,实现对于底层 container 的操作,初始化之后得到的 runtime 实例将会被之前描述的组件所使用。

containerGC:负责清理 Node 节点上已消亡的 Container,具体的 GC 操作由 runtime 来实现

Deps.PodConfig

PodConfig 是一个配置多路复用器,它将许多 Pod 配置源合并成一个单一的一致结构,然后按顺序向监听器传递增量变更通知。

配置源有:文件、apiserver、HTTP

PodAdmitHandlers

Pod admission 过程中调用的一系列处理器,比如 eviction handler(节点内存有压力时,不会驱逐 QoS 设置为 BestEffort 的 Pod)、shutdown admit handler(当节点关闭时,不处理 Pod 的同步操作)等。

OOMWatcher

系统 OOM 的监听器,会与 cadvisor 模块之间建立 SystemOOM,通过 Watch 方式从 cadvisor 那里收到的 OOM 信号,并产生相关事件。

CertificateManager

处理证书轮换。

ProbeManager

依赖于 statusManagerlivenessManagercontainerRefManager,会定时去监控 Pod 中容器的健康状况。

容器健康检查通过 LivenessProbeReadinessProbe 两类探针来判断容器是否健康。

  • LivenessProbe :用于判断容器是否健康,告诉 Kubelet 一个容器什么时候处于不健康的状态。如果 LivenessProbe 探针探测到容器不健康,则 Kubelet 将删除该容器,并根据容器的重启策略做相应的处理。如果一个容器不包含 LivenessProbe 探针,那么 Kubelet 认为该容器的 LivenessProbe 探针返回的值永远是 Success
  • ReadinessProbe:用于判断容器是否启动完成且准备接收请求。如果 ReadinessProbe 探针探测到失败,则 Pod 的状态将被修改。Endpoint Controller 将从 Service 的 Endpoint 中删除包含该容器所在 Pod 的 IP 地址的 Endpoint 条目。

Kubelet 定期调用容器中的 LivenessProbe 探针来诊断容器的健康状况。LivenessProbe 包含如下三种实现方式:

  • ExecAction:在容器内部执行一个命令,如果该命令的退出状态码为 0,则表明容器健康;
  • TCPSocketAction:通过容器的 IP 地址和端口号执行 TCP 检查,如果端口能被访问,则表明容器健康;
  • HTTPGetAction:通过容器的 IP 地址和端口号及路径调用 HTTP GET 方法,如果响应的状态码大于等于 200 且小于 400,则认为容器状态健康。

EvictionManager

Kubelet 会监控资源的使用情况,并使用驱逐机制防止计算和存储资源耗尽。当节点的内存、磁盘或 iNode 等资源不足时,达到了配置的 evict 策略, Node 会变为 pressure 状态,此时 kubelet 会按照 qosClass 顺序来驱赶 Pod,以此来保证节点的稳定性。

在驱逐时,Kubelet 将 Pod 的所有容器停止,并将 PodPhase 设置为 Failed。Kubelet 定期(housekeeping-interval)检查系统的资源是否达到了预先配置的驱逐阈值,包括

Eviction Signal Condition Description
memory.available MemoryPressue memory.available := Node.status.capacity[memory] - Node.stats.memory.workingSet (计算方法参考这里
Nodefs.available DiskPressure Nodefs.available := Node.stats.fs.available(Kubelet Volume以及日志等)
Nodefs.iNodesFree DiskPressure Nodefs.iNodesFree := Node.stats.fs.iNodesFree
imagefs.available DiskPressure imagefs.available := Node.stats.runtime.imagefs.available(镜像以及容器可写层等)
imagefs.iNodesFree DiskPressure imagefs.iNodesFree := Node.stats.runtime.imagefs.iNodesFree

这些驱逐阈值可以使用百分比,也可以使用绝对值,如

1
2
3
--eviction-hard=memory.available<500Mi,Nodefs.available<1Gi,imagefs.available<100Gi
--eviction-minimum-reclaim="memory.available=0Mi,Nodefs.available=500Mi,imagefs.available=2Gi"`
--system-reserved=memory=1.5Gi

ImageManager

调用 kubecontainer 提供的 PullImage/GetImageRef/ListImages/RemoveImage/ImageStates 方法来保证Pod 运行所需要的镜像。

imageGC: 负责 Node 节点的镜像回收,当本地的存放镜像的本地磁盘空间达到某阈值的时候,会触发镜像的回收,删除掉不被 Pod 所使用的镜像

VolumeManager

负责 Node 节点上 Pod 所使用 Volume 的管理,Volume 与 Pod 的生命周期关联,负责 Pod 创建删除过程中 volume 的 mount/umount/attach/detach 流程,kubernetes 采用 Volume Plugins 的方式,实现存储卷的挂载等操作,内置几十种存储插件。

cAdvisor

Kubernetes 集群中,应用程序的执行情况可以在不同的级别上监测到,这些级别包括:容器、Pod、Service 和整个集群。Kubelet 通过 cAdvisor 获取其所在节点及容器的数据。

cAdvisor 是一个开源的分析容器资源使用率和性能特性的代理工具,集成到 Kubelet中,当Kubelet启动时会同时启动cAdvisor,且一个cAdvisor只监控一个Node节点的信息。cAdvisor 自动查找所有在其所在节点上的容器,自动采集 CPU、内存、文件系统和网络使用的统计信息。cAdvisor 通过它所在节点机的 Root 容器,采集并分析该节点机的全面使用情况。

关于 cAdvisor 更多的内容,可以参考 cAdvisor

源码分析

bootstrap

cmd/kubelet/app/server.goRun 方法为程序实际入口:

  • 调用 RunKubelet 方法。
  • 调用 createAndInitKubelet 方法,创建并初始化 kubelet
    • pkg/kubelet/kubelet.goNewMainKubelet 方法,创建 kubelet的 各种组件。
    • 调用 BirthCry 方法:放出 Starting 事件
    • 调用 StartGarbageCollection 方法,开启 ContainerGCImageGC
  • 调用 startKubelet 方法(大量使用 goroutine 和通道)
    • goroutine:kubelet.Run()
    • 初始化模块
      • metrics 相关
      • 创建文件系统目录目录
      • 创建容器日志目录
      • 启动 ImageGCManager
      • 启动 ServerCertificateManager
      • 启动 OOMWatcher
      • 启动 ResourceAnalyzer
    • goroutine:volumeManager.Run() 开始处理 Pod Volume 的卸载和挂载,保障存储与容器状态一致
    • goroutine:syncNodeStatus() 将节点注册到k8s集群,并收集节点信息定期上报到api-server
    • goroutine:状态更新 fastStatusUpdateOnce() (更新 Pod CIDR -> 更新 ContainerRuntime 状态 -> 更新 Node 节点状态)
    • goroutine: NodeLeaseController.Run() 更新节点租约
    • goroutine:PodKiller.PerformPodKillingWork 杀掉未被正确处理的 Pod
    • initNetworkUtil() 同步 iptables 相关规则
    • StatusManager.Start() 开始向 apiserver 更新 Pod 状态
    • RuntimeClassManager.Start()
    • k.pleg.Start():持续从 ContainerRuntime 获取 Pod/容器的状态,并与 kubelet 本地 cache 中的比较,生成对应的 Event
    • syncLoop() 重点, 持续监控并处理来自文件、apiserver、http 的变更 。包括 Pod 的增加、更新、优雅删除、非优雅删除、调和。
  • 启动 server,暴露 /healthz 端点
  • 通知 systemd kuberlet 服务已经启动

syncLoop

syncLoop 是 Kubelet 的核心同步逻辑。该模块将同时 watch 3 个不同来源的 Pod 信息的变化(file,http,apiserver),一旦某个来源的 Pod 信息发生了更新(创建/更新/删除),PodUpdate channel 中就会出现被更新的 Pod 信息和更新的具体操作。

syncLoop 中首先定义了一个 syncTicker 和 housekeepingTicker,即使没有需要更新的 pod 配置,kubelet 也会定时去做同步和清理 pod 的工作。然后在 for 循环中一直调用 syncLoopIteration,如果在每次循环过程中出现比较严重的错误,kubelet 会记录到 runtimeState 中,遇到错误就等待 5 秒中继续循环。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
(kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	// ...
  	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	for {
		...
		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

syncLoop方法在一个循环中不断的调用 syncLoopIteration 方法执行主要逻辑。

syncLoopIteration

syncLoopIteration 这个方法就会对多个管道进行遍历,发现任何一个管道有消息就交给 handler 去处理。它会从以下管道中获取消息:

  • configCh:该信息源由 kubeDeps 对象中的 PodConfig 子模块提供,该模块将同时 watch 3 个不同来源的 pod 信息的变化(file,http,apiserver),一旦某个来源的 pod 信息发生了更新(创建/更新/删除),这个 channel 中就会出现被更新的 pod 信息和更新的具体操作。
  • syncCh:定时器管道,每隔一秒去同步最新保存的 pod 状态
  • houseKeepingCh:housekeeping 事件的管道,做 pod 清理工作
  • plegCh:该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态,如果状态发生变化,则这个 channel 产生事件。
  • livenessManager.Updates():健康检查发现某个 pod 不可用,kubelet 将根据 Pod 的 restartPolicy 自动执行正确的操作
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// syncLoopIteration 方法会监听多个 channel,当发现任何一个 channel 有数据就交给 handler 去处理,在 handler 中通过调用 dispatchWork 分发任务
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        switch case...
    case e := <-plegCh:
        ...
    case <-syncCh:
        ...
    case update := <-kl.livenessManager.Updates():
        ...
    case <-housekeepingCh:
        ...
    }
    return true
}

configCh

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
(kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh: 
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods)) 
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods)) 
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET: 
			klog.Errorf("Kubelet does not support snapshot update")
		default:
			klog.Errorf("Invalid event type received: %d.", u.Op)
		}

		kl.sourcesReady.AddSource(u.Source)

	// ...
}

configCh 读取配置事件的管道,该模块将同时 watch 3 个不同来源的 Pod 信息的变化(file,http,apiserver),一旦某个来源的 Pod 信息发生了更新(创建/更新/删除),这个 channel 中就会出现被更新的 Pod 信息和更新的具体操作。这里对于Pod的操作我们下一篇再讲。

plegCh

PLEG.Start 的时候会每秒钟启动调用一次relist,根据最新的 PodStatus 生成PodLiftCycleEvent,然后存入到 PLEG Channel 中。

syncLoop 会调用 pleg.Watch 方法获取 PLEG Channel 管道,然后传给 syncLoopIteration 方法,在syncLoopIteration方法中也就是 plegCh 这个管道,syncLoopIteration 会消费 plegCh 中的数据,在 handler 中通过调用 dispatchWork 分发任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
(kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	...
	case e := <-plegCh:
		if e.Type == pleg.ContainerStarted {
			kl.lastContainerStartedTime.Add(e.ID, time.Now())
		}
		if isSyncPodWorthy(e) { 
			if Pod, ok := kl.PodManager.GetPodByUID(e.ID); ok {
				klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(Pod), e)
				handler.HandlePodSyncs([]*v1.Pod{Pod})
			} else { 
				klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
			}
		} 
		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	...
}

syncCh

syncCh 是由 syncLoop 方法里面创建的一个定时任务,每秒钟会向syncCh添加一个数据,然后就会执行到这里。这个方法会同步所有等待同步的Pod。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
(kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	...
	//	每秒钟会执行到一次
	case <-syncCh:
		// Sync Pods waiting for sync
		PodsToSync := kl.getPodsToSync()
		if len(PodsToSync) == 0 {
			break
		}
		klog.V(4).Infof("SyncLoop (SYNC): %d Pods; %s", len(PodsToSync), format.Pods(PodsToSync))
		//同步最新保存的 Pod 状态
		handler.HandlePodSyncs(PodsToSync)
	...
}

对失败的Pod或者liveness检查失败的Pod进行sync操作。

housekeepingCh

housekeepingCh 这个管道也是由 syncLoop 创建,每两秒钟会触发清理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
(kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	...
	//	每两秒钟执行一次
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() { 
			klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
		} else {
			klog.V(4).Infof("SyncLoop (housekeeping)")
			//执行一些清理工作,包括终止Pod workers、删除不想要的Pod,移除volumes、Pod目录
			if err := handler.HandlePodCleanups(); err != nil {
				klog.Errorf("Failed cleaning Pods: %v", err)
			}
		}
	...
}

livenessManager.Updates

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
(kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	...
	case update := <-kl.livenessManager.Updates():
		//如果探针检测失败,需要更新Pod的状态
		if update.Result == proberesults.Failure {
			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
		}
	...
}

readinessManager.Updates

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
(kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	...
	case update := <-kl.readinessManager.Updates():
		ready := update.Result == proberesults.Success
		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

		status := ""
		if ready {
			status = "ready"
		}
		handleProbeSync(kl, update, handler, "readiness", status)
	...
}

syncHandler

kl.syncLoop 接受两个参数,第一个是 <-chan kubetypes.PodUpdate,第二个是 SyncHandler,SyncHandler 定义如下:

1
2
3
4
5
6
7
8
9
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups() error
}

在 kl.syncLoop 中调用了 kl.syncLoopIteration,接收了以下几种 chan:

1
2
3
4
5
6
7
8
- configCh       <-chan kubetypes.PodUpdate
  - configCh 会触发 HandlePodAdditionsHandlePodUpdatesHandlePodRemovesHandlePodReconcile 四种操作
- syncCh         <-chan time.Time,定时触发的 resync 操作
  - syncCh 会触发 HandlePodSyncs 操作
- housekeepingCh <-chan time.Time,定时触发的清理操作
  - housekeepingCh 会触发 HandlePodCleanups 操作
- plegCh         <-chan *pleg.PodLifecycleEvent
  - plegCh 会触发 HandlePodSyncs 操作

dispatchWork

dispatchWorker 的主要作用是把某个对 Pod 的操作(创建/更新/删除)下发给 podWorkers。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}

HandlePodAddtions

对于事件中的每个 pod,执行以下操作:

  1. 把所有的 Pod 按照创建日期进行排序,保证最先创建的 Pod 会最先被处理
  2. 把它加入到 podManager 中,podManager 子模块负责管理这台机器上的 Pod 的信息,pod 和 mirrorPod 之间的对应关系等等。所有被管理的 pod 都要出现在里面,如果 podManager 中找不到某个 Pod,就认为这个 Pod 被删除了
  3. 如果是 mirror Pod 调用其单独的方法
  4. 验证 Pod 是否能在该节点运行,如果不可以直接拒绝
  5. 通过 dispatchWork 把创建 Pod 的工作下发给 podWorkers 子模块做异步处理
  6. 在 probeManager 中添加 Pod,如果 Pod 中定义了 readiness 和 liveness 健康检查,启动 goroutine 定期进行检测
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		existingPods := kl.podManager.GetPods()
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)

		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}

		// Only go through the admission process if the pod is not requested
		// for termination by another part of the kubelet. If the pod is already
		// using resources (previously admitted), the pod worker is going to be
		// shutting it down. If the pod hasn't started yet, we know that when
		// the pod worker is invoked it will also avoid setting up the pod, so
		// we simply avoid doing any work.
		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			activePods := kl.filterOutInactivePods(existingPods)

			// Check if we can admit the pod; if not, reject it.
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
	}
}

HandlePodUpdates

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		kl.podManager.UpdatePod(pod)
		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
	}
}

HandlePodRemoves

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		kl.podManager.DeletePod(pod)
		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		// Deletion is allowed to fail because the periodic cleanup routine
		// will trigger deletion again.
		if err := kl.deletePod(pod); err != nil {
			klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
		}
	}
}

HandlePodReconcile

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		// Update the pod in pod manager, status manager will do periodically reconcile according
		// to the pod manager.
		kl.podManager.UpdatePod(pod)

		// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
		if status.NeedToReconcilePodReadiness(pod) {
			mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
			kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
		}

		// After an evicted pod is synced, all dead containers in the pod can be removed.
		if eviction.PodIsEvicted(pod.Status) {
			if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
				kl.containerDeletor.deleteContainersInPod("", podStatus, true)
			}
		}
	}
}

HandlePodSyncs

1
2
3
4
5
6
7
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
	}
}

PodWorkers

podWorkers 子模块主要的作用就是处理针对每一个的 Pod 的更新事件,比如 Pod 的创建,删除,更新。而 podWorkers 采取的基本思路是:为每一个 Pod 都单独创建一个 goroutine 和更新事件的 channel,goroutine 会阻塞式的等待 channel 中的事件,并且对获取的事件进行处理。而 podWorkers 对象自身则主要负责对更新事件进行下发。

managePodLoop

managePodLoop 调用 syncPodFn 方法去同步 pod,syncPodFn 实际上就是 kubelet.syncPod

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
	var lastSyncTime time.Time
	var podStarted bool
	for update := range podUpdates {
		pod := update.Options.Pod
		err := func() error {
			// ...
			// Take the appropriate action (illegal phases are prevented by UpdatePod)
			switch {
			case update.WorkType == TerminatedPodWork:
				err = p.syncTerminatedPodFn(ctx, pod, status)

			case update.WorkType == TerminatingPodWork:
				var gracePeriod *int64
				if opt := update.Options.KillPodOptions; opt != nil {
					gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
				}
				podStatusFn := p.acknowledgeTerminating(pod)

				err = p.syncTerminatingPodFn(ctx, pod, status, update.Options.RunningPod, gracePeriod, podStatusFn)

			default:
				isTerminal, err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
			}

			lastSyncTime = time.Now()
			return err
		}()

		var phaseTransition bool
		switch {
		case err == context.Canceled:
		case err != nil:
			// we will queue a retry
			klog.ErrorS(err, "Error syncing pod, skipping", "pod", klog.KObj(pod), "podUID", pod.UID)

		case update.WorkType == TerminatedPodWork:
			// we can shut down the worker
			p.completeTerminated(pod)
			return

		case update.WorkType == TerminatingPodWork:
			// pods that don't exist in config don't need to be terminated, garbage collection will cover them
			// otherwise we move to the terminating phase
			p.completeTerminating(pod)
			phaseTransition = true

		case isTerminal:
			// if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating
			p.completeSync(pod)
			phaseTransition = true
		}

		// queue a retry if necessary, then put the next event in the channel if any
		p.completeWork(pod, phaseTransition, err)

	}
}

将这个 pod 信息插入 kubelet 的 workQueue 队列中,等待下一次周期性的对这个 pod 的状态进行 sync 将在这次 sync 期间堆积的没有能够来得及处理的最近一次 update 操作加入 goroutine 的事件 channel 中,立即处理。

syncPod

syncPod 用来完成创建容器前的准备工作,在这个方法中,主要完成以下几件事情:

  • 如果是删除 pod,立即执行并返回
  • 同步 podStatus 到 kubelet.statusManager
  • 检查 pod 是否能运行在本节点,主要是权限检查(是否能使用主机网络模式,是否可以以 privileged 权限运行等)。如果没有权限,就删除本地旧的 pod 并返回错误信息
  • 创建 containerManagar 对象,并且创建 pod level cgroup,更新 Qos level cgroup
  • 如果是 static Pod,就创建或者更新对应的 mirrorPod
  • 创建 pod 的数据目录,存放 volume 和 plugin 信息,如果定义了 pv,等待所有的 volume mount 完成(volumeManager 会在后台做这些事情),如果有 image secrets,去 apiserver 获取对应的 secrets 数据
  • 然后调用 kubelet.volumeManager 组件,等待它将 pod 所需要的所有外挂的 volume 都准备好。
  • 调用 container runtime 的 SyncPod 方法,去实现真正的容器创建逻辑

这里所有的事情都和具体的容器没有关系,可以看到该方法是创建 pod 实体(即容器)之前需要完成的准备工作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
    // pull out the required options
    pod := o.pod
    mirrorPod := o.mirrorPod
    podStatus := o.podStatus
    updateType := o.updateType
    // 是否为 删除 pod
    if updateType == kubetypes.SyncPodKill {
        ...
    }
    ...
    // 检查 pod 是否能运行在本节点
    runnable := kl.canRunPod(pod)
    if !runnable.Admit {
        ...
    }
    // 更新 pod 状态
    kl.statusManager.SetPodStatus(pod, apiPodStatus)
    // 如果 pod 非 running 状态则直接 kill 掉
    if !runnable.Admit {
        ...
    }
    // 加载网络插件
    if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
        ...
    }
    pcm := kl.containerManager.NewPodContainerManager()
	if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
        ...
        // 创建并更新 pod 的 cgroups
        if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
            if !pcm.Exists(pod) {
                ...
            }
        }
    }
    // 为 static pod 创建对应的 mirror pod
    if kubepod.IsStaticPod(pod) {
        ...
    }
    // 创建数据目录
    if err := kl.makePodDataDirs(pod); err != nil {
        ...
    }
    // 挂载 volume
	if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
            ...
        }
    }
    // 获取 secret 信息
    pullSecrets := kl.getPullSecretsForPod(pod)
    // 调用 containerRuntime 的 SyncPod 方法开始创建容器
    result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
    kl.reasonCache.Update(pod.UID, result)
    if err := result.Error(); err != nil {
        ...
    }
    return nil
}

syncTerminatingPod

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// syncTerminatingPod is expected to terminate all running containers in a pod. Once this method
// returns without error, the pod's local state can be safely cleaned up. If runningPod is passed,
// we perform no status updates.
func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
	// when we receive a runtime only pod (runningPod != nil) we don't need to update the status
	// manager or refresh the status of the cache, because a successful killPod will ensure we do
	// not get invoked again
	if runningPod != nil {
		if err := kl.killPod(pod, *runningPod, gracePeriod); err != nil {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
			// there was an error killing the pod, so we return that error directly
			utilruntime.HandleError(err)
			return err
		}
		//...
	}
	// ...
}

syncTerminatedPod

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (kl *Kubelet) syncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	// ...
	if err := kl.volumeManager.WaitForUnmount(pod); err != nil {
		return err
	}
	if kl.secretManager != nil {
		kl.secretManager.UnregisterPod(pod)
	}
	if kl.configMapManager != nil {
		kl.configMapManager.UnregisterPod(pod)
	}
	// remove any cgroups in the hierarchy for pods that are no longer running.
	if kl.cgroupsPerQOS {
		pcm := kl.containerManager.NewPodContainerManager()
		name, _ := pcm.GetPodContainerName(pod)
		if err := pcm.Destroy(name); err != nil {
			return err
		}
		klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
	}

	kl.usernsManager.Release(pod.UID)

	// mark the final pod status
	kl.statusManager.TerminatePod(pod)

	return nil
}

ContainerRuntime

SyncPod

ContainerRuntime(pkg/kubelet/kuberuntime)子模块的 SyncPod 函数才是真正完成 Pod 内容器实体的创建。 syncPod 主要执行以下几个操作:

  1. Compute sandbox and container changes.
  2. Kill pod sandbox if necessary.
  3. Kill any containers that should not be running.
  4. Create sandbox if necessary.
  5. Create ephemeral containers.
  6. Create init containers.
  7. Create normal containers.

initContainers 可以有多个,多个 container 严格按照顺序启动,只有当前一个 container 退出了以后,才开始启动下一个 container。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(pod, podStatus)

	// Step 2: Kill the pod if the sandbox has changed.
	if podContainerChanges.KillPod {
		// ...
		killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
		if podContainerChanges.CreateSandbox {
			m.purgeInitContainers(pod, podStatus)
		}
		// ...
	} else {
		// Step 3: kill any running containers in this pod which are not to keep.
        ...
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            ...
            if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
                ...
            }
        }
    }
    m.pruneInitContainersBeforeStart(pod, podStatus)
    podIP := ""
    if podStatus != nil {
        podIP = podStatus.IP
    }

	// Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        if err != nil {
            ...
        }
        ...
        podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
        if err != nil {
            ...
        }
        // 如果 pod 网络是 host 模式,容器也相同;其他情况下,容器会使用 None 网络模式,让 kubelet 的网络插件自己进行网络配置
        if !kubecontainer.IsHostNetworkPod(pod) {
            podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
            glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
        }
    }
    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
    // 获取 PodSandbox 的配置(如:metadata,clusterDNS,容器的端口映射等)
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
    ...
	// Step 5: start ephemeral containers
	for _, idx := range podContainerChanges.EphemeralContainersToStart {
		start("ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
	}

	// Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
    	// Start the next init container.
		if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
			return
		}
    }
	// Step 7: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
 		start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
    }
    return
}

startContainer

It starts the container through the following steps:

  1. pull the image
  2. create the container
  3. start the container
  4. run the post start lifecycle hooks (if applicable)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
    if err != nil {
        ...
    }
 
	// Step 2: create the container.
    restartCount := 0
    containerStatus := podStatus.FindContainerStatusByName(container.Name)
    if containerStatus != nil {
        restartCount = containerStatus.RestartCount + 1
    }
	containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, podIPs, target)
    if cleanupAction != nil {
        defer cleanupAction()
    }
    ...
	err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)
 	if err != nil {
        ...
    }

    containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    if err != nil {
        ...
    }
    err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
    if err != nil {
        ...
    }
    ...

	// Step 3: start the container.
    err = m.runtimeService.StartContainer(containerID)
    if err != nil {
        ...
    }
    containerMeta := containerConfig.GetMetadata()
    sandboxMeta := podSandboxConfig.GetMetadata()
    legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
        sandboxMeta.Namespace)
    containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
    if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
        if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
            glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
                legacySymlink, containerID, containerLog, err)
        }
    }

	// Step 4: execute the post start hook.
    if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
        kubeContainerID := kubecontainer.ContainerID{
            Type: m.runtimeName,
            ID:   containerID,
        }
        // runner.Run 这个方法的主要作用就是在业务容器起来的时候,
        // 首先会执行一个 container hook(PostStart 和 PreStop),做一些预处理工作。
        // 只有 container hook 执行成功才会运行具体的业务服务,否则容器异常。
        msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
        if handlerErr != nil {
            ...
        }
    }
    return "", nil
}