vllm.spec_decode.metrics

Timer module-attribute

Timer = Callable[[], float]

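Any zero-argument callable returning seconds as a float satisfies this alias, so callers can inject a monotonic or fake clock in place of the default time.time. A minimal sketch (FakeTimer is a hypothetical test helper, not part of vLLM):

import time
from typing import Callable

Timer = Callable[[], float]

# A monotonic clock is immune to wall-clock adjustments.
mono_timer: Timer = time.monotonic

# Hypothetical fake timer for unit tests; advance it manually to cross
# the collect interval without sleeping.
class FakeTimer:
    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

fake_timer = FakeTimer()
fake_timer.now += 10.0  # pretend 10 s elapsed (past the 5 s default)
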
AsyncMetricsCollector

Class which copies rejection/typical-acceptance sampler metrics from the device to CPU on a non-default Torch stream.

Source code in vllm/spec_decode/metrics.py
class AsyncMetricsCollector:
    """Class which copies rejection/typical-acceptance sampler metrics
    from the device to CPU on a non-default Torch stream.
    """

    def __init__(self,
                 spec_decode_sampler: SpecDecodeBaseSampler,
                 timer: Optional[Timer] = None,
                 collect_interval_s: float = 5.0):
        self.spec_decode_sampler = spec_decode_sampler
        self._timer = time.time if timer is None else timer

        self._rank: Optional[int] = None

        # We don't have a device set yet.
        self._copy_stream: Optional[torch.cuda.Stream] = None

        self._in_flight_copy: Optional[torch.cuda.Event] = None

        pin_memory = is_pin_memory_available()
        self._aggregate_num_accepted_tokens = torch.tensor(
            0, dtype=torch.long, device="cpu", pin_memory=pin_memory)
        self._aggregate_num_emitted_tokens = torch.tensor(
            0, dtype=torch.long, device="cpu", pin_memory=pin_memory)
        self._aggregate_num_draft_tokens = 0

        self._rejsample_metrics_collect_interval_s = collect_interval_s
        self._last_metrics_collect_time = self._timer()

    def init_gpu_tensors(self, rank: int) -> None:
        self._rank = rank
        self._copy_stream = torch.cuda.Stream()

    def init_tensors(self,
                     rank: int,
                     device_type: Union[torch.device, str] = 'cuda') -> None:
        self._rank = rank
        if isinstance(device_type, torch.device):
            device_type = device_type.type
        stream = current_platform.Stream
        if stream is not None:
            self._copy_stream = stream()

    def maybe_collect_rejsample_metrics(
            self, k: int) -> Optional[SpecDecodeWorkerMetrics]:
        # Skip for any platform that doesn't have device Event
        if current_platform.Event is None:
            return None

        # If a copy was initiated in the previous call, collect and return.
        if self._in_flight_copy is not None:
            ready_event = self._in_flight_copy
            self._in_flight_copy = None
            return self._collect_rejsample_metrics(k, ready_event)

        # Otherwise, check if we should start a new copy.
        if self._should_collect_rejsample_metrics(self._timer()):
            assert self._in_flight_copy is None
            self._in_flight_copy = self._copy_rejsample_metrics_async()

        return None

    def _should_collect_rejsample_metrics(self, now: float) -> bool:
        """Return whether or not this iteration should print sampling
        metrics.
        """
        if self._rank != 0:
            return False

        return now - self._last_metrics_collect_time >= self._rejsample_metrics_collect_interval_s  # noqa: E501

    def _copy_rejsample_metrics_async(self) -> torch.cuda.Event:
        """Copy rejection/typical-acceptance sampling metrics
        (number of accepted tokens, etc) to CPU asynchronously.

        Returns a device event recording when the copy is complete.
        """
        assert self._copy_stream is not None
        self._copy_stream.wait_stream(current_platform.current_stream())

        with current_platform.stream(self._copy_stream):
            self._aggregate_num_accepted_tokens.copy_(
                self.spec_decode_sampler.num_accepted_tokens,
                non_blocking=True)
            self._aggregate_num_emitted_tokens.copy_(
                self.spec_decode_sampler.num_emitted_tokens, non_blocking=True)
            # Number of draft tokens is calculated on CPU, so no copy is
            # required.
            self._aggregate_num_draft_tokens = (
                self.spec_decode_sampler.num_draft_tokens)

        aggregate_metrics_ready = current_platform.Event()
        aggregate_metrics_ready.record(self._copy_stream)

        return aggregate_metrics_ready

    def _collect_rejsample_metrics(
            self, k: int,
            ready_event: torch.cuda.Event) -> SpecDecodeWorkerMetrics:
        """Create metrics object from statistics copied asynchronously.

        Args:
            k: int. The number of speculative tokens; used to determine system
                efficiency.
            ready_event: torch.cuda.Event. The CUDA event recording when the
                async GPU->CPU copy is complete.
        """

        ready_event.synchronize()

        # update time of last collection
        self._last_metrics_collect_time = self._timer()

        accepted_tokens = self._aggregate_num_accepted_tokens.item()
        emitted_tokens = self._aggregate_num_emitted_tokens.item()
        draft_tokens = self._aggregate_num_draft_tokens

        max_num_emitted_tokens = self.get_max_num_emitted_tokens(
            draft_tokens, k)

        if draft_tokens > 0:
            draft_acceptance_rate = accepted_tokens / draft_tokens
        else:
            draft_acceptance_rate = float("nan")

        if max_num_emitted_tokens > 0:
            system_efficiency = emitted_tokens / max_num_emitted_tokens
        else:
            system_efficiency = float("nan")

        return SpecDecodeWorkerMetrics(
            num_spec_tokens=k,
            draft_acceptance_rate=draft_acceptance_rate,
            system_efficiency=system_efficiency,
            accepted_tokens=accepted_tokens,
            draft_tokens=draft_tokens,
            emitted_tokens=emitted_tokens,
        )

    @staticmethod
    def get_max_num_emitted_tokens(draft_tokens: int, k: int) -> int:
        """Calculate the number of emitted tokens, assuming all tokens are
        accepted.

        This is equal to the number of sequences that have been speculated on,
        times (speculation len + 1). The +1 comes from the bonus token.
        """
        # Determine the number of sequences that have been speculated on. Since
        # the batch size can be variable, we divide by k.
        assert draft_tokens % k == 0
        total_num_spec_seqs = draft_tokens // k

        # A single sequence may emit k accepted tokens and one bonus token in
        # the best case.
        num_emitted_per_seq_if_all_accepted = k + 1

        # The max num of emitted tokens is the number of speculated sequences
        # times the max emitted per seq.
        return total_num_spec_seqs * num_emitted_per_seq_if_all_accepted

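A hedged construction sketch, assuming a CUDA device and that sampler is an already-initialized SpecDecodeBaseSampler (e.g. a rejection sampler); per-step polling is shown under maybe_collect_rejsample_metrics below:

from vllm.spec_decode.metrics import AsyncMetricsCollector

# sampler: SpecDecodeBaseSampler instance (assumed to exist).
collector = AsyncMetricsCollector(
    spec_decode_sampler=sampler,
    collect_interval_s=5.0,  # default: copy metrics at most every 5 s
)

# Creates the non-default copy stream for the current device;
# only rank 0 will ever report metrics.
collector.init_tensors(rank=0, device_type="cuda")
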
_aggregate_num_accepted_tokens instance-attribute

_aggregate_num_accepted_tokens = tensor(
    0, dtype=long, device="cpu", pin_memory=pin_memory
)

_aggregate_num_draft_tokens instance-attribute

_aggregate_num_draft_tokens = 0

_aggregate_num_emitted_tokens instance-attribute

_aggregate_num_emitted_tokens = tensor(
    0, dtype=long, device="cpu", pin_memory=pin_memory
)

_copy_stream instance-attribute

_copy_stream: Optional[Stream] = None

_in_flight_copy instance-attribute

_in_flight_copy: Optional[Event] = None

_last_metrics_collect_time instance-attribute

_last_metrics_collect_time = _timer()

_rank instance-attribute

_rank: Optional[int] = None

_rejsample_metrics_collect_interval_s instance-attribute

_rejsample_metrics_collect_interval_s = collect_interval_s

_timer instance-attribute

_timer = time.time if timer is None else timer

spec_decode_sampler instance-attribute

spec_decode_sampler = spec_decode_sampler

__init__

__init__(
    spec_decode_sampler: SpecDecodeBaseSampler,
    timer: Optional[Timer] = None,
    collect_interval_s: float = 5.0,
)
Source code in vllm/spec_decode/metrics.py
def __init__(self,
             spec_decode_sampler: SpecDecodeBaseSampler,
             timer: Optional[Timer] = None,
             collect_interval_s: float = 5.0):
    self.spec_decode_sampler = spec_decode_sampler
    self._timer = time.time if timer is None else timer

    self._rank: Optional[int] = None

    # We don't have a device set yet.
    self._copy_stream: Optional[torch.cuda.Stream] = None

    self._in_flight_copy: Optional[torch.cuda.Event] = None

    pin_memory = is_pin_memory_available()
    self._aggregate_num_accepted_tokens = torch.tensor(
        0, dtype=torch.long, device="cpu", pin_memory=pin_memory)
    self._aggregate_num_emitted_tokens = torch.tensor(
        0, dtype=torch.long, device="cpu", pin_memory=pin_memory)
    self._aggregate_num_draft_tokens = 0

    self._rejsample_metrics_collect_interval_s = collect_interval_s
    self._last_metrics_collect_time = self._timer()

_collect_rejsample_metrics

_collect_rejsample_metrics(
    k: int, ready_event: Event
) -> SpecDecodeWorkerMetrics

Create metrics object from statistics copied asynchronously.

Parameters:

k (int, required): The number of speculative tokens; used to determine
system efficiency.

ready_event (Event, required): The torch.cuda.Event recording when the
async GPU->CPU copy is complete.
Source code in vllm/spec_decode/metrics.py
def _collect_rejsample_metrics(
        self, k: int,
        ready_event: torch.cuda.Event) -> SpecDecodeWorkerMetrics:
    """Create metrics object from statistics copied asynchronously.

    Args:
        k: int. The number of speculative tokens; used to determine system
            efficiency.
        ready_event: torch.cuda.Event. The CUDA event recording when the
            async GPU->CPU copy is complete.
    """

    ready_event.synchronize()

    # update time of last collection
    self._last_metrics_collect_time = self._timer()

    accepted_tokens = self._aggregate_num_accepted_tokens.item()
    emitted_tokens = self._aggregate_num_emitted_tokens.item()
    draft_tokens = self._aggregate_num_draft_tokens

    max_num_emitted_tokens = self.get_max_num_emitted_tokens(
        draft_tokens, k)

    if draft_tokens > 0:
        draft_acceptance_rate = accepted_tokens / draft_tokens
    else:
        draft_acceptance_rate = float("nan")

    if max_num_emitted_tokens > 0:
        system_efficiency = emitted_tokens / max_num_emitted_tokens
    else:
        system_efficiency = float("nan")

    return SpecDecodeWorkerMetrics(
        num_spec_tokens=k,
        draft_acceptance_rate=draft_acceptance_rate,
        system_efficiency=system_efficiency,
        accepted_tokens=accepted_tokens,
        draft_tokens=draft_tokens,
        emitted_tokens=emitted_tokens,
    )

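A hypothetical worked example of the two ratios, using made-up counts:

# k = 5 speculative tokens; 6 sequences were speculated on.
accepted_tokens = 20   # tokens the verifier accepted
emitted_tokens = 26    # accepted tokens plus bonus/recovered tokens
draft_tokens = 30      # 6 sequences * 5 draft tokens

max_num_emitted_tokens = (draft_tokens // 5) * (5 + 1)  # 6 * 6 = 36

draft_acceptance_rate = accepted_tokens / draft_tokens       # ~0.667
system_efficiency = emitted_tokens / max_num_emitted_tokens  # ~0.722
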
_copy_rejsample_metrics_async

_copy_rejsample_metrics_async() -> Event

Copy rejection/typical-acceptance sampling metrics (number of accepted tokens, etc) to CPU asynchronously.

Returns a device event recording when the copy is complete.

Source code in vllm/spec_decode/metrics.py
def _copy_rejsample_metrics_async(self) -> torch.cuda.Event:
    """Copy rejection/typical-acceptance sampling metrics
    (number of accepted tokens, etc) to CPU asynchronously.

    Returns a device event recording when the copy is complete.
    """
    assert self._copy_stream is not None
    self._copy_stream.wait_stream(current_platform.current_stream())

    with current_platform.stream(self._copy_stream):
        self._aggregate_num_accepted_tokens.copy_(
            self.spec_decode_sampler.num_accepted_tokens,
            non_blocking=True)
        self._aggregate_num_emitted_tokens.copy_(
            self.spec_decode_sampler.num_emitted_tokens, non_blocking=True)
        # Number of draft tokens is calculated on CPU, so no copy is
        # required.
        self._aggregate_num_draft_tokens = (
            self.spec_decode_sampler.num_draft_tokens)

    aggregate_metrics_ready = current_platform.Event()
    aggregate_metrics_ready.record(self._copy_stream)

    return aggregate_metrics_ready

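The same side-stream pattern can be reproduced standalone: make the copy stream wait on the producer stream, issue non-blocking copies into pinned CPU tensors, and record an event for later synchronization. A minimal sketch assuming a CUDA device (names are illustrative):

import torch

device_counter = torch.zeros(1, dtype=torch.long, device="cuda")
host_counter = torch.zeros(1, dtype=torch.long, pin_memory=True)

copy_stream = torch.cuda.Stream()
# See all work already queued on the current stream before copying.
copy_stream.wait_stream(torch.cuda.current_stream())

with torch.cuda.stream(copy_stream):
    # non_blocking=True overlaps with compute because the destination
    # is pinned host memory.
    host_counter.copy_(device_counter, non_blocking=True)

ready = torch.cuda.Event()
ready.record(copy_stream)

# ...later, before reading host_counter on the CPU:
ready.synchronize()
print(host_counter.item())
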
_should_collect_rejsample_metrics

_should_collect_rejsample_metrics(now: float) -> bool

Return whether this iteration should collect sampling metrics.

Source code in vllm/spec_decode/metrics.py
def _should_collect_rejsample_metrics(self, now: float) -> bool:
    """Return whether or not this iteration should print sampling
    metrics.
    """
    if self._rank != 0:
        return False

    return now - self._last_metrics_collect_time >= self._rejsample_metrics_collect_interval_s  # noqa: E501

get_max_num_emitted_tokens staticmethod

get_max_num_emitted_tokens(
    draft_tokens: int, k: int
) -> int

Calculate the number of emitted tokens, assuming all tokens are accepted.

This is equal to the number of sequences that have been speculated on, times (speculation len + 1). The +1 comes from the bonus token.

Source code in vllm/spec_decode/metrics.py
@staticmethod
def get_max_num_emitted_tokens(draft_tokens: int, k: int) -> int:
    """Calculate the number of emitted tokens, assuming all tokens are
    accepted.

    This is equal to the number of sequences that have been speculated on,
    times (speculation len + 1). The +1 comes from the bonus token.
    """
    # Determine the number of sequences that have been speculated on. Since
    # the batch size can be variable, we divide by k.
    assert draft_tokens % k == 0
    total_num_spec_seqs = draft_tokens // k

    # A single sequence may emit k accepted tokens and one bonus token in
    # the best case.
    num_emitted_per_seq_if_all_accepted = k + 1

    # The max num of emitted tokens is the number of speculated sequences
    # times the max emitted per seq.
    return total_num_spec_seqs * num_emitted_per_seq_if_all_accepted

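A quick worked check of the arithmetic:

from vllm.spec_decode.metrics import AsyncMetricsCollector

# 12 draft tokens at k=4 means 12 // 4 = 3 speculated sequences; each
# can emit at most k + 1 = 5 tokens (k accepted plus one bonus token).
assert AsyncMetricsCollector.get_max_num_emitted_tokens(12, 4) == 15
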
init_gpu_tensors

init_gpu_tensors(rank: int) -> None
Source code in vllm/spec_decode/metrics.py
def init_gpu_tensors(self, rank: int) -> None:
    self._rank = rank
    self._copy_stream = torch.cuda.Stream()

init_tensors

init_tensors(
    rank: int, device_type: Union[device, str] = "cuda"
) -> None
Source code in vllm/spec_decode/metrics.py
def init_tensors(self,
                 rank: int,
                 device_type: Union[torch.device, str] = 'cuda') -> None:
    self._rank = rank
    if isinstance(device_type, torch.device):
        device_type = device_type.type
    stream = current_platform.Stream
    if stream is not None:
        self._copy_stream = stream()

maybe_collect_rejsample_metrics

maybe_collect_rejsample_metrics(
    k: int,
) -> Optional[SpecDecodeWorkerMetrics]
Source code in vllm/spec_decode/metrics.py
def maybe_collect_rejsample_metrics(
        self, k: int) -> Optional[SpecDecodeWorkerMetrics]:
    # Skip for any platform that doesn't have device Event
    if current_platform.Event is None:
        return None

    # If a copy was initiated in the previous call, collect and return.
    if self._in_flight_copy is not None:
        ready_event = self._in_flight_copy
        self._in_flight_copy = None
        return self._collect_rejsample_metrics(k, ready_event)

    # Otherwise, check if we should start a new copy.
    if self._should_collect_rejsample_metrics(self._timer()):
        assert self._in_flight_copy is None
        self._in_flight_copy = self._copy_rejsample_metrics_async()

    return None

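Collection is two-phase: the call that crosses the interval only starts the device->CPU copy and returns None; a later call harvests the finished copy. A hedged polling sketch (collector, run_one_step, num_steps, and logger are assumed to exist):

for _ in range(num_steps):
    run_one_step()  # hypothetical model step that updates the sampler

    # Returns a SpecDecodeWorkerMetrics at most once per interval,
    # one call after the copy was kicked off; otherwise None.
    metrics = collector.maybe_collect_rejsample_metrics(k=4)
    if metrics is not None:
        logger.info("spec decode metrics: %s", metrics)
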
SpecDecodeWorkerMetrics

Bases: Struct

Dataclass holding metrics emitted from the spec decode worker.

Source code in vllm/spec_decode/metrics.py
class SpecDecodeWorkerMetrics(
        msgspec.Struct,
        omit_defaults=True,  # type: ignore[call-arg]
        array_like=True):  # type: ignore[call-arg]
    """Dataclass holding metrics emitted from the spec decode worker.
    """

    # The empirical acceptance rate of the proposal method on a per-token basis.
    # This is useful for evaluating how well the proposal method aligns with the
    # scoring method.
    draft_acceptance_rate: float

    # The empirical efficiency, measured as the number of tokens emitted by the
    # system divided by the number of tokens that could be emitted by the system
    # if the proposal method were perfect.
    system_efficiency: float

    # The number of speculative tokens produced by the proposal method.
    draft_tokens: int

    # The number of tokens emitted by the entire system.
    emitted_tokens: int

    # The number of tokens accepted by the scoring model and verification
    # routine, e.g. Llama2-70B and lossless rejection sampling.
    #
    # NOTE: Any token accepted by the verification routine is considered
    # accepted (regardless of whether the speculative prefix is also
    # accepted). The user will usually see fewer accepted tokens. This
    # metric is helpful when evaluating alignment of the proposal method
    # with the scoring model.
    accepted_tokens: int

    # The number of speculative tokens per sequence.
    num_spec_tokens: int

accepted_tokens instance-attribute

accepted_tokens: int

draft_acceptance_rate instance-attribute

draft_acceptance_rate: float

draft_tokens instance-attribute

draft_tokens: int

emitted_tokens instance-attribute

emitted_tokens: int

num_spec_tokens instance-attribute

num_spec_tokens: int

system_efficiency instance-attribute

system_efficiency: float
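
Because this is an msgspec Struct with array_like=True, instances encode as compact positional arrays rather than field-name maps. A sketch using msgspec's MessagePack codec:

import msgspec

from vllm.spec_decode.metrics import SpecDecodeWorkerMetrics

m = SpecDecodeWorkerMetrics(
    draft_acceptance_rate=0.667,
    system_efficiency=0.722,
    draft_tokens=30,
    emitted_tokens=26,
    accepted_tokens=20,
    num_spec_tokens=5,
)

# array_like=True serializes fields positionally: the payload is an
# array, not a map keyed by field names.
payload = msgspec.msgpack.encode(m)
decoded = msgspec.msgpack.decode(payload, type=SpecDecodeWorkerMetrics)
assert decoded == m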