Kubelet Device Manager
摘要: kubernetes 中定义 node 的资源默认只包括:CPU、Memory、Pods、hugepages、ephemeral-storage 等。但实际上服务器上的设备不仅仅只包括这些。并且随着机器学习,人工智能的发展,服务器上越来越多的插入支持并行计算的设备,例如 GPU、FPGA 和一些厂商自研的芯片。为解决更多设备如何对 kubernetes 暴露并使用,并能和 kubernetes 解耦提升 kubernetes 代码的稳定性,提出了 Device Manager Proposal.
Device Manager 初始化
Device Manager 时序图
如上图所示,Device Manager 的初始化和启动是跟随 Container Manager 完成的。
// NewManagerImpl creates a new manager.
func NewManagerImpl(topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
return newManagerImpl(pluginapi.KubeletSocket, topology, topologyAffinityStore)
}
Device Manager 初始化时,需要提供节点的 numa 拓扑结构(topology)和拓扑关系存储查询接口(topologyAffinityStore),以及 Device Manager RPC Server 的 unix socket 接口的位置(pluginapi.KubeletSocket)。
- numa 拓扑结构,统计节点的 numa 节点个数,为容器分配设备时提供 Topology hints。
- 拓扑关系存储查询接口,为容器分配设备时,检查是否已经做过 Topology hints。
- Device Manager RPC Server 的 unix socket 接口的位置目前是硬编码到代码中的,位置是:/var/lib/kubelet/device-plugins/kubelet.sock
// Start starts the Device Plugin Manager and start initialization of
// podDevices and allocatedDevices information from checkpointed state and
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
//--------省略部分不重要代码--------
// Loads in allocatedDevices information from disk.
err := m.readCheckpoint()
if err != nil {
klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
}
//--------省略部分不重要代码--------
s, err := net.Listen("unix", socketPath)
if err != nil {
klog.Errorf(errListenSocket+" %v", err)
return err
}
m.wg.Add(1)
m.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterRegistrationServer(m.server, m)
go func() {
defer m.wg.Done()
m.server.Serve(s)
}()
klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
return nil
}
Device Manager 启动的时候,首先会加载 checkpoint 文件中内容,然后启动一个 RPC Server。
checkpoint 文件
checkpoint 文件记录了 Device Manager 模块注册的资源列表和 Device Manager 为每个 container 分配的资源信息。
// Data holds checkpoint data and its checksum
type Data struct {
Data checkpointData
Checksum checksum.Checksum
}
// checkpointData struct is used to store pod to device allocation information
// in a checkpoint file.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
//PodDeviceEntries 记录为pod中container分配的资源信息。以GPU设备为例会记录如下数据: PodUID->container name -> resource name -> device ID等信息
PodDeviceEntries []PodDevicesEntry
//RegisteredDevices 记录注册的Device
RegisteredDevices map[string][]string
}
checkpoint 文件位置: /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint。Device Manager 在以下情况下都会更新 checkpoint 文件的内容:
- 成功为容器分配 device
- device plugin 通过 ListAndWatch 接口更新 device 状态
- kubelet 定期更新 node status 时,注册的 device 的数量或者状态发生变化
Device Manager 模块为什么要记录 checkpoint 文件?主要是在 Device Manager 重启的时候可以重新加载历史数据。如果不加载历史数据,Device Manager 在为容器分配设备时,会认为所有的设备都没有分配。这样当有新的 pod 调度到节点上,申请设备的时候,会出现设备重新分配,多个容器共用同一个设备的问题。因此当您出现多个容器共用同一个设备的问题时,你可以排查一下几个方面:
- kubelet_internal_checkpoint 文件是否丢失
- kubelet_internal_checkpoint 文件是否出现版本兼容问题
当出现这个问题之后如何修复呢?答案是:把节点上的容器全部重启一下。或者你自己生成正确的 kubelet_internal_checkpoint 文件,重启 kubelet。
RPC Server
Device Manger 与 Device Plugin RPC 通信
service Registration {
rpc Register(RegisterRequest) returns (Empty) {}
}
message RegisterRequest {
// Version of the API the Device Plugin was built against
string version = 1;
// Name of the unix socket the device plugin is listening on
// PATH = path.Join(DevicePluginPath, endpoint)
string endpoint = 2;
// Schedulable resource name. As of now it's expected to be a DNS Label
string resource_name = 3;
// Options to be communicated with Device Manager
DevicePluginOptions options = 4;
}
如上是 Device Manager 和 Device Plugin RPC 通信图。Device Manager 的 RPC Server 只包含一个接口 Register。提供给 Device Plugin 做插件的服务发现。Device Plugin 也需提供一个 RPC Server,并提供两个接口 ListAndWatch、Allocate。
当 Device Plugin 启动时,需要对 Device Manager 公开的 Register 接口进行(客户端)gRPC 调用。Device Plugin 向 Device Manager 发送 RegisterRequest(通过 gRPC 请求)。Device Manager 成功响应请求后,Device Pluging 应立即启动自己的 RPC Server 服务,对外提供 ListAndWatch 和 Allocate 接口。
- ListAndWatch 接口,当 Device Plugin 注册成功后,Device Manager 会调用 Device Plugin 的 ListAndWatch 接口建立长链接,阻塞等待接受 Device Plugin 传送过来的设备状态。
func (e *endpointImpl) run() {
//调用Device Plugin的ListAndWatch接口
stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
if err != nil {
klog.Errorf(errListAndWatch, e.resourceName, err)
return
}
// 阻塞循环接受Device Plugin的response
for {
response, err := stream.Recv()
if err != nil {
klog.Errorf(errListAndWatch, e.resourceName, err)
return
}
devs := response.Devices
klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)
var newDevs []pluginapi.Device
for _, d := range devs {
newDevs = append(newDevs, *d)
}
e.callback(e.resourceName, newDevs)
}
}
- Allocate 方法: 在为 container 分配设备时,Device Manager 为 container 分配设别后会调用 Allocate 方法请求 device plugin 提供需要 mount 的设备和添加的 ENV。
设备的注册
如上所述,设备注册是 Device Plugin 发起 RPC 调用,请求 Device Manager 的 Register 接口,完成 Device Plugin 的注册。
// Register registers a device plugin.
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
klog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
var versionCompatible bool
for _, v := range pluginapi.SupportedVersions {
if r.Version == v {
versionCompatible = true
break
}
}
if !versionCompatible {
errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
klog.Infof("Bad registration request from device plugin with resource name %q: %s", r.ResourceName, errorString)
return &pluginapi.Empty{}, fmt.Errorf(errorString)
}
if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
klog.Infof("Bad registration request from device plugin: %s", errorString)
return &pluginapi.Empty{}, fmt.Errorf(errorString)
}
// TODO: for now, always accepts newest device plugin. Later may consider to
// add some policies here, e.g., verify whether an old device plugin with the
// same resource name is still alive to determine whether we want to accept
// the new registration.
go m.addEndpoint(r)
return &pluginapi.Empty{}, nil
}
如上代码所示主要注册的流程:
- 首先需要校验 Device Plugin 使用的 API Version 是否支持。
- 然后校验 Device Plugin 注册的设备或资源名称是否满足 kubernetes 要求的扩展资源名称定义的规范。*
- 最后会根据 Device Plugin 注册的信息,实例一个 Endpoint(一个 Endpoint 代表一类资源的 Device Plugin),然后启动 Endpoint。Endpoint 的启动后的工作见上面介绍的 RPC Server 的 ListAndWatch 接口。
设备分配
Device Manager 为容器分配设备分两种场景:
- 没有开启 Topology Manager 功能
- 开启 Topology Manager 功能
在 kubelet 没有开启 Topology Manager 功能的场景下,Device Manager 使用 Allocate()接口为容器分配设备。
- Device Manager 通过 Allocate 接口分配设备要求,pod 申请的扩展资源的 request 等于 limit。因此扩展资源是无法做超卖。
- Device Manager 通过 Allocate 接口分配设备时,init container 和 pod 中其他的 container 是可以共用同一个设备的。
- Device Manager 通过 Allocate 接口分配设备时,会尽量先给容器分配在同一个 numa 节点上的设备,如果无法在同一个 numa 节点上分配,才会出现设备跨 numa 节点的情况。同时 Device Manager 也会调用 Device Plugin 的 GetPreferredAllocation 接口,请求 Device Plugin 给出一个最优分配方案。Device Manager 综合 Device Plugin 和 numa 节点的方案,给出一个最优的分配方案。
- Device Manager 成功分配出设备后,调用 Device Plugin 的 Allocate 方法通知 Device Plugin 最后将哪些设备分配给该容器,同时 Device Plugin 也会返回容器需要 mount 的设备和添加的 ENV。
- 以上所有操作完成后,Device Manager 更新内存和 checkpoint 文件,标示设备已经被分配。
如上所述,Device Manager 在成功分配出设备后,调用 Device Plugin 的 Allocate 方法,Device Plugin 回容器需要 mount 的设备和添加的 ENV。Device Manager 在接收到这些响应只是简单的存储。当 container Manager 创建容器时,生成容器的 cgroup config 的时候,container Manager 回再次调用 Device Manager 的 GetDeviceRunContainerOptions 接口,Device Manager 会调用 Device Plugin 的 PreStartContainer 接口,重新获取需要容器需要挂载的目录设别和添加的 Env 等配置。
关于开启 Topology Manager 功能时,Device Manager 的分配方案可以查看 GetTopologyHints 接口。其原理和之前的 CPU 分配方法相似。
设备回收
设备分配出去之后,当 Pod 从 node 上删除之后,分配给 Pod 的设备需要回收。Device Manager 并没有在 container manager 的的 stop container 阶段注入 hook。而是在 Allocate 接口中添加了一段前置逻辑。即每次为 container 分配设备前,都会做一次回收操作,通过 UpdateAllocatedDevices 方法回收 Device。
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
podUID := string(pod.UID)
contName := container.Name
allocatedDevicesUpdated := false
needsUpdateCheckpoint := false
// Extended resources are not allowed to be overcommitted.
// Since device plugin advertises extended resources,
// therefore Requests must be equal to Limits and iterating
// over the Limits should be sufficient.
for k, v := range container.Resources.Limits {
//-------省略不重要代码-------
// Updates allocatedDevices to garbage collect any stranded resources
// before doing the device plugin allocation.
if !allocatedDevicesUpdated {
m.UpdateAllocatedDevices()
allocatedDevicesUpdated = true
}
}
}
UpdateAllocatedDevices 方法会删除已经从 node 上移除的 pod 的 device 的分配记录。这样做的优势:设备分配会按照 pod 分配,当 pod 上的 container 重启,不会触发重新分配。例如:GPU 服务预测场景下,通常需要将模型加载到显存中。container 重启之后,不重新分配设备,就可以减少模型加载时间,提升服务启动的速度。
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
func (m *ManagerImpl) UpdateAllocatedDevices() {
activePods := m.activePods()
if !m.sourcesReady.AllReady() {
return
}
m.mutex.Lock()
defer m.mutex.Unlock()
podsToBeRemoved := m.podDevices.pods()
for _, pod := range activePods {
podsToBeRemoved.Delete(string(pod.UID))
}
if len(podsToBeRemoved) <= 0 {
return
}
klog.V(3).Infof("pods to be removed: %v", podsToBeRemoved.List())
m.podDevices.delete(podsToBeRemoved.List())
// Regenerated allocatedDevices after we update pod allocation information.
m.allocatedDevices = m.podDevices.devices()
}
参考资料
-
No backlinks found.