nccl-algos

nccl > NCCL Examples 中我们可以看到 nccl 编程中首先需要创建 nccl communicator。在调用 ncclCommInitRank 前首先需要申请到 UniqId,我们将根据此分析 NCCL 代码。本文分析代码采用的是 NCCL v2.7.8-1 版本。

总的来说,NCCL 的工作流程如下:

  1. 首先,每个要参与数据传输的 GPU 都要调用 ncclCommInitRank 创建一个与其 rank 对应的 Communicator,同一个 communication group 中的每个 communicator 具有相同的 unique ID。
  2. 当每个设备调用 ncclCommInitRank 时,设备之间会交换一些信息,例如各自的 IP,bus ID 等。然后检测整个系统中的网络拓扑结构。
  3. 有了网络拓扑结构,NCCL 会进一步搜索当前网络中最佳的 RING、TREE、COLLNET 图结构。
  4. 有了设备之间的图结构信息,就可以在存在通路的设备之间建立点对点的连接。主要有三种连接方式:p2p,shared memory 以及 network。采用哪种方式取决于这两个节点之间支持怎样的连接方式。

以上就是初始化阶段的所有准备工作。

  1. 初始化完成后,就可以调用集合通信原语。例如 ncclAllReduce。集合通信函数会被 enqueue 到一个 CUDA stream 上,在 GPU 上异步执行
  2. 接下来在 CPU 上启动 Proxy 线程,作为 GPU 上集合通信 kernel 的代理,与 GPU kernel 协同完成与其他设备之间的数据传输。GPU kernel 负责计算所需传输的数据的地址以及数据量大小,而 Proxy 线程负责完成实际的数据传输。对于采用 p2pTransport 以及 shmTransport 的设备,在建立连接后可以直接传输数据,对于采用 netTransport 的设备,则需要通过 socket 进行数据传输。

ncclGetUniqueId

1
2
3
4
5
// Public entry point: produce the ncclUniqueId that rank 0 distributes to
// all other ranks before they call ncclCommInitRank.
// Runs one-time library init first, validates the output pointer, then
// delegates to the bootstrap layer to fill in the id.
ncclResult_t ncclGetUniqueId(ncclUniqueId* out) {
  NCCLCHECK(ncclInit());                          // one-time global init (env + networks)
  NCCLCHECK(PtrCheck(out, "GetUniqueId", "out")); // reject NULL out pointer
  return bootstrapGetUniqueId(out);               // fill id with the root's listen address
}

ncclInit

ncclInit 首先执行 initEnv,设置环境变量

然后执行 initNet,用来初始化 nccl 所需要的网络,包括两个,一个是 bootstrap 网络,另外一个是数据通信网络,bootstrap 网络主要用于初始化时交换一些简单的信息,比如每个机器的 ip 端口,由于数据量很小,而且主要是在初始化阶段执行一次,因此 bootstrap 使用的是 tcp;而通信网络是用于实际数据的传输,因此会优先使用 rdma(支持 gdr 的话会优先使用 gdr)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// One-time library initialization, callable from multiple threads:
// classic double-checked locking on the `initialized` flag.
static ncclResult_t ncclInit() {
  if (initialized) return ncclSuccess; // fast path, no lock taken
  pthread_mutex_lock(&initLock);
  if (!initialized) { // re-check under the lock
    initEnv();        // load NCCL_* environment variables
    // NOTE(review): if initNet() fails, NCCLCHECK appears to return here
    // with initLock still held, which would deadlock later callers --
    // verify NCCLCHECK semantics against upstream NCCL.
    NCCLCHECK(initNet());
    INFO(NCCL_INIT, "Using network %s", ncclNetName());
    initialized = true;
  }
  pthread_mutex_unlock(&initLock);
  return ncclSuccess;
}

initNet

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Initialize both networks NCCL needs:
//  - the bootstrap (TCP) network, unconditionally;
//  - the data network, trying in priority order: external plugin,
//    InfiniBand/RoCE, then plain sockets. The winner is stored in the
//    global ncclNet vtable pointer.
ncclResult_t initNet() {
  // Always initialize bootstrap network
  NCCLCHECK(bootstrapNetInit());

  NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet));
  if (ncclNet != NULL) return ncclSuccess; // plugin supplied the transport
  if (initNet(&ncclNetIb) == ncclSuccess) { // prefer RDMA when available
    ncclNet = &ncclNetIb;
  } else {
    NCCLCHECK(initNet(&ncclNetSocket)); // TCP sockets as the last resort
    ncclNet = &ncclNetSocket;
  }
  return ncclSuccess;
}

对应日志:

1
2
3
4
5
6
7
8
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Bootstrap : Using [0]eth1:10.0.0.122<0>
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO NET/Plugin : No plugin found (libnccl-net.so), using internal implementation
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO NCCL_IB_DISABLE set by environment to 0.
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO NET/IB : Using [0]mlx5_2:1/RoCE ; OOB eth1:10.0.0.122<0>
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Using network IB
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO NCCL_IB_GID_INDEX set by environment to 3.
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO NCCL_IB_GID_INDEX set by environment to 3.
# ...

ncclNet_t

ncclNet_t 结构体是一系列的函数指针,比如初始化,发送,接收等;socket,IB 等通信方式都实现了自己的 ncclNet_t,如 ncclNetSocket,ncclNetIb,初始化通信网络的过程就是依次看哪个通信模式可用,然后赋值给全局的 ncclNet

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// InfiniBand/RoCE implementation of the ncclNet_t transport vtable.
// Entries are positional; their roles follow from the function names
// (init, device enumeration, listen/connect/accept, memory registration,
// async send/recv/flush/test, teardown).
ncclNet_t ncclNetIb = {
  "IB", // transport name shown in logs ("Using network IB")
  ncclIbInit,
  ncclIbDevices,
  ncclIbGetProperties,
  ncclIbListen,
  ncclIbConnect,
  ncclIbAccept,
  ncclIbRegMr,
  ncclIbRegMrDmaBuf,
  ncclIbDeregMr,
  ncclIbIsend,
  ncclIbIrecv,
  ncclIbIflush,
  ncclIbTest,
  ncclIbCloseSend,
  ncclIbCloseRecv,
  ncclIbCloseListen
};
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// Plain TCP socket implementation of the ncclNet_t transport vtable;
// used when no plugin is found and no usable IB device exists.
// Entry order mirrors ncclNetIb above.
ncclNet_t ncclNetSocket = {
  "Socket", // transport name shown in logs
  ncclNetSocketInit,
  ncclNetSocketDevices,
  ncclNetSocketGetProperties,
  ncclNetSocketListen,
  ncclNetSocketConnect,
  ncclNetSocketAccept,
  ncclNetSocketRegMr,
  NULL, // No DMA-BUF support
  ncclNetSocketDeregMr,
  ncclNetSocketIsend,
  ncclNetSocketIrecv,
  ncclNetSocketIflush,
  ncclNetSocketTest,
  ncclNetSocketClose, // same close routine serves both send...
  ncclNetSocketClose, // ...and recv comms
  ncclNetSocketCloseListen
};

bootstrapNetInit

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Discover and cache the NICs usable for the bootstrap (TCP) network.
// One-time init, thread-safe via double-checked locking on bootstrapNetIfs
// (-1 means "not yet initialized"; findInterfaces fills the global
// bootstrapNetIfNames / bootstrapNetIfAddrs tables).
// Fix vs. the original listing: the failure path no longer returns while
// still holding bootstrapNetLock, which would deadlock every later caller.
ncclResult_t bootstrapNetInit() {
  if (bootstrapNetIfs == -1) {
    pthread_mutex_lock(&bootstrapNetLock);
    if (bootstrapNetIfs == -1) { // re-check under the lock
      bootstrapNetIfs = findInterfaces(bootstrapNetIfNames, bootstrapNetIfAddrs, MAX_IF_NAME_SIZE, MAX_IFS);
      if (bootstrapNetIfs <= 0) {
        WARN("Bootstrap : no socket interface found");
        pthread_mutex_unlock(&bootstrapNetLock); // do not leak the mutex on failure
        return ncclInternalError;
      }
      // Log the interfaces we kept, e.g. " [0]eth1:10.0.0.122<0>".
      char line[1024];
      char addrline[1024];
      line[0] = '\0';
      for (int i=0; i<bootstrapNetIfs; i++) {
        snprintf(line+strlen(line), 1023-strlen(line), " [%d]%s:%s", i, bootstrapNetIfNames+i*MAX_IF_NAME_SIZE,
            socketToString(&bootstrapNetIfAddrs[i].sa, addrline));
      }
      line[1023] = '\0';
      INFO(NCCL_INIT, "Bootstrap : Using%s", line);
    }
    pthread_mutex_unlock(&bootstrapNetLock);
  }
  return ncclSuccess;
}

bootstrapNetInit 就是 bootstrap 网络的初始化,主要就是通过 findInterfaces 遍历机器上所有的网卡信息,通过 prefixList 匹配选择使用哪些网卡,将可用网卡的信息保存下来,将 ifa_name 保存到全局的 bootstrapNetIfNames,ip 地址保存到全局 bootstrapNetIfAddrs,默认除了 docker 和 lo 其他的网卡都可以使用,例如在测试机器上有三张网卡,分别是 xgbe0,xgbe1,xgbe2,那么就会把这三个 ifa_name 和对应的 ip 地址保存下来,另外 nccl 提供了环境变量 NCCL_SOCKET_IFNAME 可以用来指定想用的网卡名,例如通过 export NCCL_SOCKET_IFNAME=xgbe0 来指定使用 xgbe0,其实就是通过 prefixList 来匹配做到的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Enumerate the host's network interfaces and copy the names/addresses of
// those matching `prefixList` into the caller's `names`/`addrs` arrays.
// prefixList syntax: leading '^' negates the match, leading '=' requires an
// exact (rather than prefix) match. Returns the number of interfaces kept.
static int findInterfaces(const char* prefixList, char* names, union socketAddress *addrs, int sock_family, int maxIfNameSize, int maxIfs) {
  struct netIf userIfs[MAX_IFS];
  bool searchNot = prefixList && prefixList[0] == '^'; // "^eth" = everything BUT eth*
  if (searchNot) prefixList++;
  bool searchExact = prefixList && prefixList[0] == '='; // "=eth0" = exact name match
  if (searchExact) prefixList++;
  int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS);

  int found = 0;
  struct ifaddrs *interfaces, *interface;
  // NOTE(review): getifaddrs() return value is unchecked; on failure
  // `interfaces` would be used uninitialized -- confirm vs. upstream.
  getifaddrs(&interfaces);
  for (interface = interfaces; interface && found < maxIfs; interface = interface->ifa_next) {
    if (interface->ifa_addr == NULL) continue; // e.g. interfaces with no address

    // Only keep IPv4/IPv6 entries (getifaddrs also reports AF_PACKET etc.).
    int family = interface->ifa_addr->sa_family;
    if (family != AF_INET && family != AF_INET6)
      continue;

    // Caller may restrict to a single address family (-1 = accept both).
    if (sock_family != -1 && family != sock_family)
      continue;

    if (family == AF_INET6) {
      struct sockaddr_in6* sa = (struct sockaddr_in6*)(interface->ifa_addr);
      if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue; // skip ::1
    }

    // Apply user filter; XOR with searchNot implements the '^' negation.
    if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) {
      continue;
    }
    // An interface can appear once per address family; keep the first hit.
    bool duplicate = false;
    for (int i = 0; i < found; i++) {
      if (strcmp(interface->ifa_name, names+i*maxIfNameSize) == 0) { duplicate = true; break; }
    }

    if (!duplicate) {
      // NOTE(review): strncpy does not NUL-terminate when the name fills the
      // slot exactly -- assumes names are shorter than maxIfNameSize.
      strncpy(names+found*maxIfNameSize, interface->ifa_name, maxIfNameSize);
      int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);
      memcpy(addrs+found, interface->ifa_addr, salen); // copy only the valid sockaddr bytes
      found++;
    }
  }

  freeifaddrs(interfaces);
  return found;
}

initNetPlugin

首先执行 initNetPlugin,查看是否有 libnccl-net.so,测试环境没有这个 so,所以直接返回。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Try to load an external network plugin (libnccl-net.so). Absence of the
// plugin is not an error: the function returns ncclSuccess and leaves
// *net/*collnet untouched so initNet() falls back to the built-in
// IB/socket transports.
ncclResult_t initNetPlugin(ncclNet_t** net, ncclCollNet_t** collnet) {
  void* netPluginLib = dlopen("libnccl-net.so", RTLD_NOW | RTLD_LOCAL);
  if (netPluginLib == NULL) {
    // dlopen does not guarantee to set errno, but dlerror only gives us a
    // string, so checking errno doesn't hurt to try to provide a better
    // error message
    if (errno == ENOENT) {
      INFO(NCCL_INIT|NCCL_NET, "NET/Plugin : No plugin found (libnccl-net.so), using internal implementation");
    } else {
      INFO(NCCL_INIT|NCCL_NET, "NET/Plugin : Plugin load returned %d : %s.", errno, dlerror());
    }
    return ncclSuccess; // no plugin is a valid configuration
  }
  // Resolve the plugin's ncclNet_t symbol and sanity-check it via initNet().
  ncclNet_t* extNet = (ncclNet_t*) dlsym(netPluginLib, STR(NCCL_PLUGIN_SYMBOL));
  if (extNet == NULL) {
    INFO(NCCL_INIT|NCCL_NET, "NET/Plugin: Failed to find " STR(NCCL_PLUGIN_SYMBOL) " symbol.");
  } else if (initNet(extNet) == ncclSuccess) {
    *net = extNet;
    // Check for CollNet
    ncclCollNet_t* extCollNet = (ncclCollNet_t*) dlsym(netPluginLib, STR(NCCL_COLLNET_PLUGIN_SYMBOL));
    if (extCollNet == NULL) {
      INFO(NCCL_INIT|NCCL_NET, "NET/Plugin: Failed to find " STR(NCCL_COLLNET_PLUGIN_SYMBOL) " symbol.");
    } else if (initCollNet(extCollNet) == ncclSuccess) {
      *collnet = extCollNet;
    }
    return ncclSuccess; // plugin adopted; keep the library loaded
  }
  // Symbol missing or init failed: unload the library and fall back.
  if (netPluginLib != NULL) dlclose(netPluginLib);
  return ncclSuccess;
}

initNet

然后通过 initNet 分别尝试使用 IB 网络和 Socket 网络,这里实际执行的是每个 ncclNet_t 结构的 init 函数,对于 IB 网络就是 ncclIbInit

1
2
3
4
5
6
7
// Probe a candidate transport: run its init() hook and require that it
// reports at least one usable device. Any failure disqualifies the
// transport so the caller can try the next one.
ncclResult_t initNet(ncclNet_t* net) {
  int ndev;
  if (net->init(ncclDebugLog) != ncclSuccess) return ncclInternalError;
  if (net->devices(&ndev) != ncclSuccess) return ncclInternalError;
  if (ndev <= 0) return ncclSystemError; // initialized, but no devices found
  return ncclSuccess;
}

ncclIbInit

首先执行 ncclNetIb 的 init 函数,就是 ncclIbInit

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// init() hook of the IB transport: load libibverbs, pick one IP interface
// for out-of-band (OOB) socket traffic, then enumerate all RDMA devices and
// their active ports, caching the usable ones in the global ncclIbDevs
// table. One-time init via double-checked locking on ncclNIbDevs (-1 means
// "not yet initialized").
ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) {
  static int shownIbHcaEnv = 0; // log NCCL_IB_HCA at most once
  if(wrap_ibv_symbols() != ncclSuccess) { return ncclInternalError; } // dlopen libibverbs and resolve its symbols
  if (ncclParamIbDisable()) return ncclInternalError; // honor NCCL_IB_DISABLE

  if (ncclNIbDevs == -1) {
    pthread_mutex_lock(&ncclIbLock);
    wrap_ibv_fork_init(); // guard RDMA memory against fork() (ibv_fork_init)
    if (ncclNIbDevs == -1) { // re-check under the lock
      ncclNIbDevs = 0;
      // IB still needs one plain IP interface for OOB exchanges.
      if (findInterfaces(ncclIbIfName, &ncclIbIfAddr, MAX_IF_NAME_SIZE, 1) != 1) {
        WARN("NET/IB : No IP interface found.");
        // NOTE(review): this (and the error returns below) return with
        // ncclIbLock still held -- verify against upstream NCCL.
        return ncclInternalError;
      }

      // Detect IB cards
      int nIbDevs;
      struct ibv_device** devices;

      // Check if user defined which IB device:port to use
      char* userIbEnv = getenv("NCCL_IB_HCA");
      if (userIbEnv != NULL && shownIbHcaEnv++ == 0) INFO(NCCL_NET|NCCL_ENV, "NCCL_IB_HCA set to %s", userIbEnv);
      // Same filter syntax as NCCL_SOCKET_IFNAME: '^' negates, '=' is exact.
      struct netIf userIfs[MAX_IB_DEVS];
      bool searchNot = userIbEnv && userIbEnv[0] == '^';
      if (searchNot) userIbEnv++;
      bool searchExact = userIbEnv && userIbEnv[0] == '=';
      if (searchExact) userIbEnv++;
      int nUserIfs = parseStringList(userIbEnv, userIfs, MAX_IB_DEVS);

      if (ncclSuccess != wrap_ibv_get_device_list(&devices, &nIbDevs)) return ncclInternalError;

      // Each HCA may expose several physical ports; every active port
      // becomes its own entry in ncclIbDevs.
      for (int d=0; d<nIbDevs && ncclNIbDevs<MAX_IB_DEVS; d++) {
        struct ibv_context * context;
        if (ncclSuccess != wrap_ibv_open_device(&context, devices[d]) || context == NULL) {
          WARN("NET/IB : Unable to open device %s", devices[d]->name);
          continue;
        }
        int nPorts = 0;
        struct ibv_device_attr devAttr;
        memset(&devAttr, 0, sizeof(devAttr));
        if (ncclSuccess != wrap_ibv_query_device(context, &devAttr)) {
          WARN("NET/IB : Unable to query device %s", devices[d]->name);
          if (ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
          continue;
        }
        for (int port = 1; port <= devAttr.phys_port_cnt; port++) { // IB ports are 1-based
          struct ibv_port_attr portAttr;
          if (ncclSuccess != wrap_ibv_query_port(context, port, &portAttr)) {
            WARN("NET/IB : Unable to query port %d", port);
            continue;
          }
          if (portAttr.state != IBV_PORT_ACTIVE) continue; // skip down ports
          if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND
              && portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) continue; // IB or RoCE only

          // check against user specified HCAs/ports
          if (! (matchIfList(devices[d]->name, port, userIfs, nUserIfs, searchExact) ^ searchNot)) {
            continue;
          }
          TRACE(NCCL_INIT|NCCL_NET,"NET/IB: [%d] %s:%d/%s ", d, devices[d]->name, port,
              portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
          // Record everything later code needs: device/port identity, link
          // type, speed, PCI path, QP limit, and the open verbs context.
          ncclIbDevs[ncclNIbDevs].device = d;
          ncclIbDevs[ncclNIbDevs].guid = devAttr.sys_image_guid;
          ncclIbDevs[ncclNIbDevs].port = port;
          ncclIbDevs[ncclNIbDevs].link = portAttr.link_layer;
          ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width);
          ncclIbDevs[ncclNIbDevs].context = context;
          strncpy(ncclIbDevs[ncclNIbDevs].devName, devices[d]->name, MAXNAMESIZE);
          NCCLCHECK(ncclIbGetPciPath(ncclIbDevs[ncclNIbDevs].devName, &ncclIbDevs[ncclNIbDevs].pciPath, &ncclIbDevs[ncclNIbDevs].realPort));
          ncclIbDevs[ncclNIbDevs].maxQp = devAttr.max_qp;
          ncclNIbDevs++;
          nPorts++;
          // NOTE(review): one async-event thread per port; the handle is
          // overwritten each iteration and the result is unchecked.
          pthread_create(&ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, context);
        }
        if (nPorts == 0 && ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; } // no usable port: release the context
      }
      if (nIbDevs && (ncclSuccess != wrap_ibv_free_device_list(devices))) { return ncclInternalError; };
    }
    if (ncclNIbDevs == 0) {
      INFO(NCCL_INIT|NCCL_NET, "NET/IB : No device found.");
    } else {
      // Summarize what was found, e.g. " [0]mlx5_2:1/RoCE ; OOB eth1:...".
      char line[1024];
      line[0] = '\0';
      for (int d=0; d<ncclNIbDevs; d++) {
        snprintf(line+strlen(line), 1023-strlen(line), " [%d]%s:%d/%s", d, ncclIbDevs[d].devName,
            ncclIbDevs[d].port, ncclIbDevs[d].link == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
      }
      line[1023] = '\0';
      char addrline[1024];
      INFO(NCCL_INIT|NCCL_NET, "NET/IB : Using%s ; OOB %s:%s", line, ncclIbIfName, socketToString(&ncclIbIfAddr.sa, addrline));
    }
    pthread_mutex_unlock(&ncclIbLock);
  }
  return ncclSuccess;
}

首先第三行通过 wrap_ibv_symbols 加载动态库 libibverbs.so,然后获取动态库的各个函数

然后通过 wrap_ibv_fork_init 避免 fork 引起 rdma 网卡读写出错

后面会讲到 ib 网络也会用到 socket 进行带外网络的传输,所以这里也通过 findInterfaces 获取一个可用的网卡保存到 ncclIbIfAddr

然后通过 ibv_get_device_list 获取所有 rdma 设备到 devices 中,遍历 devices 的每个 device,因为每个 HCA 可能有多个物理 port,所以对每个 device 遍历每一个物理 port,获取每个 port 的信息,然后将相关信息保存到全局的 ncclIbDevs 中,比如是哪个 device 的哪个 port,使用的是 IB 还是 ROCE,device 的 pci 路径,maxqp,device 的 name 等,注意这里也有类似 bootstrap 网络 NCCL_SOCKET_IFNAME 的环境变量,叫 NCCL_IB_HCA,可以指定使用哪个 IB HCA

到这里整个初始化的过程就完成了,一句话总结就是获取了当前机器上所有可用的 IB 网卡和普通以太网卡然后保存下来

bootstrapGetUniqueId

然后开始生成 UniqueId,如果不是环境变量设置,实际调用 bootstrapCreateRoot

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Produce the unique id: a ncclNetHandle_t (root's ip:port) stored inside
// the zero-padded ncclUniqueId. If NCCL_COMM_ID is set the address comes
// from the environment; otherwise we become the root and start listening.
ncclResult_t bootstrapGetUniqueId(ncclUniqueId* id) {
  static_assert(sizeof(ncclNetHandle_t) < sizeof(ncclUniqueId), "NetId does not fit inside ncclUniqueId");
  memset(id, 0, sizeof(ncclUniqueId)); // ids are compared byte-wise, so zero the padding
  ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id; // reinterpret the id's prefix as a net handle

  char* env = getenv("NCCL_COMM_ID");
  if (env) {
    INFO(NCCL_ENV, "NCCL_COMM_ID set by environment to %s", env);
    if (bootstrapNetCreateHandle(netHandle, env) != 0) { // parse "<addr>:<port>"
      WARN("Invalid NCCL_COMM_ID, please use format: <ipv4>:<port> or [<ipv6>]:<port> or <hostname>:<port>");
      return ncclInvalidArgument;
    }
  } else {
    // We are rank 0: open a listen socket and spawn the root thread.
    NCCLCHECK(bootstrapCreateRoot(id, false));
  }

  return ncclSuccess;
}

bootstrapCreateRoot

1
2
3
4
5
6
7
8
// Start the bootstrap root: listen on a local address (written back into
// `id` so other ranks can connect) and hand the listen comm to a detached
// helper thread running bootstrapRoot().
ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) {
  ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id; // id doubles as the listen handle
  void* listenComm;
  // dev 0 normally; findSubnetIf-style selection when the address came from NCCL_COMM_ID.
  NCCLCHECK(bootstrapNetListen(idFromEnv ? dontCareIf : 0, netHandle, &listenComm));
  pthread_t thread;
  // NOTE(review): pthread_create result is unchecked and the handle is
  // dropped (never joined) -- confirm this matches upstream intent.
  pthread_create(&thread, NULL, bootstrapRoot, listenComm);
  return ncclSuccess;
}

ncclNetHandle_t 也是一个字符数组,然后执行 bootstrapNetListen

bootstrapNetListen

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Open a bootstrap listen socket. On entry `netHandle` is either empty
// (dev >= 0: pick the dev-th local interface) or already carries an address
// (dev == findSubnetIf, elided here). On exit it holds the actual bound
// ip:port and *listenComm holds the new comm (a wrapped fd).
static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm) {
  union socketAddress* connectAddr = (union socketAddress*) netHandle; // handle IS a socket address
  static_assert(sizeof(union socketAddress) < NCCL_NET_HANDLE_MAXSIZE, "union socketAddress size is too large");
  // if dev >= 0, listen based on dev
  if (dev >= 0) {
    NCCLCHECK(bootstrapNetGetSocketAddr(dev, connectAddr)); // copy the dev-th cached local address
  } else if (dev == findSubnetIf) {
    ... // (elided in this article) pick an interface on the handle's subnet
  } // Otherwise, handle stores a local address
  struct bootstrapNetComm* comm;
  NCCLCHECK(bootstrapNetNewComm(&comm));
  NCCLCHECK(createListenSocket(&comm->fd, connectAddr)); // bind+listen; writes the final port back
  *listenComm = comm;
  return ncclSuccess;
}

bootstrapNetGetSocketAddr

首先是通过 bootstrapNetGetSocketAddr 获取一个可用的 ip 地址

1
2
3
4
5
static ncclResult_t bootstrapNetGetSocketAddr(int dev, union socketAddress* addr) {
  if (dev >= bootstrapNetIfs) return ncclInternalError;
  memcpy(addr, bootstrapNetIfAddrs+dev, sizeof(*addr));
  return ncclSuccess;
}

此时 dev 是 0, bootstrapNetIfs 是初始化 bootstrap 网络的时候一共找到了几个可用的网卡,这里就是获取了第 0 个可用的 ip 地址

bootstrapNetNewComm

然后是通过 bootstrapNetNewComm 创建 bootstrapNetComm,bootstrapNetComm 其实就是 fd,bootstrapNetNewComm 其实就是 new 了一个 bootstrapNetComm

1
2
3
// A bootstrap "communicator" is nothing more than a TCP socket descriptor.
struct bootstrapNetComm {
  int fd; // listening or connected socket, depending on how the comm is used
};

createListenSocket

然后通过 createListenSocket 启动 socket server

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Create a TCP listen socket bound to *localAddr (IPv4 or IPv6). If the
// address carries port 0, the kernel picks a free port and getsockname()
// writes the final ip:port back into *localAddr -- that round-trip is how
// the ncclUniqueId obtains its address.
static ncclResult_t createListenSocket(int *fd, union socketAddress *localAddr) {
  /* IPv4/IPv6 support */
  int family = localAddr->sa.sa_family;
  int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);

  /* Create socket and bind it to a port */
  int sockfd = socket(family, SOCK_STREAM, 0);
  if (sockfd == -1) {
    WARN("Net : Socket creation failed : %s", strerror(errno));
    return ncclSystemError;
  }

  if (socketToPort(&localAddr->sa)) {
    // Port is forced by env. Make sure we get the port.
    // NOTE(review): if a SYSCHECK below fails, sockfd is not closed -- fd
    // leak on the error path; confirm vs. upstream.
    int opt = 1;
#if defined(SO_REUSEPORT)
    SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt");
#else
    SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt");
#endif
  }

  // localAddr port should be 0 (Any port)
  SYSCHECK(bind(sockfd, &localAddr->sa, salen), "bind");

  /* Get the assigned Port */
  socklen_t size = salen;
  SYSCHECK(getsockname(sockfd, &localAddr->sa, &size), "getsockname");

#ifdef ENABLE_TRACE
  char line[1024];
  TRACE(NCCL_INIT|NCCL_NET,"Listening on socket %s", socketToString(&localAddr->sa, line));
#endif

  /* Put the socket in listen mode
   * NB: The backlog will be silently truncated to the value in /proc/sys/net/core/somaxconn
   */
  SYSCHECK(listen(sockfd, 16384), "listen");
  *fd = sockfd;
  return ncclSuccess;
}

创建监听 fd,ip 由 localaddr 指定,初始端口为 0,bind 时随机找一个可用端口,并通过 getsockname (sockfd, &localAddr->sa, &size)将 ip 端口写回到 localaddr,这里 localaddr 就是 UniqueId。

到这里 UniqueId 也就产生了,其实就是当前机器的 ip 和 port

ncclCommInitRank

rank 0 的机器生成了 ncclUniqueId,并完成了机器的 bootstrap 网络和通信网络的初始化,这节接着看下所有节点间 bootstrap 的连接是如何建立的。

1
2
3
4
5
6
7
// Public entry point: create this rank's communicator. Captures the current
// CUDA device and delegates to ncclCommInitRankDev.
NCCL_API(ncclResult_t, ncclCommInitRank, ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank);
ncclResult_t ncclCommInitRank(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank) {
  int cudaDev;
  CUDACHECK(cudaGetDevice(&cudaDev)); // communicator is pinned to the caller's current device
  NCCLCHECK(ncclCommInitRankDev(newcomm, nranks, commId, myrank, cudaDev));
  return ncclSuccess;
}

ncclCommInitRankDev

Rank 0 节点执行 ncclGetUniqueId 生成 ncclUniqueId,通过 mpi 将 Id 广播到所有节点,然后所有节点都会执行 ncclCommInitRank,这里其他节点也会进行初始化 bootstrap 网络和通信网络的操作,然后会执行到 ncclCommInitRankSync

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Common communicator-creation path for a given CUDA device: run global
// init, validate arguments, then call ncclCommInitRankSync either directly
// or through the async group machinery (ncclGroupStart/End).
static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev) {
  ncclResult_t res;
  char* env = getenv("NCCL_COMM_ID");
  if (env && myrank == 0) {
    // With NCCL_COMM_ID there is no ncclGetUniqueId call, so rank 0 must
    // start the bootstrap root itself here.
    INFO(NCCL_ENV, "NCCL_COMM_ID set by environment to %s", env);
    NCCLCHECKGOTO(bootstrapCreateRoot(&commId, true), res, end);
  }

  NCCLCHECKGOTO(ncclInit(), res, end); // non-root ranks init networks here
  if (myrank == 0) showVersion();

  // Make sure the CUDA runtime is initialized.
  CUDACHECKGOTO(cudaFree(NULL), res, end);

  NCCLCHECKGOTO(PtrCheck(newcomm, "CommInitRank", "newcomm"), res, end);
  if (nranks < 1 || myrank < 0 || myrank >= nranks) {
    WARN("Invalid rank requested : %d/%d", myrank, nranks);
    res = ncclInvalidArgument;
    goto end;
  }

  if (ncclAsyncMode()) {
    // Inside a group call: defer the actual init to the async machinery.
    NCCLCHECKGOTO(ncclAsyncInit(ncclCommInitRankSync, newcomm, nranks, commId, myrank, cudaDev), res, end);
  } else {
    NCCLCHECKGOTO(ncclCommInitRankSync(newcomm, nranks, commId, myrank, cudaDev), res, end);
  }
end:
  if (ncclAsyncMode()) return ncclAsyncErrCheck(res);
  else return res;
}

ncclCommInitRankSync

ncclComm_t 是指向 ncclComm 的指针,ncclComm 是一个大杂烩,包含了通信用到的所有上下文信息,里面的字段等用到的时候再介绍,然后通过 commAlloc 分配 newcom,并且完成初始化,比如当前是哪个卡,对应的 pcie busid 是什么,然后执行 initTransportsRank

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Synchronous core of communicator creation: allocate the ncclComm,
// establish transports (bootstrap ring, topology, channels), and set up the
// device-side copy of the comm.
ncclResult_t ncclCommInitRankSync(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev) {
  ncclResult_t res;

  CUDACHECK(cudaSetDevice(cudaDev)); // all subsequent CUDA work targets this device
  NCCLCHECKGOTO(commAlloc(newcomm, nranks, myrank), res, cleanup);       // allocate + basic fields (device, busId, ...)
  NCCLCHECKGOTO(initTransportsRank(*newcomm, &commId), res, cleanup);    // bootstrap, topology, connections
  NCCLCHECKGOTO(devCommSetup(*newcomm), res, cleanup);                   // mirror the comm onto the GPU

  INFO(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %x - Init COMPLETE", *newcomm, myrank, nranks, (*newcomm)->cudaDev, (*newcomm)->busId);

  return ncclSuccess;
cleanup:
  // Tear down whatever was established; assumes commAlloc nulls/initializes
  // *newcomm before any failure -- TODO confirm.
  if ((*newcomm) && (*newcomm)->bootstrap) bootstrapAbort((*newcomm)->bootstrap);
  *newcomm = NULL;
  return res;
}

initTransportsRank bootstrap 网络连接建立

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// First stage of transport setup (excerpt; body elided in this article):
// hash the unique id into commHash, establish the bootstrap ring via
// bootstrapInit, then begin the first of three AllGathers used to exchange
// per-rank information.
static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* commId) {
  // We use 3 AllGathers
  // 1. { peerInfo, comm }
  // 2. ConnectTransport[nranks], ConnectValue[nranks]
  // 3. { nThreads, nrings, compCap, prev[MAXCHANNELS], next[MAXCHANNELS] }

  int rank = comm->rank;
  int nranks = comm->nRanks;
  uint64_t commHash = getHash(commId->internal, NCCL_UNIQUE_ID_BYTES); // disambiguates concurrent communicators
  TRACE(NCCL_INIT, "comm %p, commHash %lx, rank %d nranks %d - BEGIN", comm, commHash, rank, nranks);
  NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap)); // build the bootstrap ring

  // AllGather1 - begin
  // One slot per rank: each rank fills its own, the allgather fills the rest.
  struct {
    struct ncclPeerInfo peerInfo;
    struct ncclComm* comm;
  } *allGather1Data;

  // ...
}

bootstrapInit -> send info to root

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Per-rank bootstrap setup, phase 1 (excerpt; continues later in the
// article): allocate the extState, open two listen comms (one for the
// AllGather ring between ranks, one reserved for traffic from the root),
// then send our rank/listen addresses to the root.
ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) {
  ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id; // root's ip:port
  bool idFromEnv = getenv("NCCL_COMM_ID") != NULL;
  struct extState* state;
  NCCLCHECK(ncclCalloc(&state, 1));
  state->rank = rank;
  state->nranks = nranks;
  *commState = state; // becomes comm->bootstrap

  TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);

  struct extInfo info = { 0 };
  info.rank = rank;
  info.nranks = nranks;
  void *tmpSendComm, *tmpRecvComm;
  // Pass the remote address to listen via info
  if (idFromEnv) {
    memcpy(&info.extHandleListen, netHandle, sizeof(ncclNetHandle_t));
    memcpy(&info.extHandleListenRoot, netHandle, sizeof(ncclNetHandle_t));
  }
  // listen will return the local address via info (specify interface type 'findSubnetIf')
  state->dev = idFromEnv ? findSubnetIf : 0;
  void* extBstrapListenCommRoot;
  // Two listen comms: extHandleListen for the rank-to-rank ring,
  // extHandleListenRoot for the root's reply with our next-rank address.
  NCCLCHECK(bootstrapNetListen(state->dev, &info.extHandleListen, &state->extBstrapListenComm));
  NCCLCHECK(bootstrapNetListen(state->dev, &info.extHandleListenRoot, &extBstrapListenCommRoot));

  // stagger connection times to avoid an overload of the root at very high rank counts
  if (nranks > 128) {
    long msec = rank; // delay proportional to rank, 1 ms per rank
    struct timespec tv;
    tv.tv_sec = msec / 1000;
    tv.tv_nsec = 1000000 * (msec % 1000);
    TRACE(NCCL_INIT, "rank %d delaying connection to root by %ld msec", rank, msec);
    (void) nanosleep(&tv, NULL);
  }

  // send info on my listening socket to root
  NCCLCHECK(bootstrapNetConnect(state->dev, netHandle, &tmpSendComm));
  NCCLCHECK(bootstrapNetSend(tmpSendComm, &info, sizeof(info)));
  NCCLCHECK(bootstrapNetCloseSend(tmpSendComm)); // one-shot connection

  // ...
}

首先看下 commState,即 ncclComm 的 bootstrap,类型为 extState

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
Struct extState {
  Void* extBstrapListenComm;
  Void* extBstrapRingRecvComm;
  Void* extBstrapRingSendComm;
  NcclNetHandle_t* peerBstrapHandles;
  Struct unexConn* unexpectedConnections;
  Int rank;
  Int nranks;
  Int dev;
};

其中 extBstrapRingSendComm 是当前节点连接 next 的 socket 连接,extBstrapRingRecvComm 是当前节点和 prev 节点的 socket 连接,extBstrapListenComm 是当前节点的监听 socket,peerBstrapHandles 是所有 rank 的 ip port(对应 extBstrapListenComm),dev 默认为 0,表示用第几个 ip 地址。

然后通过 bootstrapNetListen 创建 extHandleListen 和 extHandleListenRoot 两个 bootstrap comm,如前文所述,bootstrap comm 其实就是保存了 fd,这里创建两个 comm 的原因是 extHandleListen 是 rank 之间实际使用的 bootstrap 连接,extHandleListenRoot 是 rank 0 节点和其他所有 rank 进行通信使用的连接

1
static ncclResult_t bootstrapNetListen (int dev, ncclNetHandle_t* netHandle, void** listenComm)

bootstrapNetListen 函数上节有介绍过,会获取到第 dev 个当前机器的 ip,然后 listen 获取监听 fd,将 ip port 写到 netHandle,获取到的 bootstrap comm 写到 listenComm

然后将 rank,nrank,extHandleListen 和 extHandleListenRoot 写到 extInfo 里

1
2
3
4
5
6
struct extInfo {
  Int rank;
  Int nranks;
  NcclNetHandle_t extHandleListenRoot;
  NcclNetHandle_t extHandleListen;
};

netHandle 为 ncclUniqueId,即 rank 0 的 ip port,然后通过 bootstrapNetConnect 创建 bootstrap send comm,类比 bootstrapNetListen,bootstrapNetConnect 就是建立到 netHandle 的 socket 连接,将 socket 写到 sendComm 里,这里 dev 并没有用到

1
static ncclResult_t bootstrapNetConnect (int dev, ncclNetHandle_t* netHandle, void** sendComm)

然后通过 bootstrapNetSend 将 extInfo 发送出去,即发给 rank0

1
2
3
4
5
6
// Length-prefixed send over a bootstrap comm: first the payload size as an
// int, then the payload itself. bootstrapNetRecv on the peer reads the same
// framing.
static ncclResult_t bootstrapNetSend (void* sendComm, void* data, int size) {
  struct bootstrapNetComm* comm = (struct bootstrapNetComm*) sendComm;
  NCCLCHECK(socketSend(comm->fd, &size, sizeof(int))); // size header
  NCCLCHECK(socketSend(comm->fd, data, size));         // payload
  return ncclSuccess;
}

其中 socketSend 就是执行 send 接口发送数据

然后通过 bootstrapNetCloseSend 关闭 fd。

Rank 0 收到数据后会做什么工作呢,回顾一下,rank 0 节点执行 ncclGetUniqueId 生成 ncclUniqueId,其中在执行 bootstrapCreateRoot 的最后会启动一个线程执行 bootstrapRoot

bootstrapRoot

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// Root thread started by bootstrapCreateRoot: collect every rank's pair of
// listen addresses, then tell each rank r the ring-listen address of rank
// (r+1) % nranks so the ranks can build the AllGather ring among
// themselves. Runs once and exits.
static void *bootstrapRoot(void* listenComm) {
  struct extInfo info;
  ncclNetHandle_t *rankHandles = NULL;      // ring-listen address of each rank
  ncclNetHandle_t *rankHandlesRoot = NULL; // for initial rank <-> root information exchange
  ncclNetHandle_t zero = { 0 }; // for sanity checking
  void* tmpComm;
  ncclResult_t res;
  setFilesLimit(); // raise RLIMIT_NOFILE: one accept per rank

  TRACE(NCCL_INIT, "BEGIN");
  /* Receive addresses from all ranks */
  int nranks = 0, c = 0;
  do {
    // One short-lived connection per rank: accept, read extInfo, close.
    NCCLCHECKGOTO(bootstrapNetAccept(listenComm, &tmpComm), res, out);
    NCCLCHECKGOTO(bootstrapNetRecv(tmpComm, &info, sizeof(info)), res, out);
    NCCLCHECKGOTO(bootstrapNetCloseRecv(tmpComm), res, out);

    if (c == 0) {
      // First message tells us the world size; size the tables accordingly.
      nranks = info.nranks;
      NCCLCHECKGOTO(ncclCalloc(&rankHandles, nranks), res, out);
      NCCLCHECKGOTO(ncclCalloc(&rankHandlesRoot, nranks), res, out);
    }

    if (nranks != info.nranks) { // all ranks must agree on the world size
      WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nranks, info.nranks);
      goto out;
    }

    if (memcmp(&zero, &rankHandlesRoot[info.rank], sizeof(ncclNetHandle_t)) != 0) {
      // Slot already filled: a rank checked in twice.
      WARN("Bootstrap Root : rank %d of %d ranks has already checked in", info.rank, nranks);
      goto out;
    }

    // Save the connection handle for that rank
    memcpy(rankHandlesRoot+info.rank, info.extHandleListenRoot, sizeof(ncclNetHandle_t));
    memcpy(rankHandles+info.rank, info.extHandleListen, sizeof(ncclNetHandle_t));

    ++c;
    TRACE(NCCL_INIT, "Received connect from rank %d total %d/%d",  info.rank, c, nranks);
  } while (c < nranks);
  TRACE(NCCL_INIT, "COLLECTED ALL %d HANDLES", nranks);

  // Send the connect handle for the next rank in the AllGather ring
  for (int r=0; r<nranks; ++r) {
    int next = (r+1) % nranks;
    void *tmpSendComm;
    // Connect to rank r's root-listen address and send it rank next's
    // ring-listen address.
    NCCLCHECKGOTO(bootstrapNetConnect(0, rankHandlesRoot+r, &tmpSendComm), res, out);
    NCCLCHECKGOTO(bootstrapNetSend(tmpSendComm, rankHandles+next, sizeof(ncclNetHandle_t)), res, out);
    NCCLCHECKGOTO(bootstrapNetCloseSend(tmpSendComm), res, out);
  }
  TRACE(NCCL_INIT, "SENT OUT ALL %d HANDLES", nranks);

out:
  bootstrapNetCloseListen(listenComm);
  if (rankHandles) free(rankHandles);
  if (rankHandlesRoot) free(rankHandlesRoot);

  TRACE(NCCL_INIT, "DONE");
  return NULL;
}

ListenComm 是之前 rank 0 创建的监听 fd,bootstrapNetAccept 是从 listenComm 中获取一个新连接,使用新连接的 fd 创建 recvcomm。

1
static ncclResult_t bootstrapNetAccept (void* listenComm, void** recvComm)

然后通过 bootstrapNetRecv 读取 tmpComm 的数据,即其他 rank 发送来的 extInfo,然后保存其他 rank 的 extHandleListen 和 extHandleListenRoot,这个时候 rank 0 就获取到其他所有 rank 的 ip 和 port 了。

获取完所有 rank 的 info 之后开始建环,将节点 (r+1) % nranks 的 extHandleListen 发送给节点 r,就是说将节点 r 的 next 节点的 nethandle 发送给节点 r。这里可以看出,每个节点创建了两个 listen comm,其中 rank 0 使用 extHandleListenRoot 进行通信,其他节点之间通过 extHandleListen 进行通信

bootstrapInit -> get next rank info from root

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// Per-rank bootstrap setup, phase 2 (excerpt; earlier part elided with
// "// ..."): receive our next-rank address from the root, wire up the ring
// (connect to next, accept from prev), then allgather every rank's listen
// handle. Variables like `info` and `extBstrapListenCommRoot` come from the
// elided phase 1.
ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) {
  // ...

  // get info on my "next" rank in the bootstrap ring from root
  ncclNetHandle_t extHandleNext;
  NCCLCHECK(bootstrapNetAccept(extBstrapListenCommRoot, &tmpRecvComm));
  NCCLCHECK(bootstrapNetRecv(tmpRecvComm, &extHandleNext, sizeof(extHandleNext)));
  NCCLCHECK(bootstrapNetCloseRecv(tmpRecvComm));
  NCCLCHECK(bootstrapNetCloseListen(extBstrapListenCommRoot)); // root comm no longer needed

  // ...
  NCCLCHECK(bootstrapNetConnect(state->dev, &extHandleNext, &state->extBstrapRingSendComm)); // ring: me -> next
  // Accept the connect request from the previous rank in the AllGather ring
  NCCLCHECK(bootstrapNetAccept(state->extBstrapListenComm, &state->extBstrapRingRecvComm)); // ring: prev -> me

  // AllGather all listen handlers
  NCCLCHECK(ncclCalloc(&state->peerBstrapHandles, nranks));
  memcpy(state->peerBstrapHandles+rank, info.extHandleListen, sizeof(ncclNetHandle_t)); // seed own slot
  NCCLCHECK(bootstrapAllGather(state, state->peerBstrapHandles, sizeof(ncclNetHandle_t))); // fill everyone else's

}

接着所有 rank 都会在 extHandleListenRoot 上接收新连接创建 tmpRecvComm,然后接收到当前 rank 的 next 的 ip,port;然后连接 next 创建 bscomm 到 state->extBstrapRingSendComm,接收 prev 的连接创建 bscomm 到 state->extBstrapRingRecvComm,到现在 bootstrap 网络连接就完全建立起来了,如下图

最后 gather 所有 rank 的 ip port,首先将自己的 nethandle 放到 peerBstrapHandles 的对应位置,如下所示

bootstrapAllGather

然后执行 bootstrapAllGather

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Ring-based AllGather over the bootstrap TCP ring. `allData` is an array
// of nranks slices of `size` bytes; on entry only this rank's slice is
// filled, on exit all slices are. After nranks-1 steps each slice has
// travelled around the whole ring.
ncclResult_t bootstrapAllGather(void* commState, void* allData, int size) {
  struct extState* state = (struct extState*)commState;
  char* data = (char*)allData;
  int rank = state->rank;
  int nranks = state->nranks;

  TRACE(NCCL_INIT, "rank %d nranks %d size %d", rank, nranks, size);

  /* Simple ring based AllGather
   * At each step i receive data from (rank-i-1) from left
   * and send previous step's data from (rank-i) to right
   */
  for (int i=0; i<nranks-1; i++) {
    // +nranks before the modulo keeps the index non-negative.
    size_t rslice = (rank - i - 1 + nranks) % nranks; // slice arriving this step
    size_t sslice = (rank - i + nranks) % nranks;     // slice forwarded this step

    // Send slice to the right
    NCCLCHECK(bootstrapNetSend(state->extBstrapRingSendComm, data+sslice*size, size));
    // Recv slice from the left
    NCCLCHECK(bootstrapNetRecv(state->extBstrapRingRecvComm, data+rslice*size, size));
  }

  TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
  return ncclSuccess;
}

第一步:

第二步:

到这里每个 rank 就都有了全局所有 rank 的 ip port。

最后总结一下,本节主要创建了 bootstrap 环形网络连接,并保存到 ncclComm 里。

initTransportsRank bootstrap 机器内拓扑分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// First phase of communicator setup: join the bootstrap network, then start
// AllGather1 which exchanges each rank's ncclPeerInfo with all peers.
// (Excerpt — the function continues past the `...` in the NCCL source.)
static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* commId) {
  // We use 3 AllGathers
  // 1. { peerInfo, comm }
  // 2. ConnectTransport[nranks], ConnectValue[nranks]
  // 3. { nThreads, nrings, compCap, prev[MAXCHANNELS], next[MAXCHANNELS] }

  int rank = comm->rank;
  int nranks = comm->nRanks;
  // Hash of the unique ID: mixed into host/pid hashes so they are scoped to this communicator.
  uint64_t commHash = getHash(commId->internal, NCCL_UNIQUE_ID_BYTES);
  TRACE(NCCL_INIT, "comm %p, commHash %lx, rank %d nranks %d - BEGIN", comm, commHash, rank, nranks);
  // Establish the bootstrap ring connections (see bootstrapInit above).
  NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap));

  // AllGather1 - begin
  struct {
    struct ncclPeerInfo peerInfo;
    struct ncclComm* comm;
  } *allGather1Data;

  // One slot per rank; this rank fills its own slot before the all-gather.
  NCCLCHECK(ncclCalloc(&allGather1Data, nranks));
  allGather1Data[rank].comm = comm;
  struct ncclPeerInfo* myInfo = &allGather1Data[rank].peerInfo;
  NCCLCHECK(fillInfo(comm, myInfo, commHash));
  ...
}

创建 nrank 个 allGather1Data,然后通过 fillInfo 填充当前 rank 的 peerInfo,ncclPeerInfo 是 rank 的一些基本信息,比如 rank 号,在哪个机器的哪个进程等。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Per-rank identity exchanged during AllGather1, used later to decide which
// transport (p2p / shm / net) can reach each peer.
struct ncclPeerInfo {
  int rank;            // global rank within this communicator
  int cudaDev;         // local CUDA device index
  int gdrSupport;      // non-zero if GPUDirect RDMA is usable for this GPU
  uint64_t hostHash;   // host hash (+commHash): equal values => same machine
  uint64_t pidHash;    // pid hash (+commHash): equal values => same process
  dev_t shmDev;        // device id of /dev/shm, for SHM reachability checks in containers
  int64_t busId;       // PCI bus id of the GPU
};

// Populate this rank's ncclPeerInfo before AllGather1.
// commHash is added into the host/pid hashes so hashes only match within
// the same communicator.
static ncclResult_t fillInfo(struct ncclComm* comm, struct ncclPeerInfo* info, uint64_t commHash) {
  info->rank = comm->rank;
  CUDACHECK(cudaGetDevice(&info->cudaDev));
  info->hostHash=getHostHash()+commHash;
  info->pidHash=getPidHash()+commHash;

  // Get the device MAJOR:MINOR of /dev/shm so we can use that
  // information to decide whether we can use SHM for inter-process
  // communication in a container environment
  struct stat statbuf;
  SYSCHECK(stat("/dev/shm", &statbuf), "stat");
  info->shmDev = statbuf.st_dev;

  info->busId = comm->busId;

  // Probe whether this GPU/NIC pair supports GPUDirect RDMA.
  NCCLCHECK(ncclGpuGdrSupport(&info->gdrSupport));
  return ncclSuccess;
}

initTransportsRank 建图过程

暂时省略

initTransportsRank 路径计算

暂时省略

1
2
3
4
5
6
  // Compute paths between GPUs and NICs
  NCCLCHECK(ncclTopoComputePaths(comm->topo, comm->peerInfo));
  // Remove inaccessible GPUs and unused NICs
  NCCLCHECK(ncclTopoTrimSystem(comm->topo, comm));
  // Recompute paths after trimming
  NCCLCHECK(ncclTopoComputePaths(comm->topo, comm->peerInfo));

InitTransportsRank channel 搜索

nccl 中 channel 的概念表示一个通信路径,为了更好的利用带宽和网卡,以及同一块数据可以通过多个 channel 并发通信,另外后续可以看到一个 channel 对应了一个 GPU SM,所以基于这些原因,nccl 会使用多 channel,搜索的过程就是搜索出来一组 channel。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
  // Init search
  NCCLCHECK(ncclTopoSearchInit(comm->topo));
  // Print final topology
  NCCLCHECK(ncclTopoPrint(comm->topo));

  // Get rings and trees
  struct ncclTopoGraph ringGraph;
  ringGraph.id = 0;
  ringGraph.pattern = NCCL_TOPO_PATTERN_RING;
  ringGraph.crossNic = ncclParamCrossNic();
  ringGraph.collNet = 0;
  ringGraph.minChannels = 1;
  ringGraph.maxChannels = MAXCHANNELS/2;
  NCCLCHECK(ncclTopoCompute(comm->topo, &ringGraph));
  NCCLCHECK(ncclTopoPrintGraph(comm->topo, &ringGraph));

  struct ncclTopoGraph treeGraph;
  treeGraph.id = 1;
  treeGraph.pattern = NCCL_TOPO_PATTERN_SPLIT_TREE;
  treeGraph.crossNic = ncclParamCrossNic();
  treeGraph.collNet = 0;
  treeGraph.minChannels = 1;
  treeGraph.maxChannels = ringGraph.nChannels;
  NCCLCHECK(ncclTopoCompute(comm->topo, &treeGraph));
  NCCLCHECK(ncclTopoPrintGraph(comm->topo, &treeGraph));

  struct ncclTopoGraph collNetGraph;
  collNetGraph.id = 2;
  collNetGraph.pattern = NCCL_TOPO_PATTERN_TREE;
  collNetGraph.collNet = 1;
  collNetGraph.crossNic = ncclParamCrossNic();
  collNetGraph.minChannels = collNetGraph.maxChannels = ringGraph.nChannels;
  NCCLCHECK(ncclTopoCompute(comm->topo, &collNetGraph));
  NCCLCHECK(ncclTopoPrintGraph(comm->topo, &collNetGraph));

  if (comm->rank == ncclParamGraphDumpFileRank()) {
    struct ncclTopoGraph* graphs[3] = { &ringGraph, &treeGraph, &collNetGraph };
    NCCLCHECK(ncclTopoDumpGraphs(comm->topo, 3, graphs));
  }

ncclTopoSearchInit

ncclTopoSearchInit 就是初始化 system->maxWidth,如果是单机单卡的情况,那么 maxWidth 设置为 LOC_WIDTH,否则就遍历每个 GPU 节点,查看到其他所有 GPU 节点或者网卡最大带宽。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Initialize system->maxWidth before the channel search.
// Single-GPU, no-NIC systems use LOC_WIDTH; otherwise take the maximum
// bandwidth from any GPU to the NICs (multi-node) or to other GPUs (single node).
ncclResult_t ncclTopoSearchInit(struct ncclTopoSystem* system) {
  system->maxWidth = 0.0;
  int inter = system->nodes[NET].count;  // number of NICs; non-zero => inter-node case
  if (inter == 0 && system->nodes[GPU].count == 1) {
    system->maxWidth = LOC_WIDTH;
    return ncclSuccess;
  }
  for (int g=0; g<system->nodes[GPU].count; g++) {
    struct ncclTopoNode* gpu = system->nodes[GPU].nodes+g;
    system->maxWidth = std::max(system->maxWidth, getMaxWidth(system, gpu, inter ? NET : GPU));
  }
  return ncclSuccess;
}

Nccl 执行集合通信时支持 ring,tree 和 collnet 三种算法,这里我们以 ring 来举例,后续专门介绍 ring 和 tree

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// Search parameters (input) and search results (output) for one topology
// graph — ring, tree or collnet.
// NOTE: the quoted excerpt had auto-capitalized `Int`/`Float`, which is not
// valid C; restored to `int`/`float` to match the NCCL source.
struct ncclTopoGraph {
  // Input / output
  int id; // ring : 0, tree : 1, collnet : 2
  int pattern;
  int crossNic;
  int collNet;
  int minChannels;
  int maxChannels;
  // Output
  int nChannels;      // number of channels found
  float speedIntra;   // intra-node bandwidth of a single channel
  float speedInter;   // inter-node bandwidth of a single channel
  int typeIntra;      // path type used for intra-node channels
  int typeInter;      // path type used for inter-node channels
  int sameChannels;   // whether all channels are identical
  int nHops;
  int intra[MAXCHANNELS*NCCL_TOPO_MAX_NODES];  // per-channel intra-node rank sequence
  int inter[MAXCHANNELS*2];                    // per-channel inter-node endpoints (NICs)
};

NcclTopoGraph 记录了搜索到的结果,具体含义见注释。

ncclTopoCompute

然后看下 ncclTopoCompute,这里就是实际搜索 channel 的过程,目标是搜索出来尽可能多,带宽尽可能大的一系列 channel,本质就是暴力搜索,先设置一系列的条件搜答案,如果搜不出来则降低条件继续搜。

由于此时没有 NET 节点,所以 crossNic 为 0,然后初始化 graph,首先设置最高的条件,限制节点内部只能使用不超过 PATH_NVL 路径,节点间只能使用不超过 PATH_PIX 的路径,然后通过 system-maxWidth 设置 speedIntra 和 speedInter,接着执行 ncclTopoSearchRec 搜索出一个答案存储到 tmpGraph 中。

暂时省略。

initTransportsRank 机器间 channels 连接

上节中完成了单机内部的 channel 搜索,仍然以 ringGraph 为例的话,相当于在单台机器内部搜索出来了一系列的环,接下来需要将机器之间的环连接起来。

为了方便理解假设两机十六卡的情况下第一台机器的一个 ring 为:

1
2
graph->intra: GPU/0 GPU/7 GPU/6 GPU/3 GPU/2 GPU/5 GPU/4 GPU/1
graph->inter: NET/0 NET/0

第二个机器对应的 ring 为:

1
2
graph->intra: GPU/10 GPU/9 GPU/8 GPU/13 GPU/12 GPU/15 GPU/14 GPU/11
graph->inter: NET/0 NET/0

AllGather3 Data 用于 rank 间聚合 channel 的信息,ncclGraphInfo 记录了环的信息,比如 speed 和 type

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// AllGather3 phase: each rank publishes its per-graph channel info (channel
// count, speeds, path types) plus its ncclTopoRanks so the per-node rings can
// later be stitched into global rings. (Excerpt — `...` elides surrounding code.)
static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* commId) {
  // ...

  // AllGather3 - begin
  struct ncclGraphInfo {
    int sameChannels;
    float speedIntra;
    float speedInter;
    int typeIntra;
  };

  struct {
    int cudaCompCap;
    int fullCudaCompCap;
    int nChannels;
    struct ncclGraphInfo tree;
    struct ncclGraphInfo ring;
    struct ncclGraphInfo collNet;
    struct ncclTopoRanks topoRanks;
  } *allGather3Data;

  NCCLCHECK(ncclCalloc(&allGather3Data, nranks));
  allGather3Data[rank].cudaCompCap = ncclCudaCompCap();
  // Use the same channel count for tree and ring: the minimum of the two searches.
  allGather3Data[rank].nChannels = comm->nChannels = treeGraph.nChannels = ringGraph.nChannels =
    std::min(treeGraph.nChannels, ringGraph.nChannels);
  allGather3Data[rank].tree.sameChannels = treeGraph.sameChannels;
  allGather3Data[rank].tree.speedIntra = treeGraph.speedIntra;
  allGather3Data[rank].tree.speedInter = treeGraph.speedInter;
  allGather3Data[rank].tree.typeIntra = treeGraph.typeIntra;
  allGather3Data[rank].ring.sameChannels = ringGraph.sameChannels;
  allGather3Data[rank].ring.speedIntra = ringGraph.speedIntra;
  allGather3Data[rank].ring.speedInter = ringGraph.speedInter;
  allGather3Data[rank].ring.typeIntra = ringGraph.typeIntra;
  
  // ...
}

ncclTopoPreset

然后开始设置 ncclTopoRanks,获取当前 rank 在 ring 中的 prev 和 next,其中第一个 rank 的 prev 和最后一个 rank 的 next 为-1,如 rank 6 的 prev 为 7,next 为 3;获取当前 ring 的 ringRecv 和 ringSend,即 ring 的第一个节点和最后一个节点,最后将搜索到的环复制了一遍,这里在官方 issue 中看到相关解释是为了进一步的并行以充分利用带宽。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Per-rank, per-channel ring/tree endpoints published in AllGather3.
struct ncclTopoRanks {
  int ringRecv[MAXCHANNELS];   // first rank of this node's intra-node ring
  int ringSend[MAXCHANNELS];   // last rank of this node's intra-node ring
  int ringPrev[MAXCHANNELS];   // predecessor of this rank in the ring (-1 if first on node)
  int ringNext[MAXCHANNELS];   // successor of this rank in the ring (-1 if last on node)
  int treeUpRecv[MAXCHANNELS];
  int treeUpSend[MAXCHANNELS];
  int treeDnRecv[MAXCHANNELS];
  int treeDnSend[MAXCHANNELS];
};
 
// For each channel, locate this rank inside the searched intra-node ring and
// record its prev/next plus the node's ring endpoints into topoRanks.
// Finally duplicate all channels to double parallelism / bandwidth usage.
// (Excerpt — `...` elides the tree/collnet handling.)
ncclResult_t ncclTopoPreset(struct ncclComm* comm,
    struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoGraph* collNetGraph,
    struct ncclTopoRanks* topoRanks) {
  int rank = comm->rank;
  int localRanks = comm->localRanks;
  int nChannels = comm->nChannels;
 
  for (int c=0; c<nChannels; c++) {
    struct ncclChannel* channel = comm->channels+c;
    channel->ring.prev = channel->ring.next = -1;
    ...
 
    // Each graph stores one rank sequence of length localRanks per channel.
    int* ringIntra = ringGraph->intra+c*localRanks;
    int* treeIntra = treeGraph->intra+c*localRanks;
    int* collNetIntra = collNetGraph->intra+c*localRanks;
 
    for (int i=0; i<localRanks; i++) {
      if (ringIntra[i] == rank) {
        topoRanks->ringRecv[c] = ringIntra[0];              // first rank on this node's ring
        topoRanks->ringSend[c] = ringIntra[localRanks-1];   // last rank on this node's ring
        // -1 marks the node-ring endpoints; they get wired to other nodes in ncclTopoPostset.
        channel->ring.prev = (i == 0) ? -1 : ringIntra[i-1];
        channel->ring.next = (i == localRanks-1) ? -1 : ringIntra[i+1];
      }
      ...
    }
    topoRanks->ringPrev[c] = channel->ring.prev;
    topoRanks->ringNext[c] = channel->ring.next;
  }
  // Duplicate channels rings/trees
  struct ncclChannel* channel0 = comm->channels;
  struct ncclChannel* channel1 = channel0+nChannels;
  memcpy(channel1, channel0, nChannels*sizeof(struct ncclChannel));
  return ncclSuccess;
}

然后通过 bootstrapAllGather 获取全局的 allGather 3 Data 信息,计算出当前 rank 所在的 node 保存在 comm->node,以及每个 node 的第一个 rank 保存在 nodesFirstRank,因此例子中:

1
2
nodesFirstRank[0]: 0
nodesFirstRank[1]: 10

ncclTopoPostset

然后开始将每个机器的环首尾相连组成大环。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Combine every rank's ncclTopoRanks (gathered via AllGather3) into flat
// per-channel arrays, connect the per-node rings/trees across nodes, then
// duplicate channels and honor NCCL_MIN/MAX_NRINGS before building the final
// global rings array.
ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, struct ncclTopoRanks** allTopoRanks, int* rings) {
  // Gather data from all ranks
  int *ringRecv, *ringSend, *ringPrev, *ringNext, *treeUpRecv, *treeUpSend, *treeDnRecv,*treeDnSend;
  int nranks = comm->nRanks;
  int nChannels = comm->nChannels;
  // Sized nranks*MAXCHANNELS (not nranks*nChannels) to leave room for duplication below.
  NCCLCHECK(ncclCalloc(&ringRecv, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&ringSend, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&ringPrev, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&ringNext, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeUpRecv, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeUpSend, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeDnRecv, nranks*MAXCHANNELS));
  NCCLCHECK(ncclCalloc(&treeDnSend, nranks*MAXCHANNELS));
  // Flatten: entry [c*nranks + i] is channel c's value for rank i.
  for (int i=0; i<nranks; i++) {
    for (int c=0; c<nChannels;c++) {
      ringRecv[c*nranks+i] = allTopoRanks[i]->ringRecv[c];
      ringSend[c*nranks+i] = allTopoRanks[i]->ringSend[c];
      ringPrev[c*nranks+i] = allTopoRanks[i]->ringPrev[c];
      ringNext[c*nranks+i] = allTopoRanks[i]->ringNext[c];
      treeUpRecv[c*nranks+i] = allTopoRanks[i]->treeUpRecv[c];
      treeUpSend[c*nranks+i] = allTopoRanks[i]->treeUpSend[c];
      treeDnRecv[c*nranks+i] = allTopoRanks[i]->treeDnRecv[c];
      treeDnSend[c*nranks+i] = allTopoRanks[i]->treeDnSend[c];
    }
  }
 
  // Connect rings and trees. This should also duplicate the channels.
  NCCLCHECK(connectRings(comm, ringRecv, ringSend, ringPrev, ringNext, firstRanks));
  NCCLCHECK(connectTrees(comm, treeUpRecv, treeUpSend, treeDnRecv, treeDnSend, firstRanks));
 
  // Duplicate ringPrev/ringNext for ncclBuildRing
  memcpy(ringPrev+nChannels*nranks, ringPrev, nChannels*nranks*sizeof(int));
  memcpy(ringNext+nChannels*nranks, ringNext, nChannels*nranks*sizeof(int));
 
  // Duplication should be complete now
  nChannels = comm->nChannels = std::min(MAXCHANNELS,nChannels*2);
 
  // Honor NCCL_MIN_NRINGS/NCCL_MAX_NRINGS.
  // We permit combining max, then min, to only use the first channels, then duplicate them.
  nChannels = comm->nChannels = std::min((int)ncclMaxNchannels(), nChannels);
  int c;
  // If still below the requested minimum, keep duplicating existing channels.
  for (c=nChannels; c<ncclMinNchannels(); c++) {
    memcpy(ringPrev+c*nranks, ringPrev+(c-nChannels)*nranks, nranks*sizeof(int));
    memcpy(ringNext+c*nranks, ringNext+(c-nChannels)*nranks, nranks*sizeof(int));
    memcpy(comm->channels+c, comm->channels+c-nChannels, sizeof(struct ncclChannel));
  }
  nChannels = comm->nChannels = c;
 
  // Create rings array and check all is fine
  NCCLCHECK(ncclBuildRings(nChannels, rings, comm->rank, comm->nRanks, ringPrev, ringNext));
 
  free(ringRecv);
  free(ringSend);
  free(ringPrev);
  free(ringNext);
  free(treeUpRecv);
  free(treeUpSend);
  free(treeDnRecv);
  free(treeDnSend);
 
  return ncclSuccess;
}
connectRings

这里将所有 channel 的 prev,next,send,recv 信息打平到数组中,例如 recv[0]表示第一个 ring 中 rank0的 recv 是哪个 rank,然后开始计算当前机器第一个 rank 的 prev 和最后一个 rank 的 next。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Stitch the per-node rings into one global ring per channel: each node's
// recv rank gets the previous node's send rank as prev, and each node's send
// rank gets the next node's recv rank as next. channel1 mirrors channel0
// because channels were duplicated in ncclTopoPreset.
static ncclResult_t connectRings(struct ncclComm* comm, int* ringRecv, int* ringSend, int* ringPrev, int* ringNext, int* firstRanks) {
  int nChannels = comm->nChannels;
  int nNodes = comm->nNodes;
  for (int c=0; c<nChannels; c++) {
    // Per-channel slices of the flattened arrays (indexed by rank).
    int* recv = ringRecv+c*comm->nRanks;
    int* send = ringSend+c*comm->nRanks;
    int* prev = ringPrev+c*comm->nRanks;
    int* next = ringNext+c*comm->nRanks;
    struct ncclChannel* channel0 = comm->channels+c;
    struct ncclChannel* channel1 = channel0+nChannels;
    for (int n=0; n<nNodes; n++) {
      int recvRank = recv[firstRanks[n]];
      int prevSendRank = send[firstRanks[(n-1+nNodes)%nNodes]];
      prev[recvRank] = prevSendRank;
      if (comm->rank == recvRank) {
        channel0->ring.prev = prevSendRank;
        channel1->ring.prev = prevSendRank;
      }
      int sendRank = send[firstRanks[n]];
      int nextRecvRank = recv[firstRanks[(n+1)%nNodes]];
      next[sendRank] = nextRecvRank;
      if (comm->rank == sendRank) {
        channel0->ring.next = nextRecvRank;
        channel1->ring.next = nextRecvRank;
      }
    }
    TRACE(NCCL_GRAPH, "Ring %d : %d -> %d -> %d", c, channel0->ring.prev, comm->rank, channel0->ring.next);
    TRACE(NCCL_GRAPH, "Ring %d : %d -> %d -> %d", c+nChannels, channel1->ring.prev, comm->rank, channel1->ring.next);
  }
  return ncclSuccess;
}

如上所示,当前机器 recv rank 的 prev 就是前一个机器的 send rank,当前机器 send rank 的 next 就是下一个机器的 recv rank

ncclBuildRings

然后执行 ncclBuildRings 按照大环的顺序依次记录 rank 到 rings。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Walk each ring starting from `rank` following the `next` links and record
// the traversal order into rings[r*nranks..]; then validate that the ring
// loops back to the start and visits every rank exactly once.
ncclResult_t ncclBuildRings(int nrings, int* rings, int rank, int nranks, int* prev, int* next) {
  for (int r=0; r<nrings; r++) {
    char prefix[30];
    /*sprintf(prefix, "[%d] Channel %d Prev : ", rank, r);
    dumpLine(prev+r*nranks, nranks, prefix);
    sprintf(prefix, "[%d] Channel %d Next : ", rank, r);
    dumpLine(next+r*nranks, nranks, prefix);*/

    // Follow the next[] chain for nranks hops, recording each visited rank.
    int current = rank;
    for (int i=0; i<nranks; i++) {
      rings[r*nranks+i] = current;
      current = next[r*nranks+current];
    }
    sprintf(prefix, "Channel %02d/%02d : ", r, nrings);
    if (rank == 0) dumpLine(rings+r*nranks, nranks, prefix);
    // After nranks hops we must be back where we started, or the ring is broken.
    if (current != rank) {
      WARN("Error : ring %d does not loop back to start (%d != %d)", r, current, rank);
      return ncclInternalError;
    }
    // Check that all ranks are there
    for (int i=0; i<nranks; i++) {
      int found = 0;
      for (int j=0; j<nranks; j++) {
        if (rings[r*nranks+j] == i) {
          found = 1;
          break;
        }
      }
      if (found == 0) {
        WARN("Error : ring %d does not contain rank %d", r, i);
        return ncclInternalError;
      }
    }
  }
  return ncclSuccess;
}

还是以上述为例,其中 rank 6 记录的 rings 的第一个大环为:

1
GPU/6 GPU/3 GPU/2 GPU/5 GPU/4 GPU/1 GPU/10 GPU/9 GPU/8 GPU/13 GPU/12 GPU/15 GPU/14 GPU/11 GPU/0 GPU/7

到这里就完成了机器之间大环建立,每个 rank 都知道自己的上一个和下一个 rank 是谁,那么就可以建立实际的通信链路了。

打印日志信息如下:

1
2
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Channel 00/02 :    0   7  14  12  13  10  11   9   8  15   6   4   5   2   3   1
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Channel 01/02 :    0   7  14  12  13  10  11   9   8  15   6   4   5   2   3   1

initTransportsRank 设置亲和性和日志

Log format:

1
IP: hostname:pid:tid [cudaDev] NCCL INFO Trees [channel ID] down0 rank/down1 rank/down2 rank -> current rank->up rank

比如下面的日志可以解读为

1
tensorflow-mnist-worker-0:88416:88814 [3] NCCL INFO Trees [0] 1/-1/-1->3->4|4->3->1/-1/-1 [1] 1/-1/-1->3->4|4->3->1/-1/-1

tensorflow-mnist-worker-0 中上的设备 3,其 rank 为 3,有两颗树,分别为 channel 0 和 channel 1:

  • Channel 0 上的子节点只有 1,父节点为 4
  • Channel 1 上的子节点只有 1,父节点为 4
1
2
3
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO Trees [0] 5/-1/-1->2->0|0->2->5/-1/-1 [1] 5/-1/-1->2->0|0->2->5/-1/-1
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO Setting affinity for GPU 2 to ff,ffff0000,00ffffff

接下来每个 rank 都要为通信分配一些内存,为了提高性能,这里会在分配 buffer 之前设置 cpu 亲和性,使得分配的内存尽量是当前 numa 本地的。

对应代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
  char line[1024];
  line[0]='\0';
  for (int c=0; c<comm->nChannels; c++) {
    struct ncclTree* treeUp = &comm->channels[c].treeUp;
    struct ncclTree* treeDn = &comm->channels[c].treeDn;
    snprintf(line+strlen(line), 1023-strlen(line), " [%d] %d/%d/%d->%d->%d|%d->%d->%d/%d/%d",
        c, treeUp->down[0], treeUp->down[1], treeUp->down[2], rank, treeUp->up,
        treeDn->up, rank, treeDn->down[0], treeDn->down[1], treeDn->down[2]);
  }
  line[1023] = '\0';
  INFO(NCCL_INIT, "Trees%s", line);

  // Set Affinity to a CPU local the our GPU, so that all memory we allocate
  // on the host is local.
  cpu_set_t affinitySave;
  sched_getaffinity(0, sizeof(cpu_set_t), &affinitySave);
  NCCLCHECK(ncclTopoSetAffinity(comm->topo, comm->rank));

完整的一个两机器十六卡的 channel 信息如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
tensorflow-mnist-worker-0:88416:88814 [3] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-0:88416:88814 [3] NCCL INFO Trees [0] 1/-1/-1->3->4|4->3->1/-1/-1 [1] 1/-1/-1->3->4|4->3->1/-1/-1
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO Trees [0] 7/-1/-1->6->-1|-1->6->7/-1/-1 [1] 7/-1/-1->6->15|15->6->7/-1/-1
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO Trees [0] 0/14/-1->7->6|6->7->0/14/-1 [1] 0/-1/-1->7->6|6->7->0/-1/-1
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO Setting affinity for GPU 7 to ffffff00,0000ffff,ff000000
tensorflow-mnist-worker-0:88418:88826 [5] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-0:88418:88826 [5] NCCL INFO Trees [0] 4/-1/-1->5->2|2->5->4/-1/-1 [1] 4/-1/-1->5->2|2->5->4/-1/-1
tensorflow-mnist-worker-0:88418:88826 [5] NCCL INFO Setting affinity for GPU 5 to ffffff00,0000ffff,ff000000
tensorflow-mnist-worker-0:88414:88815 [1] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-0:88414:88815 [1] NCCL INFO Trees [0] -1/-1/-1->1->3|3->1->-1/-1/-1 [1] -1/-1/-1->1->3|3->1->-1/-1/-1
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO Trees [0] 5/-1/-1->2->0|0->2->5/-1/-1 [1] 5/-1/-1->2->0|0->2->5/-1/-1
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO Setting affinity for GPU 2 to ff,ffff0000,00ffffff
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Channel 00/02 :    0   7  14  12  13  10  11   9   8  15   6   4   5   2   3   1
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Channel 01/02 :    0   7  14  12  13  10  11   9   8  15   6   4   5   2   3   1
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Trees [0] 2/-1/-1->0->7|7->0->2/-1/-1 [1] 2/-1/-1->0->7|7->0->2/-1/-1
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Setting affinity for GPU 0 to ff,ffff0000,00ffffff
tensorflow-mnist-worker-0:88417:88832 [4] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-0:88417:88832 [4] NCCL INFO Trees [0] 3/-1/-1->4->5|5->4->3/-1/-1 [1] 3/-1/-1->4->5|5->4->3/-1/-1
tensorflow-mnist-worker-1:99975:100376 [3] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-1:99975:100376 [3] NCCL INFO Trees [0] 9/-1/-1->11->12|12->11->9/-1/-1 [1] 9/-1/-1->11->12|12->11->9/-1/-1
tensorflow-mnist-worker-1:99973:100377 [1] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-1:99973:100377 [1] NCCL INFO Trees [0] -1/-1/-1->9->11|11->9->-1/-1/-1 [1] -1/-1/-1->9->11|11->9->-1/-1/-1
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO Trees [0] 8/-1/-1->15->14|14->15->8/-1/-1 [1] 8/6/-1->15->14|14->15->8/6/-1
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO Setting affinity for GPU 7 to ffffff00,0000ffff,ff000000
tensorflow-mnist-worker-1:99976:100374 [4] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-1:99976:100374 [4] NCCL INFO Trees [0] 11/-1/-1->12->13|13->12->11/-1/-1 [1] 11/-1/-1->12->13|13->12->11/-1/-1
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO Trees [0] 13/-1/-1->10->8|8->10->13/-1/-1 [1] 13/-1/-1->10->8|8->10->13/-1/-1
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO Setting affinity for GPU 2 to ff,ffff0000,00ffffff
tensorflow-mnist-worker-1:99977:100390 [5] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-1:99977:100390 [5] NCCL INFO Trees [0] 12/-1/-1->13->10|10->13->12/-1/-1 [1] 12/-1/-1->13->10|10->13->12/-1/-1
tensorflow-mnist-worker-1:99977:100390 [5] NCCL INFO Setting affinity for GPU 5 to ffffff00,0000ffff,ff000000
tensorflow-mnist-worker-1:99978:100375 [6] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-1:99978:100375 [6] NCCL INFO Trees [0] 15/-1/-1->14->7|7->14->15/-1/-1 [1] 15/-1/-1->14->-1|-1->14->15/-1/-1
tensorflow-mnist-worker-1:99972:100373 [0] NCCL INFO threadThresholds 8/8/64 | 128/8/64 | 8/8/64
tensorflow-mnist-worker-1:99972:100373 [0] NCCL INFO Trees [0] 10/-1/-1->8->15|15->8->10/-1/-1 [1] 10/-1/-1->8->15|15->8->10/-1/-1
tensorflow-mnist-worker-1:99972:100373 [0] NCCL INFO Setting affinity for GPU 0 to ff,ffff0000,00ffffff

ncclTransportP2pSetup 数据通信链路 transport 的建立

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* commId) {
  // ...

  // Connect with prev/next for each ring
  struct ncclConnect *connect;
  NCCLCHECKGOTO(ncclCalloc(&connect, 2), ret, affinity_restore);
  for (int c=0; c<comm->nChannels; c++) {
    struct ncclChannel* channel = comm->channels+c;
    NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, affinity_restore);
    if (comm->nRanks == 1) continue;
    NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &ringGraph, channel, 1, &channel->ring.prev, 1, &channel->ring.next), ret, affinity_restore);
    NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &treeGraph, channel, NCCL_MAX_TREE_ARITY, channel->treeUp.down, 1, &channel->treeUp.up), ret, affinity_restore);
    NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &treeGraph, channel, 1, &channel->treeDn.up, NCCL_MAX_TREE_ARITY, channel->treeDn.down), ret, affinity_restore);
  }
  // ...
}
  1. 接收端执行 recv setup,创建 buffer 等,将相关信息记录到 connectInfo,启动一个监听 socket,ip port 同样记录到 connectInfo,通过 bootstrap 发送 connectInfo 到发送端。
  2. 发送端执行 send setup,创建 buffer 等,将相关信息记录到 connectInfo,然后发送给接收端。这一步 rdma 场景没有用到 connectInfo。
  3. 发送端接收到步骤 1 中接收端的信息,然后建立发送端到接收端的链接,p2p 场景的话只是简单记录对端 buffer,rdma 场景的话需要初始化 qp 到 INIT 状态。
  4. 接收端接收到步骤 2 中 send 发送的信息,然后建立接收端到发送端的链接,p2p 场景还是记录对端 buffer,rdma 场景需要初始化 qp 到 RTS 状态,将本端的 qp 信息发送回对端。
  5. 如果 rdma 场景的话,发送端还需接收对端的 qp 状态初始化本端的 qp 到 RTS 状态。

Channel log 格式

1
IP: hostname:pid:tid [cudaDev] NCCL INFO Channel [channel ID] current rank[bus ID] -> successor rank[bus ID] via `transport type`

比如下面的日志表示 tensorflow-mnist-worker-0 上的设备 6(rank 为 6,busID 为 b1000)在 channel 0 上通过 NET/IB/0/GDRDMA 从 rank 15(busID 为 b2000)接收数据

1
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO Channel 00 : 15[b2000] -> 6[b1000] [receive] via NET/IB/0/GDRDMA

ncclTransport

ncclConnector 的 ncclTransportComm 定义了一系列的通信相关的函数指针,用户可以自己实现这些接口,ncclTransport 定义了 send 和 recv 两个 ncclTransportComm,本节会介绍下 P2P 和 NET 两个 ncclTransport。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Function-pointer table for one direction (send or recv) of a transport.
struct ncclTransportComm {
  ncclResult_t (*setup)(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo*, struct ncclPeerInfo*, struct ncclConnect*, struct ncclConnector*, int channelId);
  ncclResult_t (*connect)(struct ncclConnect*, int nranks, int rank, struct ncclConnector*);
  ncclResult_t (*free)(void*);
  ncclResult_t (*proxy)(struct ncclProxyArgs*);  // CPU-side proxy progress function
};

// A transport: a reachability test plus send- and recv-side comm tables.
struct ncclTransport {
  const char name[4];
  // Sets *ret non-zero if this transport can connect the two peers.
  ncclResult_t (*canConnect)(int*, struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo*, struct ncclPeerInfo*);
  struct ncclTransportComm send;
  struct ncclTransportComm recv;
};

// Network transport (TCP sockets or InfiniBand/RoCE).
struct ncclTransport netTransport = {
  "NET",
  netCanConnect,
  { netSendSetup, netSendConnect, netSendFree, netSendProxy },
  { netRecvSetup, netRecvConnect, netRecvFree, netRecvProxy }
};

// Probed in this order by selectTransport; first usable transport wins.
struct ncclTransport ncclTransports[NTRANSPORTS] = {
  p2pTransport,
  shmTransport,
  netTransport,
};

Nccl 现共有三个 transport:

  • P2P 通过卡间 P2P 通信
  • SHM 通过机器内共享的 host 内存通信
  • NET 通过网络通信

selectTransport

Nccl 会依次通过这三个 transport 的 canConnect 判断是否可用,然后选择第一个可用的,由于 rank 1 不在当前机器,因此只有 NET 的 recv 可用,设置 connector 的 transportComm 为 netTransport 的 recv。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Pick the first transport whose canConnect succeeds for (myInfo, peerInfo)
// and run its setup. type==1 selects the send side, otherwise the recv side.
template <int type>
static ncclResult_t selectTransport(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connect, struct ncclConnector* connector, int channelId) {
  for (int t=0; t<NTRANSPORTS; t++) {
    struct ncclTransport *transport = ncclTransports+t;
    struct ncclTransportComm* transportComm = type == 1 ? &transport->send : &transport->recv;
    int ret = 0;
    NCCLCHECK(transport->canConnect(&ret, topo, graph, myInfo, peerInfo));
    if (ret) {
      connector->transportComm = transportComm;
      NCCLCHECK(transportComm->setup(topo, graph, myInfo, peerInfo, connect, connector, channelId));
      return ncclSuccess;
    }
  }
  WARN("No transport found !");
  return ncclInternalError;
}

接收端 setup

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Receive-side portion of the p2p connection setup: for every recv peer,
// select a transport, run its recv setup, and ship the resulting connect
// info (e.g. listen socket ip/port) to the peer via bootstrap.
// (Excerpt — `// ...` elides the send side and connect phase.)
ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend) {

  struct ncclConnect connect;
  struct ncclConnector* conn;
  for (int i=0; i<nrecv; i++) {
    int peer = peerRecv[i];
    if (peer == -1 || peer >= comm->nRanks) continue;  // -1 means "no peer" (e.g. tree leaf)
    conn = &channel->peers[peer].recv;
    if (conn->connected) { ++nSkippedRecv; continue; }  // already connected on an earlier channel
    memset(&connect, 0, sizeof(connect));
    // type 0 = receive side.
    NCCLCHECK(selectTransport<0>(comm->topo, graph, comm->peerInfo+comm->rank, comm->peerInfo+peer, &connect, conn, channel->id));
    NCCLCHECK(bootstrapSend(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
  }
  // ...
}

根据 selectTransport 可以看到,这里 type = 0,表示是接收端,执行 netRecvSetup

netRecvSetup
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Receive-side setup of the NET transport: pick a NIC, allocate protocol
// buffers (device memory if GDR is available, pinned host memory otherwise),
// and open a listening endpoint whose handle goes into connectInfo for the
// sender to connect to.
ncclResult_t netRecvSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId) {
  struct netRecvResources* resources;
  NCCLCHECK(ncclCalloc(&resources, 1));
  recv->transportResources = resources;

  // Choose the NIC recorded for this channel in the searched graph,
  // then check whether that NIC + this GPU support GPUDirect RDMA.
  NCCLCHECK(ncclTopoGetNetDev(topo, myInfo->rank, graph, channelId, &resources->netDev));
  NCCLCHECK(ncclTopoCheckGdr(topo, myInfo->busId, resources->netDev, 0, &resources->useGdr));

  // Pinned host memory for the head/tail FIFO counters shared with the GPU.
  NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1));
  NCCLCHECK(ncclCudaHostCalloc(&resources->recvMem, 1));

  recv->conn.direct |= resources->useGdr ? NCCL_DIRECT_NIC : 0;
  recv->conn.tail = &resources->recvMem->tail;
  recv->conn.head = &resources->sendMem->head;

  // With GDR the buffers live in device memory, otherwise in pinned host memory.
  int protoLoc[NCCL_NUM_PROTOCOLS];
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    protoLoc[p] = resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM;
  }

  int buffSizes[NCCL_NUM_PROTOCOLS];
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    // Only allocate buffers for simple for p2p connections
    buffSizes[p] = graph == NULL && p != NCCL_PROTO_SIMPLE ? 0 : recv->comm->buffSizes[p];
    resources->buffSizes[protoLoc[p]] += buffSizes[p];
  }

  // One contiguous allocation per location holds all protocols' buffers.
  if (resources->buffSizes[LOC_DEVMEM]) {
    NCCLCHECK(ncclCudaCalloc(resources->buffers+LOC_DEVMEM, resources->buffSizes[LOC_DEVMEM]));
  }
  if (resources->buffSizes[LOC_HOSTMEM]) {
    NCCLCHECK(ncclCudaHostCalloc(resources->buffers+LOC_HOSTMEM, resources->buffSizes[LOC_HOSTMEM]));
  }

  // Carve per-protocol sub-buffers out of the contiguous allocations.
  int offsets[LOC_COUNT];
  offsets[LOC_HOSTMEM] = offsets[LOC_DEVMEM] = 0;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    resources->mhandlesProto[p] = resources->mhandles+protoLoc[p];
    recv->conn.buffs[p] = resources->buffers[protoLoc[p]] + offsets[protoLoc[p]];
    offsets[protoLoc[p]] += buffSizes[p];
  }

  INFO(NCCL_INIT|NCCL_NET,"Channel %02d : %d[%lx] -> %d[%lx] [receive] via NET/%s/%d%s", channelId, peerInfo->rank, peerInfo->busId, myInfo->rank, myInfo->busId, ncclNetName(), resources->netDev,
      resources->useGdr ? "/GDRDMA" : "");
  // Start listening; the listen handle (ip/port for IB sockets) is written
  // into connectInfo and sent to the peer via bootstrap.
  struct netConnectInfo* info = (struct netConnectInfo*) connectInfo;
  NCCLCHECK(ncclNetListen(resources->netDev, &info->netHandle, &resources->netListenComm));

  return ncclSuccess;
}

首先分配 netRecvResources 赋给 ncclConnector,主要字段含义见注释,其中 LOC_CONUT 为 2,表示有两个 buffer,如果支持 gdr,那么会使用第 LOC_DEVMEM(1)个 buffer,即显存,如果不支持 gdr,那么会使用第 LOC_HOSTMEM(0)个 buffer,即锁页内存;sendMem,recvMem 记录了 fifo 的 head 和 tail,用来协调生产者消费者,这个下节具体介绍,本节可忽略;用户执行的通信操作比如 ncclSend 一块数据,nccl 会将这块数据分成多个小块流水线发送,step 表示第几个小块,这个也在下节具体介绍。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Receive-side state of the NET transport, hung off ncclConnector.
struct netRecvResources {
  void* netListenComm; // listen comm used during connection setup; for IB this is ncclIbListenComm (listen fd + NIC index)
  void* netRecvComm;   // established connection context; for IB this is ncclIbRecvComm (pd, cq and other RDMA connection state)
  struct ncclSendMem* sendMem;
  struct ncclRecvMem* recvMem;
  int netDev;  // which NIC is used
  int useGdr;  // whether GPUDirect RDMA is supported
  char* buffers[LOC_COUNT]; // buffer base addresses; the three protocols are stored contiguously
  int buffSizes[LOC_COUNT]; // buffer lengths: sum of the three protocols' sizes
  void* mhandles[LOC_COUNT]; // memory regions (MRs) registered with RDMA
  void** mhandlesProto[NCCL_NUM_PROTOCOLS]; // per-protocol pointers into mhandles
  uint64_t step;
  uint64_t llLastCleaning;
};
ncclTopoGetNetDev

ncclTopoGetNetDev为当前rank的gpu选择网卡,我们在搜索channel的时候将环对应的网卡记录在了graph->inter里,所以这里通过inter就可以找到对应网卡 。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Pick the NIC for this rank/channel. When a searched graph is available, use
// the NIC recorded in graph->inter (entry 0 if this rank is the channel's
// first intra-node rank, entry 1 otherwise); else fall back to the locally
// closest NIC.
ncclResult_t ncclTopoGetNetDev(struct ncclTopoSystem* system, int rank, struct ncclTopoGraph* graph, int channelId, int* dev) {
  if (graph) {
    // Honor the net device in the graph
    int channel = channelId%graph->nChannels;
    int ngpus = system->nodes[GPU].count;
    int index = graph->intra[channel*ngpus] == rank ? 0 : 1;
    *dev = graph->inter[channel*2+index];
  } else {
    int64_t id;
    NCCLCHECK(ncclTopoGetLocalNet(system, rank, &id, channelId));
    *dev = id;
  }
  return ncclSuccess;
}
ncclTopoCheckGdr

ncclTopoCheckGdr检查选择的网卡和当前rank的gpu是否支持gdr,具体逻辑在第五节中介绍过,这里不再赘述。然后为sendMem和recvMem分配锁页内存,设置head和tail;测试机器支持gdr,所以protoLoc均为LOC_DEVMEM,即显存,然后分配三个协议所需的buffer,三个协议的buffer连续存储,通过offset记录各自的起始地址,offset保存到conn。mhandles即rdma用的mr,mhandlesProto指向mhandles。

ncclIbListen

由于基于socket的建链方式需要通过socket交换发送端和接收端的信息,比如qp number,port,mtu,gid或者lid等,所以这里通过ncclIbListen创建了监听socket,过程类似bootstrap,fd写到listenComm,ip port写到handle,即connectInfo。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
ncclResult_t ncclIbListen(int dev, void* opaqueHandle, void** listenComm) {
  struct ncclIbListenComm* comm;
  NCCLCHECK(ncclCalloc(&comm, 1));
  struct ncclIbHandle* handle = (struct ncclIbHandle*) opaqueHandle;
  static_assert(sizeof(struct ncclIbHandle) < NCCL_NET_HANDLE_MAXSIZE, "ncclIbHandle size too large");
  comm->dev = dev;
  NCCLCHECK(GetSocketAddr(&(handle->connectAddr)));
  NCCLCHECK(createListenSocket(&comm->fd, &handle->connectAddr));
  *listenComm = comm;
  return ncclSuccess;
}
 
// State of an IB listen endpoint: which NIC it belongs to and the fd of the
// TCP socket awaiting the peer's connection.
struct ncclIbListenComm {
  int dev;  // index into ncclIbDevs
  int fd;   // listening socket fd
};

到这里 recv 就初始化完成了。

bootstrapSend

然后回到 ncclTransportP2pSetup,通过 bootstrapSend 将 connectInfo 发送到了 peer,即 rank 1,connectInfo 就是上述的 ip port。

1
2
3
4
5
6
7
8
9
// Send a blob of data to a peer rank over the bootstrap (TCP) network.
// Opens a one-shot connection to the peer's bootstrap handle, sends our rank
// first (so the receiver can demultiplex), then the payload, then closes.
ncclResult_t bootstrapSend(void* commState, int peer, void* data, int size) {
  struct extState* state = (struct extState*)commState;
  void* tmpSendComm;
  NCCLCHECK(bootstrapNetConnect(state->dev, state->peerBstrapHandles+peer, &tmpSendComm));
  // Identify ourselves before the payload.
  NCCLCHECK(bootstrapNetSend(tmpSendComm, &state->rank, sizeof(int)));
  NCCLCHECK(bootstrapNetSend(tmpSendComm, data, size));
  NCCLCHECK(bootstrapNetCloseSend(tmpSendComm));
  return ncclSuccess;
}

发送端 setup

当 rank 1执行这个函数的时候,会遍历 nsend,此时 rank 1的 peer 就是 rank 10,然后执行 selectTransport,就会执行 netTransport 的 send 的 setup,即 netSendSetup,这个逻辑和 netRecvSetup 基本一致,主要还是分配各种 buffer,不再赘述。接着看下边逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Excerpt: the send-side setup loop of ncclTransportP2pSetup.
// For each send peer of this channel, pick a transport (selectTransport<1>
// runs the transport's sendSetup, e.g. netSendSetup) and ship the resulting
// connect info to the peer over bootstrap.
ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend) {
  ...

  for (int i=0; i<nsend; i++) {
    int peer = peerSend[i];
    if (peer == -1 || peer >= comm->nRanks) continue;
    conn = &channel->peers[peer].send;
    // Skip peers we are already connected to.
    if (conn->connected) { ++nSkippedSend; continue; }
    memset(&connect, 0, sizeof(connect));
    NCCLCHECK(selectTransport<1>(comm->topo, graph, comm->peerInfo+comm->rank, comm->peerInfo+peer, &connect, conn, channel->id));
    NCCLCHECK(bootstrapSend(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
  }
}
netSendSetup
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Send-side setup for the network transport: pick the NIC, check GDR support,
// allocate the sendMem/recvMem control structures and the per-protocol data
// buffers, and wire everything into send->conn.
ncclResult_t netSendSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId) {
  struct netSendResources* resources;
  NCCLCHECK(ncclCalloc(&resources, 1));
  send->transportResources = resources;

  NCCLCHECK(ncclTopoGetNetDev(topo, myInfo->rank, graph, channelId, &resources->netDev));
  NCCLCHECK(ncclTopoCheckGdr(topo, myInfo->busId, resources->netDev, 1, &resources->useGdr));

  // Control structures live in pinned host memory so both GPU kernel and
  // proxy thread can access them.
  NCCLCHECK(ncclCudaHostCalloc(&resources->sendMem, 1));
  NCCLCHECK(ncclCudaHostCalloc(&resources->recvMem, 1));

  send->conn.direct |= resources->useGdr ? NCCL_DIRECT_NIC : 0;
  send->conn.tail = &resources->recvMem->tail;
  send->conn.fifo = resources->recvMem->sizesFifo;
  send->conn.head = &resources->sendMem->head;
  // -1 marks a fifo slot as empty / not ready.
  for (int i=0; i<NCCL_STEPS; i++) send->conn.fifo[i] = -1;

  // Decide per protocol whether the buffer goes to device or host memory.
  // LL always stays in host memory, even with GDR.
  int protoLoc[NCCL_NUM_PROTOCOLS];
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    protoLoc[p] = p != NCCL_PROTO_LL && resources->useGdr ? LOC_DEVMEM : LOC_HOSTMEM;
  }

  int buffSizes[NCCL_NUM_PROTOCOLS];
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    // Only allocate buffers for simple for p2p connections
    buffSizes[p] = graph == NULL && p != NCCL_PROTO_SIMPLE ? 0 : send->comm->buffSizes[p];
    resources->buffSizes[protoLoc[p]] += buffSizes[p];
  }

  // One contiguous allocation per location; protocols are packed back to back.
  if (resources->buffSizes[LOC_DEVMEM]) {
    NCCLCHECK(ncclCudaCalloc(resources->buffers+LOC_DEVMEM, resources->buffSizes[LOC_DEVMEM]));
  }
  if (resources->buffSizes[LOC_HOSTMEM]) {
    NCCLCHECK(ncclCudaHostCalloc(resources->buffers+LOC_HOSTMEM, resources->buffSizes[LOC_HOSTMEM]));
  }

  // Hand each protocol its offset within the shared allocation and point its
  // mhandle slot at the corresponding location's MR.
  int offsets[LOC_COUNT];
  offsets[LOC_HOSTMEM] = offsets[LOC_DEVMEM] = 0;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    resources->mhandlesProto[p] = resources->mhandles+protoLoc[p];
    send->conn.buffs[p] = resources->buffers[protoLoc[p]] + offsets[protoLoc[p]];
    offsets[protoLoc[p]] += buffSizes[p];
  }

  INFO(NCCL_INIT|NCCL_NET,"Channel %02d : %d[%lx] -> %d[%lx] [send] via NET/%s/%d%s", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId, ncclNetName(), resources->netDev,
      resources->useGdr ? "/GDRDMA" : "");
  return ncclSuccess;
}
bootstrapRecv

然后 rank 1通过 bootstrapRecv 收到了 rank 10发送来的 ip 和 port

netSendConnect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Excerpt: the send-side connect loop of ncclTransportP2pSetup.
// Receive the peer's connect info over bootstrap, run the transport's
// connect (e.g. netSendConnect), then copy the finished connector to the
// device-visible copy used by the kernels.
ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend) {
  ...
  for (int i=0; i<nsend; i++) {
    int peer = peerSend[i];
    if (peer == -1 || peer >= comm->nRanks) continue;
    conn = &channel->peers[peer].send;
    if (conn->connected) {++nSkippedSend; continue; }
    memset(&connect, 0, sizeof(connect));
    NCCLCHECK(bootstrapRecv(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
    NCCLCHECK(conn->transportComm->connect(&connect, 1, comm->rank, conn));
    conn->connected = 1;
    // Publish the connector to the GPU-side peer table.
    CUDACHECK(cudaMemcpy(&channel->devPeers[peer].send, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice));
  }
  ...
}

然后执行 connect,即 netSendConnect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Send-side connect for the network transport: connect to the remote peer's
// listening endpoint (for IB this creates the QP and sends our QP info over
// the socket), then register the buffers allocated in netSendSetup as MRs.
ncclResult_t netSendConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* send) {
  // Setup device pointers
  struct netSendResources* resources = (struct netSendResources*)send->transportResources;
  struct netConnectInfo* info = (struct netConnectInfo*)connectInfo;

  // Connect to remote peer
  NCCLCHECK(ncclNetConnect(resources->netDev, info->netHandle, &resources->netSendComm));

  // Register device and host buffers separately with the right pointer type.
  if (resources->buffSizes[LOC_DEVMEM]) {
    NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->buffers[LOC_DEVMEM], resources->buffSizes[LOC_DEVMEM], NCCL_PTR_CUDA, &resources->mhandles[LOC_DEVMEM]));
  }
  if (resources->buffSizes[LOC_HOSTMEM]) {
    NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->buffers[LOC_HOSTMEM], resources->buffSizes[LOC_HOSTMEM], NCCL_PTR_HOST, &resources->mhandles[LOC_HOSTMEM]));
  }
  return ncclSuccess;
}

这里的 info 就是 rank 10的 ip,port,然后执行 ncclNetConnect,即 ncclIbConnect,这里主要就是创建 qp 并将相关信息通过 socket 发送到接收端。

ncclNetConnect

这里调用了 ncclNetConnect,实际上对于 IB 就是 ncclIbConnect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// IB send-side connect: open a TCP connection to the receiver's listen
// socket, create pd/cq and an RC QP, then send our QP info (qpn, port, mtu,
// fifo rkey/addr, lid or gid) to the receiver over that socket.
ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm) {
  struct ncclIbSendComm* comm;
  // ncclIbMalloc returns page-aligned memory.
  NCCLCHECK(ncclIbMalloc((void**)&comm, sizeof(struct ncclIbSendComm)));

  struct ncclIbHandle* handle = (struct ncclIbHandle*) opaqueHandle;
  NCCLCHECK(connectAddress(&comm->fd, &handle->connectAddr));
  *sendComm = comm;

  // IB Setup
  ibv_context* ctx = ncclIbDevs[dev].context;
  NCCLCHECK(ncclIbInitVerbs(ctx, &comm->verbs));
  uint8_t ib_port = ncclIbDevs[dev].port;
  NCCLCHECK(ncclIbCreateQp(ib_port, &comm->verbs, IBV_ACCESS_REMOTE_WRITE, &comm->qp));

  // Send my QP Info to receiver through the socket. Hope this won't block.
  struct ibv_port_attr portAttr;
  NCCLCHECK(wrap_ibv_query_port(ctx, ib_port, &portAttr));
  struct ncclIbQpInfo qpInfo;
  qpInfo.ib_port = ib_port;
  qpInfo.qpn = comm->qp->qp_num;
  qpInfo.mtu = portAttr.active_mtu;

  // Prepare my fifo
  // The receiver will RDMA-write fifo elements into this buffer, so it needs
  // remote write access; the rkey/addr are shared via qpInfo below.
  NCCLCHECK(wrap_ibv_reg_mr(&comm->fifoMr, comm->verbs.pd, comm->fifo, sizeof(struct ncclIbSendFifo)*MAX_REQUESTS, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ));
  qpInfo.fifoRkey = comm->fifoMr->rkey;
  qpInfo.fifoAddr = (uint64_t)comm->fifo;

  // RoCE support
  // lid != 0 means plain IB (addressed by lid); lid == 0 means RoCE, which
  // must be addressed by GID instead.
  qpInfo.lid = portAttr.lid;
  if (qpInfo.lid) { // IB
    INFO(NCCL_NET,"NET/IB: Dev %d Port %d qpn %d mtu %d LID %d", dev, ib_port, qpInfo.qpn, qpInfo.mtu, qpInfo.lid);
  } else { // RoCE
    union ibv_gid gid;
    NCCLCHECK(wrap_ibv_query_gid(ctx, ib_port, ncclParamIbGidIndex(), &gid));
    qpInfo.spn = gid.global.subnet_prefix;
    qpInfo.iid = gid.global.interface_id;
    INFO(NCCL_NET,"NET/IB: Dev %d Port %d qpn %d mtu %d GID %ld (%lX/%lX)", dev, ib_port, qpInfo.qpn, qpInfo.mtu, ncclParamIbGidIndex(), qpInfo.spn, qpInfo.iid);
  }

  NCCLCHECK(socketSend(comm->fd, &qpInfo, sizeof(qpInfo)));
  return ncclSuccess;
}

看下ncclIbConnect创建qp的过程,先看下下边两个会用到的api

1
2
3
4
5
// Allocate the protection domain and completion queue for one IB endpoint
// and store them in verbs.
ncclResult_t ncclIbInitVerbs(ibv_context* ctx, struct ncclIbVerbs* verbs) {
  NCCLCHECK(wrap_ibv_alloc_pd(&verbs->pd, ctx));
  NCCLCHECK(wrap_ibv_create_cq(&verbs->cq, ctx, MAX_REQUESTS, NULL, NULL, 0));
  return ncclSuccess;
}

ncclIbInitVerbs 创建 pd 和 cq,ncclIbVerbs 保存了 pd 和 cq

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Create an RC QP on the given port and move it from RESET to INIT.
// In INIT the QP can post receive WRs but will not process incoming messages
// until it reaches RTR (see ncclIbRtrQp).
ncclResult_t ncclIbCreateQp(uint8_t ib_port, struct ncclIbVerbs* verbs, int access_flags, struct ibv_qp** qp) {
  struct ibv_qp_init_attr qpInitAttr;
  memset(&qpInitAttr, 0, sizeof(struct ibv_qp_init_attr));
  // Send and recv share one completion queue.
  qpInitAttr.send_cq = verbs->cq;
  qpInitAttr.recv_cq = verbs->cq;
  qpInitAttr.qp_type = IBV_QPT_RC;
  // We might send 2 requests per send (RDMA_WRITE+RDMA_WRITE_WITH_IMM)
  qpInitAttr.cap.max_send_wr = 2*MAX_REQUESTS;
  qpInitAttr.cap.max_recv_wr = MAX_REQUESTS;
  qpInitAttr.cap.max_send_sge = 1;
  qpInitAttr.cap.max_recv_sge = 1;
  qpInitAttr.cap.max_inline_data = 0;
  NCCLCHECK(wrap_ibv_create_qp(qp, verbs->pd, &qpInitAttr));
  // RESET -> INIT: bind the QP to a port and grant the requested remote
  // access rights (typically IBV_ACCESS_REMOTE_WRITE).
  struct ibv_qp_attr qpAttr;
  memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
  qpAttr.qp_state = IBV_QPS_INIT;
  qpAttr.pkey_index = 0;
  qpAttr.port_num = ib_port;
  qpAttr.qp_access_flags = access_flags;
  NCCLCHECK(wrap_ibv_modify_qp(*qp, &qpAttr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS));
  return ncclSuccess;
}

NcclIbCreateQp 用于创建和初始化 qp,设置 send 和 recv 使用的完成队列,设置 qp_type 为 rc,设置 send 和 recv 的最大 wr 个数,以及每个 wr 里最多有多少个 sge,然后创建 qp,此时这个 qp 处于 RST 状态,还无法做任何事情;然后设置 qp_state 为 init,然后设置 port 和 access_flag 为 IBV_ACCESS_REMOTE_WRITE,表示 qp 可以接受远端的写,然后修改 qp 状态,此时 qp 就处于 INIT 状态了,此时 qp 可以下发 recv wr,但是接收到的消息不会被处理。

然后再来看 ncclIbConnect,ncclIbMalloc 分配的是页对齐的内存,包括后边可以看到 nccl 在注册内存的时候都进行了页对齐,但 ibv_reg_mr 并不要求内存为页对齐的。

QP 初始化好之后就准备通过 socket 交换发送端和接收端的信息,获取 port 相关信息,将 port,mtu,qpn 赋值给 qpInfo,然后判断使用的是 ib 还是 roce,roce 里 lid 为 0,只能用 gid 进行通信,而 ib 可以使用 lid 进行通信,最后通过 socket 将 qpInfo 发送到接收端,即 rank 10。

ncclNetRegMr

再回到 netSendConnect,需要将 setup 过程中分配的数据 buffer 进行注册,即 ncclIbRegMr,这里进行了页对齐,mr 写到了 resource 的 mhandle 里。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Excerpt: MR registration part of netSendConnect. Buffers allocated in
// netSendSetup are registered with the NIC (ncclIbRegMr for IB), device and
// host memory separately.
ncclResult_t netSendConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* send) {
  ...
  if (resources->buffSizes[LOC_DEVMEM]) {
    NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->buffers[LOC_DEVMEM], resources->buffSizes[LOC_DEVMEM], NCCL_PTR_CUDA, &resources->mhandles[LOC_DEVMEM]));
  }
  if (resources->buffSizes[LOC_HOSTMEM]) {
    NCCLCHECK(ncclNetRegMr(resources->netSendComm, resources->buffers[LOC_HOSTMEM], resources->buffSizes[LOC_HOSTMEM], NCCL_PTR_HOST, &resources->mhandles[LOC_HOSTMEM]));
  }
  return ncclSuccess;
}
 
ncclResult_t ncclIbRegMr(void* comm, void* data, int size, int type, void** mhandle) {
  struct ncclIbVerbs* verbs = (struct ncclIbVerbs*)comm;
  uint64_t addr = (uint64_t)data;
  assert(size > 0); 
 
  // Deregister / register
  uint64_t regAddr = addr & (~(REG_ALIGN-1));
  uint64_t regSize = addr+size - regAddr;
  regSize = ((regSize + REG_ALIGN-1) / REG_ALIGN ) * REG_ALIGN;
  struct ibv_mr* mr; 
  NCCLCHECK(wrap_ibv_reg_mr(&mr, verbs->pd, (void*)regAddr, regSize, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_
READ));
  *mhandle = (void*)mr;
  TRACE(NCCL_INIT,"regAddr %lx size %ld rkey %x", regAddr, regSize, mr->rkey);
  return ncclSuccess;
}

netRecvConnect

然后再回到 ncclTransportP2pSetup,rank 1 执行了 connect,将 qp 相关信息通过 socket 发送给了 rank 10,这时候 rank 10 接着执行下边的 connect,即 netRecvConnect。另外在 rdma 场景下这里通过 bootstrap 收到的 ncclConnect 没有用到。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Excerpt: the recv-side connect loop of ncclTransportP2pSetup. Mirrors the
// send loop: receive the peer's connect info, run the transport's recv
// connect (e.g. netRecvConnect), then publish the connector to the device.
ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, struct ncclChannel* channel, int nrecv, int* peerRecv, int nsend, int* peerSend) {
  ...
  for (int i=0; i<nrecv; i++) {
    int peer = peerRecv[i];
    if (peer == -1 || peer >= comm->nRanks) continue;
    conn = &channel->peers[peer].recv;
    if (conn->connected) {++nSkippedRecv; continue; }
    memset(&connect, 0, sizeof(connect));
    NCCLCHECK(bootstrapRecv(comm->bootstrap, peer, &connect, sizeof(struct ncclConnect)));
    NCCLCHECK(conn->transportComm->connect(&connect, 1, comm->rank, conn));
    conn->connected = 1;
    CUDACHECK(cudaMemcpy(&channel->devPeers[peer].recv, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice));
  }
  TRACE(NCCL_INIT, "nsend %d nrecv %d nSkippedSend %u nSkippedRecv %u - DONE", nsend, nrecv, nSkippedSend, nSkippedRecv);
  return ncclSuccess;
}

进入到 netRecvConnect

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Recv-side connect for the network transport: accept the sender's incoming
// connection (for IB this receives the sender's QP info and creates our own
// QP), drop the listen endpoint, then register the recv buffers as MRs.
ncclResult_t netRecvConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* recv) {
  // Setup device pointers
  struct netRecvResources* resources = (struct netRecvResources*)recv->transportResources;

  // Finish connection establishment from remote peer
  NCCLCHECK(ncclNetAccept(resources->netListenComm, &resources->netRecvComm));
  // Listening endpoint is no longer needed once the peer connected.
  NCCLCHECK(ncclNetCloseListen(resources->netListenComm));

  if (resources->buffSizes[LOC_DEVMEM]) {
    NCCLCHECK(ncclNetRegMr(resources->netRecvComm, resources->buffers[LOC_DEVMEM], resources->buffSizes[LOC_DEVMEM], NCCL_PTR_CUDA, &resources->mhandles[LOC_DEVMEM]));
  }
  if (resources->buffSizes[LOC_HOSTMEM]) {
    NCCLCHECK(ncclNetRegMr(resources->netRecvComm, resources->buffers[LOC_HOSTMEM], resources->buffSizes[LOC_HOSTMEM], NCCL_PTR_HOST, &resources->mhandles[LOC_HOSTMEM]));
  }
  return ncclSuccess;
}
ncclNetAccept

Rank 10 会执行 ncclIbAccept,通过 socket 收到了 rank 1 的 qp 信息,然后通过 net dev 获取对应网卡的 context 和 port,和上述过程一样通过 ncclIbInitVerbs 创建 pd 和 cq,通过 ncclIbCreateQp 创建 qp,然后根据 rank 1调整 mtu

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Excerpt: first half of the IB accept path. Accept the sender's TCP
// connection, read its QP info, create our own pd/cq/QP, clamp the MTU to
// the smaller of both sides, and drive our QP through INIT -> RTR -> RTS.
ncclResult_t ncclIbAccept(void* listenComm, void** recvComm) {
  struct ncclIbListenComm* lComm = (struct ncclIbListenComm*)listenComm;
  struct ncclIbRecvComm* rComm;
  NCCLCHECK(ncclIbMalloc((void**)&rComm, sizeof(struct ncclIbRecvComm)));

  struct sockaddr_in sockaddr;
  socklen_t socklen = sizeof(struct sockaddr_in);
  SYSCHECKVAL(accept(lComm->fd, (struct sockaddr*)&sockaddr, &socklen), "accept", rComm->fd);
  // Sender's QP info arrives first on the freshly accepted socket.
  struct ncclIbQpInfo remQpInfo;
  NCCLCHECK(socketReceive(rComm->fd, &remQpInfo, sizeof(remQpInfo)));

  // IB setup
  ibv_context* ctx = ncclIbDevs[lComm->dev].context;
  uint8_t ib_port = ncclIbDevs[lComm->dev].port;
  struct ibv_port_attr portAttr;
  NCCLCHECK(wrap_ibv_query_port(ctx, ib_port, &portAttr));
  union ibv_gid gid;
  NCCLCHECK(wrap_ibv_query_gid(ctx, ib_port, ncclParamIbGidIndex(), &gid));

  // QP Creation
  NCCLCHECK(ncclIbInitVerbs(ctx, &rComm->verbs));
  NCCLCHECK(ncclIbCreateQp(ib_port, &rComm->verbs, IBV_ACCESS_REMOTE_WRITE, &rComm->qp));

  // Adjust the MTU
  // Path MTU must not exceed what either side's port supports.
  remQpInfo.mtu = (enum ibv_mtu)std::min(remQpInfo.mtu, portAttr.active_mtu);

  // Setup QP
  struct ibv_qp* qp = rComm->qp;
  NCCLCHECK(ncclIbRtrQp(qp, &remQpInfo));
  NCCLCHECK(ncclIbRtsQp(qp));
  ...
}
ncclIbRtrQp

然后执行 ncclIbRtrQp,将 qp 从 INIT 状态转到 RTR 状态,设置 mtu,对端的 qpn,gid 和 port 等信息,这个时候 qp 可以下发 recv 消息并正常 接收了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Transition a QP from INIT to RTR (Ready To Receive) using the remote
// peer's info: path MTU, destination qpn, and either GRH/GID addressing
// (RoCE, when lid == 0) or plain lid addressing (IB).
ncclResult_t ncclIbRtrQp(ibv_qp* qp, struct ncclIbQpInfo* info) {
  struct ibv_qp_attr qpAttr;
  memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
  qpAttr.qp_state = IBV_QPS_RTR;
  qpAttr.path_mtu = info->mtu;
  qpAttr.dest_qp_num = info->qpn;
  qpAttr.rq_psn = 0;
  qpAttr.max_dest_rd_atomic = 1;
  qpAttr.min_rnr_timer = 12;
  if (info->lid == 0) {
    // RoCE: address the peer by GID via the global routing header.
    qpAttr.ah_attr.is_global = 1;
    qpAttr.ah_attr.grh.dgid.global.subnet_prefix = info->spn;
    qpAttr.ah_attr.grh.dgid.global.interface_id = info->iid;
    qpAttr.ah_attr.grh.flow_label = 0;
    qpAttr.ah_attr.grh.sgid_index = ncclParamIbGidIndex();
    qpAttr.ah_attr.grh.hop_limit = 255;
    qpAttr.ah_attr.grh.traffic_class = ncclParamIbTc();
  } else {
    // IB: address the peer directly by LID.
    qpAttr.ah_attr.is_global = 0;
    qpAttr.ah_attr.dlid = info->lid;
  }
  qpAttr.ah_attr.sl = ncclParamIbSl();
  qpAttr.ah_attr.src_path_bits = 0;
  qpAttr.ah_attr.port_num = info->ib_port;
  NCCLCHECK(wrap_ibv_modify_qp(qp, &qpAttr, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER));
  return ncclSuccess;
}
ncclIbRtsQp

然后执行 ncclIbRtsQp,此时 qp 从状态 RTR 转为状态 RTS,此时 qp 可以下发 send 消息正常发送了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Transition a QP from RTR to RTS (Ready To Send), setting the retry/timeout
// parameters. After this the QP can post and complete send work requests.
ncclResult_t ncclIbRtsQp(ibv_qp* qp) {
  struct ibv_qp_attr qpAttr;
  memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
  qpAttr.qp_state = IBV_QPS_RTS;
  qpAttr.timeout = ncclParamIbTimeout();
  qpAttr.retry_cnt = ncclParamIbRetryCnt();
  // 7 means "retry indefinitely" on receiver-not-ready NAKs.
  qpAttr.rnr_retry = 7;
  qpAttr.sq_psn = 0;
  qpAttr.max_rd_atomic = 1;
  NCCLCHECK(wrap_ibv_modify_qp(qp, &qpAttr, IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC));
  return ncclSuccess;
}
Transfer fifo info

然后继续看 ncclIbAccept,这里 fifo 也是用来控制发送过程的,后边介绍数据通信再写。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Excerpt: second half of the IB accept path. Record the sender's fifo
// rkey/addr for our RDMA writes, optionally set up the GDR flush QP (a
// loopback QP used to RDMA-read one int after each GDR receive), then send
// our own QP info back to the sender over the socket.
ncclResult_t ncclIbAccept(void* listenComm, void** recvComm) {
  ...
  // Retain remote fifo info and prepare my RDMA ops
  rComm->remFifo.rkey = remQpInfo.fifoRkey;
  rComm->remFifo.addr = remQpInfo.fifoAddr;
  NCCLCHECK(wrap_ibv_reg_mr(&rComm->remFifo.mr, rComm->verbs.pd, &rComm->remFifo.elems, sizeof(struct ncclIbSendFifo)*MAX_REQUESTS, IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_READ));
  rComm->remFifo.sge.length = sizeof(struct ncclIbSendFifo);
  rComm->remFifo.sge.lkey = rComm->remFifo.mr->lkey;

#if USE_RDMA_SEND_INLINE
  // Determine whether the remFifo element data can be sent INLINE
  struct ibv_qp_attr attr;
  struct ibv_qp_init_attr init_attr;
  NCCLCHECK(wrap_ibv_query_qp(qp, &attr, IBV_QP_CAP, &init_attr));
  if (init_attr.cap.max_inline_data >= rComm->remFifo.sge.length) rComm->remFifo.flags = IBV_SEND_INLINE;
#endif

  // Allocate Flush dummy buffer for GPU Direct RDMA
  rComm->gpuFlush.enabled = (ncclIbGdrSupport(lComm->dev) == 0) && (ncclParamIbGdrFlushDisable() == 0) ? 1 : 0;
  if (rComm->gpuFlush.enabled) {
    NCCLCHECK(wrap_ibv_reg_mr(&rComm->gpuFlush.hostMr, rComm->verbs.pd, &rComm->gpuFlush.hostMem, sizeof(int), IBV_ACCESS_LOCAL_WRITE));
    rComm->gpuFlush.sge.addr = (uint64_t)&rComm->gpuFlush.hostMem;
    rComm->gpuFlush.sge.length = 1;
    rComm->gpuFlush.sge.lkey = rComm->gpuFlush.hostMr->lkey;
    NCCLCHECK(ncclIbCreateQp(ib_port, &rComm->verbs, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ, &rComm->gpuFlush.qp));
    // The flush QP is connected to itself: its "remote" info is our own
    // local port/qpn/gid.
    struct ncclIbQpInfo localQpInfo = {
      .lid=portAttr.lid,
      .ib_port=ib_port,
      .qpn=rComm->gpuFlush.qp->qp_num,
      .spn=gid.global.subnet_prefix,
      .iid=gid.global.interface_id,
      .mtu=portAttr.active_mtu
    };
    NCCLCHECK(ncclIbRtrQp(rComm->gpuFlush.qp, &localQpInfo));
    NCCLCHECK(ncclIbRtsQp(rComm->gpuFlush.qp));
  }

  // Fill Handle
  struct ncclIbQpInfo qpInfo = {
    .lid=portAttr.lid,
    .ib_port=ib_port,
    .qpn=qp->qp_num,
    .spn=gid.global.subnet_prefix,
    .iid=gid.global.interface_id,
    .mtu=remQpInfo.mtu
  };

  NCCLCHECK(socketSend(rComm->fd, &qpInfo, sizeof(qpInfo)));
  *recvComm = rComm;
  return ncclSuccess;
}

GpuFlush 也对应一个 qp,不过这个 qp 是 local 的,即他的对端 qp 就是自己,当开启 gdr 之后,每次接收数据后都需要执行一下 flush,其实是一个 rdma read 操作,使用网卡读一下接收到的数据的第一个 int 到 hostMem。官方 issue 里解释说当通过 gdr 接收数据完成,产生 wc 到 cpu 的时候,接收的数据并不一定在 gpu 端可以读到,这个时候需要在 cpu 端执行一下读取。

1
2
3
4
5
6
7
// State for the GDR flush: after a GPU Direct RDMA receive completes, a
// one-int RDMA read through this loopback QP forces the received data to be
// visible before the GPU consumes it.
struct ncclIbGpuFlush {
  int enabled;            // 1 when GDR flush is active
  int hostMem;            // destination of the dummy read
  struct ibv_mr* hostMr;  // MR covering hostMem
  struct ibv_sge sge;     // scatter/gather entry pointing at hostMem
  struct ibv_qp* qp;      // loopback QP (connected to itself)
};

最后将 rank 10 的 port,qpn,gid 等通过 socket 发送回 rank 1,到这里 ncclTransportP2pSetup 就执行完成了,但是此时 rdma 还没有完成建立连接,因为 rank 1 还没有拿到 rank 10 的信息,qp 还处于 INIT 状态。Rank 1 直到开始发送数据的时候才会去检查是否完成最后一步建链,如果还没有建链那么执行 ncclSendCheck,过程和上述一致,不再赘述。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// Lazily finish the sender side of the IB connection on first send: read the
// receiver's QP info from the socket (non-blocking on the first byte) and
// move our QP through RTR and RTS, then ack readiness back to the receiver.
ncclResult_t ncclSendCheck(struct ncclIbSendComm* comm) {
  struct ncclIbQpInfo remQpInfo;
  struct ibv_qp* qp = comm->qp;

  // Do not block on this receive, return if not ready.
  int bytes = 0;
  NCCLCHECK(socketProgress(NCCL_SOCKET_RECV, comm->fd, &remQpInfo, sizeof(remQpInfo), &bytes));
  if (bytes == 0) return ncclSuccess; // Try again later
  // Once the first bytes arrived, block until the full struct is read.
  NCCLCHECK(socketWait(NCCL_SOCKET_RECV, comm->fd, &remQpInfo, sizeof(remQpInfo), &bytes));

  NCCLCHECK(ncclIbRtrQp(qp, &remQpInfo));
  NCCLCHECK(ncclIbRtsQp(qp));
  comm->ready = 1;

  // Block until this is done. It *should* not block indefinitely.
  NCCLCHECK(socketSend(comm->fd, &comm->ready, sizeof(int)));

  return ncclSuccess;
}

到这里 rank 1 和 rank 10 的 rdma 链接就建立完成了,然后我们再看下 rank 10 和 rank 9的 p2p 链接。

ncclNetRegMr

跟上面一样

节点内 P2P 建连

上面主要讲了跨节点通过 IB 实现建连,这部分主要讲解本节点内建立连接,原理基本一致,只是从 IB 换成了 P2P。我们以 rank 10和 rank 9的 p2p 连接为例。

p2p 场景 rank 之间交换的 connectInfo 如下所示

1
2
3
4
5
6
7
8
// Connect info exchanged between ranks for the P2P transport.
struct p2pConnectInfo {
  int direct;   // 1 when both ranks are in the same process
  int read;     // 1 when p2p read is supported
  union {
    void* directPtr;   // same process: raw pointer to this rank's data buffer
    cudaIpcMemHandle_t devIpc;  // different processes: CUDA IPC handle for shared device memory
  };
};

仍然按照刚刚的顺序,rank 9先执行 recv 的 setup,首先分配 resource,数据通信 buffer 会保存在 ncclRecvMem 的 buff 字段。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Receiver-side resources for the P2P transport.
struct p2pRecvResources {
  struct ncclRecvMem* devMem;  // device allocation holding control vars + buffers
  void* ipcPtr;                // mapped IPC pointer when peers are in different processes
};
 
// Receiver-side control block. The union pads the header to MEM_ALIGN;
// buff is a variable-length data area that actually extends past the struct.
struct ncclRecvMem {
  union {
    struct {
      uint64_t tail;  // consumer/producer synchronization counter
      char pad1[CACHE_LINE_SIZE-sizeof(uint64_t)];
      char pad2[CACHE_LINE_SIZE-sizeof(uint64_t)];
      int sizesFifo[NCCL_STEPS];  // per-step transfer sizes
    };
    char pad4[MEM_ALIGN];
  };
  char buff[1]; // Actually larger than that
};

然后判断 useRead,如果两个 rank 之间的路径类型小于 p2pLevel(默认是 PATH_SYS),那么 useP2P 为 1,如果路径类型为 PATH_NVL 并且为安培架构,那么 useRead 为 1,ncclRecvMem 使用柔性数组存储 buffer,还是只关注 NCCL_PROTO_SIMPLE,如果 read 为 1 那么不需要分配 buffer,由于当前场景为单进程,所以记录 direct 为 1,devMem 记录到 direcPtr,然后通过 cudaDeviceEnablePeerAccess 开启卡间 p2p 访问。

p2pRecvSetup
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Recv-side setup for the P2P transport: allocate the device-side recv
// memory (control block plus protocol buffers; the SIMPLE buffer is omitted
// when p2p read is used, since it then lives on the sender) and fill the
// p2pConnectInfo to be sent to the peer.
ncclResult_t p2pRecvSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo,
    struct ncclConnect* connectInfo, struct ncclConnector * recv, int channelId) {

  struct p2pRecvResources* resources;
  NCCLCHECK(ncclCalloc(&resources, 1));
  recv->transportResources = resources;
  int useRead = p2pUseRead(topo, myInfo, peerInfo);
  int recvSize = offsetof(struct ncclRecvMem, buff);
  // For P2P Read the SIMPLE buffer is tagged on the end of the ncclSendMem structure
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) if (!(useRead && p == NCCL_PROTO_SIMPLE)) recvSize += recv->comm->buffSizes[p];
  ALIGN_SIZE(recvSize, CUDA_IPC_MIN);
  NCCLCHECK(ncclCudaCalloc((char**)&resources->devMem, recvSize));

  struct p2pConnectInfo info;
  info.read = useRead;
  if (myInfo->pidHash == peerInfo->pidHash) {
    // Same process: the peer can use our device pointer directly.
    info.direct = 1;
    info.directPtr = resources->devMem;
    if (myInfo->cudaDev == peerInfo->cudaDev) {
      TRACE(NCCL_INIT|NCCL_P2P,"%d <- %d via P2P/common device", myInfo->rank, peerInfo->rank);
    } else {
      // Enable P2P access
      cudaError_t err = cudaDeviceEnablePeerAccess(peerInfo->cudaDev, 0);
      if (err == cudaErrorPeerAccessAlreadyEnabled) {
        // Clear the sticky error; already-enabled access is fine.
        cudaGetLastError();
      } else if (err != cudaSuccess) {
        WARN("failed to peer with device %d(=%lx): %d %s",
             peerInfo->cudaDev, peerInfo->busId, err, cudaGetErrorString(err));
        return ncclInternalError;
      }
      TRACE(NCCL_INIT|NCCL_P2P,"Channel %02d : %d[%lx] <- %d[%lx] via P2P/direct pointer", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId);
    }
  } else {
    ...
  }
  static_assert(sizeof(struct p2pConnectInfo) <= sizeof(struct ncclConnect), "p2p Connect Info is too big");
  memcpy(connectInfo, &info, sizeof(struct p2pConnectInfo));
  return ncclSuccess;
}
p2pSendSetup

 接下来 rank 10会执行 send 的 setup,大体逻辑一致,从这里我们可以看出 useRead 的作用,如果 useRead 为1,那么 buffer 放在 send rank,如果为0,则放在 recv rank。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Send-side setup for the P2P transport. Mirrors p2pRecvSetup, except the
// SIMPLE buffer is appended to the sender's ncclSendMem when p2p read is
// enabled (buffer lives on the send rank), and sending to the same CUDA
// device is an internal error.
ncclResult_t p2pSendSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo, struct ncclPeerInfo* peerInfo,
    struct ncclConnect* connectInfo, struct ncclConnector* send, int channelId) {

  struct p2pSendResources* resources;
  NCCLCHECK(ncclCalloc(&resources, 1));
  send->transportResources = resources;
  int useRead = p2pUseRead(topo, myInfo, peerInfo);
  int sendSize = sizeof(struct ncclSendMem);
  // For P2P Read the SIMPLE buffer is tagged on the end of the ncclSendMem structure
  if (useRead) sendSize += send->comm->buffSizes[NCCL_PROTO_SIMPLE];
  ALIGN_SIZE(sendSize, CUDA_IPC_MIN);
  NCCLCHECK(ncclCudaCalloc((char**)&resources->devMem, sendSize));

  struct p2pConnectInfo info;
  info.read = useRead;
  const char* useReadStr = info.read ? "/read" : "";
  if (myInfo->pidHash == peerInfo->pidHash) {
    info.direct = 1;
    info.directPtr = resources->devMem;
    if (myInfo->cudaDev == peerInfo->cudaDev) {
      // Sending to our own device is not a valid p2p connection.
      INFO(NCCL_INIT|NCCL_P2P,"Channel %02d : %d[%d] -> %d[%d] via P2P/common device%s",
          channelId, myInfo->rank, myInfo->cudaDev, peerInfo->rank, peerInfo->cudaDev, useReadStr);
      return ncclInternalError;
    } else {
      // Enable P2P access
      cudaError_t err = cudaDeviceEnablePeerAccess(peerInfo->cudaDev, 0);
      if (err == cudaErrorPeerAccessAlreadyEnabled) {
        cudaGetLastError();
      } else if (err != cudaSuccess) {
        WARN("failed to peer with device %d(=%lx): %d %s",
             peerInfo->cudaDev, peerInfo->busId, err, cudaGetErrorString(err));
        return ncclInternalError;
      }
      INFO(NCCL_INIT|NCCL_P2P,"Channel %02d : %d[%lx] -> %d[%lx] via P2P/direct pointer%s",
          channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId, useReadStr);
    }
  } else {
    ...
  }
  static_assert(sizeof(struct p2pConnectInfo) <= sizeof(struct ncclConnect), "p2p Connect Info is too big");
  memcpy(connectInfo, &info, sizeof(struct p2pConnectInfo));
  return ncclSuccess;
}
p2pSendConnect

然后 rank 10 执行 send connect 过程,Info 为 rank 9 的信息,remDevMem 就是刚刚 rank 9 分配的显存,如果 read 为 0,则需要设置 conn 的 direct,接下来设置 conn 的 buff,如果 read 为 1,buff 为当前卡,否则设置为 rank 9 的显存,接下来设置的 head,tail 用来协调发送端和接收端,下节详细介绍。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Send-side connect for the P2P transport: resolve the peer's recv memory
// (direct pointer in the same-process case) and wire up the per-protocol
// buffer pointers. With p2p read, the SIMPLE buffer is local to the sender;
// otherwise all buffers point into the peer's recv memory.
static ncclResult_t p2pSendConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* send) {
  struct p2pSendResources* resources = (struct p2pSendResources*)send->transportResources;
  struct ncclRecvMem* remDevMem;
  struct p2pConnectInfo* info = (struct p2pConnectInfo*)connectInfo;
  if (info->direct) {
    remDevMem = (struct ncclRecvMem*)(info->directPtr);
    if (info->read == 0) send->conn.direct |= NCCL_DIRECT_GPU;
  } else {
    ...
  }

  int offset = 0;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    if (info->read && p == NCCL_PROTO_SIMPLE) {
      /* For P2P Read the SIMPLE buffer is local (ncclSendMem) */
      send->conn.buffs[p] = resources->devMem->buff;
    } else {
      send->conn.buffs[p] = remDevMem->buff + offset;
      offset += send->comm->buffSizes[p];
    }
  }
  // head/tail counters coordinate producer (sender) and consumer (receiver).
  send->conn.tail = &remDevMem->tail;
  send->conn.head = &resources->devMem->head;
  send->conn.ptrExchange = &resources->devMem->ptrExchange;
  return ncclSuccess;
}
p2pRecvConnect

对于 recv connect 逻辑基本一致

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Recv-side connect for the P2P transport: resolve the peer's send memory,
// either directly (same process) or by opening its CUDA IPC handle, then
// wire up the per-protocol buffer pointers. With p2p read the SIMPLE buffer
// is the remote (sender-side) one.
ncclResult_t p2pRecvConnect(struct ncclConnect* connectInfo, int nranks, int rank, struct ncclConnector* recv) {
  struct p2pRecvResources* resources = (struct p2pRecvResources*)recv->transportResources;
  struct ncclSendMem* remDevMem;
  struct p2pConnectInfo* info = (struct p2pConnectInfo*)connectInfo;
  if (info->direct) {
    remDevMem = (struct ncclSendMem*)(info->directPtr);
    if (info->read == 0) {
      recv->conn.direct |= NCCL_DIRECT_GPU;
      recv->conn.ptrExchange = &remDevMem->ptrExchange;
    }
  } else {
    //TRACE_DUMP_IPC(&info->devIpc);
    cudaError_t err = cudaIpcOpenMemHandle(&resources->ipcPtr, info->devIpc, cudaIpcMemLazyEnablePeerAccess);
    // NOTE(review): ipcPtr is copied to remDevMem before err is checked; on
    // failure we return before remDevMem is ever dereferenced, so this is
    // harmless, but the check-before-use ordering would be clearer.
    remDevMem = (struct ncclSendMem*)resources->ipcPtr;
    if (err != cudaSuccess) {
      WARN("failed to open CUDA IPC handle : %d %s",
          err, cudaGetErrorString(err));
      return ncclUnhandledCudaError;
    }
  }

  int offset = 0;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    if (info->read && p == NCCL_PROTO_SIMPLE) {
      /* For P2P Read the SIMPLE buffer is remote (ncclSendMem) */
      recv->conn.buffs[p] = remDevMem->buff;
    } else {
      recv->conn.buffs[p] = resources->devMem->buff + offset;
      offset += recv->comm->buffSizes[p];
    }
  }
  recv->conn.tail = &resources->devMem->tail;
  recv->conn.head = &remDevMem->head;
  return ncclSuccess;
}

建连日志

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO Channel 00 : 15[b2000] -> 6[b1000] [receive] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-1:99978:100375 [6] NCCL INFO Channel 00 : 7[b2000] -> 14[b1000] [receive] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO Channel 00 : 6[b1000] -> 4[88000] via P2P/IPC
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO Channel 00 : 7[b2000] -> 14[b1000] [send] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-0:88417:88832 [4] NCCL INFO Channel 00 : 4[88000] -> 5[89000] via P2P/IPC
tensorflow-mnist-worker-0:88416:88814 [3] NCCL INFO Channel 00 : 3[3e000] -> 1[1b000] via P2P/IPC
tensorflow-mnist-worker-0:88414:88815 [1] NCCL INFO Channel 00 : 1[1b000] -> 0[1a000] via P2P/IPC
tensorflow-mnist-worker-0:88418:88826 [5] NCCL INFO Channel 00 : 5[89000] -> 2[3d000] via P2P/IPC
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO Channel 00 : 2[3d000] -> 3[3e000] via P2P/IPC
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Channel 00 : 0[1a000] -> 7[b2000] via P2P/IPC
tensorflow-mnist-worker-1:99972:100373 [0] NCCL INFO Channel 00 : 8[1a000] -> 15[b2000] via P2P/IPC
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO Channel 00 : 15[b2000] -> 6[b1000] [send] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-1:99975:100376 [3] NCCL INFO Channel 00 : 11[3e000] -> 9[1b000] via P2P/IPC
tensorflow-mnist-worker-1:99973:100377 [1] NCCL INFO Channel 00 : 9[1b000] -> 8[1a000] via P2P/IPC
tensorflow-mnist-worker-1:99976:100374 [4] NCCL INFO Channel 00 : 12[88000] -> 13[89000] via P2P/IPC
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO Channel 00 : 10[3d000] -> 11[3e000] via P2P/IPC
tensorflow-mnist-worker-1:99977:100390 [5] NCCL INFO Channel 00 : 13[89000] -> 10[3d000] via P2P/IPC
tensorflow-mnist-worker-1:99978:100375 [6] NCCL INFO Channel 00 : 14[b1000] -> 12[88000] via P2P/IPC
tensorflow-mnist-worker-0:88414:88815 [1] NCCL INFO Channel 00 : 1[1b000] -> 3[3e000] via P2P/IPC
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO Channel 00 : 2[3d000] -> 0[1a000] via P2P/IPC
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO Channel 00 : 10[3d000] -> 8[1a000] via P2P/IPC
tensorflow-mnist-worker-1:99973:100377 [1] NCCL INFO Channel 00 : 9[1b000] -> 11[3e000] via P2P/IPC
tensorflow-mnist-worker-0:88416:88814 [3] NCCL INFO Channel 00 : 3[3e000] -> 4[88000] via P2P/IPC
tensorflow-mnist-worker-0:88418:88826 [5] NCCL INFO Channel 00 : 5[89000] -> 4[88000] via P2P/IPC
tensorflow-mnist-worker-1:99975:100376 [3] NCCL INFO Channel 00 : 11[3e000] -> 12[88000] via P2P/IPC
tensorflow-mnist-worker-1:99977:100390 [5] NCCL INFO Channel 00 : 13[89000] -> 12[88000] via P2P/IPC
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO Channel 00 : 14[b1000] -> 7[b2000] [receive] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-0:88414:88815 [1] NCCL INFO Channel 01 : 1[1b000] -> 0[1a000] via P2P/IPC
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO Channel 00 : 2[3d000] -> 5[89000] via P2P/IPC
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Channel 00 : 0[1a000] -> 2[3d000] via P2P/IPC
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO Channel 00 : 7[b2000] -> 6[b1000] via P2P/IPC
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO Channel 00 : 15[b2000] -> 14[b1000] via P2P/IPC
tensorflow-mnist-worker-0:88417:88832 [4] NCCL INFO Channel 00 : 4[88000] -> 3[3e000] via P2P/IPC
tensorflow-mnist-worker-1:99972:100373 [0] NCCL INFO Channel 00 : 8[1a000] -> 10[3d000] via P2P/IPC
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO Channel 00 : 10[3d000] -> 13[89000] via P2P/IPC
tensorflow-mnist-worker-1:99973:100377 [1] NCCL INFO Channel 01 : 9[1b000] -> 8[1a000] via P2P/IPC
tensorflow-mnist-worker-1:99978:100375 [6] NCCL INFO Channel 00 : 14[b1000] -> 7[b2000] [send] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-1:99976:100374 [4] NCCL INFO Channel 00 : 12[88000] -> 11[3e000] via P2P/IPC
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO Channel 00 : 6[b1000] -> 7[b2000] via P2P/IPC
tensorflow-mnist-worker-0:88418:88826 [5] NCCL INFO Channel 01 : 5[89000] -> 2[3d000] via P2P/IPC
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO Channel 01 : 2[3d000] -> 3[3e000] via P2P/IPC
tensorflow-mnist-worker-0:88416:88814 [3] NCCL INFO Channel 01 : 3[3e000] -> 1[1b000] via P2P/IPC
tensorflow-mnist-worker-0:88417:88832 [4] NCCL INFO Channel 01 : 4[88000] -> 5[89000] via P2P/IPC
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO Channel 00 : 15[b2000] -> 8[1a000] via P2P/IPC
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO Channel 01 : 2[3d000] -> 0[1a000] via P2P/IPC
tensorflow-mnist-worker-0:88416:88814 [3] NCCL INFO Channel 01 : 3[3e000] -> 4[88000] via P2P/IPC
tensorflow-mnist-worker-0:88418:88826 [5] NCCL INFO Channel 01 : 5[89000] -> 4[88000] via P2P/IPC
tensorflow-mnist-worker-1:99977:100390 [5] NCCL INFO Channel 01 : 13[89000] -> 10[3d000] via P2P/IPC
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO Channel 01 : 10[3d000] -> 11[3e000] via P2P/IPC
tensorflow-mnist-worker-1:99975:100376 [3] NCCL INFO Channel 01 : 11[3e000] -> 9[1b000] via P2P/IPC
tensorflow-mnist-worker-1:99976:100374 [4] NCCL INFO Channel 01 : 12[88000] -> 13[89000] via P2P/IPC
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO Channel 00 : 7[b2000] -> 0[1a000] via P2P/IPC
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO Channel 01 : 15[b2000] -> 6[b1000] [receive] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-1:99978:100375 [6] NCCL INFO Channel 00 : 14[b1000] -> 15[b2000] via P2P/IPC
tensorflow-mnist-worker-1:99972:100373 [0] NCCL INFO Channel 01 : 8[1a000] -> 15[b2000] via P2P/IPC
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO Channel 01 : 10[3d000] -> 8[1a000] via P2P/IPC
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO Channel 01 : 6[b1000] -> 4[88000] via P2P/IPC
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Channel 01 : 0[1a000] -> 7[b2000] via P2P/IPC
tensorflow-mnist-worker-1:99973:100377 [1] NCCL INFO Channel 01 : 9[1b000] -> 11[3e000] via P2P/IPC
tensorflow-mnist-worker-1:99978:100375 [6] NCCL INFO Channel 01 : 7[b2000] -> 14[b1000] [receive] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-1:99975:100376 [3] NCCL INFO Channel 01 : 11[3e000] -> 12[88000] via P2P/IPC
tensorflow-mnist-worker-1:99977:100390 [5] NCCL INFO Channel 01 : 13[89000] -> 12[88000] via P2P/IPC
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO Channel 01 : 7[b2000] -> 14[b1000] [send] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-0:88414:88815 [1] NCCL INFO Channel 01 : 1[1b000] -> 3[3e000] via P2P/IPC
tensorflow-mnist-worker-1:99978:100375 [6] NCCL INFO Channel 01 : 14[b1000] -> 12[88000] via P2P/IPC
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO Channel 01 : 15[b2000] -> 6[b1000] [send] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-1:99973:100377 [1] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-1:99973:100377 [1] NCCL INFO comm 0x7fd270fdb9d0 rank 9 nranks 16 cudaDev 1 busId 1b000 - Init COMPLETE
tensorflow-mnist-worker-0:88414:88815 [1] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-0:88414:88815 [1] NCCL INFO comm 0x7f9f08ff9400 rank 1 nranks 16 cudaDev 1 busId 1b000 - Init COMPLETE
tensorflow-mnist-worker-0:88417:88832 [4] NCCL INFO Channel 01 : 4[88000] -> 3[3e000] via P2P/IPC
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Channel 01 : 0[1a000] -> 2[3d000] via P2P/IPC
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO Channel 01 : 2[3d000] -> 5[89000] via P2P/IPC
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO Channel 01 : 7[b2000] -> 6[b1000] via P2P/IPC
tensorflow-mnist-worker-1:99976:100374 [4] NCCL INFO Channel 01 : 12[88000] -> 11[3e000] via P2P/IPC
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO Channel 01 : 6[b1000] -> 15[b2000] [send] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO Channel 01 : 10[3d000] -> 13[89000] via P2P/IPC
tensorflow-mnist-worker-1:99972:100373 [0] NCCL INFO Channel 01 : 8[1a000] -> 10[3d000] via P2P/IPC
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO Channel 01 : 7[b2000] -> 0[1a000] via P2P/IPC
tensorflow-mnist-worker-0:88416:88814 [3] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-0:88416:88814 [3] NCCL INFO comm 0x7f1eacf4fb40 rank 3 nranks 16 cudaDev 3 busId 3e000 - Init COMPLETE
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO Channel 01 : 6[b1000] -> 15[b2000] [receive] via NET/IB/0/GDRDMA
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO Channel 01 : 15[b2000] -> 14[b1000] via P2P/IPC
tensorflow-mnist-worker-0:88417:88832 [4] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-0:88417:88832 [4] NCCL INFO comm 0x7fd6d4f37790 rank 4 nranks 16 cudaDev 4 busId 88000 - Init COMPLETE
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-0:88415:88813 [2] NCCL INFO comm 0x7f64e0fa34d0 rank 2 nranks 16 cudaDev 2 busId 3d000 - Init COMPLETE
tensorflow-mnist-worker-0:88418:88826 [5] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-0:88418:88826 [5] NCCL INFO comm 0x7f9478f65bb0 rank 5 nranks 16 cudaDev 5 busId 89000 - Init COMPLETE
tensorflow-mnist-worker-1:99978:100375 [6] NCCL INFO Channel 01 : 14[b1000] -> 15[b2000] via P2P/IPC
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO comm 0x7fc185152310 rank 0 nranks 16 cudaDev 0 busId 1a000 - Init COMPLETE
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO Channel 01 : 6[b1000] -> 7[b2000] via P2P/IPC
tensorflow-mnist-worker-1:99976:100374 [4] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-1:99976:100374 [4] NCCL INFO comm 0x7f9118f7b680 rank 12 nranks 16 cudaDev 4 busId 88000 - Init COMPLETE
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-1:99974:100372 [2] NCCL INFO comm 0x7f3eecf79b80 rank 10 nranks 16 cudaDev 2 busId 3d000 - Init COMPLETE
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-0:88419:88831 [6] NCCL INFO comm 0x7f3248f70b00 rank 6 nranks 16 cudaDev 6 busId b1000 - Init COMPLETE
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-0:88420:88825 [7] NCCL INFO comm 0x7f921cf686c0 rank 7 nranks 16 cudaDev 7 busId b2000 - Init COMPLETE
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO Channel 01 : 15[b2000] -> 8[1a000] via P2P/IPC
tensorflow-mnist-worker-1:99977:100390 [5] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-1:99977:100390 [5] NCCL INFO comm 0x7f64b4f5a550 rank 13 nranks 16 cudaDev 5 busId 89000 - Init COMPLETE
tensorflow-mnist-worker-1:99975:100376 [3] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-1:99975:100376 [3] NCCL INFO comm 0x7f5330f7a820 rank 11 nranks 16 cudaDev 3 busId 3e000 - Init COMPLETE
tensorflow-mnist-worker-0:88413:88816 [0] NCCL INFO Launch mode Parallel
tensorflow-mnist-worker-1:99978:100375 [6] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-1:99978:100375 [6] NCCL INFO comm 0x7fc270f72f20 rank 14 nranks 16 cudaDev 6 busId b1000 - Init COMPLETE
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-1:99979:100391 [7] NCCL INFO comm 0x7f44c0fba8e0 rank 15 nranks 16 cudaDev 7 busId b2000 - Init COMPLETE
tensorflow-mnist-worker-1:99972:100373 [0] NCCL INFO 2 coll channels, 2 p2p channels, 1 p2p channels per peer
tensorflow-mnist-worker-1:99972:100373 [0] NCCL INFO comm 0x7f6a7d024fe0 rank 8 nranks 16 cudaDev 0 busId 1a000 - Init COMPLETE

ncclAllReduce

在完成设备的 Communicator 初始化后,就可以调用集合通信的相关原语。在这里我们以 Allreduce 为例,分析集合通信原语的实现逻辑。

1
2
3
4
5
6
7
8
9
// Public entry point for AllReduce. It does no work itself: it packs all call
// arguments into an ncclInfo descriptor and hands it to ncclEnqueueCheck,
// which validates and enqueues the collective onto the CUDA stream.
NCCL_API(ncclResult_t, ncclAllReduce, const void* sendbuff, void* recvbuff, size_t count,
    ncclDataType_t datatype, ncclRedOp_t op, ncclComm* comm, cudaStream_t stream);
ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count,
    ncclDataType_t datatype, ncclRedOp_t op, ncclComm* comm, cudaStream_t stream) {
  // NOTE: positional initializer — field order must match struct ncclInfo.
  // The literal 0 is the root rank (unused by AllReduce).
  struct ncclInfo info = { ncclCollAllReduce, "AllReduce",
    sendbuff, recvbuff, count, datatype, op, 0, comm, stream, /* Args */
    ALLREDUCE_CHUNKSTEPS, ALLREDUCE_SLICESTEPS };
  return ncclEnqueueCheck(&info);
}

可以看到,调用 ncclAllReduce 时,所有的变量被存到一个 ncclInfo  结构体中,然后通过 ncclEnqueueCheck  将这个结构体插入到队列中。

ncclEnqueueCheck

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// Validates a collective call described by `info`, then enqueues the
// corresponding kernel and proxy work. (Excerpt — error-handling paths
// elided by the article, marked //...)
ncclResult_t ncclEnqueueCheck(struct ncclInfo* info) {

    //...

    // Argument validation: non-NULL comm, consistent args, stream capture state.
    NCCLCHECK(PtrCheck(info->comm, info->opName, "comm"));
    NCCLCHECK(ArgsCheck(info));
    NCCLCHECK(checkSetStream(info));

    INFO(NCCL_COLL,"%s: opCount %lx sendbuff %p recvbuff %p count %zi datatype %d op %d root %d comm %p [nranks=%d] stream %p",
        info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count,
        info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream);

    // Record the kernel + proxy args, then launch: enqueue, wait for the
    // group barrier, and record the completion events on the stream.
    NCCLCHECK(ncclSaveKernel(info));
    NCCLCHECK(ncclBarrierEnqueue(info->comm));
    NCCLCHECK(ncclBarrierEnqueueWait(info->comm));
    NCCLCHECK(ncclEnqueueEvents(info->comm));
    return ncclSuccess;

    //...
}

ncclPrimitives

ncclPrimitives  这个类实现了各类通信原语。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Implementation of primitive types
// Device-side helper class implementing the point-to-point primitives
// (send/recv/copy/reduce) used by the collective kernels. Template
// parameters fix the unrolling factor, slice geometry, element type,
// and the max number of receive/send peers. (Excerpt — several members
// elided by the article, marked //...)
template <int UNROLL, int SLICESPERCHUNK, int SLICESTEPS, typename T, int NRECV, int NSEND, int DIRECT, class FUNC>
class ncclPrimitives {
 private:
  const int tid;       // thread index within the primitive's thread group
  const int nthreads;  // number of worker threads (threads beyond this act as sync threads)
  const int wid;       // lane index within the warp (tid % WARP_SIZE, see ctor)
  const int stepSize;  // buffer bytes/elements per step
  int nrecv = 0;       // number of active receive connections loaded
  int nsend = 0;       // number of active send connections loaded
  // Receive-side connection state: head/tail pointers are shared with the
  // peer/proxy (hence volatile); the non-pointer copies are local cursors.
  struct ncclConnInfo* recvConn = NULL;
  volatile uint64_t* recvConnHeadPtr = NULL;
  uint64_t recvConnHead;
  volatile uint64_t* recvConnTailPtr = NULL;
  uint64_t recvConnTail;
  uint64_t recvConnTailCache; // Cache last seen value

  // Send-side connection state; sendConnFifoPtr is the sizes FIFO the proxy
  // reads to learn how many bytes each slot holds.
  struct ncclConnInfo* sendConn = NULL;
  volatile int* sendConnFifoPtr = NULL;
  volatile uint64_t* sendConnTailPtr = NULL;
  uint64_t sendConnTail;
  volatile uint64_t* sendConnHeadPtr = NULL;
  uint64_t sendConnHead;
  uint64_t sendConnHeadCache; // Cache last seen value

  // Per-peer step counters and buffer pointers (direct buffers are the
  // peer's own memory when direct send/recv is enabled).
  uint64_t recvStep[NRECV];
  uint64_t sendStep[NSEND];
  const T* recvDirectBuff[NRECV];
  T* sendDirectBuff[NSEND];
  const T* recvBuff[NRECV];
  T* sendBuff[NSEND];
  struct ncclDevComm* comm;

  //...

 public:
  // Loads connection state for every valid peer (peer id >= 0) from the
  // channel and synchronizes the initial step counters.
  __device__ __forceinline__
  ncclPrimitives(const int tid, const int nthreads, int* recvPeers, int* sendPeers, T* directBuff, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm)
    : comm(comm), tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), stepSize(stepSize) {
    // Make sure step is updated before we read it.
    barrier();

    for (int i=0; i<NRECV && recvPeers[i] >= 0; i++) loadRecvConn(&channel->devPeers[recvPeers[i]].recv.conn, i, directBuff);
    for (int i=0; i<NSEND && sendPeers[i] >= 0; i++) loadSendConn(&channel->devPeers[sendPeers[i]].send.conn, i);
    loadRecvSync();
    loadSendSync();
  }

  //...

  __device__ __forceinline__ ~ncclPrimitives() {
    // Save steps for the next operation
    saveRecvSync();
    saveSendSync();
  }
};

GenericOp

比较巧妙的设计是,通过设计一个 GenericOp,改变其 6 个 input 的值就能使用相同的代码实现多种不同的功能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
  // Generic data-movement kernel body. The six boolean template parameters
  // select the operation: RECV/SEND enable peer buffers as sources/
  // destinations, SRC/DST enable the user pointers, DIRECT* use the peer's
  // memory directly. E.g. <0,0,0,1,1,0> = send, <1,0,1,0,0,1> = directRecv.
  // Data is processed slice by slice; worker threads copy/reduce while the
  // extra "sync" threads (tid >= nthreads) post completions.
  template <int DIRECTRECV, int DIRECTSEND, int RECV, int SEND, int SRC, int DST>
  inline __device__ void
  GenericOp(const T* srcPtr, T* dstPtr, int nelem, ssize_t directOffset) {
    int offset = 0;
    int sliceSize = stepSize*SLICESTEPS;
    // Per-slice work size: nelem split across slices, rounded up to 16
    // elements, but at least sliceSize/32.
    int dataSize = max(DIVUP(nelem, 16*SLICESPERCHUNK)*16, sliceSize/32);

    // Source list: slot 0 is the user buffer (SRC) or the first receive
    // buffer; remaining slots are the other receive peers.
    const T* srcs[RECV*NRECV+SRC];
    srcs[0] = SRC ? srcPtr : directRecvPtr<DIRECTRECV>(0, directOffset);
    if (RECV) {
      if (SRC) srcs[1] = recvPtr(0);
      for (int i=1; i<NRECV && i<nrecv; i++) srcs[SRC+i] = recvPtr(i);
    }

    // Destination list, built symmetrically with the send buffers.
    T* dsts[SEND*NSEND+DST];
    dsts[0] = DST ? dstPtr : directSendPtr<DIRECTSEND>(0, directOffset);
    if (SEND) {
      if (DST) dsts[1] = directSendPtr<DIRECTSEND>(0, directOffset);
      for (int i=1; i<NSEND && i<nsend; i++) dsts[DST+i] = directSendPtr<DIRECTSEND>(i, directOffset);
    }

    // Threads past nthreads do no copying; they handle post/fence duties.
    bool syncThread = tid >= nthreads;

    #pragma unroll
    for (int slice=0; slice<SLICESPERCHUNK; ++slice) {
      int realSize = max(0, min(dataSize, nelem-offset));
      if (!syncThread) {
        // Flow control: wait for send-buffer space / received data.
        if (SEND) waitSend(realSize*sizeof(T));
        if (RECV) waitRecv();
        if (realSize > 0) {
          subBarrier();
          if (DIRECTRECV && recvDirectBuff[0]) {
            // We can only have one direct receive. Since srcs[0] == dstPtr+offset, skip one copy
            if (SEND) {
              ReduceOrCopyMulti<UNROLL, FUNC, T, 1, 1, 1, NSEND>(tid, nthreads, 1, srcs, nsend, dsts+1, realSize);
            }
          } else {
            ReduceOrCopyMulti<UNROLL, FUNC, T, RECV+SRC, RECV*NRECV+SRC, SEND+DST, SEND*NSEND+DST>(tid, nthreads, RECV*nrecv+SRC, srcs, SEND*nsend+DST, dsts, realSize);
          }
        }
      }
      barrier();
      FOR_SEND(incSend);
      FOR_RECV(incRecv);
      if (syncThread) {
        if (SEND) {
          // System-wide fence so the written data is visible (e.g. to the
          // proxy thread on the host) before the send is posted.
          if (realSize > 0 && wid == 0) __threadfence_system();
          __syncwarp();
          postSend();
        }
        if (RECV) postRecv();
      }
      // Advance all source/destination pointers to the next slice.
      srcs[0] += SRC ? realSize : directRecvInc<DIRECTRECV>(0, realSize, sliceSize);
      for (int i=1-SRC; i<RECV*NRECV; i++) srcs[SRC+i] += sliceSize;
      dsts[0] += DST ? realSize : directSendInc<DIRECTSEND>(0, realSize, sliceSize);
      for (int i=1-DST; i<SEND*NSEND; i++) dsts[DST+i] += directSendInc<DIRECTSEND>(i, realSize, sliceSize);
      offset += realSize;
    }
  }

在 GenericOp 中,调用 send 和 recv 之类的操作时,会修改 ncclConnInfo 这个结构体中的 void** ptrsFifo 以及 int* sizesFifo 这两个数据结构,这两个结构中存储着要传输的数据地址以及对应的数据大小。在 netTransport 的 proxy 相关的函数中,会检查这个队列,进行传输。

比如在 ncclPrimitives 中有如下方法,send 的模板参数为

  • DIRECTRECV 为 0
  • DIRECTSEND 为 0
  • RECV 为 0
  • SEND 为 1
  • SRC 为 1
  • DST 为 0
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
  // Thin wrappers over GenericOp. Template argument order is
  // <DIRECTRECV, DIRECTSEND, RECV, SEND, SRC, DST>.

  // Send nelem elements from the user buffer `src` to the send peer(s).
  __device__ __forceinline__ void
  send(const T* src, int nelem) {
    GenericOp<0, 0, 0, 1, 1, 0>(src, NULL, nelem, 0);
  }
  // Same as send, but may write directly into the peer's buffer at
  // directOffset (DIRECTSEND=1).
  __device__ __forceinline__ void
  directSend(const T* src, ssize_t directOffset, int nelem) {
    GenericOp<0, 1, 0, 1, 1, 0>(src, NULL, nelem, directOffset);
  }

  // Receive nelem elements from the recv peer into the user buffer `dst`.
  __device__ __forceinline__ void
  recv(T* dst, int nelem) {
    GenericOp<0, 0, 1, 0, 0, 1>(NULL, dst, nelem, 0);
  }
  // Same as recv, but may read directly from the peer's buffer at
  // directOffset (DIRECTRECV=1).
  __device__ __forceinline__ void
  directRecv(T* dst, ssize_t directOffset, int nelem) {
    GenericOp<1, 0, 1, 0, 0, 1>(NULL, dst, nelem, directOffset);
  }

proxy 线程

数据传输过程其实是 GPU 上的 NCCL kernel 跟 CPU 上的 Proxy 线程协同完成的。

Proxy 线程中与数据传输有关的三个函数是

  • ncclNetIrecv:Proxy 从网络收到数据

  • ncclNetIsend:Proxy 发送数据到网络

  • ncclNetIflush: Proxy 将数据从 CPU 传到 GPU

  • 数据怎样从一个 GPU 传输到同一个节点的另一个 GPU?

    • Peer-to-peer, PCI+host memory
  • 数据怎样从一个 GPU 传输到另一个节点的 GPU?

    • Socket, InfiniBand

netSendProxy

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
// Proxy progress function for the network send side. Called repeatedly by the
// proxy thread; each call makes a bounded amount of progress and returns.
// State machine: tail = next buffer slot to hand to the NIC,
// head = next slot whose send completion we are waiting for,
// end = step at which this operation is finished.
ncclResult_t netSendProxy(struct ncclProxyArgs* args) {
  struct netSendResources* resources = (struct netSendResources*) (args->connector->transportResources);
  if (args->state == ncclProxyOpReady) {
    // Round to next multiple of sliceSteps
    resources->step = ROUNDUP(resources->step, args->chunkSteps);
    args->head = resources->step;
    args->tail = resources->step;
    args->end = args->head + args->nsteps;
    args->state = ncclProxyOpProgress;
  }
  if (args->state == ncclProxyOpProgress) {
    int p = args->protocol;
    int stepSize = args->connector->comm->buffSizes[p] / NCCL_STEPS;
    char* localBuff = args->connector->conn.buffs[p];
    void* mhandle = *(resources->mhandlesProto[p]);
    // Assume no progress; cleared whenever a post or completion happens so
    // the caller can back off when nothing moved.
    args->idle = 1;
    if (args->head < args->end) {
      int buffSlot = args->tail%NCCL_STEPS;
      // Post a new send only if there is still a step to send and fewer than
      // NCCL_STEPS sends are in flight.
      if (args->tail < args->end && args->tail < args->head + NCCL_STEPS) {
        // sizesFifo/recvTail are written by the GPU kernel; volatile reads.
        volatile int* sizesFifo = resources->recvMem->sizesFifo;
        volatile uint64_t* recvTail = &resources->recvMem->tail;
        if (args->protocol == NCCL_PROTO_LL128) {
          if (args->tail < *recvTail) {
            if (sizesFifo[buffSlot] != -1) {
              int ready = resources->useGdr;
              if (!ready) {
                // When data is in sysmem, we need to wait until all flags are correct since the GPU only
                // called threadfence()
                uint64_t flag = args->tail + 1;
                int nFifoLines = DIVUP(sizesFifo[buffSlot], sizeof(uint64_t)*NCCL_LL128_LINEELEMS);
                volatile uint64_t* lines = (volatile uint64_t*)(localBuff+buffSlot*stepSize);
                ready = 1;
                for (int i=0; i<nFifoLines; i++) {
                  if (lines[i*NCCL_LL128_LINEELEMS+NCCL_LL128_DATAELEMS] != flag) { ready = 0; break; }
                }
              }
              if (ready) {
                // Send through network
                NCCLCHECK(ncclNetIsend(resources->netSendComm, localBuff+buffSlot*stepSize, sizesFifo[buffSlot], mhandle, args->requests+buffSlot));
                // A NULL request means the transport was not ready; retry later.
                if (args->requests[buffSlot] != NULL) {
                  sizesFifo[buffSlot] = -1;
                  // Make sure size is reset to zero before we update the head.
                  __sync_synchronize();
                  args->tail += args->sliceSteps;
                  args->idle = 0;
                }
              }
            }
          }
        } else if (args->protocol == NCCL_PROTO_LL) {
          // LL protocol: readiness is detected from per-line flags embedded in
          // the data itself, so no recvTail check is needed.
          int size = sizesFifo[buffSlot];
          if (size != -1) {
            uint32_t flag = NCCL_LL_FLAG(args->tail + 1);
            int nFifoLines = DIVUP(size, sizeof(union ncclLLFifoLine));
            size = nFifoLines * sizeof(union ncclLLFifoLine);
            union ncclLLFifoLine* lines = (union ncclLLFifoLine*)(localBuff+buffSlot*stepSize);
            int ready = 1;
            for (int i=0; i<nFifoLines; i++) {
              volatile uint32_t *f1 = &lines[i].flag1;
              volatile uint32_t *f2 = &lines[i].flag2;
              if (f1[0] != flag || f2[0] != flag) { ready = 0; break; }
            }
            if (ready) {
              NCCLCHECK(ncclNetIsend(resources->netSendComm, lines, size, mhandle, args->requests+buffSlot));
              if (args->requests[buffSlot] != NULL) {
                sizesFifo[buffSlot] = -1;
                // Make sure size is reset to zero before we update the head.
                __sync_synchronize();
                args->tail += args->sliceSteps;
                args->idle = 0;
              }
            }
          }
        } else if (args->tail < *recvTail) {
          // Simple protocol: the GPU advancing recvTail signals the slot is ready.
          // Send through network
          if (sizesFifo[buffSlot] != -1) {
            NCCLCHECK(ncclNetIsend(resources->netSendComm, localBuff+buffSlot*stepSize, sizesFifo[buffSlot], mhandle, args->requests+buffSlot));
            if (args->requests[buffSlot] != NULL) {
              sizesFifo[buffSlot] = -1;
              // Make sure size is reset to zero before we update the head.
              __sync_synchronize();
              args->tail += args->sliceSteps;
              args->idle = 0;
            }
          }
        }
      }
      // Poll the oldest outstanding send; on completion advance head and
      // publish it to the GPU so the slot can be reused.
      if (args->head < args->tail) {
        int done;
        int buffSlot = args->head%NCCL_STEPS;
        NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL));
        if (done) {
          args->head += args->sliceSteps;
          resources->sendMem->head = args->head;
          args->idle = 0;
        }
      }
    }
    // All steps completed: persist the step counter and retire the op.
    if (args->head == args->end) {
      resources->step = args->end;
      args->idle = 0;
      args->state = ncclProxyOpNone;
    }
  }
  return ncclSuccess;
}

netRecvProxy

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Proxy progress function for the network receive side. Posts receives ahead
// of the sender (bounded by NCCL_STEPS and by how far the GPU has consumed,
// tracked via sendMem->head), then polls completions and publishes received
// slots to the GPU through recvMem->tail.
ncclResult_t netRecvProxy(struct ncclProxyArgs* args) {
  struct netRecvResources* resources = (struct netRecvResources*) (args->connector->transportResources);
  if (args->state == ncclProxyOpReady) {
    // Round to next multiple of sliceSteps
    resources->step = ROUNDUP(resources->step, args->chunkSteps);
    args->head = resources->step;
    args->tail = resources->step;
    args->end = args->head + args->nsteps;
    args->state = ncclProxyOpProgress;
  }
  if (args->state == ncclProxyOpProgress) {
    args->idle = 1;
    int p = args->protocol;
    int stepSize = args->connector->comm->buffSizes[p] / NCCL_STEPS;
    char* localBuff = args->connector->conn.buffs[p];
    void* mhandle = *(resources->mhandlesProto[p]);
    if (args->head < args->end) {
      // sendHead is advanced by the GPU kernel as it consumes data.
      volatile uint64_t* sendHead = &resources->sendMem->head;
      if ((args->tail < args->head + NCCL_STEPS) && (args->tail < *sendHead + NCCL_STEPS) && (args->tail < args->end)) {
        int buffSlot = args->tail%NCCL_STEPS;
        int sliceSize = stepSize * args->sliceSteps;
        NCCLCHECK(ncclNetIrecv(resources->netRecvComm, localBuff+buffSlot*stepSize, sliceSize, mhandle, args->requests+buffSlot));
        // NULL request means the post did not go through; retry next call.
        if (args->requests[buffSlot] != NULL) {
          args->tail += args->sliceSteps;
          args->idle = 0;
        }
      }
      // Poll the oldest outstanding receive.
      if (args->tail > args->head) {
        int buffSlot = args->head%NCCL_STEPS;
        int done, size;
        NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, &size));
        if (done) {
          args->head += args->sliceSteps;
          if (args->protocol == NCCL_PROTO_SIMPLE) {
            // With GPUDirect RDMA, flush so the data is visible to the GPU
            // before tail is advanced.
            if (resources->useGdr) NCCLCHECK(ncclNetFlush(resources->netRecvComm, localBuff+buffSlot*stepSize, size, mhandle));
            resources->recvMem->tail = args->head;
          }
          args->idle = 0;
        }
      }
    }
    // All steps completed: persist the step counter and retire the op.
    if (args->head == args->end) {
      resources->step = args->end;
      args->idle = 0;
      args->state = ncclProxyOpNone;
    }
  }
  return ncclSuccess;
}

persistentThread

可以看到 proxy 执行逻辑 persistentThread 是一个 while 循环,会循环检查队列中有无 op。op->progress 则是每个 op 对应的 proxy 操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Long-lived CPU proxy thread, one per communicator. Loops forever: waits
// (under state->mutex / state->cond) until an op appears on state->ops, then
// drives it by calling op->progress (e.g. netSendProxy/netRecvProxy).
// Exits when the abort flag is raised or when stop is requested and the
// op list is empty. (Excerpt — op-list advancement elided, marked // ...)
//
// FIX vs. the quoted excerpt: the closing brace of `if (op == NULL) {` was
// missing before `} while (op == NULL);`, leaving the braces unbalanced.
void* persistentThread(void *comm_) {
  struct ncclComm* comm = (struct ncclComm*)comm_;
  struct ncclProxyState* state = &comm->proxyState;
  struct ncclProxyArgs* op = NULL;
  ncclResult_t ret = ncclSuccess;

  while (1) {
    // Block until we hold an op to work on.
    do {
      if (*comm->abortFlag) return NULL;
      if (op == NULL) {
        pthread_mutex_lock(&state->mutex);
        op = state->ops;
        if (op == NULL) {
          if (state->stop) {
            // No more commands to process and proxy has been requested to stop
            pthread_mutex_unlock(&state->mutex);
            return NULL;
          }
          // Releases the mutex while waiting; re-acquired on wakeup.
          pthread_cond_wait(&state->cond, &state->mutex);
        }
        pthread_mutex_unlock(&state->mutex);
      }
    } while (op == NULL);
    // Only drive ops that are still active and belong to a launched operation.
    if (op->state != ncclProxyOpNone && op->opCount < comm->lastOpCount) ret = op->progress(op);
  }
  // ...
}

SaveProxy

那么这些 op 是怎么入队的呢?具体是在 SaveProxy 中处理的。而 SaveProxy 则是通过如下调用路径被调用的:

1
2
3
4
->ncclEnqueueCheck
  ->ncclSaveKernel
    ->ncclProxySaveP2p/ncclProxySaveColl
      ->SaveProxy

可以看到这里的 op->progress 就具体的和 connector->transportComm->proxy 关联上了。如果 transport 为 netTransport,则 proxy 对应的就是 netSendProxy 和 netRecvProxy。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Queue a proxy operation for `peer` on the channel referenced by `args`.
// A private copy of `args` is bound to the peer's send or recv connector
// (selected by the `type` template parameter) and appended to the proxy
// thread's work list. Peers whose transport has no proxy callback — i.e.
// data moves without CPU help — require nothing and are skipped.
template <int type>
static ncclResult_t SaveProxy(int peer, struct ncclProxyArgs* args) {
  if (peer < 0) return ncclSuccess;

  const bool isRecv = (type == proxyRecv);
  struct ncclPeer* chanPeer = args->channel->peers+peer;
  struct ncclConnector* conn = isRecv ? &chanPeer->recv : &chanPeer->send;

  if (conn->transportComm == NULL) {
    WARN("[%d] Error no transport for %s peer %d on channel %d\n", conn->comm->rank,
        isRecv ? "recv" : "send", peer, args->channel->id);
    return ncclInternalError;
  }
  if (conn->transportComm->proxy == NULL) return ncclSuccess;

  // Clone args, wire it to this connector's progress function, and enqueue.
  struct ncclProxyArgs* queued;
  NCCLCHECK(allocateArgs(conn->comm, &queued));
  memcpy(queued, args, sizeof(struct ncclProxyArgs));
  queued->connector = conn;
  queued->progress = conn->transportComm->proxy;
  queued->state = ncclProxyOpReady;
  ProxyAppend(conn, queued);
  return ncclSuccess;
}

netTransport 使用 IB 作为 backend 时,则会调用到通过下面三个函数真正调用到 RDMA IB 接口。

  • ncclNetIrecv:Proxy 从网络收到数据
  • ncclNetIsend:Proxy 发送数据到网络
  • ncclNetIflush: Proxy 将数据从 CPU 传到 GPU

ncclIbIsend

实际是通过 RC 连接上 RDMA WRITE(IBV_WR_RDMA_WRITE / IBV_WR_RDMA_WRITE_WITH_IMM)的单边操作来实现的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// Non-blocking InfiniBand send. Returns *request = NULL (without error) when
// the connection is not ready yet or the receiver has not posted the matching
// receive — the caller (proxy) will retry. With USE_RDMA_WRITE, the data is
// written straight into the receiver's buffer (address/rkey taken from the
// clear-to-send FIFO slot) using an RDMA WRITE with immediate.
ncclResult_t ncclIbIsend(void* sendComm, void* data, int size, void* mhandle, void** request) {
  struct ncclIbSendComm* comm = (struct ncclIbSendComm*)sendComm;
  if (comm->ready == 0) NCCLCHECK(ncclSendCheck(comm));
  if (comm->ready == 0) { *request = NULL; return ncclSuccess; }

  struct ibv_mr* mr = (struct ibv_mr*)mhandle;

  // Wait for the receiver to have posted the corresponding receive
  // The fifo slot is written remotely by the receiver, hence volatile.
  volatile struct ncclIbSendFifo* slot = comm->fifo + (comm->fifoHead%MAX_REQUESTS);
  volatile uint32_t * readyPtr = &slot->ready;
  if (*readyPtr == 0) { *request = NULL; return ncclSuccess; }

  struct ncclIbRequest* req;
  NCCLCHECK(ncclIbGetRequest(comm->reqs, &req));
  req->verbs = &comm->verbs;
  req->size = size;

  struct ibv_send_wr wr;
  memset(&wr, 0, sizeof(wr));
  // Stash the request pointer so the completion handler can find it.
  wr.wr_id = (uint64_t)req;

  struct ibv_sge sge;
  if (size == 0) {
    // Zero-byte message: no scatter/gather entry.
    wr.sg_list = NULL;
    wr.num_sge = 0;
  } else {
    sge.addr=(uintptr_t)data; sge.length=(unsigned int)size; sge.lkey=mr->lkey;
    wr.sg_list = &sge;
    wr.num_sge = 1;
  }
  // Default opcode; overridden below when USE_RDMA_WRITE is enabled.
  wr.opcode = IBV_WR_SEND;
  wr.send_flags = IBV_SEND_SIGNALED;

  // Large messages may use adaptive routing (AR): bulk RDMA_WRITE first,
  // then a separate 0-byte WRITE_WITH_IMM (posted further down).
  int useAr = 0;
  if (size > ncclParamIbArThreshold()) {
    useAr = 1;
  }
#if USE_RDMA_WRITE
  __sync_synchronize(); // order the readyPtr load against rkey load below
  // Sanity checks to catch user collective call count/size mismatches
  // plus any potential programming errors
  if (size > slot->size || slot->size <= 0 || slot->addr == 0 || slot->rkey == 0 || slot->seq != comm->fifoHead) {
    WARN("NET/IB : collective mismatch error local size %d remote %d addr %lx rkey %x seq %x/%x",
        size, slot->size, slot->addr, slot->rkey, slot->seq, comm->fifoHead);
    return ncclInternalError;
  }
  wr.opcode = useAr ? IBV_WR_RDMA_WRITE : IBV_WR_RDMA_WRITE_WITH_IMM;
  wr.wr.rdma.remote_addr = slot->addr;
  wr.wr.rdma.rkey = slot->rkey;
  wr.imm_data = size; // Send the message size via imm_data
  __sync_synchronize();
#endif
  // We must clear slot->ready, but reset other fields to aid
  // debugging and sanity checks
  slot->ready = 0;
  slot->addr = 0ULL;
  slot->rkey = slot->size = slot->seq = 0;
  comm->fifoHead++;

  struct ibv_send_wr* bad_wr;
  NCCLCHECK(wrap_ibv_post_send(comm->qp, &wr, &bad_wr));

#if USE_RDMA_WRITE
  // When using adaptive routing, send the bulk of the data first as an
  // RDMA_WRITE, then a 0-byte RDMA_WRITE_WITH_IMM to trigger a remote
  // completion.
  if (useAr) {
    wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
    wr.sg_list = NULL;
    wr.num_sge = 0;
    // Only the first (bulk) WR is signaled; the IMM WR completes remotely.
    wr.send_flags &= ~IBV_SEND_SIGNALED;
    NCCLCHECK(wrap_ibv_post_send(comm->qp, &wr, &bad_wr));
  }
#endif
  *request = req;
  return ncclSuccess;
}

参考资料