vllm.v1.structured_output

Modules:

backend_guidance
backend_lm_format_enforcer
backend_outlines
backend_types
backend_xgrammar
request
utils

logger module-attribute

logger = init_logger(__name__)

StructuredOutputManager

Engine-level manager for structured output requests.
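
A minimal usage sketch (not from the vLLM sources) of how an engine loop might drive the manager. The `vllm_config`, `request`, and `sampled_token_ids` objects are assumed to already exist, and `request.request_id` is assumed to match the key used in the `requests` dict; only the methods documented on this page are used.

from vllm.v1.structured_output import StructuredOutputManager

manager = StructuredOutputManager(vllm_config)

# When a structured-output request is admitted, start async grammar compilation.
manager.grammar_init(request)

# Each scheduling step, build the batched token bitmask for the scheduled
# structured-output requests (returns None if there are none).
bitmask = manager.grammar_bitmask(
    requests={request.request_id: request},
    structured_output_request_ids={request.request_id: 0},
    scheduled_spec_decode_tokens={},
)

# After sampling, advance the grammar FSM only when the manager says so
# (e.g. not while a reasoning/thinking segment is still being generated),
# assuming grammar compilation has completed by this point.
if manager.should_advance(request):
    request.structured_output_request.grammar.accept_tokens(
        request.request_id, sampled_token_ids)

# On shutdown, release backend resources.
manager.clear_backend()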

Source code in vllm/v1/structured_output/__init__.py
class StructuredOutputManager:
    """Engine-level manager for structured output requests."""

    def __init__(self, vllm_config: VllmConfig):
        self.backend: Optional[StructuredOutputBackend] = None
        self.reasoner: Optional[ReasoningParser] = None
        self.vllm_config = vllm_config

        self._grammar_bitmask: Optional[torch.Tensor] = None
        self._full_mask = torch.tensor(-1, dtype=torch.int32)

        max_batch_size = self.vllm_config.scheduler_config.max_num_seqs
        self.fill_bitmask_parallel_threshold = 128
        if self.fill_bitmask_parallel_threshold < max_batch_size:
            self.fill_bitmask_parallel_batch_size = 16
            # Use:
            # - at least 1 CPU
            # - at most half the number of CPUs or 8, whichever is less
            max_workers = max(1, min(multiprocessing.cpu_count() // 2, 8))
            self.executor_for_fillmask = ThreadPoolExecutor(
                max_workers=max_workers)

        if not self.vllm_config.model_config.skip_tokenizer_init:
            # The default max_workers if not specified is the number of
            # CPUs * 5, which is way too high since these tasks are CPU-bound,
            # not I/O bound. We also know we would never dominate CPU usage
            # with just grammar compilation, so we set it to half the number
            # of CPUs.
            max_workers = max(1, (multiprocessing.cpu_count() + 1) // 2)
            self.executor = ThreadPoolExecutor(max_workers=max_workers)
            self.tokenizer = init_tokenizer_from_configs(
                model_config=self.vllm_config.model_config,
                scheduler_config=self.vllm_config.scheduler_config,
                lora_config=self.vllm_config.lora_config,
            ).get_lora_tokenizer(None)
            reasoning_backend = \
                    self.vllm_config.decoding_config.reasoning_backend
            if reasoning_backend:
                reasoner_cls = ReasoningParserManager.get_reasoning_parser(
                    reasoning_backend)
                self.reasoner = reasoner_cls(tokenizer=self.tokenizer)

    def grammar_init(self, request: Request) -> None:
        if request.structured_output_request is None:
            return

        if TYPE_CHECKING:
            assert request.sampling_params is not None and \
                request.sampling_params.guided_decoding is not None

        # Initialize the backend the first time it is needed.
        #
        # NOTE: We only support a single backend. We do NOT support different
        # backends on a per-request basis in V1 (for now, anyway...).
        if self.backend is None:
            assert request.sampling_params is not None
            backend = request.sampling_params.guided_decoding.backend
            vocab_size = self.vllm_config.model_config.get_vocab_size()
            if backend == "xgrammar":
                self.backend = XgrammarBackend(
                    self.vllm_config,
                    tokenizer=self.tokenizer,
                    vocab_size=vocab_size,
                )
            elif backend == "guidance":
                self.backend = GuidanceBackend(
                    self.vllm_config,
                    tokenizer=self.tokenizer,
                    vocab_size=vocab_size,
                )
            elif backend == "outlines":
                from vllm.v1.structured_output.backend_outlines import (
                    OutlinesBackend)

                self.backend = OutlinesBackend(
                    self.vllm_config,
                    tokenizer=self.tokenizer,
                    vocab_size=vocab_size,
                )
            elif backend == "lm-format-enforcer":
                from vllm.v1.structured_output.backend_lm_format_enforcer import (  # noqa: E501
                    LMFormatEnforcerBackend)
                self.backend = LMFormatEnforcerBackend(
                    self.vllm_config,
                    tokenizer=self.tokenizer,
                    vocab_size=vocab_size,
                )
            else:
                raise ValueError(
                    f"Unsupported structured output backend: {backend}")

        grammar = self.executor.submit(self._async_create_grammar, request)
        request.structured_output_request.grammar = grammar  # type: ignore[assignment]

    def _async_create_grammar(
        self,
        request: Request,
    ) -> StructuredOutputGrammar:
        key = request.structured_output_request.structured_output_key  # type: ignore[union-attr]

        # Note that the request was validated in the engine core client,
        # so at this point we know it is a supported type of request.
        #
        # TODO: we still need to handle xgrammar compilation failures,
        # though it should be unlikely as we test that up front as well.
        request_type, grammar_spec = key

        assert self.backend is not None
        return self.backend.compile_grammar(request_type, grammar_spec)

    def _fill_bitmasks(
        self,
        batch: list[tuple[StructuredOutputGrammar, int, bool]],
    ) -> None:
        assert self._grammar_bitmask is not None
        for grammar, index, apply_bitmask in batch:
            if apply_bitmask and not grammar.is_terminated():
                grammar.fill_bitmask(self._grammar_bitmask, index)
            else:
                # Note that for thinking support, we will need to
                # reset the relevant part of the bitmask for consequent
                # requests here.
                self._grammar_bitmask[index].fill_(self._full_mask)

    def _async_submit_fill_bitmask(
        self,
        batch: list[tuple[StructuredOutputGrammar, int, bool]],
    ) -> Future:
        return self.executor_for_fillmask.submit(self._fill_bitmasks, batch)

    def grammar_bitmask(
        self,
        requests: dict[str, Request],
        structured_output_request_ids: dict[str, int],
        scheduled_spec_decode_tokens: dict[str, list[int]],
    ) -> Optional[npt.NDArray[np.int32]]:
        # Prepare the structured output bitmask for this batch.
        if not structured_output_request_ids:
            return None

        max_num_spec_tokens = 0
        if self.vllm_config.speculative_config is not None:
            max_num_spec_tokens = \
                self.vllm_config.speculative_config.num_speculative_tokens

        if self._grammar_bitmask is None:
            assert self.backend is not None
            max_batch_size = self.vllm_config.scheduler_config.max_num_seqs

            # Allocate a bitmask for each token needing to be checked:
            # one for each speculative position, and one more for the
            # bonus token / non-speculative token.
            self._grammar_bitmask = \
                self.backend.allocate_token_bitmask(
                    max_batch_size * (1 + max_num_spec_tokens))

        # Generate a batched bitmask for all structured output requests.
        # When speculative decoding is enabled, we need to include multiple
        # masks for each request, one for each possible bonus token position.
        # These are stored inline in the tensor and unpacked by the gpu runner.
        cumulative_index = 0
        ordered_seq = sorted(structured_output_request_ids.items(),
                             key=lambda x: x[1])

        # Optimized parallel filling of bitmasks for
        # non-spec, large-batch-size cases
        if len(ordered_seq) > self.fill_bitmask_parallel_threshold and \
                max_num_spec_tokens == 0:
            promises = []
            batch = []
            for req_id, _ in ordered_seq:
                request = requests[req_id]
                structured_output_request = request.structured_output_request
                if TYPE_CHECKING:
                    assert structured_output_request is not None
                    assert structured_output_request.grammar is not None

                apply_bitmask = self.should_fill_bitmask(request)
                batch.append((structured_output_request.grammar,
                              cumulative_index, apply_bitmask))
                if len(batch) == self.fill_bitmask_parallel_batch_size:
                    promises.append(self._async_submit_fill_bitmask(batch))
                    batch = []

                cumulative_index += 1
            if batch:
                promises.append(self._async_submit_fill_bitmask(batch))

            # Wait for all bitmask filling tasks to complete.
            for promise in promises:
                promise.result()
        else:
            # Fallback to serial filling of bitmasks for small-batch-size cases
            for req_id, _ in ordered_seq:
                request = requests[req_id]
                structured_output_request = request.structured_output_request

                if TYPE_CHECKING:
                    assert structured_output_request is not None
                    assert structured_output_request.grammar is not None
                apply_bitmask = self.should_fill_bitmask(request)

                state_advancements = 0
                req_tokens = scheduled_spec_decode_tokens.get(req_id, [])
                for i, token in enumerate(req_tokens + [None]):
                    self._fill_bitmasks([(structured_output_request.grammar,
                                          cumulative_index, apply_bitmask)])

                    if apply_bitmask and token is not None and \
                        not structured_output_request.grammar.is_terminated():
                        assert structured_output_request.grammar.accept_tokens(
                            req_id, [token])
                        state_advancements += 1
                    cumulative_index += 1
                if state_advancements > 0:
                    structured_output_request.grammar.rollback(
                        state_advancements)

        bitmask_tensor = self._grammar_bitmask
        if cumulative_index < bitmask_tensor.shape[0]:
            bitmask_tensor = bitmask_tensor[:cumulative_index]

        # After finishing with the xgrammar operations, we convert to
        # np.ndarray, because that is much more efficient for serialization
        # and deserialization when sending this to the GPU workers.
        return bitmask_tensor.numpy()

    def should_fill_bitmask(self, request: Request) -> bool:
        if self.reasoner is not None:
            assert request.structured_output_request is not None
            if request.structured_output_request.reasoning_ended is None:
                request.structured_output_request.reasoning_ended = \
                    self.reasoner.is_reasoning_end(request.prompt_token_ids)
            return request.structured_output_request.reasoning_ended
        return True

    def should_advance(self, request: Request) -> bool:
        if not request.use_structured_output:
            return False

        # To determine whether we can advance the FSM.
        # Supports thinking usage where we skip the reasoning components.
        if TYPE_CHECKING:
            assert request.structured_output_request is not None
            assert request.structured_output_request.grammar is not None
        # by default, we should always advance
        # for cases that don't use thinking mode.
        if self.reasoner is not None:
            structured_req = request.structured_output_request

            if structured_req.reasoning_ended:
                return True

            # Check if reasoning ends in *this* step
            if self.reasoner.is_reasoning_end(request.all_token_ids):
                # Reasoning just ended, so we shouldn't advance til
                # next pass
                structured_req.reasoning_ended = True

            return False
        else:
            return True

    def clear_backend(self) -> None:
        if self.backend is not None:
            self.backend.destroy()

_full_mask instance-attribute

_full_mask = tensor(-1, dtype=int32)

_grammar_bitmask instance-attribute

_grammar_bitmask: Optional[Tensor] = None

backend instance-attribute

backend: Optional[StructuredOutputBackend] = None

executor instance-attribute

executor = ThreadPoolExecutor(max_workers=max_workers)

executor_for_fillmask instance-attribute

executor_for_fillmask = ThreadPoolExecutor(
    max_workers=max_workers
)

fill_bitmask_parallel_batch_size instance-attribute

fill_bitmask_parallel_batch_size = 16

fill_bitmask_parallel_threshold instance-attribute

fill_bitmask_parallel_threshold = 128

reasoner instance-attribute

reasoner: Optional[ReasoningParser] = None

tokenizer instance-attribute

tokenizer = get_lora_tokenizer(None)

vllm_config instance-attribute

vllm_config = vllm_config

__init__

__init__(vllm_config: VllmConfig)
Source code in vllm/v1/structured_output/__init__.py
def __init__(self, vllm_config: VllmConfig):
    self.backend: Optional[StructuredOutputBackend] = None
    self.reasoner: Optional[ReasoningParser] = None
    self.vllm_config = vllm_config

    self._grammar_bitmask: Optional[torch.Tensor] = None
    self._full_mask = torch.tensor(-1, dtype=torch.int32)

    max_batch_size = self.vllm_config.scheduler_config.max_num_seqs
    self.fill_bitmask_parallel_threshold = 128
    if self.fill_bitmask_parallel_threshold < max_batch_size:
        self.fill_bitmask_parallel_batch_size = 16
        # Use:
        # - at least 1 CPU
        # - at most half the number of CPUs or 8, whichever is less
        max_workers = max(1, min(multiprocessing.cpu_count() // 2, 8))
        self.executor_for_fillmask = ThreadPoolExecutor(
            max_workers=max_workers)

    if not self.vllm_config.model_config.skip_tokenizer_init:
        # The default max_workers if not specified is the number of
        # CPUs * 5, which is way too high since these tasks are CPU-bound,
        # not I/O bound. We also know we would never dominate CPU usage
        # with just grammar compilation, so we set it to half the number
        # of CPUs.
        max_workers = max(1, (multiprocessing.cpu_count() + 1) // 2)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.tokenizer = init_tokenizer_from_configs(
            model_config=self.vllm_config.model_config,
            scheduler_config=self.vllm_config.scheduler_config,
            lora_config=self.vllm_config.lora_config,
        ).get_lora_tokenizer(None)
        reasoning_backend = \
                self.vllm_config.decoding_config.reasoning_backend
        if reasoning_backend:
            reasoner_cls = ReasoningParserManager.get_reasoning_parser(
                reasoning_backend)
            self.reasoner = reasoner_cls(tokenizer=self.tokenizer)
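
For concreteness, the two worker-count heuristics above evaluated for a few CPU counts (illustrative arithmetic only): the fill-mask pool is capped at 8 threads, while the grammar-compilation pool is not.

import multiprocessing

cpus = multiprocessing.cpu_count()
fillmask_workers = max(1, min(cpus // 2, 8))   # e.g. 4 CPUs -> 2, 16 -> 8, 64 -> 8
grammar_workers = max(1, (cpus + 1) // 2)      # e.g. 4 CPUs -> 2, 16 -> 8, 64 -> 32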

_async_create_grammar

_async_create_grammar(
    request: Request,
) -> StructuredOutputGrammar
Source code in vllm/v1/structured_output/__init__.py
def _async_create_grammar(
    self,
    request: Request,
) -> StructuredOutputGrammar:
    key = request.structured_output_request.structured_output_key  # type: ignore[union-attr]

    # Note that the request was validated in the engine core client,
    # so at this point we know it is a supported type of request.
    #
    # TODO: we still need to handle xgrammar compilation failures,
    # though it should be unlikely as we test that up front as well.
    request_type, grammar_spec = key

    assert self.backend is not None
    return self.backend.compile_grammar(request_type, grammar_spec)

_async_submit_fill_bitmask

_async_submit_fill_bitmask(
    batch: list[tuple[StructuredOutputGrammar, int, bool]],
) -> Future
Source code in vllm/v1/structured_output/__init__.py
def _async_submit_fill_bitmask(
    self,
    batch: list[tuple[StructuredOutputGrammar, int, bool]],
) -> Future:
    return self.executor_for_fillmask.submit(self._fill_bitmasks, batch)

_fill_bitmasks

_fill_bitmasks(
    batch: list[tuple[StructuredOutputGrammar, int, bool]],
) -> None
Source code in vllm/v1/structured_output/__init__.py
def _fill_bitmasks(
    self,
    batch: list[tuple[StructuredOutputGrammar, int, bool]],
) -> None:
    assert self._grammar_bitmask is not None
    for grammar, index, apply_bitmask in batch:
        if apply_bitmask and not grammar.is_terminated():
            grammar.fill_bitmask(self._grammar_bitmask, index)
        else:
            # Note that for thinking support, we will need to
            # reset the relevant part of the bitmask for consequent
            # requests here.
            self._grammar_bitmask[index].fill_(self._full_mask)
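
An aside on the reset value: in two's-complement int32, -1 has all 32 bits set, so filling a bitmask row with `_full_mask` marks every token in that position as allowed. A quick check (illustrative only):

import torch

full = torch.tensor(-1, dtype=torch.int32)
# All 32 bits are set, i.e. no token is masked out for this row.
assert full.item() & 0xFFFFFFFF == 0xFFFFFFFF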

clear_backend

clear_backend() -> None
Source code in vllm/v1/structured_output/__init__.py
def clear_backend(self) -> None:
    if self.backend is not None:
        self.backend.destroy()

grammar_bitmask

grammar_bitmask(
    requests: dict[str, Request],
    structured_output_request_ids: dict[str, int],
    scheduled_spec_decode_tokens: dict[str, list[int]],
) -> Optional[NDArray[int32]]
Source code in vllm/v1/structured_output/__init__.py
def grammar_bitmask(
    self,
    requests: dict[str, Request],
    structured_output_request_ids: dict[str, int],
    scheduled_spec_decode_tokens: dict[str, list[int]],
) -> Optional[npt.NDArray[np.int32]]:
    # Prepare the structured output bitmask for this batch.
    if not structured_output_request_ids:
        return None

    max_num_spec_tokens = 0
    if self.vllm_config.speculative_config is not None:
        max_num_spec_tokens = \
            self.vllm_config.speculative_config.num_speculative_tokens

    if self._grammar_bitmask is None:
        assert self.backend is not None
        max_batch_size = self.vllm_config.scheduler_config.max_num_seqs

        # Allocate a bitmask for each token needing to be checked:
        # one for each speculative position, and one more for the
        # bonus token / non-speculative token.
        self._grammar_bitmask = \
            self.backend.allocate_token_bitmask(
                max_batch_size * (1 + max_num_spec_tokens))

    # Generate a batched bitmask for all structured output requests.
    # When speculative decoding is enabled, we need to include multiple
    # masks for each request, one for each possible bonus token position.
    # These are stored inline in the tensor and unpacked by the gpu runner.
    cumulative_index = 0
    ordered_seq = sorted(structured_output_request_ids.items(),
                         key=lambda x: x[1])

    # Optimized parallel filling of bitmasks for
    # non-spec, large-batch-size cases
    if len(ordered_seq) > self.fill_bitmask_parallel_threshold and \
            max_num_spec_tokens == 0:
        promises = []
        batch = []
        for req_id, _ in ordered_seq:
            request = requests[req_id]
            structured_output_request = request.structured_output_request
            if TYPE_CHECKING:
                assert structured_output_request is not None
                assert structured_output_request.grammar is not None

            apply_bitmask = self.should_fill_bitmask(request)
            batch.append((structured_output_request.grammar,
                          cumulative_index, apply_bitmask))
            if len(batch) == self.fill_bitmask_parallel_batch_size:
                promises.append(self._async_submit_fill_bitmask(batch))
                batch = []

            cumulative_index += 1
        if batch:
            promises.append(self._async_submit_fill_bitmask(batch))

        # Wait for all bitmask filling tasks to complete.
        for promise in promises:
            promise.result()
    else:
        # Fallback to serial filling of bitmasks for small-batch-size cases
        for req_id, _ in ordered_seq:
            request = requests[req_id]
            structured_output_request = request.structured_output_request

            if TYPE_CHECKING:
                assert structured_output_request is not None
                assert structured_output_request.grammar is not None
            apply_bitmask = self.should_fill_bitmask(request)

            state_advancements = 0
            req_tokens = scheduled_spec_decode_tokens.get(req_id, [])
            for i, token in enumerate(req_tokens + [None]):
                self._fill_bitmasks([(structured_output_request.grammar,
                                      cumulative_index, apply_bitmask)])

                if apply_bitmask and token is not None and \
                    not structured_output_request.grammar.is_terminated():
                    assert structured_output_request.grammar.accept_tokens(
                        req_id, [token])
                    state_advancements += 1
                cumulative_index += 1
            if state_advancements > 0:
                structured_output_request.grammar.rollback(
                    state_advancements)

    bitmask_tensor = self._grammar_bitmask
    if cumulative_index < bitmask_tensor.shape[0]:
        bitmask_tensor = bitmask_tensor[:cumulative_index]

    # After finishing with the xgrammar operations, we convert to
    # np.ndarray, because that is much more efficient for serialization
    # and deserialization when sending this to the GPU workers.
    return bitmask_tensor.numpy()
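
As the comments above note, rows are packed consecutively in the order given by structured_output_request_ids: each request contributes one row per scheduled speculative token plus one for the bonus / non-speculative token (a single row when no speculative tokens are scheduled for it). A small illustrative helper, not part of the module, that recovers each request's row range from the same inputs:

def request_row_ranges(
    structured_output_request_ids: dict[str, int],
    scheduled_spec_decode_tokens: dict[str, list[int]],
) -> dict[str, tuple[int, int]]:
    # Same ordering as grammar_bitmask: sort request ids by their assigned index.
    ordered = sorted(structured_output_request_ids,
                     key=structured_output_request_ids.get)
    ranges, offset = {}, 0
    for req_id in ordered:
        n_rows = len(scheduled_spec_decode_tokens.get(req_id, [])) + 1
        ranges[req_id] = (offset, offset + n_rows)
        offset += n_rows
    return ranges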

grammar_init

grammar_init(request: Request) -> None
Source code in vllm/v1/structured_output/__init__.py
def grammar_init(self, request: Request) -> None:
    if request.structured_output_request is None:
        return

    if TYPE_CHECKING:
        assert request.sampling_params is not None and \
            request.sampling_params.guided_decoding is not None

    # Initialize the backend the first time it is needed.
    #
    # NOTE: We only support a single backend. We do NOT support different
    # backends on a per-request basis in V1 (for now, anyway...).
    if self.backend is None:
        assert request.sampling_params is not None
        backend = request.sampling_params.guided_decoding.backend
        vocab_size = self.vllm_config.model_config.get_vocab_size()
        if backend == "xgrammar":
            self.backend = XgrammarBackend(
                self.vllm_config,
                tokenizer=self.tokenizer,
                vocab_size=vocab_size,
            )
        elif backend == "guidance":
            self.backend = GuidanceBackend(
                self.vllm_config,
                tokenizer=self.tokenizer,
                vocab_size=vocab_size,
            )
        elif backend == "outlines":
            from vllm.v1.structured_output.backend_outlines import (
                OutlinesBackend)

            self.backend = OutlinesBackend(
                self.vllm_config,
                tokenizer=self.tokenizer,
                vocab_size=vocab_size,
            )
        elif backend == "lm-format-enforcer":
            from vllm.v1.structured_output.backend_lm_format_enforcer import (  # noqa: E501
                LMFormatEnforcerBackend)
            self.backend = LMFormatEnforcerBackend(
                self.vllm_config,
                tokenizer=self.tokenizer,
                vocab_size=vocab_size,
            )
        else:
            raise ValueError(
                f"Unsupported structured output backend: {backend}")

    grammar = self.executor.submit(self._async_create_grammar, request)
    request.structured_output_request.grammar = grammar  # type: ignore[assignment]
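
Note that executor.submit returns a concurrent.futures.Future rather than a StructuredOutputGrammar, which is why the assignment above carries a type: ignore. The compiled grammar becomes available through the future once _async_create_grammar finishes; a generic way to resolve it (illustrative only, not necessarily how the engine consumes it):

from concurrent.futures import Future

maybe_grammar = request.structured_output_request.grammar
if isinstance(maybe_grammar, Future):
    # Blocks until the backend's compile_grammar() call completes.
    grammar = maybe_grammar.result()
else:
    grammar = maybe_grammar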

should_advance

should_advance(request: Request) -> bool
Source code in vllm/v1/structured_output/__init__.py
def should_advance(self, request: Request) -> bool:
    if not request.use_structured_output:
        return False

    # To determine whether we can advance the FSM.
    # Supports thinking usage where we skip the reasoning components.
    if TYPE_CHECKING:
        assert request.structured_output_request is not None
        assert request.structured_output_request.grammar is not None
    # by default, we should always advance
    # for cases that don't use thinking mode.
    if self.reasoner is not None:
        structured_req = request.structured_output_request

        if structured_req.reasoning_ended:
            return True

        # Check if reasoning ends in *this* step
        if self.reasoner.is_reasoning_end(request.all_token_ids):
            # Reasoning just ended, so we shouldn't advance til
            # next pass
            structured_req.reasoning_ended = True

        return False
    else:
        return True
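
Condensing the gate above: when a reasoner is configured, the FSM is only advanced once the request's reasoning segment ended on an earlier step; on the step where reasoning ends, the flag is flipped but the method still returns False, so the end-of-reasoning tokens are not fed into the grammar. A standalone restatement of that logic (illustrative only):

def reasoning_gate(reasoning_ended: bool,
                   all_token_ids: list[int],
                   is_reasoning_end) -> tuple[bool, bool]:
    """Return (advance_fsm, new_reasoning_ended) for a structured-output
    request when a reasoner is configured."""
    if reasoning_ended:
        return True, True
    if is_reasoning_end(all_token_ids):
        # Reasoning ends on this step: remember it, but skip advancing
        # until the next pass.
        return False, True
    return False, False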

should_fill_bitmask

should_fill_bitmask(request: Request) -> bool
Source code in vllm/v1/structured_output/__init__.py
def should_fill_bitmask(self, request: Request) -> bool:
    if self.reasoner is not None:
        assert request.structured_output_request is not None
        if request.structured_output_request.reasoning_ended is None:
            request.structured_output_request.reasoning_ended = \
                self.reasoner.is_reasoning_end(request.prompt_token_ids)
        return request.structured_output_request.reasoning_ended
    return True