Skip to content

vllm.v1.kv_offload.tiering.fs.manager

FileSystemTierManager: Pure-Python file system secondary tier for KV cache offloading.

Store path

Data is written to a temp file () via os.write, then os.replace'd to the final path (without .tmp).

Load path

Data is read from the block file directly via os.readv into the provided memoryview slice.

_r//_g/.bin

(hash-based subdirectories to limit directory fan-out)

FileSystemTierManager

Bases: SecondaryTierManager

Pure-Python disk-backed secondary tier.

Read-priority threads service load jobs preferentially; write-priority threads service store jobs preferentially. Both groups can drain either queue, so neither starves.

submit_store / submit_load are non-blocking: they enqueue tasks and return. get_finished() polls job completion and returns completed JobResults.

Source code in vllm/v1/kv_offload/tiering/fs/manager.py
class FileSystemTierManager(SecondaryTierManager):
    """
    Pure-Python disk-backed secondary tier.

    Read-priority threads service load jobs preferentially; write-priority
    threads service store jobs preferentially.  Both groups can drain either
    queue, so neither starves.

    submit_store / submit_load are non-blocking: they enqueue tasks and return.
    get_finished() polls job completion and returns completed JobResults.

    """

    def __init__(
        self,
        offloading_spec: "OffloadingSpec",
        primary_kv_view: memoryview,
        tier_type: str,
        root_dir: str,
        n_read_threads: int = 16,
        n_write_threads: int = 16,
    ):
        """
        Args:
            offloading_spec: contains the vllm_config, kv_cache_config
                and block_size_factor.
            primary_kv_view: Memoryview of the primary tier's CPU KV cache.
            tier_type: Tier type identifier, set by SecondaryTierFactory.
            root_dir: Root directory for block files.
            n_read_threads: Number of read-priority I/O threads.
            n_write_threads: Number of write-priority I/O threads.
        """
        super().__init__(offloading_spec, primary_kv_view, tier_type)

        # Extract block size from primary view
        assert primary_kv_view.strides is not None, (
            "primary_kv_view.strides cannot be None"
        )
        self._block_size: int = primary_kv_view.strides[0]

        # Create file mapper
        self.file_mapper = FileMapper.from_offloading_spec(
            root_dir=root_dir,
            offloading_spec=offloading_spec,
            gpu_blocks_per_file=offloading_spec.block_size_factor,
        )

        # Write config file
        config_path = self.file_mapper.get_config_file_path()
        os.makedirs(os.path.dirname(config_path), exist_ok=True)
        if not os.path.exists(config_path):
            with open(config_path, "w") as f:
                json.dump(
                    self.file_mapper.get_run_config(), f, indent=2, sort_keys=True
                )

        self._pool = DualQueueThreadPool(
            n_read_threads,
            n_write_threads,
            thread_name_prefix="vllm_kv_py_fs",
        )

    def lookup(
        self, key: OffloadKey, req_context: ReqContext | None = None
    ) -> bool | None:
        return os.path.exists(self.file_mapper.get_file_name(key))

    def submit_store(self, job_metadata: JobMetadata) -> None:
        tasks = (
            functools.partial(
                store_block,
                self.file_mapper.get_file_name(key),
                self._primary_kv_view,
                int(bid) * self._block_size,
                self._block_size,
            )
            for key, bid in zip(job_metadata.keys, job_metadata.block_ids)
        )
        self._pool.enqueue_store(job_metadata.job_id, len(job_metadata.keys), tasks)

    def submit_load(self, job_metadata: JobMetadata) -> None:
        tasks = (
            functools.partial(
                load_block,
                self.file_mapper.get_file_name(key),
                self._primary_kv_view,
                int(bid) * self._block_size,
                self._block_size,
            )
            for key, bid in zip(job_metadata.keys, job_metadata.block_ids)
        )
        self._pool.enqueue_load(job_metadata.job_id, len(job_metadata.keys), tasks)

    def get_finished(self) -> Iterable[JobResult]:
        """
        Collect completed jobs from the finished-jobs queue.
        """
        return (
            JobResult(job_id=job_id, success=success)
            for job_id, success in self._pool.get_finished()
        )

    def shutdown(self) -> None:
        """
        Release resources held by this tier.

        Shuts down the thread pool, clearing pending tasks and waiting for
        active threads to complete.
        """
        self._pool.shutdown(wait=True)

__init__

__init__(
    offloading_spec: OffloadingSpec,
    primary_kv_view: memoryview,
    tier_type: str,
    root_dir: str,
    n_read_threads: int = 16,
    n_write_threads: int = 16,
)

Parameters:

Name Type Description Default
offloading_spec OffloadingSpec

contains the vllm_config, kv_cache_config and block_size_factor.

required
primary_kv_view memoryview

Memoryview of the primary tier's CPU KV cache.

required
tier_type str

Tier type identifier, set by SecondaryTierFactory.

required
root_dir str

Root directory for block files.

required
n_read_threads int

Number of read-priority I/O threads.

16
n_write_threads int

Number of write-priority I/O threads.

16
Source code in vllm/v1/kv_offload/tiering/fs/manager.py
def __init__(
    self,
    offloading_spec: "OffloadingSpec",
    primary_kv_view: memoryview,
    tier_type: str,
    root_dir: str,
    n_read_threads: int = 16,
    n_write_threads: int = 16,
):
    """
    Args:
        offloading_spec: contains the vllm_config, kv_cache_config
            and block_size_factor.
        primary_kv_view: Memoryview of the primary tier's CPU KV cache.
        tier_type: Tier type identifier, set by SecondaryTierFactory.
        root_dir: Root directory for block files.
        n_read_threads: Number of read-priority I/O threads.
        n_write_threads: Number of write-priority I/O threads.
    """
    super().__init__(offloading_spec, primary_kv_view, tier_type)

    # Extract block size from primary view
    assert primary_kv_view.strides is not None, (
        "primary_kv_view.strides cannot be None"
    )
    self._block_size: int = primary_kv_view.strides[0]

    # Create file mapper
    self.file_mapper = FileMapper.from_offloading_spec(
        root_dir=root_dir,
        offloading_spec=offloading_spec,
        gpu_blocks_per_file=offloading_spec.block_size_factor,
    )

    # Write config file
    config_path = self.file_mapper.get_config_file_path()
    os.makedirs(os.path.dirname(config_path), exist_ok=True)
    if not os.path.exists(config_path):
        with open(config_path, "w") as f:
            json.dump(
                self.file_mapper.get_run_config(), f, indent=2, sort_keys=True
            )

    self._pool = DualQueueThreadPool(
        n_read_threads,
        n_write_threads,
        thread_name_prefix="vllm_kv_py_fs",
    )

get_finished

get_finished() -> Iterable[JobResult]

Collect completed jobs from the finished-jobs queue.

Source code in vllm/v1/kv_offload/tiering/fs/manager.py
def get_finished(self) -> Iterable[JobResult]:
    """
    Collect completed jobs from the finished-jobs queue.
    """
    return (
        JobResult(job_id=job_id, success=success)
        for job_id, success in self._pool.get_finished()
    )

shutdown

shutdown() -> None

Release resources held by this tier.

Shuts down the thread pool, clearing pending tasks and waiting for active threads to complete.

Source code in vllm/v1/kv_offload/tiering/fs/manager.py
def shutdown(self) -> None:
    """
    Release resources held by this tier.

    Shuts down the thread pool, clearing pending tasks and waiting for
    active threads to complete.
    """
    self._pool.shutdown(wait=True)