From 2cc5016a1929f290517f99c2c77e7b9e7413554e Mon Sep 17 00:00:00 2001 From: Michael Yao Date: Wed, 23 Jul 2025 18:37:25 +0800 Subject: [PATCH 1/5] [Docs] Clean up v1/metrics.md (#21449) Signed-off-by: windsonsea --- docs/design/v1/metrics.md | 165 +++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 92 deletions(-) diff --git a/docs/design/v1/metrics.md b/docs/design/v1/metrics.md index e23308f2637cf..52cd320dd4e11 100644 --- a/docs/design/v1/metrics.md +++ b/docs/design/v1/metrics.md @@ -5,17 +5,17 @@ Ensure the v1 LLM Engine exposes a superset of the metrics available in v0. ## Objectives - Achieve parity of metrics between v0 and v1. -- The priority use case is accessing these metrics via Prometheus as this is what we expect to be used in production environments. -- Logging support - i.e. printing metrics to the info log - is provided for more ad-hoc testing, debugging, development, and exploratory use cases. +- The priority use case is accessing these metrics via Prometheus, as this is what we expect to be used in production environments. +- Logging support (i.e. printing metrics to the info log) is provided for more ad-hoc testing, debugging, development, and exploratory use cases. ## Background Metrics in vLLM can be categorized as follows: -1. Server-level metrics: these are global metrics that track the state and performance of the LLM engine. These are typically exposed as Gauges or Counters in Prometheus. -2. Request-level metrics: these are metrics that track the characteristics - e.g. size and timing - of individual requests. These are typically exposed as Histograms in Prometheus, and are often the SLO that an SRE monitoring vLLM will be tracking. +1. Server-level metrics: Global metrics that track the state and performance of the LLM engine. These are typically exposed as Gauges or Counters in Prometheus. +2. Request-level metrics: Metrics that track the characteristics (e.g. size and timing) of individual requests. These are typically exposed as Histograms in Prometheus and are often the SLOs that an SRE monitoring vLLM will be tracking. -The mental model is that the "Server-level Metrics" explain why the "Request-level Metrics" are what they are. +The mental model is that server-level metrics help explain the values of request-level metrics. ### v0 Metrics @@ -65,20 +65,20 @@ vLLM also provides [a reference example](../../examples/online_serving/prometheu The subset of metrics exposed in the Grafana dashboard gives us an indication of which metrics are especially important: -- `vllm:e2e_request_latency_seconds_bucket` - End to end request latency measured in seconds -- `vllm:prompt_tokens_total` - Prompt Tokens -- `vllm:generation_tokens_total` - Generation Tokens -- `vllm:time_per_output_token_seconds` - Inter token latency (Time Per Output Token, TPOT) in second. +- `vllm:e2e_request_latency_seconds_bucket` - End to end request latency measured in seconds. +- `vllm:prompt_tokens_total` - Prompt tokens. +- `vllm:generation_tokens_total` - Generation tokens. +- `vllm:time_per_output_token_seconds` - Inter-token latency (Time Per Output Token, TPOT) in seconds. - `vllm:time_to_first_token_seconds` - Time to First Token (TTFT) latency in seconds. -- `vllm:num_requests_running` (also, `_swapped` and `_waiting`) - Number of requests in RUNNING, WAITING, and SWAPPED state +- `vllm:num_requests_running` (also, `_swapped` and `_waiting`) - Number of requests in the RUNNING, WAITING, and SWAPPED states. - `vllm:gpu_cache_usage_perc` - Percentage of used cache blocks by vLLM. -- `vllm:request_prompt_tokens` - Request prompt length -- `vllm:request_generation_tokens` - request generation length -- `vllm:request_success_total` - Number of finished requests by their finish reason: either an EOS token was generated or the max sequence length was reached -- `vllm:request_queue_time_seconds` - Queue Time -- `vllm:request_prefill_time_seconds` - Requests Prefill Time -- `vllm:request_decode_time_seconds` - Requests Decode Time -- `vllm:request_max_num_generation_tokens` - Max Generation Token in Sequence Group +- `vllm:request_prompt_tokens` - Request prompt length. +- `vllm:request_generation_tokens` - Request generation length. +- `vllm:request_success_total` - Number of finished requests by their finish reason: either an EOS token was generated or the max sequence length was reached. +- `vllm:request_queue_time_seconds` - Queue time. +- `vllm:request_prefill_time_seconds` - Requests prefill time. +- `vllm:request_decode_time_seconds` - Requests decode time. +- `vllm:request_max_num_generation_tokens` - Max generation tokens in a sequence group. See [the PR which added this Dashboard](gh-pr:2316) for interesting and useful background on the choices made here. @@ -103,7 +103,7 @@ In v0, metrics are collected in the engine core process and we use multi-process ### Built in Python/Process Metrics -The following metrics are supported by default by `prometheus_client`, but the are not exposed with multiprocess mode is used: +The following metrics are supported by default by `prometheus_client`, but they are not exposed when multi-process mode is used: - `python_gc_objects_collected_total` - `python_gc_objects_uncollectable_total` @@ -158,6 +158,7 @@ In v1, we wish to move computation and overhead out of the engine core process to minimize the time between each forward pass. The overall idea of V1 EngineCore design is: + - EngineCore is the inner loop. Performance is most critical here - AsyncLLM is the outer loop. This is overlapped with GPU execution (ideally), so this is where any "overheads" should be if @@ -178,7 +179,7 @@ time" (`time.time()`) to calculate intervals as the former is unaffected by system clock changes (e.g. from NTP). It's also important to note that monotonic clocks differ between -processes - each process has its own reference. point. So it is +processes - each process has its own reference point. So it is meaningless to compare monotonic timestamps from different processes. Therefore, in order to calculate an interval, we must compare two @@ -343,14 +344,15 @@ vllm:time_to_first_token_seconds_bucket{le="0.1",model_name="meta-llama/Llama-3. vllm:time_to_first_token_seconds_count{model_name="meta-llama/Llama-3.1-8B-Instruct"} 140.0 ``` -Note - the choice of histogram buckets to be most useful to users -across a broad set of use cases is not straightforward and will -require refinement over time. +!!! note + The choice of histogram buckets to be most useful to users + across a broad set of use cases is not straightforward and will + require refinement over time. ### Cache Config Info -`prometheus_client` has support for [Info -metrics](https://prometheus.github.io/client_python/instrumenting/info/) +`prometheus_client` has support for +[Info metrics](https://prometheus.github.io/client_python/instrumenting/info/) which are equivalent to a `Gauge` whose value is permanently set to 1, but exposes interesting key/value pair information via labels. This is used for information about an instance that does not change - so it @@ -363,14 +365,11 @@ We use this concept for the `vllm:cache_config_info` metric: # HELP vllm:cache_config_info Information of the LLMEngine CacheConfig # TYPE vllm:cache_config_info gauge vllm:cache_config_info{block_size="16",cache_dtype="auto",calculate_kv_scales="False",cpu_offload_gb="0",enable_prefix_caching="False",gpu_memory_utilization="0.9",...} 1.0 - ``` -However, `prometheus_client` has [never supported Info metrics in -multiprocessing -mode](https://github.com/prometheus/client_python/pull/300) - for -[unclear -reasons](gh-pr:7279#discussion_r1710417152). We +However, `prometheus_client` has +[never supported Info metrics in multiprocessing mode](https://github.com/prometheus/client_python/pull/300) - +for [unclear reasons](gh-pr:7279#discussion_r1710417152). We simply use a `Gauge` metric set to 1 and `multiprocess_mode="mostrecent"` instead. @@ -395,11 +394,9 @@ distinguish between per-adapter counts. This should be revisited. Note that `multiprocess_mode="livemostrecent"` is used - the most recent metric is used, but only from currently running processes. -This was added in - and there is -[at least one known -user](https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/54). If -we revisit this design and deprecate the old metric, we should reduce +This was added in and there is +[at least one known user](https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/54). +If we revisit this design and deprecate the old metric, we should reduce the need for a significant deprecation period by making the change in v0 also and asking this project to move to the new metric. @@ -442,23 +439,20 @@ suddenly (from their perspective) when it is removed, even if there is an equivalent metric for them to use. As an example, see how `vllm:avg_prompt_throughput_toks_per_s` was -[deprecated](gh-pr:2764) (with a -comment in the code), -[removed](gh-pr:12383), and then -[noticed by a -user](gh-issue:13218). +[deprecated](gh-pr:2764) (with a comment in the code), +[removed](gh-pr:12383), and then [noticed by a user](gh-issue:13218). In general: -1) We should be cautious about deprecating metrics, especially since +1. We should be cautious about deprecating metrics, especially since it can be hard to predict the user impact. -2) We should include a prominent deprecation notice in the help string +2. We should include a prominent deprecation notice in the help string that is included in the `/metrics' output. -3) We should list deprecated metrics in user-facing documentation and +3. We should list deprecated metrics in user-facing documentation and release notes. -4) We should consider hiding deprecated metrics behind a CLI argument - in order to give administrators [an escape - hatch](https://kubernetes.io/docs/concepts/cluster-administration/system-metrics/#show-hidden-metrics) +4. We should consider hiding deprecated metrics behind a CLI argument + in order to give administrators + [an escape hatch](https://kubernetes.io/docs/concepts/cluster-administration/system-metrics/#show-hidden-metrics) for some time before deleting them. See the [deprecation policy](../../contributing/deprecation_policy.md) for @@ -474,7 +468,7 @@ removed. The `vllm:time_in_queue_requests` Histogram metric was added by and its calculation is: -``` +```python self.metrics.first_scheduled_time = now self.metrics.time_in_queue = now - self.metrics.arrival_time ``` @@ -482,7 +476,7 @@ The `vllm:time_in_queue_requests` Histogram metric was added by Two weeks later, added `vllm:request_queue_time_seconds` leaving us with: -``` +```python if seq_group.is_finished(): if (seq_group.metrics.first_scheduled_time is not None and seq_group.metrics.first_token_time is not None): @@ -517,8 +511,7 @@ cache to complete other requests), we swap kv cache blocks out to CPU memory. This is also known as "KV cache offloading" and is configured with `--swap-space` and `--preemption-mode`. -In v0, [vLLM has long supported beam -search](gh-issue:6226). The +In v0, [vLLM has long supported beam search](gh-issue:6226). The SequenceGroup encapsulated the idea of N Sequences which all shared the same prompt kv blocks. This enabled KV cache block sharing between requests, and copy-on-write to do branching. CPU @@ -530,9 +523,8 @@ option than CPU swapping since blocks can be evicted slowly on demand and the part of the prompt that was evicted can be recomputed. SequenceGroup was removed in V1, although a replacement will be -required for "parallel sampling" (`n>1`). [Beam search was moved out of -the core (in -V0)](gh-issue:8306). There was a +required for "parallel sampling" (`n>1`). +[Beam search was moved out of the core (in V0)](gh-issue:8306). There was a lot of complex code for a very uncommon feature. In V1, with prefix caching being better (zero over head) and therefore @@ -547,18 +539,18 @@ Some v0 metrics are only relevant in the context of "parallel sampling". This is where the `n` parameter in a request is used to request multiple completions from the same prompt. -As part of adding parallel sampling support in we should +As part of adding parallel sampling support in , we should also add these metrics. - `vllm:request_params_n` (Histogram) -Observes the value of the 'n' parameter of every finished request. + Observes the value of the 'n' parameter of every finished request. - `vllm:request_max_num_generation_tokens` (Histogram) -Observes the maximum output length of all sequences in every finished -sequence group. In the absence of parallel sampling, this is -equivalent to `vllm:request_generation_tokens`. + Observes the maximum output length of all sequences in every finished + sequence group. In the absence of parallel sampling, this is + equivalent to `vllm:request_generation_tokens`. ### Speculative Decoding @@ -576,26 +568,23 @@ There is a PR under review () to add "prompt lookup (ngram)" seculative decoding to v1. Other techniques will follow. We should revisit the v0 metrics in this context. -Note - we should probably expose acceptance rate as separate accepted -and draft counters, like we do for prefix caching hit rate. Efficiency -likely also needs similar treatment. +!!! note + We should probably expose acceptance rate as separate accepted + and draft counters, like we do for prefix caching hit rate. Efficiency + likely also needs similar treatment. ### Autoscaling and Load-balancing A common use case for our metrics is to support automated scaling of vLLM instances. -For related discussion from the [Kubernetes Serving Working -Group](https://github.com/kubernetes/community/tree/master/wg-serving), +For related discussion from the +[Kubernetes Serving Working Group](https://github.com/kubernetes/community/tree/master/wg-serving), see: -- [Standardizing Large Model Server Metrics in - Kubernetes](https://docs.google.com/document/d/1SpSp1E6moa4HSrJnS4x3NpLuj88sMXr2tbofKlzTZpk) -- [Benchmarking LLM Workloads for Performance Evaluation and - Autoscaling in - Kubernetes](https://docs.google.com/document/d/1k4Q4X14hW4vftElIuYGDu5KDe2LtV1XammoG-Xi3bbQ) -- [Inference - Perf](https://github.com/kubernetes-sigs/wg-serving/tree/main/proposals/013-inference-perf) +- [Standardizing Large Model Server Metrics in Kubernetes](https://docs.google.com/document/d/1SpSp1E6moa4HSrJnS4x3NpLuj88sMXr2tbofKlzTZpk) +- [Benchmarking LLM Workloads for Performance Evaluation and Autoscaling in Kubernetes](https://docs.google.com/document/d/1k4Q4X14hW4vftElIuYGDu5KDe2LtV1XammoG-Xi3bbQ) +- [Inference Perf](https://github.com/kubernetes-sigs/wg-serving/tree/main/proposals/013-inference-perf) - and . This is a non-trivial topic. Consider this comment from Rob: @@ -619,19 +608,16 @@ should judge an instance as approaching saturation: Our approach to naming metrics probably deserves to be revisited: -1. The use of colons in metric names seems contrary to ["colons are - reserved for user defined recording - rules"](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels) +1. The use of colons in metric names seems contrary to + ["colons are reserved for user defined recording rules"](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). 2. Most of our metrics follow the convention of ending with units, but not all do. 3. Some of our metric names end with `_total`: -``` -If there is a suffix of `_total` on the metric name, it will be removed. When -exposing the time series for counter, a `_total` suffix will be added. This is -for compatibility between OpenMetrics and the Prometheus text format, as OpenMetrics -requires the `_total` suffix. -``` + If there is a suffix of `_total` on the metric name, it will be removed. When + exposing the time series for counter, a `_total` suffix will be added. This is + for compatibility between OpenMetrics and the Prometheus text format, as OpenMetrics + requires the `_total` suffix. ### Adding More Metrics @@ -642,8 +628,7 @@ There is no shortage of ideas for new metrics: - Proposals arising from specific use cases, like the Kubernetes auto-scaling topic above - Proposals that might arise out of standardisation efforts like - [OpenTelemetry Semantic Conventions for Gen - AI](https://github.com/open-telemetry/semantic-conventions/tree/main/docs/gen-ai). + [OpenTelemetry Semantic Conventions for Gen AI](https://github.com/open-telemetry/semantic-conventions/tree/main/docs/gen-ai). We should be cautious in our approach to adding new metrics. While metrics are often relatively straightforward to add: @@ -668,18 +653,14 @@ fall under the more general heading of "Observability". v0 has support for OpenTelemetry tracing: - Added by -- Configured with `--oltp-traces-endpoint` and - `--collect-detailed-traces` -- [OpenTelemetry blog - post](https://opentelemetry.io/blog/2024/llm-observability/) +- Configured with `--oltp-traces-endpoint` and `--collect-detailed-traces` +- [OpenTelemetry blog post](https://opentelemetry.io/blog/2024/llm-observability/) - [User-facing docs](../../examples/online_serving/opentelemetry.md) -- [Blog - post](https://medium.com/@ronen.schaffer/follow-the-trail-supercharging-vllm-with-opentelemetry-distributed-tracing-aa655229b46f) -- [IBM product - docs](https://www.ibm.com/docs/en/instana-observability/current?topic=mgaa-monitoring-large-language-models-llms-vllm-public-preview) +- [Blog post](https://medium.com/@ronen.schaffer/follow-the-trail-supercharging-vllm-with-opentelemetry-distributed-tracing-aa655229b46f) +- [IBM product docs](https://www.ibm.com/docs/en/instana-observability/current?topic=mgaa-monitoring-large-language-models-llms-vllm-public-preview) -OpenTelemetry has a [Gen AI Working -Group](https://github.com/open-telemetry/community/blob/main/projects/gen-ai.md). +OpenTelemetry has a +[Gen AI Working Group](https://github.com/open-telemetry/community/blob/main/projects/gen-ai.md). Since metrics is a big enough topic on its own, we are going to tackle the topic of tracing in v1 separately. @@ -698,7 +679,7 @@ These metrics are only enabled when OpenTelemetry tracing is enabled and if `--collect-detailed-traces=all/model/worker` is used. The documentation for this option states: -> collect detailed traces for the specified "modules. This involves +> collect detailed traces for the specified modules. This involves > use of possibly costly and or blocking operations and hence might > have a performance impact. From 2671334d45ea96ca57938cc765ba26cdb796d067 Mon Sep 17 00:00:00 2001 From: Asher Date: Wed, 23 Jul 2025 18:54:08 +0800 Subject: [PATCH 2/5] [Model] add Hunyuan V1 Dense Model support. (#21368) Signed-off-by: Asher Zhang --- docs/models/supported_models.md | 1 + tests/models/registry.py | 2 + .../{hunyuan_v1_moe.py => hunyuan_v1.py} | 70 ++++++++++++++----- vllm/model_executor/models/registry.py | 3 +- 4 files changed, 57 insertions(+), 19 deletions(-) rename vllm/model_executor/models/{hunyuan_v1_moe.py => hunyuan_v1.py} (95%) diff --git a/docs/models/supported_models.md b/docs/models/supported_models.md index bbb52f035c727..c8b6c6c861209 100644 --- a/docs/models/supported_models.md +++ b/docs/models/supported_models.md @@ -363,6 +363,7 @@ th { | `GraniteMoeSharedForCausalLM` | Granite MoE Shared | `ibm-research/moe-7b-1b-active-shared-experts` (test model) | ✅︎ | ✅︎ | ✅︎ | | `GritLM` | GritLM | `parasail-ai/GritLM-7B-vllm`. | ✅︎ | ✅︎ | | | `Grok1ModelForCausalLM` | Grok1 | `hpcai-tech/grok-1`. | ✅︎ | ✅︎ | ✅︎ | +| `HunYuanDenseV1ForCausalLM` | Hunyuan-7B-Instruct-0124 | `tencent/Hunyuan-7B-Instruct-0124` | ✅︎ | | ✅︎ | | `HunYuanMoEV1ForCausalLM` | Hunyuan-80B-A13B | `tencent/Hunyuan-A13B-Instruct`, `tencent/Hunyuan-A13B-Pretrain`, `tencent/Hunyuan-A13B-Instruct-FP8`, etc. | ✅︎ | | ✅︎ | | `InternLMForCausalLM` | InternLM | `internlm/internlm-7b`, `internlm/internlm-chat-7b`, etc. | ✅︎ | ✅︎ | ✅︎ | | `InternLM2ForCausalLM` | InternLM2 | `internlm/internlm2-7b`, `internlm/internlm2-chat-7b`, etc. | ✅︎ | ✅︎ | ✅︎ | diff --git a/tests/models/registry.py b/tests/models/registry.py index 1eb7f7b9d8296..84ca0bc60003e 100644 --- a/tests/models/registry.py +++ b/tests/models/registry.py @@ -199,6 +199,8 @@ _TEXT_GENERATION_EXAMPLE_MODELS = { trust_remote_code=True), "HunYuanMoEV1ForCausalLM": _HfExamplesInfo("tencent/Hunyuan-A13B-Instruct", trust_remote_code=True), + "HunYuanDenseV1ForCausalLM":_HfExamplesInfo("tencent/Hunyuan-7B-Instruct-0124", + trust_remote_code=True), "InternLMForCausalLM": _HfExamplesInfo("internlm/internlm-chat-7b", trust_remote_code=True), "InternLM2ForCausalLM": _HfExamplesInfo("internlm/internlm2-chat-7b", diff --git a/vllm/model_executor/models/hunyuan_v1_moe.py b/vllm/model_executor/models/hunyuan_v1.py similarity index 95% rename from vllm/model_executor/models/hunyuan_v1_moe.py rename to vllm/model_executor/models/hunyuan_v1.py index b3baec98b0fc0..fbba849a76f23 100644 --- a/vllm/model_executor/models/hunyuan_v1_moe.py +++ b/vllm/model_executor/models/hunyuan_v1.py @@ -61,6 +61,19 @@ from .utils import (AutoWeightsLoader, PPMissingLayer, is_pp_missing_parameter, make_layers) +def _is_moe(config: PretrainedConfig) -> bool: + num_experts = getattr(config, "num_experts", None) + if isinstance(num_experts, int): + return num_experts > 1 + if isinstance(num_experts, list) and num_experts: + # Ensure all elements are integers before calling max. + if all(isinstance(e, int) for e in num_experts): + return max(num_experts) > 1 + else: + return False + return False + + def _get_cla_factor(config: PretrainedConfig) -> int: if not getattr(config, "use_cla", False): return 1 @@ -140,8 +153,8 @@ class HunYuanAttention(nn.Module): # the KV heads across multiple tensor parallel GPUs. assert tp_size % self.total_num_kv_heads == 0 self.num_kv_heads = max(1, self.total_num_kv_heads // tp_size) - # MistralConfig has an optional head_dim introduced by Mistral-Nemo - if hasattr(config, "head_dim"): + + if hasattr(config, "head_dim") and config.head_dim: self.head_dim = config.head_dim elif hasattr(config, "attention_head_dim"): self.head_dim = config.attention_head_dim @@ -490,12 +503,23 @@ class HunYuanDecoderLayer(nn.Module): else: raise RuntimeError(f"Unsupported attention type: {attention_type}") - self.mlp = HunYuanSparseMoeBlock( - config=config, - quant_config=quant_config, - layer_id=layer_id, - prefix=f"{prefix}.mlp", - ) + if _is_moe(config): + self.mlp = HunYuanSparseMoeBlock( + config=config, + quant_config=quant_config, + layer_id=layer_id, + prefix=f"{prefix}.mlp", + ) + else: + self.mlp = HunYuanMLP( + hidden_size=self.hidden_size, + intermediate_size=self.intermediate_size, + hidden_act=config.hidden_act, + quant_config=quant_config, + bias=getattr(config, "mlp_bias", False), + prefix=f"{prefix}.mlp", + ) + self.input_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) self.post_attention_layernorm = RMSNorm(config.hidden_size, @@ -642,15 +666,17 @@ class HunYuanModel(nn.Module): return torch.concat((q, k, v)) def get_expert_mapping(self) -> list[tuple[str, str, int, str]]: - - # Params for weights, fp8 weight scales, fp8 activation scales - # (param_name, weight_name, expert_id, shard_id) - return FusedMoE.make_expert_params_mapping( - ckpt_gate_proj_name="gate_proj", - ckpt_down_proj_name="down_proj", - ckpt_up_proj_name="up_proj", - num_experts=self.config.num_experts, - ) + if _is_moe(self.config): + # Params for weights, fp8 weight scales, fp8 activation scales + # (param_name, weight_name, expert_id, shard_id) + return FusedMoE.make_expert_params_mapping( + ckpt_gate_proj_name="gate_proj", + ckpt_down_proj_name="down_proj", + ckpt_up_proj_name="up_proj", + num_experts=self.config.num_experts, + ) + else: + return [] def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]): cla_factor = _get_cla_factor(self.config) @@ -815,7 +841,7 @@ class HunYuanModel(nn.Module): return loaded_params -class HunYuanMoEV1ForCausalLM(nn.Module, SupportsLoRA): +class HunYuanV1Base(nn.Module, SupportsLoRA): packed_modules_mapping = { "qkv_proj": [ "q_proj", @@ -901,3 +927,11 @@ class HunYuanMoEV1ForCausalLM(nn.Module, SupportsLoRA): def get_expert_mapping(self) -> list[tuple[str, str, int, str]]: return self.model.get_expert_mapping() + + +class HunYuanDenseV1ForCausalLM(HunYuanV1Base): + pass + + +class HunYuanMoEV1ForCausalLM(HunYuanV1Base): + pass diff --git a/vllm/model_executor/models/registry.py b/vllm/model_executor/models/registry.py index 100532943c2bf..fafb6a704383b 100644 --- a/vllm/model_executor/models/registry.py +++ b/vllm/model_executor/models/registry.py @@ -79,7 +79,8 @@ _TEXT_GENERATION_MODELS = { "GraniteMoeSharedForCausalLM": ("granitemoeshared", "GraniteMoeSharedForCausalLM"), # noqa: E501 "GritLM": ("gritlm", "GritLM"), "Grok1ModelForCausalLM": ("grok1", "Grok1ForCausalLM"), - "HunYuanMoEV1ForCausalLM": ("hunyuan_v1_moe", "HunYuanMoEV1ForCausalLM"), + "HunYuanMoEV1ForCausalLM": ("hunyuan_v1", "HunYuanMoEV1ForCausalLM"), + "HunYuanDenseV1ForCausalLM": ("hunyuan_v1", "HunYuanDenseV1ForCausalLM"), "InternLMForCausalLM": ("llama", "LlamaForCausalLM"), "InternLM2ForCausalLM": ("internlm2", "InternLM2ForCausalLM"), "InternLM2VEForCausalLM": ("internlm2_ve", "InternLM2VEForCausalLM"), From f59ec35b7f9ff5b1da8aae12e10b83154685c958 Mon Sep 17 00:00:00 2001 From: Cyrus Leung Date: Wed, 23 Jul 2025 20:53:26 +0800 Subject: [PATCH 3/5] [V1] Check all pooling tasks during profiling (#21299) Signed-off-by: DarkLight1337 --- vllm/sequence.py | 7 ++++ vllm/v1/worker/gpu_model_runner.py | 63 +++++++++++++++++++----------- 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/vllm/sequence.py b/vllm/sequence.py index 99208fbad65fe..1f507add0d91b 100644 --- a/vllm/sequence.py +++ b/vllm/sequence.py @@ -1173,6 +1173,10 @@ class PoolingSequenceGroupOutput( # The actual type is in SequenceGroup.pooled_data data: Any + def get_data_nbytes(self) -> int: + data: torch.Tensor = self.data + return data.nbytes + def __repr__(self) -> str: return f"PoolingSequenceGroupOutput(data={self.data}" @@ -1234,6 +1238,9 @@ class PoolerOutput( """The output from a pooling operation in the pooling model.""" outputs: list[PoolingSequenceGroupOutput] + def get_data_nbytes(self) -> int: + return sum(o.get_data_nbytes() for o in self.outputs) + def __getitem__(self, idx: int) -> PoolingSequenceGroupOutput: return self.outputs[idx] diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py index 6a42e01f14b01..2078fedac9223 100644 --- a/vllm/v1/worker/gpu_model_runner.py +++ b/vllm/v1/worker/gpu_model_runner.py @@ -41,7 +41,7 @@ from vllm.multimodal.inputs import MultiModalKwargs, PlaceholderRange from vllm.multimodal.utils import group_mm_inputs_by_modality from vllm.pooling_params import PoolingParams, PoolingTask from vllm.sampling_params import SamplingType -from vllm.sequence import IntermediateTensors +from vllm.sequence import IntermediateTensors, PoolerOutput from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, DeviceMemoryProfiler, GiB_bytes, LazyLoader, check_use_alibi, get_dtype_size, is_pin_memory_available, round_up) @@ -1819,7 +1819,7 @@ class GPUModelRunner(LoRAModelRunnerMixin): old_global_expert_indices = None rank_mapping = None - with DeviceMemoryProfiler() as m: # noqa: SIM117 + with DeviceMemoryProfiler() as m: time_before_load = time.perf_counter() model_loader = get_model_loader(self.load_config) if not hasattr(self, "model"): @@ -2215,12 +2215,11 @@ class GPUModelRunner(LoRAModelRunnerMixin): ) return sampler_output - @torch.inference_mode() - def _dummy_pooler_run( + def _dummy_pooler_run_task( self, hidden_states: torch.Tensor, - ) -> torch.Tensor: - + task: PoolingTask, + ) -> PoolerOutput: num_tokens = hidden_states.shape[0] max_num_reqs = self.scheduler_config.max_num_seqs num_reqs = min(num_tokens, max_num_reqs) @@ -2232,37 +2231,55 @@ class GPUModelRunner(LoRAModelRunnerMixin): hidden_states_list = list( torch.split(hidden_states, num_scheduled_tokens_list)) - req_num_tokens = num_tokens // num_reqs - model = cast(VllmModelForPooling, self.model) - dummy_task = self.get_supported_pooling_tasks()[0] - dummy_pooling_params = PoolingParams(task=dummy_task) + dummy_prompt_lens = torch.tensor( + [h.shape[0] for h in hidden_states_list], + device=self.device, + ) + dummy_token_ids = torch.zeros((num_reqs, req_num_tokens), + dtype=torch.int32, + device=self.device) - to_update = model.pooler.get_pooling_updates(dummy_task) + model = cast(VllmModelForPooling, self.model) + dummy_pooling_params = PoolingParams(task=task) + to_update = model.pooler.get_pooling_updates(task) to_update.apply(dummy_pooling_params) dummy_metadata = PoolingMetadata( - prompt_lens=torch.tensor([h.shape[0] for h in hidden_states_list], - device=self.device), - prompt_token_ids=torch.zeros((num_reqs, req_num_tokens), - dtype=torch.int32, - device=self.device), - pooling_params=[dummy_pooling_params] * num_reqs) + prompt_lens=dummy_prompt_lens, + prompt_token_ids=dummy_token_ids, + pooling_params=[dummy_pooling_params] * num_reqs, + ) try: - pooler_output = model.pooler(hidden_states=hidden_states_list, - pooling_metadata=dummy_metadata) + return model.pooler(hidden_states=hidden_states_list, + pooling_metadata=dummy_metadata) except RuntimeError as e: if 'out of memory' in str(e): raise RuntimeError( - "CUDA out of memory occurred when warming up pooler with " - f"{num_reqs} dummy requests. Please try lowering " - "`max_num_seqs` or `gpu_memory_utilization` when " + "CUDA out of memory occurred when warming up pooler " + f"({task=}) with {num_reqs} dummy requests. Please try " + "lowering `max_num_seqs` or `gpu_memory_utilization` when " "initializing the engine.") from e else: raise e - return pooler_output + + @torch.inference_mode() + def _dummy_pooler_run( + self, + hidden_states: torch.Tensor, + ) -> PoolerOutput: + # Find the task that has the largest output for subsequent steps + output_size = dict[PoolingTask, float]() + for task in self.get_supported_pooling_tasks(): + # Run a full batch with each task to ensure none of them OOMs + output = self._dummy_pooler_run_task(hidden_states, task) + output_size[task] = output.get_data_nbytes() + del output # Allow GC + + max_task = max(output_size.items(), key=lambda x: x[1])[0] + return self._dummy_pooler_run_task(hidden_states, max_task) def profile_run(self) -> None: # Profile with multimodal encoder & encoder cache. From 7c734ee09b0a40681ba49d3d7ef5517ddb106074 Mon Sep 17 00:00:00 2001 From: Tao He Date: Wed, 23 Jul 2025 21:34:37 +0800 Subject: [PATCH 4/5] [Bugfix][Qwen][DCA] fixes bug in dual-chunk-flash-attn backend for qwen 1m models. (#21364) Signed-off-by: Tao He --- vllm/attention/backends/dual_chunk_flash_attn.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/vllm/attention/backends/dual_chunk_flash_attn.py b/vllm/attention/backends/dual_chunk_flash_attn.py index e108646e7ffb5..fa6f3f1b39cca 100644 --- a/vllm/attention/backends/dual_chunk_flash_attn.py +++ b/vllm/attention/backends/dual_chunk_flash_attn.py @@ -1055,7 +1055,6 @@ class DualChunkFlashAttentionImpl(FlashAttentionImpl): v_states_intra, softmax_scale=softmax_scale, causal=True, - block_table=block_table, stage="intra", vertical_indices=vertical_buffer, slash_indices=slash_buffer, @@ -1070,7 +1069,6 @@ class DualChunkFlashAttentionImpl(FlashAttentionImpl): v_states_intra, softmax_scale=softmax_scale, causal=True, - block_table=block_table, stage="intra", vertical_indices=intra_vertical_indices, slash_indices=intra_slash_indices, @@ -1085,7 +1083,6 @@ class DualChunkFlashAttentionImpl(FlashAttentionImpl): v_states_succ, softmax_scale=softmax_scale, causal=False, - block_table=block_table, stage="succ", vertical_indices=succ_vertical_buffer, slash_indices=succ_slash_buffer, @@ -1100,7 +1097,6 @@ class DualChunkFlashAttentionImpl(FlashAttentionImpl): v_states_succ, softmax_scale=softmax_scale, causal=False, - block_table=block_table, stage="succ", vertical_indices=succ_vertical_indices, slash_indices=succ_slash_indices, @@ -1115,7 +1111,6 @@ class DualChunkFlashAttentionImpl(FlashAttentionImpl): v_states_inter, softmax_scale=softmax_scale, causal=False, - block_table=block_table, stage="inter", vertical_indices=inter_vertical_buffer, slash_indices=inter_slash_buffer, @@ -1130,7 +1125,6 @@ class DualChunkFlashAttentionImpl(FlashAttentionImpl): v_states_inter, softmax_scale=softmax_scale, causal=False, - block_table=block_table, stage="inter", vertical_indices=inter_vertical_indices, slash_indices=inter_slash_indices, @@ -1151,7 +1145,6 @@ class DualChunkFlashAttentionImpl(FlashAttentionImpl): value_states: torch.Tensor, softmax_scale: float, causal: bool = True, - block_table: torch.Tensor = None, max_seqlen_k: Optional[int] = None, stage: str = "intra", vertical_indices: Optional[torch.Tensor] = None, @@ -1230,7 +1223,6 @@ class DualChunkFlashAttentionImpl(FlashAttentionImpl): device=query_states.device), max_seqlen_k=max_seqlen_k, causal=causal, - block_table=block_table.unsqueeze(0), return_softmax_lse=True, ) softmax_lse = softmax_lse.view(q_len, q_heads, 1).transpose(0, From 316b1bf706f874d68d72c1bc1ba2b4e1f3b491bd Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Wed, 23 Jul 2025 15:49:25 +0100 Subject: [PATCH 5/5] [Tests] Add tests for headless internal DP LB (#21450) Signed-off-by: Nick Hill --- .buildkite/test-pipeline.yaml | 2 + .../openai/test_multi_api_servers.py | 123 +--- tests/v1/test_internal_lb_dp.py | 639 ++++++++++++++++++ tests/v1/test_utils.py | 124 ++++ 4 files changed, 768 insertions(+), 120 deletions(-) create mode 100644 tests/v1/test_internal_lb_dp.py diff --git a/.buildkite/test-pipeline.yaml b/.buildkite/test-pipeline.yaml index 00608229b95e3..c7378bf8ba5e7 100644 --- a/.buildkite/test-pipeline.yaml +++ b/.buildkite/test-pipeline.yaml @@ -165,6 +165,7 @@ steps: - tests/examples/offline_inference/data_parallel.py - tests/v1/test_async_llm_dp.py - tests/v1/test_external_lb_dp.py + - tests/v1/test_internal_lb_dp.py - tests/v1/engine/test_engine_core_client.py commands: # test with tp=2 and external_dp=2 @@ -176,6 +177,7 @@ steps: - python3 ../examples/offline_inference/data_parallel.py --enforce-eager - TP_SIZE=2 DP_SIZE=2 pytest -v -s v1/test_async_llm_dp.py - TP_SIZE=2 DP_SIZE=2 pytest -v -s v1/test_external_lb_dp.py + - TP_SIZE=1 DP_SIZE=4 pytest -v -s v1/test_internal_lb_dp.py - pytest -v -s v1/engine/test_engine_core_client.py::test_kv_cache_events_dp - pytest -v -s distributed/test_utils.py - pytest -v -s compile/test_basic_correctness.py diff --git a/tests/v1/entrypoints/openai/test_multi_api_servers.py b/tests/v1/entrypoints/openai/test_multi_api_servers.py index e84b5e3095d06..f7c31b0c43778 100644 --- a/tests/v1/entrypoints/openai/test_multi_api_servers.py +++ b/tests/v1/entrypoints/openai/test_multi_api_servers.py @@ -2,136 +2,19 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import asyncio import os -import re import openai # use the official client for correctness check import pytest import pytest_asyncio -import requests from tests.utils import RemoteOpenAIServer +from tests.v1.test_utils import check_request_balancing MODEL_NAME = "ibm-research/PowerMoE-3b" DP_SIZE = os.getenv("DP_SIZE", "1") -def get_prometheus_metrics( - server: RemoteOpenAIServer) -> dict[str, dict[str, float]]: - """Fetch and parse Prometheus metrics from the /metrics endpoint. - - Returns: - Dict mapping metric names to their values grouped by labels. - For example: {"vllm:request_success": { - "engine=0": 5.0, "engine=1": 3.0} - } - """ - try: - response = requests.get(server.url_for("metrics"), timeout=10) - response.raise_for_status() - - metrics: dict[str, dict[str, float]] = {} - - # Regex patterns for Prometheus metrics - metric_with_labels = re.compile( - r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\{([^}]*)\}\s+([\d\.\-\+e]+)$') - metric_simple = re.compile( - r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\s+([\d\.\-\+e]+)$') - - for line in response.text.split('\n'): - line = line.strip() - # Skip comments and empty lines - if not line or line.startswith('#'): - continue - - # Try to match metric with labels first - match = metric_with_labels.match(line) - if match: - metric_name, labels_part, value_str = match.groups() - try: - value = float(value_str) - if metric_name not in metrics: - metrics[metric_name] = {} - metrics[metric_name][f'{{{labels_part}}}'] = value - except ValueError: - continue - else: - # Try simple metric without labels - match = metric_simple.match(line) - if match: - metric_name, value_str = match.groups() - try: - value = float(value_str) - if metric_name not in metrics: - metrics[metric_name] = {} - metrics[metric_name][''] = value - except ValueError: - continue - - return metrics - except Exception as e: - pytest.fail(f"Failed to fetch Prometheus metrics: {e}") - return {} - - -def get_engine_request_counts( - metrics: dict[str, dict[str, float]]) -> dict[str, float]: - """Extract request counts per engine from Prometheus metrics. - - Returns: - Dict mapping engine indices to request counts. - For example: {"0": 15.0, "1": 12.0} - """ - engine_counts = {} - - # Look for request success metrics with engine labels - success_metrics = metrics.get("vllm:request_success_total", {}) - engine_pattern = re.compile(r'engine="([^"]*)"') - - for labels, count in success_metrics.items(): - # Extract engine ID from labels using regex - match = engine_pattern.search(labels) - if match: - engine_id = match.group(1) - if engine_id not in engine_counts: - engine_counts[engine_id] = 0.0 - engine_counts[engine_id] += count - - return engine_counts - - -def check_request_balancing(server: RemoteOpenAIServer): - """Check request balancing via Prometheus metrics if DP_SIZE > 1. - - Args: - server: The RemoteOpenAIServer instance - """ - dp_size = int(DP_SIZE) - if dp_size <= 1: - return - - # Get metrics after all requests are completed - metrics = get_prometheus_metrics(server) - engine_counts = get_engine_request_counts(metrics) - - # Check that multiple engines received requests - engines_with_requests = [ - engine for engine, count in engine_counts.items() if count > 0 - ] - assert len(engines_with_requests) == dp_size, ( - f"Expected requests to be distributed across multiple engines," - f" but only engine(s) {engines_with_requests} received " - f"requests. Engine counts: {engine_counts}") - - # Verify that the load is reasonably balanced - # (no engine should handle all requests) - total_requests = sum(engine_counts.values()) - - for count in engine_counts.values(): - assert count > total_requests // (dp_size + 1), ( - f"requests are imbalanced: {engine_counts}") - - @pytest.fixture(scope="module") def default_server_args(): return [ @@ -217,7 +100,7 @@ async def test_single_completion(client: openai.AsyncOpenAI, assert all(completion is not None for completion in results) # Check request balancing via Prometheus metrics if DP_SIZE > 1 - check_request_balancing(server) + check_request_balancing(server, int(DP_SIZE)) @pytest.mark.asyncio @@ -295,4 +178,4 @@ async def test_completion_streaming(client: openai.AsyncOpenAI, assert all(results), "Not all streaming requests completed successfully." # Check request balancing via Prometheus metrics if DP_SIZE > 1 - check_request_balancing(server) + check_request_balancing(server, int(DP_SIZE)) diff --git a/tests/v1/test_internal_lb_dp.py b/tests/v1/test_internal_lb_dp.py new file mode 100644 index 0000000000000..9aef4d5821e82 --- /dev/null +++ b/tests/v1/test_internal_lb_dp.py @@ -0,0 +1,639 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import asyncio +import os +import threading +import time + +import openai # use the official client for correctness check +import pytest +import pytest_asyncio + +from tests.utils import RemoteOpenAIServer +from tests.v1.test_utils import check_request_balancing +from vllm.platforms import Platform + +MODEL_NAME = "ibm-research/PowerMoE-3b" + +# Number of data parallel ranks for multi-node internal LB testing +DP_SIZE = int(os.getenv("DP_SIZE", "2")) +# Default tensor parallel size to use +TP_SIZE = int(os.getenv("TP_SIZE", "1")) + +# Number of nodes to simulate +NUM_NODES = 2 + + +class MultinodeInternalLBServerManager: + """Manages multi-node data parallel vLLM server instances for internal + load balancer testing using --headless mode.""" + + def __init__(self, + model_name: str, + dp_size: int, + api_server_count: int, + base_server_args: list, + dp_per_node: int = 1, + tp_size: int = TP_SIZE): + self.model_name = model_name + self.dp_size = dp_size + self.dp_per_node = dp_per_node + self.tp_size = tp_size + self.api_server_count = api_server_count + self.base_server_args = base_server_args + self.servers: list[tuple[RemoteOpenAIServer, list[str]]] = [] + self.server_threads: list[threading.Thread] = [] + + def __enter__(self) -> list[tuple[RemoteOpenAIServer, list[str]]]: + """Start all server instances for multi-node internal LB mode.""" + for rank in range(0, self.dp_size, self.dp_per_node): + # Create server args for this specific rank + server_args = self.base_server_args.copy() + + if rank == 0: + # Head node - runs API server and first DP rank + server_args.extend([ + "--data-parallel-size", + str(self.dp_size), + "--data-parallel-size-local", + str(self.dp_per_node), + "--tensor-parallel-size", + str(self.tp_size), + "--port", + "8000", # Single endpoint for all requests + "--api-server-count", + str(self.api_server_count), + "--data-parallel-address", + "127.0.0.1", + "--data-parallel-rpc-port", + "13345", + ]) + else: + # Secondary nodes - run in headless mode + server_args.extend([ + "--headless", + "--data-parallel-size", + str(self.dp_size), + "--data-parallel-size-local", + str(self.dp_per_node), + "--data-parallel-start-rank", + str(rank), + "--tensor-parallel-size", + str(self.tp_size), + "--data-parallel-address", + "127.0.0.1", + "--data-parallel-rpc-port", + "13345", + ]) + + # Use a thread to start each server to allow parallel initialization + def start_server(r: int, sargs: list[str]): + gpus_per_node = self.tp_size * self.dp_per_node + try: + # Start the server + server = RemoteOpenAIServer( + self.model_name, + sargs, + auto_port=False, + env_dict={ + "CUDA_VISIBLE_DEVICES": + ",".join( + str(Platform.device_id_to_physical_device_id( + i)) for i in range(r, r + gpus_per_node)) + }) + server.__enter__() + if r == 0: + print( + f"Head node (rank {r}) started successfully with " + f"{self.api_server_count} API servers") + else: + print(f"Headless node (rank {r}) started successfully") + self.servers.append((server, sargs)) + except Exception as e: + print(f"Failed to start server rank {r}: {e}") + raise + + thread = threading.Thread(target=start_server, + args=(rank, server_args)) + thread.start() + + self.server_threads.append(thread) + + # Wait for all servers to start + for thread in self.server_threads: + thread.join() + + # Give servers additional time to fully initialize and coordinate + time.sleep(3) + + if len(self.servers) != self.dp_size // self.dp_per_node: + raise Exception("Servers failed to start") + + return self.servers + + def __exit__(self, exc_type, exc_val, exc_tb): + """Stop all server instances.""" + while self.servers: + try: + self.servers.pop()[0].__exit__(exc_type, exc_val, exc_tb) + except Exception as e: + print(f"Error stopping server: {e}") + + +class APIOnlyServerManager: + """Manages API-only server (Node 0) and headless engines server (Node 1) + for testing separated API server and engine configuration.""" + + def __init__(self, + model_name: str, + dp_size: int, + api_server_count: int, + base_server_args: list, + tp_size: int = TP_SIZE): + self.model_name = model_name + self.dp_size = dp_size + self.tp_size = tp_size + self.api_server_count = api_server_count + self.base_server_args = base_server_args + self.servers: list[tuple[RemoteOpenAIServer, list[str]]] = [] + self.server_threads: list[threading.Thread] = [] + + def __enter__(self) -> list[tuple[RemoteOpenAIServer, list[str]]]: + """Start API-only server and headless engines server.""" + + # Start API-only server (Node 0) - no engines, only API server + api_server_args = self.base_server_args.copy() + api_server_args.extend([ + "--data-parallel-size", + str(self.dp_size), + "--data-parallel-size-local", + "0", # No engines on this node + "--tensor-parallel-size", + str(self.tp_size), + "--port", + "8000", + "--api-server-count", + str(self.api_server_count), + "--data-parallel-address", + "127.0.0.1", + "--data-parallel-rpc-port", + "13345", + ]) + + # Start headless engines server (Node 1) - all engines, no API server + engines_server_args = self.base_server_args.copy() + engines_server_args.extend([ + "--headless", + "--data-parallel-size", + str(self.dp_size), + "--data-parallel-size-local", + str(self.dp_size), # All engines on this node + "--tensor-parallel-size", + str(self.tp_size), + "--data-parallel-address", + "127.0.0.1", + "--data-parallel-rpc-port", + "13345", + ]) + + # Use threads to start both servers in parallel + def start_api_server(): + try: + server = RemoteOpenAIServer( + self.model_name, + api_server_args, + auto_port=False, + env_dict={}) # No GPUs needed for API-only server + server.__enter__() + print(f"API-only server started successfully with " + f"{self.api_server_count} API servers") + self.servers.append((server, api_server_args)) + except Exception as e: + print(f"Failed to start API-only server: {e}") + raise + + def start_engines_server(): + try: + server = RemoteOpenAIServer( + self.model_name, + engines_server_args, + auto_port=False, + env_dict={ + "CUDA_VISIBLE_DEVICES": + ",".join( + str(Platform.device_id_to_physical_device_id(i)) + for i in range(self.dp_size * self.tp_size)) + }) + server.__enter__() + print(f"Headless engines server started successfully with " + f"{self.dp_size} engines") + self.servers.append((server, engines_server_args)) + except Exception as e: + print(f"Failed to start headless engines server: {e}") + raise + + # Start API server first + api_thread = threading.Thread(target=start_api_server) + api_thread.start() + self.server_threads.append(api_thread) + + # Start engines server second + engines_thread = threading.Thread(target=start_engines_server) + engines_thread.start() + self.server_threads.append(engines_thread) + + # Wait for both servers to start + for thread in self.server_threads: + thread.join() + + # Give servers additional time to fully initialize and coordinate + time.sleep(3) + + if len(self.servers) != 2: + raise Exception("Both servers failed to start") + + return self.servers + + def __exit__(self, exc_type, exc_val, exc_tb): + """Stop both server instances.""" + while self.servers: + try: + self.servers.pop()[0].__exit__(exc_type, exc_val, exc_tb) + except Exception as e: + print(f"Error stopping server: {e}") + + +@pytest.fixture(scope="module") +def default_server_args(): + return [ + # use half precision for speed and memory savings in CI environment + "--dtype", + "bfloat16", + "--max-model-len", + "2048", + "--max-num-seqs", + "128", + "--enforce-eager", + ] + + +@pytest.fixture(scope="module", params=[1, 4]) +def servers(request, default_server_args): + api_server_count = request.param + with MultinodeInternalLBServerManager(MODEL_NAME, DP_SIZE, + api_server_count, + default_server_args, + DP_SIZE // NUM_NODES, + TP_SIZE) as server_list: + yield server_list + + +@pytest.fixture(scope="module", params=[1, 4]) +def api_only_servers(request, default_server_args): + """Fixture for API-only server + headless engines configuration.""" + api_server_count = request.param + with APIOnlyServerManager(MODEL_NAME, DP_SIZE, api_server_count, + default_server_args, TP_SIZE) as server_list: + yield server_list + + +@pytest_asyncio.fixture +async def client(servers: list[tuple[RemoteOpenAIServer, list[str]]]): + # For internal LB, we only connect to the head node (rank 0) + # which provides the single API endpoint + head_server = servers[0][0] + async with head_server.get_async_client() as client: + yield client + + +@pytest_asyncio.fixture +async def api_only_client(api_only_servers: list[tuple[RemoteOpenAIServer, + list[str]]]): + """Client fixture for API-only server configuration.""" + # Connect to the API-only server (first server in the list) + api_server = api_only_servers[0][0] + async with api_server.get_async_client() as client: + yield client + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "model_name", + [MODEL_NAME], +) +async def test_multinode_dp_completion(client: openai.AsyncOpenAI, + servers: list[tuple[RemoteOpenAIServer, + list[str]]], + model_name: str) -> None: + + async def make_request(): + completion = await client.completions.create( + model=model_name, + prompt="Hello, my name is", + max_tokens=10, + temperature=1.0) + + assert completion.id is not None + assert completion.choices is not None and len(completion.choices) == 1 + + choice = completion.choices[0] + # The exact number of tokens can vary slightly with temperature=1.0, + # so we check for a reasonable minimum length. + assert len(choice.text) >= 1 + # Finish reason might not always be 'length' if the model finishes early + # or due to other reasons, especially with high temperature. + # So, we'll accept 'length' or 'stop'. + assert choice.finish_reason in ("length", "stop") + + # Token counts can also vary, so we check they are positive. + assert completion.usage.completion_tokens > 0 + assert completion.usage.prompt_tokens > 0 + assert completion.usage.total_tokens > 0 + return completion + + # Test single request + result = await make_request() + assert result is not None + print( + "Multi-node internal LB handled single completion request successfully" + ) + + await asyncio.sleep(0.5) + + # Send multiple requests - internal LB should distribute across DP ranks + num_requests = 50 + all_tasks = [make_request() for _ in range(num_requests)] + + results = await asyncio.gather(*all_tasks) + assert len(results) == num_requests + assert all(completion is not None for completion in results) + + await asyncio.sleep(0.5) + + # Second burst of requests + all_tasks = [make_request() for _ in range(num_requests)] + + results = await asyncio.gather(*all_tasks) + assert len(results) == num_requests + assert all(completion is not None for completion in results) + + _, server_args = servers[0] + api_server_count = ( + server_args.count('--api-server-count') + and server_args[server_args.index('--api-server-count') + 1] or 1) + print(f"Successfully completed multi-node internal LB test with " + f"{len(servers)} DP ranks (API server count: {api_server_count})") + + # Check request balancing via Prometheus metrics + head_server = servers[0][0] + check_request_balancing(head_server, DP_SIZE) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "model_name", + [MODEL_NAME], +) +async def test_multinode_dp_completion_streaming(client: openai.AsyncOpenAI, + servers: list[ + tuple[RemoteOpenAIServer, + list[str]]], + model_name: str) -> None: + prompt = "What is an LLM?" + + async def make_streaming_request(): + # Perform a non-streaming request to get the expected full output + single_completion = await client.completions.create( + model=model_name, + prompt=prompt, + max_tokens=5, + temperature=0.0, + ) + single_output = single_completion.choices[0].text + + # Perform the streaming request + stream = await client.completions.create(model=model_name, + prompt=prompt, + max_tokens=5, + temperature=0.0, + stream=True) + chunks: list[str] = [] + finish_reason_count = 0 + last_chunk = None + async for chunk in stream: + chunks.append(chunk.choices[0].text) + if chunk.choices[0].finish_reason is not None: + finish_reason_count += 1 + last_chunk = chunk # Keep track of the last chunk + + # finish reason should only return in the last block for OpenAI API + assert finish_reason_count == 1, ( + "Finish reason should appear exactly once.") + assert last_chunk is not None, ( + "Stream should have yielded at least one chunk.") + assert last_chunk.choices[ + 0].finish_reason == "length", "Finish reason should be 'length'." + # Check that the combined text matches the non-streamed version. + assert "".join( + chunks + ) == single_output, "Streamed output should match non-streamed output." + return True # Indicate success for this request + + # Test single streaming request + result = await make_streaming_request() + assert result is not None + print( + "Multi-node internal LB handled single streaming request successfully") + + await asyncio.sleep(0.5) + + # Send multiple streaming requests - internal LB should distribute across + # DP ranks + num_requests = 50 + all_tasks = [make_streaming_request() for _ in range(num_requests)] + + results = await asyncio.gather(*all_tasks) + assert len(results) == num_requests + assert all(results), "Not all streaming requests completed successfully." + + await asyncio.sleep(0.5) + + # Second burst of streaming requests + all_tasks = [make_streaming_request() for _ in range(num_requests)] + + results = await asyncio.gather(*all_tasks) + assert len(results) == num_requests + assert all(results), "Not all streaming requests completed successfully." + + _, server_args = servers[0] + api_server_count = ( + server_args.count('--api-server-count') + and server_args[server_args.index('--api-server-count') + 1] or 1) + print(f"Successfully completed multi-node internal LB streaming test with " + f"{len(servers)} DP ranks (API server count: {api_server_count})") + + # Check request balancing via Prometheus metrics + head_server = servers[0][0] + check_request_balancing(head_server, DP_SIZE) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "model_name", + [MODEL_NAME], +) +async def test_api_only_multinode_dp_completion( + api_only_client: openai.AsyncOpenAI, + api_only_servers: list[tuple[RemoteOpenAIServer, + list[str]]], model_name: str) -> None: + """Test API-only server with all engines on separate headless server.""" + + async def make_request(): + completion = await api_only_client.completions.create( + model=model_name, + prompt="Hello, my name is", + max_tokens=10, + temperature=1.0) + + assert completion.id is not None + assert completion.choices is not None and len(completion.choices) == 1 + + choice = completion.choices[0] + # The exact number of tokens can vary slightly with temperature=1.0, + # so we check for a reasonable minimum length. + assert len(choice.text) >= 1 + # Finish reason might not always be 'length' if the model finishes + # early or due to other reasons, especially with high temperature. + # So, we'll accept 'length' or 'stop'. + assert choice.finish_reason in ("length", "stop") + + # Token counts can also vary, so we check they are positive. + assert completion.usage.completion_tokens > 0 + assert completion.usage.prompt_tokens > 0 + assert completion.usage.total_tokens > 0 + return completion + + # Test single request + result = await make_request() + assert result is not None + print("API-only server handled single completion request successfully") + + await asyncio.sleep(0.5) + + # Send multiple requests - should be distributed across engines on + # headless server + num_requests = 50 + all_tasks = [make_request() for _ in range(num_requests)] + + results = await asyncio.gather(*all_tasks) + assert len(results) == num_requests + assert all(completion is not None for completion in results) + + await asyncio.sleep(0.5) + + # Second burst of requests + all_tasks = [make_request() for _ in range(num_requests)] + + results = await asyncio.gather(*all_tasks) + assert len(results) == num_requests + assert all(completion is not None for completion in results) + + _, api_server_args = api_only_servers[0] + api_server_count = ( + api_server_args.count('--api-server-count') + and api_server_args[api_server_args.index('--api-server-count') + 1] + or 1) + print(f"Successfully completed API-only multi-node test with {DP_SIZE} " + f"engines on headless server (API server count: {api_server_count})") + + # Check request balancing via Prometheus metrics + api_server = api_only_servers[0][0] + check_request_balancing(api_server, DP_SIZE) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "model_name", + [MODEL_NAME], +) +async def test_api_only_multinode_dp_completion_streaming( + api_only_client: openai.AsyncOpenAI, + api_only_servers: list[tuple[RemoteOpenAIServer, + list[str]]], model_name: str) -> None: + """Test API-only server streaming with all engines on separate + headless server.""" + prompt = "What is an LLM?" + + async def make_streaming_request(): + # Perform a non-streaming request to get the expected full output + single_completion = await api_only_client.completions.create( + model=model_name, + prompt=prompt, + max_tokens=5, + temperature=0.0, + ) + single_output = single_completion.choices[0].text + + # Perform the streaming request + stream = await api_only_client.completions.create(model=model_name, + prompt=prompt, + max_tokens=5, + temperature=0.0, + stream=True) + chunks: list[str] = [] + finish_reason_count = 0 + last_chunk = None + async for chunk in stream: + chunks.append(chunk.choices[0].text) + if chunk.choices[0].finish_reason is not None: + finish_reason_count += 1 + last_chunk = chunk # Keep track of the last chunk + + # finish reason should only return in the last block for OpenAI API + assert finish_reason_count == 1, ( + "Finish reason should appear exactly once.") + assert last_chunk is not None, ( + "Stream should have yielded at least one chunk.") + assert last_chunk.choices[ + 0].finish_reason == "length", "Finish reason should be 'length'." + # Check that the combined text matches the non-streamed version. + assert "".join( + chunks + ) == single_output, "Streamed output should match non-streamed output." + return True # Indicate success for this request + + # Test single streaming request + result = await make_streaming_request() + assert result is not None + print("API-only server handled single streaming request successfully") + + await asyncio.sleep(0.5) + + # Send multiple streaming requests - should be distributed across engines + num_requests = 50 + all_tasks = [make_streaming_request() for _ in range(num_requests)] + + results = await asyncio.gather(*all_tasks) + assert len(results) == num_requests + assert all(results), "Not all streaming requests completed successfully." + + await asyncio.sleep(0.5) + + # Second burst of streaming requests + all_tasks = [make_streaming_request() for _ in range(num_requests)] + + results = await asyncio.gather(*all_tasks) + assert len(results) == num_requests + assert all(results), "Not all streaming requests completed successfully." + + _, api_server_args = api_only_servers[0] + api_server_count = ( + api_server_args.count('--api-server-count') + and api_server_args[api_server_args.index('--api-server-count') + 1] + or 1) + print(f"Successfully completed API-only streaming test with {DP_SIZE} " + f"engines on headless server (API server count: {api_server_count})") + + # Check request balancing via Prometheus metrics + api_server = api_only_servers[0][0] + check_request_balancing(api_server, DP_SIZE) diff --git a/tests/v1/test_utils.py b/tests/v1/test_utils.py index fd0e630ce178a..0b892bd9dffdc 100644 --- a/tests/v1/test_utils.py +++ b/tests/v1/test_utils.py @@ -1,8 +1,13 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import re + +import pytest +import requests import torch +from tests.utils import RemoteOpenAIServer from vllm.v1.worker.utils import bind_kv_cache @@ -61,3 +66,122 @@ def test_bind_kv_cache_non_attention(): assert runner_kv_caches[0] is kv_cache['model.layers.20.attn'] assert runner_kv_caches[1] is kv_cache['model.layers.28.attn'] + + +# Prometheus metrics utilities for testing + + +def get_prometheus_metrics( + server: RemoteOpenAIServer) -> dict[str, dict[str, float]]: + """Fetch and parse Prometheus metrics from the /metrics endpoint. + + Returns: + Dict mapping metric names to their values grouped by labels. + For example: {"vllm:request_success": { + "engine=0": 5.0, "engine=1": 3.0} + } + """ + try: + response = requests.get(server.url_for("metrics"), timeout=10) + response.raise_for_status() + + metrics: dict[str, dict[str, float]] = {} + + # Regex patterns for Prometheus metrics + metric_with_labels = re.compile( + r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\{([^}]*)\}\s+([\d\.\-\+e]+)$') + metric_simple = re.compile( + r'^([a-zA-Z_:][a-zA-Z0-9_:]*)\s+([\d\.\-\+e]+)$') + + for line in response.text.split('\n'): + line = line.strip() + # Skip comments and empty lines + if not line or line.startswith('#'): + continue + + # Try to match metric with labels first + match = metric_with_labels.match(line) + if match: + metric_name, labels_part, value_str = match.groups() + try: + value = float(value_str) + if metric_name not in metrics: + metrics[metric_name] = {} + metrics[metric_name][f'{{{labels_part}}}'] = value + except ValueError: + continue + else: + # Try simple metric without labels + match = metric_simple.match(line) + if match: + metric_name, value_str = match.groups() + try: + value = float(value_str) + if metric_name not in metrics: + metrics[metric_name] = {} + metrics[metric_name][''] = value + except ValueError: + continue + + return metrics + except Exception as e: + pytest.fail(f"Failed to fetch Prometheus metrics: {e}") + return {} + + +def get_engine_request_counts( + metrics: dict[str, dict[str, float]]) -> dict[str, float]: + """Extract request counts per engine from Prometheus metrics. + + Returns: + Dict mapping engine indices to request counts. + For example: {"0": 15.0, "1": 12.0} + """ + engine_counts = {} + + # Look for request success metrics with engine labels + success_metrics = metrics.get("vllm:request_success_total", {}) + engine_pattern = re.compile(r'engine="([^"]*)"') + + for labels, count in success_metrics.items(): + # Extract engine ID from labels using regex + match = engine_pattern.search(labels) + if match: + engine_id = match.group(1) + if engine_id not in engine_counts: + engine_counts[engine_id] = 0.0 + engine_counts[engine_id] += count + + return engine_counts + + +def check_request_balancing(server: RemoteOpenAIServer, dp_size: int): + """Check request balancing via Prometheus metrics if dp_size > 1. + + Args: + server: The RemoteOpenAIServer instance + dp_size: Number of data parallel ranks + """ + if dp_size <= 1: + return + + # Get metrics after all requests are completed + metrics = get_prometheus_metrics(server) + engine_counts = get_engine_request_counts(metrics) + + # Check that multiple engines received requests + engines_with_requests = [ + engine for engine, count in engine_counts.items() if count > 0 + ] + assert len(engines_with_requests) == dp_size, ( + f"Expected requests to be distributed across multiple engines," + f" but only engine(s) {engines_with_requests} received " + f"requests. Engine counts: {engine_counts}") + + # Verify that the load is reasonably balanced + # (no engine should handle all requests) + total_requests = sum(engine_counts.values()) + + for count in engine_counts.values(): + assert count > total_requests // (dp_size + 1), ( + f"requests are imbalanced: {engine_counts}")