vllm.v1.worker.xpu_worker

logger module-attribute

logger = init_logger(__name__)

XPUWorker

Bases: Worker

An XPU worker class.

Source code in vllm/v1/worker/xpu_worker.py
class XPUWorker(Worker):
    """A XPU worker class."""

    def __init__(
        self,
        vllm_config: VllmConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        is_driver_worker: bool = False,
    ):
        super().__init__(vllm_config, local_rank, rank,
                         distributed_init_method, is_driver_worker)
        device_config = self.device_config
        assert device_config.device_type == "xpu"
        assert current_platform.is_xpu()

        # Torch profiler. Enabled and configured through env vars:
        # VLLM_TORCH_PROFILER_DIR=/path/to/save/trace
        if envs.VLLM_TORCH_PROFILER_DIR:
            torch_profiler_trace_dir = envs.VLLM_TORCH_PROFILER_DIR
            logger.info("Profiling enabled. Traces will be saved to: %s",
                        torch_profiler_trace_dir)
            self.profiler = torch.profiler.profile(
                activities=[
                    torch.profiler.ProfilerActivity.CPU,
                    torch.profiler.ProfilerActivity.XPU,
                ],
                with_stack=True,
                on_trace_ready=torch.profiler.tensorboard_trace_handler(
                    torch_profiler_trace_dir, use_gzip=True))
        else:
            self.profiler = None

    # We provide this function because `torch.xpu.mem_get_info()` doesn't
    # return the correct free_gpu_memory on Intel client GPUs, so we need to
    # calculate/estimate it.
    def xpu_get_mem_info(self):
        if current_platform.is_data_center_gpu():
            return torch.xpu.mem_get_info()
        else:
            _, total_gpu_memory = torch.xpu.mem_get_info()
            # FIXME: memory_allocated() doesn't count non-torch allocations,
            # and we don't have any API to get them, so we estimate 128 MB.
            used_memory = torch.xpu.memory_allocated()
            non_torch_allocations = 128 * 1024 * 1024
            free_gpu_memory = total_gpu_memory - (used_memory +
                                                  non_torch_allocations)
            return free_gpu_memory, total_gpu_memory

    @torch.inference_mode()
    def determine_available_memory(self) -> int:
        """Profiles the peak memory usage of the model to determine how many
        KV blocks may be allocated without OOMs.
        The engine will first profile the existing memory usage.
        Then, it calculates the maximum possible number of GPU and CPU blocks
        that can be allocated with the remaining free memory.
        .. tip::
            You may limit the usage of GPU memory
            by adjusting the `gpu_memory_utilization` parameter.
        """
        # Profile the memory usage of the model and get the maximum number of
        # cache blocks that can be allocated with the remaining free memory.
        torch.xpu.empty_cache()
        torch.xpu.reset_peak_memory_stats()

        free_gpu_memory, total_gpu_memory = torch.xpu.mem_get_info()
        current_allocated_bytes = torch.xpu.memory_allocated()
        msg = ("Before memory profiling run, "
               f"total GPU memory: {total_gpu_memory / 1024**2:.2f} MB, "
               f"model load takes {current_allocated_bytes / 1024**2:.2f} MB, "
               f"free gpu memory is {free_gpu_memory / 1024**2:.2f} MB.")
        logger.info(msg)
        # Execute a forward pass with dummy inputs to profile the memory usage
        # of the model.
        self.model_runner.profile_run()

        free_gpu_memory, _ = self.xpu_get_mem_info()
        # NOTE(woosuk): Here we assume that the other processes using the same
        # GPU did not change their memory usage during the profiling.
        assert self.init_gpu_memory > free_gpu_memory, (
            "Error in memory profiling. "
            f"Initial free memory {self.init_gpu_memory}, current free memory"
            f" {free_gpu_memory}. This happens when the GPU memory was "
            "not properly cleaned up before initializing the vLLM instance.")

        # Get the peak memory allocation recorded by torch
        peak_memory = torch.xpu.memory_stats()["allocated_bytes.all.peak"]

        torch.xpu.empty_cache()
        torch_allocated_bytes = torch.xpu.memory_stats(
        )["allocated_bytes.all.current"]
        total_allocated_bytes = self.xpu_get_mem_info(
        )[1] - self.xpu_get_mem_info()[0]

        non_torch_allocations = total_allocated_bytes - torch_allocated_bytes
        if non_torch_allocations > 0:
            peak_memory += non_torch_allocations
        available_kv_cache_memory = (
            total_gpu_memory * self.cache_config.gpu_memory_utilization -
            peak_memory)

        msg = ("After memory profiling run, "
               f"peak memory usage is {peak_memory / 1024**2:.2f} MB,"
               f"torch mem is {torch_allocated_bytes / 1024**2:.2f} MB, "
               f"non-torch mem is {non_torch_allocations / 1024**2:.2f} MB, "
               f"free gpu memory is {free_gpu_memory / 1024**2:.2f} MB.")
        logger.info(msg)

        return int(available_kv_cache_memory)

    def init_device(self):
        if self.device_config.device.type == "xpu" and current_platform.is_xpu(
        ):
            self.device = torch.device(f"xpu:{self.local_rank}")
            torch.xpu.set_device(self.device)
            torch.xpu.empty_cache()
            self.init_gpu_memory = torch.xpu.get_device_properties(
                self.local_rank).total_memory
        else:
            raise RuntimeError(
                f"Not support device type: {self.device_config.device}")

        ENV_CCL_ZE_IPC_EXCHANGE = os.getenv("CCL_ZE_IPC_EXCHANGE", "drmfd")
        ENV_CCL_ATL_TRANSPORT = os.getenv("CCL_ATL_TRANSPORT", "ofi")
        ENV_LOCAL_WORLD_SIZE = os.getenv("LOCAL_WORLD_SIZE",
                                         str(self.parallel_config.world_size))
        os.environ["CCL_ZE_IPC_EXCHANGE"] = ENV_CCL_ZE_IPC_EXCHANGE
        os.environ["CCL_ATL_TRANSPORT"] = ENV_CCL_ATL_TRANSPORT
        os.environ["LOCAL_WORLD_SIZE"] = ENV_LOCAL_WORLD_SIZE
        os.environ["LOCAL_RANK"] = str(self.local_rank)
        dist_backend = "ccl"

        init_worker_distributed_environment(self.vllm_config, self.rank,
                                            self.distributed_init_method,
                                            self.local_rank, dist_backend)

        # A global all_reduce is needed to warm up oneCCL.
        torch.distributed.all_reduce(torch.zeros(1).xpu())

        # Set random seed.
        set_random_seed(self.model_config.seed)

        # Construct the model runner
        self.model_runner = XPUModelRunner(  # type: ignore
            self.vllm_config, self.device)

profiler instance-attribute

profiler = profile(
    activities=[CPU, XPU],
    with_stack=True,
    on_trace_ready=tensorboard_trace_handler(
        torch_profiler_trace_dir, use_gzip=True
    ),
)
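
The profiler is only created when VLLM_TORCH_PROFILER_DIR is set in the environment before the worker process starts. A minimal sketch of enabling it; the trace directory path below is purely illustrative:

import os

# Must be set before the engine/worker is created; the path is a placeholder.
os.environ["VLLM_TORCH_PROFILER_DIR"] = "/tmp/vllm_xpu_traces"

# When XPUWorker.__init__ runs, it then builds a torch.profiler.profile()
# with CPU and XPU activities and writes gzipped TensorBoard traces through
# tensorboard_trace_handler into the directory above.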

__init__

__init__(
    vllm_config: VllmConfig,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    is_driver_worker: bool = False,
)
Source code in vllm/v1/worker/xpu_worker.py
def __init__(
    self,
    vllm_config: VllmConfig,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    is_driver_worker: bool = False,
):
    super().__init__(vllm_config, local_rank, rank,
                     distributed_init_method, is_driver_worker)
    device_config = self.device_config
    assert device_config.device_type == "xpu"
    assert current_platform.is_xpu()

    # Torch profiler. Enabled and configured through env vars:
    # VLLM_TORCH_PROFILER_DIR=/path/to/save/trace
    if envs.VLLM_TORCH_PROFILER_DIR:
        torch_profiler_trace_dir = envs.VLLM_TORCH_PROFILER_DIR
        logger.info("Profiling enabled. Traces will be saved to: %s",
                    torch_profiler_trace_dir)
        self.profiler = torch.profiler.profile(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.XPU,
            ],
            with_stack=True,
            on_trace_ready=torch.profiler.tensorboard_trace_handler(
                torch_profiler_trace_dir, use_gzip=True))
    else:
        self.profiler = None
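
A minimal construction sketch, assuming a fully populated VllmConfig named vllm_config is already available (for example from the engine's argument-parsing path); the rendezvous address below is a placeholder:

# Hedged sketch: single-process construction of an XPUWorker.
worker = XPUWorker(
    vllm_config=vllm_config,              # assumed to be built elsewhere
    local_rank=0,
    rank=0,
    distributed_init_method="tcp://127.0.0.1:29500",  # placeholder address
    is_driver_worker=True,
)
worker.init_device()  # selects xpu:0, sets CCL env vars, initializes torch.distributed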

determine_available_memory

determine_available_memory() -> int

Profiles the peak memory usage of the model to determine how many KV blocks may be allocated without OOMs. The engine will first profile the existing memory usage, then calculate the maximum possible number of GPU and CPU blocks that can be allocated with the remaining free memory.

Tip: You may limit GPU memory usage by adjusting the gpu_memory_utilization parameter.

Source code in vllm/v1/worker/xpu_worker.py
@torch.inference_mode()
def determine_available_memory(self) -> int:
    """Profiles the peak memory usage of the model to determine how many
    KV blocks may be allocated without OOMs.
    The engine will first profile the existing memory usage.
    Then, it calculates the maximum possible number of GPU and CPU blocks
    that can be allocated with the remaining free memory.
    .. tip::
        You may limit the usage of GPU memory
        by adjusting the `gpu_memory_utilization` parameter.
    """
    # Profile the memory usage of the model and get the maximum number of
    # cache blocks that can be allocated with the remaining free memory.
    torch.xpu.empty_cache()
    torch.xpu.reset_peak_memory_stats()

    free_gpu_memory, total_gpu_memory = torch.xpu.mem_get_info()
    current_allocated_bytes = torch.xpu.memory_allocated()
    msg = ("Before memory profiling run, "
           f"total GPU memory: {total_gpu_memory / 1024**2:.2f} MB, "
           f"model load takes {current_allocated_bytes / 1024**2:.2f} MB, "
           f"free gpu memory is {free_gpu_memory / 1024**2:.2f} MB.")
    logger.info(msg)
    # Execute a forward pass with dummy inputs to profile the memory usage
    # of the model.
    self.model_runner.profile_run()

    free_gpu_memory, _ = self.xpu_get_mem_info()
    # NOTE(woosuk): Here we assume that the other processes using the same
    # GPU did not change their memory usage during the profiling.
    assert self.init_gpu_memory > free_gpu_memory, (
        "Error in memory profiling. "
        f"Initial free memory {self.init_gpu_memory}, current free memory"
        f" {free_gpu_memory}. This happens when the GPU memory was "
        "not properly cleaned up before initializing the vLLM instance.")

    # Get the peak memory allocation recorded by torch
    peak_memory = torch.xpu.memory_stats()["allocated_bytes.all.peak"]

    torch.xpu.empty_cache()
    torch_allocated_bytes = torch.xpu.memory_stats(
    )["allocated_bytes.all.current"]
    total_allocated_bytes = self.xpu_get_mem_info(
    )[1] - self.xpu_get_mem_info()[0]

    non_torch_allocations = total_allocated_bytes - torch_allocated_bytes
    if non_torch_allocations > 0:
        peak_memory += non_torch_allocations
    available_kv_cache_memory = (
        total_gpu_memory * self.cache_config.gpu_memory_utilization -
        peak_memory)

    msg = ("After memory profiling run, "
           f"peak memory usage is {peak_memory / 1024**2:.2f} MB,"
           f"torch mem is {torch_allocated_bytes / 1024**2:.2f} MB, "
           f"non-torch mem is {non_torch_allocations / 1024**2:.2f} MB, "
           f"free gpu memory is {free_gpu_memory / 1024**2:.2f} MB.")
    logger.info(msg)

    return int(available_kv_cache_memory)
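
The returned budget follows total_gpu_memory * gpu_memory_utilization - peak_memory, where peak_memory is torch's recorded peak plus any non-torch allocations observed after the profiling run. A worked example with purely illustrative numbers:

total_gpu_memory = 16 * 1024**3        # 16 GiB device
gpu_memory_utilization = 0.9           # cache_config.gpu_memory_utilization
peak_torch = 6 * 1024**3               # "allocated_bytes.all.peak" from torch.xpu.memory_stats()
non_torch = 512 * 1024**2              # allocations outside torch (added only when positive)
peak_memory = peak_torch + non_torch

available_kv_cache_memory = (
    total_gpu_memory * gpu_memory_utilization - peak_memory)
print(int(available_kv_cache_memory) // 1024**2, "MiB for the KV cache")  # ~8089 MiB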

init_device

init_device()
Source code in vllm/v1/worker/xpu_worker.py
def init_device(self):
    if self.device_config.device.type == "xpu" and current_platform.is_xpu(
    ):
        self.device = torch.device(f"xpu:{self.local_rank}")
        torch.xpu.set_device(self.device)
        torch.xpu.empty_cache()
        self.init_gpu_memory = torch.xpu.get_device_properties(
            self.local_rank).total_memory
    else:
        raise RuntimeError(
            f"Not support device type: {self.device_config.device}")

    ENV_CCL_ZE_IPC_EXCHANGE = os.getenv("CCL_ZE_IPC_EXCHANGE", "drmfd")
    ENV_CCL_ATL_TRANSPORT = os.getenv("CCL_ATL_TRANSPORT", "ofi")
    ENV_LOCAL_WORLD_SIZE = os.getenv("LOCAL_WORLD_SIZE",
                                     str(self.parallel_config.world_size))
    os.environ["CCL_ZE_IPC_EXCHANGE"] = ENV_CCL_ZE_IPC_EXCHANGE
    os.environ["CCL_ATL_TRANSPORT"] = ENV_CCL_ATL_TRANSPORT
    os.environ["LOCAL_WORLD_SIZE"] = ENV_LOCAL_WORLD_SIZE
    os.environ["LOCAL_RANK"] = str(self.local_rank)
    dist_backend = "ccl"

    init_worker_distributed_environment(self.vllm_config, self.rank,
                                        self.distributed_init_method,
                                        self.local_rank, dist_backend)

    # A global all_reduce is needed to warm up oneCCL.
    torch.distributed.all_reduce(torch.zeros(1).xpu())

    # Set random seed.
    set_random_seed(self.model_config.seed)

    # Construct the model runner
    self.model_runner = XPUModelRunner(  # type: ignore
        self.vllm_config, self.device)
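
init_device only fills in defaults for the oneCCL-related variables that are not already present in the environment and always overwrites LOCAL_RANK. A sketch of the equivalent setup, using an illustrative two-rank, rank-0 configuration:

import os

# Defaults applied when the variables are unset; existing values are kept.
defaults = {
    "CCL_ZE_IPC_EXCHANGE": "drmfd",  # Level Zero IPC exchange mechanism
    "CCL_ATL_TRANSPORT": "ofi",      # oneCCL transport layer
    "LOCAL_WORLD_SIZE": "2",         # falls back to parallel_config.world_size
}
for key, value in defaults.items():
    os.environ.setdefault(key, value)

os.environ["LOCAL_RANK"] = "0"       # always set to the worker's local rank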

xpu_get_mem_info

xpu_get_mem_info()
Source code in vllm/v1/worker/xpu_worker.py
def xpu_get_mem_info(self):
    if current_platform.is_data_center_gpu():
        return torch.xpu.mem_get_info()
    else:
        _, total_gpu_memory = torch.xpu.mem_get_info()
        # FIXME: memory_allocated() doesn't count non-torch allocations,
        # and we don't have any API to get them, so we estimate 128 MB.
        used_memory = torch.xpu.memory_allocated()
        non_torch_allocations = 128 * 1024 * 1024
        free_gpu_memory = total_gpu_memory - (used_memory +
                                              non_torch_allocations)
        return free_gpu_memory, total_gpu_memory
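
On client GPUs the free memory is therefore estimated rather than queried directly: torch's allocated bytes plus a fixed 128 MB allowance for non-torch allocations are subtracted from the total. A worked example with illustrative numbers:

total_gpu_memory = 16 * 1024**3            # second value from torch.xpu.mem_get_info()
used_memory = 5 * 1024**3                  # torch.xpu.memory_allocated()
non_torch_allocations = 128 * 1024 * 1024  # fixed 128 MB allowance

free_gpu_memory = total_gpu_memory - (used_memory + non_torch_allocations)
print(free_gpu_memory // 1024**2, "MiB free")  # 11136 MiB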