Yarn 能够动态申请资源,如 MapReduce 中 reduce 的 container 会在 map 过程结束后申请。但 Spark On YARN 的机制为申请固定的 executor,而不动态改变已申请的资源。

Yarn 上新运行的任务能够使用已运行任务回收的资源(如运行完 Map task 的 container),甚至还能够通过强行结束先前任务的 container 抢占资源。

基础架构

YARN 提供了 CapacityScheduler, FairScheduler, FifoScheduler 三个调度器,继承于 AbstractYarnScheduler,Resource Manager 通过调度器决定对提交 application 分配的资源大小。

CapacityScheduler 首先将所有资源分配到 hierarchical queue 中,每个任务执行时指定对应的 queue,使大任务不会占用整个集群的资源,通过对 queue 的资源管理提高整个集群的资源共享能力。通常会使小任务执行更快,大任务更慢。

Fair Scheduler 会在第一个任务运行时分配当前同级队列的所有资源,当有其它任务运行时,回收前面任务运行时的部分资源(一般为运行完成的 Container)用于其它任务。

至于 FIFO,源码里都没有描述,可能就是一般的先进先出了。

YARN 默认使用 CapacityScheduler,通过下面的属性配置:

1
2
3
4
<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

资源抽象

YARN 在 cpu,memory 这两个资源维度对集群资源做了抽象。

1
2
3
4
class Resource{
  int cpu;   //cpu核心个数
  int memory-mb; //内存的MB数
}

作业向 YARN 申请资源的请求是:List[ResourceRequest]

1
2
3
4
class ResourceRequest{
  int numContainers; //需要的container个数
  Resource capability;//每个container的资源
}

YARN 对作业响应是:List[Container]

1
2
3
4
5
class Container{
  ContainerId containerId; //YARN全局唯一的container标示
  Resource capability;  //该container的资源信息
  String nodeHttpAddress; //该container可以启动的NodeManager的hostname
}

YARN 调度架构

YARN调度器
YARN调度器

名词解释

  • ResourceScheduler 是 YARN 的调度器,负责 Container 的分配。
  • AsyncDispatcher 是单线程的事件分发器,负责向调度器发送调度事件。
  • ResourceTrackerService 是资源跟踪服务,主要负责接收处理 NodeManager 的心跳信息。
  • ApplicationMasterService 是作业的 RPC 服务,主要负责接收处理作业的心跳信息。
  • AppMaster 是作业的程序控制器,负责跟 YARN 交互获取/释放资源。

调度流程

  1. 作业资源申请过程:AppMaster 通过心跳告知 YARN 资源需求 List[ResourceRequest],并取回上次心跳之后,调度器已经分配好的资源 List[Container]
  2. 调度器分配资源流程是:Nodemanager 心跳触发调度器为该 NodeManager 分配 Container。

资源申请和分配是异步进行的。ResourceScheduler 是抽象类,需要自行实现。社区实现了公平调度器(FairScheduler)和容量调度器(CapacityScheduler)。

FIFO Scheduler

FIFO 是 Hadoop 设计之初提供的一个最简单的调度机制: 即先来先服务。所有应用程序被统一提交到一个队里中,Hadoop 按照提交顺序依次运行这些作业。只有等先来的应用程序资源满足后,再开始为下一个应用程序进行调度运行和分配资源。

  • 优点:原理是和实现简单。也不需要任何单独的配置
  • 缺点:
    • 无法提供 QoS,只能对所有的任务按照同一优先级处理。
    • 无法适应多租户资源管理。先来的大应用程序把集群资源占满,导致其他用户的程序无法得到及时执行。
    • 应用程序并发运行程度低。

Capacity Scheduler

Capacity Scheduler 容量调度是 Yahoo! 开发的多用户调度器。它以队列为单位划分资源。每个队列可设定一定比例的资源最低保证和使用上限。每个用户也可设置一定的资源使用上限,以防资源滥用。并支持资源共享,将队列剩余资源共享给其他队列使用。配置文件名称为 capacity-scheduler.xml。

主要特点:

  • 容量保证:管理员为每个队列设置资源最低保证(capacity)和资源使用上限(maximum-capacity,默认 100%),而所有提交到该队列的应用程序可以共享这个队列中的资源。
  • 弹性调度:如果队列中的资源有剩余或者空闲,可以暂时共享给那些需要资源的队列,而一旦该队列有新的应用程序需要资源运行,则其他队列释放的资源会归还给该队列(非强制回收),从而实现弹性灵活分配调度资源,提高系统资源利用率。
  • 多租户管理:支持多用户共享集群资源和多应用程序同时运行。且可对每个用户可使用资源量(user-limit-factor)设置上限。
  • 安全隔离:每个队列设置严格的 ACL 列表(acl_submit_applications),用以限制可以用户或者用户组可以在该队列提交应用程序。实现计算资源隔离。

Capacity Scheduler
Capacity Scheduler

应用程序提交过程

合法性检查:

  • 应用程序所属用户拥有该子队列的应用程序提交权限(ACL)
  • 该队列及其父队列当前处于 RUNNING 状态
  • 队列当前已提交应用程序数目未达到设定上限(yarn.scheduler.capacity.queue-name.maximum-applications,默认 1000)
  • 集群同时处于运行和等待状态的应用程序数量未达到上限(yarn.scheduler.capacity.maximum-applications,默认 1000)

资源分配

应用程序对应的 AppMaster 进程会为它申请资源。YARN 采用了三级资源分配策略。当一个 NodeManager 节点有空闲资源时,它会上报给 RM 节点。由 RM 节点的 ResourceScheduler 组件分配该空闲资源。分配顺序是:依次选用队列、应用程序和 Container。

  • 选择队列。

    所有的队列都是 root 根队列的子队列。因此在选择队列时,从 ROOT 开始,基于优先级深度优先遍历算法。具体的优先级可以是资源使用率(已使用的资源量/队列资源容量,对于非叶子队列,它的已使用资源量是各个子队列已使用资源量之和)由小到大排序。

  • 选择应用程序

    在上一步中选中一个应用程序后,Capacity Scheduler 按照 FIFO 方式排序,分配该资源给有申请 Container 资源的第一个应用程序。

  • 选择 Container 请求

    对于同一个应用程序,它请求的 Container 可能是多样化的,涉及不同的优先级,节点,资源量和数量。当选中一个应用程序后,Capacity Scheduler 将尝试优先满足优先级高的 Container

为什么叫容量调度?

Capacity Scheduler 资源调度算法如何体现是容量调度?

  • 队列资源采用容量占比的方式进行分配。
  • 队列间的资源分配算法也是采用最小资源使用率。
  • 每个用户的资源限制是资源量占比。

作业的组织方式

在公平调度器中,作业(App)是挂载如下图的树形队列的叶子。

队列结构
队列结构

核心调度流程

核心调度流程
核心调度流程

  1. 调度器锁住 FairScheduler 对象,避免核心数据结构冲突。
  2. 调度器选取集群的一个节点(node),从树形队列的根节点 ROOT 开始出发,每层队列都会按照公平策略选择一个子队列,最后在叶子队列按照公平策略选择一个 App,为这个 App 在 node 上找一块适配的资源。

对于每层队列进行如下流程:

  1. 队列预先检查:检查队列的资源使用量是否已经超过了队列的 Quota
  2. 排序子队列/App:按照公平调度策略,对子队列/App 进行排序
  3. 递归调度子队列/App

例如,某次调度的路径是 ROOT -> ParentQueueA -> LeafQueueA1 -> App11,这次调度会从 node 上给 App11 分配 Container。

伪代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
class FairScheduler{
  /* input:NodeId
   *  output:Resource 表示分配出来的某个app的一个container的资源量
   *  root 是树形队列Queue的根
   */  synchronized Resource attemptScheduling(NodeId node){    root.assignContainer(NodeId);  }}

class Queue{
  Resource assignContainer(NodeId node){    if(! preCheck(node) ) return;  //预先检查
      sort(this.children);  //排序
    if(this.isParent){      for(Queue q: this.children)        q.assignContainer(node);  //递归调用
    }else{      for(App app: this.runnableApps)        app.assignContainer(node);    }  }}

class App{
  Resource assignContainer(NodeId node){    ......  }}

公平调度器架构

公平调度器是一个多线程异步协作的架构,而为了保证调度过程中数据的一致性,在主要的流程中加入了 FairScheduler 对象锁。其中核心调度流程是单线程执行的。这意味着 Container 分配是串行的,这是调度器存在性能瓶颈的核心原因。

公平调度器架构
公平调度器架构

  • scheduler Lock:FairScheduler 对象锁
  • AllocationFileLoaderService:负责公平策略配置文件的热加载,更新队列数据结构
  • Continuous Scheduling Thread:核心调度线程,不停地执行上节的核心调度流程
  • Update Thread:更新队列资源需求,执行 Container 抢占流程等
  • Scheduler Event Dispatcher Thread: 调度器事件的处理器,处理 App 新增,App 结束,node 新增,node 移除等事件

Fair Scheduler

Fair Scheduler 是 Facebook 开发的多用户调度器。设计目标是为所有的应用分配公平的资源(对公平的定义可以通过参数来设置)。公平不仅可以在队列中的应用体现,也可以在多个队列之间工作。

举个例子,假设有两个用户 A 和 B,他们分别拥有一个队列,且分别设置容量最小为集群的一半,最大为全部集群资源。当 A 启动一个 job 而 B 没有任务时,A 会获得全部集群资源;当 B 启动一个 job 后,A 的 job 会继续运行,不过一会儿之后两个任务会各自获得一半的集群资源。如果此时 B 再启动第二个 job 并且其它 job 还在运行,则它将会和 B 的第一个 job 共享 B 这个队列的资源,也就是 B 的两个 job 会用于四分之一的集群资源,而 A 的 job 仍然用于集群一半的资源,结果就是资源最终在两个用户之间平等的共享。

与 Capacity Scheduler 不同之处:

  • 资源公平共享:每个队列中,Fair Scheduler 可选择 FIFO、Fair 或者 DRF 策略为应用程序分配资源。其中,Fair 策略是一种基于最大最小公平算法(内存资源使用率比率)实现的资源多路复用方式,默认情况下,每个队列内部采用该方式分配资源。schedulingPolicy 设置队列内部调度策略。如果是非叶子队列,该调度策略为队列间调度策略,如果没有设置,则采用 defaultQueueSchedulingPolicy 策略。
  • 支持资源抢占。队列空闲资源被共享给其他队列后,如果再提交用户程序,需要计算资源,调度器需要为它回收资源。为了尽可能降低不必要的计算浪费,调度器采用了先等待再强制回收的策略。如果等待一段时间后尚有未归还的资源,则会进行资源抢占:从超额使用资源的队列中杀死一部分任务,进而释放资源。
  • 负载均衡:Fair Scheduler 尽可能把系统中的任务均匀分配到各个节点上。此外用户也可以根据自己的需求设计负载均衡机制。
  • 调度策略配置灵活: 每个队列单独设置调度策略(FIFO、Fair 或 DRF)。

参考资料