vllm.attention.backends.flashmla

FlashMLABackend

Bases: MLACommonBackend

Source code in vllm/attention/backends/flashmla.py
class FlashMLABackend(MLACommonBackend):

    @staticmethod
    def get_name() -> str:
        return "FLASHMLA"

    @staticmethod
    def get_impl_cls() -> Type["FlashMLAImpl"]:
        return FlashMLAImpl

    @staticmethod
    def get_metadata_cls() -> Type["FlashMLAMetadata"]:
        return FlashMLAMetadata

    @staticmethod
    def get_builder_cls() -> Type["FlashMLAMetadataBuilder"]:
        return FlashMLAMetadataBuilder

    @staticmethod
    def get_state_cls() -> Type["FlashMLAState"]:
        return FlashMLAState
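
The backend class is a thin registry entry: each static accessor returns the concrete class used for that role. A minimal usage sketch (how the attention selector is assumed to consume these accessors; the selector itself is not part of this file):

assert FlashMLABackend.get_name() == "FLASHMLA"
impl_cls = FlashMLABackend.get_impl_cls()          # FlashMLAImpl
metadata_cls = FlashMLABackend.get_metadata_cls()  # FlashMLAMetadata
builder_cls = FlashMLABackend.get_builder_cls()    # FlashMLAMetadataBuilder
state_cls = FlashMLABackend.get_state_cls()        # FlashMLAState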

get_builder_cls staticmethod

get_builder_cls() -> Type[FlashMLAMetadataBuilder]
Source code in vllm/attention/backends/flashmla.py
@staticmethod
def get_builder_cls() -> Type["FlashMLAMetadataBuilder"]:
    return FlashMLAMetadataBuilder

get_impl_cls staticmethod

get_impl_cls() -> Type[FlashMLAImpl]
Source code in vllm/attention/backends/flashmla.py
@staticmethod
def get_impl_cls() -> Type["FlashMLAImpl"]:
    return FlashMLAImpl

get_metadata_cls staticmethod

get_metadata_cls() -> Type[FlashMLAMetadata]
Source code in vllm/attention/backends/flashmla.py
@staticmethod
def get_metadata_cls() -> Type["FlashMLAMetadata"]:
    return FlashMLAMetadata

get_name staticmethod

get_name() -> str
Source code in vllm/attention/backends/flashmla.py
@staticmethod
def get_name() -> str:
    return "FLASHMLA"

get_state_cls staticmethod

get_state_cls() -> Type[FlashMLAState]
Source code in vllm/attention/backends/flashmla.py
@staticmethod
def get_state_cls() -> Type["FlashMLAState"]:
    return FlashMLAState

FlashMLAImpl

Bases: MLACommonImpl[FlashMLAMetadata]

Source code in vllm/attention/backends/flashmla.py
class FlashMLAImpl(MLACommonImpl[FlashMLAMetadata]):

    def __init__(
            self,
            num_heads: int,
            head_size: int,
            scale: float,
            num_kv_heads: int,
            alibi_slopes: Optional[List[float]],
            sliding_window: Optional[int],
            kv_cache_dtype: str,
            blocksparse_params: Optional[Dict[str, Any]],
            logits_soft_cap: Optional[float],
            attn_type: str,
            kv_sharing_target_layer_name: Optional[str] = None,
            # MLA Specific Arguments
            **mla_args) -> None:
        super().__init__(num_heads, head_size, scale, num_kv_heads,
                         alibi_slopes, sliding_window, kv_cache_dtype,
                         blocksparse_params, logits_soft_cap, attn_type,
                         kv_sharing_target_layer_name, **mla_args)

        assert is_flashmla_supported(), \
            "FlashMLA is not supported on this device"

        unsupported_features = [
            alibi_slopes, sliding_window, blocksparse_params, logits_soft_cap
        ]
        if any(unsupported_features):
            raise NotImplementedError(
                "FlashMLAImpl does not support one of the following: "
                "alibi_slopes, sliding_window, blocksparse_params, "
                "logits_soft_cap")

        if attn_type != AttentionType.DECODER:
            raise NotImplementedError("Encoder self-attention and "
                                      "encoder/decoder cross-attention "
                                      "are not implemented for "
                                      "FlashMLAImpl")

        if is_quantized_kv_cache(self.kv_cache_dtype):
            raise NotImplementedError(
                "FlashMLA with FP8 KV cache not yet supported")

    def _forward_decode(
        self,
        q_nope: torch.Tensor,
        q_pe: torch.Tensor,
        kv_c_and_k_pe_cache: torch.Tensor,
        attn_metadata: FlashMLAMetadata,
    ) -> torch.Tensor:
        assert kv_c_and_k_pe_cache.numel() > 0

        decode_meta = attn_metadata.decode_metadata
        assert decode_meta is not None

        q = torch.cat([q_nope, q_pe], dim=-1)\
            .unsqueeze(1) # Add seqlen dim of 1 (decode)

        o, _ = flash_mla_with_kvcache(
            q=q,
            k_cache=kv_c_and_k_pe_cache.unsqueeze(-2),  # Add head dim of 1
            block_table=decode_meta.block_tables,
            cache_seqlens=decode_meta.seq_lens_tensor,
            head_dim_v=self.kv_lora_rank,
            tile_scheduler_metadata=decode_meta.decode_tile_scheduler_metadata,
            num_splits=decode_meta.decode_num_splits,
            softmax_scale=self.scale,
            causal=True,
        )

        return self._v_up_proj(o)
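
For orientation, a shape sketch of the decode path. The concrete sizes are illustrative assumptions (DeepSeek-style kv_lora_rank=512 and qk_rope_head_dim=64, giving FlashMLA's 576-wide head), not values asserted by the source above:

# Illustrative shapes only (assumed, not guaranteed by this file):
# q_nope:              [num_decode_tokens, num_heads, kv_lora_rank]      e.g. [..., H, 512]
# q_pe:                [num_decode_tokens, num_heads, qk_rope_head_dim]  e.g. [..., H, 64]
# q = cat + unsqueeze: [num_decode_tokens, 1, num_heads, 576]            seqlen dim of 1 for decode
# kv_c_and_k_pe_cache: [num_blocks, block_size, 576]; unsqueeze(-2) adds a KV head dim of 1
# o:                   [num_decode_tokens, 1, num_heads, kv_lora_rank]
# _v_up_proj(o) then maps the latent values back to the per-head output dimension.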

__init__

__init__(
    num_heads: int,
    head_size: int,
    scale: float,
    num_kv_heads: int,
    alibi_slopes: Optional[List[float]],
    sliding_window: Optional[int],
    kv_cache_dtype: str,
    blocksparse_params: Optional[Dict[str, Any]],
    logits_soft_cap: Optional[float],
    attn_type: str,
    kv_sharing_target_layer_name: Optional[str] = None,
    **mla_args,
) -> None
Source code in vllm/attention/backends/flashmla.py
def __init__(
        self,
        num_heads: int,
        head_size: int,
        scale: float,
        num_kv_heads: int,
        alibi_slopes: Optional[List[float]],
        sliding_window: Optional[int],
        kv_cache_dtype: str,
        blocksparse_params: Optional[Dict[str, Any]],
        logits_soft_cap: Optional[float],
        attn_type: str,
        kv_sharing_target_layer_name: Optional[str] = None,
        # MLA Specific Arguments
        **mla_args) -> None:
    super().__init__(num_heads, head_size, scale, num_kv_heads,
                     alibi_slopes, sliding_window, kv_cache_dtype,
                     blocksparse_params, logits_soft_cap, attn_type,
                     kv_sharing_target_layer_name, **mla_args)

    assert is_flashmla_supported(), \
        "FlashMLA is not supported on this device"

    unsupported_features = [
        alibi_slopes, sliding_window, blocksparse_params, logits_soft_cap
    ]
    if any(unsupported_features):
        raise NotImplementedError(
            "FlashMLAImpl does not support one of the following: "
            "alibi_slopes, sliding_window, blocksparse_params, "
            "logits_soft_cap")

    if attn_type != AttentionType.DECODER:
        raise NotImplementedError("Encoder self-attention and "
                                  "encoder/decoder cross-attention "
                                  "are not implemented for "
                                  "FlashMLAImpl")

    if is_quantized_kv_cache(self.kv_cache_dtype):
        raise NotImplementedError(
            "FlashMLA with FP8 KV cache not yet supported")

_forward_decode

_forward_decode(
    q_nope: Tensor,
    q_pe: Tensor,
    kv_c_and_k_pe_cache: Tensor,
    attn_metadata: FlashMLAMetadata,
) -> Tensor
Source code in vllm/attention/backends/flashmla.py
def _forward_decode(
    self,
    q_nope: torch.Tensor,
    q_pe: torch.Tensor,
    kv_c_and_k_pe_cache: torch.Tensor,
    attn_metadata: FlashMLAMetadata,
) -> torch.Tensor:
    assert kv_c_and_k_pe_cache.numel() > 0

    decode_meta = attn_metadata.decode_metadata
    assert decode_meta is not None

    q = torch.cat([q_nope, q_pe], dim=-1)\
        .unsqueeze(1) # Add seqlen dim of 1 (decode)

    o, _ = flash_mla_with_kvcache(
        q=q,
        k_cache=kv_c_and_k_pe_cache.unsqueeze(-2),  # Add head dim of 1
        block_table=decode_meta.block_tables,
        cache_seqlens=decode_meta.seq_lens_tensor,
        head_dim_v=self.kv_lora_rank,
        tile_scheduler_metadata=decode_meta.decode_tile_scheduler_metadata,
        num_splits=decode_meta.decode_num_splits,
        softmax_scale=self.scale,
        causal=True,
    )

    return self._v_up_proj(o)

FlashMLAMetadata dataclass

Bases: MLACommonMetadata

Source code in vllm/attention/backends/flashmla.py
@dataclass
class FlashMLAMetadata(MLACommonMetadata):
    decode_tile_scheduler_metadata: Optional[Tuple[torch.Tensor,
                                                   torch.Tensor]] = None
    decode_num_splits: Optional[torch.Tensor] = None

    @property
    def decode_metadata(self):
        decode_metadata = super().decode_metadata
        # TODO: cache assignment?
        if decode_metadata is not None:
            decode_metadata.decode_tile_scheduler_metadata=\
                self.decode_tile_scheduler_metadata
            decode_metadata.decode_num_splits=\
                self.decode_num_splits
        return decode_metadata

    def advance_step(self,
                     model_input: "ModelInputForGPUWithSamplingMetadata",
                     sampled_token_ids: Optional[torch.Tensor],
                     block_size: int,
                     num_seqs: int,
                     num_queries: int,
                     turn_prefills_into_decodes: bool = False):
        raise NotImplementedError(
            "advance_step is not implemented for FlashMLA")

decode_metadata property

decode_metadata

decode_num_splits class-attribute instance-attribute

decode_num_splits: Optional[Tensor] = None

decode_tile_scheduler_metadata class-attribute instance-attribute

decode_tile_scheduler_metadata: Optional[
    Tuple[Tensor, Tensor]
] = None

__init__

__init__(
    num_prefills: int,
    num_prefill_tokens: int,
    num_decode_tokens: int,
    slot_mapping: Tensor,
    multi_modal_placeholder_index_maps: Optional[
        Dict[str, IndexMap]
    ],
    enable_kv_scales_calculation: bool,
    use_cuda_graph: bool,
    seq_lens: Optional[List[int]],
    seq_lens_tensor: Optional[Tensor],
    max_prefill_seq_len: int,
    max_decode_seq_len: int,
    context_lens_tensor: Optional[Tensor],
    block_tables: Optional[Tensor],
    max_query_len: Optional[int] = None,
    max_decode_query_len: Optional[int] = None,
    query_start_loc: Optional[Tensor] = None,
    seq_start_loc: Optional[Tensor] = None,
    _cached_prefill_metadata: Optional[Any] = None,
    _cached_decode_metadata: Optional[Any] = None,
    head_dim: Optional[int] = None,
    is_profile_run: bool = False,
    context_chunk_cu_seq_lens: Optional[Tensor] = None,
    context_chunk_starts: Optional[Tensor] = None,
    context_chunk_seq_tot: Optional[List[int]] = None,
    context_chunk_max_seq_lens: Optional[List[int]] = None,
    context_chunk_workspace: Optional[Tensor] = None,
    decode_tile_scheduler_metadata: Optional[
        Tuple[Tensor, Tensor]
    ] = None,
    decode_num_splits: Optional[Tensor] = None,
) -> None

advance_step

advance_step(
    model_input: ModelInputForGPUWithSamplingMetadata,
    sampled_token_ids: Optional[Tensor],
    block_size: int,
    num_seqs: int,
    num_queries: int,
    turn_prefills_into_decodes: bool = False,
)
Source code in vllm/attention/backends/flashmla.py
def advance_step(self,
                 model_input: "ModelInputForGPUWithSamplingMetadata",
                 sampled_token_ids: Optional[torch.Tensor],
                 block_size: int,
                 num_seqs: int,
                 num_queries: int,
                 turn_prefills_into_decodes: bool = False):
    raise NotImplementedError(
        "advance_step is not implemented for FlashMLA")

FlashMLAMetadataBuilder

Bases: MLACommonMetadataBuilder[FlashMLAMetadata]

Source code in vllm/attention/backends/flashmla.py
class FlashMLAMetadataBuilder(MLACommonMetadataBuilder[FlashMLAMetadata]):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.num_q_heads = self.runner.model_config.get_num_attention_heads(
            self.runner.parallel_config)

    def build(self, seq_lens: List[int], query_lens: List[int],
              cuda_graph_pad_size: int, batch_size: int):
        m = super().build(seq_lens, query_lens, cuda_graph_pad_size,
                          batch_size)

        if m.num_decode_tokens > 0:
            m.decode_tile_scheduler_metadata, m.decode_num_splits = \
                get_mla_metadata(
                m.seq_lens_tensor[m.num_prefills:],
                self.num_q_heads,
                1, # MQA for the decode path
            )

        return m
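
The builder defers to MLACommonMetadataBuilder and then, for batches containing decode tokens, precomputes the FlashMLA tile schedule from the decode sequence lengths. A standalone sketch of the same get_mla_metadata call (sequence lengths and head count are made-up example values; the import path is assumed to match this module's own imports):

import torch
from vllm.attention.ops.flashmla import get_mla_metadata

seq_lens = torch.tensor([17, 33, 65], dtype=torch.int32, device="cuda")  # decode sequences only
num_q_heads = 16                                                         # example query-head count
tile_scheduler_metadata, num_splits = get_mla_metadata(
    seq_lens,
    num_q_heads,  # query heads per KV head (MQA: all query heads share one KV head)
    1,            # a single KV head for the decode path
)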

num_q_heads instance-attribute

num_q_heads = get_num_attention_heads(parallel_config)

__init__

__init__(*args, **kwargs)
Source code in vllm/attention/backends/flashmla.py
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)

    self.num_q_heads = self.runner.model_config.get_num_attention_heads(
        self.runner.parallel_config)

build

build(
    seq_lens: List[int],
    query_lens: List[int],
    cuda_graph_pad_size: int,
    batch_size: int,
)
Source code in vllm/attention/backends/flashmla.py
def build(self, seq_lens: List[int], query_lens: List[int],
          cuda_graph_pad_size: int, batch_size: int):
    m = super().build(seq_lens, query_lens, cuda_graph_pad_size,
                      batch_size)

    if m.num_decode_tokens > 0:
        m.decode_tile_scheduler_metadata, m.decode_num_splits = \
            get_mla_metadata(
            m.seq_lens_tensor[m.num_prefills:],
            self.num_q_heads,
            1, # MQA for the decode path
        )

    return m

FlashMLAState

Bases: MLACommonState[FlashMLAMetadata]

Source code in vllm/attention/backends/flashmla.py
class FlashMLAState(MLACommonState[FlashMLAMetadata]):

    def __init__(self, *args, **kwds):
        super().__init__(*args, **kwds)

        self.num_q_heads = self.runner.model_config.get_num_attention_heads(
            self.runner.parallel_config)

    @contextmanager
    def graph_capture(self, max_batch_size: int):
        # Run a dummy `get_mla_metadata` so we can get the right shapes
        self._graph_decoder_tile_scheduler_metadata, \
            self._graph_decode_num_splits = get_mla_metadata(
            torch.ones(
                max_batch_size, dtype=torch.int32, device=self.runner.device),
            self.num_q_heads,
            1, # MQA for the decode path
        )

        with super().graph_capture(max_batch_size):
            yield

        del self._graph_decoder_tile_scheduler_metadata
        del self._graph_decode_num_splits

    def graph_capture_get_metadata_for_batch(
            self, batch_size: int, is_encoder_decoder_model: bool = False):
        metadata = super().graph_capture_get_metadata_for_batch(
            batch_size, is_encoder_decoder_model)
        assert metadata.num_decode_tokens > 0

        decoder_tile_scheduler_metadata, decode_num_splits = get_mla_metadata(
            self._graph_seq_lens[:batch_size],
            self.num_q_heads,
            1,  # MQA for the decode path
        )

        self._graph_decoder_tile_scheduler_metadata.copy_(
            decoder_tile_scheduler_metadata)
        self._graph_decode_num_splits[:batch_size + 1].copy_(decode_num_splits)

        metadata.decode_tile_scheduler_metadata=\
            self._graph_decoder_tile_scheduler_metadata
        metadata.decode_num_splits=\
            self._graph_decode_num_splits[:batch_size + 1]

        return metadata

    def get_graph_input_buffers(self,
                                attn_metadata,
                                is_encoder_decoder_model: bool = False):
        input_buffers = super().get_graph_input_buffers(
            attn_metadata, is_encoder_decoder_model)
        input_buffers["decode_tile_scheduler_metadata"] = \
                attn_metadata.decode_metadata.decode_tile_scheduler_metadata
        input_buffers["decode_num_splits"] = \
                attn_metadata.decode_metadata.decode_num_splits

        return input_buffers

    def prepare_graph_input_buffers(self,
                                    input_buffers,
                                    attn_metadata,
                                    is_encoder_decoder_model: bool = False):
        super().prepare_graph_input_buffers(input_buffers, attn_metadata,
                                            is_encoder_decoder_model)

        input_buffers["decode_tile_scheduler_metadata"].copy_(
            attn_metadata.decode_metadata.decode_tile_scheduler_metadata)
        input_buffers["decode_num_splits"].copy_(
            attn_metadata.decode_metadata.decode_num_splits)
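
During CUDA graph capture, the state pre-allocates the tile scheduler tensors at the maximum batch size so that replays can refresh them in place. A hedged sketch of how the model runner is assumed to drive these hooks (the state is normally constructed and owned by the runner, not by user code):

from vllm.attention.backends.flashmla import FlashMLAState

def capture_flashmla_metadata(state: FlashMLAState, max_batch_size: int = 8):
    # Capture-time: the metadata's FlashMLA fields alias the persistent capture buffers.
    with state.graph_capture(max_batch_size):
        metadata = state.graph_capture_get_metadata_for_batch(batch_size=max_batch_size)
        buffers = state.get_graph_input_buffers(metadata)
        # ... the runner would capture the CUDA graph that consumes `buffers` here ...
    return buffers

# Replay-time: copy fresh values into the captured buffers before launching the graph:
# state.prepare_graph_input_buffers(buffers, new_attn_metadata)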

num_q_heads instance-attribute

num_q_heads = get_num_attention_heads(parallel_config)

__init__

__init__(*args, **kwds)
Source code in vllm/attention/backends/flashmla.py
def __init__(self, *args, **kwds):
    super().__init__(*args, **kwds)

    self.num_q_heads = self.runner.model_config.get_num_attention_heads(
        self.runner.parallel_config)

get_graph_input_buffers

get_graph_input_buffers(
    attn_metadata, is_encoder_decoder_model: bool = False
)
Source code in vllm/attention/backends/flashmla.py
def get_graph_input_buffers(self,
                            attn_metadata,
                            is_encoder_decoder_model: bool = False):
    input_buffers = super().get_graph_input_buffers(
        attn_metadata, is_encoder_decoder_model)
    input_buffers["decode_tile_scheduler_metadata"] = \
            attn_metadata.decode_metadata.decode_tile_scheduler_metadata
    input_buffers["decode_num_splits"] = \
            attn_metadata.decode_metadata.decode_num_splits

    return input_buffers

graph_capture

graph_capture(max_batch_size: int)
Source code in vllm/attention/backends/flashmla.py
@contextmanager
def graph_capture(self, max_batch_size: int):
    # Run a dummy `get_mla_metadata` so we can get the right shapes
    self._graph_decoder_tile_scheduler_metadata, \
        self._graph_decode_num_splits = get_mla_metadata(
        torch.ones(
            max_batch_size, dtype=torch.int32, device=self.runner.device),
        self.num_q_heads,
        1, # MQA for the decode path
    )

    with super().graph_capture(max_batch_size):
        yield

    del self._graph_decoder_tile_scheduler_metadata
    del self._graph_decode_num_splits

graph_capture_get_metadata_for_batch

graph_capture_get_metadata_for_batch(
    batch_size: int, is_encoder_decoder_model: bool = False
)
Source code in vllm/attention/backends/flashmla.py
def graph_capture_get_metadata_for_batch(
        self, batch_size: int, is_encoder_decoder_model: bool = False):
    metadata = super().graph_capture_get_metadata_for_batch(
        batch_size, is_encoder_decoder_model)
    assert metadata.num_decode_tokens > 0

    decoder_tile_scheduler_metadata, decode_num_splits = get_mla_metadata(
        self._graph_seq_lens[:batch_size],
        self.num_q_heads,
        1,  # MQA for the decode path
    )

    self._graph_decoder_tile_scheduler_metadata.copy_(
        decoder_tile_scheduler_metadata)
    self._graph_decode_num_splits[:batch_size + 1].copy_(decode_num_splits)

    metadata.decode_tile_scheduler_metadata=\
        self._graph_decoder_tile_scheduler_metadata
    metadata.decode_num_splits=\
        self._graph_decode_num_splits[:batch_size + 1]

    return metadata

prepare_graph_input_buffers

prepare_graph_input_buffers(
    input_buffers,
    attn_metadata,
    is_encoder_decoder_model: bool = False,
)
Source code in vllm/attention/backends/flashmla.py
def prepare_graph_input_buffers(self,
                                input_buffers,
                                attn_metadata,
                                is_encoder_decoder_model: bool = False):
    super().prepare_graph_input_buffers(input_buffers, attn_metadata,
                                        is_encoder_decoder_model)

    input_buffers["decode_tile_scheduler_metadata"].copy_(
        attn_metadata.decode_metadata.decode_tile_scheduler_metadata)
    input_buffers["decode_num_splits"].copy_(
        attn_metadata.decode_metadata.decode_num_splits)