vllm.model_executor.layers.logits_processor

A layer that computes logits from hidden_states.

_logits_processor_threadpool module-attribute

_logits_processor_threadpool: Optional[
    ThreadPoolExecutor
] = None

LogitsProcessor

Bases: Module

Process logits and apply logits processors from sampling metadata.

This layer does the following:

1. Gather logits from model hidden_states.
2. Scale logits if needed.
3. Apply logits processors (if any).

Source code in vllm/model_executor/layers/logits_processor.py
class LogitsProcessor(nn.Module):
    """Process logits and apply logits processors from sampling metadata.

    This layer does the following:
    1. Gather logits from model hidden_states.
    2. Scale logits if needed.
    3. Apply logits processors (if any).
    """

    def __init__(self,
                 vocab_size: int,
                 org_vocab_size: Optional[int] = None,
                 scale: float = 1.0,
                 logits_as_input: bool = False,
                 soft_cap: Optional[float] = None) -> None:
        """
        Args:
            scale: A scaling factor to apply to the logits.
        """
        super().__init__()
        self.scale = scale
        self.vocab_size = vocab_size
        # Whether the input is logits (default is hidden states).
        self.logits_as_input = logits_as_input
        # original vocabulary size (without LoRA).
        self.org_vocab_size = org_vocab_size or vocab_size
        # Soft cap the logits. Used in Gemma 2.
        self.soft_cap = soft_cap
        # Whether to use gather or all-gather to gather the logits.
        self.use_all_gather = current_platform.use_all_gather()

    def forward(
        self,
        lm_head: VocabParallelEmbedding,
        hidden_states: torch.Tensor,
        sampling_metadata: Optional[SamplingMetadata] = None,
        embedding_bias: Optional[torch.Tensor] = None,
    ) -> Optional[torch.Tensor]:
        if self.logits_as_input:
            logits = hidden_states
        else:
            if sampling_metadata is not None:
                hidden_states = _prune_hidden_states(hidden_states,
                                                     sampling_metadata)

            # Get the logits for the next tokens.
            logits = self._get_logits(hidden_states, lm_head, embedding_bias)
        if logits is not None:
            if self.soft_cap is not None:
                logits = logits / self.soft_cap
                logits = torch.tanh(logits)
                logits = logits * self.soft_cap

            if self.scale != 1.0:
                logits *= self.scale

            # Apply logits processors (if any).
            if sampling_metadata is not None and \
                sampling_metadata.seq_groups is not None:
                logits = _apply_logits_processors(logits, sampling_metadata)

        return logits

    def _gather_logits(self, logits: torch.Tensor) -> torch.Tensor:
        """gather/all-gather the logits tensor across model parallel group."""
        if self.use_all_gather:
            # Gather is not supported for some devices such as TPUs.
            # Use all-gather instead.
            # NOTE(woosuk): Here, the outputs of every device should not be None
            # because XLA requires strict SPMD among all devices. Every device
            # should execute the same operations after gathering the logits.
            logits = tensor_model_parallel_all_gather(logits)
        else:
            # None may be returned for rank > 0
            logits = tensor_model_parallel_gather(logits)
        return logits

    def _get_logits(
        self,
        hidden_states: torch.Tensor,
        lm_head: VocabParallelEmbedding,
        embedding_bias: Optional[torch.Tensor],
    ) -> Optional[torch.Tensor]:
        # Get the logits for the next tokens.
        logits = lm_head.quant_method.apply(lm_head,
                                            hidden_states,
                                            bias=embedding_bias)

        # Gather logits for TP
        logits = self._gather_logits(logits)

        # Remove paddings in vocab (if any).
        if logits is not None:
            logits = logits[..., :self.org_vocab_size]
        return logits

    def extra_repr(self) -> str:
        s = f"vocab_size={self.vocab_size}"
        s += f", org_vocab_size={self.org_vocab_size}"
        s += f", scale={self.scale}, logits_as_input={self.logits_as_input}"
        return s
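To make the scalar post-processing in forward concrete, the sketch below reproduces in plain PyTorch the optional tanh soft cap followed by the scaling step. It is a standalone illustration with made-up values for soft_cap and scale, not vLLM code.

from typing import Optional

import torch

def postprocess_logits(logits: torch.Tensor,
                       soft_cap: Optional[float] = None,
                       scale: float = 1.0) -> torch.Tensor:
    # Optional soft cap: squashes values into (-soft_cap, soft_cap) via tanh,
    # mirroring logits = soft_cap * tanh(logits / soft_cap) in forward.
    if soft_cap is not None:
        logits = soft_cap * torch.tanh(logits / soft_cap)
    # Optional scaling factor.
    if scale != 1.0:
        logits = logits * scale
    return logits

logits = torch.tensor([[-100.0, 0.0, 5.0, 100.0]])
print(postprocess_logits(logits, soft_cap=30.0, scale=0.5))
# 100.0 is capped to ~30.0 by the tanh soft cap, then scaled to ~15.0.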

logits_as_input instance-attribute

logits_as_input = logits_as_input

org_vocab_size instance-attribute

org_vocab_size = org_vocab_size or vocab_size

scale instance-attribute

scale = scale

soft_cap instance-attribute

soft_cap = soft_cap

use_all_gather instance-attribute

use_all_gather = current_platform.use_all_gather()

vocab_size instance-attribute

vocab_size = vocab_size

__init__

__init__(
    vocab_size: int,
    org_vocab_size: Optional[int] = None,
    scale: float = 1.0,
    logits_as_input: bool = False,
    soft_cap: Optional[float] = None,
) -> None

Parameters:

    scale (float, default: 1.0): A scaling factor to apply to the logits.
Source code in vllm/model_executor/layers/logits_processor.py
def __init__(self,
             vocab_size: int,
             org_vocab_size: Optional[int] = None,
             scale: float = 1.0,
             logits_as_input: bool = False,
             soft_cap: Optional[float] = None) -> None:
    """
    Args:
        scale: A scaling factor to apply to the logits.
    """
    super().__init__()
    self.scale = scale
    self.vocab_size = vocab_size
    # Whether the input is logits (default is hidden states).
    self.logits_as_input = logits_as_input
    # original vocabulary size (without LoRA).
    self.org_vocab_size = org_vocab_size or vocab_size
    # Soft cap the logits. Used in Gemma 2.
    self.soft_cap = soft_cap
    # Whether to use gather or all-gather to gather the logits.
    self.use_all_gather = current_platform.use_all_gather()

_gather_logits

_gather_logits(logits: Tensor) -> Tensor

gather/all-gather the logits tensor across model parallel group.

Source code in vllm/model_executor/layers/logits_processor.py
def _gather_logits(self, logits: torch.Tensor) -> torch.Tensor:
    """gather/all-gather the logits tensor across model parallel group."""
    if self.use_all_gather:
        # Gather is not supported for some devices such as TPUs.
        # Use all-gather instead.
        # NOTE(woosuk): Here, the outputs of every device should not be None
        # because XLA requires strict SPMD among all devices. Every device
        # should execute the same operations after gathering the logits.
        logits = tensor_model_parallel_all_gather(logits)
    else:
        # None may be returned for rank > 0
        logits = tensor_model_parallel_gather(logits)
    return logits

_get_logits

_get_logits(
    hidden_states: Tensor,
    lm_head: VocabParallelEmbedding,
    embedding_bias: Optional[Tensor],
) -> Optional[Tensor]
Source code in vllm/model_executor/layers/logits_processor.py
def _get_logits(
    self,
    hidden_states: torch.Tensor,
    lm_head: VocabParallelEmbedding,
    embedding_bias: Optional[torch.Tensor],
) -> Optional[torch.Tensor]:
    # Get the logits for the next tokens.
    logits = lm_head.quant_method.apply(lm_head,
                                        hidden_states,
                                        bias=embedding_bias)

    # Gather logits for TP
    logits = self._gather_logits(logits)

    # Remove paddings in vocab (if any).
    if logits is not None:
        logits = logits[..., :self.org_vocab_size]
    return logits

extra_repr

extra_repr() -> str
Source code in vllm/model_executor/layers/logits_processor.py
def extra_repr(self) -> str:
    s = f"vocab_size={self.vocab_size}"
    s += f", org_vocab_size={self.org_vocab_size}"
    s += f", scale={self.scale}, logits_as_input={self.logits_as_input}"
    return s

forward

forward(
    lm_head: VocabParallelEmbedding,
    hidden_states: Tensor,
    sampling_metadata: Optional[SamplingMetadata] = None,
    embedding_bias: Optional[Tensor] = None,
) -> Optional[Tensor]
Source code in vllm/model_executor/layers/logits_processor.py
def forward(
    self,
    lm_head: VocabParallelEmbedding,
    hidden_states: torch.Tensor,
    sampling_metadata: Optional[SamplingMetadata] = None,
    embedding_bias: Optional[torch.Tensor] = None,
) -> Optional[torch.Tensor]:
    if self.logits_as_input:
        logits = hidden_states
    else:
        if sampling_metadata is not None:
            hidden_states = _prune_hidden_states(hidden_states,
                                                 sampling_metadata)

        # Get the logits for the next tokens.
        logits = self._get_logits(hidden_states, lm_head, embedding_bias)
    if logits is not None:
        if self.soft_cap is not None:
            logits = logits / self.soft_cap
            logits = torch.tanh(logits)
            logits = logits * self.soft_cap

        if self.scale != 1.0:
            logits *= self.scale

        # Apply logits processors (if any).
        if sampling_metadata is not None and \
            sampling_metadata.seq_groups is not None:
            logits = _apply_logits_processors(logits, sampling_metadata)

    return logits
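For context, model implementations in vLLM typically call this layer from their compute_logits hook, passing the LM head together with the hidden states of the positions to be sampled. The fragment below is a sketch of that call pattern; the attribute names (self.logits_processor, self.lm_head) follow the common convention in vLLM model code but vary per model, and the fragment is not runnable on its own.

# Sketch of a model's compute_logits hook (fragment; names are illustrative).
def compute_logits(
    self,
    hidden_states: torch.Tensor,
    sampling_metadata: SamplingMetadata,
) -> Optional[torch.Tensor]:
    # self.logits_processor is a LogitsProcessor built in __init__,
    # e.g. LogitsProcessor(config.vocab_size).
    logits = self.logits_processor(self.lm_head, hidden_states,
                                   sampling_metadata)
    return logits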

_apply_logits_processors

_apply_logits_processors(
    logits: Tensor, sampling_metadata: SamplingMetadata
) -> Tensor
Source code in vllm/model_executor/layers/logits_processor.py
def _apply_logits_processors(
    logits: torch.Tensor,
    sampling_metadata: SamplingMetadata,
) -> torch.Tensor:
    found_logits_processors = False
    logits_processed = 0
    logits_row_ids_and_logits_row_futures = []
    for seq_group in sampling_metadata.seq_groups:
        seq_ids = seq_group.seq_ids
        sampling_params = seq_group.sampling_params
        logits_processors = sampling_params.logits_processors
        if logits_processors:
            found_logits_processors = True

            for seq_id, logits_row_idx in zip(seq_ids,
                                              seq_group.sample_indices):
                logits_row = logits[logits_row_idx]
                past_tokens_ids = seq_group.seq_data[seq_id].output_token_ids
                prompt_tokens_ids = seq_group.seq_data[seq_id].prompt_token_ids

                if _logits_processor_threadpool is not None:
                    logits_row_ids_and_logits_row_futures.append(
                        (logits_row_idx,
                         _logits_processor_threadpool.submit(
                             _apply_logits_processors_single_seq, logits_row,
                             logits_processors, past_tokens_ids,
                             prompt_tokens_ids)))
                else:
                    logits[logits_row_idx] = \
                        _apply_logits_processors_single_seq(
                            logits_row, logits_processors, past_tokens_ids,
                            prompt_tokens_ids)

        logits_processed += len(seq_group.sample_indices) + len(
            seq_group.prompt_logprob_indices)

    for logits_row_idx, future in logits_row_ids_and_logits_row_futures:
        logits[logits_row_idx] = future.result()

    if found_logits_processors:
        # verifies that no rows in logits were missed unexpectedly
        assert logits_processed == logits.shape[0]
    return logits
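The thread-pool branch above follows the standard concurrent.futures pattern: submit one task per logits row, keep (row index, future) pairs, then write each result back once it resolves. A minimal standalone sketch of that pattern with a dummy processor (not vLLM code):

from concurrent.futures import ThreadPoolExecutor

import torch

def ban_token_zero(past_token_ids, logits_row):
    # Dummy processor: forbid token id 0 by setting its logit to -inf.
    logits_row[0] = float("-inf")
    return logits_row

logits = torch.randn(4, 8)  # 4 rows (one per sequence), vocab size 8

with ThreadPoolExecutor(max_workers=2) as pool:
    # Submit one task per row and remember which row each future belongs to.
    row_futures = [(i, pool.submit(ban_token_zero, [], logits[i]))
                   for i in range(logits.shape[0])]
    # Write the processed rows back in place once the futures resolve.
    for row_idx, future in row_futures:
        logits[row_idx] = future.result()

assert torch.isinf(logits[:, 0]).all()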

_apply_logits_processors_single_seq

_apply_logits_processors_single_seq(
    logits_row,
    logits_processors,
    past_tokens_ids,
    prompt_tokens_ids,
) -> Tensor
Source code in vllm/model_executor/layers/logits_processor.py
def _apply_logits_processors_single_seq(logits_row, logits_processors,
                                        past_tokens_ids,
                                        prompt_tokens_ids) -> torch.Tensor:
    for logits_processor in logits_processors:
        parameters = inspect.signature(logits_processor).parameters
        if len(parameters) == 3:
            logits_row = logits_processor(prompt_tokens_ids, past_tokens_ids,
                                          logits_row)
        else:
            logits_row = logits_processor(past_tokens_ids, logits_row)
    return logits_row
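Because the dispatch above is based on inspect.signature, a user-supplied logits processor may take either two arguments (past token ids, logits row) or three (prompt token ids, past token ids, logits row); such processors are normally attached per request via SamplingParams(logits_processors=[...]). The standalone illustration below shows both call shapes on a plain tensor; the processor functions are made-up examples.

import inspect

import torch

def repetition_penalty(past_token_ids, logits_row):
    # Two-argument form: only sees previously generated tokens.
    for tok in past_token_ids:
        logits_row[tok] -= 1.0
    return logits_row

def ban_prompt_tokens(prompt_token_ids, past_token_ids, logits_row):
    # Three-argument form: also sees the prompt tokens.
    for tok in prompt_token_ids:
        logits_row[tok] = float("-inf")
    return logits_row

logits_row = torch.zeros(10)
for processor in (repetition_penalty, ban_prompt_tokens):
    if len(inspect.signature(processor).parameters) == 3:
        logits_row = processor([1, 2], [3], logits_row)
    else:
        logits_row = processor([3], logits_row)
print(logits_row)  # token 3 penalized, tokens 1 and 2 set to -inf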

_prune_hidden_states

_prune_hidden_states(
    hidden_states: Tensor,
    sampling_metadata: SamplingMetadata,
) -> Tensor
Source code in vllm/model_executor/layers/logits_processor.py
def _prune_hidden_states(
    hidden_states: torch.Tensor,
    sampling_metadata: SamplingMetadata,
) -> torch.Tensor:
    # NOTE(kzawora): The if guard is needed for Gaudi - in some scenarios
    # (warmup, profile_run) we might not have selected_token_indices,
    # so we skip pruning.
    if sampling_metadata.selected_token_indices is not None:
        return hidden_states.index_select(
            0, sampling_metadata.selected_token_indices)
    else:
        return hidden_states
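The index_select call keeps only the rows of hidden_states whose next-token logits are actually needed, e.g. the last prompt position of each sequence. A small standalone illustration with made-up shapes and indices:

import torch

hidden_states = torch.arange(12, dtype=torch.float32).reshape(6, 2)  # 6 positions
selected_token_indices = torch.tensor([2, 5])  # e.g. last position of each of 2 sequences

pruned = hidden_states.index_select(0, selected_token_indices)
print(pruned.shape)  # torch.Size([2, 2])
print(pruned)        # rows 2 and 5 of hidden_states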