Skip to content

vllm.distributed.kv_transfer.kv_connector.factory

logger module-attribute

# Module-level logger, obtained from init_logger with this module's __name__.
logger = init_logger(__name__)

KVConnectorFactory

Source code in vllm/distributed/kv_transfer/kv_connector/factory.py
class KVConnectorFactory:
    """Registry and factory for KV connector classes.

    Connectors are registered under a name together with the module path
    and class name they can be imported from; the import only happens when
    the class is actually requested. A name that is not registered can
    still be resolved through ``kv_connector_module_path`` on the transfer
    config.
    """

    # Maps a connector name to a zero-argument loader returning its class.
    _registry: dict[str, Callable[[], type[KVConnectorBase]]] = {}

    @classmethod
    def register_connector(cls, name: str, module_path: str,
                           class_name: str) -> None:
        """Register a connector with a lazy-loading module and class name.

        Args:
            name: Key under which the connector is looked up later.
            module_path: Dotted path of the module holding the class.
            class_name: Attribute of that module that is the connector
                class.

        Raises:
            ValueError: If ``name`` has already been registered.
        """
        if name in cls._registry:
            raise ValueError(f"Connector '{name}' is already registered.")

        def loader() -> type[KVConnectorBase]:
            # Deferred: registering a connector must not import its module.
            module = importlib.import_module(module_path)
            return getattr(module, class_name)

        cls._registry[name] = loader

    @classmethod
    def create_connector(
        cls,
        config: "VllmConfig",
        role: KVConnectorRole,
    ) -> KVConnectorBase:
        """Instantiate the connector selected by ``config.kv_transfer_config``.

        Args:
            config: Configuration whose ``kv_transfer_config`` names the
                connector; it is also passed to the connector constructor.
            role: Role passed through to the connector constructor.

        Returns:
            The object produced by ``connector_cls(config, role)``.

        Raises:
            ValueError: If ``envs.VLLM_USE_V1`` is falsy, if
                ``config.kv_transfer_config`` is ``None``, or if the
                connector name cannot be resolved.
        """
        if not envs.VLLM_USE_V1:
            raise ValueError("Attempting to initialize a V1 Connector, "
                             f"but found {envs.VLLM_USE_V1=}")

        kv_transfer_config = config.kv_transfer_config
        if kv_transfer_config is None:
            # Fail with a clear message instead of an AttributeError on
            # ``None.kv_connector`` further down.
            raise ValueError(
                "kv_transfer_config must be set to create a connector")

        connector_cls = cls.get_connector_class(kv_transfer_config)
        logger.info("Creating v1 connector with name: %s and engine_id: %s",
                    connector_cls.__name__, kv_transfer_config.engine_id)
        # NOTE(Kuntai): v1 connector is explicitly separated into two roles.
        # Scheduler connector:
        # - Co-locate with scheduler process
        # - Should only be used inside the Scheduler class
        # Worker connector:
        # - Co-locate with worker process
        # - Should only be used inside the forward context & attention layer
        # We build separately to enforce strict separation
        return connector_cls(config, role)

    @classmethod
    def get_connector_class(
            cls, kv_transfer_config: "KVTransferConfig"
    ) -> type[KVConnectorBaseType]:
        """Get the connector class by name.

        A registered name is resolved through its loader. Any other name is
        treated as a class name to fetch from the module at
        ``kv_transfer_config.kv_connector_module_path``.

        Raises:
            ValueError: If no connector name is set, or the name is not
                registered and no module path is configured.
            AttributeError: If the configured module does not define the
                requested class.
        """
        connector_name = kv_transfer_config.kv_connector
        if connector_name is None:
            raise ValueError("Connector name is not set in KVTransferConfig")

        loader = cls._registry.get(connector_name)
        if loader is not None:
            return loader()

        connector_module_path = kv_transfer_config.kv_connector_module_path
        if connector_module_path is None:
            raise ValueError(
                f"Unsupported connector type: {connector_name}")
        connector_module = importlib.import_module(connector_module_path)
        try:
            return getattr(connector_module, connector_name)
        except AttributeError as e:
            # Same exception type as before, but say which class was
            # missing from which user-supplied module.
            raise AttributeError(
                f"Class {connector_name} not found in "
                f"{connector_module_path}") from e

_registry class-attribute instance-attribute

# Class-level table keyed by connector name; each value is a zero-argument
# callable that returns the connector class.
_registry: dict[
    str, Callable[[], type[KVConnectorBase]]
] = {}

create_connector classmethod

create_connector(
    config: VllmConfig, role: KVConnectorRole
) -> KVConnectorBase
Source code in vllm/distributed/kv_transfer/kv_connector/factory.py
@classmethod
def create_connector(
    cls,
    config: "VllmConfig",
    role: KVConnectorRole,
) -> KVConnectorBase:
    """Instantiate the connector selected by ``config.kv_transfer_config``.

    Args:
        config: Configuration whose ``kv_transfer_config`` names the
            connector; it is also passed to the connector constructor.
        role: Role passed through to the connector constructor.

    Returns:
        The object produced by ``connector_cls(config, role)``.

    Raises:
        ValueError: If ``envs.VLLM_USE_V1`` is falsy or
            ``config.kv_transfer_config`` is ``None``.
    """
    if not envs.VLLM_USE_V1:
        raise ValueError("Attempting to initialize a V1 Connector, "
                         f"but found {envs.VLLM_USE_V1=}")

    kv_transfer_config = config.kv_transfer_config
    if kv_transfer_config is None:
        # Fail with a clear message instead of an AttributeError on
        # ``None.kv_connector`` further down.
        raise ValueError(
            "kv_transfer_config must be set to create a connector")

    connector_cls = cls.get_connector_class(kv_transfer_config)
    logger.info("Creating v1 connector with name: %s and engine_id: %s",
                connector_cls.__name__, kv_transfer_config.engine_id)
    # NOTE(Kuntai): v1 connector is explicitly separated into two roles.
    # Scheduler connector:
    # - Co-locate with scheduler process
    # - Should only be used inside the Scheduler class
    # Worker connector:
    # - Co-locate with worker process
    # - Should only be used inside the forward context & attention layer
    # We build separately to enforce strict separation
    return connector_cls(config, role)

get_connector_class classmethod

get_connector_class(
    kv_transfer_config: KVTransferConfig,
) -> type[KVConnectorBaseType]

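Get the connector class by name: a registered name is resolved through the registry; any other name is imported from the module given by kv_connector_module_path.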

Source code in vllm/distributed/kv_transfer/kv_connector/factory.py
@classmethod
def get_connector_class(
        cls, kv_transfer_config: "KVTransferConfig"
) -> type[KVConnectorBaseType]:
    """Get the connector class by name.

    A registered name is resolved through its loader. Any other name is
    treated as a class name to fetch from the module at
    ``kv_transfer_config.kv_connector_module_path``.

    Raises:
        ValueError: If no connector name is set, or the name is not
            registered and no module path is configured.
        AttributeError: If the configured module does not define the
            requested class.
    """
    connector_name = kv_transfer_config.kv_connector
    if connector_name is None:
        raise ValueError("Connector name is not set in KVTransferConfig")

    loader = cls._registry.get(connector_name)
    if loader is not None:
        return loader()

    connector_module_path = kv_transfer_config.kv_connector_module_path
    if connector_module_path is None:
        raise ValueError(
            f"Unsupported connector type: {connector_name}")
    connector_module = importlib.import_module(connector_module_path)
    try:
        return getattr(connector_module, connector_name)
    except AttributeError as e:
        # Same exception type as before, but say which class was missing
        # from which user-supplied module.
        raise AttributeError(
            f"Class {connector_name} not found in "
            f"{connector_module_path}") from e

register_connector classmethod

register_connector(
    name: str, module_path: str, class_name: str
) -> None

Register a connector under a name; its class is imported lazily from the given module path and class name.

Source code in vllm/distributed/kv_transfer/kv_connector/factory.py
@classmethod
def register_connector(cls, name: str, module_path: str,
                       class_name: str) -> None:
    """Record a lazily imported connector class under ``name``.

    Nothing is imported here: the stored callable imports ``module_path``
    and reads ``class_name`` from it only when it is invoked.

    Raises:
        ValueError: If ``name`` is already present in the registry.
    """
    registry = cls._registry
    if name in registry:
        raise ValueError(f"Connector '{name}' is already registered.")

    def _load_connector_class() -> type[KVConnectorBase]:
        # Import on demand and hand back the named attribute.
        return getattr(importlib.import_module(module_path), class_name)

    registry[name] = _load_connector_class