Skip to content

vllm.distributed.ec_transfer.ec_connector.shared_storage_connector

logger module-attribute

logger = init_logger(__name__)

ECSharedStorageConnector

Bases: ECConnectorBase

Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
class ECSharedStorageConnector(ECConnectorBase):
    # NOTE: This is Simple debug implementation of the EC connector.
    # It save / load the EC cache to / from the disk.

    def __init__(self, vllm_config: "VllmConfig", role: ECConnectorRole):
        super().__init__(vllm_config=vllm_config, role=role)
        # req_id -> index
        self._mm_datas_need_loads: dict[str, int] = {}
        transfer_config = vllm_config.ec_transfer_config
        if transfer_config is not None:
            self._storage_path = transfer_config.get_from_extra_config(
                "shared_storage_path", "/tmp"
            )
            logger.debug(transfer_config)
            logger.debug("Shared storage path is %s", self._storage_path)
        else:
            raise ValueError("ec_transfer_config must be set for ECConnectorBase")

    def start_load_caches(self, encoder_cache, **kwargs) -> None:
        """
        Start loading the cache from the connector into vLLM's encoder cache.

        This method loads the encoder cache based on metadata provided by the scheduler.
        It is called before `_gather_mm_embeddings` for the EC Connector. For EC,
        the `encoder_cache` and `mm_hash` are stored in `kwargs`.

        Args:
            encoder_cache (dict[str, torch.Tensor]): A dictionary mapping multimodal
                data hashes (`mm_hash`) to encoder cache tensors.
            kwargs (dict): Additional keyword arguments for the connector.
        """

        # Get the metadata
        metadata: ECConnectorMetadata = self._get_connector_metadata()
        assert isinstance(metadata, ECSharedStorageConnectorMetadata)
        assert encoder_cache is not None
        if metadata is None:
            logger.warning(
                (
                    "In connector.start_load_caches, ",
                    "but the connector metadata is None",
                )
            )
            return
        # Load the EC for each mm data
        for mm_data in metadata.mm_datas:
            if mm_data.mm_hash in encoder_cache:
                continue
            filename = self._generate_filename_debug(mm_data.mm_hash)
            ec_cache = safetensors.torch.load_file(filename)["ec_cache"].cuda()
            encoder_cache[mm_data.mm_hash] = ec_cache
            logger.debug("Success load encoder cache for hash %s", mm_data.mm_hash)

    def save_caches(self, encoder_cache, mm_hash, **kwargs) -> None:
        """
        Save the encoder cache to the connector.

        This method saves the encoder cache from the worker's local storage
        to shared storage or another external connector.

        Args:
            encoder_cache (dict[str, torch.Tensor]): A dictionary mapping multimodal
                data hashes (`mm_hash`) to encoder cache tensors.
            mm_hash (str): The hash of the multimodal data whose cache is being saved.
            kwargs (dict): Additional keyword arguments for the connector.
        """
        # Return if it is PD Instance
        if not self.is_producer:
            return
        filename = self._generate_filename_debug(mm_hash)
        ec_cache = encoder_cache[mm_hash]
        tensors = {"ec_cache": ec_cache.detach().cpu()}
        safetensors.torch.save_file(tensors, filename)
        logger.debug("Save cache successful for mm_hash %s", mm_hash)

    def has_caches(
        self,
        request: "Request",
    ) -> list[bool]:
        """
        Check if cache exist externally for each mm_data of request

        Args:
            request (Request): the request object.

        Returns:
            List of bool indicate that ith mm_data exist in cache or not
        """
        result = []
        for feature in request.mm_features:
            result.append(self._found_match_for_mm_data(feature.identifier))
        return result

    def update_state_after_alloc(
        self,
        request: "Request",
        index: int,
    ) -> None:
        """
        Update ECConnector state after encoder cache allocation.
        """
        mm_hash = request.mm_features[index].identifier
        num_encoder_token = request.get_num_encoder_tokens(index)
        # Insert mm_hash only if this block has not been recorded yet.
        self._mm_datas_need_loads[mm_hash] = num_encoder_token

    def build_connector_meta(
        self,
        scheduler_output: SchedulerOutput,
    ) -> ECConnectorMetadata:
        """Build the connector metadata for this step.

        This function should NOT modify any fields in the scheduler_output.
        Also, calling this function will reset the state of the connector.
        This only build for load mm_data only
        Args:
            scheduler_output (SchedulerOutput): the scheduler output object.
        """
        meta = ECSharedStorageConnectorMetadata()
        for mm_hash, num_encoder_token in self._mm_datas_need_loads.items():
            meta.add_mm_data(MMMeta.make_meta(mm_hash, num_encoder_token))
        self._mm_datas_need_loads.clear()
        return meta

    # ==============================
    # Helper functions
    # ==============================

    def _found_match_for_mm_data(self, mm_hash) -> bool:
        """Check if the cache is hit for the request."""
        filename = self._generate_filename_debug(mm_hash)
        return os.path.exists(filename)

    def _generate_foldername_debug(
        self,
        mm_hash: str,
        create_folder: bool = True,  # <- now defaults to True
    ) -> str:
        """
        Return the folder in which the cache for this mm_hash lives.
        If `create_folder` is True (default) the directory is created
        recursively the first time it is needed.
        """
        foldername = os.path.join(self._storage_path, mm_hash)
        if create_folder:
            os.makedirs(foldername, exist_ok=True)
        return foldername

    def _generate_filename_debug(self, mm_hash: str) -> str:
        """
        Return the full path of the safetensors file for this mm_hash.
        Ensures the parent directory exists because
        `_generate_foldername_debug` is called with its default
        (`create_folder=True`).
        """
        foldername = self._generate_foldername_debug(mm_hash)  # <- folder auto-created
        return os.path.join(foldername, "encoder_cache.safetensors")

_mm_datas_need_loads instance-attribute

_mm_datas_need_loads: dict[str, int] = {}

_storage_path instance-attribute

_storage_path = get_from_extra_config(
    "shared_storage_path", "/tmp"
)

__init__

__init__(vllm_config: VllmConfig, role: ECConnectorRole)
Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
def __init__(self, vllm_config: "VllmConfig", role: ECConnectorRole):
    super().__init__(vllm_config=vllm_config, role=role)
    # req_id -> index
    self._mm_datas_need_loads: dict[str, int] = {}
    transfer_config = vllm_config.ec_transfer_config
    if transfer_config is not None:
        self._storage_path = transfer_config.get_from_extra_config(
            "shared_storage_path", "/tmp"
        )
        logger.debug(transfer_config)
        logger.debug("Shared storage path is %s", self._storage_path)
    else:
        raise ValueError("ec_transfer_config must be set for ECConnectorBase")

_found_match_for_mm_data

_found_match_for_mm_data(mm_hash) -> bool

Check if the cache is hit for the request.

Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
def _found_match_for_mm_data(self, mm_hash) -> bool:
    """Check if the cache is hit for the request."""
    filename = self._generate_filename_debug(mm_hash)
    return os.path.exists(filename)

_generate_filename_debug

_generate_filename_debug(mm_hash: str) -> str

Return the full path of the safetensors file for this mm_hash. Ensures the parent directory exists because _generate_foldername_debug is called with its default (create_folder=True).

Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
def _generate_filename_debug(self, mm_hash: str) -> str:
    """
    Return the full path of the safetensors file for this mm_hash.
    Ensures the parent directory exists because
    `_generate_foldername_debug` is called with its default
    (`create_folder=True`).
    """
    foldername = self._generate_foldername_debug(mm_hash)  # <- folder auto-created
    return os.path.join(foldername, "encoder_cache.safetensors")

_generate_foldername_debug

_generate_foldername_debug(
    mm_hash: str, create_folder: bool = True
) -> str

Return the folder in which the cache for this mm_hash lives. If create_folder is True (default) the directory is created recursively the first time it is needed.

Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
def _generate_foldername_debug(
    self,
    mm_hash: str,
    create_folder: bool = True,  # <- now defaults to True
) -> str:
    """
    Return the folder in which the cache for this mm_hash lives.
    If `create_folder` is True (default) the directory is created
    recursively the first time it is needed.
    """
    foldername = os.path.join(self._storage_path, mm_hash)
    if create_folder:
        os.makedirs(foldername, exist_ok=True)
    return foldername

build_connector_meta

build_connector_meta(
    scheduler_output: SchedulerOutput,
) -> ECConnectorMetadata

Build the connector metadata for this step.

This function should NOT modify any fields in the scheduler_output. Also, calling this function will reset the state of the connector. This only build for load mm_data only Args: scheduler_output (SchedulerOutput): the scheduler output object.

Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
def build_connector_meta(
    self,
    scheduler_output: SchedulerOutput,
) -> ECConnectorMetadata:
    """Build the connector metadata for this step.

    This function should NOT modify any fields in the scheduler_output.
    Also, calling this function will reset the state of the connector.
    This only build for load mm_data only
    Args:
        scheduler_output (SchedulerOutput): the scheduler output object.
    """
    meta = ECSharedStorageConnectorMetadata()
    for mm_hash, num_encoder_token in self._mm_datas_need_loads.items():
        meta.add_mm_data(MMMeta.make_meta(mm_hash, num_encoder_token))
    self._mm_datas_need_loads.clear()
    return meta

has_caches

has_caches(request: Request) -> list[bool]

Check if cache exist externally for each mm_data of request

Parameters:

Name Type Description Default
request Request

the request object.

required

Returns:

Type Description
list[bool]

List of bool indicate that ith mm_data exist in cache or not

Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
def has_caches(
    self,
    request: "Request",
) -> list[bool]:
    """
    Check if cache exist externally for each mm_data of request

    Args:
        request (Request): the request object.

    Returns:
        List of bool indicate that ith mm_data exist in cache or not
    """
    result = []
    for feature in request.mm_features:
        result.append(self._found_match_for_mm_data(feature.identifier))
    return result

save_caches

save_caches(encoder_cache, mm_hash, **kwargs) -> None

Save the encoder cache to the connector.

This method saves the encoder cache from the worker's local storage to shared storage or another external connector.

Parameters:

Name Type Description Default
encoder_cache dict[str, Tensor]

A dictionary mapping multimodal data hashes (mm_hash) to encoder cache tensors.

required
mm_hash str

The hash of the multimodal data whose cache is being saved.

required
kwargs dict

Additional keyword arguments for the connector.

{}
Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
def save_caches(self, encoder_cache, mm_hash, **kwargs) -> None:
    """
    Save the encoder cache to the connector.

    This method saves the encoder cache from the worker's local storage
    to shared storage or another external connector.

    Args:
        encoder_cache (dict[str, torch.Tensor]): A dictionary mapping multimodal
            data hashes (`mm_hash`) to encoder cache tensors.
        mm_hash (str): The hash of the multimodal data whose cache is being saved.
        kwargs (dict): Additional keyword arguments for the connector.
    """
    # Return if it is PD Instance
    if not self.is_producer:
        return
    filename = self._generate_filename_debug(mm_hash)
    ec_cache = encoder_cache[mm_hash]
    tensors = {"ec_cache": ec_cache.detach().cpu()}
    safetensors.torch.save_file(tensors, filename)
    logger.debug("Save cache successful for mm_hash %s", mm_hash)

start_load_caches

start_load_caches(encoder_cache, **kwargs) -> None

Start loading the cache from the connector into vLLM's encoder cache.

This method loads the encoder cache based on metadata provided by the scheduler. It is called before _gather_mm_embeddings for the EC Connector. For EC, the encoder_cache and mm_hash are stored in kwargs.

Parameters:

Name Type Description Default
encoder_cache dict[str, Tensor]

A dictionary mapping multimodal data hashes (mm_hash) to encoder cache tensors.

required
kwargs dict

Additional keyword arguments for the connector.

{}
Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
def start_load_caches(self, encoder_cache, **kwargs) -> None:
    """
    Start loading the cache from the connector into vLLM's encoder cache.

    This method loads the encoder cache based on metadata provided by the scheduler.
    It is called before `_gather_mm_embeddings` for the EC Connector. For EC,
    the `encoder_cache` and `mm_hash` are stored in `kwargs`.

    Args:
        encoder_cache (dict[str, torch.Tensor]): A dictionary mapping multimodal
            data hashes (`mm_hash`) to encoder cache tensors.
        kwargs (dict): Additional keyword arguments for the connector.
    """

    # Get the metadata
    metadata: ECConnectorMetadata = self._get_connector_metadata()
    assert isinstance(metadata, ECSharedStorageConnectorMetadata)
    assert encoder_cache is not None
    if metadata is None:
        logger.warning(
            (
                "In connector.start_load_caches, ",
                "but the connector metadata is None",
            )
        )
        return
    # Load the EC for each mm data
    for mm_data in metadata.mm_datas:
        if mm_data.mm_hash in encoder_cache:
            continue
        filename = self._generate_filename_debug(mm_data.mm_hash)
        ec_cache = safetensors.torch.load_file(filename)["ec_cache"].cuda()
        encoder_cache[mm_data.mm_hash] = ec_cache
        logger.debug("Success load encoder cache for hash %s", mm_data.mm_hash)

update_state_after_alloc

update_state_after_alloc(
    request: Request, index: int
) -> None

Update ECConnector state after encoder cache allocation.

Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
def update_state_after_alloc(
    self,
    request: "Request",
    index: int,
) -> None:
    """
    Update ECConnector state after encoder cache allocation.
    """
    mm_hash = request.mm_features[index].identifier
    num_encoder_token = request.get_num_encoder_tokens(index)
    # Insert mm_hash only if this block has not been recorded yet.
    self._mm_datas_need_loads[mm_hash] = num_encoder_token

ECSharedStorageConnectorMetadata dataclass

Bases: ECConnectorMetadata

Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
@dataclass
class ECSharedStorageConnectorMetadata(ECConnectorMetadata):
    mm_datas: list[MMMeta]

    def __init__(self):
        self.mm_datas = []

    def add_mm_data(self, mm_data: MMMeta):
        self.mm_datas.append(mm_data)

mm_datas instance-attribute

mm_datas: list[MMMeta] = []

__init__

__init__()
Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
def __init__(self):
    self.mm_datas = []

add_mm_data

add_mm_data(mm_data: MMMeta)
Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
def add_mm_data(self, mm_data: MMMeta):
    self.mm_datas.append(mm_data)

MMMeta dataclass

Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
@dataclass
class MMMeta:
    mm_hash: str
    num_token: int

    @staticmethod
    def make_meta(mm_hash, num_token) -> "MMMeta":
        return MMMeta(mm_hash=mm_hash, num_token=num_token)

mm_hash instance-attribute

mm_hash: str

num_token instance-attribute

num_token: int

__init__

__init__(mm_hash: str, num_token: int) -> None

make_meta staticmethod

make_meta(mm_hash, num_token) -> MMMeta
Source code in vllm/distributed/ec_transfer/ec_connector/shared_storage_connector.py
@staticmethod
def make_meta(mm_hash, num_token) -> "MMMeta":
    return MMMeta(mm_hash=mm_hash, num_token=num_token)