Skip to content

vllm.distributed.kv_transfer.kv_connector.v1.moriio.moriio_common

HandshakeError

Bases: MoRIIOError

Exception raised when handshake fails.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class HandshakeError(MoRIIOError):
    """Exception raised when handshake fails."""

    pass

LayerTransferPlan dataclass

Plan for transferring a single layer.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@dataclass
class LayerTransferPlan:
    """Plan for transferring a single layer."""

    request_id: str
    layer_name: str
    sess_idx: int
    transfer_local_offsets: list[int]
    transfer_remote_offsets: list[int]
    transfer_sizes: list[int]
    use_batch: bool = True

MoRIIOConstants

Constants for MoRIIO connector.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class MoRIIOConstants:
    """Constants for MoRIIO connector."""

    # ZMQ message types
    GET_META_MSG = b"get_meta_msg"
    POP_DONE_RECV = b"pop_done_recv"
    OVER = b"OVER"
    COMPLETION_PREFIX = "cmpl"

    PING_INTERVAL = 5
    MAX_PING_RETRIES = 100
    DEFAULT_HANDSHAKE_PORT = "6301"
    DEFAULT_NOTIFY_PORT = "61005"

    VLLM_MORI_READ_ABORT_REQUEST_TIMEOUT = 3600

MoRIIOError

Bases: Exception

Base exception for MoRIIO operations.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class MoRIIOError(Exception):
    """Base exception for MoRIIO operations."""

    pass

RemoteAllocInfo dataclass

Information about remote block allocation.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@dataclass
class RemoteAllocInfo:
    """Information about remote block allocation."""

    block_ids: list[int]
    writes_done: int = 0
    decode_dp_rank: int = 0
    transfer_offset: tuple[list[int], list[int], list[int]] | None = None

ReqMeta dataclass

Metadata for a single request.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@dataclass
class ReqMeta:
    """Metadata for a single request."""

    local_block_ids: list[int]
    remote_block_ids: list[int]
    remote_host: str
    remote_port: int
    remote_handshake_port: int
    remote_notify_port: int
    remote_engine_id: str
    tp_size: int
    remote_dp_size: int

RoleManager

Manages role state across the connector.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class RoleManager:
    """Manages role state across the connector."""

    _instance: "RoleManager | None" = None
    _lock = threading.Lock()

    def __init__(self) -> None:
        self._role: ROLE = ROLE.NOTINIT

    @classmethod
    def get_instance(cls) -> "RoleManager":
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def set_role(self, role: ROLE) -> None:
        """Set the current role."""
        with self._lock:
            self._role = role

    def get_role(self) -> ROLE:
        """Get the current role."""
        return self._role

get_role

get_role() -> ROLE

Get the current role.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def get_role(self) -> ROLE:
    """Get the current role."""
    return self._role

set_role

set_role(role: ROLE) -> None

Set the current role.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def set_role(self, role: ROLE) -> None:
    """Set the current role."""
    with self._lock:
        self._role = role

TransferError

Bases: MoRIIOError

Exception raised when transfer fails.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
class TransferError(MoRIIOError):
    """Exception raised when transfer fails."""

    pass

get_role

get_role() -> ROLE

Get the global role.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def get_role() -> ROLE:
    """Get the global role."""
    return RoleManager.get_instance().get_role()

set_role

set_role(role: ROLE)

Set the global role.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
def set_role(role: ROLE):
    """Set the global role."""
    RoleManager.get_instance().set_role(role)

zmq_ctx

zmq_ctx(socket_type: Any, addr: str) -> Iterator[Socket]

Context manager for a ZMQ socket

Source code in vllm/distributed/kv_transfer/kv_connector/v1/moriio/moriio_common.py
@contextlib.contextmanager
def zmq_ctx(socket_type: Any, addr: str) -> Iterator[zmq.Socket]:
    """Context manager for a ZMQ socket"""

    if socket_type not in (zmq.ROUTER, zmq.REQ, zmq.DEALER):
        raise ValueError(f"Unexpected socket type: {socket_type}")

    ctx: zmq.Context | None = None
    try:
        ctx = zmq.Context()  # type: ignore[attr-defined]
        yield make_zmq_socket(
            ctx=ctx, path=addr, socket_type=socket_type, bind=socket_type == zmq.ROUTER
        )
    finally:
        if ctx is not None:
            ctx.destroy(linger=0)