Skip to content

vllm.worker.neuron_worker

A Neuron worker class.

logger module-attribute

logger = init_logger(__name__)

NeuronWorker

Bases: LocalOrDistributedWorkerBase

A worker class that executes the model on a group of neuron cores.

Source code in vllm/worker/neuron_worker.py
class NeuronWorker(LocalOrDistributedWorkerBase):
    """A worker class that executes the model on a group of neuron cores.
    """

    model_runner: NeuronModelRunner

    def __init__(self,
                 vllm_config: VllmConfig,
                 local_rank: int,
                 rank: int,
                 distributed_init_method: str,
                 is_driver_worker: bool = False) -> None:
        WorkerBase.__init__(self, vllm_config=vllm_config)
        self.local_rank = local_rank
        self.rank = rank
        self.distributed_init_method = distributed_init_method
        self.is_driver_worker = is_driver_worker
        self.lora_config = vllm_config.lora_config

        if self.model_config.trust_remote_code:
            # note: lazy import to avoid importing torch before initializing
            from vllm.utils import init_cached_hf_modules
            init_cached_hf_modules()

        neuron_framework = current_platform.get_neuron_framework_to_use()
        if neuron_framework == NeuronFramework.TRANSFORMERS_NEURONX:
            self.model_runner = self.get_tnx_model_runner(vllm_config)
        elif neuron_framework == NeuronFramework.NEURONX_DISTRIBUTED_INFERENCE:
            self.model_runner = self.get_neuronx_distributed_model_runner(
                vllm_config)
        else:
            raise NotImplementedError(
                "Specified framework" +
                f" {os.environ.get('VLLM_NEURON_FRAMEWORK')}" +
                " is either not installed or not supported." +
                " Supported frameworks: " +
                "[transformers-neuronx, neuronx-distributed-inference]")

    def get_tnx_model_runner(self, vllm_config):
        assert (self.lora_config
                is None), ("LoRA is not supported for TransformersNeuronX "
                           "framework.")
        from vllm.worker.multi_step_neuron_model_runner import (
            MultiStepNeuronModelRunner)
        if self.speculative_config is not None:
            return MultiStepNeuronModelRunner(vllm_config=vllm_config)
        else:
            return NeuronModelRunner(vllm_config=vllm_config)

    def get_neuronx_distributed_model_runner(self, vllm_config):
        from vllm.worker.multi_step_neuronx_distributed_model_runner import (
            MultiStepNeuronxDistributedModelRunner)
        from vllm.worker.neuronx_distributed_model_runner import (
            NeuronxDistributedModelRunner)
        if self.speculative_config is not None:
            assert (self.lora_config
                    is None), "LoRA is not supported for Speculative Decoding"
            return MultiStepNeuronxDistributedModelRunner(
                vllm_config=vllm_config)
        else:
            return NeuronxDistributedModelRunner(vllm_config=vllm_config)

    def init_device(self) -> None:
        self.init_distributed_environment()

        # Set random seed.
        set_random_seed(self.model_config.seed)

    def load_model(self):
        self.model_runner.load_model()

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Determine the number of available KV blocks.

        Swapping is not yet supported, so always return num_cpu_blocks=0.

        We configure num_gpu_blocks to be equal to max_num_seqs.
        """
        # Set the number of GPU blocks to be the same as the maximum number of
        # sequences that can be processed in a single batch. This is equivalent
        # to schedule without PagedAttention.
        num_gpu_blocks = self.scheduler_config.max_num_seqs + 1

        # Swap not yet supported with Neuron backend.
        num_cpu_blocks = 0

        return num_gpu_blocks, num_cpu_blocks

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache.
        """

        # Different values are not tested.
        assert num_cpu_blocks == 0
        assert num_gpu_blocks == self.scheduler_config.max_num_seqs + 1

        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks

    @property
    def do_metadata_broadcast(self) -> bool:
        return False

    @property
    def kv_cache(self) -> Optional[List[List[torch.Tensor]]]:
        return None

    @torch.inference_mode()
    def prepare_worker_input(
            self, execute_model_req: ExecuteModelRequest) -> WorkerInput:
        return WorkerInput(num_seq_groups=len(
            execute_model_req.seq_group_metadata_list), )

    def execute_worker(self, worker_input: WorkerInput) -> None:
        pass

    def get_cache_block_size_bytes(self) -> int:
        """Determine the size in bytes of a cache block.

        This is required for speculative decoding; it is not yet implemented.
        """
        raise NotImplementedError

    def init_distributed_environment(self):
        """Neuron uses transformers-neuronx for tensor parallelism.

        vLLM still needs the environment initialized when TP/PP > 1
        """
        init_distributed_environment(
            world_size=1,
            rank=self.rank,
            local_rank=self.local_rank,
            distributed_init_method=self.distributed_init_method,
            backend="gloo",
        )

        ensure_model_parallel_initialized(
            1,
            1,
        )

    def add_lora(self, lora_request: LoRARequest) -> bool:
        if current_platform.use_transformers_neuronx():
            raise NotImplementedError(
                f"{type(self)} does not support LoRA with Neuron Framework "
                f"Transformers NeuronX")
        return self.model_runner.add_lora(lora_request)

    def remove_lora(self, lora_id: int) -> bool:
        if current_platform.use_transformers_neuronx():
            raise NotImplementedError(
                f"{type(self)} does not support LoRA with Neuron Framework "
                f"Transformers NeuronX")
        return self.model_runner.remove_lora(lora_id)

    def pin_lora(self, lora_id: int) -> bool:
        if current_platform.use_transformers_neuronx():
            raise NotImplementedError(
                f"{type(self)} does not support LoRA with Neuron Framework "
                f"Transformers NeuronX")
        return self.model_runner.pin_lora(lora_id)

    def list_loras(self) -> Set[int]:
        if current_platform.use_transformers_neuronx():
            raise NotImplementedError(
                f"{type(self)} does not support LoRA with Neuron Framework "
                f"Transformers NeuronX")
        return self.model_runner.list_loras()

distributed_init_method instance-attribute

distributed_init_method = distributed_init_method

do_metadata_broadcast property

do_metadata_broadcast: bool

is_driver_worker instance-attribute

is_driver_worker = is_driver_worker

kv_cache property

kv_cache: Optional[List[List[Tensor]]]

local_rank instance-attribute

local_rank = local_rank

lora_config instance-attribute

lora_config = lora_config

model_runner instance-attribute

model_runner: NeuronModelRunner

rank instance-attribute

rank = rank

__init__

__init__(
    vllm_config: VllmConfig,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    is_driver_worker: bool = False,
) -> None
Source code in vllm/worker/neuron_worker.py
def __init__(self,
             vllm_config: VllmConfig,
             local_rank: int,
             rank: int,
             distributed_init_method: str,
             is_driver_worker: bool = False) -> None:
    WorkerBase.__init__(self, vllm_config=vllm_config)
    self.local_rank = local_rank
    self.rank = rank
    self.distributed_init_method = distributed_init_method
    self.is_driver_worker = is_driver_worker
    self.lora_config = vllm_config.lora_config

    if self.model_config.trust_remote_code:
        # note: lazy import to avoid importing torch before initializing
        from vllm.utils import init_cached_hf_modules
        init_cached_hf_modules()

    neuron_framework = current_platform.get_neuron_framework_to_use()
    if neuron_framework == NeuronFramework.TRANSFORMERS_NEURONX:
        self.model_runner = self.get_tnx_model_runner(vllm_config)
    elif neuron_framework == NeuronFramework.NEURONX_DISTRIBUTED_INFERENCE:
        self.model_runner = self.get_neuronx_distributed_model_runner(
            vllm_config)
    else:
        raise NotImplementedError(
            "Specified framework" +
            f" {os.environ.get('VLLM_NEURON_FRAMEWORK')}" +
            " is either not installed or not supported." +
            " Supported frameworks: " +
            "[transformers-neuronx, neuronx-distributed-inference]")

add_lora

add_lora(lora_request: LoRARequest) -> bool
Source code in vllm/worker/neuron_worker.py
def add_lora(self, lora_request: LoRARequest) -> bool:
    if current_platform.use_transformers_neuronx():
        raise NotImplementedError(
            f"{type(self)} does not support LoRA with Neuron Framework "
            f"Transformers NeuronX")
    return self.model_runner.add_lora(lora_request)

determine_num_available_blocks

determine_num_available_blocks() -> Tuple[int, int]

Determine the number of available KV blocks.

Swapping is not yet supported, so always return num_cpu_blocks=0.

We configure num_gpu_blocks to be equal to max_num_seqs.

Source code in vllm/worker/neuron_worker.py
def determine_num_available_blocks(self) -> Tuple[int, int]:
    """Determine the number of available KV blocks.

    Swapping is not yet supported, so always return num_cpu_blocks=0.

    We configure num_gpu_blocks to be equal to max_num_seqs.
    """
    # Set the number of GPU blocks to be the same as the maximum number of
    # sequences that can be processed in a single batch. This is equivalent
    # to schedule without PagedAttention.
    num_gpu_blocks = self.scheduler_config.max_num_seqs + 1

    # Swap not yet supported with Neuron backend.
    num_cpu_blocks = 0

    return num_gpu_blocks, num_cpu_blocks

execute_worker

execute_worker(worker_input: WorkerInput) -> None
Source code in vllm/worker/neuron_worker.py
def execute_worker(self, worker_input: WorkerInput) -> None:
    pass

get_cache_block_size_bytes

get_cache_block_size_bytes() -> int

Determine the size in bytes of a cache block.

This is required for speculative decoding; it is not yet implemented.

Source code in vllm/worker/neuron_worker.py
def get_cache_block_size_bytes(self) -> int:
    """Determine the size in bytes of a cache block.

    This is required for speculative decoding; it is not yet implemented.
    """
    raise NotImplementedError

get_neuronx_distributed_model_runner

get_neuronx_distributed_model_runner(vllm_config)
Source code in vllm/worker/neuron_worker.py
def get_neuronx_distributed_model_runner(self, vllm_config):
    from vllm.worker.multi_step_neuronx_distributed_model_runner import (
        MultiStepNeuronxDistributedModelRunner)
    from vllm.worker.neuronx_distributed_model_runner import (
        NeuronxDistributedModelRunner)
    if self.speculative_config is not None:
        assert (self.lora_config
                is None), "LoRA is not supported for Speculative Decoding"
        return MultiStepNeuronxDistributedModelRunner(
            vllm_config=vllm_config)
    else:
        return NeuronxDistributedModelRunner(vllm_config=vllm_config)

get_tnx_model_runner

get_tnx_model_runner(vllm_config)
Source code in vllm/worker/neuron_worker.py
def get_tnx_model_runner(self, vllm_config):
    assert (self.lora_config
            is None), ("LoRA is not supported for TransformersNeuronX "
                       "framework.")
    from vllm.worker.multi_step_neuron_model_runner import (
        MultiStepNeuronModelRunner)
    if self.speculative_config is not None:
        return MultiStepNeuronModelRunner(vllm_config=vllm_config)
    else:
        return NeuronModelRunner(vllm_config=vllm_config)

init_device

init_device() -> None
Source code in vllm/worker/neuron_worker.py
def init_device(self) -> None:
    self.init_distributed_environment()

    # Set random seed.
    set_random_seed(self.model_config.seed)

init_distributed_environment

init_distributed_environment()

Neuron uses transformers-neuronx for tensor parallelism.

vLLM still needs the environment initialized when TP/PP > 1

Source code in vllm/worker/neuron_worker.py
def init_distributed_environment(self):
    """Neuron uses transformers-neuronx for tensor parallelism.

    vLLM still needs the environment initialized when TP/PP > 1
    """
    init_distributed_environment(
        world_size=1,
        rank=self.rank,
        local_rank=self.local_rank,
        distributed_init_method=self.distributed_init_method,
        backend="gloo",
    )

    ensure_model_parallel_initialized(
        1,
        1,
    )

initialize_cache

initialize_cache(
    num_gpu_blocks: int, num_cpu_blocks: int
) -> None

Initialize the KV cache.

Source code in vllm/worker/neuron_worker.py
def initialize_cache(self, num_gpu_blocks: int,
                     num_cpu_blocks: int) -> None:
    """Initialize the KV cache.
    """

    # Different values are not tested.
    assert num_cpu_blocks == 0
    assert num_gpu_blocks == self.scheduler_config.max_num_seqs + 1

    self.cache_config.num_gpu_blocks = num_gpu_blocks
    self.cache_config.num_cpu_blocks = num_cpu_blocks

list_loras

list_loras() -> Set[int]
Source code in vllm/worker/neuron_worker.py
def list_loras(self) -> Set[int]:
    if current_platform.use_transformers_neuronx():
        raise NotImplementedError(
            f"{type(self)} does not support LoRA with Neuron Framework "
            f"Transformers NeuronX")
    return self.model_runner.list_loras()

load_model

load_model()
Source code in vllm/worker/neuron_worker.py
def load_model(self):
    self.model_runner.load_model()

pin_lora

pin_lora(lora_id: int) -> bool
Source code in vllm/worker/neuron_worker.py
def pin_lora(self, lora_id: int) -> bool:
    if current_platform.use_transformers_neuronx():
        raise NotImplementedError(
            f"{type(self)} does not support LoRA with Neuron Framework "
            f"Transformers NeuronX")
    return self.model_runner.pin_lora(lora_id)

prepare_worker_input

prepare_worker_input(
    execute_model_req: ExecuteModelRequest,
) -> WorkerInput
Source code in vllm/worker/neuron_worker.py
@torch.inference_mode()
def prepare_worker_input(
        self, execute_model_req: ExecuteModelRequest) -> WorkerInput:
    return WorkerInput(num_seq_groups=len(
        execute_model_req.seq_group_metadata_list), )

remove_lora

remove_lora(lora_id: int) -> bool
Source code in vllm/worker/neuron_worker.py
def remove_lora(self, lora_id: int) -> bool:
    if current_platform.use_transformers_neuronx():
        raise NotImplementedError(
            f"{type(self)} does not support LoRA with Neuron Framework "
            f"Transformers NeuronX")
    return self.model_runner.remove_lora(lora_id)