Skip to content

vllm.core.block.prefix_caching_block

Token blocks.

PrefixHash module-attribute

PrefixHash = int

_DEFAULT_LAST_ACCESSED_TIME module-attribute

_DEFAULT_LAST_ACCESSED_TIME = -1

logger module-attribute

logger = init_logger(__name__)

BlockTracker

Used to track the status of a block inside the prefix caching allocator

Source code in vllm/core/block/prefix_caching_block.py
class BlockTracker:
    """Used to track the status of a block inside the prefix caching allocator
    """
    __slots__ = ("active", "last_accessed", "computed")

    def reset(self):
        self.last_accessed: float = _DEFAULT_LAST_ACCESSED_TIME
        self.computed: bool = False

    def __init__(self):
        self.active: bool = False
        self.reset()

    def enable(self):
        assert not self.active
        self.active = True
        self.reset()

    def disable(self):
        assert self.active
        self.active = False
        self.reset()

__slots__ class-attribute instance-attribute

__slots__ = ('active', 'last_accessed', 'computed')

active instance-attribute

active: bool = False

__init__

__init__()
Source code in vllm/core/block/prefix_caching_block.py
def __init__(self):
    self.active: bool = False
    self.reset()

disable

disable()
Source code in vllm/core/block/prefix_caching_block.py
def disable(self):
    assert self.active
    self.active = False
    self.reset()

enable

enable()
Source code in vllm/core/block/prefix_caching_block.py
def enable(self):
    assert not self.active
    self.active = True
    self.reset()

reset

reset()
Source code in vllm/core/block/prefix_caching_block.py
def reset(self):
    self.last_accessed: float = _DEFAULT_LAST_ACCESSED_TIME
    self.computed: bool = False

ComputedBlocksTracker

Tracks the computed blocks for each sequence.

Internally, it maintains a map from sequence id to the list of block hashes for the sequence. We cache the hashes of the full blocks for each sequence, and make sure the hash is calculated in the same way as the allocator. When a sequence is being decoded, we also update the sequence's hash accordingly and incrementally.

From the sequence hash, with prefix caching enabled, we could also calculate the number of cached tokens for the sequence by looking up the number of cached block hashes in the allocator.

Source code in vllm/core/block/prefix_caching_block.py
class ComputedBlocksTracker:
    """
    Tracks the computed blocks for each sequence.

    Internally, it maintains a map from sequence id to the list of block hashes
    for the sequence. We cache the hashes of the full blocks for each sequence,
    and make sure the hash is calculated in the same way as the allocator.
    When a sequence is being decoded, we also update the sequence's hash
    accordingly and incrementally.

    From the sequence hash, with prefix caching enabled, we could also calculate
    the number of cached tokens for the sequence by looking up the number of
    cached block hashes in the allocator.
    """

    # Note that we use 'None' as a string here instead of None because
    # as of Python 3.12, hash(None) returns a constant predictable value.
    # This could possibly make it easier to find and exploit hash
    # collisions. 'None' as a string will be hashed differently per process,
    # but consistently within the same process. This is the same as the
    # behavior of None prior to Python 3.12.
    _none_hash: int = hash('None')

    def __init__(
        self,
        allocator: DeviceAwareBlockAllocator,
        block_size: int,
        enable_caching: bool,
    ):
        self._allocator = allocator
        self._block_size = block_size
        self._enable_caching = enable_caching

        # A map from seq_id to the list of block hashes for the
        # sequence. This is so that we don't have to recompute the block hashes
        # for the sequence when we need to check if the sequence is cached.
        # Note a block that's not full will not have its hash calculated and
        # recorded.
        self._seq_id_to_blocks_hashes: Dict[int, List[int]] = {}

        # A map from seq_id to the number of tokens that are cached for the
        # sequence.
        # We need this so that a sequence in continuous prefill doesn't
        # accidentally see its cached token count change. See comments in
        # `get_num_cached_tokens` for more details.
        self._seq_id_to_num_tokens_computed: Dict[int, int] = {}

    def _update_seq_hashes(self, seq: Sequence) -> None:
        """Incrementally update the sequence's block hashes and record them."""
        assert self._enable_caching

        block_hashes_recorded = self._seq_id_to_blocks_hashes.get(
            seq.seq_id, [])
        cur_num_blocks_recorded = len(block_hashes_recorded)
        token_ids = seq.get_token_ids()
        assert len(token_ids) >= cur_num_blocks_recorded * self._block_size, (
            f"The sequence has {len(token_ids)} tokens, but"
            f" already recorded {cur_num_blocks_recorded} blocks. "
            "This should not happen since we assume blocks are "
            "only appended other than recomputation. When the sequence is "
            "recomputed, we should have removed the info of the old blocks.")
        # Update the computed block hashes for the sequence. Since only full
        # blocks are considered as "computed", we take floor here.
        num_computed_blocks = len(token_ids) // self._block_size

        # We need to know the hash of the previous block to compute the hash of
        # the current block so that blocks could be uniquely identified across
        # sequences of prefixes.
        prev_block_hash = (self._none_hash if cur_num_blocks_recorded == 0 else
                           block_hashes_recorded[-1])
        # Only update the computed block hashes for the new blocks
        for i in range(cur_num_blocks_recorded, num_computed_blocks):
            assert len(token_ids) >= (i + 1) * self._block_size
            block_token_ids = token_ids[i * self._block_size:(i + 1) *
                                        self._block_size]

            # NOTE: If there are any factors affecting the block besides
            # token_ids, they should be added as input to extra_hash.
            extra_hash = seq.extra_hash()

            # This has to be kept in sync with the allocator's hash
            # calculation.
            block_hash = PrefixCachingBlock.hash_block_tokens(
                is_first_block=prev_block_hash == self._none_hash,
                prev_block_hash=prev_block_hash,
                cur_block_token_ids=block_token_ids,
                extra_hash=extra_hash,
            )
            block_hashes_recorded.append(block_hash)
            prev_block_hash = block_hash

        self._seq_id_to_blocks_hashes[seq.seq_id] = block_hashes_recorded

    def get_num_cached_tokens(self, seq: Sequence) -> int:
        if not self._enable_caching:
            return 0

        # We always try to update the sequence hashes on the fly.
        # This is to ensure that we don't miss any cached tokens for the
        # sequence during decode.
        # This routine should only update hash for any new blocks too.
        self._update_seq_hashes(seq)

        num_computed_tokens_prev = self._seq_id_to_num_tokens_computed.get(
            seq.seq_id, None)

        # TODO(rickyx): This hack could be removed once we mark blocks as
        # computed correctly with chunked prefills.
        if num_computed_tokens_prev is not None and seq.is_prefill():
            # For a sequence that is still in prefill, we don't
            # recompute the number of cached tokens.
            # This also handles correctly chunked prefill since currently
            # we mark blocks as computed even if the sequence is still partially
            # prefilled. So a continuously prefilled sequence should not
            # see its cached token count change while running.
            return num_computed_tokens_prev

        block_hashes = self._seq_id_to_blocks_hashes[seq.seq_id]

        # This is O(logN), where N is the number of blocks.
        num_cached_blocks = len(
            self._allocator.find_cached_blocks_prefix(block_hashes))
        num_cached_tokens = num_cached_blocks * self._block_size
        self._seq_id_to_num_tokens_computed[seq.seq_id] = num_cached_tokens
        return num_cached_tokens

    def remove_seq(self, seq_id: int) -> None:
        """Stop tracking the sequence."""
        if not self._enable_caching:
            return
        assert seq_id in self._seq_id_to_blocks_hashes
        del self._seq_id_to_blocks_hashes[seq_id]

        assert seq_id in self._seq_id_to_num_tokens_computed
        del self._seq_id_to_num_tokens_computed[seq_id]

_allocator instance-attribute

_allocator = allocator

_block_size instance-attribute

_block_size = block_size

_enable_caching instance-attribute

_enable_caching = enable_caching

_none_hash class-attribute instance-attribute

_none_hash: int = hash('None')

_seq_id_to_blocks_hashes instance-attribute

_seq_id_to_blocks_hashes: Dict[int, List[int]] = {}

_seq_id_to_num_tokens_computed instance-attribute

_seq_id_to_num_tokens_computed: Dict[int, int] = {}

__init__

__init__(
    allocator: DeviceAwareBlockAllocator,
    block_size: int,
    enable_caching: bool,
)
Source code in vllm/core/block/prefix_caching_block.py
def __init__(
    self,
    allocator: DeviceAwareBlockAllocator,
    block_size: int,
    enable_caching: bool,
):
    self._allocator = allocator
    self._block_size = block_size
    self._enable_caching = enable_caching

    # A map from seq_id to the list of block hashes for the
    # sequence. This is so that we don't have to recompute the block hashes
    # for the sequence when we need to check if the sequence is cached.
    # Note a block that's not full will not have its hash calculated and
    # recorded.
    self._seq_id_to_blocks_hashes: Dict[int, List[int]] = {}

    # A map from seq_id to the number of tokens that are cached for the
    # sequence.
    # We need this so that a sequence in continuous prefill doesn't
    # accidentally see its cached token count change. See comments in
    # `get_num_cached_tokens` for more details.
    self._seq_id_to_num_tokens_computed: Dict[int, int] = {}

_update_seq_hashes

_update_seq_hashes(seq: Sequence) -> None

Incrementally update the sequence's block hashes and record them.

Source code in vllm/core/block/prefix_caching_block.py
def _update_seq_hashes(self, seq: Sequence) -> None:
    """Incrementally update the sequence's block hashes and record them."""
    assert self._enable_caching

    block_hashes_recorded = self._seq_id_to_blocks_hashes.get(
        seq.seq_id, [])
    cur_num_blocks_recorded = len(block_hashes_recorded)
    token_ids = seq.get_token_ids()
    assert len(token_ids) >= cur_num_blocks_recorded * self._block_size, (
        f"The sequence has {len(token_ids)} tokens, but"
        f" already recorded {cur_num_blocks_recorded} blocks. "
        "This should not happen since we assume blocks are "
        "only appended other than recomputation. When the sequence is "
        "recomputed, we should have removed the info of the old blocks.")
    # Update the computed block hashes for the sequence. Since only full
    # blocks are considered as "computed", we take floor here.
    num_computed_blocks = len(token_ids) // self._block_size

    # We need to know the hash of the previous block to compute the hash of
    # the current block so that blocks could be uniquely identified across
    # sequences of prefixes.
    prev_block_hash = (self._none_hash if cur_num_blocks_recorded == 0 else
                       block_hashes_recorded[-1])
    # Only update the computed block hashes for the new blocks
    for i in range(cur_num_blocks_recorded, num_computed_blocks):
        assert len(token_ids) >= (i + 1) * self._block_size
        block_token_ids = token_ids[i * self._block_size:(i + 1) *
                                    self._block_size]

        # NOTE: If there are any factors affecting the block besides
        # token_ids, they should be added as input to extra_hash.
        extra_hash = seq.extra_hash()

        # This has to be kept in sync with the allocator's hash
        # calculation.
        block_hash = PrefixCachingBlock.hash_block_tokens(
            is_first_block=prev_block_hash == self._none_hash,
            prev_block_hash=prev_block_hash,
            cur_block_token_ids=block_token_ids,
            extra_hash=extra_hash,
        )
        block_hashes_recorded.append(block_hash)
        prev_block_hash = block_hash

    self._seq_id_to_blocks_hashes[seq.seq_id] = block_hashes_recorded

get_num_cached_tokens

get_num_cached_tokens(seq: Sequence) -> int
Source code in vllm/core/block/prefix_caching_block.py
def get_num_cached_tokens(self, seq: Sequence) -> int:
    if not self._enable_caching:
        return 0

    # We always try to update the sequence hashes on the fly.
    # This is to ensure that we don't miss any cached tokens for the
    # sequence during decode.
    # This routine should only update hash for any new blocks too.
    self._update_seq_hashes(seq)

    num_computed_tokens_prev = self._seq_id_to_num_tokens_computed.get(
        seq.seq_id, None)

    # TODO(rickyx): This hack could be removed once we mark blocks as
    # computed correctly with chunked prefills.
    if num_computed_tokens_prev is not None and seq.is_prefill():
        # For a sequence that is still in prefill, we don't
        # recompute the number of cached tokens.
        # This also handles correctly chunked prefill since currently
        # we mark blocks as computed even if the sequence is still partially
        # prefilled. So a continuously prefilled sequence should not
        # see its cached token count change while running.
        return num_computed_tokens_prev

    block_hashes = self._seq_id_to_blocks_hashes[seq.seq_id]

    # This is O(logN), where N is the number of blocks.
    num_cached_blocks = len(
        self._allocator.find_cached_blocks_prefix(block_hashes))
    num_cached_tokens = num_cached_blocks * self._block_size
    self._seq_id_to_num_tokens_computed[seq.seq_id] = num_cached_tokens
    return num_cached_tokens

remove_seq

remove_seq(seq_id: int) -> None

Stop tracking the sequence.

Source code in vllm/core/block/prefix_caching_block.py
def remove_seq(self, seq_id: int) -> None:
    """Stop tracking the sequence."""
    if not self._enable_caching:
        return
    assert seq_id in self._seq_id_to_blocks_hashes
    del self._seq_id_to_blocks_hashes[seq_id]

    assert seq_id in self._seq_id_to_num_tokens_computed
    del self._seq_id_to_num_tokens_computed[seq_id]

LastAccessBlocksTracker

Manages the last access time of the tracked sequences, in order to allow an efficient update of allocator's block last access times

Source code in vllm/core/block/prefix_caching_block.py
class LastAccessBlocksTracker:
    """Manages the last access time of the tracked sequences, in order to allow
    an efficient update of allocator's block last access times
    """

    def __init__(self, allocator):
        self._allocator = allocator
        self._seq_last_access: Dict[int, Optional[float]] = {}

    def add_seq(self, seq_id: int) -> None:
        """Start tracking seq_id
        """
        assert seq_id not in self._seq_last_access
        self._seq_last_access[seq_id] = None

    def remove_seq(self, seq_id: int) -> None:
        """Stop tracking seq_id
        """
        assert seq_id in self._seq_last_access
        del self._seq_last_access[seq_id]

    def update_last_access(self, seq_id: int, time: float) -> None:
        assert seq_id in self._seq_last_access
        self._seq_last_access[seq_id] = time

    def update_seq_blocks_last_access(self, seq_id: int,
                                      block_ids: List[int]) -> None:
        assert seq_id in self._seq_last_access

        ts = self._seq_last_access[seq_id]

        if ts is None:
            # No last access was recorded, no need to update.
            return

        self._allocator.mark_blocks_as_accessed(block_ids, ts)

_allocator instance-attribute

_allocator = allocator

_seq_last_access instance-attribute

_seq_last_access: Dict[int, Optional[float]] = {}

__init__

__init__(allocator)
Source code in vllm/core/block/prefix_caching_block.py
def __init__(self, allocator):
    self._allocator = allocator
    self._seq_last_access: Dict[int, Optional[float]] = {}

add_seq

add_seq(seq_id: int) -> None

Start tracking seq_id

Source code in vllm/core/block/prefix_caching_block.py
def add_seq(self, seq_id: int) -> None:
    """Start tracking seq_id
    """
    assert seq_id not in self._seq_last_access
    self._seq_last_access[seq_id] = None

remove_seq

remove_seq(seq_id: int) -> None

Stop tracking seq_id

Source code in vllm/core/block/prefix_caching_block.py
def remove_seq(self, seq_id: int) -> None:
    """Stop tracking seq_id
    """
    assert seq_id in self._seq_last_access
    del self._seq_last_access[seq_id]

update_last_access

update_last_access(seq_id: int, time: float) -> None
Source code in vllm/core/block/prefix_caching_block.py
def update_last_access(self, seq_id: int, time: float) -> None:
    assert seq_id in self._seq_last_access
    self._seq_last_access[seq_id] = time

update_seq_blocks_last_access

update_seq_blocks_last_access(
    seq_id: int, block_ids: List[int]
) -> None
Source code in vllm/core/block/prefix_caching_block.py
def update_seq_blocks_last_access(self, seq_id: int,
                                  block_ids: List[int]) -> None:
    assert seq_id in self._seq_last_access

    ts = self._seq_last_access[seq_id]

    if ts is None:
        # No last access was recorded, no need to update.
        return

    self._allocator.mark_blocks_as_accessed(block_ids, ts)

PrefixCachingBlock

Bases: Block

A block implementation that supports prefix caching.

The PrefixCachingBlock class represents a block of token IDs with prefix caching capabilities. It wraps a NaiveBlock internally and provides additional functionality for content hashing and promoting immutable blocks with the prefix caching allocator.

Parameters:

Name Type Description Default
prev_block Optional[PrefixCachingBlock]

The previous block in the sequence.

required
token_ids List[int]

The initial token IDs to be stored in the block.

required
block_size int

The maximum number of token IDs that can be stored in the block.

required
allocator BlockAllocator

The prefix caching block allocator associated with this block.

required
block_id Optional[int]

The physical block index of this block. Defaults to None.

None
extra_hash Optional[int]

The hash value of additional factors such as adapters that influence the block, apart from the token_ids.

None
Source code in vllm/core/block/prefix_caching_block.py
class PrefixCachingBlock(Block):
    """A block implementation that supports prefix caching.

    The PrefixCachingBlock class represents a block of token IDs with prefix
    caching capabilities. It wraps a NaiveBlock internally and provides
    additional functionality for content hashing and promoting immutable blocks
    with the prefix caching allocator.

    Args:
        prev_block (Optional[PrefixCachingBlock]): The previous block in the
            sequence.
        token_ids (List[int]): The initial token IDs to be stored in the block.
        block_size (int): The maximum number of token IDs that can be stored in
            the block.
        allocator (BlockAllocator): The prefix
            caching block allocator associated with this block.
        block_id (Optional[int], optional): The physical block index
            of this block. Defaults to None.
        extra_hash (Optional[int]): The hash value of additional factors
            such as adapters that influence the block, apart from the token_ids.
    """

    # Note that we use 'None' as a string here instead of None because
    # as of Python 3.12, hash(None) returns a constant predictable value.
    # This could possibly make it easier to find and exploit hash
    # collisions. 'None' as a string will be hashed differently per process,
    # but consistently within the same process. This is the same as the
    # behavior of None prior to Python 3.12.
    _none_hash: int = hash('None')

    def __init__(
        self,
        prev_block: Optional[Block],
        token_ids: List[int],
        block_size: int,
        allocator: BlockAllocator,
        block_id: Optional[int] = None,
        computed: bool = False,
        extra_hash: Optional[int] = None,
    ):
        assert isinstance(allocator, PrefixCachingBlockAllocator), (
            "Currently this class is only tested with "
            "PrefixCachingBlockAllocator. Got instead allocator = {}".format(
                allocator))
        assert_prefix_caching_block_or_none(prev_block)

        self._prev_block = prev_block
        self._cached_content_hash: Optional[int] = None
        self._cached_num_tokens_total: int = 0
        self._allocator = allocator
        self._last_accessed: float = _DEFAULT_LAST_ACCESSED_TIME
        self._computed = computed
        self._extra_hash = extra_hash

        # On the first time, we create the block object, and next we only
        # reinitialize it
        if hasattr(self, "_block"):
            self._block.__init__(  # type: ignore[has-type]
                prev_block=prev_block,
                token_ids=token_ids,
                block_size=block_size,
                block_id=block_id,
                allocator=self._allocator)
        else:
            self._block = NaiveBlock(prev_block=prev_block,
                                     token_ids=token_ids,
                                     block_size=block_size,
                                     block_id=block_id,
                                     allocator=self._allocator)

        self._update_num_tokens_total()

    def _update_num_tokens_total(self):
        """Incrementally computes the number of tokens that there is
        till the current block (included)
        """
        res = 0

        # Add all previous blocks
        if self._prev_block is not None:
            res += self._prev_block.num_tokens_total

        # Add current block
        res += len(self.token_ids)

        self._cached_num_tokens_total = res

    @property
    def computed(self) -> bool:
        return self._computed

    @computed.setter
    def computed(self, value) -> None:
        self._computed = value

    @property
    def last_accessed(self) -> float:
        return self._last_accessed

    @last_accessed.setter
    def last_accessed(self, last_accessed_ts: float):
        self._last_accessed = last_accessed_ts

    def append_token_ids(self, token_ids: List[int]) -> None:
        """Appends the given token IDs to the block and registers the block as
        immutable if the block becomes full.

        Args:
            token_ids (List[int]): The token IDs to be appended to the block.
        """
        # Ensure this is mutable block (not promoted)
        assert self.content_hash is None
        assert not self.computed

        if len(token_ids) == 0:
            return

        # Ensure there are input tokens
        assert token_ids, "Got token_ids = {}".format(token_ids)

        # Naive block handles CoW.
        self._block.append_token_ids(token_ids)
        self._update_num_tokens_total()

        # If the content hash is present, then the block can be made immutable.
        # Register ourselves with the allocator, potentially replacing the
        # physical block index.
        if self.content_hash is not None:
            self.block_id = self._allocator.promote_to_immutable_block(self)

    @property
    def block_id(self) -> Optional[int]:
        return self._block.block_id

    @block_id.setter
    def block_id(self, value) -> None:
        self._block.block_id = value

    @property
    def is_full(self) -> bool:
        return self._block.is_full

    @property
    def num_empty_slots(self) -> int:
        return self._block.num_empty_slots

    @property
    def num_tokens_total(self) -> int:
        return self._cached_num_tokens_total

    @property
    def block_size(self) -> int:
        return self._block.block_size

    @property
    def token_ids(self) -> List[int]:
        return self._block.token_ids

    @property
    def prev_block(self) -> Optional[Block]:
        return self._prev_block

    @property
    def extra_hash(self) -> Optional[int]:
        return self._extra_hash

    @property
    def content_hash(self) -> Optional[int]:
        """Return the content-based hash of the current block, or None if it is
        not yet defined.

        For the content-based hash to be defined, the current block must be
        full.
        """
        # If the hash is already computed, return it.
        if self._cached_content_hash is not None:
            return self._cached_content_hash

        # We cannot compute a hash for the current block because it is not full.
        if not self.is_full:
            return None

        is_first_block = self._prev_block is None
        prev_block_hash = (
            self._none_hash if is_first_block else
            self._prev_block.content_hash  # type: ignore
        )

        # Previous block exists but does not yet have a hash.
        # Return no hash in this case.
        if prev_block_hash == self._none_hash and not is_first_block:
            return None

        self._cached_content_hash = PrefixCachingBlock.hash_block_tokens(
            is_first_block,
            prev_block_hash,
            cur_block_token_ids=self.token_ids,
            extra_hash=self._extra_hash)
        return self._cached_content_hash

    @classmethod
    def hash_block_tokens(cls,
                          is_first_block: bool,
                          prev_block_hash: Optional[int],
                          cur_block_token_ids: List[int],
                          extra_hash: Optional[int] = None) -> int:
        """Computes a hash value corresponding to the contents of a block and
        the contents of the preceding block(s). The hash value is used for
        prefix caching.

        Parameters:
        - is_first_block (bool): A flag indicating if the block is the first in
            the sequence.
        - prev_block_hash (Optional[int]): The hash of the previous block. None
            if this is the first block.
        - cur_block_token_ids (List[int]): A list of token ids in the current
            block. The current block is assumed to be full.
        - extra_hash (Optional[int]): The hash value of additional factors
            such as adapters that influence the block, apart from the token_ids.

        Returns:
        - int: The computed hash value for the block.
        """
        if is_first_block and prev_block_hash is None:
            prev_block_hash = cls._none_hash
        return hash((is_first_block, prev_block_hash, *cur_block_token_ids,
                     extra_hash))

_allocator instance-attribute

_allocator = allocator

_block instance-attribute

_block = NaiveBlock(
    prev_block=prev_block,
    token_ids=token_ids,
    block_size=block_size,
    block_id=block_id,
    allocator=_allocator,
)

_cached_content_hash instance-attribute

_cached_content_hash: Optional[int] = None

_cached_num_tokens_total instance-attribute

_cached_num_tokens_total: int = 0

_computed instance-attribute

_computed = computed

_extra_hash instance-attribute

_extra_hash = extra_hash

_last_accessed instance-attribute

_none_hash class-attribute instance-attribute

_none_hash: int = hash('None')

_prev_block instance-attribute

_prev_block = prev_block

block_id property writable

block_id: Optional[int]

block_size property

block_size: int

computed property writable

computed: bool

content_hash property

content_hash: Optional[int]

Return the content-based hash of the current block, or None if it is not yet defined.

For the content-based hash to be defined, the current block must be full.

extra_hash property

extra_hash: Optional[int]

is_full property

is_full: bool

last_accessed property writable

last_accessed: float

num_empty_slots property

num_empty_slots: int

num_tokens_total property

num_tokens_total: int

prev_block property

prev_block: Optional[Block]

token_ids property

token_ids: List[int]

__init__

__init__(
    prev_block: Optional[Block],
    token_ids: List[int],
    block_size: int,
    allocator: BlockAllocator,
    block_id: Optional[int] = None,
    computed: bool = False,
    extra_hash: Optional[int] = None,
)
Source code in vllm/core/block/prefix_caching_block.py
def __init__(
    self,
    prev_block: Optional[Block],
    token_ids: List[int],
    block_size: int,
    allocator: BlockAllocator,
    block_id: Optional[int] = None,
    computed: bool = False,
    extra_hash: Optional[int] = None,
):
    assert isinstance(allocator, PrefixCachingBlockAllocator), (
        "Currently this class is only tested with "
        "PrefixCachingBlockAllocator. Got instead allocator = {}".format(
            allocator))
    assert_prefix_caching_block_or_none(prev_block)

    self._prev_block = prev_block
    self._cached_content_hash: Optional[int] = None
    self._cached_num_tokens_total: int = 0
    self._allocator = allocator
    self._last_accessed: float = _DEFAULT_LAST_ACCESSED_TIME
    self._computed = computed
    self._extra_hash = extra_hash

    # On the first time, we create the block object, and next we only
    # reinitialize it
    if hasattr(self, "_block"):
        self._block.__init__(  # type: ignore[has-type]
            prev_block=prev_block,
            token_ids=token_ids,
            block_size=block_size,
            block_id=block_id,
            allocator=self._allocator)
    else:
        self._block = NaiveBlock(prev_block=prev_block,
                                 token_ids=token_ids,
                                 block_size=block_size,
                                 block_id=block_id,
                                 allocator=self._allocator)

    self._update_num_tokens_total()

_update_num_tokens_total

_update_num_tokens_total()

Incrementally computes the number of tokens that there is till the current block (included)

Source code in vllm/core/block/prefix_caching_block.py
def _update_num_tokens_total(self):
    """Incrementally computes the number of tokens that there is
    till the current block (included)
    """
    res = 0

    # Add all previous blocks
    if self._prev_block is not None:
        res += self._prev_block.num_tokens_total

    # Add current block
    res += len(self.token_ids)

    self._cached_num_tokens_total = res

append_token_ids

append_token_ids(token_ids: List[int]) -> None

Appends the given token IDs to the block and registers the block as immutable if the block becomes full.

Parameters:

Name Type Description Default
token_ids List[int]

The token IDs to be appended to the block.

required
Source code in vllm/core/block/prefix_caching_block.py
def append_token_ids(self, token_ids: List[int]) -> None:
    """Appends the given token IDs to the block and registers the block as
    immutable if the block becomes full.

    Args:
        token_ids (List[int]): The token IDs to be appended to the block.
    """
    # Ensure this is mutable block (not promoted)
    assert self.content_hash is None
    assert not self.computed

    if len(token_ids) == 0:
        return

    # Ensure there are input tokens
    assert token_ids, "Got token_ids = {}".format(token_ids)

    # Naive block handles CoW.
    self._block.append_token_ids(token_ids)
    self._update_num_tokens_total()

    # If the content hash is present, then the block can be made immutable.
    # Register ourselves with the allocator, potentially replacing the
    # physical block index.
    if self.content_hash is not None:
        self.block_id = self._allocator.promote_to_immutable_block(self)

hash_block_tokens classmethod

hash_block_tokens(
    is_first_block: bool,
    prev_block_hash: Optional[int],
    cur_block_token_ids: List[int],
    extra_hash: Optional[int] = None,
) -> int

Computes a hash value corresponding to the contents of a block and the contents of the preceding block(s). The hash value is used for prefix caching.

  • is_first_block (bool): A flag indicating if the block is the first in the sequence.
  • prev_block_hash (Optional[int]): The hash of the previous block. None if this is the first block.
  • cur_block_token_ids (List[int]): A list of token ids in the current block. The current block is assumed to be full.
  • extra_hash (Optional[int]): The hash value of additional factors such as adapters that influence the block, apart from the token_ids.

Returns: - int: The computed hash value for the block.

Source code in vllm/core/block/prefix_caching_block.py
@classmethod
def hash_block_tokens(cls,
                      is_first_block: bool,
                      prev_block_hash: Optional[int],
                      cur_block_token_ids: List[int],
                      extra_hash: Optional[int] = None) -> int:
    """Computes a hash value corresponding to the contents of a block and
    the contents of the preceding block(s). The hash value is used for
    prefix caching.

    Parameters:
    - is_first_block (bool): A flag indicating if the block is the first in
        the sequence.
    - prev_block_hash (Optional[int]): The hash of the previous block. None
        if this is the first block.
    - cur_block_token_ids (List[int]): A list of token ids in the current
        block. The current block is assumed to be full.
    - extra_hash (Optional[int]): The hash value of additional factors
        such as adapters that influence the block, apart from the token_ids.

    Returns:
    - int: The computed hash value for the block.
    """
    if is_first_block and prev_block_hash is None:
        prev_block_hash = cls._none_hash
    return hash((is_first_block, prev_block_hash, *cur_block_token_ids,
                 extra_hash))

PrefixCachingBlockAllocator

Bases: BlockAllocator

A block allocator that implements prefix caching.

The PrefixCachingBlockAllocator maintains a cache of blocks based on their content hash. It reuses blocks with the same content hash to avoid redundant memory allocation. The allocator also supports copy-on-write operations.

Parameters:

Name Type Description Default
num_blocks int

The total number of blocks to manage.

required
block_size int

The size of each block in tokens.

required
block_ids(Optional[Iterable[int]], optional

An optional iterable of block IDs. If not provided, block IDs will be assigned sequentially from 0 to num_blocks - 1.

required
Source code in vllm/core/block/prefix_caching_block.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
class PrefixCachingBlockAllocator(BlockAllocator):
    """A block allocator that implements prefix caching.

    The PrefixCachingBlockAllocator maintains a cache of blocks based on their
    content hash. It reuses blocks with the same content hash to avoid redundant
    memory allocation. The allocator also supports copy-on-write operations.

    Args:
        num_blocks (int): The total number of blocks to manage.
        block_size (int): The size of each block in tokens.
        block_ids(Optional[Iterable[int]], optional): An optional iterable of
            block IDs. If not provided, block IDs will be assigned sequentially
            from 0 to num_blocks - 1.
    """

    # Note that we use 'None' as a string here instead of None because
    # as of Python 3.12, hash(None) returns a constant predictable value.
    # This could possibly make it easier to find and exploit hash
    # collisions. 'None' as a string will be hashed differently per process,
    # but consistently within the same process. This is the same as the
    # behavior of None prior to Python 3.12.
    _none_hash: int = hash('None')

    # Implements Block.Factory.
    def __init__(
        self,
        num_blocks: int,
        block_size: int,
        block_ids: Optional[Iterable[int]] = None,
        eviction_policy: EvictionPolicy = EvictionPolicy.LRU,
    ):
        if block_ids is None:
            block_ids = range(num_blocks)

        self._block_size = block_size

        # A mapping of prefix hash to block index. All blocks which have a
        # prefix hash will be in this dict, even if they have refcount 0.
        self._cached_blocks: Dict[PrefixHash, BlockId] = {}

        # A list of immutable block IDs that have been touched by scheduler
        # and should be marked as computed after an entire batch of sequences
        # are scheduled.
        self._touched_blocks: Set[BlockId] = set()

        # Used to track status of each physical block id
        self._block_tracker: Dict[BlockId, BlockTracker] = {}
        for block_id in block_ids:
            self._block_tracker[block_id] = BlockTracker()

        # Pre-allocate "num_blocks * extra_factor" block objects.
        # The "* extra_factor" is a buffer to allow more block objects
        # than physical blocks
        extra_factor = 4
        self._block_pool = BlockPool(self._block_size, self._create_block,
                                     self, num_blocks * extra_factor)

        # An allocator for blocks that do not have prefix hashes.
        self._hashless_allocator = NaiveBlockAllocator(
            create_block=self._create_block,  # type: ignore
            num_blocks=num_blocks,
            block_size=block_size,
            block_ids=block_ids,
            block_pool=self._block_pool,  # Share block pool here
        )

        # Evitor used to maintain how we want to handle those computed blocks
        # if we find memory pressure is high.
        self.eviction_policy = eviction_policy
        self.evictor: Evictor = make_evictor(self.eviction_policy)

        # We share the refcounter between allocators. This allows us to promote
        # blocks originally allocated in the hashless allocator to immutable
        # blocks.
        self._refcounter = self._hashless_allocator.refcounter

        self._cow_tracker = CopyOnWriteTracker(
            refcounter=self._refcounter.as_readonly())

        self.metric_data = CacheMetricData()

    def _create_block(
        self,
        prev_block: Optional[Block],
        token_ids: List[int],
        block_size: int,
        allocator: BlockAllocator,
        block_id: Optional[int] = None,
        computed: bool = False,
        extra_hash: Optional[int] = None,
    ) -> Block:
        # Bind block to self.
        allocator = self

        return PrefixCachingBlock(
            prev_block=prev_block,
            token_ids=token_ids,
            block_size=block_size,
            block_id=block_id,
            allocator=allocator,
            computed=computed,
            extra_hash=extra_hash,
        )

    def allocate_immutable_block(self,
                                 prev_block: Optional[Block],
                                 token_ids: List[int],
                                 extra_hash: Optional[int] = None,
                                 device: Optional[Device] = None) -> Block:
        """Allocates an immutable block with the given token IDs, reusing cached
        blocks if possible.

        Args:
            prev_block (Optional[Block]): The previous block in the sequence.
            token_ids (List[int]): The token IDs to be stored in the block.

        Returns:
            Block: The allocated immutable block.
        """
        assert device is None
        assert_prefix_caching_block_or_none(prev_block)

        # First, try to create a block that points to cached data
        block = self._block_pool.init_block(prev_block=prev_block,
                                            token_ids=token_ids,
                                            block_size=self._block_size,
                                            physical_block_id=None,
                                            extra_hash=extra_hash)
        assert block.content_hash is not None

        cached_block_id = self._cached_blocks.get(block.content_hash, None)
        if cached_block_id is not None:
            self.metric_data.query(hit=True)
            block.block_id = cached_block_id
            self._incr_refcount_cached_block(block)
            return block
        self.metric_data.query(hit=False)
        self._block_pool.free_block(block)

        # No cached block => Allocate a new block
        block = self.allocate_mutable_block(prev_block, extra_hash=extra_hash)
        block.append_token_ids(token_ids)
        return block

    def allocate_immutable_blocks(
            self,
            prev_block: Optional[Block],
            block_token_ids: List[List[int]],
            extra_hash: Optional[int] = None,
            device: Optional[Device] = None) -> List[Block]:
        blocks = []
        for token_ids in block_token_ids:
            prev_block = self.allocate_immutable_block(prev_block=prev_block,
                                                       token_ids=token_ids,
                                                       device=device,
                                                       extra_hash=extra_hash)
            blocks.append(prev_block)
        return blocks

    def allocate_mutable_block(self,
                               prev_block: Optional[Block],
                               extra_hash: Optional[int] = None,
                               device: Optional[Device] = None) -> Block:
        """Allocates a mutable block. If there are no free blocks, this will
        evict unused cached blocks.

        Args:
            prev_block (Block): The previous block in the sequence.
                None is not allowed unlike it is super class.

        Returns:
            Block: The allocated mutable block.
        """
        assert device is None
        assert_prefix_caching_block_or_none(prev_block)

        block_id = self._allocate_block_id()
        block = self._block_pool.init_block(prev_block=prev_block,
                                            token_ids=[],
                                            block_size=self._block_size,
                                            physical_block_id=block_id,
                                            extra_hash=extra_hash)
        assert not block.computed
        assert block.content_hash is None
        return block

    def _incr_refcount_cached_block(self, block: Block) -> None:
        # Set this block to be "computed" since it is pointing to a
        # cached block id (which was already computed)
        block.computed = True

        block_id = block.block_id
        assert block_id is not None

        refcount = self._refcounter.incr(block_id)
        if refcount == 1:
            # In case a cached block was evicted, restore its tracking
            if block_id in self.evictor:
                self.evictor.remove(block_id)

            self._track_block_id(block_id, computed=True)

    def _decr_refcount_cached_block(self, block: Block) -> None:
        # Ensure this is immutable/cached block
        assert block.content_hash is not None

        block_id = block.block_id
        assert block_id is not None

        refcount = self._refcounter.decr(block_id)
        if refcount > 0:
            block.block_id = None
            return
        else:
            assert refcount == 0

        # No longer used
        assert block.content_hash in self._cached_blocks

        # Add the cached block to the evictor
        # (This keeps the cached block around so it can be reused)
        self.evictor.add(block_id, block.content_hash, block.num_tokens_total,
                         self._block_tracker[block_id].last_accessed)

        # Stop tracking the block
        self._untrack_block_id(block_id)

        block.block_id = None

    def _decr_refcount_hashless_block(self, block: Block) -> None:
        block_id = block.block_id
        assert block_id is not None

        # We may have a fork case where block is shared,
        # in which case, we cannot remove it from tracking
        refcount = self._refcounter.get(block_id)
        if refcount == 1:
            self._untrack_block_id(block_id)

        # Decrement refcount of the block_id, but do not free the block object
        # itself (will be handled by the caller)
        self._hashless_allocator.free(block, keep_block_object=True)

    def _allocate_block_id(self) -> BlockId:
        """First tries to allocate a block id from the hashless allocator,
        and if there are no blocks, then tries to evict an unused cached block.
        """
        hashless_block_id = self._maybe_allocate_hashless_block_id()
        if hashless_block_id is not None:
            return hashless_block_id

        evicted_block_id = self._maybe_allocate_evicted_block_id()
        if evicted_block_id is not None:
            return evicted_block_id

        # No block available in hashless allocator, nor in unused cache blocks.
        raise BlockAllocator.NoFreeBlocksError()

    def _maybe_allocate_hashless_block_id(self) -> Optional[BlockId]:
        try:
            # Allocate mutable block and extract its block_id
            block = self._hashless_allocator.allocate_mutable_block(
                prev_block=None)
            block_id = block.block_id
            self._block_pool.free_block(block)

            self._track_block_id(block_id, computed=False)
            return block_id
        except BlockAllocator.NoFreeBlocksError:
            return None

    def _maybe_allocate_evicted_block_id(self) -> Optional[BlockId]:
        if self.evictor.num_blocks == 0:
            return None

        # Here we get an evicted block, which is only added
        # into evictor if its ref counter is 0
        # and since its content would be changed, we need
        # to remove it from _cached_blocks's tracking list
        block_id, content_hash_to_evict = self.evictor.evict()

        # Sanity checks
        assert content_hash_to_evict in self._cached_blocks
        _block_id = self._cached_blocks[content_hash_to_evict]
        assert self._refcounter.get(_block_id) == 0
        assert _block_id == block_id

        self._cached_blocks.pop(content_hash_to_evict)

        self._refcounter.incr(block_id)
        self._track_block_id(block_id, computed=False)

        return block_id

    def _free_block_id(self, block: Block) -> None:
        """Decrements the refcount of the block. The block may be in two 
        possible states: (1) immutable/cached or (2) mutable/hashless. 
        In the first case, the refcount is decremented directly and the block
        may be possibly added to the evictor. In other case, hashless 
        allocator free(..) with keep_block_object=True is called to only free
        the block id (since the block object may be reused by the caller)
        """
        block_id = block.block_id
        assert block_id is not None, "Freeing unallocated block is undefined"

        if block.content_hash is not None:
            # Immutable: This type of block is always cached, and we want to
            # keep it in the evictor for future reuse
            self._decr_refcount_cached_block(block)
        else:
            # Mutable: This type of block is not cached, so we release it
            # directly to the hashless allocator
            self._decr_refcount_hashless_block(block)

        assert block.block_id is None

    def free(self, block: Block, keep_block_object: bool = False) -> None:
        """Release the block (look at free_block_id(..) docs)
        """
        # Release the physical block index
        self._free_block_id(block)

        # Release the block object to the pool
        if not keep_block_object:
            self._block_pool.free_block(block)

    def fork(self, last_block: Block) -> List[Block]:
        """Creates a new sequence of blocks that shares the same underlying
        memory as the original sequence.

        Args:
            last_block (Block): The last block in the original sequence.

        Returns:
            List[Block]: The new sequence of blocks that shares the same memory
                as the original sequence.
        """
        source_blocks = get_all_blocks_recursively(last_block)

        forked_blocks: List[Block] = []
        prev_block = None
        for block in source_blocks:
            block_id = block.block_id
            assert block_id is not None

            refcount = self._refcounter.incr(block_id)
            assert refcount != 1, "can't fork free'd block_id = {}".format(
                block_id)

            forked_block = self._block_pool.init_block(
                prev_block=prev_block,
                token_ids=block.token_ids,
                block_size=self._block_size,
                physical_block_id=block_id,
                extra_hash=block.extra_hash)

            forked_blocks.append(forked_block)
            prev_block = forked_blocks[-1]

        return forked_blocks

    def get_num_free_blocks(self, device: Optional[Device] = None) -> int:
        assert device is None
        # The number of free blocks is the number of hashless free blocks
        # plus the number of blocks evictor could free from its list.
        return self._hashless_allocator.get_num_free_blocks(
        ) + self.evictor.num_blocks

    def get_num_total_blocks(self) -> int:
        return self._hashless_allocator.get_num_total_blocks()

    def get_physical_block_id(self, absolute_id: int) -> int:
        """Returns the zero-offset block id on certain block allocator
        given the absolute block id.

        Args:
            absolute_id (int): The absolute block id for the block 
                in whole allocator.

        Returns:
            int: The rzero-offset block id on certain device.
        """
        return sorted(self.all_block_ids).index(absolute_id)

    @property
    def all_block_ids(self) -> FrozenSet[int]:
        return self._hashless_allocator.all_block_ids

    def get_prefix_cache_hit_rate(self) -> float:
        return self.metric_data.get_hit_rate()

    def reset_prefix_cache(self) -> bool:
        """Reset prefix cache. This function may be used in RLHF
        flows to invalid prefix caching after the weights are updated,
        or used for resetting prefix caching status for benchmarking.

        Returns:
            bool: True if the prefix cache is successfully reset,
            False otherwise.
        """
        num_used_blocks = (self.get_num_total_blocks() -
                           self.get_num_free_blocks())
        if num_used_blocks > 0:
            logger.warning(
                "Failed to reset prefix cache because some "
                "blocks (%d) are not freed yet", num_used_blocks)
            return False

        # Free all blocks in the evictor.
        while (block_id :=
               self._maybe_allocate_evicted_block_id()) is not None:
            self._hashless_allocator.free_block_id(block_id)

        # Should not have any cached blocks because all blocks are evicted.
        assert not self._cached_blocks

        # Reset the evictor.
        self.evictor = make_evictor(self.eviction_policy)

        # Reset the block tracker.
        for block_id in self._block_tracker:
            self._block_tracker[block_id] = BlockTracker()

        # Reset the metrics.
        self.metric_data = CacheMetricData()

        logger.info("Successfully reset prefix cache")
        return True

    def is_block_cached(self, block: Block) -> bool:
        assert block.content_hash is not None
        return block.content_hash in self._cached_blocks

    def promote_to_immutable_block(self, block: Block) -> BlockId:
        """Once a mutable block is full, it can be promoted to an immutable
        block. This means that its content can be referenced by future blocks
        having the same prefix.

        Note that if we already have a cached block with the same content, we
        will replace the newly-promoted block's mapping with the existing cached
        block id.

        Args:
            block: The mutable block to be promoted.

        Returns:
            BlockId: Either the original block index, or the block index of
                the previously cached block matching the same content.
        """
        # Ensure block can be promoted
        assert block.content_hash is not None
        assert block.block_id is not None
        assert self._refcounter.get(block.block_id) > 0

        if block.content_hash not in self._cached_blocks:
            # No cached content hash => Set this block as cached.
            # Note that this block cannot be marked as computed yet
            # because other sequences in the same batch cannot reuse
            # this block.
            self._cached_blocks[block.content_hash] = block.block_id
            # Mark this block as touched so that it can be marked as
            # computed after the entire batch of sequences are scheduled.
            self._touched_blocks.add(block.block_id)
            return block.block_id

        # Reuse the cached content hash
        self._decr_refcount_hashless_block(block)
        block.block_id = self._cached_blocks[block.content_hash]

        # Increment refcount of the cached block and (possibly) restore
        # it from the evictor.
        # Note that in this case, the block is marked as computed
        self._incr_refcount_cached_block(block)

        return block.block_id

    def cow_block_if_not_appendable(self, block: Block) -> BlockId:
        """Performs a copy-on-write operation on the given block if it is not
        appendable.

        Args:
            block (Block): The block to check for copy-on-write.

        Returns:
            BlockId: The block index of the new block if a copy-on-write 
                operation was performed, or the original block index if
                no copy-on-write was necessary.
        """
        src_block_id = block.block_id
        assert src_block_id is not None

        if self._cow_tracker.is_appendable(block):
            return src_block_id

        self._free_block_id(block)
        trg_block_id = self._allocate_block_id()

        self._cow_tracker.record_cow(src_block_id, trg_block_id)

        return trg_block_id

    def clear_copy_on_writes(self) -> List[Tuple[BlockId, BlockId]]:
        """Returns the copy-on-write source->destination mapping and clears it.

        Returns:
            List[Tuple[BlockId, BlockId]]: A list mapping source
                block indices to destination block indices.
        """
        return self._cow_tracker.clear_cows()

    def mark_blocks_as_accessed(self, block_ids: List[int],
                                now: float) -> None:
        """Mark blocks as accessed, used in prefix caching.

        If the block is added into evictor, we need to update corresponding
        info in evictor's metadata.
        """

        for block_id in block_ids:
            if self._block_tracker[block_id].active:
                self._block_tracker[block_id].last_accessed = now
            elif block_id in self.evictor:
                self.evictor.update(block_id, now)
            else:
                raise ValueError(
                    "Mark block as accessed which is not belonged to GPU")

    def mark_blocks_as_computed(self, block_ids: List[int]) -> None:
        # Mark all touched blocks as computed.
        for block_id in self._touched_blocks:
            self._block_tracker[block_id].computed = True
        self._touched_blocks.clear()

    def _track_block_id(self, block_id: Optional[BlockId],
                        computed: bool) -> None:
        assert block_id is not None
        self._block_tracker[block_id].enable()
        self._block_tracker[block_id].computed = computed

    def _untrack_block_id(self, block_id: Optional[BlockId]) -> None:
        assert block_id is not None
        self._block_tracker[block_id].disable()

    def block_is_computed(self, block_id: int) -> bool:
        if self._block_tracker[block_id].active:
            return self._block_tracker[block_id].computed
        else:
            return block_id in self.evictor

    def get_common_computed_block_ids(
            self, computed_seq_block_ids: List[List[int]]) -> List[int]:
        """Return the block ids that are common for a given sequence group.

        Only those blocks that are immutable and already be marked
        compyted would be taken consideration.
        """

        # NOTE We exclude the last block to avoid the case where the entire
        # prompt is cached. This would cause erroneous behavior in model
        # runner.

        # It returns a list of int although type annotation says list of string.
        if len(computed_seq_block_ids) == 1:
            return computed_seq_block_ids[0]

        return commonprefix([
            ids for ids in computed_seq_block_ids  # type: ignore
            if ids
        ])

    def get_num_full_blocks_touched(self, blocks: List[Block]) -> int:
        """Returns the number of full blocks that will be touched by
        swapping in/out.

        Args:
            blocks: List of blocks to be swapped.
        Returns:
            int: the number of full blocks that will be touched by
                swapping in/out the given blocks. Non full blocks are ignored
                when deciding the number of blocks to touch.
        """
        num_touched_blocks: int = 0
        for block in blocks:
            # If the block has a match in the cache and the cached
            # block is not referenced, then we still count it as a
            # touched block
            if block.is_full and (not self.is_block_cached(block) or \
                (block.content_hash is not None and \
                self._cached_blocks[block.content_hash] in \
                        self.evictor)):
                num_touched_blocks += 1
        return num_touched_blocks

    def swap_out(self, blocks: List[Block]) -> None:
        """Execute the swap out actions. Basically just free the 
        given blocks.

        Args:
            blocks: List of blocks to be swapped out.
        """
        for block in blocks:
            self._free_block_id(block)

    def swap_in(self, blocks: List[Block]) -> None:
        """Execute the swap in actions. Change the block id from 
        old allocator to current allocator for each block to finish 
        the block table update. 

        Args:
            blocks: List of blocks to be swapped in.
        """
        for block in blocks:
            # Here we allocate either immutable or mutable block and then
            # extract its block_id. Note that the block object is released
            # and the block_id is assigned to "block" to allow reusing the
            # existing "block" object
            if block.is_full:
                tmp_block = self.allocate_immutable_block(
                    prev_block=block.prev_block,
                    token_ids=block.token_ids,
                    extra_hash=block.extra_hash)
            else:
                tmp_block = self.allocate_mutable_block(
                    prev_block=block.prev_block, extra_hash=block.extra_hash)
                tmp_block.append_token_ids(block.token_ids)

            block_id = tmp_block.block_id
            self._block_pool.free_block(tmp_block)

            block.block_id = block_id  # Assign block_id

    def find_cached_blocks_prefix(self, block_hashes: List[int]) -> List[int]:
        """
        Given a list of block hashes, return the prefix of the block hashes that
        are all cached.

        Since a block's block hash includes the hashes of all previous blocks,
        and we only allocate/deallocate blocks in the entire sequence, so if a
        block is cached, then all previous blocks are also cached. With this
        property, we can use binary search to find the prefix of cached blocks.

        Args:
            block_hashes (List[int]): The list of block hashes.

        Returns:
            List[int]: The prefix of the `block_hashes` that are cached.
        """

        def _block_is_cached(block_hash: PrefixHash) -> bool:
            if block_hash not in self._cached_blocks:
                return False

            cached_block_id = self._cached_blocks[block_hash]
            # We only consider the blocks that are marked as computed.
            return self.block_is_computed(cached_block_id)

        def _bisect_left(a, x, key: Callable[[PrefixHash], bool]) -> int:

            # python <= 3.10 don't have the key argument
            if sys.version_info < (3, 10):
                a = [key(e) for e in a]
                return bisect_left(a, x)
            else:
                return bisect_left(a, x, key=key)

        # Look for the first block that's not cached, and returns the prefix
        # i.e. blocks that are cached.
        idx = _bisect_left(block_hashes,
                           True,
                           key=lambda x: not _block_is_cached(x))
        return block_hashes[:idx]

_block_pool instance-attribute

_block_pool = BlockPool(
    _block_size,
    _create_block,
    self,
    num_blocks * extra_factor,
)

_block_size instance-attribute

_block_size = block_size

_block_tracker instance-attribute

_block_tracker: Dict[BlockId, BlockTracker] = {}

_cached_blocks instance-attribute

_cached_blocks: Dict[PrefixHash, BlockId] = {}

_cow_tracker instance-attribute

_cow_tracker = CopyOnWriteTracker(refcounter=as_readonly())

_hashless_allocator instance-attribute

_hashless_allocator = NaiveBlockAllocator(
    create_block=_create_block,
    num_blocks=num_blocks,
    block_size=block_size,
    block_ids=block_ids,
    block_pool=_block_pool,
)

_none_hash class-attribute instance-attribute

_none_hash: int = hash('None')

_refcounter instance-attribute

_refcounter = refcounter

_touched_blocks instance-attribute

_touched_blocks: Set[BlockId] = set()

all_block_ids property

all_block_ids: FrozenSet[int]

eviction_policy instance-attribute

eviction_policy = eviction_policy

evictor instance-attribute

evictor: Evictor = make_evictor(eviction_policy)

metric_data instance-attribute

metric_data = CacheMetricData()

__init__

__init__(
    num_blocks: int,
    block_size: int,
    block_ids: Optional[Iterable[int]] = None,
    eviction_policy: EvictionPolicy = LRU,
)
Source code in vllm/core/block/prefix_caching_block.py
def __init__(
    self,
    num_blocks: int,
    block_size: int,
    block_ids: Optional[Iterable[int]] = None,
    eviction_policy: EvictionPolicy = EvictionPolicy.LRU,
):
    if block_ids is None:
        block_ids = range(num_blocks)

    self._block_size = block_size

    # A mapping of prefix hash to block index. All blocks which have a
    # prefix hash will be in this dict, even if they have refcount 0.
    self._cached_blocks: Dict[PrefixHash, BlockId] = {}

    # A list of immutable block IDs that have been touched by scheduler
    # and should be marked as computed after an entire batch of sequences
    # are scheduled.
    self._touched_blocks: Set[BlockId] = set()

    # Used to track status of each physical block id
    self._block_tracker: Dict[BlockId, BlockTracker] = {}
    for block_id in block_ids:
        self._block_tracker[block_id] = BlockTracker()

    # Pre-allocate "num_blocks * extra_factor" block objects.
    # The "* extra_factor" is a buffer to allow more block objects
    # than physical blocks
    extra_factor = 4
    self._block_pool = BlockPool(self._block_size, self._create_block,
                                 self, num_blocks * extra_factor)

    # An allocator for blocks that do not have prefix hashes.
    self._hashless_allocator = NaiveBlockAllocator(
        create_block=self._create_block,  # type: ignore
        num_blocks=num_blocks,
        block_size=block_size,
        block_ids=block_ids,
        block_pool=self._block_pool,  # Share block pool here
    )

    # Evitor used to maintain how we want to handle those computed blocks
    # if we find memory pressure is high.
    self.eviction_policy = eviction_policy
    self.evictor: Evictor = make_evictor(self.eviction_policy)

    # We share the refcounter between allocators. This allows us to promote
    # blocks originally allocated in the hashless allocator to immutable
    # blocks.
    self._refcounter = self._hashless_allocator.refcounter

    self._cow_tracker = CopyOnWriteTracker(
        refcounter=self._refcounter.as_readonly())

    self.metric_data = CacheMetricData()

_allocate_block_id

_allocate_block_id() -> BlockId

First tries to allocate a block id from the hashless allocator, and if there are no blocks, then tries to evict an unused cached block.

Source code in vllm/core/block/prefix_caching_block.py
def _allocate_block_id(self) -> BlockId:
    """First tries to allocate a block id from the hashless allocator,
    and if there are no blocks, then tries to evict an unused cached block.
    """
    hashless_block_id = self._maybe_allocate_hashless_block_id()
    if hashless_block_id is not None:
        return hashless_block_id

    evicted_block_id = self._maybe_allocate_evicted_block_id()
    if evicted_block_id is not None:
        return evicted_block_id

    # No block available in hashless allocator, nor in unused cache blocks.
    raise BlockAllocator.NoFreeBlocksError()

_create_block

_create_block(
    prev_block: Optional[Block],
    token_ids: List[int],
    block_size: int,
    allocator: BlockAllocator,
    block_id: Optional[int] = None,
    computed: bool = False,
    extra_hash: Optional[int] = None,
) -> Block
Source code in vllm/core/block/prefix_caching_block.py
def _create_block(
    self,
    prev_block: Optional[Block],
    token_ids: List[int],
    block_size: int,
    allocator: BlockAllocator,
    block_id: Optional[int] = None,
    computed: bool = False,
    extra_hash: Optional[int] = None,
) -> Block:
    # Bind block to self.
    allocator = self

    return PrefixCachingBlock(
        prev_block=prev_block,
        token_ids=token_ids,
        block_size=block_size,
        block_id=block_id,
        allocator=allocator,
        computed=computed,
        extra_hash=extra_hash,
    )

_decr_refcount_cached_block

_decr_refcount_cached_block(block: Block) -> None
Source code in vllm/core/block/prefix_caching_block.py
def _decr_refcount_cached_block(self, block: Block) -> None:
    # Ensure this is immutable/cached block
    assert block.content_hash is not None

    block_id = block.block_id
    assert block_id is not None

    refcount = self._refcounter.decr(block_id)
    if refcount > 0:
        block.block_id = None
        return
    else:
        assert refcount == 0

    # No longer used
    assert block.content_hash in self._cached_blocks

    # Add the cached block to the evictor
    # (This keeps the cached block around so it can be reused)
    self.evictor.add(block_id, block.content_hash, block.num_tokens_total,
                     self._block_tracker[block_id].last_accessed)

    # Stop tracking the block
    self._untrack_block_id(block_id)

    block.block_id = None

_decr_refcount_hashless_block

_decr_refcount_hashless_block(block: Block) -> None
Source code in vllm/core/block/prefix_caching_block.py
def _decr_refcount_hashless_block(self, block: Block) -> None:
    block_id = block.block_id
    assert block_id is not None

    # We may have a fork case where block is shared,
    # in which case, we cannot remove it from tracking
    refcount = self._refcounter.get(block_id)
    if refcount == 1:
        self._untrack_block_id(block_id)

    # Decrement refcount of the block_id, but do not free the block object
    # itself (will be handled by the caller)
    self._hashless_allocator.free(block, keep_block_object=True)

_free_block_id

_free_block_id(block: Block) -> None

Decrements the refcount of the block. The block may be in two possible states: (1) immutable/cached or (2) mutable/hashless. In the first case, the refcount is decremented directly and the block may be possibly added to the evictor. In other case, hashless allocator free(..) with keep_block_object=True is called to only free the block id (since the block object may be reused by the caller)

Source code in vllm/core/block/prefix_caching_block.py
def _free_block_id(self, block: Block) -> None:
    """Decrements the refcount of the block. The block may be in two 
    possible states: (1) immutable/cached or (2) mutable/hashless. 
    In the first case, the refcount is decremented directly and the block
    may be possibly added to the evictor. In other case, hashless 
    allocator free(..) with keep_block_object=True is called to only free
    the block id (since the block object may be reused by the caller)
    """
    block_id = block.block_id
    assert block_id is not None, "Freeing unallocated block is undefined"

    if block.content_hash is not None:
        # Immutable: This type of block is always cached, and we want to
        # keep it in the evictor for future reuse
        self._decr_refcount_cached_block(block)
    else:
        # Mutable: This type of block is not cached, so we release it
        # directly to the hashless allocator
        self._decr_refcount_hashless_block(block)

    assert block.block_id is None

_incr_refcount_cached_block

_incr_refcount_cached_block(block: Block) -> None
Source code in vllm/core/block/prefix_caching_block.py
def _incr_refcount_cached_block(self, block: Block) -> None:
    # Set this block to be "computed" since it is pointing to a
    # cached block id (which was already computed)
    block.computed = True

    block_id = block.block_id
    assert block_id is not None

    refcount = self._refcounter.incr(block_id)
    if refcount == 1:
        # In case a cached block was evicted, restore its tracking
        if block_id in self.evictor:
            self.evictor.remove(block_id)

        self._track_block_id(block_id, computed=True)

_maybe_allocate_evicted_block_id

_maybe_allocate_evicted_block_id() -> Optional[BlockId]
Source code in vllm/core/block/prefix_caching_block.py
def _maybe_allocate_evicted_block_id(self) -> Optional[BlockId]:
    if self.evictor.num_blocks == 0:
        return None

    # Here we get an evicted block, which is only added
    # into evictor if its ref counter is 0
    # and since its content would be changed, we need
    # to remove it from _cached_blocks's tracking list
    block_id, content_hash_to_evict = self.evictor.evict()

    # Sanity checks
    assert content_hash_to_evict in self._cached_blocks
    _block_id = self._cached_blocks[content_hash_to_evict]
    assert self._refcounter.get(_block_id) == 0
    assert _block_id == block_id

    self._cached_blocks.pop(content_hash_to_evict)

    self._refcounter.incr(block_id)
    self._track_block_id(block_id, computed=False)

    return block_id

_maybe_allocate_hashless_block_id

_maybe_allocate_hashless_block_id() -> Optional[BlockId]
Source code in vllm/core/block/prefix_caching_block.py
def _maybe_allocate_hashless_block_id(self) -> Optional[BlockId]:
    try:
        # Allocate mutable block and extract its block_id
        block = self._hashless_allocator.allocate_mutable_block(
            prev_block=None)
        block_id = block.block_id
        self._block_pool.free_block(block)

        self._track_block_id(block_id, computed=False)
        return block_id
    except BlockAllocator.NoFreeBlocksError:
        return None

_track_block_id

_track_block_id(
    block_id: Optional[BlockId], computed: bool
) -> None
Source code in vllm/core/block/prefix_caching_block.py
def _track_block_id(self, block_id: Optional[BlockId],
                    computed: bool) -> None:
    assert block_id is not None
    self._block_tracker[block_id].enable()
    self._block_tracker[block_id].computed = computed

_untrack_block_id

_untrack_block_id(block_id: Optional[BlockId]) -> None
Source code in vllm/core/block/prefix_caching_block.py
def _untrack_block_id(self, block_id: Optional[BlockId]) -> None:
    assert block_id is not None
    self._block_tracker[block_id].disable()

allocate_immutable_block

allocate_immutable_block(
    prev_block: Optional[Block],
    token_ids: List[int],
    extra_hash: Optional[int] = None,
    device: Optional[Device] = None,
) -> Block

Allocates an immutable block with the given token IDs, reusing cached blocks if possible.

Parameters:

Name Type Description Default
prev_block Optional[Block]

The previous block in the sequence.

required
token_ids List[int]

The token IDs to be stored in the block.

required

Returns:

Name Type Description
Block Block

The allocated immutable block.

Source code in vllm/core/block/prefix_caching_block.py
def allocate_immutable_block(self,
                             prev_block: Optional[Block],
                             token_ids: List[int],
                             extra_hash: Optional[int] = None,
                             device: Optional[Device] = None) -> Block:
    """Allocates an immutable block with the given token IDs, reusing cached
    blocks if possible.

    Args:
        prev_block (Optional[Block]): The previous block in the sequence.
        token_ids (List[int]): The token IDs to be stored in the block.

    Returns:
        Block: The allocated immutable block.
    """
    assert device is None
    assert_prefix_caching_block_or_none(prev_block)

    # First, try to create a block that points to cached data
    block = self._block_pool.init_block(prev_block=prev_block,
                                        token_ids=token_ids,
                                        block_size=self._block_size,
                                        physical_block_id=None,
                                        extra_hash=extra_hash)
    assert block.content_hash is not None

    cached_block_id = self._cached_blocks.get(block.content_hash, None)
    if cached_block_id is not None:
        self.metric_data.query(hit=True)
        block.block_id = cached_block_id
        self._incr_refcount_cached_block(block)
        return block
    self.metric_data.query(hit=False)
    self._block_pool.free_block(block)

    # No cached block => Allocate a new block
    block = self.allocate_mutable_block(prev_block, extra_hash=extra_hash)
    block.append_token_ids(token_ids)
    return block

allocate_immutable_blocks

allocate_immutable_blocks(
    prev_block: Optional[Block],
    block_token_ids: List[List[int]],
    extra_hash: Optional[int] = None,
    device: Optional[Device] = None,
) -> List[Block]
Source code in vllm/core/block/prefix_caching_block.py
def allocate_immutable_blocks(
        self,
        prev_block: Optional[Block],
        block_token_ids: List[List[int]],
        extra_hash: Optional[int] = None,
        device: Optional[Device] = None) -> List[Block]:
    blocks = []
    for token_ids in block_token_ids:
        prev_block = self.allocate_immutable_block(prev_block=prev_block,
                                                   token_ids=token_ids,
                                                   device=device,
                                                   extra_hash=extra_hash)
        blocks.append(prev_block)
    return blocks

allocate_mutable_block

allocate_mutable_block(
    prev_block: Optional[Block],
    extra_hash: Optional[int] = None,
    device: Optional[Device] = None,
) -> Block

Allocates a mutable block. If there are no free blocks, this will evict unused cached blocks.

Parameters:

Name Type Description Default
prev_block Block

The previous block in the sequence. None is not allowed unlike it is super class.

required

Returns:

Name Type Description
Block Block

The allocated mutable block.

Source code in vllm/core/block/prefix_caching_block.py
def allocate_mutable_block(self,
                           prev_block: Optional[Block],
                           extra_hash: Optional[int] = None,
                           device: Optional[Device] = None) -> Block:
    """Allocates a mutable block. If there are no free blocks, this will
    evict unused cached blocks.

    Args:
        prev_block (Block): The previous block in the sequence.
            None is not allowed unlike it is super class.

    Returns:
        Block: The allocated mutable block.
    """
    assert device is None
    assert_prefix_caching_block_or_none(prev_block)

    block_id = self._allocate_block_id()
    block = self._block_pool.init_block(prev_block=prev_block,
                                        token_ids=[],
                                        block_size=self._block_size,
                                        physical_block_id=block_id,
                                        extra_hash=extra_hash)
    assert not block.computed
    assert block.content_hash is None
    return block

block_is_computed

block_is_computed(block_id: int) -> bool
Source code in vllm/core/block/prefix_caching_block.py
def block_is_computed(self, block_id: int) -> bool:
    if self._block_tracker[block_id].active:
        return self._block_tracker[block_id].computed
    else:
        return block_id in self.evictor

clear_copy_on_writes

clear_copy_on_writes() -> List[Tuple[BlockId, BlockId]]

Returns the copy-on-write source->destination mapping and clears it.

Returns:

Type Description
List[Tuple[BlockId, BlockId]]

List[Tuple[BlockId, BlockId]]: A list mapping source block indices to destination block indices.

Source code in vllm/core/block/prefix_caching_block.py
def clear_copy_on_writes(self) -> List[Tuple[BlockId, BlockId]]:
    """Returns the copy-on-write source->destination mapping and clears it.

    Returns:
        List[Tuple[BlockId, BlockId]]: A list mapping source
            block indices to destination block indices.
    """
    return self._cow_tracker.clear_cows()

cow_block_if_not_appendable

cow_block_if_not_appendable(block: Block) -> BlockId

Performs a copy-on-write operation on the given block if it is not appendable.

Parameters:

Name Type Description Default
block Block

The block to check for copy-on-write.

required

Returns:

Name Type Description
BlockId BlockId

The block index of the new block if a copy-on-write operation was performed, or the original block index if no copy-on-write was necessary.

Source code in vllm/core/block/prefix_caching_block.py
def cow_block_if_not_appendable(self, block: Block) -> BlockId:
    """Performs a copy-on-write operation on the given block if it is not
    appendable.

    Args:
        block (Block): The block to check for copy-on-write.

    Returns:
        BlockId: The block index of the new block if a copy-on-write 
            operation was performed, or the original block index if
            no copy-on-write was necessary.
    """
    src_block_id = block.block_id
    assert src_block_id is not None

    if self._cow_tracker.is_appendable(block):
        return src_block_id

    self._free_block_id(block)
    trg_block_id = self._allocate_block_id()

    self._cow_tracker.record_cow(src_block_id, trg_block_id)

    return trg_block_id

find_cached_blocks_prefix

find_cached_blocks_prefix(
    block_hashes: List[int],
) -> List[int]

Given a list of block hashes, return the prefix of the block hashes that are all cached.

Since a block's block hash includes the hashes of all previous blocks, and we only allocate/deallocate blocks in the entire sequence, so if a block is cached, then all previous blocks are also cached. With this property, we can use binary search to find the prefix of cached blocks.

Parameters:

Name Type Description Default
block_hashes List[int]

The list of block hashes.

required

Returns:

Type Description
List[int]

List[int]: The prefix of the block_hashes that are cached.

Source code in vllm/core/block/prefix_caching_block.py
def find_cached_blocks_prefix(self, block_hashes: List[int]) -> List[int]:
    """
    Given a list of block hashes, return the prefix of the block hashes that
    are all cached.

    Since a block's block hash includes the hashes of all previous blocks,
    and we only allocate/deallocate blocks in the entire sequence, so if a
    block is cached, then all previous blocks are also cached. With this
    property, we can use binary search to find the prefix of cached blocks.

    Args:
        block_hashes (List[int]): The list of block hashes.

    Returns:
        List[int]: The prefix of the `block_hashes` that are cached.
    """

    def _block_is_cached(block_hash: PrefixHash) -> bool:
        if block_hash not in self._cached_blocks:
            return False

        cached_block_id = self._cached_blocks[block_hash]
        # We only consider the blocks that are marked as computed.
        return self.block_is_computed(cached_block_id)

    def _bisect_left(a, x, key: Callable[[PrefixHash], bool]) -> int:

        # python <= 3.10 don't have the key argument
        if sys.version_info < (3, 10):
            a = [key(e) for e in a]
            return bisect_left(a, x)
        else:
            return bisect_left(a, x, key=key)

    # Look for the first block that's not cached, and returns the prefix
    # i.e. blocks that are cached.
    idx = _bisect_left(block_hashes,
                       True,
                       key=lambda x: not _block_is_cached(x))
    return block_hashes[:idx]

fork

fork(last_block: Block) -> List[Block]

Creates a new sequence of blocks that shares the same underlying memory as the original sequence.

Parameters:

Name Type Description Default
last_block Block

The last block in the original sequence.

required

Returns:

Type Description
List[Block]

List[Block]: The new sequence of blocks that shares the same memory as the original sequence.

Source code in vllm/core/block/prefix_caching_block.py
def fork(self, last_block: Block) -> List[Block]:
    """Creates a new sequence of blocks that shares the same underlying
    memory as the original sequence.

    Args:
        last_block (Block): The last block in the original sequence.

    Returns:
        List[Block]: The new sequence of blocks that shares the same memory
            as the original sequence.
    """
    source_blocks = get_all_blocks_recursively(last_block)

    forked_blocks: List[Block] = []
    prev_block = None
    for block in source_blocks:
        block_id = block.block_id
        assert block_id is not None

        refcount = self._refcounter.incr(block_id)
        assert refcount != 1, "can't fork free'd block_id = {}".format(
            block_id)

        forked_block = self._block_pool.init_block(
            prev_block=prev_block,
            token_ids=block.token_ids,
            block_size=self._block_size,
            physical_block_id=block_id,
            extra_hash=block.extra_hash)

        forked_blocks.append(forked_block)
        prev_block = forked_blocks[-1]

    return forked_blocks

free

free(block: Block, keep_block_object: bool = False) -> None

Release the block (look at free_block_id(..) docs)

Source code in vllm/core/block/prefix_caching_block.py
def free(self, block: Block, keep_block_object: bool = False) -> None:
    """Release the block (look at free_block_id(..) docs)
    """
    # Release the physical block index
    self._free_block_id(block)

    # Release the block object to the pool
    if not keep_block_object:
        self._block_pool.free_block(block)

get_common_computed_block_ids

get_common_computed_block_ids(
    computed_seq_block_ids: List[List[int]],
) -> List[int]

Return the block ids that are common for a given sequence group.

Only those blocks that are immutable and already be marked compyted would be taken consideration.

Source code in vllm/core/block/prefix_caching_block.py
def get_common_computed_block_ids(
        self, computed_seq_block_ids: List[List[int]]) -> List[int]:
    """Return the block ids that are common for a given sequence group.

    Only those blocks that are immutable and already be marked
    compyted would be taken consideration.
    """

    # NOTE We exclude the last block to avoid the case where the entire
    # prompt is cached. This would cause erroneous behavior in model
    # runner.

    # It returns a list of int although type annotation says list of string.
    if len(computed_seq_block_ids) == 1:
        return computed_seq_block_ids[0]

    return commonprefix([
        ids for ids in computed_seq_block_ids  # type: ignore
        if ids
    ])

get_num_free_blocks

get_num_free_blocks(device: Optional[Device] = None) -> int
Source code in vllm/core/block/prefix_caching_block.py
def get_num_free_blocks(self, device: Optional[Device] = None) -> int:
    assert device is None
    # The number of free blocks is the number of hashless free blocks
    # plus the number of blocks evictor could free from its list.
    return self._hashless_allocator.get_num_free_blocks(
    ) + self.evictor.num_blocks

get_num_full_blocks_touched

get_num_full_blocks_touched(blocks: List[Block]) -> int

Returns the number of full blocks that will be touched by swapping in/out.

Parameters:

Name Type Description Default
blocks List[Block]

List of blocks to be swapped.

required

Returns: int: the number of full blocks that will be touched by swapping in/out the given blocks. Non full blocks are ignored when deciding the number of blocks to touch.

Source code in vllm/core/block/prefix_caching_block.py
def get_num_full_blocks_touched(self, blocks: List[Block]) -> int:
    """Returns the number of full blocks that will be touched by
    swapping in/out.

    Args:
        blocks: List of blocks to be swapped.
    Returns:
        int: the number of full blocks that will be touched by
            swapping in/out the given blocks. Non full blocks are ignored
            when deciding the number of blocks to touch.
    """
    num_touched_blocks: int = 0
    for block in blocks:
        # If the block has a match in the cache and the cached
        # block is not referenced, then we still count it as a
        # touched block
        if block.is_full and (not self.is_block_cached(block) or \
            (block.content_hash is not None and \
            self._cached_blocks[block.content_hash] in \
                    self.evictor)):
            num_touched_blocks += 1
    return num_touched_blocks

get_num_total_blocks

get_num_total_blocks() -> int
Source code in vllm/core/block/prefix_caching_block.py
def get_num_total_blocks(self) -> int:
    return self._hashless_allocator.get_num_total_blocks()

get_physical_block_id

get_physical_block_id(absolute_id: int) -> int

Returns the zero-offset block id on certain block allocator given the absolute block id.

Parameters:

Name Type Description Default
absolute_id int

The absolute block id for the block in whole allocator.

required

Returns:

Name Type Description
int int

The rzero-offset block id on certain device.

Source code in vllm/core/block/prefix_caching_block.py
def get_physical_block_id(self, absolute_id: int) -> int:
    """Returns the zero-offset block id on certain block allocator
    given the absolute block id.

    Args:
        absolute_id (int): The absolute block id for the block 
            in whole allocator.

    Returns:
        int: The rzero-offset block id on certain device.
    """
    return sorted(self.all_block_ids).index(absolute_id)

get_prefix_cache_hit_rate

get_prefix_cache_hit_rate() -> float
Source code in vllm/core/block/prefix_caching_block.py
def get_prefix_cache_hit_rate(self) -> float:
    return self.metric_data.get_hit_rate()

is_block_cached

is_block_cached(block: Block) -> bool
Source code in vllm/core/block/prefix_caching_block.py
def is_block_cached(self, block: Block) -> bool:
    assert block.content_hash is not None
    return block.content_hash in self._cached_blocks

mark_blocks_as_accessed

mark_blocks_as_accessed(
    block_ids: List[int], now: float
) -> None

Mark blocks as accessed, used in prefix caching.

If the block is added into evictor, we need to update corresponding info in evictor's metadata.

Source code in vllm/core/block/prefix_caching_block.py
def mark_blocks_as_accessed(self, block_ids: List[int],
                            now: float) -> None:
    """Mark blocks as accessed, used in prefix caching.

    If the block is added into evictor, we need to update corresponding
    info in evictor's metadata.
    """

    for block_id in block_ids:
        if self._block_tracker[block_id].active:
            self._block_tracker[block_id].last_accessed = now
        elif block_id in self.evictor:
            self.evictor.update(block_id, now)
        else:
            raise ValueError(
                "Mark block as accessed which is not belonged to GPU")

mark_blocks_as_computed

mark_blocks_as_computed(block_ids: List[int]) -> None
Source code in vllm/core/block/prefix_caching_block.py
def mark_blocks_as_computed(self, block_ids: List[int]) -> None:
    # Mark all touched blocks as computed.
    for block_id in self._touched_blocks:
        self._block_tracker[block_id].computed = True
    self._touched_blocks.clear()

promote_to_immutable_block

promote_to_immutable_block(block: Block) -> BlockId

Once a mutable block is full, it can be promoted to an immutable block. This means that its content can be referenced by future blocks having the same prefix.

Note that if we already have a cached block with the same content, we will replace the newly-promoted block's mapping with the existing cached block id.

Parameters:

Name Type Description Default
block Block

The mutable block to be promoted.

required

Returns:

Name Type Description
BlockId BlockId

Either the original block index, or the block index of the previously cached block matching the same content.

Source code in vllm/core/block/prefix_caching_block.py
def promote_to_immutable_block(self, block: Block) -> BlockId:
    """Once a mutable block is full, it can be promoted to an immutable
    block. This means that its content can be referenced by future blocks
    having the same prefix.

    Note that if we already have a cached block with the same content, we
    will replace the newly-promoted block's mapping with the existing cached
    block id.

    Args:
        block: The mutable block to be promoted.

    Returns:
        BlockId: Either the original block index, or the block index of
            the previously cached block matching the same content.
    """
    # Ensure block can be promoted
    assert block.content_hash is not None
    assert block.block_id is not None
    assert self._refcounter.get(block.block_id) > 0

    if block.content_hash not in self._cached_blocks:
        # No cached content hash => Set this block as cached.
        # Note that this block cannot be marked as computed yet
        # because other sequences in the same batch cannot reuse
        # this block.
        self._cached_blocks[block.content_hash] = block.block_id
        # Mark this block as touched so that it can be marked as
        # computed after the entire batch of sequences are scheduled.
        self._touched_blocks.add(block.block_id)
        return block.block_id

    # Reuse the cached content hash
    self._decr_refcount_hashless_block(block)
    block.block_id = self._cached_blocks[block.content_hash]

    # Increment refcount of the cached block and (possibly) restore
    # it from the evictor.
    # Note that in this case, the block is marked as computed
    self._incr_refcount_cached_block(block)

    return block.block_id

reset_prefix_cache

reset_prefix_cache() -> bool

Reset prefix cache. This function may be used in RLHF flows to invalid prefix caching after the weights are updated, or used for resetting prefix caching status for benchmarking.

Returns:

Name Type Description
bool bool

True if the prefix cache is successfully reset,

bool

False otherwise.

Source code in vllm/core/block/prefix_caching_block.py
def reset_prefix_cache(self) -> bool:
    """Reset prefix cache. This function may be used in RLHF
    flows to invalid prefix caching after the weights are updated,
    or used for resetting prefix caching status for benchmarking.

    Returns:
        bool: True if the prefix cache is successfully reset,
        False otherwise.
    """
    num_used_blocks = (self.get_num_total_blocks() -
                       self.get_num_free_blocks())
    if num_used_blocks > 0:
        logger.warning(
            "Failed to reset prefix cache because some "
            "blocks (%d) are not freed yet", num_used_blocks)
        return False

    # Free all blocks in the evictor.
    while (block_id :=
           self._maybe_allocate_evicted_block_id()) is not None:
        self._hashless_allocator.free_block_id(block_id)

    # Should not have any cached blocks because all blocks are evicted.
    assert not self._cached_blocks

    # Reset the evictor.
    self.evictor = make_evictor(self.eviction_policy)

    # Reset the block tracker.
    for block_id in self._block_tracker:
        self._block_tracker[block_id] = BlockTracker()

    # Reset the metrics.
    self.metric_data = CacheMetricData()

    logger.info("Successfully reset prefix cache")
    return True

swap_in

swap_in(blocks: List[Block]) -> None

Execute the swap in actions. Change the block id from old allocator to current allocator for each block to finish the block table update.

Parameters:

Name Type Description Default
blocks List[Block]

List of blocks to be swapped in.

required
Source code in vllm/core/block/prefix_caching_block.py
def swap_in(self, blocks: List[Block]) -> None:
    """Execute the swap in actions. Change the block id from 
    old allocator to current allocator for each block to finish 
    the block table update. 

    Args:
        blocks: List of blocks to be swapped in.
    """
    for block in blocks:
        # Here we allocate either immutable or mutable block and then
        # extract its block_id. Note that the block object is released
        # and the block_id is assigned to "block" to allow reusing the
        # existing "block" object
        if block.is_full:
            tmp_block = self.allocate_immutable_block(
                prev_block=block.prev_block,
                token_ids=block.token_ids,
                extra_hash=block.extra_hash)
        else:
            tmp_block = self.allocate_mutable_block(
                prev_block=block.prev_block, extra_hash=block.extra_hash)
            tmp_block.append_token_ids(block.token_ids)

        block_id = tmp_block.block_id
        self._block_pool.free_block(tmp_block)

        block.block_id = block_id  # Assign block_id

swap_out

swap_out(blocks: List[Block]) -> None

Execute the swap out actions. Basically just free the given blocks.

Parameters:

Name Type Description Default
blocks List[Block]

List of blocks to be swapped out.

required
Source code in vllm/core/block/prefix_caching_block.py
def swap_out(self, blocks: List[Block]) -> None:
    """Execute the swap out actions. Basically just free the 
    given blocks.

    Args:
        blocks: List of blocks to be swapped out.
    """
    for block in blocks:
        self._free_block_id(block)

assert_prefix_caching_block_or_none

assert_prefix_caching_block_or_none(block: Optional[Block])
Source code in vllm/core/block/prefix_caching_block.py
def assert_prefix_caching_block_or_none(block: Optional[Block]):
    if block is None:
        return
    assert isinstance(block,
                      PrefixCachingBlock), "Got block = {}".format(block)