diff --git a/vllm/v1/core/sched/policy/normalized_scorer.py b/vllm/v1/core/sched/policy/normalized_scorer.py index 7b7e83cbbd708..145bd57ba5329 100644 --- a/vllm/v1/core/sched/policy/normalized_scorer.py +++ b/vllm/v1/core/sched/policy/normalized_scorer.py @@ -1,31 +1,36 @@ -from typing import List +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import math from vllm.logger import init_logger -import math - logger = init_logger(__name__) + class ScoreDim: """ Normalized scoring dimension. """ - def __init__(self, name: str, median: float, norm_scale=0.0, weight=0.5, reverse=False): + + def __init__( + self, name: str, median: float, norm_scale=0.0, weight=0.5, reverse=False + ): self.name = name self.median = median if norm_scale != 0.0: self.norm_scale = norm_scale else: - self.norm_scale = 1/median + self.norm_scale = 1 / median self.weight = weight self.reverse = reverse + class NormalizedScorer: """ Normalize unbounded N-dimensional values into a composite score using the Sigmoid function. """ - def __init__(self, dim_list: List[ScoreDim]) -> None: + def __init__(self, dim_list: list[ScoreDim]) -> None: """ :param dim_list: Scoring dimensions; each dimension must define a median reference point, scaling factor, and weight. """ @@ -50,32 +55,48 @@ class NormalizedScorer: Smaller value → higher score → use inverse Sigmoid. """ if len(dims) > self.dim_count: - raise ValueError(f"Dim num({len(dims)}) exceeds max num dim({self.dim_count})") + raise ValueError( + f"Dim num({len(dims)}) exceeds max num dim({self.dim_count})" + ) final_score = 0.0 for idx, dim_value in enumerate(dims): dim_info = self.dim_list[idx] if dim_info.reverse: - score = self._inv_sigmoid_normalize(dim_value, dim_info.median, dim_info.norm_scale) + score = self._inv_sigmoid_normalize( + dim_value, dim_info.median, dim_info.norm_scale + ) else: - score = self._sigmoid_normalize(dim_value, dim_info.median, dim_info.norm_scale) + score = self._sigmoid_normalize( + dim_value, dim_info.median, dim_info.norm_scale + ) logger.debug(f"{dim_info.name}({dim_info.reverse}) : {score:.10f}") # Weighted summation. final_score += score * dim_info.weight return max(0.0, min(1.0, final_score)) # Clamp to [0, 1]. + class TimeAndLengthScorer(NormalizedScorer): """ Scorer for time and length dimensions; defaults to forward scoring with equal weights (0.5 each). """ - def __init__(self, - time_median, length_median, - time_scale=0.0, length_scale=0.0, - time_weight=0.5, length_weight=0.5, - reverse_time=False, reverse_len=False) -> None: - dim_list = [ScoreDim("time", time_median, time_scale, time_weight, reverse_time), - ScoreDim("length", length_median, length_scale, length_weight, reverse_len)] + + def __init__( + self, + time_median, + length_median, + time_scale=0.0, + length_scale=0.0, + time_weight=0.5, + length_weight=0.5, + reverse_time=False, + reverse_len=False, + ) -> None: + dim_list = [ + ScoreDim("time", time_median, time_scale, time_weight, reverse_time), + ScoreDim("length", length_median, length_scale, length_weight, reverse_len), + ] super().__init__(dim_list) def score(self, time: float, length: float) -> float: diff --git a/vllm/v1/core/sched/policy/weighted_score_softer.py b/vllm/v1/core/sched/policy/weighted_score_softer.py index 17be66a9c6754..cfc6c7bc55bca 100644 --- a/vllm/v1/core/sched/policy/weighted_score_softer.py +++ b/vllm/v1/core/sched/policy/weighted_score_softer.py @@ -1,28 +1,44 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import time from functools import total_ordering from vllm.v1.core.sched.policy.normalized_scorer import TimeAndLengthScorer -import time TimeAndLengthScorer_Instance = None if TimeAndLengthScorer_Instance == None: - TimeAndLengthScorer_Instance = TimeAndLengthScorer(time_median=5, time_weight=0.5, length_median=32 * 1024, - length_weight=0.5, reverse_len=True) + TimeAndLengthScorer_Instance = TimeAndLengthScorer( + time_median=5, + time_weight=0.5, + length_median=32 * 1024, + length_weight=0.5, + reverse_len=True, + ) + + @total_ordering class WeightedScoreSorter: - def __init__(self, request_length: int, request_arrival_time: float, request_slo_requirement: list = None): + def __init__( + self, + request_length: int, + request_arrival_time: float, + request_slo_requirement: list = None, + ): self.request_length = request_length self.request_arrival_time = request_arrival_time self.request_slo_requirement = request_slo_requirement self.__update_stats() - def __lt__(self, other_request_weighted_score: 'WeightedScoreSorter') -> bool: + def __lt__(self, other_request_weighted_score: "WeightedScoreSorter") -> bool: self.__update_stats() return self.weighted_score > other_request_weighted_score.weighted_score - def __eq__(self, other_request_weighted_score: 'WeightedScoreSorter') -> bool: + def __eq__(self, other_request_weighted_score: "WeightedScoreSorter") -> bool: return self.weighted_score == other_request_weighted_score.weighted_score def __update_stats(self): self.wait_time = time.time() - self.request_arrival_time - self.weighted_score = TimeAndLengthScorer_Instance.score(self.wait_time, self.request_length) + self.weighted_score = TimeAndLengthScorer_Instance.score( + self.wait_time, self.request_length + ) diff --git a/vllm/v1/core/sched/request_queue.py b/vllm/v1/core/sched/request_queue.py index a6bdab44a26fa..9200ea524362a 100644 --- a/vllm/v1/core/sched/request_queue.py +++ b/vllm/v1/core/sched/request_queue.py @@ -7,8 +7,8 @@ from collections import deque from collections.abc import Iterable, Iterator from enum import Enum -from vllm.v1.request import Request from vllm.v1.core.sched.policy.weighted_score_softer import WeightedScoreSorter +from vllm.v1.request import Request class SchedulingPolicy(Enum): @@ -212,7 +212,7 @@ class PriorityRequestQueue(RequestQueue): class SJFRequestQueue(RequestQueue): """ A SJF queue that supports heap operations. - + Requests with a larger value of weighted score value are processed first. """ @@ -221,8 +221,15 @@ class SJFRequestQueue(RequestQueue): def add_request(self, request: Request) -> None: """Add a request to the queue according to SJF policy.""" - heapq.heappush(self._heap, - (WeightedScoreSorter(len(request.prompt_token_ids), request.arrival_time), request)) + heapq.heappush( + self._heap, + ( + WeightedScoreSorter( + len(request.prompt_token_ids), request.arrival_time + ), + request, + ), + ) def pop_request(self) -> Request: """Pop a request from the queue according to SJF policy.""" @@ -240,14 +247,14 @@ class SJFRequestQueue(RequestQueue): def prepend_request(self, request: Request) -> None: """Add a request to the queue according to SJF policy. - + Note: In a SJF queue, there is no concept of prepending to the front. Requests are ordered by (priority, arrival_time).""" self.add_request(request) def prepend_requests(self, requests: RequestQueue) -> None: """Add all requests from another queue according to SJF policy. - + Note: In a SJF queue, there is no concept of prepending to the front. Requests are ordered by weighted score.""" for request in requests: @@ -261,8 +268,7 @@ class SJFRequestQueue(RequestQueue): def remove_requests(self, requests: Iterable[Request]) -> None: """Remove multiple specific requests from the queue.""" requests_to_remove = set(requests) - self._heap = [(ws, r) for ws , r in self._heap - if r not in requests_to_remove] + self._heap = [(ws, r) for ws, r in self._heap if r not in requests_to_remove] heapq.heapify(self._heap) def __bool__(self) -> bool: @@ -282,7 +288,7 @@ class SJFRequestQueue(RequestQueue): def __reversed__(self) -> Iterator[Request]: """Iterate over the queue in reverse SJF order.""" - return reversed(list(self)) + return reversed(list(self)) def create_request_queue(policy: SchedulingPolicy) -> RequestQueue: