Horovod 是 Uber 于 2017 年发布的一个易于使用的高性能的分布式训练框架,支持 TensorFlow,Keras,PyTorch 和 MXNet。Horovod 的名字来自于俄国传统民间舞蹈,舞者手牵手围成一个圈跳舞,与分布式 TensorFlow 流程使用 Horovod 互相通信的场景很像。

因为各个机器学习框架对于底层集合通信库(NCCL,OpenMPI,Gloo 等等)的利用水平可能各不相同,使得他们无法充分利用这些底层集合通信库的威力。因而,hovorod 就整合这些框架,提供一个易用高效的解决方案。

Uber 的工程师就是根据 FaceBook 的一篇 paper:Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour 1 和百度的一篇 Bringing HPC Techniques to Deep Learning 2 改进并发布了开源框架 Horovod。

Horovod 相比于百度的工作,并无学术上的贡献。但是 Horovod 扎实的工程实现,使得它受到了更多的关注。它最大的优势在于对 RingAllReduce 进行了更高层次的抽象,使其支持多种不同的框架。同时引入了 Nvidia NCCL,对 GPU 更加友好。

Horovod 依赖于 Nvidia 的 NCCL2 做 All Reduce,依赖于 MPI 做进程间通信,简化了同步多 GPU 或多节点分布式训练的开发流程。由于使用了 NCCL2,Horovod 也可以利用以下功能:NVLINK,RDMA,GPUDirectRDMA,自动检测通信拓扑,能够回退到 PCIe 和 TCP/IP 通信。

如何使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto() config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01)

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes
# during initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing
# when done or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir="/tmp/train_logs", config=config, hooks=hooks) as mon_sess:
    while not mon_sess.should_stop():
        # Perform synchronous
        training. mon_sess.run(train_op

下面的日志展示了双机十六卡创建 Ring 的过程,包括 worker-0worker-1,每个 worker 使用 8 张卡。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
root@tensorflow-mnist-worker-0:~# NP=16
root@tensorflow-mnist-gcy-worker-0:~# HOSTS=10.0.0.122:8,10.0.0.49:8
root@tensorflow-mnist-gcy-worker-0:~# /usr/mpi/gcc/openmpi-4.1.2rc2/bin/mpirun --allow-run-as-root --report-bindings \
>     -host $HOSTS -np $NP -npernode 8 --bind-to numa \
>     --mca btl_openib_want_cuda_gdr 1 -mca coll_fca_enable 0 \
>     -x NCCL_ALGO=RING \
>     -x NCCL_DEBUG=INFO \
>     -x NCCL_IB_DISABLE=0 \
>     -x NCCL_SOCKET_IFNAME=eth1 \
>     -x NCCL_IB_GID_INDEX=3 \
>     -x HOROVOD_MPI_THREADS_DISABLE=1 \
>     -mca pml ob1 \
>     -mca btl_tcp_if_include eth1 \
>     -mca btl ^openib \
>     python3.7 ./benchmarks/scripts/tf_cnn_benchmarks/tf_cnn_benchmarks.py \
>     --data_name=imagenet --num_gpus=1 --batch_size=64 \
>     --model=resnet50 --use_fp16 \
>     --all_reduce_spec="nccl/xring" --variable_update=horovod

实现原理

Horovod 相比于百度的工作,并无学术上的贡献。但是 Horovod 扎实的工程实现,使得它受到了更多的关注。它最大的优势在于对 RingAllReduce 进行了更高层次的抽象,使其支持多种不同的框架。同时引入了 Nvidia NCCL,对 GPU 更加友好。

与百度的实现类似,Horovod 也需要先进行初始化。只不过百度把这个过程放在了 Session 构建的时候,而 Horovod 提供了显式初始化的函数。在初始化的时候,Horovod 会调用 MPI_Comm_dup 获取一个 Communicator。之所以不直接使用默认的 MPI_COMM_WORLD,参考这里的文档[^9]:

While MPI_Comm_split is the most common communicator creation function, there are many others. MPI_Comm_dup is the most basic and creates a duplicate of a communicator. It may seem odd that there would exist a function that only creates a copy, but this is very useful for applications which use libraries to perform specialized functions, such as mathematical libraries. In these kinds of applications, it’s important that user codes and library codes do not interfere with each other. To avoid this, the first thing every application should do is to create a duplicate of MPI_COMM_WORLD, which will avoid the problem of other libraries also using MPI_COMM_WORLD. The libraries themselves should also make duplicates of MPI_COMM_WORLD to avoid the same problem.

除此之外,在初始化的时候,Horovod 还会创建一个后台线程。这里的后台线程的作用与百度的实现类似。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void horovod_init_comm(MPI_Comm comm) {
  MPI_Comm_dup(comm, &mpi_context.mpi_comm);
  InitializeHorovodOnce(nullptr, 0);
}
// Start Horovod background thread. Ensure that this is
// only done once no matter how many times this function is called.
void InitializeHorovodOnce(const int* ranks, int nranks) {
  // Ensure background thread is only started once.
  if (!horovod_global.initialize_flag.test_and_set()) {
    horovod_global.control_operation = ParseControllerOpsFromEnv();
    horovod_global.cpu_operation = ParseCPUOpsFromEnv();
#if HAVE_MPI
    // Enable mpi is it's used either in cpu data transfer or controller
    if (horovod_global.cpu_operation == LibType::MPI ||
        horovod_global.control_operation == LibType::MPI) {
      mpi_context.Enable();
    }

    if (horovod_global.control_operation == LibType::MPI){
      horovod_global.controller.reset(new MPIController(
          horovod_global.response_cache,
          horovod_global.tensor_queue, horovod_global.timeline,
          horovod_global.parameter_manager, mpi_context));
      horovod_global.controller->SetRanks(ranks, nranks);
    }
#endif
    // Reset initialization flag
    horovod_global.initialization_done = false;
    horovod_global.background_thread = std::thread(
        BackgroundThreadLoop, std::ref(horovod_global));
  }

  // Wait to ensure that the background thread has finished initializing MPI.
  while (!horovod_global.initialization_done) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  LOG(DEBUG) << "Background thread init done";
}

在这个后台线程的初始化过程中,它会利用进程内共享的全局状态在自己的内存里创建一些对象,以及一些逻辑判断。比如要不要进行 Hierarchical AllReduce,要不要 AutoTune(后面会详细介绍)等。这里是初始化阶段的日志。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
$ horovodrun -np 2 python hvd.py
[1,1]<stdout>:[2020-07-09 10:27:48.952760: D horovod/common/utils/env_parser.cc:106] Using MPI to perform controller operations.
[1,1]<stdout>:[2020-07-09 10:27:48.952813: D horovod/common/utils/env_parser.cc:72] Using MPI to perform CPU operations.
[1,1]<stdout>:[2020-07-09 10:27:48.952922: D horovod/common/mpi/mpi_context.h:46] MPI context enabled.
[1,1]<stdout>:[2020-07-09 10:27:48.952968: D horovod/common/mpi/mpi_controller.h:32] MPI Controller Initialized.
[1,0]<stdout>:[2020-07-09 10:27:49. 27002: D horovod/common/utils/env_parser.cc:106] Using MPI to perform controller operations.
[1,0]<stdout>:[2020-07-09 10:27:49. 27064: D horovod/common/utils/env_parser.cc:72] Using MPI to perform CPU operations.
[1,0]<stdout>:[2020-07-09 10:27:49. 27094: D horovod/common/mpi/mpi_context.h:46] MPI context enabled.
[1,0]<stdout>:[2020-07-09 10:27:49. 27118: D horovod/common/mpi/mpi_controller.h:32] MPI Controller Initialized.
[1,0]<stdout>:[2020-07-09 10:27:49. 88254: D horovod/common/mpi/mpi_context.cc:142] Using MPI_COMM_WORLD as a communicator.
[1,1]<stdout>:[2020-07-09 10:27:49. 88459: D horovod/common/mpi/mpi_context.cc:142] Using MPI_COMM_WORLD as a communicator.
[1,0]<stdout>:[2020-07-09 10:27:49. 88947: D horovod/common/mpi/mpi_controller.cc:39] Started Horovod with 2 processes
[1,0]<stdout>:[2020-07-09 10:27:49. 89143: D horovod/common/mpi/mpi_controller.cc:80] MPI controller initialized.
[1,0]<stdout>:[2020-07-09 10:27:49. 89195: I horovod/common/operations.cc:506] [0]: Horovod Initialized
[1,1]<stdout>:[2020-07-09 10:27:49. 89147: D horovod/common/mpi/mpi_controller.cc:80] MPI controller initialized.
[1,1]<stdout>:[2020-07-09 10:27:49. 89489: I horovod/common/operations.cc:506] [1]: Horovod Initialized
[1,0]<stdout>:[2020-07-09 10:27:49. 89945: D horovod/common/operations.cc:649] Background thread init done
[1,1]<stdout>:[2020-07-09 10:27:49. 91335: D horovod/common/operations.cc:649] Background thread init done

在初始化的过程中,有一些比较重要的对象会被构造出来。不过这里暂且按下不表,后续再介绍。在初始化好之后,我们利用下面的代码进行模型的训练:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@tf.function
def training_step(images, labels, first_batch):
    with tf.GradientTape() as tape:
        probs = mnist_model(images, training=True)
        loss_value = loss(labels, probs)

    # Horovod: add Horovod Distributed GradientTape.
    tape = hvd.DistributedGradientTape(tape)

    grads = tape.gradient(loss_value, mnist_model.trainable_variables)
    opt.apply_gradients(zip(grads, mnist_model.trainable_variables))

    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    #
    # Note: broadcast should be done after the first gradient step to ensure optimizer
    # initialization.
    if first_batch:
        hvd.broadcast_variables(mnist_model.variables, root_rank=0)
        hvd.broadcast_variables(opt.variables(), root_rank=0)

    return loss_value

首先会利用 Bcast 来同步 Rank 0 进程的初始化参数给所有的进程,这里是为了保证初始参数一致。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def broadcast_variables(variables, root_rank):
    """Broadcasts variables from root rank to all other processes.

    Arguments:
        variables: variables for broadcast
        root_rank: rank of the process from which global variables will be broadcasted
                   to all other processes.
    """
    broadcast_group = _make_broadcast_group_fn()
    return broadcast_group(variables, root_rank)

由于我们是利用 TensorFlow 2 来进行训练。所以梯度更新部分的实现不是基于计算图的实现,而是使用 hvd.DistributedGradientTape3。它的实现如下所示,当调用 gradient 的时候,首先会调用 tf.GradientTape 的同名函数,同时进行 AllReduce。这里的逻辑与百度实现中的 Optimizer 是否似曾相识:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class _DistributedGradientTape(tf.GradientTape):
    def gradient(self, target, sources, output_gradients=None):
        gradients = super(self.__class__, self).gradient(target, sources, output_gradients)
        return self._allreduce_grads(gradients)

@_cache
def _make_allreduce_grads_fn(name, device_dense, device_sparse,
                             compression, sparse_as_dense, op):
    def allreduce_grads(grads):
        with tf.name_scope(name + "_Allreduce"):
            if sparse_as_dense:
                grads = [tf.convert_to_tensor(grad)
                         if grad is not None and isinstance(grad, tf.IndexedSlices)
                         else grad for grad in grads]

            return [_allreduce_cond(grad,
                                    device_dense=device_dense,
                                    device_sparse=device_sparse,
                                    compression=compression,
                                    op=op)
                    if grad is not None else grad
                    for grad in grads]

def _allreduce_cond(tensor, *args, **kwargs):
    def allreduce_fn():
        return allreduce(tensor, *args, **kwargs)

    def id_fn():
        return tensor

    return tf.cond(size_op() > 1, allreduce_fn, id_fn)

def _allreduce(tensor, name=None, op=Sum):
    """An op which reduces an input tensor over all the Horovod processes. The
    default reduction is a sum.

    The reduction operation is keyed by the name of the op. The tensor type and
    shape must be the same on all Horovod processes for a given name. The reduction
    will not start until all processes are ready to send and receive the tensor.

    Returns:
      A tensor of the same shape and type as `tensor`, summed across all
      processes.
    """
    if name is None and not _executing_eagerly():
        name = 'HorovodAllreduce_%s' % _normalize_name(tensor.name)
    return MPI_LIB.horovod_allreduce(tensor, name=name, reduce_op=op)

allreduce_grads4 会修改 name scope,添加后缀 _Allreduce。在后续的调用中,进行了一些复杂但不核心的逻辑,如压缩等。最后调用 _allreduce5。在这一函数中,会直接调用 C++ 实现的 Kernel。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  void ComputeAsync(OpKernelContext* context, DoneCallback done) override {
    OP_REQUIRES_OK_ASYNC(context, ConvertStatus(common::CheckInitialized()),
                         done);
    ...
    auto enqueue_result = EnqueueTensorAllreduce(
        hvd_context, hvd_tensor, hvd_output, ready_event, node_name, device,
        [context, done](const common::Status& status "context, done") {
          context->SetStatus(ConvertStatus(status));
          done();
        }, reduce_op);
    ...
  }

ComputeAsync6 里,会把这一 AllReduce 的请求入队。可以看到,在 TensorFlow 支持的实现上,Horovod 与百度大同小异。都是自定义了 AllReduce Op,在 Op 中把请求入队。

所以在 Horovod 的日志中,我们可以看到这样的日志(当然要设置 HOROVOD_LOG_LEVEL=trace 环境变量)。DistributedGradientTape 的 name scope 被改写成了 DistributedGradientTapeAllreduce,名字被加上了 HorovodAllreduce 的前缀。

1
2
3
[1,1]<stdout>:[2020-07-09 10:27:56.839122: T horovod/common/operations.cc:849] [1]: Enqueued DistributedGradientTape_Allreduce/HorovodAllreduce_gradient_tape_sequential_dense_1_BiasAdd_BiasAddGrad_0
[1,1]<stdout>:[2020-07-09 10:27:56.839176: T horovod/common/operations.cc:849] [1]: Enqueued DistributedGradientTape_Allreduce/HorovodAllreduce_gradient_tape_sequential_dense_1_MatMul_1_0
[1,1]<stdout>:[2020-07-09 10:27:56.839280: T horovod/common/operations.cc:849] [1]: Enqueued DistributedGradientTape_Allreduce/HorovodAllreduce_gradient_tape_sequential_dense_BiasAdd_BiasAddGrad_0

EnqueueTensorAllreduce7 是进入了一个进程内共享的全局对象维护的一个队列中。之前提到的后台进程,会一直在执行一个循环 RunLoopOnce8。在其中,后台线程会利用 MPIController[^25] 来处理入队的请求。MPIController9 可以理解为是协调不同的 Rank 进程,处理请求的对象。这个抽象是百度所不具备的,主要是为了支持 Facebook gloo 等其他的集合计算库。因此 Horovod 也有 GlooController 等等实现。

在后台线程里,最重要的一个函数调用是 ComputeResponseList10。Horovod 也遵循着 Coordinator 的设计,与百度类似。无论是百度还是 Horovod 中的 Coordinator 都类似是 Actor 模式,主要起来协调多个进程工作的作用。在真正执行计算的时候,Horovod 同样引入了一个新的抽象 op_manager。从某种程度来说,我们可以把 controller 看做是对通信和协调管理能力的抽象,而 op_manager 是对实际计算的抽象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
class OperationManager {
public:
  OperationManager(ParameterManager* param_manager,
                   std::vector<std::shared_ptr<AllreduceOp>> allreduce_ops,
                   std::vector<std::shared_ptr<AllgatherOp>> allgather_ops,
                   std::vector<std::shared_ptr<BroadcastOp>> broadcast_ops,
                   std::shared_ptr<JoinOp> join_op,
                   std::vector<std::shared_ptr<AllreduceOp>> adasum_ops,
                   std::shared_ptr<ErrorOp> error_op);

  virtual ~OperationManager() = default;

  Status ExecuteAllreduce(std::vector<TensorTableEntry>& entries, const Response& response) const;
  ...
}

总结来说,Horovod 的设计与实现都与百度的工作并无二致,只是进行了更多的抽象,支持更多的通信库,更多的训练框架。这些工作虽然都是 dirty work,但也是它受欢迎的最大原因。

详细解读

初始化 in Python

horovod/tensorflow/mpi_ops.py 之中会引入 SO 库。比如 dist-packages/horovod/tensorflow/mpi_lib.cpython-36m-x86_64-linux-gnu.so

SO 库 就是 horovod 中 C++ 代码编译出来的结果。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def _load_library(name):
    """Loads a .so file containing the specified operators.
    """
    filename = resource_loader.get_path_to_datafile(name)
    library = load_library.load_op_library(filename)
    return library

# Check possible symbol not found error from tensorflow version mismatch
try:
    MPI_LIB = _load_library('mpi_lib' + get_ext_suffix())
except Exception as e:
    check_installed_version('tensorflow', tf.__version__, e)
    raise e
else:
    check_installed_version('tensorflow', tf.__version__)

引入库的作用是获取到 C++ 的函数,并且用 python 封装一下,这样就可以在 python 世界使用 C++代码了。

由下文可以看出来,python 的 _allreduce 函数就会把功能转发给 C++,由 MPI_LIB.horovod_allreduce 完成。

1
2
3
4
5
6
7
8
def _allreduce(tensor, name=None, op=Sum, prescale_factor=1.0, postscale_factor=1.0,
               ignore_name_scope=False):
    if name is None and not _executing_eagerly():
        name = 'HorovodAllreduce_%s' % _normalize_name(tensor.name)
    return MPI_LIB.horovod_allreduce(tensor, name=name, reduce_op=op,
                                     prescale_factor=prescale_factor,
                                     postscale_factor=postscale_factor,
                                     ignore_name_scope=ignore_name_scope)

4.2.2 初始化配置

我们摘录了主要部分,就是初始化 _HorovodBasics,然后从 _HorovodBasics 内获取各种函数,变量和配置,比如是否编译了 mpi,gloo 等等。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from horovod.common.basics import HorovodBasics as _HorovodBasics

_basics = _HorovodBasics(__file__, 'mpi_lib')

# import basic methods
init = _basics.init
size = _basics.size
local_size = _basics.local_size
rank = _basics.rank
local_rank = _basics.local_rank
mpi_built = _basics.mpi_built
gloo_enabled = _basics.gloo_enabled
......

4.2.3 hvd.init() 初始化

首先需要用 hvd.init() 来初始化,horovod  管理的所有状态都会传到 hvd 对象中。

1
2
# Horovod: initialize Horovod.
hvd.init()

此处调用的是 HorovodBasics 中的函数,我们看看做了什么。

可以看到,这部分会一直深入到 C++世界,调用了大量的 MPI_LIB_CTYPES 函数,所以我们接下来就要进入到 C++的世界看看。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def init(self, comm=None):
    """A function that initializes Horovod.
    """
    atexit.register(self.shutdown)

    if not isinstance(comm, list):
        mpi_built = self.MPI_LIB_CTYPES.horovod_mpi_built()

        from mpi4py import MPI
        if MPI._sizeof(MPI.Comm) == ctypes.sizeof(ctypes.c_int):
            MPI_Comm = ctypes.c_int
        else:
            MPI_Comm = ctypes.c_void_p
            self.MPI_LIB_CTYPES.horovod_init_comm.argtypes = [MPI_Comm]

        comm_obj = MPI_Comm.from_address(MPI._addressof(comm))
        self.MPI_LIB_CTYPES.horovod_init_comm(comm_obj)
    else:
        comm_size = len(comm)
        self.MPI_LIB_CTYPES.horovod_init(
            (ctypes.c_int * comm_size)(*comm), ctypes.c_int(comm_size))

目前逻辑如下图:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
           Import python files

                    +
                    |
                    |
                    v

           Import C++ SO files
                    |
                    |
                    |
                    v

           Create _HorovodBasics
                    +
                    |
                    |
                    v
                hvd.init()
                    +
Python              |
+------------------------------------------+
C++                 |
                    |
                    v

4.3 初始化 in C++

4.3.1 horovod_init_comm

在初始化的时候,Horovod 会:

  • 调用 MPI_Comm_dup 获取一个 Communicator,这样就有了和 MPI 协调的基础。
  • 然后调用 InitializeHorovodOnce。
1
2
3
4
void horovod_init_comm(MPI_Comm comm) {
  MPI_Comm_dup(comm, &mpi_context.mpi_comm);
  InitializeHorovodOnce(nullptr, 0);
}

4.3.2 InitializeHorovodOnce

InitializeHorovodOnce 是初始化的主要工作,主要是:

  • 依据是否编译了 mpi 或者 gloo,对各自的 context 进行处理,为 globalstate 创建对应的 controller;
  • 启动了后台线程 BackgroundThreadLoop 用来在各个 worker 之间协调;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
void horovod_init(const int* ranks, int nranks) {
  InitializeHorovodOnce(ranks, nranks);
}

void InitializeHorovodOnce(const int* ranks, int nranks) {
  // Ensure background thread is only started once.
  if (!horovod_global.initialize_flag.test_and_set()) {
    horovod_global.control_operation = ParseControllerOpsFromEnv();
    horovod_global.cpu_operation = ParseCPUOpsFromEnv();

#if HAVE_MPI // 依据是否编译了MPI进行处理
    // Enable mpi is it's used either in cpu data transfer or controller
    if (horovod_global.cpu_operation == LibType::MPI ||
        horovod_global.control_operation == LibType::MPI) {
      mpi_context.Enable();
    }

    if (horovod_global.control_operation == LibType::MPI){
      // 创建一个 MPIController 对象
      horovod_global.controller.reset(new MPIController(
          horovod_global.response_cache,
          horovod_global.tensor_queue, horovod_global.timeline,
          horovod_global.parameter_manager, horovod_global.group_table,
          mpi_context));
      horovod_global.controller->SetRanks(ranks, nranks);
    }
#endif

#if HAVE_GLOO // 依据是否编译了 GLOO 进行处理
    // Enable gloo is it's used either in cpu data transfer or controller
    if (horovod_global.cpu_operation == LibType::GLOO ||
        horovod_global.control_operation == LibType::GLOO) {
      gloo_context.Enable();
    }

    if (horovod_global.control_operation == LibType::GLOO) {
      horovod_global.controller.reset(new GlooController(
          horovod_global.response_cache,
          horovod_global.tensor_queue, horovod_global.timeline,
          horovod_global.parameter_manager, horovod_global.group_table,
          gloo_context));
    }
#endif
    // Reset initialization flag
    // 启动后台线程
    horovod_global.initialization_done = false;
    horovod_global.background_thread = std::thread(
        BackgroundThreadLoop, std::ref(horovod_global));
  }

  // Wait to ensure that the background thread has finished initializing MPI.
  while (!horovod_global.initialization_done) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}

4.3.3 HorovodGlobalState

在 C++ 世界,HorovodGlobalState 起到了集中管理各种全局变量的作用。

HorovodGlobalState 在 horovod 中是一个全局变量,其中的元素可以供不同的线程访问。HorovodGlobalState 在加载 C++ 的代码时候就已经创建了,同时创建的还有各种 context(mpi_context, nccl_context, gpu_context)。

Horovod 主要会在 backgroundThreadLoop 中完成 HorovodGlobalState 不同元素初始化,比较重要的有:

  • controller  管理总体通信控制流;
  • tensor_queue  会处理从前端过来的通信需求(allreduce,broadcast 等);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// All the Horovod state that must be stored globally per-process.
HorovodGlobalState horovod_global;

#if HAVE_MPI
MPIContext mpi_context;
#endif

#if HAVE_GLOO
GlooContext gloo_context;
#endif

....

std::unique_ptr<OperationManager> op_manager;

HorovodGlobalState 摘要如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
struct HorovodGlobalState {

  // Background thread running MPI communication.
  std::thread background_thread; // 后台线程,用来在各个worker之间协调

  ParameterManager parameter_manager; // 维护后台总体参数配置

  // Encapsulates the fusion buffers, handles resizing and auto-tuning of buffer
  // size.
  FusionBufferManager fusion_buffer; // 融合tensor,以便缩减通信开销

  std::shared_ptr<Controller> controller; //管理总体通信控制流

  TensorQueue tensor_queue; //处理从前端过来的通信需求(allreduce,broadcast 等)

  // Pointer to shared buffer for allgather
  void* shared_buffer = nullptr;

  // LRU cache of Responses
  ResponseCache response_cache;

  // Information on registered groups.
  GroupTable group_table;

  ~HorovodGlobalState() {
    // Make sure that the destructor of the background thread is safe to
    // call. If a thread is still joinable (not detached or complete) its
    // destructor cannot be called.
    if (background_thread.joinable()) {
      shut_down = true;
      background_thread.join();
    }
  }
};

目前具体逻辑如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
           Import python files

                    +
                    |
                    |
                    v

           Import C++ SO files
                    |
                    |
                    |
                    v

           Create _HorovodBasics
                    +
                    |
                    |
                    v
                hvd.init()
                    +
Python              |
+-------------------------------------------------------------------------------------------------------------+
                    |
c++                 |
                    v                                                          +-----------------------------+
                                                                               |  HorovodGlobalState         |
              horovod_init_comm                                                |                             |
                    +                             +------------------+         |                             |
                    |                             | horovod_global +---------> |        TensorQueue          |
                    |                             |                  |         |                             |
                    v                             |                  |         |        background_thread    |
                                                  | mpi_context      |         |                             |
           InitializeHorovodOnce   +------------> |                  |         |        ParameterManager     |
                    +                             |                  |         |                             |
                    |                             | gloo_context     |         |        FusionBufferManager  |
                    |                             |                  |         |                             |
                    |                             |                  |         |        Controller           |
                    v                             | op_manager       |         |                             |
             background_threa                     |                  |         |        ResponseCache        |
                                                  +------------------+         |                             |
                                                                               |        shared_buffer        |
                                                                               +-----------------------------+

至此,horovod 已经初始化完成,用户代码可以使用了。

4.4 hvd 概念

在用户代码中,接下来是 rank 概念。

1
2
3
hvd.local_rank()

hvd.rank()

我们介绍下几个相关概念:

  • Horovod 为设备上的每个 GPU 启动了该训练脚本的一个副本。local rank 就是分配给某一台计算机上每个执行训练的唯一编号(也可以认为是进程号或者 GPU 设备的 ID 号),范围是 0 到 n-1,其中 n 是该计算机上 GPU 设备的数量。
  • rank  可以认为是代表分布式任务里的一个执行训练的唯一全局编号(用于进程间通讯)。Rank 0 在 Horovod 中通常具有特殊的意义:它是负责此同步的设备。
    • 在百度的实现中,不同 Rank 的角色是不一样的,Rank 0 会充当 coordinator 的角色。它会协调来自其他 Rank 的 MPI 请求,是一个工程上的考量。这一设计也被后来的 Horovod 采用。
    • Rank 0 也用来把参数广播到其他进程 & 存储 checkpoint。
  • world_size:进程总数量,会等到所有 world_size 个进程就绪之后才会开始训练。

hvd.init 这部分的目的就是让并行进程们可以知道自己被分配的 rank / local rank 等信息,于是后续可以根据 local rank(所在节点上的第几张 GPU 卡) 来设置所需的显存分配。

4.5 数据处理

接下来是数据处理。

1
2
3
4
5
dataset = tf.data.Dataset.from_tensor_slices(
    (tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
             tf.cast(mnist_labels, tf.int64))
)
dataset = dataset.repeat().shuffle(10000).batch(128)

这里有几点需要说明:

  • 首先,训练的数据需要放置在任何节点都能访问的地方。
  • 其次,Horovod 需要对数据进行分片处理,需要在不同机器上按 Rank 进行切分,以保证每个 GPU 进程训练的数据集是不一样的。
  • 数据集本体需要出于数据并行性的需求而被拆分为多个分片,Horovod 的不同工作节点都将分别读取自己的数据集分片。

从 PyTorch 示例脚本看得更加清楚。

1
2
3
4
5
# Horovod: use DistributedSampler to partition the training data.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)
  • DataLoader的采样器组件从要绘制的数据集中返回可迭代的索引。 PyTorch 中的默认采样器是顺序的,返回序列0, 1, 2, …, n 。 Horovod 使用其DistributedSampler覆盖了此行为,该DistributedSampler处理跨计算机的数据集分区。 DistributedSampler本身接受两个参数作为输入: hvd.size() (GPU 的总数,例如 16)和hvd.rank() (从总体列表中分配给该设备的 ID,例如 0…15)。
  • Pytorch 使用的是数据分布式训练,每个进程实际上是独立加载数据的,所以需要加载相同数据集后用一定的规则根据 rank 来顺序切割获取不同的数据子集,DistributedSampler 就是用来确保 dataloader 只会 load 到整个数据集的一个特定子集的做法(实际上不用 Pytorch 提供的 DistributedSampler 工具,自己做加载数据后切分 word_size 个子集按 rank 顺序拿到子集效果也是一样)。
  • 同时为了能够按顺序划分数据子集,拿到不同部分数据,所以数据集不能够进行随机打散,所以用了参数 ‘shuffle’: False。

4.6 广播初始化变量

以下代码完成广播初始化的功能。

1
hvd.callbacks.BroadcastGlobalVariablesCallback(0)

这句代码保证的是  rank 0 上的所有参数只在 rank 0 初始化,然后广播给其他节点,即变量从第一个流程向其他流程传播,以实现参数一致性初始化。

下面就介绍下 Horvod 之中广播的使用。

4.6.1 广播定义

广播的具体实现是:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class BroadcastGlobalVariablesCallbackImpl(object):
    def __init__(self, backend, root_rank, device='', *args):
        super(BroadcastGlobalVariablesCallbackImpl, self).__init__(*args)
        self.backend = backend
        self.root_rank = root_rank
        self.device = device
        self.broadcast_done = False

    def on_batch_end(self, batch, logs=None):
        if self.broadcast_done:
            return

        with tf.device(self.device):
            if hvd._executing_eagerly() and hasattr(self.model, 'variables'):
                # TensorFlow 2.0 or TensorFlow eager
                hvd.broadcast_variables(self.model.variables,
                                        root_rank=self.root_rank)
                hvd.broadcast_variables(self.model.optimizer.variables(),
                                        root_rank=self.root_rank)
            else:
                bcast_op = hvd.broadcast_global_variables(self.root_rank)
                self.backend.get_session().run(bcast_op)

        self.broadcast_done = True

4.6.2 broadcast_variables

broadcast_variables 调用了 _make_broadcast_group_fn 完成功能,可以看到对于 执行图 的每个变量,调用了 broadcast。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def broadcast_variables(variables, root_rank):
    """Broadcasts variables from root rank to all other processes.

    Arguments:
        variables: variables for broadcast
        root_rank: rank of the process from which global variables will be broadcasted
                   to all other processes.
    """
    broadcast_group = _make_broadcast_group_fn()
    return broadcast_group(variables, root_rank)

以及

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@_cache
def _make_broadcast_group_fn():
    if _executing_eagerly():
        # Eager mode will parallelize independent control flow
        def broadcast_group(variables, root_rank):
            for var in variables:
                var.assign(broadcast(var, root_rank))

        return _make_subgraph(broadcast_group)
    else:
        # Graph mode requires an Op
        def broadcast_group(variables, root_rank):
            return tf.group(*[var.assign(broadcast(var, root_rank))
                              for var in variables])

        return broadcast_group

4.6.3 调用 MPI

broadcast 就是调用了 MPI 函数真正完成了功能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
def broadcast(tensor, root_rank, name=None, ignore_name_scope=False):
    """An op which broadcasts the input tensor on root rank to the same input tensor
    on all other Horovod processes.

    The broadcast operation is keyed by the name of the op. The tensor type and
    shape must be the same on all Horovod processes for a given name. The broadcast
    will not start until all processes are ready to send and receive the tensor.

    Returns:
      A tensor of the same shape and type as `tensor`, with the value broadcasted
      from root rank.
    """
    if name is None and not _executing_eagerly():
        name = 'HorovodBroadcast_%s' % _normalize_name(tensor.name)
    return MPI_LIB.horovod_broadcast(tensor, name=name, root_rank=root_rank,
                                     ignore_name_scope=ignore_name_scope)

4.6.4 同步参数

在后台进程中,会根据情况定期同步参数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
bool RunLoopOnce(HorovodGlobalState& state) {
	// 业务逻辑功能

  if (state.parameter_manager.IsAutoTuning()) {
    bool should_sync =
        state.parameter_manager.Update(tensor_names, total_tensor_size);
    // 看看是否需要同步,如果需要,就同步。
    if (should_sync) {
      state.controller->SynchronizeParameters();
    }
  }
  ......
}

同步参数代码也是调用了 Bcast 功能完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
void Controller::SynchronizeParameters() {
  ParameterManager::Params param;
  if (is_coordinator_) { // rank 0 执行操作
    param = parameter_manager_.GetParams();
  }

  void* buffer = (void*)(&param);
  size_t param_size = sizeof(param);
  Bcast(buffer, param_size, 0, Communicator::GLOBAL);

  if (!is_coordinator_) { // worker 执行操作
    parameter_manager_.SetParams(param);
  }
}

4.7 DistributedOptimizer

最后需要配置 DistributedOptimizer,这就是关键点之一。

1
2
3
# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(
    opt, backward_passes_per_step=1, average_aggregated_gradients=True)

TF Optimizer 是模型训练的关键 API,可以获取到每个 OP 的梯度并用来更新权重。HVD 在原始 TF Optimizer 的基础上包装了hvd.DistributedOptimizer

DistributedOptimizer包装器将原始优化器作为输入,将梯度计算委托给它。 即DistributedOptimizer会调用原始优化器进行梯度计算。这样,在集群中每台机器都会用原始优化器得到自己的梯度(Local Gradient)。

Horovod DistributedOptimizer接下来会使用 all-reduce 或 all-gather 来完成全局梯度归并,然后将这些平均梯度应用于所有设备。

我们梳理下其中的调用关系:

  • hvd.DistributedOptimizer 继承 keras Optimizer,在计算时候,依然由传入的原始优化器做计算。
  • 在得到计算的梯度之后,调用 hvd.allreduce 或者 hvd.allgather 来计算。
  • 最后实施这些平均之后的梯度。从而实现整个集群的梯度归并操作。

具体后文会详细介绍。

4.8 未来可能

Horovod 目前架构的基础是:机器学习的模型参数在一张 GPU 上可以存下。

未来是否可以把模型分片结合进来,是一个很大的看点。

另外,如果模型的全连接层较多,则全连接层的强耦合性结合 allreduce 类似 bsp 的同步机制,还是会让网络通信时间成为瓶颈。因此,在 ring-allreduce 环境下,同步协议的改造,比如利用 SSP 来替换 BSP,或者利用梯度压缩来加快 allreduce 进程也是值得探索的方向。

参考资料