vllm.compilation.collective_fusion

ALLREDUCE_OP module-attribute

ALLREDUCE_OP = default

FP8_DTYPE module-attribute

FP8_DTYPE = fp8_dtype()

MiB module-attribute

MiB = 1024 * 1024

RMS_ADD_OP module-attribute

RMS_ADD_OP = default

RMS_OP module-attribute

RMS_OP = default

STATIC_FP4_QUANT_OP module-attribute

STATIC_FP4_QUANT_OP = default

STATIC_FP8_QUANT_OP module-attribute

STATIC_FP8_QUANT_OP = default

_DEFAULT_FI_MAX_SIZE module-attribute

_DEFAULT_FI_MAX_SIZE = MiB // 2

_FI_MAX_SIZES module-attribute

_FI_MAX_SIZES = {
    2: 64 * MiB,
    4: MiB,
    6: MiB // 2,
    8: MiB // 2,
}
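
These per-world-size byte budgets bound the FlashInfer IPC workspace. Below is a minimal sketch of how they translate into a token limit, mirroring the max_num_token arithmetic in AllReduceFusionPass.__init__ further down; the hidden size and config cap used here are assumed, illustrative values only.

# Sketch only: mirrors the max_num_token computation in AllReduceFusionPass.__init__.
# hidden_dim and config_cap are assumed example values, not taken from any real config.
MiB = 1024 * 1024
_FI_MAX_SIZES = {2: 64 * MiB, 4: MiB, 6: MiB // 2, 8: MiB // 2}
_DEFAULT_FI_MAX_SIZE = MiB // 2

def max_token_budget(tp_size: int, hidden_dim: int,
                     use_fp32_lamport: bool, config_cap: int) -> int:
    bytes_per_elem = 4 if use_fp32_lamport else 2
    budget = _FI_MAX_SIZES.get(tp_size, _DEFAULT_FI_MAX_SIZE)
    return min(budget // (hidden_dim * tp_size * bytes_per_elem), config_cap)

# e.g. tp_size=8, hidden_dim=4096, bf16 activations, cap of 1024 tokens:
print(max_token_budget(8, 4096, False, 1024))  # -> 8, workspace-bound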

_FI_WORKSPACE_TENSOR module-attribute

_FI_WORKSPACE_TENSOR = None

flashinfer_trtllm_fused_allreduce_norm module-attribute

flashinfer_trtllm_fused_allreduce_norm = default

logger module-attribute

logger = init_logger(__name__)

AllGatherCutlassScaledMMPattern

Bases: BasePattern

Source code in vllm/compilation/collective_fusion.py
class AllGatherCutlassScaledMMPattern(BasePattern):

    def get_inputs(self):
        x = torch.empty([8, 16], device=self.device, dtype=FP8_DTYPE)
        weight = torch.empty([16, 16], device=self.device,
                             dtype=FP8_DTYPE).contiguous().transpose(0, 1)

        s1 = x.shape[0] * self.tp_size

        scale_a = torch.empty([s1, 1], device=self.device, dtype=torch.float32)
        scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)

        s2 = weight.shape[1]
        output = torch.empty([s1, s2], device=self.device, dtype=self.dtype)

        return [x, weight, scale_a, scale_b, output]

    def register(self, pm_pass: PatternMatcherPass):

        def pattern(
            x: torch.Tensor,
            weight: torch.Tensor,
            scale_a: torch.Tensor,
            scale_b: torch.Tensor,
            output: torch.Tensor,
        ) -> torch.Tensor:
            all_gather = torch.ops.vllm.all_gather.default(
                x,
                dim=0,
                world_size=self.tp_size,
                group_name=self.tp.unique_name)

            cutlass_scaled_mm = torch.ops.higher_order.auto_functionalized(
                torch.ops._C.cutlass_scaled_mm.default,
                out=output,
                a=all_gather,
                b=weight,
                a_scales=scale_a,
                b_scales=scale_b,
                bias=None)
            return cutlass_scaled_mm[1]

        def replacement(x: torch.Tensor, weight: torch.Tensor,
                        scale_a: torch.Tensor, scale_b: torch.Tensor,
                        output: torch.Tensor) -> torch.Tensor:
            ag_output, mm_outputs = torch.ops.symm_mem.fused_all_gather_scaled_matmul(  # noqa
                x,
                [weight],
                scale_a,
                [scale_b],
                gather_dim=0,
                biases=[None],
                result_scales=[None],
                out_dtypes=[self.dtype],
                use_fast_accum=[False],
                group_name=self.tp.device_group.group_name,
            )
            return mm_outputs

        pm.register_replacement(pattern, replacement, self.get_inputs(),
                                pm.fwd_only, pm_pass)

get_inputs

get_inputs()
Source code in vllm/compilation/collective_fusion.py
def get_inputs(self):
    x = torch.empty([8, 16], device=self.device, dtype=FP8_DTYPE)
    weight = torch.empty([16, 16], device=self.device,
                         dtype=FP8_DTYPE).contiguous().transpose(0, 1)

    s1 = x.shape[0] * self.tp_size

    scale_a = torch.empty([s1, 1], device=self.device, dtype=torch.float32)
    scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)

    s2 = weight.shape[1]
    output = torch.empty([s1, s2], device=self.device, dtype=self.dtype)

    return [x, weight, scale_a, scale_b, output]

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def pattern(
        x: torch.Tensor,
        weight: torch.Tensor,
        scale_a: torch.Tensor,
        scale_b: torch.Tensor,
        output: torch.Tensor,
    ) -> torch.Tensor:
        all_gather = torch.ops.vllm.all_gather.default(
            x,
            dim=0,
            world_size=self.tp_size,
            group_name=self.tp.unique_name)

        cutlass_scaled_mm = torch.ops.higher_order.auto_functionalized(
            torch.ops._C.cutlass_scaled_mm.default,
            out=output,
            a=all_gather,
            b=weight,
            a_scales=scale_a,
            b_scales=scale_b,
            bias=None)
        return cutlass_scaled_mm[1]

    def replacement(x: torch.Tensor, weight: torch.Tensor,
                    scale_a: torch.Tensor, scale_b: torch.Tensor,
                    output: torch.Tensor) -> torch.Tensor:
        ag_output, mm_outputs = torch.ops.symm_mem.fused_all_gather_scaled_matmul(  # noqa
            x,
            [weight],
            scale_a,
            [scale_b],
            gather_dim=0,
            biases=[None],
            result_scales=[None],
            out_dtypes=[self.dtype],
            use_fast_accum=[False],
            group_name=self.tp.device_group.group_name,
        )
        return mm_outputs

    pm.register_replacement(pattern, replacement, self.get_inputs(),
                            pm.fwd_only, pm_pass)

AllGatherGEMMPattern

Bases: BasePattern

Source code in vllm/compilation/collective_fusion.py
class AllGatherGEMMPattern(BasePattern):

    def get_inputs(self):
        x = torch.empty([4, 4], device=self.device, dtype=self.dtype)
        weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)

        return [x, weight]

    def register(self, pm_pass: PatternMatcherPass):

        def pattern(
            x: torch.Tensor,
            weight: torch.Tensor,
        ) -> tuple[torch.Tensor, torch.Tensor]:
            all_gather = torch.ops.vllm.all_gather.default(
                x,
                dim=0,
                world_size=self.tp_size,
                group_name=self.tp.unique_name,
            )

            return torch.ops.aten.mm.default(all_gather, weight)

        def replacement(
                x: torch.Tensor,
                weight: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
            ag_output, mm_outputs = torch.ops.symm_mem.fused_all_gather_matmul(
                x,
                [weight],
                gather_dim=0,
                group_name=self.tp.device_group.group_name,
            )
            return mm_outputs

        pm.register_replacement(pattern, replacement, self.get_inputs(),
                                pm.fwd_only, pm_pass)

get_inputs

get_inputs()
Source code in vllm/compilation/collective_fusion.py
def get_inputs(self):
    x = torch.empty([4, 4], device=self.device, dtype=self.dtype)
    weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)

    return [x, weight]

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def pattern(
        x: torch.Tensor,
        weight: torch.Tensor,
    ) -> tuple[torch.Tensor, torch.Tensor]:
        all_gather = torch.ops.vllm.all_gather.default(
            x,
            dim=0,
            world_size=self.tp_size,
            group_name=self.tp.unique_name,
        )

        return torch.ops.aten.mm.default(all_gather, weight)

    def replacement(
            x: torch.Tensor,
            weight: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        ag_output, mm_outputs = torch.ops.symm_mem.fused_all_gather_matmul(
            x,
            [weight],
            gather_dim=0,
            group_name=self.tp.device_group.group_name,
        )
        return mm_outputs

    pm.register_replacement(pattern, replacement, self.get_inputs(),
                            pm.fwd_only, pm_pass)
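
The replacement above relies on torch.ops.symm_mem.fused_all_gather_matmul producing the same matmul result as an explicit gather followed by a plain mm. The following single-process sketch illustrates that equivalence; the shard count and shapes are illustrative, and the collective is emulated with a concat rather than a real all_gather.

# Single-process illustration of the equivalence this pattern encodes.
# torch.cat along dim 0 stands in for all_gather(dim=0) across tp_size ranks.
import torch

tp_size = 4
shards = [torch.randn(4, 4) for _ in range(tp_size)]  # one [4, 4] shard per rank
weight = torch.randn(4, 4)

gathered = torch.cat(shards, dim=0)   # emulated all_gather -> [tp_size * 4, 4]
reference = gathered @ weight         # what the fused op's mm output should equal
assert reference.shape == (tp_size * 4, 4)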

AllGatherScaledMMPattern

Bases: BasePattern

Source code in vllm/compilation/collective_fusion.py
class AllGatherScaledMMPattern(BasePattern):

    def get_inputs(self):
        x = torch.empty([8, 16], device=self.device, dtype=FP8_DTYPE)
        weight = torch.empty([16, 16], device=self.device,
                             dtype=FP8_DTYPE).contiguous().transpose(0, 1)

        s1 = x.shape[0] * self.tp_size

        scale_a = torch.empty([s1, 1], device=self.device, dtype=torch.float32)
        scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)

        return [x, weight, scale_a, scale_b]

    def register(self, pm_pass: PatternMatcherPass):

        def pattern(
            x: torch.Tensor,
            weight: torch.Tensor,
            scale_a: torch.Tensor,
            scale_b: torch.Tensor,
        ) -> torch.Tensor:
            all_gather = torch.ops.vllm.all_gather.default(
                x,
                dim=0,
                world_size=self.tp_size,
                group_name=self.tp.unique_name)

            return torch.ops.aten._scaled_mm.default(all_gather,
                                                     mat2=weight,
                                                     scale_a=scale_a,
                                                     scale_b=scale_b,
                                                     bias=None,
                                                     scale_result=None,
                                                     out_dtype=self.dtype)

        def replacement(x: torch.Tensor, weight: torch.Tensor,
                        scale_a: torch.Tensor,
                        scale_b: torch.Tensor) -> torch.Tensor:
            ag_output, mm_outputs = torch.ops.symm_mem.fused_all_gather_scaled_matmul(  # noqa
                x,
                [weight],
                scale_a,
                [scale_b],
                gather_dim=0,
                biases=[None],
                result_scales=[None],
                out_dtypes=[self.dtype],
                use_fast_accum=[False],
                group_name=self.tp.device_group.group_name,
            )
            return mm_outputs

        pm.register_replacement(pattern, replacement, self.get_inputs(),
                                pm.fwd_only, pm_pass)

get_inputs

get_inputs()
Source code in vllm/compilation/collective_fusion.py
def get_inputs(self):
    x = torch.empty([8, 16], device=self.device, dtype=FP8_DTYPE)
    weight = torch.empty([16, 16], device=self.device,
                         dtype=FP8_DTYPE).contiguous().transpose(0, 1)

    s1 = x.shape[0] * self.tp_size

    scale_a = torch.empty([s1, 1], device=self.device, dtype=torch.float32)
    scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)

    return [x, weight, scale_a, scale_b]

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def pattern(
        x: torch.Tensor,
        weight: torch.Tensor,
        scale_a: torch.Tensor,
        scale_b: torch.Tensor,
    ) -> torch.Tensor:
        all_gather = torch.ops.vllm.all_gather.default(
            x,
            dim=0,
            world_size=self.tp_size,
            group_name=self.tp.unique_name)

        return torch.ops.aten._scaled_mm.default(all_gather,
                                                 mat2=weight,
                                                 scale_a=scale_a,
                                                 scale_b=scale_b,
                                                 bias=None,
                                                 scale_result=None,
                                                 out_dtype=self.dtype)

    def replacement(x: torch.Tensor, weight: torch.Tensor,
                    scale_a: torch.Tensor,
                    scale_b: torch.Tensor) -> torch.Tensor:
        ag_output, mm_outputs = torch.ops.symm_mem.fused_all_gather_scaled_matmul(  # noqa
            x,
            [weight],
            scale_a,
            [scale_b],
            gather_dim=0,
            biases=[None],
            result_scales=[None],
            out_dtypes=[self.dtype],
            use_fast_accum=[False],
            group_name=self.tp.device_group.group_name,
        )
        return mm_outputs

    pm.register_replacement(pattern, replacement, self.get_inputs(),
                            pm.fwd_only, pm_pass)
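
Both scaled-MM patterns above fuse an FP8 all-gather with a scaled matmul whose scales are applied per row of A and per column of B. The sketch below shows only the reference dequantize-then-matmul semantics, not the fused kernel; it assumes a PyTorch build with float8 dtypes, and the shapes follow get_inputs before gathering.

# Reference semantics only: dequantize with row/column scales, then matmul.
import torch

a = torch.randn(8, 16).to(torch.float8_e4m3fn)    # per-rank FP8 activation shard
b = torch.randn(16, 16).to(torch.float8_e4m3fn)   # FP8 weight
scale_a = torch.rand(8, 1)                        # one scale per row of a
scale_b = torch.rand(1, 16)                       # one scale per column of b

out = (a.float() * scale_a) @ (b.float() * scale_b)
out = out.to(torch.bfloat16)                      # matches out_dtype in the pattern
assert out.shape == (8, 16)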

AllReduceFusedAddRMSNormPattern

Bases: BasePattern

This pattern replaces the allreduce + RMS norm (with residual) sequence with a fused FlashInfer implementation. It applies to o_proj + rmsnorm after attention and to mlp + rmsnorm before attention.

Source code in vllm/compilation/collective_fusion.py
class AllReduceFusedAddRMSNormPattern(BasePattern):
    """
    This pattern replaces the allreduce + rms norm (with residual) 
    with fused flashinfer implementation.
    Applies to o_proj + rmsnorm after attn and mlp + rmsnorm before attn.
    """

    def __init__(
        self,
        epsilon: float,
        dtype: torch.dtype,
        device: str,
        allreduce_params: FlashInferFusedAllReduceParams,
    ):
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params

    def get_inputs(self):
        input = torch.empty([4, 4], device=self.device, dtype=self.dtype)
        residual = torch.empty([4, 4], device=self.device, dtype=self.dtype)
        weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)
        return [
            residual,
            input,
            weight,
        ]

    def register(self, pm_pass: PatternMatcherPass):

        def pattern(residual: torch.Tensor, input: torch.Tensor,
                    weight: torch.Tensor):
            allreduce_output = tensor_model_parallel_all_reduce(input)
            rms = auto_functionalized(
                RMS_ADD_OP,
                input=allreduce_output,
                residual=residual,
                weight=weight,
                epsilon=self.epsilon,
            )
            # input, residual
            return rms[1], rms[2]

        def replacement(residual: torch.Tensor, input: torch.Tensor,
                        weight: torch.Tensor):
            allreduce = auto_functionalized(
                flashinfer_trtllm_fused_allreduce_norm,
                allreduce_in=input,
                residual=residual,
                norm_out=None,
                quant_out=None,
                scale_out=None,
                rms_gamma=weight,
                rms_eps=self.epsilon,
                pattern_code=flashinfer_comm.AllReduceFusionPattern.
                kARResidualRMSNorm,
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )
            # allreduce_in, residual
            return allreduce[1], allreduce[2]

        pm.register_replacement(pattern, replacement, self.get_inputs(),
                                pm.fwd_only, pm_pass)

allreduce_params instance-attribute

allreduce_params = allreduce_params

epsilon instance-attribute

epsilon = epsilon

__init__

__init__(
    epsilon: float,
    dtype: dtype,
    device: str,
    allreduce_params: FlashInferFusedAllReduceParams,
)
Source code in vllm/compilation/collective_fusion.py
def __init__(
    self,
    epsilon: float,
    dtype: torch.dtype,
    device: str,
    allreduce_params: FlashInferFusedAllReduceParams,
):
    super().__init__(dtype, device)
    self.epsilon = epsilon
    self.allreduce_params = allreduce_params

get_inputs

get_inputs()
Source code in vllm/compilation/collective_fusion.py
def get_inputs(self):
    input = torch.empty([4, 4], device=self.device, dtype=self.dtype)
    residual = torch.empty([4, 4], device=self.device, dtype=self.dtype)
    weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)
    return [
        residual,
        input,
        weight,
    ]

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def pattern(residual: torch.Tensor, input: torch.Tensor,
                weight: torch.Tensor):
        allreduce_output = tensor_model_parallel_all_reduce(input)
        rms = auto_functionalized(
            RMS_ADD_OP,
            input=allreduce_output,
            residual=residual,
            weight=weight,
            epsilon=self.epsilon,
        )
        # input, residual
        return rms[1], rms[2]

    def replacement(residual: torch.Tensor, input: torch.Tensor,
                    weight: torch.Tensor):
        allreduce = auto_functionalized(
            flashinfer_trtllm_fused_allreduce_norm,
            allreduce_in=input,
            residual=residual,
            norm_out=None,
            quant_out=None,
            scale_out=None,
            rms_gamma=weight,
            rms_eps=self.epsilon,
            pattern_code=flashinfer_comm.AllReduceFusionPattern.
            kARResidualRMSNorm,
            **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
        )
        # allreduce_in, residual
        return allreduce[1], allreduce[2]

    pm.register_replacement(pattern, replacement, self.get_inputs(),
                            pm.fwd_only, pm_pass)
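
For reference, RMS_ADD_OP (vLLM's fused add + RMSNorm) performs a residual add followed by an RMS normalization, and the fused FlashInfer op must reproduce the same math after the all-reduce. The snippet below is a minimal single-process sketch of that math only; the collective is omitted.

# Reference math for the add + RMSNorm half of the matched pattern.
import torch

def add_rms_norm(x: torch.Tensor, residual: torch.Tensor,
                 weight: torch.Tensor, eps: float = 1e-6):
    residual = residual + x                                     # residual add
    var = residual.float().pow(2).mean(dim=-1, keepdim=True)    # mean of squares
    normed = residual.float() * torch.rsqrt(var + eps)
    return normed.to(x.dtype) * weight, residual                # (norm_out, new residual)

x = torch.randn(4, 4, dtype=torch.bfloat16)
out, new_residual = add_rms_norm(x, torch.zeros_like(x),
                                 torch.ones(4, dtype=torch.bfloat16))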

AllReduceFusedAddRMSNormStaticQuantFP8Pattern

Bases: BasePattern

This pattern replaces the allreduce + RMS norm (with residual) + static FP8 quant sequence with a fused FlashInfer implementation. It applies to o_proj + rmsnorm + quant after attention and to mlp + rmsnorm + quant before attention.

Source code in vllm/compilation/collective_fusion.py
class AllReduceFusedAddRMSNormStaticQuantFP8Pattern(BasePattern):
    """
    This pattern replaces the allreduce + rms norm (with residual)
    + static fp8 quant with fused flashinfer implementation.
    Applies to o_proj + rmsnorm after attn + quant and 
    mlp + rmsnorm + quant before attn.
    """

    def __init__(self, epsilon: float, dtype: torch.dtype, device: str,
                 allreduce_params: FlashInferFusedAllReduceParams):
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params
        self.quant_dtype = torch.float8_e4m3fn

    def register(self, pm_pass: PatternMatcherPass):

        def get_inputs():
            input = torch.empty([4, 4], device=self.device, dtype=self.dtype)

            residual = torch.empty([4, 4],
                                   device=self.device,
                                   dtype=self.dtype)
            weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)
            quant_result = torch.empty([4, 4],
                                       device=self.device,
                                       dtype=self.quant_dtype)
            scale = torch.empty([1, 1],
                                device=self.device,
                                dtype=torch.float32)

            return [
                quant_result,
                residual,
                input,
                weight,
                scale,
            ]

        def pattern(
            quant_result: torch.Tensor,
            residual: torch.Tensor,
            input: torch.Tensor,
            weight: torch.Tensor,
            scale: torch.Tensor,
        ):
            allreduce_output = tensor_model_parallel_all_reduce(input)

            fused_add_rmsnorm_out_tuple = \
            auto_functionalized(
                RMS_ADD_OP,
                input=allreduce_output,
                residual=residual,
                weight=weight,
                epsilon=self.epsilon)
            quant_out_tuple = auto_functionalized(
                STATIC_FP8_QUANT_OP,
                result=quant_result,
                input=fused_add_rmsnorm_out_tuple[1],
                scale=scale)

            # quant_out, allreduce_output
            return quant_out_tuple[1], fused_add_rmsnorm_out_tuple[2]

        def replacement(quant_result: torch.Tensor, residual: torch.Tensor,
                        input: torch.Tensor, weight: torch.Tensor,
                        scale: torch.Tensor):
            allreduce = auto_functionalized(
                flashinfer_trtllm_fused_allreduce_norm,
                allreduce_in=input,
                residual=residual,
                norm_out=None,
                quant_out=quant_result,
                scale_out=None,
                rms_gamma=weight,
                rms_eps=self.epsilon,
                pattern_code=flashinfer_comm.AllReduceFusionPattern.
                kARResidualRMSNormFP8Quant,  # we don't use norm_out afterwards
                scale_factor=scale,
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )
            # quant_out, rms_norm_residual
            return allreduce[4], allreduce[2]

        pm.register_replacement(pattern, replacement, get_inputs(),
                                pm.fwd_only, pm_pass)

allreduce_params instance-attribute

allreduce_params = allreduce_params

epsilon instance-attribute

epsilon = epsilon

quant_dtype instance-attribute

quant_dtype = float8_e4m3fn

__init__

__init__(
    epsilon: float,
    dtype: dtype,
    device: str,
    allreduce_params: FlashInferFusedAllReduceParams,
)
Source code in vllm/compilation/collective_fusion.py
def __init__(self, epsilon: float, dtype: torch.dtype, device: str,
             allreduce_params: FlashInferFusedAllReduceParams):
    super().__init__(dtype, device)
    self.epsilon = epsilon
    self.allreduce_params = allreduce_params
    self.quant_dtype = torch.float8_e4m3fn

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def get_inputs():
        input = torch.empty([4, 4], device=self.device, dtype=self.dtype)

        residual = torch.empty([4, 4],
                               device=self.device,
                               dtype=self.dtype)
        weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)
        quant_result = torch.empty([4, 4],
                                   device=self.device,
                                   dtype=self.quant_dtype)
        scale = torch.empty([1, 1],
                            device=self.device,
                            dtype=torch.float32)

        return [
            quant_result,
            residual,
            input,
            weight,
            scale,
        ]

    def pattern(
        quant_result: torch.Tensor,
        residual: torch.Tensor,
        input: torch.Tensor,
        weight: torch.Tensor,
        scale: torch.Tensor,
    ):
        allreduce_output = tensor_model_parallel_all_reduce(input)

        fused_add_rmsnorm_out_tuple = \
        auto_functionalized(
            RMS_ADD_OP,
            input=allreduce_output,
            residual=residual,
            weight=weight,
            epsilon=self.epsilon)
        quant_out_tuple = auto_functionalized(
            STATIC_FP8_QUANT_OP,
            result=quant_result,
            input=fused_add_rmsnorm_out_tuple[1],
            scale=scale)

        # quant_out, allreduce_output
        return quant_out_tuple[1], fused_add_rmsnorm_out_tuple[2]

    def replacement(quant_result: torch.Tensor, residual: torch.Tensor,
                    input: torch.Tensor, weight: torch.Tensor,
                    scale: torch.Tensor):
        allreduce = auto_functionalized(
            flashinfer_trtllm_fused_allreduce_norm,
            allreduce_in=input,
            residual=residual,
            norm_out=None,
            quant_out=quant_result,
            scale_out=None,
            rms_gamma=weight,
            rms_eps=self.epsilon,
            pattern_code=flashinfer_comm.AllReduceFusionPattern.
            kARResidualRMSNormFP8Quant,  # we don't use norm_out afterwards
            scale_factor=scale,
            **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
        )
        # quant_out, rms_norm_residual
        return allreduce[4], allreduce[2]

    pm.register_replacement(pattern, replacement, get_inputs(),
                            pm.fwd_only, pm_pass)
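
STATIC_FP8_QUANT_OP applies a precomputed (static) per-tensor scale to the normalized output and casts to float8_e4m3fn. The sketch below shows the reference semantics of that quantization step, not the fused kernel; it assumes a PyTorch build with float8 dtypes.

# Reference semantics of the static FP8 quantization fused into this pattern:
# divide by a fixed scale, clamp to the e4m3 range, and cast.
import torch

def static_fp8_quant(x: torch.Tensor, scale: torch.Tensor) -> torch.Tensor:
    finfo = torch.finfo(torch.float8_e4m3fn)
    q = (x.float() / scale).clamp(min=finfo.min, max=finfo.max)
    return q.to(torch.float8_e4m3fn)

x = torch.randn(4, 4, dtype=torch.bfloat16)      # e.g. the RMSNorm output
scale = torch.tensor(0.5)                        # static per-tensor scale
q = static_fp8_quant(x, scale)
assert q.dtype == torch.float8_e4m3fn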

AllReduceFusedAddRMSNormStaticQuantNVFP4Pattern

Bases: BasePattern

This pattern replaces the allreduce + RMS norm (with residual) + static NVFP4 quant sequence with a fused FlashInfer implementation. It applies to o_proj + rmsnorm + quant after attention and to mlp + rmsnorm + quant before attention.

Source code in vllm/compilation/collective_fusion.py
class AllReduceFusedAddRMSNormStaticQuantNVFP4Pattern(BasePattern):
    """
    This pattern replaces the allreduce + rms norm (with residual)
    + static nvfp4 quant with fused flashinfer implementation.
    Applies to o_proj + rmsnorm after attn + quant and 
    mlp + rmsnorm + quant before attn.
    """

    def __init__(self, epsilon: float, dtype: torch.dtype, device: str,
                 allreduce_params: FlashInferFusedAllReduceParams):
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params

    def register(self, pm_pass: PatternMatcherPass):

        def get_inputs():
            input = torch.empty([16, 16], device=self.device, dtype=self.dtype)

            residual = torch.empty([16, 16],
                                   device=self.device,
                                   dtype=self.dtype)
            weight = torch.empty([16, 16],
                                 device=self.device,
                                 dtype=self.dtype)
            quant_result = torch.empty((16, 8),
                                       device=self.device,
                                       dtype=torch.uint8)
            input_global_scale = torch.empty([1, 1],
                                             device=self.device,
                                             dtype=torch.float32)
            output_scale = torch.empty([128, 4],
                                       device=self.device,
                                       dtype=torch.int32)

            return [
                quant_result,
                residual,
                input,
                output_scale,
                weight,
                input_global_scale,
            ]

        def pattern(quant_result: torch.Tensor, residual: torch.Tensor,
                    input: torch.Tensor, output_scale: torch.Tensor,
                    weight: torch.Tensor, input_global_scale: torch.Tensor):
            allreduce_output = tensor_model_parallel_all_reduce(input)

            fused_add_rmsnorm_out_tuple = \
            auto_functionalized(
                RMS_ADD_OP,
                input=allreduce_output,
                residual=residual,
                weight=weight,
                epsilon=self.epsilon)
            quant_out_tuple = auto_functionalized(
                STATIC_FP4_QUANT_OP,
                output=quant_result,
                input=fused_add_rmsnorm_out_tuple[1],
                output_scale=output_scale,
                input_scale=input_global_scale)

            # quant_out, allreduce_output, output_scale
            return quant_out_tuple[1], fused_add_rmsnorm_out_tuple[
                2], quant_out_tuple[2]

        def replacement(quant_result: torch.Tensor, residual: torch.Tensor,
                        input: torch.Tensor, output_scale: torch.Tensor,
                        weight: torch.Tensor,
                        input_global_scale: torch.Tensor):
            allreduce = auto_functionalized(
                flashinfer_trtllm_fused_allreduce_norm,
                allreduce_in=input,
                residual=residual,
                norm_out=None,
                quant_out=quant_result,
                scale_out=output_scale,
                rms_gamma=weight,
                rms_eps=self.epsilon,
                pattern_code=flashinfer_comm.AllReduceFusionPattern.
                kARResidualRMSNormFP4Quant,  # we don't use norm_out afterwards
                scale_factor=input_global_scale,
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )
            # quant_out, rms_norm_residual, output_scale
            return allreduce[4], allreduce[2], allreduce[5]

        pm.register_replacement(pattern, replacement, get_inputs(),
                                pm.fwd_only, pm_pass)

allreduce_params instance-attribute

allreduce_params = allreduce_params

epsilon instance-attribute

epsilon = epsilon

__init__

__init__(
    epsilon: float,
    dtype: dtype,
    device: str,
    allreduce_params: FlashInferFusedAllReduceParams,
)
Source code in vllm/compilation/collective_fusion.py
def __init__(self, epsilon: float, dtype: torch.dtype, device: str,
             allreduce_params: FlashInferFusedAllReduceParams):
    super().__init__(dtype, device)
    self.epsilon = epsilon
    self.allreduce_params = allreduce_params

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def get_inputs():
        input = torch.empty([16, 16], device=self.device, dtype=self.dtype)

        residual = torch.empty([16, 16],
                               device=self.device,
                               dtype=self.dtype)
        weight = torch.empty([16, 16],
                             device=self.device,
                             dtype=self.dtype)
        quant_result = torch.empty((16, 8),
                                   device=self.device,
                                   dtype=torch.uint8)
        input_global_scale = torch.empty([1, 1],
                                         device=self.device,
                                         dtype=torch.float32)
        output_scale = torch.empty([128, 4],
                                   device=self.device,
                                   dtype=torch.int32)

        return [
            quant_result,
            residual,
            input,
            output_scale,
            weight,
            input_global_scale,
        ]

    def pattern(quant_result: torch.Tensor, residual: torch.Tensor,
                input: torch.Tensor, output_scale: torch.Tensor,
                weight: torch.Tensor, input_global_scale: torch.Tensor):
        allreduce_output = tensor_model_parallel_all_reduce(input)

        fused_add_rmsnorm_out_tuple = \
        auto_functionalized(
            RMS_ADD_OP,
            input=allreduce_output,
            residual=residual,
            weight=weight,
            epsilon=self.epsilon)
        quant_out_tuple = auto_functionalized(
            STATIC_FP4_QUANT_OP,
            output=quant_result,
            input=fused_add_rmsnorm_out_tuple[1],
            output_scale=output_scale,
            input_scale=input_global_scale)

        # quant_out, allreduce_output, output_scale
        return quant_out_tuple[1], fused_add_rmsnorm_out_tuple[
            2], quant_out_tuple[2]

    def replacement(quant_result: torch.Tensor, residual: torch.Tensor,
                    input: torch.Tensor, output_scale: torch.Tensor,
                    weight: torch.Tensor,
                    input_global_scale: torch.Tensor):
        allreduce = auto_functionalized(
            flashinfer_trtllm_fused_allreduce_norm,
            allreduce_in=input,
            residual=residual,
            norm_out=None,
            quant_out=quant_result,
            scale_out=output_scale,
            rms_gamma=weight,
            rms_eps=self.epsilon,
            pattern_code=flashinfer_comm.AllReduceFusionPattern.
            kARResidualRMSNormFP4Quant,  # we don't use norm_out afterwards
            scale_factor=input_global_scale,
            **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
        )
        # quant_out, rms_norm_residual, output_scale
        return allreduce[4], allreduce[2], allreduce[5]

    pm.register_replacement(pattern, replacement, get_inputs(),
                            pm.fwd_only, pm_pass)

AllReduceFusedRMSNormStaticQuantFP8Pattern

Bases: BasePattern

This pattern replaces the allreduce + RMS norm (without residual) + static FP8 quant sequence with a fused FlashInfer implementation. It applies to allreduce + rmsnorm + quant before attention in the first Transformer block.

Source code in vllm/compilation/collective_fusion.py
class AllReduceFusedRMSNormStaticQuantFP8Pattern(BasePattern):
    """
    This pattern replaces the allreduce + rms norm (without residual) 
    + static fp8 quant with fused flashinfer implementation.
    Applies to allreduce + rmsnorm + quant before attn 
    in the first Transformer block.
    """

    def __init__(self, epsilon: float, dtype: torch.dtype, device: str,
                 allreduce_params: FlashInferFusedAllReduceParams):
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params
        self.quant_dtype = torch.float8_e4m3fn

    def register(self, pm_pass: PatternMatcherPass):

        def get_inputs():
            input = torch.zeros([1, 8, 4],
                                device=self.device,
                                dtype=self.dtype)
            rmsnorm_result = torch.empty([1, 8, 4],
                                         device=self.device,
                                         dtype=self.dtype)
            quant_result = torch.empty([1, 8, 4],
                                       device=self.device,
                                       dtype=self.quant_dtype)
            weight = torch.empty([4], device=self.device, dtype=self.dtype)
            scale = torch.tensor(1.0, device=self.device, dtype=torch.float32)
            return [input, rmsnorm_result, quant_result, weight, scale]

        def pattern(
            input: torch.Tensor,
            rmsnorm_result: torch.Tensor,
            quant_result: torch.Tensor,
            weight: torch.Tensor,
            scale: torch.Tensor,
        ):
            all_reduce = tensor_model_parallel_all_reduce(input)
            rmsnorm_out_tuple = auto_functionalized(RMS_OP,
                                                    result=rmsnorm_result,
                                                    input=all_reduce,
                                                    weight=weight,
                                                    epsilon=self.epsilon)

            quant_out_tuple = auto_functionalized(STATIC_FP8_QUANT_OP,
                                                  result=quant_result,
                                                  input=rmsnorm_out_tuple[1],
                                                  scale=scale)

            # quant_out, allreduce_output
            return quant_out_tuple[1], all_reduce

        def replacement(input: torch.Tensor, result_rms: torch.Tensor,
                        quant_result: torch.Tensor, weight: torch.Tensor,
                        scale: torch.Tensor):
            residual = torch.zeros_like(input)
            allreduce = auto_functionalized(
                flashinfer_trtllm_fused_allreduce_norm,
                allreduce_in=input,
                residual=residual,
                norm_out=result_rms,
                quant_out=quant_result,
                scale_out=None,
                rms_gamma=weight,
                rms_eps=self.epsilon,
                pattern_code=flashinfer_comm.AllReduceFusionPattern.
                kARResidualRMSNormFP8Quant,  # we don't use norm_out afterwards
                scale_factor=scale,
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )

            # quant_out, allreduce_output
            return allreduce[4], allreduce[1]

        pm.register_replacement(pattern, replacement, get_inputs(),
                                pm.fwd_only, pm_pass)

allreduce_params instance-attribute

allreduce_params = allreduce_params

epsilon instance-attribute

epsilon = epsilon

quant_dtype instance-attribute

quant_dtype = float8_e4m3fn

__init__

__init__(
    epsilon: float,
    dtype: dtype,
    device: str,
    allreduce_params: FlashInferFusedAllReduceParams,
)
Source code in vllm/compilation/collective_fusion.py
def __init__(self, epsilon: float, dtype: torch.dtype, device: str,
             allreduce_params: FlashInferFusedAllReduceParams):
    super().__init__(dtype, device)
    self.epsilon = epsilon
    self.allreduce_params = allreduce_params
    self.quant_dtype = torch.float8_e4m3fn

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def get_inputs():
        input = torch.zeros([1, 8, 4],
                            device=self.device,
                            dtype=self.dtype)
        rmsnorm_result = torch.empty([1, 8, 4],
                                     device=self.device,
                                     dtype=self.dtype)
        quant_result = torch.empty([1, 8, 4],
                                   device=self.device,
                                   dtype=self.quant_dtype)
        weight = torch.empty([4], device=self.device, dtype=self.dtype)
        scale = torch.tensor(1.0, device=self.device, dtype=torch.float32)
        return [input, rmsnorm_result, quant_result, weight, scale]

    def pattern(
        input: torch.Tensor,
        rmsnorm_result: torch.Tensor,
        quant_result: torch.Tensor,
        weight: torch.Tensor,
        scale: torch.Tensor,
    ):
        all_reduce = tensor_model_parallel_all_reduce(input)
        rmsnorm_out_tuple = auto_functionalized(RMS_OP,
                                                result=rmsnorm_result,
                                                input=all_reduce,
                                                weight=weight,
                                                epsilon=self.epsilon)

        quant_out_tuple = auto_functionalized(STATIC_FP8_QUANT_OP,
                                              result=quant_result,
                                              input=rmsnorm_out_tuple[1],
                                              scale=scale)

        # quant_out, allreduce_output
        return quant_out_tuple[1], all_reduce

    def replacement(input: torch.Tensor, result_rms: torch.Tensor,
                    quant_result: torch.Tensor, weight: torch.Tensor,
                    scale: torch.Tensor):
        residual = torch.zeros_like(input)
        allreduce = auto_functionalized(
            flashinfer_trtllm_fused_allreduce_norm,
            allreduce_in=input,
            residual=residual,
            norm_out=result_rms,
            quant_out=quant_result,
            scale_out=None,
            rms_gamma=weight,
            rms_eps=self.epsilon,
            pattern_code=flashinfer_comm.AllReduceFusionPattern.
            kARResidualRMSNormFP8Quant,  # we don't use norm_out afterwards
            scale_factor=scale,
            **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
        )

        # quant_out, allreduce_output
        return allreduce[4], allreduce[1]

    pm.register_replacement(pattern, replacement, get_inputs(),
                            pm.fwd_only, pm_pass)

AllReduceFusedRMSNormStaticQuantNVFP4Pattern

Bases: BasePattern

This pattern replaces the allreduce + RMS norm (without residual) + static NVFP4 quant sequence with a fused FlashInfer implementation. It applies to allreduce + rmsnorm + quant before attention in the first Transformer block.

Source code in vllm/compilation/collective_fusion.py
class AllReduceFusedRMSNormStaticQuantNVFP4Pattern(BasePattern):
    """
    This pattern replaces the allreduce + rms norm (without residual) 
    + static nvfp4 quant with fused flashinfer implementation.
    Applies to allreduce + rmsnorm + quant before attn 
    in the first Transformer block.
    """

    def __init__(self, epsilon: float, dtype: torch.dtype, device: str,
                 allreduce_params: FlashInferFusedAllReduceParams):
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params

    def register(self, pm_pass: PatternMatcherPass):

        def get_inputs():
            input = torch.empty([1, 16, 16],
                                device=self.device,
                                dtype=self.dtype)

            rmsnorm_result = torch.empty([1, 16, 16],
                                         device=self.device,
                                         dtype=self.dtype)
            quant_result = torch.empty((16, 8),
                                       device=self.device,
                                       dtype=torch.uint8)
            input_global_scale = torch.empty([1, 1],
                                             device=self.device,
                                             dtype=torch.float32)
            weight = torch.empty([16], device=self.device, dtype=self.dtype)
            output_scale = torch.empty([128, 4],
                                       device=self.device,
                                       dtype=torch.int32)

            return [
                input, rmsnorm_result, quant_result, weight,
                input_global_scale, output_scale
            ]

        def pattern(
            input: torch.Tensor,
            rmsnorm_result: torch.Tensor,
            quant_result: torch.Tensor,
            weight: torch.Tensor,
            input_global_scale: torch.Tensor,
            output_scale: torch.Tensor,
        ):
            all_reduce = tensor_model_parallel_all_reduce(input)
            rmsnorm_out_tuple = auto_functionalized(RMS_OP,
                                                    result=rmsnorm_result,
                                                    input=all_reduce,
                                                    weight=weight,
                                                    epsilon=self.epsilon)

            quant_out_tuple = auto_functionalized(
                STATIC_FP4_QUANT_OP,
                output=quant_result,
                input=rmsnorm_out_tuple[1],
                output_scale=output_scale,
                input_scale=input_global_scale)

            # quant_out, allreduce_output, output_scale
            return quant_out_tuple[1], all_reduce, quant_out_tuple[2]

        def replacement(input: torch.Tensor, result_rms: torch.Tensor,
                        quant_result: torch.Tensor, weight: torch.Tensor,
                        input_global_scale: torch.Tensor,
                        output_scale: torch.Tensor):
            residual = torch.zeros_like(input)
            allreduce = auto_functionalized(
                flashinfer_trtllm_fused_allreduce_norm,
                allreduce_in=input,
                residual=residual,
                norm_out=result_rms,
                quant_out=quant_result,
                scale_out=output_scale,
                rms_gamma=weight,
                rms_eps=self.epsilon,
                pattern_code=flashinfer_comm.AllReduceFusionPattern.
                kARResidualRMSNormFP4Quant,  # we don't use norm_out afterwards
                scale_factor=input_global_scale,
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )

            # quant_out, allreduce_output, output_scale
            return allreduce[4], allreduce[1], allreduce[5]

        pm.register_replacement(pattern, replacement, get_inputs(),
                                pm.fwd_only, pm_pass)

allreduce_params instance-attribute

allreduce_params = allreduce_params

epsilon instance-attribute

epsilon = epsilon

__init__

__init__(
    epsilon: float,
    dtype: dtype,
    device: str,
    allreduce_params: FlashInferFusedAllReduceParams,
)
Source code in vllm/compilation/collective_fusion.py
def __init__(self, epsilon: float, dtype: torch.dtype, device: str,
             allreduce_params: FlashInferFusedAllReduceParams):
    super().__init__(dtype, device)
    self.epsilon = epsilon
    self.allreduce_params = allreduce_params

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def get_inputs():
        input = torch.empty([1, 16, 16],
                            device=self.device,
                            dtype=self.dtype)

        rmsnorm_result = torch.empty([1, 16, 16],
                                     device=self.device,
                                     dtype=self.dtype)
        quant_result = torch.empty((16, 8),
                                   device=self.device,
                                   dtype=torch.uint8)
        input_global_scale = torch.empty([1, 1],
                                         device=self.device,
                                         dtype=torch.float32)
        weight = torch.empty([16], device=self.device, dtype=self.dtype)
        output_scale = torch.empty([128, 4],
                                   device=self.device,
                                   dtype=torch.int32)

        return [
            input, rmsnorm_result, quant_result, weight,
            input_global_scale, output_scale
        ]

    def pattern(
        input: torch.Tensor,
        rmsnorm_result: torch.Tensor,
        quant_result: torch.Tensor,
        weight: torch.Tensor,
        input_global_scale: torch.Tensor,
        output_scale: torch.Tensor,
    ):
        all_reduce = tensor_model_parallel_all_reduce(input)
        rmsnorm_out_tuple = auto_functionalized(RMS_OP,
                                                result=rmsnorm_result,
                                                input=all_reduce,
                                                weight=weight,
                                                epsilon=self.epsilon)

        quant_out_tuple = auto_functionalized(
            STATIC_FP4_QUANT_OP,
            output=quant_result,
            input=rmsnorm_out_tuple[1],
            output_scale=output_scale,
            input_scale=input_global_scale)

        # quant_out, allreduce_output, output_scale
        return quant_out_tuple[1], all_reduce, quant_out_tuple[2]

    def replacement(input: torch.Tensor, result_rms: torch.Tensor,
                    quant_result: torch.Tensor, weight: torch.Tensor,
                    input_global_scale: torch.Tensor,
                    output_scale: torch.Tensor):
        residual = torch.zeros_like(input)
        allreduce = auto_functionalized(
            flashinfer_trtllm_fused_allreduce_norm,
            allreduce_in=input,
            residual=residual,
            norm_out=result_rms,
            quant_out=quant_result,
            scale_out=output_scale,
            rms_gamma=weight,
            rms_eps=self.epsilon,
            pattern_code=flashinfer_comm.AllReduceFusionPattern.
            kARResidualRMSNormFP4Quant,  # we don't use norm_out afterwards
            scale_factor=input_global_scale,
            **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
        )

        # quant_out, allreduce_output, output_scale
        return allreduce[4], allreduce[1], allreduce[5]

    pm.register_replacement(pattern, replacement, get_inputs(),
                            pm.fwd_only, pm_pass)

AllReduceFusionPass

Bases: VllmInductorPass

Source code in vllm/compilation/collective_fusion.py
class AllReduceFusionPass(VllmInductorPass):

    def __init__(self, config: VllmConfig):
        super().__init__(config)
        self.disabled = True
        self.tp_size = get_tensor_model_parallel_world_size()
        if self.tp_size <= 1:
            return
        self.patterns: PatternMatcherPass = PatternMatcherPass(
            pass_name="all_reduce_fusion_pass")
        if config.model_config is None:
            return
        self.hidden_dim = config.model_config.get_hidden_size()
        self.group = get_tp_group().device_group
        rank = get_tensor_model_parallel_rank()
        use_fp32_lamport = self.model_dtype == torch.float32
        if flashinfer_comm is None:
            logger.warning(
                "Flashinfer is not installed or comm module not found, "
                "skipping allreduce fusion pass")
            return
        # Check if the world size is supported
        if self.tp_size not in _FI_MAX_SIZES:
            logger.warning(
                "Flashinfer allreduce fusion is not "
                "supported for world size %s",
                self.tp_size,
            )
            return
        max_num_token = min(
            _FI_MAX_SIZES.get(self.tp_size, _DEFAULT_FI_MAX_SIZE) //
            (self.hidden_dim * self.tp_size * (4 if use_fp32_lamport else 2)),
            config.compilation_config.pass_config.
            fi_allreduce_fusion_max_token_num)
        self.ipc_handles, workspace_tensor = (
            flashinfer_comm.trtllm_create_ipc_workspace_for_all_reduce_fusion(
                tp_rank=rank,
                tp_size=self.tp_size,
                max_token_num=max_num_token,
                hidden_dim=self.hidden_dim,
                group=self.group,
                use_fp32_lamport=use_fp32_lamport,
            ))

        global _FI_WORKSPACE_TENSOR
        _FI_WORKSPACE_TENSOR = workspace_tensor
        self.allreduce_params = FlashInferFusedAllReduceParams(
            rank=rank,
            world_size=self.tp_size,
            use_fp32_lamport=use_fp32_lamport,
            max_token_num=max_num_token,
            # fuse rms norm static fp8 quant fused op
            # in fallback path, when we don't use flashinfer
            fuse_rms_quant=config.compilation_config.pass_config.enable_fusion)

        for epsilon in [1e-5, 1e-6]:
            AllReduceFusedRMSNormStaticQuantFP8Pattern(
                epsilon,
                self.model_dtype,
                self.device,
                self.allreduce_params,
            ).register(self.patterns)
            AllReduceFusedAddRMSNormStaticQuantFP8Pattern(
                epsilon,
                self.model_dtype,
                self.device,
                self.allreduce_params,
            ).register(self.patterns)
            if current_platform.has_device_capability(100):
                AllReduceFusedRMSNormStaticQuantNVFP4Pattern(
                    epsilon,
                    self.model_dtype,
                    self.device,
                    self.allreduce_params,
                ).register(self.patterns)
                AllReduceFusedAddRMSNormStaticQuantNVFP4Pattern(
                    epsilon,
                    self.model_dtype,
                    self.device,
                    self.allreduce_params,
                ).register(self.patterns)
            AllReduceRMSNormPattern(
                epsilon,
                self.model_dtype,
                self.device,
                self.allreduce_params,
            ).register(self.patterns)
            AllReduceFusedAddRMSNormPattern(
                epsilon,
                self.model_dtype,
                self.device,
                self.allreduce_params,
            ).register(self.patterns)

            # WARNING: This is a hack to clear the pattern matcher cache
            # and allow multiple values of epsilon.
            torch._inductor.pattern_matcher._seen_patterns.clear()

        self.disabled = False

    def __call__(self, graph: fx.Graph):
        if self.disabled:
            return
        self.begin()
        self.dump_graph(graph, "before_all_reduce_fusion_pass")
        count = self.patterns.apply(graph)
        logger.debug("Replaced %s patterns", count)
        self.dump_graph(graph, "after_all_reduce_fusion_pass")
        self.end_and_log()

    def __del__(self):
        if self.disabled:
            return
        if flashinfer_comm is not None:
            flashinfer_comm.trtllm_destroy_ipc_workspace_for_all_reduce(
                self.ipc_handles, self.group)

allreduce_params instance-attribute

allreduce_params = FlashInferFusedAllReduceParams(
    rank=rank,
    world_size=tp_size,
    use_fp32_lamport=use_fp32_lamport,
    max_token_num=max_num_token,
    fuse_rms_quant=enable_fusion,
)

disabled instance-attribute

disabled = False

group instance-attribute

group = device_group

hidden_dim instance-attribute

hidden_dim = get_hidden_size()

patterns instance-attribute

patterns: PatternMatcherPass = PatternMatcherPass(
    pass_name="all_reduce_fusion_pass"
)

tp_size instance-attribute

tp_size = get_tensor_model_parallel_world_size()

__call__

__call__(graph: Graph)
Source code in vllm/compilation/collective_fusion.py
def __call__(self, graph: fx.Graph):
    if self.disabled:
        return
    self.begin()
    self.dump_graph(graph, "before_all_reduce_fusion_pass")
    count = self.patterns.apply(graph)
    logger.debug("Replaced %s patterns", count)
    self.dump_graph(graph, "after_all_reduce_fusion_pass")
    self.end_and_log()

__del__

__del__()
Source code in vllm/compilation/collective_fusion.py
def __del__(self):
    if self.disabled:
        return
    if flashinfer_comm is not None:
        flashinfer_comm.trtllm_destroy_ipc_workspace_for_all_reduce(
            self.ipc_handles, self.group)

__init__

__init__(config: VllmConfig)
Source code in vllm/compilation/collective_fusion.py
def __init__(self, config: VllmConfig):
    super().__init__(config)
    self.disabled = True
    self.tp_size = get_tensor_model_parallel_world_size()
    if self.tp_size <= 1:
        return
    self.patterns: PatternMatcherPass = PatternMatcherPass(
        pass_name="all_reduce_fusion_pass")
    if config.model_config is None:
        return
    self.hidden_dim = config.model_config.get_hidden_size()
    self.group = get_tp_group().device_group
    rank = get_tensor_model_parallel_rank()
    use_fp32_lamport = self.model_dtype == torch.float32
    if flashinfer_comm is None:
        logger.warning(
            "Flashinfer is not installed or comm module not found, "
            "skipping allreduce fusion pass")
        return
    # Check if the world size is supported
    if self.tp_size not in _FI_MAX_SIZES:
        logger.warning(
            "Flashinfer allreduce fusion is not "
            "supported for world size %s",
            self.tp_size,
        )
        return
    max_num_token = min(
        _FI_MAX_SIZES.get(self.tp_size, _DEFAULT_FI_MAX_SIZE) //
        (self.hidden_dim * self.tp_size * (4 if use_fp32_lamport else 2)),
        config.compilation_config.pass_config.
        fi_allreduce_fusion_max_token_num)
    self.ipc_handles, workspace_tensor = (
        flashinfer_comm.trtllm_create_ipc_workspace_for_all_reduce_fusion(
            tp_rank=rank,
            tp_size=self.tp_size,
            max_token_num=max_num_token,
            hidden_dim=self.hidden_dim,
            group=self.group,
            use_fp32_lamport=use_fp32_lamport,
        ))

    global _FI_WORKSPACE_TENSOR
    _FI_WORKSPACE_TENSOR = workspace_tensor
    self.allreduce_params = FlashInferFusedAllReduceParams(
        rank=rank,
        world_size=self.tp_size,
        use_fp32_lamport=use_fp32_lamport,
        max_token_num=max_num_token,
        # fuse rms norm static fp8 quant fused op
        # in fallback path, when we don't use flashinfer
        fuse_rms_quant=config.compilation_config.pass_config.enable_fusion)

    for epsilon in [1e-5, 1e-6]:
        AllReduceFusedRMSNormStaticQuantFP8Pattern(
            epsilon,
            self.model_dtype,
            self.device,
            self.allreduce_params,
        ).register(self.patterns)
        AllReduceFusedAddRMSNormStaticQuantFP8Pattern(
            epsilon,
            self.model_dtype,
            self.device,
            self.allreduce_params,
        ).register(self.patterns)
        if current_platform.has_device_capability(100):
            AllReduceFusedRMSNormStaticQuantNVFP4Pattern(
                epsilon,
                self.model_dtype,
                self.device,
                self.allreduce_params,
            ).register(self.patterns)
            AllReduceFusedAddRMSNormStaticQuantNVFP4Pattern(
                epsilon,
                self.model_dtype,
                self.device,
                self.allreduce_params,
            ).register(self.patterns)
        AllReduceRMSNormPattern(
            epsilon,
            self.model_dtype,
            self.device,
            self.allreduce_params,
        ).register(self.patterns)
        AllReduceFusedAddRMSNormPattern(
            epsilon,
            self.model_dtype,
            self.device,
            self.allreduce_params,
        ).register(self.patterns)

        # WARNING: This is a hack to clear the pattern matcher cache
        # and allow multiple values of epsilon.
        torch._inductor.pattern_matcher._seen_patterns.clear()

    self.disabled = False

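To make the workspace sizing above concrete, here is a minimal sketch of the max_num_token arithmetic; the hidden size, world size, dtype, and config cap in the example are illustrative assumptions, not values from the source.

# Hypothetical sketch of the max_num_token computation shown above.
MiB = 1024 * 1024
_FI_MAX_SIZES = {2: 64 * MiB, 4: MiB, 6: MiB // 2, 8: MiB // 2}
_DEFAULT_FI_MAX_SIZE = MiB // 2

def sketch_max_num_token(hidden_dim: int, tp_size: int,
                         use_fp32_lamport: bool, config_cap: int) -> int:
    # Bytes needed per token: hidden_dim elements, replicated tp_size times,
    # 4 bytes each with fp32 Lamport buffers, otherwise 2 bytes (fp16/bf16).
    bytes_per_token = hidden_dim * tp_size * (4 if use_fp32_lamport else 2)
    budget = _FI_MAX_SIZES.get(tp_size, _DEFAULT_FI_MAX_SIZE)
    return min(budget // bytes_per_token, config_cap)

# Assumed example: hidden_dim=4096, tp_size=4, bf16, config cap of 1024 tokens
# -> 1 MiB // (4096 * 4 * 2 bytes) = 32, so max_num_token = 32.
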
AllReduceRMSNormPattern

Bases: BasePattern

This pattern replaces allreduce + RMSNorm (without residual) with the fused FlashInfer implementation. It applies to the allreduce + RMSNorm before attention in the first Transformer block.

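For orientation, the following is a minimal eager-mode sketch of the unfused computation this pattern matches (tensor-parallel all-reduce followed by RMSNorm without a residual add); it is illustrative only and assumes an initialized torch.distributed process group, not the actual vLLM kernels.

import torch
import torch.distributed as dist

def unfused_allreduce_rmsnorm(x: torch.Tensor, weight: torch.Tensor,
                              eps: float) -> torch.Tensor:
    # Sum the partial activations across tensor-parallel ranks ...
    dist.all_reduce(x)
    # ... then apply RMSNorm (no residual add) to the reduced tensor.
    variance = x.float().pow(2).mean(dim=-1, keepdim=True)
    return (x.float() * torch.rsqrt(variance + eps)).to(x.dtype) * weight
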
Source code in vllm/compilation/collective_fusion.py
class AllReduceRMSNormPattern(BasePattern):
    """
    This pattern replaces the allreduce + rms norm (without residual) 
    with fused flashinfer implementation.
    Applies to allreduce + rmsnorm before attn in the first Transformer block.
    """

    def __init__(
        self,
        epsilon: float,
        dtype: torch.dtype,
        device: str,
        allreduce_params: FlashInferFusedAllReduceParams,
    ):
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params

    def get_inputs(self):
        input = torch.empty([1, 8, 4], device=self.device, dtype=self.dtype)
        rms_result = torch.empty([1, 8, 4],
                                 device=self.device,
                                 dtype=self.dtype)
        weight = torch.empty([4], device=self.device, dtype=self.dtype)

        return [input, rms_result, weight]

    def register(self, pm_pass: PatternMatcherPass):

        def pattern(input: torch.Tensor, rms_result: torch.Tensor,
                    weight: torch.Tensor):
            allreduce_output = tensor_model_parallel_all_reduce(input)
            rms = auto_functionalized(
                RMS_OP,
                result=rms_result,
                input=allreduce_output,
                weight=weight,
                epsilon=self.epsilon,
            )
            # rms_result, allreduce_output
            return rms[1], allreduce_output

        def replacement(input: torch.Tensor, rms_result: torch.Tensor,
                        weight: torch.Tensor):
            residual = torch.zeros_like(input)
            allreduce = auto_functionalized(
                flashinfer_trtllm_fused_allreduce_norm,
                allreduce_in=input,
                residual=residual,
                norm_out=rms_result,
                quant_out=None,
                scale_out=None,
                rms_gamma=weight,
                rms_eps=self.epsilon,
                pattern_code=flashinfer_comm.AllReduceFusionPattern.
                kARResidualRMSNorm,
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )
            # rms_result, allreduce_in
            return allreduce[3], allreduce[1]

        pm.register_replacement(pattern, replacement, self.get_inputs(),
                                pm.fwd_only, pm_pass)

allreduce_params instance-attribute

allreduce_params = allreduce_params

epsilon instance-attribute

epsilon = epsilon

__init__

__init__(
    epsilon: float,
    dtype: dtype,
    device: str,
    allreduce_params: FlashInferFusedAllReduceParams,
)
Source code in vllm/compilation/collective_fusion.py
def __init__(
    self,
    epsilon: float,
    dtype: torch.dtype,
    device: str,
    allreduce_params: FlashInferFusedAllReduceParams,
):
    super().__init__(dtype, device)
    self.epsilon = epsilon
    self.allreduce_params = allreduce_params

get_inputs

get_inputs()
Source code in vllm/compilation/collective_fusion.py
def get_inputs(self):
    input = torch.empty([1, 8, 4], device=self.device, dtype=self.dtype)
    rms_result = torch.empty([1, 8, 4],
                             device=self.device,
                             dtype=self.dtype)
    weight = torch.empty([4], device=self.device, dtype=self.dtype)

    return [input, rms_result, weight]

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def pattern(input: torch.Tensor, rms_result: torch.Tensor,
                weight: torch.Tensor):
        allreduce_output = tensor_model_parallel_all_reduce(input)
        rms = auto_functionalized(
            RMS_OP,
            result=rms_result,
            input=allreduce_output,
            weight=weight,
            epsilon=self.epsilon,
        )
        # rms_result, allreduce_output
        return rms[1], allreduce_output

    def replacement(input: torch.Tensor, rms_result: torch.Tensor,
                    weight: torch.Tensor):
        residual = torch.zeros_like(input)
        allreduce = auto_functionalized(
            flashinfer_trtllm_fused_allreduce_norm,
            allreduce_in=input,
            residual=residual,
            norm_out=rms_result,
            quant_out=None,
            scale_out=None,
            rms_gamma=weight,
            rms_eps=self.epsilon,
            pattern_code=flashinfer_comm.AllReduceFusionPattern.
            kARResidualRMSNorm,
            **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
        )
        # rms_result, allreduce_in
        return allreduce[3], allreduce[1]

    pm.register_replacement(pattern, replacement, self.get_inputs(),
                            pm.fwd_only, pm_pass)

AsyncTPPass

Bases: VllmInductorPass

Source code in vllm/compilation/collective_fusion.py
class AsyncTPPass(VllmInductorPass):

    def __init__(self, config: VllmConfig):
        super().__init__(config)

        # Enable symmetric memory for the TP process group
        enable_symm_mem_for_group(get_tp_group().device_group.group_name)
        self.patterns: PatternMatcherPass = PatternMatcherPass(
            pass_name="async_tp_pass")
        GEMMReduceScatterPattern(self.model_dtype,
                                 self.device).register(self.patterns)

        AllGatherGEMMPattern(self.model_dtype,
                             self.device).register(self.patterns)

        # These fusions are enabled only for bfloat16 models because
        # `scaled_mm` or `cutlass_scaled_mm` with per-token (row-wise) scaling
        # only supports bfloat16 as the output dtype.
        if self.model_dtype == torch.bfloat16:
            ScaledMMReduceScatterPattern(self.model_dtype,
                                         self.device).register(self.patterns)
            AllGatherScaledMMPattern(self.model_dtype,
                                     self.device).register(self.patterns)

            CutlassScaledMMReduceScatterPattern(
                self.model_dtype, self.device).register(self.patterns)
            AllGatherCutlassScaledMMPattern(
                self.model_dtype, self.device).register(self.patterns)

    def is_applicable_for_shape(self, shape: Optional[int]) -> bool:
        # only do replace for specific shapes
        tp_size = get_tensor_model_parallel_world_size()
        return shape is not None and shape % tp_size == 0

    def __call__(self, graph: fx.Graph):
        self.begin()
        self.dump_graph(graph, "before_async_tp_pass")
        count = self.patterns.apply(graph)
        logger.debug("Replaced %s patterns with async TP pass.", count)
        self.dump_graph(graph, "after_async_tp_pass")
        self.end_and_log()

patterns instance-attribute

patterns: PatternMatcherPass = PatternMatcherPass(
    pass_name="async_tp_pass"
)

__call__

__call__(graph: Graph)
Source code in vllm/compilation/collective_fusion.py
def __call__(self, graph: fx.Graph):
    self.begin()
    self.dump_graph(graph, "before_async_tp_pass")
    count = self.patterns.apply(graph)
    logger.debug("Replaced %s patterns with async TP pass.", count)
    self.dump_graph(graph, "after_async_tp_pass")
    self.end_and_log()

__init__

__init__(config: VllmConfig)
Source code in vllm/compilation/collective_fusion.py
def __init__(self, config: VllmConfig):
    super().__init__(config)

    # Enable symmetric memory for the TP process group
    enable_symm_mem_for_group(get_tp_group().device_group.group_name)
    self.patterns: PatternMatcherPass = PatternMatcherPass(
        pass_name="async_tp_pass")
    GEMMReduceScatterPattern(self.model_dtype,
                             self.device).register(self.patterns)

    AllGatherGEMMPattern(self.model_dtype,
                         self.device).register(self.patterns)

    # These fusions are enabled only for bfloat16 models because
    # `scaled_mm` or `cutlass_scaled_mm` with per-token (row-wise) scaling
    # only supports bfloat16 as the output dtype.
    if self.model_dtype == torch.bfloat16:
        ScaledMMReduceScatterPattern(self.model_dtype,
                                     self.device).register(self.patterns)
        AllGatherScaledMMPattern(self.model_dtype,
                                 self.device).register(self.patterns)

        CutlassScaledMMReduceScatterPattern(
            self.model_dtype, self.device).register(self.patterns)
        AllGatherCutlassScaledMMPattern(
            self.model_dtype, self.device).register(self.patterns)

is_applicable_for_shape

is_applicable_for_shape(shape: Optional[int]) -> bool
Source code in vllm/compilation/collective_fusion.py
def is_applicable_for_shape(self, shape: Optional[int]) -> bool:
    # only do replace for specific shapes
    tp_size = get_tensor_model_parallel_world_size()
    return shape is not None and shape % tp_size == 0

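For example, assuming a tensor-parallel size of 4 (illustrative), is_applicable_for_shape(4096) returns True because 4096 % 4 == 0, while is_applicable_for_shape(4095) and is_applicable_for_shape(None) return False, so the pass only rewrites graphs whose token count splits evenly across ranks.
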
BasePattern

Source code in vllm/compilation/collective_fusion.py
class BasePattern:

    def __init__(self, dtype: torch.dtype, device: str):
        self.dtype = dtype
        self.device = device
        self.tp = get_tp_group()
        self.tp_size = get_tensor_model_parallel_world_size()

device instance-attribute

device = device

dtype instance-attribute

dtype = dtype

tp instance-attribute

tp = get_tp_group()

tp_size instance-attribute

tp_size = get_tensor_model_parallel_world_size()

__init__

__init__(dtype: dtype, device: str)
Source code in vllm/compilation/collective_fusion.py
def __init__(self, dtype: torch.dtype, device: str):
    self.dtype = dtype
    self.device = device
    self.tp = get_tp_group()
    self.tp_size = get_tensor_model_parallel_world_size()

CutlassScaledMMReduceScatterPattern

Bases: BasePattern

Source code in vllm/compilation/collective_fusion.py
class CutlassScaledMMReduceScatterPattern(BasePattern):

    def get_inputs(self):
        input = torch.empty([16, 16], device=self.device, dtype=FP8_DTYPE)
        mm_weight = torch.empty([16, 16], device=self.device,
                                dtype=FP8_DTYPE).contiguous().transpose(0, 1)
        scale_a = torch.empty([16, 1], device=self.device, dtype=torch.float32)
        scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)

        cutlass_mm_output = torch.empty([16, 16],
                                        device=self.device,
                                        dtype=self.dtype)
        return [input, mm_weight, scale_a, scale_b, cutlass_mm_output]

    def register(self, pm_pass: PatternMatcherPass):

        def pattern(input: torch.Tensor, weight: torch.Tensor,
                    scale_a: torch.Tensor, scale_b: torch.Tensor,
                    cutlass_mm_output: torch.Tensor) -> torch.Tensor:
            cutlass_scaled_mm = torch.ops.higher_order.auto_functionalized(
                torch.ops._C.cutlass_scaled_mm.default,
                out=cutlass_mm_output,
                a=input,
                b=weight,
                a_scales=scale_a,
                b_scales=scale_b,
                bias=None)

            reduce_scatter = torch.ops.vllm.reduce_scatter.default(
                cutlass_scaled_mm[1],
                dim=0,
                world_size=self.tp_size,
                group_name=self.tp.unique_name)
            return reduce_scatter

        def replacement(input: torch.Tensor, mat2: torch.Tensor,
                        scale_a: torch.Tensor, scale_b: torch.Tensor,
                        cutlass_mm_output: torch.Tensor) -> torch.Tensor:
            gemm_rs = torch.ops.symm_mem.fused_scaled_matmul_reduce_scatter(
                input,
                mat2,
                scale_a,
                scale_b,
                "avg",
                scatter_dim=0,
                out_dtype=self.dtype,
                group_name=self.tp.device_group.group_name,
            )

            return gemm_rs

        pm.register_replacement(pattern, replacement, self.get_inputs(),
                                pm.fwd_only, pm_pass)

get_inputs

get_inputs()
Source code in vllm/compilation/collective_fusion.py
def get_inputs(self):
    input = torch.empty([16, 16], device=self.device, dtype=FP8_DTYPE)
    mm_weight = torch.empty([16, 16], device=self.device,
                            dtype=FP8_DTYPE).contiguous().transpose(0, 1)
    scale_a = torch.empty([16, 1], device=self.device, dtype=torch.float32)
    scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)

    cutlass_mm_output = torch.empty([16, 16],
                                    device=self.device,
                                    dtype=self.dtype)
    return [input, mm_weight, scale_a, scale_b, cutlass_mm_output]

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def pattern(input: torch.Tensor, weight: torch.Tensor,
                scale_a: torch.Tensor, scale_b: torch.Tensor,
                cutlass_mm_output: torch.Tensor) -> torch.Tensor:
        cutlass_scaled_mm = torch.ops.higher_order.auto_functionalized(
            torch.ops._C.cutlass_scaled_mm.default,
            out=cutlass_mm_output,
            a=input,
            b=weight,
            a_scales=scale_a,
            b_scales=scale_b,
            bias=None)

        reduce_scatter = torch.ops.vllm.reduce_scatter.default(
            cutlass_scaled_mm[1],
            dim=0,
            world_size=self.tp_size,
            group_name=self.tp.unique_name)
        return reduce_scatter

    def replacement(input: torch.Tensor, mat2: torch.Tensor,
                    scale_a: torch.Tensor, scale_b: torch.Tensor,
                    cutlass_mm_output: torch.Tensor) -> torch.Tensor:
        gemm_rs = torch.ops.symm_mem.fused_scaled_matmul_reduce_scatter(
            input,
            mat2,
            scale_a,
            scale_b,
            "avg",
            scatter_dim=0,
            out_dtype=self.dtype,
            group_name=self.tp.device_group.group_name,
        )

        return gemm_rs

    pm.register_replacement(pattern, replacement, self.get_inputs(),
                            pm.fwd_only, pm_pass)

FlashInferFusedAllReduceParams

Parameters for FlashInfer fused allreduce operations.

Source code in vllm/compilation/collective_fusion.py
class FlashInferFusedAllReduceParams:
    """Parameters for FlashInfer fused allreduce operations."""

    def __init__(
        self,
        rank: int,
        world_size: int,
        use_fp32_lamport: bool = False,
        max_token_num: int = 1024,
        fuse_rms_quant: bool = False,
    ):
        self.rank = rank
        self.world_size = world_size
        self.use_fp32_lamport = use_fp32_lamport
        self.trigger_completion_at_end = True
        self.launch_with_pdl = True
        self.fp32_acc = True
        self.use_oneshot = False
        self.max_token_num = max_token_num
        self.fuse_rms_quant = fuse_rms_quant

    def get_trtllm_fused_allreduce_kwargs(self):
        return {
            "world_rank": self.rank,
            "world_size": self.world_size,
            "launch_with_pdl": self.launch_with_pdl,
            "trigger_completion_at_end": self.trigger_completion_at_end,
            "fp32_acc": self.fp32_acc,
            "max_token_num": self.max_token_num,
            "fuse_rms_quant": self.fuse_rms_quant,
        }

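As a usage sketch (the rank and world size below are placeholder values), the params object expands into the keyword arguments consumed by the fused allreduce op:

params = FlashInferFusedAllReduceParams(rank=0, world_size=2,
                                        use_fp32_lamport=False,
                                        max_token_num=1024,
                                        fuse_rms_quant=True)
kwargs = params.get_trtllm_fused_allreduce_kwargs()
# kwargs == {"world_rank": 0, "world_size": 2, "launch_with_pdl": True,
#            "trigger_completion_at_end": True, "fp32_acc": True,
#            "max_token_num": 1024, "fuse_rms_quant": True}
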
fp32_acc instance-attribute

fp32_acc = True

fuse_rms_quant instance-attribute

fuse_rms_quant = fuse_rms_quant

launch_with_pdl instance-attribute

launch_with_pdl = True

max_token_num instance-attribute

max_token_num = max_token_num

rank instance-attribute

rank = rank

trigger_completion_at_end instance-attribute

trigger_completion_at_end = True

use_fp32_lamport instance-attribute

use_fp32_lamport = use_fp32_lamport

use_oneshot instance-attribute

use_oneshot = False

world_size instance-attribute

world_size = world_size

__init__

__init__(
    rank: int,
    world_size: int,
    use_fp32_lamport: bool = False,
    max_token_num: int = 1024,
    fuse_rms_quant: bool = False,
)
Source code in vllm/compilation/collective_fusion.py
def __init__(
    self,
    rank: int,
    world_size: int,
    use_fp32_lamport: bool = False,
    max_token_num: int = 1024,
    fuse_rms_quant: bool = False,
):
    self.rank = rank
    self.world_size = world_size
    self.use_fp32_lamport = use_fp32_lamport
    self.trigger_completion_at_end = True
    self.launch_with_pdl = True
    self.fp32_acc = True
    self.use_oneshot = False
    self.max_token_num = max_token_num
    self.fuse_rms_quant = fuse_rms_quant

get_trtllm_fused_allreduce_kwargs

get_trtllm_fused_allreduce_kwargs()
Source code in vllm/compilation/collective_fusion.py
def get_trtllm_fused_allreduce_kwargs(self):
    return {
        "world_rank": self.rank,
        "world_size": self.world_size,
        "launch_with_pdl": self.launch_with_pdl,
        "trigger_completion_at_end": self.trigger_completion_at_end,
        "fp32_acc": self.fp32_acc,
        "max_token_num": self.max_token_num,
        "fuse_rms_quant": self.fuse_rms_quant,
    }

GEMMReduceScatterPattern

Bases: BasePattern

Source code in vllm/compilation/collective_fusion.py
class GEMMReduceScatterPattern(BasePattern):

    def get_inputs(self):
        mul = torch.empty([16, 4], device=self.device, dtype=self.dtype)
        mm_weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)
        return [mul, mm_weight]

    def register(self, pm_pass: PatternMatcherPass):

        def pattern(mul: torch.Tensor, mm_weight: torch.Tensor):
            mm = torch.ops.aten.mm.default(mul, mm_weight)
            reduce_scatter = torch.ops.vllm.reduce_scatter.default(
                mm,
                dim=0,
                world_size=self.tp_size,
                group_name=self.tp.unique_name,
            )
            return reduce_scatter

        def replacement(mul: torch.Tensor, mm_weight: torch.Tensor):
            gemm_rs = torch.ops.symm_mem.fused_matmul_reduce_scatter(
                mul,
                mm_weight,
                "avg",
                scatter_dim=0,
                group_name=self.tp.device_group.group_name,
            )

            return gemm_rs

        pm.register_replacement(pattern, replacement, self.get_inputs(),
                                pm.fwd_only, pm_pass)

get_inputs

get_inputs()
Source code in vllm/compilation/collective_fusion.py
def get_inputs(self):
    mul = torch.empty([16, 4], device=self.device, dtype=self.dtype)
    mm_weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)
    return [mul, mm_weight]

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def pattern(mul: torch.Tensor, mm_weight: torch.Tensor):
        mm = torch.ops.aten.mm.default(mul, mm_weight)
        reduce_scatter = torch.ops.vllm.reduce_scatter.default(
            mm,
            dim=0,
            world_size=self.tp_size,
            group_name=self.tp.unique_name,
        )
        return reduce_scatter

    def replacement(mul: torch.Tensor, mm_weight: torch.Tensor):
        gemm_rs = torch.ops.symm_mem.fused_matmul_reduce_scatter(
            mul,
            mm_weight,
            "avg",
            scatter_dim=0,
            group_name=self.tp.device_group.group_name,
        )

        return gemm_rs

    pm.register_replacement(pattern, replacement, self.get_inputs(),
                            pm.fwd_only, pm_pass)

ScaledMMReduceScatterPattern

Bases: BasePattern

Source code in vllm/compilation/collective_fusion.py
class ScaledMMReduceScatterPattern(BasePattern):

    def get_inputs(self):
        input = torch.empty([16, 16], device=self.device, dtype=FP8_DTYPE)
        mm_weight = torch.empty([16, 16], device=self.device,
                                dtype=FP8_DTYPE).contiguous().transpose(0, 1)
        scale_a = torch.empty([16, 1], device=self.device, dtype=torch.float32)
        scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)
        return [input, mm_weight, scale_a, scale_b]

    def register(self, pm_pass: PatternMatcherPass):

        def pattern(input: torch.Tensor, mat2: torch.Tensor,
                    scale_a: torch.Tensor,
                    scale_b: torch.Tensor) -> torch.Tensor:
            scaled_mm = torch.ops.aten._scaled_mm.default(input,
                                                          mat2=mat2,
                                                          scale_a=scale_a,
                                                          scale_b=scale_b,
                                                          bias=None,
                                                          scale_result=None,
                                                          out_dtype=self.dtype)
            reduce_scatter = torch.ops.vllm.reduce_scatter.default(
                scaled_mm,
                dim=0,
                world_size=self.tp_size,
                group_name=self.tp.unique_name)
            return reduce_scatter

        def replacement(input: torch.Tensor, mat2: torch.Tensor,
                        scale_a: torch.Tensor,
                        scale_b: torch.Tensor) -> torch.Tensor:
            gemm_rs = torch.ops.symm_mem.fused_scaled_matmul_reduce_scatter(
                input,
                mat2,
                scale_a,
                scale_b,
                "avg",
                scatter_dim=0,
                out_dtype=self.dtype,
                group_name=self.tp.device_group.group_name,
            )

            return gemm_rs

        pm.register_replacement(pattern, replacement, self.get_inputs(),
                                pm.fwd_only, pm_pass)

get_inputs

get_inputs()
Source code in vllm/compilation/collective_fusion.py
def get_inputs(self):
    input = torch.empty([16, 16], device=self.device, dtype=FP8_DTYPE)
    mm_weight = torch.empty([16, 16], device=self.device,
                            dtype=FP8_DTYPE).contiguous().transpose(0, 1)
    scale_a = torch.empty([16, 1], device=self.device, dtype=torch.float32)
    scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)
    return [input, mm_weight, scale_a, scale_b]

register

register(pm_pass: PatternMatcherPass)
Source code in vllm/compilation/collective_fusion.py
def register(self, pm_pass: PatternMatcherPass):

    def pattern(input: torch.Tensor, mat2: torch.Tensor,
                scale_a: torch.Tensor,
                scale_b: torch.Tensor) -> torch.Tensor:
        scaled_mm = torch.ops.aten._scaled_mm.default(input,
                                                      mat2=mat2,
                                                      scale_a=scale_a,
                                                      scale_b=scale_b,
                                                      bias=None,
                                                      scale_result=None,
                                                      out_dtype=self.dtype)
        reduce_scatter = torch.ops.vllm.reduce_scatter.default(
            scaled_mm,
            dim=0,
            world_size=self.tp_size,
            group_name=self.tp.unique_name)
        return reduce_scatter

    def replacement(input: torch.Tensor, mat2: torch.Tensor,
                    scale_a: torch.Tensor,
                    scale_b: torch.Tensor) -> torch.Tensor:
        gemm_rs = torch.ops.symm_mem.fused_scaled_matmul_reduce_scatter(
            input,
            mat2,
            scale_a,
            scale_b,
            "avg",
            scatter_dim=0,
            out_dtype=self.dtype,
            group_name=self.tp.device_group.group_name,
        )

        return gemm_rs

    pm.register_replacement(pattern, replacement, self.get_inputs(),
                            pm.fwd_only, pm_pass)

call_trtllm_fused_allreduce_norm

call_trtllm_fused_allreduce_norm(
    allreduce_in: Tensor,
    residual: Tensor,
    rms_gamma: Tensor,
    rms_eps: float,
    world_rank: int,
    world_size: int,
    launch_with_pdl: bool,
    trigger_completion_at_end: bool,
    fp32_acc: bool,
    max_token_num: int,
    pattern_code: int,
    fuse_rms_quant: bool,
    norm_out: Optional[Tensor] = None,
    quant_out: Optional[Tensor] = None,
    scale_out: Optional[Tensor] = None,
    scale_factor: Optional[Tensor] = None,
) -> None
Source code in vllm/compilation/collective_fusion.py
def call_trtllm_fused_allreduce_norm(
    allreduce_in: torch.Tensor,
    residual: torch.Tensor,
    rms_gamma: torch.Tensor,
    rms_eps: float,
    world_rank: int,
    world_size: int,
    launch_with_pdl: bool,
    trigger_completion_at_end: bool,
    fp32_acc: bool,
    max_token_num: int,
    pattern_code: int,
    fuse_rms_quant: bool,
    norm_out: Optional[torch.Tensor] = None,
    quant_out: Optional[torch.Tensor] = None,
    scale_out: Optional[torch.Tensor] = None,
    scale_factor: Optional[torch.Tensor] = None,
) -> None:
    num_tokens, hidden_size = allreduce_in.shape
    element_size = allreduce_in.element_size()
    current_tensor_size = num_tokens * hidden_size * element_size
    max_fusion_size = max_token_num * hidden_size * element_size
    use_flashinfer = current_tensor_size <= min(
        _FI_MAX_SIZES.get(world_size, _DEFAULT_FI_MAX_SIZE),
        max_fusion_size,
    )
    if use_flashinfer:
        assert (_FI_WORKSPACE_TENSOR is not None
                ), "Flashinfer must be enabled when using flashinfer"
        if norm_out is None:
            norm_out = allreduce_in
            residual_out = residual
        else:
            # return residual_out as allreduce_out with zeroed residual_in
            # as flashinfer does not support rms_norm
            # and allreduce_out together
            residual_out = allreduce_in
        # For the sizes that are smaller than the max size,
        # we only use flashinfer one shot allreduce
        flashinfer_comm.trtllm_allreduce_fusion(
            allreduce_in=allreduce_in,
            token_num=allreduce_in.shape[0],
            residual_in=residual,
            residual_out=residual_out,
            norm_out=norm_out,
            rms_gamma=rms_gamma,
            rms_eps=rms_eps,
            world_rank=world_rank,
            world_size=world_size,
            hidden_dim=allreduce_in.shape[-1],
            workspace_ptrs=_FI_WORKSPACE_TENSOR,
            launch_with_pdl=launch_with_pdl,
            use_oneshot=True,
            trigger_completion_at_end=trigger_completion_at_end,
            fp32_acc=fp32_acc,
            pattern_code=pattern_code,
            allreduce_out=None,
            quant_out=quant_out,
            scale_out=scale_out,
            # in vllm we only support swizzled layout
            layout_code=flashinfer_comm.QuantizationSFLayout.
            SWIZZLED_128x4,
            scale_factor=scale_factor,
        )
    else:
        allreduce_out = tensor_model_parallel_all_reduce(allreduce_in)
        if (scale_factor is not None and scale_out is None
                and fuse_rms_quant):
            # Do fused rms norm static fp8 quant fused op
            if norm_out is None:
                torch.ops._C.fused_add_rms_norm_static_fp8_quant(
                    quant_out, allreduce_out, residual, rms_gamma,
                    scale_factor, rms_eps)
            else:
                torch.ops._C.rms_norm_static_fp8_quant(
                    quant_out, allreduce_out, rms_gamma, scale_factor,
                    rms_eps)
        else:
            if norm_out is None:
                torch.ops._C.fused_add_rms_norm(allreduce_out, residual,
                                                rms_gamma, rms_eps)
                norm_out = allreduce_out
            else:
                torch.ops._C.rms_norm(norm_out, allreduce_out, rms_gamma,
                                      rms_eps)
            if scale_factor is not None:
                if scale_out is not None:
                    torch.ops._C.scaled_fp4_quant(quant_out, norm_out,
                                                  scale_out, scale_factor)
                else:
                    torch.ops._C.static_scaled_fp8_quant(
                        quant_out, norm_out, scale_factor)
        if scale_factor is None or norm_out is not None:
            # we need to return allreduce output
            # in cases of non quant fused AR + RMS norm
            # and fused AR + RMS norm + quant without fused add
            allreduce_in.copy_(allreduce_out)

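The choice between the FlashInfer path and the eager fallback above comes down to a byte-size check. Here is a hedged sketch of that gate with assumed shapes; the constants mirror the module-level _FI_MAX_SIZES and _DEFAULT_FI_MAX_SIZE, and the example numbers are illustrative only.

MiB = 1024 * 1024
_FI_MAX_SIZES = {2: 64 * MiB, 4: MiB, 6: MiB // 2, 8: MiB // 2}
_DEFAULT_FI_MAX_SIZE = MiB // 2

def sketch_use_flashinfer(num_tokens: int, hidden_size: int, element_size: int,
                          world_size: int, max_token_num: int) -> bool:
    current_tensor_size = num_tokens * hidden_size * element_size
    max_fusion_size = max_token_num * hidden_size * element_size
    return current_tensor_size <= min(
        _FI_MAX_SIZES.get(world_size, _DEFAULT_FI_MAX_SIZE), max_fusion_size)

# Assumed example: 128 tokens, hidden_size=4096, bf16 (2 bytes per element),
# world_size=8, max_token_num=1024:
#   current_tensor_size = 1 MiB > min(0.5 MiB, 8 MiB) = 0.5 MiB
# so the eager all-reduce + rms_norm fallback path is taken instead.
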
call_trtllm_fused_allreduce_norm_fake

call_trtllm_fused_allreduce_norm_fake(
    allreduce_in: Tensor,
    residual: Tensor,
    rms_gamma: Tensor,
    rms_eps: float,
    world_rank: int,
    world_size: int,
    launch_with_pdl: bool,
    trigger_completion_at_end: bool,
    fp32_acc: bool,
    max_token_num: int,
    pattern_code: int,
    fuse_rms_quant: bool,
    norm_out: Optional[Tensor] = None,
    quant_out: Optional[Tensor] = None,
    scale_out: Optional[Tensor] = None,
    scale_factor: Optional[Tensor] = None,
) -> None
Source code in vllm/compilation/collective_fusion.py
def call_trtllm_fused_allreduce_norm_fake(
        allreduce_in: torch.Tensor,
        residual: torch.Tensor,
        rms_gamma: torch.Tensor,
        rms_eps: float,
        world_rank: int,
        world_size: int,
        launch_with_pdl: bool,
        trigger_completion_at_end: bool,
        fp32_acc: bool,
        max_token_num: int,
        pattern_code: int,
        fuse_rms_quant: bool,
        norm_out: Optional[torch.Tensor] = None,
        quant_out: Optional[torch.Tensor] = None,
        scale_out: Optional[torch.Tensor] = None,
        scale_factor: Optional[torch.Tensor] = None) -> None:
    pass