Helps track persistent batch state changes and build a batch update data structure for logitsprocs Assumptions: * All information about requests removed from persistent batch during a step is aggregated in self._removed through calls to self.removed_append() at the beginning of a step. This must happen before the first time that self.removed, self.pop_removed() or self.peek_removed() are invoked in a given step * After the first time that self.removed, self.pop_removed() or self.peek_removed() are read in a step, no new removals are registered using self.removed_append() * Elements of self._removed are never directly modified, added or removed (i.e. modification is only via self.removed_append() and self.pop_removed()) Guarantees under above assumptions: * self.removed is always sorted in descending order * self.pop_removed() and self.peek_removed() both return the lowest removed request index in the current step
Source code in vllm/v1/sample/logits_processor/state.py
| class BatchUpdateBuilder:
"""Helps track persistent batch state changes and build
a batch update data structure for logitsprocs
Assumptions:
* All information about requests removed from persistent batch
during a step is aggregated in self._removed through calls to
self.removed_append() at the beginning of a step. This must happen
before the first time that self.removed, self.pop_removed()
or self.peek_removed() are invoked in a given step
* After the first time that self.removed, self.pop_removed()
or self.peek_removed() are read in a step, no new removals
are registered using self.removed_append()
* Elements of self._removed are never directly modified, added or
removed (i.e. modification is only via self.removed_append() and
self.pop_removed())
Guarantees under above assumptions:
* self.removed is always sorted in descending order
* self.pop_removed() and self.peek_removed() both return
the lowest removed request index in the current step
"""
_removed: list[RemovedRequest]
_is_removed_sorted: bool
moved: list[MovedRequest]
added: list[AddedRequest]
def __init__(
self,
removed: Optional[list[RemovedRequest]] = None,
moved: Optional[list[MovedRequest]] = None,
added: Optional[list[AddedRequest]] = None,
) -> None:
self._removed = removed or []
self.moved = moved or []
self.added = added or []
self._is_removed_sorted = False
# Used to track changes in the pooling case
# where we don't populate the added list.
self.batch_changed = False
def _ensure_removed_sorted(self) -> None:
"""Sort removed request indices in
descending order.
Idempotent after first call in a
given step, until reset.
"""
if not self._is_removed_sorted:
self._removed.sort(reverse=True)
self._is_removed_sorted = True
@property
def removed(self) -> list[RemovedRequest]:
"""Removed request indices sorted in
descending order"""
self._ensure_removed_sorted()
return self._removed
def removed_append(self, index: int) -> None:
"""Register the removal of a request from the persistent batch.
Must not be called after the first time self.removed,
self.pop_removed() or self.peek_removed() are invoked.
Args:
index: request index
"""
if self._is_removed_sorted:
raise RuntimeError("Cannot register new removed request after"
" self.removed has been read.")
self._removed.append(index)
self.batch_changed = True
def has_removed(self) -> bool:
return bool(self._removed)
def peek_removed(self) -> Optional[int]:
"""Return lowest removed request index"""
if self.has_removed():
self._ensure_removed_sorted()
return self._removed[-1]
return None
def pop_removed(self) -> Optional[int]:
"""Pop lowest removed request index"""
if self.has_removed():
self._ensure_removed_sorted()
return self._removed.pop()
return None
def reset(self) -> bool:
"""Returns True if there were any changes to the batch."""
self._is_removed_sorted = False
self._removed.clear()
self.moved.clear()
self.added.clear()
batch_changed = self.batch_changed
self.batch_changed = False
return batch_changed
def get_and_reset(self, batch_size: int) -> Optional[BatchUpdate]:
"""Generate a logitsprocs batch update data structure and reset
internal batch update builder state.
Args:
batch_size: current persistent batch size
Returns:
Frozen logitsprocs batch update instance; `None` if no updates
"""
# Reset removal-sorting logic
self._is_removed_sorted = False
self.batch_changed = False
if not any((self._removed, self.moved, self.added)):
# No update; short-circuit
return None
# Build batch state update
batch_update = BatchUpdate(
batch_size=batch_size,
removed=self._removed,
moved=self.moved,
added=self.added,
)
self._removed = []
self.moved = []
self.added = []
return batch_update
|
_is_removed_sorted instance-attribute
_is_removed_sorted: bool = False
_removed instance-attribute
batch_changed instance-attribute
removed property
Removed request indices sorted in descending order
__init__
Source code in vllm/v1/sample/logits_processor/state.py
| def __init__(
self,
removed: Optional[list[RemovedRequest]] = None,
moved: Optional[list[MovedRequest]] = None,
added: Optional[list[AddedRequest]] = None,
) -> None:
self._removed = removed or []
self.moved = moved or []
self.added = added or []
self._is_removed_sorted = False
# Used to track changes in the pooling case
# where we don't populate the added list.
self.batch_changed = False
|
_ensure_removed_sorted
_ensure_removed_sorted() -> None
Sort removed request indices in descending order. Idempotent after first call in a given step, until reset.
Source code in vllm/v1/sample/logits_processor/state.py
| def _ensure_removed_sorted(self) -> None:
"""Sort removed request indices in
descending order.
Idempotent after first call in a
given step, until reset.
"""
if not self._is_removed_sorted:
self._removed.sort(reverse=True)
self._is_removed_sorted = True
|
get_and_reset
Generate a logitsprocs batch update data structure and reset internal batch update builder state.
Parameters:
Name | Type | Description | Default |
batch_size | int | current persistent batch size | required |
Returns:
Type | Description |
Optional[BatchUpdate] | Frozen logitsprocs batch update instance; None if no updates |
Source code in vllm/v1/sample/logits_processor/state.py
| def get_and_reset(self, batch_size: int) -> Optional[BatchUpdate]:
"""Generate a logitsprocs batch update data structure and reset
internal batch update builder state.
Args:
batch_size: current persistent batch size
Returns:
Frozen logitsprocs batch update instance; `None` if no updates
"""
# Reset removal-sorting logic
self._is_removed_sorted = False
self.batch_changed = False
if not any((self._removed, self.moved, self.added)):
# No update; short-circuit
return None
# Build batch state update
batch_update = BatchUpdate(
batch_size=batch_size,
removed=self._removed,
moved=self.moved,
added=self.added,
)
self._removed = []
self.moved = []
self.added = []
return batch_update
|
has_removed
Source code in vllm/v1/sample/logits_processor/state.py
| def has_removed(self) -> bool:
return bool(self._removed)
|
peek_removed
Return lowest removed request index
Source code in vllm/v1/sample/logits_processor/state.py
| def peek_removed(self) -> Optional[int]:
"""Return lowest removed request index"""
if self.has_removed():
self._ensure_removed_sorted()
return self._removed[-1]
return None
|
pop_removed
Pop lowest removed request index
Source code in vllm/v1/sample/logits_processor/state.py
| def pop_removed(self) -> Optional[int]:
"""Pop lowest removed request index"""
if self.has_removed():
self._ensure_removed_sorted()
return self._removed.pop()
return None
|
removed_append
removed_append(index: int) -> None
Register the removal of a request from the persistent batch.
Must not be called after the first time self.removed, self.pop_removed() or self.peek_removed() are invoked.
Parameters:
Name | Type | Description | Default |
index | int | | required |
Source code in vllm/v1/sample/logits_processor/state.py
| def removed_append(self, index: int) -> None:
"""Register the removal of a request from the persistent batch.
Must not be called after the first time self.removed,
self.pop_removed() or self.peek_removed() are invoked.
Args:
index: request index
"""
if self._is_removed_sorted:
raise RuntimeError("Cannot register new removed request after"
" self.removed has been read.")
self._removed.append(index)
self.batch_changed = True
|
reset
Returns True if there were any changes to the batch.
Source code in vllm/v1/sample/logits_processor/state.py
| def reset(self) -> bool:
"""Returns True if there were any changes to the batch."""
self._is_removed_sorted = False
self._removed.clear()
self.moved.clear()
self.added.clear()
batch_changed = self.batch_changed
self.batch_changed = False
return batch_changed
|