Informer
List/Watch 机制是 Kubernetes 中实现集群控制模块最核心的设计之一,它采用统一的异步消息处理机制,保证了消息的实时性、可靠性、顺序性和性能等,为声明式风格的 API 奠定了良好的基础。Informer 模块是 Kubernetes 中的基础组件,负责各组件与 ApiServer 的资源与事件同步。Kubernetes 中的组件,如果要访问 Kubernetes 中的 Object,绝大部分情况下会使用 Informer 中的 Lister()方法,而非直接请求 Kubernetes API。
原理示意
client-go
Reflector:reflector 用来 watch 特定的 k8s API 资源。具体的实现是通过ListAndWatch的方法,watch 可以是 k8s 内建的资源或者是自定义的资源。当 reflector 通过watch API接收到有关新资源实例存在的通知时,它使用相应的list API获取新创建的对象,并将其放入 watchHandler 函数内的 Delta FIFO 队列中。Informer:informer 从 Delta FIFO 队列中弹出对象。执行此操作的功能是 processLoop。base controller 的作用是保存对象以供以后检索,并调用我们的控制器将对象传递给它。Indexer:索引器提供对象的索引功能。典型的索引用例是基于对象标签创建索引。 Indexer 可以根据多个索引函数维护索引。Indexer 使用线程安全的数据存储来存储对象及其键。
|
|
controller
-
Informer reference:指的是 Informer 实例的引用,定义如何使用自定义资源对象。 自定义控制器代码需要创建对应的 Informer。 -
Indexer reference: 自定义控制器对 Indexer 实例的引用。自定义控制器需要创建对应的 Indexser。 -
Workqueue:任务队列。 编写资源事件处理程序函数以提取传递的对象的 key 并将其添加到任务队列。
|
|
EventHandler
Resource Event Handlers:资源事件回调函数,当它想要将对象传递给 controller 时,它将被调用。编写这些函数的典型模式是获取调度对象的 key,并将该 key 排入工作队列以进行进一步处理。
|
|
ProcessItems
Process Item:处理任务队列中对象的函数,这些函数通常使用 Indexer 引用或 Listing 包装器来重试与该 key 对应的对象。
关键设计
Informer 只会调用 Kubernetes List 和 Watch 两种类型的 API。 通过 Lister()对象来 List/Get 对象时,Informer 不会去请求 Kubernetes API,而是直接查询本地缓存,减少对 Kubernetes API 的直接调用。
- Informer 在初始化时,调用 Kubernetes List API 获得某种 resource 的全部 Object,缓存在内存中;
- 接下来,Informer 调用 Watch API 去 watch 这种 resource,去维护这份缓存;
- 最后,Informer 就不再调用 Kubernetes 的任何 API。
我们以 Pod 为例,详细说明一下 Informer 的关键逻辑:
- Informer 在初始化时,Reflector 会先 List API 获得所有的 Pod
- Reflector 拿到全部 Pod 后,会将全部 Pod 放到 Store 中
- 如果有人调用 Lister 的 List/Get 方法获取 Pod, 那么 Lister 会直接从 Store 中拿数据
- Informer 初始化完成之后,Reflector 开始 Watch Pod,监听 Pod 相关 的所有事件。如果此时 pod_1 被删除,那么 Reflector 会监听到这个事件
- Reflector 将 pod_1 被删除 的这个事件发送到 DeltaFIFO
- DeltaFIFO 首先会将这个事件存储在自己的数据结构中(实际上是一个 queue),然后会直接操作 Store 中的数据,删除 Store 中的 pod_1
- DeltaFIFO 再 Pop 这个事件到 Controller 中
- Controller 收到这个事件,会触发 Processor 的回调函数
Informer 组件:
- Controller
- Reflector:通过 Kubernetes Watch API 监听 resource 下的所有事件
- Lister:用来被调用 List/Get 方法
- Processor:记录并触发回调函数
- DeltaFIFO
- LocalStore
DeltaFIFO 和 LocalStore 是 Informer 的两级缓存。 DeltaFIFO:用来存储 Watch API 返回的各种事件。 LocalStore:Lister 的 List/Get 方法访问。
设计目标
之前说到 kubernetes 里面的 apiserver 的只负责数据的 CRUD 接口实现,并不负责业务逻辑的处理,所以 k8s 中就通过外挂 controller 通过对应资源的控制器来负责事件的处理,controller 如何感知事件呢?答案就是 informer
基于 chunk 的消息通知
watcher 的设计在之前的文章中已经介绍,服务端是如何将 watcher 感知到的事件发送给 informer 呢?我们提到过 apiserver 本质上就是一个 http 的 rest 接口实现,watch 机制则也是基于 http 协议,不过不同于一般的 get,其通过 chunk 机制,来实现消息的通知
reflector
服务端通过 chunk 进行数据的发送,在客户端同样的需要根据对应的 chunk 来进行数据的解包,同时还要维护这个长链接
本地缓存
通过 listwatch 接口主要分为两部分,list 接口我们可以获取到对应资源当前版本的全量资源,watch 接口可以获取到后续变更的资源,通过全量加增量的数据,就构成了在 client 端一份完整的数据(基于当前版本的),那后续如果要获取对应的数据,就直接可以通过本地的缓存来进行获取,为此 informer 抽象了 cache 这个组件,并且实现了 store 接口,如果后续要获取资源,则就可以通过本地的缓存来进行获取
本地索引
上面将资源缓存在本地的内存中,那如果我们要进行数据查询,快速检索数据,这个时候就需要用到 informer 里面的 indexer。我们可以注册不同的索引函数,在添加对象的时候,会通过 indexer 为其建立对应的索引,这样后续我们就可以通过 key 来检索获取元数据
无界队列
为了协调数据生产与消费的不一致状态,在 cleint-go 中通过实现了一个无界队列来进行数据的缓冲,当 reflector 获取到数据之后,只需要将数据写入到无界队列中,则就可以继续 watch 后续事件,从而减少阻塞时间, 下面的事件去重也是在该队列中实现的
事件去重
事件去重是指的,在上面的无界队列中,如果针对某个资源的事件重复被触发,则就只会保留相同事件最后一个事件作为后续处理
到此对于事件接收和数据缓存相关优化就结束了,接下就是处理层的优化
复用连接
在 k8s 中一些控制器可能会关注多种资源,比如 Deployment 可能会关注 Pod 和 replicaset,replicaSet 可能还会关注 Pod,为了避免每个控制器都独立的去与 apiserver 建立链接,k8s 中抽象了 sharedInformer 的概念,即共享的 informer, 针对同一资源只建立一个链接
基于观察者模式的注册
因为彼此共用 informer,但是每个组件的处理逻辑可能各部相同,在 informer 中通过观察者模式,各个组件可以注册一个 EventHandler 来实现业务逻辑的注入
设计总结
源码解析
该部分的代码主要位于client-go这个第三方包中。
此部分的逻辑主要位于/vendor/k8s.io/client-go/tools/cache包中,代码目录结构如下:
|
|
sharedInformerFactory.Start
在 controller-manager 的 Run 函数部分调用了 InformerFactory.Start 的方法。
此部分代码位于/cmd/kube-controller-manager/app/controllermanager.go
|
|
InformerFactory 是一个SharedInformerFactory的接口,接口定义如下:
此部分代码位于 vendor/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go
|
|
Start 方法初始化各种类型的 informer,并且每个类型起了个 informer.Run 的 goroutine。其中,通过startdInformers这个 map 用来追踪有哪些 Informer 已经启动,从而可以让 Start 方法被多次调用。
此部分代码位于 vendor/k8s.io/client-go/informers/factory.go
|
|
sharedIndexInformer.Run
此部分的代码位于/vendor/k8s.io/client-go/tools/cache/shared_informer.go
|
|
NewDeltaFIFO
DeltaFIFO 是一个对象变化的存储队列,依据先进先出的原则,process 的函数接收该队列的 Pop 方法的输出对象来处理相关功能。
|
|
Config
构造 controller 的配置文件,构造 process,即 HandleDeltas,该函数为后面使用到的 process 函数。
|
|
controller
调用 New(cfg),构建 sharedIndexInformer 的 controller。
|
|
cacheMutationDetector.Run
调用 s.cacheMutationDetector.Run,检查缓存对象是否变化。
|
|
defaultCacheMutationDetector.Run
|
|
CompareObjects
|
|
processor.run
调用 s.processor.run,将调用 sharedProcessor.run,会调用 Listener.run 和 Listener.pop,执行处理 queue 的函数。
|
|
sharedProcessor.Run
|
|
该部分逻辑待后面分析。
controller.Run
调用 s.controller.Run,构建 Reflector,进行对 etcd 的缓存
|
|
controller.Run
此部分代码位于/vendor/k8s.io/client-go/tools/cache/controller.go
|
|
核心代码:
|
|
Reflector
Reflector
Reflector的主要作用是 watch 指定的 k8s 资源,并将变化同步到本地是store中。Reflector只会放置指定的expectedType类型的资源到store中,除非expectedType为 nil。如果resyncPeriod不为零,那么Reflector为以resyncPeriod为周期定期执行 list 的操作,这样就可以使用Reflector来定期处理所有的对象,也可以逐步处理变化的对象。
常用属性说明:
- expectedType:期望放入缓存 store 的资源类型。
- store:watch 的资源对应的本地缓存。
- listerWatcher:list 和 watch 的接口。
- period:watch 的周期,默认为 1 秒。
- resyncPeriod:resync 的周期,当非零的时候,会按该周期执行 list。
- lastSyncResourceVersion:最新一次看到的资源的版本号,主要在 watch 时候使用。
|
|
NewReflector
NewReflector 主要用来构建 Reflector 的结构体。
此部分的代码位于/vendor/k8s.io/client-go/tools/cache/reflector.go
|
|
Reflector.Run
Reflector.Run 主要执行了ListAndWatch的方法。
|
|
ListAndWatch
ListAndWatch 第一次会列出所有的对象,并获取资源对象的版本号,然后 watch 资源对象的版本号来查看是否有被变更。首先会将资源版本号设置为 0,list()可能会导致本地的缓存相对于 etcd 里面的内容存在延迟,Reflector会通过watch的方法将延迟的部分补充上,使得本地的缓存数据与 etcd 的数据保持一致。
List
|
|
首先将资源的版本号设置为 0,然后调用listerWatcher.List(options),列出所有 list 的内容。
|
|
获取资源版本号,并将 list 的内容提取成对象列表。
|
|
将 list 中对象列表的内容和版本号存储到本地的缓存 store 中,并全量替换已有的 store 的内容。
|
|
syncWith 调用了 store 的 Replace 的方法来替换原来 store 中的数据。
|
|
Store.Replace 方法定义如下:
|
|
最后设置最新的资源版本号。
|
|
setLastSyncResourceVersion:
|
|
store.Resync
|
|
核心代码:
|
|
store 的具体对象为DeltaFIFO,即调用 DeltaFIFO.Resync
|
|
Watch
|
|
设置 watch 的超时时间,默认为 5 分钟。
|
|
执行 listerWatcher.Watch(options)。
|
|
执行 watchHandler。
|
|
watchHandler
watchHandler 主要是通过 watch 的方式保证当前的资源版本是最新的。
|
|
获取 watch 接口中的事件的 channel,来获取事件的内容。
|
|
当获得添加、更新、删除的事件时,将对应的对象更新到本地缓存 store 中。
|
|
更新当前的最新版本号。
|
|
通过对 Reflector 模块的分析,可以看到多次使用到本地缓存 store 模块,而 store 的数据由 DeltaFIFO 赋值而来,以下针对 DeltaFIFO 和 store 做分析。
DeltaFIFO
DeltaFIFO 由 NewDeltaFIFO 初始化,并赋值给 config.Queue。
|
|
NewDeltaFIFO
|
|
controller.Run 的部分调用了 NewReflector。
|
|
NewReflector 构造函数,将 c.config.Queue 赋值给 Reflector.store 的属性。
|
|
DeltaFIFO
DeltaFIFO 是一个生产者与消费者的队列,其中 Reflector 是生产者,消费者调用 Pop()的方法。
DeltaFIFO 主要用在以下场景:
- 希望对象变更最多处理一次
- 处理对象时,希望查看自上次处理对象以来发生的所有事情
- 要处理对象的删除
- 希望定期重新处理对象
|
|
Queue & Store
DeltaFIFO 的类型是 Queue 接口,Reflector.store 是 Store 接口,Queue 接口是一个存储队列,Process 的方法执行 Queue.Pop 出来的数据对象,
|
|
store
Store是一个通用的存储接口,Reflector 通过 watch server 的方式更新数据到 store 中,store 给 Reflector 提供本地的缓存,让 Reflector 可以像消息队列一样的工作。
Store实现的是一种可以准确的写入对象和获取对象的机制。
|
|
其中Replace方法会删除原来 store 中的内容,并将新增的 list 的内容存入 store 中,即完全替换数据。
cache
cache 实现了 store 的接口,而 cache 的具体实现又是调用ThreadSafeStore接口来实现功能的。
cache 的功能主要有以下两点:
- 通过 keyFunc 计算对象的 key
- 调用 ThreadSafeStorage 接口的方法
|
|
其中 ListAndWatch 主要用到以下的方法:
cache.Replace
|
|
cache.Add
|
|
cache.Update
|
|
cache.Delete
|
|
ThreadSafeStore
cache 的具体是调用ThreadSafeStore来实现的。
|
|
threadSafeMap
|
|
processLoop
|
|
在 controller.Run 方法中会调用 processLoop,以下分析 processLoop 的处理逻辑。
|
|
processLoop 主要处理任务队列中的任务,其中处理逻辑是调用具体的 ProcessFunc 函数来实现,核心代码为:
|
|
DeltaFIFO.Pop
Pop 会阻塞住直到队列里面添加了新的对象,如果有多个对象,按照先进先出的原则处理,如果某个对象没有处理成功会重新被加入该队列中。
Pop 中会调用具体的 process 函数来处理对象。
|
|
核心代码:
|
|
HandleDeltas
|
|
其中 process 函数就是在 sharedIndexInformer.Run 方法中,给 config.Process 赋值的HandleDeltas函数。
|
|
核心代码:
|
|
根据不同的类型,调用processor.distribute方法,该方法将对象加入processorListener的 channel 中。
sharedProcessor.distribute
|
|
processorListener.add:
|
|
综合以上的分析,可以看出 processLoop 通过调用 HandleDeltas,再调用 distribute,processorListener.add 最终将不同更新类型的对象加入processorListener的 channel 中,供 processorListener.Run 使用。以下分析 processorListener.Run 的部分。
processor
processor 的主要功能就是记录了所有的回调函数实例(即 ResourceEventHandler 实例),并负责触发这些函数。在 sharedIndexInformer.Run 部分会调用 processor.run。
流程:
- listenser 的 add 函数负责将 notify 装进 pendingNotifications。
- pop 函数取出 pendingNotifications 的第一个 nofify,输出到 nextCh channel。
- run 函数则负责取出 notify,然后根据 notify 的类型(增加、删除、更新)触发相应的处理函数,这些函数是在不同的
NewXxxcontroller实现中注册的。
|
|
sharedProcessor.Run
|
|
listener.pop
pop 函数取出 pendingNotifications 的第一个 nofify,输出到 nextCh channel。
|
|
listener.run
listener.run 部分根据不同的更新类型调用不同的处理函数。
|
|
其中具体的实现函数 handler 是在 NewDeploymentController(其他不同类型的 controller 类似)中赋值的,而该 handler 是一个接口,具体如下:
|
|
ResourceEventHandler
以下以 DeploymentController 的处理逻辑为例。
在NewDeploymentController部分会注册 deployment 的事件函数,以下注册了三种类型的事件函数,其中包括:dInformer、rsInformer 和 podInformer。
|
|
addDeployment
以下以addDeployment为例,addDeployment 主要是将对象加入到 enqueueDeployment 的队列中。
|
|
enqueueDeployment 的定义
|
|
将 dc.enqueue 赋值给 dc.enqueueDeployment
|
|
dc.enqueue 调用了 dc.queue.Add(key)
|
|
dc.queue 主要记录了需要被同步的 deployment 的对象,供 syncDeployment 使用。
|
|
NewNamedRateLimitingQueue
|
|
通过以上分析,可以看出 processor 记录了不同类似的事件函数,其中事件函数在 NewXxxController 构造函数部分注册,具体事件函数的处理,一般是将需要处理的对象加入对应的 controller 的任务队列中,然后由类似 syncDeployment 的同步函数来维持期望状态的同步逻辑。
总结
本文分析的部分主要是 k8s 的informer机制,即List-Watch机制。
Reflector
Reflector的主要作用是 watch 指定的 k8s 资源,并将变化同步到本地是store中。Reflector只会放置指定的expectedType类型的资源到store中,除非expectedType为 nil。如果resyncPeriod不为零,那么Reflector为以resyncPeriod为周期定期执行 list 的操作,这样就可以使用Reflector来定期处理所有的对象,也可以逐步处理变化的对象。
ListAndWatch
ListAndWatch 第一次会列出所有的对象,并获取资源对象的版本号,然后 watch 资源对象的版本号来查看是否有被变更。首先会将资源版本号设置为 0,list() 可能会导致本地的缓存相对于 etcd 里面的内容存在延迟,Reflector 会通过 watch 的方法将延迟的部分补充上,使得本地的缓存数据与 etcd 的数据保持一致。
DeltaFIFO
DeltaFIFO是一个生产者与消费者的队列,其中 Reflector 是生产者,消费者调用 Pop()的方法。
DeltaFIFO 主要用在以下场景:
- 希望对象变更最多处理一次
- 处理对象时,希望查看自上次处理对象以来发生的所有事情
- 要处理对象的删除
- 希望定期重新处理对象
Store
Store是一个通用的存储接口,Reflector 通过 watch server 的方式更新数据到 store 中,store 给 Reflector 提供本地的缓存,让 Reflector 可以像消息队列一样的工作。
Store实现的是一种可以准确的写入对象和获取对象的机制。
Processor
processor的主要功能就是记录了所有的回调函数实例(即 ResourceEventHandler 实例),并负责触发这些函数。在 sharedIndexInformer.Run 部分会调用 processor.run。
流程:
- listenser 的 add 函数负责将 notify 装进 pendingNotifications。
- pop 函数取出 pendingNotifications 的第一个 nofify,输出到 nextCh channel。
- run 函数则负责取出 notify,然后根据 notify 的类型(增加、删除、更新)触发相应的处理函数,这些函数是在不同的
NewXxxcontroller实现中注册的。
processor记录了不同类似的事件函数,其中事件函数在NewXxxController构造函数部分注册,具体事件函数的处理,一般是将需要处理的对象加入对应的 controller 的任务队列中,然后由类似syncDeployment的同步函数来维持期望状态的同步逻辑。
主要步骤
- 在 controller-manager 的 Run 函数部分调用了 InformerFactory.Start 的方法,Start 方法初始化各种类型的 informer,并且每个类型起了个 informer.Run 的 goroutine。
- informer.Run 的部分先生成一个 DeltaFIFO 的队列来存储对象变化的数据。然后调用 processor.Run 和 controller.Run 函数。
- controller.Run 函数会生成一个 Reflector,
Reflector的主要作用是 watch 指定的 k8s 资源,并将变化同步到本地是store中。Reflector以resyncPeriod为周期定期执行 list 的操作,这样就可以使用Reflector来定期处理所有的对象,也可以逐步处理变化的对象。 - Reflector 接着执行 ListAndWatch 函数,ListAndWatch 第一次会列出所有的对象,并获取资源对象的版本号,然后 watch 资源对象的版本号来查看是否有被变更。首先会将资源版本号设置为 0,
list()可能会导致本地的缓存相对于 etcd 里面的内容存在延迟,Reflector会通过watch的方法将延迟的部分补充上,使得本地的缓存数据与 etcd 的数据保持一致。 - controller.Run 函数还会调用 processLoop 函数,processLoop 通过调用 HandleDeltas,再调用 distribute,processorListener.add 最终将不同更新类型的对象加入
processorListener的 channel 中,供 processorListener.Run 使用。 - processor 的主要功能就是记录了所有的回调函数实例(即 ResourceEventHandler 实例),并负责触发这些函数。processor 记录了不同类型的事件函数,其中事件函数在 NewXxxController 构造函数部分注册,具体事件函数的处理,一般是将需要处理的对象加入对应的 controller 的任务队列中,然后由类似 syncDeployment 的同步函数来维持期望状态的同步逻辑。
常见问题
Informer 二级缓存中的同步问题
虽然 Informer 和 Kubernetes 之间没有 resync 机制,但 Informer 内部的这两级缓存 DeltaIFIFO 和 LocalStore 之间会存在 resync 机制,k8s 中 kube-controller-manager 的 StatefulSetController 中使用了两级缓存的 resync 机制(如下图所示),我们在生产环境中发现 sts 创建后过了很久 pod 才会创建,主要是由于 StatefulSetController 的两级缓存之间 30s 会同步一次,由于 StatefulSetController watch 到变化后就会把对应的 sts 放入 DeltaIFIFO 中,且每隔 30s 会把 LocalStore 中全部的 sts 重新入一遍 DeltaIFIFO,入队时会做一些处理,过滤掉一些不需要重复入队列的 sts,若间隔的 30s 内没有处理完队列中所有的 sts,则待处理队列中始终存在未处理完的 sts,并且在同步过程中产生的 sts 会加的队列的尾部,新加入队尾的 sts 只能等到前面的 sts 处理完成(也就是 resync 完成)才会被处理,所以导致的现象就是 sts 创建后过了很久 pod 才会创建。
优化的方法就是去掉二级缓存的同步策略(将 setInformer.Informer().AddEventHandlerWithResyncPeriod() 改为 informer.AddEventHandler())或者调大同步周期,但是在研究 kube-controller-manager 其他 controller 时发现并不是所有的 controller 都有同步策略,社区也有相关的 issue 反馈了这一问题,Remove resync period for sset controller,社区也会在以后的版本中去掉两级缓存之间的 resync 策略。
|
|
使用 Informer 如何监听所有资源对象?
一个 Informer 实例只能监听一种 resource,每个 resource 需要创建对应的 Informer 实例。
为什么不使用 workqueue?
建议使用 RateLimitingQueue,它相比普通的 workqueue 多了以下的功能:
- 限流:可以限制一个 item 被 reenqueued 的次数。
- 防止 hot loop:它保证了一个 item 被 reenqueued 后,不会马上被处理。
参考文章
-
No backlinks found.