diff --git a/vllm/benchmarks/serve.py b/vllm/benchmarks/serve.py index 8b16fea9e3d3c..a4d51936320b9 100644 --- a/vllm/benchmarks/serve.py +++ b/vllm/benchmarks/serve.py @@ -138,31 +138,54 @@ async def get_request( input_requests = list(input_requests) total_requests = len(input_requests) - request_index = 0 + assert total_requests > 0, "No requests provided." - for request in input_requests: + # Precompute delays among requests to minimize request send lag + request_rates = [] + delay_ts = [] + for request_index, request in enumerate(input_requests): current_request_rate = _get_current_request_rate(ramp_up_strategy, ramp_up_start_rps, ramp_up_end_rps, request_index, total_requests, request_rate) - - yield request, current_request_rate - - request_index += 1 - + request_rates.append(current_request_rate) if current_request_rate == float("inf"): - # If the request rate is infinity, then we don't need to wait. - continue + delay_ts.append(0) + else: + theta = 1.0 / (current_request_rate * burstiness) - theta = 1.0 / (current_request_rate * burstiness) + # Sample the request interval from the gamma distribution. + # If burstiness is 1, it follows exponential distribution. + delay_ts.append(np.random.gamma(shape=burstiness, scale=theta)) + + # Calculate the cumulative delay time from the first sent out requests. + for i in range(1, len(delay_ts)): + delay_ts[i] += delay_ts[i - 1] + if ramp_up_strategy is None and delay_ts[-1] != 0: + # When ramp_up_strategy is not set, we assume the request rate is fixed + # and all requests should be sent in target_total_delay_s, the following + # logic would re-scale delay time to ensure the final delay_ts + # align with target_total_delay_s. + # + # NOTE: If we simply accumulate the random delta values + # from the gamma distribution, their sum would have 1-2% gap + # from target_total_delay_s. The purpose of the following logic is to + # close the gap for stabilizing the throughput data + # from different random seeds. 
+ target_total_delay_s = total_requests / request_rate + normalize_factor = target_total_delay_s / delay_ts[-1] + delay_ts = [delay * normalize_factor for delay in delay_ts] - # Sample the request interval from the gamma distribution. - # If burstiness is 1, it follows exponential distribution. - interval = np.random.gamma(shape=burstiness, scale=theta) - # The next request will be sent after the interval. - await asyncio.sleep(interval) + start_ts = time.time() + request_index = 0 + for request_index, request in enumerate(input_requests): + current_ts = time.time() + sleep_interval_s = start_ts + delay_ts[request_index] - current_ts + if sleep_interval_s > 0: + await asyncio.sleep(sleep_interval_s) + yield request, request_rates[request_index] def calculate_metrics(