
vllm.distributed.device_communicators.custom_all_reduce_utils

__all__ module-attribute

__all__ = ['gpu_p2p_access_check']

_gpu_p2p_access_cache module-attribute

_gpu_p2p_access_cache: Optional[dict[str, bool]] = None

logger module-attribute

logger = init_logger(__name__)

result module-attribute

result = can_actually_p2p(batch_src, batch_tgt)

can_actually_p2p

can_actually_p2p(
    batch_src: Sequence[int], batch_tgt: Sequence[int]
) -> Sequence[bool]

Usually, checking whether P2P access is enabled can be done with torch.cuda.can_device_access_peer(src, tgt). However, the driver is sometimes broken, and torch.cuda.can_device_access_peer(src, tgt) returns True even though P2P access is not actually possible. See https://github.com/vllm-project/vllm/issues/2728 and https://forums.developer.nvidia.com/t/direct-gpu-gpu-communication-does-not-seem-to-work-properly/283264/10. Therefore, we have to perform a real P2P access to check whether it is actually possible.
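For illustration only (not part of the module), a minimal sketch of how the driver-level query can be cross-checked against the real transfer test performed by can_actually_p2p; the device indices are arbitrary, and the call must sit under an if __name__ == "__main__": guard because child processes are spawned:

import torch

from vllm.distributed.device_communicators.custom_all_reduce_utils import (
    can_actually_p2p)

if __name__ == "__main__":
    src, tgt = 0, 1  # arbitrary example devices
    reported = torch.cuda.can_device_access_peer(src, tgt)  # driver's claim
    verified = can_actually_p2p([src], [tgt])[0]            # real transfer test
    if reported and not verified:
        print(f"Driver reports P2P for {src}->{tgt}, "
              "but a real transfer failed; treating it as disabled.")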

Note on P2P and CUDA IPC: usually, one process uses one GPU:

GPU src --> cuda context src --> tensor src --> process src

We need to combine P2P and CUDA IPC so that:

GPU src --> cuda context src --> tensor src --> process src
                                  |shared|
GPU tgt --> cuda context tgt --> tensor tgt --> process tgt

That is, process src creates a tensor on GPU src and passes its IPC handle to process tgt; process tgt then accesses that tensor from GPU tgt. Any operation on the tensor in process tgt is reflected in process src, because both map the same memory segment. Note that process tgt accesses the tensor through GPU tgt, not GPU src, which is why P2P access is required.

The most time-consuming part is process creation. To avoid creating a pair of processes for every pair of GPUs, we use batched testing: two processes test all pairs of GPUs in one batch. The trick is to reset the device after each test, an operation that PyTorch does not expose.

Source code in vllm/distributed/device_communicators/custom_all_reduce_utils.py
def can_actually_p2p(
    batch_src: Sequence[int],
    batch_tgt: Sequence[int],
) -> Sequence[bool]:
    """
    Usually, checking if P2P access is enabled can be done by
    `torch.cuda.can_device_access_peer(src, tgt)`. However, sometimes
    the driver might be broken, and `torch.cuda.can_device_access_peer(src, tgt)`
    returns `True` even if P2P access is not actually possible.
    See https://github.com/vllm-project/vllm/issues/2728 and
    https://forums.developer.nvidia.com/t/direct-gpu-gpu-communication-does-not-seem-to-work-properly/283264/10
    Therefore, we have to perform a real P2P access to check if it is actually
    possible.

    Note on p2p and cuda IPC:
    Usually, one process uses one GPU:
    GPU src --> cuda context src --> tensor src --> process src

    We need to combine p2p and cuda IPC, so that:
    GPU src --> cuda context src --> tensor src --> process src
                                      |shared|
    GPU tgt --> cuda context tgt --> tensor tgt --> process tgt
    That is to say, process src creates a tensor in GPU src, passes IPC handle to
    process tgt, and process tgt accesses the tensor in GPU tgt. Any operation on the
    tensor in process tgt will be reflected in the tensor in process src, because
    they are the same memory segment.
    It is important to note that process tgt accesses the tensor in GPU tgt, not
    GPU src. That's why we need p2p access.

    The most time-consuming part is the process creation. To avoid creating
    processes for every pair of GPUs, we use batched testing. We create two
    processes for testing all pairs of GPUs in batch. The trick is to reset
    the device after each test (which is not available in PyTorch).
    """  # noqa
    cuda_visible_devices = envs.CUDA_VISIBLE_DEVICES
    # pass the CUDA_VISIBLE_DEVICES to the child process
    # to make sure they see the same set of GPUs

    # make sure the processes are spawned
    smp = mp.get_context("spawn")
    producer_queue = smp.Queue()
    consumer_queue = smp.Queue()
    result_queue = smp.Queue()
    p_src = smp.Process(target=producer,
                        args=(batch_src, producer_queue, consumer_queue,
                              result_queue, cuda_visible_devices))
    p_tgt = smp.Process(target=consumer,
                        args=(batch_tgt, producer_queue, consumer_queue,
                              result_queue, cuda_visible_devices))
    p_src.start()
    p_tgt.start()
    p_src.join()
    p_tgt.join()
    assert p_src.exitcode == 0 and p_tgt.exitcode == 0
    result: list[bool] = []
    for src, tgt in zip(batch_src, batch_tgt):
        a = result_queue.get()
        b = result_queue.get()
        if a != b:
            logger.warning(
                "Two processes do not agree on the P2P access"
                " status on %d -> %d, treat as disabled.", src, tgt)
            result.append(False)
        else:
            result.append(a)
    return result
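A hedged usage sketch (not from the source) showing how all ordered GPU pairs can be probed in a single batch, mirroring how gpu_p2p_access_check below drives this function; the 2-GPU setup is an assumption for the example:

from itertools import product

from vllm.distributed.device_communicators.custom_all_reduce_utils import (
    can_actually_p2p)

if __name__ == "__main__":
    ids = [0, 1]  # example: a 2-GPU host
    batch_src, batch_tgt = zip(*product(ids, ids))  # (0, 0, 1, 1) / (0, 1, 0, 1)
    results = can_actually_p2p(batch_src, batch_tgt)
    for s, t, ok in zip(batch_src, batch_tgt, results):
        print(f"{s}->{t}: {'P2P works' if ok else 'P2P unavailable'}")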

consumer

consumer(
    batch_tgt: Sequence[int],
    producer_queue,
    consumer_queue,
    result_queue,
    cuda_visible_devices: Optional[str] = None,
)
Source code in vllm/distributed/device_communicators/custom_all_reduce_utils.py
def consumer(batch_tgt: Sequence[int],
             producer_queue,
             consumer_queue,
             result_queue,
             cuda_visible_devices: Optional[str] = None):
    if cuda_visible_devices is not None:
        update_environment_variables(
            {"CUDA_VISIBLE_DEVICES": cuda_visible_devices})

    lib = CudaRTLibrary()
    for j in batch_tgt:
        lib.cudaSetDevice(j)
        handle = producer_queue.get()
        open_success = False
        try:
            pointer = lib.cudaIpcOpenMemHandle(handle)  # type: ignore
            open_success = True
        except RuntimeError:
            # cannot error out here, because the producer process
            # is still waiting for the response.
            pass
        consumer_queue.put(open_success)
        if open_success:
            # modify the memory
            lib.cudaMemset(pointer, 2, 1024)
            lib.cudaDeviceSynchronize()
            # use two queues to simulate barrier
            producer_queue.get()
            consumer_queue.put(0)
            # check if the memory is modified
            host_data = (ctypes.c_char * 1024)()
            lib.cudaMemcpy(host_data, pointer, 1024)  # type: ignore
            for i in range(1024):
                if ord(host_data[i]) != 2:
                    open_success = False
                    break
        result_queue.put(open_success)
        lib.cudaDeviceReset()
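The "use two queues to simulate barrier" handshake in producer and consumer can be hard to read inline. Here is a standalone sketch of the same pattern (illustrative only, not vLLM code): one side puts and then gets, the other gets and then puts, so neither proceeds past the rendezvous point until both have reached it.

import multiprocessing as mp

def producer_side(producer_queue, consumer_queue):
    producer_queue.put(0)   # signal: producer reached the barrier
    consumer_queue.get()    # wait: consumer reached the barrier
    print("producer passed the barrier")

def consumer_side(producer_queue, consumer_queue):
    producer_queue.get()    # wait for the producer's signal
    consumer_queue.put(0)   # then signal back
    print("consumer passed the barrier")

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    pq, cq = ctx.Queue(), ctx.Queue()
    p1 = ctx.Process(target=producer_side, args=(pq, cq))
    p2 = ctx.Process(target=consumer_side, args=(pq, cq))
    p1.start(); p2.start()
    p1.join(); p2.join()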

gpu_p2p_access_check

gpu_p2p_access_check(src: int, tgt: int) -> bool

Check if GPU src can access GPU tgt.

Source code in vllm/distributed/device_communicators/custom_all_reduce_utils.py
def gpu_p2p_access_check(src: int, tgt: int) -> bool:
    """Check if GPU src can access GPU tgt."""

    # if the cache variable is already calculated,
    # read from the cache instead of checking it again
    global _gpu_p2p_access_cache
    if _gpu_p2p_access_cache is not None:
        return _gpu_p2p_access_cache[f"{src}->{tgt}"]

    is_distributed = dist.is_initialized()

    num_dev = cuda_device_count_stateless()
    cuda_visible_devices = envs.CUDA_VISIBLE_DEVICES
    if cuda_visible_devices is None:
        cuda_visible_devices = ",".join(str(i) for i in range(num_dev))

    path = os.path.join(
        envs.VLLM_CACHE_ROOT,
        f"gpu_p2p_access_cache_for_{cuda_visible_devices}.json")
    os.makedirs(os.path.dirname(path), exist_ok=True)
    from vllm.distributed.parallel_state import get_world_group
    if ((not is_distributed or get_world_group().local_rank == 0)
            and (not os.path.exists(path))):
        # only the local master process (with local_rank == 0) can
        #  enter this block to calculate the cache
        logger.info("generating GPU P2P access cache in %s", path)
        cache: dict[str, bool] = {}
        ids = list(range(num_dev))
        # batch of all pairs of GPUs
        batch_src, batch_tgt = zip(*list(product(ids, ids)))
        # NOTE: we use `subprocess` rather than `multiprocessing` here
        # because the caller might not have `if __name__ == "__main__":`,
        # in that case we cannot use spawn method in multiprocessing.
        # However, `can_actually_p2p` requires spawn method.
        # The fix is, we use `subprocess` to call the function,
        # where we have `if __name__ == "__main__":` in this file.

        # use a temporary file to store the result
        # we don't use the output of the subprocess directly,
        # because the subprocess might produce logging output
        with tempfile.NamedTemporaryFile() as output_file:
            input_bytes = pickle.dumps(
                (batch_src, batch_tgt, output_file.name))
            returned = subprocess.run([sys.executable, __file__],
                                      input=input_bytes,
                                      capture_output=True)
            # check if the subprocess is successful
            try:
                returned.check_returncode()
            except Exception as e:
                # wrap raised exception to provide more information
                raise RuntimeError(
                    f"Error happened when batch testing "
                    f"peer-to-peer access from {batch_src} to {batch_tgt}:\n"
                    f"{returned.stderr.decode()}") from e
            with open(output_file.name, "rb") as f:
                result = pickle.load(f)
        for _i, _j, r in zip(batch_src, batch_tgt, result):
            cache[f"{_i}->{_j}"] = r
        with open(path, "w") as f:
            json.dump(cache, f, indent=4)
    if is_distributed:
        get_world_group().barrier()
    logger.info("reading GPU P2P access cache from %s", path)
    with open(path) as f:
        cache = json.load(f)
    _gpu_p2p_access_cache = cache
    return _gpu_p2p_access_cache[f"{src}->{tgt}"]
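A hedged usage sketch (assumptions: a 2-GPU host with CUDA_VISIBLE_DEVICES unset or set to 0,1; the cache path and key format mirror the code above, but the snippet itself is not from the source). The first call populates a JSON cache keyed by "src->tgt"; later calls read it back:

import json
import os

import vllm.envs as envs
from vllm.distributed.device_communicators.custom_all_reduce_utils import (
    gpu_p2p_access_check)

# Full-mesh check, e.g. to decide whether a P2P-based all-reduce is usable.
ranks = [0, 1]
full_p2p = all(
    gpu_p2p_access_check(i, j) for i in ranks for j in ranks if i != j)

# The on-disk cache written by the first call; keys are "src->tgt" strings.
path = os.path.join(envs.VLLM_CACHE_ROOT,
                    "gpu_p2p_access_cache_for_0,1.json")
with open(path) as f:
    print(json.load(f))  # e.g. {'0->0': True, '0->1': True, ...}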

producer

producer(
    batch_src: Sequence[int],
    producer_queue,
    consumer_queue,
    result_queue,
    cuda_visible_devices: Optional[str] = None,
)
Source code in vllm/distributed/device_communicators/custom_all_reduce_utils.py
def producer(batch_src: Sequence[int],
             producer_queue,
             consumer_queue,
             result_queue,
             cuda_visible_devices: Optional[str] = None):
    if cuda_visible_devices is not None:
        update_environment_variables(
            {"CUDA_VISIBLE_DEVICES": cuda_visible_devices})

    lib = CudaRTLibrary()
    for i in batch_src:
        lib.cudaSetDevice(i)
        pointer = lib.cudaMalloc(1024)
        lib.cudaMemset(pointer, 1, 1024)
        lib.cudaDeviceSynchronize()
        handle = lib.cudaIpcGetMemHandle(pointer)
        producer_queue.put(handle)
        open_success = consumer_queue.get()
        if open_success:
            # use two queues to simulate barrier
            producer_queue.put(0)
            consumer_queue.get()
            # check if the memory is modified
            host_data = (ctypes.c_char * 1024)()
            lib.cudaMemcpy(host_data, pointer, 1024)  # type: ignore
            for i in range(1024):
                if ord(host_data[i]) != 2:
                    open_success = False
                    break
        result_queue.put(open_success)
        lib.cudaDeviceReset()
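The result module attribute listed at the top of this page comes from the script's __main__ entry point, which gpu_p2p_access_check invokes via subprocess. A plausible reconstruction, inferred from the pickled stdin payload and output file shown above (not copied from the source):

if __name__ == "__main__":
    import pickle
    import sys

    # gpu_p2p_access_check pickles (batch_src, batch_tgt, output_file.name)
    # onto this process's stdin ...
    batch_src, batch_tgt, output_path = pickle.loads(sys.stdin.buffer.read())
    result = can_actually_p2p(batch_src, batch_tgt)
    # ... and reads the pickled result back from the temporary file, so that
    # any logging written to stdout cannot corrupt the payload.
    with open(output_path, "wb") as f:
        pickle.dump(result, f)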