vllm.v1.executor.ray_distributed_executor

FutureWrapper

Bases: Future

A wrapper around a Ray output reference to meet the interface of .execute_model().

Source code in vllm/v1/executor/ray_distributed_executor.py
class FutureWrapper(Future):
    """A wrapper around a Ray output reference to meet the interface
    of .execute_model().
    """

    def __init__(self, ref):
        super().__init__()
        self.ref = ref

    def result(self, timeout=None):
        if timeout is not None:
            raise NotImplementedError("timeout is not supported")
        return self.ref.get()

ref instance-attribute

ref = ref

__init__

__init__(ref)
Source code in vllm/v1/executor/ray_distributed_executor.py
def __init__(self, ref):
    super().__init__()
    self.ref = ref

result

result(timeout=None)
Source code in vllm/v1/executor/ray_distributed_executor.py
def result(self, timeout=None):
    if timeout is not None:
        raise NotImplementedError("timeout is not supported")
    return self.ref.get()
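
For illustration only, a minimal, self-contained sketch (not vLLM code) of how a FutureWrapper might be consumed. FakeRef is a hypothetical stand-in that only mimics the .get() interface expected of the Ray output reference the executor actually wraps.

from vllm.v1.executor.ray_distributed_executor import FutureWrapper

# FakeRef is a hypothetical stand-in: it provides only the .get() method
# that the wrapped Ray output reference is expected to expose.
class FakeRef:
    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value

future = FutureWrapper(FakeRef("model_runner_output"))
print(future.result())          # blocks on ref.get(); prints "model_runner_output"
# future.result(timeout=1.0)    # raises NotImplementedError: timeout is not supported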

RayDistributedExecutor

Bases: RayDistributedExecutor (the V0 executor, imported here as RayDistributedExecutorV0), Executor

Ray distributed executor using Ray Compiled Graphs.

Source code in vllm/v1/executor/ray_distributed_executor.py
class RayDistributedExecutor(RayDistributedExecutorV0, Executor):
    """Ray distributed executor using Ray Compiled Graphs."""

    @property
    def max_concurrent_batches(self) -> int:
        """Ray distributed executor supports pipeline parallelism,
        meaning that it allows PP size batches to be executed concurrently.
        """
        return self.parallel_config.pipeline_parallel_size

    def execute_model(
        self,
        scheduler_output,
    ) -> Union[ModelRunnerOutput, Future[ModelRunnerOutput]]:
        """Execute the model on the Ray workers.

        Args:
            scheduler_output: The scheduler output to execute.

        Returns:
            The model runner output.
        """
        # Build the compiled DAG for the first time.
        if self.forward_dag is None:  # type: ignore
            self.forward_dag = self._compiled_ray_dag(enable_asyncio=False)

        refs = self.forward_dag.execute(scheduler_output)  # type: ignore

        # When PP is not used, we block here until the result is available.
        if self.max_concurrent_batches == 1:
            return refs[0].get()

        # When PP is used, we return a FutureWrapper immediately so that
        # the scheduler can yield to the next batch.
        return FutureWrapper(refs[0])

max_concurrent_batches property

max_concurrent_batches: int

The Ray distributed executor supports pipeline parallelism, meaning it allows up to PP-size (pipeline_parallel_size) batches to execute concurrently.
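
A hedged sketch of what this means for a caller: with pipeline parallelism enabled, up to pipeline_parallel_size batches can be in flight at once, so results may come back as futures to be resolved later. The executor and scheduler_outputs objects below are assumptions for illustration and are not defined in this module.

# Hypothetical illustration: keep up to max_concurrent_batches calls in flight.
# `executor` and `scheduler_outputs` are assumed to exist elsewhere.
in_flight = []
for scheduler_output in scheduler_outputs:
    in_flight.append(executor.execute_model(scheduler_output))
    if len(in_flight) >= executor.max_concurrent_batches:
        oldest = in_flight.pop(0)
        # With PP == 1 the entry is already a ModelRunnerOutput; otherwise
        # it is a FutureWrapper that must be resolved.
        result = oldest.result() if hasattr(oldest, "result") else oldest
        # ... hand `result` back to the scheduler ...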

execute_model

execute_model(
    scheduler_output,
) -> Union[ModelRunnerOutput, Future[ModelRunnerOutput]]

Execute the model on the Ray workers.

Parameters:

    scheduler_output (required): The scheduler output to execute.

Returns:

    Union[ModelRunnerOutput, Future[ModelRunnerOutput]]: The model runner output.

Source code in vllm/v1/executor/ray_distributed_executor.py
def execute_model(
    self,
    scheduler_output,
) -> Union[ModelRunnerOutput, Future[ModelRunnerOutput]]:
    """Execute the model on the Ray workers.

    Args:
        scheduler_output: The scheduler output to execute.

    Returns:
        The model runner output.
    """
    # Build the compiled DAG for the first time.
    if self.forward_dag is None:  # type: ignore
        self.forward_dag = self._compiled_ray_dag(enable_asyncio=False)

    refs = self.forward_dag.execute(scheduler_output)  # type: ignore

    # When PP is not used, we block here until the result is available.
    if self.max_concurrent_batches == 1:
        return refs[0].get()

    # When PP is used, we return a FutureWrapper immediately so that
    # the scheduler can yield to the next batch.
    return FutureWrapper(refs[0])
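
A short, hedged usage sketch of the return contract above. Here `executor` is assumed to be an initialized RayDistributedExecutor and `scheduler_output` to come from the scheduler; neither is constructed in this snippet.

from concurrent.futures import Future

out = executor.execute_model(scheduler_output)
if isinstance(out, Future):
    # PP > 1: execute_model returned a FutureWrapper immediately.
    model_runner_output = out.result()
else:
    # PP == 1: execute_model already blocked on refs[0].get().
    model_runner_output = out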