CUDA IPC 是 NVIDIA CUDA 提供的一种进程间通信机制,允许不同的 CUDA 进程(通常是不同的应用程序或进程)共享 GPU 内存,从而实现高效的数据传输和协作。

GPU 显存不同进程之间默认不共享,需要 cudaIPC。这里基于 cudaIPC 的序列化每个 tensor 只需要传一些 metadata 和一个 128 byte 的 ipc handle 即可,不涉及 gpu 显存的复制操作,开销非常小。

核心优势

  1. 零拷贝传输:进程间直接共享 GPU 内存,无需通过主机内存中转
  2. 高性能:避免了 PCIe 总线上的数据传输开销
  3. 低延迟:直接内存访问,延迟极低

基本工作流程

1. 创建共享内存句柄

1
2
3
4
5
// 进程A:创建共享内存
void* d_ptr;
cudaMalloc(&d_ptr, size);
cudaIpcMemHandle_t handle;
cudaIpcGetMemHandle(&handle, d_ptr);

2. 传递句柄给其他进程

// 通过文件、共享内存、网络等方式传递handle

3. 在其他进程中打开共享内存

1
2
3
// 进程B:打开共享内存
void* d_ptr_other;
cudaIpcOpenMemHandle(&d_ptr_other, handle, cudaIpcMemLazyEnablePeerAccess);

4. 使用共享内存

// 两个进程都可以直接访问同一块GPU内存

简单示例代码

让我为您创建一个完整的 CUDA IPC 示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python3
"""
CUDA IPC 示例代码
演示两个进程如何共享GPU内存
"""

import torch
import multiprocessing as mp
import time
import os

def process_a(pipe):
    """进程A:创建共享内存并传递句柄"""
    print(f"Process A started (PID: {os.getpid()})")
    
    # 在GPU 0上分配内存
    with torch.cuda.device(0):
        # 创建共享张量
        shared_tensor = torch.randn(1000, 1000, device='cuda:0')
        print(f"Process A: Created tensor with shape {shared_tensor.shape}")
        
        # 获取共享句柄
        handle = shared_tensor.storage()._share_cuda_()
        print("Process A: Got shared memory handle")
        
        # 通过管道发送句柄给进程B
        pipe.send(handle)
        print("Process A: Sent handle to Process B")
        
        # 等待进程B完成数据写入
        pipe.recv()
        
        # 验证进程B写入的数据
        expected_value = torch.tensor([42.0], device='cuda:0')
        if torch.allclose(shared_tensor[0, 0], expected_value):
            print("Process A: Success! Process B modified the shared memory")
        else:
            print("Process A: Error! Shared memory was not modified correctly")

def process_b(pipe):
    """进程B:接收句柄并访问共享内存"""
    print(f"Process B started (PID: {os.getpid()})")
    
    # 在GPU 0上工作
    with torch.cuda.device(0):
        # 接收共享句柄
        handle = pipe.recv()
        print("Process B: Received shared memory handle")
        
        # 从句柄重建共享张量
        shared_tensor = torch.UntypedStorage._new_shared_cuda_(handle)
        reconstructed_tensor = torch.tensor([], device='cuda:0').set_(shared_tensor, 0, (1000, 1000))
        
        print(f"Process B: Reconstructed tensor with shape {reconstructed_tensor.shape}")
        
        # 修改共享内存
        reconstructed_tensor[0, 0] = 42.0
        print("Process B: Modified shared memory")
        
        # 通知进程A已完成
        pipe.send("done")

def main():
    """主函数"""
    print("CUDA IPC Example")
    print("=" * 50)
    
    # 创建进程间通信管道
    parent_conn, child_conn = mp.Pipe()
    
    # 创建进程
    p1 = mp.Process(target=process_a, args=(parent_conn,))
    p2 = mp.Process(target=process_b, args=(child_conn,))
    
    # 启动进程
    p1.start()
    p2.start()
    
    # 等待进程结束
    p1.join()
    p2.join()
    
    print("\nExample completed successfully!")

if __name__ == "__main__":
    main()