diff --git a/.buildkite/nightly-benchmarks/tests/genai-perf-tests.json b/.buildkite/nightly-benchmarks/tests/genai-perf-tests.json index f26ae7634f3d9..afb844880f9ff 100644 --- a/.buildkite/nightly-benchmarks/tests/genai-perf-tests.json +++ b/.buildkite/nightly-benchmarks/tests/genai-perf-tests.json @@ -12,7 +12,6 @@ "vllm_server_parameters": { "disable_log_stats": "", "gpu_memory_utilization": 0.9, - "num_scheduler_steps": 10, "max_num_seqs": 512, "dtype": "bfloat16" }, diff --git a/.buildkite/nightly-benchmarks/tests/nightly-tests.json b/.buildkite/nightly-benchmarks/tests/nightly-tests.json index 41b4a4008801d..423a3bfe12677 100644 --- a/.buildkite/nightly-benchmarks/tests/nightly-tests.json +++ b/.buildkite/nightly-benchmarks/tests/nightly-tests.json @@ -36,7 +36,6 @@ "vllm_server_parameters": { "disable_log_stats": "", "gpu_memory_utilization": 0.9, - "num_scheduler_steps": 10, "max_num_seqs": 512, "dtype": "bfloat16" }, @@ -90,7 +89,6 @@ "vllm_server_parameters": { "disable_log_stats": "", "gpu_memory_utilization": 0.9, - "num_scheduler_steps": 10, "max_num_seqs": 512, "dtype": "bfloat16" }, @@ -144,7 +142,6 @@ "vllm_server_parameters": { "disable_log_stats": "", "gpu_memory_utilization": 0.9, - "num_scheduler_steps": 10, "max_num_seqs": 512, "dtype": "bfloat16" }, @@ -195,7 +192,6 @@ "vllm_server_parameters": { "disable_log_stats": "", "gpu_memory_utilization": 0.9, - "num_scheduler_steps": 10, "max_num_seqs": 512, "dtype": "bfloat16" }, @@ -248,7 +244,6 @@ "vllm_server_parameters": { "disable_log_stats": "", "gpu_memory_utilization": 0.9, - "num_scheduler_steps": 10, "max_num_seqs": 512, "dtype": "bfloat16" }, @@ -301,7 +296,6 @@ "vllm_server_parameters": { "disable_log_stats": "", "gpu_memory_utilization": 0.9, - "num_scheduler_steps": 10, "max_num_seqs": 512, "dtype": "bfloat16" }, diff --git a/.buildkite/test-pipeline.yaml b/.buildkite/test-pipeline.yaml index ebcf51981ef33..740be2bc87706 100644 --- a/.buildkite/test-pipeline.yaml +++ b/.buildkite/test-pipeline.yaml @@ -67,7 +67,6 @@ steps: - python3 standalone_tests/lazy_imports.py - pytest -v -s mq_llm_engine # MQLLMEngine - pytest -v -s async_engine # AsyncLLMEngine - - NUM_SCHEDULER_STEPS=4 pytest -v -s async_engine/test_async_llm_engine.py - pytest -v -s test_inputs.py - pytest -v -s test_outputs.py - pytest -v -s multimodal @@ -773,27 +772,6 @@ steps: - pytest -v -s models/test_oot_registration.py # it needs a clean process - pytest -v -s plugins/lora_resolvers # unit tests for in-tree lora resolver plugins -- label: Multi-step Tests (4 GPUs) # 36min - mirror_hardwares: [amdexperimental] - working_dir: "/vllm-workspace/tests" - num_gpus: 4 - source_file_dependencies: - - vllm/model_executor/layers/sampler.py - - vllm/sequence.py - - vllm/worker/worker_base.py - - vllm/worker/worker.py - - vllm/worker/multi_step_worker.py - - vllm/worker/model_runner_base.py - - vllm/worker/model_runner.py - - vllm/worker/multi_step_model_runner.py - - vllm/engine - - tests/multi_step - commands: - # this test is quite flaky - # TODO: investigate and fix. - # - pytest -v -s multi_step/test_correctness_async_llm.py - - pytest -v -s multi_step/test_correctness_llm.py - - label: Pipeline Parallelism Test # 45min mirror_hardwares: [amdexperimental] working_dir: "/vllm-workspace/tests" diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a0a327319a468..b0dd5e99d4c72 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -36,7 +36,6 @@ CMakeLists.txt @tlrmchlsmth @LucasWilkinson /tests/entrypoints @DarkLight1337 @robertgshaw2-redhat @simon-mo @aarnphm /tests/kernels @tlrmchlsmth @WoosukKwon @yewentao256 /tests/models @DarkLight1337 @ywang96 -/tests/multi_step @alexm-redhat @comaniac /tests/multimodal @DarkLight1337 @ywang96 /tests/prefix_caching @comaniac @KuntaiDu /tests/quantization @mgoin @robertgshaw2-redhat @yewentao256 diff --git a/tests/async_engine/test_async_llm_engine.py b/tests/async_engine/test_async_llm_engine.py deleted file mode 100644 index 0eb7a6eb52aa6..0000000000000 --- a/tests/async_engine/test_async_llm_engine.py +++ /dev/null @@ -1,409 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - -import asyncio -import os -import uuid -from asyncio import CancelledError -from copy import copy -from dataclasses import dataclass, field -from typing import Any, Optional - -import pytest -import pytest_asyncio -import torch - -from vllm import SamplingParams -from vllm.config import ParallelConfig -from vllm.distributed import cleanup_dist_env_and_memory -from vllm.engine.async_llm_engine import AsyncEngineArgs, AsyncLLMEngine -from vllm.outputs import RequestOutput as RealRequestOutput -from vllm.sampling_params import RequestOutputKind - -from ..utils import wait_for_gpu_memory_to_clear - - -@dataclass -class RequestOutput: - request_id: int - finished: bool = False - - -@dataclass -class MockModelConfig: - use_async_output_proc = True - media_io_kwargs: dict[str, dict[str, Any]] = field(default_factory=dict) - - -class MockEngine: - - def __init__(self): - self.step_calls = 0 - self.add_request_calls = 0 - self.abort_request_calls = 0 - self.request_id = None - # Ugly, remove dependency when possible - self.parallel_config = ParallelConfig() - self.model_config = MockModelConfig() - - async def step_async(self, virtual_engine): - # PP size is 1, ignore virtual engine - self.step_calls += 1 - return [RequestOutput( - request_id=self.request_id)] if self.request_id else [] - - async def process_model_inputs_async(self, *args, **kwargs): - pass - - async def stop_remote_worker_execution_loop_async(self): - pass - - def generate(self, request_id): - self.request_id = request_id - - def stop_generating(self): - self.request_id = None - - def add_request(self, **kwargs): - del kwargs # Unused - self.add_request_calls += 1 - print(f'Request calls: {self.add_request_calls}') - - async def add_request_async(self, **kwargs): - self.add_request_calls += 1 - return - - def abort_request(self, request_id): - del request_id # Unused - self.abort_request_calls += 1 - - def has_unfinished_requests(self): - return self.request_id is not None - - def has_unfinished_requests_for_virtual_engine(self, virtual_engine): - return self.request_id is not None - - -class MockAsyncLLMEngine(AsyncLLMEngine): - _engine_class = MockEngine - - -@pytest.mark.asyncio -async def test_new_requests_event(): - params = SamplingParams() - - engine = MockAsyncLLMEngine() - engine.start_background_loop() - await asyncio.sleep(0.01) - assert engine.engine.step_calls == 0 - - await engine.add_request("1", "", params) - await asyncio.sleep(0.01) - assert engine.engine.add_request_calls == 1 - assert engine.engine.step_calls == 1 - - await engine.add_request("2", "", params) - engine.engine.generate("2") - await asyncio.sleep(0) - await asyncio.sleep(0) - await asyncio.sleep(0) - assert engine.engine.add_request_calls == 2 - assert engine.engine.step_calls >= 2 - await asyncio.sleep(0.001) - assert engine.engine.step_calls >= 3 - engine.engine.stop_generating() - await asyncio.sleep(0.001) - old_step_calls = engine.engine.step_calls - await asyncio.sleep(0.001) - assert engine.engine.step_calls == old_step_calls - - await engine.add_request("3", "", params) - await asyncio.sleep(0.01) - assert engine.engine.add_request_calls == 3 - assert engine.engine.step_calls == old_step_calls + 1 - await asyncio.sleep(0.01) - assert engine.engine.add_request_calls == 3 - assert engine.engine.step_calls == old_step_calls + 1 - - engine = MockAsyncLLMEngine() - assert engine.get_model_config() is not None - assert engine.get_tokenizer() is not None - assert engine.get_decoding_config() is not None - - -def start_engine(): - wait_for_gpu_memory_to_clear( - devices=list(range(torch.cuda.device_count())), - threshold_bytes=2 * 2**30, - timeout_s=60, - ) - - num_scheduler_steps = int(os.getenv("NUM_SCHEDULER_STEPS", "1")) - print(f"Starting engine with num_scheduler_steps={num_scheduler_steps}") - - return AsyncLLMEngine.from_engine_args( - AsyncEngineArgs(model="facebook/opt-125m", - enforce_eager=True, - num_scheduler_steps=num_scheduler_steps)) - - -def uid() -> str: - return str(uuid.uuid4()) - - -@pytest_asyncio.fixture(scope="module") -async def async_engine(): - # We cannot use monkeypatch since this is a module - # scoped fixture and monkeypatch is function scoped. - previous_value = os.getenv("VLLM_USE_V1", None) - os.environ["VLLM_USE_V1"] = "0" - engine = await asyncio.get_event_loop().run_in_executor(executor=None, - func=start_engine) - try: - yield engine - finally: - engine.shutdown_background_loop() - del engine - await asyncio.sleep(0.1) - cleanup_dist_env_and_memory() - - if previous_value: - os.environ["VLLM_USE_V1"] = previous_value - else: - del os.environ["VLLM_USE_V1"] - - -@pytest.fixture() -def should_do_global_cleanup_after_test(request) -> bool: - # So we can share the async engine fixture between these tests - return False - - -@pytest.mark.asyncio(scope="module") -@pytest.mark.parametrize("stop", [None, ["a stop string"]]) -async def test_asyncio_run(async_engine, stop): - - scheduler_config = await async_engine.get_scheduler_config() - num_scheduler_steps = scheduler_config.num_scheduler_steps - - async def run(prompt: str): - sampling_params = SamplingParams( - temperature=0, - max_tokens=32, - min_tokens=32, - stop=stop, - ) - - output_count = 0 - final_output = None - async for output in async_engine.generate(prompt, - sampling_params, - request_id=uid()): - output_count += 1 - final_output = output - return final_output, output_count - - results = await asyncio.gather( - run("test0"), - run("test0"), - ) - assert len(results) == 2 - first, second = results - - # remove nondeterministic fields for comparison - first[0].metrics = None - second[0].metrics = None - first[0].request_id = None - second[0].request_id = None - - assert str(first) == str(second) - - output_count = results[0][1] - if num_scheduler_steps == 1: - assert output_count == 32 - else: - assert 1 < output_count < 32 - - -@pytest.mark.asyncio(scope="module") -@pytest.mark.parametrize("stop", [None, ["a stop string"]]) -async def test_output_kinds(async_engine, stop): - """Test that output_kind works as expected and that - results are equivalent across different kinds.""" - - scheduler_config = await async_engine.get_scheduler_config() - num_scheduler_steps = scheduler_config.num_scheduler_steps - - sampling_params = SamplingParams( - temperature=0, - max_tokens=32, - min_tokens=32, - stop=stop, - ) - - async def run(prompt: str, kind: RequestOutputKind): - params = copy(sampling_params) - params.output_kind = kind - - output_count = 0 - final_output = None - async for output in async_engine.generate(prompt, - params, - request_id=uid()): - output_count += 1 - final_output = output - - assert final_output is not None - assert final_output.finished - - return (final_output.prompt_token_ids, - final_output.outputs[0].token_ids, - final_output.outputs[0].text, output_count) - - async def run_deltas(prompt: str): - params = copy(sampling_params) - params.output_kind = RequestOutputKind.DELTA - - prompt_tokens = None - output_tokens: list[int] = [] - output_text = "" - output_count = 0 - final_output = None - async for output in async_engine.generate(prompt, - params, - request_id=uid()): - token_ids = output.outputs[0].token_ids - text = output.outputs[0].text - final_output = output - - # Ensure we get prompt ids iff we haven't yet received output tokens - if output_tokens: - assert 1 <= len(token_ids) <= num_scheduler_steps - assert stop or text - assert not output.prompt_token_ids - else: - assert output.prompt_token_ids - prompt_tokens = output.prompt_token_ids - - output_tokens.extend(token_ids) - output_text += text - - output_count += 1 - - assert final_output is not None - assert final_output.finished - - return prompt_tokens, output_tokens, output_text, output_count - - results = await asyncio.gather( - run("common input prompt", RequestOutputKind.CUMULATIVE), - run("common input prompt", RequestOutputKind.FINAL_ONLY), - run_deltas("common input prompt")) - - # Make sure outputs are the same - prompt_set = set(tuple(prompt_ids) for prompt_ids, _, _, _ in results) - assert len(prompt_set) == 1 - - text_set = set(text for _, _, text, _ in results) - assert len(text_set) == 1 - - tokens_set = set(tuple(ids) for _, ids, _, _ in results) - assert len(tokens_set) == 1 - - cumulative, final, deltas = results - - # output message counts - assert cumulative[3] == deltas[3] - - if num_scheduler_steps == 1: - assert cumulative[3] == 32 - else: - assert 1 < cumulative[3] < 32 - - assert final[3] == 1 - - -@pytest.mark.asyncio(scope="module") -@pytest.mark.parametrize("stop", [None, ["a stop string"]]) -async def test_cancellation(async_engine, stop): - scheduler_config = await async_engine.get_scheduler_config() - num_scheduler_steps = scheduler_config.num_scheduler_steps - - sampling_params = SamplingParams( - temperature=0, - min_tokens=13, - max_tokens=13, - stop=stop, - ) - - stop_at = 5 if num_scheduler_steps == 1 else 1 - - request_id = uid() - - i = 0 - with pytest.raises(CancelledError): - async for output in async_engine.generate("test2", - sampling_params, - request_id=request_id): - assert not output.finished - i += 1 - if i == stop_at: - await async_engine.abort(request_id) - - assert i == stop_at - - -@pytest.mark.asyncio(scope="module") -@pytest.mark.parametrize("stop", [None, ["a stop string"]]) -async def test_delayed_generator(async_engine, stop): - scheduler_config = await async_engine.get_scheduler_config() - - if scheduler_config.num_scheduler_steps != 1: - pytest.skip("no need to test this one with multistep") - - sampling_params = SamplingParams( - temperature=0, - min_tokens=10, - max_tokens=10, - stop=stop, - ) - - stream = async_engine.generate("test3", sampling_params, request_id=uid()) - i = 0 - final_output: Optional[RealRequestOutput] = None - async for output in stream: - final_output = output - if i == 0: - # wait for generation to complete before consuming - # the remaining messages - await asyncio.sleep(1) - if i < 9: - assert not output.finished - i += 1 - - assert i == 10 - assert final_output is not None - assert len(final_output.outputs[0].token_ids) == 10 - assert final_output.finished - - -@pytest.mark.asyncio(scope="module") -async def test_invalid_argument(async_engine): - scheduler_config = await async_engine.get_scheduler_config() - - if scheduler_config.num_scheduler_steps != 1: - pytest.skip("no need to test this one with multistep") - - sampling_params = SamplingParams( - temperature=0, - min_tokens=10, - max_tokens=10, - ) - - # Targeting specific DP rank only supported in v1 multi-instance DP - with pytest.raises(ValueError): - async for _ in async_engine.generate("test", - sampling_params, - request_id=uid(), - data_parallel_rank=0): - pass diff --git a/tests/config/test_config.yaml b/tests/config/test_config.yaml index 5090e8f357bb8..a16857b5f2fbd 100644 --- a/tests/config/test_config.yaml +++ b/tests/config/test_config.yaml @@ -2,4 +2,3 @@ port: 12312 served_model_name: mymodel tensor_parallel_size: 2 trust_remote_code: true -multi_step_stream_outputs: false diff --git a/tests/config/test_config_with_model.yaml b/tests/config/test_config_with_model.yaml index d8c8c7bc8162a..9fbdb77d4ef24 100644 --- a/tests/config/test_config_with_model.yaml +++ b/tests/config/test_config_with_model.yaml @@ -4,4 +4,3 @@ port: 12312 served_model_name: mymodel tensor_parallel_size: 2 trust_remote_code: true -multi_step_stream_outputs: false diff --git a/tests/core/test_chunked_prefill_scheduler.py b/tests/core/test_chunked_prefill_scheduler.py index d4dacc4f1296d..ce1fe189b3ca1 100644 --- a/tests/core/test_chunked_prefill_scheduler.py +++ b/tests/core/test_chunked_prefill_scheduler.py @@ -644,11 +644,9 @@ def test_chunked_prefill_preempt(): assert out.num_batched_tokens == max_num_batched_tokens -@pytest.mark.parametrize("num_scheduler_steps", [1, 5]) -def test_chunked_prefill_spec_prefill(num_scheduler_steps): +def test_chunked_prefill_spec_prefill(): """Verify that the num_lookahead_slots is set appropriately for an all""" - """prefill batch depending on whether multi-step scheduling is enabled""" - """or not""" + """prefill batch.""" block_size = 4 max_seqs = 30 max_model_len = 200 @@ -661,7 +659,6 @@ def test_chunked_prefill_spec_prefill(num_scheduler_steps): max_model_len, enable_chunked_prefill=True, num_lookahead_slots=num_lookahead_slots, - num_scheduler_steps=num_scheduler_steps, ) cache_config = CacheConfig(block_size, 1.0, 1, "auto") cache_config.num_cpu_blocks = 16 @@ -679,8 +676,7 @@ def test_chunked_prefill_spec_prefill(num_scheduler_steps): assert out.num_prefill_groups == 1 assert out.num_batched_tokens == max_num_batched_tokens print(out.num_lookahead_slots) - assert out.num_lookahead_slots == (0 if (num_scheduler_steps == 1) else - num_lookahead_slots) + assert out.num_lookahead_slots == 0 def test_chunked_prefill_max_seqs(): diff --git a/tests/core/test_num_computed_tokens_update.py b/tests/core/test_num_computed_tokens_update.py index 9e1b7913dfb99..131a7b3a6299b 100644 --- a/tests/core/test_num_computed_tokens_update.py +++ b/tests/core/test_num_computed_tokens_update.py @@ -6,7 +6,6 @@ import pytest from tests.conftest import VllmRunner from tests.core.utils import create_dummy_prompt from vllm.engine.llm_engine import LLMEngine -from vllm.platforms import current_platform from vllm.sequence import SequenceGroup MODEL = "JackFram/llama-160m" @@ -17,32 +16,19 @@ def add_seq_group_to_engine(engine: LLMEngine, seq_group: SequenceGroup): scheduler.add_seq_group(seq_group) -@pytest.mark.parametrize("num_scheduler_steps", [1, 8]) @pytest.mark.parametrize("enable_chunked_prefill", [False, True]) @pytest.mark.parametrize("enforce_eager", [False, True]) -def test_num_computed_tokens_update(num_scheduler_steps: int, - enable_chunked_prefill: bool, +def test_num_computed_tokens_update(enable_chunked_prefill: bool, enforce_eager: bool): - is_multi_step = num_scheduler_steps > 1 - is_multi_step_chunked_prefill = is_multi_step and enable_chunked_prefill - - if is_multi_step_chunked_prefill and current_platform.is_rocm(): - pytest.skip("Multi-step with Chunked-Prefill does not support " - "rocm_flash_attn backend") - # Make a vllm engine runner = VllmRunner(model_name=MODEL, gpu_memory_utilization=0.7, - num_scheduler_steps=num_scheduler_steps, enable_chunked_prefill=enable_chunked_prefill, enforce_eager=enforce_eager) engine: LLMEngine = runner.llm.llm_engine - # In multi-step + chunked-prefill there is no separate single prompt step. - # What is scheduled will run for num_scheduler_steps always. - num_prompt_steps = num_scheduler_steps \ - if is_multi_step_chunked_prefill else 1 + num_prompt_steps = 1 num_output_tokens_list = [4, 8, 12, 15, 16, 17] @@ -73,10 +59,8 @@ def test_num_computed_tokens_update(num_scheduler_steps: int, # Test correctness of num_computed_tokens after the decode steps assert seq.data.get_num_computed_tokens( ) == prompt_num_computed_tokens + decode_step_counter - for _ in range(num_scheduler_steps): - # decode step - engine.step() - decode_step_counter += 1 + engine.step() + decode_step_counter += 1 # Test correctness of num_computed_tokens after the sequence finish. assert seq.data.get_num_computed_tokens( diff --git a/tests/engine/test_multi_step_output_processor.py b/tests/engine/test_multi_step_output_processor.py deleted file mode 100644 index 458f4deb743ac..0000000000000 --- a/tests/engine/test_multi_step_output_processor.py +++ /dev/null @@ -1,274 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - -import random -from unittest.mock import MagicMock - -import pytest -from transformers import PreTrainedTokenizer - -from vllm.core.scheduler import Scheduler -from vllm.engine.output_processor.multi_step import MultiStepOutputProcessor -from vllm.engine.output_processor.stop_checker import StopChecker -from vllm.sampling_params import SamplingParams -from vllm.sequence import (CompletionSequenceGroupOutput, Logprob, - SequenceOutput, SequenceStatus) -from vllm.transformers_utils.detokenizer import Detokenizer -from vllm.utils import Counter - -from ..core.utils import create_seq_group - - -@pytest.mark.parametrize("seq_output_len", [128]) -@pytest.mark.parametrize("num_new_tokens", [1, 12]) -@pytest.mark.skip_global_cleanup -def test_appends_token_ids(num_new_tokens: int, seq_output_len: int): - """Verify multi-step decoding appends token ids correctly. - - We append token ids and verify all the token ids were appended correctly. - Note that ignore_eos=True. - """ - detokenizer = MagicMock(spec=Detokenizer) - scheduler = MagicMock(spec=Scheduler) - stop_checker = MagicMock(spec=StopChecker) - seq_counter = Counter() - - output_processor = MultiStepOutputProcessor( - detokenizer=detokenizer, - scheduler=[scheduler], - seq_counter=seq_counter, - get_tokenizer_for_seq=lambda _: mock_tokenizer(), - stop_checker=stop_checker, - ) - - seq_group = create_seq_group( - seq_prompt_len=1024, - seq_output_lens=[seq_output_len], - sampling_params=SamplingParams(max_tokens=seq_output_len + - num_new_tokens, - ignore_eos=True), - ) - - seq = seq_group.get_seqs()[0] - seq.status = SequenceStatus.RUNNING - - new_token_ids = list(range(num_new_tokens)) - - outputs = [ - CompletionSequenceGroupOutput( - samples=[ - SequenceOutput( - parent_seq_id=seq.seq_id, - output_token=output_token, - logprobs={output_token: Logprob(0.0)}, - ) - ], - prompt_logprobs=None, - ) for output_token in new_token_ids - ] - - assert seq.get_token_ids()[-len(new_token_ids):] != new_token_ids - output_processor.process_outputs(seq_group, outputs) - assert seq.get_token_ids()[-len(new_token_ids):] == new_token_ids - - -@pytest.mark.parametrize("seq_prompt_len", [1024]) -@pytest.mark.parametrize("seq_output_len", [128]) -@pytest.mark.parametrize("num_new_tokens", [5, 6, 7, 8]) -@pytest.mark.parametrize("max_tokens", [128 + 3]) -@pytest.mark.skip_global_cleanup -def test_respects_max_tokens(num_new_tokens: int, seq_prompt_len: int, - seq_output_len: int, max_tokens: int): - """Verify tokens after max_tokens are dropped and not appended to the - sequence. - """ - detokenizer = MagicMock(spec=Detokenizer) - scheduler = MagicMock(spec=Scheduler) - stop_checker = MagicMock(spec=StopChecker) - seq_counter = Counter() - - output_processor = MultiStepOutputProcessor( - detokenizer=detokenizer, - scheduler=[scheduler], - seq_counter=seq_counter, - get_tokenizer_for_seq=lambda _: mock_tokenizer(), - stop_checker=stop_checker, - ) - - seq_group = create_seq_group( - seq_prompt_len=seq_prompt_len, - seq_output_lens=[seq_output_len], - sampling_params=SamplingParams(max_tokens=max_tokens, ), - ) - - seq = seq_group.get_seqs()[0] - seq.status = SequenceStatus.RUNNING - - new_token_ids = list(range(num_new_tokens)) - - outputs = [ - CompletionSequenceGroupOutput( - samples=[ - SequenceOutput( - parent_seq_id=seq.seq_id, - output_token=output_token, - logprobs={output_token: Logprob(0.0)}, - ) - ], - prompt_logprobs=None, - ) for output_token in new_token_ids - ] - - assert seq.get_len() == seq_prompt_len + seq_output_len - output_processor.process_outputs(seq_group, outputs) - - # Expect the processed sequence to not go over max tokens in len. - assert seq.get_len() == seq_prompt_len + max_tokens - - # Expect the correct tokens were appended. - expected_appended_tokens = new_token_ids[:max_tokens - seq_output_len] - assert seq.get_token_ids( - )[-len(expected_appended_tokens):] == expected_appended_tokens - - -@pytest.mark.parametrize("seq_prompt_len", [1024]) -@pytest.mark.parametrize("seq_output_len", [128]) -@pytest.mark.parametrize("num_new_tokens", [12]) -@pytest.mark.parametrize("seed", list(range(6))) -@pytest.mark.skip_global_cleanup -def test_respects_eos_token_id(num_new_tokens: int, seq_prompt_len: int, - seq_output_len: int, seed: int): - """Verify the eos token id is included in the sequence, but subsequent - tokens are dropped (not appended to sequence). - """ - random.seed(seed) - detokenizer = MagicMock(spec=Detokenizer) - scheduler = MagicMock(spec=Scheduler) - stop_checker = MagicMock(spec=StopChecker) - seq_counter = Counter() - - eos_token_id = 100 - - output_processor = MultiStepOutputProcessor( - detokenizer=detokenizer, - scheduler=[scheduler], - seq_counter=seq_counter, - get_tokenizer_for_seq=lambda _: mock_tokenizer(eos_token_id), - stop_checker=stop_checker, - ) - - seq_group = create_seq_group( - seq_prompt_len=seq_prompt_len, - seq_output_lens=[seq_output_len], - sampling_params=SamplingParams( - # Ensure enough space. - max_tokens=seq_output_len + num_new_tokens, ), - ) - - seq = seq_group.get_seqs()[0] - seq.status = SequenceStatus.RUNNING - - new_token_ids = list(range(num_new_tokens)) - assert eos_token_id not in new_token_ids - eos_index = random.randint(0, len(new_token_ids) - 1) - new_token_ids[eos_index] = eos_token_id - - outputs = [ - CompletionSequenceGroupOutput( - samples=[ - SequenceOutput( - parent_seq_id=seq.seq_id, - output_token=output_token, - logprobs={output_token: Logprob(0.0)}, - ) - ], - prompt_logprobs=None, - ) for output_token in new_token_ids - ] - - assert seq.get_len() == seq_prompt_len + seq_output_len - output_processor.process_outputs(seq_group, outputs) - - # Expect the processed sequence to not go beyond provided eos. - assert seq.get_len() == seq_prompt_len + seq_output_len + (eos_index + 1) - - # Expect the correct tokens were appended. - expected_appended_tokens = new_token_ids[:eos_index + 1] - assert seq.get_token_ids( - )[-len(expected_appended_tokens):] == expected_appended_tokens - - -@pytest.mark.parametrize("seq_prompt_len", [1024]) -@pytest.mark.parametrize("seq_output_len", [128]) -@pytest.mark.parametrize("num_new_tokens", [12]) -@pytest.mark.parametrize("seed", list(range(6))) -@pytest.mark.skip_global_cleanup -def test_ignores_eos_token_id(num_new_tokens: int, seq_prompt_len: int, - seq_output_len: int, seed: int): - """When sampling parameters dictate that we should ignore the eos token id, - ensure all token ids are appended even if the eos token id is emitted. - """ - random.seed(seed) - detokenizer = MagicMock(spec=Detokenizer) - scheduler = MagicMock(spec=Scheduler) - stop_checker = MagicMock(spec=StopChecker) - seq_counter = Counter() - - eos_token_id = 100 - - output_processor = MultiStepOutputProcessor( - detokenizer=detokenizer, - scheduler=[scheduler], - seq_counter=seq_counter, - get_tokenizer_for_seq=lambda _: mock_tokenizer(eos_token_id), - stop_checker=stop_checker, - ) - - seq_group = create_seq_group( - seq_prompt_len=seq_prompt_len, - seq_output_lens=[seq_output_len], - sampling_params=SamplingParams( - # Ensure enough space. - max_tokens=seq_output_len + num_new_tokens, - ignore_eos=True, - ), - ) - - seq = seq_group.get_seqs()[0] - seq.status = SequenceStatus.RUNNING - - new_token_ids = list(range(num_new_tokens)) - assert eos_token_id not in new_token_ids - eos_index = random.randint(0, len(new_token_ids) - 1) - new_token_ids[eos_index] = eos_token_id - - outputs = [ - CompletionSequenceGroupOutput( - samples=[ - SequenceOutput( - parent_seq_id=seq.seq_id, - output_token=output_token, - logprobs={output_token: Logprob(0.0)}, - ) - ], - prompt_logprobs=None, - ) for output_token in new_token_ids - ] - - assert seq.get_len() == seq_prompt_len + seq_output_len - output_processor.process_outputs(seq_group, outputs) - - # Expect the processed sequence to go beyond eos. - assert seq.get_len() == seq_prompt_len + seq_output_len + num_new_tokens - - # Expect the correct tokens were appended. - expected_appended_tokens = new_token_ids[:seq_output_len + num_new_tokens - - seq_output_len] - assert seq.get_token_ids( - )[-len(expected_appended_tokens):] == expected_appended_tokens - - -def mock_tokenizer(eos_token_id=1000): - tokenizer = MagicMock(spec=PreTrainedTokenizer) - tokenizer.eos_token_id = eos_token_id - return tokenizer diff --git a/tests/entrypoints/openai/correctness/test_lmeval.py b/tests/entrypoints/openai/correctness/test_lmeval.py index d75731637d282..684407cd6ee97 100644 --- a/tests/entrypoints/openai/correctness/test_lmeval.py +++ b/tests/entrypoints/openai/correctness/test_lmeval.py @@ -26,15 +26,12 @@ DEFAULT_ARGS = ["--max-model-len", "4096"] MORE_ARGS_LIST = [ [], # Default ["--enable-chunked-prefill"], # Chunked - ["--num-scheduler-steps", "8"], # MS - ["--num-scheduler-steps", "8", "--multi-step-stream-outputs"] # MS+Stream ] MAX_WAIT_SECONDS = None if current_platform.is_tpu(): MORE_ARGS_LIST = [ [], # Default - # ["--num-scheduler-steps", "8"], # Multi-step << currently fails ] MAX_WAIT_SECONDS = 600 diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py index 8cae8a80d38ed..dbd9c518e0200 100644 --- a/tests/metrics/test_metrics.py +++ b/tests/metrics/test_metrics.py @@ -94,45 +94,6 @@ def test_metric_counter_generation_tokens( f"metric: {metric_count!r}") -@pytest.mark.parametrize("model", MODELS) -@pytest.mark.parametrize("max_tokens", [128, 129]) -@pytest.mark.parametrize("disable_async_output_proc", [True, False]) -def test_metric_counter_generation_tokens_multi_step( - vllm_runner, - example_prompts, - model: str, - max_tokens: int, - disable_async_output_proc: bool, -) -> None: - num_scheduler_steps = 8 - with vllm_runner( - model, - disable_log_stats=False, - gpu_memory_utilization=0.4, - num_scheduler_steps=num_scheduler_steps, - disable_async_output_proc=disable_async_output_proc, - ) as vllm_model: - vllm_outputs = vllm_model.generate_greedy(example_prompts, max_tokens) - tokenizer = vllm_model.llm.get_tokenizer() - stat_logger = vllm_model.llm.llm_engine.stat_loggers['prometheus'] - metric_count = stat_logger.metrics.counter_generation_tokens.labels( - **stat_logger.labels)._value.get() - vllm_generation_count = 0 - for i in range(len(example_prompts)): - vllm_output_ids, vllm_output_str = vllm_outputs[i] - prompt_ids = tokenizer.encode(example_prompts[i]) - # vllm_output_ids contains both prompt tokens and generation tokens. - # We're interested only in the count of the generation tokens. - vllm_generation_count += len(vllm_output_ids) - len(prompt_ids) - - # The multi-step scheduling will continue to execute forward even when - # encountering EOS, leading to slightly imprecise metrics. - assert abs(vllm_generation_count - metric_count) <\ - len(example_prompts) * num_scheduler_steps, \ - (f"generation token count: {vllm_generation_count!r}\n" - f"metric: {metric_count!r}") - - @pytest.mark.parametrize("model", MODELS) @pytest.mark.parametrize("dtype", ["float"]) @pytest.mark.parametrize( diff --git a/tests/models/language/generation/test_hybrid.py b/tests/models/language/generation/test_hybrid.py index 76f6c226bab7c..19fcbf5616407 100644 --- a/tests/models/language/generation/test_hybrid.py +++ b/tests/models/language/generation/test_hybrid.py @@ -331,32 +331,6 @@ def test_state_cleanup( "could be related to finished_requests_ids") -@pytest.mark.parametrize("model", [SSM_MODELS[0], HYBRID_MODELS[0]]) -@pytest.mark.parametrize("max_tokens", [64]) -def test_multistep_correctness( - vllm_runner, - example_prompts, - model: str, - max_tokens: int, -) -> None: - with vllm_runner(model, num_scheduler_steps=8, - max_num_seqs=2) as vllm_model: - vllm_outputs_multistep = vllm_model.generate_greedy( - example_prompts, max_tokens) - - with vllm_runner(model, num_scheduler_steps=1, - max_num_seqs=2) as vllm_model: - vllm_outputs_single_step = vllm_model.generate_greedy( - example_prompts, max_tokens) - - check_outputs_equal( - outputs_0_lst=vllm_outputs_multistep, - outputs_1_lst=vllm_outputs_single_step, - name_0="vllm_outputs_multistep", - name_1="vllm_outputs_single_step", - ) - - @multi_gpu_test(num_gpus=2) @pytest.mark.parametrize("model", [SSM_MODELS[0], HYBRID_MODELS[0]]) @pytest.mark.parametrize("max_tokens", [64]) diff --git a/tests/multi_step/test_correctness_async_llm.py b/tests/multi_step/test_correctness_async_llm.py deleted file mode 100644 index 56e339d485c56..0000000000000 --- a/tests/multi_step/test_correctness_async_llm.py +++ /dev/null @@ -1,232 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - -# Test the AsyncLLMEngine with multi-step-decoding -from typing import Optional - -import pytest - -from vllm.utils import STR_BACKEND_ENV_VAR - -from ..models.utils import check_logprobs_close -from ..utils import (completions_with_server_args, get_client_text_generations, - get_client_text_logprob_generations) - -MODELS = [ - "JackFram/llama-160m", -] -NUM_SCHEDULER_STEPS = [8] # Multi-step decoding steps -NUM_PROMPTS = [10] - -DEFAULT_SERVER_ARGS: list[str] = [ - "--distributed-executor-backend", - "ray", - "--gpu-memory-utilization", - "0.85", - "--swap-space", - "16", -] - - -@pytest.mark.parametrize("model", MODELS) -@pytest.mark.parametrize(("tp_size, pp_size"), [ - (1, 1), - (2, 2), -]) -@pytest.mark.parametrize("eager_mode", [False, True]) -@pytest.mark.parametrize("num_scheduler_steps", NUM_SCHEDULER_STEPS) -@pytest.mark.parametrize("num_prompts", NUM_PROMPTS) -@pytest.mark.parametrize("num_logprobs", [5]) -@pytest.mark.parametrize("is_async", [True]) -@pytest.mark.parametrize("attention_backend", ["FLASHINFER", "FLASH_ATTN"]) -@pytest.mark.parametrize("enable_chunked_prefill", [True, False]) -@pytest.mark.asyncio -async def test_multi_step( - example_prompts, - model: str, - tp_size: int, - pp_size: int, - eager_mode: int, - num_scheduler_steps: int, - num_prompts: int, - is_async: bool, - num_logprobs: Optional[int], - attention_backend: str, - enable_chunked_prefill: bool, - monkeypatch: pytest.MonkeyPatch, -) -> None: - """Test vLLM engine with multi-step scheduling in an OpenAI-protocol - client/server environment. - - Set up an engine with single-step scheduling as a ground-truth reference. - - Send a completions API request to both engines with the same prompts. - - Validate: - * Generated tokens match - * Generated logprobs are all very close - - Args: - example_prompts: test fixture providing example prompts - model: model under test (same for single- and multi-step engines) - tp_size: degree of tensor-parallelism - pp_size: degree of pipeline-parallelism - eager_mode - num_scheduler_steps: for multi-step scheduling, GPU-side steps per - GPU -> CPU output transfer - num_prompts: number of example prompts under test - num_logprobs: corresponds to the `logprobs` argument to the OpenAI - completions endpoint; `None` -> no logprobs - """ - if enable_chunked_prefill and \ - (pp_size > 1 or attention_backend != "FLASH_ATTN"): - pytest.skip("Multi-step with Chunked-Prefill only supports" - "PP=1 and FLASH_ATTN backend") - - with monkeypatch.context() as m: - m.setenv(STR_BACKEND_ENV_VAR, attention_backend) - - prompts = example_prompts - if len(prompts) < num_prompts: - prompts = prompts * ((num_prompts // len(prompts)) + 1) - prompts = prompts[:num_prompts] - assert len(prompts) == num_prompts - - server_args = DEFAULT_SERVER_ARGS + ["--enforce-eager"] - ms_server_args = DEFAULT_SERVER_ARGS + \ - ["--num-scheduler-steps", f"{num_scheduler_steps}"] - - if not is_async: - ms_server_args += ["--disable-async-output-proc"] - - if eager_mode: - ms_server_args.append("--enforce-eager") - - if enable_chunked_prefill: - ms_server_args.append("--enable-chunked-prefill") - - distributed_args = [ - "--tensor-parallel-size", - str(tp_size), - "--pipeline-parallel-size", - str(pp_size), - ] - - # Spin up client/server & issue completion API requests. - # Default `max_wait_seconds` is 240 but was empirically - # was raised 5x to 1200 *just for this test* due to - # observed timeouts in GHA CI - ref_completions = await completions_with_server_args( - prompts, - model, - server_args + distributed_args, - num_logprobs, - max_wait_seconds=5 * 240) - test_completions = await completions_with_server_args( - prompts, - model, - ms_server_args + distributed_args, - num_logprobs, - max_wait_seconds=5 * 240) - - # Assert multi-step scheduling produces identical tokens - # to single-step scheduling. - ref_generations = get_client_text_generations(ref_completions) - test_generations = get_client_text_generations(test_completions) - assert ref_generations == test_generations - - # Assert multi-step scheduling produces nearly-identical logprobs - # to single-step scheduling. - ref_text_logprobs = get_client_text_logprob_generations( - ref_completions) - test_text_logprobs = get_client_text_logprob_generations( - test_completions) - check_logprobs_close( - outputs_0_lst=ref_text_logprobs, - outputs_1_lst=test_text_logprobs, - name_0="hf", - name_1="vllm", - ) - - -@pytest.mark.parametrize(("tp_size, pp_size"), [ - (1, 2), -]) -@pytest.mark.asyncio -async def test_multi_step_pp_smoke( - tp_size: int, - pp_size: int, - monkeypatch: pytest.MonkeyPatch, -) -> None: - """ - Smoke test for the vLLM engine with multi-step scheduling in an - OpenAI-protocol client/server environment. - - This tests compares the outputs between multi-step scheduling and - single-step scheduling. Notably, this test lets the engines generate - more tokens (default is 5) and test for an exact match over all the - tokens. - - Args: - tp_size: degree of tensor-parallelism - pp_size: degree of pipeline-parallelism - eager_mode - """ - - model = "JackFram/llama-160m" - num_scheduler_steps = 8 - attention_backend = "FLASH_ATTN" - max_num_seqs = 3 - - with monkeypatch.context() as m: - m.setenv(STR_BACKEND_ENV_VAR, attention_backend) - - # Prompt from the ShareGPT dataset - prompts = [ - "in the jtbd context whats a push?", # codespell:ignore - "in the jtbd context whats a push?", # codespell:ignore - "in the jtbd context whats a push?", # codespell:ignore - "in the jtbd context whats a push?", # codespell:ignore - ] - # Use varying max_tokens to introduce scheduling randomness. - max_tokens = [10 * i for i in range(1, len(prompts) + 1)] - assert len(prompts) == len(max_tokens) - - test_args = [ - "--tensor-parallel-size", - str(tp_size), "--pipeline-parallel-size", - str(pp_size), "--max-num-seqs", - str(max_num_seqs) - ] - - server_args = DEFAULT_SERVER_ARGS + test_args - ms_server_args = DEFAULT_SERVER_ARGS + \ - ["--num-scheduler-steps", f"{num_scheduler_steps}"] + \ - test_args - - # Spin up client/server & issue completion API requests. - # Default `max_wait_seconds` is 240 but was empirically - # was raised 3x to 720 *just for this test* due to - # observed timeouts in GHA CI - ref_completions = await completions_with_server_args( - prompts=prompts, - model_name=model, - server_cli_args=server_args, - num_logprobs=None, - max_wait_seconds=5 * 240, - max_tokens=max_tokens) - - test_completions = await completions_with_server_args( - prompts=prompts, - model_name=model, - server_cli_args=ms_server_args, - num_logprobs=None, - max_wait_seconds=5 * 240, - max_tokens=max_tokens) - - # Assert multi-step scheduling produces identical tokens - # to single-step scheduling. - ref_generations = get_client_text_generations(ref_completions) - test_generations = get_client_text_generations(test_completions) - - assert ref_generations == test_generations diff --git a/tests/multi_step/test_correctness_llm.py b/tests/multi_step/test_correctness_llm.py deleted file mode 100644 index 0df00c98b72cf..0000000000000 --- a/tests/multi_step/test_correctness_llm.py +++ /dev/null @@ -1,383 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - -# Test the LLMEngine with multi-step-decoding - -import copy -from typing import Optional - -import pytest - -from vllm.platforms import current_platform -from vllm.utils import STR_BACKEND_ENV_VAR - -from ..models.utils import check_logprobs_close, check_outputs_equal - -MODELS = [ - "JackFram/llama-160m", -] -NUM_SCHEDULER_STEPS = [8] # Multi-step decoding steps -NUM_PROMPTS = [10] - - -@pytest.mark.parametrize("model", MODELS) -@pytest.mark.parametrize("dtype", ["half"]) -@pytest.mark.parametrize("tp_size", [1]) -@pytest.mark.parametrize("enable_chunked_prefill", [False, True]) -@pytest.mark.parametrize("max_tokens", [5]) -@pytest.mark.parametrize("enforce_eager", [True, False]) -@pytest.mark.parametrize("num_scheduler_steps", NUM_SCHEDULER_STEPS) -@pytest.mark.parametrize("num_prompts", NUM_PROMPTS) -@pytest.mark.parametrize("num_logprobs", [None, 5]) -@pytest.mark.parametrize("attention_backend", ["FLASH_ATTN", "FLASHINFER"]) -def test_multi_step_llm( - hf_runner, - vllm_runner, - example_prompts, - model: str, - dtype: str, - tp_size: int, - enable_chunked_prefill: bool, - max_tokens: int, - enforce_eager: int, - num_scheduler_steps: int, - num_prompts: int, - num_logprobs: Optional[int], - attention_backend: str, - monkeypatch: pytest.MonkeyPatch, -) -> None: - """Test vLLM engine with multi-step scheduling via sync LLM Engine. - - Set up a HuggingFace (HF) transformers model as a ground-truth reference. - - Prompt them with the same example prompts. - - Validate: - * Generated tokens match - * Generated logprobs are all very close - - Args: - hf_runner: HF transformers model runner fixture - vllm_runner: vLLM model runner fixture - example_prompts: test fixture providing example prompts - model: model under test (same for single- and multi-step engines) - dtype: tensor datatype for engine to utilize - tp_size: degree of tensor-parallelism - enable_chunked_prefill: chunked-prefill on/off - max_tokens: the maximum number of tokens to generate - enforce_eager - num_scheduler_steps: for multi-step scheduling, GPU-side steps per - GPU -> CPU output transfer - num_prompts: number of example prompts under test - num_logprobs: corresponds to the `logprobs` argument to the OpenAI - completions endpoint; `None` -> 1 logprob returned. - """ - if current_platform.is_rocm() and \ - (attention_backend == "FLASHINFER" or enable_chunked_prefill): - pytest.skip( - "Multi-Step with FLASHINFER or Chunked-Prefill is not supported" - "on ROCm") - - with monkeypatch.context() as m: - m.setenv(STR_BACKEND_ENV_VAR, attention_backend) - - prompts = example_prompts - if len(prompts) < num_prompts: - prompts = prompts * ((num_prompts // len(prompts)) + 1) - prompts = prompts[:num_prompts] - assert len(prompts) == num_prompts - - with vllm_runner( - model, - dtype=dtype, - enforce_eager=enforce_eager, - gpu_memory_utilization=0.7, - tensor_parallel_size=tp_size, - enable_chunked_prefill=enable_chunked_prefill, - num_scheduler_steps=num_scheduler_steps, - ) as vllm_model: - vllm_outputs = (vllm_model.generate_greedy(prompts, max_tokens) - if num_logprobs is None else - vllm_model.generate_greedy_logprobs( - prompts, max_tokens, num_logprobs)) - - with hf_runner(model, dtype=dtype) as hf_model: - hf_outputs = (hf_model.generate_greedy(prompts, max_tokens) - if num_logprobs is None else - hf_model.generate_greedy_logprobs_limit( - prompts, max_tokens, num_logprobs)) - - if num_logprobs is None: - check_outputs_equal( - outputs_0_lst=hf_outputs, - outputs_1_lst=vllm_outputs, - name_0="hf", - name_1="vllm", - ) - else: - check_logprobs_close( - outputs_0_lst=hf_outputs, - outputs_1_lst=vllm_outputs, - name_0="hf", - name_1="vllm", - ) - - -@pytest.mark.parametrize("model", MODELS) -@pytest.mark.parametrize("dtype", ["half"]) -@pytest.mark.parametrize("tp_size", [1]) -@pytest.mark.parametrize("max_tokens", [5]) -@pytest.mark.parametrize("enforce_eager", [True]) -@pytest.mark.parametrize("num_scheduler_steps", NUM_SCHEDULER_STEPS) -@pytest.mark.parametrize("num_prompts", NUM_PROMPTS) -@pytest.mark.parametrize("num_logprobs,num_prompt_logprobs", [(5, 5)]) -@pytest.mark.parametrize("attention_backend", ["FLASH_ATTN"]) -def test_multi_step_llm_w_prompt_logprobs( - vllm_runner, - example_prompts, - model: str, - dtype: str, - tp_size: int, - max_tokens: int, - enforce_eager: int, - num_scheduler_steps: int, - num_prompts: int, - num_logprobs: Optional[int], - num_prompt_logprobs: Optional[int], - attention_backend: str, - monkeypatch: pytest.MonkeyPatch, -) -> None: - """Test prompt logprobs with multi-step scheduling via sync LLM Engine. - - Set up a vLLM engine instance w/ single-step scheduling as a ground-truth - reference. - - Prompt them with the same example prompts. - - Validate: - * All generated logprobs are all very close - - Args: - hf_runner: HF transformers model runner fixture - vllm_runner: vLLM model runner fixture - example_prompts: test fixture providing example prompts - model: model under test (same for single- and multi-step engines) - dtype: tensor datatype for engine to utilize - tp_size: degree of tensor-parallelism - max_tokens: the maximum number of tokens to generate - enforce_eager - num_scheduler_steps: for multi-step scheduling, GPU-side steps per - GPU -> CPU output transfer - num_prompts: number of example prompts under test - num_logprobs: corresponds to the `logprobs` argument to the OpenAI - completions endpoint; `None` -> no logprobs - num_prompt_logprobs: number of logprobs to return for each prompt token; - note that this argument is not supported by the - OpenAI completions endpoint. - """ - with monkeypatch.context() as m: - m.setenv(STR_BACKEND_ENV_VAR, attention_backend) - - prompts = example_prompts - if len(prompts) < num_prompts: - prompts = prompts * ((num_prompts // len(prompts)) + 1) - prompts = prompts[:num_prompts] - assert len(prompts) == num_prompts - - with vllm_runner( - model, - dtype=dtype, - enforce_eager=enforce_eager, - gpu_memory_utilization=0.7, - tensor_parallel_size=tp_size, - num_scheduler_steps=num_scheduler_steps, - ) as vllm_model: - vllm_outputs = vllm_model.generate_greedy_logprobs( - prompts, - max_tokens, - num_logprobs, - num_prompt_logprobs=num_prompt_logprobs) - - with vllm_runner( - model, - dtype=dtype, - enforce_eager=enforce_eager, - gpu_memory_utilization=0.7, - tensor_parallel_size=tp_size, - ) as vllm_model: - single_step_vllm_outputs = vllm_model.generate_greedy_logprobs( - prompts, - max_tokens, - num_logprobs, - num_prompt_logprobs=num_prompt_logprobs) - - check_logprobs_close( - outputs_0_lst=single_step_vllm_outputs, - outputs_1_lst=vllm_outputs, - name_0="hf", - name_1="vllm", - ) - - -@pytest.mark.parametrize("model", MODELS) -@pytest.mark.parametrize("dtype", ["half"]) -@pytest.mark.parametrize("tp_size", [1]) -@pytest.mark.parametrize("max_tokens", [5]) -@pytest.mark.parametrize("enforce_eager", [True]) -@pytest.mark.parametrize("num_scheduler_steps", NUM_SCHEDULER_STEPS) -@pytest.mark.parametrize("num_prompts", NUM_PROMPTS) -@pytest.mark.parametrize("num_logprobs", [None, 5]) -@pytest.mark.parametrize("attention_backend", ["FLASH_ATTN"]) -@pytest.mark.skipif( - current_platform.is_rocm(), - reason="Multi-Step + Chunked-Prefill not supported on ROCm") -def test_multi_step_llm_chunked_prefill_prefix_cache( - vllm_runner, - example_prompts, - model: str, - dtype: str, - tp_size: int, - max_tokens: int, - enforce_eager: int, - num_scheduler_steps: int, - num_prompts: int, - num_logprobs: Optional[int], - attention_backend: str, - monkeypatch: pytest.MonkeyPatch, -) -> None: - """Test vLLM engine with multi-step+"single-step chunked prefill"+APC. - - Set up contrived scenario which tests for a possible failure mode of - scheduling with multi-step+"single-step chunked prefill"+APC - - "single-step chunked prefill" here refers to the current vLLM multi-step+ - chunked-prefill implementation, which requires that a prefill may only - be scheduled in the same step as decodes if the prefill prompt fits in a - single chunk (note that "complete" multi-step+chunked-prefill would allow - a prefill to span multiple chunks & multiple steps but that is not yet - the case.) - - "APC" is short for "automatic prefix caching". - - This test creates a scenario where the scheduler must decide whether/how - to schedule a prefill with a prompt that exceeds the available token budget. - The correct behavior for multi-step+"single-step chunked prefill"+APC is to - put off scheduling the prefill until a future step. - - Validate that: - * Multi-step kernels do not raise an exception due to incorrect scheduler - behavior - * Generated tokens match between - multi-step+"single-step chunked prefill"+APC and - single-step scheduling. - * (If logprobs are enabled) check logprobs are close enough - - Args: - vllm_runner: vLLM model runner fixture - example_prompts: test fixture providing example prompts - model: model under test (same for single- and multi-step engines) - dtype: tensor datatype for engine to utilize - tp_size: degree of tensor-parallelism - max_tokens: the maximum number of tokens to generate - enforce_eager - num_scheduler_steps: for multi-step scheduling, GPU-side steps per - GPU -> CPU output transfer - num_prompts: number of example prompts under test - num_logprobs: corresponds to the `logprobs` argument to the OpenAI - completions endpoint; `None` -> 1 logprob returned. - """ - - # Set up contrived test for correct scheduling behavior with - # multi-step+"single-step chunked prefill"+APC. - # - # Assume block_size=16 - # - # Assume max_num_batched_tokens=48 - # => Per-step token budget=48 - # - # 1. Scheduler schedules 0th prompt (24 tokens) - # => Remaining token budget=24 - # 2. Scheduler attempts to schedule 1st prompt (30 tokens) - # * 30 tokens exceeds 24 token remaining budget - # * Correct behavior: do not schedule this prompt in this step - # * Incorrect behavior: schedule prompt chunk - # * `do_sample=False` for this prompt in this step - # * Chunk size = (remaining tokens // block size) * block size - # - # The Incorrect scheduling behavior - if it occurs - will cause an exception - # in the model runner resulting from `do_sample=False`. - with monkeypatch.context() as m: - m.setenv(STR_BACKEND_ENV_VAR, attention_backend) - - assert len(example_prompts) >= 2 - challenge_prompts = copy.deepcopy(example_prompts) - challenge_prompts[0] = ( - 'vLLM is a high-throughput and memory-efficient ' - 'inference and serving engine for LLMs.\n') # 24 tok - challenge_prompts[1] = ( - 'Briefly describe the major milestones in the ' - 'development of artificial intelligence from 1950 to 2020.\n' - ) # 30 tok - - # If necessary, adjust the length of `challenge_prompts` to match - # `num_prompts` - if len(challenge_prompts) < num_prompts: - challenge_prompts = (challenge_prompts * - ((num_prompts // len(challenge_prompts)) + 1)) - challenge_prompts = challenge_prompts[:num_prompts] - assert len(challenge_prompts) == num_prompts - - # Single-step scheduler baseline - with vllm_runner( - model, - dtype=dtype, - enforce_eager=enforce_eager, - gpu_memory_utilization=0.7, - tensor_parallel_size=tp_size, - num_scheduler_steps=num_scheduler_steps, - max_model_len=48, - max_num_batched_tokens=48, - max_num_seqs=4, - block_size=16, - ) as vllm_model: - outputs_baseline = ( - vllm_model.generate_greedy(challenge_prompts, max_tokens) if - num_logprobs is None else vllm_model.generate_greedy_logprobs( - challenge_prompts, max_tokens, num_logprobs)) - - # multi-step+"single-step chunked prefill"+APC - with vllm_runner( - model, - dtype=dtype, - enforce_eager=enforce_eager, - gpu_memory_utilization=0.7, - tensor_parallel_size=tp_size, - enable_chunked_prefill=True, - enable_prefix_caching=True, - num_scheduler_steps=num_scheduler_steps, - max_model_len=48, - max_num_batched_tokens=48, - max_num_seqs=4, - block_size=16, - ) as vllm_model: - outputs_w_features = ( - vllm_model.generate_greedy(challenge_prompts, max_tokens) if - num_logprobs is None else vllm_model.generate_greedy_logprobs( - challenge_prompts, max_tokens, num_logprobs)) - - if num_logprobs is None: - # No-logprobs test - check_outputs_equal( - outputs_0_lst=outputs_baseline, - outputs_1_lst=outputs_w_features, - name_0="multi-step", - name_1="multi-step+features", - ) - else: - # Yes-logprobs test - check_logprobs_close( - outputs_0_lst=outputs_baseline, - outputs_1_lst=outputs_w_features, - name_0="multi-step", - name_1="multi-step+features", - ) diff --git a/tests/samplers/test_logits_processor.py b/tests/samplers/test_logits_processor.py deleted file mode 100644 index 123f9595e97b9..0000000000000 --- a/tests/samplers/test_logits_processor.py +++ /dev/null @@ -1,70 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - -import pytest -import torch - -from vllm import SamplingParams - -MODELS = ["distilbert/distilgpt2"] - - -@pytest.fixture(scope="function", autouse=True) -def use_v0_only(monkeypatch): - """ - This file tests V0 internals, so set VLLM_USE_V1=0. - """ - monkeypatch.setenv('VLLM_USE_V1', '0') - - -@pytest.mark.parametrize("model", MODELS) -@pytest.mark.parametrize("dtype", ["half"]) -def test_logits_processor_force_generate( - vllm_runner, - example_prompts, - model: str, - dtype: str, -) -> None: - with vllm_runner(model, dtype=dtype) as vllm_model: - tokenizer = vllm_model.llm.get_tokenizer() - repeat_times = 2 - enforced_answers = " vLLM" - vllm_token_ids = tokenizer.encode(enforced_answers, - add_special_tokens=False) - max_tokens = len(vllm_token_ids) * repeat_times - - def pick_vllm(token_ids, logits): - token_id = vllm_token_ids[len(token_ids) % len(vllm_token_ids)] - logits[token_id] = torch.finfo(logits.dtype).max - return logits - - params_with_logprobs = SamplingParams( - logits_processors=[pick_vllm], - prompt_logprobs=3, - max_tokens=max_tokens, - ) - - # test logits_processors when prompt_logprobs is not None - vllm_model.llm._add_request( - example_prompts[0], - params=params_with_logprobs, - ) - - # test prompt_logprobs is not None - vllm_model.llm._add_request( - example_prompts[1], - params=SamplingParams( - prompt_logprobs=3, - max_tokens=max_tokens, - ), - ) - - # test grouped requests - vllm_model.llm._add_request( - example_prompts[2], - params=SamplingParams(max_tokens=max_tokens), - ) - - outputs = vllm_model.llm._run_engine(use_tqdm=False) - - assert outputs[0].outputs[0].text == enforced_answers * repeat_times diff --git a/tests/tpu/lora/test_lora.py b/tests/tpu/lora/test_lora.py index 4c47b8c43caff..636108e985816 100644 --- a/tests/tpu/lora/test_lora.py +++ b/tests/tpu/lora/test_lora.py @@ -30,7 +30,6 @@ def use_v1_only(monkeypatch: pytest.MonkeyPatch): def setup_vllm(num_loras: int, tp: int) -> vllm.LLM: return vllm.LLM(model="Qwen/Qwen2.5-3B-Instruct", - num_scheduler_steps=1, max_model_len=256, max_seq_len_to_capture=256, max_num_seqs=8, diff --git a/tests/utils_/test_utils.py b/tests/utils_/test_utils.py index a2db1ae684341..8be1e103dc651 100644 --- a/tests/utils_/test_utils.py +++ b/tests/utils_/test_utils.py @@ -236,7 +236,6 @@ def test_config_args(parser_with_config, cli_config_file): ['serve', 'mymodel', '--config', cli_config_file]) assert args.tensor_parallel_size == 2 assert args.trust_remote_code - assert not args.multi_step_stream_outputs def test_config_file(parser_with_config): @@ -828,7 +827,6 @@ def test_model_specification(parser_with_config, cli_config_file, ]) assert args.tensor_parallel_size == 2 assert args.trust_remote_code is True - assert args.multi_step_stream_outputs is False assert args.port == 12312 diff --git a/tests/v1/test_oracle.py b/tests/v1/test_oracle.py index a756c89b520f9..1f16e92f657e0 100644 --- a/tests/v1/test_oracle.py +++ b/tests/v1/test_oracle.py @@ -58,12 +58,6 @@ def test_unsupported_configs(monkeypatch): disable_async_output_proc=True, ).create_engine_config() - with pytest.raises(NotImplementedError): - AsyncEngineArgs( - model=MODEL, - num_scheduler_steps=5, - ).create_engine_config() - with pytest.raises(NotImplementedError): AsyncEngineArgs( model=MODEL, diff --git a/tests/worker/test_model_input.py b/tests/worker/test_model_input.py index ec33d334ab650..2031f41fab87d 100644 --- a/tests/worker/test_model_input.py +++ b/tests/worker/test_model_input.py @@ -11,7 +11,6 @@ from vllm.attention.backends.utils import CommonAttentionState from vllm.model_executor import SamplingMetadata from vllm.model_executor.pooling_metadata import PoolingMetadata from vllm.worker.model_runner import ModelInputForGPUWithSamplingMetadata -from vllm.worker.multi_step_model_runner import StatefulModelInput from vllm.worker.pooling_model_runner import ( ModelInputForGPUWithPoolingMetadata) @@ -166,81 +165,3 @@ def test_embedding_model_runner_input(): None) == getattr(attn_metadata, field.name, None) # Pooling metadata is not broadcast. assert received_model_input.pooling_metadata is None - - -def test_multi_step_model_runner_input(): - sampling_metadata = SamplingMetadata( - ["seq_group"], - "selected_token_indices", - "categorized_sample_indices", - "num_prompts", - ) - attn_metadata = AttentionMetadata( - num_prefills=1, - num_prefill_tokens=2, - num_decode_tokens=3, - slot_mapping=torch.zeros(1), - multi_modal_placeholder_index_maps=None, - enable_kv_scales_calculation=True, - ) - frozen_model_input = ModelInputForGPUWithSamplingMetadata( - input_tokens=torch.ones(10), - input_positions=torch.ones(10), - sampling_metadata=sampling_metadata, - attn_metadata=attn_metadata) - - model_input = StatefulModelInput( - frozen_model_input=frozen_model_input, - is_last_step=True, - is_first_multi_step=False, - current_step=4, - last_sampled_token_ids=torch.ones((10, 1)), - is_multi_step=True, - num_queries=8, - num_seqs=5, - cached_outputs=[], - ) - - assert isinstance(model_input, StatefulModelInput) - - # Test round trip serialization. - tensor_dict = model_input.as_broadcastable_tensor_dict() - attn_backend = MockAttentionBackend() - received_model_input = (StatefulModelInput.from_broadcasted_tensor_dict( - tensor_dict, attn_backend=attn_backend)) - - received_frozen_input = received_model_input.frozen_model_input - - # Check that received copy has correct values. - assert isinstance(received_model_input, StatefulModelInput) - assert received_frozen_input.input_tokens is not None - assert (received_frozen_input.input_tokens == - frozen_model_input.input_tokens).all() - assert received_frozen_input.input_positions is not None - assert (received_frozen_input.input_positions == - frozen_model_input.input_positions).all() - assert received_frozen_input.multi_modal_kwargs is None - assert (frozen_model_input.multi_modal_kwargs == - frozen_model_input.multi_modal_kwargs) - assert received_frozen_input.lora_requests is None - assert (received_frozen_input.lora_requests == - frozen_model_input.lora_requests) - assert received_frozen_input.lora_mapping is None - assert ( - received_frozen_input.lora_mapping == frozen_model_input.lora_mapping) - for field in dataclasses.fields(AttentionMetadata): - assert getattr(received_frozen_input.attn_metadata, field.name, - None) == getattr(attn_metadata, field.name, None) - # For sampling metadata, only selected_token_indices is copied. - assert (received_frozen_input.sampling_metadata.selected_token_indices == - sampling_metadata.selected_token_indices) - assert received_frozen_input.sampling_metadata.seq_groups is None - - # check non frozen fields - assert received_model_input.is_last_step == model_input.is_last_step - assert (received_model_input.is_first_multi_step == - model_input.is_first_multi_step) - assert received_model_input.current_step == model_input.current_step - assert (received_model_input.last_sampled_token_ids == - model_input.last_sampled_token_ids).all() - assert received_model_input.is_multi_step == model_input.is_multi_step diff --git a/vllm/config/__init__.py b/vllm/config/__init__.py index df4eb33f5d45d..6649cd89ee34f 100644 --- a/vllm/config/__init__.py +++ b/vllm/config/__init__.py @@ -3779,8 +3779,6 @@ class VllmConfig: f"observability_config={self.observability_config!r}, " f"seed={self.model_config.seed}, " f"served_model_name={self.model_config.served_model_name}, " - f"num_scheduler_steps={self.scheduler_config.num_scheduler_steps}, " - f"multi_step_stream_outputs={self.scheduler_config.multi_step_stream_outputs}, " # noqa f"enable_prefix_caching={self.cache_config.enable_prefix_caching}, " f"chunked_prefill_enabled={self.scheduler_config.chunked_prefill_enabled}, " # noqa f"use_async_output_proc={self.model_config.use_async_output_proc}, " diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 61346da145bbd..63894e7f5dc8b 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -929,8 +929,7 @@ class Scheduler: ) def _get_prompt_limit(self, seq_group: SequenceGroup) -> int: - if (self.scheduler_config.chunked_prefill_enabled - and not self.scheduler_config.is_multi_step): + if self.scheduler_config.chunked_prefill_enabled: prompt_limit = self.scheduler_config.max_model_len else: prompt_limit = min( @@ -1114,9 +1113,6 @@ class Scheduler: continue num_lookahead_slots: int = 0 - if self.scheduler_config.is_multi_step and enable_chunking: - num_lookahead_slots = self._get_num_lookahead_slots( - True, enable_chunking) # If the sequence group cannot be allocated, stop. can_allocate = self.block_manager.can_allocate( @@ -1195,24 +1191,6 @@ class Scheduler: partial_prefill_metadata.maybe_increment_partial_prefills( seq_group) - if enable_chunking and self.scheduler_config.is_multi_step: - blocks_to_copy: List[Tuple[int, int]] = [] - # init_multi_step_from_lookahead_slots happens in append_slots - self._append_slots(seq_group, blocks_to_copy, enable_chunking) - # This assert will trip when a copy-on-write happens. This is - # not a concern as the very first sequence-group block - # allocation happens above. Still, we have the assert to - # catch any edge-cases. - assert not blocks_to_copy - else: - seq_group.init_multi_step_from_lookahead_slots( - num_lookahead_slots, - num_scheduler_steps=self.scheduler_config. - num_scheduler_steps, - is_multi_step=self.scheduler_config.is_multi_step, - enable_chunking=enable_chunking, - ) - seq_groups.append( ScheduledSequenceGroup(seq_group=seq_group, token_chunk_size=num_new_tokens)) @@ -1453,14 +1431,6 @@ class Scheduler: num_prefill_groups = (len(prefills.seq_groups) + len(swapped_in.prefill_seq_groups) + len(running_scheduled.prefill_seq_groups)) - # If all prompts, then we set num_lookahead_slots to 0 - # this allows us to go through the `no_spec` path in - # `spec_decode_worker.py` - all_prefills = len(scheduled_seq_groups) == num_prefill_groups - num_lookahead_slots = (0 if - (all_prefills - and not self.scheduler_config.is_multi_step) - else running_scheduled.num_lookahead_slots) return SchedulerOutputs( scheduled_seq_groups=scheduled_seq_groups, num_prefill_groups=num_prefill_groups, @@ -1472,7 +1442,7 @@ class Scheduler: swapped_in.blocks_to_copy, ignored_seq_groups=prefills.ignored_seq_groups + swapped_in.infeasible_seq_groups, - num_lookahead_slots=num_lookahead_slots, + num_lookahead_slots=0, running_queue_size=len(self.running), preempted=(len(running_scheduled.preempted) + len(running_scheduled.swapped_out)), @@ -1516,11 +1486,6 @@ class Scheduler: num_lookahead_slots = self._get_num_lookahead_slots( is_prefill, enable_chunking) - if is_prefill and num_lookahead_slots > 0: - # Appending prefill slots only happens multi-step and - # chunked-prefill are enabled together. - assert self.scheduler_config.is_multi_step and enable_chunking - return self.block_manager.can_append_slots( seq_group=seq_group, num_lookahead_slots=num_lookahead_slots) @@ -1776,19 +1741,7 @@ class Scheduler: num_lookahead_slots: int = self._get_num_lookahead_slots( is_prefill, enable_chunking) - seq_group.init_multi_step_from_lookahead_slots( - num_lookahead_slots, - num_scheduler_steps=self.scheduler_config.num_scheduler_steps, - is_multi_step=self.scheduler_config.is_multi_step, - enable_chunking=enable_chunking, - ) - seq_status: Optional[SequenceStatus] = SequenceStatus.RUNNING - if self.scheduler_config.is_multi_step and enable_chunking: - # In multi-step chunked-prefill any sequence type can have - # slots appended. - seq_status = None - for seq in seq_group.get_seqs(status=seq_status): cows = self.block_manager.append_slots(seq, num_lookahead_slots) if len(cows) > 0: @@ -1904,29 +1857,8 @@ class Scheduler: """The number of slots to allocate per sequence per step, beyond known token ids. Speculative decoding uses these slots to store KV activations of tokens which may or may not be accepted. - - Speculative decoding does not yet support prefill, so we do not perform - lookahead allocation for prefill. - - When chunking is enabled with multi-step, we allocate lookahead slots - for the prefills for when the prefills turn into decodes in the first - step. """ - if is_prefill: - if self.scheduler_config.is_multi_step and enable_chunking: - # num_lookahead_slots was introduced in the context of decodes, - # in Speculative Decoding. - # When the num_scheduler_steps is 8, say, then the - # num_lookahead_slots is 7. Meaning, we are doing a 1-step of - # decode anyways and we wish to do 7 more. - # - # "lookaheads" for prefills, is introduced in support for - # Chunked-Prefill in Multi-Step. - return self.scheduler_config.num_lookahead_slots + 1 - else: - return 0 - - return self.scheduler_config.num_lookahead_slots + return 0 def _get_num_new_uncached_and_cached_tokens( self, @@ -2068,24 +2000,6 @@ class Scheduler: The number of new tokens to schedule after chunking. """ remaining_token_budget = budget.remaining_token_budget() - if scheduler_config.is_multi_step: - # The current multi-step + chunked prefill capability does - # not actually support chunking prompts. - # - # Therefore, `num_new_tokens` is computed in the same fashion - # for both multi-step+chunked-prefill & - # multi-step+chunked-prefill+APC - # - # Prompts with more tokens than the current remaining budget - # are postponed to future scheduler steps - if num_new_tokens > prompt_limit: - # If the seq_group is in prompt-stage, pass the - # num_new_tokens as-is so the caller can ignore - # the sequence. - return num_new_tokens - - return 0 if num_new_tokens > \ - remaining_token_budget else num_new_tokens # Get the number of tokens to allocate to this prefill slot prefill_slot_budget = ( diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index d74db67bda0dc..c058001ceb974 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -362,8 +362,6 @@ class EngineArgs: lora_dtype: Optional[Union[str, torch.dtype]] = LoRAConfig.lora_dtype lora_extra_vocab_size: int = LoRAConfig.lora_extra_vocab_size - num_scheduler_steps: int = SchedulerConfig.num_scheduler_steps - multi_step_stream_outputs: bool = SchedulerConfig.multi_step_stream_outputs ray_workers_use_nsight: bool = ParallelConfig.ray_workers_use_nsight num_gpu_blocks_override: Optional[ int] = CacheConfig.num_gpu_blocks_override @@ -799,11 +797,8 @@ class EngineArgs: **scheduler_kwargs["delay_factor"]) scheduler_group.add_argument("--preemption-mode", **scheduler_kwargs["preemption_mode"]) - scheduler_group.add_argument("--num-scheduler-steps", - **scheduler_kwargs["num_scheduler_steps"]) - scheduler_group.add_argument( - "--multi-step-stream-outputs", - **scheduler_kwargs["multi_step_stream_outputs"]) + # multi-step scheduling has been removed; corresponding arguments + # are no longer supported. scheduler_group.add_argument("--scheduling-policy", **scheduler_kwargs["policy"]) scheduler_group.add_argument( @@ -1257,28 +1252,11 @@ class EngineArgs: disable_log_stats=self.disable_log_stats, ) - # Reminder: Please update docs/features/compatibility_matrix.md - # If the feature combo become valid - if self.num_scheduler_steps > 1: - if speculative_config is not None: - raise ValueError("Speculative decoding is not supported with " - "multi-step (--num-scheduler-steps > 1)") - if self.enable_chunked_prefill and self.pipeline_parallel_size > 1: - raise ValueError("Multi-Step Chunked-Prefill is not supported " - "for pipeline-parallel-size > 1") - if current_platform.is_cpu(): - logger.warning("Multi-Step (--num-scheduler-steps > 1) is " - "currently not supported for CPUs and has been " - "disabled.") - self.num_scheduler_steps = 1 - - # make sure num_lookahead_slots is set the higher value depending on - # if we are using speculative decoding or multi-step - num_lookahead_slots = max(self.num_lookahead_slots, - self.num_scheduler_steps - 1) - num_lookahead_slots = num_lookahead_slots \ - if speculative_config is None \ - else speculative_config.num_lookahead_slots + # make sure num_lookahead_slots is set appropriately depending on + # whether speculative decoding is enabled + num_lookahead_slots = self.num_lookahead_slots + if speculative_config is not None: + num_lookahead_slots = speculative_config.num_lookahead_slots scheduler_config = SchedulerConfig( runner_type=model_config.runner_type, @@ -1292,8 +1270,6 @@ class EngineArgs: disable_chunked_mm_input=self.disable_chunked_mm_input, is_multimodal_model=model_config.is_multimodal_model, preemption_mode=self.preemption_mode, - num_scheduler_steps=self.num_scheduler_steps, - multi_step_stream_outputs=self.multi_step_stream_outputs, send_delta_data=(envs.VLLM_USE_RAY_SPMD_WORKER and parallel_config.use_ray), policy=self.scheduling_policy, @@ -1392,11 +1368,6 @@ class EngineArgs: recommend_to_remove=True) return False - if self.num_scheduler_steps != SchedulerConfig.num_scheduler_steps: - _raise_or_fallback(feature_name="--num-scheduler-steps", - recommend_to_remove=True) - return False - if self.scheduler_delay_factor != SchedulerConfig.delay_factor: _raise_or_fallback(feature_name="--scheduler-delay-factor", recommend_to_remove=True) diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index 1f962b008ee03..b6ee4105340a1 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -15,7 +15,7 @@ from vllm.config import (DecodingConfig, LoRAConfig, ModelConfig, from vllm.core.scheduler import SchedulerOutputs from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.async_timeout import asyncio_timeout -from vllm.engine.llm_engine import LLMEngine, SchedulerOutputState +from vllm.engine.llm_engine import LLMEngine from vllm.engine.metrics_types import StatLoggerBase from vllm.engine.protocol import EngineClient from vllm.executor.executor_base import ExecutorBase @@ -308,13 +308,6 @@ class _AsyncLLMEngine(LLMEngine): if not allow_async_output_proc and len(ctx.output_queue) > 0: self._process_model_outputs(ctx=ctx) - if (self.scheduler_config.is_multi_step - and scheduler_outputs.num_lookahead_slots > 0): - # cache the scheduler outputs for the next iteration if we have - # lookahead slots - self._cache_scheduler_outputs_for_multi_step( - virtual_engine, seq_group_metadata_list, scheduler_outputs, - allow_async_output_proc) else: finished_requests_ids = list() @@ -351,29 +344,14 @@ class _AsyncLLMEngine(LLMEngine): outputs = await self.model_executor.execute_model_async( execute_model_req) - # we need to do this here so that last step's sampled_token_ids can - # be passed to the next iteration for PP. - if self.scheduler_config.is_multi_step: - self._update_cached_scheduler_output(virtual_engine, outputs) else: if len(ctx.output_queue) > 0: self._process_model_outputs(ctx=ctx) outputs = [] - # Finish the current step for all the sequence groups. - if self.scheduler_config.is_multi_step: - for seq_group in seq_group_metadata_list: - seq_group.finish_step() - if not self._has_remaining_steps(seq_group_metadata_list): - # Clear the cache if we have finished all the steps - if self.scheduler_config.is_multi_step: - self.cached_scheduler_outputs[ - virtual_engine] = SchedulerOutputState() - # is_first_step_output is True only when the num_steps of all - # the sequences are 1. When the num_steps > 1, - # multi_step_model_runner does the first-step output append. + # the sequences are 1. is_first_step_output: bool = False if not seq_group_metadata_list \ else seq_group_metadata_list[0].state.num_steps == 1 diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 3fc4f6445df2a..bbe958351e87c 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -25,7 +25,6 @@ from vllm.engine.metrics_types import StatLoggerBase, Stats from vllm.engine.output_processor.interfaces import ( SequenceGroupOutputProcessor) from vllm.engine.output_processor.stop_checker import StopChecker -from vllm.engine.output_processor.util import create_output_by_sequence_group from vllm.entrypoints.openai.logits_processors import ( get_logits_processors as get_openai_logits_processors) from vllm.executor.executor_base import ExecutorBase @@ -91,7 +90,7 @@ class OutputData(NamedTuple): class SchedulerContext: - def __init__(self, multi_step_stream_outputs: bool = False): + def __init__(self) -> None: self.output_queue: Deque[OutputData] = deque() self.request_outputs: List[Union[RequestOutput, PoolingRequestOutput]] = [] @@ -99,8 +98,6 @@ class SchedulerContext: List[SequenceGroupMetadata]] = None self.scheduler_outputs: Optional[SchedulerOutputs] = None - self.multi_step_stream_outputs: bool = multi_step_stream_outputs - def append_output(self, outputs: List[SamplerOutput], seq_group_metadata_list: List[SequenceGroupMetadata], scheduler_outputs: SchedulerOutputs, is_async: bool, @@ -303,8 +300,7 @@ class LLMEngine: ] self.scheduler_contexts = [ - SchedulerContext(multi_step_stream_outputs=self.scheduler_config. - multi_step_stream_outputs) + SchedulerContext() for _ in range(self.parallel_config.pipeline_parallel_size) ] @@ -683,8 +679,7 @@ class LLMEngine: "Priority scheduling is not enabled.") if isinstance(params, SamplingParams) \ - and params.logits_processors \ - and self.scheduler_config.num_scheduler_steps > 1: + and params.logits_processors: raise ValueError( "Logits processors are not supported in multi-step decoding") @@ -868,45 +863,6 @@ class LLMEngine: return - def _update_num_computed_tokens_for_multi_step_prefill( - self, seq_group: SequenceGroup, - seq_group_meta: SequenceGroupMetadata, - is_first_step_output: Optional[bool]): - """ - This function updates num_computed_tokens for prompt sequences - when Multi-Step is enabled. - - seq_group: SequenceGroup to update the num_computed_tokens for. - seq_group_meta: Metadata of the given SequenceGroup. - is_first_step_output: Optional[bool] - - When available, is_first_step_output indicates if the appended - output token is the output of the first-step in multi-step. - A value of None indicates that outputs from all steps in - in multi-step are submitted in a single burst. - """ - - assert self.scheduler_config.is_multi_step - - if not seq_group_meta.is_prompt: - # num_computed_token updates for multi-step decodes happen after - # the tokens are appended to the sequence. - return - - do_update: bool = False - if self.scheduler_config.chunked_prefill_enabled: - # In multi-step + chunked-prefill case, the prompt sequences - # that are scheduled are fully processed in the first step. - do_update = is_first_step_output is None or is_first_step_output - else: - # Normal multi-step decoding case. In this case prompt-sequences - # are actually single-stepped. Always update in this case. - assert seq_group.state.num_steps == 1 - do_update = True - - if do_update: - seq_group.update_num_computed_tokens( - seq_group_meta.token_chunk_size) - def _process_model_outputs(self, ctx: SchedulerContext, request_id: Optional[str] = None) -> None: @@ -939,33 +895,8 @@ class LLMEngine: has_multiple_outputs: bool = len(outputs) > 1 outputs_by_sequence_group: List[List[SequenceGroupOutput]] - if has_multiple_outputs: - assert self.scheduler_config.is_multi_step or \ - self.speculative_config - # Organize outputs by [step][sequence group] instead of - # [sequence group][step]. - if self.scheduler_config.is_multi_step: - outputs_by_sequence_group = create_output_by_sequence_group( - outputs, len(seq_group_metadata_list)) - elif self.speculative_config: - # Decodes are multi-steps while prefills are not, outputting at - # most 1 token. Separate them so that we can trigger chunk - # processing without having to pad or copy over prompts K times - # to match decodes structure (costly with prompt_logprobs). - num_prefills = sum(sg.is_prompt - for sg in seq_group_metadata_list) - prefills, decodes = outputs[:num_prefills], outputs[ - num_prefills:] - outputs_by_sequence_group = create_output_by_sequence_group( - decodes, - num_seq_groups=len(seq_group_metadata_list) - num_prefills) - outputs_by_sequence_group = [p.outputs for p in prefills - ] + outputs_by_sequence_group - # We have outputs for multiple steps submitted in a single burst, - # so invalidate is_first_step_output. - is_first_step_output = None - else: - outputs_by_sequence_group = outputs + assert not has_multiple_outputs + outputs_by_sequence_group = outputs # Determine the requests we need to operate on if request_id: @@ -1006,13 +937,8 @@ class LLMEngine: output = [outputs_by_sequence_group[0][i]] if not is_async: - if self.scheduler_config.is_multi_step: - # Updates happen only if the sequence is prefill - self._update_num_computed_tokens_for_multi_step_prefill( - seq_group, seq_group_meta, is_first_step_output) - else: - seq_group.update_num_computed_tokens( - seq_group_meta.token_chunk_size or 0) + seq_group.update_num_computed_tokens( + seq_group_meta.token_chunk_size or 0) if outputs: for o in outputs: @@ -1074,15 +1000,6 @@ class LLMEngine: for scheduler in self.scheduler: scheduler.free_finished_seq_groups() - # For multi-step without streaming, don't create outputs each iteration - if not is_last_step and not ctx.multi_step_stream_outputs: - # Immediately process request outputs here (if callback is given) - if (finished_now - and self.process_request_outputs_callback is not None): - self.process_request_outputs_callback(ctx.request_outputs) - ctx.request_outputs.clear() - return - # Create the outputs for i in indices: if i in skip or i in finished_before or i in finished_now: @@ -1101,13 +1018,7 @@ class LLMEngine: if request_output: ctx.request_outputs.append(request_output) - # For multi-step with streaming, create outputs each iteration - if not is_last_step and ctx.multi_step_stream_outputs: - # Immediately process request outputs here (if callback is given) - if self.process_request_outputs_callback is not None: - self.process_request_outputs_callback(ctx.request_outputs) - ctx.request_outputs.clear() - return + # Create outputs only after processing the scheduler's results for seq_group in scheduler_outputs.ignored_seq_groups: params = seq_group.sampling_params @@ -1157,16 +1068,10 @@ class LLMEngine: if seq_group.is_finished(): continue - if self.scheduler_config.is_multi_step: - # Updates happen only if the sequence is prefill - self._update_num_computed_tokens_for_multi_step_prefill( - seq_group, seq_group_metadata, - seq_group.state.num_steps == 1) - else: - token_chunk_size = (seq_group_metadata.token_chunk_size - if seq_group_metadata.token_chunk_size - is not None else 0) - seq_group.update_num_computed_tokens(token_chunk_size) + token_chunk_size = (seq_group_metadata.token_chunk_size + if seq_group_metadata.token_chunk_size + is not None else 0) + seq_group.update_num_computed_tokens(token_chunk_size) if seq_group_metadata.do_sample: assert len(sequence_group_outputs.samples) == 1, ( @@ -1177,16 +1082,8 @@ class LLMEngine: assert len(seq_group.seqs) == 1 seq = seq_group.seqs[0] - if self.scheduler_config.is_multi_step: - is_prefill_append = seq.data.get_num_uncomputed_tokens( - ) == 0 - seq.append_token_id(sample.output_token, sample.logprobs, - sample.output_embed) - if not is_prefill_append: - seq_group.update_num_computed_tokens(1) - else: - seq.append_token_id(sample.output_token, sample.logprobs, - sample.output_embed) + seq.append_token_id(sample.output_token, sample.logprobs, + sample.output_embed) def step(self) -> List[Union[RequestOutput, PoolingRequestOutput]]: """Performs one decoding iteration and returns newly generated results. @@ -1289,13 +1186,6 @@ class LLMEngine: if not allow_async_output_proc and len(ctx.output_queue) > 0: self._process_model_outputs(ctx=ctx) - if (self.scheduler_config.is_multi_step - and scheduler_outputs.num_lookahead_slots > 0): - # cache the scheduler outputs for the next iteration if we have - # lookahead slots - self._cache_scheduler_outputs_for_multi_step( - virtual_engine, seq_group_metadata_list, scheduler_outputs, - allow_async_output_proc) else: finished_requests_ids = list() @@ -1345,10 +1235,6 @@ class LLMEngine: # Raise so the caller is notified that this request failed raise - # We need to do this here so that last step's sampled_token_ids can - # be passed to the next iteration for PP. - if self.scheduler_config.is_multi_step: - self._update_cached_scheduler_output(virtual_engine, outputs) else: # Nothing scheduled => If there is pending async postprocessor, # then finish it here. @@ -1357,19 +1243,9 @@ class LLMEngine: # No outputs in this case outputs = [] - # Finish the current step for all the sequence groups. - if self.scheduler_config.is_multi_step: - for seq_group in seq_group_metadata_list: - seq_group.finish_step() - if not self._has_remaining_steps(seq_group_metadata_list): - # clear the cache if we have finished all the steps. - if self.scheduler_config.is_multi_step: - self.cached_scheduler_outputs[0] = SchedulerOutputState() - # is_first_step_output is True only when the num_steps of all - # the sequences are 1. When the num_steps > 1, - # multi_step_model_runner does the first-step output append. + # the sequences are 1. is_first_step_output: bool = False if not seq_group_metadata_list \ else seq_group_metadata_list[0].state.num_steps == 1 @@ -1453,22 +1329,7 @@ class LLMEngine: def _has_remaining_steps( self, seq_group_metadata_list: Optional[List[SequenceGroupMetadata]] ) -> bool: - if (not self.scheduler_config.is_multi_step - or not seq_group_metadata_list): - return False - - # TODO(will) this is a sanity check for nowto make sure that all the - # seqs are on the same steps. Eventually we will want to do some sort of - # dynamic scheduling when doing multi-step decoding. - ref_remaining_steps = seq_group_metadata_list[0].state.remaining_steps - if any([ - seq_group.state.remaining_steps != ref_remaining_steps - for seq_group in seq_group_metadata_list[1:] - ]): - raise AssertionError("All running sequence groups should " - "have the same remaining steps.") - - return ref_remaining_steps > 0 + return False def _cache_scheduler_outputs_for_multi_step( self, virtual_engine: int, @@ -1497,13 +1358,6 @@ class LLMEngine: def _get_last_sampled_token_ids( self, virtual_engine: int) -> Optional[torch.Tensor]: - cached_last_output = self.cached_scheduler_outputs[ - virtual_engine].last_output - if (self.scheduler_config.is_multi_step - and self.parallel_config.pipeline_parallel_size > 1 - and cached_last_output is not None - and cached_last_output.sampled_token_ids_cpu is not None): - return cached_last_output.sampled_token_ids_cpu return None def add_logger(self, logger_name: str, logger: StatLoggerBase) -> None: diff --git a/vllm/engine/output_processor/interfaces.py b/vllm/engine/output_processor/interfaces.py index 19c5963d32dbb..4d75719c1719b 100644 --- a/vllm/engine/output_processor/interfaces.py +++ b/vllm/engine/output_processor/interfaces.py @@ -36,27 +36,13 @@ class SequenceGroupOutputProcessor(ABC): ): """Create an output processor. - This returns a single-step output processor if num_lookahead_slots is - zero, else returns a multi-step output processor. + Multi-step scheduling is no longer supported. Always return a + single-step output processor. """ - if scheduler_config.num_lookahead_slots == 0: - # Importing here to avoid cycle. - from vllm.engine.output_processor.single_step import ( - SingleStepOutputProcessor) - return SingleStepOutputProcessor(scheduler_config, detokenizer, - scheduler, seq_counter, - stop_checker) - else: - # Importing here to avoid cycle. - from vllm.engine.output_processor.multi_step import ( - MultiStepOutputProcessor) - return MultiStepOutputProcessor( - detokenizer, - scheduler, - seq_counter, - get_tokenizer_for_seq, - stop_checker, - ) + from vllm.engine.output_processor.single_step import ( + SingleStepOutputProcessor) + return SingleStepOutputProcessor(scheduler_config, detokenizer, + scheduler, seq_counter, stop_checker) @abstractmethod def process_outputs(self, sequence_group: SequenceGroup, diff --git a/vllm/engine/output_processor/multi_step.py b/vllm/engine/output_processor/multi_step.py deleted file mode 100644 index 8b66ef0dc7658..0000000000000 --- a/vllm/engine/output_processor/multi_step.py +++ /dev/null @@ -1,211 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - -import functools -from typing import Callable, List, cast - -from vllm.core.scheduler import Scheduler -from vllm.engine.output_processor.interfaces import ( - SequenceGroupOutputProcessor) -from vllm.engine.output_processor.single_step import ( - single_step_process_prompt_logprob) -from vllm.engine.output_processor.stop_checker import StopChecker -from vllm.logger import init_logger -from vllm.sampling_params import SamplingParams -from vllm.sequence import (VLLM_INVALID_TOKEN_ID, - CompletionSequenceGroupOutput, Sequence, - SequenceGroup, SequenceGroupOutput, SequenceOutput, - SequenceStatus) -from vllm.transformers_utils.detokenizer import Detokenizer -from vllm.transformers_utils.tokenizer import AnyTokenizer -from vllm.utils import Counter - -logger = init_logger(__name__) - - -class MultiStepOutputProcessor(SequenceGroupOutputProcessor): - """SequenceGroupOutputProcessor which handles logic related to - detokenization and stopping conditions. It specializes to "multi-step - decoding", where vLLM's worker may generate multiple tokens per invocation. - This is currently mutually exclusive with advanced sampling techniques like - beam search, which motivates the separation of this logic from the single - step output processor. - - This class is responsible for things such as correctly appending all new - token ids to their sequence, detokenizing new token ids, truncating new - output tokens after an eos token, and correctly handling the case where the - number of new output tokens per sequence differs in a single batch. - """ - - def __init__( - self, - detokenizer: Detokenizer, - scheduler: List[Scheduler], - seq_counter: Counter, - get_tokenizer_for_seq: Callable[[Sequence], AnyTokenizer], - stop_checker: StopChecker, - ): - self.detokenizer = detokenizer - self.scheduler = scheduler - self.seq_counter = seq_counter - self.get_tokenizer_for_seq = get_tokenizer_for_seq - self.stop_checker = stop_checker - - def process_prompt_logprob(self, seq_group: SequenceGroup, - outputs: List[SequenceGroupOutput]) -> None: - """Process prompt logprobs associated with each step of a multi-step- - scheduled computation. - - Args: - seq_group: the outputs are associated with this - [`SequenceGroup`][vllm.sequence.SequenceGroup] - outputs: the - [`SequenceGroupOutput`][vllm.sequence.SequenceGroupOutput]s - for all scheduler steps - """ - for output in outputs: - # Concatenate single-step prompt logprob processing results. - assert isinstance(output, CompletionSequenceGroupOutput) - single_step_process_prompt_logprob(self, seq_group, output) - - @staticmethod - @functools.lru_cache - def _log_prompt_logprob_unsupported_warning_once(): - # Reminder: Please update docs/features/compatibility_matrix.md - # If the feature combo become valid - logger.warning( - "Prompt logprob is not supported by multi step workers. " - "(e.g., speculative decode uses multi step workers).") - - def process_outputs(self, - sequence_group: SequenceGroup, - outputs: List[SequenceGroupOutput], - is_async: bool = False) -> None: - """Append new tokens in the outputs to sequences in the sequence group. - - This only supports sequence groups of size 1. It supports greater than - one new token per sequence. - - This applies logic like stop condition checking and detokenization. - It also handles cases where there are tokens emitted after - the EOS token. - - is_async - Indicates whether this postprocessor runs in - parallel with the GPU forward pass and is processing - tokens from the previous step. If this is true, then - no tokens need to be appended since it is already done - externally (before the next schedule() call) - """ - # Sequences can be in RUNNING or FINISHED_ABORTED state - # once scheduled, as a sequence is moved to FINISHED_ABORTED - # if a client disconnects from the api server. - seqs = sequence_group.get_seqs(status=SequenceStatus.RUNNING) - if seqs is None: - seqs = sequence_group.get_seqs( - status=SequenceStatus.FINISHED_ABORTED) - - assert seqs, "Expected RUNNING or FINISHED_ABORTED sequences" - assert len(seqs) == 1, ( - "Beam search not supported in multi-step decoding.") - seq = seqs[0] - seq_id = seq.seq_id - # This method is defined in the more generic - # SequenceGroupOutputProcessor, but here we assume that the outputs are - # of a more specific type. - assert all([ - isinstance(output, CompletionSequenceGroupOutput) - for output in outputs - ]) - compl_outputs = cast(List[CompletionSequenceGroupOutput], outputs) - assert all([ - seq_id == output.samples[0].parent_seq_id - for output in compl_outputs - ]) - - if is_async: - # Async case: We process tokens one by one. Here, we know the token - # was already appended, so we only need to do the rest of the - # postprocessor: Detokenization + stopping logic - self._process_decode_and_stop(seq, sequence_group.sampling_params) - else: - # Standard multi-step case - - # Since there's only one sequence per sequence group, - # we can take the first sample. - samples = [output.samples[0] for output in compl_outputs] - - # entries in sample tokens may be invalid (eg. due to spec decode - # rejecting tokens). - valid_samples = [ - sample for sample in samples - if sample.output_token != VLLM_INVALID_TOKEN_ID - ] - - # When both spec-decode and pre-fill chunking are enabled, we - # don't have guaranteed samples here (e.g. all -1s). - if valid_samples: - self._process_seq_outputs(seq, valid_samples, - sequence_group.sampling_params) - - def _process_decode_and_stop(self, seq: Sequence, - sampling_params: SamplingParams) -> None: - new_char_count = 0 - if sampling_params.detokenize and self.detokenizer: - new_char_count = self.detokenizer.decode_sequence_inplace( - seq, sampling_params) - - # TODO(sang): Support lora. - self.stop_checker.maybe_stop_sequence( - seq, - new_char_count=new_char_count, - sampling_params=sampling_params, - ) - - def _process_seq_outputs(self, seq: Sequence, - valid_samples: List[SequenceOutput], - sampling_params: SamplingParams) -> None: - output_token_ids = [sample.output_token for sample in valid_samples] - output_logprobs = [sample.logprobs for sample in valid_samples] - output_embeds = [sample.output_embed for sample in valid_samples] - - # Truncate to max_tokens if necessary. - remaining_tokens = sampling_params.max_tokens - (seq.get_output_len() + - len(output_token_ids)) - if remaining_tokens < 0: - output_token_ids = output_token_ids[:remaining_tokens] - - # Truncate any tokens after EOS. This is required as spec decode - # generates a fixed number of tokens without evaluating stopping - # conditions within the block. This can cause an eos token to be - # unintentionally ignored. - if not sampling_params.ignore_eos and self.detokenizer: - eos_token_id = self.get_tokenizer_for_seq(seq).eos_token_id - # Avoiding .index calls as exception throwing in the happy path - # is expensive. - for i in range(len(output_token_ids)): - if output_token_ids[i] == eos_token_id: - output_token_ids = output_token_ids[:i + 1] - break - - is_prefill_sampled_token = seq.data.get_num_uncomputed_tokens() == 0 - # Incrementally append tokens to the sequence, as if we had only one new - # token. - for output_token_id, output_logprob, output_embed in zip( - output_token_ids, output_logprobs, output_embeds): - seq.append_token_id( - token_id=output_token_id, - logprobs=output_logprob, - token_embed=output_embed, - ) - - if is_prefill_sampled_token: - is_prefill_sampled_token = False - else: - # Update num_computed_tokens iff the sampled token is not from - # a prefill step. - seq.data.update_num_computed_tokens(1) - - self._process_decode_and_stop(seq, sampling_params) - - if seq.is_finished(): - break diff --git a/vllm/platforms/cuda.py b/vllm/platforms/cuda.py index c876c52a2e9c9..70959131573f9 100644 --- a/vllm/platforms/cuda.py +++ b/vllm/platforms/cuda.py @@ -118,20 +118,10 @@ class CudaPlatformBase(Platform): @classmethod def check_and_update_config(cls, vllm_config: "VllmConfig") -> None: parallel_config = vllm_config.parallel_config - scheduler_config = vllm_config.scheduler_config model_config = vllm_config.model_config if parallel_config.worker_cls == "auto": - if scheduler_config.is_multi_step: - if envs.VLLM_USE_V1: - raise NotImplementedError( - "Multi-step scheduling is not supported (and not " - "needed) on vLLM V1. Please launch without " - "--num-scheduler-steps.") - else: - parallel_config.worker_cls = \ - "vllm.worker.multi_step_worker.MultiStepWorker" - elif vllm_config.speculative_config: + if vllm_config.speculative_config: if not envs.VLLM_USE_V1: raise NotImplementedError( "Speculative decoding is not supported on vLLM V0.") @@ -139,7 +129,7 @@ class CudaPlatformBase(Platform): else: if envs.VLLM_USE_V1: parallel_config.worker_cls = \ - "vllm.v1.worker.gpu_worker.Worker" + "vllm.v1.worker.gpu_worker.Worker" else: parallel_config.worker_cls = "vllm.worker.worker.Worker" diff --git a/vllm/platforms/rocm.py b/vllm/platforms/rocm.py index 8005830f55cef..2d5bee5fc5053 100644 --- a/vllm/platforms/rocm.py +++ b/vllm/platforms/rocm.py @@ -327,18 +327,8 @@ class RocmPlatform(Platform): cache_config.block_size = 16 parallel_config = vllm_config.parallel_config - scheduler_config = vllm_config.scheduler_config if parallel_config.worker_cls == "auto": - if scheduler_config.is_multi_step: - if envs.VLLM_USE_V1: - raise NotImplementedError( - "Multi-step scheduling is not supported (and not " - "needed) on vLLM V1. Please launch without " - "--num-scheduler-steps.") - else: - parallel_config.worker_cls = \ - "vllm.worker.multi_step_worker.MultiStepWorker" - elif vllm_config.speculative_config: + if vllm_config.speculative_config: if not envs.VLLM_USE_V1: raise NotImplementedError( "Speculative decoding is not supported on vLLM V0.") @@ -346,7 +336,7 @@ class RocmPlatform(Platform): else: if envs.VLLM_USE_V1: parallel_config.worker_cls = \ - "vllm.v1.worker.gpu_worker.Worker" + "vllm.v1.worker.gpu_worker.Worker" else: parallel_config.worker_cls = "vllm.worker.worker.Worker" diff --git a/vllm/platforms/tpu.py b/vllm/platforms/tpu.py index c56096d93612d..c7522a89c2578 100644 --- a/vllm/platforms/tpu.py +++ b/vllm/platforms/tpu.py @@ -133,18 +133,13 @@ class TpuPlatform(Platform): parallel_config = vllm_config.parallel_config scheduler_config = vllm_config.scheduler_config if parallel_config.worker_cls == "auto": - if scheduler_config.is_multi_step: - raise NotImplementedError( - "Multi-step scheduling is not supported (and not " - "needed) on vLLM V1. Please launch without " - "--num-scheduler-steps.") parallel_config.worker_cls = "vllm.v1.worker.tpu_worker.TPUWorker" assert not vllm_config.speculative_config, ( "Speculative decoding is not yet supported for TPU backend") if scheduler_config.is_multimodal_model and not \ - scheduler_config.disable_chunked_mm_input: + scheduler_config.disable_chunked_mm_input: logger.warning("TPU does not support running Multimodal models"\ " without setting `--disable_chunked_mm_input`. " \ "Forcing --disable_chunked_mm_input.") diff --git a/vllm/sequence.py b/vllm/sequence.py index 6e65a2bd03189..cbe63f8d1d4e4 100644 --- a/vllm/sequence.py +++ b/vllm/sequence.py @@ -794,35 +794,6 @@ class SequenceGroup: def lora_int_id(self) -> int: return self.lora_request.lora_int_id if self.lora_request else 0 - def init_multi_step(self, num_steps: int) -> None: - self.state.num_steps = num_steps - self.state.current_step = 0 - - def init_multi_step_from_lookahead_slots(self, num_lookahead_slots: int, - num_scheduler_steps: int, - is_multi_step: bool, - enable_chunking: bool) -> None: - - if not is_multi_step: - self.init_multi_step(num_steps=num_scheduler_steps) - return - - # Multi-Step case - is_prefill = self.is_prefill() - - # The asserts below reflect the expectations of the current system. - if is_prefill and enable_chunking: - assert num_lookahead_slots == num_scheduler_steps - self.init_multi_step(num_steps=num_lookahead_slots) - else: - is_decode: bool = not is_prefill - # If it is a prefill, num_lookahead_slots must be 0 - assert num_lookahead_slots == 0 or is_decode - # If it is a decode, num_lookahead_slots + 1 must match - # the scheduler steps. - assert num_lookahead_slots + 1 == num_scheduler_steps or is_prefill - self.init_multi_step(num_steps=num_lookahead_slots + 1) - def set_last_token_time(self, now: float) -> None: """Sets the last token time for Request level timings.""" # If still in prefill phase, assertion fails. @@ -1367,15 +1338,6 @@ class ExecuteModelRequest( # Async callback async_callback: Optional[Callable] = None - @property - def is_first_multi_step(self) -> bool: - # TODO(will) make this be able to handle batches with variable number of - # steps - assert len(self.seq_group_metadata_list) > 0 - first_seq_group = self.seq_group_metadata_list[0] - assert first_seq_group.state is not None - return first_seq_group.state.current_step == 0 - @property def is_last_step(self) -> bool: # TODO(will) make this be able to handle batches with variable number of diff --git a/vllm/worker/model_runner.py b/vllm/worker/model_runner.py index 20b9b733cd3b9..a63797e3a46a2 100644 --- a/vllm/worker/model_runner.py +++ b/vllm/worker/model_runner.py @@ -508,8 +508,7 @@ class ModelInputForGPUBuilder(ModelRunnerInputBuilderBase[ModelInputForGPU]): if inter_data.is_prompt: context_len = seq_data.get_num_computed_tokens() seq_len = min(seq_len, context_len + token_chunk_size) - elif self.runner.scheduler_config.is_multi_step or \ - self.runner.model_config.is_encoder_decoder: + elif self.runner.model_config.is_encoder_decoder: context_len = seq_len - 1 else: context_len = seq_data.get_num_computed_tokens() @@ -778,9 +777,7 @@ class ModelInputForGPUBuilder(ModelRunnerInputBuilderBase[ModelInputForGPU]): int: Returns the determined number of padding sequences. If CUDA graphs is not viable, returns -1. """ - is_mscp: bool = self.runner.scheduler_config.is_multi_step and \ - self.runner.scheduler_config.chunked_prefill_enabled - decode_only = self.decode_only or is_mscp + decode_only = self.decode_only if not decode_only: # Early exit so we can treat num_seqs as the batch_size below. return -1 diff --git a/vllm/worker/multi_step_model_runner.py b/vllm/worker/multi_step_model_runner.py deleted file mode 100644 index 2aa910bdff6ba..0000000000000 --- a/vllm/worker/multi_step_model_runner.py +++ /dev/null @@ -1,908 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - -import dataclasses -import functools -from dataclasses import dataclass, field -from typing import (TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, - Union) - -import torch - -from vllm.distributed import get_pp_group -from vllm.logger import init_logger -from vllm.model_executor.layers.sampler import (PromptLogprobs, SampleLogprobs, - SamplerOutput, - SamplingMetadata, get_logprobs, - get_pythonized_sample_results) -from vllm.platforms import current_platform -from vllm.sequence import (CompletionSequenceGroupOutput, IntermediateTensors, - Logprob, SequenceGroupMetadata, SequenceOutput) -from vllm.utils import PyObjectCache, async_tensor_h2d, current_stream -from vllm.worker.model_runner import (GPUModelRunnerBase, - ModelInputForGPUWithSamplingMetadata) -from vllm.worker.model_runner_base import ( - BroadcastableModelInput, _init_attn_metadata_from_tensor_dict, - _init_frozen_model_input_from_tensor_dict, - _init_sampling_metadata_from_tensor_dict) - -from ..model_executor.model_loader.tensorizer import TensorizerConfig - -if TYPE_CHECKING: - from vllm.attention.backends.abstract import AttentionBackend - -logger = init_logger(__name__) - -MULTI_STEP_ATTENTION_BACKENDS = [ - "FLASH_ATTN", "ROCM_FLASH", "FLASHINFER", "NO_ATTENTION" -] -MULTI_STEP_CHUNKED_PREFILL_ATTENTION_BACKENDS = ["FLASH_ATTN", "FLASHINFER"] - -def _get_supported_attention_backends(chunked_prefill_enabled: bool) \ - -> List[str]: - if chunked_prefill_enabled: - return MULTI_STEP_CHUNKED_PREFILL_ATTENTION_BACKENDS - else: - return MULTI_STEP_ATTENTION_BACKENDS - - -def seq_output_builder(): - return SequenceOutput( - 0, 0, - {0: Logprob(logprob=float('inf'), rank=None, decoded_token=None)}) - - -def completion_seq_group_output_builder(): - return CompletionSequenceGroupOutput([], None) - - -# Used by pythonization to reduce python object allocations -class PythonizationCache: - - def __init__(self): - self.cached_seq_output = PyObjectCache(seq_output_builder) - self.cached_completion_seq_group_output = PyObjectCache( - completion_seq_group_output_builder) - - def reset(self): - self.cached_seq_output.reset() - self.cached_completion_seq_group_output.reset() - - -@dataclass -class ModelOutput: - """The output of a single model forward pass. - - The sampler_output_ready_event is set when the tensors in - sampler_output are ready (the model+sampler forward pass has - completed). We use the event to synchronize the GPU->CPU transfer, - which we want to only run when the data has been written to the - GPU tensors. Until the event is ready, the tensors in sampler_output - will have garbage data. - - There are two scenarios: - 1. The output tensors are ready and we can pythonize them immediately. - 2. The output tensors are not ready and we need to wait for the event to be - ready. - """ - sampler_output: SamplerOutput - sampler_output_ready_event: torch.cuda.Event - sampled_token_ids: Optional[torch.Tensor] = None - pythonized: bool = False - # On-device tensor containing the logprobs of each token. - logprobs: Optional["torch.Tensor"] = None - pythonization_cache: Optional[PythonizationCache] = None - - def pythonize(self, input_metadata: "StatefulModelInput", - copy_stream: torch.cuda.Stream, - pinned_sampled_token_buffer: torch.Tensor) -> None: - """Pythonize the output. Blocking.""" - if not self.pythonized: - self._pythonize_sampler_output(input_metadata, copy_stream, - pinned_sampled_token_buffer, True) - self.pythonized = True - - def maybe_pythonize(self, input_metadata: "StatefulModelInput", - copy_stream: torch.cuda.Stream, - pinned_sampled_token_buffer: torch.Tensor) -> None: - """Pythonize the output if ready, else return None. Non-blocking.""" - if not self.pythonized: - self.pythonized = self._pythonize_sampler_output( - input_metadata, copy_stream, pinned_sampled_token_buffer, - False) - - def _pythonize_sampler_output(self, input_metadata: "StatefulModelInput", - copy_stream: torch.cuda.Stream, - pinned_sampled_token_buffer: torch.Tensor, - blocking: bool) -> bool: - """ - If blocking is set, will block until the forward pass for the output is - ready and pythonize the output. Upon completing Pythonization, erases - self.logprobs (note that a non-blocking call that is performed when - the sampler output is not yet ready, will not erase self.logprobs.) - """ - assert self.sampled_token_ids is not None - if not blocking and not self.sampler_output_ready_event.query(): - return False - - if blocking: - self.sampler_output_ready_event.synchronize() - with torch.cuda.stream(copy_stream): - _pythonize_sampler_output(input_metadata, self.sampler_output, - pinned_sampled_token_buffer, - self.sampled_token_ids, self.logprobs, - self.pythonization_cache) - - # Erase the logprobs GPU-side tensor. - # Note that although _pythonize_sampler_output() runs in its - # own CUDA stream, nonetheless _pythonize_sampler_output() - # cannot return until Pythonization is complete; therefore - # we know that by the time the CPU reaches this point, - # `self.logprobs` is no longer needed. - self.logprobs = None - return True - - -@dataclass(frozen=False) -class StatefulModelInput(BroadcastableModelInput): - # actual frozen model input dataclass passed to _base_model_runner - frozen_model_input: Optional[ModelInputForGPUWithSamplingMetadata] = None - - # list of model outputs for each step, may not be all pythonized - cached_outputs: List[ModelOutput] = field(default_factory=list) - - # used to pass sampled token ids from the last step to the current step for - # TP workers. Used to append to end of outputs and used by advance_step - last_sampled_token_ids: Optional[torch.Tensor] = None - current_step: int = 0 - is_multi_step: bool = True - is_last_step: bool = False - is_first_multi_step: bool = False - base_output_proc_callback: Optional[Callable] = None - # ping-pong data structures for multi-step to wait on the previous step - step_cuda_events: List[current_platform.Event] = field( - default_factory=lambda: [current_platform.Event(blocking=True)] * 2) - num_seqs: int = -1 - num_queries: int = -1 - num_single_step_prefills: int = 0 - - def as_broadcastable_tensor_dict(self) -> Dict[str, Any]: - assert self.frozen_model_input is not None - tensor_dict = self.frozen_model_input.as_broadcastable_tensor_dict() - new_tensor_dict = { - 'last_sampled_token_ids': self.last_sampled_token_ids, - 'current_step': self.current_step, - 'is_multi_step': self.is_multi_step, - 'is_last_step': self.is_last_step, - 'is_first_multi_step': self.is_first_multi_step, - 'num_seqs': self.num_seqs, - 'num_queries': self.num_queries, - 'num_single_step_prefills': self.num_single_step_prefills, - } - tensor_dict.update(new_tensor_dict) - return tensor_dict - - @classmethod - def from_broadcasted_tensor_dict( - cls, - tensor_dict: Dict[str, Any], - attn_backend: Optional["AttentionBackend"] = None, - ) -> "StatefulModelInput": - tensor_dict = _init_sampling_metadata_from_tensor_dict(tensor_dict) - if attn_backend is not None: - tensor_dict = _init_attn_metadata_from_tensor_dict( - attn_backend, tensor_dict) - tensor_dict = _init_frozen_model_input_from_tensor_dict( - ModelInputForGPUWithSamplingMetadata, tensor_dict) - - return cls(**tensor_dict) - - def record_step_event(self, current_stream: torch.cuda.Stream): - # record the event for the current step so that the next step can sync - # on it. We modulo by 2 to keep the events in a circular buffer and - # support any attn backends that may be supported in the future. ie - # Flashinfer would want two DecodeWrappers to overlap the CPU and GPU. - self.step_cuda_events[self.current_step & 1] = \ - torch.cuda.Event(blocking=True) - self.step_cuda_events[self.current_step & 1].record(current_stream) - - def wait_previous_step(self): - # These cuda events are an explicit synchronization to ensure that - # advance_step() (for other attn backends that may be supported in the - # future) do not clobber any data structures that is also used by any - # enqueued forwards steps. For distributed case, only a single event is - # needed, but for single GPU case, since we can let the CPU run much - # further ahead, two events allow us to overlap the advance_step with - # the previous forward (ie using two DecodeWrappers for flashinfer - # backend) - self.step_cuda_events[(self.current_step + 1) & 1].wait() - - def add_sampler_output(self, - sampler_output: SamplerOutput, - sampled_token_ids: Optional[torch.Tensor] = None): - self.cached_outputs.append( - ModelOutput(sampler_output=sampler_output, - sampler_output_ready_event=None, - sampled_token_ids=sampled_token_ids, - pythonized=False)) - - def maybe_advance_sampling_metadata(self, device: str, pin_memory: bool): - """ - sampling_metadata.selected_token_indices is constructed for the - first-step in Multi-Step. However, when chunked-prefill is enabled with - multi-step, the scheduled prompts are fully processed in the - first-step and are processed as decodes in the rest of the steps. - This function updates the sampling_metadata.selected_token_indices - to account for this conversion. - - Example: - Let 2 prompts and 2 decodes be scheduled together. Let the - num-tokens to process for the 2 prompts be 5 and 8 respectively. - - In that case, sampling_metadata.sampled_token_indices will be, - [4, 12, 13, 14] as it is constructed for the first-step in - multi-step. - However, the prompts turns to decodes after the first-step - and the num-tokens for the previously-prompt sequences will - be 1 and 1 as they are decodes now. The self.sampled_token_indices - must be updated to [0,1,2,3]. - """ - assert self.current_step == 1 and self.num_single_step_prefills > 0 - if not get_pp_group().is_last_rank: - return - - assert self.frozen_model_input is not None - assert self.frozen_model_input.sampling_metadata is not None - self.frozen_model_input.sampling_metadata.selected_token_indices = \ - async_tensor_h2d(list(range(self.num_queries)), - dtype=torch.long, - target_device=device, - pin_memory=pin_memory) - - def maybe_advance_frozen_model_input(self, device: str, pin_memory: bool): - """ - Advancing the datastructures of StatefulModelInput::frozen_model_input - is only required when prefills are scheduled with decodes to run in - multi-step. This advancement/correction is required to account for - the conversion of Prefills to Decodes after the first multi-step. - """ - if self.current_step != 1 or self.num_single_step_prefills == 0: - return - - assert self.frozen_model_input is not None - fmi = self.frozen_model_input - - # Truncate input_tokens - assert fmi.input_tokens is not None - assert fmi.input_tokens.shape[0] >= self.num_seqs - fmi_new_input_tokens: torch.Tensor = fmi.input_tokens[:self.num_seqs] - - # Update frozen_model_input::input_positions. - assert fmi.input_positions is not None - assert fmi.input_positions.shape[0] >= self.num_seqs - fmi_new_input_positions: torch.Tensor = fmi.input_positions[:self. - num_seqs] - - # Assert unsupported - assert fmi.lora_mapping is None - assert fmi.lora_requests is not None - assert len(fmi.lora_requests) == 0 - assert fmi.attn_metadata is not None - assert fmi.multi_modal_kwargs is not None - assert len(fmi.multi_modal_kwargs) == 0 - - self.frozen_model_input = dataclasses.replace( - self.frozen_model_input, - input_tokens=fmi_new_input_tokens, - input_positions=fmi_new_input_positions) - - self.maybe_advance_sampling_metadata(device, pin_memory) - - -# MutableModelInputForGPUWithMultiStepMetadata is not subclass of -# ModelInputForGPU but it wraps the actual input dataclass and adds multi-step -# metadata -# mypy: disable-error-code=type-var -class MultiStepModelRunner(GPUModelRunnerBase[StatefulModelInput]): - # mypy: enable-error-code=type-var - - def __init__(self, base_model_runner: GPUModelRunnerBase, *args, **kwargs): - - super().__init__(*args, **kwargs) - - # Check attention backend support. - supported_attention_backends: List[str] = \ - _get_supported_attention_backends( - self.scheduler_config.chunked_prefill_enabled) - if self.attn_backend.get_name() not in supported_attention_backends: - ms_config_str: str = "Multi-Step + Chunked-Prefill" \ - if self.scheduler_config.chunked_prefill_enabled \ - else "Multi-Step" - raise ValueError( - f"{ms_config_str} not supported for attention backend: " - f"{self.attn_backend.get_name()}. Set VLLM_ATTENTION_BACKEND " - f"to a value from {supported_attention_backends}.") - - # uses the base model runner to execute the model and wraps it with - # multi-step logic - self._base_model_runner: GPUModelRunnerBase = base_model_runner - - self.is_multi_step = self.scheduler_config.is_multi_step - self.pinned_sampled_token_ids: Optional[torch.Tensor] = None - - # Using the PythonizationCache in Pipeline-Parallel clobbers the - # SequenceOutput and CompletionSequenceGroupOutput object. - # When cache-reset happens at the last step of a multi-step - # execution, there may be other on-going single-step/multi-step - # executions. The current caching implementation does not check - # for this. - self.pythonization_cache = PythonizationCache() \ - if self.parallel_config.pipeline_parallel_size == 1 else None - - @functools.cached_property - def _copy_stream(self): - # used to copy tensors from GPU to CPU asynchronously - return torch.cuda.Stream() - - def make_model_input_from_broadcasted_tensor_dict( - self, tensor_dict: Dict[str, Any]) -> StatefulModelInput: - model_input = (StatefulModelInput.from_broadcasted_tensor_dict( - tensor_dict, - attn_backend=self.attn_backend, - )) - return model_input - - def prepare_model_input( - self, - seq_group_metadata_list: List[SequenceGroupMetadata], - virtual_engine: int = 0, - finished_requests_ids: Optional[List[str]] = None - ) -> StatefulModelInput: - frozen_model_input: ModelInputForGPUWithSamplingMetadata = \ - self._base_model_runner.prepare_model_input( - seq_group_metadata_list, - virtual_engine, - finished_requests_ids) - - assert frozen_model_input.query_lens is not None - assert frozen_model_input.seq_lens is not None - assert frozen_model_input.attn_metadata is not None - num_queries = len(frozen_model_input.query_lens) - num_seqs = len(frozen_model_input.seq_lens) - num_single_step_prefills = frozen_model_input.attn_metadata.num_prefills - - model_input = StatefulModelInput( - frozen_model_input=frozen_model_input, - num_seqs=num_seqs, - num_queries=num_queries, - num_single_step_prefills=num_single_step_prefills) - - return model_input - - def _async_process_outputs(self, model_input: StatefulModelInput, - output_proc_callback: Callable): - # Proceed with pythonization and output_proc in order. - # Stop on the first one that fails to pythonize - output_proc_callback() - - cont = True - for step_num, model_output in enumerate(model_input.cached_outputs): - if not model_output.pythonized: - model_output.maybe_pythonize(model_input, self._copy_stream, - self.pinned_sampled_token_ids) - if model_output.pythonized: - ctx = output_proc_callback.keywords["ctx"] - ctx.append_output( - outputs=[model_output.sampler_output], - seq_group_metadata_list=ctx.seq_group_metadata_list, - scheduler_outputs=ctx.scheduler_outputs, - is_async=False, - is_last_step=False, - is_first_step_output=step_num == 0) - - output_proc_callback() - else: - cont = False - - if not cont: - break - - def _final_process_outputs( - self, model_input: StatefulModelInput, - output_proc_callback: Optional[Callable]) -> List[SamplerOutput]: - assert model_input.frozen_model_input is not None - - has_async_callback = output_proc_callback is not None - - outputs = [] - for step_num, output in enumerate(model_input.cached_outputs): - is_last_step = step_num == len(model_input.cached_outputs) - 1 - - # For non-async case: - # -- We simply add the outputs - # For async case: - # -- Invoke callback, pythonize, add to callback queue and repeat - # -- For last output, just add to callback queue - if has_async_callback: - assert output_proc_callback is not None - - # Invoke callback before pythonize (to overlap with GPU) - output_proc_callback() - - # Pythonize - if not output.pythonized: - output.pythonize(model_input, self._copy_stream, - self.pinned_sampled_token_ids) - - # For non last step, add to callback queue to chain - # callbacks=>pythonize pairs (for GPU overlap) - if not is_last_step: - ctx = output_proc_callback.keywords[ # type: ignore - "ctx"] # type: ignore - ctx.append_output( - outputs=[output.sampler_output], - seq_group_metadata_list=ctx. - seq_group_metadata_list, - scheduler_outputs=ctx.scheduler_outputs, - is_async=False, - is_last_step=False, - is_first_step_output=step_num == 0) - else: - outputs.append(output.sampler_output) - else: - output.pythonize(model_input, self._copy_stream, - self.pinned_sampled_token_ids) - outputs.append(output.sampler_output) - - return outputs - - @torch.inference_mode() - def execute_model( - self, - model_input: StatefulModelInput, - kv_caches: List[torch.Tensor], - intermediate_tensors: Optional[IntermediateTensors] = None, - num_steps: int = 1, - ) -> Optional[Union[List[SamplerOutput], IntermediateTensors]]: - """ - Execute the model for a single step and update multi-step - metadata - """ - assert num_steps == 1, "MultiStepModelRunner only supports num_steps=1" - frozen_model_input = model_input.frozen_model_input - assert frozen_model_input is not None - - # path for warm up runs - if not model_input.is_multi_step: - return self._base_model_runner.execute_model( - frozen_model_input, None, intermediate_tensors, num_steps) - - # make sure we skip the sampler on the lask rank and only pythonize - # if CPU is ahead. - if self.is_driver_worker and get_pp_group().is_last_rank: - if self.pinned_sampled_token_ids is None: - self.pinned_sampled_token_ids = torch.zeros( - (self.scheduler_config.max_num_seqs, 1), - dtype=torch.long, - device="cpu", - pin_memory=True) - - self._base_model_runner.sampler.include_gpu_probs_tensor = True - if frozen_model_input.sampling_metadata: - frozen_model_input.sampling_metadata.skip_sampler_cpu_output = ( - True) - - # some pre-execute model logic for multi-step: - # - if it's the first step, we need to reset the sampling tensors - # - if it's not the first step, we need to advance the step using the - # appended sampler output from last iteration - # - also maybe pythonize if CPU is ahead of GPU - - stream = current_stream() - if not model_input.is_first_multi_step: - # Explicitly block on the previous step's forward to make sure we - # don't clobber any GPU tensors still in use. - # This is not needed for flashattn backend, but for other attn - # backends such as flashinfer that performs extra CPU operations on - # input metadata we may need to synchronize any CPU operations that - # might clobber enqueued forwards. (prevents CPU from running too - # far ahead if needed) - model_input.wait_previous_step() - model_input = self._advance_step( - model_input, model_input.cached_outputs[-1].sampler_output) - - # frozen_model_input may have been updated - frozen_model_input = model_input.frozen_model_input - assert frozen_model_input is not None - - if model_input.base_output_proc_callback is None: - assert frozen_model_input is not None - model_input.base_output_proc_callback = \ - frozen_model_input.async_callback - - if frozen_model_input.async_callback is not None: - assert model_input.base_output_proc_callback is not None - async_callback = functools.partial( - self._async_process_outputs, - model_input=model_input, - output_proc_callback=model_input.base_output_proc_callback) - - model_input.frozen_model_input = dataclasses.replace( # type: ignore - model_input.frozen_model_input, - async_callback=async_callback) - # Update the local instance - frozen_model_input = model_input.frozen_model_input - assert frozen_model_input is not None - - # Execute the model - output = self._base_model_runner.execute_model(frozen_model_input, - None, - intermediate_tensors, - num_steps=1) - - # record the event for the current step so that the next step can sync - model_input.record_step_event(stream) - - if get_pp_group().is_last_rank and self.is_driver_worker: - assert isinstance(output, list) - assert len( - output - ) == 1, "MultiStepModelRunner requires single-step base_models" - - # event for the pythonization so that we only pythonize if the - # tensors are ready. May be able to be combined with the step event - output_ready_event = torch.cuda.Event() - output_ready_event.record(stream) - if self.parallel_config.pipeline_parallel_size > 1: - output[0].sampled_token_ids_cpu = output[ - 0].sampled_token_ids.cpu() - model_input.cached_outputs.append( - ModelOutput(output[0], output_ready_event, - output[0].sampled_token_ids, False, - output[0].logprobs, self.pythonization_cache)) - - # These GPU tensors are not required by multi-step; - # erase them to ensure they are not pythonized or - # transferred to CPU - output[0].sampled_token_ids = None - output[0].sampled_token_probs = None - output[0].logprobs = None - - # Pythonize the output if CPU is ahead and the previous step is - # ready. - if frozen_model_input.async_callback is None: - for model_output in model_input.cached_outputs: - model_output.maybe_pythonize(model_input, - self._copy_stream, - self.pinned_sampled_token_ids) - - model_input.current_step += 1 - - if not get_pp_group().is_last_rank: - # Should be IntermediateTensors - assert isinstance(output, IntermediateTensors) - return output - if not self.is_driver_worker: - return [] - - # Pythonize the output and block if needed since it is the last step - if model_input.is_last_step: - outputs = self._final_process_outputs( - model_input, model_input.base_output_proc_callback) - if self.pythonization_cache: - self.pythonization_cache.reset() - return outputs - - # should be [SamplerOutput] - return output - - def _update_sampling_metadata(self, sampling_metadata: SamplingMetadata, - num_seqs: Optional[int], num_queries: int): - - assert sampling_metadata.num_prompts == 0 - assert len(sampling_metadata.seq_groups) == num_queries - assert sampling_metadata.selected_token_indices.shape == ( - num_queries, ) - # assert sampling_metadata.categorized_sample_indices == TODO: Add if needed # noqa: E501 - - # Verify that all sequences are decodes - for i in range(num_queries): - seq_group = sampling_metadata.seq_groups[i] - - assert seq_group.is_prompt is False # No prompt - assert seq_group.prompt_logprob_indices == [] # No prompt - assert seq_group.sample_indices == [i] # Simple - assert seq_group.seq_len is None # Decode - assert seq_group.query_len is None # Decode - - def _advance_step(self, model_input: StatefulModelInput, - out: SamplerOutput) -> StatefulModelInput: - - model_input.maybe_advance_frozen_model_input(self.device, - self.pin_memory) - frozen_model_input = model_input.frozen_model_input - assert frozen_model_input is not None - assert frozen_model_input.input_tokens is not None - assert frozen_model_input.input_tokens.shape[0] == model_input.num_seqs - assert frozen_model_input.attn_metadata is not None - - sampled_token_ids = model_input.cached_outputs[-1].sampled_token_ids - num_seqs = model_input.num_seqs - num_queries = model_input.num_queries - frozen_model_input = model_input.frozen_model_input - assert frozen_model_input is not None - attn_metadata = frozen_model_input.attn_metadata - assert attn_metadata is not None - - turn_prefills_into_decodes: bool = model_input.current_step == 1 and \ - model_input.num_single_step_prefills != 0 - attn_metadata.advance_step( - frozen_model_input, - sampled_token_ids, - self.block_size, - num_seqs, - num_queries, - turn_prefills_into_decodes=turn_prefills_into_decodes) - - return model_input - - def load_model(self) -> None: - self._base_model_runner.load_model() - self.model_memory_usage = self._base_model_runner.model_memory_usage - - def save_sharded_state( - self, - path: str, - pattern: Optional[str] = None, - max_size: Optional[int] = None, - ) -> None: - return self._base_model_runner.save_sharded_state( - path, pattern, max_size) - - def save_tensorized_model(self, - tensorizer_config: TensorizerConfig) -> None: - return self._base_model_runner.save_tensorized_model(tensorizer_config) - - def profile_run(self) -> None: - return self._base_model_runner.profile_run() - - def remove_all_loras(self): - return self._base_model_runner.remove_all_loras() - - def capture_model(self, kv_caches: List[List]) -> None: - return self._base_model_runner.capture_model(kv_caches) - - @property - def vocab_size(self) -> int: - return self._base_model_runner.vocab_size - - -DeferredLogprobsReturnType = Tuple[Optional[List[Optional[PromptLogprobs]]], - Optional[List[SampleLogprobs]]] - - -def deferred_pythonize_logprobs( - output: SamplerOutput, - sampling_metadata: SamplingMetadata, - logprobs_tensor: Optional[torch.Tensor], -) -> DeferredLogprobsReturnType: - """Perform deferred logprob Pythonization. - - 1. Pythonize GPU-side sampler result tensors into CPU-side sampler result. - 2. Pythonize GPU-side logprobs tensor into CPU-side logprobs lists, - utilizing the Pythonized sampler result computed in step 1. - - These deferred computations are not required for single-step scheduling - or the `profile_run()` phase of multi-step scheduling. - - Args: - output: sampler output (under deferred Pythonization) - sampling_metadata - - Returns: - prompt_logprobs (CPU), sample_logprobs (CPU) - """ - - # - Deferred pythonization of sample result - sampler_result = get_pythonized_sample_results( - output.deferred_sample_results_args) - - # - Erase the GPU-side deferred sample_result - # computation args to ensure it is never - # pythonized or transferred to CPU - output.deferred_sample_results_args = None - - # - Deferred pythonization of logprobs - ( - prompt_logprobs, - sample_logprobs, - ) = get_logprobs(logprobs_tensor, sampling_metadata, sampler_result) - assert len(prompt_logprobs) == len(sampling_metadata.seq_groups) - assert len(sample_logprobs) == len(sampling_metadata.seq_groups) - - return prompt_logprobs, sample_logprobs - - -def _pythonize_sampler_output( - model_input: StatefulModelInput, - output: SamplerOutput, - pinned_sampled_token_buffer: torch.Tensor, - sampled_token_ids: torch.Tensor, - logprobs_tensor: Optional[torch.Tensor], - cache: Optional[PythonizationCache], -) -> None: - """ This function is only called when the output tensors are ready. - See [`ModelOutput`][vllm.worker.multi_step_model_runner.ModelOutput]. - - Modifies `output.outputs` and `pinned_sampled_token_buffer` in-place, - adding a Pythonized output data structure - ([`CompletionSequenceGroupOutput`][vllm.sequence.CompletionSequenceGroupOutput]) - for each [`SequenceGroup`][vllm.sequence.SequenceGroup]. - - Args: - model_input - output: sampler output - pinned_sampled_token_token_buffer: CPU-side pinned memory - (receives copy of - GPU-side token buffer.) - sampled_token_ids: GPU-side token buffer - logprobs_tensor: GPU-side tensor containing - logprobs computed during sampling - """ - - assert model_input.frozen_model_input is not None - - frozen_model_input = model_input.frozen_model_input - assert frozen_model_input.sampling_metadata is not None - sampling_metadata = frozen_model_input.sampling_metadata - # samples generation should have been skipped - assert not output.outputs - - pinned_buffer = pinned_sampled_token_buffer[:model_input.num_queries] - - # We guarantee output tensors are ready, so it is safe to - # pythonize the sampler output & obtain CPU-side logprobs. - # - # However we should check whether logprobs pythonization may - # be skipped entirely, i.e. because no logprobs were requested - # or pythonization was not deferred. To that end, - # - # * `prompt_logprobs_are_requested_for_prefill` signals that - # there are *any* prefill-phase requests which specify that - # prompt logprobs should be returned. - # - # * `any_logprobs_are_requested` signals that there are any - # requests which (1) specify that sample logprobs should be - # returned, or (2) are in the prefill phase AND specify that - # prompt logprobs should be returned. - # - # Later on, these flags cause adjustments to the pythonization - # process to accommodate logprobs. - - seq_groups = sampling_metadata.seq_groups - prompt_logprobs_are_requested_for_prefill = any([ - sg.sampling_params.prompt_logprobs is not None and sg.is_prompt - for sg in seq_groups - ]) - any_logprobs_are_requested = ( - prompt_logprobs_are_requested_for_prefill - or any([sg.sampling_params.logprobs is not None for sg in seq_groups])) - - if prompt_logprobs_are_requested_for_prefill: - # CPU GPU sync, after gathering *only* sampled tokens (since - # requesting prompt logprobs leads `sampled_token_ids` to - # include prompt token ids in addition to sampled token ids.) - sample_idx_tensor = torch.tensor( - [sdx for sg in seq_groups for sdx in sg.sample_indices]) - pinned_buffer = pinned_buffer.copy_( - sampled_token_ids[sample_idx_tensor, :], non_blocking=False) - else: - # CPU GPU sync - pinned_buffer = pinned_buffer.copy_(sampled_token_ids, - non_blocking=False) - - # this will not block as the tensors are already on CPU - samples_list = pinned_buffer.tolist() - - skip_sampler_cpu_output = ( - frozen_model_input.sampling_metadata.skip_sampler_cpu_output) - - # *Don't* skip logprobs pythonization *if*: - # * Any requests require logprobs to be returned in this - # iteration AND - # * These requests are being scheduled in a fashion which - # defers pythonization (i.e. multi-step scheduling.) - do_pythonize_logprobs = (skip_sampler_cpu_output - and any_logprobs_are_requested) - ( - prompt_logprobs, - sample_logprobs, - ) = (deferred_pythonize_logprobs(output, sampling_metadata, - logprobs_tensor) - if do_pythonize_logprobs else (None, None)) - - for sgdx, (seq_group, - sample_result) in enumerate(zip(seq_groups, samples_list)): - # Reminder: Please update docs/features/compatibility_matrix.md - # If the feature combo become valid - # (Check for Guided Decoding) - if seq_group.sampling_params.logits_processors: - assert len(seq_group.sampling_params.logits_processors) == 0, ( - "Logits Processors are not supported in multi-step decoding") - - if do_pythonize_logprobs: - assert prompt_logprobs is not None - assert sample_logprobs is not None - - ( - group_prompt_logprobs, - group_sample_logprobs, - ) = ( # Utilize deferred pythonization results - prompt_logprobs[sgdx], - sample_logprobs[sgdx], - ) - elif any_logprobs_are_requested: - ( - group_prompt_logprobs, - group_sample_logprobs, - ) = ( - # profile_run: use already-computed logprobs - output.outputs[sgdx].prompt_logprobs, - [sample.logprobs for sample in output.outputs[sgdx].samples]) - - seq_ids = seq_group.seq_ids - next_token_ids = sample_result - parent_ids = [0] - seq_outputs: List[SequenceOutput] - - if cache is not None: - completion_seq_group_output: CompletionSequenceGroupOutput = \ - cache.cached_completion_seq_group_output.get_object() - completion_seq_group_output.samples.clear() - seq_outputs = completion_seq_group_output.samples - else: - seq_outputs = [] - - for tdx, (parent_id, - next_token_id) in enumerate(zip(parent_ids, next_token_ids)): - if cache is not None: - seq_output: SequenceOutput = cache.cached_seq_output.get_object( - ) - seq_output.parent_seq_id = seq_ids[parent_id] - seq_output.output_token = next_token_id - - if any_logprobs_are_requested: - seq_output.logprobs = group_sample_logprobs[tdx] - else: - logprobs = next(iter(seq_output.logprobs.values())) - seq_output.logprobs.clear() - - logprobs.logprob = float('inf') - logprobs.rank = None - logprobs.decoded_token = None - - seq_output.logprobs[next_token_id] = logprobs - - seq_outputs.append(seq_output) - - else: - seq_outputs.append( - SequenceOutput(seq_ids[parent_id], next_token_id, - (group_sample_logprobs[tdx] - if any_logprobs_are_requested else { - next_token_id: - Logprob(logprob=float('inf'), - rank=None, - decoded_token=None) - }))) - if cache is not None: - completion_seq_group_output.prompt_logprobs = \ - group_prompt_logprobs if any_logprobs_are_requested else None - output.outputs.append(completion_seq_group_output) - else: - output.outputs.append( - CompletionSequenceGroupOutput( - seq_outputs, (group_prompt_logprobs - if any_logprobs_are_requested else None))) - - assert len(output.outputs) > 0 diff --git a/vllm/worker/multi_step_neuron_model_runner.py b/vllm/worker/multi_step_neuron_model_runner.py deleted file mode 100644 index 25f588077cb42..0000000000000 --- a/vllm/worker/multi_step_neuron_model_runner.py +++ /dev/null @@ -1,84 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - -from importlib.util import find_spec -from typing import List, Optional - -import torch - -from vllm.config import VllmConfig -from vllm.model_executor.layers.sampler import SamplerOutput -from vllm.multimodal import MultiModalKwargs -from vllm.sequence import IntermediateTensors -from vllm.worker.neuron_model_runner import (ModelInputForNeuron, - NeuronModelRunner) - - -class MultiStepNeuronModelRunner(NeuronModelRunner): - """A model runner for multi step decoding using the transformers_neuronx - framework""" - - def __init__( - self, - vllm_config: VllmConfig, - ): - super().__init__(vllm_config) - self.speculation_config = self.speculative_config - from transformers_neuronx.config import GenerationConfig - self.speculation_config.draft_model_config.neuron_sampling_params = ( - GenerationConfig( - max_length=self.scheduler_config.max_model_len, - do_sample=True, - per_batch_line=True, - top_k=[self._MAX_NEURON_SAMPLING_TOP_K] \ - * self.scheduler_config.max_num_seqs, - top_p=[1.0] * self.scheduler_config.max_num_seqs, - temperature=[1.0] * self.scheduler_config.max_num_seqs, - dynamic=True, - global_top_k=self._MAX_NEURON_SAMPLING_TOP_K - )) - - def load_model(self) -> None: - if find_spec("transformers_neuronx") is not None: - from vllm.model_executor.model_loader.neuron import ( - get_neuron_eagle_speculation_model, - get_neuron_speculation_model) - if self.speculation_config.speculative_token_tree is not None: - self.model = get_neuron_eagle_speculation_model( - self.model_config, - parallel_config=self.parallel_config, - scheduler_config=self.scheduler_config, - speculation_config=self.speculation_config) - else: - self.model = get_neuron_speculation_model( - self.model_config, - parallel_config=self.parallel_config, - scheduler_config=self.scheduler_config, - speculation_config=self.speculation_config) - else: - raise NotImplementedError( - "Supports only Transformer-NeuronX based models.") - - @torch.inference_mode() - def execute_model( - self, - model_input: ModelInputForNeuron, - kv_caches: Optional[List[torch.Tensor]] = None, - intermediate_tensors: Optional[IntermediateTensors] = None, - num_steps: int = 1, - ) -> Optional[List[SamplerOutput]]: - logits = self.model( - input_ids=model_input.input_tokens, - positions=model_input.input_positions, - input_block_ids=model_input.input_block_ids, - **MultiModalKwargs.as_kwargs( - model_input.multi_modal_kwargs or {}, - device=self.device, - ), - ) - - output = self.model.sample( - logits=logits, - sampling_metadata=model_input.sampling_metadata, - ) - return output diff --git a/vllm/worker/multi_step_neuronx_distributed_model_runner.py b/vllm/worker/multi_step_neuronx_distributed_model_runner.py deleted file mode 100644 index dd521dd67dad0..0000000000000 --- a/vllm/worker/multi_step_neuronx_distributed_model_runner.py +++ /dev/null @@ -1,63 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project -from typing import List, Optional - -import torch - -from vllm.config import VllmConfig -from vllm.model_executor.layers.sampler import SamplerOutput -from vllm.multimodal import MultiModalKwargs -from vllm.sequence import IntermediateTensors -from vllm.worker.neuronx_distributed_model_runner import ( - NeuronxDistributedModelRunner) - - -class MultiStepNeuronxDistributedModelRunner(NeuronxDistributedModelRunner): - """A model runner for multi-step decoding using the - neuronx-distributed-inference framework""" - - def __init__( - self, - vllm_config: VllmConfig, - ): - super().__init__(vllm_config) - - def load_model(self) -> None: - from vllm.model_executor.model_loader.neuronx_distributed import ( - get_neuron_speculation_model) - self.model = get_neuron_speculation_model( - self.model_config, - parallel_config=self.parallel_config, - scheduler_config=self.scheduler_config, - speculation_config=self.speculative_config) - - @torch.inference_mode() - def execute_model( - self, - model_input, - kv_caches: Optional[List[torch.Tensor]] = None, - intermediate_tensors: Optional[IntermediateTensors] = None, - num_steps: int = 1, - ) -> Optional[List[SamplerOutput]]: - sampling_params = torch.tensor([[ - seq_group.sampling_params.top_k, - seq_group.sampling_params.top_p, - seq_group.sampling_params.temperature, - ] for seq_group in model_input.sampling_metadata.seq_groups]) - - logits = self.model( - input_ids=model_input.input_tokens, - positions=model_input.input_positions, - input_block_ids=model_input.input_block_ids, - sampling_params=sampling_params, - **MultiModalKwargs.as_kwargs( - model_input.multi_modal_kwargs or {}, - device=self.device, - ), - ) - - output = self.model.sample( - logits=logits, - sampling_metadata=model_input.sampling_metadata, - ) - return output diff --git a/vllm/worker/multi_step_worker.py b/vllm/worker/multi_step_worker.py deleted file mode 100644 index ea16e14f9ecd4..0000000000000 --- a/vllm/worker/multi_step_worker.py +++ /dev/null @@ -1,197 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project - -import dataclasses -from dataclasses import dataclass -from typing import Dict, List, Optional, Tuple - -import torch - -from vllm.distributed import broadcast_tensor_dict, get_pp_group -from vllm.model_executor.layers.sampler import SamplerOutput -from vllm.sequence import ExecuteModelRequest -from vllm.worker.model_runner_base import BroadcastableModelInput -from vllm.worker.multi_step_model_runner import (MultiStepModelRunner, - StatefulModelInput) -from vllm.worker.worker import Worker, WorkerInput - - -@dataclass -class MultiStepState: - worker_input: WorkerInput - model_input: StatefulModelInput - - -class MultiStepWorker(Worker): - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - base_model_runner = self.model_runner - # for multi-step model, wrap the model runner with MultiStepModelRunner - self.model_runner = MultiStepModelRunner( - base_model_runner, - vllm_config=base_model_runner.vllm_config, - kv_cache_dtype=self.cache_config.cache_dtype, - is_driver_worker=base_model_runner.is_driver_worker, - ) - - pipeline_parallel_size = self.parallel_config.pipeline_parallel_size - self.multi_step_states: List[ - Optional[MultiStepState]] = [None] * pipeline_parallel_size - self.temp_output = None - - def _get_driver_input_and_broadcast( - self, execute_model_req: ExecuteModelRequest - ) -> Tuple[BroadcastableModelInput, WorkerInput, Dict[str, torch.Tensor]]: - """ - Get the driver input and broadcast it to other workers. - """ - assert self.is_driver_worker - virtual_engine = execute_model_req.virtual_engine - is_first_multi_step = execute_model_req.is_first_multi_step - if is_first_multi_step: - # on first step we prepare the worker input and model input normally - worker_input: WorkerInput = self.prepare_worker_input( - execute_model_req=execute_model_req) - model_input: StatefulModelInput = ( - self.model_runner.prepare_model_input( - execute_model_req.seq_group_metadata_list, - execute_model_req.virtual_engine, - execute_model_req.finished_requests_ids)) - - if execute_model_req.async_callback: - model_input.frozen_model_input = dataclasses.replace( # type: ignore - model_input.frozen_model_input, - async_callback=execute_model_req.async_callback) - else: - # on subsequent steps we reuse the worker input and model input - multi_step_state = self.multi_step_states[virtual_engine] - worker_input = multi_step_state.worker_input - model_input = multi_step_state.model_input - frozen_model_input = model_input.frozen_model_input - assert frozen_model_input is not None - assert frozen_model_input.attn_metadata is not None - # clear the cached metadata so that it can be recomputed on - # the workers. - frozen_model_input.attn_metadata._cached_prefill_metadata = None - frozen_model_input.attn_metadata._cached_decode_metadata = None - - model_input.is_first_multi_step = is_first_multi_step - model_input.is_last_step = execute_model_req.is_last_step - - if not is_first_multi_step: - # we broadcast the last sampled token ids to all TP workers so they - # can update their model input metadata in-place. - self._prepare_last_sampled_token_ids_for_tp_workers( - execute_model_req=execute_model_req, model_input=model_input) - - if self.do_metadata_broadcast: - broadcast_data = worker_input.as_broadcastable_tensor_dict() - broadcast_data.update(model_input.as_broadcastable_tensor_dict()) - broadcast_tensor_dict(broadcast_data, src=0) - - # Retuning empty dict here to keep this compatible with - # `LocalOrDistributedWorkerBase._get_driver_input_and_broadcast` - return model_input, worker_input, {} - - def _prepare_last_sampled_token_ids_for_tp_workers( - self, - execute_model_req: ExecuteModelRequest, - model_input: StatefulModelInput, - ) -> None: - """ - Prepare the last sampled token ids for TP workers. If it's the last - PP rank, then the last sampled token ids are already in the model_input. - If it is NOT the last PP rank, then we need to get the last sampled - token that is cached in the execute_model_req. - """ - if get_pp_group().is_last_rank: - assert model_input.cached_outputs[ - -1].sampler_output.sampled_token_ids is None - assert model_input.cached_outputs[-1].sampled_token_ids is not None - model_input.last_sampled_token_ids = model_input.cached_outputs[ - -1].sampled_token_ids - # free sampled token ids from the previous step if it has been - # pythonized. Cannot free the last sampled token ids because - # we need it for GPU advance_step. - for output in model_input.cached_outputs[:-1]: - if output.pythonized: - output.sampled_token_ids = None - else: - # otherwise we need to get the cached sampled token ids from the - # execute_model_req - assert execute_model_req.last_sampled_token_ids is not None - model_input.last_sampled_token_ids = ( - execute_model_req.last_sampled_token_ids.cuda()) - model_input.add_sampler_output( - SamplerOutput(outputs=[], sampled_token_ids=None), - model_input.last_sampled_token_ids) - - # free sampled token ids from the previous step. - # TODO(will) we could reuse the sampled token ids tensor from - # the previous step instead. - for output in model_input.cached_outputs[:-1]: - output.sampled_token_ids = None - assert model_input.cached_outputs[-1].sampled_token_ids is not None - - def prepare_input( - self, - execute_model_req: Optional[ExecuteModelRequest] = None, - ) -> Optional[Tuple[StatefulModelInput, WorkerInput, Dict[str, - torch.Tensor]]]: - """ - Depending on the current state of the request and multi step worker, - this method may skip the normal _prepare_model_input and - _prepare_worker_input methods and instead used cached values. - """ - if self.is_driver_worker: - if execute_model_req is None: - if self.do_metadata_broadcast: - # This signals that there's no more requests to process for - # now. All workers are running infinite loop with - # broadcast_tensor_dict, and it stops the loop when the - # driver broadcasts an empty input. Send an empty input to - # notify all other workers to stop their execution loop. - broadcast_tensor_dict({}, src=0) - return None - - virtual_engine = execute_model_req.virtual_engine - (model_input, worker_input, - kwargs) = self._get_driver_input_and_broadcast(execute_model_req) - assert isinstance(model_input, StatefulModelInput) - if execute_model_req.is_first_multi_step: - # cache the worker input and model input for the next steps - self.multi_step_states[virtual_engine] = MultiStepState( - worker_input=worker_input, model_input=model_input) - # if TP workers - else: - broadcast_data = self._get_worker_input_from_broadcast() - # if the driver has sent an empty input, we should stop the worker - # loop - if broadcast_data is None: - return None - model_input, worker_input, kwargs = broadcast_data - assert isinstance(model_input, StatefulModelInput) - virtual_engine = worker_input.virtual_engine - if model_input.is_first_multi_step: - pass - # TODO(will) Can cache the worker input and model input for the - # next steps. See below for details - else: - # TODO(will) possible to also cache and reuse the cached worker - # input and model input. The idea is essentially the delta - # optimization for model_inputs. Where the TP workers can cache - # the model input states and we only broadcast the delta need - # for the next step (sampled_token_ids from the previous step) - - assert isinstance(model_input, StatefulModelInput) - # we need to update the last sampled token ids in the model - # input for the workers so that they can run inplace - # advance_step - model_input.add_sampler_output( - SamplerOutput(outputs=[], sampled_token_ids=None), - model_input.last_sampled_token_ids) - - assert model_input is not None - assert worker_input is not None - return model_input, worker_input, kwargs diff --git a/vllm/worker/neuron_worker.py b/vllm/worker/neuron_worker.py index 4e1408300fb8b..3e4512a639083 100644 --- a/vllm/worker/neuron_worker.py +++ b/vllm/worker/neuron_worker.py @@ -64,25 +64,21 @@ class NeuronWorker(LocalOrDistributedWorkerBase): assert (self.lora_config is None), ("LoRA is not supported for TransformersNeuronX " "framework.") - from vllm.worker.multi_step_neuron_model_runner import ( - MultiStepNeuronModelRunner) if self.speculative_config is not None: - return MultiStepNeuronModelRunner(vllm_config=vllm_config) - else: - return NeuronModelRunner(vllm_config=vllm_config) + raise NotImplementedError( + "Speculative decoding is not supported for TransformersNeuronX" + ) + return NeuronModelRunner(vllm_config=vllm_config) def get_neuronx_distributed_model_runner(self, vllm_config): - from vllm.worker.multi_step_neuronx_distributed_model_runner import ( - MultiStepNeuronxDistributedModelRunner) from vllm.worker.neuronx_distributed_model_runner import ( NeuronxDistributedModelRunner) if self.speculative_config is not None: - assert (self.lora_config - is None), "LoRA is not supported for Speculative Decoding" - return MultiStepNeuronxDistributedModelRunner( - vllm_config=vllm_config) - else: - return NeuronxDistributedModelRunner(vllm_config=vllm_config) + assert (self.lora_config is None), ( + "LoRA is not supported for Speculative Decoding") + raise NotImplementedError( + "Speculative decoding is not supported for NeuronxDistributed") + return NeuronxDistributedModelRunner(vllm_config=vllm_config) def init_device(self) -> None: self.init_distributed_environment()