Horovod 可以说是 RingAllReduce 数据并行训练框架方面的 State-of-art 了,不过最近还有一个工作,同样受到了很多的关注,那就是字节跳动的 BytePS[28],甚至发了 SOSP(羡慕到变形!)。论文在此处[29]可预览。不过论文和实现有挺大不同的,这里我们以开源实现为准,介绍一下 byteps

BytePS 的代码目录结构跟 Horovod 很像,在 TensorFlow 的支持上,做法与百度和 Horovod 并无二致。对 Optimizer[30] 进行了包装,实现了自定义的 Op。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
            def push_pull_grads(grads):
                with tf.name_scope(self._name + "_Push_Pull") as scope:
                    if self._sparse_as_dense:
                        grads = [tf.convert_to_tensor(grad)
                                if grad is not None and isinstance(grad, tf.IndexedSlices)
                                else grad for grad in grads]

                    return [push_pull(grad, scope,
                                    device_dense=self._device_dense,
                                    device_sparse=self._device_sparse,
                                    compression=self._compression,
                                    enable_async=self._enable_async)
                            if grad is not None else grad
                            for grad in grads]

            if _executing_eagerly():
                self._push_pull_grads = tf.contrib.eager.defun(push_pull_grads)
            else:
                self._push_pull_grads = push_pull_grads

        def compute_gradients(self, *args, **kwargs):
            """Compute gradients of all trainable variables.
            See Optimizer.compute_gradients() for more info.
            In DistributedOptimizer, compute_gradients() is overriden to also
            push_pull the gradients before returning them.
            """
            gradients = self._optimizer.compute_gradients(*args, **kwargs)
            if size() > 1 and not self._enable_async:
                grads, vars = zip(*gradients)
                avg_grads = self._push_pull_grads(grads)
                return list(zip(avg_grads, vars))
            else:
                return gradients

在 Horovod 里,C++ 的 Op 会把请求入队到全局的队列中,被后台进程中。而在 BytePS 里,逻辑也类似。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  void ComputeAsync(::tensorflow::OpKernelContext* context,
                    DoneCallback done) override {
    ...
    if (bps_context.initialized) {
      StartTask(context, done, tmp_name, bps_input, bps_output, ready_event);
    } else {
      std::thread t(StartTask, context, done, tmp_name, bps_input, bps_output,
                    ready_event);
      t.detach();
    }
  }

void StartTask(::tensorflow::OpKernelContext* context,
               ::tensorflow::AsyncOpKernel::DoneCallback done,
               std::string node_name, std::shared_ptr<TFTensor> byteps_input,
               std::shared_ptr<TFTensor> byteps_output,
               std::shared_ptr<common::ReadyEvent> ready_event) {
  ...
  auto queue_list = common::GetPushQueueList(device);
  auto queue_list_pull = common::GetPullQueueList(device);
  queue_list->insert(queue_list->end(), queue_list_pull->begin(),
                     queue_list_pull->end());

  // TODO: assign priority based on topological sort
  auto enqueue_result =
      EnqueueTensor(byteps_context, byteps_input, byteps_output, ready_event,
                    device, -byteps_context.declared_key, 0,
                    [context, done](const common::Status& status "context, done") {
                      context->SetStatus(ConvertStatus(status));
                      done();
                    },
                    queue_list);
  OP_REQUIRES_OK_ASYNC(context, ConvertStatus(enqueue_result), done);
}

代码注释[31]里写到需要给 Tensor 根据拓扑序设定优先级,这个是在 BytePS 的论文中提到的一个非常重要的优化,看代码这部分的逻辑应该已经实现了,具体可以见这里的讨论[32]。至于这里的注释是什么意思,还需要问一下上游才能确定。

最终请求会在 Partition 后入队。EnqueueTensor[33] 与 Horvod 虽然类似,但是它会划分 Partition,默认是 4096000 字节一个 Task,这个优化在论文中也有提到,不过在开源实现中没有找寻到论文中提到的基于贝叶斯优化的 AutoTune 的痕迹。

跟 Horovod 相比,还有一个比较大的不同。BytePS 为了能够流水线地处理 Push 和 Pull,引入了 QueueType 这个概念。上述代码中的 queue_list 就是将为了处理 Push 和 Pull 的不同事件组成了一个事件队列。后续 BytePS 会按照这一队列依次处理事件,处理完里面的所有事件后,就完成了 PushPullGradients 的过程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Status EnqueueTensor(BPSContext &context, std::shared_ptr<Tensor> input,
                     std::shared_ptr<Tensor> output,
                     std::shared_ptr<ReadyEvent> ready_event, const int device,
                     const int priority, const int version,
                     StatusCallback callback,
                     std::shared_ptr<std::vector<QueueType>> queue_list) {
  ...
  std::vector<std::shared_ptr<TensorTableEntry>> partitions;
  PartitionTensor(e, partitions);
  ...
  unsigned int accumulated = 0;
  for (size_t i = 0; i < partitions.size(); ++i) {
    auto task = partitions[i];
    task->key = context.key_list[i];  // assign the key now
    BPS_CHECK(task->tensor_name != "");
    BPS_LOG(TRACE) << "EnqueueTensor: " << (task->tensor_name)
                   << ", key=" << (task->key) << ", offset=" << (task->offset)
                   << ", len=" << (task->len) << ", device=" << (task->device)
                   << " rank=" << BytePSGlobal::GetLocalRank();

    BytePSGlobal::GetScheduledQueue(e->queue_list[0])->addTask(task);
    accumulated += task->len;
  }

  auto tensor = (e->tensor ? e->tensor : e->output);
  BPS_CHECK(tensor);
  BPS_CHECK_EQ(accumulated, tensor->size())
      << "accumulated partition size not equal to original tensor size";

  BPS_LOG(TRACE) << "EnqueueTensor finished: " << name
                 << ", rank=" << BytePSGlobal::GetLocalRank();
  return Status::OK();
}

BytePS 的 PS 部分是利用 dmlc/ps-lite[34] 实现的,dmlc/ps-lite[35] 也被用于 MXNet,因此 BytePS 的分布式训练中也有三个角色,Server,Worker 和 Scheduler。其中的 Server 并不是传统意义上的 Parameter Server,而是一个具备一定的计算能力和 KV 存储能力的,只使用 CPU 的普通 Server。为了加法做的足够好,Server 这边对加法操作也有一个抽象,那就是 CPUReducer[36]。从这个角度来理解,BytePS 是采用了 Server-Worker 这种通信的模型实现了 AllReduce 的语义,并不是传统意义上的 PS。从这样的设计来讲,确实可以通过 Tensor 的分区分段把流水线跑起来。就像知乎老师木的回答[37]一样。不过我对老师木说的这是把 ReduceScatter 和 AllGather 流水起来还是不太理解。

总体来看,自从百度的 RingAllReduce 以来,后续越来越多的工作是关注在怎么样能够把计算和通信重叠起来,通过类似于流水线的方式隐藏掉一部分的成本。这里我很赞同 BytePS 在文档里的两点观察:

  • Cloud, either public or private, is different from HPC. Using ideas from HPC is a shortcut, but not optimal.
  • In a (public or private) cloud, PS architecture is theoretically better than allreduce, with minimal additional costs.

AllReduce 来自 HPC,如果在真实的集群环境里不能做到机架感知,会带来一定的影响。BytePS 同样对调度提出了新要求,但这种拿 CPU 和一点点的网络换训练速度的事情,非常值得一试。

参考文献

  • 分布式训练的方案和效率对比[38]

[28]BytePS: https://github.com/bytedance/byteps [29]此处: * https://i.cs.hku.hk/~cwu/papers/yhpeng-sosp19.pdf [30]Optimizer: https://github.com/bytedance/byteps/blob/948c774c30f520d8c9e36931f257da2eda386a48/byteps/tensorflow/init.py#L185 [31]代码注释: https://github.com/bytedance/byteps/blob/948c774c30f520d8c9e36931f257da2eda386a48/byteps/tensorflow/ops.cc#L155 [32]这里的讨论: https://github.com/dyweb/papers-notebook/issues/177#issuecomment-656026592 [33]EnqueueTensor: https://github.com/bytedance/byteps/blob/948c774c30f520d8c9e36931f257da2eda386a48/byteps/common/operations.cc#L163 [34]dmlc/ps-lite: https://github.com/dmlc/ps-lite/[35]dmlc/ps-lite: https://github.com/dmlc/ps-lite/[36]CPUReducer: https://github.com/bytedance/incubator-mxnet/pull/4/files [37]回答: https://www.zhihu.com/question/331936923/answer/732262268 [38]分布式训练的方案和效率对比: https://zhuanlan.zhihu.com/p/50116885