Skip to content

vllm.v1.executor.abstract

FailureCallback module-attribute

FailureCallback = Callable[[], None]

Executor

Bases: ExecutorBase

Abstract class for v1 executors, mainly define some methods for v1. For methods shared by v0 and v1, define them in ExecutorBase

Source code in vllm/v1/executor/abstract.py
class Executor(ExecutorBase):
    """
    Abstract class for v1 executors, mainly define some methods for v1.
    For methods shared by v0 and v1, define them in ExecutorBase"""

    @staticmethod
    def get_class(vllm_config: VllmConfig) -> type["Executor"]:
        executor_class: type[Executor]
        parallel_config = vllm_config.parallel_config
        distributed_executor_backend = (
            parallel_config.distributed_executor_backend)
        # distributed_executor_backend must be set in VllmConfig.__post_init__
        if isinstance(distributed_executor_backend, type):
            if not issubclass(distributed_executor_backend, ExecutorBase):
                raise TypeError(
                    "distributed_executor_backend must be a subclass of "
                    f"ExecutorBase. Got {distributed_executor_backend}.")
            executor_class = distributed_executor_backend
        elif distributed_executor_backend == "ray":
            from vllm.v1.executor.ray_distributed_executor import (  # noqa
                RayDistributedExecutor)
            executor_class = RayDistributedExecutor
        elif distributed_executor_backend == "mp":
            from vllm.v1.executor.multiproc_executor import MultiprocExecutor
            executor_class = MultiprocExecutor
        elif distributed_executor_backend == "uni":
            executor_class = UniProcExecutor
        elif distributed_executor_backend == "external_launcher":
            # TODO: make v1 scheduling deterministic
            # to support external launcher
            executor_class = ExecutorWithExternalLauncher
        else:
            raise ValueError("Unknown distributed executor backend: "
                             f"{distributed_executor_backend}")
        return executor_class

    def initialize_from_config(self,
                               kv_cache_configs: list[KVCacheConfig]) -> None:
        """
        Initialize the KV caches and begin the model execution loop of the
        underlying workers.
        """
        self.collective_rpc("initialize_from_config",
                            args=(kv_cache_configs, ))
        self.collective_rpc("compile_or_warm_up_model")

    def register_failure_callback(self, callback: FailureCallback):
        """
        Register a function to be called if the executor enters a permanent
        failed state.
        """
        pass

    def determine_available_memory(self) -> list[int]:  # in bytes
        output = self.collective_rpc("determine_available_memory")
        return output

    def get_kv_cache_specs(self) -> list[dict[str, KVCacheSpec]]:
        output = self.collective_rpc("get_kv_cache_spec")
        return output

    def execute_model(
        self,
        scheduler_output,
    ) -> Union[ModelRunnerOutput, Future[ModelRunnerOutput]]:
        output = self.collective_rpc("execute_model",
                                     args=(scheduler_output, ))
        return output[0]

    @property
    def max_concurrent_batches(self) -> int:
        return 1

    def profile(self, is_start: bool = True):
        self.collective_rpc("profile", args=(is_start, ))

max_concurrent_batches property

max_concurrent_batches: int

determine_available_memory

determine_available_memory() -> list[int]
Source code in vllm/v1/executor/abstract.py
def determine_available_memory(self) -> list[int]:  # in bytes
    output = self.collective_rpc("determine_available_memory")
    return output

execute_model

execute_model(
    scheduler_output,
) -> Union[ModelRunnerOutput, Future[ModelRunnerOutput]]
Source code in vllm/v1/executor/abstract.py
def execute_model(
    self,
    scheduler_output,
) -> Union[ModelRunnerOutput, Future[ModelRunnerOutput]]:
    output = self.collective_rpc("execute_model",
                                 args=(scheduler_output, ))
    return output[0]

get_class staticmethod

get_class(vllm_config: VllmConfig) -> type[Executor]
Source code in vllm/v1/executor/abstract.py
@staticmethod
def get_class(vllm_config: VllmConfig) -> type["Executor"]:
    executor_class: type[Executor]
    parallel_config = vllm_config.parallel_config
    distributed_executor_backend = (
        parallel_config.distributed_executor_backend)
    # distributed_executor_backend must be set in VllmConfig.__post_init__
    if isinstance(distributed_executor_backend, type):
        if not issubclass(distributed_executor_backend, ExecutorBase):
            raise TypeError(
                "distributed_executor_backend must be a subclass of "
                f"ExecutorBase. Got {distributed_executor_backend}.")
        executor_class = distributed_executor_backend
    elif distributed_executor_backend == "ray":
        from vllm.v1.executor.ray_distributed_executor import (  # noqa
            RayDistributedExecutor)
        executor_class = RayDistributedExecutor
    elif distributed_executor_backend == "mp":
        from vllm.v1.executor.multiproc_executor import MultiprocExecutor
        executor_class = MultiprocExecutor
    elif distributed_executor_backend == "uni":
        executor_class = UniProcExecutor
    elif distributed_executor_backend == "external_launcher":
        # TODO: make v1 scheduling deterministic
        # to support external launcher
        executor_class = ExecutorWithExternalLauncher
    else:
        raise ValueError("Unknown distributed executor backend: "
                         f"{distributed_executor_backend}")
    return executor_class

get_kv_cache_specs

get_kv_cache_specs() -> list[dict[str, KVCacheSpec]]
Source code in vllm/v1/executor/abstract.py
def get_kv_cache_specs(self) -> list[dict[str, KVCacheSpec]]:
    output = self.collective_rpc("get_kv_cache_spec")
    return output

initialize_from_config

initialize_from_config(
    kv_cache_configs: list[KVCacheConfig],
) -> None

Initialize the KV caches and begin the model execution loop of the underlying workers.

Source code in vllm/v1/executor/abstract.py
def initialize_from_config(self,
                           kv_cache_configs: list[KVCacheConfig]) -> None:
    """
    Initialize the KV caches and begin the model execution loop of the
    underlying workers.
    """
    self.collective_rpc("initialize_from_config",
                        args=(kv_cache_configs, ))
    self.collective_rpc("compile_or_warm_up_model")

profile

profile(is_start: bool = True)
Source code in vllm/v1/executor/abstract.py
def profile(self, is_start: bool = True):
    self.collective_rpc("profile", args=(is_start, ))

register_failure_callback

register_failure_callback(callback: FailureCallback)

Register a function to be called if the executor enters a permanent failed state.

Source code in vllm/v1/executor/abstract.py
def register_failure_callback(self, callback: FailureCallback):
    """
    Register a function to be called if the executor enters a permanent
    failed state.
    """
    pass

ExecutorWithExternalLauncher

Bases: ExecutorWithExternalLauncher, Executor

Source code in vllm/v1/executor/abstract.py
class ExecutorWithExternalLauncher(ExecutorWithExternalLauncherV0, Executor):

    def determine_available_memory(self) -> list[int]:  # in bytes
        # same as determine_num_available_blocks in v0,
        # we need to get the min across all ranks.
        memory = super().determine_available_memory()
        from vllm.distributed.parallel_state import get_world_group
        cpu_group = get_world_group().cpu_group
        memory_tensor = torch.tensor([memory], device="cpu", dtype=torch.int64)
        dist.all_reduce(memory_tensor, group=cpu_group, op=dist.ReduceOp.MIN)
        return [memory_tensor.item()]

determine_available_memory

determine_available_memory() -> list[int]
Source code in vllm/v1/executor/abstract.py
def determine_available_memory(self) -> list[int]:  # in bytes
    # same as determine_num_available_blocks in v0,
    # we need to get the min across all ranks.
    memory = super().determine_available_memory()
    from vllm.distributed.parallel_state import get_world_group
    cpu_group = get_world_group().cpu_group
    memory_tensor = torch.tensor([memory], device="cpu", dtype=torch.int64)
    dist.all_reduce(memory_tensor, group=cpu_group, op=dist.ReduceOp.MIN)
    return [memory_tensor.item()]

UniProcExecutor

Bases: UniProcExecutor, Executor

Source code in vllm/v1/executor/abstract.py
class UniProcExecutor(UniProcExecutorV0, Executor):
    pass