Skip to content

vllm.engine.async_timeout

Timeout

Source code in vllm/engine/async_timeout.py
class Timeout:
    # Internal class, please don't instantiate it directly
    # Use timeout() and timeout_at() public factories instead.
    #
    # Implementation note: `async with timeout()` is preferred
    # over `with timeout()`.
    # While technically the Timeout class implementation
    # doesn't need to be async at all,
    # the `async with` statement explicitly points that
    # the context manager should be used from async function context.
    #
    # This design allows to avoid many silly misusages.
    #
    # TimeoutError is raised immediately when scheduled
    # if the deadline is passed.
    # The purpose is to time out as soon as possible
    # without waiting for the next await expression.

    __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler")

    def __init__(self, deadline: Optional[float],
                 loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._state = _State.INIT

        self._timeout_handler = None  # type: Optional[asyncio.Handle]
        if deadline is None:
            self._deadline = None  # type: Optional[float]
        else:
            self.update(deadline)

    async def __aenter__(self) -> "Timeout":
        self._do_enter()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        self._do_exit(exc_type)
        return None

    @property
    def expired(self) -> bool:
        """Is timeout expired during execution?"""
        return self._state == _State.TIMEOUT

    @property
    def deadline(self) -> Optional[float]:
        return self._deadline

    def reject(self) -> None:
        """Reject scheduled timeout if any."""
        # cancel is maybe better name but
        # task.cancel() raises CancelledError in asyncio world.
        if self._state not in (_State.INIT, _State.ENTER):
            raise RuntimeError(f"invalid state {self._state.value}")
        self._reject()

    def _reject(self) -> None:
        if self._timeout_handler is not None:
            self._timeout_handler.cancel()
            self._timeout_handler = None

    def shift(self, delay: float) -> None:
        """Advance timeout on delay seconds.
        The delay can be negative.
        Raise RuntimeError if shift is called when deadline is not scheduled
        """
        deadline = self._deadline
        if deadline is None:
            raise RuntimeError(
                "cannot shift timeout if deadline is not scheduled")
        self.update(deadline + delay)

    def update(self, deadline: float) -> None:
        """Set deadline to absolute value.
        deadline argument points on the time in the same clock system
        as loop.time().
        If new deadline is in the past the timeout is raised immediately.
        Please note: it is not POSIX time but a time with
        undefined starting base, e.g. the time of the system power on.
        """
        if self._state == _State.EXIT:
            raise RuntimeError(
                "cannot reschedule after exit from context manager")
        if self._state == _State.TIMEOUT:
            raise RuntimeError("cannot reschedule expired timeout")
        if self._timeout_handler is not None:
            self._timeout_handler.cancel()
        self._deadline = deadline
        if self._state != _State.INIT:
            self._reschedule()

    def _reschedule(self) -> None:
        assert self._state == _State.ENTER
        deadline = self._deadline
        if deadline is None:
            return

        now = self._loop.time()
        if self._timeout_handler is not None:
            self._timeout_handler.cancel()

        task = asyncio.current_task()
        if deadline <= now:
            self._timeout_handler = self._loop.call_soon(
                self._on_timeout, task)
        else:
            self._timeout_handler = self._loop.call_at(
                deadline, self._on_timeout, task)

    def _do_enter(self) -> None:
        if self._state != _State.INIT:
            raise RuntimeError(f"invalid state {self._state.value}")
        self._state = _State.ENTER
        self._reschedule()

    def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None:
        if exc_type is asyncio.CancelledError and \
                self._state == _State.TIMEOUT:
            self._timeout_handler = None
            raise asyncio.TimeoutError
        # timeout has not expired
        self._state = _State.EXIT
        self._reject()
        return None

    def _on_timeout(self, task: "Optional[asyncio.Task[Any]]") -> None:
        if task:
            task.cancel()
        self._state = _State.TIMEOUT
        # drop the reference early
        self._timeout_handler = None

__slots__ class-attribute instance-attribute

__slots__ = (
    "_deadline",
    "_loop",
    "_state",
    "_timeout_handler",
)

_deadline instance-attribute

_deadline = None

_loop instance-attribute

_loop = loop

_state instance-attribute

_state = INIT

_timeout_handler instance-attribute

_timeout_handler = None

deadline property

deadline: Optional[float]

expired property

expired: bool

Is timeout expired during execution?

__aenter__ async

__aenter__() -> Timeout
Source code in vllm/engine/async_timeout.py
async def __aenter__(self) -> "Timeout":
    self._do_enter()
    return self

__aexit__ async

__aexit__(
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> Optional[bool]
Source code in vllm/engine/async_timeout.py
async def __aexit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> Optional[bool]:
    self._do_exit(exc_type)
    return None

__init__

__init__(
    deadline: Optional[float], loop: AbstractEventLoop
) -> None
Source code in vllm/engine/async_timeout.py
def __init__(self, deadline: Optional[float],
             loop: asyncio.AbstractEventLoop) -> None:
    self._loop = loop
    self._state = _State.INIT

    self._timeout_handler = None  # type: Optional[asyncio.Handle]
    if deadline is None:
        self._deadline = None  # type: Optional[float]
    else:
        self.update(deadline)

_do_enter

_do_enter() -> None
Source code in vllm/engine/async_timeout.py
def _do_enter(self) -> None:
    if self._state != _State.INIT:
        raise RuntimeError(f"invalid state {self._state.value}")
    self._state = _State.ENTER
    self._reschedule()

_do_exit

_do_exit(exc_type: Optional[Type[BaseException]]) -> None
Source code in vllm/engine/async_timeout.py
def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None:
    if exc_type is asyncio.CancelledError and \
            self._state == _State.TIMEOUT:
        self._timeout_handler = None
        raise asyncio.TimeoutError
    # timeout has not expired
    self._state = _State.EXIT
    self._reject()
    return None

_on_timeout

_on_timeout(task: Optional[Task[Any]]) -> None
Source code in vllm/engine/async_timeout.py
def _on_timeout(self, task: "Optional[asyncio.Task[Any]]") -> None:
    if task:
        task.cancel()
    self._state = _State.TIMEOUT
    # drop the reference early
    self._timeout_handler = None

_reject

_reject() -> None
Source code in vllm/engine/async_timeout.py
def _reject(self) -> None:
    if self._timeout_handler is not None:
        self._timeout_handler.cancel()
        self._timeout_handler = None

_reschedule

_reschedule() -> None
Source code in vllm/engine/async_timeout.py
def _reschedule(self) -> None:
    assert self._state == _State.ENTER
    deadline = self._deadline
    if deadline is None:
        return

    now = self._loop.time()
    if self._timeout_handler is not None:
        self._timeout_handler.cancel()

    task = asyncio.current_task()
    if deadline <= now:
        self._timeout_handler = self._loop.call_soon(
            self._on_timeout, task)
    else:
        self._timeout_handler = self._loop.call_at(
            deadline, self._on_timeout, task)

reject

reject() -> None

Reject scheduled timeout if any.

Source code in vllm/engine/async_timeout.py
def reject(self) -> None:
    """Reject scheduled timeout if any."""
    # cancel is maybe better name but
    # task.cancel() raises CancelledError in asyncio world.
    if self._state not in (_State.INIT, _State.ENTER):
        raise RuntimeError(f"invalid state {self._state.value}")
    self._reject()

shift

shift(delay: float) -> None

Advance timeout on delay seconds. The delay can be negative. Raise RuntimeError if shift is called when deadline is not scheduled

Source code in vllm/engine/async_timeout.py
def shift(self, delay: float) -> None:
    """Advance timeout on delay seconds.
    The delay can be negative.
    Raise RuntimeError if shift is called when deadline is not scheduled
    """
    deadline = self._deadline
    if deadline is None:
        raise RuntimeError(
            "cannot shift timeout if deadline is not scheduled")
    self.update(deadline + delay)

update

update(deadline: float) -> None

Set deadline to absolute value. deadline argument points on the time in the same clock system as loop.time(). If new deadline is in the past the timeout is raised immediately. Please note: it is not POSIX time but a time with undefined starting base, e.g. the time of the system power on.

Source code in vllm/engine/async_timeout.py
def update(self, deadline: float) -> None:
    """Set deadline to absolute value.
    deadline argument points on the time in the same clock system
    as loop.time().
    If new deadline is in the past the timeout is raised immediately.
    Please note: it is not POSIX time but a time with
    undefined starting base, e.g. the time of the system power on.
    """
    if self._state == _State.EXIT:
        raise RuntimeError(
            "cannot reschedule after exit from context manager")
    if self._state == _State.TIMEOUT:
        raise RuntimeError("cannot reschedule expired timeout")
    if self._timeout_handler is not None:
        self._timeout_handler.cancel()
    self._deadline = deadline
    if self._state != _State.INIT:
        self._reschedule()

_State

Bases: Enum

Source code in vllm/engine/async_timeout.py
class _State(enum.Enum):
    INIT = "INIT"
    ENTER = "ENTER"
    TIMEOUT = "TIMEOUT"
    EXIT = "EXIT"

ENTER class-attribute instance-attribute

ENTER = 'ENTER'

EXIT class-attribute instance-attribute

EXIT = 'EXIT'

INIT class-attribute instance-attribute

INIT = 'INIT'

TIMEOUT class-attribute instance-attribute

TIMEOUT = 'TIMEOUT'