
vllm.transformers_utils.processors

Modules:

- deepseek_vl2
- ovis

__all__ module-attribute

__all__ = ['DeepseekVLV2Processor', 'OvisProcessor']

DeepseekVLV2Processor

Bases: ProcessorMixin

Source code in vllm/transformers_utils/processors/deepseek_vl2.py
class DeepseekVLV2Processor(ProcessorMixin):
    tokenizer_class = ("LlamaTokenizer", "LlamaTokenizerFast")
    attributes = ["tokenizer"]

    def __init__(
        self,
        tokenizer: LlamaTokenizerFast,
        candidate_resolutions: tuple[tuple[int, int]],
        patch_size: int,
        downsample_ratio: int,
        image_mean: tuple[float, float, float] = (0.5, 0.5, 0.5),
        image_std: tuple[float, float, float] = (0.5, 0.5, 0.5),
        normalize: bool = True,
        image_token: str = "<image>",
        pad_token: str = "<|▁pad▁|>",
        add_special_token: bool = False,
        sft_format: str = "deepseek",
        mask_prompt: bool = True,
        ignore_id: int = -100,
        **kwargs,
    ):

        self.candidate_resolutions = candidate_resolutions
        self.image_size = candidate_resolutions[0][0]
        self.patch_size = patch_size
        self.image_mean = image_mean
        self.image_std = image_std
        self.normalize = normalize
        self.downsample_ratio = downsample_ratio

        self.image_transform = ImageTransform(mean=image_mean, std=image_std, normalize=normalize)
        self.tokenizer = tokenizer
        self.tokenizer.padding_side = 'left'  # must set this; padding side makes a difference in batch inference

        # add the pad_token as special token to use 'tokenizer.pad_token' and 'tokenizer.pad_token_id'
        if tokenizer.pad_token is None:
            self.tokenizer.add_special_tokens({'pad_token': pad_token})

        # add image token
        image_token_id = self.tokenizer.vocab.get(image_token)
        if image_token_id is None:
            special_tokens = [image_token]
            special_tokens_dict = {"additional_special_tokens": special_tokens}
            self.tokenizer.add_special_tokens(special_tokens_dict)
        self.image_token_id = self.tokenizer.vocab.get(image_token)

        # add five special tokens for grounding-related tasks
        # <|ref|>, <|/ref|>, <|det|>, <|/det|>, <|grounding|>
        special_tokens = ['<|ref|>', '<|/ref|>', '<|det|>', '<|/det|>', '<|grounding|>']
        special_tokens_dict = {"additional_special_tokens": special_tokens}
        self.tokenizer.add_special_tokens(special_tokens_dict)

        # add special tokens for SFT data
        special_tokens = ["<|User|>", "<|Assistant|>"]
        special_tokens_dict = {"additional_special_tokens": special_tokens}
        self.tokenizer.add_special_tokens(special_tokens_dict)

        self.image_token = image_token
        self.pad_token = pad_token
        self.add_special_token = add_special_token
        self.sft_format = sft_format
        self.mask_prompt = mask_prompt
        self.ignore_id = ignore_id

        super().__init__(
            tokenizer,
            **kwargs,
        )

    def select_best_resolution(self, image_size):
        # used for cropping
        original_width, original_height = image_size
        best_fit = None
        max_effective_resolution = 0
        min_wasted_resolution = float("inf")

        for width, height in self.candidate_resolutions:
            scale = min(width / original_width, height / original_height)
            downscaled_width, downscaled_height = int(
                original_width * scale), int(original_height * scale)
            effective_resolution = min(downscaled_width * downscaled_height,
                                       original_width * original_height)
            wasted_resolution = (width * height) - effective_resolution

            if effective_resolution > max_effective_resolution or (
                    effective_resolution == max_effective_resolution
                    and wasted_resolution < min_wasted_resolution):
                max_effective_resolution = effective_resolution
                min_wasted_resolution = wasted_resolution
                best_fit = (width, height)

        return best_fit

    @property
    def bos_id(self):
        return self.tokenizer.bos_token_id

    @property
    def eos_id(self):
        return self.tokenizer.eos_token_id

    @property
    def pad_id(self):
        return self.tokenizer.pad_token_id

    def encode(self, text: str, bos: bool = True, eos: bool = False):
        t = self.tokenizer.encode(text, add_special_tokens=False)

        if bos:
            t = [self.bos_id] + t
        if eos:
            t = t + [self.eos_id]

        return t

    def decode(self, t: list[int], **kwargs) -> str:
        return self.tokenizer.decode(t, **kwargs)

    def process_one(
        self,
        prompt: str,
        images: list[Image.Image],
        inference_mode: bool = True,
        **kwargs,
    ):
        """

        Args:
            prompt (str): the formatted prompt;
            conversations (list[dict]): conversations with a list of messages;
            images (list[ImageType]): the list of images;
            inference_mode (bool): if True, then remove the last eos token;
            system_prompt (str): the system prompt;
            **kwargs:

        Returns:
            outputs (BaseProcessorOutput): the output of the processor,
                - input_ids (torch.LongTensor): [N + image tokens]
                - target_ids (torch.LongTensor): [N + image tokens]
                - pixel_values (torch.FloatTensor): [n_patches, 3, H, W]
                - image_id (int): the id of the image token
                - num_image_tokens (list[int]): the number of image tokens
        """

        assert (prompt is not None and images is not None
                ), "prompt and images must be used at the same time."

        sft_format = prompt
        tokenized_str, images_list, images_seq_mask, images_spatial_crop, num_image_tokens = self.tokenize_with_images(
            sft_format, images, bos=True, eos=True, cropping=len(images) <= 2)
        masked_tokenized_str = []
        for token_index in tokenized_str:
            if token_index != self.image_token_id:
                masked_tokenized_str.append(token_index)
            else:
                masked_tokenized_str.append(self.ignore_id)

        assert len(tokenized_str) == len(images_seq_mask) == len(masked_tokenized_str), \
            (f"tokenized_str's length {len(tokenized_str)}, input_ids' length {len(masked_tokenized_str)}, "
             f"imags_seq_mask's length {len(images_seq_mask)}, are not equal")

        input_ids = torch.LongTensor(tokenized_str)
        target_ids = torch.LongTensor(masked_tokenized_str)
        images_seq_mask = torch.tensor(images_seq_mask, dtype=torch.bool)

        # set input_ids < 0 | input_ids == self.image_token_id as ignore_id
        target_ids[(input_ids < 0) |
                   (input_ids == self.image_token_id)] = self.ignore_id
        input_ids[input_ids < 0] = self.pad_id

        if inference_mode:
            # Remove the ending eos token
            assert input_ids[-1] == self.eos_id
            input_ids = input_ids[:-1]
            target_ids = target_ids[:-1]
            images_seq_mask = images_seq_mask[:-1]

        if len(images_list) == 0:
            pixel_values = torch.zeros((1, 3, self.image_size, self.image_size))
            images_spatial_crop = torch.zeros((1, 2), dtype=torch.long)
        else:
            pixel_values = torch.stack(images_list, dim=0)
            images_spatial_crop = torch.tensor(images_spatial_crop, dtype=torch.long)

        input_ids = input_ids.unsqueeze(0)

        prepare = BatchFeature(
            data=dict(
                input_ids=input_ids,
                pixel_values=pixel_values,
                images_seq_mask=images_seq_mask,
                images_spatial_crop=images_spatial_crop,
                num_image_tokens=num_image_tokens,
            ),
            tensor_type="pt",
        )
        return prepare

    def __call__(
        self,
        *,
        prompt: str,
        images: list[Image.Image],
        inference_mode: bool = True,
        **kwargs,
    ):
        """

        Args:
            prompt (str): the formatted prompt;
            images (list[ImageType]): the list of images;
            inference_mode (bool): if True, then remove the last eos token;
            **kwargs:

        Returns:
            outputs (BaseProcessorOutput): the output of the processor,
                - input_ids (torch.LongTensor): [N + image tokens]
                - images (torch.FloatTensor): [n_images, 3, H, W]
                - image_id (int): the id of the image token
                - num_image_tokens (list[int]): the number of image tokens
        """

        prepare = self.process_one(
            prompt=prompt,
            images=images,
            inference_mode=inference_mode,
        )

        return prepare

    def tokenize_with_images(
        self,
        conversation: str,
        images: list[Image.Image],
        bos: bool = True,
        eos: bool = True,
        cropping: bool = True,
    ):
        """Tokenize text with <image> tags."""
        assert conversation.count(self.image_token) == len(images)
        text_splits = conversation.split(self.image_token)
        images_list, images_seq_mask, images_spatial_crop = [], [], []
        num_image_tokens = []
        tokenized_str = []
        for text_sep, image in zip(text_splits, images):
            """encode text_sep"""
            tokenized_sep = self.encode(text_sep, bos=False, eos=False)
            tokenized_str += tokenized_sep
            images_seq_mask += [False] * len(tokenized_sep)

            """select best resolution for anyres"""
            if cropping:
                best_width, best_height = self.select_best_resolution(image.size)
            else:
                best_width, best_height = self.image_size, self.image_size

            """process the global view"""
            global_view = ImageOps.pad(image, (self.image_size, self.image_size),
                                       color=tuple(int(x * 255) for x in self.image_transform.mean))
            images_list.append(self.image_transform(global_view))

            """process the local views"""
            local_view = ImageOps.pad(image, (best_width, best_height),
                                      color=tuple(int(x * 255) for x in self.image_transform.mean))
            for i in range(0, best_height, self.image_size):
                for j in range(0, best_width, self.image_size):
                    images_list.append(
                        self.image_transform(local_view.crop((j, i, j + self.image_size, i + self.image_size))))

            """record height / width crop num"""
            num_width_tiles, num_height_tiles = best_width // self.image_size, best_height // self.image_size
            images_spatial_crop.append([num_width_tiles, num_height_tiles])

            """add image tokens"""
            h = w = math.ceil((self.image_size // self.patch_size) / self.downsample_ratio)
            # global views tokens h * (w + 1), 1 is for line separator
            tokenized_image = [self.image_token_id] * h * (w + 1)
            # add a separator between global and local views
            tokenized_image += [self.image_token_id]
            # local views tokens, (num_height_tiles * h) * (num_width_tiles * w + 1)
            tokenized_image += [self.image_token_id] * (num_height_tiles * h) * (num_width_tiles * w + 1)

            tokenized_str += tokenized_image
            images_seq_mask += [True] * len(tokenized_image)
            num_image_tokens.append(len(tokenized_image))

        """process the last text split"""
        tokenized_sep = self.encode(text_splits[-1], bos=False, eos=False)
        tokenized_str += tokenized_sep
        images_seq_mask += [False] * len(tokenized_sep)

        """add the bos and eos tokens"""
        if bos:
            tokenized_str = [self.bos_id] + tokenized_str
            images_seq_mask = [False] + images_seq_mask
        if eos:
            tokenized_str = tokenized_str + [self.eos_id]
            images_seq_mask = images_seq_mask + [False]

        assert len(tokenized_str) == len(
            images_seq_mask), f"tokenize_with_images func: tokenized_str's length {len(tokenized_str)} is not equal to imags_seq_mask's length {len(images_seq_mask)}"

        return tokenized_str, images_list, images_seq_mask, images_spatial_crop, num_image_tokens

add_special_token instance-attribute

add_special_token = add_special_token

attributes class-attribute instance-attribute

attributes = ['tokenizer']

bos_id property

bos_id

candidate_resolutions instance-attribute

candidate_resolutions = candidate_resolutions

downsample_ratio instance-attribute

downsample_ratio = downsample_ratio

eos_id property

eos_id

ignore_id instance-attribute

ignore_id = ignore_id

image_mean instance-attribute

image_mean = image_mean

image_size instance-attribute

image_size = candidate_resolutions[0][0]

image_std instance-attribute

image_std = image_std

image_token instance-attribute

image_token = image_token

image_token_id instance-attribute

image_token_id = tokenizer.vocab.get(image_token)

image_transform instance-attribute

image_transform = ImageTransform(
    mean=image_mean, std=image_std, normalize=normalize
)

mask_prompt instance-attribute

mask_prompt = mask_prompt

normalize instance-attribute

normalize = normalize

pad_id property

pad_id

pad_token instance-attribute

pad_token = pad_token

patch_size instance-attribute

patch_size = patch_size

sft_format instance-attribute

sft_format = sft_format

tokenizer instance-attribute

tokenizer = tokenizer

tokenizer_class class-attribute instance-attribute

tokenizer_class = ('LlamaTokenizer', 'LlamaTokenizerFast')

__call__

__call__(
    *,
    prompt: str,
    images: list[Image],
    inference_mode: bool = True,
    **kwargs,
)

Parameters:

- prompt (str): the formatted prompt. Required.
- images (list[ImageType]): the list of images. Required.
- inference_mode (bool): if True, then remove the last eos token. Default: True.
- **kwargs: additional keyword arguments. Default: {}.

Returns:

- outputs (BaseProcessorOutput): the output of the processor:
    - input_ids (torch.LongTensor): [N + image tokens]
    - images (torch.FloatTensor): [n_images, 3, H, W]
    - image_id (int): the id of the image token
    - num_image_tokens (list[int]): the number of image tokens

Source code in vllm/transformers_utils/processors/deepseek_vl2.py
def __call__(
    self,
    *,
    prompt: str,
    images: list[Image.Image],
    inference_mode: bool = True,
    **kwargs,
):
    """

    Args:
        prompt (str): the formatted prompt;
        images (list[ImageType]): the list of images;
        inference_mode (bool): if True, then remove the last eos token;
        **kwargs:

    Returns:
        outputs (BaseProcessorOutput): the output of the processor,
            - input_ids (torch.LongTensor): [N + image tokens]
            - images (torch.FloatTensor): [n_images, 3, H, W]
            - image_id (int): the id of the image token
            - num_image_tokens (list[int]): the number of image tokens
    """

    prepare = self.process_one(
        prompt=prompt,
        images=images,
        inference_mode=inference_mode,
    )

    return prepare
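
A minimal usage sketch of the keyword-only interface is shown below. It is illustrative only: it assumes a DeepSeek-VL2 checkpoint whose repository ships a compatible processor config (the repository id is a placeholder) and that PIL and torch are installed.

from PIL import Image

from vllm.transformers_utils.processors import DeepseekVLV2Processor

# Placeholder checkpoint id; substitute any DeepSeek-VL2 repository with a processor config.
processor = DeepseekVLV2Processor.from_pretrained("deepseek-ai/deepseek-vl2-tiny")

# The prompt must contain one <image> tag per entry in `images`.
prompt = "<|User|>: <image>\nDescribe this image.\n\n<|Assistant|>:"
image = Image.new("RGB", (640, 480), color=(127, 127, 127))

outputs = processor(prompt=prompt, images=[image], inference_mode=True)
print(outputs["input_ids"].shape)       # (1, sequence length without the trailing eos)
print(outputs["pixel_values"].shape)    # (n_patches, 3, image_size, image_size)
print(outputs["num_image_tokens"])      # image tokens inserted for each image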

__init__

__init__(
    tokenizer: LlamaTokenizerFast,
    candidate_resolutions: tuple[tuple[int, int]],
    patch_size: int,
    downsample_ratio: int,
    image_mean: tuple[float, float, float] = (
        0.5,
        0.5,
        0.5,
    ),
    image_std: tuple[float, float, float] = (0.5, 0.5, 0.5),
    normalize: bool = True,
    image_token: str = "<image>",
    pad_token: str = "<|▁pad▁|>",
    add_special_token: bool = False,
    sft_format: str = "deepseek",
    mask_prompt: bool = True,
    ignore_id: int = -100,
    **kwargs,
)
Source code in vllm/transformers_utils/processors/deepseek_vl2.py
def __init__(
    self,
    tokenizer: LlamaTokenizerFast,
    candidate_resolutions: tuple[tuple[int, int]],
    patch_size: int,
    downsample_ratio: int,
    image_mean: tuple[float, float, float] = (0.5, 0.5, 0.5),
    image_std: tuple[float, float, float] = (0.5, 0.5, 0.5),
    normalize: bool = True,
    image_token: str = "<image>",
    pad_token: str = "<|▁pad▁|>",
    add_special_token: bool = False,
    sft_format: str = "deepseek",
    mask_prompt: bool = True,
    ignore_id: int = -100,
    **kwargs,
):

    self.candidate_resolutions = candidate_resolutions
    self.image_size = candidate_resolutions[0][0]
    self.patch_size = patch_size
    self.image_mean = image_mean
    self.image_std = image_std
    self.normalize = normalize
    self.downsample_ratio = downsample_ratio

    self.image_transform = ImageTransform(mean=image_mean, std=image_std, normalize=normalize)
    self.tokenizer = tokenizer
    self.tokenizer.padding_side = 'left'  # must set this; padding side makes a difference in batch inference

    # add the pad_token as special token to use 'tokenizer.pad_token' and 'tokenizer.pad_token_id'
    if tokenizer.pad_token is None:
        self.tokenizer.add_special_tokens({'pad_token': pad_token})

    # add image token
    image_token_id = self.tokenizer.vocab.get(image_token)
    if image_token_id is None:
        special_tokens = [image_token]
        special_tokens_dict = {"additional_special_tokens": special_tokens}
        self.tokenizer.add_special_tokens(special_tokens_dict)
    self.image_token_id = self.tokenizer.vocab.get(image_token)

    # add five special tokens for grounding-related tasks
    # <|ref|>, <|/ref|>, <|det|>, <|/det|>, <|grounding|>
    special_tokens = ['<|ref|>', '<|/ref|>', '<|det|>', '<|/det|>', '<|grounding|>']
    special_tokens_dict = {"additional_special_tokens": special_tokens}
    self.tokenizer.add_special_tokens(special_tokens_dict)

    # add special tokens for SFT data
    special_tokens = ["<|User|>", "<|Assistant|>"]
    special_tokens_dict = {"additional_special_tokens": special_tokens}
    self.tokenizer.add_special_tokens(special_tokens_dict)

    self.image_token = image_token
    self.pad_token = pad_token
    self.add_special_token = add_special_token
    self.sft_format = sft_format
    self.mask_prompt = mask_prompt
    self.ignore_id = ignore_id

    super().__init__(
        tokenizer,
        **kwargs,
    )

decode

decode(t: list[int], **kwargs) -> str
Source code in vllm/transformers_utils/processors/deepseek_vl2.py
def decode(self, t: list[int], **kwargs) -> str:
    return self.tokenizer.decode(t, **kwargs)

encode

encode(text: str, bos: bool = True, eos: bool = False)
Source code in vllm/transformers_utils/processors/deepseek_vl2.py
def encode(self, text: str, bos: bool = True, eos: bool = False):
    t = self.tokenizer.encode(text, add_special_tokens=False)

    if bos:
        t = [self.bos_id] + t
    if eos:
        t = t + [self.eos_id]

    return t

process_one

process_one(
    prompt: str,
    images: list[Image],
    inference_mode: bool = True,
    **kwargs,
)

Parameters:

- prompt (str): the formatted prompt. Required.
- conversations (list[dict]): conversations with a list of messages. Required.
- images (list[ImageType]): the list of images. Required.
- inference_mode (bool): if True, then remove the last eos token. Default: True.
- system_prompt (str): the system prompt. Required.
- **kwargs: additional keyword arguments. Default: {}.

Returns:

- outputs (BaseProcessorOutput): the output of the processor:
    - input_ids (torch.LongTensor): [N + image tokens]
    - target_ids (torch.LongTensor): [N + image tokens]
    - pixel_values (torch.FloatTensor): [n_patches, 3, H, W]
    - image_id (int): the id of the image token
    - num_image_tokens (list[int]): the number of image tokens

Source code in vllm/transformers_utils/processors/deepseek_vl2.py
def process_one(
    self,
    prompt: str,
    images: list[Image.Image],
    inference_mode: bool = True,
    **kwargs,
):
    """

    Args:
        prompt (str): the formatted prompt;
        conversations (list[dict]): conversations with a list of messages;
        images (list[ImageType]): the list of images;
        inference_mode (bool): if True, then remove the last eos token;
        system_prompt (str): the system prompt;
        **kwargs:

    Returns:
        outputs (BaseProcessorOutput): the output of the processor,
            - input_ids (torch.LongTensor): [N + image tokens]
            - target_ids (torch.LongTensor): [N + image tokens]
            - pixel_values (torch.FloatTensor): [n_patches, 3, H, W]
            - image_id (int): the id of the image token
            - num_image_tokens (list[int]): the number of image tokens
    """

    assert (prompt is not None and images is not None
            ), "prompt and images must be used at the same time."

    sft_format = prompt
    tokenized_str, images_list, images_seq_mask, images_spatial_crop, num_image_tokens = self.tokenize_with_images(
        sft_format, images, bos=True, eos=True, cropping=len(images) <= 2)
    masked_tokenized_str = []
    for token_index in tokenized_str:
        if token_index != self.image_token_id:
            masked_tokenized_str.append(token_index)
        else:
            masked_tokenized_str.append(self.ignore_id)

    assert len(tokenized_str) == len(images_seq_mask) == len(masked_tokenized_str), \
        (f"tokenized_str's length {len(tokenized_str)}, input_ids' length {len(masked_tokenized_str)}, "
         f"imags_seq_mask's length {len(images_seq_mask)}, are not equal")

    input_ids = torch.LongTensor(tokenized_str)
    target_ids = torch.LongTensor(masked_tokenized_str)
    images_seq_mask = torch.tensor(images_seq_mask, dtype=torch.bool)

    # set input_ids < 0 | input_ids == self.image_token_id as ignore_id
    target_ids[(input_ids < 0) |
               (input_ids == self.image_token_id)] = self.ignore_id
    input_ids[input_ids < 0] = self.pad_id

    if inference_mode:
        # Remove the ending eos token
        assert input_ids[-1] == self.eos_id
        input_ids = input_ids[:-1]
        target_ids = target_ids[:-1]
        images_seq_mask = images_seq_mask[:-1]

    if len(images_list) == 0:
        pixel_values = torch.zeros((1, 3, self.image_size, self.image_size))
        images_spatial_crop = torch.zeros((1, 2), dtype=torch.long)
    else:
        pixel_values = torch.stack(images_list, dim=0)
        images_spatial_crop = torch.tensor(images_spatial_crop, dtype=torch.long)

    input_ids = input_ids.unsqueeze(0)

    prepare = BatchFeature(
        data=dict(
            input_ids=input_ids,
            pixel_values=pixel_values,
            images_seq_mask=images_seq_mask,
            images_spatial_crop=images_spatial_crop,
            num_image_tokens=num_image_tokens,
        ),
        tensor_type="pt",
    )
    return prepare
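
To isolate the masking step above, the sketch below uses made-up token ids (an assumed image_token_id and pad_id, plus the default ignore_id of -100); it only demonstrates how image positions are ignored in target_ids and how negative sentinels are padded in input_ids.

import torch

image_token_id, ignore_id, pad_id = 100015, -100, 2  # assumed values for illustration

input_ids = torch.LongTensor([1, 17, image_token_id, image_token_id, 42, -7])
target_ids = input_ids.clone()

# Image positions and negative sentinels must not contribute to the loss.
target_ids[(input_ids < 0) | (input_ids == image_token_id)] = ignore_id
# Negative sentinels in the model input are replaced with the pad token id.
input_ids[input_ids < 0] = pad_id

print(input_ids.tolist())   # [1, 17, 100015, 100015, 42, 2]
print(target_ids.tolist())  # [1, 17, -100, -100, 42, -100]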

select_best_resolution

select_best_resolution(image_size)
Source code in vllm/transformers_utils/processors/deepseek_vl2.py
def select_best_resolution(self, image_size):
    # used for cropping
    original_width, original_height = image_size
    best_fit = None
    max_effective_resolution = 0
    min_wasted_resolution = float("inf")

    for width, height in self.candidate_resolutions:
        scale = min(width / original_width, height / original_height)
        downscaled_width, downscaled_height = int(
            original_width * scale), int(original_height * scale)
        effective_resolution = min(downscaled_width * downscaled_height,
                                   original_width * original_height)
        wasted_resolution = (width * height) - effective_resolution

        if effective_resolution > max_effective_resolution or (
                effective_resolution == max_effective_resolution
                and wasted_resolution < min_wasted_resolution):
            max_effective_resolution = effective_resolution
            min_wasted_resolution = wasted_resolution
            best_fit = (width, height)

    return best_fit
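
As a standalone illustration of the selection rule (maximize effective resolution, then minimize wasted area), the sketch below reimplements the same logic outside the class with an assumed candidate grid built from 384-pixel tiles; the numbers are illustrative, not values read from a real config.

def select_best_resolution(image_size, candidate_resolutions):
    original_width, original_height = image_size
    best_fit, max_effective, min_wasted = None, 0, float("inf")
    for width, height in candidate_resolutions:
        # Scale the image to fit inside the candidate while preserving aspect ratio.
        scale = min(width / original_width, height / original_height)
        downscaled = int(original_width * scale) * int(original_height * scale)
        effective = min(downscaled, original_width * original_height)
        wasted = width * height - effective
        if effective > max_effective or (effective == max_effective and wasted < min_wasted):
            max_effective, min_wasted, best_fit = effective, wasted, (width, height)
    return best_fit

# Assumed candidates: 1x1, 2x1, 1x2 and 2x2 tiles of 384 pixels.
candidates = [(384, 384), (768, 384), (384, 768), (768, 768)]
print(select_best_resolution((1600, 800), candidates))  # -> (768, 384) for a 2:1 landscape image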

tokenize_with_images

tokenize_with_images(
    conversation: str,
    images: list[Image],
    bos: bool = True,
    eos: bool = True,
    cropping: bool = True,
)

Tokenize text with <image> tags.

Source code in vllm/transformers_utils/processors/deepseek_vl2.py
def tokenize_with_images(
    self,
    conversation: str,
    images: list[Image.Image],
    bos: bool = True,
    eos: bool = True,
    cropping: bool = True,
):
    """Tokenize text with <image> tags."""
    assert conversation.count(self.image_token) == len(images)
    text_splits = conversation.split(self.image_token)
    images_list, images_seq_mask, images_spatial_crop = [], [], []
    num_image_tokens = []
    tokenized_str = []
    for text_sep, image in zip(text_splits, images):
        """encode text_sep"""
        tokenized_sep = self.encode(text_sep, bos=False, eos=False)
        tokenized_str += tokenized_sep
        images_seq_mask += [False] * len(tokenized_sep)

        """select best resolution for anyres"""
        if cropping:
            best_width, best_height = self.select_best_resolution(image.size)
        else:
            best_width, best_height = self.image_size, self.image_size

        """process the global view"""
        global_view = ImageOps.pad(image, (self.image_size, self.image_size),
                                   color=tuple(int(x * 255) for x in self.image_transform.mean))
        images_list.append(self.image_transform(global_view))

        """process the local views"""
        local_view = ImageOps.pad(image, (best_width, best_height),
                                  color=tuple(int(x * 255) for x in self.image_transform.mean))
        for i in range(0, best_height, self.image_size):
            for j in range(0, best_width, self.image_size):
                images_list.append(
                    self.image_transform(local_view.crop((j, i, j + self.image_size, i + self.image_size))))

        """record height / width crop num"""
        num_width_tiles, num_height_tiles = best_width // self.image_size, best_height // self.image_size
        images_spatial_crop.append([num_width_tiles, num_height_tiles])

        """add image tokens"""
        h = w = math.ceil((self.image_size // self.patch_size) / self.downsample_ratio)
        # global views tokens h * (w + 1), 1 is for line separator
        tokenized_image = [self.image_token_id] * h * (w + 1)
        # add a separator between global and local views
        tokenized_image += [self.image_token_id]
        # local views tokens, (num_height_tiles * h) * (num_width_tiles * w + 1)
        tokenized_image += [self.image_token_id] * (num_height_tiles * h) * (num_width_tiles * w + 1)

        tokenized_str += tokenized_image
        images_seq_mask += [True] * len(tokenized_image)
        num_image_tokens.append(len(tokenized_image))

    """process the last text split"""
    tokenized_sep = self.encode(text_splits[-1], bos=False, eos=False)
    tokenized_str += tokenized_sep
    images_seq_mask += [False] * len(tokenized_sep)

    """add the bos and eos tokens"""
    if bos:
        tokenized_str = [self.bos_id] + tokenized_str
        images_seq_mask = [False] + images_seq_mask
    if eos:
        tokenized_str = tokenized_str + [self.eos_id]
        images_seq_mask = images_seq_mask + [False]

    assert len(tokenized_str) == len(
        images_seq_mask), f"tokenize_with_images func: tokenized_str's length {len(tokenized_str)} is not equal to imags_seq_mask's length {len(images_seq_mask)}"

    return tokenized_str, images_list, images_seq_mask, images_spatial_crop, num_image_tokens
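
The number of image placeholder tokens emitted per image follows directly from the tile arithmetic above. The sketch below evaluates it for assumed values (image_size=384, patch_size=16, downsample_ratio=2) and a 2x1 local-view layout; real checkpoints may use different values.

import math

image_size, patch_size, downsample_ratio = 384, 16, 2  # assumed config values
num_width_tiles, num_height_tiles = 2, 1               # example local-view layout

# Tokens per row/column after patchifying and downsampling one 384x384 view.
h = w = math.ceil((image_size // patch_size) / downsample_ratio)   # 12

global_tokens = h * (w + 1)   # 12 * 13 = 156, the "+1" is the per-row line separator
separator = 1                 # single separator between global and local views
local_tokens = (num_height_tiles * h) * (num_width_tiles * w + 1)  # 12 * 25 = 300

print(global_tokens + separator + local_tokens)  # 457 image tokens for this image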

OvisProcessor

Bases: ProcessorMixin

Constructs an Ovis processor which wraps an Ovis image processor and a Qwen2 tokenizer into a single processor. [OvisProcessor] offers all the functionalities of [Qwen2VLImageProcessor] and [Qwen2TokenizerFast]. See [~OvisProcessor.__call__] and [~OvisProcessor.decode] for more information.

Args:

- image_processor ([Qwen2VLImageProcessor], optional): The image processor is a required input.
- tokenizer ([Qwen2TokenizerFast], optional): The tokenizer is a required input.
- chat_template (str, optional): A Jinja template which will be used to convert lists of messages in a chat into a tokenizable string.

Source code in vllm/transformers_utils/processors/ovis.py
class OvisProcessor(ProcessorMixin):
    r"""
    Constructs a Ovis processor which wraps a Ovis image processor and a Qwen2 tokenizer into a single processor.
    [`OvisProcessor`] offers all the functionalities of [`Qwen2VLImageProcessor`] and [`Qwen2TokenizerFast`]. See the
    [`~OvisProcessor.__call__`] and [`~OvisProcessor.decode`] for more information.
    Args:
        image_processor ([`Qwen2VLImageProcessor`], *optional*):
            The image processor is a required input.
        tokenizer ([`Qwen2TokenizerFast`], *optional*):
            The tokenizer is a required input.
        chat_template (`str`, *optional*): A Jinja template which will be used to convert lists of messages
            in a chat into a tokenizable string.
    """

    attributes = ["image_processor", "tokenizer"]
    valid_kwargs = ["chat_template", "image_pad_token", "image_segment_len"]

    image_processor_class = "AutoImageProcessor"
    tokenizer_class = "AutoTokenizer"

    def __init__(
        self,
        image_processor=None,
        tokenizer=None,
        chat_template=None,
        image_pad_token=None,
        image_segment_len=255,
        **kwargs,
    ):
        self.image_token = "<image>"
        self.image_pad_token = image_pad_token
        self.image_segment_len = image_segment_len
        super().__init__(image_processor, tokenizer, chat_template=chat_template)

    @cached_property
    def extra_special_tokens(self):
        image_pad_token_id = self.tokenizer.get_vocab()[self.image_pad_token]
        extra_special_tokens = {
            "image_token": -200,
            "image_atom": -300,
            "image_start": -301,
            "image_prefix": -302,
            "image_col_sep": -303,
            "image_row_sep": -304,
            "image_end": -305,
            'image_pad': image_pad_token_id,
        }
        return extra_special_tokens

    def __call__(
        self,
        images: ImageInput = None,
        text: Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]] = None,
        **kwargs: Unpack[OvisProcessorKwargs],
    ) -> BatchFeature:
        """
        Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text`
        and `kwargs` arguments to Qwen2TokenizerFast's [`~Qwen2TokenizerFast.__call__`] if `text` is not `None` to encode
        the text. To prepare the vision inputs, this method forwards the `vision_infos` and `kwrags` arguments to
        Qwen2VLImageProcessor's [`~Qwen2VLImageProcessor.__call__`] if `vision_infos` is not `None`.
            Args:
                images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `list[PIL.Image.Image]`, `list[np.ndarray]`, `list[torch.Tensor]`):
                    The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
                    tensor. Both channels-first and channels-last formats are supported.
                text (`str`, `list[str]`, `list[list[str]]`):
                    The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
                    (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
                    `is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
                videos (`np.ndarray`, `torch.Tensor`, `list[np.ndarray]`, `list[torch.Tensor]`):
                    The image or batch of videos to be prepared. Each video can be a 4D NumPy array or PyTorch
                    tensor, or a nested list of 3D frames. Both channels-first and channels-last formats are supported.
                return_tensors (`str` or [`~utils.TensorType`], *optional*):
                    If set, will return tensors of a particular framework. Acceptable values are:
                    - `'tf'`: Return TensorFlow `tf.constant` objects.
                    - `'pt'`: Return PyTorch `torch.Tensor` objects.
                    - `'np'`: Return NumPy `np.ndarray` objects.
                    - `'jax'`: Return JAX `jnp.ndarray` objects.
            Returns:
                [`BatchFeature`]: A [`BatchFeature`] with the following fields:
                - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
                - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
                  `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
                  `None`).
                - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
                - **pixel_values_videos** -- Pixel values of videos to be fed to a model. Returned when `videos` is not `None`.
                - **image_grid_thw** -- List of image 3D grid in LLM. Returned when `images` is not `None`.
                - **video_grid_thw** -- List of video 3D grid in LLM. Returned when `videos` is not `None`.
                - **second_per_grid_ts** -- List of video seconds per time grid. Returned when `videos` is not `None`.
        """
        output_kwargs = self._merge_kwargs(
            OvisProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )

        # Process all images first
        image_features = {}
        if images is not None:
            processed_images = []
            image_placeholders_list = []
            grids = []

            # Process each image
            for image in images if isinstance(images, list) else [images]:
                pixel_values, image_placeholders, grid = self.preprocess_image(
                    image=image, **output_kwargs["images_kwargs"]
                )
                processed_images.append(pixel_values)
                image_placeholders_list.append(image_placeholders)
                grids.append(grid)

            # assign all processed images
            if processed_images:
                image_features["image_placeholders"] = image_placeholders_list

        # Process text input
        if text is not None:

            if not isinstance(text, list):
                text = [text]

            tokenized_batched_text = self._tokenize_with_image_symbol(text)
            image_token_id = self.get_token_value("image_token")
            replaced_ids_list = []
            idx = 0
            for ids_tensor in tokenized_batched_text:
                if image_token_id in ids_tensor and "image_placeholders" in image_features:
                    if idx < len(image_features["image_placeholders"]):
                        # Converts in list for ease of use
                        ids_list = ids_tensor.tolist()

                        new_ids = []

                        # replace placeholders
                        for i, token_id in enumerate(ids_list):
                            if token_id == image_token_id:
                                placeholder_ids = image_features["image_placeholders"][idx]
                                new_ids.extend(placeholder_ids)
                                idx += 1
                            else:
                                new_ids.append(token_id)

                        # Converts back to tensors
                        ids_tensor = torch.tensor(new_ids, dtype=torch.long)
                    else:
                        raise RuntimeError(
                            'Mismatch between the images you provided and the number of placeholder present in the text')

                replaced_ids_list.append(ids_tensor)

            if replaced_ids_list:
                replaced_and_tokenized_ids = torch.stack(replaced_ids_list)
            else:
                replaced_and_tokenized_ids = torch.tensor([], dtype=torch.long)

            # Create the output with text features
            output = BatchFeature(
                data={
                    "input_ids": replaced_and_tokenized_ids,
                }
            )

            # Add image features if present
            if image_features:
                output["pixel_values"] = processed_images
                output['grids'] = grids

            return output

        # If only images were provided
        return BatchFeature(data=image_features)

    def _tokenize_with_image_symbol(self, text_list: list[str]) -> torch.LongTensor:
        batch_token_ids = []
        for text in text_list:
            text_chunks = [self.tokenizer(chunk, add_special_tokens=False).input_ids for chunk in
                           text.split(self.image_token)]
            token_ids = []
            num_chuck = len(text_chunks)
            for i, chunk in enumerate(text_chunks):
                token_ids.extend(chunk)
                if i < num_chuck - 1:
                    token_ids.append(self.get_token_value("image_token"))
            batch_token_ids.append(token_ids)
        return torch.tensor(batch_token_ids, dtype=torch.long)

    def get_image_size(self):
        size = self.image_processor.size
        if 'shortest_edge' in size:
            width = height = size['shortest_edge']
        elif "height" in size and "width" in size:
            width = size['width']
            height = size['height']
        else:
            raise ValueError( "Can't parse image size from image_processor config.")
        return height, width

    def get_token_value(self, tok):
        return self.extra_special_tokens[tok]

    def construct_image_indicators(self, grid):
        image_placeholders = [self.get_token_value('image_start'),
                              self.get_token_value('image_atom'),
                              self.get_token_value('image_prefix')]
        if grid[0] * grid[1] > 1:
            for r in range(grid[0]):
                for c in range(grid[1]):
                    image_placeholders.append(self.get_token_value('image_atom') )
                    if c < grid[1] - 1:
                        image_placeholders.append(self.get_token_value('image_col_sep'))
                if r < grid[0] - 1:
                    image_placeholders.append(self.get_token_value('image_row_sep'))
        image_placeholders.append(self.get_token_value('image_end'))
        return image_placeholders

    def construct_image_placeholders(self, grid):

        image_placeholders = self.construct_image_indicators(grid)

        image_atom_token_id = self.get_token_value('image_atom')
        # Extract the padding token ID from tokenizer
        image_padding_token_id = self.get_token_value('image_pad')

        # Create a new list with padding tokens inserted
        padded_placeholder_tokens = []
        for token in image_placeholders:
            padded_placeholder_tokens.append(image_padding_token_id)
            if token == image_atom_token_id:
                padded_placeholder_tokens.extend([image_padding_token_id] * self.image_segment_len)
        return padded_placeholder_tokens

    def preprocess_image(self, image: PIL.Image.Image, max_partition, covering_threshold, convert_to_rgb, return_tensors):
        def _preprocess(img: PIL.Image.Image, side):
            # first resize and preprocess
            w, h = img.size
            if w == h:
                new_width = new_height = side
            elif w > h:
                new_width = side
                new_height = int(h / w * new_width)
            else:
                new_height = side
                new_width = int(w / h * new_height)
            new_size = dict(height=new_height, width=new_width)
            pixel_values = self.image_processor.preprocess(img, size=new_size, return_tensors=return_tensors)['pixel_values']

            # then pad to square
            square_values = torch.zeros([1, 3, side, side], dtype=pixel_values.dtype, device=pixel_values.device)
            new_height, new_width = pixel_values.shape[2:]
            if new_height == new_width:
                square_values[:, :, :, :] = pixel_values
            elif new_height > new_width:
                from_index = (side - new_width) // 2
                square_values[:, :, :, from_index:from_index + new_width] = pixel_values
            else:
                from_index = (side - new_height) // 2
                square_values[:, :, from_index:from_index + new_height, :] = pixel_values

            return square_values

        def _partition(img, grid) -> list[tuple[int, int, int, int]]:
            w, h = img.size
            row_height = h // grid[0]
            col_width = w // grid[1]

            partition = []
            for row in range(grid[0]):
                for col in range(grid[1]):
                    left = col * col_width
                    upper = row * row_height
                    right = w if col == grid[1] - 1 else (col + 1) * col_width
                    lower = h if row == grid[0] - 1 else (row + 1) * row_height
                    partition.append((left, upper, right, lower))

            return partition

        def _covering_area(left, upper, right, lower, side):
            w = right - left
            h = lower - upper
            w, h = max(w, h), min(w, h)
            if w > side:
                h = h / w * side
                w = side
            return w * h

        def _get_best_grid(img, side):
            img_area = img.size[0] * img.size[1]

            candidate_grids = []
            for i in range(1, max_partition + 1):
                for j in range(1, max_partition + 1):
                    if i * j <= max_partition:
                        candidate_grids.append((i, j))

            all_grids = []
            good_grids = []
            for grid in candidate_grids:
                partition = _partition(img, grid)
                covering_ratio = sum([_covering_area(*p, side) for p in partition]) / img_area
                assert covering_ratio <= 1.0
                all_grids.append((grid, covering_ratio))
                if covering_ratio > covering_threshold:
                    good_grids.append((grid, covering_ratio))

            if len(good_grids) > 0:
                # pick the good partition with minimum #sub_images and break the tie using covering_ratio
                return sorted(good_grids, key=lambda x: (x[0][0] * x[0][1], -x[1]))[0][0]
            else:
                # pick the partition with maximum covering_ratio and break the tie using #sub_images
                return sorted(all_grids, key=lambda x: (-x[1], x[0][0] * x[0][1]))[0][0]

        if convert_to_rgb:
            image = convert_image_mode(image, 'RGB')


        sides = self.get_image_size()
        if sides[0] != sides[1]:
            raise ValueError('get_image_size() returns non-square size')
        side = sides[0]
        grid = _get_best_grid(image, side)
        partition = _partition(image, grid)
        crops = [image.crop(p) for p in partition]
        if len(crops) > 1:
            crops.insert(0, image)
        pixel_values = torch.cat([_preprocess(crop, side) for crop in crops], dim=0)
        image_placeholders = self.construct_image_placeholders(grid)
        return pixel_values, image_placeholders, grid

    def batch_decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please
        refer to the docstring of this method for more information.
        """
        return self.tokenizer.batch_decode(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to
        the docstring of this method for more information.
        """
        return self.tokenizer.decode(*args, **kwargs)

    def post_process_image_text_to_text(self, generated_outputs):
        """
        Post-process the output of the model to decode the text.
        Args:
            generated_outputs (`torch.Tensor` or `np.ndarray`):
                The output of the model `generate` function. The output is expected to be a tensor of shape `(batch_size, sequence_length)`
                or `(sequence_length,)`.
        Returns:
            `list[str]`: The decoded text.
        """
        return self.tokenizer.batch_decode(
            generated_outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )

    @property
    def model_input_names(self):
        tokenizer_input_names = self.tokenizer.model_input_names
        image_processor_input_names = self.image_processor.model_input_names
        names_from_processor = list(dict.fromkeys(tokenizer_input_names + image_processor_input_names))
        return names_from_processor + ["second_per_grid_ts"]

attributes class-attribute instance-attribute

attributes = ['image_processor', 'tokenizer']

extra_special_tokens cached property

extra_special_tokens

image_pad_token instance-attribute

image_pad_token = image_pad_token

image_processor_class class-attribute instance-attribute

image_processor_class = 'AutoImageProcessor'

image_segment_len instance-attribute

image_segment_len = image_segment_len

image_token instance-attribute

image_token = '<image>'

model_input_names property

model_input_names

tokenizer_class class-attribute instance-attribute

tokenizer_class = 'AutoTokenizer'

valid_kwargs class-attribute instance-attribute

valid_kwargs = [
    "chat_template",
    "image_pad_token",
    "image_segment_len",
]

__call__

__call__(
    images: ImageInput = None,
    text: Union[
        TextInput,
        PreTokenizedInput,
        list[TextInput],
        list[PreTokenizedInput],
    ] = None,
    **kwargs: Unpack[OvisProcessorKwargs],
) -> BatchFeature

Main method to prepare one or several sequence(s) and image(s) for the model. This method forwards the text and kwargs arguments to Qwen2TokenizerFast's [~Qwen2TokenizerFast.__call__] if text is not None to encode the text. To prepare the vision inputs, this method forwards the vision_infos and kwargs arguments to Qwen2VLImageProcessor's [~Qwen2VLImageProcessor.__call__] if vision_infos is not None.

Args:

- images (PIL.Image.Image, np.ndarray, torch.Tensor, list[PIL.Image.Image], list[np.ndarray], list[torch.Tensor]): The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch tensor. Both channels-first and channels-last formats are supported.
- text (str, list[str], list[list[str]]): The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings (pretokenized string). If the sequences are provided as a list of strings (pretokenized), you must set is_split_into_words=True (to lift the ambiguity with a batch of sequences).
- videos (np.ndarray, torch.Tensor, list[np.ndarray], list[torch.Tensor]): The video or batch of videos to be prepared. Each video can be a 4D NumPy array or PyTorch tensor, or a nested list of 3D frames. Both channels-first and channels-last formats are supported.
- return_tensors (str or [~utils.TensorType], optional): If set, will return tensors of a particular framework. Acceptable values are:
    - 'tf': Return TensorFlow tf.constant objects.
    - 'pt': Return PyTorch torch.Tensor objects.
    - 'np': Return NumPy np.ndarray objects.
    - 'jax': Return JAX jnp.ndarray objects.

Returns:

[BatchFeature]: A [BatchFeature] with the following fields:

- input_ids -- List of token ids to be fed to a model. Returned when text is not None.
- attention_mask -- List of indices specifying which tokens should be attended to by the model (when return_attention_mask=True or if "attention_mask" is in self.model_input_names and if text is not None).
- pixel_values -- Pixel values to be fed to a model. Returned when images is not None.
- pixel_values_videos -- Pixel values of videos to be fed to a model. Returned when videos is not None.
- image_grid_thw -- List of image 3D grid in LLM. Returned when images is not None.
- video_grid_thw -- List of video 3D grid in LLM. Returned when videos is not None.
- second_per_grid_ts -- List of video seconds per time grid. Returned when videos is not None.

Source code in vllm/transformers_utils/processors/ovis.py
def __call__(
    self,
    images: ImageInput = None,
    text: Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]] = None,
    **kwargs: Unpack[OvisProcessorKwargs],
) -> BatchFeature:
    """
    Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text`
    and `kwargs` arguments to Qwen2TokenizerFast's [`~Qwen2TokenizerFast.__call__`] if `text` is not `None` to encode
    the text. To prepare the vision inputs, this method forwards the `vision_infos` and `kwrags` arguments to
    Qwen2VLImageProcessor's [`~Qwen2VLImageProcessor.__call__`] if `vision_infos` is not `None`.
        Args:
            images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `list[PIL.Image.Image]`, `list[np.ndarray]`, `list[torch.Tensor]`):
                The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
                tensor. Both channels-first and channels-last formats are supported.
            text (`str`, `list[str]`, `list[list[str]]`):
                The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
                (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
                `is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
            videos (`np.ndarray`, `torch.Tensor`, `list[np.ndarray]`, `list[torch.Tensor]`):
                The image or batch of videos to be prepared. Each video can be a 4D NumPy array or PyTorch
                tensor, or a nested list of 3D frames. Both channels-first and channels-last formats are supported.
            return_tensors (`str` or [`~utils.TensorType`], *optional*):
                If set, will return tensors of a particular framework. Acceptable values are:
                - `'tf'`: Return TensorFlow `tf.constant` objects.
                - `'pt'`: Return PyTorch `torch.Tensor` objects.
                - `'np'`: Return NumPy `np.ndarray` objects.
                - `'jax'`: Return JAX `jnp.ndarray` objects.
        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:
            - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
            - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
              `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
              `None`).
            - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
            - **pixel_values_videos** -- Pixel values of videos to be fed to a model. Returned when `videos` is not `None`.
            - **image_grid_thw** -- List of image 3D grid in LLM. Returned when `images` is not `None`.
            - **video_grid_thw** -- List of video 3D grid in LLM. Returned when `videos` is not `None`.
            - **second_per_grid_ts** -- List of video seconds per time grid. Returned when `videos` is not `None`.
    """
    output_kwargs = self._merge_kwargs(
        OvisProcessorKwargs,
        tokenizer_init_kwargs=self.tokenizer.init_kwargs,
        **kwargs,
    )

    # Process all images first
    image_features = {}
    if images is not None:
        processed_images = []
        image_placeholders_list = []
        grids = []

        # Process each image
        for image in images if isinstance(images, list) else [images]:
            pixel_values, image_placeholders, grid = self.preprocess_image(
                image=image, **output_kwargs["images_kwargs"]
            )
            processed_images.append(pixel_values)
            image_placeholders_list.append(image_placeholders)
            grids.append(grid)

        # assign all processed images
        if processed_images:
            image_features["image_placeholders"] = image_placeholders_list

    # Process text input
    if text is not None:

        if not isinstance(text, list):
            text = [text]

        tokenized_batched_text = self._tokenize_with_image_symbol(text)
        image_token_id = self.get_token_value("image_token")
        replaced_ids_list = []
        idx = 0
        for ids_tensor in tokenized_batched_text:
            if image_token_id in ids_tensor and "image_placeholders" in image_features:
                if idx < len(image_features["image_placeholders"]):
                    # Converts in list for ease of use
                    ids_list = ids_tensor.tolist()

                    new_ids = []

                    # replace placeholders
                    for i, token_id in enumerate(ids_list):
                        if token_id == image_token_id:
                            placeholder_ids = image_features["image_placeholders"][idx]
                            new_ids.extend(placeholder_ids)
                            idx += 1
                        else:
                            new_ids.append(token_id)

                    # Converts back to tensors
                    ids_tensor = torch.tensor(new_ids, dtype=torch.long)
                else:
                    raise RuntimeError(
                        'Mismatch between the images you provided and the number of placeholder present in the text')

            replaced_ids_list.append(ids_tensor)

        if replaced_ids_list:
            replaced_and_tokenized_ids = torch.stack(replaced_ids_list)
        else:
            replaced_and_tokenized_ids = torch.tensor([], dtype=torch.long)

        # Create the output with text features
        output = BatchFeature(
            data={
                "input_ids": replaced_and_tokenized_ids,
            }
        )

        # Add image features if present
        if image_features:
            output["pixel_values"] = processed_images
            output['grids'] = grids

        return output

    # If only images were provided
    return BatchFeature(data=image_features)
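
The placeholder replacement done above can be sketched in isolation: every occurrence of the internal image sentinel (-200) in the tokenized text is spliced out and replaced by that image's placeholder sequence. The ids below are arbitrary stand-ins for what construct_image_placeholders would return.

import torch

IMAGE_TOKEN = -200                          # internal sentinel inserted by _tokenize_with_image_symbol
placeholder_ids = [-301, -300, -302, -305]  # stand-in for the output of construct_image_placeholders

ids = torch.tensor([15, 27, IMAGE_TOKEN, 99], dtype=torch.long)

new_ids = []
for token_id in ids.tolist():
    if token_id == IMAGE_TOKEN:
        new_ids.extend(placeholder_ids)  # splice in the per-image placeholder sequence
    else:
        new_ids.append(token_id)

print(torch.tensor(new_ids, dtype=torch.long).tolist())  # [15, 27, -301, -300, -302, -305, 99]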

__init__

__init__(
    image_processor=None,
    tokenizer=None,
    chat_template=None,
    image_pad_token=None,
    image_segment_len=255,
    **kwargs,
)
Source code in vllm/transformers_utils/processors/ovis.py
def __init__(
    self,
    image_processor=None,
    tokenizer=None,
    chat_template=None,
    image_pad_token=None,
    image_segment_len=255,
    **kwargs,
):
    self.image_token = "<image>"
    self.image_pad_token = image_pad_token
    self.image_segment_len = image_segment_len
    super().__init__(image_processor, tokenizer, chat_template=chat_template)

_tokenize_with_image_symbol

_tokenize_with_image_symbol(
    text_list: list[str],
) -> LongTensor
Source code in vllm/transformers_utils/processors/ovis.py
def _tokenize_with_image_symbol(self, text_list: list[str]) -> torch.LongTensor:
    batch_token_ids = []
    for text in text_list:
        text_chunks = [self.tokenizer(chunk, add_special_tokens=False).input_ids for chunk in
                       text.split(self.image_token)]
        token_ids = []
        num_chuck = len(text_chunks)
        for i, chunk in enumerate(text_chunks):
            token_ids.extend(chunk)
            if i < num_chuck - 1:
                token_ids.append(self.get_token_value("image_token"))
        batch_token_ids.append(token_ids)
    return torch.tensor(batch_token_ids, dtype=torch.long)

batch_decode

batch_decode(*args, **kwargs)

This method forwards all its arguments to Qwen2TokenizerFast's [~PreTrainedTokenizer.batch_decode]. Please refer to the docstring of this method for more information.

Source code in vllm/transformers_utils/processors/ovis.py
def batch_decode(self, *args, **kwargs):
    """
    This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please
    refer to the docstring of this method for more information.
    """
    return self.tokenizer.batch_decode(*args, **kwargs)

construct_image_indicators

construct_image_indicators(grid)
Source code in vllm/transformers_utils/processors/ovis.py
def construct_image_indicators(self, grid):
    image_placeholders = [self.get_token_value('image_start'),
                          self.get_token_value('image_atom'),
                          self.get_token_value('image_prefix')]
    if grid[0] * grid[1] > 1:
        for r in range(grid[0]):
            for c in range(grid[1]):
                image_placeholders.append(self.get_token_value('image_atom'))
                if c < grid[1] - 1:
                    image_placeholders.append(self.get_token_value('image_col_sep'))
            if r < grid[0] - 1:
                image_placeholders.append(self.get_token_value('image_row_sep'))
    image_placeholders.append(self.get_token_value('image_end'))
    return image_placeholders
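
The indicator layout is easiest to read with symbolic names in place of token ids. A sketch of the same construction for a 2x2 grid (the real method returns the ids looked up through get_token_value):

def indicators(grid: tuple[int, int]) -> list[str]:
    # symbolic stand-ins for the image_* special-token ids
    out = ["image_start", "image_atom", "image_prefix"]
    if grid[0] * grid[1] > 1:
        for r in range(grid[0]):
            for c in range(grid[1]):
                out.append("image_atom")
                if c < grid[1] - 1:
                    out.append("image_col_sep")
            if r < grid[0] - 1:
                out.append("image_row_sep")
    out.append("image_end")
    return out

print(indicators((2, 2)))
# ['image_start', 'image_atom', 'image_prefix',
#  'image_atom', 'image_col_sep', 'image_atom', 'image_row_sep',
#  'image_atom', 'image_col_sep', 'image_atom', 'image_end']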

construct_image_placeholders

construct_image_placeholders(grid)
Source code in vllm/transformers_utils/processors/ovis.py
def construct_image_placeholders(self, grid):

    image_placeholders = self.construct_image_indicators(grid)

    image_atom_token_id = self.get_token_value('image_atom')
    # Extract the padding token ID from tokenizer
    image_padding_token_id = self.get_token_value('image_pad')

    # Create a new list with padding tokens inserted
    padded_placeholder_tokens = []
    for token in image_placeholders:
        padded_placeholder_tokens.append(image_padding_token_id)
        if token == image_atom_token_id:
            padded_placeholder_tokens.extend([image_padding_token_id] * self.image_segment_len)
    return padded_placeholder_tokens
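
Every indicator token then becomes one image_pad id, and each image_atom pulls in image_segment_len additional pads. A quick length check for the 2x2 grid from the sketch above, under the default image_segment_len=255:

IMAGE_SEGMENT_LEN = 255  # default from __init__

# the 2x2 indicator sequence has 11 tokens, 5 of which are image_atom
n_indicators, n_atoms = 11, 5
print(n_indicators + n_atoms * IMAGE_SEGMENT_LEN)  # 1286 pad tokens per image
# a 1x1 grid (no tiling) has 4 indicators and 1 atom: 4 + 1 * 255 = 259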

decode

decode(*args, **kwargs)

This method forwards all its arguments to Qwen2TokenizerFast's decode (see PreTrainedTokenizer.decode). Please refer to the docstring of that method for more information.

Source code in vllm/transformers_utils/processors/ovis.py
def decode(self, *args, **kwargs):
    """
    This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to
    the docstring of this method for more information.
    """
    return self.tokenizer.decode(*args, **kwargs)

get_image_size

get_image_size()
Source code in vllm/transformers_utils/processors/ovis.py
def get_image_size(self):
    size = self.image_processor.size
    if 'shortest_edge' in size:
        width = height = size['shortest_edge']
    elif "height" in size and "width" in size:
        width = size['width']
        height = size['height']
    else:
        raise ValueError("Can't parse image size from image_processor config.")
    return height, width
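
The image processor's size config can come in two shapes; a small sketch of the same parsing with illustrative dicts:

def parse_size(size: dict) -> tuple[int, int]:
    # mirrors get_image_size(): returns (height, width)
    if "shortest_edge" in size:
        return size["shortest_edge"], size["shortest_edge"]
    if "height" in size and "width" in size:
        return size["height"], size["width"]
    raise ValueError("Can't parse image size from image_processor config.")

print(parse_size({"shortest_edge": 448}))         # (448, 448)
print(parse_size({"height": 384, "width": 384}))  # (384, 384)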

get_token_value

get_token_value(tok)
Source code in vllm/transformers_utils/processors/ovis.py
def get_token_value(self, tok):
    return self.extra_special_tokens[tok]

post_process_image_text_to_text

post_process_image_text_to_text(generated_outputs)

Post-process the output of the model to decode the text.

Args:
    generated_outputs (torch.Tensor or np.ndarray): The output of the model generate function. The output is expected to be a tensor of shape (batch_size, sequence_length) or (sequence_length,).

Returns:
    list[str]: The decoded text.

Source code in vllm/transformers_utils/processors/ovis.py
def post_process_image_text_to_text(self, generated_outputs):
    """
    Post-process the output of the model to decode the text.
    Args:
        generated_outputs (`torch.Tensor` or `np.ndarray`):
            The output of the model `generate` function. The output is expected to be a tensor of shape `(batch_size, sequence_length)`
            or `(sequence_length,)`.
    Returns:
        `list[str]`: The decoded text.
    """
    return self.tokenizer.batch_decode(
        generated_outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )

preprocess_image

preprocess_image(
    image: Image,
    max_partition,
    covering_threshold,
    convert_to_rgb,
    return_tensors,
)
Source code in vllm/transformers_utils/processors/ovis.py
def preprocess_image(self, image: PIL.Image.Image, max_partition, covering_threshold, convert_to_rgb, return_tensors):
    def _preprocess(img: PIL.Image.Image, side):
        # first resize and preprocess
        w, h = img.size
        if w == h:
            new_width = new_height = side
        elif w > h:
            new_width = side
            new_height = int(h / w * new_width)
        else:
            new_height = side
            new_width = int(w / h * new_height)
        new_size = dict(height=new_height, width=new_width)
        pixel_values = self.image_processor.preprocess(img, size=new_size, return_tensors=return_tensors)['pixel_values']

        # then pad to square
        square_values = torch.zeros([1, 3, side, side], dtype=pixel_values.dtype, device=pixel_values.device)
        new_height, new_width = pixel_values.shape[2:]
        if new_height == new_width:
            square_values[:, :, :, :] = pixel_values
        elif new_height > new_width:
            from_index = (side - new_width) // 2
            square_values[:, :, :, from_index:from_index + new_width] = pixel_values
        else:
            from_index = (side - new_height) // 2
            square_values[:, :, from_index:from_index + new_height, :] = pixel_values

        return square_values

    def _partition(img, grid) -> list[tuple[int, int, int, int]]:
        # split the image into grid[0] x grid[1] crop boxes (left, upper, right, lower)
        w, h = img.size
        row_height = h // grid[0]
        col_width = w // grid[1]

        partition = []
        for row in range(grid[0]):
            for col in range(grid[1]):
                left = col * col_width
                upper = row * row_height
                right = w if col == grid[1] - 1 else (col + 1) * col_width
                lower = h if row == grid[0] - 1 else (row + 1) * row_height
                partition.append((left, upper, right, lower))

        return partition

    def _covering_area(left, upper, right, lower, side):
        # effective area of a crop once its longer edge is scaled down to side
        w = right - left
        h = lower - upper
        w, h = max(w, h), min(w, h)
        if w > side:
            h = h / w * side
            w = side
        return w * h

    def _get_best_grid(img, side):
        # choose the (rows, cols) grid whose crops cover the image best
        # while keeping the number of sub-images small
        img_area = img.size[0] * img.size[1]

        candidate_grids = []
        for i in range(1, max_partition + 1):
            for j in range(1, max_partition + 1):
                if i * j <= max_partition:
                    candidate_grids.append((i, j))

        all_grids = []
        good_grids = []
        for grid in candidate_grids:
            partition = _partition(img, grid)
            covering_ratio = sum([_covering_area(*p, side) for p in partition]) / img_area
            assert covering_ratio <= 1.0
            all_grids.append((grid, covering_ratio))
            if covering_ratio > covering_threshold:
                good_grids.append((grid, covering_ratio))

        if len(good_grids) > 0:
            # pick the good partition with minimum #sub_images and break the tie using covering_ratio
            return sorted(good_grids, key=lambda x: (x[0][0] * x[0][1], -x[1]))[0][0]
        else:
            # pick the partition with maximum covering_ratio and break the tie using #sub_images
            return sorted(all_grids, key=lambda x: (-x[1], x[0][0] * x[0][1]))[0][0]

    if convert_to_rgb:
        image = convert_image_mode(image, 'RGB')

    sides = self.get_image_size()
    if sides[0] != sides[1]:
        raise ValueError('get_image_size() returns non-square size')
    side = sides[0]
    grid = _get_best_grid(image, side)
    partition = _partition(image, grid)
    crops = [image.crop(p) for p in partition]
    if len(crops) > 1:
        # prepend the full image as a global view when the image is tiled
        crops.insert(0, image)
    pixel_values = torch.cat([_preprocess(crop, side) for crop in crops], dim=0)
    image_placeholders = self.construct_image_placeholders(grid)
    return pixel_values, image_placeholders, grid
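
The grid search can be exercised on its own. A hedged sketch that re-creates the partition and covering-area math with a (width, height) tuple in place of a PIL image; the 1600x800 size, side=448, max_partition=9, and covering_threshold=0.9 are illustrative values, not defaults read from a real model:

SIDE, MAX_PARTITION, COVERING_THRESHOLD = 448, 9, 0.9  # illustrative settings

def covering_area(left, upper, right, lower, side):
    # effective area of a crop once its longer edge is scaled down to side
    w, h = right - left, lower - upper
    w, h = max(w, h), min(w, h)
    if w > side:
        h = h / w * side
        w = side
    return w * h

def partition(size, grid):
    # (left, upper, right, lower) boxes for grid[0] rows x grid[1] cols
    w, h = size
    row_h, col_w = h // grid[0], w // grid[1]
    return [(col * col_w,
             row * row_h,
             w if col == grid[1] - 1 else (col + 1) * col_w,
             h if row == grid[0] - 1 else (row + 1) * row_h)
            for row in range(grid[0]) for col in range(grid[1])]

def best_grid(size):
    img_area = size[0] * size[1]
    grids = [(i, j) for i in range(1, MAX_PARTITION + 1)
             for j in range(1, MAX_PARTITION + 1) if i * j <= MAX_PARTITION]
    scored = [(g, sum(covering_area(*p, SIDE) for p in partition(size, g)) / img_area)
              for g in grids]
    good = [s for s in scored if s[1] > COVERING_THRESHOLD]
    if good:
        # fewest sub-images first, ties broken by higher covering ratio
        return sorted(good, key=lambda x: (x[0][0] * x[0][1], -x[1]))[0][0]
    # otherwise best coverage, ties broken by fewer sub-images
    return sorted(scored, key=lambda x: (-x[1], x[0][0] * x[0][1]))[0][0]

print(best_grid((1600, 800)))  # (2, 4): eight 400x400 crops cover the image exactly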