Skip to content

vllm.distributed.eplb

Expert parallelism load balancer (EPLB).

Modules:

Name Description
eplb_state

Expert parallelism load balancer (EPLB) metrics and states.

rebalance_algo

Expert parallelism load balancer (EPLB) for vLLM.

rebalance_execute

The actual execution of the rearrangement.

logger module-attribute

logger = init_logger(__name__)

EplbState dataclass

EPLB metrics.

Source code in vllm/distributed/eplb/eplb_state.py
@dataclass
class EplbState:
    """EPLB metrics."""

    physical_to_logical_map: torch.Tensor
    """
    Mapping from physical experts to logical experts.

    Shape: (num_moe_layers, num_physical_experts)

    # Example

    For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3
    EP ranks, the mapping could look like this:

    ```
    [[0, 1, 2, 3, 0, 1],
     [0, 2, 0, 1, 0, 3]]
    ```
    """
    logical_to_physical_map: torch.Tensor
    """
    Mapping from logical experts to physical experts.

    This is a sparse matrix, where -1 indicates no mapping.

    Shape: (num_moe_layers, num_logical_experts, num_redundant_experts + 1)

    # Example

    For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3
    EP ranks, the mapping could look like this:

    ```
    [[[0, 4, -1],
      [1, 5, -1],
      [2, -1, -1],
      [3, -1, -1]],
     [[0, 2, 4],
      [3, -1, -1],
      [1, -1, -1],
      [5, -1, -1]]]
    ```
    """
    logical_replica_count: torch.Tensor
    """
    Number of replicas for each logical expert.
    This is exactly the non-`-1` count in the `logical_to_physical_map`.

    Shape: (num_moe_layers, num_logical_experts)

    # Example
    For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3
    EP ranks, the count could look like this:

    ```
    [[2, 2, 1, 1],
     [3, 1, 1, 1]]
    """

    expert_load_pass: torch.Tensor
    """
    Expert load during this forward pass. 
    We use the token count each expert processes as the load.

    Shape: (num_moe_layers, num_local_physical_experts)
    """
    expert_load_window: torch.Tensor
    """
    A sliding window of expert load.

    Shape: (window_size, num_moe_layers, num_local_physical_experts)
    """
    expert_load_window_step: int = 0
    """
    Current step in the sliding window.

    Different from `expert_rearrangement_step`, each EP rank may have its own
    `expert_load_window_step`.
    """
    expert_load_window_size: int = 0
    """
    Size of the expert load sliding window.
    This is a constant and is taken from the config.
    """

    expert_rearrangement_step: int = 0
    """
    Steps after last rearrangement.
    Will trigger a rearrangement if it exceeds the threshold.

    NOTE: Keep in mind that all EP ranks need to have the same
    `expert_rearrangement_step` value to ensure synchronization.
    Otherwise, the rearrangement will hang at collective
    communication calls.
    """
    expert_rearrangement_step_interval: int = 0
    """
    Interval for expert rearrangement steps.
    This is a constant and is taken from the config.
    """

    @staticmethod
    def build_initial_global_physical_to_logical_map(
        num_routed_experts: int,
        num_redundant_experts: int,
    ) -> Sequence[int]:
        """
        Build an initial expert arrangement using the following structure:
        [original routed experts, redundant experts]

        Returns:
            physical_to_logical_map (Sequence[int]): A list of integers,
                where each integer is the index of the logical expert
                that the corresponding physical expert maps to.
        """
        global_physical_to_logical_map = list(range(num_routed_experts))
        global_physical_to_logical_map += [
            i % num_routed_experts for i in range(num_redundant_experts)
        ]
        return global_physical_to_logical_map

    @classmethod
    def build(
        cls,
        model: MixtureOfExperts,
        device: torch.device,
        parallel_config: ParallelConfig,
    ) -> "EplbState":
        """
        Build the initial EPLB state.
        """
        physical_to_logical_map_list = (
            cls.build_initial_global_physical_to_logical_map(
                model.num_routed_experts,
                model.num_redundant_experts,
            ))
        physical_to_logical_map = torch.tensor(
            physical_to_logical_map_list,
            device=device,
        )
        logical_to_physical_map = torch.full(
            (model.num_logical_experts, model.num_redundant_experts + 1),
            -1,
            device=device,
        )
        logical_replica_count = torch.zeros(
            (model.num_logical_experts, ),
            device=device,
            dtype=torch.long,
        )

        for i in range(model.num_physical_experts):
            logical_idx = physical_to_logical_map[i]
            logical_to_physical_map[logical_idx,
                                    logical_replica_count[logical_idx]] = i
            logical_replica_count[logical_idx] += 1

        # Duplicate initial mapping for all layers
        physical_to_logical_map = physical_to_logical_map.unsqueeze(0).expand(
            model.num_moe_layers,
            -1,
        ).contiguous()
        logical_to_physical_map = logical_to_physical_map.unsqueeze(0).expand(
            model.num_moe_layers,
            -1,
            -1,
        ).contiguous()
        logical_replica_count = logical_replica_count.unsqueeze(0).expand(
            model.num_moe_layers,
            -1,
        ).contiguous()

        expert_load_pass = torch.zeros(
            (model.num_moe_layers, model.num_local_physical_experts),
            dtype=torch.int32,
            device=device,
        )
        expert_load_window_size = parallel_config.eplb_window_size
        expert_load_window = torch.zeros(
            (expert_load_window_size, model.num_moe_layers,
             model.num_local_physical_experts),
            dtype=torch.int32,
            device=device,
        )

        # Set the initial progress of rearrangement to 3/4
        eplb_step_interval = parallel_config.eplb_step_interval
        expert_rearrangement_step = max(
            0, eplb_step_interval - eplb_step_interval // 4)

        model.set_eplb_state(
            expert_load_pass,
            logical_to_physical_map,
            logical_replica_count,
        )

        return cls(
            physical_to_logical_map,
            logical_to_physical_map,
            logical_replica_count,
            expert_load_pass,
            expert_load_window,
            expert_load_window_size=expert_load_window_size,
            expert_rearrangement_step=expert_rearrangement_step,
            expert_rearrangement_step_interval=eplb_step_interval,
        )

    def step(self,
             model: MixtureOfExperts,
             is_dummy: bool = False,
             is_profile: bool = False,
             log_stats: bool = False) -> None:
        """
        Step the EPLB state.

        Args:
            model (MixtureOfExperts): The MoE model.
            is_dummy (bool): If `True`, this is a dummy step and the load
              metrics recorded in this forward pass will not count. Defaults
              to `False`.
            is_profile (bool): If `True`, perform a dummy rearrangement
              with maximum communication cost. This is used in `profile_run`
              to reserve enough memory for the communication buffer.
            log_stats (bool): If `True`, log the expert load metrics.

        # Stats
            The metrics are all summed up across layers.
            - `avg_tokens`: The average load across ranks.
            - `max_tokens`: The maximum load across ranks.
            - `balancedness`: The ratio of average load to maximum load.
        """

        if is_profile:
            self.rearrange(model, is_profile=True)
            return

        if is_dummy:
            # Do not record load metrics for dummy steps
            self.expert_load_pass.zero_()

        if log_stats:
            # `num_tokens`: (num_moe_layers,)
            num_tokens = self.expert_load_pass.sum(dim=-1)

            # Collect load metrics from all ranks
            ep_group = get_ep_group().device_group
            num_tokens_list = [
                torch.empty_like(num_tokens) for _ in range(ep_group.size())
            ]
            all_gather(num_tokens_list, num_tokens, group=ep_group)
            # Stack to get (num_ranks, num_moe_layers)
            num_tokens_per_rank = torch.stack(num_tokens_list).float()

            # Compute balancedness ratio:
            # for each layer:
            #   (mean load across ranks) / (max load across ranks)
            avg_tokens_tensor = num_tokens_per_rank.mean(dim=0).sum(dim=0)
            max_tokens_tensor = num_tokens_per_rank.max(dim=0).values.sum(
                dim=0)

            # Just to make type checker happy
            tokens_tensors: list[float] = torch.stack(
                [avg_tokens_tensor, max_tokens_tensor]).tolist()
            avg_tokens, max_tokens = tokens_tensors
            balancedness = avg_tokens / max_tokens if max_tokens > 0 else 0.0

            if ep_group.rank() == 0:
                logger.info(
                    "EPLB step: avg_tokens=%.2f, max_tokens=%d, "
                    "balancedness=%.4f", avg_tokens, max_tokens, balancedness)

        # Update the expert load sliding window
        if not is_dummy:
            self.expert_load_window[self.expert_load_window_step] = (
                self.expert_load_pass.clone())
            self.expert_load_window_step += 1
            if self.expert_load_window_step >= self.expert_load_window_size:
                self.expert_load_window_step = 0
            self.expert_load_pass.zero_()

        # Step the expert rearrangement step
        # Note that even if this is a dummy step, we still increment the
        # rearrangement step and perform rearrangement to ensure all ranks are
        # performing collective communication.
        self.expert_rearrangement_step += 1
        if (self.expert_rearrangement_step
                >= self.expert_rearrangement_step_interval):
            self.expert_rearrangement_step = 0
            self.rearrange(model)

    def rearrange(self,
                  model: MixtureOfExperts,
                  is_profile: bool = False) -> None:
        """
        Rearrange the experts according to the current load.
        """

        ep_group = get_ep_group().device_group
        ep_rank = ep_group.rank()

        time_start = None
        is_main_rank = ep_rank == 0
        if is_main_rank:
            torch.cuda.synchronize()
            time_start = time.perf_counter()
            logger.info("Rearranging experts %s...",
                        "(profile)" if is_profile else "")

        # This mapping is only used here, so we do not store it in the state
        physical_expert_start = ep_rank * model.num_local_physical_experts
        physical_expert_end = (physical_expert_start +
                               model.num_local_physical_experts)
        # (num_moe_layers, num_local_physical_experts)
        local_physical_to_logical_map = self.physical_to_logical_map[
            :,
            physical_expert_start:physical_expert_end,
        ]

        # Map the local physical expert load to global logical experts
        logical_expert_load_window = torch.zeros(
            self.expert_load_window_size,
            model.num_moe_layers,
            model.num_logical_experts,
            dtype=self.expert_load_window.dtype,
            device=self.expert_load_window.device,
        )
        logical_expert_load_window.scatter_add_(
            dim=-1,
            index=local_physical_to_logical_map.unsqueeze(0).expand_as(
                self.expert_load_window).long(),
            src=self.expert_load_window,
        )

        # Perform all-reduce to get the expert load across all ranks
        global_expert_load_window = logical_expert_load_window.sum(dim=0)
        all_reduce(global_expert_load_window, group=ep_group)

        # TODO(bowen): Treat differently for prefill and decode nodes
        num_replicas = model.num_physical_experts
        num_groups = model.num_expert_groups
        num_nodes = get_node_count()
        num_gpus = ep_group.size()

        if num_gpus % num_nodes != 0:
            logger.warning_once(
                f"num_gpus % num_nodes != 0, "
                "not using hierarchical rearrangement algorithm.\n"
                f"{num_gpus=}, {num_nodes=}")

        # Get new expert mappings
        (
            new_physical_to_logical_map,
            new_logical_to_physical_map,
            new_logical_replica_count,
        ) = (rebalance_experts(
            global_expert_load_window,
            num_replicas,
            num_groups,
            num_nodes,
            num_gpus,
        ))

        # Update expert weights
        rearrange_expert_weights_inplace(
            self.physical_to_logical_map,
            new_physical_to_logical_map,
            model.expert_weights,
            ep_group,
            is_profile,
        )

        if not is_profile:
            self.physical_to_logical_map.copy_(new_physical_to_logical_map)
            self.logical_to_physical_map.copy_(new_logical_to_physical_map)
            self.logical_replica_count.copy_(new_logical_replica_count)

        if is_main_rank:
            assert time_start is not None
            torch.cuda.synchronize()
            time_end = time.perf_counter()
            logger.info(
                "Rearranged experts%sin %.2f seconds.",
                " (profile) " if is_profile else " ",
                time_end - time_start,
            )

expert_load_pass instance-attribute

expert_load_pass: Tensor

Expert load during this forward pass. We use the token count each expert processes as the load.

Shape: (num_moe_layers, num_local_physical_experts)

expert_load_window instance-attribute

expert_load_window: Tensor

A sliding window of expert load.

Shape: (window_size, num_moe_layers, num_local_physical_experts)

expert_load_window_size class-attribute instance-attribute

expert_load_window_size: int = 0

Size of the expert load sliding window. This is a constant and is taken from the config.

expert_load_window_step class-attribute instance-attribute

expert_load_window_step: int = 0

Current step in the sliding window.

Different from expert_rearrangement_step, each EP rank may have its own expert_load_window_step.

expert_rearrangement_step class-attribute instance-attribute

expert_rearrangement_step: int = 0

Steps after last rearrangement. Will trigger a rearrangement if it exceeds the threshold.

NOTE: Keep in mind that all EP ranks need to have the same expert_rearrangement_step value to ensure synchronization. Otherwise, the rearrangement will hang at collective communication calls.

expert_rearrangement_step_interval class-attribute instance-attribute

expert_rearrangement_step_interval: int = 0

Interval for expert rearrangement steps. This is a constant and is taken from the config.

logical_replica_count instance-attribute

logical_replica_count: Tensor

Number of replicas for each logical expert. This is exactly the non--1 count in the logical_to_physical_map.

Shape: (num_moe_layers, num_logical_experts)

Example

For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3 EP ranks, the count could look like this:

``` [[2, 2, 1, 1], [3, 1, 1, 1]]

logical_to_physical_map instance-attribute

logical_to_physical_map: Tensor

Mapping from logical experts to physical experts.

This is a sparse matrix, where -1 indicates no mapping.

Shape: (num_moe_layers, num_logical_experts, num_redundant_experts + 1)

Example

For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3 EP ranks, the mapping could look like this:

[[[0, 4, -1],
  [1, 5, -1],
  [2, -1, -1],
  [3, -1, -1]],
 [[0, 2, 4],
  [3, -1, -1],
  [1, -1, -1],
  [5, -1, -1]]]

physical_to_logical_map instance-attribute

physical_to_logical_map: Tensor

Mapping from physical experts to logical experts.

Shape: (num_moe_layers, num_physical_experts)

Example

For a 2-layer MoE model with 6 physical experts and 4 logical experts on 3 EP ranks, the mapping could look like this:

[[0, 1, 2, 3, 0, 1],
 [0, 2, 0, 1, 0, 3]]

__init__

__init__(
    physical_to_logical_map: Tensor,
    logical_to_physical_map: Tensor,
    logical_replica_count: Tensor,
    expert_load_pass: Tensor,
    expert_load_window: Tensor,
    expert_load_window_step: int = 0,
    expert_load_window_size: int = 0,
    expert_rearrangement_step: int = 0,
    expert_rearrangement_step_interval: int = 0,
) -> None

build classmethod

build(
    model: MixtureOfExperts,
    device: device,
    parallel_config: ParallelConfig,
) -> EplbState

Build the initial EPLB state.

Source code in vllm/distributed/eplb/eplb_state.py
@classmethod
def build(
    cls,
    model: MixtureOfExperts,
    device: torch.device,
    parallel_config: ParallelConfig,
) -> "EplbState":
    """
    Build the initial EPLB state.
    """
    physical_to_logical_map_list = (
        cls.build_initial_global_physical_to_logical_map(
            model.num_routed_experts,
            model.num_redundant_experts,
        ))
    physical_to_logical_map = torch.tensor(
        physical_to_logical_map_list,
        device=device,
    )
    logical_to_physical_map = torch.full(
        (model.num_logical_experts, model.num_redundant_experts + 1),
        -1,
        device=device,
    )
    logical_replica_count = torch.zeros(
        (model.num_logical_experts, ),
        device=device,
        dtype=torch.long,
    )

    for i in range(model.num_physical_experts):
        logical_idx = physical_to_logical_map[i]
        logical_to_physical_map[logical_idx,
                                logical_replica_count[logical_idx]] = i
        logical_replica_count[logical_idx] += 1

    # Duplicate initial mapping for all layers
    physical_to_logical_map = physical_to_logical_map.unsqueeze(0).expand(
        model.num_moe_layers,
        -1,
    ).contiguous()
    logical_to_physical_map = logical_to_physical_map.unsqueeze(0).expand(
        model.num_moe_layers,
        -1,
        -1,
    ).contiguous()
    logical_replica_count = logical_replica_count.unsqueeze(0).expand(
        model.num_moe_layers,
        -1,
    ).contiguous()

    expert_load_pass = torch.zeros(
        (model.num_moe_layers, model.num_local_physical_experts),
        dtype=torch.int32,
        device=device,
    )
    expert_load_window_size = parallel_config.eplb_window_size
    expert_load_window = torch.zeros(
        (expert_load_window_size, model.num_moe_layers,
         model.num_local_physical_experts),
        dtype=torch.int32,
        device=device,
    )

    # Set the initial progress of rearrangement to 3/4
    eplb_step_interval = parallel_config.eplb_step_interval
    expert_rearrangement_step = max(
        0, eplb_step_interval - eplb_step_interval // 4)

    model.set_eplb_state(
        expert_load_pass,
        logical_to_physical_map,
        logical_replica_count,
    )

    return cls(
        physical_to_logical_map,
        logical_to_physical_map,
        logical_replica_count,
        expert_load_pass,
        expert_load_window,
        expert_load_window_size=expert_load_window_size,
        expert_rearrangement_step=expert_rearrangement_step,
        expert_rearrangement_step_interval=eplb_step_interval,
    )

build_initial_global_physical_to_logical_map staticmethod

build_initial_global_physical_to_logical_map(
    num_routed_experts: int, num_redundant_experts: int
) -> Sequence[int]

Build an initial expert arrangement using the following structure: [original routed experts, redundant experts]

Returns:

Name Type Description
physical_to_logical_map Sequence[int]

A list of integers, where each integer is the index of the logical expert that the corresponding physical expert maps to.

Source code in vllm/distributed/eplb/eplb_state.py
@staticmethod
def build_initial_global_physical_to_logical_map(
    num_routed_experts: int,
    num_redundant_experts: int,
) -> Sequence[int]:
    """
    Build an initial expert arrangement using the following structure:
    [original routed experts, redundant experts]

    Returns:
        physical_to_logical_map (Sequence[int]): A list of integers,
            where each integer is the index of the logical expert
            that the corresponding physical expert maps to.
    """
    global_physical_to_logical_map = list(range(num_routed_experts))
    global_physical_to_logical_map += [
        i % num_routed_experts for i in range(num_redundant_experts)
    ]
    return global_physical_to_logical_map

rearrange

rearrange(
    model: MixtureOfExperts, is_profile: bool = False
) -> None

Rearrange the experts according to the current load.

Source code in vllm/distributed/eplb/eplb_state.py
def rearrange(self,
              model: MixtureOfExperts,
              is_profile: bool = False) -> None:
    """
    Rearrange the experts according to the current load.
    """

    ep_group = get_ep_group().device_group
    ep_rank = ep_group.rank()

    time_start = None
    is_main_rank = ep_rank == 0
    if is_main_rank:
        torch.cuda.synchronize()
        time_start = time.perf_counter()
        logger.info("Rearranging experts %s...",
                    "(profile)" if is_profile else "")

    # This mapping is only used here, so we do not store it in the state
    physical_expert_start = ep_rank * model.num_local_physical_experts
    physical_expert_end = (physical_expert_start +
                           model.num_local_physical_experts)
    # (num_moe_layers, num_local_physical_experts)
    local_physical_to_logical_map = self.physical_to_logical_map[
        :,
        physical_expert_start:physical_expert_end,
    ]

    # Map the local physical expert load to global logical experts
    logical_expert_load_window = torch.zeros(
        self.expert_load_window_size,
        model.num_moe_layers,
        model.num_logical_experts,
        dtype=self.expert_load_window.dtype,
        device=self.expert_load_window.device,
    )
    logical_expert_load_window.scatter_add_(
        dim=-1,
        index=local_physical_to_logical_map.unsqueeze(0).expand_as(
            self.expert_load_window).long(),
        src=self.expert_load_window,
    )

    # Perform all-reduce to get the expert load across all ranks
    global_expert_load_window = logical_expert_load_window.sum(dim=0)
    all_reduce(global_expert_load_window, group=ep_group)

    # TODO(bowen): Treat differently for prefill and decode nodes
    num_replicas = model.num_physical_experts
    num_groups = model.num_expert_groups
    num_nodes = get_node_count()
    num_gpus = ep_group.size()

    if num_gpus % num_nodes != 0:
        logger.warning_once(
            f"num_gpus % num_nodes != 0, "
            "not using hierarchical rearrangement algorithm.\n"
            f"{num_gpus=}, {num_nodes=}")

    # Get new expert mappings
    (
        new_physical_to_logical_map,
        new_logical_to_physical_map,
        new_logical_replica_count,
    ) = (rebalance_experts(
        global_expert_load_window,
        num_replicas,
        num_groups,
        num_nodes,
        num_gpus,
    ))

    # Update expert weights
    rearrange_expert_weights_inplace(
        self.physical_to_logical_map,
        new_physical_to_logical_map,
        model.expert_weights,
        ep_group,
        is_profile,
    )

    if not is_profile:
        self.physical_to_logical_map.copy_(new_physical_to_logical_map)
        self.logical_to_physical_map.copy_(new_logical_to_physical_map)
        self.logical_replica_count.copy_(new_logical_replica_count)

    if is_main_rank:
        assert time_start is not None
        torch.cuda.synchronize()
        time_end = time.perf_counter()
        logger.info(
            "Rearranged experts%sin %.2f seconds.",
            " (profile) " if is_profile else " ",
            time_end - time_start,
        )

step

step(
    model: MixtureOfExperts,
    is_dummy: bool = False,
    is_profile: bool = False,
    log_stats: bool = False,
) -> None

Step the EPLB state.

Parameters:

Name Type Description Default
model MixtureOfExperts

The MoE model.

required
is_dummy bool

If True, this is a dummy step and the load metrics recorded in this forward pass will not count. Defaults to False.

False
is_profile bool

If True, perform a dummy rearrangement with maximum communication cost. This is used in profile_run to reserve enough memory for the communication buffer.

False
log_stats bool

If True, log the expert load metrics.

False

Stats

The metrics are all summed up across layers.
- `avg_tokens`: The average load across ranks.
- `max_tokens`: The maximum load across ranks.
- `balancedness`: The ratio of average load to maximum load.
Source code in vllm/distributed/eplb/eplb_state.py
def step(self,
         model: MixtureOfExperts,
         is_dummy: bool = False,
         is_profile: bool = False,
         log_stats: bool = False) -> None:
    """
    Step the EPLB state.

    Args:
        model (MixtureOfExperts): The MoE model.
        is_dummy (bool): If `True`, this is a dummy step and the load
          metrics recorded in this forward pass will not count. Defaults
          to `False`.
        is_profile (bool): If `True`, perform a dummy rearrangement
          with maximum communication cost. This is used in `profile_run`
          to reserve enough memory for the communication buffer.
        log_stats (bool): If `True`, log the expert load metrics.

    # Stats
        The metrics are all summed up across layers.
        - `avg_tokens`: The average load across ranks.
        - `max_tokens`: The maximum load across ranks.
        - `balancedness`: The ratio of average load to maximum load.
    """

    if is_profile:
        self.rearrange(model, is_profile=True)
        return

    if is_dummy:
        # Do not record load metrics for dummy steps
        self.expert_load_pass.zero_()

    if log_stats:
        # `num_tokens`: (num_moe_layers,)
        num_tokens = self.expert_load_pass.sum(dim=-1)

        # Collect load metrics from all ranks
        ep_group = get_ep_group().device_group
        num_tokens_list = [
            torch.empty_like(num_tokens) for _ in range(ep_group.size())
        ]
        all_gather(num_tokens_list, num_tokens, group=ep_group)
        # Stack to get (num_ranks, num_moe_layers)
        num_tokens_per_rank = torch.stack(num_tokens_list).float()

        # Compute balancedness ratio:
        # for each layer:
        #   (mean load across ranks) / (max load across ranks)
        avg_tokens_tensor = num_tokens_per_rank.mean(dim=0).sum(dim=0)
        max_tokens_tensor = num_tokens_per_rank.max(dim=0).values.sum(
            dim=0)

        # Just to make type checker happy
        tokens_tensors: list[float] = torch.stack(
            [avg_tokens_tensor, max_tokens_tensor]).tolist()
        avg_tokens, max_tokens = tokens_tensors
        balancedness = avg_tokens / max_tokens if max_tokens > 0 else 0.0

        if ep_group.rank() == 0:
            logger.info(
                "EPLB step: avg_tokens=%.2f, max_tokens=%d, "
                "balancedness=%.4f", avg_tokens, max_tokens, balancedness)

    # Update the expert load sliding window
    if not is_dummy:
        self.expert_load_window[self.expert_load_window_step] = (
            self.expert_load_pass.clone())
        self.expert_load_window_step += 1
        if self.expert_load_window_step >= self.expert_load_window_size:
            self.expert_load_window_step = 0
        self.expert_load_pass.zero_()

    # Step the expert rearrangement step
    # Note that even if this is a dummy step, we still increment the
    # rearrangement step and perform rearrangement to ensure all ranks are
    # performing collective communication.
    self.expert_rearrangement_step += 1
    if (self.expert_rearrangement_step
            >= self.expert_rearrangement_step_interval):
        self.expert_rearrangement_step = 0
        self.rearrange(model)

MixtureOfExperts

Bases: Protocol

Check if the model is a mixture of experts (MoE) model.

Source code in vllm/model_executor/models/interfaces.py
@runtime_checkable
class MixtureOfExperts(Protocol):
    """
    Check if the model is a mixture of experts (MoE) model.
    """

    expert_weights: MutableSequence[Iterable[Tensor]]
    """
    Expert weights saved in this rank.

    The first dimension is the layer, and the second dimension is different
    parameters in the layer, e.g. up/down projection weights.
    """

    num_moe_layers: int
    """Number of MoE layers in this model."""

    num_expert_groups: int
    """Number of expert groups in this model."""

    num_logical_experts: int
    """Number of logical experts in this model."""

    num_physical_experts: int
    """Number of physical experts in this model."""

    num_local_physical_experts: int
    """Number of local physical experts in this model."""

    num_routed_experts: int
    """Number of routed experts in this model."""

    num_shared_experts: int
    """Number of shared experts in this model."""

    num_redundant_experts: int
    """Number of redundant experts in this model."""

    def set_eplb_state(
        self,
        expert_load_view: Tensor,
        logical_to_physical_map: Tensor,
        logical_replica_count: Tensor,
    ) -> None:
        """
        Register the EPLB state in the MoE model.

        Since these are views of the actual EPLB state, any changes made by
        the EPLB algorithm are automatically reflected in the model's behavior
        without requiring additional method calls to set new states.

        You should also collect model's `expert_weights` here instead of in
        the weight loader, since after initial weight loading, further
        processing like quantization may be applied to the weights.

        Args:
            expert_load_view: A view of the expert load metrics tensor.
            logical_to_physical_map: Mapping from logical to physical experts.
            logical_replica_count: Count of replicas for each logical expert.
        """
        ...

expert_weights instance-attribute

expert_weights: MutableSequence[Iterable[Tensor]]

Expert weights saved in this rank.

The first dimension is the layer, and the second dimension is different parameters in the layer, e.g. up/down projection weights.

num_expert_groups instance-attribute

num_expert_groups: int

Number of expert groups in this model.

num_local_physical_experts instance-attribute

num_local_physical_experts: int

Number of local physical experts in this model.

num_logical_experts instance-attribute

num_logical_experts: int

Number of logical experts in this model.

num_moe_layers instance-attribute

num_moe_layers: int

Number of MoE layers in this model.

num_physical_experts instance-attribute

num_physical_experts: int

Number of physical experts in this model.

num_redundant_experts instance-attribute

num_redundant_experts: int

Number of redundant experts in this model.

num_routed_experts instance-attribute

num_routed_experts: int

Number of routed experts in this model.

num_shared_experts instance-attribute

num_shared_experts: int

Number of shared experts in this model.

set_eplb_state

set_eplb_state(
    expert_load_view: Tensor,
    logical_to_physical_map: Tensor,
    logical_replica_count: Tensor,
) -> None

Register the EPLB state in the MoE model.

Since these are views of the actual EPLB state, any changes made by the EPLB algorithm are automatically reflected in the model's behavior without requiring additional method calls to set new states.

You should also collect model's expert_weights here instead of in the weight loader, since after initial weight loading, further processing like quantization may be applied to the weights.

Parameters:

Name Type Description Default
expert_load_view Tensor

A view of the expert load metrics tensor.

required
logical_to_physical_map Tensor

Mapping from logical to physical experts.

required
logical_replica_count Tensor

Count of replicas for each logical expert.

required
Source code in vllm/model_executor/models/interfaces.py
def set_eplb_state(
    self,
    expert_load_view: Tensor,
    logical_to_physical_map: Tensor,
    logical_replica_count: Tensor,
) -> None:
    """
    Register the EPLB state in the MoE model.

    Since these are views of the actual EPLB state, any changes made by
    the EPLB algorithm are automatically reflected in the model's behavior
    without requiring additional method calls to set new states.

    You should also collect model's `expert_weights` here instead of in
    the weight loader, since after initial weight loading, further
    processing like quantization may be applied to the weights.

    Args:
        expert_load_view: A view of the expert load metrics tensor.
        logical_to_physical_map: Mapping from logical to physical experts.
        logical_replica_count: Count of replicas for each logical expert.
    """
    ...

ParallelConfig

Configuration for the distributed execution.

Source code in vllm/config.py
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
@config
@dataclass
class ParallelConfig:
    """Configuration for the distributed execution."""

    pipeline_parallel_size: int = 1
    """Number of pipeline parallel groups."""
    tensor_parallel_size: int = 1
    """Number of tensor parallel groups."""
    data_parallel_size: int = 1
    """Number of data parallel groups. MoE layers will be sharded according to
    the product of the tensor parallel size and data parallel size."""
    data_parallel_size_local: int = 1
    """Number of local data parallel groups."""
    data_parallel_rank: int = 0
    """Rank of the data parallel group."""
    data_parallel_rank_local: Optional[int] = None
    """Local rank of the data parallel group,
    set only in SPMD mode."""
    data_parallel_master_ip: str = "127.0.0.1"
    """IP of the data parallel master."""
    data_parallel_rpc_port: int = 29550
    """Port for data parallel messaging."""
    data_parallel_master_port: int = 29500
    """Port of the data parallel master."""
    data_parallel_backend: str = "mp"
    """Backend to use for data parallel, either "mp" or "ray"."""
    data_parallel_external_lb: bool = False
    """Whether to use "external" DP LB mode. Applies only to online serving
    and when data_parallel_size > 0. Set implicitly when
    data_parallel_rank is provided explicitly to vllm serve."""
    enable_expert_parallel: bool = False
    """Use expert parallelism instead of tensor parallelism for MoE layers."""
    enable_eplb: bool = False
    """Enable expert parallelism load balancing for MoE layers."""
    num_redundant_experts: int = 0
    """Number of redundant experts to use for expert parallelism."""
    eplb_window_size: int = 1000
    """Window size for expert load recording."""
    eplb_step_interval: int = 3000
    """
    Interval for rearranging experts in expert parallelism.

    Note that if this is greater than the EPLB window size, only the metrics
    of the last `eplb_window_size` steps will be used for rearranging experts.
    """
    eplb_log_balancedness: bool = False
    """
    Log the balancedness each step of expert parallelism.
    This is turned off by default since it will cause communication overhead.
    """

    max_parallel_loading_workers: Optional[int] = None
    """Maximum number of parallel loading workers when loading model
    sequentially in multiple batches. To avoid RAM OOM when using tensor
    parallel and large models."""

    disable_custom_all_reduce: bool = False
    """Disable the custom all-reduce kernel and fall back to NCCL."""

    tokenizer_pool_config: Optional[TokenizerPoolConfig] = None
    """This parameter is deprecated and will be removed in a future release.
    Please remove it from your configs"""

    ray_workers_use_nsight: bool = False
    """Whether to profile Ray workers with nsight, see https://docs.ray.io/en/latest/ray-observability/user-guides/profiling.html#profiling-nsight-profiler."""

    placement_group: Optional["PlacementGroup"] = None
    """ray distributed model workers placement group."""

    distributed_executor_backend: Optional[Union[DistributedExecutorBackend,
                                                 type["ExecutorBase"]]] = None
    """Backend to use for distributed model
    workers, either "ray" or "mp" (multiprocessing). If the product
    of pipeline_parallel_size and tensor_parallel_size is less than
    or equal to the number of GPUs available, "mp" will be used to
    keep processing on a single host. Otherwise, this will default
    to "ray" if Ray is installed and fail otherwise. Note that tpu
    and hpu only support Ray for distributed inference."""

    worker_cls: str = "auto"
    """The full name of the worker class to use. If "auto", the worker class
    will be determined based on the platform."""
    sd_worker_cls: str = "auto"
    """The full name of the worker class to use for speculative decoding.
    If "auto", the worker class will be determined based on the platform."""
    worker_extension_cls: str = ""
    """The full name of the worker extension class to use. The worker extension
    class is dynamically inherited by the worker class. This is used to inject
    new attributes and methods to the worker class for use in collective_rpc
    calls."""

    world_size: int = field(init=False)
    """world_size is TPxPP, it affects the number of workers we create."""

    rank: int = 0
    """Global rank in distributed setup."""

    enable_multimodal_encoder_data_parallel: bool = False
    """ Use data parallelism instead of tensor parallelism for vision encoder.
    Only support LLama4 for now"""

    @property
    def world_size_across_dp(self) -> int:
        """world_size_across_dp is TPxPPxDP, it is the size of the world
        including data parallelism."""
        return self.world_size * self.data_parallel_size

    def get_next_dp_init_port(self) -> int:
        """
        We might need to initialize process groups in multiple
        processes that is related to data parallelism,
        e.g. both in the worker and in the engine, which
        can live in different processes. To avoid port conflicts, we
        increment the port number each time we need to initialize a
        new process group related to data parallelism.
        """
        answer = self.data_parallel_master_port
        self.data_parallel_master_port += 1
        return answer

    def stateless_init_dp_group(self) -> "ProcessGroup":
        # NOTE: In high-concurrency scenarios multiple processes
        # can pick the same (currently free) port through a race
        # condition when calling `get_open_port()`. When the first
        # process binds the port the others will subsequently fail
        # with `torch.distributed.DistNetworkError: EADDRINUSE`.
        # To make the initialization more robust we retry a few times
        # with a fresh port whenever this specific error is observed.
        from torch.distributed import DistNetworkError

        from vllm.distributed.utils import (
            stateless_init_torch_distributed_process_group)

        max_retries = 5
        last_exc: Optional[Exception] = None
        for _ in range(max_retries):
            try:
                # use gloo since the engine process might not have cuda device
                return stateless_init_torch_distributed_process_group(
                    self.data_parallel_master_ip,
                    self.get_next_dp_init_port(),
                    self.data_parallel_rank,
                    self.data_parallel_size,
                    backend="gloo")
            except DistNetworkError as e:
                # We only want to retry when the root cause is EADDRINUSE.
                if "EADDRINUSE" in str(e):
                    logger.warning(
                        "Address already in use. Retrying with a new port.")
                    last_exc = e
                    continue  # try again with a new port
                raise e

        # If we get here all retries have failed.
        assert last_exc is not None
        raise last_exc

    @staticmethod
    def has_unfinished_dp(dp_group: "ProcessGroup",
                          has_unfinished: bool) -> bool:
        tensor = torch.tensor([has_unfinished],
                              dtype=torch.int32,
                              device="cpu")
        # dp rank 0: has_unfinished_seqs=True
        # dp rank 1: has_unfinished_seqs=False
        # aggregated: has_unfinished_seqs=True
        # so this is an OR operation, i.e. MAX in integers
        torch.distributed.all_reduce(tensor, op=ReduceOp.MAX, group=dp_group)
        aggregated_has_unfinished = bool(tensor.item())
        return aggregated_has_unfinished

    def compute_hash(self):
        """
        Provide a hash that uniquely identifies all the configs
        that affect the structure of the computation
        graph from input ids/embeddings to the final hidden states,
        excluding anything before input ids/embeddings and after
        the final hidden states.
        """
        factors: list[Any] = []
        factors.append(self.pipeline_parallel_size)
        factors.append(self.tensor_parallel_size)
        factors.append(self.enable_expert_parallel)
        factors.append(self.data_parallel_size)
        factors.append(envs.VLLM_ALL2ALL_BACKEND)
        return hashlib.sha256(str(factors).encode()).hexdigest()

    def __post_init__(self) -> None:
        self.world_size = self.pipeline_parallel_size * \
            self.tensor_parallel_size

        if self.data_parallel_size_local > self.data_parallel_size:
            raise ValueError(
                f"data_parallel_size_local ({self.data_parallel_size_local}) "
                f"must be <= data_parallel_size ({self.data_parallel_size})")

        if self.data_parallel_size > 1 or self.data_parallel_size_local == 0:
            # Data parallel was specified in the engine args.
            self.data_parallel_master_port = get_open_port()

            if not (0 <= self.data_parallel_rank < self.data_parallel_size):
                raise ValueError(
                    f"data_parallel_rank ({self.data_parallel_rank})"
                    f" must be in the range [0, {self.data_parallel_size})")
        else:
            # Otherwise fall back to env vars (e.g. for offline SPMD case).
            self.data_parallel_size = envs.VLLM_DP_SIZE
            self.data_parallel_rank = envs.VLLM_DP_RANK
            self.data_parallel_rank_local = envs.VLLM_DP_RANK_LOCAL
            self.data_parallel_master_ip = envs.VLLM_DP_MASTER_IP
            self.data_parallel_master_port = envs.VLLM_DP_MASTER_PORT

            if self.data_parallel_external_lb:
                raise ValueError("data_parallel_external_lb can only "
                                 "be set when data_parallel_size > 1")

        if self.distributed_executor_backend == "external_launcher":
            import os
            os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0"
            logger.info("Disabling V1 multiprocessing for external launcher.")

        if self.enable_eplb:
            if not current_platform.is_cuda():
                raise ValueError(
                    "Expert parallelism load balancing is only supported on "
                    "CUDA devices now.")
            if self.num_redundant_experts < 0:
                raise ValueError(
                    "num_redundant_experts must be non-negative, but got "
                    f"{self.num_redundant_experts}.")
        else:
            if self.num_redundant_experts != 0:
                raise ValueError(
                    "num_redundant_experts should be used with EPLB."
                    f"{self.num_redundant_experts}.")
        if self.distributed_executor_backend is None and self.world_size > 1:
            # We use multiprocessing by default if world_size fits on the
            # current node and we aren't in a ray placement group.

            from vllm.executor import ray_utils
            backend: DistributedExecutorBackend = "mp"
            ray_found = ray_utils.ray_is_available()
            if current_platform.is_neuron():
                # neuron uses single process to control multiple devices
                backend = "uni"
            elif current_platform.is_tpu() and envs.VLLM_XLA_USE_SPMD:
                backend = "uni"
            elif (current_platform.is_cuda()
                  and cuda_device_count_stateless() < self.world_size):
                if not ray_found:
                    raise ValueError("Unable to load Ray which is "
                                     "required for multi-node inference, "
                                     "please install Ray with `pip install "
                                     "ray`.") from ray_utils.ray_import_err
                backend = "ray"
            elif self.data_parallel_backend == "ray":
                logger.info("Using ray distributed inference because "
                            "data_parallel_backend is ray")
                backend = "ray"
            elif ray_found:
                if self.placement_group:
                    backend = "ray"
                else:
                    from ray import is_initialized as ray_is_initialized
                    if ray_is_initialized():
                        from ray.util import get_current_placement_group
                        if get_current_placement_group():
                            backend = "ray"
            self.distributed_executor_backend = backend
            logger.debug("Defaulting to use %s for distributed inference",
                         backend)

        if self.distributed_executor_backend is None and self.world_size == 1:
            self.distributed_executor_backend = "uni"

    @property
    def use_ray(self) -> bool:
        return self.distributed_executor_backend == "ray" or (
            isinstance(self.distributed_executor_backend, type)
            and self.distributed_executor_backend.uses_ray)

    @model_validator(mode='after')
    def _verify_args(self) -> Self:
        # Lazy import to avoid circular import
        from vllm.executor.executor_base import ExecutorBase
        from vllm.platforms import current_platform
        if self.distributed_executor_backend not in (
                "ray", "mp", "uni",
                "external_launcher", None) and not (isinstance(
                    self.distributed_executor_backend, type) and issubclass(
                        self.distributed_executor_backend, ExecutorBase)):
            raise ValueError(
                "Unrecognized distributed executor backend "
                f"{self.distributed_executor_backend}. Supported "
                "values are 'ray', 'mp' 'uni', 'external_launcher' or"
                " custom ExecutorBase subclass.")
        if self.use_ray:
            from vllm.executor import ray_utils
            ray_utils.assert_ray_available()

        if not current_platform.use_custom_allreduce():
            self.disable_custom_all_reduce = True
            logger.debug(
                "Disabled the custom all-reduce kernel because it is not "
                "supported on current platform.")
        if self.ray_workers_use_nsight and not self.use_ray:
            raise ValueError("Unable to use nsight profiling unless workers "
                             "run with Ray.")

        return self

data_parallel_backend class-attribute instance-attribute

data_parallel_backend: str = 'mp'

Backend to use for data parallel, either "mp" or "ray".

data_parallel_external_lb class-attribute instance-attribute

data_parallel_external_lb: bool = False

Whether to use "external" DP LB mode. Applies only to online serving and when data_parallel_size > 0. Set implicitly when data_parallel_rank is provided explicitly to vllm serve.

data_parallel_master_ip class-attribute instance-attribute

data_parallel_master_ip: str = '127.0.0.1'

IP of the data parallel master.

data_parallel_master_port class-attribute instance-attribute

data_parallel_master_port: int = 29500

Port of the data parallel master.

data_parallel_rank class-attribute instance-attribute

data_parallel_rank: int = 0

Rank of the data parallel group.

data_parallel_rank_local class-attribute instance-attribute

data_parallel_rank_local: Optional[int] = None

Local rank of the data parallel group, set only in SPMD mode.

data_parallel_rpc_port class-attribute instance-attribute

data_parallel_rpc_port: int = 29550

Port for data parallel messaging.

data_parallel_size class-attribute instance-attribute

data_parallel_size: int = 1

Number of data parallel groups. MoE layers will be sharded according to the product of the tensor parallel size and data parallel size.

data_parallel_size_local class-attribute instance-attribute

data_parallel_size_local: int = 1

Number of local data parallel groups.

disable_custom_all_reduce class-attribute instance-attribute

disable_custom_all_reduce: bool = False

Disable the custom all-reduce kernel and fall back to NCCL.

distributed_executor_backend class-attribute instance-attribute

distributed_executor_backend: Optional[
    Union[DistributedExecutorBackend, type[ExecutorBase]]
] = None

Backend to use for distributed model workers, either "ray" or "mp" (multiprocessing). If the product of pipeline_parallel_size and tensor_parallel_size is less than or equal to the number of GPUs available, "mp" will be used to keep processing on a single host. Otherwise, this will default to "ray" if Ray is installed and fail otherwise. Note that tpu and hpu only support Ray for distributed inference.

enable_eplb class-attribute instance-attribute

enable_eplb: bool = False

Enable expert parallelism load balancing for MoE layers.

enable_expert_parallel class-attribute instance-attribute

enable_expert_parallel: bool = False

Use expert parallelism instead of tensor parallelism for MoE layers.

enable_multimodal_encoder_data_parallel class-attribute instance-attribute

enable_multimodal_encoder_data_parallel: bool = False

Use data parallelism instead of tensor parallelism for vision encoder. Only support LLama4 for now

eplb_log_balancedness class-attribute instance-attribute

eplb_log_balancedness: bool = False

Log the balancedness each step of expert parallelism. This is turned off by default since it will cause communication overhead.

eplb_step_interval class-attribute instance-attribute

eplb_step_interval: int = 3000

Interval for rearranging experts in expert parallelism.

Note that if this is greater than the EPLB window size, only the metrics of the last eplb_window_size steps will be used for rearranging experts.

eplb_window_size class-attribute instance-attribute

eplb_window_size: int = 1000

Window size for expert load recording.

max_parallel_loading_workers class-attribute instance-attribute

max_parallel_loading_workers: Optional[int] = None

Maximum number of parallel loading workers when loading model sequentially in multiple batches. To avoid RAM OOM when using tensor parallel and large models.

num_redundant_experts class-attribute instance-attribute

num_redundant_experts: int = 0

Number of redundant experts to use for expert parallelism.

pipeline_parallel_size class-attribute instance-attribute

pipeline_parallel_size: int = 1

Number of pipeline parallel groups.

placement_group class-attribute instance-attribute

placement_group: Optional[PlacementGroup] = None

ray distributed model workers placement group.

rank class-attribute instance-attribute

rank: int = 0

Global rank in distributed setup.

ray_workers_use_nsight class-attribute instance-attribute

ray_workers_use_nsight: bool = False

Whether to profile Ray workers with nsight, see https://docs.ray.io/en/latest/ray-observability/user-guides/profiling.html#profiling-nsight-profiler.

sd_worker_cls class-attribute instance-attribute

sd_worker_cls: str = 'auto'

The full name of the worker class to use for speculative decoding. If "auto", the worker class will be determined based on the platform.

tensor_parallel_size class-attribute instance-attribute

tensor_parallel_size: int = 1

Number of tensor parallel groups.

tokenizer_pool_config class-attribute instance-attribute

tokenizer_pool_config: Optional[TokenizerPoolConfig] = None

This parameter is deprecated and will be removed in a future release. Please remove it from your configs

use_ray property

use_ray: bool

worker_cls class-attribute instance-attribute

worker_cls: str = 'auto'

The full name of the worker class to use. If "auto", the worker class will be determined based on the platform.

worker_extension_cls class-attribute instance-attribute

worker_extension_cls: str = ''

The full name of the worker extension class to use. The worker extension class is dynamically inherited by the worker class. This is used to inject new attributes and methods to the worker class for use in collective_rpc calls.

world_size class-attribute instance-attribute

world_size: int = field(init=False)

world_size is TPxPP, it affects the number of workers we create.

world_size_across_dp property

world_size_across_dp: int

world_size_across_dp is TPxPPxDP, it is the size of the world including data parallelism.

__post_init__

__post_init__() -> None
Source code in vllm/config.py
def __post_init__(self) -> None:
    self.world_size = self.pipeline_parallel_size * \
        self.tensor_parallel_size

    if self.data_parallel_size_local > self.data_parallel_size:
        raise ValueError(
            f"data_parallel_size_local ({self.data_parallel_size_local}) "
            f"must be <= data_parallel_size ({self.data_parallel_size})")

    if self.data_parallel_size > 1 or self.data_parallel_size_local == 0:
        # Data parallel was specified in the engine args.
        self.data_parallel_master_port = get_open_port()

        if not (0 <= self.data_parallel_rank < self.data_parallel_size):
            raise ValueError(
                f"data_parallel_rank ({self.data_parallel_rank})"
                f" must be in the range [0, {self.data_parallel_size})")
    else:
        # Otherwise fall back to env vars (e.g. for offline SPMD case).
        self.data_parallel_size = envs.VLLM_DP_SIZE
        self.data_parallel_rank = envs.VLLM_DP_RANK
        self.data_parallel_rank_local = envs.VLLM_DP_RANK_LOCAL
        self.data_parallel_master_ip = envs.VLLM_DP_MASTER_IP
        self.data_parallel_master_port = envs.VLLM_DP_MASTER_PORT

        if self.data_parallel_external_lb:
            raise ValueError("data_parallel_external_lb can only "
                             "be set when data_parallel_size > 1")

    if self.distributed_executor_backend == "external_launcher":
        import os
        os.environ["VLLM_ENABLE_V1_MULTIPROCESSING"] = "0"
        logger.info("Disabling V1 multiprocessing for external launcher.")

    if self.enable_eplb:
        if not current_platform.is_cuda():
            raise ValueError(
                "Expert parallelism load balancing is only supported on "
                "CUDA devices now.")
        if self.num_redundant_experts < 0:
            raise ValueError(
                "num_redundant_experts must be non-negative, but got "
                f"{self.num_redundant_experts}.")
    else:
        if self.num_redundant_experts != 0:
            raise ValueError(
                "num_redundant_experts should be used with EPLB."
                f"{self.num_redundant_experts}.")
    if self.distributed_executor_backend is None and self.world_size > 1:
        # We use multiprocessing by default if world_size fits on the
        # current node and we aren't in a ray placement group.

        from vllm.executor import ray_utils
        backend: DistributedExecutorBackend = "mp"
        ray_found = ray_utils.ray_is_available()
        if current_platform.is_neuron():
            # neuron uses single process to control multiple devices
            backend = "uni"
        elif current_platform.is_tpu() and envs.VLLM_XLA_USE_SPMD:
            backend = "uni"
        elif (current_platform.is_cuda()
              and cuda_device_count_stateless() < self.world_size):
            if not ray_found:
                raise ValueError("Unable to load Ray which is "
                                 "required for multi-node inference, "
                                 "please install Ray with `pip install "
                                 "ray`.") from ray_utils.ray_import_err
            backend = "ray"
        elif self.data_parallel_backend == "ray":
            logger.info("Using ray distributed inference because "
                        "data_parallel_backend is ray")
            backend = "ray"
        elif ray_found:
            if self.placement_group:
                backend = "ray"
            else:
                from ray import is_initialized as ray_is_initialized
                if ray_is_initialized():
                    from ray.util import get_current_placement_group
                    if get_current_placement_group():
                        backend = "ray"
        self.distributed_executor_backend = backend
        logger.debug("Defaulting to use %s for distributed inference",
                     backend)

    if self.distributed_executor_backend is None and self.world_size == 1:
        self.distributed_executor_backend = "uni"

_verify_args

_verify_args() -> Self
Source code in vllm/config.py
@model_validator(mode='after')
def _verify_args(self) -> Self:
    # Lazy import to avoid circular import
    from vllm.executor.executor_base import ExecutorBase
    from vllm.platforms import current_platform
    if self.distributed_executor_backend not in (
            "ray", "mp", "uni",
            "external_launcher", None) and not (isinstance(
                self.distributed_executor_backend, type) and issubclass(
                    self.distributed_executor_backend, ExecutorBase)):
        raise ValueError(
            "Unrecognized distributed executor backend "
            f"{self.distributed_executor_backend}. Supported "
            "values are 'ray', 'mp' 'uni', 'external_launcher' or"
            " custom ExecutorBase subclass.")
    if self.use_ray:
        from vllm.executor import ray_utils
        ray_utils.assert_ray_available()

    if not current_platform.use_custom_allreduce():
        self.disable_custom_all_reduce = True
        logger.debug(
            "Disabled the custom all-reduce kernel because it is not "
            "supported on current platform.")
    if self.ray_workers_use_nsight and not self.use_ray:
        raise ValueError("Unable to use nsight profiling unless workers "
                         "run with Ray.")

    return self

compute_hash

compute_hash()

Provide a hash that uniquely identifies all the configs that affect the structure of the computation graph from input ids/embeddings to the final hidden states, excluding anything before input ids/embeddings and after the final hidden states.

Source code in vllm/config.py
def compute_hash(self):
    """
    Provide a hash that uniquely identifies all the configs
    that affect the structure of the computation
    graph from input ids/embeddings to the final hidden states,
    excluding anything before input ids/embeddings and after
    the final hidden states.
    """
    factors: list[Any] = []
    factors.append(self.pipeline_parallel_size)
    factors.append(self.tensor_parallel_size)
    factors.append(self.enable_expert_parallel)
    factors.append(self.data_parallel_size)
    factors.append(envs.VLLM_ALL2ALL_BACKEND)
    return hashlib.sha256(str(factors).encode()).hexdigest()

get_next_dp_init_port

get_next_dp_init_port() -> int

We might need to initialize process groups in multiple processes that is related to data parallelism, e.g. both in the worker and in the engine, which can live in different processes. To avoid port conflicts, we increment the port number each time we need to initialize a new process group related to data parallelism.

Source code in vllm/config.py
def get_next_dp_init_port(self) -> int:
    """
    We might need to initialize process groups in multiple
    processes that is related to data parallelism,
    e.g. both in the worker and in the engine, which
    can live in different processes. To avoid port conflicts, we
    increment the port number each time we need to initialize a
    new process group related to data parallelism.
    """
    answer = self.data_parallel_master_port
    self.data_parallel_master_port += 1
    return answer

has_unfinished_dp staticmethod

has_unfinished_dp(
    dp_group: ProcessGroup, has_unfinished: bool
) -> bool
Source code in vllm/config.py
@staticmethod
def has_unfinished_dp(dp_group: "ProcessGroup",
                      has_unfinished: bool) -> bool:
    tensor = torch.tensor([has_unfinished],
                          dtype=torch.int32,
                          device="cpu")
    # dp rank 0: has_unfinished_seqs=True
    # dp rank 1: has_unfinished_seqs=False
    # aggregated: has_unfinished_seqs=True
    # so this is an OR operation, i.e. MAX in integers
    torch.distributed.all_reduce(tensor, op=ReduceOp.MAX, group=dp_group)
    aggregated_has_unfinished = bool(tensor.item())
    return aggregated_has_unfinished

stateless_init_dp_group

stateless_init_dp_group() -> ProcessGroup
Source code in vllm/config.py
def stateless_init_dp_group(self) -> "ProcessGroup":
    # NOTE: In high-concurrency scenarios multiple processes
    # can pick the same (currently free) port through a race
    # condition when calling `get_open_port()`. When the first
    # process binds the port the others will subsequently fail
    # with `torch.distributed.DistNetworkError: EADDRINUSE`.
    # To make the initialization more robust we retry a few times
    # with a fresh port whenever this specific error is observed.
    from torch.distributed import DistNetworkError

    from vllm.distributed.utils import (
        stateless_init_torch_distributed_process_group)

    max_retries = 5
    last_exc: Optional[Exception] = None
    for _ in range(max_retries):
        try:
            # use gloo since the engine process might not have cuda device
            return stateless_init_torch_distributed_process_group(
                self.data_parallel_master_ip,
                self.get_next_dp_init_port(),
                self.data_parallel_rank,
                self.data_parallel_size,
                backend="gloo")
        except DistNetworkError as e:
            # We only want to retry when the root cause is EADDRINUSE.
            if "EADDRINUSE" in str(e):
                logger.warning(
                    "Address already in use. Retrying with a new port.")
                last_exc = e
                continue  # try again with a new port
            raise e

    # If we get here all retries have failed.
    assert last_exc is not None
    raise last_exc

get_ep_group

get_ep_group() -> GroupCoordinator
Source code in vllm/distributed/parallel_state.py
def get_ep_group() -> GroupCoordinator:
    assert _EP is not None, ("expert parallel group is not initialized")
    return _EP

get_node_count

get_node_count() -> int

Return the total number of nodes in the distributed environment.

Source code in vllm/distributed/parallel_state.py
def get_node_count() -> int:
    """Return the total number of nodes in the distributed environment. """
    assert _NODE_COUNT is not None, (
        "distributed environment is not initialized")
    return _NODE_COUNT

init_logger

init_logger(name: str) -> _VllmLogger

The main purpose of this function is to ensure that loggers are retrieved in such a way that we can be sure the root vllm logger has already been configured.

Source code in vllm/logger.py
def init_logger(name: str) -> _VllmLogger:
    """The main purpose of this function is to ensure that loggers are
    retrieved in such a way that we can be sure the root vllm logger has
    already been configured."""

    logger = logging.getLogger(name)

    methods_to_patch = {
        "info_once": _print_info_once,
        "warning_once": _print_warning_once,
    }

    for method_name, method in methods_to_patch.items():
        setattr(logger, method_name, MethodType(method, logger))

    return cast(_VllmLogger, logger)

rearrange_expert_weights_inplace

rearrange_expert_weights_inplace(
    old_global_expert_indices: Tensor,
    new_global_expert_indices: Tensor,
    expert_weights: Sequence[Iterable[Tensor]],
    ep_group: ProcessGroup,
    is_profile: bool = False,
) -> None

Rearranges the expert weights in place according to the new expert indices.

The value of the indices arguments are logical indices of the experts, while keys are physical.

Parameters:

Name Type Description Default
old_global_expert_indices Tensor

Shape (num_moe_layers, num_physical_experts).

required
new_global_expert_indices Tensor

Shape (num_moe_layers, num_physical_experts).

required
expert_weights Sequence[Iterable[Tensor]]

A sequence of shape (num_moe_layers)(weight_count) of tensors of shape (num_local_physical_experts, hidden_size_i). For example, a linear layer may have up and down projection, so weight_count = 2. Each weight's hidden size can be different.

required
ep_group ProcessGroup

The device process group for expert parallelism.

required
is_profile bool

If True, do not perform any actual weight copy. This is used during profile run, where we only perform dummy communications to reserve enough memory for the buffers.

False
Source code in vllm/distributed/eplb/rebalance_execute.py
def rearrange_expert_weights_inplace(
    old_global_expert_indices: torch.Tensor,
    new_global_expert_indices: torch.Tensor,
    expert_weights: Sequence[Iterable[torch.Tensor]],
    ep_group: ProcessGroup,
    is_profile: bool = False,
) -> None:
    """
    Rearranges the expert weights in place according to the new expert indices.

    The value of the indices arguments are logical indices of the experts,
    while keys are physical.

    Args:
        old_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        new_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        expert_weights: A sequence of shape (num_moe_layers)(weight_count)
            of tensors of shape (num_local_physical_experts, hidden_size_i).
            For example, a linear layer may have up and down projection,
            so weight_count = 2. Each weight's hidden size can be different.
        ep_group: The device process group for expert parallelism.
        is_profile (bool): If `True`, do not perform any actual weight copy.
            This is used during profile run, where we only perform dummy
            communications to reserve enough memory for the buffers.
    """
    num_moe_layers, num_physical_experts = old_global_expert_indices.shape
    assert len(expert_weights) == num_moe_layers

    num_local_physical_experts = next(iter(expert_weights[0])).shape[0]
    assert new_global_expert_indices.shape == (num_moe_layers,
                                               num_physical_experts)

    ep_rank = ep_group.rank()
    ep_size = ep_group.size()
    assert num_physical_experts == ep_size * num_local_physical_experts

    # A buffer to hold the expert weights in one layer during the exchange.
    # NOTE: Currently we assume the same weights across different layers
    # have the same shape.
    expert_weights_buffer = [torch.empty_like(w) for w in expert_weights[0]]

    if is_profile:
        # Maximum send size is to send all local experts to all ranks,
        # So we use a dummy `all_gather` to reserve enough communication buffer
        for weight, buffer in zip(expert_weights[0], expert_weights_buffer):
            # A `/dev/null`-like buffer to avoid real memory allocation
            dummy_recv_buffer = [buffer for _ in range(ep_size)]
            # NOTE(bowen): Needed this barrier to avoid OOM during actual
            # execution. I'm not very sure why this is needed
            torch.distributed.barrier()
            all_gather(
                dummy_recv_buffer,
                weight,
                group=ep_group,
            )
        return

    for layer in range(num_moe_layers):
        # NOTE(bowen): We need this synchronize to run, but I don't know why.
        # If you figure out the reason, please let me know -- thank you!
        torch.cuda.synchronize()
        shuffle_layer(
            num_local_physical_experts,
            ep_rank,
            old_global_expert_indices[layer].tolist(),
            new_global_expert_indices[layer].tolist(),
            expert_weights[layer],
            expert_weights_buffer,
            ep_group,
        )

rebalance_experts

rebalance_experts(
    weight: Tensor,
    num_replicas: int,
    num_groups: int,
    num_nodes: int,
    num_gpus: int,
) -> tuple[Tensor, Tensor, Tensor]

Entry point for expert-parallelism load balancer.

Parameters:

Name Type Description Default
weight Tensor

[layers, num_logical_experts], the load statistics for all logical experts

required
num_replicas int

number of physical experts, must be a multiple of num_gpus

required
num_groups int

number of expert groups

required
num_nodes int

number of server nodes, where the intra-node network (e.g, NVLink) is faster

required
num_gpus int

number of GPUs, must be a multiple of num_nodes

required

Returns:

Name Type Description
physical_to_logical_map Tensor

[layers, num_replicas], the expert index of each replica

logical_to_physical_map Tensor

[layers, num_logical_experts, X], the replica indices for each expert

expert_count Tensor

[layers, num_logical_experts], number of physical replicas for each logical expert

Source code in vllm/distributed/eplb/rebalance_algo.py
def rebalance_experts(
    weight: torch.Tensor,
    num_replicas: int,
    num_groups: int,
    num_nodes: int,
    num_gpus: int,
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    """
    Entry point for expert-parallelism load balancer.

    Parameters:
        weight: [layers, num_logical_experts], the load statistics for all
            logical experts
        num_replicas: number of physical experts, must be a multiple of
            `num_gpus`
        num_groups: number of expert groups
        num_nodes: number of server nodes, where the intra-node network
            (e.g, NVLink) is faster
        num_gpus: number of GPUs, must be a multiple of `num_nodes`

    Returns:
        physical_to_logical_map: [layers, num_replicas], the expert index of
            each replica
        logical_to_physical_map: [layers, num_logical_experts, X], the replica
            indices for each expert
        expert_count: [layers, num_logical_experts], number of physical
            replicas for each logical expert
    """
    num_layers, num_logical_experts = weight.shape
    weight = weight.float().cpu()
    if num_groups % num_nodes == 0:
        # use hierarchical load-balance policy
        phy2log, phyrank, logcnt = rebalance_experts_hierarchical(
            weight, num_replicas, num_groups, num_nodes, num_gpus)
    else:
        # use global load-balance policy
        phy2log, phyrank, logcnt = rebalance_experts_hierarchical(
            weight, num_replicas, 1, 1, num_gpus)
    num_redundant_experts = num_replicas - num_logical_experts
    maxlogcnt = num_redundant_experts + 1
    log2phy: torch.Tensor = torch.full(
        (num_layers, num_logical_experts, maxlogcnt),
        -1,
        dtype=torch.int64,
        device=logcnt.device,
    )
    log2phy.view(num_layers, -1).scatter_(
        -1,
        phy2log * maxlogcnt + phyrank,
        torch.arange(num_replicas, dtype=torch.int64,
                     device=log2phy.device).expand(num_layers, -1),
    )
    return phy2log, log2phy, logcnt