vllm.model_executor.models.nemotron_vl

IMG_CONTEXT module-attribute

IMG_CONTEXT = '<image>'

IMG_END module-attribute

IMG_END = '</img>'

IMG_START module-attribute

IMG_START = '<img>'
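
These three markers delimit image content in the prompt. A minimal sketch of how get_image_repl (documented below) composes them, with feature_size standing in for an assumed per-image token count:

feature_size = 4  # assumed; in practice num_patches * num_image_token
placeholder = IMG_START + IMG_CONTEXT * feature_size + IMG_END
# -> '<img><image><image><image><image></img>'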

LlamaNemotronVLChatModel

Bases: Module, SupportsMultiModal, SupportsPP, SupportsLoRA

Source code in vllm/model_executor/models/nemotron_vl.py
@MULTIMODAL_REGISTRY.register_processor(
    BaseInternVLMultiModalProcessor[NemotronVLProcessingInfo],
    info=NemotronVLProcessingInfo,
    dummy_inputs=BaseInternVLDummyInputsBuilder[NemotronVLProcessingInfo])
class LlamaNemotronVLChatModel(nn.Module, SupportsMultiModal, SupportsPP,
                               SupportsLoRA):

    @classmethod
    def get_placeholder_str(cls, modality: str, i: int) -> Optional[str]:
        if modality.startswith("image"):
            return "<image>"

        raise ValueError("Only image modality is supported")

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = "") -> None:
        super().__init__()

        config = vllm_config.model_config.hf_config
        quant_config = vllm_config.quant_config
        multimodal_config = vllm_config.model_config.multimodal_config

        self.config = config
        self.multimodal_config = multimodal_config
        self._patch_quant_config(config, quant_config)

        image_size = config.force_image_size or config.vision_config.image_size
        patch_size = config.vision_config.patch_size
        self.patch_size = patch_size
        self.num_image_token = int(
            (image_size // patch_size)**2 * (config.downsample_ratio**2))
        self.downsample_ratio = config.downsample_ratio
        self.ps_version = config.ps_version

        self.llm_arch_name = config.text_config.architectures[0]
        self.vision_model = self._init_vision_model(
            config,
            quant_config=quant_config,
            prefix=maybe_prefix(prefix, "vision_model"),
        )

        self.language_model = init_vllm_registered_model(
            vllm_config=vllm_config,
            hf_config=config.text_config,
            prefix=maybe_prefix(prefix, "language_model"),
        )

        self.mlp1 = self._init_mlp1(config)

        self.img_context_token_id = None

        self.visual_token_mask = None
        self.make_empty_intermediate_tensors = (
            self.language_model.make_empty_intermediate_tensors)

    def _patch_quant_config(self, config: PretrainedConfig,
                            quant_config: QuantizationConfig):
        # The AWQ models from OpenGVLab are missing `modules_to_not_convert`;
        # patch the quant_config to add it back.
        if isinstance(quant_config, AWQConfig):
            text_config = config.text_config
            llm_quant_config = getattr(text_config, "quantization_config",
                                       None)
            if (not quant_config.modules_to_not_convert) and \
                (llm_quant_config is not None):
                quant_config.modules_to_not_convert.append("vision_model")

    def _init_vision_model(
        self,
        config: PretrainedConfig,
        quant_config: Optional[QuantizationConfig],
        *,
        prefix: str,
    ):
        return AutoModel.from_config(config.vision_config,
                                     trust_remote_code=True)

    def _init_mlp1(self, config: PretrainedConfig) -> nn.Sequential:
        vit_hidden_size = config.vit_hidden_size
        vision_projection_hidden_size = config.projector_hidden_size
        llm_hidden_size = config.text_config.hidden_size

        return nn.Sequential(
            nn.LayerNorm(vit_hidden_size * int(1 / self.downsample_ratio)**2,
                         bias=True),
            nn.Linear(vit_hidden_size * int(1 / self.downsample_ratio)**2,
                      vision_projection_hidden_size,
                      bias=True),
            nn.GELU(),
            nn.Linear(vision_projection_hidden_size, llm_hidden_size),
        )

    def pixel_shuffle(self, x, scale_factor=0.5):
        n, w, h, c = x.size()
        # N, W, H, C --> N, W, H * scale, C // scale
        x = x.view(n, w, int(h * scale_factor), int(c / scale_factor))
        # N, W, H * scale, C // scale --> N, H * scale, W, C // scale
        x = x.permute(0, 2, 1, 3).contiguous()
        x = x.view(n, int(h * scale_factor), int(w * scale_factor),
                   int(c / (scale_factor * scale_factor)))
        if self.ps_version == 'v1':
            pass
        else:
            x = x.permute(0, 2, 1, 3).contiguous()
        return x

    def extract_feature(self, pixel_values: torch.Tensor) -> torch.Tensor:
        # https://huggingface.co/nvidia/Llama-3.1-Nemotron-Nano-VL-8B-V1/blob/main/modeling.py#L177
        vit_embeds = self.vision_model(x=pixel_values).features
        vit_embeds = vit_embeds.to(dtype=torch.bfloat16)

        h = w = int(vit_embeds.shape[1]**0.5)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1)
        vit_embeds = self.pixel_shuffle(vit_embeds,
                                        scale_factor=self.downsample_ratio)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1,
                                        vit_embeds.shape[-1])
        vit_embeds = self.mlp1(vit_embeds)
        return vit_embeds

    def _parse_and_validate_image_input(
            self, **kwargs: object) -> Optional[InternVLImageInputs]:
        pixel_values_flat = kwargs.pop("pixel_values_flat", None)
        image_num_patches = kwargs.pop("image_num_patches", None)
        image_embeds = kwargs.pop("image_embeds", None)

        if pixel_values_flat is None and image_embeds is None:
            return None

        if image_embeds is not None:
            if not isinstance(image_embeds, (torch.Tensor, list)):
                raise ValueError("Incorrect type of image embeddings. "
                                 f"Got type: {type(image_embeds)}")

            return InternVLImageEmbeddingInputs(
                type="image_embeds",
                data=flatten_bn(image_embeds),
            )

        image_token_id = kwargs["image_token_id"]
        assert isinstance(image_token_id, torch.Tensor)
        self.img_context_token_id = image_token_id.flatten().unique().item()

        if pixel_values_flat is not None:
            if not isinstance(pixel_values_flat, (torch.Tensor, list)):
                raise ValueError("Incorrect type of pixel values. "
                                 f"Got type: {type(pixel_values_flat)}")

            if not isinstance(image_num_patches, (torch.Tensor, list)):
                raise ValueError("Incorrect type of image_num_patches. "
                                 f"Got type: {type(image_num_patches)}")

            pixel_values_flat = flatten_bn(pixel_values_flat, concat=True)
            image_num_patches = flatten_bn(image_num_patches, concat=True)

            return InternVLImagePixelInputs(
                type="pixel_values",
                pixel_values_flat=pixel_values_flat,
                num_patches=image_num_patches,
                resolve_bindings={
                    "h": self.config.force_image_size,
                    "w": self.config.force_image_size
                },
            )

        raise AssertionError("This line should be unreachable.")

    def _process_image_input(
        self,
        image_input: InternVLImageInputs,
    ) -> tuple[torch.Tensor, ...]:
        if image_input["type"] == "image_embeds":
            return image_input["data"]

        assert self.vision_model is not None

        image_embeds = self.extract_feature(image_input["pixel_values_flat"])

        num_patches = image_input["num_patches"]

        # Only one image in the current batch
        if len(num_patches) == 1:
            return (image_embeds.view(-1,
                                      self.config.text_config.hidden_size), )

        # NOTE: Image embeddings are split into separate tensors for each image
        # by the size of each embedding.
        feature_size = image_embeds.shape[1]
        image_embeds = image_embeds.view(-1,
                                         self.config.text_config.hidden_size)
        image_feature_sizes = [
            num_patches * feature_size for num_patches in num_patches
        ]
        return image_embeds.split(image_feature_sizes)

    def _parse_and_validate_multimodal_inputs(self, **kwargs: object) -> dict:
        modalities = {}

        # Preserve the order of modalities (if there are several),
        # following the order of kwargs.
        for input_key in kwargs:
            if input_key in ("pixel_values_flat",
                             "image_embeds") and "images" not in modalities:
                modalities["images"] = self._parse_and_validate_image_input(
                    **kwargs)

        return modalities

    def _set_visual_token_mask(self, input_ids: torch.Tensor) -> None:
        self.visual_token_mask = None

    def get_language_model(self) -> torch.nn.Module:
        return self.language_model

    def get_multimodal_embeddings(self,
                                  **kwargs: object) -> MultiModalEmbeddings:

        modalities = self._parse_and_validate_multimodal_inputs(**kwargs)
        if not modalities:
            return []

        # The resulting multimodal_embeddings is a tuple of tensors, each
        # corresponding to one multimodal data item (an image).
        multimodal_embeddings: tuple[torch.Tensor, ...] = ()

        # NOTE: It is important to iterate over the keys in this dictionary
        # to preserve the order of the modalities.
        for modality in modalities:
            if modality == "images":
                image_input = modalities["images"]
                vision_embeddings = self._process_image_input(image_input)
                multimodal_embeddings += vision_embeddings

        return multimodal_embeddings

    def get_input_embeddings(
        self,
        input_ids: torch.Tensor,
        multimodal_embeddings: Optional[MultiModalEmbeddings] = None,
    ) -> torch.Tensor:
        inputs_embeds = self.language_model.get_input_embeddings(input_ids)
        if multimodal_embeddings is not None \
            and len(multimodal_embeddings) != 0:
            context_token_ids = [self.img_context_token_id]
            assert len(context_token_ids) >= 1
            self._set_visual_token_mask(input_ids)
            inputs_embeds = merge_multimodal_embeddings(
                input_ids,
                inputs_embeds,
                multimodal_embeddings,
                context_token_ids,
            )
        return inputs_embeds

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        intermediate_tensors: Optional[IntermediateTensors] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        **kwargs: object,
    ) -> IntermediateTensors:

        if intermediate_tensors is not None:
            input_ids = None
            inputs_embeds = None

        # NOTE: In v1, inputs_embeds is always generated by the model runner;
        # this branch is kept for v0 compatibility.
        elif inputs_embeds is None:
            vision_embeddings = self.get_multimodal_embeddings(**kwargs)
            inputs_embeds = self.get_input_embeddings(input_ids,
                                                      vision_embeddings)
            input_ids = None

        forward_kwargs = {
            "input_ids": input_ids,
            "positions": positions,
            "intermediate_tensors": intermediate_tensors,
            "inputs_embeds": inputs_embeds,
        }

        # Only required if the model is mono-architecture
        if self.visual_token_mask is not None:
            forward_kwargs.update(
                {"visual_token_mask": self.visual_token_mask})
            self.visual_token_mask = None

        hidden_states = self.language_model.model(**forward_kwargs)
        return hidden_states

    def compute_logits(
        self,
        hidden_states: torch.Tensor,
        sampling_metadata: SamplingMetadata,
    ) -> Optional[torch.Tensor]:
        return self.language_model.compute_logits(hidden_states,
                                                  sampling_metadata)

    def load_weights(self, weights: Iterable[tuple[str,
                                                   torch.Tensor]]) -> set[str]:
        ## Ignore registered buffers
        ## see https://huggingface.co/nvidia/C-RADIOv2-H/blob/main/input_conditioner.py#L28 # noqa: E501
        skip_substrs = ["norm_mean", "norm_std"]
        loader = AutoWeightsLoader(self, skip_substrs=skip_substrs)
        return loader.load_weights(weights)

    def get_mm_mapping(self) -> MultiModelKeys:
        """
        Get the module prefixes for the multimodal model
        """
        return MultiModelKeys.from_string_field(
            language_model="language_model",
            connector="mlp1",
            tower_model="vision_model")
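
For orientation, a minimal offline-inference sketch using the public vLLM API; the checkpoint name and prompt layout are assumptions, so check the model card for the exact chat template:

from vllm import LLM, SamplingParams
from PIL import Image

# Assumed checkpoint; any model that maps to LlamaNemotronVLChatModel
# is served the same way.
llm = LLM(model="nvidia/Llama-3.1-Nemotron-Nano-VL-8B-V1",
          trust_remote_code=True)

outputs = llm.generate(
    {
        # "<image>" is the placeholder returned by get_placeholder_str.
        "prompt": "<image>\nDescribe this image.",
        "multi_modal_data": {"image": Image.open("example.jpg")},
    },
    SamplingParams(max_tokens=64),
)
print(outputs[0].outputs[0].text)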

config instance-attribute

config = config

downsample_ratio instance-attribute

downsample_ratio = downsample_ratio

img_context_token_id instance-attribute

img_context_token_id = None

language_model instance-attribute

language_model = init_vllm_registered_model(
    vllm_config=vllm_config,
    hf_config=text_config,
    prefix=maybe_prefix(prefix, "language_model"),
)

llm_arch_name instance-attribute

llm_arch_name = architectures[0]

make_empty_intermediate_tensors instance-attribute

make_empty_intermediate_tensors = (
    make_empty_intermediate_tensors
)

mlp1 instance-attribute

mlp1 = _init_mlp1(config)

multimodal_config instance-attribute

multimodal_config = multimodal_config

num_image_token instance-attribute

num_image_token = int(
    (image_size // patch_size) ** 2 * downsample_ratio**2
)

patch_size instance-attribute

patch_size = patch_size

ps_version instance-attribute

ps_version = ps_version

vision_model instance-attribute

vision_model = _init_vision_model(
    config,
    quant_config=quant_config,
    prefix=maybe_prefix(prefix, "vision_model"),
)

visual_token_mask instance-attribute

visual_token_mask = None

__init__

__init__(
    *, vllm_config: VllmConfig, prefix: str = ""
) -> None
Source code in vllm/model_executor/models/nemotron_vl.py
def __init__(self, *, vllm_config: VllmConfig, prefix: str = "") -> None:
    super().__init__()

    config = vllm_config.model_config.hf_config
    quant_config = vllm_config.quant_config
    multimodal_config = vllm_config.model_config.multimodal_config

    self.config = config
    self.multimodal_config = multimodal_config
    self._patch_quant_config(config, quant_config)

    image_size = config.force_image_size or config.vision_config.image_size
    patch_size = config.vision_config.patch_size
    self.patch_size = patch_size
    self.num_image_token = int(
        (image_size // patch_size)**2 * (config.downsample_ratio**2))
    self.downsample_ratio = config.downsample_ratio
    self.ps_version = config.ps_version

    self.llm_arch_name = config.text_config.architectures[0]
    self.vision_model = self._init_vision_model(
        config,
        quant_config=quant_config,
        prefix=maybe_prefix(prefix, "vision_model"),
    )

    self.language_model = init_vllm_registered_model(
        vllm_config=vllm_config,
        hf_config=config.text_config,
        prefix=maybe_prefix(prefix, "language_model"),
    )

    self.mlp1 = self._init_mlp1(config)

    self.img_context_token_id = None

    self.visual_token_mask = None
    self.make_empty_intermediate_tensors = (
        self.language_model.make_empty_intermediate_tensors)

_init_mlp1

_init_mlp1(config: PretrainedConfig) -> Sequential
Source code in vllm/model_executor/models/nemotron_vl.py
def _init_mlp1(self, config: PretrainedConfig) -> nn.Sequential:
    vit_hidden_size = config.vit_hidden_size
    vision_projection_hidden_size = config.projector_hidden_size
    llm_hidden_size = config.text_config.hidden_size

    return nn.Sequential(
        nn.LayerNorm(vit_hidden_size * int(1 / self.downsample_ratio)**2,
                     bias=True),
        nn.Linear(vit_hidden_size * int(1 / self.downsample_ratio)**2,
                  vision_projection_hidden_size,
                  bias=True),
        nn.GELU(),
        nn.Linear(vision_projection_hidden_size, llm_hidden_size),
    )
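
Because pixel shuffle folds int(1 / downsample_ratio)**2 spatial positions into the channel dimension, the projector's input width is that multiple of the ViT hidden size. A worked sketch with assumed sizes:

downsample_ratio = 0.5
vit_hidden_size = 1280                   # assumed ViT hidden size
fold = int(1 / downsample_ratio) ** 2    # 4 positions folded per token
projector_in = vit_hidden_size * fold    # 5120 features enter the LayerNorm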

_init_vision_model

_init_vision_model(
    config: PretrainedConfig,
    quant_config: Optional[QuantizationConfig],
    *,
    prefix: str,
)
Source code in vllm/model_executor/models/nemotron_vl.py
def _init_vision_model(
    self,
    config: PretrainedConfig,
    quant_config: Optional[QuantizationConfig],
    *,
    prefix: str,
):
    return AutoModel.from_config(config.vision_config,
                                 trust_remote_code=True)

_parse_and_validate_image_input

_parse_and_validate_image_input(
    **kwargs: object,
) -> Optional[InternVLImageInputs]
Source code in vllm/model_executor/models/nemotron_vl.py
def _parse_and_validate_image_input(
        self, **kwargs: object) -> Optional[InternVLImageInputs]:
    pixel_values_flat = kwargs.pop("pixel_values_flat", None)
    image_num_patches = kwargs.pop("image_num_patches", None)
    image_embeds = kwargs.pop("image_embeds", None)

    if pixel_values_flat is None and image_embeds is None:
        return None

    if image_embeds is not None:
        if not isinstance(image_embeds, (torch.Tensor, list)):
            raise ValueError("Incorrect type of image embeddings. "
                             f"Got type: {type(image_embeds)}")

        return InternVLImageEmbeddingInputs(
            type="image_embeds",
            data=flatten_bn(image_embeds),
        )

    image_token_id = kwargs["image_token_id"]
    assert isinstance(image_token_id, torch.Tensor)
    self.img_context_token_id = image_token_id.flatten().unique().item()

    if pixel_values_flat is not None:
        if not isinstance(pixel_values_flat, (torch.Tensor, list)):
            raise ValueError("Incorrect type of pixel values. "
                             f"Got type: {type(pixel_values_flat)}")

        if not isinstance(image_num_patches, (torch.Tensor, list)):
            raise ValueError("Incorrect type of image_num_patches. "
                             f"Got type: {type(image_num_patches)}")

        pixel_values_flat = flatten_bn(pixel_values_flat, concat=True)
        image_num_patches = flatten_bn(image_num_patches, concat=True)

        return InternVLImagePixelInputs(
            type="pixel_values",
            pixel_values_flat=pixel_values_flat,
            num_patches=image_num_patches,
            resolve_bindings={
                "h": self.config.force_image_size,
                "w": self.config.force_image_size
            },
        )

    raise AssertionError("This line should be unreachable.")

_parse_and_validate_multimodal_inputs

_parse_and_validate_multimodal_inputs(
    **kwargs: object,
) -> dict
Source code in vllm/model_executor/models/nemotron_vl.py
def _parse_and_validate_multimodal_inputs(self, **kwargs: object) -> dict:
    modalities = {}

    # Preserve the order of modalities (if there are several),
    # following the order of kwargs.
    for input_key in kwargs:
        if input_key in ("pixel_values_flat",
                         "image_embeds") and "images" not in modalities:
            modalities["images"] = self._parse_and_validate_image_input(
                **kwargs)

    return modalities

_patch_quant_config

_patch_quant_config(
    config: PretrainedConfig,
    quant_config: QuantizationConfig,
)
Source code in vllm/model_executor/models/nemotron_vl.py
def _patch_quant_config(self, config: PretrainedConfig,
                        quant_config: QuantizationConfig):
    # The AWQ models from OpenGVLab are missing `modules_to_not_convert`;
    # patch the quant_config to add it back.
    if isinstance(quant_config, AWQConfig):
        text_config = config.text_config
        llm_quant_config = getattr(text_config, "quantization_config",
                                   None)
        if (not quant_config.modules_to_not_convert) and \
            (llm_quant_config is not None):
            quant_config.modules_to_not_convert.append("vision_model")

_process_image_input

_process_image_input(
    image_input: InternVLImageInputs,
) -> tuple[Tensor, ...]
Source code in vllm/model_executor/models/nemotron_vl.py
def _process_image_input(
    self,
    image_input: InternVLImageInputs,
) -> tuple[torch.Tensor, ...]:
    if image_input["type"] == "image_embeds":
        return image_input["data"]

    assert self.vision_model is not None

    image_embeds = self.extract_feature(image_input["pixel_values_flat"])

    num_patches = image_input["num_patches"]

    # Only one image in the current batch
    if len(num_patches) == 1:
        return (image_embeds.view(-1,
                                  self.config.text_config.hidden_size), )

    # NOTE: Image embeddings are split into separate tensors for each image
    # by the size of each embedding.
    feature_size = image_embeds.shape[1]
    image_embeds = image_embeds.view(-1,
                                     self.config.text_config.hidden_size)
    image_feature_sizes = [
        num_patches * feature_size for num_patches in num_patches
    ]
    return image_embeds.split(image_feature_sizes)
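
The split logic can be exercised in isolation; a small sketch with assumed sizes:

import torch

hidden_size = 8                       # assumed LLM hidden size
feature_size = 3                      # tokens per patch after the projector
num_patches = torch.tensor([2, 5])    # two images in the batch

# Mimic extract_feature output: (total_patches, feature_size, hidden_size)
image_embeds = torch.randn(int(num_patches.sum()), feature_size, hidden_size)

flat = image_embeds.view(-1, hidden_size)
sizes = [int(n) * feature_size for n in num_patches]
per_image = flat.split(sizes)         # tensors of shape (6, 8) and (15, 8)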

_set_visual_token_mask

_set_visual_token_mask(input_ids: Tensor) -> None
Source code in vllm/model_executor/models/nemotron_vl.py
def _set_visual_token_mask(self, input_ids: torch.Tensor) -> None:
    self.visual_token_mask = None

compute_logits

compute_logits(
    hidden_states: Tensor,
    sampling_metadata: SamplingMetadata,
) -> Optional[Tensor]
Source code in vllm/model_executor/models/nemotron_vl.py
def compute_logits(
    self,
    hidden_states: torch.Tensor,
    sampling_metadata: SamplingMetadata,
) -> Optional[torch.Tensor]:
    return self.language_model.compute_logits(hidden_states,
                                              sampling_metadata)

extract_feature

extract_feature(pixel_values: Tensor) -> Tensor
Source code in vllm/model_executor/models/nemotron_vl.py
def extract_feature(self, pixel_values: torch.Tensor) -> torch.Tensor:
    # https://huggingface.co/nvidia/Llama-3.1-Nemotron-Nano-VL-8B-V1/blob/main/modeling.py#L177
    vit_embeds = self.vision_model(x=pixel_values).features
    vit_embeds = vit_embeds.to(dtype=torch.bfloat16)

    h = w = int(vit_embeds.shape[1]**0.5)
    vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1)
    vit_embeds = self.pixel_shuffle(vit_embeds,
                                    scale_factor=self.downsample_ratio)
    vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1,
                                    vit_embeds.shape[-1])
    vit_embeds = self.mlp1(vit_embeds)
    return vit_embeds
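
A shape walk-through under assumed dimensions (a 32x32 ViT token grid, downsample_ratio = 0.5):

import torch

B, tokens, C = 2, 1024, 1280            # assumed vision output: (B, 1024, C)
vit_embeds = torch.randn(B, tokens, C)

h = w = int(tokens ** 0.5)              # 32
grid = vit_embeds.reshape(B, h, w, C)   # (2, 32, 32, 1280)
# pixel_shuffle(scale_factor=0.5)    -> (2, 16, 16, 5120)
# final reshape                      -> (2, 256, 5120)
# mlp1 then projects 5120 features to the LLM hidden size.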

forward

forward(
    input_ids: Tensor,
    positions: Tensor,
    intermediate_tensors: Optional[
        IntermediateTensors
    ] = None,
    inputs_embeds: Optional[Tensor] = None,
    **kwargs: object,
) -> IntermediateTensors
Source code in vllm/model_executor/models/nemotron_vl.py
def forward(
    self,
    input_ids: torch.Tensor,
    positions: torch.Tensor,
    intermediate_tensors: Optional[IntermediateTensors] = None,
    inputs_embeds: Optional[torch.Tensor] = None,
    **kwargs: object,
) -> IntermediateTensors:

    if intermediate_tensors is not None:
        input_ids = None
        inputs_embeds = None

    # NOTE: In v1, inputs_embeds is always generated by the model runner;
    # this branch is kept for v0 compatibility.
    elif inputs_embeds is None:
        vision_embeddings = self.get_multimodal_embeddings(**kwargs)
        inputs_embeds = self.get_input_embeddings(input_ids,
                                                  vision_embeddings)
        input_ids = None

    forward_kwargs = {
        "input_ids": input_ids,
        "positions": positions,
        "intermediate_tensors": intermediate_tensors,
        "inputs_embeds": inputs_embeds,
    }

    # Only required if the model is mono-architecture
    if self.visual_token_mask is not None:
        forward_kwargs.update(
            {"visual_token_mask": self.visual_token_mask})
        self.visual_token_mask = None

    hidden_states = self.language_model.model(**forward_kwargs)
    return hidden_states

get_input_embeddings

get_input_embeddings(
    input_ids: Tensor,
    multimodal_embeddings: Optional[
        MultiModalEmbeddings
    ] = None,
) -> Tensor
Source code in vllm/model_executor/models/nemotron_vl.py
def get_input_embeddings(
    self,
    input_ids: torch.Tensor,
    multimodal_embeddings: Optional[MultiModalEmbeddings] = None,
) -> torch.Tensor:
    inputs_embeds = self.language_model.get_input_embeddings(input_ids)
    if multimodal_embeddings is not None \
        and len(multimodal_embeddings) != 0:
        context_token_ids = [self.img_context_token_id]
        assert len(context_token_ids) >= 1
        self._set_visual_token_mask(input_ids)
        inputs_embeds = merge_multimodal_embeddings(
            input_ids,
            inputs_embeds,
            multimodal_embeddings,
            context_token_ids,
        )
    return inputs_embeds

get_language_model

get_language_model() -> Module
Source code in vllm/model_executor/models/nemotron_vl.py
def get_language_model(self) -> torch.nn.Module:
    return self.language_model

get_mm_mapping

get_mm_mapping() -> MultiModelKeys

Get the module prefixes for the multimodal model

Source code in vllm/model_executor/models/nemotron_vl.py
def get_mm_mapping(self) -> MultiModelKeys:
    """
    Get the module prefixes for the multimodal model
    """
    return MultiModelKeys.from_string_field(
        language_model="language_model",
        connector="mlp1",
        tower_model="vision_model")

get_multimodal_embeddings

get_multimodal_embeddings(
    **kwargs: object,
) -> MultiModalEmbeddings
Source code in vllm/model_executor/models/nemotron_vl.py
def get_multimodal_embeddings(self,
                              **kwargs: object) -> MultiModalEmbeddings:

    modalities = self._parse_and_validate_multimodal_inputs(**kwargs)
    if not modalities:
        return []

    # The resulting multimodal_embeddings is a tuple of tensors, each
    # corresponding to one multimodal data item (an image).
    multimodal_embeddings: tuple[torch.Tensor, ...] = ()

    # NOTE: It is important to iterate over the keys in this dictionary
    # to preserve the order of the modalities.
    for modality in modalities:
        if modality == "images":
            image_input = modalities["images"]
            vision_embeddings = self._process_image_input(image_input)
            multimodal_embeddings += vision_embeddings

    return multimodal_embeddings

get_placeholder_str classmethod

get_placeholder_str(modality: str, i: int) -> Optional[str]
Source code in vllm/model_executor/models/nemotron_vl.py
@classmethod
def get_placeholder_str(cls, modality: str, i: int) -> Optional[str]:
    if modality.startswith("image"):
        return "<image>"

    raise ValueError("Only image modality is supported")

load_weights

load_weights(
    weights: Iterable[tuple[str, Tensor]],
) -> set[str]
Source code in vllm/model_executor/models/nemotron_vl.py
def load_weights(self, weights: Iterable[tuple[str,
                                               torch.Tensor]]) -> set[str]:
    ## Ignore registered buffers
    ## see https://huggingface.co/nvidia/C-RADIOv2-H/blob/main/input_conditioner.py#L28 # noqa: E501
    skip_substrs = ["norm_mean", "norm_std"]
    loader = AutoWeightsLoader(self, skip_substrs=skip_substrs)
    return loader.load_weights(weights)

pixel_shuffle

pixel_shuffle(x, scale_factor=0.5)
Source code in vllm/model_executor/models/nemotron_vl.py
def pixel_shuffle(self, x, scale_factor=0.5):
    n, w, h, c = x.size()
    # N, W, H, C --> N, W, H * scale, C // scale
    x = x.view(n, w, int(h * scale_factor), int(c / scale_factor))
    # N, W, H * scale, C // scale --> N, H * scale, W, C // scale
    x = x.permute(0, 2, 1, 3).contiguous()
    x = x.view(n, int(h * scale_factor), int(w * scale_factor),
               int(c / (scale_factor * scale_factor)))
    if self.ps_version == 'v1':
        pass
    else:
        x = x.permute(0, 2, 1, 3).contiguous()
    return x
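
A standalone copy (for illustration only) of the ps_version != 'v1' branch, confirming the expected shape change:

import torch

def pixel_shuffle_demo(x: torch.Tensor, scale_factor: float = 0.5) -> torch.Tensor:
    # Trades spatial resolution for channels, as in the method above.
    n, w, h, c = x.size()
    x = x.view(n, w, int(h * scale_factor), int(c / scale_factor))
    x = x.permute(0, 2, 1, 3).contiguous()
    x = x.view(n, int(h * scale_factor), int(w * scale_factor),
               int(c / (scale_factor * scale_factor)))
    return x.permute(0, 2, 1, 3).contiguous()

print(pixel_shuffle_demo(torch.randn(1, 32, 32, 1280)).shape)
# torch.Size([1, 16, 16, 5120])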

NemotronVLProcessingInfo

Bases: BaseInternVLProcessingInfo

Processing info for Nemotron VL models.

Source code in vllm/model_executor/models/nemotron_vl.py
class NemotronVLProcessingInfo(BaseInternVLProcessingInfo):
    """Processing info for Nemotron VL models."""

    def get_hf_processor(self, **kwargs: object) -> NemotronVLProcessor:
        return self.ctx.init_processor(
            NemotronVLProcessor,
            config=self.get_hf_config(),
            tokenizer=self.get_tokenizer(),
            image_processor=self.get_image_processor(),
            **kwargs,
        )

    def get_image_processor(self, **kwargs: object):
        return cached_image_processor_from_config(
            self.ctx.model_config,
            **kwargs,
        )

get_hf_processor

get_hf_processor(**kwargs: object) -> NemotronVLProcessor
Source code in vllm/model_executor/models/nemotron_vl.py
def get_hf_processor(self, **kwargs: object) -> NemotronVLProcessor:
    return self.ctx.init_processor(
        NemotronVLProcessor,
        config=self.get_hf_config(),
        tokenizer=self.get_tokenizer(),
        image_processor=self.get_image_processor(),
        **kwargs,
    )

get_image_processor

get_image_processor(**kwargs: object)
Source code in vllm/model_executor/models/nemotron_vl.py
def get_image_processor(self, **kwargs: object):
    return cached_image_processor_from_config(
        self.ctx.model_config,
        **kwargs,
    )

NemotronVLProcessor

Bases: InternVLProcessor

Source code in vllm/model_executor/models/nemotron_vl.py
class NemotronVLProcessor(InternVLProcessor):

    def __init__(
        self,
        config: PretrainedConfig,
        tokenizer: AnyTokenizer,
        image_processor: BaseImageProcessorFast,
        *,
        min_dynamic_patch: Optional[int] = None,
        max_dynamic_patch: Optional[int] = None,
        dynamic_image_size: Optional[bool] = None,
    ) -> None:
        ABC.__init__(self)
        self.config = config
        self.tokenizer = tokenizer
        self.image_processor = image_processor
        image_size: int = config.force_image_size
        patch_size: int = config.patch_size

        if min_dynamic_patch is None:
            min_dynamic_patch = 1
        assert isinstance(min_dynamic_patch, int)

        if max_dynamic_patch is None:
            max_dynamic_patch = self.image_processor.max_num_tiles
        assert isinstance(max_dynamic_patch, int)

        if dynamic_image_size is None:
            dynamic_image_size = True
        assert isinstance(dynamic_image_size, bool)

        self.num_image_token = int(
            (image_size // patch_size)**2 * (config.downsample_ratio**2))
        self.image_size = image_size
        self.min_dynamic_patch = min_dynamic_patch
        self.max_dynamic_patch = max_dynamic_patch
        self.dynamic_image_size = dynamic_image_size
        self.use_thumbnail: bool = self.image_processor.use_thumbnail

    @property
    def image_token_id(self) -> int:
        return self.tokenizer.get_vocab()[IMG_CONTEXT]

    def get_num_image_tokens(
        self,
        *,
        image_width: int,
        image_height: int,
    ) -> int:
        target_ratios = self.resolve_target_ratios(
            use_thumbnail=False,  # Applied in calculate_targets
        )

        num_patches, _, _ = calculate_nemotron_vl_targets(
            orig_width=image_width,
            orig_height=image_height,
            image_size=self.image_size,
            target_ratios=target_ratios,
            use_thumbnail=self.use_thumbnail,
        )

        return num_patches * self.num_image_token

    def _images_to_pixel_values_lst(
        self,
        images: list[Image.Image],
        min_dynamic_patch: Optional[int] = None,
        max_dynamic_patch: Optional[int] = None,
        dynamic_image_size: Optional[bool] = None,
    ) -> list[torch.Tensor]:
        min_num, max_num = self.resolve_min_max_num(
            min_dynamic_patch=min_dynamic_patch,
            max_dynamic_patch=max_dynamic_patch,
            dynamic_image_size=dynamic_image_size,
            use_thumbnail=False,  # Applied in image_to_pixel_values
        )

        return [
            image_to_pixel_values_nemotron_vl(
                image,
                input_size=self.image_size,
                min_num=min_num,
                max_num=max_num,
                use_thumbnail=self.use_thumbnail,
            ) for image in images
        ]

    def _preprocess_image(
        self,
        text: list[str],
        images: list[Image.Image],
        min_dynamic_patch: Optional[int] = None,
        max_dynamic_patch: Optional[int] = None,
        dynamic_image_size: Optional[bool] = None,
    ) -> tuple[list[str], dict[str, torch.Tensor]]:
        if len(images) == 0:
            image_inputs = {}
        else:
            pixel_values_lst = self._images_to_pixel_values_lst(
                images,
                min_dynamic_patch=min_dynamic_patch,
                max_dynamic_patch=max_dynamic_patch,
                dynamic_image_size=dynamic_image_size,
            )
            image_inputs: dict[str, NestedTensors] = {
                "pixel_values_flat":
                torch.cat(pixel_values_lst),
                "image_num_patches":
                torch.tensor([len(item) for item in pixel_values_lst]),
            }

            for pixel_values in pixel_values_lst:
                num_patches = pixel_values.shape[0]
                feature_size = num_patches * self.num_image_token
                image_repl = self.get_image_repl(feature_size, num_patches)
                NVL_IMAGE_CONTEXT = image_repl.full.replace(
                    "<image>", "<NVL_IMG_CONTEXT>")
                text = [
                    t.replace('<image>', NVL_IMAGE_CONTEXT, 1) for t in text
                ]
            text = [t.replace("<NVL_IMG_CONTEXT>", IMG_CONTEXT) for t in text]
        return text, image_inputs

    def get_image_repl(
        self,
        feature_size: int,
        num_patches: Optional[int],
    ) -> PromptUpdateDetails[str]:
        repl_features = IMG_CONTEXT * feature_size
        repl_full = IMG_START + repl_features + IMG_END

        return PromptUpdateDetails.select_text(repl_full, IMG_CONTEXT)

config instance-attribute

config = config

dynamic_image_size instance-attribute

dynamic_image_size = dynamic_image_size

image_processor instance-attribute

image_processor = image_processor

image_size instance-attribute

image_size = image_size

image_token_id property

image_token_id: int

max_dynamic_patch instance-attribute

max_dynamic_patch = max_dynamic_patch

min_dynamic_patch instance-attribute

min_dynamic_patch = min_dynamic_patch

num_image_token instance-attribute

num_image_token = int(
    (image_size // patch_size) ** 2 * downsample_ratio**2
)

tokenizer instance-attribute

tokenizer = tokenizer

use_thumbnail instance-attribute

use_thumbnail: bool = use_thumbnail

__init__

__init__(
    config: PretrainedConfig,
    tokenizer: AnyTokenizer,
    image_processor: BaseImageProcessorFast,
    *,
    min_dynamic_patch: Optional[int] = None,
    max_dynamic_patch: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
) -> None
Source code in vllm/model_executor/models/nemotron_vl.py
def __init__(
    self,
    config: PretrainedConfig,
    tokenizer: AnyTokenizer,
    image_processor: BaseImageProcessorFast,
    *,
    min_dynamic_patch: Optional[int] = None,
    max_dynamic_patch: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
) -> None:
    ABC.__init__(self)
    self.config = config
    self.tokenizer = tokenizer
    self.image_processor = image_processor
    image_size: int = config.force_image_size
    patch_size: int = config.patch_size

    if min_dynamic_patch is None:
        min_dynamic_patch = 1
    assert isinstance(min_dynamic_patch, int)

    if max_dynamic_patch is None:
        max_dynamic_patch = self.image_processor.max_num_tiles
    assert isinstance(max_dynamic_patch, int)

    if dynamic_image_size is None:
        dynamic_image_size = True
    assert isinstance(dynamic_image_size, bool)

    self.num_image_token = int(
        (image_size // patch_size)**2 * (config.downsample_ratio**2))
    self.image_size = image_size
    self.min_dynamic_patch = min_dynamic_patch
    self.max_dynamic_patch = max_dynamic_patch
    self.dynamic_image_size = dynamic_image_size
    self.use_thumbnail: bool = self.image_processor.use_thumbnail

_images_to_pixel_values_lst

_images_to_pixel_values_lst(
    images: list[Image],
    min_dynamic_patch: Optional[int] = None,
    max_dynamic_patch: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
) -> list[Tensor]
Source code in vllm/model_executor/models/nemotron_vl.py
def _images_to_pixel_values_lst(
    self,
    images: list[Image.Image],
    min_dynamic_patch: Optional[int] = None,
    max_dynamic_patch: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
) -> list[torch.Tensor]:
    min_num, max_num = self.resolve_min_max_num(
        min_dynamic_patch=min_dynamic_patch,
        max_dynamic_patch=max_dynamic_patch,
        dynamic_image_size=dynamic_image_size,
        use_thumbnail=False,  # Applied in image_to_pixel_values
    )

    return [
        image_to_pixel_values_nemotron_vl(
            image,
            input_size=self.image_size,
            min_num=min_num,
            max_num=max_num,
            use_thumbnail=self.use_thumbnail,
        ) for image in images
    ]

_preprocess_image

_preprocess_image(
    text: list[str],
    images: list[Image],
    min_dynamic_patch: Optional[int] = None,
    max_dynamic_patch: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
) -> tuple[list[str], dict[str, Tensor]]
Source code in vllm/model_executor/models/nemotron_vl.py
def _preprocess_image(
    self,
    text: list[str],
    images: list[Image.Image],
    min_dynamic_patch: Optional[int] = None,
    max_dynamic_patch: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
) -> tuple[list[str], dict[str, torch.Tensor]]:
    if len(images) == 0:
        image_inputs = {}
    else:
        pixel_values_lst = self._images_to_pixel_values_lst(
            images,
            min_dynamic_patch=min_dynamic_patch,
            max_dynamic_patch=max_dynamic_patch,
            dynamic_image_size=dynamic_image_size,
        )
        image_inputs: dict[str, NestedTensors] = {
            "pixel_values_flat":
            torch.cat(pixel_values_lst),
            "image_num_patches":
            torch.tensor([len(item) for item in pixel_values_lst]),
        }

        for pixel_values in pixel_values_lst:
            num_patches = pixel_values.shape[0]
            feature_size = num_patches * self.num_image_token
            image_repl = self.get_image_repl(feature_size, num_patches)
            NVL_IMAGE_CONTEXT = image_repl.full.replace(
                "<image>", "<NVL_IMG_CONTEXT>")
            text = [
                t.replace('<image>', NVL_IMAGE_CONTEXT, 1) for t in text
            ]
        text = [t.replace("<NVL_IMG_CONTEXT>", IMG_CONTEXT) for t in text]
    return text, image_inputs
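
The two-step substitution (expand each "<image>" into a private sentinel first, then map the sentinel back to IMG_CONTEXT) keeps a later image's replacement from matching tokens inside an earlier image's expansion. A minimal sketch with assumed per-image sizes:

text = ["<image> and <image>"]
for feature_size in (2, 1):            # assumed sizes for the two images
    repl = '<img>' + '<image>' * feature_size + '</img>'
    sentinel = repl.replace('<image>', '<NVL_IMG_CONTEXT>')
    text = [t.replace('<image>', sentinel, 1) for t in text]
text = [t.replace('<NVL_IMG_CONTEXT>', '<image>') for t in text]
print(text[0])   # <img><image><image></img> and <img><image></img>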

get_image_repl

get_image_repl(
    feature_size: int, num_patches: Optional[int]
) -> PromptUpdateDetails[str]
Source code in vllm/model_executor/models/nemotron_vl.py
def get_image_repl(
    self,
    feature_size: int,
    num_patches: Optional[int],
) -> PromptUpdateDetails[str]:
    repl_features = IMG_CONTEXT * feature_size
    repl_full = IMG_START + repl_features + IMG_END

    return PromptUpdateDetails.select_text(repl_full, IMG_CONTEXT)

get_num_image_tokens

get_num_image_tokens(
    *, image_width: int, image_height: int
) -> int
Source code in vllm/model_executor/models/nemotron_vl.py
def get_num_image_tokens(
    self,
    *,
    image_width: int,
    image_height: int,
) -> int:
    target_ratios = self.resolve_target_ratios(
        use_thumbnail=False,  # Applied in calculate_targets
    )

    num_patches, _, _ = calculate_nemotron_vl_targets(
        orig_width=image_width,
        orig_height=image_height,
        image_size=self.image_size,
        target_ratios=target_ratios,
        use_thumbnail=self.use_thumbnail,
    )

    return num_patches * self.num_image_token
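
A worked example under assumed settings (image_size = 512, patch_size = 16, downsample_ratio = 0.5, thumbnails enabled):

image_size, patch_size, downsample_ratio = 512, 16, 0.5   # assumed config
num_image_token = int((image_size // patch_size) ** 2 * downsample_ratio ** 2)
# (512 // 16) ** 2 * 0.25 = 256 tokens per tile

# A 1024x512 image tiles as a 2x1 grid; use_thumbnail appends one more tile,
# so the prompt carries 3 * 256 = 768 image tokens.
print(3 * num_image_token)   # 768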

build_transform

build_transform(input_size: int)
Source code in vllm/model_executor/models/nemotron_vl.py
def build_transform(input_size: int):
    return T.Compose([
        T.Lambda(lambda img: convert_image_mode(img, 'RGB')),
        T.Resize((input_size, input_size),
                 interpolation=T.InterpolationMode.BICUBIC),
        T.ToTensor(),
    ])
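
Usage sketch, assuming the module's torchvision imports are in scope:

from PIL import Image

transform = build_transform(input_size=512)    # assumed tile size
tensor = transform(Image.new('RGB', (800, 600)))
print(tensor.shape)   # torch.Size([3, 512, 512]), float values in [0, 1]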

calculate_nemotron_vl_targets

calculate_nemotron_vl_targets(
    *,
    orig_width: int,
    orig_height: int,
    target_ratios: list[tuple[int, int]],
    image_size: int,
    use_thumbnail: bool,
) -> tuple[int, int, int]
Source code in vllm/model_executor/models/nemotron_vl.py
def calculate_nemotron_vl_targets(
    *,
    orig_width: int,
    orig_height: int,
    target_ratios: list[tuple[int, int]],
    image_size: int,
    use_thumbnail: bool,
) -> tuple[int, int, int]:
    aspect_ratio = orig_width / orig_height

    # find the closest aspect ratio to the target
    target_aspect_ratio = find_closest_aspect_ratio(
        aspect_ratio,
        target_ratios,
        width=orig_width,
        height=orig_height,
        image_size=image_size,
    )

    # calculate the target width and height
    target_width = image_size * target_aspect_ratio[0]
    target_height = image_size * target_aspect_ratio[1]
    blocks = target_aspect_ratio[0] * target_aspect_ratio[1]

    # add thumbnail image if num_blocks != 1
    if use_thumbnail and blocks != 1:
        blocks += 1

    return blocks, target_width, target_height
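
A worked example (the 512 tile size is an assumption):

blocks, target_w, target_h = calculate_nemotron_vl_targets(
    orig_width=1024, orig_height=512,   # a 2:1 input image
    target_ratios=get_nemotron_vl_target_ratios(1, 2),
    image_size=512,
    use_thumbnail=True,
)
print(blocks, target_w, target_h)   # 3 1024 512: a 2x1 grid plus a thumbnail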

dynamic_preprocess_nemotron_vl

dynamic_preprocess_nemotron_vl(
    image: Image,
    *,
    target_ratios: list[tuple[int, int]],
    image_size: int,
    use_thumbnail: bool,
) -> list[Image]
Source code in vllm/model_executor/models/nemotron_vl.py
def dynamic_preprocess_nemotron_vl(
    image: Image.Image,
    *,
    target_ratios: list[tuple[int, int]],
    image_size: int,
    use_thumbnail: bool,
) -> list[Image.Image]:
    orig_width, orig_height = image.size

    # calculate the number of blocks without thumbnail
    blocks, target_width, target_height = calculate_nemotron_vl_targets(
        orig_width=orig_width,
        orig_height=orig_height,
        target_ratios=target_ratios,
        image_size=image_size,
        use_thumbnail=False,
    )

    # resize the image
    resized_img = image.resize((target_width, target_height))
    processed_images = []
    for i in range(blocks):
        box = ((i % (target_width // image_size)) * image_size,
               (i // (target_width // image_size)) * image_size,
               ((i % (target_width // image_size)) + 1) * image_size,
               ((i // (target_width // image_size)) + 1) * image_size)
        # split the image
        split_img = resized_img.crop(box)
        processed_images.append(split_img)

    assert len(processed_images) == blocks

    if use_thumbnail and len(processed_images) != 1:
        thumbnail_img = image.resize((image_size, image_size))
        processed_images.append(thumbnail_img)

    return processed_images
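
Continuing the example above, the same 2:1 image splits into two 512x512 crops plus a thumbnail:

from PIL import Image

tiles = dynamic_preprocess_nemotron_vl(
    Image.new('RGB', (1024, 512)),
    target_ratios=get_nemotron_vl_target_ratios(1, 2),
    image_size=512,
    use_thumbnail=True,
)
print(len(tiles), tiles[0].size)   # 3 (512, 512)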

find_closest_aspect_ratio

find_closest_aspect_ratio(
    aspect_ratio: float,
    target_ratios: list[tuple[int, int]],
    *,
    width: int,
    height: int,
    image_size: int,
) -> tuple[int, int]
Source code in vllm/model_executor/models/nemotron_vl.py
def find_closest_aspect_ratio(
    aspect_ratio: float,
    target_ratios: list[tuple[int, int]],
    *,
    width: int,
    height: int,
    image_size: int,
) -> tuple[int, int]:
    best_factor = float('-inf')
    best_ratio = (1, 1)
    area = width * height

    for rw, rh in target_ratios:
        target_aspect_ratio = rw / rh
        size_factor = min((rw * rh * image_size * image_size) / area, 0.6)
        ratio_closeness = min(target_aspect_ratio / aspect_ratio,
                              aspect_ratio / target_aspect_ratio)
        factor = size_factor * ratio_closeness

        if factor > best_factor:
            best_factor = factor
            best_ratio = (rw, rh)

    return best_ratio
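
The score multiplies area coverage (capped at 0.6, so the size bonus saturates once the tiled grid reaches 60% of the original area) by aspect-ratio closeness. For the 2:1 image above:

ratio = find_closest_aspect_ratio(
    2.0, [(1, 1), (1, 2), (2, 1)],
    width=1024, height=512, image_size=512,
)
print(ratio)   # (2, 1): full coverage and an exact aspect match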

get_nemotron_vl_target_ratios

get_nemotron_vl_target_ratios(
    min_num: int, max_num: int
) -> list[tuple[int, int]]
Source code in vllm/model_executor/models/nemotron_vl.py
def get_nemotron_vl_target_ratios(
    min_num: int,
    max_num: int,
) -> list[tuple[int, int]]:
    target_ratios = {(i, j)
                     for n in range(min_num, max_num + 1)
                     for i in range(1, n + 1)
                     for j in range(1, n + 1) if min_num <= i * j <= max_num}
    return sorted(target_ratios, key=lambda x: x[0] * x[1])
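
For example (the order among grids with equal tile counts may vary, since the comprehension builds a set):

print(get_nemotron_vl_target_ratios(1, 4))
# e.g. [(1, 1), (1, 2), (2, 1), (1, 3), (3, 1), (1, 4), (2, 2), (4, 1)]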

image_to_pixel_values_nemotron_vl

image_to_pixel_values_nemotron_vl(
    image: Image,
    *,
    input_size: int,
    min_num: int,
    max_num: int,
    use_thumbnail: bool,
) -> Tensor
Source code in vllm/model_executor/models/nemotron_vl.py
def image_to_pixel_values_nemotron_vl(
    image: Image.Image,
    *,
    input_size: int,
    min_num: int,
    max_num: int,
    use_thumbnail: bool,
) -> torch.Tensor:
    target_ratios = get_nemotron_vl_target_ratios(min_num, max_num)

    transform = build_transform(input_size=input_size)

    images = dynamic_preprocess_nemotron_vl(
        image,
        target_ratios=target_ratios,
        image_size=input_size,
        use_thumbnail=use_thumbnail,
    )

    pixel_values = torch.stack([transform(image) for image in images])
    return pixel_values
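
End-to-end sketch tying the helpers together (512 tile size assumed):

from PIL import Image

pixel_values = image_to_pixel_values_nemotron_vl(
    Image.new('RGB', (1024, 512)),
    input_size=512, min_num=1, max_num=2, use_thumbnail=True,
)
print(pixel_values.shape)   # torch.Size([3, 3, 512, 512]): one row per tile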