@CustomOp.register("quant_fp8")
class QuantFP8(CustomOp):
"""
Quantize input tensor to per-tensor or per-token FP8.
This CustomOp supports both static and dynamic quantization.
"""
def __init__(self,
static: bool,
group_shape: GroupShape,
num_token_padding: Optional[int] = None):
"""
:param static: static or dynamic quantization
:param group_shape: quantization group shape (PER_TOKEN or PER_TENSOR)
:param num_token_padding: Pad the token dimension of output to this size
"""
super().__init__()
self.num_token_padding = num_token_padding
assert group_shape in {GroupShape.PER_TOKEN, GroupShape.PER_TENSOR}
assert not static or group_shape == GroupShape.PER_TENSOR, \
"Only per-tensor scales supported for static quantization."
self.static = static
self.group_shape = group_shape
self.use_per_token_if_dynamic = group_shape == GroupShape.PER_TOKEN
def forward_cuda(
self,
x: torch.Tensor,
scale: Optional[torch.Tensor] = None,
scale_ub: Optional[torch.Tensor] = None,
) -> tuple[torch.Tensor, torch.Tensor]:
assert (scale is not None) == self.static
assert scale_ub is None or (not self.static and self.group_shape
== GroupShape.PER_TOKEN
and scale_ub.numel() == 1)
return ops.scaled_fp8_quant(
x,
scale,
num_token_padding=self.num_token_padding,
scale_ub=scale_ub,
use_per_token_if_dynamic=self.use_per_token_if_dynamic)
def forward_native(
self,
x: torch.Tensor,
scale: Optional[torch.Tensor] = None,
scale_ub: Optional[torch.Tensor] = None,
):
assert (scale is not None) == self.static
assert scale_ub is None or (not self.static and self.group_shape
== GroupShape.PER_TOKEN
and scale_ub.numel() == 1)
if scale is None:
if self.group_shape == GroupShape.PER_TOKEN:
x_max, _ = x.abs().max(dim=-1)
x_max = x_max.unsqueeze(-1).to(torch.float32)
if scale_ub is not None:
x_max = x_max.clamp(max=scale_ub)
else:
x_max = x.abs().max().unsqueeze(-1).to(torch.float32)
scale = x_max / _FP8_MAX
scale = scale.clamp(min=_FP8_MIN_SCALING_FACTOR)
# Even for dynamic per-token scales,
# reciprocal performs slightly better than division
out = x.to(torch.float32) * scale.reciprocal()
out = out.clamp(_FP8_MIN, _FP8_MAX).to(_FP8_DTYPE)
# This currently generates an extra Triton kernel in compilation.
# Fortunately, we don't use padding if compiling.
# TODO(luka): benchmark torch._scaled_mm to hopefully remove padding
# in general.
if self.num_token_padding is not None:
padding = max(self.num_token_padding - out.size(0), 0)
out = F.pad(out, (0, 0, 0, padding), "constant", 0.0)
return out, scale