[V0 Deprecation] Remove more V0 tests (#25117)

Signed-off-by: Woosuk Kwon <woosuk.kwon@berkeley.edu>
Woosuk Kwon 2025-09-17 22:05:25 -07:00 committed by GitHub
parent 9d8a2d86d2
commit 5c65a72bb1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 0 additions and 1500 deletions

View File

@@ -46,22 +46,18 @@ steps:
mirror_hardwares: [amdexperimental]
source_file_dependencies:
- vllm/
- tests/async_engine
- tests/test_inputs.py
- tests/test_outputs.py
- tests/multimodal
- tests/utils_
- tests/worker
- tests/standalone_tests/lazy_imports.py
- tests/transformers_utils
commands:
- python3 standalone_tests/lazy_imports.py
- pytest -v -s async_engine # AsyncLLMEngine
- pytest -v -s test_inputs.py
- pytest -v -s test_outputs.py
- pytest -v -s multimodal
- pytest -v -s utils_ # Utils
- pytest -v -s worker # Worker
- pytest -v -s transformers_utils # transformers_utils
- label: Python-only Installation Test # 10min
@@ -82,14 +78,12 @@ steps:
- vllm/
- tests/basic_correctness/test_basic_correctness
- tests/basic_correctness/test_cpu_offload
- tests/basic_correctness/test_preemption
- tests/basic_correctness/test_cumem.py
commands:
- export VLLM_WORKER_MULTIPROC_METHOD=spawn
- pytest -v -s basic_correctness/test_cumem.py
- pytest -v -s basic_correctness/test_basic_correctness.py
- pytest -v -s basic_correctness/test_cpu_offload.py
- VLLM_TEST_ENABLE_ARTIFICIAL_PREEMPT=1 pytest -v -s basic_correctness/test_preemption.py
- label: Entrypoints Unit Tests # 5min
timeout_in_minutes: 10

.github/CODEOWNERS vendored
View File

@@ -41,7 +41,6 @@ CMakeLists.txt @tlrmchlsmth @LucasWilkinson
# Test ownership
/.buildkite/lm-eval-harness @mgoin @simon-mo
/tests/async_engine @njhill @robertgshaw2-redhat @simon-mo
/tests/distributed/test_multi_node_assignment.py @youkaichao
/tests/distributed/test_pipeline_parallel.py @youkaichao
/tests/distributed/test_same_node.py @youkaichao
@@ -50,7 +49,6 @@ CMakeLists.txt @tlrmchlsmth @LucasWilkinson
/tests/kernels @mgoin @tlrmchlsmth @WoosukKwon @yewentao256
/tests/models @DarkLight1337 @ywang96
/tests/multimodal @DarkLight1337 @ywang96 @NickLucche
/tests/prefix_caching @comaniac @KuntaiDu
/tests/quantization @mgoin @robertgshaw2-redhat @yewentao256
/tests/test_inputs.py @DarkLight1337 @ywang96
/tests/v1/entrypoints/llm/test_struct_output_generate.py @mgoin @russellb @aarnphm

View File

@@ -1,54 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""vllm.entrypoints.api_server with some extra logging for testing."""
from collections.abc import Iterable
from typing import Any
import uvicorn
from fastapi.responses import JSONResponse, Response
import vllm.entrypoints.api_server
import vllm.envs as envs
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.utils import FlexibleArgumentParser
app = vllm.entrypoints.api_server.app
class AsyncLLMEngineWithStats(AsyncLLMEngine):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._num_aborts = 0
async def _engine_abort(self, request_ids: Iterable[str]):
ids = list(request_ids)
self._num_aborts += len(ids)
await super()._engine_abort(ids)
def testing_stats(self) -> dict[str, Any]:
return {"num_aborted_requests": self._num_aborts}
@app.get("/stats")
def stats() -> Response:
"""Get the statistics of the engine."""
return JSONResponse(engine.testing_stats())
if __name__ == "__main__":
parser = FlexibleArgumentParser()
parser.add_argument("--host", type=str, default="localhost")
parser.add_argument("--port", type=int, default=8000)
parser = AsyncEngineArgs.add_cli_args(parser)
args = parser.parse_args()
engine_args = AsyncEngineArgs.from_cli_args(args)
engine = AsyncLLMEngineWithStats.from_engine_args(engine_args)
vllm.entrypoints.api_server.engine = engine
uvicorn.run(app,
host=args.host,
port=args.port,
log_level="debug",
timeout_keep_alive=envs.VLLM_HTTP_TIMEOUT_KEEP_ALIVE)
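The /stats route defined above is what the test in the next file polls to observe aborts. A minimal client-side sketch (assuming the server is already running on localhost:8000, as in that test):

import requests

# Query the testing-only /stats route exposed by the server above.
stats = requests.get("http://localhost:8000/stats").json()
print(stats["num_aborted_requests"])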

View File

@@ -1,12 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
"""
Since this module is V0 only, set VLLM_USE_V1=0 for
all tests in the module.
"""
monkeypatch.setenv('VLLM_USE_V1', '0')

View File

@@ -1,139 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import copyreg
import os
import subprocess
import sys
import time
from multiprocessing import Pool
from pathlib import Path
import pytest
import requests
import urllib3.exceptions
def _pickle_new_connection_error(obj):
"""Custom pickler for NewConnectionError to fix tblib compatibility."""
# Extract the original message by removing the "conn: " prefix
full_message = obj.args[0] if obj.args else ""
if ': ' in full_message:
# Split off the connection part and keep the actual message
_, actual_message = full_message.split(': ', 1)
else:
actual_message = full_message
return _unpickle_new_connection_error, (actual_message, )
def _unpickle_new_connection_error(message):
"""Custom unpickler for NewConnectionError."""
# Create with None as conn and the actual message
return urllib3.exceptions.NewConnectionError(None, message)
# Register the custom pickle/unpickle functions for tblib compatibility
copyreg.pickle(urllib3.exceptions.NewConnectionError,
_pickle_new_connection_error)
def _query_server(prompt: str, max_tokens: int = 5) -> dict:
response = requests.post("http://localhost:8000/generate",
json={
"prompt": prompt,
"max_tokens": max_tokens,
"temperature": 0,
"ignore_eos": True
})
response.raise_for_status()
return response.json()
def _query_server_long(prompt: str) -> dict:
return _query_server(prompt, max_tokens=500)
@pytest.fixture
def api_server(distributed_executor_backend: str):
script_path = Path(__file__).parent.joinpath(
"api_server_async_engine.py").absolute()
commands = [
sys.executable,
"-u",
str(script_path),
"--model",
"facebook/opt-125m",
"--host",
"127.0.0.1",
"--distributed-executor-backend",
distributed_executor_backend,
]
# API Server Test Requires V0.
my_env = os.environ.copy()
my_env["VLLM_USE_V1"] = "0"
uvicorn_process = subprocess.Popen(commands, env=my_env)
yield
uvicorn_process.terminate()
@pytest.mark.timeout(300)
@pytest.mark.parametrize("distributed_executor_backend", ["mp", "ray"])
def test_api_server(api_server, distributed_executor_backend: str):
"""
Run the API server and test it.
We run both the server and requests in separate processes.
We test that the server can handle incoming requests, including
multiple requests at the same time, and that it can handle requests
being cancelled without crashing.
"""
with Pool(32) as pool:
# Wait until the server is ready
prompts = ["warm up"] * 1
result = None
while not result:
try:
for r in pool.map(_query_server, prompts):
result = r
break
except requests.exceptions.ConnectionError:
time.sleep(1)
# Actual tests start here
# Try with 1 prompt
for result in pool.map(_query_server, prompts):
assert result
num_aborted_requests = requests.get(
"http://localhost:8000/stats").json()["num_aborted_requests"]
assert num_aborted_requests == 0
# Try with 100 prompts
prompts = ["test prompt"] * 100
for result in pool.map(_query_server, prompts):
assert result
with Pool(32) as pool:
# Cancel requests
prompts = ["canceled requests"] * 100
pool.map_async(_query_server_long, prompts)
time.sleep(0.01)
pool.terminate()
pool.join()
# check cancellation stats
# give it some time to update the stats
time.sleep(1)
num_aborted_requests = requests.get(
"http://localhost:8000/stats").json()["num_aborted_requests"]
assert num_aborted_requests > 0
# check that server still runs after cancellations
with Pool(32) as pool:
# Try with 100 prompts
prompts = ["test prompt after canceled"] * 100
for result in pool.map(_query_server, prompts):
assert result
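A minimal sketch of what the copyreg registration near the top of this file buys (assuming urllib3's two-argument NewConnectionError constructor): with a reducer registered, the exception survives a pickle round trip, which is what tblib needs in order to re-raise worker errors in the parent process.

import copyreg
import pickle

import urllib3.exceptions


def _unpickle(message):
    # Rebuild with conn=None, mirroring _unpickle_new_connection_error above.
    return urllib3.exceptions.NewConnectionError(None, message)


def _pickle(exc):
    # Keep only the message part, mirroring _pickle_new_connection_error above.
    message = exc.args[0].split(": ", 1)[-1] if exc.args else ""
    return _unpickle, (message, )


copyreg.pickle(urllib3.exceptions.NewConnectionError, _pickle)

err = urllib3.exceptions.NewConnectionError(None, "connection refused")
restored = pickle.loads(pickle.dumps(err))
assert isinstance(restored, urllib3.exceptions.NewConnectionError)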

View File

@@ -1,71 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
from vllm.engine.async_llm_engine import RequestTracker
from vllm.outputs import RequestOutput
@pytest.mark.asyncio
async def test_request_tracker():
tracker = RequestTracker()
stream_1 = tracker.add_request("1")
assert tracker.new_requests_event.is_set()
await tracker.wait_for_new_requests()
new, aborted = tracker.get_new_and_aborted_requests()
assert not tracker.new_requests_event.is_set()
assert len(new) == 1
assert new[0]["request_id"] == "1"
assert not aborted
assert not stream_1.finished
stream_2 = tracker.add_request("2")
stream_3 = tracker.add_request("3")
assert tracker.new_requests_event.is_set()
await tracker.wait_for_new_requests()
new, aborted = tracker.get_new_and_aborted_requests()
assert not tracker.new_requests_event.is_set()
assert len(new) == 2
assert new[0]["request_id"] == "2"
assert new[1]["request_id"] == "3"
assert not aborted
assert not stream_2.finished
assert not stream_3.finished
# request_ids must be unique
with pytest.raises(KeyError):
tracker.add_request("1")
assert not tracker.new_requests_event.is_set()
tracker.abort_request("1")
new, aborted = tracker.get_new_and_aborted_requests()
assert len(aborted) == 1
assert "1" in aborted
assert not new
assert stream_1.finished
stream_4 = tracker.add_request("4")
tracker.abort_request("4")
assert tracker.new_requests_event.is_set()
await tracker.wait_for_new_requests()
new, aborted = tracker.get_new_and_aborted_requests()
# aborted new requests will cancel each other out -
# there's no need for them to propagate into the
# engine
assert not aborted
assert not new
assert stream_4.finished
stream_5 = tracker.add_request("5")
assert tracker.new_requests_event.is_set()
tracker.process_request_output(
RequestOutput("2", "output", [], [], [], finished=True))
await tracker.wait_for_new_requests()
new, aborted = tracker.get_new_and_aborted_requests()
assert not tracker.new_requests_event.is_set()
assert not aborted
assert len(new) == 1
assert new[0]["request_id"] == "5"
assert stream_2.finished
assert not stream_5.finished

View File

@@ -1,189 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Compare the short outputs of HF and vLLM when using greedy sampling.
VLLM_TEST_ENABLE_ARTIFICIAL_PREEMPT=1 has to be set before running this test.
Run `VLLM_TEST_ENABLE_ARTIFICIAL_PREEMPT=1
pytest tests/basic_correctness/test_preemption.py`.
"""
import pytest
from prometheus_client import REGISTRY
import vllm.envs as envs
from vllm import SamplingParams
from vllm.core.scheduler import (ARTIFICIAL_PREEMPTION_MAX_CNT,
ENABLE_ARTIFICIAL_PREEMPT)
from ..models.utils import check_outputs_equal
MODELS = [
"distilbert/distilgpt2",
]
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
"""
We should enable this for V1, but VLLM_TEST_ENABLE_ARTIFICIAL_PREEMPT
is not supported there, so use VLLM_USE_V1=0 for all tests in the file.
"""
monkeypatch.setenv('VLLM_USE_V1', '0')
@pytest.fixture(scope="module", autouse=True)
def check_settings():
assert ENABLE_ARTIFICIAL_PREEMPT is True, (
"Use an env var VLLM_TEST_ENABLE_ARTIFICIAL_PREEMPT=1."
"`VLLM_TEST_ENABLE_ARTIFICIAL_PREEMPT=1 "
"pytest tests/basic_correctness/test_preemption.py`")
@pytest.fixture
def distributed_executor_backend() -> str:
# When SPMD worker is used, use distributed_executor_backend="ray"
# to test that the delta input optimization works with preemption.
return "ray" if envs.VLLM_USE_RAY_SPMD_WORKER else "mp"
@pytest.mark.parametrize("model", MODELS)
@pytest.mark.parametrize("dtype", ["half"])
@pytest.mark.parametrize("max_tokens", [96])
@pytest.mark.parametrize("chunked_prefill_token_size", [16])
def test_chunked_prefill_recompute(
hf_runner,
vllm_runner,
example_prompts,
model: str,
dtype: str,
max_tokens: int,
chunked_prefill_token_size: int,
distributed_executor_backend: str,
) -> None:
"""Ensure that chunked prefill works with preemption."""
max_num_seqs = min(chunked_prefill_token_size, 256)
enable_chunked_prefill = False
max_num_batched_tokens = None
if chunked_prefill_token_size != -1:
enable_chunked_prefill = True
max_num_batched_tokens = chunked_prefill_token_size
with hf_runner(model, dtype=dtype) as hf_model:
hf_outputs = hf_model.generate_greedy(example_prompts, max_tokens)
with vllm_runner(
model,
dtype=dtype,
max_num_batched_tokens=max_num_batched_tokens,
enable_chunked_prefill=enable_chunked_prefill,
max_num_seqs=max_num_seqs,
distributed_executor_backend=distributed_executor_backend,
disable_log_stats=False,
) as vllm_model:
vllm_outputs = vllm_model.generate_greedy(example_prompts, max_tokens)
assert (vllm_model.llm.llm_engine.scheduler[0].artificial_preempt_cnt
< ARTIFICIAL_PREEMPTION_MAX_CNT)
for i in range(len(example_prompts)):
hf_output_ids, hf_output_str = hf_outputs[i]
vllm_output_ids, vllm_output_str = vllm_outputs[i]
assert hf_output_str == vllm_output_str, (
f"Test{i}:\nHF: {hf_output_str!r}\nvLLM: {vllm_output_str!r}")
assert hf_output_ids == vllm_output_ids, (
f"Test{i}:\nHF: {hf_output_ids}\nvLLM: {vllm_output_ids}")
@pytest.mark.parametrize("model", MODELS)
@pytest.mark.parametrize("dtype", ["float"])
@pytest.mark.parametrize("max_tokens", [96])
def test_preemption(
caplog_vllm,
hf_runner,
vllm_runner,
example_prompts,
model: str,
dtype: str,
max_tokens: int,
distributed_executor_backend: str,
) -> None:
"""By default, recompute preemption is enabled"""
with hf_runner(model, dtype=dtype) as hf_model:
hf_outputs = hf_model.generate_greedy(example_prompts, max_tokens)
with vllm_runner(
model,
dtype=dtype,
disable_log_stats=False,
distributed_executor_backend=distributed_executor_backend,
) as vllm_model:
vllm_outputs = vllm_model.generate_greedy(example_prompts, max_tokens)
assert (vllm_model.llm.llm_engine.scheduler[0].artificial_preempt_cnt
< ARTIFICIAL_PREEMPTION_MAX_CNT)
total_preemption = (
vllm_model.llm.llm_engine.scheduler[0].num_cumulative_preemption)
check_outputs_equal(
outputs_0_lst=hf_outputs,
outputs_1_lst=vllm_outputs,
name_0="hf",
name_1="vllm",
)
assert ("is preempted by PreemptionMode.RECOMPUTE mode because there "
"is not enough KV cache space." in caplog_vllm.text)
# Ensure the count bucket of request-level histogram metrics matches
# the number of requests as a simple sanity check to ensure metrics are
# generated
preemption_metrics = None
for m in REGISTRY.collect():
if m.name == "vllm:num_preemptions":
preemption_metrics = m
assert preemption_metrics is not None
total_recorded_preemption = 0
for sample in preemption_metrics.samples:
total_recorded_preemption += sample.value
assert total_preemption == total_recorded_preemption
@pytest.mark.parametrize("model", MODELS)
@pytest.mark.parametrize("dtype", ["float"])
@pytest.mark.parametrize("max_tokens", [96])
def test_preemption_infeasible(
vllm_runner,
example_prompts,
model: str,
dtype: str,
max_tokens: int,
distributed_executor_backend: str,
) -> None:
"""Verify infeasible preemption request will be ignored."""
BLOCK_SIZE = 16
prefill_blocks = 2
decode_blocks = max_tokens // BLOCK_SIZE
with vllm_runner(
model,
dtype=dtype,
block_size=BLOCK_SIZE,
# Not enough gpu blocks to complete a single sequence.
# preemption should happen, and the sequence should be
# ignored instead of hanging forever.
num_gpu_blocks_override=prefill_blocks + decode_blocks // 2,
max_model_len=((prefill_blocks + decode_blocks // 2) * BLOCK_SIZE),
distributed_executor_backend=distributed_executor_backend,
) as vllm_model:
sampling_params = SamplingParams(max_tokens=max_tokens,
ignore_eos=True)
req_outputs = vllm_model.llm.generate(
example_prompts,
sampling_params=sampling_params,
)
assert (vllm_model.llm.llm_engine.scheduler[0].artificial_preempt_cnt
< ARTIFICIAL_PREEMPTION_MAX_CNT)
# Verify the request is ignored and does not hang.
for req_output in req_outputs:
outputs = req_output.outputs
assert len(outputs) == 1
assert outputs[0].finish_reason == "length"
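The block budget in test_preemption_infeasible works out as follows (a sketch of the arithmetic only, using the constants defined in the test above):

BLOCK_SIZE = 16
max_tokens = 96
prefill_blocks = 2
decode_blocks = max_tokens // BLOCK_SIZE              # 6 blocks for 96 tokens
num_gpu_blocks = prefill_blocks + decode_blocks // 2  # 2 + 3 = 5 blocks
max_model_len = num_gpu_blocks * BLOCK_SIZE           # 80 tokens in total
# A request asking for 96 new tokens can never fit within 80 tokens, so
# every output is finished early with finish_reason == "length".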

View File

@@ -1,11 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
@pytest.fixture(autouse=True)
def v1(run_with_both_engines):
# Simple autouse wrapper to run both engines for each test
# This can be promoted up to conftest.py to run for every
# test in a package
pass

View File

@@ -1,83 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
from vllm.engine.output_processor.stop_checker import StopChecker
from vllm.inputs import token_inputs
from vllm.sampling_params import SamplingParams
from vllm.sequence import Logprob, Sequence, SequenceStatus
def sequence_with_eos(text: str, eos_token: str,
eos_token_id: int) -> Sequence:
"""
Create a Sequence that ends with an EOS token.
"""
seq = Sequence(
seq_id=0,
inputs=token_inputs([]),
block_size=16,
eos_token_id=eos_token_id,
)
seq.output_text = text + eos_token
offset = eos_token_id + 1
for i in range(offset, len(text) + offset):
seq.append_token_id(token_id=i, logprobs={i: Logprob(0.0)})
seq.append_token_id(token_id=eos_token_id,
logprobs={eos_token_id: Logprob(0.0)})
seq.status = SequenceStatus.RUNNING
return seq
@pytest.mark.parametrize(["text_wo_eos", "eos_token", "eos_token_id"], [
("This text ends with EOS token", "</s>", 2),
])
@pytest.mark.parametrize("ignore_eos", [True, False])
@pytest.mark.parametrize("include_stop_str_in_output", [True, False])
@pytest.mark.skip_global_cleanup
def test_stop_on_eos_token(text_wo_eos: str, eos_token: str, eos_token_id: int,
ignore_eos: bool, include_stop_str_in_output: bool):
"""
Test the behavior of the StopChecker's maybe_stop_sequence method
when an EOS token is encountered.
This test covers:
- When the EOS token should stop the sequence and be removed from the output
- When the EOS token should stop the sequence and be included in the output
- When the EOS token should be ignored, and the sequence continues
"""
stop_checker = StopChecker(max_model_len=1024)
seq = sequence_with_eos(
text=text_wo_eos,
eos_token=eos_token,
eos_token_id=eos_token_id,
)
new_char_count = len(eos_token)
# Note that `stop` and `stop_token_ids` are not specified
sampling_params = SamplingParams(
min_tokens=1,
ignore_eos=ignore_eos,
include_stop_str_in_output=include_stop_str_in_output)
stop_checker.maybe_stop_sequence(
seq=seq,
new_char_count=new_char_count,
sampling_params=sampling_params,
)
if ignore_eos:
assert seq.status == SequenceStatus.RUNNING
assert seq.output_text == text_wo_eos + eos_token
elif include_stop_str_in_output:
assert seq.status == SequenceStatus.FINISHED_STOPPED
assert seq.output_text == text_wo_eos + eos_token
else:
assert seq.status == SequenceStatus.FINISHED_STOPPED
assert seq.output_text == text_wo_eos

View File

@@ -81,13 +81,3 @@ def test_lm_eval_accuracy_v1_engine(monkeypatch: pytest.MonkeyPatch):
more_args = ["--max-num-seqs", "64"]
run_test(more_args)
@pytest.mark.parametrize("more_args", MORE_ARGS_LIST)
def test_lm_eval_accuracy_v0_engine(monkeypatch: pytest.MonkeyPatch,
more_args):
"""Run with the V0 Engine."""
with monkeypatch.context() as m:
m.setenv("VLLM_USE_V1", "0")
run_test(more_args)

View File

@@ -1,182 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
import torch
from vllm import SamplingParams
from ..conftest import VllmRunner
MODELS = ["distilbert/distilgpt2"]
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
"""
This module is V0 only since it uses dtype=float, so
set VLLM_USE_V1=0 for all tests in the module.
"""
monkeypatch.setenv('VLLM_USE_V1', '0')
@pytest.mark.parametrize("model", MODELS)
@pytest.mark.parametrize("dtype",
["float"]) # needed for comparing logprobs with HF
@pytest.mark.parametrize("chunked_prefill_token_size", [1, 4, 16, -1])
@pytest.mark.parametrize("num_top_logprobs", [0, 6]) # 32000 == vocab_size
@pytest.mark.parametrize("detokenize", [True, False])
def test_get_prompt_logprobs(
hf_runner,
vllm_runner,
model,
dtype,
chunked_prefill_token_size: int,
num_top_logprobs: int,
detokenize: bool,
example_prompts,
):
max_num_seqs = 256
enable_chunked_prefill = False
max_num_batched_tokens = None
if chunked_prefill_token_size != -1:
enable_chunked_prefill = True
max_num_seqs = min(chunked_prefill_token_size, max_num_seqs)
max_num_batched_tokens = chunked_prefill_token_size
max_tokens = 5
with hf_runner(model, dtype=dtype) as hf_model:
hf_logprobs = hf_model.generate_greedy_logprobs(
example_prompts,
max_tokens=max_tokens,
)
with vllm_runner(
model,
dtype=dtype,
max_logprobs=num_top_logprobs,
enable_chunked_prefill=enable_chunked_prefill,
max_num_batched_tokens=max_num_batched_tokens,
max_num_seqs=max_num_seqs,
) as vllm_model:
vllm_sampling_params = SamplingParams(max_tokens=max_tokens,
logprobs=num_top_logprobs,
prompt_logprobs=num_top_logprobs,
temperature=0.0,
detokenize=detokenize)
vllm_results = vllm_model.llm.generate(
example_prompts, sampling_params=vllm_sampling_params)
# Test whether logprobs are included in the results.
for result in vllm_results:
assert result.prompt_logprobs is not None
assert result.outputs[0].logprobs is not None
assert len(result.outputs[0].logprobs) == max_tokens
for logprobs in result.outputs[0].logprobs:
# If the output token is not included in the top num_top_logprobs,
# one extra entry can be returned.
assert (len(logprobs) == num_top_logprobs
or len(logprobs) == num_top_logprobs + 1)
output_text = result.outputs[0].text
output_string_from_most_likely_tokens_lst: list[str] = []
for top_logprobs in result.outputs[0].logprobs:
top_logprob = next(iter(top_logprobs.values()))
output_string_from_most_likely_tokens_lst.append(
top_logprob.decoded_token)
if detokenize:
output_string_from_most_likely_tokens = "".join(
output_string_from_most_likely_tokens_lst)
assert output_text == output_string_from_most_likely_tokens, (
"The output text from the top logprob for each token position "
"should be the same as the output text in the result.")
else:
assert output_text == ''
assert output_string_from_most_likely_tokens_lst == ([None] *
max_tokens)
# The first prompt logprob is always None
assert result.prompt_logprobs[0] is None
for prompt_logprobs in result.prompt_logprobs[1:]:
# If the prompt token is not included in the top num_top_logprobs,
# one extra entry can be returned.
assert (len(prompt_logprobs) == num_top_logprobs
or len(prompt_logprobs) == num_top_logprobs + 1)
# Test whether prompt logprobs are consistent with HF
for vllm_result, hf_logprob in zip(vllm_results, hf_logprobs):
# Check prompt logprobs
# The first prompt logprob is always None, so we compare it from 1:.
vllm_prompt_logprobs = vllm_result.prompt_logprobs[1:]
for i, vllm_prompt_logprob_dict in enumerate(vllm_prompt_logprobs):
for token_id, logprob in vllm_prompt_logprob_dict.items():
torch.testing.assert_close(logprob.logprob,
hf_logprob[0][i][token_id].item(),
atol=1e-2,
rtol=1e-2)
vllm_sample_logprobs = vllm_result.outputs[0].logprobs
for i, top_logprobs in enumerate(vllm_sample_logprobs):
for token_id, sample_logprob in top_logprobs.items():
logprob = sample_logprob.logprob
torch.testing.assert_close(logprob,
hf_logprob[i][-1][token_id].item(),
atol=1e-2,
rtol=1e-2)
if detokenize:
assert isinstance(sample_logprob.decoded_token, str), (
"The token should be decoded by the time it is returned"
" to the user.")
# Test if prompt logprobs are correctly set.
for vllm_result in vllm_results:
token_ids = vllm_result.prompt_token_ids
prompt_logprobs = vllm_result.prompt_logprobs
# The first token doesn't have a logprob.
assert prompt_logprobs[0] is None
for token_id, logprob_dict in zip(token_ids[1:], prompt_logprobs[1:]):
assert token_id in logprob_dict
def test_max_logprobs():
runner = VllmRunner("facebook/opt-125m", max_logprobs=1)
vllm_sampling_params = SamplingParams(logprobs=1)
# should pass
runner.generate(["Hello world"], sampling_params=vllm_sampling_params)
bad_sampling_params = SamplingParams(logprobs=2)
with pytest.raises(ValueError):
runner.generate(["Hello world"], sampling_params=bad_sampling_params)
@pytest.mark.parametrize("model", MODELS)
@pytest.mark.parametrize("chunked_prefill_token_size", [1, 4, 16, -1])
@pytest.mark.parametrize("detokenize", [True, False])
def test_none_logprobs(vllm_runner, model, chunked_prefill_token_size: int,
detokenize: bool, example_prompts):
max_num_seqs = 256
enable_chunked_prefill = False
max_num_batched_tokens = None
if chunked_prefill_token_size != -1:
enable_chunked_prefill = True
max_num_seqs = min(chunked_prefill_token_size, max_num_seqs)
max_num_batched_tokens = chunked_prefill_token_size
max_tokens = 5
with vllm_runner(
model,
enable_chunked_prefill=enable_chunked_prefill,
max_num_batched_tokens=max_num_batched_tokens,
max_num_seqs=max_num_seqs,
) as vllm_model:
sampling_params_logprobs_none = SamplingParams(max_tokens=max_tokens,
logprobs=None,
temperature=0.0,
detokenize=detokenize)
results_logprobs_none = vllm_model.llm.generate(
example_prompts, sampling_params=sampling_params_logprobs_none)
for i in range(len(results_logprobs_none)):
assert results_logprobs_none[i].outputs[0].logprobs is None
assert results_logprobs_none[i].outputs[0].cumulative_logprob is None

View File

@@ -1,11 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
"""
This module tests V0 internals, so set VLLM_USE_V1=0.
"""
monkeypatch.setenv('VLLM_USE_V1', '0')

View File

@@ -1,113 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import dataclasses
import torch
from vllm.attention import AttentionMetadata, AttentionMetadataBuilder
from vllm.attention.backends.abstract import AttentionBackend
from vllm.attention.backends.utils import CommonAttentionState
from vllm.model_executor import SamplingMetadata
from vllm.worker.model_runner import ModelInputForGPUWithSamplingMetadata
class MockAttentionBackend(AttentionBackend):
@staticmethod
def get_name() -> str:
raise NotImplementedError
@staticmethod
def get_impl_cls():
raise NotImplementedError
@staticmethod
def get_metadata_cls() -> type["AttentionMetadata"]:
return AttentionMetadata
@staticmethod
def get_builder_cls() -> type["AttentionMetadataBuilder"]:
return AttentionMetadataBuilder
@staticmethod
def get_state_cls() -> type["CommonAttentionState"]:
return CommonAttentionState
@staticmethod
def get_kv_cache_shape(
num_blocks: int,
block_size: int,
num_kv_heads: int,
head_size: int,
) -> tuple[int, ...]:
raise NotImplementedError
@staticmethod
def swap_blocks(
src_kv_cache: torch.Tensor,
dst_kv_cache: torch.Tensor,
src_to_dst: torch.Tensor,
) -> None:
pass
@staticmethod
def copy_blocks(
kv_caches: list[torch.Tensor],
src_to_dists: torch.Tensor,
) -> None:
pass
def test_model_runner_input():
sampling_metadata = SamplingMetadata(
["seq_group"],
"selected_token_indices",
"categorized_sample_indices",
"num_prompts",
)
attn_metadata = AttentionMetadata(
num_prefills=1,
num_prefill_tokens=2,
num_decode_tokens=3,
slot_mapping=torch.zeros(1),
multi_modal_placeholder_index_maps=None,
enable_kv_scales_calculation=True,
)
model_input = ModelInputForGPUWithSamplingMetadata(
input_tokens=torch.ones(10),
input_positions=torch.ones(10),
sampling_metadata=sampling_metadata,
attn_metadata=attn_metadata)
assert isinstance(model_input, ModelInputForGPUWithSamplingMetadata)
# Test round trip serialization.
tensor_dict = model_input.as_broadcastable_tensor_dict()
attn_backend = MockAttentionBackend()
received_model_input = (
ModelInputForGPUWithSamplingMetadata.from_broadcasted_tensor_dict(
tensor_dict, attn_backend=attn_backend))
# Check that received copy has correct values.
assert isinstance(received_model_input,
ModelInputForGPUWithSamplingMetadata)
assert received_model_input.input_tokens is not None
assert (
received_model_input.input_tokens == model_input.input_tokens).all()
assert received_model_input.input_positions is not None
assert (received_model_input.input_positions == model_input.input_positions
).all()
assert received_model_input.multi_modal_kwargs is None
assert (received_model_input.multi_modal_kwargs ==
model_input.multi_modal_kwargs)
assert received_model_input.lora_requests is None
assert received_model_input.lora_requests == model_input.lora_requests
assert received_model_input.lora_mapping is None
assert received_model_input.lora_mapping == model_input.lora_mapping
for field in dataclasses.fields(AttentionMetadata):
assert getattr(received_model_input.attn_metadata, field.name,
None) == getattr(attn_metadata, field.name, None)
# For sampling metadata, only selected_token_indices is copied.
assert (received_model_input.sampling_metadata.selected_token_indices ==
sampling_metadata.selected_token_indices)
assert received_model_input.sampling_metadata.seq_groups is None

View File

@@ -1,462 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
import torch
from vllm.distributed.parallel_state import (ensure_model_parallel_initialized,
init_distributed_environment)
from vllm.engine.arg_utils import EngineArgs
from vllm.model_executor.sampling_metadata import SamplingMetadata
from vllm.sequence import SamplingParams, SequenceData, SequenceGroupMetadata
from vllm.utils import get_open_port
from vllm.worker.model_runner import ModelRunner
def _create_model_runner(model: str, *args, **kwargs) -> ModelRunner:
engine_args = EngineArgs(model, *args, **kwargs)
engine_config = engine_args.create_engine_config()
model_runner = ModelRunner(
vllm_config=engine_config,
is_driver_worker=True,
)
return model_runner
def test_deepseek_mla_attn_backend_module():
model_runner = _create_model_runner(
"deepseek-ai/DeepSeek-Coder-V2-Lite-Instruct",
trust_remote_code=True,
enable_chunked_prefill=False,
)
assert model_runner.attn_backend.__name__ == "TritonMLABackend"
@pytest.mark.parametrize("batch_size", list(range(1, 257, 3)))
@pytest.mark.parametrize("use_prompt_embeds", [True, False])
def test_prepare_prompt(batch_size, use_prompt_embeds, monkeypatch):
if use_prompt_embeds:
# Prompt embeddings are currently only supported on V0
monkeypatch.setenv("VLLM_USE_V1", "0")
model_runner = _create_model_runner(
"facebook/opt-125m",
max_num_batched_tokens=100000,
max_num_seqs=100000,
enable_chunked_prefill=False,
enable_prompt_embeds=True,
)
seq_lens: list[int] = []
seq_group_metadata_list: list[SequenceGroupMetadata] = []
block_tables = {0: [1]}
expected_input_embeds_len = 0
for i in range(batch_size):
# make sure all tokens fit into one block
seq_len = i % (model_runner.block_size - 1) + 1
seq_lens.append(seq_len)
if use_prompt_embeds:
seq_data = SequenceData.from_seqs(
prompt_token_ids=[0] * seq_len,
prompt_embeds=torch.rand(seq_len, 10),
)
expected_input_embeds_len += seq_len
else:
seq_data = SequenceData.from_seqs(prompt_token_ids=range(seq_len))
seq_group_metadata = SequenceGroupMetadata(
request_id=f"test_{i}",
is_prompt=True,
seq_data={0: seq_data},
sampling_params=SamplingParams(temperature=0),
block_tables=block_tables,
)
assert seq_group_metadata.token_chunk_size == seq_data.get_len()
seq_group_metadata_list.append(seq_group_metadata)
expected_selected_token_indices = []
selected_token_start_idx = 0
for seq_len in seq_lens:
expected_selected_token_indices.append(selected_token_start_idx +
seq_len - 1)
selected_token_start_idx += seq_len
model_input = model_runner._prepare_model_input_tensors(
seq_group_metadata_list)
input_tokens = model_input.input_tokens
input_positions = model_input.input_positions
input_embeds = model_input.inputs_embeds
attn_metadata = model_input.attn_metadata
return_seq_lens = model_input.seq_lens
slot_mapping = attn_metadata.slot_mapping
assert return_seq_lens == seq_lens
assert len(slot_mapping) == len(input_tokens)
# Verify input metadata is correct for prompts.
device = model_runner.device
assert attn_metadata.num_prefills > 0
assert attn_metadata.num_decode_tokens == 0
torch.testing.assert_close(
attn_metadata.seq_lens_tensor,
torch.tensor(seq_lens, device=device, dtype=torch.int))
assert attn_metadata.seq_lens == seq_lens
assert attn_metadata.max_prefill_seq_len == max(seq_lens)
assert attn_metadata.max_decode_seq_len == 0
# Test subquery start locs.
start_idx = 0
start_loc = [start_idx]
for seq_len in seq_lens:
start_idx += seq_len
start_loc.append(start_idx)
torch.testing.assert_close(
attn_metadata.query_start_loc,
torch.tensor(start_loc, dtype=torch.int32, device=device))
# Test seq start locs. Note that for normal prefill it is
# equivalent to query_start_loc.
start_idx = 0
seq_start_loc = [start_idx]
for seq_len in seq_lens:
start_idx += seq_len
seq_start_loc.append(start_idx)
torch.testing.assert_close(
attn_metadata.seq_start_loc,
torch.tensor(seq_start_loc, dtype=torch.int32, device=device))
torch.testing.assert_close(
attn_metadata.context_lens_tensor,
torch.zeros(attn_metadata.context_lens_tensor.shape[0],
dtype=torch.int,
device=device))
expected = torch.tensor([[] for _ in range(len(seq_group_metadata_list))],
dtype=torch.int32,
device=model_runner.device)
torch.testing.assert_close(attn_metadata.block_tables, expected)
# CUDA graph should not be used for prefill.
assert attn_metadata.use_cuda_graph is False
assert len(input_tokens) == sum(seq_lens)
assert len(input_positions) == sum(seq_lens)
if expected_input_embeds_len == 0:
torch.testing.assert_close(input_tokens, input_positions)
assert input_embeds is None
else:
assert len(input_embeds) == expected_input_embeds_len
sampling_metadata = SamplingMetadata.prepare(
seq_group_metadata_list,
seq_lens,
query_lens=seq_lens,
device=model_runner.device,
pin_memory=model_runner.pin_memory)
assert len(input_tokens) == sum(seq_lens)
assert len(input_positions) == sum(seq_lens)
actual = sampling_metadata.selected_token_indices
expected = torch.tensor(expected_selected_token_indices,
device=actual.device,
dtype=actual.dtype)
torch.testing.assert_close(actual, expected)
torch.allclose(input_tokens, input_positions)
actual = sampling_metadata.selected_token_indices
expected = torch.tensor(expected_selected_token_indices,
device=actual.device,
dtype=actual.dtype)
torch.testing.assert_close(actual, expected)
@pytest.mark.parametrize("batch_size", list(range(1, 257, 3)))
@pytest.mark.parametrize("use_prompt_embeds", [True, False])
def test_prepare_decode_cuda_graph(batch_size, use_prompt_embeds, monkeypatch):
if use_prompt_embeds:
# Prompt embeddings are currently only supported on V0
monkeypatch.setenv("VLLM_USE_V1", "0")
model_runner = _create_model_runner(
"facebook/opt-125m",
seed=0,
dtype="float16",
enforce_eager=False,
max_num_batched_tokens=100000,
max_num_seqs=100000,
enable_chunked_prefill=False,
enable_prompt_embeds=True,
)
context_lens: list[int] = []
seq_group_metadata_list: list[SequenceGroupMetadata] = []
# Assume each seq group finishes prefill.
for i in range(batch_size):
# make sure all tokens fit into one block
context_len = i % (model_runner.block_size - 1) + 1
context_lens.append(context_len)
if use_prompt_embeds:
seq_data = SequenceData.from_seqs(
prompt_token_ids=[0] * context_len,
prompt_embeds=torch.rand(context_len, 10),
)
output_embed = torch.rand(10)
else:
seq_data = SequenceData.from_seqs(
prompt_token_ids=range(context_len))
output_embed = None
seq_data.update_num_computed_tokens(context_len)
# Append one token ID since prefill is finished.
seq_data.append_token_id(1, 0, output_embed)
seq_group_metadata = SequenceGroupMetadata(
request_id=f"test_{i}",
is_prompt=False,
seq_data={0: seq_data},
sampling_params=SamplingParams(temperature=0),
block_tables={0: [1]},
)
assert seq_group_metadata.token_chunk_size == 1
seq_group_metadata_list.append(seq_group_metadata)
model_input = model_runner._prepare_model_input_tensors(
seq_group_metadata_list)
input_tokens = model_input.input_tokens
input_positions = model_input.input_positions
input_embeds = model_input.inputs_embeds
attn_metadata = model_input.attn_metadata
slot_mapping = attn_metadata.slot_mapping
assert len(slot_mapping) == len(input_tokens)
expected_bs = model_runner.vllm_config.pad_for_cudagraph(
len(seq_group_metadata_list))
# Verify input metadata is correct for decode.
device = model_runner.device
assert attn_metadata.num_prefills == 0
assert attn_metadata.num_prefill_tokens == 0
seq_lens = [context_len + 1 for context_len in context_lens]
# seq_lens are padded to expected_bs
for _ in range(expected_bs - len(seq_lens)):
seq_lens.append(1)
assert attn_metadata.seq_lens == seq_lens
assert attn_metadata.num_decode_tokens == len(seq_lens)
start_idx = 0
start_loc = [start_idx]
for _ in context_lens:
# decode has only 1 token for query.
start_idx += 1
start_loc.append(start_idx)
torch.testing.assert_close(
attn_metadata.query_start_loc,
torch.tensor(start_loc, dtype=torch.int32, device=device))
start_idx = 0
seq_start_loc = [start_idx]
for seq_len in seq_lens:
start_idx += seq_len
seq_start_loc.append(start_idx)
torch.testing.assert_close(
attn_metadata.seq_start_loc,
torch.tensor(seq_start_loc, dtype=torch.int32, device=device))
torch.testing.assert_close(
attn_metadata.context_lens_tensor,
torch.tensor(context_lens, dtype=torch.int, device=device))
assert attn_metadata.max_decode_seq_len == max(seq_lens)
torch.testing.assert_close(
attn_metadata.seq_lens_tensor[:len(seq_lens)],
torch.tensor(seq_lens, dtype=torch.int, device=device))
# The block table's first dim corresponds to each entry in the batch,
# which for decoding means one row per token.
assert attn_metadata.block_tables.shape[0] == len(input_tokens)
# The block table's second dim corresponds to each token's block number.
# It is padded up to the max number of blocks per batch.
assert attn_metadata.block_tables.shape[1] == (
model_runner.get_max_block_per_batch())
assert attn_metadata.use_cuda_graph is True
assert len(input_tokens) == expected_bs
assert len(input_positions) == expected_bs
if use_prompt_embeds:
expected_input_embeds_length = start_loc[-1]
assert len(input_embeds) == expected_input_embeds_length
assert expected_input_embeds_length <= expected_bs
else:
assert input_embeds is None
# Verify Sampling
expected_selected_token_indices = []
for selected_token_start_idx, _ in enumerate(context_lens):
expected_selected_token_indices.append(selected_token_start_idx)
sampling_metadata = SamplingMetadata.prepare(
seq_group_metadata_list,
seq_lens,
# query lens is all 1 for decode.
query_lens=[1 for _ in range(len(context_lens))],
device=model_runner.device,
pin_memory=model_runner.pin_memory)
actual = sampling_metadata.selected_token_indices
expected = torch.tensor(expected_selected_token_indices,
device=actual.device,
dtype=actual.dtype)
torch.testing.assert_close(actual, expected)
def test_empty_seq_group():
"""Verify prepare prompt and decode returns empty output."""
model_runner = _create_model_runner(
"facebook/opt-125m",
seed=0,
dtype="float16",
enforce_eager=False,
)
seq_group_metadata_list: list[SequenceGroupMetadata] = []
model_input = model_runner._prepare_model_input_tensors(
seq_group_metadata_list)
input_tokens = model_input.input_tokens
input_positions = model_input.input_positions
attn_metadata = model_input.attn_metadata
assert input_tokens is None
assert input_positions is None
assert attn_metadata is None
model_input = model_runner._prepare_model_input_tensors(
seq_group_metadata_list)
input_tokens = model_input.input_tokens
input_positions = model_input.input_positions
input_embeds = model_input.inputs_embeds
attn_metadata = model_input.attn_metadata
return_seq_lens = model_input.seq_lens
assert input_tokens is None
assert input_positions is None
assert input_embeds is None
assert attn_metadata is None
assert return_seq_lens is None
@pytest.fixture
def distributed_init():
init_distributed_environment(
world_size=1,
rank=0,
distributed_init_method=f"tcp://127.0.0.1:{get_open_port()}",
local_rank=0)
ensure_model_parallel_initialized(1, 1)
@pytest.mark.parametrize("batch_size", list(range(2, 128, 3)))
@pytest.mark.parametrize("enforce_eager", [True, False])
@pytest.mark.parametrize('use_prompt_embeds', [True, False])
def test_hybrid_batches(batch_size, enforce_eager, use_prompt_embeds,
distributed_init, monkeypatch):
if use_prompt_embeds:
# Prompt embeddings are currently only supported on V0
monkeypatch.setenv("VLLM_USE_V1", "0")
model_runner = _create_model_runner(
"facebook/opt-125m",
seed=0,
dtype="float16",
enforce_eager=enforce_eager,
max_num_batched_tokens=100000,
max_num_seqs=100000,
enable_chunked_prefill=True,
enable_prompt_embeds=True,
)
# Add prefill requests.
seq_lens: list[int] = []
seq_group_metadata_list: list[SequenceGroupMetadata] = []
prefill_metadata_list: list[SequenceGroupMetadata] = []
decode_metadata_list: list[SequenceGroupMetadata] = []
block_tables = {0: [1]}
prefill_batch_size = batch_size // 2
decode_batch_size = batch_size - prefill_batch_size
expected_input_embeds_len = 0
for i in range(prefill_batch_size):
# make sure all tokens fit into one block
seq_len = i % (model_runner.block_size - 1) + 1
seq_lens.append(seq_len)
if use_prompt_embeds:
seq_data = SequenceData.from_seqs(
prompt_token_ids=[0] * seq_len,
prompt_embeds=torch.rand(seq_len, 10),
)
expected_input_embeds_len += seq_len
else:
seq_data = SequenceData.from_seqs(
prompt_token_ids=range(seq_len), )
seq_group_metadata = SequenceGroupMetadata(
request_id=f"test_{i}",
is_prompt=True,
seq_data={0: seq_data},
sampling_params=SamplingParams(temperature=0),
block_tables=block_tables,
)
assert seq_group_metadata.token_chunk_size == seq_data.get_len()
seq_group_metadata_list.append(seq_group_metadata)
prefill_metadata_list.append(seq_group_metadata)
# Add decode requests
for i in range(prefill_batch_size, batch_size):
# make sure all tokens fit into one block
context_len = i % (model_runner.block_size - 1) + 1
if use_prompt_embeds:
seq_data = SequenceData.from_seqs(
prompt_token_ids=[0] * context_len,
prompt_embeds=torch.rand(context_len, 10),
)
output_embed = torch.rand(10)
# This also increments the expected input_embeds length, because the
# model needs the input and output embeddings to be passed in together.
expected_input_embeds_len += 1
else:
seq_data = SequenceData.from_seqs(
prompt_token_ids=range(context_len), )
output_embed = None
assert len(seq_data.prompt_token_ids) == context_len
seq_data.append_token_id(1, 0, output_embed)
seq_data.update_num_computed_tokens(context_len)
seq_group_metadata = SequenceGroupMetadata(
request_id=f"test_{i}",
is_prompt=False,
seq_data={0: seq_data},
sampling_params=SamplingParams(temperature=0),
block_tables={0: [1]},
)
assert seq_group_metadata.token_chunk_size == 1
seq_group_metadata_list.append(seq_group_metadata)
decode_metadata_list.append(seq_group_metadata)
model_input = model_runner.prepare_model_input(seq_group_metadata_list)
input_tokens = model_input.input_tokens
input_positions = model_input.input_positions
input_embeds = model_input.inputs_embeds
attn_metadata = model_input.attn_metadata
prefill_meta_actual = attn_metadata.prefill_metadata
decode_meta_actual = attn_metadata.decode_metadata
assert len(attn_metadata.slot_mapping) == len(input_tokens)
assert len(input_positions) == len(input_tokens)
assert attn_metadata.num_prefills == prefill_batch_size
assert attn_metadata.num_decode_tokens == decode_batch_size
assert attn_metadata.num_prefill_tokens == sum(seq_lens)
if expected_input_embeds_len == 0:
assert input_embeds is None
else:
assert len(input_embeds) == expected_input_embeds_len
# Verify attn metadata is consistent. We don't need to test individual
# values here because they are tested above.
attn_metadata = model_runner._prepare_model_input_tensors(
seq_group_metadata_list).attn_metadata
for attr_expected, attr_actual in zip(vars(attn_metadata.prefill_metadata),
vars(prefill_meta_actual)):
assert attr_expected[1] == attr_actual[1]
for attr_expected, attr_actual in zip(vars(attn_metadata.decode_metadata),
vars(decode_meta_actual)):
assert attr_expected[1] == attr_actual[1]

View File

@@ -1,68 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import torch
from vllm.engine.arg_utils import EngineArgs
from vllm.utils import get_distributed_init_method, get_ip, get_open_port
from vllm.worker.cache_engine import CacheEngine
from vllm.worker.worker import Worker
def test_gpu_memory_profiling():
# Tests the gpu profiling that happens in order to determine the number of
# KV cache blocks that we can allocate on the GPU.
# This test mocks the maximum available gpu memory so that it can run on
# any gpu setup.
# Set up engine args to build a worker.
engine_args = EngineArgs(model="facebook/opt-125m",
dtype="half",
load_format="dummy")
engine_config = engine_args.create_engine_config()
engine_config.cache_config.num_gpu_blocks = 1000
engine_config.cache_config.num_cpu_blocks = 1000
# Create the worker.
distributed_init_method = get_distributed_init_method(
get_ip(), get_open_port())
worker = Worker(
vllm_config=engine_config,
local_rank=0,
rank=0,
distributed_init_method=distributed_init_method,
is_driver_worker=True,
)
# Set 10GiB as the total gpu ram to be device-agnostic
def mock_mem_info():
current_usage = torch.cuda.memory_stats(
)["allocated_bytes.all.current"]
mock_total_bytes = 10 * 1024**3
free = mock_total_bytes - current_usage
return (free, mock_total_bytes)
from unittest.mock import patch
with patch("torch.cuda.mem_get_info", side_effect=mock_mem_info):
# Load the model so we can profile it
worker.init_device()
worker.load_model()
gpu_blocks, _ = worker.determine_num_available_blocks()
# Peak vram usage by torch should be 0.47 GiB
# Model weights take 0.25 GiB
# No memory should be allocated outside of torch
# 9.0 GiB should be the utilization target
# 8.28 GiB should be available for the KV cache
block_size = CacheEngine.get_cache_block_size(
engine_config.cache_config, engine_config.model_config,
engine_config.parallel_config)
expected_blocks = (8.28 * 1024**3) // block_size
# Check within a small tolerance for portability
# Hardware, kernel, or dependency changes could all affect memory
# utilization.
# A 100 block tolerance here should be about 60MB of wiggle room.
assert abs(gpu_blocks - expected_blocks) < 100
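The 8.28 GiB figure quoted in the comments above is just arithmetic on the mocked numbers (a sketch; 0.9 is vLLM's default gpu_memory_utilization, and the 0.47 GiB and 0.25 GiB values are the observations stated in the comments):

GiB = 1024**3
total_gpu_memory = 10 * GiB                  # mocked via mock_mem_info()
utilization_target = 0.9 * total_gpu_memory  # 9.0 GiB
torch_peak = 0.47 * GiB                      # peak torch usage while profiling
model_weights = 0.25 * GiB                   # dummy-loaded opt-125m weights
kv_cache_bytes = utilization_target - torch_peak - model_weights  # ~8.28 GiB
# expected_blocks = kv_cache_bytes // block_size, as asserted in the test.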

View File

@@ -1,87 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import torch
from vllm.engine.arg_utils import EngineArgs
from vllm.sequence import ExecuteModelRequest
from vllm.utils import get_distributed_init_method, get_ip, get_open_port
from vllm.worker.worker import Worker
def test_swap() -> None:
# Configure the engine.
engine_args = EngineArgs(model="distilbert/distilgpt2",
dtype="half",
load_format="dummy")
engine_config = engine_args.create_engine_config()
engine_config.cache_config.num_gpu_blocks = 1000
engine_config.cache_config.num_cpu_blocks = 1000
# Create the worker.
distributed_init_method = get_distributed_init_method(
get_ip(), get_open_port())
worker = Worker(
vllm_config=engine_config,
local_rank=0,
rank=0,
distributed_init_method=distributed_init_method,
is_driver_worker=True,
)
# Initialize the worker.
worker.init_device()
worker.load_model()
worker.initialize_cache(
num_gpu_blocks=engine_config.cache_config.num_gpu_blocks,
num_cpu_blocks=engine_config.cache_config.num_cpu_blocks)
# Randomly initialize the cache.
gpu_cache = worker.cache_engine[0].gpu_cache
cpu_cache = worker.cache_engine[0].cpu_cache
num_layers = len(gpu_cache)
for i in range(num_layers):
gpu_key_cache, gpu_value_cache = gpu_cache[i]
gpu_key_cache.random_()
gpu_value_cache.random_()
cpu_key_cache, cpu_value_cache = cpu_cache[i]
cpu_key_cache.random_()
cpu_value_cache.random_()
allclose = lambda a, b: torch.allclose(
a.cuda(), b.cuda(), rtol=0.0, atol=0.0)
# Test swap out.
blocks_to_swap_out = [(3, 72), (56, 35), (84, 34)]
execute_model_req = ExecuteModelRequest(
seq_group_metadata_list=[],
blocks_to_swap_in=[],
blocks_to_swap_out=blocks_to_swap_out,
blocks_to_copy=[],
)
worker.execute_model(execute_model_req=execute_model_req)
for i in range(num_layers):
gpu_key_cache, gpu_value_cache = gpu_cache[i]
cpu_key_cache, cpu_value_cache = cpu_cache[i]
for src, dst in blocks_to_swap_out:
assert allclose(gpu_key_cache[src], cpu_key_cache[dst])
assert allclose(gpu_value_cache[src], cpu_value_cache[dst])
# Test swap in.
execute_model_req.blocks_to_swap_out = []
execute_model_req.blocks_to_swap_in = [
(19, 45),
(67, 23),
(12, 78),
(40, 99),
(1, 71),
]
worker.execute_model(execute_model_req=execute_model_req)
for i in range(num_layers):
gpu_key_cache, gpu_value_cache = gpu_cache[i]
cpu_key_cache, cpu_value_cache = cpu_cache[i]
for src, dst in execute_model_req.blocks_to_swap_in:
assert allclose(gpu_key_cache[dst], cpu_key_cache[src])
assert allclose(gpu_value_cache[dst], cpu_value_cache[src])