class DeepseekV4ROCMAiterMLASparseImpl(
    SparseMLAAttentionImpl[DeepseekV4ROCMAiterMLASparseMetadata]
):
    """ROCm sparse MLA implementation used by DeepSeek V4's custom MLA layer.

    This backend is not driven through the usual per-layer ``forward_mqa``
    path (that method raises); instead, ``DeepseekV4MLAAttention.forward``
    calls the classmethod ``forward`` below, which dispatches the decode and
    prefill halves of the batch to the corresponding kernel wrappers.
    """

    # Number of prefill requests processed per iteration of the chunked
    # prefill loop in _forward_prefill. Also sizes the dense gathered-KV
    # workspace, so raising it trades memory for fewer kernel launches.
    _PREFILL_CHUNK_SIZE = 4

    def __init__(
        self,
        num_heads: int,
        head_size: int,
        scale: float,
        num_kv_heads: int,
        alibi_slopes: list[float] | None,
        sliding_window: int | None,
        kv_cache_dtype: str,
        logits_soft_cap: float | None,
        attn_type: str,
        kv_sharing_target_layer_name: str | None,
        **_: object,
    ) -> None:
        """Store the few attention parameters this backend keeps.

        The remaining arguments (``alibi_slopes``, ``sliding_window``,
        ``logits_soft_cap``, ``attn_type``,
        ``kv_sharing_target_layer_name``) are accepted to satisfy the
        shared AttentionImpl constructor signature but are not retained:
        the classmethod entry points below read all runtime configuration
        from the ``layer`` object instead of ``self``.
        """
        self.num_heads = num_heads
        self.head_size = head_size
        self.scale = float(scale)
        self.num_kv_heads = num_kv_heads
        self.kv_cache_dtype = kv_cache_dtype

    def forward_mqa(
        self,
        q: torch.Tensor | tuple[torch.Tensor, torch.Tensor],
        kv_c_and_k_pe_cache: torch.Tensor,
        attn_metadata: DeepseekV4ROCMAiterMLASparseMetadata,
        layer: AttentionLayer,
    ) -> tuple[torch.Tensor, torch.Tensor | None]:
        """Unsupported entry point: this impl is driven via ``forward``."""
        raise NotImplementedError(
            "DeepseekV4ROCMAiterMLASparseImpl is driven by "
            "DeepseekV4MLAAttention.forward."
        )

    @classmethod
    def forward(
        cls,
        layer: "DeepseekV4MLAAttention",
        q: torch.Tensor,
        kv: torch.Tensor,
        positions: torch.Tensor,
        output: torch.Tensor,
    ) -> None:
        """Run sparse MLA attention for a mixed decode+prefill batch.

        Results are written into ``output`` in place; nothing is returned.
        The batch is assumed to be ordered decode-first: rows
        ``[0:num_decode_tokens)`` are decode tokens and the remainder are
        prefill tokens — NOTE(review): this ordering is implied by the
        slicing below; confirm against the metadata builder.
        """
        assert output.shape == q.shape, (
            f"output buffer shape {output.shape} must match q shape {q.shape}"
        )
        assert output.dtype == q.dtype, (
            f"output buffer dtype {output.dtype} must match q dtype {q.dtype}"
        )
        forward_context = get_forward_context()
        attn_metadata = forward_context.attn_metadata
        # Per-layer metadata dict keyed by layer prefix. The sparse (MLA)
        # entry may be absent (-> None); the sliding-window entry must exist.
        assert isinstance(attn_metadata, dict)
        rocm_metadata = cast(
            DeepseekV4ROCMAiterMLASparseMetadata | None,
            attn_metadata.get(layer.prefix),
        )
        swa_metadata = cast(
            DeepseekV4ROCMAiterSparseSWAMetadata | None,
            attn_metadata.get(layer.swa_cache_layer.prefix),
        )
        assert swa_metadata is not None
        # With no compression (ratio <= 1) only the sliding-window cache is
        # consulted; the layer's compressed KV cache is never read.
        swa_only = layer.compress_ratio <= 1
        self_kv_cache = layer.kv_cache if not swa_only else None
        swa_kv_cache = layer.swa_cache_layer.kv_cache
        num_decodes = swa_metadata.num_decodes
        num_prefills = swa_metadata.num_prefills
        num_decode_tokens = swa_metadata.num_decode_tokens
        if num_prefills > 0:
            # Prefill tokens are the tail of the batch.
            cls._forward_prefill(
                layer=layer,
                q=q[num_decode_tokens:],
                positions=positions[num_decode_tokens:],
                compressed_k_cache=self_kv_cache,
                swa_k_cache=swa_kv_cache,
                output=output[num_decode_tokens:],
                attn_metadata=rocm_metadata,
                swa_metadata=swa_metadata,
            )
        if num_decodes > 0:
            # Decode tokens are the head of the batch.
            cls._forward_decode(
                layer=layer,
                q=q[:num_decode_tokens],
                kv_cache=self_kv_cache,
                swa_metadata=swa_metadata,
                attn_metadata=rocm_metadata,
                swa_only=swa_only,
                output=output[:num_decode_tokens],
            )

    @classmethod
    def _forward_decode(
        cls,
        layer: "DeepseekV4MLAAttention",
        q: torch.Tensor,
        kv_cache: torch.Tensor | None,
        swa_metadata: DeepseekV4ROCMAiterSparseSWAMetadata,
        attn_metadata: DeepseekV4ROCMAiterMLASparseMetadata | None,
        swa_only: bool,
        output: torch.Tensor,
    ) -> None:
        """Decode-path sparse attention over top-k + sliding-window KV.

        When ``swa_only`` is set, every top-k index argument passed to the
        kernel stays ``None`` and only the sliding-window indices from
        ``swa_metadata`` are used.
        """
        num_decodes = swa_metadata.num_decodes
        num_decode_tokens = swa_metadata.num_decode_tokens
        topk_indices = None
        topk_lens = None
        topk_ragged_indices = None
        topk_ragged_indptr = None
        if not swa_only:
            assert attn_metadata is not None
            assert swa_metadata.is_valid_token is not None
            # Cache block size in units of compressed KV entries.
            block_size = attn_metadata.block_size // layer.compress_ratio
            is_valid = swa_metadata.is_valid_token[:num_decode_tokens]
            if layer.compress_ratio == 4:
                # Ratio-4 path: build the global ragged top-k layout on the
                # fly from the per-token indices buffer on the layer.
                assert layer.topk_indices_buffer is not None
                (
                    topk_ragged_indices,
                    topk_ragged_indptr,
                    topk_lens,
                ) = compute_global_topk_ragged_indices_and_indptr(
                    layer.topk_indices_buffer[:num_decode_tokens],
                    swa_metadata.token_to_req_indices,
                    attn_metadata.block_table[:num_decodes],
                    block_size,
                    is_valid,
                )
            else:
                # Otherwise the metadata builder precomputed the ragged
                # top-k layout (the "c128a" fields — presumably the other
                # supported compression configuration; verify upstream).
                topk_indices = attn_metadata.c128a_global_decode_topk_indices
                topk_lens = attn_metadata.c128a_decode_topk_lens
                topk_ragged_indices = attn_metadata.c128a_decode_topk_ragged_indices
                topk_ragged_indptr = attn_metadata.c128a_decode_topk_ragged_indptr
        rocm_sparse_attn_decode(
            q=q,
            kv_cache=kv_cache,
            swa_k_cache=layer.swa_cache_layer.kv_cache,
            swa_only=swa_only,
            topk_indices=topk_indices,
            topk_lens=topk_lens,
            swa_indices=swa_metadata.decode_swa_indices,
            swa_lens=swa_metadata.decode_swa_lens,
            swa_ragged_indices=swa_metadata.decode_swa_ragged_indices,
            swa_ragged_indptr=swa_metadata.decode_swa_ragged_indptr,
            topk_ragged_indices=topk_ragged_indices,
            topk_ragged_indptr=topk_ragged_indptr,
            attn_sink=layer.attn_sink,
            scale=layer.scale,
            head_dim=layer.head_dim,
            nope_head_dim=layer.nope_head_dim,
            rope_head_dim=layer.rope_head_dim,
            output=output,
        )

    @classmethod
    def _forward_prefill(
        cls,
        layer: "DeepseekV4MLAAttention",
        q: torch.Tensor,
        positions: torch.Tensor,
        compressed_k_cache: torch.Tensor | None,
        swa_k_cache: torch.Tensor,
        output: torch.Tensor,
        attn_metadata: DeepseekV4ROCMAiterMLASparseMetadata | None,
        swa_metadata: DeepseekV4ROCMAiterSparseSWAMetadata,
    ) -> None:
        """Prefill-path attention, processed in request chunks.

        Requests are handled ``_PREFILL_CHUNK_SIZE`` at a time: for each
        chunk, KV entries are dequantized/gathered out of the paged caches
        into a dense per-chunk workspace, then one ragged sparse-attention
        kernel call consumes the merged top-k + sliding-window index list
        for the chunk's query tokens.
        """
        # Absence of sparse metadata means only the sliding window is used
        # (matches the swa_only decision made by the caller in forward()).
        swa_only = attn_metadata is None
        num_prefills = swa_metadata.num_prefills
        num_prefill_tokens = swa_metadata.num_prefill_tokens
        num_decodes = swa_metadata.num_decodes
        num_decode_tokens = swa_metadata.num_decode_tokens
        seq_lens = swa_metadata.prefill_seq_lens
        gather_lens = swa_metadata.prefill_gather_lens
        assert seq_lens is not None
        assert gather_lens is not None
        query_start_loc_cpu = swa_metadata.query_start_loc_cpu
        query_start_loc = swa_metadata.query_start_loc
        assert query_start_loc_cpu is not None
        assert query_start_loc is not None
        # query_start_loc covers the whole batch (decodes first); this is
        # the token offset where the prefill region begins, used to make
        # chunk offsets relative to the prefill-only q/output slices.
        prefill_token_base = query_start_loc_cpu[num_decodes]
        if not swa_only:
            if layer.compress_ratio == 4:
                # Ratio-4 path: slice this batch's prefill rows out of the
                # shared per-token top-k indices buffer.
                assert layer.topk_indices_buffer is not None
                topk_indices = layer.topk_indices_buffer[num_decode_tokens:]
                topk_indices = topk_indices[:num_prefill_tokens]
            else:
                assert attn_metadata is not None
                topk_indices = attn_metadata.c128a_prefill_topk_indices
                assert topk_indices is not None
            top_k = topk_indices.shape[-1]
            # Max compressed KV slots per request (ceil-divided model len).
            N = (layer.max_model_len + layer.compress_ratio - 1) // layer.compress_ratio
        else:
            assert layer.topk_indices_buffer is not None
            topk_indices = layer.topk_indices_buffer[num_decode_tokens:]
            top_k = 0
            N = 0
        # Per-request workspace rows: compressed entries occupy [0, N),
        # sliding-window / new-token entries are gathered at offset N.
        M = N + layer.window_size + layer.max_num_batched_tokens
        num_chunks = (num_prefills + cls._PREFILL_CHUNK_SIZE - 1) // (
            cls._PREFILL_CHUNK_SIZE
        )
        workspace_manager = current_workspace_manager()
        # One bf16 workspace, reused (overwritten) across all chunks.
        kv = workspace_manager.get_simultaneous(
            ((cls._PREFILL_CHUNK_SIZE, M, q.shape[-1]), torch.bfloat16),
        )[0]
        for chunk_idx in range(num_chunks):
            chunk_start = chunk_idx * cls._PREFILL_CHUNK_SIZE
            chunk_end = min(chunk_start + cls._PREFILL_CHUNK_SIZE, num_prefills)
            chunk_size = chunk_end - chunk_start
            if not swa_only:
                assert attn_metadata is not None
                assert compressed_k_cache is not None
                block_table = attn_metadata.block_table[num_decodes:]
                # Dequantize compressed KV into workspace rows [0, N).
                dequantize_and_gather_k_cache(
                    kv[:chunk_size],
                    compressed_k_cache,
                    seq_lens=seq_lens[chunk_start:chunk_end] // layer.compress_ratio,
                    gather_lens=None,
                    block_table=block_table[chunk_start:chunk_end],
                    block_size=attn_metadata.block_size // layer.compress_ratio,
                    offset=0,
                )
            swa_block_table = swa_metadata.block_table[num_decodes:]
            # Gather sliding-window KV into workspace rows starting at N.
            dequantize_and_gather_k_cache(
                kv[:chunk_size],
                swa_k_cache,
                seq_lens=seq_lens[chunk_start:chunk_end],
                gather_lens=gather_lens[chunk_start:chunk_end],
                block_table=swa_block_table[chunk_start:chunk_end],
                block_size=swa_metadata.block_size,
                offset=N,
            )
            # Token range of this chunk, relative to the prefill slice of q.
            query_start = (
                query_start_loc_cpu[num_decodes + chunk_start] - prefill_token_base
            )
            query_end = (
                query_start_loc_cpu[num_decodes + chunk_end] - prefill_token_base
            )
            # Merge the top-k indices (addressing workspace rows [0, N))
            # with the sliding-window indices (rows [N, M)) into a single
            # ragged index list for the kernel.
            combined_ragged_indices, combined_ragged_indptr, combined_lens = (
                combine_topk_swa_indices_ragged(
                    topk_indices[query_start:query_end],
                    query_start_loc[
                        num_decodes + chunk_start : num_decodes + chunk_end + 1
                    ],
                    seq_lens[chunk_start:chunk_end],
                    gather_lens[chunk_start:chunk_end],
                    layer.window_size,
                    layer.compress_ratio,
                    top_k,
                    M,
                    N,
                )
            )
            rocm_sparse_attn_prefill(
                q=q[query_start:query_end],
                kv=kv.view(-1, 1, q.shape[-1]),
                # Zero-width placeholder (last dim 0): the kernel reads the
                # real indices via ragged_indices/ragged_indptr below.
                indices=torch.empty(
                    q[query_start:query_end].shape[0],
                    1,
                    0,
                    dtype=torch.int32,
                    device=q.device,
                ),
                topk_length=combined_lens,
                scale=layer.scale,
                head_dim=layer.head_dim,
                nope_head_dim=layer.nope_head_dim,
                rope_head_dim=layer.rope_head_dim,
                attn_sink=layer.attn_sink,
                output=output[query_start:query_end],
                ragged_indices=combined_ragged_indices,
                ragged_indptr=combined_ragged_indptr,
            )