Skip to content

vllm.model_executor.models.ernie_mtp

Inference-only Ernie-MTP model.

ErnieMTP

Bases: Module, SupportsPP

Source code in vllm/model_executor/models/ernie_mtp.py
class ErnieMTP(nn.Module, SupportsPP):

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__()

        self.config = vllm_config.model_config.hf_config
        self.model = ErnieMultiTokenPredictor(vllm_config=vllm_config,
                                              prefix=maybe_prefix(
                                                  prefix, "model"))
        self.lm_head = ParallelLMHead(self.config.vocab_size,
                                      self.config.hidden_size)
        self.sampler = get_sampler()

        if self.config.tie_word_embeddings:
            self.lm_head.weight = self.model.embed_tokens.weight

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        hidden_states: torch.Tensor,
        intermediate_tensors: Optional[IntermediateTensors] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        spec_step_idx: int = 0,
    ) -> torch.Tensor:
        assert spec_step_idx == 0, "ernie_mtp only support predict one token"
        hidden_states = self.model(input_ids, positions, hidden_states,
                                   inputs_embeds, spec_step_idx)
        return hidden_states

    def compute_logits(
        self,
        hidden_states: torch.Tensor,
        sampling_metadata: SamplingMetadata,
        spec_step_idx: int = 0,
    ) -> Optional[torch.Tensor]:
        return self.model.compute_logits(hidden_states, self.lm_head,
                                         sampling_metadata, spec_step_idx)

    def sample(
        self,
        logits: torch.Tensor,
        sampling_metadata: SamplingMetadata,
    ) -> Optional[SamplerOutput]:
        next_tokens = self.sampler(logits, sampling_metadata)
        return next_tokens

    def load_weights(self, weights: Iterable[tuple[str,
                                                   torch.Tensor]]) -> set[str]:
        stacked_params_mapping = [
            ("qkv_proj", "q_proj", "q"),
            ("qkv_proj", "k_proj", "k"),
            ("qkv_proj", "v_proj", "v"),
            ("gate_up_proj", "gate_proj", 0),
            ("gate_up_proj", "up_proj", 1),
        ]

        params_dict = dict(self.named_parameters())
        loaded_params: set[str] = set()
        for name, loaded_weight in weights:

            if self.config.tie_word_embeddings and name.endswith(
                    "lm_head.weight"):
                continue
            if "rotary_emb.inv_freq" in name:
                continue
            if "mtp" in name:
                name = self._rewrite_spec_layer_name(self.config, name)

            for (param_name, weight_name, shard_id) in stacked_params_mapping:
                # Skip non-stacked layers and experts (experts handled below).
                if weight_name not in name:
                    continue
                if "mtp" not in name:
                    continue
                # We have mlp.experts[0].gate_proj in the checkpoint.
                # Since we handle the experts below in expert_params_mapping,
                # we need to skip here BEFORE we update the name, otherwise
                # name will be updated to mlp.experts[0].gate_up_proj, which
                # will then be updated below in expert_params_mapping
                # for mlp.experts[0].gate_gate_up_proj, which breaks load.
                if (("mlp.experts." in name) and name not in params_dict):
                    continue
                name = name.replace(weight_name, param_name)
                # Skip loading extra bias for GPTQ models.
                if ((name.endswith(".bias") or name.endswith("_bias"))
                        and name not in params_dict):
                    continue
                # Skip layers on other devices.
                if is_pp_missing_parameter(name, self):
                    continue

                param = params_dict[name]
                weight_loader = param.weight_loader
                weight_loader(param, loaded_weight, shard_id)
                break
            else:
                # Skip loading extra bias for GPTQ models.
                if ((name.endswith(".bias") or name.endswith("_bias"))
                        and name not in params_dict):
                    continue
                # Skip layers on other devices.
                if is_pp_missing_parameter(name, self):
                    continue

                # According to DeepSeek-V3 Technical Report, MTP modules
                # shares embedding layer. We only load the first weights.
                if "mtp_" not in name and ("embed_tokens" not in name
                                           and "lm_head" not in name):
                    continue

                param = params_dict[name]
                weight_loader = getattr(param, "weight_loader",
                                        default_weight_loader)
                weight_loader(param, loaded_weight)
            loaded_params.add(name)
        return loaded_params

    def _rewrite_spec_layer_name(self, config: PretrainedConfig,
                                 name: str) -> str:
        """
        Rewrite the weight name to match the format of the original model.
        """
        spec_layer_weight_names = [
            "embed_tokens", "mtp_emb_norm", "mtp_hidden_norm",
            "mtp_linear_proj"
        ]
        layer_idx = config.num_hidden_layers
        for weight_name in spec_layer_weight_names:
            if weight_name in name:
                name = name.replace(
                    f"model.{weight_name}.0.",
                    f"model.layers.{layer_idx}.{weight_name}.")
                return name
        name = name.replace("model.mtp_block.0.",
                            f"model.layers.{layer_idx}.mtp_block.")
        return name

config instance-attribute

config = hf_config

lm_head instance-attribute

lm_head = ParallelLMHead(vocab_size, hidden_size)

model instance-attribute

model = ErnieMultiTokenPredictor(
    vllm_config=vllm_config,
    prefix=maybe_prefix(prefix, "model"),
)

sampler instance-attribute

sampler = get_sampler()

__init__

__init__(*, vllm_config: VllmConfig, prefix: str = '')
Source code in vllm/model_executor/models/ernie_mtp.py
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
    super().__init__()

    self.config = vllm_config.model_config.hf_config
    self.model = ErnieMultiTokenPredictor(vllm_config=vllm_config,
                                          prefix=maybe_prefix(
                                              prefix, "model"))
    self.lm_head = ParallelLMHead(self.config.vocab_size,
                                  self.config.hidden_size)
    self.sampler = get_sampler()

    if self.config.tie_word_embeddings:
        self.lm_head.weight = self.model.embed_tokens.weight

_rewrite_spec_layer_name

_rewrite_spec_layer_name(
    config: PretrainedConfig, name: str
) -> str

Rewrite the weight name to match the format of the original model.

Source code in vllm/model_executor/models/ernie_mtp.py
def _rewrite_spec_layer_name(self, config: PretrainedConfig,
                             name: str) -> str:
    """
    Rewrite the weight name to match the format of the original model.
    """
    spec_layer_weight_names = [
        "embed_tokens", "mtp_emb_norm", "mtp_hidden_norm",
        "mtp_linear_proj"
    ]
    layer_idx = config.num_hidden_layers
    for weight_name in spec_layer_weight_names:
        if weight_name in name:
            name = name.replace(
                f"model.{weight_name}.0.",
                f"model.layers.{layer_idx}.{weight_name}.")
            return name
    name = name.replace("model.mtp_block.0.",
                        f"model.layers.{layer_idx}.mtp_block.")
    return name

compute_logits

compute_logits(
    hidden_states: Tensor,
    sampling_metadata: SamplingMetadata,
    spec_step_idx: int = 0,
) -> Optional[Tensor]
Source code in vllm/model_executor/models/ernie_mtp.py
def compute_logits(
    self,
    hidden_states: torch.Tensor,
    sampling_metadata: SamplingMetadata,
    spec_step_idx: int = 0,
) -> Optional[torch.Tensor]:
    return self.model.compute_logits(hidden_states, self.lm_head,
                                     sampling_metadata, spec_step_idx)

forward

forward(
    input_ids: Tensor,
    positions: Tensor,
    hidden_states: Tensor,
    intermediate_tensors: Optional[
        IntermediateTensors
    ] = None,
    inputs_embeds: Optional[Tensor] = None,
    spec_step_idx: int = 0,
) -> Tensor
Source code in vllm/model_executor/models/ernie_mtp.py
def forward(
    self,
    input_ids: torch.Tensor,
    positions: torch.Tensor,
    hidden_states: torch.Tensor,
    intermediate_tensors: Optional[IntermediateTensors] = None,
    inputs_embeds: Optional[torch.Tensor] = None,
    spec_step_idx: int = 0,
) -> torch.Tensor:
    assert spec_step_idx == 0, "ernie_mtp only support predict one token"
    hidden_states = self.model(input_ids, positions, hidden_states,
                               inputs_embeds, spec_step_idx)
    return hidden_states

load_weights

load_weights(
    weights: Iterable[tuple[str, Tensor]],
) -> set[str]
Source code in vllm/model_executor/models/ernie_mtp.py
def load_weights(self, weights: Iterable[tuple[str,
                                               torch.Tensor]]) -> set[str]:
    stacked_params_mapping = [
        ("qkv_proj", "q_proj", "q"),
        ("qkv_proj", "k_proj", "k"),
        ("qkv_proj", "v_proj", "v"),
        ("gate_up_proj", "gate_proj", 0),
        ("gate_up_proj", "up_proj", 1),
    ]

    params_dict = dict(self.named_parameters())
    loaded_params: set[str] = set()
    for name, loaded_weight in weights:

        if self.config.tie_word_embeddings and name.endswith(
                "lm_head.weight"):
            continue
        if "rotary_emb.inv_freq" in name:
            continue
        if "mtp" in name:
            name = self._rewrite_spec_layer_name(self.config, name)

        for (param_name, weight_name, shard_id) in stacked_params_mapping:
            # Skip non-stacked layers and experts (experts handled below).
            if weight_name not in name:
                continue
            if "mtp" not in name:
                continue
            # We have mlp.experts[0].gate_proj in the checkpoint.
            # Since we handle the experts below in expert_params_mapping,
            # we need to skip here BEFORE we update the name, otherwise
            # name will be updated to mlp.experts[0].gate_up_proj, which
            # will then be updated below in expert_params_mapping
            # for mlp.experts[0].gate_gate_up_proj, which breaks load.
            if (("mlp.experts." in name) and name not in params_dict):
                continue
            name = name.replace(weight_name, param_name)
            # Skip loading extra bias for GPTQ models.
            if ((name.endswith(".bias") or name.endswith("_bias"))
                    and name not in params_dict):
                continue
            # Skip layers on other devices.
            if is_pp_missing_parameter(name, self):
                continue

            param = params_dict[name]
            weight_loader = param.weight_loader
            weight_loader(param, loaded_weight, shard_id)
            break
        else:
            # Skip loading extra bias for GPTQ models.
            if ((name.endswith(".bias") or name.endswith("_bias"))
                    and name not in params_dict):
                continue
            # Skip layers on other devices.
            if is_pp_missing_parameter(name, self):
                continue

            # According to DeepSeek-V3 Technical Report, MTP modules
            # shares embedding layer. We only load the first weights.
            if "mtp_" not in name and ("embed_tokens" not in name
                                       and "lm_head" not in name):
                continue

            param = params_dict[name]
            weight_loader = getattr(param, "weight_loader",
                                    default_weight_loader)
            weight_loader(param, loaded_weight)
        loaded_params.add(name)
    return loaded_params

sample

sample(
    logits: Tensor, sampling_metadata: SamplingMetadata
) -> Optional[SamplerOutput]
Source code in vllm/model_executor/models/ernie_mtp.py
def sample(
    self,
    logits: torch.Tensor,
    sampling_metadata: SamplingMetadata,
) -> Optional[SamplerOutput]:
    next_tokens = self.sampler(logits, sampling_metadata)
    return next_tokens

ErnieMultiTokenPredictor

Bases: Module

Source code in vllm/model_executor/models/ernie_mtp.py
class ErnieMultiTokenPredictor(nn.Module):

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__()

        config = vllm_config.model_config.hf_config
        self.mtp_start_layer_idx = config.num_hidden_layers
        self.num_mtp_layers = config.num_nextn_predict_layers
        # to map the exact layer index from weights
        self.layers = torch.nn.ModuleDict({
            str(idx):
            ErnieMultiTokenPredictorLayer(
                config,
                f"{prefix}.layers.{idx}",
                model_config=vllm_config.model_config,
                cache_config=vllm_config.cache_config,
            )
            for idx in range(self.mtp_start_layer_idx,
                             self.mtp_start_layer_idx + self.num_mtp_layers)
        })
        self.embed_tokens = VocabParallelEmbedding(
            config.vocab_size,
            config.hidden_size,
        )
        self.logits_processor = LogitsProcessor(config.vocab_size)

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        previous_hidden_states: torch.Tensor,
        inputs_embeds: Optional[torch.Tensor] = None,
        spec_step_idx: int = 0,
    ) -> torch.Tensor:
        if inputs_embeds is None:
            inputs_embeds = self.embed_tokens(input_ids)
        return self.layers[str(self.mtp_start_layer_idx + spec_step_idx)](
            inputs_embeds,
            positions,
            previous_hidden_states,
            spec_step_idx,
        )

    def compute_logits(
        self,
        hidden_states: torch.Tensor,
        lm_head: ParallelLMHead,
        sampling_metadata: SamplingMetadata,
        spec_step_idx: int = 0,
    ) -> torch.Tensor:
        self.layers[str(self.mtp_start_layer_idx + spec_step_idx)]
        logits = self.logits_processor(lm_head, hidden_states,
                                       sampling_metadata)
        return logits

embed_tokens instance-attribute

embed_tokens = VocabParallelEmbedding(
    vocab_size, hidden_size
)

layers instance-attribute

layers = ModuleDict(
    {
        (str(idx)): (
            ErnieMultiTokenPredictorLayer(
                config,
                f"{prefix}.layers.{idx}",
                model_config=model_config,
                cache_config=cache_config,
            )
        )
        for idx in (
            range(
                mtp_start_layer_idx,
                mtp_start_layer_idx + num_mtp_layers,
            )
        )
    }
)

logits_processor instance-attribute

logits_processor = LogitsProcessor(vocab_size)

mtp_start_layer_idx instance-attribute

mtp_start_layer_idx = num_hidden_layers

num_mtp_layers instance-attribute

num_mtp_layers = num_nextn_predict_layers

__init__

__init__(*, vllm_config: VllmConfig, prefix: str = '')
Source code in vllm/model_executor/models/ernie_mtp.py
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
    super().__init__()

    config = vllm_config.model_config.hf_config
    self.mtp_start_layer_idx = config.num_hidden_layers
    self.num_mtp_layers = config.num_nextn_predict_layers
    # to map the exact layer index from weights
    self.layers = torch.nn.ModuleDict({
        str(idx):
        ErnieMultiTokenPredictorLayer(
            config,
            f"{prefix}.layers.{idx}",
            model_config=vllm_config.model_config,
            cache_config=vllm_config.cache_config,
        )
        for idx in range(self.mtp_start_layer_idx,
                         self.mtp_start_layer_idx + self.num_mtp_layers)
    })
    self.embed_tokens = VocabParallelEmbedding(
        config.vocab_size,
        config.hidden_size,
    )
    self.logits_processor = LogitsProcessor(config.vocab_size)

compute_logits

compute_logits(
    hidden_states: Tensor,
    lm_head: ParallelLMHead,
    sampling_metadata: SamplingMetadata,
    spec_step_idx: int = 0,
) -> Tensor
Source code in vllm/model_executor/models/ernie_mtp.py
def compute_logits(
    self,
    hidden_states: torch.Tensor,
    lm_head: ParallelLMHead,
    sampling_metadata: SamplingMetadata,
    spec_step_idx: int = 0,
) -> torch.Tensor:
    self.layers[str(self.mtp_start_layer_idx + spec_step_idx)]
    logits = self.logits_processor(lm_head, hidden_states,
                                   sampling_metadata)
    return logits

forward

forward(
    input_ids: Tensor,
    positions: Tensor,
    previous_hidden_states: Tensor,
    inputs_embeds: Optional[Tensor] = None,
    spec_step_idx: int = 0,
) -> Tensor
Source code in vllm/model_executor/models/ernie_mtp.py
def forward(
    self,
    input_ids: torch.Tensor,
    positions: torch.Tensor,
    previous_hidden_states: torch.Tensor,
    inputs_embeds: Optional[torch.Tensor] = None,
    spec_step_idx: int = 0,
) -> torch.Tensor:
    if inputs_embeds is None:
        inputs_embeds = self.embed_tokens(input_ids)
    return self.layers[str(self.mtp_start_layer_idx + spec_step_idx)](
        inputs_embeds,
        positions,
        previous_hidden_states,
        spec_step_idx,
    )

ErnieMultiTokenPredictorLayer

Bases: Module

Source code in vllm/model_executor/models/ernie_mtp.py
class ErnieMultiTokenPredictorLayer(nn.Module):

    def __init__(
        self,
        config: PretrainedConfig,
        prefix: str,
        model_config: ModelConfig,
        cache_config: Optional[CacheConfig] = None,
        quant_config: Optional[QuantizationConfig] = None,
    ) -> None:
        super().__init__()

        self.mtp_emb_norm = RMSNorm(config.hidden_size,
                                    eps=config.rms_norm_eps)
        self.mtp_hidden_norm = RMSNorm(config.hidden_size,
                                       eps=config.rms_norm_eps)
        self.mtp_linear_proj = nn.Linear(config.hidden_size * 2,
                                         config.hidden_size,
                                         bias=False)
        self.mtp_block = LlamaDecoderLayer(config, cache_config, quant_config,
                                           prefix)

    def forward(
        self,
        inputs_embeds: torch.Tensor,
        positions: torch.Tensor,
        previous_hidden_states: torch.Tensor,
        spec_step_index: int = 0,
    ) -> torch.Tensor:
        assert inputs_embeds is not None
        # masking inputs at position 0, as not needed by MTP
        inputs_embeds[positions == 0] = 0

        inputs_embeds = self.mtp_emb_norm(inputs_embeds)
        previous_hidden_states = self.mtp_hidden_norm(previous_hidden_states)

        hidden_states = self.mtp_linear_proj(
            torch.cat([inputs_embeds, previous_hidden_states], dim=-1))

        hidden_states, residual = self.mtp_block(positions=positions,
                                                 hidden_states=hidden_states,
                                                 residual=None)
        hidden_states = residual + hidden_states

        return hidden_states

mtp_block instance-attribute

mtp_block = LlamaDecoderLayer(
    config, cache_config, quant_config, prefix
)

mtp_emb_norm instance-attribute

mtp_emb_norm = RMSNorm(hidden_size, eps=rms_norm_eps)

mtp_hidden_norm instance-attribute

mtp_hidden_norm = RMSNorm(hidden_size, eps=rms_norm_eps)

mtp_linear_proj instance-attribute

mtp_linear_proj = Linear(
    hidden_size * 2, hidden_size, bias=False
)

__init__

__init__(
    config: PretrainedConfig,
    prefix: str,
    model_config: ModelConfig,
    cache_config: Optional[CacheConfig] = None,
    quant_config: Optional[QuantizationConfig] = None,
) -> None
Source code in vllm/model_executor/models/ernie_mtp.py
def __init__(
    self,
    config: PretrainedConfig,
    prefix: str,
    model_config: ModelConfig,
    cache_config: Optional[CacheConfig] = None,
    quant_config: Optional[QuantizationConfig] = None,
) -> None:
    super().__init__()

    self.mtp_emb_norm = RMSNorm(config.hidden_size,
                                eps=config.rms_norm_eps)
    self.mtp_hidden_norm = RMSNorm(config.hidden_size,
                                   eps=config.rms_norm_eps)
    self.mtp_linear_proj = nn.Linear(config.hidden_size * 2,
                                     config.hidden_size,
                                     bias=False)
    self.mtp_block = LlamaDecoderLayer(config, cache_config, quant_config,
                                       prefix)

forward

forward(
    inputs_embeds: Tensor,
    positions: Tensor,
    previous_hidden_states: Tensor,
    spec_step_index: int = 0,
) -> Tensor
Source code in vllm/model_executor/models/ernie_mtp.py
def forward(
    self,
    inputs_embeds: torch.Tensor,
    positions: torch.Tensor,
    previous_hidden_states: torch.Tensor,
    spec_step_index: int = 0,
) -> torch.Tensor:
    assert inputs_embeds is not None
    # masking inputs at position 0, as not needed by MTP
    inputs_embeds[positions == 0] = 0

    inputs_embeds = self.mtp_emb_norm(inputs_embeds)
    previous_hidden_states = self.mtp_hidden_norm(previous_hidden_states)

    hidden_states = self.mtp_linear_proj(
        torch.cat([inputs_embeds, previous_hidden_states], dim=-1))

    hidden_states, residual = self.mtp_block(positions=positions,
                                             hidden_states=hidden_states,
                                             residual=None)
    hidden_states = residual + hidden_states

    return hidden_states