vllm.distributed.kv_transfer.kv_connector.v1.lmcache_mp_connector

logger module-attribute

logger = init_logger(__name__)

LMCacheMPConnector

Bases: KVConnectorBase_V1

The connector for LMCache multi-process mode.

Extra configs (kv_transfer_config.extra_config):
- lmcache.mp.host: the host of the LMCache server.
- lmcache.mp.port: the port of the LMCache server.
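
The sketch below shows one way these extra configs might be supplied when constructing an engine. It is illustrative only: the connector registration name and the KVTransferConfig field names (e.g. kv_connector_extra_config) are assumptions about the surrounding vLLM configuration, and the host/port values simply mirror the defaults read in __init__.

from vllm import LLM
from vllm.config import KVTransferConfig

# Illustrative sketch, not taken from this module: the connector name and
# config fields below are assumptions; host/port mirror the __init__ defaults.
llm = LLM(
    model="facebook/opt-125m",
    kv_transfer_config=KVTransferConfig(
        kv_connector="LMCacheMPConnector",
        kv_role="kv_both",
        kv_connector_extra_config={
            "lmcache.mp.host": "tcp://localhost",
            "lmcache.mp.port": 5555,
        },
    ),
)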

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
class LMCacheMPConnector(KVConnectorBase_V1):
    """
    The connector for LMCache multi-process mode.

    Extra configs (kv_transfer_config.extra_config):
    - lmcache.mp.host: the host of the LMCache server.
    - lmcache.mp.port: the port of the LMCache server.
    """

    def __init__(
        self,
        vllm_config: "VllmConfig",
        role: KVConnectorRole,
        kv_cache_config: Optional["KVCacheConfig"] = None,
    ):
        super().__init__(vllm_config, role, kv_cache_config)

        assert vllm_config.kv_transfer_config is not None
        server_host = vllm_config.kv_transfer_config.get_from_extra_config(
            "lmcache.mp.host", "tcp://localhost"
        )
        server_port = vllm_config.kv_transfer_config.get_from_extra_config(
            "lmcache.mp.port", 5555
        )

        server_url = f"{server_host}:{server_port}"
        zmq_context = zmq.Context.instance()
        if self.role == KVConnectorRole.SCHEDULER:
            self.scheduler_adapter = create_scheduler_adapter(
                server_url, zmq_context, vllm_config
            )
            self.request_trackers: dict[str, LMCacheMPRequestTracker] = {}
        elif self.role == KVConnectorRole.WORKER:
            self.worker_adapter = create_worker_adapter(
                server_url, zmq_context, vllm_config
            )
        else:
            raise ValueError(f"Unknown KVConnectorRole: {self.role}")

        self.vllm_block_size = vllm_config.cache_config.block_size

    @property
    def role(self) -> KVConnectorRole:
        return self._role

    # ==============================
    # Worker-side methods
    # ==============================

    def _get_connector_metadata(self) -> KVConnectorMetadata:
        """Get the connector metadata.

        This function should only be called inside the connector.

        Returns:
            ConnectorMetadata: the connector metadata.
        """

        # Should only be called while set to valid metadata.
        assert self._connector_metadata is not None
        return self._connector_metadata

    def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
        """
        Initialize with the KV caches. Useful for pre-registering the
        KV Caches in the KVConnector (e.g. for NIXL).

        Args:
            kv_caches: dictionary of layer names, kv cache
        """
        logger.info("Registering kv caches!")
        self.worker_adapter.register_kv_caches(kv_caches)
        return

    def start_load_kv(self, forward_context: "ForwardContext", **kwargs: Any) -> None:
        """
        Start loading the KV cache from the connector to vLLM's paged
        KV buffer. This is called from the forward context before the
        forward pass to enable async loading during model execution.

        Args:
            forward_context (ForwardContext): the forward context.
            **kwargs: additional arguments for the load operation

        Note:
            The number of elements in kv_caches and layer_names should be
            the same.

        """
        metadata = self._get_connector_metadata()
        assert isinstance(metadata, LMCacheMPConnectorMetadata)

        with torch.cuda.stream(torch.cuda.current_stream()):
            event = torch.cuda.Event(interprocess=True)
            event.record()

        request_ids = []
        ops = []

        for meta in metadata.requests:
            if meta.direction != "RETRIEVE":
                continue
            request_ids.append(meta.request_id)
            ops.append(meta.op)

        if len(request_ids) > 0:
            self.worker_adapter.batched_submit_retrieve_requests(
                request_ids, ops, event
            )

    def wait_for_layer_load(self, layer_name: str) -> None:
        """
        Block until the KV for a specific layer is loaded into vLLM's
        paged buffer. This is called from within attention layer to ensure
        async copying from start_load_kv is complete.

        This interface will be useful for layer-by-layer pipelining.

        Args:
            layer_name: the name of that layer
        """
        return

    def save_kv_layer(
        self,
        layer_name: str,
        kv_layer: torch.Tensor,
        attn_metadata: "AttentionMetadata",
        **kwargs: Any,
    ) -> None:
        """
        Start saving a layer of KV cache from vLLM's paged buffer
        to the connector. This is called from within attention layer to
        enable async copying during execution.

        Args:
            layer_name (str): the name of the layer.
            kv_layer (torch.Tensor): the paged KV buffer of the current
                layer in vLLM.
            attn_metadata (AttentionMetadata): the attention metadata.
            **kwargs: additional arguments for the save operation.
        """
        return

    def wait_for_save(self):
        """
        Block until all the save operations are done. This is called
        as the forward context exits to ensure that the async saving
        from save_kv_layer is complete before finishing the forward.

        This prevents overwrites of the paged KV buffer before saving is done.
        """
        metadata = self._get_connector_metadata()
        assert isinstance(metadata, LMCacheMPConnectorMetadata)

        with torch.cuda.stream(torch.cuda.current_stream()):
            event = torch.cuda.Event(interprocess=True)
            event.record()

        request_ids = []
        ops = []
        for meta in metadata.requests:
            if meta.direction != "STORE":
                continue
            request_ids.append(meta.request_id)
            ops.append(meta.op)

        if len(request_ids) > 0:
            self.worker_adapter.batched_submit_store_requests(request_ids, ops, event)

    def get_finished(
        self, finished_req_ids: set[str]
    ) -> tuple[set[str] | None, set[str] | None]:
        """
        Notifies worker-side connector ids of requests that have
        finished generating tokens on the worker.
        The scheduler process (via the Executors) will use this output
        to track which workers are done.

        Returns:
            ids of requests that have finished asynchronous transfer
            (requests that previously returned True from request_finished()),
            tuple of (sending/saving ids, recving/loading ids).
            The finished saves/sends req ids must belong to a set provided in a
            call to this method (this call or a prior one).
        """
        val = self.worker_adapter.get_finished(finished_req_ids)
        # logger.error("Finished req ids: %s, %s", val[0], val[1])
        return val

    def get_block_ids_with_load_errors(self) -> set[int]:
        """
        Get the set of block IDs that failed to load.

        Returns:
            Set of block IDs that encountered load errors.
            Empty set if no load errors occurred.

        Notes:
            - Applies to both sync- and async-loading requests.
            - Async loading: failed blocks may be reported in any forward pass
              up to and including the pass where the request ID is returned by
              `get_finished()`. Even if failures occur, the request must still
              be reported via `get_finished()`, and the failed block IDs must
              appear here no later than that same pass.
            - Sync loading: failed blocks should be reported in the forward
              pass in which they are detected.
        """
        # TODO: add error tracking
        return set()

    def shutdown(self):
        """
        Shutdown the connector. This is called when the worker process
        is shutting down to ensure that all the async operations are
        completed and the connector is cleaned up properly.
        """
        if hasattr(self, "worker_adapter"):
            self.worker_adapter.shutdown()
        return None

    def get_kv_connector_stats(self) -> Optional["KVConnectorStats"]:
        """
        Get the KV connector stats collected during the last interval.
        """
        return None

    # ==============================
    # Scheduler-side methods
    # ==============================

    def get_num_new_matched_tokens(
        self,
        request: "Request",
        num_computed_tokens: int,
    ) -> tuple[int | None, bool]:
        """
        Get number of new tokens that can be loaded from the
        external KV cache beyond the num_computed_tokens.

        Args:
            request (Request): the request object.
            num_computed_tokens (int): the number of locally
                computed tokens for this request

        Returns:
            A tuple with the following elements:
                - An optional number of tokens that can be loaded from the
                  external KV cache beyond what is already computed.
                  If None, it means that the connector needs more time to
                  determine the number of matched tokens, and the scheduler
                  should query for this request again later.
                - `True` if external KV cache tokens will be loaded
                  asynchronously (between scheduler steps). Must be
                  'False' if the first element is 0.

        Notes:
            The connector should only consider the largest prefix of prompt-
            tokens for which KV cache is actually available at the time of the
            call. If the cache cannot be loaded for some tokens (e.g., due to
            connectivity issues or eviction), those tokens must not be taken
            into account.
        """
        tracker = self._get_or_create_request_tracker(request)

        self.scheduler_adapter.maybe_submit_lookup_request(
            request.request_id, convert_block_hashes_to_bytes(request.block_hashes)
        )

        ret = self.scheduler_adapter.check_lookup_result(request.request_id)
        if ret is None:
            return None, True

        if ret == 0:
            return 0, False

        assert (
            ret % (self.scheduler_adapter.num_blocks_per_chunk() * self.vllm_block_size)
            == 0
        )

        # Update num stored blocks for the tracker
        num_vllm_blocks = num_computed_tokens // self.vllm_block_size
        num_lmcache_blocks = ret // self.vllm_block_size
        tracker.increase_num_stored_blocks(num_lmcache_blocks)

        # Save the vllm and lmcache hit tokens
        tracker.num_vllm_hit_blocks = num_vllm_blocks
        tracker.num_lmcache_hit_blocks = num_lmcache_blocks

        need_to_load = max(0, ret - num_computed_tokens)
        logger.debug(
            "vLLM hit is: %d, Need to load is %d", num_computed_tokens, need_to_load
        )
        return need_to_load, need_to_load > 0

    def update_state_after_alloc(
        self, request: "Request", blocks: "KVCacheBlocks", num_external_tokens: int
    ):
        """
        Update KVConnector state after block allocation.

        If get_num_new_matched_tokens previously returned True for a
        request, this function may be called twice for that same request -
        first when blocks are allocated for the connector tokens to be
        asynchronously loaded into, and second when any additional blocks
        are allocated, after the load/transfer is complete.

        Args:
            request (Request): the request object.
            blocks (KVCacheBlocks): the blocks allocated for the request.
            num_external_tokens (int): the number of tokens that will be
                loaded from the external KV cache.
        """
        # NOTE: the `blocks` are NEW BLOCKS allocated for this request.
        tracker = self._get_request_tracker(request.request_id)
        block_ids = reformat_block_ids(blocks.get_block_ids())

        # Whether or not we need to retrieve, we need to update
        # the block ids in the tracker
        tracker.update_block_ids(block_ids)

        # Update the state of the tracker
        condition = tracker.needs_retrieve()
        if tracker.state == LMCacheMPRequestState.PREFETCHING:
            # If need to retrieve, change to WAITING_FOR_LOAD
            # Otherwise, change to READY
            tracker.state = (
                LMCacheMPRequestState.WAITING_FOR_LOAD
                if condition
                else LMCacheMPRequestState.READY
            )

    def build_connector_meta(
        self, scheduler_output: SchedulerOutput
    ) -> KVConnectorMetadata:
        """
        Build the connector metadata for this step.

        This function should NOT modify fields in the scheduler_output.
        Also, calling this function will reset the state of the connector.

        Args:
            scheduler_output (SchedulerOutput): the scheduler output object.
        """
        metadata = LMCacheMPConnectorMetadata()

        self._process_retrieve_requests(metadata)
        self._process_new_requests(scheduler_output, metadata)
        self._process_cached_requests(scheduler_output, metadata)

        if len(metadata) > 0:
            logger.debug("Final connector metadata: %s", metadata)

        return metadata

    def update_connector_output(self, connector_output: KVConnectorOutput):
        """
        Update KVConnector state from worker-side connectors output.

        Args:
            connector_output (KVConnectorOutput): the worker-side
                connectors output.
        """
        return

    def request_finished(
        self,
        request: "Request",
        block_ids: list[int],
    ) -> tuple[bool, dict[str, Any] | None]:
        """
        Called exactly once when a request has finished, before its blocks are
        freed.

        The connector may assume responsibility for freeing the blocks
        asynchronously by returning True.

        Returns:
            True if the request is being saved/sent asynchronously and blocks
            should not be freed until the request_id is returned from
            get_finished().
            Optional KVTransferParams to be included in the request outputs
            returned by the engine.
        """
        return True, None

    def take_events(self) -> Iterable["KVCacheEvent"]:
        """
        Take the KV cache events from the connector.

        Yields:
            New KV cache events since the last call.
        """
        return ()

    @classmethod
    def get_required_kvcache_layout(cls, vllm_config: "VllmConfig") -> str | None:
        """
        Get the required KV cache layout for this connector.
        Args:
            vllm_config (VllmConfig): the vllm config.

        Returns:
            str: the required KV cache layout. e.g. HND, or NHD.
            None if the connector does not require a specific layout.
        """

        if cls is KVConnectorBase_V1:
            raise TypeError(
                "get_required_kvcache_layout should not be called "
                "on the abstract base class"
            )
        return None

    def get_finished_count(self) -> int | None:
        """
        Get the count of requests expected to complete send/receive operations
        via this connector. This method is used to initialize the
        KVOutputAggregator, overwriting the default world_size.

        Returns:
            int: expected sending or receiving completion count.
        """
        return None

    @classmethod
    def build_kv_connector_stats(
        cls, data: dict[str, Any] | None = None
    ) -> Optional["KVConnectorStats"]:
        """
        KVConnectorStats resolution method. This method allows dynamically
        registered connectors to return their own KVConnectorStats object,
        which can implement custom aggregation logic on the data dict.
        """
        return None

    @classmethod
    def build_prom_metrics(
        cls,
        vllm_config: "VllmConfig",
        metric_types: dict[type["PromMetric"], type["PromMetricT"]],
        labelnames: list[str],
        per_engine_labelvalues: dict[int, list[str]],
    ) -> Optional["KVConnectorPromMetrics"]:
        """
        Create a KVConnectorPromMetrics subclass which should register
        per-connector Prometheus metrics and implement observe() to
        expose connector transfer stats via Prometheus.
        """
        return None

    ##############################
    # Helper functions
    ##############################
    def _process_retrieve_requests(
        self,
        metadata: LMCacheMPConnectorMetadata,
    ) -> None:
        blocks_per_chunk = self.scheduler_adapter.num_blocks_per_chunk()

        for request_tracker in self.request_trackers.values():
            if request_tracker.state != LMCacheMPRequestState.WAITING_FOR_LOAD:
                continue
            r_metadata = LMCacheMPRequestMetadata.GetRetrieveMetadata(
                request_tracker, blocks_per_chunk
            )
            if r_metadata is not None:
                metadata.add_request_metadata(r_metadata)
            request_tracker.state = LMCacheMPRequestState.READY

    def _process_new_requests(
        self,
        scheduler_output: SchedulerOutput,
        metadata: LMCacheMPConnectorMetadata,
    ) -> None:
        blocks_per_chunk = self.scheduler_adapter.num_blocks_per_chunk()

        for new_request in scheduler_output.scheduled_new_reqs:
            request_tracker = self._get_request_tracker(new_request.req_id)

            num_new_tokens = scheduler_output.num_scheduled_tokens[new_request.req_id]
            request_tracker.increase_num_scheduled_tokens(num_new_tokens)

            r_meta = LMCacheMPRequestMetadata.GetStoreMetadata(
                request_tracker, blocks_per_chunk, self.vllm_block_size
            )
            if r_meta is not None:
                metadata.add_request_metadata(r_meta)

    def _process_cached_requests(
        self,
        scheduler_output: SchedulerOutput,
        metadata: LMCacheMPConnectorMetadata,
    ) -> None:
        blocks_per_chunk = self.scheduler_adapter.num_blocks_per_chunk()

        cached_reqs = scheduler_output.scheduled_cached_reqs
        for idx, request_id in enumerate(cached_reqs.req_ids):
            request_tracker = self._get_request_tracker(request_id)

            # Update block ids
            new_block_ids = reformat_block_ids(cached_reqs.new_block_ids[idx])
            request_tracker.update_block_ids(new_block_ids)

            # Update new scheduled tokens
            num_new_tokens = cached_reqs.num_computed_tokens[idx]
            request_tracker.increase_num_scheduled_tokens(num_new_tokens)

            r_meta = LMCacheMPRequestMetadata.GetStoreMetadata(
                request_tracker, blocks_per_chunk, self.vllm_block_size
            )

            if r_meta is not None:
                metadata.add_request_metadata(r_meta)

    def _get_request_tracker(self, request_id: str) -> LMCacheMPRequestTracker:
        assert request_id in self.request_trackers, (
            f"Request tracker for request_id {request_id} not found. "
        )
        return self.request_trackers[request_id]

    def _get_or_create_request_tracker(
        self, request: "Request"
    ) -> LMCacheMPRequestTracker:
        request_id = request.request_id
        if request_id not in self.request_trackers:
            new_tracker = LMCacheMPRequestTracker(request)
            self.request_trackers[request_id] = new_tracker
        return self.request_trackers[request_id]

request_trackers instance-attribute

request_trackers: dict[str, LMCacheMPRequestTracker] = {}

role property

scheduler_adapter instance-attribute

scheduler_adapter = create_scheduler_adapter(
    server_url, zmq_context, vllm_config
)

vllm_block_size instance-attribute

vllm_block_size = block_size

worker_adapter instance-attribute

worker_adapter = create_worker_adapter(
    server_url, zmq_context, vllm_config
)

__init__

__init__(
    vllm_config: VllmConfig,
    role: KVConnectorRole,
    kv_cache_config: Optional[KVCacheConfig] = None,
)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def __init__(
    self,
    vllm_config: "VllmConfig",
    role: KVConnectorRole,
    kv_cache_config: Optional["KVCacheConfig"] = None,
):
    super().__init__(vllm_config, role, kv_cache_config)

    assert vllm_config.kv_transfer_config is not None
    server_host = vllm_config.kv_transfer_config.get_from_extra_config(
        "lmcache.mp.host", "tcp://localhost"
    )
    server_port = vllm_config.kv_transfer_config.get_from_extra_config(
        "lmcache.mp.port", 5555
    )

    server_url = f"{server_host}:{server_port}"
    zmq_context = zmq.Context.instance()
    if self.role == KVConnectorRole.SCHEDULER:
        self.scheduler_adapter = create_scheduler_adapter(
            server_url, zmq_context, vllm_config
        )
        self.request_trackers: dict[str, LMCacheMPRequestTracker] = {}
    elif self.role == KVConnectorRole.WORKER:
        self.worker_adapter = create_worker_adapter(
            server_url, zmq_context, vllm_config
        )
    else:
        raise ValueError(f"Unknown KVConnectorRole: {self.role}")

    self.vllm_block_size = vllm_config.cache_config.block_size

_get_connector_metadata

_get_connector_metadata() -> KVConnectorMetadata

Get the connector metadata.

This function should only be called inside the connector.

Returns:
    ConnectorMetadata (KVConnectorMetadata): the connector metadata.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def _get_connector_metadata(self) -> KVConnectorMetadata:
    """Get the connector metadata.

    This function should only be called inside the connector.

    Returns:
        ConnectorMetadata: the connector metadata.
    """

    # Should only be called while set to valid metadata.
    assert self._connector_metadata is not None
    return self._connector_metadata

_get_or_create_request_tracker

_get_or_create_request_tracker(
    request: Request,
) -> LMCacheMPRequestTracker
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def _get_or_create_request_tracker(
    self, request: "Request"
) -> LMCacheMPRequestTracker:
    request_id = request.request_id
    if request_id not in self.request_trackers:
        new_tracker = LMCacheMPRequestTracker(request)
        self.request_trackers[request_id] = new_tracker
    return self.request_trackers[request_id]

_get_request_tracker

_get_request_tracker(
    request_id: str,
) -> LMCacheMPRequestTracker
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def _get_request_tracker(self, request_id: str) -> LMCacheMPRequestTracker:
    assert request_id in self.request_trackers, (
        f"Request tracker for request_id {request_id} not found. "
    )
    return self.request_trackers[request_id]

_process_cached_requests

_process_cached_requests(
    scheduler_output: SchedulerOutput,
    metadata: LMCacheMPConnectorMetadata,
) -> None
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def _process_cached_requests(
    self,
    scheduler_output: SchedulerOutput,
    metadata: LMCacheMPConnectorMetadata,
) -> None:
    blocks_per_chunk = self.scheduler_adapter.num_blocks_per_chunk()

    cached_reqs = scheduler_output.scheduled_cached_reqs
    for idx, request_id in enumerate(cached_reqs.req_ids):
        request_tracker = self._get_request_tracker(request_id)

        # Update block ids
        new_block_ids = reformat_block_ids(cached_reqs.new_block_ids[idx])
        request_tracker.update_block_ids(new_block_ids)

        # Update new scheduled tokens
        num_new_tokens = cached_reqs.num_computed_tokens[idx]
        request_tracker.increase_num_scheduled_tokens(num_new_tokens)

        r_meta = LMCacheMPRequestMetadata.GetStoreMetadata(
            request_tracker, blocks_per_chunk, self.vllm_block_size
        )

        if r_meta is not None:
            metadata.add_request_metadata(r_meta)

_process_new_requests

_process_new_requests(
    scheduler_output: SchedulerOutput,
    metadata: LMCacheMPConnectorMetadata,
) -> None
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def _process_new_requests(
    self,
    scheduler_output: SchedulerOutput,
    metadata: LMCacheMPConnectorMetadata,
) -> None:
    blocks_per_chunk = self.scheduler_adapter.num_blocks_per_chunk()

    for new_request in scheduler_output.scheduled_new_reqs:
        request_tracker = self._get_request_tracker(new_request.req_id)

        num_new_tokens = scheduler_output.num_scheduled_tokens[new_request.req_id]
        request_tracker.increase_num_scheduled_tokens(num_new_tokens)

        r_meta = LMCacheMPRequestMetadata.GetStoreMetadata(
            request_tracker, blocks_per_chunk, self.vllm_block_size
        )
        if r_meta is not None:
            metadata.add_request_metadata(r_meta)

_process_retrieve_requests

_process_retrieve_requests(
    metadata: LMCacheMPConnectorMetadata,
) -> None
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def _process_retrieve_requests(
    self,
    metadata: LMCacheMPConnectorMetadata,
) -> None:
    blocks_per_chunk = self.scheduler_adapter.num_blocks_per_chunk()

    for request_tracker in self.request_trackers.values():
        if request_tracker.state != LMCacheMPRequestState.WAITING_FOR_LOAD:
            continue
        r_metadata = LMCacheMPRequestMetadata.GetRetrieveMetadata(
            request_tracker, blocks_per_chunk
        )
        if r_metadata is not None:
            metadata.add_request_metadata(r_metadata)
        request_tracker.state = LMCacheMPRequestState.READY

build_connector_meta

build_connector_meta(
    scheduler_output: SchedulerOutput,
) -> KVConnectorMetadata

Build the connector metadata for this step.

This function should NOT modify fields in the scheduler_output. Also, calling this function will reset the state of the connector.

Parameters:
    scheduler_output (SchedulerOutput): the scheduler output object. (required)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def build_connector_meta(
    self, scheduler_output: SchedulerOutput
) -> KVConnectorMetadata:
    """
    Build the connector metadata for this step.

    This function should NOT modify fields in the scheduler_output.
    Also, calling this function will reset the state of the connector.

    Args:
        scheduler_output (SchedulerOutput): the scheduler output object.
    """
    metadata = LMCacheMPConnectorMetadata()

    self._process_retrieve_requests(metadata)
    self._process_new_requests(scheduler_output, metadata)
    self._process_cached_requests(scheduler_output, metadata)

    if len(metadata) > 0:
        logger.debug("Final connector metadata: %s", metadata)

    return metadata

build_kv_connector_stats classmethod

build_kv_connector_stats(
    data: dict[str, Any] | None = None,
) -> Optional[KVConnectorStats]

KVConnectorStats resolution method. This method allows dynamically registered connectors to return their own KVConnectorStats object, which can implement custom aggregation logic on the data dict.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@classmethod
def build_kv_connector_stats(
    cls, data: dict[str, Any] | None = None
) -> Optional["KVConnectorStats"]:
    """
    KVConnectorStats resolution method. This method allows dynamically
    registered connectors to return their own KVConnectorStats object,
    which can implement custom aggregation logic on the data dict.
    """
    return None

build_prom_metrics classmethod

build_prom_metrics(
    vllm_config: VllmConfig,
    metric_types: dict[type[PromMetric], type[PromMetricT]],
    labelnames: list[str],
    per_engine_labelvalues: dict[int, list[str]],
) -> Optional[KVConnectorPromMetrics]

Create a KVConnectorPromMetrics subclass which should register per-connector Prometheus metrics and implement observe() to expose connector transfer stats via Prometheus.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@classmethod
def build_prom_metrics(
    cls,
    vllm_config: "VllmConfig",
    metric_types: dict[type["PromMetric"], type["PromMetricT"]],
    labelnames: list[str],
    per_engine_labelvalues: dict[int, list[str]],
) -> Optional["KVConnectorPromMetrics"]:
    """
    Create a KVConnectorPromMetrics subclass which should register
    per-connector Prometheus metrics and implement observe() to
    expose connector transfer stats via Prometheus.
    """
    return None

get_block_ids_with_load_errors

get_block_ids_with_load_errors() -> set[int]

Get the set of block IDs that failed to load.

Returns:
    set[int]: Set of block IDs that encountered load errors. Empty set if no load errors occurred.

Notes
  • Applies to both sync- and async-loading requests.
  • Async loading: failed blocks may be reported in any forward pass up to and including the pass where the request ID is returned by get_finished(). Even if failures occur, the request must still be reported via get_finished(), and the failed block IDs must appear here no later than that same pass.
  • Sync loading: failed blocks should be reported in the forward pass in which they are detected.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def get_block_ids_with_load_errors(self) -> set[int]:
    """
    Get the set of block IDs that failed to load.

    Returns:
        Set of block IDs that encountered load errors.
        Empty set if no load errors occurred.

    Notes:
        - Applies to both sync- and async-loading requests.
        - Async loading: failed blocks may be reported in any forward pass
          up to and including the pass where the request ID is returned by
          `get_finished()`. Even if failures occur, the request must still
          be reported via `get_finished()`, and the failed block IDs must
          appear here no later than that same pass.
        - Sync loading: failed blocks should be reported in the forward
          pass in which they are detected.
    """
    # TODO: add error tracking
    return set()

get_finished

get_finished(
    finished_req_ids: set[str],
) -> tuple[set[str] | None, set[str] | None]

Notifies worker-side connector ids of requests that have finished generating tokens on the worker. The scheduler process (via the Executors) will use this output to track which workers are done.

Returns:
    tuple[set[str] | None, set[str] | None]: ids of requests that have finished asynchronous transfer (requests that previously returned True from request_finished()), as a tuple of (sending/saving ids, recving/loading ids). The finished saves/sends req ids must belong to a set provided in a call to this method (this call or a prior one).
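
As a reading aid, here is a hypothetical worker-side helper (not part of vLLM) showing how the returned tuple might be consumed; the handling inside the loops is a placeholder.

def drain_finished(connector, finished_req_ids: set[str]) -> None:
    # The tuple is (sending/saving ids, recving/loading ids); either set may be None.
    done_sending, done_recving = connector.get_finished(finished_req_ids)
    for req_id in done_sending or set():
        # Async save complete: the request's blocks may now be freed.
        print("save complete:", req_id)
    for req_id in done_recving or set():
        # Async load complete: the request can proceed to scheduling.
        print("load complete:", req_id)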

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def get_finished(
    self, finished_req_ids: set[str]
) -> tuple[set[str] | None, set[str] | None]:
    """
    Notifies worker-side connector ids of requests that have
    finished generating tokens on the worker.
    The scheduler process (via the Executors) will use this output
    to track which workers are done.

    Returns:
        ids of requests that have finished asynchronous transfer
        (requests that previously returned True from request_finished()),
        tuple of (sending/saving ids, recving/loading ids).
        The finished saves/sends req ids must belong to a set provided in a
        call to this method (this call or a prior one).
    """
    val = self.worker_adapter.get_finished(finished_req_ids)
    # logger.error("Finished req ids: %s, %s", val[0], val[1])
    return val

get_finished_count

get_finished_count() -> int | None

Get the count of requests expected to complete send/receive operations via this connector. This method is used to initialize the KVOutputAggregator, overwriting the default world_size.

Returns:
    int | None: expected sending or receiving completion count.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def get_finished_count(self) -> int | None:
    """
    Get the count of requests expected to complete send/receive operations
    via this connector. This method is used to initialize the
    KVOutputAggregator, overwriting the default world_size.

    Returns:
        int: expected sending or receiving completion count.
    """
    return None

get_kv_connector_stats

get_kv_connector_stats() -> Optional[KVConnectorStats]

Get the KV connector stats collected during the last interval.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def get_kv_connector_stats(self) -> Optional["KVConnectorStats"]:
    """
    Get the KV connector stats collected during the last interval.
    """
    return None

get_num_new_matched_tokens

get_num_new_matched_tokens(
    request: Request, num_computed_tokens: int
) -> tuple[int | None, bool]

Get number of new tokens that can be loaded from the external KV cache beyond the num_computed_tokens.

Parameters:
    request (Request): the request object. (required)
    num_computed_tokens (int): the number of locally computed tokens for this request. (required)

Returns:
    tuple[int | None, bool]: A tuple with the following elements:
    - An optional number of tokens that can be loaded from the external KV cache beyond what is already computed. If None, it means that the connector needs more time to determine the number of matched tokens, and the scheduler should query for this request again later.
    - True if external KV cache tokens will be loaded asynchronously (between scheduler steps). Must be False if the first element is 0.

Notes

The connector should only consider the largest prefix of prompt tokens for which KV cache is actually available at the time of the call. If the cache cannot be loaded for some tokens (e.g., due to connectivity issues or eviction), those tokens must not be taken into account.
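
A minimal scheduler-side sketch of the return contract described above; the helper and its strings are illustrative, not part of vLLM's scheduler.

def query_external_hit(connector, request, num_computed_tokens: int) -> str:
    # Interpret the (num_new_tokens, load_async) tuple from the connector.
    num_new, load_async = connector.get_num_new_matched_tokens(
        request, num_computed_tokens
    )
    if num_new is None:
        return "lookup pending; query this request again later"
    if num_new == 0:
        return "no external hit beyond the local prefix cache"
    if load_async:
        return f"load {num_new} tokens asynchronously between scheduler steps"
    return f"load {num_new} tokens synchronously"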

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def get_num_new_matched_tokens(
    self,
    request: "Request",
    num_computed_tokens: int,
) -> tuple[int | None, bool]:
    """
    Get number of new tokens that can be loaded from the
    external KV cache beyond the num_computed_tokens.

    Args:
        request (Request): the request object.
        num_computed_tokens (int): the number of locally
            computed tokens for this request

    Returns:
        A tuple with the following elements:
            - An optional number of tokens that can be loaded from the
              external KV cache beyond what is already computed.
              If None, it means that the connector needs more time to
              determine the number of matched tokens, and the scheduler
              should query for this request again later.
            - `True` if external KV cache tokens will be loaded
              asynchronously (between scheduler steps). Must be
              'False' if the first element is 0.

    Notes:
        The connector should only consider the largest prefix of prompt-
        tokens for which KV cache is actually available at the time of the
        call. If the cache cannot be loaded for some tokens (e.g., due to
        connectivity issues or eviction), those tokens must not be taken
        into account.
    """
    tracker = self._get_or_create_request_tracker(request)

    self.scheduler_adapter.maybe_submit_lookup_request(
        request.request_id, convert_block_hashes_to_bytes(request.block_hashes)
    )

    ret = self.scheduler_adapter.check_lookup_result(request.request_id)
    if ret is None:
        return None, True

    if ret == 0:
        return 0, False

    assert (
        ret % (self.scheduler_adapter.num_blocks_per_chunk() * self.vllm_block_size)
        == 0
    )

    # Update num stored blocks for the tracker
    num_vllm_blocks = num_computed_tokens // self.vllm_block_size
    num_lmcache_blocks = ret // self.vllm_block_size
    tracker.increase_num_stored_blocks(num_lmcache_blocks)

    # Save the vllm and lmcache hit tokens
    tracker.num_vllm_hit_blocks = num_vllm_blocks
    tracker.num_lmcache_hit_blocks = num_lmcache_blocks

    need_to_load = max(0, ret - num_computed_tokens)
    logger.debug(
        "vLLM hit is: %d, Need to load is %d", num_computed_tokens, need_to_load
    )
    return need_to_load, need_to_load > 0

get_required_kvcache_layout classmethod

get_required_kvcache_layout(
    vllm_config: VllmConfig,
) -> str | None

Get the required KV cache layout for this connector.

Parameters:
    vllm_config (VllmConfig): the vllm config.

Returns:
    str | None: the required KV cache layout, e.g. HND or NHD. None if the connector does not require a specific layout.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@classmethod
def get_required_kvcache_layout(cls, vllm_config: "VllmConfig") -> str | None:
    """
    Get the required KV cache layout for this connector.
    Args:
        vllm_config (VllmConfig): the vllm config.

    Returns:
        str: the required KV cache layout. e.g. HND, or NHD.
        None if the connector does not require a specific layout.
    """

    if cls is KVConnectorBase_V1:
        raise TypeError(
            "get_required_kvcache_layout should not be called "
            "on the abstract base class"
        )
    return None

register_kv_caches

register_kv_caches(kv_caches: dict[str, Tensor])

Initialize with the KV caches. Useful for pre-registering the KV Caches in the KVConnector (e.g. for NIXL).

Parameters:
    kv_caches (dict[str, Tensor]): dictionary of layer names, kv cache. (required)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
    """
    Initialize with the KV caches. Useful for pre-registering the
    KV Caches in the KVConnector (e.g. for NIXL).

    Args:
        kv_caches: dictionary of layer names, kv cache
    """
    logger.info("Registering kv caches!")
    self.worker_adapter.register_kv_caches(kv_caches)
    return

request_finished

request_finished(
    request: Request, block_ids: list[int]
) -> tuple[bool, dict[str, Any] | None]

Called exactly once when a request has finished, before its blocks are freed.

The connector may assume responsibility for freeing the blocks asynchronously by returning True.

Returns:
    tuple[bool, dict[str, Any] | None]:
    - bool: True if the request is being saved/sent asynchronously and blocks should not be freed until the request_id is returned from get_finished().
    - dict[str, Any] | None: Optional KVTransferParams to be included in the request outputs returned by the engine.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def request_finished(
    self,
    request: "Request",
    block_ids: list[int],
) -> tuple[bool, dict[str, Any] | None]:
    """
    Called exactly once when a request has finished, before its blocks are
    freed.

    The connector may assume responsibility for freeing the blocks
    asynchronously by returning True.

    Returns:
        True if the request is being saved/sent asynchronously and blocks
        should not be freed until the request_id is returned from
        get_finished().
        Optional KVTransferParams to be included in the request outputs
        returned by the engine.
    """
    return True, None

save_kv_layer

save_kv_layer(
    layer_name: str,
    kv_layer: Tensor,
    attn_metadata: AttentionMetadata,
    **kwargs: Any,
) -> None

Start saving a layer of KV cache from vLLM's paged buffer to the connector. This is called from within attention layer to enable async copying during execution.

Parameters:
    layer_name (str): the name of the layer. (required)
    kv_layer (Tensor): the paged KV buffer of the current layer in vLLM. (required)
    attn_metadata (AttentionMetadata): the attention metadata. (required)
    **kwargs (Any): additional arguments for the save operation. (default: {})
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def save_kv_layer(
    self,
    layer_name: str,
    kv_layer: torch.Tensor,
    attn_metadata: "AttentionMetadata",
    **kwargs: Any,
) -> None:
    """
    Start saving a layer of KV cache from vLLM's paged buffer
    to the connector. This is called from within attention layer to
    enable async copying during execution.

    Args:
        layer_name (str): the name of the layer.
        kv_layer (torch.Tensor): the paged KV buffer of the current
            layer in vLLM.
        attn_metadata (AttentionMetadata): the attention metadata.
        **kwargs: additional arguments for the save operation.
    """
    return

shutdown

shutdown()

Shutdown the connector. This is called when the worker process is shutting down to ensure that all the async operations are completed and the connector is cleaned up properly.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def shutdown(self):
    """
    Shutdown the connector. This is called when the worker process
    is shutting down to ensure that all the async operations are
    completed and the connector is cleaned up properly.
    """
    if hasattr(self, "worker_adapter"):
        self.worker_adapter.shutdown()
    return None

start_load_kv

start_load_kv(
    forward_context: ForwardContext, **kwargs: Any
) -> None

Start loading the KV cache from the connector to vLLM's paged KV buffer. This is called from the forward context before the forward pass to enable async loading during model execution.

Parameters:
    forward_context (ForwardContext): the forward context. (required)
    **kwargs (Any): additional arguments for the load operation. (default: {})

Note

The number of elements in kv_caches and layer_names should be the same.
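
A minimal sketch of the synchronization pattern used by this method and by wait_for_save, assuming a CUDA device is available: an interprocess CUDA event is recorded on the current stream and handed to the LMCache worker process together with the batched per-request ops, so the external process can wait for in-flight GPU work before touching the paged KV buffers.

import torch

# Record an interprocess event on the current stream (sketch only).
event = torch.cuda.Event(interprocess=True)
event.record(torch.cuda.current_stream())
# The event is then passed along with the batched ops, e.g.
# worker_adapter.batched_submit_retrieve_requests(request_ids, ops, event).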

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def start_load_kv(self, forward_context: "ForwardContext", **kwargs: Any) -> None:
    """
    Start loading the KV cache from the connector to vLLM's paged
    KV buffer. This is called from the forward context before the
    forward pass to enable async loading during model execution.

    Args:
        forward_context (ForwardContext): the forward context.
        **kwargs: additional arguments for the load operation

    Note:
        The number of elements in kv_caches and layer_names should be
        the same.

    """
    metadata = self._get_connector_metadata()
    assert isinstance(metadata, LMCacheMPConnectorMetadata)

    with torch.cuda.stream(torch.cuda.current_stream()):
        event = torch.cuda.Event(interprocess=True)
        event.record()

    request_ids = []
    ops = []

    for meta in metadata.requests:
        if meta.direction != "RETRIEVE":
            continue
        request_ids.append(meta.request_id)
        ops.append(meta.op)

    if len(request_ids) > 0:
        self.worker_adapter.batched_submit_retrieve_requests(
            request_ids, ops, event
        )

take_events

take_events() -> Iterable[KVCacheEvent]

Take the KV cache events from the connector.

Yields:
    Iterable[KVCacheEvent]: New KV cache events since the last call.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def take_events(self) -> Iterable["KVCacheEvent"]:
    """
    Take the KV cache events from the connector.

    Yields:
        New KV cache events since the last call.
    """
    return ()

update_connector_output

update_connector_output(
    connector_output: KVConnectorOutput,
)

Update KVConnector state from worker-side connectors output.

Parameters:
    connector_output (KVConnectorOutput): the worker-side connectors output. (required)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def update_connector_output(self, connector_output: KVConnectorOutput):
    """
    Update KVConnector state from worker-side connectors output.

    Args:
        connector_output (KVConnectorOutput): the worker-side
            connectors output.
    """
    return

update_state_after_alloc

update_state_after_alloc(
    request: Request,
    blocks: KVCacheBlocks,
    num_external_tokens: int,
)

Update KVConnector state after block allocation.

If get_num_new_matched_tokens previously returned True for a request, this function may be called twice for that same request - first when blocks are allocated for the connector tokens to be asynchronously loaded into, and second when any additional blocks are allocated, after the load/transfer is complete.

Parameters:
    request (Request): the request object. (required)
    blocks (KVCacheBlocks): the blocks allocated for the request. (required)
    num_external_tokens (int): the number of tokens that will be loaded from the external KV cache. (required)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def update_state_after_alloc(
    self, request: "Request", blocks: "KVCacheBlocks", num_external_tokens: int
):
    """
    Update KVConnector state after block allocation.

    If get_num_new_matched_tokens previously returned True for a
    request, this function may be called twice for that same request -
    first when blocks are allocated for the connector tokens to be
    asynchronously loaded into, and second when any additional blocks
    are allocated, after the load/transfer is complete.

    Args:
        request (Request): the request object.
        blocks (KVCacheBlocks): the blocks allocated for the request.
        num_external_tokens (int): the number of tokens that will be
            loaded from the external KV cache.
    """
    # NOTE: the `blocks` are NEW BLOCKS allocated for this request.
    tracker = self._get_request_tracker(request.request_id)
    block_ids = reformat_block_ids(blocks.get_block_ids())

    # Whether or not we need to retrieve, we need to update
    # the block ids in the tracker
    tracker.update_block_ids(block_ids)

    # Update the state of the tracker
    condition = tracker.needs_retrieve()
    if tracker.state == LMCacheMPRequestState.PREFETCHING:
        # If need to retrieve, change to WAITING_FOR_LOAD
        # Otherwise, change to READY
        tracker.state = (
            LMCacheMPRequestState.WAITING_FOR_LOAD
            if condition
            else LMCacheMPRequestState.READY
        )

wait_for_layer_load

wait_for_layer_load(layer_name: str) -> None

Block until the KV for a specific layer is loaded into vLLM's paged buffer. This is called from within attention layer to ensure async copying from start_load_kv is complete.

This interface will be useful for layer-by-layer pipelining.

Parameters:
    layer_name (str): the name of that layer. (required)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def wait_for_layer_load(self, layer_name: str) -> None:
    """
    Block until the KV for a specific layer is loaded into vLLM's
    paged buffer. This is called from within attention layer to ensure
    async copying from start_load_kv is complete.

    This interface will be useful for layer-by-layer pipelining.

    Args:
        layer_name: the name of that layer
    """
    return

wait_for_save

wait_for_save()

Block until all the save operations are done. This is called as the forward context exits to ensure that the async saving from save_kv_layer is complete before finishing the forward.

This prevents overwrites of the paged KV buffer before saving is done.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def wait_for_save(self):
    """
    Block until all the save operations are done. This is called
    as the forward context exits to ensure that the async saving
    from save_kv_layer is complete before finishing the forward.

    This prevents overwrites of the paged KV buffer before saving is done.
    """
    metadata = self._get_connector_metadata()
    assert isinstance(metadata, LMCacheMPConnectorMetadata)

    with torch.cuda.stream(torch.cuda.current_stream()):
        event = torch.cuda.Event(interprocess=True)
        event.record()

    request_ids = []
    ops = []
    for meta in metadata.requests:
        if meta.direction != "STORE":
            continue
        request_ids.append(meta.request_id)
        ops.append(meta.op)

    if len(request_ids) > 0:
        self.worker_adapter.batched_submit_store_requests(request_ids, ops, event)

LMCacheMPConnectorMetadata

Bases: KVConnectorMetadata

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
class LMCacheMPConnectorMetadata(KVConnectorMetadata):
    def __init__(self):
        super().__init__()
        self.requests: list[LMCacheMPRequestMetadata] = []

    def add_request_metadata(self, request_metadata: LMCacheMPRequestMetadata):
        self.requests.append(request_metadata)

    def __len__(self):
        return len(self.requests)

    # For debugging
    def __str__(self):
        request_strs = []
        for req_meta in self.requests:
            request_strs.append(
                f"RequestMetadata(request_id={req_meta.request_id}, "
                f"direction={req_meta.direction}, "
                f"num_blocks={len(req_meta.op)}, "
                f"block_ids={req_meta.op.block_ids})"
            )
        return "[" + "\n".join(request_strs) + "]"

    def __repr__(self):
        return self.__str__()

requests instance-attribute

__init__

__init__()
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def __init__(self):
    super().__init__()
    self.requests: list[LMCacheMPRequestMetadata] = []

__len__

__len__()
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def __len__(self):
    return len(self.requests)

__repr__

__repr__()
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def __repr__(self):
    return self.__str__()

__str__

__str__()
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def __str__(self):
    request_strs = []
    for req_meta in self.requests:
        request_strs.append(
            f"RequestMetadata(request_id={req_meta.request_id}, "
            f"direction={req_meta.direction}, "
            f"num_blocks={len(req_meta.op)}, "
            f"block_ids={req_meta.op.block_ids})"
        )
    return "[" + "\n".join(request_strs) + "]"

add_request_metadata

add_request_metadata(
    request_metadata: LMCacheMPRequestMetadata,
)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def add_request_metadata(self, request_metadata: LMCacheMPRequestMetadata):
    self.requests.append(request_metadata)

LMCacheMPRequestMetadata dataclass

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@dataclass
class LMCacheMPRequestMetadata:
    request_id: str
    direction: Literal["STORE", "RETRIEVE"]
    op: LoadStoreOp

    @staticmethod
    def GetStoreMetadata(
        tracker: LMCacheMPRequestTracker,
        blocks_in_chunk: int,
        vllm_block_size: int,
    ) -> "LMCacheMPRequestMetadata | None":
        """
        Generate the store metadata for the current request tracker.

        Args:
            tracker: The request tracker to generate the metadata from.
            blocks_in_chunk: the number of blocks in a LMCache data chunk
        """
        # Store the blocks that have block hashes
        # NOTE: the invariant here is that `num_stored_blocks` should
        # always be a multiple of `blocks_in_chunk`
        # TODO: This should be checked every time we update the num_stored_blocks
        min_available_blocks = min(
            len(tracker.block_hashes),
            len(tracker.allocated_block_ids),
            tracker.num_scheduled_tokens // vllm_block_size,
        )
        num_staging_blocks = min_available_blocks - tracker.num_stored_blocks
        num_chunks = num_staging_blocks // blocks_in_chunk

        if num_chunks >= 1:
            start = tracker.num_stored_blocks
            end = start + num_chunks * blocks_in_chunk
            block_hashes = convert_block_hashes_to_bytes(
                tracker.block_hashes[start:end]
            )
            block_ids = tracker.allocated_block_ids[start:end]

            ret = LMCacheMPRequestMetadata(
                request_id=tracker.request_id,
                direction="STORE",
                op=LoadStoreOp(block_hashes=block_hashes, block_ids=block_ids),
            )

            # Update the request tracker
            tracker.increase_num_stored_blocks(end - start)
            return ret

        return None

    @staticmethod
    def GetRetrieveMetadata(
        tracker: LMCacheMPRequestTracker,
        blocks_in_chunk: int,
    ) -> "LMCacheMPRequestMetadata | None":
        """
        Generate the retrieve metadata for the current request tracker.

        Args:
            tracker: The request tracker to generate the metadata from.
            blocks_in_chunk: the number of blocks in a LMCache data chunk
        """
        if not tracker.is_ready_for_retrieving():
            return None

        # |---------------------|-----------------|----------------|
        # | num_vllm_hit_blocks |
        # | lmcache chunk 1   | lmcache chunk 2   |
        #                     |  need to retrieve |

        start = tracker.num_vllm_hit_blocks // blocks_in_chunk * blocks_in_chunk
        end = tracker.num_lmcache_hit_blocks
        assert end % blocks_in_chunk == 0, (
            "The number of LMCache hit blocks should be a multiple of the "
            "number of blocks in a lmcache chunk. "
        )
        assert len(tracker.block_hashes) >= end, (
            "The number of block hashes should be greater than or equal to the "
            "number of LMCache hit blocks. "
        )
        if end > start:
            block_hashes = convert_block_hashes_to_bytes(
                tracker.block_hashes[start:end]
            )
            block_ids = tracker.allocated_block_ids[start:end]

            ret = LMCacheMPRequestMetadata(
                request_id=tracker.request_id,
                direction="RETRIEVE",
                op=LoadStoreOp(block_hashes=block_hashes, block_ids=block_ids),
            )
            return ret

        return None

direction instance-attribute

direction: Literal['STORE', 'RETRIEVE']

op instance-attribute

op: LoadStoreOp

request_id instance-attribute

request_id: str

GetRetrieveMetadata staticmethod

GetRetrieveMetadata(
    tracker: LMCacheMPRequestTracker, blocks_in_chunk: int
) -> LMCacheMPRequestMetadata | None

Generate the retrieve metadata for the current request tracker.

Parameters:

- tracker (LMCacheMPRequestTracker, required): The request tracker to generate the metadata from.
- blocks_in_chunk (int, required): the number of blocks in a LMCache data chunk.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@staticmethod
def GetRetrieveMetadata(
    tracker: LMCacheMPRequestTracker,
    blocks_in_chunk: int,
) -> "LMCacheMPRequestMetadata | None":
    """
    Generate the retrieve metadata for the current request tracker.

    Args:
        tracker: The request tracker to generate the metadata from.
        blocks_in_chunk: the number of blocks in a LMCache data chunk
    """
    if not tracker.is_ready_for_retrieving():
        return None

    # |---------------------|-----------------|----------------|
    # | num_vllm_hit_blocks |
    # | lmcache chunk 1   | lmcache chunk 2   |
    #                     |  need to retrieve |

    start = tracker.num_vllm_hit_blocks // blocks_in_chunk * blocks_in_chunk
    end = tracker.num_lmcache_hit_blocks
    assert end % blocks_in_chunk == 0, (
        "The number of LMCache hit blocks should be a multiple of the "
        "number of blocks in a lmcache chunk. "
    )
    assert len(tracker.block_hashes) >= end, (
        "The number of block hashes should be greater than or equal to the "
        "number of LMCache hit blocks. "
    )
    if end > start:
        block_hashes = convert_block_hashes_to_bytes(
            tracker.block_hashes[start:end]
        )
        block_ids = tracker.allocated_block_ids[start:end]

        ret = LMCacheMPRequestMetadata(
            request_id=tracker.request_id,
            direction="RETRIEVE",
            op=LoadStoreOp(block_hashes=block_hashes, block_ids=block_ids),
        )
        return ret

    return None
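
To make the ASCII diagram above concrete, here is a small self-contained sketch of the index arithmetic with hypothetical numbers (16 blocks per chunk, 20 vLLM-hit blocks, 48 LMCache-hit blocks); none of these values come from the source:

blocks_in_chunk = 16
num_vllm_hit_blocks = 20
num_lmcache_hit_blocks = 48

# Round the vLLM hit count down to a chunk boundary: chunk 2 (blocks 16-31)
# is only partially covered by vLLM's own prefix cache, so the whole chunk
# is retrieved from LMCache.
start = num_vllm_hit_blocks // blocks_in_chunk * blocks_in_chunk  # 16
end = num_lmcache_hit_blocks                                      # 48
print(list(range(start, end)))  # block indices 16..47 (lmcache chunks 2-3)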

GetStoreMetadata staticmethod

GetStoreMetadata(
    tracker: LMCacheMPRequestTracker,
    blocks_in_chunk: int,
    vllm_block_size: int,
) -> LMCacheMPRequestMetadata | None

Generate the store metadata for the current request tracker.

Parameters:

- tracker (LMCacheMPRequestTracker, required): The request tracker to generate the metadata from.
- blocks_in_chunk (int, required): the number of blocks in a LMCache data chunk.
- vllm_block_size (int, required): the number of tokens in a vLLM KV cache block.
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@staticmethod
def GetStoreMetadata(
    tracker: LMCacheMPRequestTracker,
    blocks_in_chunk: int,
    vllm_block_size: int,
) -> "LMCacheMPRequestMetadata | None":
    """
    Generate the store metadata for the current request tracker.

    Args:
        tracker: The request tracker to generate the metadata from.
        blocks_in_chunk: the number of blocks in a LMCache data chunk
        vllm_block_size: the number of tokens in a vLLM KV cache block
    """
    # Store the blocks that have block hashes
    # NOTE: the invariant here is that `num_stored_blocks` should
    # always be a multiple of `blocks_in_chunk`
    # TODO: This should be checked every time we update the num_stored_blocks
    min_available_blocks = min(
        len(tracker.block_hashes),
        len(tracker.allocated_block_ids),
        tracker.num_scheduled_tokens // vllm_block_size,
    )
    num_staging_blocks = min_available_blocks - tracker.num_stored_blocks
    num_chunks = num_staging_blocks // blocks_in_chunk

    if num_chunks >= 1:
        start = tracker.num_stored_blocks
        end = start + num_chunks * blocks_in_chunk
        block_hashes = convert_block_hashes_to_bytes(
            tracker.block_hashes[start:end]
        )
        block_ids = tracker.allocated_block_ids[start:end]

        ret = LMCacheMPRequestMetadata(
            request_id=tracker.request_id,
            direction="STORE",
            op=LoadStoreOp(block_hashes=block_hashes, block_ids=block_ids),
        )

        # Update the request tracker
        tracker.increase_num_stored_blocks(end - start)
        return ret

    return None
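
A worked example of the chunk-granularity store logic above, again with hypothetical numbers (16 blocks per chunk, 16-token blocks, 70 hashed/allocated blocks, 1100 scheduled tokens, 16 blocks already stored):

blocks_in_chunk = 16
vllm_block_size = 16

num_block_hashes = 70
num_allocated_blocks = 70
num_scheduled_tokens = 1100
num_stored_blocks = 16

min_available_blocks = min(
    num_block_hashes,
    num_allocated_blocks,
    num_scheduled_tokens // vllm_block_size,  # only 68 blocks fully scheduled
)                                             # -> 68
num_staging_blocks = min_available_blocks - num_stored_blocks  # 52
num_chunks = num_staging_blocks // blocks_in_chunk             # 3

start = num_stored_blocks                   # 16
end = start + num_chunks * blocks_in_chunk  # 64
# Blocks [16, 64) are stored in this step; the remaining 4 staged blocks
# wait until they complete another full chunk.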

__init__

__init__(
    request_id: str,
    direction: Literal["STORE", "RETRIEVE"],
    op: LoadStoreOp,
) -> None

LMCacheMPRequestState

Bases: Enum

State machine:
PREFETCHING -- update_state_after_alloc --> WAITING_FOR_LOAD
WAITING_FOR_LOAD -- process_loading_requests --> READY

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
class LMCacheMPRequestState(enum.Enum):
    """
    State machine:
    PREFETCHING -- update_state_after_alloc --> WAITING_FOR_LOAD
    WAITING_FOR_LOAD -- process_loading_requests --> READY
    """

    PREFETCHING = enum.auto()
    WAITING_FOR_LOAD = enum.auto()
    READY = enum.auto()
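
A minimal sketch of the documented transitions (the two driving methods are named only in comments; this assumes the module and its dependencies are importable in your environment):

from vllm.distributed.kv_transfer.kv_connector.v1.lmcache_mp_connector import (
    LMCacheMPRequestState,
)

state = LMCacheMPRequestState.PREFETCHING
# update_state_after_alloc: KV blocks have been allocated for the request
state = LMCacheMPRequestState.WAITING_FOR_LOAD
# process_loading_requests: the retrieve op has been handed off
state = LMCacheMPRequestState.READY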

PREFETCHING class-attribute instance-attribute

PREFETCHING = auto()

READY class-attribute instance-attribute

READY = auto()

WAITING_FOR_LOAD class-attribute instance-attribute

WAITING_FOR_LOAD = auto()

LMCacheMPRequestTracker dataclass

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
@dataclass
class LMCacheMPRequestTracker:
    # NOTE: this class uses vLLM data structures, so it should be part of
    # the vLLM integration code

    request_id: str

    # Read-only lists to track the token ids and block hashes
    all_token_ids: ConstantList[int]
    block_hashes: ConstantList["BlockHash"]

    # Block ids and hashes will be updated in update_state_after_alloc and
    # during the generation
    allocated_block_ids: list[int] = field(default_factory=list)

    # Number of scheduled tokens in this request. We keep tracking this to
    # avoid saving half-full blocks.
    num_scheduled_tokens: int = 0

    # Number of stored blocks will be initialized when looking up the
    # external hit tokens and will be updated when processing new requests
    # and cached requests.
    num_stored_blocks: int = 0

    # Staging load operation -- save vllm and lmcache hit tokens during lookup
    num_vllm_hit_blocks: int = 0
    num_lmcache_hit_blocks: int = 0

    # Main state
    state: LMCacheMPRequestState = LMCacheMPRequestState.PREFETCHING

    def __init__(self, request: "Request"):
        self.request_id = request.request_id
        self.all_token_ids = request.all_token_ids
        self.block_hashes = ConstantList(request.block_hashes)
        self.allocated_block_ids = []
        self.num_stored_blocks = 0
        self.num_vllm_hit_blocks = 0
        self.num_lmcache_hit_blocks = 0
        self.state = LMCacheMPRequestState.PREFETCHING

    ####
    # Check the state of the request
    ####
    def needs_retrieve(self) -> bool:
        """Check whether the current request needs retrieve, will be used
        update_stage_after_alloc"""
        return (
            self.num_lmcache_hit_blocks > self.num_vllm_hit_blocks
            and self.state != LMCacheMPRequestState.READY
        )

    def is_ready_for_retrieving(self) -> bool:
        """Check whether the current request is ready for retrieving,
        will be used in process_loading_requests"""
        return (
            self.state == LMCacheMPRequestState.WAITING_FOR_LOAD
            and self.needs_retrieve()
        )

    ####
    # Update internal states
    ####
    def increase_num_scheduled_tokens(self, num_new_tokens: int):
        self.num_scheduled_tokens += num_new_tokens

    def increase_num_stored_blocks(self, num_new_blocks: int):
        """Increase the number of stored blocks for the current request
        This function will be called when processing the cached requests.
        """
        self.num_stored_blocks += num_new_blocks

    def update_block_ids(
        self,
        new_block_ids: list[int],
    ):
        """Update the block ids for the current request
        This function will be called when processing the cached requests.
        """
        self.allocated_block_ids.extend(new_block_ids)

    ####
    # For debugging
    ####
    def __repr__(self) -> str:
        return (
            f"LMCacheMPRequestTracker(request_id={self.request_id}, "
            f"num_tokens={len(self.all_token_ids)}, "
            f"num_block_hashes={len(self.block_hashes)}, "
            f"num_allocated_blocks={len(self.allocated_block_ids)}, "
            f"num_stored_blocks={self.num_stored_blocks}, "
            f"vllm_hit_blocks={self.num_vllm_hit_blocks}, "
            f"lmcache_hit_blocks={self.num_lmcache_hit_blocks}, "
            f"state={self.state})"
        )

    def __str__(self) -> str:
        return self.__repr__()
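
A hedged sketch of the tracker's bookkeeping. FakeRequest is a hypothetical stand-in for vLLM's Request that exposes only the attributes the constructor reads; all numbers are illustrative:

from dataclasses import dataclass, field

from vllm.distributed.kv_transfer.kv_connector.v1.lmcache_mp_connector import (
    LMCacheMPRequestTracker,
)

@dataclass
class FakeRequest:
    # Only request_id, all_token_ids and block_hashes are read by
    # LMCacheMPRequestTracker.__init__.
    request_id: str = "req-0"
    all_token_ids: list = field(default_factory=lambda: list(range(1024)))
    block_hashes: list = field(default_factory=list)

tracker = LMCacheMPRequestTracker(FakeRequest())  # state = PREFETCHING
tracker.num_vllm_hit_blocks = 16                  # filled in during lookup
tracker.num_lmcache_hit_blocks = 48
tracker.update_block_ids(list(range(64)))         # after block allocation
tracker.increase_num_scheduled_tokens(1024)
assert tracker.needs_retrieve()                   # 48 > 16 and state != READY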

all_token_ids instance-attribute

all_token_ids: ConstantList[int] = all_token_ids

allocated_block_ids class-attribute instance-attribute

allocated_block_ids: list[int] = []

block_hashes instance-attribute

block_hashes: ConstantList[BlockHash] = ConstantList(
    block_hashes
)

num_lmcache_hit_blocks class-attribute instance-attribute

num_lmcache_hit_blocks: int = 0

num_scheduled_tokens class-attribute instance-attribute

num_scheduled_tokens: int = 0

num_stored_blocks class-attribute instance-attribute

num_stored_blocks: int = 0

num_vllm_hit_blocks class-attribute instance-attribute

num_vllm_hit_blocks: int = 0

request_id instance-attribute

request_id: str = request_id

state class-attribute instance-attribute

state: LMCacheMPRequestState = LMCacheMPRequestState.PREFETCHING

__init__

__init__(request: Request)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def __init__(self, request: "Request"):
    self.request_id = request.request_id
    self.all_token_ids = request.all_token_ids
    self.block_hashes = ConstantList(request.block_hashes)
    self.allocated_block_ids = []
    self.num_stored_blocks = 0
    self.num_vllm_hit_blocks = 0
    self.num_lmcache_hit_blocks = 0
    self.state = LMCacheMPRequestState.PREFETCHING

__repr__

__repr__() -> str
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def __repr__(self) -> str:
    return (
        f"LMCacheMPRequestTracker(request_id={self.request_id}, "
        f"num_tokens={len(self.all_token_ids)}, "
        f"num_block_hashes={len(self.block_hashes)}, "
        f"num_allocated_blocks={len(self.allocated_block_ids)}, "
        f"num_stored_blocks={self.num_stored_blocks}, "
        f"vllm_hit_blocks={self.num_vllm_hit_blocks}, "
        f"lmcache_hit_blocks={self.num_lmcache_hit_blocks}, "
        f"state={self.state})"
    )

__str__

__str__() -> str
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def __str__(self) -> str:
    return self.__repr__()

increase_num_scheduled_tokens

increase_num_scheduled_tokens(num_new_tokens: int)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def increase_num_scheduled_tokens(self, num_new_tokens: int):
    self.num_scheduled_tokens += num_new_tokens

increase_num_stored_blocks

increase_num_stored_blocks(num_new_blocks: int)

Increase the number of stored blocks for the current request. This function will be called when processing the cached requests.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def increase_num_stored_blocks(self, num_new_blocks: int):
    """Increase the number of stored blocks for the current request
    This function will be called when processing the cached requests.
    """
    self.num_stored_blocks += num_new_blocks

is_ready_for_retrieving

is_ready_for_retrieving() -> bool

Check whether the current request is ready for retrieving; will be used in process_loading_requests

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def is_ready_for_retrieving(self) -> bool:
    """Check whether the current request is ready for retrieving,
    will be used in process_loading_requests"""
    return (
        self.state == LMCacheMPRequestState.WAITING_FOR_LOAD
        and self.needs_retrieve()
    )

needs_retrieve

needs_retrieve() -> bool

Check whether the current request needs a retrieve; will be used in update_state_after_alloc

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def needs_retrieve(self) -> bool:
    """Check whether the current request needs retrieve, will be used
    update_stage_after_alloc"""
    return (
        self.num_lmcache_hit_blocks > self.num_vllm_hit_blocks
        and self.state != LMCacheMPRequestState.READY
    )

update_block_ids

update_block_ids(new_block_ids: list[int])

Update the block ids for the current request. This function will be called when processing the cached requests.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def update_block_ids(
    self,
    new_block_ids: list[int],
):
    """Update the block ids for the current request
    This function will be called when processing the cached requests.
    """
    self.allocated_block_ids.extend(new_block_ids)

convert_block_hashes_to_bytes

convert_block_hashes_to_bytes(
    block_hashes: list[BlockHash],
) -> list[bytes]
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def convert_block_hashes_to_bytes(
    block_hashes: list["BlockHash"],
) -> list[bytes]:
    return cast(list[bytes], block_hashes)

create_scheduler_adapter

create_scheduler_adapter(
    server_url: str,
    zmq_context: Context,
    vllm_config: VllmConfig,
) -> LMCacheMPSchedulerAdapter
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def create_scheduler_adapter(
    server_url: str, zmq_context: zmq.Context, vllm_config: VllmConfig
) -> LMCacheMPSchedulerAdapter:
    world_size, kv_rank = extract_world_size_and_kv_rank(
        vllm_config.parallel_config.world_size,
        vllm_config.parallel_config.rank,
        vllm_config,
    )
    return LMCacheMPSchedulerAdapter(
        server_url,
        zmq_context,
        vllm_config.model_config.model,
        world_size,
        kv_rank,
        vllm_config.cache_config.block_size,
    )

create_worker_adapter

create_worker_adapter(
    server_url: str,
    zmq_context: Context,
    vllm_config: VllmConfig,
) -> LMCacheMPWorkerAdapter
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def create_worker_adapter(
    server_url: str, zmq_context: zmq.Context, vllm_config: VllmConfig
) -> LMCacheMPWorkerAdapter:
    world_size, kv_rank = extract_world_size_and_kv_rank(
        vllm_config.parallel_config.world_size,
        vllm_config.parallel_config.rank,
        vllm_config,
    )
    return LMCacheMPWorkerAdapter(
        server_url,
        zmq_context,
        vllm_config.model_config.model,
        world_size,
        kv_rank,
        vllm_config.cache_config.block_size,
    )
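
A hedged usage sketch for both factories. Here vllm_config is assumed to be an already-constructed VllmConfig for the running engine (not built in this snippet), and the URL follows the connector's default host/port extra config:

import zmq

zmq_context = zmq.Context.instance()
server_url = "tcp://localhost:5555"

# vllm_config: an existing VllmConfig (assumed, not constructed here).
scheduler_adapter = create_scheduler_adapter(server_url, zmq_context, vllm_config)
worker_adapter = create_worker_adapter(server_url, zmq_context, vllm_config)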

extract_world_size_and_kv_rank

extract_world_size_and_kv_rank(
    world_size: int, rank: int, vllm_config: VllmConfig
) -> tuple[int, int]

Convert the world size and rank into the KV-cache world size and rank, accounting for MLA models.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def extract_world_size_and_kv_rank(
    world_size: int,
    rank: int,
    vllm_config: VllmConfig,
) -> tuple[int, int]:
    """
    Convert the world size and rank into the KV-cache world size and rank,
    accounting for MLA models.
    """
    use_mla = mla_enabled(vllm_config.model_config)
    if not use_mla:
        return world_size, rank
    else:
        # Tensor parallel does not change the KV caches for MLA models.
        # So we need to "exclude" the effect of TP on rank and world size
        tp_size = vllm_config.parallel_config.tensor_parallel_size
        # vLLM constructs TP groups first, and then construct other
        # parallel groups on top of TP groups.
        # for example, TP=4, PP=2,
        # TP group: [0, 1, 2, 3], [4, 5, 6, 7]
        # PP group: [0, 4], [1, 5], [2, 6], [3, 7]
        # So we can "exclude" the effect of TP by rank // tp_size.
        return world_size // tp_size, rank // tp_size
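
A worked example using the TP=4, PP=2 layout from the comments (world_size = 8); all TP ranks within a group map to the same KV rank:

tp_size = 4
world_size = 8

for rank in range(world_size):
    kv_world_size = world_size // tp_size  # 2
    kv_rank = rank // tp_size              # 0 for ranks 0-3, 1 for ranks 4-7
    print(rank, "->", kv_world_size, kv_rank)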

reformat_block_ids

reformat_block_ids(
    block_ids: tuple[list[int], ...] | None,
) -> list[int]
Source code in vllm/distributed/kv_transfer/kv_connector/v1/lmcache_mp_connector.py
def reformat_block_ids(block_ids: tuple[list[int], ...] | None) -> list[int]:
    if block_ids is None:
        return []
    assert isinstance(block_ids, tuple), (
        f"Expected block_ids to be a tuple of lists, but got {type(block_ids)}"
    )

    if len(block_ids) > 1:
        raise RuntimeError(
            "LMCacheMPConnector only works without hybrid kv cache manager. "
            "Please pass --disable-hybrid-kv-cache-manager when starting vllm"
        )

    return block_ids[0]
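
A small usage sketch (hypothetical inputs), assuming the module is importable:

from vllm.distributed.kv_transfer.kv_connector.v1.lmcache_mp_connector import (
    reformat_block_ids,
)

assert reformat_block_ids(None) == []                 # nothing allocated yet
assert reformat_block_ids(([3, 4, 5],)) == [3, 4, 5]  # single KV cache group
# Passing a tuple with more than one group (hybrid KV cache manager enabled)
# raises RuntimeError.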