Skip to content

vllm.distributed.device_communicators.cuda_wrapper

This file is a pure Python wrapper for the cudart library. It avoids the need to compile a separate shared library, and is convenient for use when we just need to call a few functions.

cudaError_t module-attribute

cudaError_t = c_int

cudaMemcpyKind module-attribute

cudaMemcpyKind = c_int

logger module-attribute

logger = init_logger(__name__)

CudaRTLibrary

Source code in vllm/distributed/device_communicators/cuda_wrapper.py
class CudaRTLibrary:
    """Typed ctypes wrapper around a small subset of the CUDA runtime.

    The cudart shared library is loaded with ``ctypes.CDLL`` and the
    symbols listed in ``exported_functions`` are bound with the proper
    return/argument types.  Both the loaded library handle and the bound
    function table are cached at class level, keyed by the shared-object
    path, so constructing several instances for the same ``so_file``
    loads the library only once.
    """

    # Declarative table of cudart symbols to bind; each entry carries the
    # ctypes return type and argument types matching the C prototype
    # quoted in the comment above it.
    exported_functions = [
        # cudaError_t cudaSetDevice ( int device )
        Function("cudaSetDevice", cudaError_t, [ctypes.c_int]),
        # cudaError_t cudaDeviceSynchronize ( void )
        Function("cudaDeviceSynchronize", cudaError_t, []),
        # cudaError_t cudaDeviceReset ( void )
        Function("cudaDeviceReset", cudaError_t, []),

        # const char* cudaGetErrorString ( cudaError_t error )
        Function("cudaGetErrorString", ctypes.c_char_p, [cudaError_t]),

        # cudaError_t cudaMalloc ( void** devPtr, size_t size )
        Function("cudaMalloc", cudaError_t,
                 [ctypes.POINTER(ctypes.c_void_p), ctypes.c_size_t]),
        # cudaError_t cudaFree ( void* devPtr )
        Function("cudaFree", cudaError_t, [ctypes.c_void_p]),
        # cudaError_t cudaMemset ( void* devPtr, int value, size_t count )
        Function("cudaMemset", cudaError_t,
                 [ctypes.c_void_p, ctypes.c_int, ctypes.c_size_t]),
        # cudaError_t cudaMemcpy ( void* dst, const void* src, size_t count, cudaMemcpyKind kind ) # noqa
        Function("cudaMemcpy", cudaError_t, [
            ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t, cudaMemcpyKind
        ]),

        # cudaError_t cudaIpcGetMemHandle ( cudaIpcMemHandle_t* handle, void* devPtr ) # noqa
        Function("cudaIpcGetMemHandle", cudaError_t,
                 [ctypes.POINTER(cudaIpcMemHandle_t), ctypes.c_void_p]),
        # cudaError_t cudaIpcOpenMemHandle ( void** devPtr, cudaIpcMemHandle_t handle, unsigned int flags ) # noqa
        Function("cudaIpcOpenMemHandle", cudaError_t, [
            ctypes.POINTER(ctypes.c_void_p), cudaIpcMemHandle_t, ctypes.c_uint
        ]),
    ]

    # class attribute to store the mapping from the path to the library
    # to avoid loading the same library multiple times
    path_to_library_cache: dict[str, Any] = {}

    # class attribute to store the mapping from library path
    # to the per-library dict of bound (typed) ctypes functions
    path_to_dict_mapping: dict[str, dict[str, Any]] = {}

    def __init__(self, so_file: Optional[str] = None):
        """Load (or reuse) the cudart library and bind its functions.

        Args:
            so_file: Path to the cudart shared object.  When ``None``,
                the path is discovered from the libraries already mapped
                into this process, falling back to the
                ``VLLM_CUDART_SO_PATH`` environment variable.
        """
        if so_file is None:
            so_file = find_loaded_library("libcudart")
            if so_file is None:
                so_file = envs.VLLM_CUDART_SO_PATH  # fallback to env var
            assert so_file is not None, \
                (
                    "libcudart is not loaded in the current process, "
                    "try setting VLLM_CUDART_SO_PATH"
                )
        if so_file not in CudaRTLibrary.path_to_library_cache:
            lib = ctypes.CDLL(so_file)
            CudaRTLibrary.path_to_library_cache[so_file] = lib
        self.lib = CudaRTLibrary.path_to_library_cache[so_file]

        if so_file not in CudaRTLibrary.path_to_dict_mapping:
            _funcs = {}
            for func in CudaRTLibrary.exported_functions:
                f = getattr(self.lib, func.name)
                # setting restype/argtypes lets ctypes check and convert
                # arguments and return values automatically
                f.restype = func.restype
                f.argtypes = func.argtypes
                _funcs[func.name] = f
            CudaRTLibrary.path_to_dict_mapping[so_file] = _funcs
        self.funcs = CudaRTLibrary.path_to_dict_mapping[so_file]

    def CUDART_CHECK(self, result: cudaError_t) -> None:
        """Raise RuntimeError if ``result`` is a nonzero cudaError_t."""
        if result != 0:
            error_str = self.cudaGetErrorString(result)
            raise RuntimeError(f"CUDART error: {error_str}")

    def cudaGetErrorString(self, error: cudaError_t) -> str:
        """Return the human-readable message for a cudaError_t code."""
        return self.funcs["cudaGetErrorString"](error).decode("utf-8")

    def cudaSetDevice(self, device: int) -> None:
        """Set the current CUDA device for the calling host thread."""
        self.CUDART_CHECK(self.funcs["cudaSetDevice"](device))

    def cudaDeviceSynchronize(self) -> None:
        """Block until all prior work on the current device has finished."""
        self.CUDART_CHECK(self.funcs["cudaDeviceSynchronize"]())

    def cudaDeviceReset(self) -> None:
        """Destroy all allocations and reset the current device."""
        self.CUDART_CHECK(self.funcs["cudaDeviceReset"]())

    def cudaMalloc(self, size: int) -> ctypes.c_void_p:
        """Allocate ``size`` bytes of device memory and return the pointer."""
        devPtr = ctypes.c_void_p()
        self.CUDART_CHECK(self.funcs["cudaMalloc"](ctypes.byref(devPtr), size))
        return devPtr

    def cudaFree(self, devPtr: ctypes.c_void_p) -> None:
        """Free device memory previously returned by ``cudaMalloc``."""
        self.CUDART_CHECK(self.funcs["cudaFree"](devPtr))

    def cudaMemset(self, devPtr: ctypes.c_void_p, value: int,
                   count: int) -> None:
        """Fill ``count`` bytes of device memory at ``devPtr`` with ``value``."""
        self.CUDART_CHECK(self.funcs["cudaMemset"](devPtr, value, count))

    def cudaMemcpy(self, dst: ctypes.c_void_p, src: ctypes.c_void_p,
                   count: int) -> None:
        """Copy ``count`` bytes from ``src`` to ``dst``.

        Always passes cudaMemcpyDefault (enum value 4) so the runtime
        infers the transfer direction from the pointer values.
        """
        cudaMemcpyDefault = 4
        kind = cudaMemcpyDefault
        self.CUDART_CHECK(self.funcs["cudaMemcpy"](dst, src, count, kind))

    def cudaIpcGetMemHandle(self,
                            devPtr: ctypes.c_void_p) -> cudaIpcMemHandle_t:
        """Create an IPC handle for a device allocation, for sharing across processes."""
        handle = cudaIpcMemHandle_t()
        self.CUDART_CHECK(self.funcs["cudaIpcGetMemHandle"](
            ctypes.byref(handle), devPtr))
        return handle

    def cudaIpcOpenMemHandle(self,
                             handle: cudaIpcMemHandle_t) -> ctypes.c_void_p:
        """Map another process's IPC handle and return the local device pointer.

        Uses cudaIpcMemLazyEnablePeerAccess (flag value 1), the only flag
        the CUDA runtime accepts for this call.
        """
        cudaIpcMemLazyEnablePeerAccess = 1
        devPtr = ctypes.c_void_p()
        self.CUDART_CHECK(self.funcs["cudaIpcOpenMemHandle"](
            ctypes.byref(devPtr), handle, cudaIpcMemLazyEnablePeerAccess))
        return devPtr

exported_functions class-attribute instance-attribute

exported_functions = [
    Function("cudaSetDevice", cudaError_t, [c_int]),
    Function("cudaDeviceSynchronize", cudaError_t, []),
    Function("cudaDeviceReset", cudaError_t, []),
    Function("cudaGetErrorString", c_char_p, [cudaError_t]),
    Function(
        "cudaMalloc",
        cudaError_t,
        [POINTER(c_void_p), c_size_t],
    ),
    Function("cudaFree", cudaError_t, [c_void_p]),
    Function(
        "cudaMemset",
        cudaError_t,
        [c_void_p, c_int, c_size_t],
    ),
    Function(
        "cudaMemcpy",
        cudaError_t,
        [c_void_p, c_void_p, c_size_t, cudaMemcpyKind],
    ),
    Function(
        "cudaIpcGetMemHandle",
        cudaError_t,
        [POINTER(cudaIpcMemHandle_t), c_void_p],
    ),
    Function(
        "cudaIpcOpenMemHandle",
        cudaError_t,
        [POINTER(c_void_p), cudaIpcMemHandle_t, c_uint],
    ),
]

funcs instance-attribute

funcs = path_to_dict_mapping[so_file]

lib instance-attribute

lib = path_to_library_cache[so_file]

path_to_dict_mapping class-attribute instance-attribute

path_to_dict_mapping: dict[str, dict[str, Any]] = {}

path_to_library_cache class-attribute instance-attribute

path_to_library_cache: dict[str, Any] = {}

CUDART_CHECK

CUDART_CHECK(result: cudaError_t) -> None
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def CUDART_CHECK(self, result: cudaError_t) -> None:
    if result != 0:
        error_str = self.cudaGetErrorString(result)
        raise RuntimeError(f"CUDART error: {error_str}")

__init__

__init__(so_file: Optional[str] = None)
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def __init__(self, so_file: Optional[str] = None):
    if so_file is None:
        so_file = find_loaded_library("libcudart")
        if so_file is None:
            so_file = envs.VLLM_CUDART_SO_PATH  # fallback to env var
        assert so_file is not None, \
            (
                "libcudart is not loaded in the current process, "
                "try setting VLLM_CUDART_SO_PATH"
            )
    if so_file not in CudaRTLibrary.path_to_library_cache:
        lib = ctypes.CDLL(so_file)
        CudaRTLibrary.path_to_library_cache[so_file] = lib
    self.lib = CudaRTLibrary.path_to_library_cache[so_file]

    if so_file not in CudaRTLibrary.path_to_dict_mapping:
        _funcs = {}
        for func in CudaRTLibrary.exported_functions:
            f = getattr(self.lib, func.name)
            f.restype = func.restype
            f.argtypes = func.argtypes
            _funcs[func.name] = f
        CudaRTLibrary.path_to_dict_mapping[so_file] = _funcs
    self.funcs = CudaRTLibrary.path_to_dict_mapping[so_file]

cudaDeviceReset

cudaDeviceReset() -> None
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def cudaDeviceReset(self) -> None:
    self.CUDART_CHECK(self.funcs["cudaDeviceReset"]())

cudaDeviceSynchronize

cudaDeviceSynchronize() -> None
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def cudaDeviceSynchronize(self) -> None:
    self.CUDART_CHECK(self.funcs["cudaDeviceSynchronize"]())

cudaFree

cudaFree(devPtr: c_void_p) -> None
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def cudaFree(self, devPtr: ctypes.c_void_p) -> None:
    self.CUDART_CHECK(self.funcs["cudaFree"](devPtr))

cudaGetErrorString

cudaGetErrorString(error: cudaError_t) -> str
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def cudaGetErrorString(self, error: cudaError_t) -> str:
    return self.funcs["cudaGetErrorString"](error).decode("utf-8")

cudaIpcGetMemHandle

cudaIpcGetMemHandle(devPtr: c_void_p) -> cudaIpcMemHandle_t
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def cudaIpcGetMemHandle(self,
                        devPtr: ctypes.c_void_p) -> cudaIpcMemHandle_t:
    handle = cudaIpcMemHandle_t()
    self.CUDART_CHECK(self.funcs["cudaIpcGetMemHandle"](
        ctypes.byref(handle), devPtr))
    return handle

cudaIpcOpenMemHandle

cudaIpcOpenMemHandle(
    handle: cudaIpcMemHandle_t,
) -> c_void_p
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def cudaIpcOpenMemHandle(self,
                         handle: cudaIpcMemHandle_t) -> ctypes.c_void_p:
    cudaIpcMemLazyEnablePeerAccess = 1
    devPtr = ctypes.c_void_p()
    self.CUDART_CHECK(self.funcs["cudaIpcOpenMemHandle"](
        ctypes.byref(devPtr), handle, cudaIpcMemLazyEnablePeerAccess))
    return devPtr

cudaMalloc

cudaMalloc(size: int) -> c_void_p
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def cudaMalloc(self, size: int) -> ctypes.c_void_p:
    devPtr = ctypes.c_void_p()
    self.CUDART_CHECK(self.funcs["cudaMalloc"](ctypes.byref(devPtr), size))
    return devPtr

cudaMemcpy

cudaMemcpy(
    dst: c_void_p, src: c_void_p, count: int
) -> None
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def cudaMemcpy(self, dst: ctypes.c_void_p, src: ctypes.c_void_p,
               count: int) -> None:
    cudaMemcpyDefault = 4
    kind = cudaMemcpyDefault
    self.CUDART_CHECK(self.funcs["cudaMemcpy"](dst, src, count, kind))

cudaMemset

cudaMemset(
    devPtr: c_void_p, value: int, count: int
) -> None
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def cudaMemset(self, devPtr: ctypes.c_void_p, value: int,
               count: int) -> None:
    self.CUDART_CHECK(self.funcs["cudaMemset"](devPtr, value, count))

cudaSetDevice

cudaSetDevice(device: int) -> None
Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def cudaSetDevice(self, device: int) -> None:
    self.CUDART_CHECK(self.funcs["cudaSetDevice"](device))

Function dataclass

Source code in vllm/distributed/device_communicators/cuda_wrapper.py
@dataclass
class Function:
    """Describes one ctypes binding: symbol name plus its C signature."""
    # Symbol name as exported by the shared library.
    name: str
    # ctypes return type (assigned to the function's ``restype``).
    restype: Any
    # ctypes argument types (assigned to the function's ``argtypes``).
    argtypes: list[Any]

argtypes instance-attribute

argtypes: list[Any]

name instance-attribute

name: str

restype instance-attribute

restype: Any

__init__

__init__(
    name: str, restype: Any, argtypes: list[Any]
) -> None

cudaIpcMemHandle_t

Bases: Structure

Source code in vllm/distributed/device_communicators/cuda_wrapper.py
class cudaIpcMemHandle_t(ctypes.Structure):
    """Opaque handle passed to/returned from the cudaIpc* APIs.

    Treated as a raw 128-byte buffer; the contents are produced and
    consumed only by the CUDA runtime, never interpreted in Python.
    """
    _fields_ = [("internal", ctypes.c_byte * 128)]

_fields_ class-attribute instance-attribute

_fields_ = [('internal', c_byte * 128)]

find_loaded_library

find_loaded_library(lib_name) -> Optional[str]

According to https://man7.org/linux/man-pages/man5/proc_pid_maps.5.html, the file /proc/self/maps contains the memory maps of the process, which include the shared libraries loaded by the process. We can use this file to find the path of a loaded library.

Source code in vllm/distributed/device_communicators/cuda_wrapper.py
def find_loaded_library(lib_name) -> Optional[str]:
    """Return the filesystem path of a shared library mapped into this process.

    According to https://man7.org/linux/man-pages/man5/proc_pid_maps.5.html,
    the file `/proc/self/maps` contains the memory maps of the process,
    which include the shared libraries loaded by the process. We can use
    this file to find the path of a loaded library.

    Args:
        lib_name: Library name prefix to search for, e.g. ``"libcudart"``.

    Returns:
        The absolute path of the matching library, or ``None`` if no
        mapping mentions ``lib_name``.
    """
    matched_line = None
    with open("/proc/self/maps") as f:
        for line in f:
            if lib_name in line:
                matched_line = line
                break
    if matched_line is None:
        # the library is not loaded in the current process
        return None
    # if lib_name is libcudart, we need to match a line with:
    # address /path/to/libcudart-hash.so.11.0
    start = matched_line.index("/")
    path = matched_line[start:].strip()
    filename = path.split("/")[-1]
    # BUGFIX: the message previously interpolated a placeholder instead of
    # the offending filename, making the failure impossible to diagnose.
    assert filename.rpartition(".so")[0].startswith(lib_name), \
        f"Unexpected filename: {filename} for library {lib_name}"
    return path