摘要: kubernetes 中定义 node 的资源默认只包括:CPU、Memory、Pods、hugepages、ephemeral-storage 等。但实际上服务器上的设备不仅仅只包括这些。并且随着机器学习,人工智能的发展,服务器上越来越多的插入支持并行计算的设备,例如 GPU、FPGA 和一些厂商自研的芯片。为解决更多设备如何对 kubernetes 暴露并使用,并能和 kubernetes 解耦提升 kubernetes 代码的稳定性,提出了 Device Manager Proposal.

Device Manager 初始化

Image
Image

Device Manager 时序图

如上图所示,Device Manager 的初始化和启动是跟随 Container Manager 完成的。

// NewManagerImpl creates a new manager.
func NewManagerImpl(topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
 return newManagerImpl(pluginapi.KubeletSocket, topology, topologyAffinityStore)
}

Device Manager 初始化时,需要提供节点的 numa 拓扑结构(topology)和拓扑关系存储查询接口(topologyAffinityStore),以及 Device Manager RPC Server 的 unix socket 接口的位置(pluginapi.KubeletSocket)。

  • numa 拓扑结构,统计节点的 numa 节点个数,为容器分配设备时提供 Topology hints。
  • 拓扑关系存储查询接口,为容器分配设备时,检查是否已经做过 Topology hints。
  • Device Manager RPC Server 的 unix socket 接口的位置目前是硬编码到代码中的,位置是:/var/lib/kubelet/device-plugins/kubelet.sock
// Start starts the Device Plugin Manager and start initialization of
// podDevices and allocatedDevices information from checkpointed state and
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
    //--------省略部分不重要代码--------

 // Loads in allocatedDevices information from disk.
 err := m.readCheckpoint()
 if err != nil {
  klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
 }

    //--------省略部分不重要代码--------

 s, err := net.Listen("unix", socketPath)
 if err != nil {
  klog.Errorf(errListenSocket+" %v", err)
  return err
 }

 m.wg.Add(1)
 m.server = grpc.NewServer([]grpc.ServerOption{}...)

 pluginapi.RegisterRegistrationServer(m.server, m)
 go func() {
  defer m.wg.Done()
  m.server.Serve(s)
 }()

 klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)

 return nil
}

Device Manager 启动的时候,首先会加载 checkpoint 文件中内容,然后启动一个 RPC Server。

checkpoint 文件

checkpoint 文件记录了 Device Manager 模块注册的资源列表和 Device Manager 为每个 container 分配的资源信息。

// Data holds checkpoint data and its checksum
type Data struct {
 Data     checkpointData
 Checksum checksum.Checksum
}
// checkpointData struct is used to store pod to device allocation information
// in a checkpoint file.
// TODO: add version control when we need to change checkpoint format.
type checkpointData struct {
    //PodDeviceEntries 记录为pod中container分配的资源信息。以GPU设备为例会记录如下数据: PodUID->container name -> resource name -> device ID等信息
 PodDeviceEntries  []PodDevicesEntry
    //RegisteredDevices 记录注册的Device
 RegisteredDevices map[string][]string
}

checkpoint 文件位置: /var/lib/kubelet/device-plugins/kubelet_internal_checkpoint。Device Manager 在以下情况下都会更新 checkpoint 文件的内容:

  • 成功为容器分配 device
  • device plugin 通过 ListAndWatch 接口更新 device 状态
  • kubelet 定期更新 node status 时,注册的 device 的数量或者状态发生变化

Device Manager 模块为什么要记录 checkpoint 文件?主要是在 Device Manager 重启的时候可以重新加载历史数据。如果不加载历史数据,Device Manager 在为容器分配设备时,会认为所有的设备都没有分配。这样当有新的 pod 调度到节点上,申请设备的时候,会出现设备重新分配,多个容器共用同一个设备的问题。因此当您出现多个容器共用同一个设备的问题时,你可以排查一下几个方面:

  • kubelet_internal_checkpoint 文件是否丢失
  • kubelet_internal_checkpoint 文件是否出现版本兼容问题

当出现这个问题之后如何修复呢?答案是:把节点上的容器全部重启一下。或者你自己生成正确的 kubelet_internal_checkpoint 文件,重启 kubelet。

RPC Server

Image
Image

Device Manger 与 Device Plugin RPC 通信

service Registration {
 rpc Register(RegisterRequest) returns (Empty) {}
}

message RegisterRequest {
 // Version of the API the Device Plugin was built against
 string version = 1;
 // Name of the unix socket the device plugin is listening on
 // PATH = path.Join(DevicePluginPath, endpoint)
 string endpoint = 2;
 // Schedulable resource name. As of now it's expected to be a DNS Label
 string resource_name = 3;
 // Options to be communicated with Device Manager
 DevicePluginOptions options = 4;
}

如上是 Device Manager 和 Device Plugin RPC 通信图。Device Manager 的 RPC Server 只包含一个接口 Register。提供给 Device Plugin 做插件的服务发现。Device Plugin 也需提供一个 RPC Server,并提供两个接口 ListAndWatch、Allocate。

当 Device Plugin 启动时,需要对 Device Manager 公开的 Register 接口进行(客户端)gRPC 调用。Device Plugin 向 Device Manager 发送 RegisterRequest(通过 gRPC 请求)。Device Manager 成功响应请求后,Device Pluging 应立即启动自己的 RPC Server 服务,对外提供 ListAndWatch 和 Allocate 接口。

  • ListAndWatch 接口,当 Device Plugin 注册成功后,Device Manager 会调用 Device Plugin 的 ListAndWatch 接口建立长链接,阻塞等待接受 Device Plugin 传送过来的设备状态。
func (e *endpointImpl) run() {
    //调用Device Plugin的ListAndWatch接口
 stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
 if err != nil {
  klog.Errorf(errListAndWatch, e.resourceName, err)

  return
 }

    // 阻塞循环接受Device Plugin的response
 for {
  response, err := stream.Recv()
  if err != nil {
   klog.Errorf(errListAndWatch, e.resourceName, err)
   return
  }

  devs := response.Devices
  klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)

  var newDevs []pluginapi.Device
  for _, d := range devs {
   newDevs = append(newDevs, *d)
  }

  e.callback(e.resourceName, newDevs)
 }
}
  • Allocate 方法: 在为 container 分配设备时,Device Manager 为 container 分配设别后会调用 Allocate 方法请求 device plugin 提供需要 mount 的设备和添加的 ENV。

设备的注册

如上所述,设备注册是 Device Plugin 发起 RPC 调用,请求 Device Manager 的 Register 接口,完成 Device Plugin 的注册。

// Register registers a device plugin.
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
 klog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
 metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
 var versionCompatible bool
 for _, v := range pluginapi.SupportedVersions {
  if r.Version == v {
   versionCompatible = true
   break
  }
 }
 if !versionCompatible {
  errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
  klog.Infof("Bad registration request from device plugin with resource name %q: %s", r.ResourceName, errorString)
  return &pluginapi.Empty{}, fmt.Errorf(errorString)
 }

 if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
  errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
  klog.Infof("Bad registration request from device plugin: %s", errorString)
  return &pluginapi.Empty{}, fmt.Errorf(errorString)
 }

 // TODO: for now, always accepts newest device plugin. Later may consider to
 // add some policies here, e.g., verify whether an old device plugin with the
 // same resource name is still alive to determine whether we want to accept
 // the new registration.
 go m.addEndpoint(r)

 return &pluginapi.Empty{}, nil
}

如上代码所示主要注册的流程:

  • 首先需要校验 Device Plugin 使用的 API Version 是否支持。
  • 然后校验 Device Plugin 注册的设备或资源名称是否满足 kubernetes 要求的扩展资源名称定义的规范。*
  • 最后会根据 Device Plugin 注册的信息,实例一个 Endpoint(一个 Endpoint 代表一类资源的 Device Plugin),然后启动 Endpoint。Endpoint 的启动后的工作见上面介绍的 RPC Server 的 ListAndWatch 接口。

设备分配

Device Manager 为容器分配设备分两种场景:

  • 没有开启 Topology Manager 功能
  • 开启 Topology Manager 功能

在 kubelet 没有开启 Topology Manager 功能的场景下,Device Manager 使用 Allocate()接口为容器分配设备。

  • Device Manager 通过 Allocate 接口分配设备要求,pod 申请的扩展资源的 request 等于 limit。因此扩展资源是无法做超卖。
  • Device Manager 通过 Allocate 接口分配设备时,init container 和 pod 中其他的 container 是可以共用同一个设备的。
  • Device Manager 通过 Allocate 接口分配设备时,会尽量先给容器分配在同一个 numa 节点上的设备,如果无法在同一个 numa 节点上分配,才会出现设备跨 numa 节点的情况。同时 Device Manager 也会调用 Device Plugin 的 GetPreferredAllocation 接口,请求 Device Plugin 给出一个最优分配方案。Device Manager 综合 Device Plugin 和 numa 节点的方案,给出一个最优的分配方案。
  • Device Manager 成功分配出设备后,调用 Device Plugin 的 Allocate 方法通知 Device Plugin 最后将哪些设备分配给该容器,同时 Device Plugin 也会返回容器需要 mount 的设备和添加的 ENV。
  • 以上所有操作完成后,Device Manager 更新内存和 checkpoint 文件,标示设备已经被分配。

如上所述,Device Manager 在成功分配出设备后,调用 Device Plugin 的 Allocate 方法,Device Plugin 回容器需要 mount 的设备和添加的 ENV。Device Manager 在接收到这些响应只是简单的存储。当 container Manager 创建容器时,生成容器的 cgroup config 的时候,container Manager 回再次调用 Device Manager 的 GetDeviceRunContainerOptions 接口,Device Manager 会调用 Device Plugin 的 PreStartContainer 接口,重新获取需要容器需要挂载的目录设别和添加的 Env 等配置。

关于开启 Topology Manager 功能时,Device Manager 的分配方案可以查看 GetTopologyHints 接口。其原理和之前的 CPU 分配方法相似。

设备回收

设备分配出去之后,当 Pod 从 node 上删除之后,分配给 Pod 的设备需要回收。Device Manager 并没有在 container manager 的的 stop container 阶段注入 hook。而是在 Allocate 接口中添加了一段前置逻辑。即每次为 container 分配设备前,都会做一次回收操作,通过 UpdateAllocatedDevices 方法回收 Device。

func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
 podUID := string(pod.UID)
 contName := container.Name
 allocatedDevicesUpdated := false
 needsUpdateCheckpoint := false
 // Extended resources are not allowed to be overcommitted.
 // Since device plugin advertises extended resources,
 // therefore Requests must be equal to Limits and iterating
 // over the Limits should be sufficient.
 for k, v := range container.Resources.Limits {
  //-------省略不重要代码-------
  // Updates allocatedDevices to garbage collect any stranded resources
  // before doing the device plugin allocation.
  if !allocatedDevicesUpdated {
   m.UpdateAllocatedDevices()
   allocatedDevicesUpdated = true
  }
 }
}

UpdateAllocatedDevices 方法会删除已经从 node 上移除的 pod 的 device 的分配记录。这样做的优势:设备分配会按照 pod 分配,当 pod 上的 container 重启,不会触发重新分配。例如:GPU 服务预测场景下,通常需要将模型加载到显存中。container 重启之后,不重新分配设备,就可以减少模型加载时间,提升服务启动的速度。

// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
func (m *ManagerImpl) UpdateAllocatedDevices() {
 activePods := m.activePods()
 if !m.sourcesReady.AllReady() {
  return
 }
 m.mutex.Lock()
 defer m.mutex.Unlock()
 podsToBeRemoved := m.podDevices.pods()
 for _, pod := range activePods {
  podsToBeRemoved.Delete(string(pod.UID))
 }
 if len(podsToBeRemoved) <= 0 {
  return
 }
 klog.V(3).Infof("pods to be removed: %v", podsToBeRemoved.List())
 m.podDevices.delete(podsToBeRemoved.List())
 // Regenerated allocatedDevices after we update pod allocation information.
 m.allocatedDevices = m.podDevices.devices()
}

参考资料