vllm.engine.multiprocessing.engine

HEALTHY_RESPONSE module-attribute

HEALTHY_RESPONSE = (dumps(VLLM_RPC_SUCCESS_STR),)

POLLING_TIMEOUT_MS module-attribute

POLLING_TIMEOUT_MS = 10000

logger module-attribute

logger = init_logger(__name__)

MQLLMEngine

A multiprocessing wrapper for LLMEngine.

This class is used to wrap the LLMEngine class to enable use in a concurrent manner. It runs a background loop and uses zeromq to receive new requests and stream outputs incrementally via ipc.

The LLMEngine generate or encode process is kicked off when a new RPCProcessRequest is received by the input_socket.

The self.run_engine_loop checks the input_socket for new requests, adds them to the LLMEngine if there are any, calls the internal LLMEngine.step(), and sends the RequestOutputs back over the output_socket.

If use_async_sockets is set, the logic associated with reading new requests from the socket and sending data to the socket is passed as a callback to the llm_engine, which calls the logic asynchronously such that the IPC can be overlapped with the GPU.

Parameters:

ipc_path (str): Base path for zeromq interprocess messaging. Required.
use_async_sockets (bool): Whether to make send/recv async with GPU. Required.
log_requests (bool): Whether to log the requests. Default: True.
*args: Arguments for LLMEngine. Default: ().
**kwargs: Arguments for LLMEngine. Default: {}.
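
The socket layout described above can be mirrored from the client side. The following is a minimal sketch (not the official client), assuming the IPC_INPUT_EXT and IPC_OUTPUT_EXT suffix constants exported by vllm.engine.multiprocessing and a hypothetical ipc path:

import zmq

from vllm.engine.multiprocessing import IPC_INPUT_EXT, IPC_OUTPUT_EXT

ipc_path = "ipc:///tmp/vllm_demo"  # hypothetical base path
ctx = zmq.Context()

# Pairs with the engine's PULL input socket: the client PUSHes requests.
input_socket = ctx.socket(zmq.PUSH)
input_socket.connect(f"{ipc_path}{IPC_INPUT_EXT}")

# Pairs with the engine's PUSH output socket: the client PULLs outputs.
output_socket = ctx.socket(zmq.PULL)
output_socket.connect(f"{ipc_path}{IPC_OUTPUT_EXT}")
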
Source code in vllm/engine/multiprocessing/engine.py
class MQLLMEngine:
    """A multiprocessing wrapper for
    [`LLMEngine`][vllm.engine.llm_engine.LLMEngine].

    This class is used to wrap the
    [`LLMEngine`][vllm.engine.llm_engine.LLMEngine] class to enable use
    in a concurrent manner. It runs a background loop and uses zeromq to
    receive new requests and stream outputs incrementally via ipc.

    The [`LLMEngine`][vllm.engine.llm_engine.LLMEngine] generate or encode
    process is kicked off when a new RPCProcessRequest is received by the
    input_socket.

    The self.run_engine_loop checks the input_socket for new requests,
    adds them to the LLMEngine if there are any, calls the internal
    [`LLMEngine.step()`][vllm.engine.llm_engine.LLMEngine.step], and sends
    the RequestOutputs back over the output_socket.

    If use_async_sockets is set, the logic associated with reading new
    requests from the socket and sending data to the socket is passed
    as a callback to the llm_engine, which calls the logic asynchronously
    such that the IPC can be overlapped with the GPU.

    Args:
        ipc_path: Base path for zeromq interprocess messaging
        use_async_sockets: Whether to make send/recv async with GPU
        log_requests: Whether to log the requests.
        *args: Arguments for [`LLMEngine`][vllm.engine.llm_engine.LLMEngine].
        **kwargs: Arguments for [`LLMEngine`][vllm.engine.llm_engine.LLMEngine].
    """

    def __init__(self,
                 ipc_path: str,
                 use_async_sockets: bool,
                 *args,
                 log_requests: bool = True,
                 **kwargs) -> None:
        # For MQLLMEngine, we can use cached outputs, since each new request
        # output is immediately pickled and sent over the socket, which frees
        # the Python object to be reused.
        kwargs['use_cached_outputs'] = True

        self.engine = LLMEngine(*args, **kwargs)
        self.log_requests = log_requests

        self.use_async_sockets = use_async_sockets
        if self.use_async_sockets:
            self.engine.process_request_outputs_callback = \
                self._async_socket_engine_callback

        self.ctx = zmq.Context()  # type: ignore[attr-defined]

        # Receive input from the client.
        self.input_socket = self.ctx.socket(zmq.constants.PULL)
        self.input_socket.bind(f"{ipc_path}{IPC_INPUT_EXT}")

        # Send output stream back to client.
        self.output_socket = self.ctx.socket(zmq.constants.PUSH)
        self.output_socket.bind(f"{ipc_path}{IPC_OUTPUT_EXT}")

        # Send heartbeats back to client.
        self.heartbeat_socket = self.ctx.socket(zmq.constants.PUSH)
        self.heartbeat_socket.bind(f"{ipc_path}{IPC_HEALTH_EXT}")

        # IPC path for the data socket.
        self.data_ipc_path = f"{ipc_path}{IPC_DATA_EXT}"

        # Error state.
        self._errored_with: Optional[BaseException] = None

    @property
    def dead_error(self) -> BaseException:
        if self._errored_with is not None:
            return ENGINE_DEAD_ERROR(self._errored_with)
        else:
            return ENGINE_DEAD_ERROR()

    @classmethod
    @deprecate_kwargs(
        "disable_log_requests",
        additional_message=("This argument will have no effect. "
                            "Use `enable_log_requests` instead."),
    )
    def from_vllm_config(
            cls,
            vllm_config: VllmConfig,
            usage_context: UsageContext,
            enable_log_requests: bool,
            disable_log_stats: bool,
            ipc_path: str,
            disable_log_requests: bool = True,  # Deprecated, will be removed
    ) -> "MQLLMEngine":
        # Setup plugins for each process
        from vllm.plugins import load_general_plugins
        load_general_plugins()

        use_async_sockets = vllm_config.model_config.use_async_output_proc

        return cls(
            vllm_config=vllm_config,
            executor_class=LLMEngine._get_executor_cls(vllm_config),
            ipc_path=ipc_path,
            usage_context=usage_context,
            use_async_sockets=use_async_sockets,
            log_requests=enable_log_requests,
            log_stats=(not disable_log_stats),
        )

    @staticmethod
    def from_engine_args(engine_args: AsyncEngineArgs,
                         usage_context: UsageContext, ipc_path: str):
        """Creates an MQLLMEngine from the engine arguments."""

        vllm_config = engine_args.create_engine_config(usage_context)
        return MQLLMEngine.from_vllm_config(
            ipc_path=ipc_path,
            vllm_config=vllm_config,
            usage_context=usage_context,
            enable_log_requests=engine_args.enable_log_requests,
            disable_log_stats=engine_args.disable_log_stats,
        )

    def start(self):
        try:
            try:
                logger.debug("Starting Startup Loop.")
                self.run_startup_loop()
                logger.debug("Starting Engine Loop.")
                self.run_engine_loop()
            except Exception as e:
                logger.exception(repr(e))
        except KeyboardInterrupt:
            logger.debug("Shutting down MQLLMEngine.")
        finally:
            logger.debug("MQLLMEngine is shut down.")
            self.cleanup()

    def cleanup(self):
        """Cleanup zeromq state on shutdown."""
        # Closes all sockets and destroys context.
        self.ctx.destroy(linger=0)
        del self.engine

    @contextmanager
    def make_data_socket(
            self) -> Iterator[zmq.Socket]:  # type: ignore[name-defined]
        socket = self.ctx.socket(zmq.constants.ROUTER)
        try:
            socket.bind(self.data_ipc_path)
            yield socket
        finally:
            socket.close(linger=0)

    def run_startup_loop(self) -> None:
        """Startup loop for sending data from Engine -> Client."""

        with self.make_data_socket() as socket:
            response: Union[RPCStartupResponse, BaseException]
            try:
                identity, message = socket.recv_multipart(copy=False)
                request: RPCStartupRequest = pickle.loads(message.buffer)

                # Handle the query from the Client.
                if request == RPCStartupRequest.IS_SERVER_READY:
                    tracing_enabled = self.engine.is_tracing_enabled()
                    response = RPCStartupResponse(
                        tracing_enabled=tracing_enabled)

            except Exception as e:
                response = e

            socket.send_multipart((identity, pickle.dumps(response)),
                                  copy=False)

    def run_engine_loop(self):
        """Core busy loop of the LLMEngine."""

        while True:
            if not self.engine.has_unfinished_requests():
                # Poll until there is work to do.
                while self.input_socket.poll(timeout=POLLING_TIMEOUT_MS) == 0:
                    # When there's no work, check on engine health and send
                    # health status back to client
                    self._health_check()
                    self.engine.do_log_stats()
                    logger.debug("Waiting for new requests in engine loop.")

            # Handle any input from the client.
            self.handle_new_input()

            # Engine step.
            request_outputs = self.engine_step()

            # Send request outputs (if async, done in engine_step callback).
            if not self.use_async_sockets:
                self._send_outputs(request_outputs)

    def engine_step(self) -> List[RequestOutput]:
        """Engine step wrapper with error handling."""
        try:
            return self.engine.step()
        except SystemExit:
            raise
        except InputProcessingError as e:
            # Special case where we handle an error preparing the inputs for
            # a single request in the batch
            rpc_err = RPCError(request_id=e.request_id,
                               is_engine_errored=False,
                               exception=e.__cause__)
            self._send_outputs(rpc_err)
            return []
        except BaseException as e:
            self._set_errored(e)
            rpc_err = RPCError(request_id=None,
                               is_engine_errored=True,
                               exception=e)
            self._send_outputs(rpc_err)
            raise e

    def handle_new_input(self):
        """Handle new input from the socket"""
        try:
            while self.input_socket.poll(timeout=0) != 0:
                frames = self.input_socket.recv_multipart(copy=False)
                request = pickle.loads(frames[0].buffer)

                if isinstance(request, RPCProcessRequest):
                    if len(frames) > 1:
                        # Use cloudpickle for logits processors
                        assert isinstance(request.params, SamplingParams)
                        lprocs = cloudpickle.loads(frames[1].buffer)
                        request.params.logits_processors = lprocs
                    self._handle_process_request(request)
                elif isinstance(request, RPCAbortRequest):
                    self._handle_abort_request(request)
                elif isinstance(request, RPCUProfileRequest):
                    if request == RPCUProfileRequest.START_PROFILE:
                        self.start_profile()
                    else:
                        self.stop_profile()
                elif isinstance(request, RPCLoadAdapterRequest):
                    self._handle_load_adapter_request(request)
                elif isinstance(request, RPCResetMultiModalCacheRequest):
                    self.reset_mm_cache()
                elif isinstance(request, RPCResetPrefixCacheRequest):
                    self.reset_prefix_cache()
                elif isinstance(request, RPCSleepRequest):
                    self.sleep(request.value)
                elif isinstance(request, RPCWakeUpRequest):
                    self.wake_up(request.tags)
                elif isinstance(request, RPCIsSleepingRequest):
                    self._handle_is_sleeping_request(request)
                else:
                    raise ValueError("Unknown RPCRequest Type: "
                                     f"{type(request)}")

        except Exception as e:
            self._set_errored(e)
            self._send_unhealthy(e)
            raise e from None

    def _handle_process_request(self, request: RPCProcessRequest):
        """Handle RPCProcessRequest by adding it to the LLMEngine."""
        request_id = request.request_id

        if self._errored_with is not None:
            rpc_err = RPCError(request_id=request_id,
                               is_engine_errored=True,
                               exception=ENGINE_DEAD_ERROR(self._errored_with))
            self._send_outputs(rpc_err)

        try:
            self.engine.add_request(request_id=request_id,
                                    prompt=request.prompt,
                                    params=request.params,
                                    lora_request=request.lora_request,
                                    trace_headers=request.trace_headers,
                                    priority=request.priority)

            if self.log_requests:
                logger.info("Added request %s.", request.request_id)

        except Exception as e:
            # We do not set self._errored = True here, since the error
            # is due to an issue adding this request to the engine,
            # rather than an issue with the engine itself.
            logger.debug("Failed to add request %s to engine. %s",
                         request.request_id, e)
            is_errored = self._errored_with is not None
            rpc_err = RPCError(request_id=request_id,
                               is_engine_errored=is_errored,
                               exception=e)
            self._send_outputs(rpc_err)

            # Remove request from the engine.
            self.engine.abort_request(request_id)

    def _handle_abort_request(self, request: RPCAbortRequest):
        self.engine.abort_request(request.request_id)
        if self.log_requests:
            logger.info("Aborted request %s.", request.request_id)

    def _handle_load_adapter_request(self, request: RPCLoadAdapterRequest):
        try:
            self.engine.add_lora(request.lora_request)
        except BaseException as e:
            # Send back an error if the adapter fails to load
            rpc_err = RPCError(request_id=request.request_id,
                               is_engine_errored=False,
                               exception=e)
            self._send_outputs(rpc_err)
            return
        # Otherwise, send back the successful load message
        self._send_outputs(
            RPCAdapterLoadedResponse(request_id=request.request_id))

    def _handle_is_sleeping_request(self, request: RPCIsSleepingRequest):
        is_sleeping = self.is_sleeping()
        self._send_outputs(
            RPCIsSleepingResponse(request_id=request.request_id,
                                  is_sleeping=is_sleeping))

    def _health_check(self):
        # Send unhealthy if engine has already errored
        if self._errored_with is not None:
            self._send_unhealthy(self._errored_with)
        try:
            self.engine.check_health()
            self._send_healthy()
        except Exception as e:
            self._set_errored(e)
            self._send_unhealthy(e)

    def _send_outputs(self, outputs: REQUEST_OUTPUTS_T):
        """Send outputs back to the engine client. These can be:
        - Exceptions
        - A list of generation outputs
        - A response from loading a lora adapter
        """
        if outputs:
            try:
                from ray.exceptions import RayTaskError

                # RayTaskError might not be picklable here. We need to unpack
                # the underlying exception as the real exception in the output.
                if (isinstance(outputs, RPCError)
                        and isinstance(outputs.exception, RayTaskError)):
                    outputs.exception = outputs.exception.cause
            except ImportError:
                pass

            output_bytes = pickle.dumps(outputs)
            self.output_socket.send_multipart((output_bytes, ), copy=False)

    def _send_healthy(self):
        """Send HEALTHY message to RPCClient."""
        if not self.heartbeat_socket.closed:
            self.heartbeat_socket.send_multipart(HEALTHY_RESPONSE, copy=False)

    def _send_unhealthy(self, error: BaseException):
        """Send UNHEALTHY message to RPCClient."""
        if not self.heartbeat_socket.closed:
            error_bytes = pickle.dumps(error)
            self.heartbeat_socket.send_multipart((error_bytes, ), copy=False)

    def _async_socket_engine_callback(self,
                                      request_outputs: REQUEST_OUTPUTS_T):
        """Callback used by engine to make socket handling async with GPU."""
        self._send_outputs(request_outputs)
        self.handle_new_input()

    def _set_errored(self, e: BaseException):
        """Log and set errored status if this is the first issue."""
        if self._errored_with is None:
            self._errored_with = e

    def start_profile(self) -> None:
        self.engine.start_profile()

    def stop_profile(self) -> None:
        self.engine.stop_profile()

    def reset_mm_cache(self) -> bool:
        return self.engine.reset_mm_cache()

    def reset_prefix_cache(self) -> bool:
        return self.engine.reset_prefix_cache()

    def sleep(self, level: int = 1) -> None:
        self.engine.sleep(level)

    def wake_up(self, tags: Optional[list[str]] = None) -> None:
        self.engine.wake_up(tags)

    def is_sleeping(self) -> bool:
        return self.engine.is_sleeping()

_errored_with instance-attribute

_errored_with: Optional[BaseException] = None

ctx instance-attribute

ctx = Context()

data_ipc_path instance-attribute

data_ipc_path = f'{ipc_path}{IPC_DATA_EXT}'

dead_error property

dead_error: BaseException

engine instance-attribute

engine = LLMEngine(*args, **kwargs)

heartbeat_socket instance-attribute

heartbeat_socket = socket(PUSH)

input_socket instance-attribute

input_socket = socket(PULL)

log_requests instance-attribute

log_requests = log_requests

output_socket instance-attribute

output_socket = socket(PUSH)

use_async_sockets instance-attribute

use_async_sockets = use_async_sockets

__init__

__init__(
    ipc_path: str,
    use_async_sockets: bool,
    *args,
    log_requests: bool = True,
    **kwargs,
) -> None
Source code in vllm/engine/multiprocessing/engine.py
def __init__(self,
             ipc_path: str,
             use_async_sockets: bool,
             *args,
             log_requests: bool = True,
             **kwargs) -> None:
    # For MQLLMEngine, we can use cached outputs, since each new request
    # output is immediately pickled and sent over the socket, which frees
    # the Python object to be reused.
    kwargs['use_cached_outputs'] = True

    self.engine = LLMEngine(*args, **kwargs)
    self.log_requests = log_requests

    self.use_async_sockets = use_async_sockets
    if self.use_async_sockets:
        self.engine.process_request_outputs_callback = \
            self._async_socket_engine_callback

    self.ctx = zmq.Context()  # type: ignore[attr-defined]

    # Receive input from the client.
    self.input_socket = self.ctx.socket(zmq.constants.PULL)
    self.input_socket.bind(f"{ipc_path}{IPC_INPUT_EXT}")

    # Send output stream back to client.
    self.output_socket = self.ctx.socket(zmq.constants.PUSH)
    self.output_socket.bind(f"{ipc_path}{IPC_OUTPUT_EXT}")

    # Send heartbeats back to client.
    self.heartbeat_socket = self.ctx.socket(zmq.constants.PUSH)
    self.heartbeat_socket.bind(f"{ipc_path}{IPC_HEALTH_EXT}")

    # IPC path for the data socket.
    self.data_ipc_path = f"{ipc_path}{IPC_DATA_EXT}"

    # Error state.
    self._errored_with: Optional[BaseException] = None

_async_socket_engine_callback

_async_socket_engine_callback(
    request_outputs: REQUEST_OUTPUTS_T,
)

Callback used by engine to make socket handling async with GPU.

Source code in vllm/engine/multiprocessing/engine.py
def _async_socket_engine_callback(self,
                                  request_outputs: REQUEST_OUTPUTS_T):
    """Callback used by engine to make socket handling async with GPU."""
    self._send_outputs(request_outputs)
    self.handle_new_input()

_handle_abort_request

_handle_abort_request(request: RPCAbortRequest)
Source code in vllm/engine/multiprocessing/engine.py
def _handle_abort_request(self, request: RPCAbortRequest):
    self.engine.abort_request(request.request_id)
    if self.log_requests:
        logger.info("Aborted request %s.", request.request_id)

_handle_is_sleeping_request

_handle_is_sleeping_request(request: RPCIsSleepingRequest)
Source code in vllm/engine/multiprocessing/engine.py
def _handle_is_sleeping_request(self, request: RPCIsSleepingRequest):
    is_sleeping = self.is_sleeping()
    self._send_outputs(
        RPCIsSleepingResponse(request_id=request.request_id,
                              is_sleeping=is_sleeping))

_handle_load_adapter_request

_handle_load_adapter_request(
    request: RPCLoadAdapterRequest,
)
Source code in vllm/engine/multiprocessing/engine.py
def _handle_load_adapter_request(self, request: RPCLoadAdapterRequest):
    try:
        self.engine.add_lora(request.lora_request)
    except BaseException as e:
        # Send back an error if the adapter fails to load
        rpc_err = RPCError(request_id=request.request_id,
                           is_engine_errored=False,
                           exception=e)
        self._send_outputs(rpc_err)
        return
    # Otherwise, send back the successful load message
    self._send_outputs(
        RPCAdapterLoadedResponse(request_id=request.request_id))

_handle_process_request

_handle_process_request(request: RPCProcessRequest)

Handle RPCProcessRequest by adding it to the LLMEngine.

Source code in vllm/engine/multiprocessing/engine.py
def _handle_process_request(self, request: RPCProcessRequest):
    """Handle RPCProcessRequest by adding it to the LLMEngine."""
    request_id = request.request_id

    if self._errored_with is not None:
        rpc_err = RPCError(request_id=request_id,
                           is_engine_errored=True,
                           exception=ENGINE_DEAD_ERROR(self._errored_with))
        self._send_outputs(rpc_err)

    try:
        self.engine.add_request(request_id=request_id,
                                prompt=request.prompt,
                                params=request.params,
                                lora_request=request.lora_request,
                                trace_headers=request.trace_headers,
                                priority=request.priority)

        if self.log_requests:
            logger.info("Added request %s.", request.request_id)

    except Exception as e:
        # We do not set self._errored = True here, since the error
        # is due to an issue adding this request to the engine,
        # rather than an issue with the engine itself.
        logger.debug("Failed to add request %s to engine. %s",
                     request.request_id, e)
        is_errored = self._errored_with is not None
        rpc_err = RPCError(request_id=request_id,
                           is_engine_errored=is_errored,
                           exception=e)
        self._send_outputs(rpc_err)

        # Remove request from the engine.
        self.engine.abort_request(request_id)

_health_check

_health_check()
Source code in vllm/engine/multiprocessing/engine.py
def _health_check(self):
    # Send unhealthy if engine has already errored
    if self._errored_with is not None:
        self._send_unhealthy(self._errored_with)
    try:
        self.engine.check_health()
        self._send_healthy()
    except Exception as e:
        self._set_errored(e)
        self._send_unhealthy(e)

_send_healthy

_send_healthy()

Send HEALTHY message to RPCClient.

Source code in vllm/engine/multiprocessing/engine.py
def _send_healthy(self):
    """Send HEALTHY message to RPCClient."""
    if not self.heartbeat_socket.closed:
        self.heartbeat_socket.send_multipart(HEALTHY_RESPONSE, copy=False)
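
A client-side counterpart could watch this heartbeat socket as follows. This is a hedged sketch, assuming IPC_HEALTH_EXT and VLLM_RPC_SUCCESS_STR are importable from vllm.engine.multiprocessing and reusing the hypothetical ipc path from the earlier example:

import pickle

import zmq

from vllm.engine.multiprocessing import IPC_HEALTH_EXT, VLLM_RPC_SUCCESS_STR

ctx = zmq.Context()

# Pairs with the engine's PUSH heartbeat socket.
health_socket = ctx.socket(zmq.PULL)
health_socket.connect(f"ipc:///tmp/vllm_demo{IPC_HEALTH_EXT}")

# A healthy engine sends the pickled success string; an unhealthy one
# sends the pickled exception instead (see _send_unhealthy below).
message = pickle.loads(health_socket.recv())
if message != VLLM_RPC_SUCCESS_STR:
    raise message
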

_send_outputs

_send_outputs(outputs: REQUEST_OUTPUTS_T)

Send outputs back to the engine client. These can be:

- Exceptions
- A list of generation outputs
- A response from loading a lora adapter

Source code in vllm/engine/multiprocessing/engine.py
def _send_outputs(self, outputs: REQUEST_OUTPUTS_T):
    """Send outputs back to the engine client. These can be:
    - Exceptions
    - A list of generation outputs
    - A response from loading a lora adapter
    """
    if outputs:
        try:
            from ray.exceptions import RayTaskError

            # RayTaskError might not be picklable here. We need to unpack
            # the underlying exception as the real exception in the output.
            if (isinstance(outputs, RPCError)
                    and isinstance(outputs.exception, RayTaskError)):
                outputs.exception = outputs.exception.cause
        except ImportError:
            pass

        output_bytes = pickle.dumps(outputs)
        self.output_socket.send_multipart((output_bytes, ), copy=False)

_send_unhealthy

_send_unhealthy(error: BaseException)

Send UNHEALTHY message to RPCClient.

Source code in vllm/engine/multiprocessing/engine.py
def _send_unhealthy(self, error: BaseException):
    """Send UNHEALTHY message to RPCClient."""
    if not self.heartbeat_socket.closed:
        error_bytes = pickle.dumps(error)
        self.heartbeat_socket.send_multipart((error_bytes, ), copy=False)

_set_errored

_set_errored(e: BaseException)

Log and set errored status if this is the first issue.

Source code in vllm/engine/multiprocessing/engine.py
def _set_errored(self, e: BaseException):
    """Log and set errored status if this is the first issue."""
    if self._errored_with is None:
        self._errored_with = e

cleanup

cleanup()

Cleanup zeromq state on shutdown.

Source code in vllm/engine/multiprocessing/engine.py
def cleanup(self):
    """Cleanup zeromq state on shutdown."""
    # Closes all sockets and destroys context.
    self.ctx.destroy(linger=0)
    del self.engine

engine_step

engine_step() -> List[RequestOutput]

Engine step wrapper with error handling.

Source code in vllm/engine/multiprocessing/engine.py
def engine_step(self) -> List[RequestOutput]:
    """Engine step wrapper with error handling."""
    try:
        return self.engine.step()
    except SystemExit:
        raise
    except InputProcessingError as e:
        # Special case where we handle an error preparing the inputs for
        # a single request in the batch
        rpc_err = RPCError(request_id=e.request_id,
                           is_engine_errored=False,
                           exception=e.__cause__)
        self._send_outputs(rpc_err)
        return []
    except BaseException as e:
        self._set_errored(e)
        rpc_err = RPCError(request_id=None,
                           is_engine_errored=True,
                           exception=e)
        self._send_outputs(rpc_err)
        raise e

from_engine_args staticmethod

from_engine_args(
    engine_args: AsyncEngineArgs,
    usage_context: UsageContext,
    ipc_path: str,
)

Creates an MQLLMEngine from the engine arguments.

Source code in vllm/engine/multiprocessing/engine.py
@staticmethod
def from_engine_args(engine_args: AsyncEngineArgs,
                     usage_context: UsageContext, ipc_path: str):
    """Creates an MQLLMEngine from the engine arguments."""

    vllm_config = engine_args.create_engine_config(usage_context)
    return MQLLMEngine.from_vllm_config(
        ipc_path=ipc_path,
        vllm_config=vllm_config,
        usage_context=usage_context,
        enable_log_requests=engine_args.enable_log_requests,
        disable_log_stats=engine_args.disable_log_stats,
    )
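
A hedged usage sketch: the model name and ipc path below are illustrative only, and start() blocks in the calling process, so this would normally run inside a dedicated engine process:

from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.usage.usage_lib import UsageContext

engine = MQLLMEngine.from_engine_args(
    engine_args=AsyncEngineArgs(model="facebook/opt-125m"),
    usage_context=UsageContext.ENGINE_CONTEXT,
    ipc_path="ipc:///tmp/vllm_demo",  # hypothetical path
)
engine.start()  # runs the startup loop, then the engine busy loop
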

from_vllm_config classmethod

from_vllm_config(
    vllm_config: VllmConfig,
    usage_context: UsageContext,
    enable_log_requests: bool,
    disable_log_stats: bool,
    ipc_path: str,
    disable_log_requests: bool = True,
) -> MQLLMEngine
Source code in vllm/engine/multiprocessing/engine.py
@classmethod
@deprecate_kwargs(
    "disable_log_requests",
    additional_message=("This argument will have no effect. "
                        "Use `enable_log_requests` instead."),
)
def from_vllm_config(
        cls,
        vllm_config: VllmConfig,
        usage_context: UsageContext,
        enable_log_requests: bool,
        disable_log_stats: bool,
        ipc_path: str,
        disable_log_requests: bool = True,  # Deprecated, will be removed
) -> "MQLLMEngine":
    # Setup plugins for each process
    from vllm.plugins import load_general_plugins
    load_general_plugins()

    use_async_sockets = vllm_config.model_config.use_async_output_proc

    return cls(
        vllm_config=vllm_config,
        executor_class=LLMEngine._get_executor_cls(vllm_config),
        ipc_path=ipc_path,
        usage_context=usage_context,
        use_async_sockets=use_async_sockets,
        log_requests=enable_log_requests,
        log_stats=(not disable_log_stats),
    )

handle_new_input

handle_new_input()

Handle new input from the socket

Source code in vllm/engine/multiprocessing/engine.py
def handle_new_input(self):
    """Handle new input from the socket"""
    try:
        while self.input_socket.poll(timeout=0) != 0:
            frames = self.input_socket.recv_multipart(copy=False)
            request = pickle.loads(frames[0].buffer)

            if isinstance(request, RPCProcessRequest):
                if len(frames) > 1:
                    # Use cloudpickle for logits processors
                    assert isinstance(request.params, SamplingParams)
                    lprocs = cloudpickle.loads(frames[1].buffer)
                    request.params.logits_processors = lprocs
                self._handle_process_request(request)
            elif isinstance(request, RPCAbortRequest):
                self._handle_abort_request(request)
            elif isinstance(request, RPCUProfileRequest):
                if request == RPCUProfileRequest.START_PROFILE:
                    self.start_profile()
                else:
                    self.stop_profile()
            elif isinstance(request, RPCLoadAdapterRequest):
                self._handle_load_adapter_request(request)
            elif isinstance(request, RPCResetMultiModalCacheRequest):
                self.reset_mm_cache()
            elif isinstance(request, RPCResetPrefixCacheRequest):
                self.reset_prefix_cache()
            elif isinstance(request, RPCSleepRequest):
                self.sleep(request.value)
            elif isinstance(request, RPCWakeUpRequest):
                self.wake_up(request.tags)
            elif isinstance(request, RPCIsSleepingRequest):
                self._handle_is_sleeping_request(request)
            else:
                raise ValueError("Unknown RPCRequest Type: "
                                 f"{type(request)}")

    except Exception as e:
        self._set_errored(e)
        self._send_unhealthy(e)
        raise e from None
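
As an illustration of the wire format this dispatcher expects, a client could abort a request by pickling an RPCAbortRequest onto the input socket. This is a sketch under the assumptions of the earlier client example (input_socket is the connected PUSH socket, and the request id is hypothetical):

import pickle

from vllm.engine.multiprocessing import RPCAbortRequest

# One pickled frame; the engine reads frames[0] and dispatches on its type.
input_socket.send_multipart(
    (pickle.dumps(RPCAbortRequest(request_id="req-0")), ))
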

is_sleeping

is_sleeping() -> bool
Source code in vllm/engine/multiprocessing/engine.py
def is_sleeping(self) -> bool:
    return self.engine.is_sleeping()

make_data_socket

make_data_socket() -> Iterator[Socket]
Source code in vllm/engine/multiprocessing/engine.py
@contextmanager
def make_data_socket(
        self) -> Iterator[zmq.Socket]:  # type: ignore[name-defined]
    socket = self.ctx.socket(zmq.constants.ROUTER)
    try:
        socket.bind(self.data_ipc_path)
        yield socket
    finally:
        socket.close(linger=0)

reset_mm_cache

reset_mm_cache() -> bool
Source code in vllm/engine/multiprocessing/engine.py
def reset_mm_cache(self) -> bool:
    return self.engine.reset_mm_cache()

reset_prefix_cache

reset_prefix_cache() -> bool
Source code in vllm/engine/multiprocessing/engine.py
def reset_prefix_cache(self) -> bool:
    return self.engine.reset_prefix_cache()

run_engine_loop

run_engine_loop()

Core busy loop of the LLMEngine.

Source code in vllm/engine/multiprocessing/engine.py
def run_engine_loop(self):
    """Core busy loop of the LLMEngine."""

    while True:
        if not self.engine.has_unfinished_requests():
            # Poll until there is work to do.
            while self.input_socket.poll(timeout=POLLING_TIMEOUT_MS) == 0:
                # When there's no work, check on engine health and send
                # health status back to client
                self._health_check()
                self.engine.do_log_stats()
                logger.debug("Waiting for new requests in engine loop.")

        # Handle any input from the client.
        self.handle_new_input()

        # Engine step.
        request_outputs = self.engine_step()

        # Send request outputs (if async, done in engine_step callback).
        if not self.use_async_sockets:
            self._send_outputs(request_outputs)

run_startup_loop

run_startup_loop() -> None

Startup loop for sending data from Engine -> Client.

Source code in vllm/engine/multiprocessing/engine.py
def run_startup_loop(self) -> None:
    """Startup loop for sending data from Engine -> Client."""

    with self.make_data_socket() as socket:
        response: Union[RPCStartupResponse, BaseException]
        try:
            identity, message = socket.recv_multipart(copy=False)
            request: RPCStartupRequest = pickle.loads(message.buffer)

            # Handle the query from the Client.
            if request == RPCStartupRequest.IS_SERVER_READY:
                tracing_enabled = self.engine.is_tracing_enabled()
                response = RPCStartupResponse(
                    tracing_enabled=tracing_enabled)

        except Exception as e:
            response = e

        socket.send_multipart((identity, pickle.dumps(response)),
                              copy=False)
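
The client side of this handshake pairs a DEALER socket with the engine's ROUTER data socket. A minimal sketch, assuming IPC_DATA_EXT and RPCStartupRequest are importable from vllm.engine.multiprocessing and the same hypothetical ipc path:

import pickle

import zmq

from vllm.engine.multiprocessing import IPC_DATA_EXT, RPCStartupRequest

ctx = zmq.Context()
socket = ctx.socket(zmq.DEALER)  # ROUTER adds/strips the identity frame
socket.connect(f"ipc:///tmp/vllm_demo{IPC_DATA_EXT}")

# Ask whether the server is ready; the reply is the pickled
# RPCStartupResponse (or the pickled startup exception).
socket.send(pickle.dumps(RPCStartupRequest.IS_SERVER_READY))
response = pickle.loads(socket.recv())
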

sleep

sleep(level: int = 1) -> None
Source code in vllm/engine/multiprocessing/engine.py
def sleep(self, level: int = 1) -> None:
    self.engine.sleep(level)

start

start()
Source code in vllm/engine/multiprocessing/engine.py
def start(self):
    try:
        try:
            logger.debug("Starting Startup Loop.")
            self.run_startup_loop()
            logger.debug("Starting Engine Loop.")
            self.run_engine_loop()
        except Exception as e:
            logger.exception(repr(e))
    except KeyboardInterrupt:
        logger.debug("Shutting down MQLLMEngine.")
    finally:
        logger.debug("MQLLMEngine is shut down.")
        self.cleanup()

start_profile

start_profile() -> None
Source code in vllm/engine/multiprocessing/engine.py
def start_profile(self) -> None:
    self.engine.start_profile()

stop_profile

stop_profile() -> None
Source code in vllm/engine/multiprocessing/engine.py
def stop_profile(self) -> None:
    self.engine.stop_profile()

wake_up

wake_up(tags: Optional[list[str]] = None) -> None
Source code in vllm/engine/multiprocessing/engine.py
def wake_up(self, tags: Optional[list[str]] = None) -> None:
    self.engine.wake_up(tags)

run_mp_engine

run_mp_engine(
    vllm_config: VllmConfig,
    usage_context: UsageContext,
    ipc_path: str,
    disable_log_stats: bool,
    enable_log_requests: bool,
    engine_alive,
)
Source code in vllm/engine/multiprocessing/engine.py
def run_mp_engine(vllm_config: VllmConfig, usage_context: UsageContext,
                  ipc_path: str, disable_log_stats: bool,
                  enable_log_requests: bool, engine_alive):
    try:
        # Ensure we can serialize transformer config before spawning
        maybe_register_config_serialize_by_value()

        engine = MQLLMEngine.from_vllm_config(
            vllm_config=vllm_config,
            usage_context=usage_context,
            disable_log_stats=disable_log_stats,
            enable_log_requests=enable_log_requests,
            ipc_path=ipc_path)

        signal.signal(signal.SIGTERM, signal_handler)

        engine.start()

    except BaseException as e:
        logger.exception(e)
        engine_alive.value = False
        raise e from None
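
run_mp_engine is the process entry point. Below is a hedged sketch of spawning it, where the "spawn" context and the shared engine_alive flag are assumptions based on the signature above, and the model name and ipc path are illustrative:

from multiprocessing import get_context

from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.multiprocessing.engine import run_mp_engine
from vllm.usage.usage_lib import UsageContext

usage_context = UsageContext.ENGINE_CONTEXT
vllm_config = AsyncEngineArgs(
    model="facebook/opt-125m").create_engine_config(usage_context)

# Guard with `if __name__ == "__main__":` when using the spawn context.
mp_ctx = get_context("spawn")
engine_alive = mp_ctx.Value("b", True, lock=False)
engine_process = mp_ctx.Process(
    target=run_mp_engine,
    args=(vllm_config, usage_context, "ipc:///tmp/vllm_demo",
          False,  # disable_log_stats
          True,   # enable_log_requests
          engine_alive))
engine_process.start()
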

signal_handler

signal_handler(*_) -> None
Source code in vllm/engine/multiprocessing/engine.py
def signal_handler(*_) -> None:
    raise KeyboardInterrupt("MQLLMEngine terminated")