vllm.distributed.kv_events

logger module-attribute

logger = init_logger(__name__)

AllBlocksCleared

Bases: KVCacheEvent

Source code in vllm/distributed/kv_events.py
class AllBlocksCleared(KVCacheEvent):
    pass

BlockRemoved

Bases: KVCacheEvent

Source code in vllm/distributed/kv_events.py
class BlockRemoved(KVCacheEvent):
    block_hashes: list[int]

block_hashes instance-attribute

block_hashes: list[int]

BlockStored

Bases: KVCacheEvent

Source code in vllm/distributed/kv_events.py
class BlockStored(KVCacheEvent):
    block_hashes: list[int]
    parent_block_hash: Optional[int]
    token_ids: list[int]
    block_size: int
    lora_id: Optional[int]

block_hashes instance-attribute

block_hashes: list[int]

block_size instance-attribute

block_size: int

lora_id instance-attribute

lora_id: Optional[int]

parent_block_hash instance-attribute

parent_block_hash: Optional[int]

token_ids instance-attribute

token_ids: list[int]

EventBatch

Bases: Struct

Source code in vllm/distributed/kv_events.py
class EventBatch(
        msgspec.Struct,
        array_like=True,  # type: ignore[call-arg]
        omit_defaults=True,  # type: ignore[call-arg]
        gc=False,  # type: ignore[call-arg]
):
    ts: float
    events: list[Any]
    data_parallel_rank: Optional[int] = None

data_parallel_rank class-attribute instance-attribute

data_parallel_rank: Optional[int] = None

events instance-attribute

events: list[Any]

ts instance-attribute

ts: float

EventPublisher

Bases: ABC

Lightweight publisher for EventBatch batches with data parallelism support.

In data parallel setups, each DP rank runs its own EventPublisher instance to avoid duplicate events and ensure proper event attribution:

  • Each DP rank creates a separate publisher
  • Publishers automatically annotate events with their data_parallel_rank
  • This allows consumers to distinguish events from different DP ranks

The publisher is responsible for adding DP metadata since the scheduler operates independently of DP topology and shouldn't need DP awareness.
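
For illustration, a custom publisher only needs to implement publish and shutdown. The sketch below is hypothetical (the class name and print-based output are not part of vLLM); it assumes only the EventPublisher interface documented on this page.

from vllm.distributed.kv_events import EventBatch, EventPublisher

class LoggingEventPublisher(EventPublisher):
    """Hypothetical publisher that logs batches instead of sending them."""

    def publish(self, events: EventBatch) -> None:
        # Annotate with this publisher's DP rank, as ZmqEventPublisher does.
        if events.data_parallel_rank is None:
            events.data_parallel_rank = self._data_parallel_rank
        print(f"[dp={events.data_parallel_rank}] {len(events.events)} events")

    def shutdown(self) -> None:
        return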

Source code in vllm/distributed/kv_events.py
class EventPublisher(ABC):
    """Lightweight publisher for EventBatch batches with data parallelism
    support.

    In data parallel setups, each DP rank runs its own EventPublisher instance
    to avoid duplicate events and ensure proper event attribution:

    - Each DP rank creates a separate publisher
    - Publishers automatically annotate events with their data_parallel_rank
    - This allows consumers to distinguish events from different DP ranks

    The publisher is responsible for adding DP metadata since the scheduler
    operates independently of DP topology and shouldn't need DP awareness.
    """

    def __init__(self, data_parallel_rank: int = 0) -> None:
        self._data_parallel_rank = data_parallel_rank

    @abstractmethod
    def publish(self, events: EventBatch) -> None:
        """Emit events in order.

        Implementations should guarantee at-least-once delivery and
        monotonic ordering (e.g., via sequence numbers).
        """

    @abstractmethod
    def shutdown(self) -> None:
        """Shutdown the publisher."""

_data_parallel_rank instance-attribute

_data_parallel_rank = data_parallel_rank

__init__

__init__(data_parallel_rank: int = 0) -> None
Source code in vllm/distributed/kv_events.py
def __init__(self, data_parallel_rank: int = 0) -> None:
    self._data_parallel_rank = data_parallel_rank

publish abstractmethod

publish(events: EventBatch) -> None

Emit events in order.

Implementations should guarantee at-least-once delivery and monotonic ordering (e.g., via sequence numbers).
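
To illustrate what these guarantees mean for a consumer, here is a hedged sketch of receiver-side sequence tracking; the helper is hypothetical, and the 8-byte big-endian framing matches ZmqEventPublisher, documented below.

def check_seq(last_seq: int, seq_bytes: bytes) -> int:
    """Return the new last-seen sequence number, flagging anomalies."""
    seq = int.from_bytes(seq_bytes, "big")
    if seq <= last_seq:
        return last_seq  # duplicate: possible under at-least-once delivery
    if seq > last_seq + 1:
        # Missed batches last_seq+1 .. seq-1; a ZmqEventPublisher consumer
        # can request them via the replay socket (see below).
        print(f"gap: missing {last_seq + 1}..{seq - 1}")
    return seq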

Source code in vllm/distributed/kv_events.py
@abstractmethod
def publish(self, events: EventBatch) -> None:
    """Emit events in order.

    Implementations should guarantee at-least-once delivery and
    monotonic ordering (e.g., via sequence numbers).
    """

shutdown abstractmethod

shutdown() -> None

Shutdown the publisher.

Source code in vllm/distributed/kv_events.py
@abstractmethod
def shutdown(self) -> None:
    """Shutdown the publisher."""

EventPublisherFactory

Source code in vllm/distributed/kv_events.py
class EventPublisherFactory:
    _registry: dict[str, Callable[..., EventPublisher]] = {
        "null": NullEventPublisher,
        "zmq": ZmqEventPublisher,
    }

    @classmethod
    def register_publisher(cls, name: str,
                           ctor: Callable[..., EventPublisher]) -> None:
        if name in cls._registry:
            raise KeyError(f"publisher '{name}' already registered")
        cls._registry[name] = ctor

    @classmethod
    def create(cls,
               config: Optional[KVEventsConfig],
               data_parallel_rank: int = 0) -> EventPublisher:
        """Create publisher from a config mapping."""
        if not config:
            return NullEventPublisher()

        config_dict = asdict(config)

        kind = config_dict.pop("publisher", "null")
        config_dict.pop("enable_kv_cache_events")
        try:
            constructor = cls._registry[kind]
        except KeyError as exc:
            raise ValueError(f"Unknown event publisher '{kind}'") from exc
        return constructor(data_parallel_rank=data_parallel_rank,
                           **config_dict)

_registry class-attribute instance-attribute

_registry: dict[str, Callable[..., EventPublisher]] = {
    "null": NullEventPublisher,
    "zmq": ZmqEventPublisher,
}

create classmethod

create(
    config: Optional[KVEventsConfig],
    data_parallel_rank: int = 0,
) -> EventPublisher

Create publisher from a config mapping.
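
A typical call site might look like the sketch below. The KVEventsConfig import path and field values are assumptions; only the "publisher" and "enable_kv_cache_events" keys handled by the implementation below are certain.

from vllm.config import KVEventsConfig  # assumed import path
from vllm.distributed.kv_events import EventPublisherFactory

config = KVEventsConfig(
    enable_kv_cache_events=True,
    publisher="zmq",
    endpoint="tcp://*:5557",  # forwarded to the publisher's constructor
)
publisher = EventPublisherFactory.create(config, data_parallel_rank=0)

Note that the remaining config fields are forwarded as keyword arguments to the chosen constructor, so they must match its signature (here, ZmqEventPublisher's).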

Source code in vllm/distributed/kv_events.py
@classmethod
def create(cls,
           config: Optional[KVEventsConfig],
           data_parallel_rank: int = 0) -> EventPublisher:
    """Create publisher from a config mapping."""
    if not config:
        return NullEventPublisher()

    config_dict = asdict(config)

    kind = config_dict.pop("publisher", "null")
    config_dict.pop("enable_kv_cache_events")
    try:
        constructor = cls._registry[kind]
    except KeyError as exc:
        raise ValueError(f"Unknown event publisher '{kind}'") from exc
    return constructor(data_parallel_rank=data_parallel_rank,
                       **config_dict)

register_publisher classmethod

register_publisher(
    name: str, ctor: Callable[..., EventPublisher]
) -> None
Source code in vllm/distributed/kv_events.py
@classmethod
def register_publisher(cls, name: str,
                       ctor: Callable[..., EventPublisher]) -> None:
    if name in cls._registry:
        raise KeyError(f"publisher '{name}' already registered")
    cls._registry[name] = ctor
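
For example, plugging in the hypothetical LoggingEventPublisher sketched earlier on this page:

from vllm.distributed.kv_events import EventPublisherFactory

EventPublisherFactory.register_publisher("logging", LoggingEventPublisher)
# A config with publisher="logging" will now select this implementation.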

KVCacheEvent

Bases: Struct

Base class for all KV cache-related events

Source code in vllm/distributed/kv_events.py
class KVCacheEvent(
        msgspec.Struct,
        array_like=True,  # type: ignore[call-arg]
        omit_defaults=True,  # type: ignore[call-arg]
        gc=False,  # type: ignore[call-arg]
        tag=True):
    """Base class for all KV cache-related events"""

KVEventBatch

Bases: EventBatch

Source code in vllm/distributed/kv_events.py
class KVEventBatch(EventBatch):
    events: list[Union[BlockStored, BlockRemoved, AllBlocksCleared]]
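
Because the event classes are tagged msgspec Structs, a typed msgpack decoder reconstructs the union in events automatically. A minimal decoding sketch (the handling inside the loop is a placeholder):

import msgspec

from vllm.distributed.kv_events import BlockStored, KVEventBatch

decoder = msgspec.msgpack.Decoder(KVEventBatch)

def decode_batch(payload: bytes) -> KVEventBatch:
    batch = decoder.decode(payload)
    for event in batch.events:
        if isinstance(event, BlockStored):
            pass  # e.g., index event.block_hashes for prefix-cache lookups
    return batch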

events instance-attribute

events: list[Union[BlockStored, BlockRemoved, AllBlocksCleared]]

NullEventPublisher

Bases: EventPublisher

No-op implementation (default when disabled).

Source code in vllm/distributed/kv_events.py
class NullEventPublisher(EventPublisher):
    """No-op implementation (default when disabled)."""

    def publish(self, events) -> None:
        return

    def shutdown(self) -> None:
        return

publish

publish(events) -> None
Source code in vllm/distributed/kv_events.py
def publish(self, events) -> None:
    return

shutdown

shutdown() -> None
Source code in vllm/distributed/kv_events.py
def shutdown(self) -> None:
    return

ZmqEventPublisher

Bases: EventPublisher

Reliable PUB/ROUTER publisher with an in-memory replay buffer.

Spawns a separate thread to handle publishing from a queue.

Parameters

  • endpoint: PUB address. Use tcp://*:5557 to bind or tcp://host:5557 to connect.
  • replay_endpoint: Optional ROUTER address for replay requests. When given, subscribers can request missed batches by sending the starting sequence number as an 8-byte big-endian integer.
  • buffer_steps: Number of past batches to keep for replay.
  • hwm: ZeroMQ high-water-mark for PUB socket.
  • max_queue_size: Maximum number of events to buffer in memory.
  • topic: Topic to publish events to.
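
A subscriber matching these defaults might look like the following sketch; the hostname is an assumption, and the three-frame message layout mirrors the send_multipart call in _publisher_thread below.

import msgspec
import zmq

from vllm.distributed.kv_events import KVEventBatch

ctx = zmq.Context.instance()
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://localhost:5557")  # publisher bound tcp://*:5557
sub.setsockopt(zmq.SUBSCRIBE, b"")   # default empty topic matches all

decoder = msgspec.msgpack.Decoder(KVEventBatch)
while True:
    topic, seq_bytes, payload = sub.recv_multipart()
    seq = int.from_bytes(seq_bytes, "big")
    batch = decoder.decode(payload)
    print(f"seq={seq} ts={batch.ts} n_events={len(batch.events)}")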

Source code in vllm/distributed/kv_events.py
class ZmqEventPublisher(EventPublisher):
    """Reliable PUB/ROUTER publisher with an in-memory replay buffer.

    Spawns a separate thread to handle publishing from a queue.

    Parameters
    ----------
    endpoint:
        PUB address. Use ``tcp://*:5557`` to bind or ``tcp://host:5557`` to
        connect.
    replay_endpoint:
        Optional ROUTER address for replay requests. When given, subscribers can
        request missed batches by sending the starting sequence number as an
        8-byte big-endian integer.
    buffer_steps:
        Number of past batches to keep for replay.
    hwm:
        ZeroMQ high-water-mark for PUB socket.
    max_queue_size:
        Maximum number of events to buffer in memory.
    topic:
        Topic to publish events to.
    """
    SHUTDOWN_TIMEOUT: float = 1.0
    END_SEQ = (-1).to_bytes(8, "big", signed=True)

    def __init__(
        self,
        data_parallel_rank: int,
        endpoint: str = "tcp://*:5557",
        replay_endpoint: Optional[str] = None,
        buffer_steps: int = 10_000,
        hwm: int = 100_000,
        max_queue_size: int = 100_000,
        topic: str = "",
    ) -> None:
        # Storage
        super().__init__(data_parallel_rank)
        self._event_queue = Queue[Optional[EventBatch]](maxsize=max_queue_size)
        self._buffer = deque[tuple[int, bytes]](maxlen=buffer_steps)

        # ZMQ sockets
        self._ctx = zmq.Context.instance()
        self._pub: Optional[zmq.Socket] = None
        self._replay: Optional[zmq.Socket] = None
        self._dp_rank = data_parallel_rank

        self._endpoint = self.offset_endpoint_port(endpoint, self._dp_rank)
        self._replay_endpoint = self.offset_endpoint_port(
            replay_endpoint, self._dp_rank)
        self._hwm = hwm
        self._socket_setup()

        # Payload
        self._seq_gen = count()
        self._topic_bytes = topic.encode('utf-8')

        # Thread
        self._running = True
        logger.info("Starting ZMQ publisher thread")

        self._thread = threading.Thread(target=self._publisher_thread,
                                        daemon=True,
                                        name="zmq-publisher")
        self._thread.start()

    def publish(self, events: EventBatch) -> None:
        if not self._running:
            raise RuntimeError("Publisher is closed")
        if events.data_parallel_rank is None:
            events.data_parallel_rank = self._data_parallel_rank
        self._event_queue.put(events)

    def shutdown(self) -> None:
        """Stop the publisher thread and clean up resources."""
        self._running = False
        self._event_queue.put_nowait(None)

        start = time.time()
        pending_items = True
        while pending_items and (time.time() - start < self.SHUTDOWN_TIMEOUT):
            pending_items = not self._event_queue.empty()
            if pending_items:
                time.sleep(0.1)

        if pending_items:
            logger.warning(
                "Warning: Queue still has %s items after %s seconds timeout",
                self._event_queue.qsize(),
                self.SHUTDOWN_TIMEOUT,
            )

        if self._thread.is_alive():
            self._thread.join(timeout=self.SHUTDOWN_TIMEOUT)

        # Clean up ZMQ resources
        try:
            if self._pub is not None:
                self._pub.close(linger=0)
            if self._replay is not None:
                self._replay.close(linger=0)
        finally:
            pass  # Do not terminate context; other sockets may use it

    def _socket_setup(self) -> None:
        """Initialize sockets
        https://pyzmq.readthedocs.io/en/v19.0.0/morethanbindings.html#thread-safety
        """
        if self._pub is None:
            self._pub = self._ctx.socket(zmq.PUB)
            self._pub.set_hwm(self._hwm)
            # Heuristic: bind if wildcard / * present, else connect.
            # bind stable, connect volatile convention
            if (self._endpoint is not None
                    and ("*" in self._endpoint or "::" in self._endpoint
                         or self._endpoint.startswith("ipc://")
                         or self._endpoint.startswith("inproc://"))):
                self._pub.bind(self._endpoint)
            elif self._endpoint is not None:
                self._pub.connect(self._endpoint)

        # Set up replay socket: use ROUTER
        # 1) handles multiple REQ clients (identities)
        # 2) lets us send back one request → many replies (streamed events)
        # 3) works in our non‑blocking poll loop alongside PUB
        if self._replay_endpoint is not None:
            self._replay = self._ctx.socket(zmq.ROUTER)
            self._replay.bind(self._replay_endpoint)

    def _publisher_thread(self) -> None:
        """Background thread that processes the event queue."""
        self._pack = msgspec.msgpack.Encoder()

        assert self._pub is not None  # narrows type for mypy

        while self._running or self._event_queue.qsize() > 0:
            # --- replay (non-critical) ---------------------------------
            if self._replay is not None and self._replay.poll(0):
                try:
                    self._service_replay()
                except Exception as e:
                    logger.exception("Error in replay: %s", e)

            # --- main queue (critical) ---------------------------------
            try:
                event = self._event_queue.get(timeout=0.1)
                if event is None:
                    break  # Sentinel received, exit thread
            except queue.Empty:
                continue

            try:
                seq = next(self._seq_gen)

                payload = self._pack.encode(event)
                seq_bytes = seq.to_bytes(8, "big")
                self._pub.send_multipart(
                    (self._topic_bytes, seq_bytes, payload))

                self._buffer.append((seq, payload))
                self._event_queue.task_done()

            except Exception as e:
                # Publishing failed; back off a bit to avoid a tight error loop
                logger.exception("Error in publisher thread: %s", e)
                time.sleep(0.1)

    def _service_replay(self) -> None:
        """If a replay request is waiting, send buffered batches."""
        assert self._replay is not None  # narrows type for mypy

        frame = self._replay.recv_multipart()
        if len(frame) != 3:
            logger.warning("Invalid replay request: %s", frame)
            return
        client_id, _, start_seq_bytes = frame
        start_seq = int.from_bytes(start_seq_bytes, "big")

        for seq, buf in self._buffer:
            if seq >= start_seq:
                # [identity, empty_delim, seq_bytes, payload]
                # (identity, empty_delim) are stripped off by the router
                # receiving payload is (seq_bytes, payload)
                self._replay.send_multipart(
                    (client_id, b"", seq.to_bytes(8, "big"), buf))
        # Send end of sequence marker
        # receiving payload is (-1, b"")
        self._replay.send_multipart((client_id, b"", self.END_SEQ, b""))

    @staticmethod
    def offset_endpoint_port(endpoint: Optional[str],
                             data_parallel_rank: int) -> Optional[str]:
        """Helper function to offset the port in an endpoint by 
            the data parallel rank.

        Args:
            endpoint: The endpoint string 
                (e.g., "tcp://*:5557" or "inproc://cache")
            data_parallel_rank: The data parallel rank to offset by

        Returns:
            The endpoint with the port offset by data_parallel_rank 
                or suffix appended
        """
        # Do nothing if input is None or data_parallel_rank is 0
        if not endpoint or data_parallel_rank == 0:
            return endpoint

        if "inproc" in endpoint:
            return f"{endpoint}_dp{data_parallel_rank}"
        if "tcp" in endpoint:
            if endpoint and ":" in endpoint:
                # Get everything after the last colon (the port)
                last_colon_idx = endpoint.rfind(":")
                base_addr = endpoint[:last_colon_idx]
                base_port = int(endpoint[last_colon_idx + 1:])
                new_port = base_port + data_parallel_rank
                return f"{base_addr}:{new_port}"
            return endpoint
        raise ValueError("Invalid endpoint: must contain 'inproc' or 'tcp'")

END_SEQ class-attribute instance-attribute

END_SEQ = (-1).to_bytes(8, 'big', signed=True)

SHUTDOWN_TIMEOUT class-attribute instance-attribute

SHUTDOWN_TIMEOUT: float = 1.0

_buffer instance-attribute

_buffer = deque[tuple[int, bytes]](maxlen=buffer_steps)

_ctx instance-attribute

_ctx = zmq.Context.instance()

_dp_rank instance-attribute

_dp_rank = data_parallel_rank

_endpoint instance-attribute

_endpoint = offset_endpoint_port(endpoint, _dp_rank)

_event_queue instance-attribute

_event_queue = Queue[Optional[EventBatch]](
    maxsize=max_queue_size
)

_hwm instance-attribute

_hwm = hwm

_pub instance-attribute

_pub: Optional[Socket] = None

_replay instance-attribute

_replay: Optional[Socket] = None

_replay_endpoint instance-attribute

_replay_endpoint = offset_endpoint_port(
    replay_endpoint, _dp_rank
)

_running instance-attribute

_running = True

_seq_gen instance-attribute

_seq_gen = count()

_thread instance-attribute

_thread = Thread(
    target=_publisher_thread,
    daemon=True,
    name="zmq-publisher",
)

_topic_bytes instance-attribute

_topic_bytes = topic.encode('utf-8')

__init__

__init__(
    data_parallel_rank: int,
    endpoint: str = "tcp://*:5557",
    replay_endpoint: Optional[str] = None,
    buffer_steps: int = 10000,
    hwm: int = 100000,
    max_queue_size: int = 100000,
    topic: str = "",
) -> None
Source code in vllm/distributed/kv_events.py
def __init__(
    self,
    data_parallel_rank: int,
    endpoint: str = "tcp://*:5557",
    replay_endpoint: Optional[str] = None,
    buffer_steps: int = 10_000,
    hwm: int = 100_000,
    max_queue_size: int = 100_000,
    topic: str = "",
) -> None:
    # Storage
    super().__init__(data_parallel_rank)
    self._event_queue = Queue[Optional[EventBatch]](maxsize=max_queue_size)
    self._buffer = deque[tuple[int, bytes]](maxlen=buffer_steps)

    # ZMQ sockets
    self._ctx = zmq.Context.instance()
    self._pub: Optional[zmq.Socket] = None
    self._replay: Optional[zmq.Socket] = None
    self._dp_rank = data_parallel_rank

    self._endpoint = self.offset_endpoint_port(endpoint, self._dp_rank)
    self._replay_endpoint = self.offset_endpoint_port(
        replay_endpoint, self._dp_rank)
    self._hwm = hwm
    self._socket_setup()

    # Payload
    self._seq_gen = count()
    self._topic_bytes = topic.encode('utf-8')

    # Thread
    self._running = True
    logger.info("Starting ZMQ publisher thread")

    self._thread = threading.Thread(target=self._publisher_thread,
                                    daemon=True,
                                    name="zmq-publisher")
    self._thread.start()

_publisher_thread

_publisher_thread() -> None

Background thread that processes the event queue.

Source code in vllm/distributed/kv_events.py
def _publisher_thread(self) -> None:
    """Background thread that processes the event queue."""
    self._pack = msgspec.msgpack.Encoder()

    assert self._pub is not None  # narrows type for mypy

    while self._running or self._event_queue.qsize() > 0:
        # --- replay (non-critical) ---------------------------------
        if self._replay is not None and self._replay.poll(0):
            try:
                self._service_replay()
            except Exception as e:
                logger.exception("Error in replay: %s", e)

        # --- main queue (critical) ---------------------------------
        try:
            event = self._event_queue.get(timeout=0.1)
            if event is None:
                break  # Sentinel received, exit thread
        except queue.Empty:
            continue

        try:
            seq = next(self._seq_gen)

            payload = self._pack.encode(event)
            seq_bytes = seq.to_bytes(8, "big")
            self._pub.send_multipart(
                (self._topic_bytes, seq_bytes, payload))

            self._buffer.append((seq, payload))
            self._event_queue.task_done()

        except Exception as e:
            # Publishing failed; back off a bit to avoid a tight error loop
            logger.exception("Error in publisher thread: %s", e)
            time.sleep(0.1)

_service_replay

_service_replay() -> None

If a replay request is waiting, send buffered batches.
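
A replay client must receive many replies to a single request, so a DEALER socket is a natural fit (REQ enforces strict send/receive alternation). The sketch below is hypothetical and assumes the publisher's replay socket is bound at tcp://*:5558; note the explicit empty delimiter frame that ROUTER expects.

import zmq

ctx = zmq.Context.instance()
dealer = ctx.socket(zmq.DEALER)
dealer.connect("tcp://localhost:5558")

start_seq = 42  # first missed sequence number (hypothetical value)
dealer.send_multipart((b"", start_seq.to_bytes(8, "big")))

while True:
    _, seq_bytes, payload = dealer.recv_multipart()
    seq = int.from_bytes(seq_bytes, "big", signed=True)
    if seq == -1:  # END_SEQ marker: replay finished
        break
    # payload is a msgpack-encoded EventBatch; decode as shown earlier.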

Source code in vllm/distributed/kv_events.py
def _service_replay(self) -> None:
    """If a replay request is waiting, send buffered batches."""
    assert self._replay is not None  # narrows type for mypy

    frame = self._replay.recv_multipart()
    if len(frame) != 3:
        logger.warning("Invalid replay request: %s", frame)
        return
    client_id, _, start_seq_bytes = frame
    start_seq = int.from_bytes(start_seq_bytes, "big")

    for seq, buf in self._buffer:
        if seq >= start_seq:
            # [identity, empty_delim, seq_bytes, payload]
            # (identity, empty_delim) are stripped off by the router
            # receiving payload is (seq_bytes, payload)
            self._replay.send_multipart(
                (client_id, b"", seq.to_bytes(8, "big"), buf))
    # Send end of sequence marker
    # receiving payload is (-1, b"")
    self._replay.send_multipart((client_id, b"", self.END_SEQ, b""))

_socket_setup

_socket_setup() -> None

Initialize sockets https://pyzmq.readthedocs.io/en/v19.0.0/morethanbindings.html#thread-safety

Source code in vllm/distributed/kv_events.py
def _socket_setup(self) -> None:
    """Initialize sockets
    https://pyzmq.readthedocs.io/en/v19.0.0/morethanbindings.html#thread-safety
    """
    if self._pub is None:
        self._pub = self._ctx.socket(zmq.PUB)
        self._pub.set_hwm(self._hwm)
        # Heuristic: bind if wildcard / * present, else connect.
        # bind stable, connect volatile convention
        if (self._endpoint is not None
                and ("*" in self._endpoint or "::" in self._endpoint
                     or self._endpoint.startswith("ipc://")
                     or self._endpoint.startswith("inproc://"))):
            self._pub.bind(self._endpoint)
        elif self._endpoint is not None:
            self._pub.connect(self._endpoint)

    # Set up replay socket: use ROUTER
    # 1) handles multiple REQ clients (identities)
    # 2) lets us send back one request → many replies (streamed events)
    # 3) works in our non‑blocking poll loop alongside PUB
    if self._replay_endpoint is not None:
        self._replay = self._ctx.socket(zmq.ROUTER)
        self._replay.bind(self._replay_endpoint)

offset_endpoint_port staticmethod

offset_endpoint_port(
    endpoint: Optional[str], data_parallel_rank: int
) -> Optional[str]

Helper function to offset the port in an endpoint by the data parallel rank.

Parameters:

  • endpoint (Optional[str], required): The endpoint string (e.g., "tcp://*:5557" or "inproc://cache")
  • data_parallel_rank (int, required): The data parallel rank to offset by

Returns:

  • Optional[str]: The endpoint with the port offset by data_parallel_rank, or with a suffix appended
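
A few illustrative inputs and outputs, following directly from the implementation below:

from vllm.distributed.kv_events import ZmqEventPublisher

ZmqEventPublisher.offset_endpoint_port("tcp://*:5557", 0)    # "tcp://*:5557"
ZmqEventPublisher.offset_endpoint_port("tcp://*:5557", 2)    # "tcp://*:5559"
ZmqEventPublisher.offset_endpoint_port("inproc://cache", 1)  # "inproc://cache_dp1"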

Source code in vllm/distributed/kv_events.py
@staticmethod
def offset_endpoint_port(endpoint: Optional[str],
                         data_parallel_rank: int) -> Optional[str]:
    """Helper function to offset the port in an endpoint by 
        the data parallel rank.

    Args:
        endpoint: The endpoint string 
            (e.g., "tcp://*:5557" or "inproc://cache")
        data_parallel_rank: The data parallel rank to offset by

    Returns:
        The endpoint with the port offset by data_parallel_rank 
            or suffix appended
    """
    # Do nothing if input is None or data_parallel_rank is 0
    if not endpoint or data_parallel_rank == 0:
        return endpoint

    if "inproc" in endpoint:
        return f"{endpoint}_dp{data_parallel_rank}"
    if "tcp" in endpoint:
        if endpoint and ":" in endpoint:
            # Get everything after the last colon (the port)
            last_colon_idx = endpoint.rfind(":")
            base_addr = endpoint[:last_colon_idx]
            base_port = int(endpoint[last_colon_idx + 1:])
            new_port = base_port + data_parallel_rank
            return f"{base_addr}:{new_port}"
        return endpoint
    raise ValueError("Invalid endpoint: must contain 'inproc' or 'tcp'")

publish

publish(events: EventBatch) -> None
Source code in vllm/distributed/kv_events.py
def publish(self, events: EventBatch) -> None:
    if not self._running:
        raise RuntimeError("Publisher is closed")
    if events.data_parallel_rank is None:
        events.data_parallel_rank = self._data_parallel_rank
    self._event_queue.put(events)

shutdown

shutdown() -> None

Stop the publisher thread and clean up resources.

Source code in vllm/distributed/kv_events.py
def shutdown(self) -> None:
    """Stop the publisher thread and clean up resources."""
    self._running = False
    self._event_queue.put_nowait(None)

    start = time.time()
    pending_items = True
    while pending_items and (time.time() - start < self.SHUTDOWN_TIMEOUT):
        pending_items = not self._event_queue.empty()
        if pending_items:
            time.sleep(0.1)

    if pending_items:
        logger.warning(
            "Warning: Queue still has %s items after %s seconds timeout",
            self._event_queue.qsize(),
            self.SHUTDOWN_TIMEOUT,
        )

    if self._thread.is_alive():
        self._thread.join(timeout=self.SHUTDOWN_TIMEOUT)

    # Clean up ZMQ resources
    try:
        if self._pub is not None:
            self._pub.close(linger=0)
        if self._replay is not None:
            self._replay.close(linger=0)
    finally:
        pass  # Do not terminate context; other sockets may use it