
vllm.distributed.device_communicators.custom_all_reduce

custom_ar module-attribute

custom_ar = True

logger module-attribute

logger = init_logger(__name__)

CustomAllreduce

Source code in vllm/distributed/device_communicators/custom_all_reduce.py
class CustomAllreduce:

    _SUPPORTED_WORLD_SIZES = [2, 4, 6, 8]

    # max_size: max supported allreduce size
    def __init__(self,
                 group: ProcessGroup,
                 device: Union[int, str, torch.device],
                 max_size=8192 * 1024) -> None:
        """
        Args:
            group: the process group to work on. If None, it will use the
                default process group.
            device: the device to bind the CustomAllreduce to. If None,
                it will be bind to f"cuda:{local_rank}".
        It is the caller's responsibility to make sure each communicator
        is bind to a unique device, and all communicators in this group
        are in the same node.
        """
        self._IS_CAPTURING = False
        self.disabled = True

        if not custom_ar:
            # disable because of missing custom allreduce library
            # e.g. in a non-GPU environment
            logger.info("Custom allreduce is disabled because "
                        "of missing custom allreduce library")
            return

        self.group = group

        assert dist.get_backend(group) != dist.Backend.NCCL, (
            "CustomAllreduce should be attached to a non-NCCL group.")

        if not all(in_the_same_node_as(group, source_rank=0)):
            # No need to initialize custom allreduce for multi-node case.
            logger.warning(
                "Custom allreduce is disabled because this process group"
                " spans across nodes.")
            return

        rank = dist.get_rank(group=self.group)
        self.rank = rank
        world_size = dist.get_world_size(group=self.group)
        if world_size == 1:
            # No need to initialize custom allreduce for single GPU case.
            return

        if world_size not in CustomAllreduce._SUPPORTED_WORLD_SIZES:
            logger.warning(
                "Custom allreduce is disabled due to an unsupported world"
                " size: %d. Supported world sizes: %s. To silence this "
                "warning, specify disable_custom_all_reduce=True explicitly.",
                world_size, str(CustomAllreduce._SUPPORTED_WORLD_SIZES))
            return

        if isinstance(device, int):
            device = torch.device(f"cuda:{device}")
        elif isinstance(device, str):
            device = torch.device(device)
        # now `device` is a `torch.device` object
        assert isinstance(device, torch.device)
        self.device = device

        cuda_visible_devices = envs.CUDA_VISIBLE_DEVICES
        if cuda_visible_devices:
            device_ids = list(map(int, cuda_visible_devices.split(",")))
        else:
            device_ids = list(range(cuda_device_count_stateless()))

        physical_device_id = device_ids[device.index]
        tensor = torch.tensor([physical_device_id],
                              dtype=torch.int,
                              device="cpu")
        gather_list = [
            torch.tensor([0], dtype=torch.int, device="cpu")
            for _ in range(world_size)
        ]
        dist.all_gather(gather_list, tensor, group=self.group)
        physical_device_ids = [t.item() for t in gather_list]

        # test nvlink first, this will filter out most of the cases
        # where custom allreduce is not supported
        # this checks hardware and driver support for NVLink
        assert current_platform.is_cuda_alike()
        fully_connected = current_platform.is_fully_connected(
            physical_device_ids)
        if world_size > 2 and not fully_connected:
            logger.warning(
                "Custom allreduce is disabled because it's not supported on"
                " more than two PCIe-only GPUs. To silence this warning, "
                "specify disable_custom_all_reduce=True explicitly.")
            return
        # test P2P capability; this checks software/CUDA-runtime support.
        # this is expensive to compute the first time, so we cache the result.
        # On AMD GPUs, P2P is always enabled between XGMI-connected GPUs.
        if not current_platform.is_rocm() and not _can_p2p(rank, world_size):
            logger.warning(
                "Custom allreduce is disabled because your platform lacks "
                "GPU P2P capability or P2P test failed. To silence this "
                "warning, specify disable_custom_all_reduce=True explicitly.")
            return

        self.disabled = False
        # Buffer memory is owned by this Python class and passed to C++.
        # The metadata consists of two parts: metadata for synchronization and
        # a temporary buffer for storing intermediate allreduce results.
        self.meta_ptrs = self.create_shared_buffer(ops.meta_size() + max_size,
                                                   group=group,
                                                   uncached=True)
        # This is a pre-registered IPC buffer. In eager mode, input tensors
        # are first copied into this buffer before allreduce is performed
        self.buffer_ptrs = self.create_shared_buffer(max_size, group=group)
        # This is a buffer for storing the tuples of pointers pointing to
        # IPC buffers from all ranks. Each registered tuple has a size of
        # 8*world_size bytes, where world_size is at most 8. Allocating 8MB
        # is enough for 131072 such tuples. The largest model I've seen
        # needs fewer than 10000 registered tuples.
        self.rank_data = torch.empty(8 * 1024 * 1024,
                                     dtype=torch.uint8,
                                     device=self.device)
        self.max_size = max_size
        self.rank = rank
        self.world_size = world_size
        self.fully_connected = fully_connected
        self._ptr = ops.init_custom_ar(self.meta_ptrs, self.rank_data, rank,
                                       self.fully_connected)
        ops.register_buffer(self._ptr, self.buffer_ptrs)

    @contextmanager
    def capture(self):
        """
        The main responsibility of this context manager is the 
        `register_graph_buffers` call at the end of the context.
        It records all the buffer addresses used in the CUDA graph.
        """
        try:
            self._IS_CAPTURING = True
            yield
        finally:
            self._IS_CAPTURING = False
            if not self.disabled:
                self.register_graph_buffers()

    def register_graph_buffers(self):
        handle, offset = ops.get_graph_buffer_ipc_meta(self._ptr)
        logger.info("Registering %d cuda graph addresses", len(offset))
        # We cannot directly use `dist.all_gather_object` here
        # because it is incompatible with `gloo` backend under inference mode.
        # see https://github.com/pytorch/pytorch/issues/126032 for details.
        all_data = [[None, None]
                    for _ in range(dist.get_world_size(group=self.group))]
        all_data[self.rank] = [handle, offset]
        ranks = sorted(dist.get_process_group_ranks(group=self.group))
        for i, rank in enumerate(ranks):
            dist.broadcast_object_list(all_data[i],
                                       src=rank,
                                       group=self.group,
                                       device="cpu")
        # Unpack list of tuples to tuple of lists.
        handles = [d[0] for d in all_data]  # type: ignore
        offsets = [d[1] for d in all_data]  # type: ignore
        ops.register_graph_buffers(self._ptr, handles, offsets)

    def should_custom_ar(self, inp: torch.Tensor):
        if self.disabled:
            return False
        inp_size = inp.numel() * inp.element_size()
        # custom allreduce requires the input byte size to be a multiple of 16
        if inp_size % 16 != 0:
            return False
        if not is_weak_contiguous(inp):
            return False
        # for 4 or more non-NVLink-capable GPUs, custom allreduce provides
        # little performance improvement over NCCL.
        if self.world_size == 2 or self.fully_connected:
            return inp_size < self.max_size
        return False

    def all_reduce(self,
                   inp: torch.Tensor,
                   *,
                   out: torch.Tensor = None,
                   registered: bool = False):
        """Performs an out-of-place all reduce.

        If registered is True, this assumes inp's pointer is already
        IPC-registered. Otherwise, inp is first copied into a pre-registered
        buffer.
        """
        if out is None:
            out = torch.empty_like(inp)
        if registered:
            ops.all_reduce(self._ptr, inp, out, 0, 0)
        else:
            ops.all_reduce(self._ptr, inp, out, self.buffer_ptrs[self.rank],
                           self.max_size)
        return out

    def custom_all_reduce(self, input: torch.Tensor) -> Optional[torch.Tensor]:
        """The main allreduce API that provides support for cuda graph."""
        # When custom allreduce is disabled, this returns None.
        if self.disabled or not self.should_custom_ar(input):
            return None
        if self._IS_CAPTURING:
            if torch.cuda.is_current_stream_capturing():
                return self.all_reduce(input, registered=True)
            else:
                # If warm up, mimic the allocation pattern since custom
                # allreduce is out-of-place.
                return torch.empty_like(input)
        else:
            # Note: outside of cuda graph context, custom allreduce incurs a
            # cost of cudaMemcpy, which should be small (<=1% of overall
            # latency) compared to the performance gain of using custom kernels
            return self.all_reduce(input, registered=False)

    def close(self):
        if not self.disabled and self._ptr:
            if ops is not None:
                ops.dispose(self._ptr)
            self._ptr = 0
            self.free_shared_buffer(self.meta_ptrs, rank=self.rank)
            self.free_shared_buffer(self.buffer_ptrs, rank=self.rank)

    def __del__(self):
        self.close()

    @staticmethod
    def create_shared_buffer(size_in_bytes: int,
                             group: Optional[ProcessGroup] = None,
                             uncached: Optional[bool] = False) -> list[int]:
        pointer, handle = ops.allocate_shared_buffer_and_handle(size_in_bytes)

        world_size = dist.get_world_size(group=group)
        rank = dist.get_rank(group=group)
        handles = [None] * world_size
        dist.all_gather_object(handles, handle, group=group)

        pointers: list[int] = []
        for i, h in enumerate(handles):
            if i == rank:
                pointers.append(pointer)  # type: ignore
            else:
                pointers.append(ops.open_mem_handle(h))
        return pointers

    @staticmethod
    def free_shared_buffer(pointers: list[int],
                           group: Optional[ProcessGroup] = None,
                           rank: Optional[int] = 0) -> None:
        if rank is None:
            rank = dist.get_rank(group=group)
        if ops is not None:
            ops.free_shared_buffer(pointers[rank])

_IS_CAPTURING instance-attribute

_IS_CAPTURING = False

_SUPPORTED_WORLD_SIZES class-attribute instance-attribute

_SUPPORTED_WORLD_SIZES = [2, 4, 6, 8]

_ptr instance-attribute

_ptr = init_custom_ar(
    meta_ptrs, rank_data, rank, fully_connected
)

buffer_ptrs instance-attribute

buffer_ptrs = create_shared_buffer(max_size, group=group)

device instance-attribute

device = device

disabled instance-attribute

disabled = False

fully_connected instance-attribute

fully_connected = fully_connected

group instance-attribute

group = group

max_size instance-attribute

max_size = max_size

meta_ptrs instance-attribute

meta_ptrs = create_shared_buffer(
    meta_size() + max_size, group=group, uncached=True
)

rank instance-attribute

rank = rank

rank_data instance-attribute

rank_data = empty(
    8 * 1024 * 1024, dtype=uint8, device=device
)

world_size instance-attribute

world_size = world_size

__del__

__del__()
Source code in vllm/distributed/device_communicators/custom_all_reduce.py
def __del__(self):
    self.close()

__init__

__init__(
    group: ProcessGroup,
    device: Union[int, str, device],
    max_size=8192 * 1024,
) -> None

Parameters:

    group (ProcessGroup, required):
        the process group to work on. If None, it will use the default
        process group.

    device (Union[int, str, device], required):
        the device to bind the CustomAllreduce to. If None, it will be
        bound to f"cuda:{local_rank}".

It is the caller's responsibility to make sure each communicator is bound to a unique device, and that all communicators in this group are on the same node.

Source code in vllm/distributed/device_communicators/custom_all_reduce.py
def __init__(self,
             group: ProcessGroup,
             device: Union[int, str, torch.device],
             max_size=8192 * 1024) -> None:
    """
    Args:
        group: the process group to work on. If None, it will use the
            default process group.
        device: the device to bind the CustomAllreduce to. If None,
            it will be bind to f"cuda:{local_rank}".
    It is the caller's responsibility to make sure each communicator
    is bind to a unique device, and all communicators in this group
    are in the same node.
    """
    self._IS_CAPTURING = False
    self.disabled = True

    if not custom_ar:
        # disable because of missing custom allreduce library
        # e.g. in a non-GPU environment
        logger.info("Custom allreduce is disabled because "
                    "of missing custom allreduce library")
        return

    self.group = group

    assert dist.get_backend(group) != dist.Backend.NCCL, (
        "CustomAllreduce should be attached to a non-NCCL group.")

    if not all(in_the_same_node_as(group, source_rank=0)):
        # No need to initialize custom allreduce for multi-node case.
        logger.warning(
            "Custom allreduce is disabled because this process group"
            " spans across nodes.")
        return

    rank = dist.get_rank(group=self.group)
    self.rank = rank
    world_size = dist.get_world_size(group=self.group)
    if world_size == 1:
        # No need to initialize custom allreduce for single GPU case.
        return

    if world_size not in CustomAllreduce._SUPPORTED_WORLD_SIZES:
        logger.warning(
            "Custom allreduce is disabled due to an unsupported world"
            " size: %d. Supported world sizes: %s. To silence this "
            "warning, specify disable_custom_all_reduce=True explicitly.",
            world_size, str(CustomAllreduce._SUPPORTED_WORLD_SIZES))
        return

    if isinstance(device, int):
        device = torch.device(f"cuda:{device}")
    elif isinstance(device, str):
        device = torch.device(device)
    # now `device` is a `torch.device` object
    assert isinstance(device, torch.device)
    self.device = device

    cuda_visible_devices = envs.CUDA_VISIBLE_DEVICES
    if cuda_visible_devices:
        device_ids = list(map(int, cuda_visible_devices.split(",")))
    else:
        device_ids = list(range(cuda_device_count_stateless()))

    physical_device_id = device_ids[device.index]
    tensor = torch.tensor([physical_device_id],
                          dtype=torch.int,
                          device="cpu")
    gather_list = [
        torch.tensor([0], dtype=torch.int, device="cpu")
        for _ in range(world_size)
    ]
    dist.all_gather(gather_list, tensor, group=self.group)
    physical_device_ids = [t.item() for t in gather_list]

    # test nvlink first, this will filter out most of the cases
    # where custom allreduce is not supported
    # this checks hardware and driver support for NVLink
    assert current_platform.is_cuda_alike()
    fully_connected = current_platform.is_fully_connected(
        physical_device_ids)
    if world_size > 2 and not fully_connected:
        logger.warning(
            "Custom allreduce is disabled because it's not supported on"
            " more than two PCIe-only GPUs. To silence this warning, "
            "specify disable_custom_all_reduce=True explicitly.")
        return
    # test P2P capability; this checks software/CUDA-runtime support.
    # this is expensive to compute the first time, so we cache the result.
    # On AMD GPUs, P2P is always enabled between XGMI-connected GPUs.
    if not current_platform.is_rocm() and not _can_p2p(rank, world_size):
        logger.warning(
            "Custom allreduce is disabled because your platform lacks "
            "GPU P2P capability or P2P test failed. To silence this "
            "warning, specify disable_custom_all_reduce=True explicitly.")
        return

    self.disabled = False
    # Buffer memory is owned by this Python class and passed to C++.
    # The metadata consists of two parts: metadata for synchronization and
    # a temporary buffer for storing intermediate allreduce results.
    self.meta_ptrs = self.create_shared_buffer(ops.meta_size() + max_size,
                                               group=group,
                                               uncached=True)
    # This is a pre-registered IPC buffer. In eager mode, input tensors
    # are first copied into this buffer before allreduce is performed
    self.buffer_ptrs = self.create_shared_buffer(max_size, group=group)
    # This is a buffer for storing the tuples of pointers pointing to
    # IPC buffers from all ranks. Each registered tuple has a size of
    # 8*world_size bytes, where world_size is at most 8. Allocating 8MB
    # is enough for 131072 such tuples. The largest model I've seen
    # needs fewer than 10000 registered tuples.
    self.rank_data = torch.empty(8 * 1024 * 1024,
                                 dtype=torch.uint8,
                                 device=self.device)
    self.max_size = max_size
    self.rank = rank
    self.world_size = world_size
    self.fully_connected = fully_connected
    self._ptr = ops.init_custom_ar(self.meta_ptrs, self.rank_data, rank,
                                   self.fully_connected)
    ops.register_buffer(self._ptr, self.buffer_ptrs)
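
As a usage sketch (not taken from the vLLM source), construction typically happens against a CPU-side process group such as gloo, since the constructor asserts that the group backend is not NCCL. The group setup and the rank-to-device mapping below are illustrative assumptions.

# Hypothetical construction sketch. Assumes torch.distributed is already
# initialized and that all ranks live on one node, with rank i using cuda:i.
import torch
import torch.distributed as dist

from vllm.distributed.device_communicators.custom_all_reduce import (
    CustomAllreduce)

cpu_group = dist.new_group(backend="gloo")  # non-NCCL group, as required
rank = dist.get_rank(cpu_group)
ca = CustomAllreduce(group=cpu_group, device=torch.device(f"cuda:{rank}"))
if ca.disabled:
    # unsupported topology or world size; callers fall back to other backends
    pass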

all_reduce

all_reduce(
    inp: Tensor,
    *,
    out: Tensor = None,
    registered: bool = False,
)

Performs an out-of-place all reduce.

If registered is True, this assumes inp's pointer is already IPC-registered. Otherwise, inp is first copied into a pre-registered buffer.

Source code in vllm/distributed/device_communicators/custom_all_reduce.py
def all_reduce(self,
               inp: torch.Tensor,
               *,
               out: torch.Tensor = None,
               registered: bool = False):
    """Performs an out-of-place all reduce.

    If registered is True, this assumes inp's pointer is already
    IPC-registered. Otherwise, inp is first copied into a pre-registered
    buffer.
    """
    if out is None:
        out = torch.empty_like(inp)
    if registered:
        ops.all_reduce(self._ptr, inp, out, 0, 0)
    else:
        ops.all_reduce(self._ptr, inp, out, self.buffer_ptrs[self.rank],
                       self.max_size)
    return out
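
A minimal usage sketch, assuming `ca` is an enabled CustomAllreduce instance constructed as in the earlier example; the tensor size is illustrative.

# Each rank calls all_reduce with a tensor of the same shape and dtype.
# The byte size (here 2048 bytes) must be a multiple of 16 and below max_size.
inp = torch.ones(1024, dtype=torch.float16, device=ca.device)
out = ca.all_reduce(inp, registered=False)  # inp is staged via the IPC buffer
# `out` holds the element-wise sum across all ranks; `inp` is left unchanged.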

capture

capture()

The main responsibility of this context manager is the register_graph_buffers call at the end of the context. It records all the buffer addresses used in the CUDA graph.

Source code in vllm/distributed/device_communicators/custom_all_reduce.py
@contextmanager
def capture(self):
    """
    The main responsibility of this context manager is the 
    `register_graph_buffers` call at the end of the context.
    It records all the buffer addresses used in the CUDA graph.
    """
    try:
        self._IS_CAPTURING = True
        yield
    finally:
        self._IS_CAPTURING = False
        if not self.disabled:
            self.register_graph_buffers()
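
A sketch of how capture() is meant to wrap CUDA graph capture, assuming `ca` and `inp` from the earlier examples; real callers also handle warm-up streams and memory pools, which are omitted here.

# Inside ca.capture(), allreduce calls issued while a CUDA graph is being
# captured take the registered-buffer path; register_graph_buffers() runs
# on exit from the context.
graph = torch.cuda.CUDAGraph()
with ca.capture():
    ca.custom_all_reduce(inp)            # warm-up: returns an empty placeholder
    with torch.cuda.graph(graph):
        out = ca.custom_all_reduce(inp)  # captured with registered=True
graph.replay()  # replays the captured allreduce on the registered buffers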

close

close()
Source code in vllm/distributed/device_communicators/custom_all_reduce.py
def close(self):
    if not self.disabled and self._ptr:
        if ops is not None:
            ops.dispose(self._ptr)
        self._ptr = 0
        self.free_shared_buffer(self.meta_ptrs, rank=self.rank)
        self.free_shared_buffer(self.buffer_ptrs, rank=self.rank)

create_shared_buffer staticmethod

create_shared_buffer(
    size_in_bytes: int,
    group: Optional[ProcessGroup] = None,
    uncached: Optional[bool] = False,
) -> list[int]
Source code in vllm/distributed/device_communicators/custom_all_reduce.py
@staticmethod
def create_shared_buffer(size_in_bytes: int,
                         group: Optional[ProcessGroup] = None,
                         uncached: Optional[bool] = False) -> list[int]:
    pointer, handle = ops.allocate_shared_buffer_and_handle(size_in_bytes)

    world_size = dist.get_world_size(group=group)
    rank = dist.get_rank(group=group)
    handles = [None] * world_size
    dist.all_gather_object(handles, handle, group=group)

    pointers: list[int] = []
    for i, h in enumerate(handles):
        if i == rank:
            pointers.append(pointer)  # type: ignore
        else:
            pointers.append(ops.open_mem_handle(h))
    return pointers

custom_all_reduce

custom_all_reduce(input: Tensor) -> Optional[Tensor]

The main allreduce API that provides support for cuda graph.

Source code in vllm/distributed/device_communicators/custom_all_reduce.py
def custom_all_reduce(self, input: torch.Tensor) -> Optional[torch.Tensor]:
    """The main allreduce API that provides support for cuda graph."""
    # When custom allreduce is disabled, this returns None.
    if self.disabled or not self.should_custom_ar(input):
        return None
    if self._IS_CAPTURING:
        if torch.cuda.is_current_stream_capturing():
            return self.all_reduce(input, registered=True)
        else:
            # If warm up, mimic the allocation pattern since custom
            # allreduce is out-of-place.
            return torch.empty_like(input)
    else:
        # Note: outside of cuda graph context, custom allreduce incurs a
        # cost of cudaMemcpy, which should be small (<=1% of overall
        # latency) compared to the performance gain of using custom kernels
        return self.all_reduce(input, registered=False)
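
Callers are expected to treat a None return as "not handled here" and fall back to another allreduce path. A hedged sketch of that caller-side pattern (not part of this module; `tensor` stands for any CUDA tensor on this rank):

# If custom allreduce declines the input, fall back to a regular allreduce,
# e.g. torch.distributed over the default (typically NCCL) group.
result = ca.custom_all_reduce(tensor)
if result is None:
    dist.all_reduce(tensor)  # in-place fallback
    result = tensor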

free_shared_buffer staticmethod

free_shared_buffer(
    pointers: list[int],
    group: Optional[ProcessGroup] = None,
    rank: Optional[int] = 0,
) -> None
Source code in vllm/distributed/device_communicators/custom_all_reduce.py
@staticmethod
def free_shared_buffer(pointers: list[int],
                       group: Optional[ProcessGroup] = None,
                       rank: Optional[int] = 0) -> None:
    if rank is None:
        rank = dist.get_rank(group=group)
    if ops is not None:
        ops.free_shared_buffer(pointers[rank])

register_graph_buffers

register_graph_buffers()
Source code in vllm/distributed/device_communicators/custom_all_reduce.py
def register_graph_buffers(self):
    handle, offset = ops.get_graph_buffer_ipc_meta(self._ptr)
    logger.info("Registering %d cuda graph addresses", len(offset))
    # We cannot directly use `dist.all_gather_object` here
    # because it is incompatible with `gloo` backend under inference mode.
    # see https://github.com/pytorch/pytorch/issues/126032 for details.
    all_data = [[None, None]
                for _ in range(dist.get_world_size(group=self.group))]
    all_data[self.rank] = [handle, offset]
    ranks = sorted(dist.get_process_group_ranks(group=self.group))
    for i, rank in enumerate(ranks):
        dist.broadcast_object_list(all_data[i],
                                   src=rank,
                                   group=self.group,
                                   device="cpu")
    # Unpack list of tuples to tuple of lists.
    handles = [d[0] for d in all_data]  # type: ignore
    offsets = [d[1] for d in all_data]  # type: ignore
    ops.register_graph_buffers(self._ptr, handles, offsets)

should_custom_ar

should_custom_ar(inp: Tensor)
Source code in vllm/distributed/device_communicators/custom_all_reduce.py
def should_custom_ar(self, inp: torch.Tensor):
    if self.disabled:
        return False
    inp_size = inp.numel() * inp.element_size()
    # custom allreduce requires the input byte size to be a multiple of 16
    if inp_size % 16 != 0:
        return False
    if not is_weak_contiguous(inp):
        return False
    # for 4 or more non-NVLink-capable GPUs, custom allreduce provides
    # little performance improvement over NCCL.
    if self.world_size == 2 or self.fully_connected:
        return inp_size < self.max_size
    return False
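
To illustrate the checks, assuming an enabled two-GPU communicator `ca` with the default max_size of 8 MiB:

a = torch.empty(4096, dtype=torch.float16, device=ca.device)  # 8192 bytes
b = torch.empty(3, dtype=torch.float16, device=ca.device)     # 6 bytes
ca.should_custom_ar(a)  # True: multiple of 16 bytes and below max_size
ca.should_custom_ar(b)  # False: byte size is not a multiple of 16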

_can_p2p

_can_p2p(rank: int, world_size: int) -> bool
Source code in vllm/distributed/device_communicators/custom_all_reduce.py
def _can_p2p(rank: int, world_size: int) -> bool:
    for i in range(world_size):
        if i == rank:
            continue
        if envs.VLLM_SKIP_P2P_CHECK:
            logger.info(
                "Skipping P2P check and trusting the driver's P2P report.")
            return torch.cuda.can_device_access_peer(rank, i)
        if not gpu_p2p_access_check(rank, i):
            return False
    return True
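
When VLLM_SKIP_P2P_CHECK is set, the check reduces to the driver's own P2P report. A rough, illustrative equivalent is sketched below; the cached gpu_p2p_access_check path is not reproduced here.

import torch

def driver_reports_p2p(rank: int, world_size: int) -> bool:
    # Trust torch/CUDA's peer-access report for every other local GPU.
    return all(torch.cuda.can_device_access_peer(rank, peer)
               for peer in range(world_size) if peer != rank)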

is_weak_contiguous

is_weak_contiguous(inp: Tensor)
Source code in vllm/distributed/device_communicators/custom_all_reduce.py
def is_weak_contiguous(inp: torch.Tensor):
    return inp.is_contiguous() or (inp.storage().nbytes() -
                                   inp.storage_offset() * inp.element_size()
                                   == inp.numel() * inp.element_size())
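
For intuition, a tensor is "weakly contiguous" if it is contiguous or if its elements exactly fill the remainder of its storage. An illustrative example, assuming torch and this helper are imported:

x = torch.randn(4, 8)
x.t().is_contiguous()         # False: the transposed view has swapped strides
is_weak_contiguous(x.t())     # True: the view still covers its whole storage
is_weak_contiguous(x[:, :4])  # False: the view covers only part of the storage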