Skip to content

vllm.v1.worker.gpu_worker

A GPU worker class.

logger module-attribute

logger = init_logger(__name__)

Worker

Bases: WorkerBase

Source code in vllm/v1/worker/gpu_worker.py
class Worker(WorkerBase):

    def __init__(
        self,
        vllm_config: VllmConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        is_driver_worker: bool = False,
    ):

        super().__init__(vllm_config=vllm_config,
                         local_rank=local_rank,
                         rank=rank,
                         distributed_init_method=distributed_init_method,
                         is_driver_worker=is_driver_worker)

        if self.model_config.trust_remote_code:
            # note: lazy import to avoid importing torch before initializing
            from vllm.utils import init_cached_hf_modules
            init_cached_hf_modules()

        # Buffers saved before sleep
        self._sleep_saved_buffers: dict[str, torch.Tensor] = {}

        # Torch profiler. Enabled and configured through env vars:
        # VLLM_TORCH_PROFILER_DIR=/path/to/save/trace
        if envs.VLLM_TORCH_PROFILER_DIR:
            torch_profiler_trace_dir = envs.VLLM_TORCH_PROFILER_DIR
            logger.info("Profiling enabled. Traces will be saved to: %s",
                        torch_profiler_trace_dir)
            self.profiler = torch.profiler.profile(
                activities=[
                    torch.profiler.ProfilerActivity.CPU,
                    torch.profiler.ProfilerActivity.CUDA,
                ],
                with_stack=True,
                on_trace_ready=torch.profiler.tensorboard_trace_handler(
                    torch_profiler_trace_dir, use_gzip=True))
        else:
            self.profiler = None

    def sleep(self, level: int = 1) -> None:
        free_bytes_before_sleep = torch.cuda.mem_get_info()[0]

        # Save the buffers before level 2 sleep
        if level == 2:
            model = self.model_runner.model
            self._sleep_saved_buffers = {
                name: buffer.cpu().clone()
                for name, buffer in model.named_buffers()
            }

        allocator = CuMemAllocator.get_instance()
        allocator.sleep(offload_tags=("weights", ) if level == 1 else tuple())
        free_bytes_after_sleep, total = torch.cuda.mem_get_info()
        freed_bytes = free_bytes_after_sleep - free_bytes_before_sleep
        used_bytes = total - free_bytes_after_sleep
        assert freed_bytes >= 0, "Memory usage increased after sleeping."
        logger.info(
            "Sleep mode freed %.2f GiB memory, "
            "%.2f GiB memory is still in use.", freed_bytes / GiB_bytes,
            used_bytes / GiB_bytes)

    def wake_up(self, tags: Optional[list[str]] = None) -> None:
        allocator = CuMemAllocator.get_instance()
        allocator.wake_up(tags)

        # Restore the buffers after level 2 sleep
        if len(self._sleep_saved_buffers):
            model = self.model_runner.model
            for name, buffer in model.named_buffers():
                if name in self._sleep_saved_buffers:
                    buffer.data.copy_(self._sleep_saved_buffers[name].data)
            self._sleep_saved_buffers = {}

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks

    def init_device(self):
        if self.device_config.device.type == "cuda":
            # torch.distributed.all_reduce does not free the input tensor until
            # the synchronization point. This causes the memory usage to grow
            # as the number of all_reduce calls increases. This env var disables
            # this behavior.
            # Related issue:
            # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
            os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"

            # This env var set by Ray causes exceptions with graph building.
            os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)
            self.device = torch.device(f"cuda:{self.local_rank}")
            torch.cuda.set_device(self.device)

            _check_if_gpu_supports_dtype(self.model_config.dtype)
            gc.collect()
            torch.cuda.empty_cache()

            # take current memory snapshot
            self.init_snapshot = MemorySnapshot()
            self.requested_memory = (self.init_snapshot.total_memory *
                                     self.cache_config.gpu_memory_utilization)
            if self.init_snapshot.free_memory < self.requested_memory:
                GiB = lambda b: round(b / GiB_bytes, 2)
                raise ValueError(
                    f"Free memory on device "
                    f"({GiB(self.init_snapshot.free_memory)}/"
                    f"{GiB(self.init_snapshot.total_memory)} GiB) on startup "
                    f"is less than desired GPU memory utilization "
                    f"({self.cache_config.gpu_memory_utilization}, "
                    f"{GiB(self.requested_memory)} GiB). Decrease GPU memory "
                    f"utilization or reduce GPU memory used by other processes."
                )
        else:
            raise RuntimeError(
                f"Not support device type: {self.device_config.device}")
        # Initialize the distributed environment.
        init_worker_distributed_environment(self.vllm_config, self.rank,
                                            self.distributed_init_method,
                                            self.local_rank)
        # Set random seed.
        set_random_seed(self.model_config.seed)

        # Construct the model runner
        self.model_runner: GPUModelRunner = GPUModelRunner(
            self.vllm_config, self.device)

        if self.rank == 0:
            # If usage stat is enabled, collect relevant info.
            report_usage_stats(self.vllm_config)

    # FIXME(youkaichao & ywang96): Use TorchDispatchMode instead of memory pool
    # to hijack tensor allocation.
    def load_model(self) -> None:
        if self.vllm_config.model_config.enable_sleep_mode:
            allocator = CuMemAllocator.get_instance()
            assert allocator.get_current_usage() == 0, (
                "Sleep mode can only be "
                "used for one instance per process.")
            context = allocator.use_memory_pool(tag="weights")
        else:
            from contextlib import nullcontext
            context = nullcontext()
        with context:
            self.model_runner.load_model()

    @torch.inference_mode()
    def determine_available_memory(self) -> int:
        """Profiles the peak memory usage of the model to determine how much 
        memory can be used for KV cache without OOMs.

        The engine will first conduct a profiling of the existing memory usage.
        Then, it calculate the free memory that can be used for KV cache in
        bytes.

        Tip:
            You may limit the usage of GPU memory
            by adjusting the `gpu_memory_utilization` parameter.
        """
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()
        GiB = lambda b: b / GiB_bytes

        # Execute a forward pass with dummy inputs to profile the memory usage
        # of the model.
        with memory_profiling(
                self.init_snapshot,
                weights_memory=int(
                    self.model_runner.model_memory_usage)) as profile_result:
            self.model_runner.profile_run()

        free_gpu_memory = profile_result.after_profile.free_memory
        # NOTE(woosuk): Here we assume that the other processes using the same
        # GPU did not change their memory usage during the profiling.
        assert self.init_snapshot.free_memory > free_gpu_memory, (
            "Error in memory profiling. "
            f"Initial free memory {GiB(self.init_snapshot.free_memory)} GiB, "
            f"current free memory {GiB(free_gpu_memory)} GiB. "
            "This happens when other processes sharing the same container "
            "release GPU memory while vLLM is profiling during initialization. "
            "To fix this, ensure consistent GPU memory allocation or "
            "isolate vLLM in its own container.")
        available_kv_cache_memory = self.requested_memory \
            - profile_result.non_kv_cache_memory

        logger.debug(
            "Initial free memory: %.2f GiB, free memory: %.2f GiB, "
            "requested GPU memory: %.2f GiB",
            GiB(self.init_snapshot.free_memory), GiB(free_gpu_memory),
            GiB(self.requested_memory))
        logger.debug(profile_result)
        logger.info("Available KV cache memory: %.2f GiB",
                    GiB(available_kv_cache_memory))
        gc.collect()

        return int(available_kv_cache_memory)

    def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]:
        return self.model_runner.get_kv_cache_spec()

    def initialize_from_config(self, kv_cache_config: KVCacheConfig) -> None:
        """Allocate GPU KV cache with the specified kv_cache_config."""
        if self.vllm_config.model_config.enable_sleep_mode:
            allocator = CuMemAllocator.get_instance()
            context = allocator.use_memory_pool(tag="kv_cache")
        else:
            from contextlib import nullcontext
            context = nullcontext()
        with context:
            self.model_runner.initialize_kv_cache(kv_cache_config)

    def compile_or_warm_up_model(self) -> None:
        # warm up sizes that are not in cudagraph capture sizes,
        # but users still want to compile for better performance,
        # e.g. for the max-num-batched token size in chunked prefill.
        warmup_sizes = self.vllm_config.compilation_config.compile_sizes.copy()
        if not self.model_config.enforce_eager:
            warmup_sizes = [
                x for x in warmup_sizes if x not in
                self.vllm_config.compilation_config.cudagraph_capture_sizes
            ]
        # We skip EPLB here since we don't want to record dummy metrics
        for size in sorted(warmup_sizes, reverse=True):
            logger.info("Compile and warming up model for size %d", size)
            self.model_runner._dummy_run(size, skip_eplb=True)
        if not self.model_config.enforce_eager:
            self.model_runner.capture_model()

        # Warm up sampler and preallocate memory buffer for logits and other
        # sampling related tensors of max possible shape to avoid memory
        # fragmentation issue.
        # NOTE: This is called after `capture_model` on purpose to prevent
        # memory buffers from being cleared by `torch.cuda.empty_cache`.
        if get_pp_group().is_last_rank:
            max_num_reqs = min(self.scheduler_config.max_num_seqs,
                               self.scheduler_config.max_num_batched_tokens)

            # We skip EPLB here since we don't want to record dummy metrics
            hidden_states, last_hidden_states = \
                self.model_runner._dummy_run(
                    num_tokens=max_num_reqs,
                    skip_eplb=True,
                )
            if self.model_runner.is_pooling_model:
                self.model_runner._dummy_pooler_run(hidden_states)
            else:
                self.model_runner._dummy_sampler_run(
                    hidden_states=last_hidden_states)

        # Reset the seed to ensure that the random state is not affected by
        # the model initialization and profiling.
        set_random_seed(self.model_config.seed)

    def get_model(self) -> nn.Module:
        return self.model_runner.get_model()

    @torch.inference_mode()
    def execute_model(
        self,
        scheduler_output: "SchedulerOutput",
    ) -> Optional[ModelRunnerOutput]:
        intermediate_tensors = None
        if not get_pp_group().is_first_rank:
            intermediate_tensors = IntermediateTensors(
                get_pp_group().recv_tensor_dict(
                    all_gather_group=get_tp_group()))

        output = self.model_runner.execute_model(scheduler_output,
                                                 intermediate_tensors)
        parallel_config = self.vllm_config.parallel_config
        if parallel_config.distributed_executor_backend != "external_launcher" \
            and not get_pp_group().is_last_rank:
            assert isinstance(output, IntermediateTensors)
            get_pp_group().send_tensor_dict(output.tensors,
                                            all_gather_group=get_tp_group())
            return None
        assert isinstance(output, ModelRunnerOutput)
        return output if self.is_driver_worker else None

    def profile(self, is_start: bool = True):
        if self.profiler is None:
            raise RuntimeError("Profiler is not enabled.")
        if is_start:
            self.profiler.start()
        else:
            self.profiler.stop()
            print(self.profiler.key_averages().table(
                sort_by="self_cuda_time_total"))

    def execute_dummy_batch(self) -> None:
        self.model_runner._dummy_run(1)

    def add_lora(self, lora_request: LoRARequest) -> bool:
        return self.model_runner.add_lora(lora_request)

    def remove_lora(self, lora_id: int) -> bool:
        return self.model_runner.remove_lora(lora_id)

    def list_loras(self) -> set[int]:
        return self.model_runner.list_loras()

    def pin_lora(self, lora_id: int) -> bool:
        return self.model_runner.pin_lora(lora_id)

    def check_health(self) -> None:
        # worker will always be healthy as long as it's running.
        return

    def save_sharded_state(
        self,
        path: str,
        pattern: Optional[str] = None,
        max_size: Optional[int] = None,
    ) -> None:
        from vllm.model_executor.model_loader import ShardedStateLoader
        ShardedStateLoader.save_model(
            self.model_runner.model,
            path,
            pattern=pattern,
            max_size=max_size,
        )

    def save_tensorized_model(
        self,
        tensorizer_config: "TensorizerConfig",
    ) -> None:
        self.model_runner.save_tensorized_model(
            tensorizer_config=tensorizer_config, )

_sleep_saved_buffers instance-attribute

_sleep_saved_buffers: dict[str, Tensor] = {}

profiler instance-attribute

profiler = profile(
    activities=[CPU, CUDA],
    with_stack=True,
    on_trace_ready=tensorboard_trace_handler(
        torch_profiler_trace_dir, use_gzip=True
    ),
)

__init__

__init__(
    vllm_config: VllmConfig,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    is_driver_worker: bool = False,
)
Source code in vllm/v1/worker/gpu_worker.py
def __init__(
    self,
    vllm_config: VllmConfig,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    is_driver_worker: bool = False,
):

    super().__init__(vllm_config=vllm_config,
                     local_rank=local_rank,
                     rank=rank,
                     distributed_init_method=distributed_init_method,
                     is_driver_worker=is_driver_worker)

    if self.model_config.trust_remote_code:
        # note: lazy import to avoid importing torch before initializing
        from vllm.utils import init_cached_hf_modules
        init_cached_hf_modules()

    # Buffers saved before sleep
    self._sleep_saved_buffers: dict[str, torch.Tensor] = {}

    # Torch profiler. Enabled and configured through env vars:
    # VLLM_TORCH_PROFILER_DIR=/path/to/save/trace
    if envs.VLLM_TORCH_PROFILER_DIR:
        torch_profiler_trace_dir = envs.VLLM_TORCH_PROFILER_DIR
        logger.info("Profiling enabled. Traces will be saved to: %s",
                    torch_profiler_trace_dir)
        self.profiler = torch.profiler.profile(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.CUDA,
            ],
            with_stack=True,
            on_trace_ready=torch.profiler.tensorboard_trace_handler(
                torch_profiler_trace_dir, use_gzip=True))
    else:
        self.profiler = None

add_lora

add_lora(lora_request: LoRARequest) -> bool
Source code in vllm/v1/worker/gpu_worker.py
def add_lora(self, lora_request: LoRARequest) -> bool:
    return self.model_runner.add_lora(lora_request)

check_health

check_health() -> None
Source code in vllm/v1/worker/gpu_worker.py
def check_health(self) -> None:
    # worker will always be healthy as long as it's running.
    return

compile_or_warm_up_model

compile_or_warm_up_model() -> None
Source code in vllm/v1/worker/gpu_worker.py
def compile_or_warm_up_model(self) -> None:
    # warm up sizes that are not in cudagraph capture sizes,
    # but users still want to compile for better performance,
    # e.g. for the max-num-batched token size in chunked prefill.
    warmup_sizes = self.vllm_config.compilation_config.compile_sizes.copy()
    if not self.model_config.enforce_eager:
        warmup_sizes = [
            x for x in warmup_sizes if x not in
            self.vllm_config.compilation_config.cudagraph_capture_sizes
        ]
    # We skip EPLB here since we don't want to record dummy metrics
    for size in sorted(warmup_sizes, reverse=True):
        logger.info("Compile and warming up model for size %d", size)
        self.model_runner._dummy_run(size, skip_eplb=True)
    if not self.model_config.enforce_eager:
        self.model_runner.capture_model()

    # Warm up sampler and preallocate memory buffer for logits and other
    # sampling related tensors of max possible shape to avoid memory
    # fragmentation issue.
    # NOTE: This is called after `capture_model` on purpose to prevent
    # memory buffers from being cleared by `torch.cuda.empty_cache`.
    if get_pp_group().is_last_rank:
        max_num_reqs = min(self.scheduler_config.max_num_seqs,
                           self.scheduler_config.max_num_batched_tokens)

        # We skip EPLB here since we don't want to record dummy metrics
        hidden_states, last_hidden_states = \
            self.model_runner._dummy_run(
                num_tokens=max_num_reqs,
                skip_eplb=True,
            )
        if self.model_runner.is_pooling_model:
            self.model_runner._dummy_pooler_run(hidden_states)
        else:
            self.model_runner._dummy_sampler_run(
                hidden_states=last_hidden_states)

    # Reset the seed to ensure that the random state is not affected by
    # the model initialization and profiling.
    set_random_seed(self.model_config.seed)

determine_available_memory

determine_available_memory() -> int

Profiles the peak memory usage of the model to determine how much memory can be used for KV cache without OOMs.

The engine will first conduct a profiling of the existing memory usage. Then, it calculate the free memory that can be used for KV cache in bytes.

Tip

You may limit the usage of GPU memory by adjusting the gpu_memory_utilization parameter.

Source code in vllm/v1/worker/gpu_worker.py
@torch.inference_mode()
def determine_available_memory(self) -> int:
    """Profiles the peak memory usage of the model to determine how much 
    memory can be used for KV cache without OOMs.

    The engine will first conduct a profiling of the existing memory usage.
    Then, it calculate the free memory that can be used for KV cache in
    bytes.

    Tip:
        You may limit the usage of GPU memory
        by adjusting the `gpu_memory_utilization` parameter.
    """
    torch.cuda.empty_cache()
    torch.cuda.reset_peak_memory_stats()
    GiB = lambda b: b / GiB_bytes

    # Execute a forward pass with dummy inputs to profile the memory usage
    # of the model.
    with memory_profiling(
            self.init_snapshot,
            weights_memory=int(
                self.model_runner.model_memory_usage)) as profile_result:
        self.model_runner.profile_run()

    free_gpu_memory = profile_result.after_profile.free_memory
    # NOTE(woosuk): Here we assume that the other processes using the same
    # GPU did not change their memory usage during the profiling.
    assert self.init_snapshot.free_memory > free_gpu_memory, (
        "Error in memory profiling. "
        f"Initial free memory {GiB(self.init_snapshot.free_memory)} GiB, "
        f"current free memory {GiB(free_gpu_memory)} GiB. "
        "This happens when other processes sharing the same container "
        "release GPU memory while vLLM is profiling during initialization. "
        "To fix this, ensure consistent GPU memory allocation or "
        "isolate vLLM in its own container.")
    available_kv_cache_memory = self.requested_memory \
        - profile_result.non_kv_cache_memory

    logger.debug(
        "Initial free memory: %.2f GiB, free memory: %.2f GiB, "
        "requested GPU memory: %.2f GiB",
        GiB(self.init_snapshot.free_memory), GiB(free_gpu_memory),
        GiB(self.requested_memory))
    logger.debug(profile_result)
    logger.info("Available KV cache memory: %.2f GiB",
                GiB(available_kv_cache_memory))
    gc.collect()

    return int(available_kv_cache_memory)

execute_dummy_batch

execute_dummy_batch() -> None
Source code in vllm/v1/worker/gpu_worker.py
def execute_dummy_batch(self) -> None:
    self.model_runner._dummy_run(1)

execute_model

execute_model(
    scheduler_output: SchedulerOutput,
) -> Optional[ModelRunnerOutput]
Source code in vllm/v1/worker/gpu_worker.py
@torch.inference_mode()
def execute_model(
    self,
    scheduler_output: "SchedulerOutput",
) -> Optional[ModelRunnerOutput]:
    intermediate_tensors = None
    if not get_pp_group().is_first_rank:
        intermediate_tensors = IntermediateTensors(
            get_pp_group().recv_tensor_dict(
                all_gather_group=get_tp_group()))

    output = self.model_runner.execute_model(scheduler_output,
                                             intermediate_tensors)
    parallel_config = self.vllm_config.parallel_config
    if parallel_config.distributed_executor_backend != "external_launcher" \
        and not get_pp_group().is_last_rank:
        assert isinstance(output, IntermediateTensors)
        get_pp_group().send_tensor_dict(output.tensors,
                                        all_gather_group=get_tp_group())
        return None
    assert isinstance(output, ModelRunnerOutput)
    return output if self.is_driver_worker else None

get_kv_cache_spec

get_kv_cache_spec() -> dict[str, KVCacheSpec]
Source code in vllm/v1/worker/gpu_worker.py
def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]:
    return self.model_runner.get_kv_cache_spec()

get_model

get_model() -> Module
Source code in vllm/v1/worker/gpu_worker.py
def get_model(self) -> nn.Module:
    return self.model_runner.get_model()

init_device

init_device()
Source code in vllm/v1/worker/gpu_worker.py
def init_device(self):
    if self.device_config.device.type == "cuda":
        # torch.distributed.all_reduce does not free the input tensor until
        # the synchronization point. This causes the memory usage to grow
        # as the number of all_reduce calls increases. This env var disables
        # this behavior.
        # Related issue:
        # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
        os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"

        # This env var set by Ray causes exceptions with graph building.
        os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)
        self.device = torch.device(f"cuda:{self.local_rank}")
        torch.cuda.set_device(self.device)

        _check_if_gpu_supports_dtype(self.model_config.dtype)
        gc.collect()
        torch.cuda.empty_cache()

        # take current memory snapshot
        self.init_snapshot = MemorySnapshot()
        self.requested_memory = (self.init_snapshot.total_memory *
                                 self.cache_config.gpu_memory_utilization)
        if self.init_snapshot.free_memory < self.requested_memory:
            GiB = lambda b: round(b / GiB_bytes, 2)
            raise ValueError(
                f"Free memory on device "
                f"({GiB(self.init_snapshot.free_memory)}/"
                f"{GiB(self.init_snapshot.total_memory)} GiB) on startup "
                f"is less than desired GPU memory utilization "
                f"({self.cache_config.gpu_memory_utilization}, "
                f"{GiB(self.requested_memory)} GiB). Decrease GPU memory "
                f"utilization or reduce GPU memory used by other processes."
            )
    else:
        raise RuntimeError(
            f"Not support device type: {self.device_config.device}")
    # Initialize the distributed environment.
    init_worker_distributed_environment(self.vllm_config, self.rank,
                                        self.distributed_init_method,
                                        self.local_rank)
    # Set random seed.
    set_random_seed(self.model_config.seed)

    # Construct the model runner
    self.model_runner: GPUModelRunner = GPUModelRunner(
        self.vllm_config, self.device)

    if self.rank == 0:
        # If usage stat is enabled, collect relevant info.
        report_usage_stats(self.vllm_config)

initialize_cache

initialize_cache(
    num_gpu_blocks: int, num_cpu_blocks: int
) -> None
Source code in vllm/v1/worker/gpu_worker.py
def initialize_cache(self, num_gpu_blocks: int,
                     num_cpu_blocks: int) -> None:
    self.cache_config.num_gpu_blocks = num_gpu_blocks
    self.cache_config.num_cpu_blocks = num_cpu_blocks

initialize_from_config

initialize_from_config(
    kv_cache_config: KVCacheConfig,
) -> None

Allocate GPU KV cache with the specified kv_cache_config.

Source code in vllm/v1/worker/gpu_worker.py
def initialize_from_config(self, kv_cache_config: KVCacheConfig) -> None:
    """Allocate GPU KV cache with the specified kv_cache_config."""
    if self.vllm_config.model_config.enable_sleep_mode:
        allocator = CuMemAllocator.get_instance()
        context = allocator.use_memory_pool(tag="kv_cache")
    else:
        from contextlib import nullcontext
        context = nullcontext()
    with context:
        self.model_runner.initialize_kv_cache(kv_cache_config)

list_loras

list_loras() -> set[int]
Source code in vllm/v1/worker/gpu_worker.py
def list_loras(self) -> set[int]:
    return self.model_runner.list_loras()

load_model

load_model() -> None
Source code in vllm/v1/worker/gpu_worker.py
def load_model(self) -> None:
    if self.vllm_config.model_config.enable_sleep_mode:
        allocator = CuMemAllocator.get_instance()
        assert allocator.get_current_usage() == 0, (
            "Sleep mode can only be "
            "used for one instance per process.")
        context = allocator.use_memory_pool(tag="weights")
    else:
        from contextlib import nullcontext
        context = nullcontext()
    with context:
        self.model_runner.load_model()

pin_lora

pin_lora(lora_id: int) -> bool
Source code in vllm/v1/worker/gpu_worker.py
def pin_lora(self, lora_id: int) -> bool:
    return self.model_runner.pin_lora(lora_id)

profile

profile(is_start: bool = True)
Source code in vllm/v1/worker/gpu_worker.py
def profile(self, is_start: bool = True):
    if self.profiler is None:
        raise RuntimeError("Profiler is not enabled.")
    if is_start:
        self.profiler.start()
    else:
        self.profiler.stop()
        print(self.profiler.key_averages().table(
            sort_by="self_cuda_time_total"))

remove_lora

remove_lora(lora_id: int) -> bool
Source code in vllm/v1/worker/gpu_worker.py
def remove_lora(self, lora_id: int) -> bool:
    return self.model_runner.remove_lora(lora_id)

save_sharded_state

save_sharded_state(
    path: str,
    pattern: Optional[str] = None,
    max_size: Optional[int] = None,
) -> None
Source code in vllm/v1/worker/gpu_worker.py
def save_sharded_state(
    self,
    path: str,
    pattern: Optional[str] = None,
    max_size: Optional[int] = None,
) -> None:
    from vllm.model_executor.model_loader import ShardedStateLoader
    ShardedStateLoader.save_model(
        self.model_runner.model,
        path,
        pattern=pattern,
        max_size=max_size,
    )

save_tensorized_model

save_tensorized_model(
    tensorizer_config: TensorizerConfig,
) -> None
Source code in vllm/v1/worker/gpu_worker.py
def save_tensorized_model(
    self,
    tensorizer_config: "TensorizerConfig",
) -> None:
    self.model_runner.save_tensorized_model(
        tensorizer_config=tensorizer_config, )

sleep

sleep(level: int = 1) -> None
Source code in vllm/v1/worker/gpu_worker.py
def sleep(self, level: int = 1) -> None:
    free_bytes_before_sleep = torch.cuda.mem_get_info()[0]

    # Save the buffers before level 2 sleep
    if level == 2:
        model = self.model_runner.model
        self._sleep_saved_buffers = {
            name: buffer.cpu().clone()
            for name, buffer in model.named_buffers()
        }

    allocator = CuMemAllocator.get_instance()
    allocator.sleep(offload_tags=("weights", ) if level == 1 else tuple())
    free_bytes_after_sleep, total = torch.cuda.mem_get_info()
    freed_bytes = free_bytes_after_sleep - free_bytes_before_sleep
    used_bytes = total - free_bytes_after_sleep
    assert freed_bytes >= 0, "Memory usage increased after sleeping."
    logger.info(
        "Sleep mode freed %.2f GiB memory, "
        "%.2f GiB memory is still in use.", freed_bytes / GiB_bytes,
        used_bytes / GiB_bytes)

wake_up

wake_up(tags: Optional[list[str]] = None) -> None
Source code in vllm/v1/worker/gpu_worker.py
def wake_up(self, tags: Optional[list[str]] = None) -> None:
    allocator = CuMemAllocator.get_instance()
    allocator.wake_up(tags)

    # Restore the buffers after level 2 sleep
    if len(self._sleep_saved_buffers):
        model = self.model_runner.model
        for name, buffer in model.named_buffers():
            if name in self._sleep_saved_buffers:
                buffer.data.copy_(self._sleep_saved_buffers[name].data)
        self._sleep_saved_buffers = {}

_check_if_gpu_supports_dtype

_check_if_gpu_supports_dtype(torch_dtype: dtype)
Source code in vllm/v1/worker/gpu_worker.py
def _check_if_gpu_supports_dtype(torch_dtype: torch.dtype):
    # Check if the GPU supports the dtype.
    if torch_dtype == torch.bfloat16:  # noqa: SIM102
        if not current_platform.has_device_capability(80):
            capability = current_platform.get_device_capability()
            gpu_name = current_platform.get_device_name()

            if capability is None:
                compute_str = "does not have a compute capability"
            else:
                version_str = capability.as_version_str()
                compute_str = f"has compute capability {version_str}"

            raise ValueError(
                "Bfloat16 is only supported on GPUs with compute capability "
                f"of at least 8.0. Your {gpu_name} GPU {compute_str}. "
                "You can use float16 instead by explicitly setting the "
                "`dtype` flag in CLI, for example: --dtype=half.")

init_worker_distributed_environment

init_worker_distributed_environment(
    vllm_config: VllmConfig,
    rank: int,
    distributed_init_method: Optional[str] = None,
    local_rank: int = -1,
    backend: str = "nccl",
) -> None

Initialize the distributed environment.

Source code in vllm/v1/worker/gpu_worker.py
def init_worker_distributed_environment(
    vllm_config: VllmConfig,
    rank: int,
    distributed_init_method: Optional[str] = None,
    local_rank: int = -1,
    backend: str = "nccl",
) -> None:
    """Initialize the distributed environment."""
    parallel_config = vllm_config.parallel_config
    set_custom_all_reduce(not parallel_config.disable_custom_all_reduce)

    init_distributed_environment(parallel_config.world_size, rank,
                                 distributed_init_method, local_rank, backend)

    ensure_model_parallel_initialized(parallel_config.tensor_parallel_size,
                                      parallel_config.pipeline_parallel_size)

    ensure_kv_transfer_initialized(vllm_config)