vllm.benchmarks.lib.endpoint_request_func

The request function for API endpoints.

AIOHTTP_TIMEOUT module-attribute

AIOHTTP_TIMEOUT = ClientTimeout(total=6 * 60 * 60)

ASYNC_REQUEST_FUNCS module-attribute

ASYNC_REQUEST_FUNCS = {
    "vllm": async_request_openai_completions,
    "openai": async_request_openai_completions,
    "openai-chat": async_request_openai_chat_completions,
    "openai-audio": async_request_openai_audio,
    "openai-embeddings": async_request_openai_embeddings,
}

OPENAI_COMPATIBLE_BACKENDS module-attribute

OPENAI_COMPATIBLE_BACKENDS = [
    k
    for k, v in ASYNC_REQUEST_FUNCS.items()
    if v in (
        async_request_openai_completions,
        async_request_openai_chat_completions,
    )
]
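
For illustration, a minimal sketch of dispatching on a backend name using the two tables above; the backend string is a placeholder choice.

from vllm.benchmarks.lib.endpoint_request_func import (
    ASYNC_REQUEST_FUNCS,
    OPENAI_COMPATIBLE_BACKENDS,
)

backend = "openai-chat"  # any key of ASYNC_REQUEST_FUNCS
request_func = ASYNC_REQUEST_FUNCS[backend]

# "vllm", "openai", and "openai-chat" resolve to the completions or
# chat completions request functions, so they are listed as OpenAI-compatible.
print(request_func.__name__, backend in OPENAI_COMPATIBLE_BACKENDS)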

RequestFuncInput dataclass

The input for the request function.

Source code in vllm/benchmarks/lib/endpoint_request_func.py
@dataclass
class RequestFuncInput:
    """The input for the request function."""
    prompt: str
    api_url: str
    prompt_len: int
    output_len: int
    model: str
    model_name: Optional[str] = None
    logprobs: Optional[int] = None
    extra_body: Optional[dict] = None
    multi_modal_content: Optional[Union[dict, list[dict]]] = None
    ignore_eos: bool = False
    language: Optional[str] = None
    request_id: Optional[str] = None

api_url instance-attribute

api_url: str

extra_body class-attribute instance-attribute

extra_body: Optional[dict] = None

ignore_eos class-attribute instance-attribute

ignore_eos: bool = False

language class-attribute instance-attribute

language: Optional[str] = None

logprobs class-attribute instance-attribute

logprobs: Optional[int] = None

model instance-attribute

model: str

model_name class-attribute instance-attribute

model_name: Optional[str] = None

multi_modal_content class-attribute instance-attribute

multi_modal_content: Optional[Union[dict, list[dict]]] = (
    None
)

output_len instance-attribute

output_len: int

prompt instance-attribute

prompt: str

prompt_len instance-attribute

prompt_len: int

request_id class-attribute instance-attribute

request_id: Optional[str] = None

__init__

__init__(
    prompt: str,
    api_url: str,
    prompt_len: int,
    output_len: int,
    model: str,
    model_name: Optional[str] = None,
    logprobs: Optional[int] = None,
    extra_body: Optional[dict] = None,
    multi_modal_content: Optional[
        Union[dict, list[dict]]
    ] = None,
    ignore_eos: bool = False,
    language: Optional[str] = None,
    request_id: Optional[str] = None,
) -> None
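
A hedged sketch of constructing a RequestFuncInput for a chat-style backend; the URL, model name, and token counts below are placeholder values chosen for illustration.

from vllm.benchmarks.lib.endpoint_request_func import RequestFuncInput

request_input = RequestFuncInput(
    prompt="Explain speculative decoding in one paragraph.",
    api_url="http://localhost:8000/v1/chat/completions",
    prompt_len=12,      # tokenized prompt length, computed by the caller
    output_len=128,     # number of tokens to request from the server
    model="meta-llama/Llama-3.1-8B-Instruct",
    ignore_eos=True,    # ask the server to generate exactly output_len tokens
    extra_body={"top_p": 0.95},  # merged verbatim into the request payload
)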

RequestFuncOutput dataclass

The output of the request function including metrics.

Source code in vllm/benchmarks/lib/endpoint_request_func.py
@dataclass
class RequestFuncOutput:
    """The output of the request function including metrics."""
    generated_text: str = ""
    success: bool = False
    latency: float = 0.0
    output_tokens: int = 0
    ttft: float = 0.0  # Time to first token
    itl: list[float] = field(
        default_factory=list)  # list of inter-token latencies
    tpot: float = 0.0  # avg next-token latencies
    prompt_len: int = 0
    error: str = ""

error class-attribute instance-attribute

error: str = ''

generated_text class-attribute instance-attribute

generated_text: str = ''

itl class-attribute instance-attribute

itl: list[float] = field(default_factory=list)

latency class-attribute instance-attribute

latency: float = 0.0

output_tokens class-attribute instance-attribute

output_tokens: int = 0

prompt_len class-attribute instance-attribute

prompt_len: int = 0

success class-attribute instance-attribute

success: bool = False

tpot class-attribute instance-attribute

tpot: float = 0.0

ttft class-attribute instance-attribute

ttft: float = 0.0

__init__

__init__(
    generated_text: str = "",
    success: bool = False,
    latency: float = 0.0,
    output_tokens: int = 0,
    ttft: float = 0.0,
    itl: list[float] = list(),
    tpot: float = 0.0,
    prompt_len: int = 0,
    error: str = "",
) -> None
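
The request functions fill ttft, itl, latency, and output_tokens; an average time per output token can be derived afterwards by the caller. A hedged sketch, assuming TPOT is taken as the post-first-token latency spread over the remaining tokens:

from vllm.benchmarks.lib.endpoint_request_func import RequestFuncOutput

def summarize(output: RequestFuncOutput) -> None:
    if not output.success:
        print(f"request failed: {output.error}")
        return
    # Guard against division by zero for single-token responses.
    decode_tokens = max(output.output_tokens - 1, 1)
    tpot = (output.latency - output.ttft) / decode_tokens
    print(f"ttft={output.ttft * 1e3:.1f} ms  "
          f"tpot={tpot * 1e3:.1f} ms  "
          f"itl_samples={len(output.itl)}")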

async_request_openai_audio async

async_request_openai_audio(
    request_func_input: RequestFuncInput,
    session: ClientSession,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput
Source code in vllm/benchmarks/lib/endpoint_request_func.py
async def async_request_openai_audio(
    request_func_input: RequestFuncInput,
    session: aiohttp.ClientSession,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    # Lazy import without PlaceholderModule to avoid vllm dep.
    import soundfile

    api_url = request_func_input.api_url
    assert api_url.endswith(("transcriptions", "translations")), (
        "OpenAI Chat Completions API URL must end with 'transcriptions' ")
    "or `translations`."

    content = [{"type": "text", "text": request_func_input.prompt}]
    payload = {
        "model": request_func_input.model_name
        if request_func_input.model_name else request_func_input.model,
        "temperature": 0.0,
        "max_completion_tokens": request_func_input.output_len,
        "stream": True,
        "language": "en",
        # Flattened due to multipart/form-data
        "stream_include_usage": True,
        "stream_continuous_usage_stats": True,
    }
    if request_func_input.extra_body:
        payload.update(request_func_input.extra_body)
    headers = {
        "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
    }
    if request_func_input.request_id:
        headers["x-request-id"] = request_func_input.request_id

    # Send audio file
    def to_bytes(y, sr):
        buffer = io.BytesIO()
        soundfile.write(buffer, y, sr, format="WAV")
        buffer.seek(0)
        return buffer

    mm_audio = request_func_input.multi_modal_content
    if not isinstance(mm_audio, dict) or "audio" not in mm_audio:
        raise TypeError("multi_modal_content must be a dict containing 'audio'")
    with to_bytes(*mm_audio["audio"]) as f:
        form = aiohttp.FormData()
        form.add_field("file", f, content_type="audio/wav")
        for key, value in payload.items():
            form.add_field(key, str(value))

        output = RequestFuncOutput()
        output.prompt_len = request_func_input.prompt_len

        generated_text = ""
        ttft = 0.0
        st = time.perf_counter()
        most_recent_timestamp = st
        try:
            async with session.post(url=api_url,
                                    data=form,
                                    headers=headers) as response:
                if response.status == 200:
                    async for chunk_bytes in response.content:
                        chunk_bytes = chunk_bytes.strip()
                        if not chunk_bytes:
                            continue

                        chunk = chunk_bytes.decode("utf-8").removeprefix(
                            "data: ")
                        if chunk != "[DONE]":
                            timestamp = time.perf_counter()
                            data = json.loads(chunk)

                            if choices := data.get("choices"):
                                content = choices[0]["delta"].get(
                                    "content")
                                # First token
                                if ttft == 0.0:
                                    ttft = timestamp - st
                                    output.ttft = ttft

                                # Decoding phase
                                else:
                                    output.itl.append(
                                        timestamp - most_recent_timestamp)

                                generated_text += content or ""
                            elif usage := data.get("usage"):
                                output.output_tokens = usage.get(
                                    "completion_tokens")

                            most_recent_timestamp = timestamp

                    output.generated_text = generated_text
                    output.success = True
                    output.latency = most_recent_timestamp - st
                else:
                    output.error = response.reason or ""
                    output.success = False
        except Exception:
            output.success = False
            exc_info = sys.exc_info()
            output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output
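
A hedged sketch of preparing an input for this function; it assumes multi_modal_content carries an "audio" entry holding a (waveform, sample_rate) pair that soundfile can serialize to WAV. The silent waveform, endpoint URL, and model name are placeholders.

import numpy as np

from vllm.benchmarks.lib.endpoint_request_func import RequestFuncInput

sample_rate = 16_000
waveform = np.zeros(sample_rate, dtype=np.float32)  # 1 s of silence as a stand-in

audio_input = RequestFuncInput(
    prompt="",  # the uploaded audio file, not the prompt, drives transcription
    api_url="http://localhost:8000/v1/audio/transcriptions",
    prompt_len=0,
    output_len=64,
    model="openai/whisper-large-v3",
    multi_modal_content={"audio": (waveform, sample_rate)},
)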

async_request_openai_chat_completions async

async_request_openai_chat_completions(
    request_func_input: RequestFuncInput,
    session: ClientSession,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput
Source code in vllm/benchmarks/lib/endpoint_request_func.py
async def async_request_openai_chat_completions(
    request_func_input: RequestFuncInput,
    session: aiohttp.ClientSession,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    api_url = request_func_input.api_url
    assert api_url.endswith(("chat/completions", "profile")), (
        "OpenAI Chat Completions API URL must end with 'chat/completions'.")

    content = [{"type": "text", "text": request_func_input.prompt}]
    if request_func_input.multi_modal_content:
        mm_content = request_func_input.multi_modal_content
        if isinstance(mm_content, list):
            content.extend(mm_content)
        elif isinstance(mm_content, dict):
            content.append(mm_content)
        else:
            raise TypeError(
                "multi_modal_content must be a dict or list[dict] "
                "for openai-chat"
            )
    payload = {
        "model": request_func_input.model_name
        if request_func_input.model_name else request_func_input.model,
        "messages": [
            {"role": "user", "content": content},
        ],
        "temperature": 0.0,
        "max_completion_tokens": request_func_input.output_len,
        "stream": True,
        "stream_options": {
            "include_usage": True,
        },
    }
    if request_func_input.ignore_eos:
        payload["ignore_eos"] = request_func_input.ignore_eos
    if request_func_input.extra_body:
        payload.update(request_func_input.extra_body)
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
    }
    if request_func_input.request_id:
        headers["x-request-id"] = request_func_input.request_id

    output = RequestFuncOutput()
    output.prompt_len = request_func_input.prompt_len

    generated_text = ""
    ttft = 0.0
    st = time.perf_counter()
    most_recent_timestamp = st
    try:
        async with session.post(url=api_url, json=payload,
                                headers=headers) as response:
            if response.status == 200:
                async for chunk_bytes in response.content:
                    chunk_bytes = chunk_bytes.strip()
                    if not chunk_bytes:
                        continue
                    chunk_bytes = chunk_bytes.decode("utf-8")
                    # NOTE: SSE comments (often used as pings) start with
                    # a colon. These are not JSON data payload and should
                    # be skipped.
                    if chunk_bytes.startswith(":"):
                        continue

                    chunk = chunk_bytes.removeprefix("data: ")

                    if chunk != "[DONE]":
                        timestamp = time.perf_counter()
                        data = json.loads(chunk)

                        if choices := data.get("choices"):
                            content = choices[0]["delta"].get("content")
                            # First token
                            if ttft == 0.0:
                                ttft = timestamp - st
                                output.ttft = ttft

                            # Decoding phase
                            else:
                                output.itl.append(timestamp -
                                                  most_recent_timestamp)

                            generated_text += content or ""
                        elif usage := data.get("usage"):
                            output.output_tokens = usage.get(
                                "completion_tokens")

                        most_recent_timestamp = timestamp

                output.generated_text = generated_text
                output.success = True
                output.latency = most_recent_timestamp - st
            else:
                output.error = response.reason or ""
                output.success = False
    except Exception:
        output.success = False
        exc_info = sys.exc_info()
        output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output
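
A hedged sketch of a multi-modal chat input for this function; the image payload follows the OpenAI chat "image_url" content convention that is appended to the text content as-is, and the URL and model name are placeholders.

from vllm.benchmarks.lib.endpoint_request_func import RequestFuncInput

mm_input = RequestFuncInput(
    prompt="Describe this image in two sentences.",
    api_url="http://localhost:8000/v1/chat/completions",
    prompt_len=7,
    output_len=64,
    model="llava-hf/llava-1.5-7b-hf",
    multi_modal_content={
        "type": "image_url",
        "image_url": {"url": "https://example.com/cat.png"},
    },
)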

async_request_openai_completions async

async_request_openai_completions(
    request_func_input: RequestFuncInput,
    session: ClientSession,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput

The async request function for the OpenAI Completions API.

Parameters:

Name                Type              Description                                Default
request_func_input  RequestFuncInput  The input for the request function.        required
pbar                Optional[tqdm]    The progress bar to display the progress.  None

Returns:

Type               Description
RequestFuncOutput  The output of the request function.

Source code in vllm/benchmarks/lib/endpoint_request_func.py
async def async_request_openai_completions(
    request_func_input: RequestFuncInput,
    session: aiohttp.ClientSession,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    """The async request function for the OpenAI Completions API.

    Args:
        request_func_input: The input for the request function.
        pbar: The progress bar to display the progress.

    Returns:
        The output of the request function.
    """
    api_url = request_func_input.api_url
    assert api_url.endswith(
        ("completions", "profile")
    ), "OpenAI Completions API URL must end with 'completions' or 'profile'."

    payload = {
        "model": request_func_input.model_name
        if request_func_input.model_name else request_func_input.model,
        "prompt": request_func_input.prompt,
        "temperature": 0.0,
        "repetition_penalty": 1.0,
        "max_tokens": request_func_input.output_len,
        "logprobs": request_func_input.logprobs,
        "stream": True,
        "stream_options": {
            "include_usage": True,
        },
    }
    if request_func_input.ignore_eos:
        payload["ignore_eos"] = request_func_input.ignore_eos
    if request_func_input.extra_body:
        payload.update(request_func_input.extra_body)
    headers = {
        "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"
    }
    if request_func_input.request_id:
        headers["x-request-id"] = request_func_input.request_id

    output = RequestFuncOutput()
    output.prompt_len = request_func_input.prompt_len

    generated_text = ""
    st = time.perf_counter()
    most_recent_timestamp = st
    try:
        async with session.post(url=api_url, json=payload,
                                headers=headers) as response:
            if response.status == 200:
                first_chunk_received = False
                async for chunk_bytes in response.content:
                    chunk_bytes = chunk_bytes.strip()
                    if not chunk_bytes:
                        continue
                    chunk_bytes = chunk_bytes.decode("utf-8")
                    # NOTE: SSE comments (often used as pings) start with
                    # a colon. These are not JSON data payload and should
                    # be skipped.
                    if chunk_bytes.startswith(":"):
                        continue

                    chunk = chunk_bytes.removeprefix("data: ")

                    if chunk != "[DONE]":
                        data = json.loads(chunk)

                        # NOTE: Some completion API might have a last
                        # usage summary response without a token so we
                        # want to check a token was generated
                        if choices := data.get("choices"):
                            # Note that text could be empty here
                            # e.g. for special tokens
                            text = choices[0].get("text")
                            timestamp = time.perf_counter()
                            # First token
                            if not first_chunk_received:
                                first_chunk_received = True
                                ttft = time.perf_counter() - st
                                output.ttft = ttft

                            # Decoding phase
                            else:
                                output.itl.append(timestamp -
                                                  most_recent_timestamp)

                            most_recent_timestamp = timestamp
                            generated_text += text or ""
                        elif usage := data.get("usage"):
                            output.output_tokens = usage.get(
                                "completion_tokens")
                if first_chunk_received:
                    output.success = True
                else:
                    output.success = False
                    output.error = (
                        "Never received a valid chunk to calculate TTFT. "
                        "This response will be marked as failed!")
                output.generated_text = generated_text
                output.latency = most_recent_timestamp - st
            else:
                output.error = response.reason or ""
                output.success = False
    except Exception:
        output.success = False
        exc_info = sys.exc_info()
        output.error = "".join(traceback.format_exception(*exc_info))

    if pbar:
        pbar.update(1)
    return output
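
A minimal end-to-end driver sketch for this function, assuming an OpenAI-compatible server is reachable at the placeholder URL; the model name and token counts are illustrative.

import asyncio

import aiohttp
from tqdm import tqdm

from vllm.benchmarks.lib.endpoint_request_func import (
    AIOHTTP_TIMEOUT,
    RequestFuncInput,
    async_request_openai_completions,
)

async def main() -> None:
    request_input = RequestFuncInput(
        prompt="Hello, world!",
        api_url="http://localhost:8000/v1/completions",
        prompt_len=4,
        output_len=32,
        model="meta-llama/Llama-3.1-8B-Instruct",
    )
    pbar = tqdm(total=1)
    async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
        output = await async_request_openai_completions(
            request_input, session, pbar=pbar)
    pbar.close()
    print(output.success, round(output.ttft, 3), output.output_tokens)

if __name__ == "__main__":
    asyncio.run(main())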

async_request_openai_embeddings async

async_request_openai_embeddings(
    request_func_input: RequestFuncInput,
    session: ClientSession,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput
Source code in vllm/benchmarks/lib/endpoint_request_func.py
async def async_request_openai_embeddings(
    request_func_input: RequestFuncInput,
    session: aiohttp.ClientSession,
    pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
    api_url = request_func_input.api_url
    assert api_url.endswith(
        "embeddings"
    ), "OpenAI Embeddings API URL must end with 'embeddings'."

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
    }

    payload = {
        "model": request_func_input.model,
        "input": request_func_input.prompt,
    }

    output = RequestFuncOutput()
    st = time.perf_counter()
    try:
        async with session.post(
            url=api_url,
            headers=headers,
            json=payload
        ) as response:
            if response.status == 200:
                output.latency = time.perf_counter() - st
                data = await response.json()
                output.success = True
                output.generated_text = ""
                output.prompt_len = data.get("usage", {}).get(
                    "prompt_tokens", 0)
            else:
                output.success = False
                output.error = response.reason or ""
    except Exception as e:
        output.success = False
        output.error = str(e)

    if pbar:
        pbar.update(1)
    return output
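
A short usage sketch for the embeddings request function; the URL and model name are placeholders, and output_len is unused by this endpoint.

import asyncio

import aiohttp

from vllm.benchmarks.lib.endpoint_request_func import (
    RequestFuncInput,
    async_request_openai_embeddings,
)

async def main() -> None:
    embed_input = RequestFuncInput(
        prompt="vLLM makes LLM serving fast.",
        api_url="http://localhost:8000/v1/embeddings",
        prompt_len=7,
        output_len=0,  # not used by the embeddings endpoint
        model="BAAI/bge-base-en-v1.5",
    )
    async with aiohttp.ClientSession() as session:
        output = await async_request_openai_embeddings(embed_input, session)
    print(output.success, round(output.latency, 3), output.prompt_len)

asyncio.run(main())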