vllm.worker.xpu_worker

An XPU worker class.

logger module-attribute

logger = init_logger(__name__)

XPUWorker

Bases: LoRANotSupportedWorkerBase, Worker

A worker class that executes (a partition of) the model on an XPU device.

Each worker is associated with a single XPU device. The worker is responsible for maintaining the KV cache and executing the model on the XPU. In the case of distributed inference, each worker is assigned a partition of the model.

Source code in vllm/worker/xpu_worker.py
class XPUWorker(LoRANotSupportedWorkerBase, Worker):
    """A worker class that executes (a partition of) the model on a GPU.

    Each worker is associated with a single XPU device. The worker is 
    responsible for maintaining the KV cache and executing the model on the 
    XPU. In case of distributed inference, each worker is assigned a partition
    of the model.
    """

    def __init__(
        self,
        vllm_config: VllmConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        is_driver_worker: bool = False,
    ) -> None:
        WorkerBase.__init__(self, vllm_config=vllm_config)
        device_config = self.device_config
        parallel_config = self.parallel_config
        assert device_config.device_type == "xpu"
        assert current_platform.is_xpu()

        self.parallel_config.rank = rank

        self.local_rank = local_rank
        self.rank = rank
        self.distributed_init_method = distributed_init_method
        self.is_driver_worker = is_driver_worker
        if parallel_config and is_driver_worker:
            assert rank % parallel_config.tensor_parallel_size == 0, \
                   "Driver worker should be rank 0 of tensor parallel group."

        self.model_runner = XPUModelRunner(  # type: ignore
            vllm_config=vllm_config,
            kv_cache_dtype=self.cache_config.cache_dtype,
            is_driver_worker=is_driver_worker,
        )
        # Uninitialized cache engine. Will be initialized by
        # initialize_cache.
        self.cache_engine: List[CacheEngine]
        self.gpu_cache: Optional[List[List[torch.Tensor]]]

    def init_device(self) -> None:
        if (self.device_config.device.type == "xpu"
                and current_platform.is_xpu()):
            self.device = torch.device(f"xpu:{self.local_rank}")
            torch.xpu.set_device(self.device)
            torch.xpu.empty_cache()
            self.init_gpu_memory = torch.xpu.get_device_properties(
                self.local_rank).total_memory
        else:
            raise RuntimeError(
                f"Unsupported device type: {self.device_config.device}")
        # Initialize the distributed environment.
        self.init_worker_distributed_environment()
        # Initialize the model.
        set_random_seed(self.model_config.seed)

    # keep this method for `empty_cache` and `synchronize` api
    @torch.inference_mode()
    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Profiles the peak memory usage of the model to determine how many
        KV blocks may be allocated without OOMs.

        The engine first profiles the existing memory usage. Then, it
        calculates the maximum possible number of GPU and CPU blocks that
        can be allocated with the remaining free memory.

        Tip:
            You may limit the usage of GPU memory
            by adjusting the `gpu_memory_utilization` parameter.
        """
        # Profile the memory usage of the model and get the maximum number of
        # cache blocks that can be allocated with the remaining free memory.
        torch.xpu.empty_cache()

        # Execute a forward pass with dummy inputs to profile the memory usage
        # of the model.
        self.model_runner.profile_run()

        # Calculate the number of blocks that can be allocated with the
        # profiled peak memory.
        torch.xpu.synchronize()
        used_memory = torch.xpu.memory_allocated()
        total_gpu_memory = torch.xpu.get_device_properties(
            self.local_rank).total_memory
        free_gpu_memory = total_gpu_memory - used_memory

        # NOTE(woosuk): Here we assume that the other processes using the same
        # GPU did not change their memory usage during the profiling.
        peak_memory = self.init_gpu_memory - free_gpu_memory
        assert peak_memory > 0, (
            "Error in memory profiling. "
            f"Initial free memory {self.init_gpu_memory}, current free memory"
            f" {free_gpu_memory}. This happens when the GPU memory was "
            "not properly cleaned up before initializing the vLLM instance.")

        cache_block_size = self.get_cache_block_size_bytes()
        num_gpu_blocks = int(
            (total_gpu_memory * self.cache_config.gpu_memory_utilization -
             peak_memory) // cache_block_size)
        num_cpu_blocks = int(self.cache_config.swap_space_bytes //
                             cache_block_size)
        num_gpu_blocks = max(num_gpu_blocks, 0)
        num_cpu_blocks = max(num_cpu_blocks, 0)
        gc.collect()
        torch.xpu.empty_cache()
        return num_gpu_blocks, num_cpu_blocks

    def _warm_up_model(self) -> None:
        # IPEX doesn't support graph capture yet.
        pass

    def init_worker_distributed_environment(self) -> None:
        """Initialize the distributed environment."""

        parallel_config = self.parallel_config
        rank = self.rank
        distributed_init_method = self.distributed_init_method

        if torch.distributed.is_initialized():
            torch_world_size = torch.distributed.get_world_size()
            if torch_world_size != parallel_config.world_size:
                raise RuntimeError(
                    "torch.distributed is already initialized but the torch "
                    "world size does not match parallel_config.world_size "
                    f"({torch_world_size} vs. {parallel_config.world_size}).")
        elif not distributed_init_method:
            raise ValueError(
                "distributed_init_method must be set if torch.distributed "
                "is not already initialized")
        else:
            # Use sockets as the default Level Zero IPC exchange backend.
            # By default, oneCCL uses `drmfd`, which requires extra
            # dependencies (libdrm and DRM headers) on the system.
            ENV_CCL_ATL_TRANSPORT = os.getenv("CCL_ATL_TRANSPORT", "ofi")
            ENV_LOCAL_WORLD_SIZE = os.getenv("LOCAL_WORLD_SIZE",
                                             str(parallel_config.world_size))
            os.environ["CCL_ATL_TRANSPORT"] = ENV_CCL_ATL_TRANSPORT
            os.environ["LOCAL_WORLD_SIZE"] = ENV_LOCAL_WORLD_SIZE
            os.environ["LOCAL_RANK"] = str(self.local_rank)
            init_distributed_environment(
                world_size=parallel_config.world_size,
                rank=rank,
                distributed_init_method=distributed_init_method,
                local_rank=self.local_rank,
                backend="ccl")

        ensure_model_parallel_initialized(
            parallel_config.tensor_parallel_size,
            parallel_config.pipeline_parallel_size)
        # A global all_reduce is needed to warm up oneCCL.
        torch.distributed.all_reduce(torch.zeros(1).xpu())

        if parallel_config.pipeline_parallel_size > 1:
            # Initialize the PP group here so that P2P communication
            # is not the first collective call.
            get_pp_group().all_reduce(torch.zeros(1).xpu())
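
The typical lifecycle, as driven by the engine, is construction followed by init_device, determine_num_available_blocks, and initialize_cache (the latter inherited from Worker). A minimal sketch, assuming a fully populated VllmConfig named engine_config and a hypothetical single-process, single-device setup:

# Sketch only: `engine_config` is assumed to be a prepared VllmConfig
# (e.g. built from EngineArgs); the init method address is a placeholder.
from vllm.worker.xpu_worker import XPUWorker

worker = XPUWorker(
    vllm_config=engine_config,
    local_rank=0,
    rank=0,
    distributed_init_method="tcp://127.0.0.1:29500",
    is_driver_worker=True,
)
worker.init_device()  # binds xpu:0 and records its total memory
num_gpu_blocks, num_cpu_blocks = worker.determine_num_available_blocks()
worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)  # inherited from Worker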

cache_engine instance-attribute

cache_engine: List[CacheEngine]

distributed_init_method instance-attribute

distributed_init_method = distributed_init_method

gpu_cache instance-attribute

gpu_cache: Optional[List[List[Tensor]]]

is_driver_worker instance-attribute

is_driver_worker = is_driver_worker

local_rank instance-attribute

local_rank = local_rank

model_runner instance-attribute

model_runner = XPUModelRunner(
    vllm_config=vllm_config,
    kv_cache_dtype=cache_dtype,
    is_driver_worker=is_driver_worker,
)

rank instance-attribute

rank = rank

__init__

__init__(
    vllm_config: VllmConfig,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    is_driver_worker: bool = False,
) -> None
Source code in vllm/worker/xpu_worker.py
def __init__(
    self,
    vllm_config: VllmConfig,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    is_driver_worker: bool = False,
) -> None:
    WorkerBase.__init__(self, vllm_config=vllm_config)
    device_config = self.device_config
    parallel_config = self.parallel_config
    assert device_config.device_type == "xpu"
    assert current_platform.is_xpu()

    self.parallel_config.rank = rank

    self.local_rank = local_rank
    self.rank = rank
    self.distributed_init_method = distributed_init_method
    self.is_driver_worker = is_driver_worker
    if parallel_config and is_driver_worker:
        assert rank % parallel_config.tensor_parallel_size == 0, \
               "Driver worker should be rank 0 of tensor parallel group."

    self.model_runner = XPUModelRunner(  # type: ignore
        vllm_config=vllm_config,
        kv_cache_dtype=self.cache_config.cache_dtype,
        is_driver_worker=is_driver_worker,
    )
    # Uninitialized cache engine. Will be initialized by
    # initialize_cache.
    self.cache_engine: List[CacheEngine]
    self.gpu_cache: Optional[List[List[torch.Tensor]]]
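
Note that the driver-worker assertion accepts any rank that leads a tensor-parallel group, not only global rank 0. A small illustration of that arithmetic, with hypothetical parallel sizes:

# Hypothetical sizes: tensor_parallel_size=4, pipeline_parallel_size=2,
# so world_size=8 and ranks 0 and 4 each lead a tensor-parallel group.
tensor_parallel_size = 4
world_size = 8
for rank in range(world_size):
    print(rank, rank % tensor_parallel_size == 0)  # True only for 0 and 4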

_warm_up_model

_warm_up_model() -> None
Source code in vllm/worker/xpu_worker.py
def _warm_up_model(self) -> None:
    # IPEX doesn't support graph capture yet.
    pass

determine_num_available_blocks

determine_num_available_blocks() -> Tuple[int, int]

Profiles the peak memory usage of the model to determine how many KV blocks may be allocated without OOMs.

The engine first profiles the existing memory usage. Then, it calculates the maximum possible number of GPU and CPU blocks that can be allocated with the remaining free memory.

Tip

You may limit the usage of GPU memory by adjusting the gpu_memory_utilization parameter.
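
For example, the parameter can be passed when constructing the engine. A sketch, using a placeholder model name:

# Sketch: cap the memory budget at 80% of total device memory.
from vllm import LLM

llm = LLM(model="facebook/opt-125m", gpu_memory_utilization=0.8)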

Source code in vllm/worker/xpu_worker.py
@torch.inference_mode()
def determine_num_available_blocks(self) -> Tuple[int, int]:
    """Profiles the peak memory usage of the model to determine how many
    KV blocks may be allocated without OOMs.

    The engine first profiles the existing memory usage. Then, it
    calculates the maximum possible number of GPU and CPU blocks that
    can be allocated with the remaining free memory.

    Tip:
        You may limit the usage of GPU memory
        by adjusting the `gpu_memory_utilization` parameter.
    """
    # Profile the memory usage of the model and get the maximum number of
    # cache blocks that can be allocated with the remaining free memory.
    torch.xpu.empty_cache()

    # Execute a forward pass with dummy inputs to profile the memory usage
    # of the model.
    self.model_runner.profile_run()

    # Calculate the number of blocks that can be allocated with the
    # profiled peak memory.
    torch.xpu.synchronize()
    used_memory = torch.xpu.memory_allocated()
    total_gpu_memory = torch.xpu.get_device_properties(
        self.local_rank).total_memory
    free_gpu_memory = total_gpu_memory - used_memory

    # NOTE(woosuk): Here we assume that the other processes using the same
    # GPU did not change their memory usage during the profiling.
    peak_memory = self.init_gpu_memory - free_gpu_memory
    assert peak_memory > 0, (
        "Error in memory profiling. "
        f"Initial free memory {self.init_gpu_memory}, current free memory"
        f" {free_gpu_memory}. This happens when the GPU memory was "
        "not properly cleaned up before initializing the vLLM instance.")

    cache_block_size = self.get_cache_block_size_bytes()
    num_gpu_blocks = int(
        (total_gpu_memory * self.cache_config.gpu_memory_utilization -
         peak_memory) // cache_block_size)
    num_cpu_blocks = int(self.cache_config.swap_space_bytes //
                         cache_block_size)
    num_gpu_blocks = max(num_gpu_blocks, 0)
    num_cpu_blocks = max(num_cpu_blocks, 0)
    gc.collect()
    torch.xpu.empty_cache()
    return num_gpu_blocks, num_cpu_blocks
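
Plugging hypothetical numbers into the block arithmetic above:

# Hypothetical figures: 16 GiB device, 90% utilization target, 6 GiB peak
# memory during profiling, 2 MiB per cache block, 4 GiB of swap space.
GiB = 1 << 30
total_gpu_memory = 16 * GiB
gpu_memory_utilization = 0.9
peak_memory = 6 * GiB
cache_block_size = 2 << 20  # 2 MiB
swap_space_bytes = 4 * GiB

num_gpu_blocks = int(
    (total_gpu_memory * gpu_memory_utilization - peak_memory)
    // cache_block_size)  # (14.4 GiB - 6 GiB) / 2 MiB = 4300 blocks
num_cpu_blocks = int(swap_space_bytes // cache_block_size)  # 2048 blocks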

init_device

init_device() -> None
Source code in vllm/worker/xpu_worker.py
def init_device(self) -> None:
    if (self.device_config.device.type == "xpu"
            and current_platform.is_xpu()):
        self.device = torch.device(f"xpu:{self.local_rank}")
        torch.xpu.set_device(self.device)
        torch.xpu.empty_cache()
        self.init_gpu_memory = torch.xpu.get_device_properties(
            self.local_rank).total_memory
    else:
        raise RuntimeError(
            f"Unsupported device type: {self.device_config.device}")
    # Initialize the distributed environment.
    self.init_worker_distributed_environment()
    # Initialize the model.
    set_random_seed(self.model_config.seed)

init_worker_distributed_environment

init_worker_distributed_environment() -> None

Initialize the distributed environment.

Source code in vllm/worker/xpu_worker.py
def init_worker_distributed_environment(self) -> None:
    """Initialize the distributed environment."""

    parallel_config = self.parallel_config
    rank = self.rank
    distributed_init_method = self.distributed_init_method

    if torch.distributed.is_initialized():
        torch_world_size = torch.distributed.get_world_size()
        if torch_world_size != parallel_config.world_size:
            raise RuntimeError(
                "torch.distributed is already initialized but the torch "
                "world size does not match parallel_config.world_size "
                f"({torch_world_size} vs. {parallel_config.world_size}).")
    elif not distributed_init_method:
        raise ValueError(
            "distributed_init_method must be set if torch.distributed "
            "is not already initialized")
    else:
        # Use sockets as the default Level Zero IPC exchange backend.
        # By default, oneCCL uses `drmfd`, which requires extra
        # dependencies (libdrm and DRM headers) on the system.
        ENV_CCL_ATL_TRANSPORT = os.getenv("CCL_ATL_TRANSPORT", "ofi")
        ENV_LOCAL_WORLD_SIZE = os.getenv("LOCAL_WORLD_SIZE",
                                         str(parallel_config.world_size))
        os.environ["CCL_ATL_TRANSPORT"] = ENV_CCL_ATL_TRANSPORT
        os.environ["LOCAL_WORLD_SIZE"] = ENV_LOCAL_WORLD_SIZE
        os.environ["LOCAL_RANK"] = str(self.local_rank)
        init_distributed_environment(
            world_size=parallel_config.world_size,
            rank=rank,
            distributed_init_method=distributed_init_method,
            local_rank=self.local_rank,
            backend="ccl")

    ensure_model_parallel_initialized(
        parallel_config.tensor_parallel_size,
        parallel_config.pipeline_parallel_size)
    # A global all_reduce is needed to warm up oneCCL.
    torch.distributed.all_reduce(torch.zeros(1).xpu())

    if parallel_config.pipeline_parallel_size > 1:
        # Initialize the PP group here so that P2P communication
        # is not the first collective call.
        get_pp_group().all_reduce(torch.zeros(1).xpu())
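
The oneCCL-related environment can also be prepared before launch; the method only fills in defaults for variables that are unset. A sketch with illustrative values:

import os

# Illustrative values; `ofi` keeps the socket-based transport and avoids
# the libdrm dependency that `drmfd` would require.
os.environ.setdefault("CCL_ATL_TRANSPORT", "ofi")
os.environ.setdefault("LOCAL_WORLD_SIZE", "2")  # hypothetical: 2 workers per node
os.environ.setdefault("LOCAL_RANK", "0")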