class XPUPlatform(Platform):
# Platform definition for Intel XPU devices. The class-level attributes
# below are constant identifiers for this platform.
_enum = PlatformEnum.XPU
device_name: str = "xpu"
device_type: str = "xpu"
dispatch_key: str = "XPU"
# Intel XPU's device key is "GPU" for Ray.
# see https://github.com/ray-project/ray/blob/6a5eb5865eeb9ccf058a79b44f107e327e360673/python/ray/_private/accelerators/intel_gpu.py#L20 # noqa: E501
ray_device_key: str = "GPU"
# Name of the distributed backend; the trailing note lists the two known
# spellings.
dist_backend: str = "ccl" # ccl | xccl
# Name of an environment variable; presumably it controls which devices are
# visible to the process -- TODO confirm against the base Platform class.
device_control_env_var: str = "ZE_AFFINITY_MASK"
@classmethod
def get_attn_backend_cls(cls, selected_backend: _Backend, head_size: int,
                         dtype: torch.dtype, kv_cache_dtype: Optional[str],
                         block_size: int, use_v1: bool, use_mla: bool,
                         has_sink: bool) -> str:
    """Return the import path of the attention backend used on XPU.

    A requested backend other than ``None`` or ``_Backend.IPEX`` is only
    reported in the log; it does not change the result. The ``use_v1``
    argument is replaced by ``envs.VLLM_USE_V1`` before it is checked, and
    the remaining arguments are not consulted.

    Returns:
        The dotted path of the V1 flash-attention backend class.

    Raises:
        ValueError: If ``envs.VLLM_USE_V1`` is falsy.
    """
    backend_is_unsupported = (selected_backend is not None
                              and selected_backend != _Backend.IPEX)
    if backend_is_unsupported:
        logger.info("Cannot use %s backend on XPU.", selected_backend)

    # The environment setting takes precedence over the caller's flag.
    use_v1 = envs.VLLM_USE_V1
    if use_v1:
        logger.info("Using Flash Attention backend on V1 engine.")
        return "vllm.v1.attention.backends.flash_attn.FlashAttentionBackend"
    raise ValueError("XPU backend only supports V1.")
@classmethod
def set_device(cls, device: torch.device) -> None:
    """Select ``device`` for this platform by calling ``torch.xpu.set_device``.

    Args:
        device: The device handed straight to ``torch.xpu.set_device``.
    """
    torch.xpu.set_device(device)
@classmethod
def get_device_capability(
    cls,
    device_id: int = 0,
) -> Optional[DeviceCapability]:
    """Return ``None`` for every device; no capability is reported on XPU.

    The XPU capability format differs from CUDA's and would cause
    unexpected failures, so ``None`` is returned unconditionally and
    ``device_id`` is not consulted.
    """
    capability = None
    return capability
@classmethod
def get_device_name(cls, device_id: int = 0) -> str:
    """Return what ``torch.xpu.get_device_name`` reports for ``device_id``.

    Args:
        device_id: Index of the XPU device to query (defaults to 0).
    """
    reported_name = torch.xpu.get_device_name(device_id)
    return reported_name
@classmethod
def get_punica_wrapper(cls) -> str:
    """Return the dotted import path of the Punica wrapper class for XPU."""
    wrapper_path = "vllm.lora.punica_wrapper.punica_xpu.PunicaWrapperXPU"
    return wrapper_path
@classmethod
def get_device_total_memory(cls, device_id: int = 0) -> int:
    """Return the ``total_memory`` property of XPU device ``device_id``.

    The value is read from ``torch.xpu.get_device_properties``.
    """
    return torch.xpu.get_device_properties(device_id).total_memory
@classmethod
def is_async_output_supported(cls, enforce_eager: Optional[bool]) -> bool:
    """Report async output as supported; always returns ``True``.

    ``enforce_eager`` is accepted but does not influence the answer.
    """
    supported = True
    return supported
@classmethod
def inference_mode(cls):
    """Return a fresh ``torch.no_grad()`` context manager."""
    no_grad_ctx = torch.no_grad()
    return no_grad_ctx
@classmethod
def check_and_update_config(cls, vllm_config: VllmConfig) -> None:
"""Adjust ``vllm_config`` in place with XPU-specific settings.

The method rewrites fields on the cache, compilation, parallel and
scheduler configs reachable from ``vllm_config`` and may set the
``VLLM_WORKER_MULTIPROC_METHOD`` environment variable. It returns nothing.
"""
cache_config = vllm_config.cache_config
model_config = vllm_config.model_config
# in V1(or with ipex chunked prefill) block_size is 64
if cache_config and cache_config.block_size is None:
cache_config.block_size = 64
# FIXME: Temporarily forcing eager mode
# remove after t.compile support stabilizes.
if (envs.VLLM_USE_V1 and model_config is not None
and not vllm_config.model_config.enforce_eager):
from vllm.config import CompilationLevel
vllm_config.compilation_config.level = CompilationLevel.NO_COMPILATION # noqa: E501
# lazy import to avoid circular import
from vllm.config import CUDAGraphMode
compilation_config = vllm_config.compilation_config
# An unset cudagraph mode, or one whose maximum mode is not NONE, is
# replaced by CUDAGraphMode.NONE.
if compilation_config.cudagraph_mode is None or \
compilation_config.cudagraph_mode.max_cudagraph_mode() \
!= CUDAGraphMode.NONE:
logger.info("[XPU] CUDA graph is not supported on XPU, "
"disabling cudagraphs.")
compilation_config.cudagraph_mode = CUDAGraphMode.NONE
# check and update parallel config
parallel_config = vllm_config.parallel_config
# The worker class path is always overwritten with the V1 XPU worker.
parallel_config.worker_cls = "vllm.v1.worker.xpu_worker.XPUWorker"
# No executor backend chosen: "ray" when world_size > 1, otherwise "uni".
if parallel_config.distributed_executor_backend is None:
if parallel_config.world_size > 1:
parallel_config.distributed_executor_backend = "ray"
else:
parallel_config.distributed_executor_backend = "uni"
elif parallel_config.distributed_executor_backend == "mp":
# FIXME(kunshang):
# spawn needs calling `if __name__ == '__main__':``
# fork is not supported for xpu start new process.
if envs.VLLM_WORKER_MULTIPROC_METHOD != "spawn":
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
# NOTE(review): confirm whether this warning is meant to fire only when
# the start method was overridden just above, or for every "mp" request.
logger.warning(
"Please use spawn as start method if you want to use mp.")
# Any backend other than "ray", "uni" or "external_launcher" is replaced
# by "ray" after a warning.
elif (parallel_config.distributed_executor_backend != "ray"
and parallel_config.distributed_executor_backend != "uni"
and parallel_config.distributed_executor_backend
!= "external_launcher"):
logger.warning(
"%s is not supported on XPU, fallback to ray distributed"
" executor backend.",
parallel_config.distributed_executor_backend)
parallel_config.distributed_executor_backend = "ray"
# With MLA, both chunked-prefill flags are cleared and the batched-token
# budget becomes the larger of max_model_len and the default budget.
# NOTE(review): the log message also mentions prefix caching, but no
# prefix-caching setting is changed in this block.
if model_config and model_config.use_mla:
logger.info(
"MLA is enabled on a non-GPU platform; forcing chunked "
"prefill and prefix caching to be disabled.")
vllm_config.scheduler_config.enable_chunked_prefill = False
vllm_config.scheduler_config.chunked_prefill_enabled = False
vllm_config.scheduler_config.max_num_batched_tokens = max(
vllm_config.scheduler_config.max_model_len,
DEFAULT_MAX_NUM_BATCHED_TOKENS)
@classmethod
def is_pin_memory_available(cls):
    """Report pinned memory as available; always returns ``True``."""
    available = True
    return available
@classmethod
def get_current_memory_usage(cls,
                             device: Optional[torch.types.Device] = None
                             ) -> float:
    """Return the allocated-memory figure for ``device`` after a peak reset.

    The peak statistics are reset first, then
    ``torch.xpu.max_memory_allocated`` is read, so the value returned is
    whatever that counter reports immediately after the reset.

    Args:
        device: Device passed to both ``torch.xpu`` calls; ``None`` leaves
            the device choice to those calls.
    """
    torch.xpu.reset_peak_memory_stats(device)
    peak_after_reset = torch.xpu.max_memory_allocated(device)
    return peak_after_reset
@classmethod
def is_data_center_gpu(cls) -> bool:
    """Return whether the default device's name contains "data center gpu".

    The name comes from ``cls.get_device_name()`` with its default
    argument and is compared case-insensitively.
    """
    # A membership test stops at the first match, whereas counting every
    # occurrence scans the whole name.
    return "data center gpu" in cls.get_device_name().lower()
@classmethod
def get_device_communicator_cls(cls) -> str:
    """Return the dotted import path of the XPU device communicator class."""
    communicator_path = "vllm.distributed.device_communicators.xpu_communicator.XpuCommunicator"  # noqa: E501
    return communicator_path
@classmethod
def supports_v1(cls, model_config: ModelConfig) -> bool:
    """Report V1 as supported; always returns ``True``.

    ``model_config`` is accepted but does not influence the answer.
    """
    v1_supported = True
    return v1_supported
@classmethod
def device_count(cls) -> int:
    """Return the number of devices reported by ``torch.xpu.device_count``."""
    num_devices = torch.xpu.device_count()
    return num_devices
@classmethod
def check_if_supports_dtype(cls, torch_dtype: torch.dtype):
    """Reject a dtype that has a known problem on the current device.

    Only ``torch.bfloat16`` is checked. For any other dtype the method
    returns ``None`` without querying the device name.

    Args:
        torch_dtype: The dtype the caller intends to use.

    Raises:
        ValueError: If ``torch_dtype`` is ``torch.bfloat16`` and the
            lower-cased device name contains "a770".
    """
    if torch_dtype != torch.bfloat16:
        return
    # client gpu a770
    if "a770" in cls.get_device_name().lower():
        raise ValueError(
            "Intel Arc A770 have bfloat16 accuracy known issue. "
            "You can use float16 instead by explicitly setting the "
            "`dtype` flag in CLI, for example: --dtype=half.")