vllm.executor.executor_base

_R module-attribute

_R = TypeVar('_R', default=Any)

logger module-attribute

logger = init_logger(__name__)

DistributedExecutorBase

Bases: ExecutorBase

Abstract superclass of distributed executor implementations.

Source code in vllm/executor/executor_base.py
class DistributedExecutorBase(ExecutorBase):
    """Abstract superclass of distributed executor implementations."""

    def __init__(self, *args, **kwargs):
        # This is non-None when the execute model loop is running
        # in the parallel workers. It's a coroutine in the AsyncLLMEngine case.
        self.parallel_worker_tasks: Optional[Union[Any, Awaitable[Any]]] = None

        super().__init__(*args, **kwargs)

    def execute_model(
        self,
        execute_model_req: ExecuteModelRequest,
    ) -> List[SamplerOutput]:
        # TODO: unify into collective_rpc
        if self.parallel_worker_tasks is None:
            self.parallel_worker_tasks = self._run_workers(
                "start_worker_execution_loop",
                async_run_tensor_parallel_workers_only=True)

        # Only the driver worker returns the sampling results.
        driver_outputs = self._driver_execute_model(execute_model_req)
        assert driver_outputs is not None
        return driver_outputs

    def stop_remote_worker_execution_loop(self) -> None:
        if self.parallel_worker_tasks is None:
            return

        self._driver_execute_model(execute_model_req=None)
        parallel_worker_tasks = self.parallel_worker_tasks
        self.parallel_worker_tasks = None
        # Ensure that workers exit model loop cleanly
        # (this will raise otherwise)
        self._wait_for_tasks_completion(parallel_worker_tasks)

    @abstractmethod
    def _driver_execute_model(
        self, execute_model_req: Optional[ExecuteModelRequest]
    ) -> Optional[List[SamplerOutput]]:
        """Run execute_model in the driver worker.

        Passing None will cause the driver to stop the model execution loop
        running in each of the remote workers. In this case, this method
        returns None. Otherwise, this method returns the model output.
        """
        raise NotImplementedError

    def collective_rpc(self,
                       method: Union[str, Callable],
                       timeout: Optional[float] = None,
                       args: Tuple = (),
                       kwargs: Optional[Dict] = None) -> List[Any]:
        return self._run_workers(method, *args, **(kwargs or {}))

    @abstractmethod
    def _run_workers(
        self,
        method: Union[str, Callable],
        *args,
        async_run_tensor_parallel_workers_only: bool = False,
        max_concurrent_workers: Optional[int] = None,
        **kwargs,
    ) -> Any:
        """Runs the given method on all workers.

        Args:
            async_run_tensor_parallel_workers_only: If True the method will be
                run only in the remote TP workers, not the driver worker.
                It will also be run asynchronously and return a list of futures
                rather than blocking on the results.

        # TODO: simplify and merge with collective_rpc
        """
        raise NotImplementedError

    @abstractmethod
    def _wait_for_tasks_completion(self, parallel_worker_tasks: Any) -> None:
        """Wait for futures returned from _run_workers() with
        async_run_remote_workers_only to complete."""
        raise NotImplementedError

    async def execute_model_async(
            self,
            execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
        if self.parallel_worker_tasks is None:
            # Start model execution loop running in the parallel workers
            self.parallel_worker_tasks = asyncio.create_task(
                self._start_worker_execution_loop())

        # Only the driver worker returns the sampling results.
        return await self._driver_execute_model_async(execute_model_req)

    async def stop_remote_worker_execution_loop_async(self) -> None:
        if self.parallel_worker_tasks is None:
            return

        await self._driver_execute_model_async()
        parallel_worker_tasks = self.parallel_worker_tasks
        self.parallel_worker_tasks = None
        # Ensure that workers exit model loop cleanly
        # (this will raise otherwise)
        await parallel_worker_tasks

    @abstractmethod
    async def _driver_execute_model_async(
        self,
        execute_model_req: Optional[ExecuteModelRequest] = None,
    ) -> List[SamplerOutput]:
        """Execute the model asynchronously in the driver worker.

        Passing None will cause the driver to stop the model execution
        loop running in each of the remote workers.
        """
        raise NotImplementedError

    @abstractmethod
    async def _start_worker_execution_loop(self):
        """Run execution loop on all workers. It guarantees all workers run
        the loop or None of them is running the loop. Loop can be stopped by
        `stop_remote_worker_execution_loop`.
        The API is idempotent (guarantee only 1 loop run at any moment)."""
        raise NotImplementedError
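
Taken together, the synchronous entry points above form a simple driving protocol: the first execute_model call starts the worker execution loop, later calls only involve the driver worker for sampling results, and stop_remote_worker_execution_loop releases the remote workers again. A minimal sketch of that protocol, assuming executor is a concrete DistributedExecutorBase subclass and reqs is an iterable of ExecuteModelRequest objects:

# Hypothetical driving loop (illustrative only).
for req in reqs:
    # The first call also starts start_worker_execution_loop on the remote
    # tensor-parallel workers; only the driver worker returns outputs.
    outputs = executor.execute_model(req)

# Once no more requests are scheduled, release the remote workers from
# their execution loop until the next batch of work arrives.
executor.stop_remote_worker_execution_loop()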

parallel_worker_tasks instance-attribute

parallel_worker_tasks: Optional[
    Union[Any, Awaitable[Any]]
] = None

__init__

__init__(*args, **kwargs)
Source code in vllm/executor/executor_base.py
def __init__(self, *args, **kwargs):
    # This is non-None when the execute model loop is running
    # in the parallel workers. It's a coroutine in the AsyncLLMEngine case.
    self.parallel_worker_tasks: Optional[Union[Any, Awaitable[Any]]] = None

    super().__init__(*args, **kwargs)

_driver_execute_model abstractmethod

_driver_execute_model(
    execute_model_req: Optional[ExecuteModelRequest],
) -> Optional[List[SamplerOutput]]

Run execute_model in the driver worker.

Passing None will cause the driver to stop the model execution loop running in each of the remote workers. In this case, this method returns None. Otherwise, this method returns the model output.

Source code in vllm/executor/executor_base.py
@abstractmethod
def _driver_execute_model(
    self, execute_model_req: Optional[ExecuteModelRequest]
) -> Optional[List[SamplerOutput]]:
    """Run execute_model in the driver worker.

    Passing None will cause the driver to stop the model execution loop
    running in each of the remote workers. In this case, this method
    returns None. Otherwise, this method returns the model output.
    """
    raise NotImplementedError
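
A concrete subclass typically forwards this call to the worker living in the driver process. The following is only a sketch; the driver_worker attribute and its execute_model method are assumptions about the subclass, not part of this base class:

def _driver_execute_model(
    self, execute_model_req: Optional[ExecuteModelRequest]
) -> Optional[List[SamplerOutput]]:
    # With execute_model_req=None, the driver-side call makes the remote
    # workers leave their execution loop and there is no output to return.
    return self.driver_worker.execute_model(execute_model_req)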

_driver_execute_model_async abstractmethod async

_driver_execute_model_async(
    execute_model_req: Optional[ExecuteModelRequest] = None,
) -> List[SamplerOutput]

Execute the model asynchronously in the driver worker.

Passing None will cause the driver to stop the model execution loop running in each of the remote workers.

Source code in vllm/executor/executor_base.py
@abstractmethod
async def _driver_execute_model_async(
    self,
    execute_model_req: Optional[ExecuteModelRequest] = None,
) -> List[SamplerOutput]:
    """Execute the model asynchronously in the driver worker.

    Passing None will cause the driver to stop the model execution
    loop running in each of the remote workers.
    """
    raise NotImplementedError

_run_workers abstractmethod

_run_workers(
    method: Union[str, Callable],
    *args,
    async_run_tensor_parallel_workers_only: bool = False,
    max_concurrent_workers: Optional[int] = None,
    **kwargs,
) -> Any

Runs the given method on all workers.

Parameters:

async_run_tensor_parallel_workers_only (bool, default False):
    If True the method will be run only in the remote TP workers, not the driver worker. It will also be run asynchronously and return a list of futures rather than blocking on the results.

TODO: simplify and merge with collective_rpc

Source code in vllm/executor/executor_base.py
@abstractmethod
def _run_workers(
    self,
    method: Union[str, Callable],
    *args,
    async_run_tensor_parallel_workers_only: bool = False,
    max_concurrent_workers: Optional[int] = None,
    **kwargs,
) -> Any:
    """Runs the given method on all workers.

    Args:
        async_run_tensor_parallel_workers_only: If True the method will be
            run only in the remote TP workers, not the driver worker.
            It will also be run asynchronously and return a list of futures
            rather than blocking on the results.

    # TODO: simplify and merge with collective_rpc
    """
    raise NotImplementedError
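
To make the async_run_tensor_parallel_workers_only semantics concrete, here is a rough sketch of a possible implementation. The workers and driver_worker attributes and the submit/future API are assumptions for illustration; concrete executors differ in the details:

def _run_workers(self, method, *args,
                 async_run_tensor_parallel_workers_only=False,
                 max_concurrent_workers=None, **kwargs):
    # Dispatch the call to every remote worker; `method` is assumed to be
    # a method name here for brevity (it may also be a callable).
    futures = [worker.submit(method, *args, **kwargs)
               for worker in self.workers]
    if async_run_tensor_parallel_workers_only:
        # Skip the driver worker and return the futures without blocking.
        return futures
    # Otherwise run the method on the in-process driver worker as well and
    # block until every remote worker has finished.
    driver_output = getattr(self.driver_worker, method)(*args, **kwargs)
    return [driver_output] + [f.result() for f in futures]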

_start_worker_execution_loop abstractmethod async

_start_worker_execution_loop()

Run the execution loop on all workers. It guarantees that either all workers run the loop or none of them does. The loop can be stopped by stop_remote_worker_execution_loop. The API is idempotent (only one loop runs at any moment).

Source code in vllm/executor/executor_base.py
@abstractmethod
async def _start_worker_execution_loop(self):
    """Run execution loop on all workers. It guarantees all workers run
    the loop or None of them is running the loop. Loop can be stopped by
    `stop_remote_worker_execution_loop`.
    The API is idempotent (guarantee only 1 loop run at any moment)."""
    raise NotImplementedError

_wait_for_tasks_completion abstractmethod

_wait_for_tasks_completion(
    parallel_worker_tasks: Any,
) -> None

Wait for futures returned from _run_workers() with async_run_tensor_parallel_workers_only=True to complete.

Source code in vllm/executor/executor_base.py
@abstractmethod
def _wait_for_tasks_completion(self, parallel_worker_tasks: Any) -> None:
    """Wait for futures returned from _run_workers() with
    async_run_remote_workers_only to complete."""
    raise NotImplementedError

collective_rpc

collective_rpc(
    method: Union[str, Callable],
    timeout: Optional[float] = None,
    args: Tuple = (),
    kwargs: Optional[Dict] = None,
) -> List[Any]
Source code in vllm/executor/executor_base.py
def collective_rpc(self,
                   method: Union[str, Callable],
                   timeout: Optional[float] = None,
                   args: Tuple = (),
                   kwargs: Optional[Dict] = None) -> List[Any]:
    return self._run_workers(method, *args, **(kwargs or {}))

execute_model

execute_model(
    execute_model_req: ExecuteModelRequest,
) -> List[SamplerOutput]
Source code in vllm/executor/executor_base.py
def execute_model(
    self,
    execute_model_req: ExecuteModelRequest,
) -> List[SamplerOutput]:
    # TODO: unify into collective_rpc
    if self.parallel_worker_tasks is None:
        self.parallel_worker_tasks = self._run_workers(
            "start_worker_execution_loop",
            async_run_tensor_parallel_workers_only=True)

    # Only the driver worker returns the sampling results.
    driver_outputs = self._driver_execute_model(execute_model_req)
    assert driver_outputs is not None
    return driver_outputs

execute_model_async async

execute_model_async(
    execute_model_req: ExecuteModelRequest,
) -> List[SamplerOutput]
Source code in vllm/executor/executor_base.py
async def execute_model_async(
        self,
        execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
    if self.parallel_worker_tasks is None:
        # Start model execution loop running in the parallel workers
        self.parallel_worker_tasks = asyncio.create_task(
            self._start_worker_execution_loop())

    # Only the driver worker returns the sampling results.
    return await self._driver_execute_model_async(execute_model_req)

stop_remote_worker_execution_loop

stop_remote_worker_execution_loop() -> None
Source code in vllm/executor/executor_base.py
def stop_remote_worker_execution_loop(self) -> None:
    if self.parallel_worker_tasks is None:
        return

    self._driver_execute_model(execute_model_req=None)
    parallel_worker_tasks = self.parallel_worker_tasks
    self.parallel_worker_tasks = None
    # Ensure that workers exit model loop cleanly
    # (this will raise otherwise)
    self._wait_for_tasks_completion(parallel_worker_tasks)

stop_remote_worker_execution_loop_async async

stop_remote_worker_execution_loop_async() -> None
Source code in vllm/executor/executor_base.py
async def stop_remote_worker_execution_loop_async(self) -> None:
    if self.parallel_worker_tasks is None:
        return

    await self._driver_execute_model_async()
    parallel_worker_tasks = self.parallel_worker_tasks
    self.parallel_worker_tasks = None
    # Ensure that workers exit model loop cleanly
    # (this will raise otherwise)
    await parallel_worker_tasks

ExecutorBase

Bases: ABC

Base class for all executors.

An executor is responsible for executing the model on one device, or it can be a distributed executor that can execute the model on multiple devices.

Source code in vllm/executor/executor_base.py
class ExecutorBase(ABC):
    """Base class for all executors.

    An executor is responsible for executing the model on one device,
    or it can be a distributed executor 
    that can execute the model on multiple devices.
    """

    uses_ray: bool  # whether the executor uses Ray for orchestration.

    def __init__(
        self,
        vllm_config: VllmConfig,
    ) -> None:
        self.vllm_config = vllm_config
        self.model_config = vllm_config.model_config
        self.cache_config = vllm_config.cache_config
        self.lora_config = vllm_config.lora_config
        self.load_config = vllm_config.load_config
        self.parallel_config = vllm_config.parallel_config
        self.scheduler_config = vllm_config.scheduler_config
        self.device_config = vllm_config.device_config
        self.speculative_config = vllm_config.speculative_config
        self.prompt_adapter_config = vllm_config.prompt_adapter_config
        self.observability_config = vllm_config.observability_config
        self._init_executor()
        self.is_sleeping = False
        self.sleeping_tags: set[str] = set()

    @abstractmethod
    def _init_executor(self) -> None:
        raise NotImplementedError

    @abstractmethod
    def collective_rpc(self,
                       method: Union[str, Callable[..., _R]],
                       timeout: Optional[float] = None,
                       args: Tuple = (),
                       kwargs: Optional[Dict[str, Any]] = None) -> List[_R]:
        """
        Execute an RPC call on all workers.

        Args:
            method: Name of the worker method to execute, or a callable that
                is serialized and sent to all workers to execute.

                If the method is a callable, it should accept an additional
                `self` argument, in addition to the arguments passed in `args`
                and `kwargs`. The `self` argument will be the worker object.
            timeout: Maximum time in seconds to wait for execution. Raises a
                [`TimeoutError`][] on timeout. `None` means wait indefinitely.
            args: Positional arguments to pass to the worker method.
            kwargs: Keyword arguments to pass to the worker method.

        Returns:
            A list containing the results from each worker.

        Note:
            It is recommended to use this API to only pass control messages,
            and set up data-plane communication to pass data.
        """
        raise NotImplementedError

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available blocks for the GPU KV cache and
        swappable CPU KV cache.

        Normally, this should simply delegate to the underlying Worker. Some
        ExecutorBase may require modification of the result, e.g. to ensure the
        selected cache sizes are compatible with all workers.

        Returns a Tuple[num_gpu_blocks, num_cpu_blocks], where num_gpu_blocks
        are blocks that are "active" on the device and can be appended to.
        num_cpu_blocks refers to "swapped" blocks in CPU memory and cannot be
        appended to.
        """
        results = self.collective_rpc("determine_num_available_blocks")
        a = min([r[0] for r in results])
        b = min([r[1] for r in results])
        return a, b

    def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks) -> None:
        """Initialize the KV cache by invoking the underlying worker.
        """
        # NOTE: This is logged in the executor because there can be >1 workers.
        logger.info("# %s blocks: %d, # CPU blocks: %d",
                    vllm.platforms.current_platform.device_name,
                    num_gpu_blocks, num_cpu_blocks)
        max_concurrency = (num_gpu_blocks * self.cache_config.block_size /
                           self.model_config.max_model_len)
        logger.info("Maximum concurrency for %s tokens per request: %.2fx",
                    self.model_config.max_model_len, max_concurrency)

        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks

        self.collective_rpc("initialize_cache",
                            args=(num_gpu_blocks, num_cpu_blocks))

    def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]:
        """
        Run a function directly on the model inside each worker,
        returning the result for each of them.
        """

        def rpc_func(worker: WorkerBase) -> _R:
            return func(worker.get_model())

        return self.collective_rpc(rpc_func)

    def execute_model(
        self, execute_model_req: ExecuteModelRequest
    ) -> Optional[List[Union[SamplerOutput, PoolerOutput]]]:
        output = self.collective_rpc("execute_model",
                                     args=(execute_model_req, ))
        return output[0]

    def stop_remote_worker_execution_loop(self) -> None:
        """Releases parallel workers from model loop."""
        return

    def add_lora(self, lora_request: LoRARequest) -> bool:
        assert lora_request.lora_int_id > 0, "lora_id must be greater than 0."
        return all(self.collective_rpc("add_lora", args=(lora_request, )))

    def remove_lora(self, lora_id: int) -> bool:
        assert lora_id > 0, "lora_id must be greater than 0."
        return all(self.collective_rpc("remove_lora", args=(lora_id, )))

    def pin_lora(self, lora_id: int) -> bool:
        assert lora_id > 0, "lora_id must be greater than 0."
        return all(self.collective_rpc("pin_lora", args=(lora_id, )))

    def list_loras(self) -> Set[int]:
        sets = self.collective_rpc("list_loras")
        for s in sets:
            assert s == sets[0], "All workers should have the same LORAs."
        return sets[0]

    def add_prompt_adapter(
            self, prompt_adapter_request: PromptAdapterRequest) -> bool:
        assert prompt_adapter_request.prompt_adapter_id > 0, \
            "prompt_adapter_id must be greater than 0."
        return all(
            self.collective_rpc("add_prompt_adapter",
                                args=(prompt_adapter_request, )))

    def remove_prompt_adapter(self, prompt_adapter_id: int) -> bool:
        assert prompt_adapter_id > 0, \
            "prompt_adapter_id must be greater than 0."
        return all(
            self.collective_rpc("remove_prompt_adapter",
                                args=(prompt_adapter_id, )))

    def pin_prompt_adapter(self, prompt_adapter_id: int) -> bool:
        assert prompt_adapter_id > 0, \
            "prompt_adapter_id must be greater than 0."
        return all(
            self.collective_rpc("pin_prompt_adapter",
                                args=(prompt_adapter_id, )))

    def list_prompt_adapters(self) -> Set[int]:
        sets = self.collective_rpc("list_prompt_adapters")
        for s in sets:
            assert (s == sets[0]
                    ), "All workers should have the same prompt adapters."
        return sets[0]

    def start_profile(self) -> None:
        self.collective_rpc("start_profile")

    def stop_profile(self) -> None:
        self.collective_rpc("stop_profile")

    def sleep(self, level: int = 1):
        if self.is_sleeping:
            logger.warning("Executor is already sleeping.")
            return
        time_before_sleep = time.perf_counter()
        self.collective_rpc("sleep", kwargs=dict(level=level))
        time_after_sleep = time.perf_counter()
        self.sleeping_tags = {"weights", "kv_cache"}
        self.is_sleeping = True
        logger.info("It took %.6f seconds to fall asleep.",
                    time_after_sleep - time_before_sleep)

    def wake_up(self, tags: Optional[list[str]] = None):
        if not self.is_sleeping:
            logger.warning("Executor is not sleeping.")
            return
        if tags:
            for tag in tags:
                if tag not in self.sleeping_tags:
                    logger.warning("Tag %s is not in sleeping tags %s", tag,
                                   self.sleeping_tags)
                    return
        time_before_wakeup = time.perf_counter()
        self.collective_rpc("wake_up", kwargs=dict(tags=tags))
        time_after_wakeup = time.perf_counter()
        logger.info("It took %.6f seconds to wake up tags %s.",
                    time_after_wakeup - time_before_wakeup,
                    tags if tags is not None else self.sleeping_tags)
        if tags:
            for tag in tags:
                self.sleeping_tags.remove(tag)
        else:
            self.sleeping_tags.clear()
        if not self.sleeping_tags:
            self.is_sleeping = False

    def save_sharded_state(
        self,
        path: str,
        pattern: Optional[str] = None,
        max_size: Optional[int] = None,
    ) -> None:
        self.collective_rpc("save_sharded_state",
                            kwargs=dict(path=path,
                                        pattern=pattern,
                                        max_size=max_size))

    @abstractmethod
    def check_health(self) -> None:
        """Checks if the executor is healthy. If not, it should raise an
        exception."""
        raise NotImplementedError

    def shutdown(self) -> None:
        """Shutdown the executor."""
        return

    def __del__(self):
        self.shutdown()

    async def execute_model_async(
            self,
            execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
        """Executes one model step on the given sequences."""
        output = await make_async(self.execute_model)(execute_model_req)
        return output

    async def stop_remote_worker_execution_loop_async(self) -> None:
        """Releases parallel workers from model loop."""
        return

    async def check_health_async(self) -> None:
        """Checks if the executor is healthy. If not, it should raise an
        exception."""
        self.check_health()
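
Most of the base-class methods above are built on collective_rpc, so a subclass mainly has to decide how that call reaches its workers. A minimal single-worker sketch follows; create_worker is a hypothetical helper, not part of this module:

class SingleDeviceExecutor(ExecutorBase):
    """Illustrative, non-distributed executor (sketch only)."""

    uses_ray = False

    def _init_executor(self) -> None:
        # Hypothetical helper that builds a worker from self.vllm_config.
        self.worker = create_worker(self.vllm_config)

    def collective_rpc(self, method, timeout=None, args=(), kwargs=None):
        kwargs = kwargs or {}
        if callable(method):
            # Callables receive the worker object as their first argument.
            return [method(self.worker, *args, **kwargs)]
        return [getattr(self.worker, method)(*args, **kwargs)]

    def check_health(self) -> None:
        # An in-process worker is healthy as long as this process is running.
        return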

cache_config instance-attribute

cache_config = cache_config

device_config instance-attribute

device_config = device_config

is_sleeping instance-attribute

is_sleeping = False

load_config instance-attribute

load_config = load_config

lora_config instance-attribute

lora_config = lora_config

model_config instance-attribute

model_config = model_config

observability_config instance-attribute

observability_config = observability_config

parallel_config instance-attribute

parallel_config = parallel_config

prompt_adapter_config instance-attribute

prompt_adapter_config = prompt_adapter_config

scheduler_config instance-attribute

scheduler_config = scheduler_config

sleeping_tags instance-attribute

sleeping_tags: set[str] = set()

speculative_config instance-attribute

speculative_config = speculative_config

uses_ray instance-attribute

uses_ray: bool

vllm_config instance-attribute

vllm_config = vllm_config

__del__

__del__()
Source code in vllm/executor/executor_base.py
def __del__(self):
    self.shutdown()

__init__

__init__(vllm_config: VllmConfig) -> None
Source code in vllm/executor/executor_base.py
def __init__(
    self,
    vllm_config: VllmConfig,
) -> None:
    self.vllm_config = vllm_config
    self.model_config = vllm_config.model_config
    self.cache_config = vllm_config.cache_config
    self.lora_config = vllm_config.lora_config
    self.load_config = vllm_config.load_config
    self.parallel_config = vllm_config.parallel_config
    self.scheduler_config = vllm_config.scheduler_config
    self.device_config = vllm_config.device_config
    self.speculative_config = vllm_config.speculative_config
    self.prompt_adapter_config = vllm_config.prompt_adapter_config
    self.observability_config = vllm_config.observability_config
    self._init_executor()
    self.is_sleeping = False
    self.sleeping_tags: set[str] = set()

_init_executor abstractmethod

_init_executor() -> None
Source code in vllm/executor/executor_base.py
@abstractmethod
def _init_executor(self) -> None:
    raise NotImplementedError

add_lora

add_lora(lora_request: LoRARequest) -> bool
Source code in vllm/executor/executor_base.py
def add_lora(self, lora_request: LoRARequest) -> bool:
    assert lora_request.lora_int_id > 0, "lora_id must be greater than 0."
    return all(self.collective_rpc("add_lora", args=(lora_request, )))

add_prompt_adapter

add_prompt_adapter(
    prompt_adapter_request: PromptAdapterRequest,
) -> bool
Source code in vllm/executor/executor_base.py
def add_prompt_adapter(
        self, prompt_adapter_request: PromptAdapterRequest) -> bool:
    assert prompt_adapter_request.prompt_adapter_id > 0, \
        "prompt_adapter_id must be greater than 0."
    return all(
        self.collective_rpc("add_prompt_adapter",
                            args=(prompt_adapter_request, )))

apply_model

apply_model(func: Callable[[Module], _R]) -> list[_R]

Run a function directly on the model inside each worker, returning the result for each of them.

Source code in vllm/executor/executor_base.py
def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]:
    """
    Run a function directly on the model inside each worker,
    returning the result for each of them.
    """

    def rpc_func(worker: WorkerBase) -> _R:
        return func(worker.get_model())

    return self.collective_rpc(rpc_func)
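
A hedged usage example: counting the parameters of the model held by each worker. The function runs inside the workers, so it should only rely on what is available there; executor stands for any concrete ExecutorBase:

def count_parameters(model: nn.Module) -> int:
    # Executed inside each worker with that worker's model instance.
    return sum(p.numel() for p in model.parameters())

per_worker_param_counts = executor.apply_model(count_parameters)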

check_health abstractmethod

check_health() -> None

Checks if the executor is healthy. If not, it should raise an exception.

Source code in vllm/executor/executor_base.py
@abstractmethod
def check_health(self) -> None:
    """Checks if the executor is healthy. If not, it should raise an
    exception."""
    raise NotImplementedError

check_health_async async

check_health_async() -> None

Checks if the executor is healthy. If not, it should raise an exception.

Source code in vllm/executor/executor_base.py
async def check_health_async(self) -> None:
    """Checks if the executor is healthy. If not, it should raise an
    exception."""
    self.check_health()

collective_rpc abstractmethod

collective_rpc(
    method: Union[str, Callable[..., _R]],
    timeout: Optional[float] = None,
    args: Tuple = (),
    kwargs: Optional[Dict[str, Any]] = None,
) -> List[_R]

Execute an RPC call on all workers.

Parameters:

method (Union[str, Callable[..., _R]], required):
    Name of the worker method to execute, or a callable that is serialized and sent to all workers to execute. If the method is a callable, it should accept an additional self argument, in addition to the arguments passed in args and kwargs. The self argument will be the worker object.

timeout (Optional[float], default None):
    Maximum time in seconds to wait for execution. Raises a TimeoutError on timeout. None means wait indefinitely.

args (Tuple, default ()):
    Positional arguments to pass to the worker method.

kwargs (Optional[Dict[str, Any]], default None):
    Keyword arguments to pass to the worker method.

Returns:

List[_R]: A list containing the results from each worker.

Note:

It is recommended to use this API only to pass control messages, and to set up data-plane communication to pass data.

Source code in vllm/executor/executor_base.py
@abstractmethod
def collective_rpc(self,
                   method: Union[str, Callable[..., _R]],
                   timeout: Optional[float] = None,
                   args: Tuple = (),
                   kwargs: Optional[Dict[str, Any]] = None) -> List[_R]:
    """
    Execute an RPC call on all workers.

    Args:
        method: Name of the worker method to execute, or a callable that
            is serialized and sent to all workers to execute.

            If the method is a callable, it should accept an additional
            `self` argument, in addition to the arguments passed in `args`
            and `kwargs`. The `self` argument will be the worker object.
        timeout: Maximum time in seconds to wait for execution. Raises a
            [`TimeoutError`][] on timeout. `None` means wait indefinitely.
        args: Positional arguments to pass to the worker method.
        kwargs: Keyword arguments to pass to the worker method.

    Returns:
        A list containing the results from each worker.

    Note:
        It is recommended to use this API to only pass control messages,
        and set up data-plane communication to pass data.
    """
    raise NotImplementedError
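
Hypothetical usage of both call styles, assuming executor is a concrete ExecutorBase. The first call invokes a worker method by name; the second ships a callable that receives the worker object as its first argument:

# 1) Invoke a worker method by name; one result per worker is returned.
per_worker_blocks = executor.collective_rpc("determine_num_available_blocks")

# 2) Ship a callable; each worker invokes it with itself as the argument.
def model_device(worker):
    return str(next(worker.get_model().parameters()).device)

devices = executor.collective_rpc(model_device, timeout=60.0)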

determine_num_available_blocks

determine_num_available_blocks() -> Tuple[int, int]

Determine the number of available blocks for the GPU KV cache and swappable CPU KV cache.

Normally, this should simply delegate to the underlying Worker. Some ExecutorBase may require modification of the result, e.g. to ensure the selected cache sizes are compatible with all workers.

Returns a Tuple[num_gpu_blocks, num_cpu_blocks], where num_gpu_blocks are blocks that are "active" on the device and can be appended to. num_cpu_blocks refers to "swapped" blocks in CPU memory and cannot be appended to.

Source code in vllm/executor/executor_base.py
def determine_num_available_blocks(self) -> Tuple[int, int]:
    """Determine the number of available blocks for the GPU KV cache and
    swappable CPU KV cache.

    Normally, this should simply delegate to the underlying Worker. Some
    ExecutorBase may require modification of the result, e.g. to ensure the
    selected cache sizes are compatible with all workers.

    Returns a Tuple[num_gpu_blocks, num_cpu_blocks], where num_gpu_blocks
    are blocks that are "active" on the device and can be appended to.
    num_cpu_blocks refers to "swapped" blocks in CPU memory and cannot be
    appended to.
    """
    results = self.collective_rpc("determine_num_available_blocks")
    a = min([r[0] for r in results])
    b = min([r[1] for r in results])
    return a, b
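
Taking the element-wise minimum ensures that the chosen cache sizes fit on every worker. An illustrative example with made-up per-worker results:

results = [(8000, 2048), (7500, 2048)]       # per-worker (GPU, CPU) block counts
num_gpu_blocks = min(r[0] for r in results)  # 7500, limited by the smallest worker
num_cpu_blocks = min(r[1] for r in results)  # 2048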

execute_model

execute_model(
    execute_model_req: ExecuteModelRequest,
) -> Optional[List[Union[SamplerOutput, PoolerOutput]]]
Source code in vllm/executor/executor_base.py
def execute_model(
    self, execute_model_req: ExecuteModelRequest
) -> Optional[List[Union[SamplerOutput, PoolerOutput]]]:
    output = self.collective_rpc("execute_model",
                                 args=(execute_model_req, ))
    return output[0]

execute_model_async async

execute_model_async(
    execute_model_req: ExecuteModelRequest,
) -> List[SamplerOutput]

Executes one model step on the given sequences.

Source code in vllm/executor/executor_base.py
async def execute_model_async(
        self,
        execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
    """Executes one model step on the given sequences."""
    output = await make_async(self.execute_model)(execute_model_req)
    return output

initialize_cache

initialize_cache(
    num_gpu_blocks: int, num_cpu_blocks
) -> None

Initialize the KV cache by invoking the underlying worker.

Source code in vllm/executor/executor_base.py
def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks) -> None:
    """Initialize the KV cache by invoking the underlying worker.
    """
    # NOTE: This is logged in the executor because there can be >1 workers.
    logger.info("# %s blocks: %d, # CPU blocks: %d",
                vllm.platforms.current_platform.device_name,
                num_gpu_blocks, num_cpu_blocks)
    max_concurrency = (num_gpu_blocks * self.cache_config.block_size /
                       self.model_config.max_model_len)
    logger.info("Maximum concurrency for %s tokens per request: %.2fx",
                self.model_config.max_model_len, max_concurrency)

    self.cache_config.num_gpu_blocks = num_gpu_blocks
    self.cache_config.num_cpu_blocks = num_cpu_blocks

    self.collective_rpc("initialize_cache",
                        args=(num_gpu_blocks, num_cpu_blocks))
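
The logged maximum concurrency is simply the number of tokens the GPU KV cache can hold divided by the maximum request length. With illustrative numbers:

num_gpu_blocks = 4096
block_size = 16        # tokens per KV-cache block (cache_config.block_size)
max_model_len = 8192   # tokens per request (model_config.max_model_len)

# 4096 * 16 / 8192 = 8.0, i.e. about eight max-length requests fit at once.
max_concurrency = num_gpu_blocks * block_size / max_model_len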

list_loras

list_loras() -> Set[int]
Source code in vllm/executor/executor_base.py
def list_loras(self) -> Set[int]:
    sets = self.collective_rpc("list_loras")
    for s in sets:
        assert s == sets[0], "All workers should have the same LORAs."
    return sets[0]

list_prompt_adapters

list_prompt_adapters() -> Set[int]
Source code in vllm/executor/executor_base.py
def list_prompt_adapters(self) -> Set[int]:
    sets = self.collective_rpc("list_prompt_adapters")
    for s in sets:
        assert (s == sets[0]
                ), "All workers should have the same prompt adapters."
    return sets[0]

pin_lora

pin_lora(lora_id: int) -> bool
Source code in vllm/executor/executor_base.py
def pin_lora(self, lora_id: int) -> bool:
    assert lora_id > 0, "lora_id must be greater than 0."
    return all(self.collective_rpc("pin_lora", args=(lora_id, )))

pin_prompt_adapter

pin_prompt_adapter(prompt_adapter_id: int) -> bool
Source code in vllm/executor/executor_base.py
def pin_prompt_adapter(self, prompt_adapter_id: int) -> bool:
    assert prompt_adapter_id > 0, \
        "prompt_adapter_id must be greater than 0."
    return all(
        self.collective_rpc("pin_prompt_adapter",
                            args=(prompt_adapter_id, )))

remove_lora

remove_lora(lora_id: int) -> bool
Source code in vllm/executor/executor_base.py
def remove_lora(self, lora_id: int) -> bool:
    assert lora_id > 0, "lora_id must be greater than 0."
    return all(self.collective_rpc("remove_lora", args=(lora_id, )))

remove_prompt_adapter

remove_prompt_adapter(prompt_adapter_id: int) -> bool
Source code in vllm/executor/executor_base.py
def remove_prompt_adapter(self, prompt_adapter_id: int) -> bool:
    assert prompt_adapter_id > 0, \
        "prompt_adapter_id must be greater than 0."
    return all(
        self.collective_rpc("remove_prompt_adapter",
                            args=(prompt_adapter_id, )))

save_sharded_state

save_sharded_state(
    path: str,
    pattern: Optional[str] = None,
    max_size: Optional[int] = None,
) -> None
Source code in vllm/executor/executor_base.py
def save_sharded_state(
    self,
    path: str,
    pattern: Optional[str] = None,
    max_size: Optional[int] = None,
) -> None:
    self.collective_rpc("save_sharded_state",
                        kwargs=dict(path=path,
                                    pattern=pattern,
                                    max_size=max_size))

shutdown

shutdown() -> None

Shutdown the executor.

Source code in vllm/executor/executor_base.py
def shutdown(self) -> None:
    """Shutdown the executor."""
    return

sleep

sleep(level: int = 1)
Source code in vllm/executor/executor_base.py
def sleep(self, level: int = 1):
    if self.is_sleeping:
        logger.warning("Executor is already sleeping.")
        return
    time_before_sleep = time.perf_counter()
    self.collective_rpc("sleep", kwargs=dict(level=level))
    time_after_sleep = time.perf_counter()
    self.sleeping_tags = {"weights", "kv_cache"}
    self.is_sleeping = True
    logger.info("It took %.6f seconds to fall asleep.",
                time_after_sleep - time_before_sleep)

start_profile

start_profile() -> None
Source code in vllm/executor/executor_base.py
def start_profile(self) -> None:
    self.collective_rpc("start_profile")

stop_profile

stop_profile() -> None
Source code in vllm/executor/executor_base.py
def stop_profile(self) -> None:
    self.collective_rpc("stop_profile")

stop_remote_worker_execution_loop

stop_remote_worker_execution_loop() -> None

Releases parallel workers from model loop.

Source code in vllm/executor/executor_base.py
def stop_remote_worker_execution_loop(self) -> None:
    """Releases parallel workers from model loop."""
    return

stop_remote_worker_execution_loop_async async

stop_remote_worker_execution_loop_async() -> None

Releases parallel workers from model loop.

Source code in vllm/executor/executor_base.py
async def stop_remote_worker_execution_loop_async(self) -> None:
    """Releases parallel workers from model loop."""
    return

wake_up

wake_up(tags: Optional[list[str]] = None)
Source code in vllm/executor/executor_base.py
def wake_up(self, tags: Optional[list[str]] = None):
    if not self.is_sleeping:
        logger.warning("Executor is not sleeping.")
        return
    if tags:
        for tag in tags:
            if tag not in self.sleeping_tags:
                logger.warning("Tag %s is not in sleeping tags %s", tag,
                               self.sleeping_tags)
                return
    time_before_wakeup = time.perf_counter()
    self.collective_rpc("wake_up", kwargs=dict(tags=tags))
    time_after_wakeup = time.perf_counter()
    logger.info("It took %.6f seconds to wake up tags %s.",
                time_after_wakeup - time_before_wakeup,
                tags if tags is not None else self.sleeping_tags)
    if tags:
        for tag in tags:
            self.sleeping_tags.remove(tag)
    else:
        self.sleeping_tags.clear()
    if not self.sleeping_tags:
        self.is_sleeping = False
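
A hedged usage example of the sleep/wake cycle, assuming executor is a concrete ExecutorBase. After sleep, the sleeping tags are {"weights", "kv_cache"}; waking only a subset keeps the executor asleep until every tag has been woken:

executor.sleep(level=1)             # sleeping_tags == {"weights", "kv_cache"}
executor.wake_up(tags=["weights"])  # "kv_cache" stays asleep, is_sleeping remains True
executor.wake_up()                  # wakes the remaining tags, is_sleeping -> False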