vllm.entrypoints.openai.api_server

TASK_HANDLERS module-attribute

TASK_HANDLERS: dict[str, dict[str, tuple]] = {
    "generate": {
        "messages": (
            ChatCompletionRequest,
            create_chat_completion,
        ),
        "default": (CompletionRequest, create_completion),
    },
    "embed": {
        "messages": (
            EmbeddingChatRequest,
            create_embedding,
        ),
        "default": (
            EmbeddingCompletionRequest,
            create_embedding,
        ),
    },
    "score": {"default": (RerankRequest, do_rerank)},
    "rerank": {"default": (RerankRequest, do_rerank)},
    "reward": {
        "messages": (PoolingChatRequest, create_pooling),
        "default": (
            PoolingCompletionRequest,
            create_pooling,
        ),
    },
    "classify": {
        "messages": (PoolingChatRequest, create_pooling),
        "default": (
            PoolingCompletionRequest,
            create_pooling,
        ),
    },
}
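
A minimal sketch of how this dispatch table is consumed (mirroring the logic in the invocations handler documented below); the task value and request body here are illustrative only:

# Illustrative lookup only; the task normally comes from app.state.task and
# the body from the incoming /invocations request.
from vllm.entrypoints.openai.api_server import TASK_HANDLERS

task = "generate"
body = {"model": "your-served-model",
        "messages": [{"role": "user", "content": "Hello"}]}

handler_config = TASK_HANDLERS[task]
key = "messages" if "messages" in body else "default"
request_model, handler = handler_config[key]

# FastAPI's automatic request casting is bypassed on this path, so the body
# is validated explicitly, exactly as the invocations route does.
request = request_model.model_validate(body)
# `handler` is then awaited with (request, raw_request) inside the route.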

_running_tasks module-attribute

_running_tasks: set[Task] = set()

args module-attribute

args = parse_args()

logger module-attribute

logger = init_logger('vllm.entrypoints.openai.api_server')

parser module-attribute

parser = FlexibleArgumentParser(
    description="vLLM OpenAI-Compatible RESTful API server."
)

prometheus_multiproc_dir module-attribute

prometheus_multiproc_dir: TemporaryDirectory

router module-attribute

router = APIRouter()

AuthenticationMiddleware

Pure ASGI middleware that authenticates each request by checking if the Authorization header exists and equals "Bearer {api_key}".

Notes

There are two cases in which authentication is skipped:
1. The HTTP method is OPTIONS.
2. The request path doesn't start with /v1 (e.g. /health).

Source code in vllm/entrypoints/openai/api_server.py
class AuthenticationMiddleware:
    """
    Pure ASGI middleware that authenticates each request by checking
    if the Authorization header exists and equals "Bearer {api_key}".

    Notes
    -----
    There are two cases in which authentication is skipped:
        1. The HTTP method is OPTIONS.
        2. The request path doesn't start with /v1 (e.g. /health).
    """

    def __init__(self, app: ASGIApp, api_token: str) -> None:
        self.app = app
        self.api_token = api_token

    def __call__(self, scope: Scope, receive: Receive,
                 send: Send) -> Awaitable[None]:
        if scope["type"] not in ("http",
                                 "websocket") or scope["method"] == "OPTIONS":
            # scope["type"] can be "lifespan" or "startup" for example,
            # in which case we don't need to do anything
            return self.app(scope, receive, send)
        root_path = scope.get("root_path", "")
        url_path = URL(scope=scope).path.removeprefix(root_path)
        headers = Headers(scope=scope)
        # Type narrow to satisfy mypy.
        if url_path.startswith("/v1") and headers.get(
                "Authorization") != f"Bearer {self.api_token}":
            response = JSONResponse(content={"error": "Unauthorized"},
                                    status_code=401)
            return response(scope, receive, send)
        return self.app(scope, receive, send)
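
A minimal usage sketch, assuming a toy FastAPI app and a placeholder token; in the real server this middleware is added by build_app when --api-key or VLLM_API_KEY is set:

# Toy app for illustration; build_app() wires this middleware the same way
# when an API key is configured.
from fastapi import FastAPI
from fastapi.testclient import TestClient
from vllm.entrypoints.openai.api_server import AuthenticationMiddleware

toy_app = FastAPI()

@toy_app.get("/v1/models")   # /v1 paths require the Bearer token
async def list_models():
    return {"data": []}

@toy_app.get("/health")      # non-/v1 paths skip authentication
async def health():
    return {}

toy_app.add_middleware(AuthenticationMiddleware, api_token="placeholder-key")

client = TestClient(toy_app)
assert client.get("/health").status_code == 200
assert client.get("/v1/models").status_code == 401
assert client.get(
    "/v1/models",
    headers={"Authorization": "Bearer placeholder-key"}).status_code == 200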

api_token instance-attribute

api_token = api_token

app instance-attribute

app = app

__call__

__call__(
    scope: Scope, receive: Receive, send: Send
) -> Awaitable[None]
Source code in vllm/entrypoints/openai/api_server.py
def __call__(self, scope: Scope, receive: Receive,
             send: Send) -> Awaitable[None]:
    if scope["type"] not in ("http",
                             "websocket") or scope["method"] == "OPTIONS":
        # scope["type"] can be "lifespan" or "startup" for example,
        # in which case we don't need to do anything
        return self.app(scope, receive, send)
    root_path = scope.get("root_path", "")
    url_path = URL(scope=scope).path.removeprefix(root_path)
    headers = Headers(scope=scope)
    # Type narrow to satisfy mypy.
    if url_path.startswith("/v1") and headers.get(
            "Authorization") != f"Bearer {self.api_token}":
        response = JSONResponse(content={"error": "Unauthorized"},
                                status_code=401)
        return response(scope, receive, send)
    return self.app(scope, receive, send)

__init__

__init__(app: ASGIApp, api_token: str) -> None
Source code in vllm/entrypoints/openai/api_server.py
def __init__(self, app: ASGIApp, api_token: str) -> None:
    self.app = app
    self.api_token = api_token

PrometheusResponse

Bases: Response

Source code in vllm/entrypoints/openai/api_server.py
class PrometheusResponse(Response):
    media_type = prometheus_client.CONTENT_TYPE_LATEST

media_type class-attribute instance-attribute

media_type = CONTENT_TYPE_LATEST

XRequestIdMiddleware

Middleware that sets the X-Request-Id header on each response to a random uuid4 (hex) value if the header isn't already present in the request; otherwise, the provided request ID is reused.

Source code in vllm/entrypoints/openai/api_server.py
class XRequestIdMiddleware:
    """
    Middleware that sets the X-Request-Id header on each response
    to a random uuid4 (hex) value if the header isn't already
    present in the request; otherwise the provided request ID is reused.
    """

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    def __call__(self, scope: Scope, receive: Receive,
                 send: Send) -> Awaitable[None]:
        if scope["type"] not in ("http", "websocket"):
            return self.app(scope, receive, send)

        # Extract the request headers.
        request_headers = Headers(scope=scope)

        async def send_with_request_id(message: Message) -> None:
            """
            Custom send function to mutate the response headers
            and append X-Request-Id to it.
            """
            if message["type"] == "http.response.start":
                response_headers = MutableHeaders(raw=message["headers"])
                request_id = request_headers.get("X-Request-Id",
                                                 uuid.uuid4().hex)
                response_headers.append("X-Request-Id", request_id)
            await send(message)

        return self.app(scope, receive, send_with_request_id)
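
A hedged client-side sketch of the observable behaviour, assuming a server started with --enable-request-id-headers and listening on localhost:8000:

# Illustrative only; requires --enable-request-id-headers on the server side,
# and the server address is a placeholder.
import uuid

import requests

base = "http://localhost:8000"

# Without the header, the response carries a freshly generated uuid4 hex.
print(requests.get(f"{base}/health").headers.get("X-Request-Id"))

# With the header, the provided request ID is echoed back unchanged.
rid = uuid.uuid4().hex
resp = requests.get(f"{base}/health", headers={"X-Request-Id": rid})
assert resp.headers["X-Request-Id"] == rid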

app instance-attribute

app = app

__call__

__call__(
    scope: Scope, receive: Receive, send: Send
) -> Awaitable[None]
Source code in vllm/entrypoints/openai/api_server.py
def __call__(self, scope: Scope, receive: Receive,
             send: Send) -> Awaitable[None]:
    if scope["type"] not in ("http", "websocket"):
        return self.app(scope, receive, send)

    # Extract the request headers.
    request_headers = Headers(scope=scope)

    async def send_with_request_id(message: Message) -> None:
        """
        Custom send function to mutate the response headers
        and append X-Request-Id to it.
        """
        if message["type"] == "http.response.start":
            response_headers = MutableHeaders(raw=message["headers"])
            request_id = request_headers.get("X-Request-Id",
                                             uuid.uuid4().hex)
            response_headers.append("X-Request-Id", request_id)
        await send(message)

    return self.app(scope, receive, send_with_request_id)

__init__

__init__(app: ASGIApp) -> None
Source code in vllm/entrypoints/openai/api_server.py
def __init__(self, app: ASGIApp) -> None:
    self.app = app

base

base(request: Request) -> OpenAIServing
Source code in vllm/entrypoints/openai/api_server.py
def base(request: Request) -> OpenAIServing:
    # Reuse the existing instance
    return tokenization(request)

build_app

build_app(args: Namespace) -> FastAPI
Source code in vllm/entrypoints/openai/api_server.py
def build_app(args: Namespace) -> FastAPI:
    if args.disable_fastapi_docs:
        app = FastAPI(openapi_url=None,
                      docs_url=None,
                      redoc_url=None,
                      lifespan=lifespan)
    else:
        app = FastAPI(lifespan=lifespan)
    app.include_router(router)
    app.root_path = args.root_path

    mount_metrics(app)

    app.add_middleware(
        CORSMiddleware,
        allow_origins=args.allowed_origins,
        allow_credentials=args.allow_credentials,
        allow_methods=args.allowed_methods,
        allow_headers=args.allowed_headers,
    )

    @app.exception_handler(HTTPException)
    async def http_exception_handler(_: Request, exc: HTTPException):
        err = ErrorResponse(message=exc.detail,
                            type=HTTPStatus(exc.status_code).phrase,
                            code=exc.status_code)
        return JSONResponse(err.model_dump(), status_code=exc.status_code)

    @app.exception_handler(RequestValidationError)
    async def validation_exception_handler(_: Request,
                                           exc: RequestValidationError):
        exc_str = str(exc)
        errors_str = str(exc.errors())

        if exc.errors() and errors_str and errors_str != exc_str:
            message = f"{exc_str} {errors_str}"
        else:
            message = exc_str

        err = ErrorResponse(message=message,
                            type=HTTPStatus.BAD_REQUEST.phrase,
                            code=HTTPStatus.BAD_REQUEST)
        return JSONResponse(err.model_dump(),
                            status_code=HTTPStatus.BAD_REQUEST)

    # Ensure --api-key option from CLI takes precedence over VLLM_API_KEY
    if token := args.api_key or envs.VLLM_API_KEY:
        app.add_middleware(AuthenticationMiddleware, api_token=token)

    if args.enable_request_id_headers:
        app.add_middleware(XRequestIdMiddleware)

    if envs.VLLM_DEBUG_LOG_API_SERVER_RESPONSE:
        logger.warning("CAUTION: Enabling log response in the API Server. "
                       "This can include sensitive information and should be "
                       "avoided in production.")

        @app.middleware("http")
        async def log_response(request: Request, call_next):
            response = await call_next(request)
            response_body = [
                section async for section in response.body_iterator
            ]
            response.body_iterator = iterate_in_threadpool(iter(response_body))
            logger.info("response_body={%s}",
                        response_body[0].decode() if response_body else None)
            return response

    for middleware in args.middleware:
        module_path, object_name = middleware.rsplit(".", 1)
        imported = getattr(importlib.import_module(module_path), object_name)
        if inspect.isclass(imported):
            app.add_middleware(imported)  # type: ignore[arg-type]
        elif inspect.iscoroutinefunction(imported):
            app.middleware("http")(imported)
        else:
            raise ValueError(f"Invalid middleware {middleware}. "
                             f"Must be a function or a class.")

    return app

build_async_engine_client async

build_async_engine_client(
    args: Namespace,
    client_config: Optional[dict[str, Any]] = None,
) -> AsyncIterator[EngineClient]
Source code in vllm/entrypoints/openai/api_server.py
@asynccontextmanager
async def build_async_engine_client(
    args: Namespace,
    client_config: Optional[dict[str, Any]] = None,
) -> AsyncIterator[EngineClient]:

    # Context manager to handle engine_client lifecycle
    # Ensures everything is shutdown and cleaned up on error/exit
    engine_args = AsyncEngineArgs.from_cli_args(args)

    async with build_async_engine_client_from_engine_args(
            engine_args, args.disable_frontend_multiprocessing,
            client_config) as engine:
        yield engine

build_async_engine_client_from_engine_args async

build_async_engine_client_from_engine_args(
    engine_args: AsyncEngineArgs,
    disable_frontend_multiprocessing: bool = False,
    client_config: Optional[dict[str, Any]] = None,
) -> AsyncIterator[EngineClient]

Create EngineClient, either:
- in-process, using the AsyncLLMEngine directly
- multiprocess, using AsyncLLMEngine RPC

Returns the Client or None if the creation failed.

Source code in vllm/entrypoints/openai/api_server.py
@asynccontextmanager
async def build_async_engine_client_from_engine_args(
    engine_args: AsyncEngineArgs,
    disable_frontend_multiprocessing: bool = False,
    client_config: Optional[dict[str, Any]] = None,
) -> AsyncIterator[EngineClient]:
    """
    Create EngineClient, either:
        - in-process using the AsyncLLMEngine Directly
        - multiprocess using AsyncLLMEngine RPC

    Returns the Client or None if the creation failed.
    """

    # Create the EngineConfig (determines if we can use V1).
    usage_context = UsageContext.OPENAI_API_SERVER
    vllm_config = engine_args.create_engine_config(usage_context=usage_context)

    # V1 AsyncLLM.
    if envs.VLLM_USE_V1:
        if disable_frontend_multiprocessing:
            logger.warning(
                "V1 is enabled, but got --disable-frontend-multiprocessing. "
                "To disable frontend multiprocessing, set VLLM_USE_V1=0.")

        from vllm.v1.engine.async_llm import AsyncLLM
        async_llm: Optional[AsyncLLM] = None
        client_index = client_config.pop(
            "client_index") if client_config else 0
        try:
            async_llm = AsyncLLM.from_vllm_config(
                vllm_config=vllm_config,
                usage_context=usage_context,
                disable_log_requests=engine_args.disable_log_requests,
                disable_log_stats=engine_args.disable_log_stats,
                client_addresses=client_config,
                client_index=client_index)

            # Don't keep the dummy data in memory
            await async_llm.reset_mm_cache()

            yield async_llm
        finally:
            if async_llm:
                async_llm.shutdown()

    # V0 AsyncLLM.
    elif (MQLLMEngineClient.is_unsupported_config(vllm_config)
          or disable_frontend_multiprocessing):

        engine_client: Optional[EngineClient] = None
        try:
            engine_client = AsyncLLMEngine.from_vllm_config(
                vllm_config=vllm_config,
                usage_context=usage_context,
                disable_log_requests=engine_args.disable_log_requests,
                disable_log_stats=engine_args.disable_log_stats)
            yield engine_client
        finally:
            if engine_client and hasattr(engine_client, "shutdown"):
                engine_client.shutdown()

    # V0MQLLMEngine.
    else:
        if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
            # Make TemporaryDirectory for prometheus multiprocessing
            # Note: global TemporaryDirectory will be automatically
            #   cleaned up upon exit.
            global prometheus_multiproc_dir
            prometheus_multiproc_dir = tempfile.TemporaryDirectory()
            os.environ[
                "PROMETHEUS_MULTIPROC_DIR"] = prometheus_multiproc_dir.name
        else:
            logger.warning(
                "Found PROMETHEUS_MULTIPROC_DIR was set by user. "
                "This directory must be wiped between vLLM runs or "
                "you will find inaccurate metrics. Unset the variable "
                "and vLLM will properly handle cleanup.")

        # Select random path for IPC.
        ipc_path = get_open_zmq_ipc_path()
        logger.debug("Multiprocessing frontend to use %s for IPC Path.",
                     ipc_path)

        # Start RPCServer in separate process (holds the LLMEngine).
        # the current process might have CUDA context,
        # so we need to spawn a new process
        context = multiprocessing.get_context("spawn")

        # Ensure we can serialize transformer config before spawning
        maybe_register_config_serialize_by_value()

        # The Process can raise an exception during startup, which may
        # not actually result in an exitcode being reported. As a result
        # we use a shared variable to communicate the information.
        engine_alive = multiprocessing.Value('b', True, lock=False)
        engine_process = context.Process(
            target=run_mp_engine,
            args=(vllm_config, UsageContext.OPENAI_API_SERVER, ipc_path,
                  engine_args.disable_log_stats,
                  engine_args.disable_log_requests, engine_alive))
        engine_process.start()
        engine_pid = engine_process.pid
        assert engine_pid is not None, "Engine process failed to start."
        logger.info("Started engine process with PID %d", engine_pid)

        def _cleanup_ipc_path():
            socket_path = ipc_path.replace("ipc://", "")
            if os.path.exists(socket_path):
                os.remove(socket_path)

        # Ensure we clean up the local IPC socket file on exit.
        atexit.register(_cleanup_ipc_path)

        # Build RPCClient, which conforms to EngineClient Protocol.
        build_client = partial(MQLLMEngineClient, ipc_path, vllm_config,
                               engine_pid)
        mq_engine_client = await asyncio.get_running_loop().run_in_executor(
            None, build_client)
        try:
            while True:
                try:
                    await mq_engine_client.setup()
                    break
                except TimeoutError:
                    if (not engine_process.is_alive()
                            or not engine_alive.value):
                        raise RuntimeError(
                            "Engine process failed to start. See stack "
                            "trace for the root cause.") from None

            yield mq_engine_client  # type: ignore[misc]
        finally:
            # Ensure rpc server process was terminated
            engine_process.terminate()

            # Close all open connections to the backend
            mq_engine_client.close()

            # Wait for engine process to join
            engine_process.join(4)
            if engine_process.exitcode is None:
                # Kill if taking longer than 5 seconds to stop
                engine_process.kill()

            # Lazy import for prometheus multiprocessing.
            # We need to set PROMETHEUS_MULTIPROC_DIR environment variable
            # before prometheus_client is imported.
            # See https://prometheus.github.io/client_python/multiprocess/
            from prometheus_client import multiprocess
            multiprocess.mark_process_dead(engine_process.pid)
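
A minimal sketch of driving this context manager directly, outside the API server; the model name is a placeholder and only the health probe used by the /health route is exercised:

# Minimal sketch; not the served code path. The model name is a placeholder.
import asyncio

from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.entrypoints.openai.api_server import (
    build_async_engine_client_from_engine_args)


async def main() -> None:
    engine_args = AsyncEngineArgs(model="facebook/opt-125m")
    async with build_async_engine_client_from_engine_args(
            engine_args) as engine_client:
        # EngineClient exposes the same health check the /health route awaits.
        await engine_client.check_health()


asyncio.run(main())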

chat

chat(request: Request) -> Optional[OpenAIServingChat]
Source code in vllm/entrypoints/openai/api_server.py
def chat(request: Request) -> Optional[OpenAIServingChat]:
    return request.app.state.openai_serving_chat

classify

classify(
    request: Request,
) -> Optional[ServingClassification]
Source code in vllm/entrypoints/openai/api_server.py
def classify(request: Request) -> Optional[ServingClassification]:
    return request.app.state.openai_serving_classification

completion

completion(
    request: Request,
) -> Optional[OpenAIServingCompletion]
Source code in vllm/entrypoints/openai/api_server.py
def completion(request: Request) -> Optional[OpenAIServingCompletion]:
    return request.app.state.openai_serving_completion

create_chat_completion async

create_chat_completion(
    request: ChatCompletionRequest, raw_request: Request
)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/v1/chat/completions",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.OK.value: {
                     "content": {
                         "text/event-stream": {}
                     }
                 },
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.NOT_FOUND.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 }
             })
@with_cancellation
@load_aware_call
async def create_chat_completion(request: ChatCompletionRequest,
                                 raw_request: Request):
    handler = chat(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Chat Completions API")

    generator = await handler.create_chat_completion(request, raw_request)

    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)

    elif isinstance(generator, ChatCompletionResponse):
        return JSONResponse(content=generator.model_dump())

    return StreamingResponse(content=generator, media_type="text/event-stream")
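
Because this route is OpenAI-compatible, any OpenAI client can call it; a hedged sketch, assuming a chat-capable model served on localhost:8000 and the openai Python package:

# Illustrative client call; the base URL and model name are placeholders.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")
completion = client.chat.completions.create(
    model="your-served-model",
    messages=[{"role": "user", "content": "Say hello"}],
)
print(completion.choices[0].message.content)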

create_classify async

create_classify(
    request: ClassificationRequest, raw_request: Request
)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/classify", dependencies=[Depends(validate_json_request)])
@with_cancellation
@load_aware_call
async def create_classify(request: ClassificationRequest,
                          raw_request: Request):
    handler = classify(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Classification API")

    generator = await handler.create_classify(request, raw_request)
    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)

    elif isinstance(generator, ClassificationResponse):
        return JSONResponse(content=generator.model_dump())

    assert_never(generator)

create_completion async

create_completion(
    request: CompletionRequest, raw_request: Request
)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/v1/completions",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.OK.value: {
                     "content": {
                         "text/event-stream": {}
                     }
                 },
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.NOT_FOUND.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
@load_aware_call
async def create_completion(request: CompletionRequest, raw_request: Request):
    handler = completion(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Completions API")

    try:
        generator = await handler.create_completion(request, raw_request)
    except OverflowError as e:
        raise HTTPException(status_code=HTTPStatus.BAD_REQUEST.value,
                            detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value,
                            detail=str(e)) from e

    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)
    elif isinstance(generator, CompletionResponse):
        return JSONResponse(content=generator.model_dump())

    return StreamingResponse(content=generator, media_type="text/event-stream")

create_embedding async

create_embedding(
    request: EmbeddingRequest, raw_request: Request
)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/v1/embeddings",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
@load_aware_call
async def create_embedding(request: EmbeddingRequest, raw_request: Request):
    handler = embedding(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Embeddings API")

    generator = await handler.create_embedding(request, raw_request)

    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)
    elif isinstance(generator, EmbeddingResponse):
        return JSONResponse(content=generator.model_dump())

    assert_never(generator)
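
The same OpenAI client works for this route when an embedding model is served; a hedged sketch with placeholder names:

# Illustrative client call; assumes an embedding model is being served on
# localhost:8000.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")
resp = client.embeddings.create(
    model="your-embedding-model",
    input=["vLLM is a high-throughput inference engine."],
)
print(len(resp.data[0].embedding))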

create_pooling async

create_pooling(
    request: PoolingRequest, raw_request: Request
)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/pooling",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
@load_aware_call
async def create_pooling(request: PoolingRequest, raw_request: Request):
    handler = pooling(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Pooling API")

    generator = await handler.create_pooling(request, raw_request)
    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)
    elif isinstance(generator, PoolingResponse):
        return JSONResponse(content=generator.model_dump())

    assert_never(generator)

create_score async

create_score(request: ScoreRequest, raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/score",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
@load_aware_call
async def create_score(request: ScoreRequest, raw_request: Request):
    handler = score(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Score API")

    generator = await handler.create_score(request, raw_request)
    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)
    elif isinstance(generator, ScoreResponse):
        return JSONResponse(content=generator.model_dump())

    assert_never(generator)

create_score_v1 async

create_score_v1(
    request: ScoreRequest, raw_request: Request
)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/v1/score",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
@load_aware_call
async def create_score_v1(request: ScoreRequest, raw_request: Request):
    logger.warning(
        "To indicate that Score API is not part of standard OpenAI API, we "
        "have moved it to `/score`. Please update your client accordingly.")

    return await create_score(request, raw_request)

create_server_socket

create_server_socket(addr: tuple[str, int]) -> socket
Source code in vllm/entrypoints/openai/api_server.py
def create_server_socket(addr: tuple[str, int]) -> socket.socket:
    family = socket.AF_INET
    if is_valid_ipv6_address(addr[0]):
        family = socket.AF_INET6

    sock = socket.socket(family=family, type=socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind(addr)

    return sock

create_transcriptions async

create_transcriptions(
    raw_request: Request,
    request: Annotated[TranscriptionRequest, Form()],
)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/v1/audio/transcriptions",
             responses={
                 HTTPStatus.OK.value: {
                     "content": {
                         "text/event-stream": {}
                     }
                 },
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.UNPROCESSABLE_ENTITY.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
@load_aware_call
async def create_transcriptions(raw_request: Request,
                                request: Annotated[TranscriptionRequest,
                                                   Form()]):
    handler = transcription(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Transcriptions API")

    audio_data = await request.file.read()
    generator = await handler.create_transcription(audio_data, request,
                                                   raw_request)

    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)

    elif isinstance(generator, TranscriptionResponse):
        return JSONResponse(content=generator.model_dump())

    return StreamingResponse(content=generator, media_type="text/event-stream")

create_translations async

create_translations(
    request: Annotated[TranslationRequest, Form()],
    raw_request: Request,
)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/v1/audio/translations",
             responses={
                 HTTPStatus.OK.value: {
                     "content": {
                         "text/event-stream": {}
                     }
                 },
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.UNPROCESSABLE_ENTITY.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
@load_aware_call
async def create_translations(request: Annotated[TranslationRequest,
                                                 Form()],
                              raw_request: Request):
    handler = translation(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Translations API")

    audio_data = await request.file.read()
    generator = await handler.create_translation(audio_data, request,
                                                 raw_request)

    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)

    elif isinstance(generator, TranslationResponse):
        return JSONResponse(content=generator.model_dump())

    return StreamingResponse(content=generator, media_type="text/event-stream")

detokenize async

detokenize(
    request: DetokenizeRequest, raw_request: Request
)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/detokenize",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.NOT_FOUND.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
async def detokenize(request: DetokenizeRequest, raw_request: Request):
    handler = tokenization(raw_request)

    try:
        generator = await handler.create_detokenize(request, raw_request)
    except OverflowError as e:
        raise RequestValidationError(errors=[str(e)]) from e
    except Exception as e:
        raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value,
                            detail=str(e)) from e

    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)
    elif isinstance(generator, DetokenizeResponse):
        return JSONResponse(content=generator.model_dump())

    assert_never(generator)

do_rerank async

do_rerank(request: RerankRequest, raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/rerank",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
@load_aware_call
async def do_rerank(request: RerankRequest, raw_request: Request):
    handler = rerank(raw_request)
    if handler is None:
        return base(raw_request).create_error_response(
            message="The model does not support Rerank (Score) API")
    generator = await handler.do_rerank(request, raw_request)
    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)
    elif isinstance(generator, RerankResponse):
        return JSONResponse(content=generator.model_dump())

    assert_never(generator)

do_rerank_v1 async

do_rerank_v1(request: RerankRequest, raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/v1/rerank",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
async def do_rerank_v1(request: RerankRequest, raw_request: Request):
    logger.warning_once(
        "To indicate that the rerank API is not part of the standard OpenAI"
        " API, we have located it at `/rerank`. Please update your client "
        "accordingly. (Note: Conforms to JinaAI rerank API)")

    return await do_rerank(request, raw_request)

do_rerank_v2 async

do_rerank_v2(request: RerankRequest, raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/v2/rerank",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
async def do_rerank_v2(request: RerankRequest, raw_request: Request):
    return await do_rerank(request, raw_request)

embedding

embedding(
    request: Request,
) -> Optional[OpenAIServingEmbedding]
Source code in vllm/entrypoints/openai/api_server.py
def embedding(request: Request) -> Optional[OpenAIServingEmbedding]:
    return request.app.state.openai_serving_embedding

engine_client

engine_client(request: Request) -> EngineClient
Source code in vllm/entrypoints/openai/api_server.py
def engine_client(request: Request) -> EngineClient:
    return request.app.state.engine_client

get_server_load_metrics async

get_server_load_metrics(request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.get("/load")
async def get_server_load_metrics(request: Request):
    # This endpoint returns the current server load metrics.
    # It tracks requests utilizing the GPU from the following routes:
    # - /v1/chat/completions
    # - /v1/completions
    # - /v1/audio/transcriptions
    # - /v1/embeddings
    # - /pooling
    # - /classify
    # - /score
    # - /v1/score
    # - /rerank
    # - /v1/rerank
    # - /v2/rerank
    return JSONResponse(
        content={'server_load': request.app.state.server_load_metrics})

health async

health(raw_request: Request) -> Response

Health check.

Source code in vllm/entrypoints/openai/api_server.py
@router.get("/health", response_class=Response)
async def health(raw_request: Request) -> Response:
    """Health check."""
    await engine_client(raw_request).check_health()
    return Response(status_code=200)

init_app_state async

init_app_state(
    engine_client: EngineClient,
    vllm_config: VllmConfig,
    state: State,
    args: Namespace,
) -> None
Source code in vllm/entrypoints/openai/api_server.py
async def init_app_state(
    engine_client: EngineClient,
    vllm_config: VllmConfig,
    state: State,
    args: Namespace,
) -> None:
    if args.served_model_name is not None:
        served_model_names = args.served_model_name
    else:
        served_model_names = [args.model]

    if args.disable_log_requests:
        request_logger = None
    else:
        request_logger = RequestLogger(max_log_len=args.max_log_len)

    base_model_paths = [
        BaseModelPath(name=name, model_path=args.model)
        for name in served_model_names
    ]

    state.engine_client = engine_client
    state.log_stats = not args.disable_log_stats
    state.vllm_config = vllm_config
    model_config = vllm_config.model_config

    resolved_chat_template = load_chat_template(args.chat_template)
    if resolved_chat_template is not None:
        # Get the tokenizer to check official template
        tokenizer = await engine_client.get_tokenizer()

        if isinstance(tokenizer, MistralTokenizer):
            # The warning is logged in resolve_mistral_chat_template.
            resolved_chat_template = resolve_mistral_chat_template(
                chat_template=resolved_chat_template)
        else:
            hf_chat_template = resolve_hf_chat_template(
                tokenizer=tokenizer,
                chat_template=None,
                tools=None,
                model_config=vllm_config.model_config,
            )

            if hf_chat_template != resolved_chat_template:
                logger.warning(
                    "Using supplied chat template: %s\n"
                    "It is different from official chat template '%s'. "
                    "This discrepancy may lead to performance degradation.",
                    resolved_chat_template, args.model)

    state.openai_serving_models = OpenAIServingModels(
        engine_client=engine_client,
        model_config=model_config,
        base_model_paths=base_model_paths,
        lora_modules=args.lora_modules,
        prompt_adapters=args.prompt_adapters,
    )
    await state.openai_serving_models.init_static_loras()
    state.openai_serving_chat = OpenAIServingChat(
        engine_client,
        model_config,
        state.openai_serving_models,
        args.response_role,
        request_logger=request_logger,
        chat_template=resolved_chat_template,
        chat_template_content_format=args.chat_template_content_format,
        return_tokens_as_token_ids=args.return_tokens_as_token_ids,
        enable_auto_tools=args.enable_auto_tool_choice,
        expand_tools_even_if_tool_choice_none=args.
        expand_tools_even_if_tool_choice_none,
        tool_parser=args.tool_call_parser,
        reasoning_parser=args.reasoning_parser,
        enable_prompt_tokens_details=args.enable_prompt_tokens_details,
        enable_force_include_usage=args.enable_force_include_usage,
    ) if model_config.runner_type == "generate" else None
    state.openai_serving_completion = OpenAIServingCompletion(
        engine_client,
        model_config,
        state.openai_serving_models,
        request_logger=request_logger,
        return_tokens_as_token_ids=args.return_tokens_as_token_ids,
        enable_force_include_usage=args.enable_force_include_usage,
    ) if model_config.runner_type == "generate" else None
    state.openai_serving_pooling = OpenAIServingPooling(
        engine_client,
        model_config,
        state.openai_serving_models,
        request_logger=request_logger,
        chat_template=resolved_chat_template,
        chat_template_content_format=args.chat_template_content_format,
    ) if model_config.runner_type == "pooling" else None
    state.openai_serving_embedding = OpenAIServingEmbedding(
        engine_client,
        model_config,
        state.openai_serving_models,
        request_logger=request_logger,
        chat_template=resolved_chat_template,
        chat_template_content_format=args.chat_template_content_format,
    ) if model_config.task == "embed" else None
    state.openai_serving_classification = ServingClassification(
        engine_client,
        model_config,
        state.openai_serving_models,
        request_logger=request_logger,
    ) if model_config.task == "classify" else None

    enable_serving_reranking = (model_config.task == "classify" and getattr(
        model_config.hf_config, "num_labels", 0) == 1)
    state.jinaai_serving_reranking = ServingScores(
        engine_client,
        model_config,
        state.openai_serving_models,
        request_logger=request_logger) if enable_serving_reranking else None
    state.openai_serving_scores = ServingScores(
        engine_client,
        model_config,
        state.openai_serving_models,
        request_logger=request_logger) if (
            model_config.task == "embed" or enable_serving_reranking) else None

    state.openai_serving_tokenization = OpenAIServingTokenization(
        engine_client,
        model_config,
        state.openai_serving_models,
        request_logger=request_logger,
        chat_template=resolved_chat_template,
        chat_template_content_format=args.chat_template_content_format,
    )
    state.openai_serving_transcription = OpenAIServingTranscription(
        engine_client,
        model_config,
        state.openai_serving_models,
        request_logger=request_logger,
    ) if model_config.runner_type == "transcription" else None
    state.openai_serving_translation = OpenAIServingTranslation(
        engine_client,
        model_config,
        state.openai_serving_models,
        request_logger=request_logger,
    ) if model_config.runner_type == "transcription" else None
    state.task = model_config.task

    state.enable_server_load_tracking = args.enable_server_load_tracking
    state.server_load_metrics = 0

invocations async

invocations(raw_request: Request)

For SageMaker, routes requests to other handlers based on model task.

Source code in vllm/entrypoints/openai/api_server.py
@router.post("/invocations",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.UNSUPPORTED_MEDIA_TYPE.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
             })
async def invocations(raw_request: Request):
    """
    For SageMaker, routes requests to other handlers based on model `task`.
    """
    try:
        body = await raw_request.json()
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=HTTPStatus.BAD_REQUEST.value,
                            detail=f"JSON decode error: {e}") from e

    task = raw_request.app.state.task

    if task not in TASK_HANDLERS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported task: '{task}' for '/invocations'. "
            f"Expected one of {set(TASK_HANDLERS.keys())}")

    handler_config = TASK_HANDLERS[task]
    if "messages" in body:
        request_model, handler = handler_config["messages"]
    else:
        request_model, handler = handler_config["default"]

    # this is required since we lose the FastAPI automatic casting
    request = request_model.model_validate(body)
    return await handler(request, raw_request)
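
A hedged example of the dispatch described above, assuming a generate-task model served on localhost:8000: a body containing "messages" is routed to create_chat_completion, while a plain prompt falls through to create_completion.

# Illustrative only; server address and model name are placeholders.
import requests

url = "http://localhost:8000/invocations"

chat_body = {          # contains "messages" -> create_chat_completion
    "model": "your-served-model",
    "messages": [{"role": "user", "content": "Hello"}],
}
completion_body = {    # no "messages" -> create_completion
    "model": "your-served-model",
    "prompt": "Hello",
}

for body in (chat_body, completion_body):
    print(requests.post(url, json=body).json())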

is_sleeping async

is_sleeping(raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.get("/is_sleeping")
async def is_sleeping(raw_request: Request):
    logger.info("check whether the engine is sleeping")
    is_sleeping = await engine_client(raw_request).is_sleeping()
    return JSONResponse(content={"is_sleeping": is_sleeping})

lifespan async

lifespan(app: FastAPI)
Source code in vllm/entrypoints/openai/api_server.py
@asynccontextmanager
async def lifespan(app: FastAPI):
    try:
        if app.state.log_stats:
            engine_client: EngineClient = app.state.engine_client

            async def _force_log():
                while True:
                    await asyncio.sleep(10.)
                    await engine_client.do_log_stats()

            task = asyncio.create_task(_force_log())
            _running_tasks.add(task)
            task.add_done_callback(_running_tasks.remove)
        else:
            task = None

        # Mark the startup heap as static so that it's ignored by GC.
        # Reduces pause times of oldest generation collections.
        gc.collect()
        gc.freeze()
        try:
            yield
        finally:
            if task is not None:
                task.cancel()
    finally:
        # Ensure app state including engine ref is gc'd
        del app.state

load_log_config

load_log_config(
    log_config_file: Optional[str],
) -> Optional[dict]
Source code in vllm/entrypoints/openai/api_server.py
def load_log_config(log_config_file: Optional[str]) -> Optional[dict]:
    if not log_config_file:
        return None
    try:
        with open(log_config_file) as f:
            return json.load(f)
    except Exception as e:
        logger.warning("Failed to load log config from file %s: error %s",
                       log_config_file, e)
        return None

load_lora_adapter async

load_lora_adapter(
    request: LoadLoRAAdapterRequest, raw_request: Request
)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/v1/load_lora_adapter",
             dependencies=[Depends(validate_json_request)])
async def load_lora_adapter(request: LoadLoRAAdapterRequest,
                            raw_request: Request):
    handler = models(raw_request)
    response = await handler.load_lora_adapter(request)
    if isinstance(response, ErrorResponse):
        return JSONResponse(content=response.model_dump(),
                            status_code=response.code)

    return Response(status_code=200, content=response)

models

models(request: Request) -> OpenAIServingModels
Source code in vllm/entrypoints/openai/api_server.py
def models(request: Request) -> OpenAIServingModels:
    return request.app.state.openai_serving_models

mount_metrics

mount_metrics(app: FastAPI)

Mount prometheus metrics to a FastAPI app.

Source code in vllm/entrypoints/openai/api_server.py
def mount_metrics(app: FastAPI):
    """Mount prometheus metrics to a FastAPI app."""

    registry = get_prometheus_registry()

    # `response_class=PrometheusResponse` is needed to return an HTTP response
    # with header "Content-Type: text/plain; version=0.0.4; charset=utf-8"
    # instead of the default "application/json" which is incorrect.
    # See https://github.com/trallnag/prometheus-fastapi-instrumentator/issues/163#issue-1296092364
    Instrumentator(
        excluded_handlers=[
            "/metrics",
            "/health",
            "/load",
            "/ping",
            "/version",
            "/server_info",
        ],
        registry=registry,
    ).add().instrument(app).expose(app, response_class=PrometheusResponse)

    # Add prometheus asgi middleware to route /metrics requests
    metrics_route = Mount("/metrics", make_asgi_app(registry=registry))

    # Workaround for 307 Redirect for /metrics
    metrics_route.path_regex = re.compile("^/metrics(?P<path>.*)$")
    app.routes.append(metrics_route)

ping async

ping(raw_request: Request) -> Response

Ping check. Endpoint required for SageMaker

Source code in vllm/entrypoints/openai/api_server.py
@router.get("/ping", response_class=Response)
@router.post("/ping", response_class=Response)
async def ping(raw_request: Request) -> Response:
    """Ping check. Endpoint required for SageMaker"""
    return await health(raw_request)

pooling

pooling(request: Request) -> Optional[OpenAIServingPooling]
Source code in vllm/entrypoints/openai/api_server.py
def pooling(request: Request) -> Optional[OpenAIServingPooling]:
    return request.app.state.openai_serving_pooling

rerank

rerank(request: Request) -> Optional[ServingScores]
Source code in vllm/entrypoints/openai/api_server.py
def rerank(request: Request) -> Optional[ServingScores]:
    return request.app.state.openai_serving_scores

reset_prefix_cache async

reset_prefix_cache(raw_request: Request)

Reset the prefix cache. Note that we currently do not check if the prefix cache is successfully reset in the API server.

Source code in vllm/entrypoints/openai/api_server.py
@router.post("/reset_prefix_cache")
async def reset_prefix_cache(raw_request: Request):
    """
    Reset the prefix cache. Note that we currently do not check if the
    prefix cache is successfully reset in the API server.
    """
    device = None
    device_str = raw_request.query_params.get("device")
    if device_str is not None:
        device = Device[device_str.upper()]
    logger.info("Resetting prefix cache with specific %s...", str(device))
    await engine_client(raw_request).reset_prefix_cache(device)
    return Response(status_code=200)

run_server async

run_server(args, **uvicorn_kwargs) -> None

Run a single-worker API server.

Source code in vllm/entrypoints/openai/api_server.py
async def run_server(args, **uvicorn_kwargs) -> None:
    """Run a single-worker API server."""
    listen_address, sock = setup_server(args)
    await run_server_worker(listen_address, sock, args, **uvicorn_kwargs)
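
A minimal sketch of a programmatic launch, assuming the argument helper make_arg_parser from vllm.entrypoints.openai.cli_args and uvloop for the event loop; the model name is a placeholder:

# Minimal sketch; make_arg_parser is assumed to live in
# vllm.entrypoints.openai.cli_args, and the model name is a placeholder.
import uvloop

from vllm.entrypoints.openai.api_server import run_server
from vllm.entrypoints.openai.cli_args import make_arg_parser
from vllm.utils import FlexibleArgumentParser

parser = FlexibleArgumentParser(
    description="vLLM OpenAI-Compatible RESTful API server.")
parser = make_arg_parser(parser)
args = parser.parse_args(["--model", "facebook/opt-125m"])

uvloop.run(run_server(args))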

run_server_worker async

run_server_worker(
    listen_address,
    sock,
    args,
    client_config=None,
    **uvicorn_kwargs,
) -> None

Run a single API server worker.

Source code in vllm/entrypoints/openai/api_server.py
async def run_server_worker(listen_address,
                            sock,
                            args,
                            client_config=None,
                            **uvicorn_kwargs) -> None:
    """Run a single API server worker."""

    if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3:
        ToolParserManager.import_tool_parser(args.tool_parser_plugin)

    server_index = client_config.get("client_index", 0) if client_config else 0

    # Load logging config for uvicorn if specified
    log_config = load_log_config(args.log_config_file)
    if log_config is not None:
        uvicorn_kwargs['log_config'] = log_config

    async with build_async_engine_client(args, client_config) as engine_client:
        app = build_app(args)

        vllm_config = await engine_client.get_vllm_config()
        await init_app_state(engine_client, vllm_config, app.state, args)

        logger.info("Starting vLLM API server %d on %s", server_index,
                    listen_address)
        shutdown_task = await serve_http(
            app,
            sock=sock,
            enable_ssl_refresh=args.enable_ssl_refresh,
            host=args.host,
            port=args.port,
            log_level=args.uvicorn_log_level,
            # NOTE: When the 'disable_uvicorn_access_log' value is True,
            # no access log will be output.
            access_log=not args.disable_uvicorn_access_log,
            timeout_keep_alive=envs.VLLM_HTTP_TIMEOUT_KEEP_ALIVE,
            ssl_keyfile=args.ssl_keyfile,
            ssl_certfile=args.ssl_certfile,
            ssl_ca_certs=args.ssl_ca_certs,
            ssl_cert_reqs=args.ssl_cert_reqs,
            **uvicorn_kwargs,
        )

    # NB: Await server shutdown only after the backend context is exited
    try:
        await shutdown_task
    finally:
        sock.close()

score

score(request: Request) -> Optional[ServingScores]
Source code in vllm/entrypoints/openai/api_server.py
def score(request: Request) -> Optional[ServingScores]:
    return request.app.state.openai_serving_scores

setup_server

setup_server(args)

Validate API server args, set up signal handler, create socket ready to serve.

Source code in vllm/entrypoints/openai/api_server.py
def setup_server(args):
    """Validate API server args, set up signal handler, create socket
    ready to serve."""

    logger.info("vLLM API server version %s", VLLM_VERSION)
    log_non_default_args(args)

    if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3:
        ToolParserManager.import_tool_parser(args.tool_parser_plugin)

    validate_api_server_args(args)

    # workaround to make sure that we bind the port before the engine is set up.
    # This avoids race conditions with ray.
    # see https://github.com/vllm-project/vllm/issues/8204
    sock_addr = (args.host or "", args.port)
    sock = create_server_socket(sock_addr)

    # workaround to avoid footguns where uvicorn drops requests with too
    # many concurrent requests active
    set_ulimit()

    def signal_handler(*_) -> None:
        # Interrupt server on sigterm while initializing
        raise KeyboardInterrupt("terminated")

    signal.signal(signal.SIGTERM, signal_handler)

    addr, port = sock_addr
    is_ssl = args.ssl_keyfile and args.ssl_certfile
    host_part = f"[{addr}]" if is_valid_ipv6_address(
        addr) else addr or "0.0.0.0"
    listen_address = f"http{'s' if is_ssl else ''}://{host_part}:{port}"

    return listen_address, sock

show_available_models async

show_available_models(raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.get("/v1/models")
async def show_available_models(raw_request: Request):
    handler = models(raw_request)

    models_ = await handler.show_available_models()
    return JSONResponse(content=models_.model_dump())

show_server_info async

show_server_info(raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.get("/server_info")
async def show_server_info(raw_request: Request):
    server_info = {"vllm_config": str(raw_request.app.state.vllm_config)}
    return JSONResponse(content=server_info)

show_version async

show_version()
Source code in vllm/entrypoints/openai/api_server.py
@router.get("/version")
async def show_version():
    ver = {"version": VLLM_VERSION}
    return JSONResponse(content=ver)

sleep async

sleep(raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/sleep")
async def sleep(raw_request: Request):
    # get POST params
    level = raw_request.query_params.get("level", "1")
    await engine_client(raw_request).sleep(int(level))
    # FIXME: in v0 with frontend multiprocessing, the sleep command
    # is sent but does not finish yet when we return a response.
    return Response(status_code=200)

start_profile async

start_profile(raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/start_profile")
async def start_profile(raw_request: Request):
    logger.info("Starting profiler...")
    await engine_client(raw_request).start_profile()
    logger.info("Profiler started.")
    return Response(status_code=200)

stop_profile async

stop_profile(raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/stop_profile")
async def stop_profile(raw_request: Request):
    logger.info("Stopping profiler...")
    await engine_client(raw_request).stop_profile()
    logger.info("Profiler stopped.")
    return Response(status_code=200)

tokenization

tokenization(request: Request) -> OpenAIServingTokenization
Source code in vllm/entrypoints/openai/api_server.py
def tokenization(request: Request) -> OpenAIServingTokenization:
    return request.app.state.openai_serving_tokenization

tokenize async

tokenize(request: TokenizeRequest, raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/tokenize",
             dependencies=[Depends(validate_json_request)],
             responses={
                 HTTPStatus.BAD_REQUEST.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.NOT_FOUND.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.INTERNAL_SERVER_ERROR.value: {
                     "model": ErrorResponse
                 },
                 HTTPStatus.NOT_IMPLEMENTED.value: {
                     "model": ErrorResponse
                 },
             })
@with_cancellation
async def tokenize(request: TokenizeRequest, raw_request: Request):
    handler = tokenization(raw_request)

    try:
        generator = await handler.create_tokenize(request, raw_request)
    except NotImplementedError as e:
        raise HTTPException(status_code=HTTPStatus.NOT_IMPLEMENTED.value,
                            detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value,
                            detail=str(e)) from e

    if isinstance(generator, ErrorResponse):
        return JSONResponse(content=generator.model_dump(),
                            status_code=generator.code)
    elif isinstance(generator, TokenizeResponse):
        return JSONResponse(content=generator.model_dump())

    assert_never(generator)

transcription

transcription(
    request: Request,
) -> OpenAIServingTranscription
Source code in vllm/entrypoints/openai/api_server.py
def transcription(request: Request) -> OpenAIServingTranscription:
    return request.app.state.openai_serving_transcription

translation

translation(request: Request) -> OpenAIServingTranslation
Source code in vllm/entrypoints/openai/api_server.py
def translation(request: Request) -> OpenAIServingTranslation:
    return request.app.state.openai_serving_translation

unload_lora_adapter async

unload_lora_adapter(
    request: UnloadLoRAAdapterRequest, raw_request: Request
)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/v1/unload_lora_adapter",
             dependencies=[Depends(validate_json_request)])
async def unload_lora_adapter(request: UnloadLoRAAdapterRequest,
                              raw_request: Request):
    handler = models(raw_request)
    response = await handler.unload_lora_adapter(request)
    if isinstance(response, ErrorResponse):
        return JSONResponse(content=response.model_dump(),
                            status_code=response.code)

    return Response(status_code=200, content=response)

validate_api_server_args

validate_api_server_args(args)
Source code in vllm/entrypoints/openai/api_server.py
def validate_api_server_args(args):
    valid_tool_parses = ToolParserManager.tool_parsers.keys()
    if args.enable_auto_tool_choice \
            and args.tool_call_parser not in valid_tool_parses:
        raise KeyError(f"invalid tool call parser: {args.tool_call_parser} "
                       f"(chose from {{ {','.join(valid_tool_parses)} }})")

    valid_reasoning_parses = ReasoningParserManager.reasoning_parsers.keys()
    if args.reasoning_parser \
        and args.reasoning_parser not in valid_reasoning_parses:
        raise KeyError(
            f"invalid reasoning parser: {args.reasoning_parser} "
            f"(chose from {{ {','.join(valid_reasoning_parses)} }})")

validate_json_request async

validate_json_request(raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
async def validate_json_request(raw_request: Request):
    content_type = raw_request.headers.get("content-type", "").lower()
    media_type = content_type.split(";", maxsplit=1)[0]
    if media_type != "application/json":
        raise RequestValidationError(errors=[
            "Unsupported Media Type: Only 'application/json' is allowed"
        ])

wake_up async

wake_up(raw_request: Request)
Source code in vllm/entrypoints/openai/api_server.py
@router.post("/wake_up")
async def wake_up(raw_request: Request):
    tags = raw_request.query_params.getlist("tags")
    if tags == []:
        # set to None to wake up all tags if no tags are provided
        tags = None
    logger.info("wake up the engine with tags: %s", tags)
    await engine_client(raw_request).wake_up(tags)
    # FIXME: in v0 with frontend multiprocessing, the wake-up command
    # is sent but does not finish yet when we return a response.
    return Response(status_code=200)