[V0 Deprecation] Remove V0 Core tests (#25082)

Signed-off-by: Woosuk Kwon <woosuk.kwon@berkeley.edu>
Woosuk Kwon 2025-09-17 09:32:42 -07:00 committed by GitHub
parent 087c6ffc92
commit 4b946d693e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 0 additions and 5706 deletions


@ -91,17 +91,6 @@ steps:
- pytest -v -s basic_correctness/test_cpu_offload.py
- VLLM_TEST_ENABLE_ARTIFICIAL_PREEMPT=1 pytest -v -s basic_correctness/test_preemption.py
- label: Core Test # 22min
timeout_in_minutes: 35
mirror_hardwares: [amdexperimental]
fast_check: true
source_file_dependencies:
- vllm/core
- vllm/distributed
- tests/core
commands:
- pytest -v -s core
- label: Entrypoints Unit Tests # 5min
timeout_in_minutes: 10
working_dir: "/vllm-workspace/tests"


@ -1,15 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
@pytest.fixture()
def should_do_global_cleanup_after_test() -> bool:
"""Disable the global cleanup fixture for tests in this directory. This
provides a ~10x speedup for unit tests that don't load a model to GPU.
This requires that tests in this directory clean up after themselves if they
use the GPU.
"""
return False
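
The flag above is consumed by a cleanup fixture defined higher up in the test suite. As a rough illustration only, here is a minimal sketch of how such a flag-driven teardown can be wired in pytest; the fixture name cleanup_fixture and its body are assumptions for illustration, not the repository's actual wiring.

import pytest

from vllm.distributed import cleanup_dist_env_and_memory


@pytest.fixture()
def should_do_global_cleanup_after_test() -> bool:
    # Parent-level default: clean up after every test.
    return True


@pytest.fixture(autouse=True)
def cleanup_fixture(should_do_global_cleanup_after_test: bool):
    # Hypothetical wiring: run the test, then tear down distributed state and
    # release GPU memory unless a directory-level conftest (like the one
    # above) opted out by returning False.
    yield
    if should_do_global_cleanup_after_test:
        cleanup_dist_env_and_memory()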


@ -1,71 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from collections.abc import Iterable
from typing import Callable, Optional
import pytest
from vllm import LLM
from vllm.distributed import cleanup_dist_env_and_memory
from vllm.model_executor.utils import set_random_seed
@pytest.fixture
def baseline_llm_generator(common_llm_kwargs, per_test_common_llm_kwargs,
baseline_llm_kwargs, seed):
return create_llm_generator(common_llm_kwargs, per_test_common_llm_kwargs,
baseline_llm_kwargs, seed)
@pytest.fixture
def test_llm_generator(common_llm_kwargs, per_test_common_llm_kwargs,
test_llm_kwargs, seed):
return create_llm_generator(common_llm_kwargs, per_test_common_llm_kwargs,
test_llm_kwargs, seed)
def create_llm_generator(common_llm_kwargs, per_test_common_llm_kwargs,
distinct_llm_kwargs, seed):
kwargs = {
**common_llm_kwargs,
**per_test_common_llm_kwargs,
**distinct_llm_kwargs,
}
def generator_inner():
llm = LLM(**kwargs)
set_random_seed(seed)
yield llm
del llm
cleanup_dist_env_and_memory()
for llm in generator_inner():
yield llm
del llm
def get_text_from_llm_generator(llm_generator: Iterable[LLM],
prompts,
sampling_params,
llm_cb: Optional[Callable[[LLM],
None]] = None):
for llm in llm_generator:
if llm_cb:
llm_cb(llm)
outputs = llm.generate(prompts, sampling_params, use_tqdm=True)
text = [output.outputs[0].text for output in outputs]
del llm
return text
def get_token_ids_from_llm_generator(llm_generator, prompts, sampling_params):
for llm in llm_generator:
outputs = llm.generate(prompts, sampling_params, use_tqdm=True)
token_ids = [output.outputs[0].token_ids for output in outputs]
del llm
return token_ids
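
The three kwargs layers merged in create_llm_generator follow ordinary dict-unpacking semantics, so later layers override earlier ones on key collisions. A small illustration (the values here are made up for the example):

# Later dicts win on key collisions, so test_llm_kwargs overrides the rest.
common_llm_kwargs = {"model": "facebook/opt-125m", "enforce_eager": True}
per_test_common_llm_kwargs = {"block_size": 16}
test_llm_kwargs = {"enforce_eager": False}  # overrides the common setting

kwargs = {
    **common_llm_kwargs,
    **per_test_common_llm_kwargs,
    **test_llm_kwargs,
}
assert kwargs == {
    "model": "facebook/opt-125m",
    "enforce_eager": False,
    "block_size": 16,
}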


@ -1,479 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from itertools import cycle
import pytest
from vllm import SamplingParams
from .conftest import get_token_ids_from_llm_generator
@pytest.mark.parametrize(
"common_llm_kwargs",
[{
# Use a small model for a fast test.
"model": "facebook/opt-125m",
# skip cuda graph creation for fast test.
"enforce_eager": True,
# Allow only 5 sequences of ~1024 tokens in worst case.
"block_size": 16,
"num_gpu_blocks_override": 5 * (64 + 1),
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{}])
@pytest.mark.parametrize("test_llm_kwargs", [{
"preemption_mode": "swap"
}, {
"preemption_mode": "recompute"
}])
@pytest.mark.parametrize("batch_size", [10])
@pytest.mark.parametrize("seed", [1])
def test_block_manager_with_preemption(baseline_llm_generator,
test_llm_generator, batch_size):
"""Verify block manager produces same outputs even when there is preemption.
This constructs two LLM, each with limited number of GPU blocks. The limit
is decided such that as the sequences in the batch grow, sequences must be
preempted and removed from cache.
If the output token ids are equivalent, then we have confidence that the KV
cache is not corrupted.
NOTE: We want a significant number of generated tokens so that any incorrect
KV mapping has time to build up error.
NOTE(Kuntai): Though we have removed block manager v1, this test is still
useful as it asserts the behavior of block manager v2 (now it is called
SelfAttnBlockSpaceManager) is the same when swapping / preemption, so we
keep this test.
"""
output_len = 1024
temperature = 0.0
# We want to ensure equality even with preemption.
# We force the total block size to be 1 + cdiv(output_len, block_size)
# so that only one sequence can fit at a time (once the sequences grow).
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]
prompts = [prompt for prompt, _ in zip(cycle(prompts), range(batch_size))]
sampling_params = SamplingParams(
max_tokens=output_len,
ignore_eos=True,
temperature=temperature,
)
baseline_token_ids = get_token_ids_from_llm_generator(
baseline_llm_generator, prompts, sampling_params)
test_token_ids = get_token_ids_from_llm_generator(test_llm_generator,
prompts, sampling_params)
for expected_token_ids, actual_token_ids in zip(baseline_token_ids,
test_token_ids):
assert expected_token_ids == actual_token_ids
assert baseline_token_ids == test_token_ids
@pytest.mark.parametrize(
"common_llm_kwargs",
[{
# Use a small model for a fast test.
"model": "facebook/opt-125m",
# Our prompts will generate 128 tokens; since the prompts themselves are
# small, we don't need much KV space beyond 128.
"max_model_len": 160,
# skip cuda graph creation for fast test.
"enforce_eager": True,
}])
@pytest.mark.parametrize(
"per_test_common_llm_kwargs",
[
{
"block_size": 16,
# Allow only 2 sequences of ~128 tokens in worst case.
# Note 8 = 128/block_size
"num_gpu_blocks_override": 2 * (8 + 1),
},
{
"block_size": 8,
# Allow only 2 sequences of ~128 tokens in worst case.
# Note 16 = 128/block_size
"num_gpu_blocks_override": 2 * (16 + 2),
}
])
@pytest.mark.parametrize("baseline_llm_kwargs", [{
"num_lookahead_slots": 0,
}])
@pytest.mark.parametrize(
"test_llm_kwargs",
[
{
# We run one test with block_size < lookahead_slots, one test with
# block_size > lookahead_slots
"num_lookahead_slots": 10,
"preemption_mode": "swap",
},
{
"num_lookahead_slots": 10,
"preemption_mode": "recompute",
}
])
@pytest.mark.parametrize("batch_size", [4])
@pytest.mark.parametrize("seed", [1])
def test_lookahead_greedy_equality_with_preemption(baseline_llm_generator,
test_llm_generator,
batch_size):
"""Verify vLLM produces the same output with greedy sampling, when lookahead
scheduling is used vs. not.
Lookahead scheduling is not expected to modify the output, as it simply
allocates empty slots ahead of the known token ids in a sliding fashion.
This test constrains the total number of blocks to force preemption. It also
varies the block size so that the lookahead size is less than and greater
than the block size.
"""
output_len = 128
temperature = 0.0
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]
prompts = [prompt for prompt, _ in zip(cycle(prompts), range(batch_size))]
sampling_params = SamplingParams(
max_tokens=output_len,
ignore_eos=True,
temperature=temperature,
)
print('Getting token ids without lookahead scheduling')
baseline_token_ids = get_token_ids_from_llm_generator(
baseline_llm_generator, prompts, sampling_params)
print('Getting token ids with lookahead scheduling')
test_token_ids = get_token_ids_from_llm_generator(test_llm_generator,
prompts, sampling_params)
for expected_token_ids, actual_token_ids in zip(baseline_token_ids,
test_token_ids):
assert expected_token_ids == actual_token_ids
assert baseline_token_ids == test_token_ids
@pytest.mark.parametrize(
"common_llm_kwargs",
[
{
# Use a small model for a fast test.
"model": "facebook/opt-125m",
# skip cuda graph creation for fast test.
"enforce_eager": True,
"enable_chunked_prefill": True,
},
])
@pytest.mark.parametrize("per_test_common_llm_kwargs",
[{
"block_size": 16,
"max_num_batched_tokens": 2,
"max_num_seqs": 2,
}, {
"block_size": 16,
"max_num_batched_tokens": 3,
"max_num_seqs": 2,
}, {
"block_size": 16,
"max_num_batched_tokens": 256,
"max_num_seqs": 10,
}])
@pytest.mark.parametrize("baseline_llm_kwargs", [
{},
])
@pytest.mark.parametrize("test_llm_kwargs", [
{
"num_lookahead_slots": 0,
},
{
"num_lookahead_slots": 5,
},
])
@pytest.mark.parametrize("batch_size", [4])
@pytest.mark.parametrize("seed", [1])
def test_chunked_prefill_block_manager(baseline_llm_generator,
test_llm_generator, batch_size):
"""Verify that chunked prefill works with SelfAttnBlockSpaceManager,
with and without lookahead scheduling.
"""
output_len = 32
temperature = 0.0
prompts = [
"Hello, my name is",
"The president of the United States is",
("1 + " * 50) + " 1 = ", # Longer prompt.
"The capital of France is",
"The future of AI is",
]
prompts = [prompt for prompt, _ in zip(cycle(prompts), range(batch_size))]
sampling_params = SamplingParams(
max_tokens=output_len,
ignore_eos=True,
temperature=temperature,
)
print('Getting token ids with BlockManager')
baseline_token_ids = get_token_ids_from_llm_generator(
baseline_llm_generator, prompts, sampling_params)
print('Getting token ids with BlockManager, with lookahead slots.')
test_token_ids = get_token_ids_from_llm_generator(test_llm_generator,
prompts, sampling_params)
for expected_token_ids, actual_token_ids in zip(baseline_token_ids,
test_token_ids):
assert expected_token_ids == actual_token_ids
assert baseline_token_ids == test_token_ids
@pytest.mark.parametrize(
"common_llm_kwargs",
[{
# Use a small model for a fast test.
"model": "facebook/opt-125m",
# skip cuda graph creation for fast test.
"enforce_eager": True,
# Allow only 5 sequences of ~1024 tokens in worst case.
"block_size": 16,
"num_gpu_blocks_override": 5 * (64 + 1),
# Enable prefix caching
"enable_prefix_caching": True,
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{}])
@pytest.mark.parametrize("test_llm_kwargs", [{
"preemption_mode": "swap"
}, {
"preemption_mode": "recompute"
}])
@pytest.mark.parametrize("batch_size", [10])
@pytest.mark.parametrize("seed", [1])
def test_block_manager_prefix_caching_enabled_with_preemption(
baseline_llm_generator, test_llm_generator, batch_size):
"""Verify block manager produces same outputs even when there is preemption.
This constructs two LLM, each with limited number of GPU blocks. The limit
is decided such that as the sequences in the batch grow, sequences must be
preempted and removed from cache.
If the output token ids are equivalent, then we have confidence that the KV
cache is not corrupted.
NOTE: We want a significant number of generated tokens so that any incorrect
KV mapping has time to build up error.
NOTE(Kuntai): Though we have removed block manager v1, this test is still
useful as it asserts the behavior of block manager v2 (now it is called
SelfAttnBlockSpaceManager) is the same when swapping / preemption, so we
keep this test.
"""
output_len = 1024
temperature = 0.0
# We want to ensure equality even with preemption.
# We force the total block size to be 1 + cdiv(output_len, block_size)
# so that only one sequence can fit at a time (once the sequences grow).
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]
prompts = [prompt for prompt, _ in zip(cycle(prompts), range(batch_size))]
sampling_params = SamplingParams(
max_tokens=output_len,
ignore_eos=True,
temperature=temperature,
)
print('Getting token ids from block manager')
baseline_token_ids = get_token_ids_from_llm_generator(
baseline_llm_generator, prompts, sampling_params)
print('Getting token ids from block manager, with preemption')
test_token_ids = get_token_ids_from_llm_generator(test_llm_generator,
prompts, sampling_params)
for expected_token_ids, actual_token_ids in zip(baseline_token_ids,
test_token_ids):
assert expected_token_ids == actual_token_ids
assert baseline_token_ids == test_token_ids
@pytest.mark.parametrize(
"common_llm_kwargs",
[{
# Use a small model for a fast test.
"model": "facebook/opt-125m",
# skip cuda graph creation for fast test.
"enforce_eager": True,
# Allow only 5 sequences of ~1024 tokens in worst case.
"block_size": 16,
"num_gpu_blocks_override": 5 * (64 + 1),
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{
"enable_prefix_caching": False
}])
@pytest.mark.parametrize("test_llm_kwargs", [{
"enable_prefix_caching": True,
"preemption_mode": "swap"
}, {
"enable_prefix_caching": True,
"preemption_mode": "recompute"
}])
@pytest.mark.parametrize("batch_size", [10])
@pytest.mark.parametrize("seed", [1])
def test_auto_prefix_caching_with_preemption(baseline_llm_generator,
test_llm_generator, batch_size):
"""Verify block manager v2 with auto prefix caching enabled produces same
outputs as auto prefix caching disabled, even when there is preemption.
This constructs two LLM, each with limited number of GPU blocks. The limit
is decided such that as the sequences in the batch grow, sequences must be
preempted and removed from cache.
If the output token ids are equivalent, then we have confidence that auto
prefix caching itself at least don't cause result error.
"""
output_len = 1024
temperature = 0.0
# We want to ensure equality even with preemption.
# We force the total block size to be 1 + cdiv(output_len, block_size)
# so that only one sequence can fit at a time (once the sequences grow).
prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]
prompts = [prompt for prompt, _ in zip(cycle(prompts), range(batch_size))]
sampling_params = SamplingParams(
max_tokens=output_len,
ignore_eos=True,
temperature=temperature,
)
print('Getting token ids with APC disabled')
baseline_token_ids = get_token_ids_from_llm_generator(
baseline_llm_generator, prompts, sampling_params)
print('Getting token ids with APC enabled')
test_token_ids = get_token_ids_from_llm_generator(test_llm_generator,
prompts, sampling_params)
for expected_token_ids, actual_token_ids in zip(baseline_token_ids,
test_token_ids):
assert expected_token_ids == actual_token_ids
assert baseline_token_ids == test_token_ids
@pytest.mark.parametrize(
"common_llm_kwargs",
[{
# Use a small model for a fast test.
"model": "facebook/opt-125m",
# skip cuda graph creation for fast test.
"enforce_eager": True,
# we keep the number of blocks small so that we hit eviction quickly
"max_model_len": 48,
"block_size": 16,
"num_gpu_blocks_override": 3,
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{
"enable_prefix_caching": False
}])
@pytest.mark.parametrize("test_llm_kwargs", [{
"enable_prefix_caching": True,
}])
@pytest.mark.parametrize("seed", [1])
def test_auto_prefix_caching_after_eviction_start(baseline_llm_generator,
test_llm_generator):
"""Verify block manager v2 with auto prefix caching could work normally
even when eviction started.
With APC enabled, all blocks are held by native block at the beginning.
Then blocks are managed by evictor instead. If cache hit at the evictor's
block, then it could be reused, or we need to recompute its kv cache.
"""
output_len = 10
temperature = 0.0
prompts = [
"You are a helpful assistant. Please answer truthfully and write "
"out your thinking step by step to be sure you get the right answer. "
"If you make a mistake, attempt to correct it. who are you?",
"You are a helpful assistant. Please answer truthfully and write out "
"your thinking step by step to be sure you get the right answer. You "
"are helpful and harmless and you follow ethical guidelines. "
"who are you?"
]
sampling_params = SamplingParams(
max_tokens=output_len,
ignore_eos=True,
temperature=temperature,
)
print('Getting token ids with APC disabled')
baseline_token_ids = get_token_ids_from_llm_generator(
baseline_llm_generator, prompts, sampling_params)
print('Getting token ids with APC enabled')
test_token_ids = get_token_ids_from_llm_generator(test_llm_generator,
prompts, sampling_params)
for expected_token_ids, actual_token_ids in zip(baseline_token_ids,
test_token_ids):
assert expected_token_ids == actual_token_ids
assert baseline_token_ids == test_token_ids
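
For reference, a worked check of the block budget these preemption tests use; the numbers are taken from the parametrizations above (block_size=16, output_len=1024, five sequences' worth of blocks for a batch of ten).

from vllm.utils import cdiv

block_size = 16
output_len = 1024
# One block for the short prompt plus cdiv(output_len, block_size) blocks
# for the generated tokens, per sequence.
blocks_per_seq = 1 + cdiv(output_len, block_size)
assert blocks_per_seq == 64 + 1
# Budgeting only 5 sequences' worth of blocks for a batch of 10 forces
# preemption (swap or recompute) once the sequences grow.
assert 5 * blocks_per_seq == 325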


@ -1,185 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import random
import pytest
from tests.kernels.utils import override_backend_env_variable
from vllm import LLM, SamplingParams
from vllm.platforms import current_platform
from .conftest import get_text_from_llm_generator
# relatively small model with 4k sliding window
MODEL = "bigcode/starcoder2-3b"
BLOCK_SIZE = 16
@pytest.mark.parametrize(
"common_llm_kwargs",
[{
"model": MODEL,
# skip cuda graph creation for fast test.
"enforce_eager": True,
"block_size": BLOCK_SIZE,
# needed due to https://github.com/vllm-project/vllm/issues/1908#issuecomment-2101122008
"num_gpu_blocks_override": 100000 // BLOCK_SIZE,
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("baseline_llm_kwargs", [{}])
@pytest.mark.parametrize("test_llm_kwargs", [{}])
@pytest.mark.parametrize("batch_size", [5])
@pytest.mark.parametrize("seed", [1])
@pytest.mark.parametrize("backend", ["FLASH_ATTN", "XFORMERS"])
def test_sliding_window_retrieval(baseline_llm_generator, test_llm_generator,
batch_size, seed, backend, monkeypatch):
"""
The test does a bunch of assignments "x1 = 10\nx2 = 33\n..." and then
asks for the value of one of them (which is outside the sliding window).
If we tell it upfront which one we are going to ask about, then
it answers correctly (mostly).
Additionally, we compare the results of the v1 and v2 managers.
"""
if backend == "XFORMERS" and current_platform.is_rocm():
pytest.skip("Xformers does not support ROCm/HIP.")
override_backend_env_variable(monkeypatch, backend)
sampling_params = SamplingParams(
max_tokens=1024,
ignore_eos=True,
temperature=0.0,
)
prompts, answer, indices = prep_prompts(batch_size)
baseline_texts = get_text_from_llm_generator(baseline_llm_generator,
prompts,
sampling_params,
llm_cb=check_window(prompts))
check_answers(indices, answer, baseline_texts)
print('Getting token ids from block manager v2')
test_texts = get_text_from_llm_generator(test_llm_generator, prompts,
sampling_params)
check_answers(indices, answer, test_texts)
cmp = [
expected_text == actual_text
for expected_text, actual_text in zip(baseline_texts, test_texts)
]
print(cmp)
# make sure it's mostly OK; occasional mismatches are possibly due to
# https://github.com/vllm-project/vllm/pull/4768
# however, https://github.com/vllm-project/vllm/issues/3385#issuecomment-1995924290
# states that xformers and flash_attn have different ideas about the window
# size anyway
assert sum(cmp) > 0.7 * len(cmp)
@pytest.mark.parametrize(
"common_llm_kwargs",
[{
"model": MODEL,
# skip cuda graph creation for fast test.
"enforce_eager": True,
"block_size": BLOCK_SIZE,
"num_gpu_blocks_override": 100000 // BLOCK_SIZE,
}])
@pytest.mark.parametrize("per_test_common_llm_kwargs", [{}])
@pytest.mark.parametrize("test_llm_kwargs", [{"enable_chunked_prefill": True}])
@pytest.mark.parametrize("batch_size", [5])
@pytest.mark.parametrize("seed", [1])
@pytest.mark.parametrize("backend", ["FLASH_ATTN", "XFORMERS"])
def test_sliding_window_chunked_prefill(test_llm_generator, batch_size, seed,
backend, monkeypatch):
"""
This is similar to test_sliding_window_retrieval; however, it doesn't
compare against the v1 block manager since v1 doesn't support
chunked prefill with sliding window.
The results with and without chunked prefill are not the same due to
numerical instabilities.
"""
if backend == "XFORMERS" and current_platform.is_rocm():
pytest.skip("Xformers does not support ROCm/HIP.")
override_backend_env_variable(monkeypatch, backend)
sampling_params = SamplingParams(
max_tokens=10,
ignore_eos=True,
temperature=0.0,
)
prompts, answer, indices = prep_prompts(batch_size)
# We don't compare with the baseline model here, since the results are
# slightly different due to different tailing in attention.
test_texts = get_text_from_llm_generator(test_llm_generator,
prompts,
sampling_params,
llm_cb=check_window(prompts))
check_answers(indices, answer, test_texts)
def prep_prompts(batch_size: int, ln_range: tuple[int, int] = (800, 1100)):
"""
Generate prompts consisting of a bunch of assignments,
then ask for the value of one of them.
The prompt is just under 10k tokens; the sliding window is 4k,
so the answer is outside the sliding window but should still be correct.
Args:
batch_size: number of prompts to generate
ln_range: an argument to control the length of the prompt
"""
prompts: list[str] = []
answer: list[int] = []
indices: list[int] = []
random.seed(1)
for _ in range(batch_size):
idx = random.randint(30, 90)
indices.append(idx)
prompt = "```python\n# We set a number of variables, " + \
f"x{idx} will be important later\n"
ln = random.randint(*ln_range)
for k in range(30, ln):
v = random.randint(10, 99)
if k == idx:
answer.append(v)
prompt += f"x{k} = {v}\n"
prompt += f"# Now, we check the value of x{idx}:\n"
prompt += f"assert x{idx} == "
prompts.append(prompt)
return prompts, answer, indices
def check_answers(indices: list[int],
answer: list[int],
outputs: list[str],
accept_rate: float = 0.7):
answer2 = [int(text[0:2].strip()) for text in outputs]
print(list(zip(indices, zip(answer, answer2))))
numok = 0
for a1, a2 in zip(answer, answer2):
if a1 == a2:
numok += 1
frac_ok = numok / len(answer)
print(f"Num OK: {numok}/{len(answer)} {frac_ok}")
assert frac_ok >= accept_rate
def check_window(prompts: list[str]):
def inner(llm: LLM):
sliding_window = llm.llm_engine.model_config.get_sliding_window()
assert sliding_window and sliding_window > 0
assert any(
len(llm.get_tokenizer().tokenize(prompt)) > sliding_window
for prompt in prompts)
return inner
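
To make the helpers above concrete, this sketch shows the shape of what prep_prompts returns and what check_answers expects; the concrete values depend on the seeded random calls and are not shown.

prompts, answer, indices = prep_prompts(batch_size=2)
assert len(prompts) == len(answer) == len(indices) == 2
# Each prompt ends with "assert x{idx} == ", and the model is expected to
# complete it with the two-digit value recorded in `answer`.
assert prompts[0].endswith(f"assert x{indices[0]} == ")
assert 10 <= answer[0] <= 99
# check_answers() parses the first two characters of each generated text and
# requires at least accept_rate (70%) of them to match `answer`.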


@ -1,341 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
from vllm.core.block_manager import SelfAttnBlockSpaceManager
from vllm.core.interfaces import AllocStatus
from vllm.sequence import Logprob, SequenceStatus
from vllm.utils import chunk_list
from ..utils import create_dummy_prompt, create_seq_group
@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("num_gpu_blocks", [8, 40, 80])
@pytest.mark.parametrize("num_seqs_per_group", [1, 4])
@pytest.mark.parametrize("watermark", [0.0, 0.5])
def test_can_allocate_seq_group(block_size: int, num_seqs_per_group: int,
num_gpu_blocks: int, watermark: float):
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=1024,
watermark=watermark,
)
num_watermark_blocks = int(watermark * num_gpu_blocks)
num_output_blocks_per_seq = 1
# NOTE: This should be num_output_blocks_per_seq * num_seqs_per_group, but
# the current implementation assumes all seqs are new prompts / don't have
# different output lens.
num_output_blocks = num_output_blocks_per_seq
for num_prompt_blocks in range(1, num_gpu_blocks - num_output_blocks):
seq_group = create_seq_group(
seq_prompt_len=block_size * num_prompt_blocks,
seq_output_lens=[
block_size * num_output_blocks_per_seq
for _ in range(num_seqs_per_group)
],
)
assert num_prompt_blocks + num_output_blocks <= num_gpu_blocks
can_allocate_result = block_manager.can_allocate(seq_group)
num_required_blocks = num_prompt_blocks + num_output_blocks
if num_gpu_blocks - num_required_blocks < num_watermark_blocks:
assert can_allocate_result == AllocStatus.NEVER
elif num_gpu_blocks >= num_required_blocks:
assert can_allocate_result == AllocStatus.OK
else:
assert can_allocate_result == AllocStatus.LATER
@pytest.mark.parametrize("block_size", [1, 8])
@pytest.mark.parametrize("prompt_len", [1, 7, 8])
@pytest.mark.parametrize("num_slots_to_append", [1, 8, 129])
@pytest.mark.parametrize("num_lookahead_slots", [0, 10])
def test_append_slots(block_size, prompt_len, num_slots_to_append,
num_lookahead_slots):
"""Verify append_slots consumes the correct number of blocks from the block
table.
"""
num_gpu_blocks = 1024
watermark = 0.1
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=0,
watermark=watermark,
)
seq_group = create_seq_group(
seq_prompt_len=prompt_len,
seq_output_lens=[0],
)
# Allocate seq
assert block_manager.can_allocate(seq_group)
block_manager.allocate(seq_group)
# Set seq to RUNNING
seq = seq_group.get_seqs()[0]
seq.status = SequenceStatus.RUNNING
# Append tokens to the sequence
for token_id in range(num_slots_to_append):
seq.append_token_id(token_id, {token_id: Logprob(0.0)})
# Append slots for new tokens and lookahead slots.
free_blocks_before_append = block_manager.get_num_free_gpu_blocks()
block_manager.append_slots(seq, num_lookahead_slots)
num_consumed_blocks = (free_blocks_before_append -
block_manager.get_num_free_gpu_blocks())
# Expect consumed blocks to be new blocks required to support the new slots.
expected_consumed_blocks = len(
list(
chunk_list(
list(
range(prompt_len + num_slots_to_append +
num_lookahead_slots)),
block_size))) - len(
list(chunk_list(list(range(prompt_len)), block_size)))
assert num_consumed_blocks == expected_consumed_blocks
@pytest.mark.parametrize("block_size", [8])
@pytest.mark.parametrize("num_cpu_blocks", [4])
@pytest.mark.parametrize("num_gpu_blocks", [4])
@pytest.mark.parametrize("num_lookahead_slots", [0, 2, 10])
@pytest.mark.parametrize("enable_caching", [False, True])
def test_swap(block_size, num_cpu_blocks, num_gpu_blocks, num_lookahead_slots,
enable_caching):
"""Verify blocks number on src/desc device is correct after swapping in/out
sequence group (not missing or extra blocks).
"""
block_manager = SelfAttnBlockSpaceManager(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0,
enable_caching=enable_caching)
prompt, seq_group = create_dummy_prompt("1", prompt_length=block_size - 1)
prompt.status = SequenceStatus.WAITING
block_manager.allocate(seq_group)
# Emulate a forward pass by appending a single token.
# The block manager then knows how many unprocessed
# tokens will be written in the next forward pass.
token_id = 0
prompt.status = SequenceStatus.RUNNING
prompt.append_token_id(token_id, {token_id: Logprob(0.0)})
# Swap seq group from GPU -> CPU.
gpu_blocks = block_manager.get_block_table(prompt)
assert block_manager.can_swap_out(seq_group)
before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
mapping = block_manager.swap_out(seq_group)
mapping_keys = [key for key, _ in mapping]
assert mapping_keys == gpu_blocks
after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
assert before_cpu_blocks == after_cpu_blocks + len(gpu_blocks)
assert before_gpu_blocks + len(gpu_blocks) == after_gpu_blocks
prompt.status = SequenceStatus.SWAPPED
# Swap seq group from CPU -> GPU.
assert block_manager.can_swap_in(seq_group, num_lookahead_slots)
before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
mapping = block_manager.swap_in(seq_group)
cpu_blocks = block_manager.get_block_table(prompt)
mapping_keys = [key for key, _ in mapping]
assert mapping_keys == [cpu_blocks[0]]
after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
assert before_gpu_blocks == after_gpu_blocks + len(cpu_blocks)
@pytest.mark.parametrize("block_size", [8])
@pytest.mark.parametrize("num_gpu_blocks", [4])
@pytest.mark.parametrize("num_lookahead_slots", [3, 8, 10])
@pytest.mark.parametrize("enable_caching", [True, False])
def test_can_swap(block_size, num_gpu_blocks, num_lookahead_slots,
enable_caching):
""" Verify the block manager can correctly determine if a sequence group
can be swapped in/out.
"""
num_cpu_blocks = num_gpu_blocks
block_manager = SelfAttnBlockSpaceManager(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0,
enable_caching=enable_caching)
prompt, seq_group = create_dummy_prompt(
"1", prompt_length=(num_gpu_blocks - 1) * block_size - 1)
prompt.status = SequenceStatus.WAITING
block_manager.allocate(seq_group)
prompt.status = SequenceStatus.RUNNING
# Swap seq group from GPU -> CPU.
gpu_blocks = block_manager.get_block_table(prompt)
assert block_manager.can_swap_out(seq_group)
before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
mapping = block_manager.swap_out(seq_group)
mapping_keys = [key for key, _ in mapping]
assert mapping_keys == gpu_blocks
after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
assert before_cpu_blocks == after_cpu_blocks + len(gpu_blocks)
assert before_gpu_blocks + len(gpu_blocks) == after_gpu_blocks
prompt.status = SequenceStatus.SWAPPED
# At this moment, we still have enough free blocks to swap in the seq group.
if num_lookahead_slots <= block_size:
assert block_manager.can_swap_in(seq_group,
num_lookahead_slots) == AllocStatus.OK
else:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.NEVER
# During swap out, 2 cached blocks were evicted from the GPU,
# so prompt1 can't be swapped back in
prompt2_len = 2 * block_size - 1
prompt2, seq_group2 = create_dummy_prompt(
"2",
prompt_length=prompt2_len,
prompt_tokens=[10000 + i for i in range(prompt2_len)])
prompt2.status = SequenceStatus.WAITING
block_manager.allocate(seq_group2)
# Swap seq group from CPU -> GPU.
if num_lookahead_slots <= block_size:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.LATER
else:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.NEVER
@pytest.mark.parametrize("num_lookahead_slots", [0, 2, 10])
@pytest.mark.parametrize("enable_caching", [False, True])
def test_swap_in_infeasible(num_lookahead_slots, enable_caching):
"""Verifies that swapping fails if there is not enough free blocks
to account for unseen tokens and lookahead_slots.
"""
block_size = 8
num_cpu_blocks = 1
num_gpu_blocks = 1
block_manager = SelfAttnBlockSpaceManager(block_size,
num_cpu_blocks,
num_gpu_blocks,
watermark=0,
enable_caching=enable_caching)
prompt_length = block_size - 3
assert prompt_length > 0
prompt, seq_group = create_dummy_prompt("1", prompt_length=prompt_length)
prompt.status = SequenceStatus.WAITING
block_manager.allocate(seq_group)
# Emulate a forward pass by appending a single token.
# The block manager then knows how many unprocessed
# tokens will be written in the next forward pass.
token_id = 0
prompt.status = SequenceStatus.RUNNING
prompt.append_token_id(token_id, {token_id: Logprob(0.0)})
# Swap seq group from GPU -> CPU.
assert block_manager.can_swap_out(seq_group)
block_manager.swap_out(seq_group)
prompt.status = SequenceStatus.SWAPPED
# Swap seq group from CPU -> GPU.
# The number of unseen tokens is 1. If the number of existing
# tokens plus the unseen ones and number of lookahead slots exceeds
# the total number of available GPU blocks then the swap
# should fail.
num_unseen_tokens = 1
if (num_lookahead_slots + num_unseen_tokens +
prompt_length) <= (block_size * num_gpu_blocks):
assert block_manager.can_swap_in(seq_group,
num_lookahead_slots) == AllocStatus.OK
else:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.NEVER
# TODO(cade/kaiyang): add comprehensive tests for swapping at allocator level.
@pytest.mark.parametrize("block_size", [8, 16])
@pytest.mark.parametrize("prompt_len", [10, 300, 1000])
@pytest.mark.parametrize("num_slots_to_append", [50])
@pytest.mark.parametrize("sliding_window", [20, 32, 200, 512])
def test_sliding_window(block_size, prompt_len, num_slots_to_append,
sliding_window):
"""Verify append_slots consumes the correct number of blocks from the block
table.
"""
num_gpu_blocks = 1024
watermark = 0.1
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=0,
watermark=watermark,
sliding_window=sliding_window,
)
def check_used(min_n, max_n=None):
if max_n is None:
max_n = min_n
used = num_gpu_blocks - block_manager.get_num_free_gpu_blocks()
assert min_n <= used
assert used <= max_n
def num_blocks(num_tokens):
return (num_tokens + block_size - 1) // block_size
check_used(0)
seq_group = create_seq_group(
seq_prompt_len=prompt_len,
seq_output_lens=[0],
)
check_used(0)
# Allocate seq
assert block_manager.can_allocate(seq_group)
block_manager.allocate(seq_group)
check_used(num_blocks(prompt_len))
# Set seq to RUNNING
seq = seq_group.get_seqs()[0]
seq.status = SequenceStatus.RUNNING
seq.data.update_num_computed_tokens(prompt_len)
check_used(num_blocks(prompt_len))
# this is how we compute it in SelfAttnBlockSpaceManager.__init__
sliding_blocks = (sliding_window // block_size) + 2
# plus one block for null block
sliding_blocks += 1
# Append tokens to the sequence
for token_id in range(num_slots_to_append):
seq.append_token_id(token_id, {token_id: Logprob(0.0)})
seq.data.update_num_computed_tokens(1)
block_manager.append_slots(seq, num_lookahead_slots=0)
if prompt_len < sliding_window + 10:
check_used(0, sliding_blocks + 1)
else:
check_used(sliding_blocks, sliding_blocks + 1)
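
A worked instance of the sliding_blocks bound asserted above, using one of the parametrizations (block_size=16, sliding_window=200):

block_size = 16
sliding_window = 200
# Mirrors the computation referenced above from
# SelfAttnBlockSpaceManager.__init__.
sliding_blocks = (sliding_window // block_size) + 2  # 12 + 2 = 14
sliding_blocks += 1  # plus one block for the null block
assert sliding_blocks == 15
# Once enough tokens are appended, the used block count should stay within
# [sliding_blocks, sliding_blocks + 1] no matter how long the prompt was.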


@ -1,577 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
from vllm.core.block.block_table import BlockTable
from vllm.core.block.cpu_gpu_block_allocator import CpuGpuBlockAllocator
from vllm.utils import Device, cdiv, chunk_list
@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("sequence_len", [1, 16, 129])
def test_allocate_naive(block_size: int, sequence_len: int):
"""Test the allocation of blocks using the naive allocator.
This test creates a CpuGpuBlockAllocator with the specified block size and
number of blocks. It then allocates multiple BlockTables with varying
sequence lengths and verifies that the number of free blocks decreases as
expected after each allocation.
"""
assert block_size > 1
num_gpu_blocks = 1024
allocator = CpuGpuBlockAllocator.create(
allocator_type="naive",
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=1024,
block_size=block_size,
)
token_ids = list(range(sequence_len))
num_blocks_per_alloc = len(list(chunk_list(token_ids, block_size)))
block_tables: list[BlockTable] = []
for i in range(5):
assert allocator.get_num_free_blocks(
device=Device.GPU) == num_gpu_blocks - i * num_blocks_per_alloc
block_tables.append(
BlockTable(
block_size=block_size,
block_allocator=allocator,
))
block_tables[-1].allocate(token_ids=token_ids, device=Device.GPU)
@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("sequence_len", [1, 16, 129])
def test_allocate_prefix_caching(block_size: int, sequence_len: int):
"""Test the allocation of blocks using the prefix caching allocator.
This test creates a CpuGpuBlockAllocator with the specified block size and
number of blocks, using the prefix caching allocator. It then allocates
multiple BlockTables with varying sequence lengths and verifies that the
number of free blocks decreases as expected after each allocation.
The test expects all sequences to share allocations, except for their last
block, which may be mutable. It calculates the expected number of immutable
and mutable blocks per allocation based on the sequence length and block
size.
"""
assert block_size > 1
num_gpu_blocks = 1024
allocator = CpuGpuBlockAllocator.create(
allocator_type="prefix_caching",
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=1024,
block_size=block_size,
)
token_ids = list(range(sequence_len))
chunked_tokens = list(chunk_list(token_ids, block_size))
num_mutable_blocks_per_alloc = 0 if len(
chunked_tokens[-1]) == block_size else 1
num_immutable_blocks_per_alloc = len(
chunked_tokens) - num_mutable_blocks_per_alloc
block_tables: list[BlockTable] = []
for alloc_i in range(1, 6):
block_tables.append(
BlockTable(
block_size=block_size,
block_allocator=allocator,
))
block_tables[-1].allocate(token_ids=token_ids, device=Device.GPU)
# Expect all sequences to share allocations, except for their last block
# (which may be mutable).
assert allocator.get_num_free_blocks(
device=Device.GPU) == num_gpu_blocks - (
num_immutable_blocks_per_alloc + num_mutable_blocks_per_alloc *
(alloc_i))
@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("sequence_len", [1, 16, 129])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
@pytest.mark.parametrize("device", ["cpu", "gpu"])
def test_allocate_free(block_size: int, sequence_len: int, allocator_type: str,
device: str):
"""Test the allocation and freeing of blocks using different allocators and
devices.
This test creates a CpuGpuBlockAllocator with the specified block size,
number of blocks, allocator type, and device. It then allocates a BlockTable
multiple times with the same sequence and verifies that the number of free
blocks remains consistent after each allocation and freeing.
"""
device = Device[device.upper()]
num_device_blocks = 1024
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_device_blocks,
num_cpu_blocks=num_device_blocks,
block_size=block_size,
)
token_ids = list(range(sequence_len))
num_blocks_per_alloc = len(list(chunk_list(token_ids, block_size)))
block_table = BlockTable(
block_size=block_size,
block_allocator=allocator,
)
for i in range(5):
block_table.allocate(token_ids=token_ids, device=device)
assert allocator.get_num_free_blocks(
device) == num_device_blocks - num_blocks_per_alloc
assert all(block_id is not None
for block_id in block_table.physical_block_ids)
block_table.free()
assert allocator.get_num_free_blocks(device) == num_device_blocks
@pytest.mark.parametrize("block_size", [1, 8])
@pytest.mark.parametrize("sequence_len", [1, 16, 129])
@pytest.mark.parametrize("append_len", [1, 16, 129])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_append_token_ids_allocation(block_size: int, sequence_len: int,
append_len: int, allocator_type: str):
"""Test the allocation behavior when appending token IDs to a BlockTable.
This test creates a CpuGpuBlockAllocator with the specified block size,
number of blocks, and allocator type. It then allocates a BlockTable with an
initial sequence and appends additional token IDs to it. The test verifies
that the number of allocated blocks before and after appending matches the
expected values.
"""
num_gpu_blocks = 1024
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=1024,
block_size=block_size,
)
token_ids = list(range(sequence_len))
token_ids_to_append = list(range(append_len))
block_table = BlockTable(
block_size=block_size,
block_allocator=allocator,
)
num_expected_blocks_before_append = len(
list(chunk_list(token_ids, block_size)))
num_expected_appended_blocks = len(
list(chunk_list(token_ids + token_ids_to_append,
block_size))) - num_expected_blocks_before_append
block_table.allocate(token_ids=token_ids, device=Device.GPU)
assert len(
block_table.physical_block_ids) == num_expected_blocks_before_append
block_table.append_token_ids(token_ids_to_append)
assert len(
block_table.physical_block_ids
) == num_expected_blocks_before_append + num_expected_appended_blocks
@pytest.mark.parametrize("block_size", [1, 8])
@pytest.mark.parametrize("sequence_len", [1, 16, 129])
@pytest.mark.parametrize("num_empty_slots", [1, 16, 129])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_ensure_num_empty_slots_allocation(block_size: int, sequence_len: int,
num_empty_slots: int,
allocator_type: str):
"""Test the allocation behavior when ensuring a certain number of empty
slots in a BlockTable.
This test creates a CpuGpuBlockAllocator with the specified block size,
number of blocks, and allocator type. It then allocates a BlockTable with an
initial sequence and ensures a certain number of empty slots. The test
verifies that the number of allocated blocks before and after ensuring empty
slots matches the expected values. It also checks that filling up the empty
slots does not consume additional blocks.
"""
num_gpu_blocks = 1024
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=1024,
block_size=block_size,
)
token_ids = list(range(sequence_len))
block_table = BlockTable(
block_size=block_size,
block_allocator=allocator,
)
num_expected_blocks_before_append = len(
list(chunk_list(token_ids, block_size)))
num_expected_appended_blocks = len(
list(chunk_list(token_ids + [-1] * num_empty_slots,
block_size))) - num_expected_blocks_before_append
block_table.allocate(token_ids=token_ids, device=Device.GPU)
# Assert that the empty slots consume the expected number of additional
# blocks.
assert len(
block_table.physical_block_ids) == num_expected_blocks_before_append
block_table.ensure_num_empty_slots(num_empty_slots)
assert len(
block_table.physical_block_ids
) == num_expected_blocks_before_append + num_expected_appended_blocks
# Now, ensure no additional blocks consumed as we fill up the empty slots.
num_free_blocks = allocator.get_num_free_blocks(device=Device.GPU)
block_table.append_token_ids(token_ids=list(range(num_empty_slots)))
assert num_free_blocks == allocator.get_num_free_blocks(device=Device.GPU)
@pytest.mark.parametrize("block_size", [1, 8])
@pytest.mark.parametrize("sequence_len", [1, 9])
@pytest.mark.parametrize("append_len", [1, 16, 129])
@pytest.mark.parametrize("append_size", [1, 4, 129])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_append_token_ids_correct_content(block_size: int, sequence_len: int,
append_len: int, allocator_type: str,
append_size: int):
"""Verify token ids are correctly appended. Appends various amounts of
token ids in various append sizes, and verifies the final sequence is
correct.
"""
num_gpu_blocks = 1024
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=1024,
block_size=block_size,
)
token_ids = list(range(sequence_len))
token_ids_to_append = list(range(append_len))
block_table = BlockTable(
block_size=block_size,
block_allocator=allocator,
)
block_table.allocate(token_ids=token_ids, device=Device.GPU)
appended_so_far: list[int] = []
for append in chunk_list(token_ids_to_append, append_size):
block_table.append_token_ids(append)
appended_so_far.extend(append)
assert block_table._get_all_token_ids() == token_ids + appended_so_far
assert block_table._get_all_token_ids() == token_ids + token_ids_to_append
@pytest.mark.parametrize("seq_len", [1, 9, 129])
@pytest.mark.parametrize("block_size", [1, 8])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_fork(seq_len: int, block_size: int, allocator_type: str):
"""Create a sequence using the specified allocator.
1. Assert that after forking the sequence, the free block count is the
same.
2. Assert that the forked sequence has the same physical mappings.
3. Then free the original sequence; verify that the free block count is
the same.
4. Finally, free the forked sequence and verify that all blocks are
freed (i.e. the free block count returns to the total number of blocks).
"""
num_gpu_blocks = 1024
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=0,
block_size=block_size,
)
token_ids = list(range(seq_len))
block_table = BlockTable(
block_size=block_size,
block_allocator=allocator,
)
block_table.allocate(token_ids)
num_free_blocks_before_fork = allocator.get_num_free_blocks(
device=Device.GPU)
forked_block_table = block_table.fork()
# Expect physical_block_ids and token_ids to match.
assert (block_table.physical_block_ids ==
forked_block_table.physical_block_ids)
assert block_table._get_all_token_ids(
) == forked_block_table._get_all_token_ids()
# Do not expect any additional allocations.
assert allocator.get_num_free_blocks(
device=Device.GPU) == num_free_blocks_before_fork
# Free the original blocks. Assert num free blocks does not change, since
# refcount is nonzero.
block_table.free()
assert allocator.get_num_free_blocks(
device=Device.GPU) == num_free_blocks_before_fork
# Expect the forked block table to be unaffected by the free.
assert all(block_id is not None
for block_id in forked_block_table.physical_block_ids)
# Free the forked blocks. Assert num free blocks does change, since
# refcount is now zero.
forked_block_table.free()
assert allocator.get_num_free_blocks(device=Device.GPU) == num_gpu_blocks
@pytest.mark.parametrize("block_size", [8])
@pytest.mark.parametrize("sequence_len", [1, 16, 129])
@pytest.mark.parametrize("append_len", [1, 16, 129])
@pytest.mark.parametrize("appender", ["forked", "original"])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_cow(block_size: int, sequence_len: int, append_len: int,
allocator_type: str, appender: str):
"""Fork a sequence; append to the forked sequence; verify there's a CoW.
"""
num_gpu_blocks = 1024
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=0,
block_size=block_size,
)
token_ids = list(range(sequence_len))
token_ids_to_append = list(range(append_len))
original_block_table = BlockTable(
block_size=block_size,
block_allocator=allocator,
)
num_expected_non_cow_blocks = cdiv(sequence_len, block_size)
num_expected_cow_blocks = cdiv(sequence_len + append_len,
block_size) - (sequence_len // block_size)
original_block_table.allocate(token_ids=token_ids, device=Device.GPU)
original_block_ids = original_block_table.physical_block_ids[:]
print("original_block_ids = {}".format(original_block_ids))
forked_block_table = original_block_table.fork()
# Expect no additional allocation (copy on _write_).
assert allocator.get_num_free_blocks(
Device.GPU) == (num_gpu_blocks - num_expected_non_cow_blocks)
if appender == "forked":
appender_block_table = forked_block_table
static_block_table = original_block_table
elif appender == "original":
appender_block_table = original_block_table
static_block_table = forked_block_table
else:
raise ValueError(f"unknown test config {appender=}")
# Write tokens.
appender_block_table.append_token_ids(token_ids_to_append)
# Expect the non-appending block table to have no change.
assert static_block_table.physical_block_ids == original_block_ids
assert appender_block_table.physical_block_ids != original_block_ids
# Expect the blocks changed during append to have a CoW.
assert allocator.get_num_free_blocks(
Device.GPU) == num_gpu_blocks - (num_expected_non_cow_blocks +
num_expected_cow_blocks)
cows = allocator.clear_copy_on_writes()
if sequence_len % block_size > 0:
# If the last block in the sequence is not full, then when appending we
# expect a CoW.
assert cows
cow_block_id = sequence_len // block_size
expected_src = static_block_table.physical_block_ids[cow_block_id]
expected_dst = appender_block_table.physical_block_ids[cow_block_id]
assert (expected_src, expected_dst) in cows
else:
# Otherwise, there should be no copy-on-write.
assert not cows
static_block_table.free()
appender_block_table.free()
# After free, expect all blocks to be freed.
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks
@pytest.mark.parametrize("block_size", [8])
@pytest.mark.parametrize("sequence_len", [1, 16, 129])
@pytest.mark.parametrize("append_len", [1, 16, 129])
@pytest.mark.parametrize("lookahead_slots", [1, 16, 129])
@pytest.mark.parametrize("appender", ["forked", "original"])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_cow_lookahead_simple(block_size: int, sequence_len: int,
append_len: int, lookahead_slots: int,
allocator_type: str, appender: str):
"""Similar to test_cow, except with lookahead allocation. The assertions are
less rigorous due to the complexity of the property under test.
"""
num_gpu_blocks = 1024
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=0,
block_size=block_size,
)
token_ids = list(range(sequence_len))
token_ids_to_append = list(range(append_len))
original_block_table = BlockTable(
block_size=block_size,
block_allocator=allocator,
)
original_block_table.allocate(token_ids=token_ids, device=Device.GPU)
# Allocate lookahead slots.
original_block_table.ensure_num_empty_slots(lookahead_slots)
original_block_ids = original_block_table.physical_block_ids[:]
forked_block_table = original_block_table.fork()
if appender == "forked":
appender_block_table = forked_block_table
static_block_table = original_block_table
elif appender == "original":
appender_block_table = original_block_table
static_block_table = forked_block_table
else:
raise ValueError(f"unknown test config {appender=}")
# Write tokens.
appender_block_table.append_token_ids(token_ids_to_append)
# Expect the non-appending block table to have no change.
assert static_block_table.physical_block_ids == original_block_ids
assert appender_block_table.physical_block_ids != original_block_ids
cows = allocator.clear_copy_on_writes()
# Always expect copy-on-write
assert cows
if sequence_len % block_size > 0:
# If the last block in the sequence is not full, then when appending we
# expect a CoW.
assert cows
cow_block_id = sequence_len // block_size
expected_src = static_block_table.physical_block_ids[cow_block_id]
expected_dst = appender_block_table.physical_block_ids[cow_block_id]
assert (expected_src, expected_dst) in cows
static_block_table.free()
appender_block_table.free()
# After free, expect all blocks to be freed.
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks
@pytest.mark.parametrize("block_size", [1, 8])
@pytest.mark.parametrize("sequence_len", [1, 16, 129])
@pytest.mark.parametrize("num_new_tokens", [1, 16, 129])
@pytest.mark.parametrize("num_lookahead_slots", [1, 7, 8])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_num_blocks_touched_by_append_slots(block_size: int, sequence_len: int,
num_new_tokens: int,
num_lookahead_slots: int,
allocator_type: str):
"""Verify correct calculation of get_num_blocks_touched_by_append_slots.
This is done by using copy-on-write, which requires any modified block to
be copied before write if the refcount > 1. We set the refcount>1 by forking
a sequence, then measure the free blocks before and after an append. If the
number of consumed blocks equals what `get_num_blocks_touched_by_append_
slots` returns, then the calculation is correct.
"""
num_gpu_blocks = 1024
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=0,
block_size=block_size,
)
token_ids = list(range(sequence_len))
token_ids_to_append = list(range(num_new_tokens))
block_table = BlockTable(
block_size=block_size,
block_allocator=allocator,
)
block_table.allocate(token_ids=token_ids, device=Device.GPU)
# Add lookahead before fork so both sequences have the same lookahead
# blocks.
block_table.ensure_num_empty_slots(num_empty_slots=num_lookahead_slots)
# Fork sequence so that every block has refcount > 1.
_ = block_table.fork()
# Determine how many blocks should be touched.
expected_num_touched_blocks = (
block_table.get_num_blocks_touched_by_append_slots(
token_ids=token_ids_to_append,
num_lookahead_slots=num_lookahead_slots))
# Measure how many blocks are touched by measuring num_free_blocks before
# and after the append.
#
# We expect append_token_ids to CoW all mutated blocks that have refcount>1.
num_free_blocks_before_append = allocator.get_num_free_blocks(Device.GPU)
block_table.append_token_ids(token_ids_to_append, num_lookahead_slots)
num_consumed_blocks = (num_free_blocks_before_append -
allocator.get_num_free_blocks(Device.GPU))
# TODO(cade) ensure equality when num_lookahead_slots > 0.
# The reason we can have < here is that lookahead blocks are not copied eagerly;
# they are copied on first write. This will cause issues for beam search +
# speculative decoding. This is acceptable for now as it is a large effort
# to combine the two. To fix this, we can ensure single sequence ownership
# of lookahead blocks by appending empty slots to each block, which will
# trigger the CoW.
#
# Until then, we can accept that the consumed blocks are <= the expected
# blocks when appending with lookahead.
if num_lookahead_slots > 0:
assert num_consumed_blocks <= expected_num_touched_blocks
else:
assert num_consumed_blocks == expected_num_touched_blocks
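
Several of the expected block counts above are derived with chunk_list and cdiv; a worked example of that counting for one combination of parameters (block_size=8, sequence_len=16, append_len=129):

from vllm.utils import cdiv, chunk_list

block_size = 8
sequence_len = 16  # fills exactly two blocks
append_len = 129

blocks_before = len(list(chunk_list(list(range(sequence_len)), block_size)))
assert blocks_before == cdiv(sequence_len, block_size) == 2

blocks_after = len(
    list(chunk_list(list(range(sequence_len + append_len)), block_size)))
assert blocks_after == cdiv(sequence_len + append_len, block_size) == 19

# So appending 129 tokens is expected to allocate 19 - 2 = 17 new blocks.
# For the CoW tests: because sequence_len is a multiple of block_size here,
# the last block is already full and no copy-on-write is expected on append.
assert sequence_len % block_size == 0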


@ -1,45 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import random
import pytest
from vllm.core.block.common import RefCounter
@pytest.mark.parametrize("seed", list(range(20)))
@pytest.mark.parametrize("num_incrs", [1, 100])
@pytest.mark.parametrize("num_blocks", [1024])
def test_incr(seed: int, num_incrs: int, num_blocks: int):
random.seed(seed)
all_block_indices = list(range(num_blocks))
counter = RefCounter(all_block_indices=all_block_indices)
block_id = random.randint(0, num_blocks - 1)
for i in range(num_incrs):
value = counter.incr(block_id)
assert value == i + 1
@pytest.mark.parametrize("seed", list(range(20)))
@pytest.mark.parametrize("num_incrs", [1, 100])
@pytest.mark.parametrize("num_blocks", [1024])
def test_incr_decr(seed: int, num_incrs: int, num_blocks: int):
random.seed(seed)
all_block_indices = list(range(num_blocks))
counter = RefCounter(all_block_indices=all_block_indices)
block_id = random.randint(0, num_blocks - 1)
for i in range(num_incrs):
value = counter.incr(block_id)
assert value == i + 1
for i in range(num_incrs):
value = counter.decr(block_id)
assert value == num_incrs - (i + 1)
with pytest.raises(AssertionError):
counter.decr(block_id)


@ -1,96 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
from vllm.core.block.cpu_gpu_block_allocator import CpuGpuBlockAllocator
from vllm.utils import Device, chunk_list
@pytest.mark.parametrize("num_cpu_blocks", [0, 512])
@pytest.mark.parametrize("num_gpu_blocks", [1024])
@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_allocate_mutable_block(num_cpu_blocks: int, num_gpu_blocks: int,
block_size: int, allocator_type: str):
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=num_cpu_blocks,
block_size=block_size,
)
assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks
cpu_blocks = [
allocator.allocate_mutable_block(prev_block=None, device=Device.CPU)
for _ in range(num_cpu_blocks)
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks
gpu_blocks = [
allocator.allocate_mutable_block(prev_block=None, device=Device.GPU)
for _ in range(num_gpu_blocks)
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
assert allocator.get_num_free_blocks(Device.GPU) == 0
_ = [allocator.free(block) for block in cpu_blocks]
assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks
assert allocator.get_num_free_blocks(Device.GPU) == 0
_ = [allocator.free(block) for block in gpu_blocks]
assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks
@pytest.mark.parametrize("num_cpu_blocks", [0, 512])
@pytest.mark.parametrize("num_gpu_blocks", [1024])
@pytest.mark.parametrize("block_size", [2])
@pytest.mark.parametrize("allocator_type", ["naive", "prefix_caching"])
def test_allocate_immutable_block(num_cpu_blocks: int, num_gpu_blocks: int,
block_size: int, allocator_type: str):
allocator = CpuGpuBlockAllocator.create(
allocator_type=allocator_type,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=num_cpu_blocks,
block_size=block_size,
)
unique_token_ids = list(
range((num_cpu_blocks + num_gpu_blocks) * block_size))
gpu_token_ids = list(
chunk_list(unique_token_ids[:num_gpu_blocks * block_size], block_size))
cpu_token_ids = list(
chunk_list(unique_token_ids[num_gpu_blocks * block_size:], block_size))
assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks
cpu_blocks = [
allocator.allocate_immutable_block(prev_block=None,
token_ids=token_ids,
device=Device.CPU)
for token_ids in cpu_token_ids
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks
gpu_blocks = [
allocator.allocate_immutable_block(prev_block=None,
token_ids=token_ids,
device=Device.GPU)
for token_ids in gpu_token_ids
]
assert allocator.get_num_free_blocks(Device.CPU) == 0
assert allocator.get_num_free_blocks(Device.GPU) == 0
_ = [allocator.free(block) for block in cpu_blocks]
assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks
assert allocator.get_num_free_blocks(Device.GPU) == 0
_ = [allocator.free(block) for block in gpu_blocks]
assert allocator.get_num_free_blocks(Device.CPU) == num_cpu_blocks
assert allocator.get_num_free_blocks(Device.GPU) == num_gpu_blocks

View File

@ -1,148 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from typing import Optional
import pytest
from vllm.core.block.interfaces import Block, BlockAllocator
from vllm.core.block.naive_block import NaiveBlock, NaiveBlockAllocator
class TestNaiveBlockAllocator:
@staticmethod
def create_allocate_lambda(allocate_type: str,
allocator: NaiveBlockAllocator,
prev_block: Optional[Block],
token_ids: list[int]):
if allocate_type == "immutable":
allocate_block = lambda: allocator.allocate_immutable_block(
prev_block=prev_block, token_ids=token_ids)
elif allocate_type == "mutable":
allocate_block = lambda: allocator.allocate_mutable_block(
prev_block=prev_block)
else:
raise ValueError()
return allocate_block
@staticmethod
@pytest.mark.parametrize("allocate_type", ["immutable", "mutable"])
@pytest.mark.parametrize("num_blocks", [1, 1024])
@pytest.mark.parametrize("block_size", [1, 16])
def test_allocate_ooms(allocate_type: str, num_blocks: int,
block_size: int):
allocator = NaiveBlockAllocator(create_block=NaiveBlock,
num_blocks=num_blocks,
block_size=block_size)
allocate_block = TestNaiveBlockAllocator.create_allocate_lambda(
allocate_type,
allocator,
prev_block=None,
token_ids=list(range(block_size)))
[allocate_block() for _ in range(num_blocks)]
with pytest.raises(BlockAllocator.NoFreeBlocksError):
allocate_block()
@staticmethod
@pytest.mark.parametrize("allocate_type", ["immutable", "mutable"])
@pytest.mark.parametrize("num_blocks", [1, 1024])
@pytest.mark.parametrize("block_size", [1, 16])
def test_free_prevents_oom(allocate_type: str, num_blocks: int,
block_size: int):
allocator = NaiveBlockAllocator(create_block=NaiveBlock,
num_blocks=num_blocks,
block_size=block_size)
allocate_block = TestNaiveBlockAllocator.create_allocate_lambda(
allocate_type,
allocator,
prev_block=None,
token_ids=list(range(block_size)))
blocks = [allocate_block() for _ in range(num_blocks)]
with pytest.raises(BlockAllocator.NoFreeBlocksError):
allocate_block()
block_to_free = blocks.pop()
for _ in range(100):
block_id = block_to_free.block_id
allocator.free(block_to_free)
assert block_to_free.block_id is None
new_block = allocate_block()
assert new_block.block_id == block_id
with pytest.raises(BlockAllocator.NoFreeBlocksError):
allocate_block()
block_to_free = new_block
@staticmethod
@pytest.mark.parametrize("allocate_type", ["immutable", "mutable"])
@pytest.mark.parametrize("num_blocks", [1024])
@pytest.mark.parametrize("block_size", [16])
def test_get_num_free_blocks(allocate_type: str, num_blocks: int,
block_size: int):
allocator = NaiveBlockAllocator(create_block=NaiveBlock,
num_blocks=num_blocks,
block_size=block_size)
allocate_block = TestNaiveBlockAllocator.create_allocate_lambda(
allocate_type,
allocator,
prev_block=None,
token_ids=list(range(block_size)))
assert allocator.get_num_free_blocks() == num_blocks
blocks = [allocate_block() for _ in range(num_blocks)]
for i, block in enumerate(blocks):
assert allocator.get_num_free_blocks() == i
allocator.free(block)
@staticmethod
@pytest.mark.parametrize("num_blocks", [4])
@pytest.mark.parametrize("block_size", [8])
def test_naive_block_get_num_full_blocks_touched(num_blocks, block_size):
""" Verify the allocator can correctly return the number of
full blocks touched.
"""
allocator_src = NaiveBlockAllocator(create_block=NaiveBlock,
num_blocks=num_blocks,
block_size=block_size)
allocator_dst = NaiveBlockAllocator(create_block=NaiveBlock,
num_blocks=num_blocks,
block_size=block_size)
# Create a chain of cacheable blocks in the dst
allocate_block = TestNaiveBlockAllocator.create_allocate_lambda(
"immutable",
allocator_src,
prev_block=None,
token_ids=list(range(block_size)))
src_blocks = [allocate_block() for _ in range(num_blocks - 1)]
# All blocks are cached
assert allocator_dst.get_num_full_blocks_touched(
src_blocks) == num_blocks - 1
# Insert one non-full block in the src
allocate_non_full_block = \
TestNaiveBlockAllocator.create_allocate_lambda(
"mutable", allocator_src,
                prev_block=src_blocks[-1], token_ids=[]
)
src_blocks.append(allocate_non_full_block())
src_blocks[-1].append_token_ids([0])
assert allocator_dst.get_num_full_blocks_touched(
src_blocks) == num_blocks - 1
# Fill up the last source block and then invoke
# get_num_blocks_touched
src_blocks[-1].append_token_ids([0] * (block_size - 1))
assert allocator_dst.get_num_full_blocks_touched(
src_blocks) == num_blocks

File diff suppressed because it is too large

View File

@ -1,12 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
"""
Since this module is V0 only, set VLLM_USE_V1=0 for
all tests in the module.
"""
monkeypatch.setenv('VLLM_USE_V1', '0')

View File

@ -1,858 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from unittest.mock import MagicMock
import pytest # noqa
from vllm.config import CacheConfig, SchedulerConfig
from vllm.core.scheduler import Scheduler
from vllm.engine.arg_utils import EngineArgs
from vllm.engine.llm_engine import LLMEngine
from vllm.sampling_params import SamplingParams
from vllm.sequence import Logprob, SequenceGroup
from .utils import create_dummy_prompt
def get_sequence_groups(scheduler_output):
return [s.seq_group for s in scheduler_output.scheduled_seq_groups]
def append_new_token(seq_group: SequenceGroup, token_id: int):
for seq in seq_group.get_seqs():
seq.append_token_id(token_id, {token_id: Logprob(token_id)})
def schedule_and_update_computed_tokens(scheduler):
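    # Run one scheduling iteration, then credit each scheduled group with the
    # token chunk it was just given so the next call sees updated progress.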
metas, out, _ = scheduler.schedule()
for s, meta in zip(out.scheduled_seq_groups, metas):
s.seq_group.update_num_computed_tokens(meta.token_chunk_size)
return metas, out
def test_simple():
"""Verify basic scheduling works."""
block_size = 4
num_seq_group = 4
max_model_len = 16
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig("generate",
max_num_batched_tokens,
num_seq_group,
max_model_len,
enable_chunked_prefill=True)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
scheduler = Scheduler(scheduler_config, cache_config, None)
running: list[SequenceGroup] = []
# Add seq groups to scheduler.
for i in range(num_seq_group):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=block_size,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
# Schedule seq groups prompts.
num_tokens = block_size * num_seq_group
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert set(get_sequence_groups(out)) == set(running)
assert out.num_batched_tokens == num_tokens
assert (not out.blocks_to_copy and not out.blocks_to_swap_in
and not out.blocks_to_swap_out)
assert len(seq_group_meta) == num_seq_group
for s in running:
append_new_token(s, 1)
# Schedule seq groups generation.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert set(get_sequence_groups(out)) == set(running)
assert out.num_batched_tokens == num_seq_group
assert (not out.blocks_to_copy and not out.blocks_to_swap_in
and not out.blocks_to_swap_out)
assert len(seq_group_meta) == num_seq_group
def test_chunk():
"""Verify prefills are chunked properly."""
block_size = 4
max_seqs = 60
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 32
cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None)
running: list[SequenceGroup] = []
# Add seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
# Verify the second request is chunked.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
print()
assert set(get_sequence_groups(out)) == set(running)
assert seq_group_meta[0].token_chunk_size == 60
# Verify it is chunked.
assert seq_group_meta[1].token_chunk_size == 4
assert out.num_prefill_groups == 2
assert out.num_batched_tokens == 64
# Only the first seq group has a new token appended.
append_new_token(running[0], 1)
# One chunked prefill, and one decoding.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert set(get_sequence_groups(out)) == set(running)
# The first one is prefill. Scheduler guarantees ordering.
assert seq_group_meta[0].token_chunk_size == 56
    # The second one is the decode of the first request.
assert seq_group_meta[1].token_chunk_size == 1
assert out.num_prefill_groups == 1
assert out.num_batched_tokens == 57
def test_concurrent_chunking():
"""Verify prefills are chunked properly when
--max-num-partial-prefills is > 1"""
block_size = 4
max_seqs = 60
max_model_len = 2000
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
max_num_partial_prefills=2, # Up to 2 partial prefills at a time
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 32
cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None)
running: list[SequenceGroup] = []
# Add seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
# Verify both requests are chunked with half of max_num_batched_tokens each
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert set(get_sequence_groups(out)) == set(running)
assert seq_group_meta[0].token_chunk_size == 32
assert seq_group_meta[1].token_chunk_size == 32
assert out.num_prefill_groups == 2
assert out.num_batched_tokens == 64
# After one iteration, both should have 60 - 32 = 28 tokens left to prefill
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert set(get_sequence_groups(out)) == set(running)
assert seq_group_meta[0].token_chunk_size == 28
assert seq_group_meta[1].token_chunk_size == 28
assert out.num_prefill_groups == 2
assert out.num_batched_tokens == 56
def test_concurrent_chunking_large_requests():
"""Verify large prefill requests are run one at a time"""
block_size = 4
max_seqs = 60
max_model_len = 2000
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
max_num_partial_prefills=2, # Up to 2 partial prefills at a time
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 3200 # large KV cache size for large requests
cache_config.num_gpu_blocks = 3200
scheduler = Scheduler(scheduler_config, cache_config, None)
# Add seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(
str(i),
prompt_length=1200, # Very large prompt
block_size=block_size)
scheduler.add_seq_group(seq_group)
# Verify only a single request is chunked, and it gets all 64 tokens
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert len(get_sequence_groups(out)) == 1
assert seq_group_meta[0].token_chunk_size == 64
assert out.num_prefill_groups == 1
assert out.num_batched_tokens == 64
def test_short_prompts_jump_long_prompts_in_queue():
"""Verify large prefill requests are punted behind smaller ones if
another large prefill request is already running"""
block_size = 4
max_seqs = 60
max_model_len = 2000
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
max_num_partial_prefills=2, # Up to 2 partial prefills at a time
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 3200 # large KV cache size for large requests
cache_config.num_gpu_blocks = 3200
scheduler = Scheduler(scheduler_config, cache_config, None)
long_seqs: list[SequenceGroup] = []
short_seqs: list[SequenceGroup] = []
# Add 2 large seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(
str(i),
prompt_length=1200, # Very large prompt
block_size=block_size)
scheduler.add_seq_group(seq_group)
long_seqs.append(seq_group)
assert seq_group.is_prefill()
# Add 2 small seq groups behind them
for i in range(2):
_, seq_group = create_dummy_prompt(
str(i + 2),
prompt_length=40, # Very small prompt
block_size=block_size)
scheduler.add_seq_group(seq_group)
short_seqs.append(seq_group)
assert seq_group.is_prefill()
# Verify one large req and 1 small req chunked
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert seq_group_meta[0].token_chunk_size == 32 # large req gets 32 tokens
assert seq_group_meta[1].token_chunk_size == 32 # small req gets 32 tokens
# all 4 are prefilling
assert long_seqs[0].is_prefill()
assert long_seqs[1].is_prefill()
assert short_seqs[0].is_prefill()
assert short_seqs[1].is_prefill()
# First short and first long sequences have been scheduled
assert long_seqs[0].first_seq.get_num_computed_tokens() == 32
assert long_seqs[1].first_seq.get_num_computed_tokens() == 0
assert short_seqs[0].first_seq.get_num_computed_tokens() == 32
assert short_seqs[1].first_seq.get_num_computed_tokens() == 0
assert out.num_prefill_groups == 2
assert out.num_batched_tokens == 64
# in the second iteration,
# the first small request had only 8 tokens left
# so it went to decode
# The other small req is scheduled
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
# the new small req got 64 - (32+8) tokens
assert seq_group_meta[0].token_chunk_size == 24
assert seq_group_meta[1].token_chunk_size == 32 # large req still got 32
# the other small request had only 8 tokens left
assert seq_group_meta[2].token_chunk_size == 8 # 40-32
# The first small request got to decode now
assert long_seqs[0].is_prefill()
assert long_seqs[1].is_prefill()
assert not short_seqs[0].is_prefill()
assert short_seqs[1].is_prefill()
# Both small requests have started in front of the second long request
assert long_seqs[0].first_seq.get_num_computed_tokens() == 64
assert long_seqs[1].first_seq.get_num_computed_tokens() == 0
assert short_seqs[0].first_seq.get_num_computed_tokens() == 40
assert short_seqs[1].first_seq.get_num_computed_tokens() == 24
assert out.num_prefill_groups == 3
assert out.num_batched_tokens == 64
# the first small seq group has a new token appended.
append_new_token(short_seqs[0], 1)
# in the third iteration,
# the first small request is already decoding
# the second small request only has 16 tokens left and will enter decoding
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert seq_group_meta[0].token_chunk_size == 32 # large still got 32
# small req finished prefilling 40-24=16 tokens
assert seq_group_meta[1].token_chunk_size == 16
assert seq_group_meta[2].token_chunk_size == 1 # decode
assert out.num_prefill_groups == 2
assert out.num_batched_tokens == 49 # (32+16+1 decode)
# both small requests have now reached decode
assert long_seqs[0].is_prefill()
assert long_seqs[1].is_prefill()
assert not short_seqs[0].is_prefill()
assert not short_seqs[1].is_prefill()
assert long_seqs[0].first_seq.get_num_computed_tokens() == 96
assert long_seqs[1].first_seq.get_num_computed_tokens() == 0
assert short_seqs[0].first_seq.get_num_computed_tokens() == 41
assert short_seqs[1].first_seq.get_num_computed_tokens() == 40
# both the small seq groups have a new token appended
append_new_token(short_seqs[0], 1)
append_new_token(short_seqs[1], 1)
# in the fourth iteration, both small requests are decoding
# so large request gets all the budget
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
# large req gets 62 tokens (minus 2 for decode)
assert seq_group_meta[0].token_chunk_size == 62
assert seq_group_meta[1].token_chunk_size == 1 # decode
assert seq_group_meta[2].token_chunk_size == 1 # decode
assert out.num_prefill_groups == 1
assert out.num_batched_tokens == 64
assert long_seqs[0].first_seq.get_num_computed_tokens() == 158
# assert long_seqs[0].is_prefill()
# assert long_seqs[1].is_prefill()
# assert not short_seqs[0].is_prefill()
# assert not short_seqs[1].is_prefill()
# # both the small seq groups have a new token appended
# append_new_token(short_seqs[0], 1)
# append_new_token(short_seqs[1], 1)
# # in the fifth iteration, large request gets all the budget
# # while both small requests are decoding
# seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
# assert seq_group_meta[0].token_chunk_size == 62
# assert seq_group_meta[1].token_chunk_size == 1 # decode
# assert seq_group_meta[2].token_chunk_size == 1 # decode
# assert out.num_prefill_groups == 1
# assert out.num_batched_tokens == 64
def test_complex():
block_size = 4
max_seqs = 60
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 64
cache_config.num_gpu_blocks = 64
scheduler = Scheduler(scheduler_config, cache_config, None)
running: list[SequenceGroup] = []
# Add seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
assert seq_group.is_prefill()
# Verify the second request is chunked.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert set(get_sequence_groups(out)) == set(running)
assert seq_group_meta[0].token_chunk_size == 60
# Verify it is chunked.
assert seq_group_meta[1].token_chunk_size == 4
assert not running[0].is_prefill()
assert running[1].is_prefill()
assert out.num_prefill_groups == 2
assert out.num_batched_tokens == 64
# Only the first seq group has a new token appended.
append_new_token(running[0], 1)
# Add 2 more requests.
for i in range(2, 4):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
    # The decode, the chunked prefill, and the first chunk of the 3rd request are scheduled.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert len(get_sequence_groups(out)) == 3
    # The first one is the first chunk of the newly added 3rd request.
assert seq_group_meta[0].token_chunk_size == 7
    # The second one is the remaining prefill chunk of the 2nd request.
assert seq_group_meta[1].token_chunk_size == 56
# The last one is decode.
assert seq_group_meta[2].token_chunk_size == 1
# Two of them are in chunked prefill.
assert out.num_prefill_groups == 2
assert out.num_batched_tokens == 64
    # The first 2 requests are now in the decoding phase.
append_new_token(running[0], 1)
assert not running[0].is_prefill()
append_new_token(running[1], 1)
assert not running[1].is_prefill()
# The third request is still in prefill stage.
assert running[2].is_prefill()
def test_maximal_decoding():
"""Verify decoding requests are prioritized."""
block_size = 4
max_seqs = 2
max_model_len = 8
max_num_batched_tokens = 2
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 8
cache_config.num_gpu_blocks = 8
scheduler = Scheduler(scheduler_config, cache_config, None)
running: list[SequenceGroup] = []
# Add seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
assert seq_group.is_prefill()
# The first prefill is scheduled.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert len(get_sequence_groups(out)) == 1
assert seq_group_meta[0].token_chunk_size == 2
assert not running[0].is_prefill()
assert running[1].is_prefill()
assert out.num_prefill_groups == 1
assert out.num_batched_tokens == 2
# Only the first seq group has a new token appended.
append_new_token(running[0], 1)
# Create one more seq_group.
_, seq_group = create_dummy_prompt("3",
prompt_length=2,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
assert seq_group.is_prefill()
# The first decoding + second chunk is scheduled.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert len(get_sequence_groups(out)) == 2
assert seq_group_meta[0].token_chunk_size == 1
assert seq_group_meta[1].token_chunk_size == 1
assert not running[0].is_prefill()
assert running[1].is_prefill()
assert running[2].is_prefill()
assert out.num_prefill_groups == 1
assert out.num_batched_tokens == 2
append_new_token(running[0], 1)
# Decoding + running prefill is prioritized.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert len(get_sequence_groups(out)) == 2
assert seq_group_meta[0].token_chunk_size == 1
assert seq_group_meta[1].token_chunk_size == 1
assert not running[0].is_prefill()
assert not running[1].is_prefill()
assert out.num_prefill_groups == 1
assert out.num_batched_tokens == 2
append_new_token(running[0], 1)
append_new_token(running[1], 1)
# Only decoding is prioritized.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert len(get_sequence_groups(out)) == 2
assert seq_group_meta[0].token_chunk_size == 1
assert seq_group_meta[1].token_chunk_size == 1
assert not running[0].is_prefill()
assert not running[1].is_prefill()
assert out.num_prefill_groups == 0
assert out.num_batched_tokens == 2
append_new_token(running[0], 1)
append_new_token(running[1], 1)
    # After aborting the decoding request, the new prefill is prioritized (FCFS).
scheduler.abort_seq_group(running[0].request_id)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert len(get_sequence_groups(out)) == 2
assert seq_group_meta[0].token_chunk_size == 1
assert seq_group_meta[1].token_chunk_size == 1
assert not running[1].is_prefill()
assert running[2].is_prefill()
assert out.num_prefill_groups == 1
assert out.num_batched_tokens == 2
def test_prompt_limit():
"""Verify max_num_batched_tokens < max_model_len is possible."""
block_size = 4
max_seqs = 32
max_model_len = 64
max_num_batched_tokens = 32
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None)
running: list[SequenceGroup] = []
_, seq_group = create_dummy_prompt("1",
prompt_length=48,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
assert seq_group.is_prefill()
    # A prompt longer than max_num_batched_tokens should still get scheduled.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert len(get_sequence_groups(out)) == 1
assert seq_group_meta[0].token_chunk_size == 32
assert running[0].is_prefill()
assert out.num_prefill_groups == 1
assert out.num_batched_tokens == 32
def test_prompt_limit_exceed():
block_size = 4
max_seqs = 64
max_model_len = 32
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig("generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None)
running: list[SequenceGroup] = []
_, seq_group = create_dummy_prompt("2",
prompt_length=48,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
assert seq_group.is_prefill()
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.ignored_seq_groups) == 1
assert out.ignored_seq_groups[0] == seq_group
def test_chunked_prefill_preempt():
"""Verify preempt works with chunked prefill requests"""
block_size = 4
max_seqs = 30
max_model_len = 200
max_num_batched_tokens = 30
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None)
_, seq_group = create_dummy_prompt("1",
prompt_length=60,
block_size=block_size)
scheduler.add_seq_group(seq_group)
_, out = schedule_and_update_computed_tokens(scheduler)
# The request is chunked.
# prefill scheduled now.
assert len(out.scheduled_seq_groups) == 1
assert out.num_prefill_groups == 1
assert seq_group.is_prefill()
assert out.num_batched_tokens == max_num_batched_tokens
# The request should be preempted.
scheduler.block_manager.can_append_slots = MagicMock()
def cannot_append_second_group1(seq_group, num_lookahead_slots):
return seq_group.request_id != "1"
scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group1)
# The running prefill is now preempted.
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 0
assert out.num_batched_tokens == 0
assert out.blocks_to_swap_out == []
assert out.blocks_to_swap_in == []
# Make sure we can reschedule preempted request.
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
assert out.num_prefill_groups == 1
assert seq_group.is_prefill()
assert out.num_batched_tokens == max_num_batched_tokens
assert seq_group.get_num_uncomputed_tokens() == 30
# We should be able to run prefill twice as it is chunked.
def cannot_append_second_group2(seq_group, num_lookahead_slots):
return True
scheduler.block_manager.can_append_slots.side_effect = (
cannot_append_second_group2)
_, out = schedule_and_update_computed_tokens(scheduler)
assert len(out.scheduled_seq_groups) == 1
assert out.num_prefill_groups == 1
assert not seq_group.is_prefill()
assert out.num_batched_tokens == max_num_batched_tokens
def test_chunked_prefill_spec_prefill():
"""Verify that the num_lookahead_slots is set appropriately for an all"""
"""prefill batch."""
block_size = 4
max_seqs = 30
max_model_len = 200
max_num_batched_tokens = 30
num_lookahead_slots = 4
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
num_lookahead_slots=num_lookahead_slots,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 16
cache_config.num_gpu_blocks = 16
scheduler = Scheduler(scheduler_config, cache_config, None)
_, seq_group = create_dummy_prompt("1",
prompt_length=30,
block_size=block_size)
scheduler.add_seq_group(seq_group)
_, out = schedule_and_update_computed_tokens(scheduler)
# The request is chunked.
# prefill scheduled now.
assert len(out.scheduled_seq_groups) == 1
assert out.num_prefill_groups == 1
assert out.num_batched_tokens == max_num_batched_tokens
print(out.num_lookahead_slots)
assert out.num_lookahead_slots == 0
def test_chunked_prefill_max_seqs():
block_size = 4
max_seqs = 2
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size, 1.0, 1, "auto")
cache_config.num_cpu_blocks = 128
cache_config.num_gpu_blocks = 128
scheduler = Scheduler(scheduler_config, cache_config, None)
running: list[SequenceGroup] = []
_, seq_group = create_dummy_prompt("1",
prompt_length=65,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
# The first prefill is chunked.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert seq_group_meta[0].token_chunk_size == max_num_batched_tokens
assert len(get_sequence_groups(out)) == 1
# Add new requests.
for i in range(4):
_, seq_group = create_dummy_prompt(str(i),
prompt_length=65,
block_size=block_size)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
# Make sure only 2 requests are scheduled.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert out.num_batched_tokens == max_num_batched_tokens
assert len(get_sequence_groups(out)) == 2
assert not running[0].is_prefill()
assert running[1].is_prefill()
append_new_token(running[0], 1)
# Although we have enough token budget, we can only schedule max_seqs.
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert seq_group_meta[0].token_chunk_size == 2
assert seq_group_meta[1].token_chunk_size == 1
assert out.num_batched_tokens == 3
assert len(get_sequence_groups(out)) == max_seqs
assert not running[0].is_prefill()
assert not running[1].is_prefill()
def test_prefix_caching():
"""Verify allocating full blocks when prefix caching is enabled."""
block_size = 4
max_seqs = 10
max_model_len = 80
max_num_batched_tokens = 64
scheduler_config = SchedulerConfig(
"generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
)
cache_config = CacheConfig(block_size,
1.0,
1,
"auto",
enable_prefix_caching=True)
cache_config.num_cpu_blocks = 0
cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None)
running: list[SequenceGroup] = []
# Add seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(str(i),
block_size=block_size,
prompt_length=50)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert set(get_sequence_groups(out)) == set(running)
assert seq_group_meta[0].token_chunk_size == 50
# Verify it is chunked. Note that although the budget is 64-50=14,
# we only allocate full blocks for prefix caching, so only 4*(14//4)=12
# tokens are allocated.
assert seq_group_meta[1].token_chunk_size == 12
assert out.num_prefill_groups == 2
assert out.num_batched_tokens == 62
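# Illustrative sketch (not part of the original test): with prefix caching,
# a partial prefill is only given whole blocks, so the leftover token budget
# is rounded down to a multiple of the block size. This reproduces the
# 50 + 12 = 62 batched tokens asserted above.
def _full_block_budget(budget: int, block_size: int) -> int:
    # Round the remaining token budget down to a whole number of blocks.
    return block_size * (budget // block_size)
# e.g. _full_block_budget(64 - 50, 4) == 12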
def test_prefix_caching_with_concurrent_partial_prefills():
"""Verify allocating full blocks when prefix caching is enabled with
--max-num-partial-prefills > 1."""
block_size = 4
max_seqs = 10
max_model_len = 8000
max_num_batched_tokens = 60 # With two slots, each slot will get 30 tokens
scheduler_config = SchedulerConfig("generate",
max_num_batched_tokens,
max_seqs,
max_model_len,
enable_chunked_prefill=True,
max_num_partial_prefills=2)
cache_config = CacheConfig(block_size,
1.0,
1,
"auto",
enable_prefix_caching=True)
cache_config.num_cpu_blocks = 0
cache_config.num_gpu_blocks = 32
scheduler = Scheduler(scheduler_config, cache_config, None)
running: list[SequenceGroup] = []
# Add seq groups to scheduler.
for i in range(2):
_, seq_group = create_dummy_prompt(str(i),
block_size=block_size,
prompt_length=50)
scheduler.add_seq_group(seq_group)
running.append(seq_group)
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert set(get_sequence_groups(out)) == set(running)
# To partially prefill both sequences, both can chunk up to 30 tokens
# But the next lowest multiple of the block size (4) is 28
assert seq_group_meta[0].token_chunk_size == 28
assert seq_group_meta[1].token_chunk_size == 28
assert out.num_prefill_groups == 2
assert out.num_batched_tokens == 56
# On the next iteration, both sequences should finish prefill
seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
assert set(get_sequence_groups(out)) == set(running)
# Both sequences have 50 - 28 = 22 tokens left to prefill.
# This is not a multiple of the block size, but we don't care since we don't
# cache the final partial block of prefix sequences
assert seq_group_meta[0].token_chunk_size == 22
assert seq_group_meta[1].token_chunk_size == 22
assert out.num_prefill_groups == 2
assert out.num_batched_tokens == 44
@pytest.mark.parametrize("model", ["facebook/opt-125m"])
@pytest.mark.parametrize("max_num_partial_prefills", [2, 4, 8])
def test_chunked_prefill_with_actual_engine(model: str,
max_num_partial_prefills: int):
"""Make sure the model can actually sample with concurrent
partial prefills
"""
prompt = "hello" * 40
engine_args = EngineArgs(
model=model,
max_num_partial_prefills=max_num_partial_prefills,
max_num_batched_tokens=40,
max_num_seqs=8,
enable_chunked_prefill=True,
gpu_memory_utilization=0.8,
)
engine = LLMEngine.from_engine_args(engine_args)
sampling_params = SamplingParams(temperature=0)
for req_num in range(max_num_partial_prefills):
engine.add_request(f"{req_num}", prompt, sampling_params)
# first step
request_outputs = engine.step()
    # No outputs yet means every request is still prefilling.
assert len(request_outputs) == 0
assert len(engine.scheduler[0].running) == max_num_partial_prefills
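# Hypothetical continuation (a sketch, not part of the original test): keep
# stepping the engine until every concurrent partial prefill has finished.
def _drain_engine(engine: LLMEngine) -> list:
    outputs = []
    while engine.has_unfinished_requests():
        # Each step schedules one batch and returns any newly produced outputs.
        outputs.extend(engine.step())
    return outputs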

View File

@ -1,67 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
from tests.conftest import VllmRunner
from tests.core.utils import create_dummy_prompt
from vllm.engine.llm_engine import LLMEngine
from vllm.sequence import SequenceGroup
MODEL = "JackFram/llama-160m"
def add_seq_group_to_engine(engine: LLMEngine, seq_group: SequenceGroup):
scheduler = engine.scheduler[0]
scheduler.add_seq_group(seq_group)
@pytest.mark.parametrize("enable_chunked_prefill", [False, True])
@pytest.mark.parametrize("enforce_eager", [False, True])
def test_num_computed_tokens_update(enable_chunked_prefill: bool,
enforce_eager: bool):
# Make a vllm engine
runner = VllmRunner(model_name=MODEL,
gpu_memory_utilization=0.7,
enable_chunked_prefill=enable_chunked_prefill,
enforce_eager=enforce_eager)
engine: LLMEngine = runner.llm.llm_engine
num_prompt_steps = 1
num_output_tokens_list = [4, 8, 12, 15, 16, 17]
# Create sequence and add to engine
prompt_len = 10
for req_idx, num_output_tokens in enumerate(num_output_tokens_list):
seq, seq_group = create_dummy_prompt(request_id=str(req_idx),
prompt_length=prompt_len,
min_tokens=num_output_tokens,
max_tokens=num_output_tokens)
add_seq_group_to_engine(engine, seq_group)
assert seq.data.get_num_computed_tokens() == 0
for _ in range(num_prompt_steps):
# prompt steps
engine.step()
if not seq.is_finished():
prompt_num_computed_tokens = seq.data.get_num_computed_tokens()
# Test correctness of num_computed_tokens after the prompt steps
assert prompt_num_computed_tokens == \
prompt_len + num_prompt_steps - 1
decode_step_counter = 0
while not seq.is_finished():
# Test correctness of num_computed_tokens after the decode steps
assert seq.data.get_num_computed_tokens(
) == prompt_num_computed_tokens + decode_step_counter
engine.step()
decode_step_counter += 1
# Test correctness of num_computed_tokens after the sequence finish.
assert seq.data.get_num_computed_tokens(
) == prompt_len + num_output_tokens - 1

File diff suppressed because it is too large

View File

@ -1,36 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import msgspec
from vllm.executor.msgspec_utils import decode_hook, encode_hook
from vllm.sequence import ExecuteModelRequest
from .utils import create_batch
def test_msgspec_serialization():
num_lookahead_slots = 4
seq_group_metadata_list, _, _ = create_batch(16, num_lookahead_slots)
execute_model_req = ExecuteModelRequest(
seq_group_metadata_list=seq_group_metadata_list,
num_lookahead_slots=num_lookahead_slots,
running_queue_size=4)
encoder = msgspec.msgpack.Encoder(enc_hook=encode_hook)
decoder = msgspec.msgpack.Decoder(ExecuteModelRequest,
dec_hook=decode_hook)
req = decoder.decode(encoder.encode(execute_model_req))
expected = execute_model_req.seq_group_metadata_list
actual = req.seq_group_metadata_list
assert (len(expected) == len(actual))
expected = expected[0]
actual = actual[0]
assert expected.block_tables == actual.block_tables
assert expected.is_prompt == actual.is_prompt
assert expected.request_id == actual.request_id
assert (expected.seq_data[0].prompt_token_ids ==
actual.seq_data[0].prompt_token_ids)
assert (expected.seq_data[0].output_token_ids ==
actual.seq_data[0].output_token_ids)

View File

@ -1,392 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import time
from collections import defaultdict
from collections.abc import Sequence as GenericSequence
from itertools import count
from typing import Any, Optional, Union
import torch
from vllm.core.scheduler import Scheduler, SchedulerOutputs
from vllm.inputs import EncoderDecoderInputs, embeds_inputs, token_inputs
from vllm.lora.request import LoRARequest
from vllm.sampling_params import SamplingParams
from vllm.sequence import (Logprob, Sequence, SequenceData, SequenceGroup,
SequenceGroupMetadata)
def create_dummy_prompt(
request_id: str,
prompt_length: int = -1,
block_size: Optional[int] = None,
lora_request: Optional[LoRARequest] = None,
prompt_tokens: Optional[list[int]] = None,
prompt_embeds: Optional[torch.Tensor] = None,
min_tokens: int = 0,
max_tokens: int = 16,
) -> tuple[Sequence, SequenceGroup]:
if not block_size:
block_size = prompt_length
if prompt_tokens is None:
        # Create a dummy prompt sequence with tokens 0...prompt_length-1;
        # the prompt string is just those token ids joined with spaces.
prompt_tokens = list(range(prompt_length))
prompt_str = " ".join([str(t) for t in prompt_tokens])
inputs = token_inputs(
prompt_token_ids=prompt_tokens,
prompt=prompt_str) if prompt_embeds is None else embeds_inputs(
prompt_embeds=prompt_embeds)
prompt = Sequence(
int(request_id),
inputs=inputs,
block_size=block_size,
)
seq_group = SequenceGroup(
request_id=request_id,
seqs=[prompt],
arrival_time=time.time(),
sampling_params=SamplingParams(max_tokens=max_tokens,
min_tokens=min_tokens),
lora_request=lora_request,
)
return prompt, seq_group
def create_dummy_lora_sequence(request_id: int, token_ids: list[int],
block_size: int, lora_int_id: int) -> Sequence:
return Sequence(seq_id=request_id,
inputs=token_inputs(token_ids),
block_size=block_size,
lora_request=LoRARequest(lora_name="dummy",
lora_path="/dummy",
lora_int_id=lora_int_id))
def create_dummy_sequence(request_id: int, token_ids: list[int],
block_size: int) -> Sequence:
return Sequence(
seq_id=request_id,
inputs=token_inputs(token_ids),
block_size=block_size,
)
def create_dummy_prompt_encoder_decoder(
request_id: str,
decoder_prompt_length: int,
encoder_prompt_length: int,
block_size: Optional[int] = None,
lora_request: Optional[LoRARequest] = None,
) -> tuple[Sequence, Sequence, SequenceGroup]:
if not block_size:
block_size = decoder_prompt_length
# Create dummy prompt sequence with tokens 0...block_size-1
# and prompt "0 ... block_size". Note that the prompt string
# doesn't actually match the tokens
decoder_prompt_tokens = list(range(decoder_prompt_length))
decoder_prompt_str = " ".join([str(t) for t in decoder_prompt_tokens])
encoder_prompt_tokens = list(reversed(list(range(encoder_prompt_length))))
encoder_prompt_str = " ".join([str(t) for t in encoder_prompt_tokens])
inputs: EncoderDecoderInputs = {
"decoder": token_inputs(decoder_prompt_tokens,
prompt=decoder_prompt_str),
"encoder": token_inputs(encoder_prompt_tokens,
prompt=encoder_prompt_str),
}
decoder_prompt = Sequence(int(request_id),
inputs=inputs["decoder"],
block_size=block_size)
encoder_prompt = Sequence(int(request_id),
inputs=inputs["encoder"],
block_size=block_size)
seq_group = SequenceGroup(request_id=request_id,
seqs=[decoder_prompt],
arrival_time=time.time(),
lora_request=lora_request,
encoder_seq=encoder_prompt)
return decoder_prompt, encoder_prompt, seq_group
def create_seq_group(
seq_prompt_len: int = 1024,
seq_output_lens: GenericSequence[int] = (128, ),
request_id: str = '0',
seq_id_start: int = 0,
sampling_params: Optional[SamplingParams] = None) -> SequenceGroup:
assert len(seq_output_lens) > 0
if sampling_params is None:
sampling_params = SamplingParams()
prompt_token_ids = [0] * seq_prompt_len
seqs: list[Sequence] = []
for seq_id_offset, output_len in enumerate(seq_output_lens):
seq = Sequence(
seq_id=seq_id_start + seq_id_offset,
inputs=token_inputs(prompt_token_ids),
block_size=16,
)
for i in range(output_len):
seq.append_token_id(
token_id=i,
logprobs={i: Logprob(0.0)},
)
seqs.append(seq)
seq_group = SequenceGroup(
request_id=request_id,
seqs=seqs,
sampling_params=sampling_params,
arrival_time=time.time(),
)
return seq_group
def create_seq_group_encoder_decoder(
seq_prompt_len: int = 1024,
seq_output_lens: GenericSequence[int] = (128, ),
request_id: str = '0',
seq_id_start: int = 0,
sampling_params: Optional[SamplingParams] = None) -> SequenceGroup:
assert len(seq_output_lens) > 0
if sampling_params is None:
sampling_params = SamplingParams()
prompt_token_ids = [0] * seq_prompt_len
inputs: EncoderDecoderInputs = {
"decoder": token_inputs(prompt_token_ids),
"encoder": token_inputs(prompt_token_ids),
}
seqs = []
for seq_id_offset, output_len in enumerate(seq_output_lens):
# Construct decoder input sequences
seq = Sequence(
seq_id=seq_id_start + seq_id_offset,
inputs=inputs["decoder"],
block_size=16,
)
for i in range(output_len):
seq.append_token_id(
token_id=i,
logprobs={i: Logprob(0.0)},
)
seqs.append(seq)
# Encoder input sequence
encoder_seq = Sequence(
seq_id=seq_id_start + len(seq_output_lens),
inputs=inputs["encoder"],
block_size=16,
)
return SequenceGroup(request_id=request_id,
seqs=seqs,
sampling_params=sampling_params,
arrival_time=time.time(),
encoder_seq=encoder_seq)
def round_up_to_next_block(seq_len: int, block_size: int) -> int:
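    # Despite the name, this returns the number of blocks needed to hold
    # seq_len tokens (ceiling division).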
return (seq_len + block_size - 1) // block_size
# Helper functions for scheduler tests
def get_sequence_groups(scheduler_output):
return [s.seq_group for s in scheduler_output.scheduled_seq_groups]
def append_new_token(out, token_id: int):
seq_groups = get_sequence_groups(out)
for seq_group in seq_groups:
for seq in seq_group.get_seqs():
seq.append_token_id(token_id, {token_id: Logprob(token_id)})
def schedule_and_update_computed_tokens(scheduler):
metas, out, _ = scheduler.schedule()
for s in out.scheduled_seq_groups:
s.seq_group.update_num_computed_tokens(s.token_chunk_size)
return metas, out
def append_new_token_seq(seq: Sequence, token_id: int):
seq.append_token_id(token_id, {token_id: Logprob(token_id)})
def append_new_token_seq_group(token_chunk_size, seq_group, token_id: int):
seq_group.update_num_computed_tokens(token_chunk_size)
for seq in seq_group.get_seqs():
seq.append_token_id(token_id, {token_id: Logprob(token_id)})
class SchedulerProxy:
"""
A proxy class to forward calls to the scheduler.
"""
def __init__(self, scheduler: Scheduler):
self.scheduler_ = scheduler
self.call_history: dict[str, list[Any]] = defaultdict(list)
def __getattr__(self, name: str) -> Any:
def wrapper(*args, **kwargs):
result = getattr(self.scheduler_, name)(*args, **kwargs)
self.call_history[name].append((args, kwargs, result))
return result
return wrapper
def last_schedule_ret(
self, ) -> tuple[list[SequenceGroupMetadata], SchedulerOutputs, Any]:
_, _, ret = self.call_history["schedule"][-1]
return ret
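# Illustrative usage of SchedulerProxy (a sketch, not part of the original
# helpers): forwarded calls are recorded, so a test can read back what the
# most recent schedule() call returned.
def _example_proxy_usage(scheduler: Scheduler):
    proxy = SchedulerProxy(scheduler)
    metas, out, allow_async = proxy.schedule()
    assert proxy.last_schedule_ret() == (metas, out, allow_async)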
def create_seq_group_metadata_from_prompts(
prompts: list[list[int]],
num_gpu_blocks: int,
block_size: int,
final_prompt_lens: list[int],
continuations: Optional[list[list[int]]] = None,
seq_ids: Optional[list[int]] = None,
) -> list[SequenceGroupMetadata]:
if continuations is None:
continuations = [[] for _ in prompts]
if seq_ids is None:
seq_ids = list(i for i, _ in enumerate(prompts))
free_gpu_blocks = list(range(num_gpu_blocks))
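    # Pre-assign enough physical blocks to each sequence to cover its final
    # prompt length, popping block ids from the shared free list.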
block_allocations = {
i: [
free_gpu_blocks.pop()
for _ in range(round_up_to_next_block(final_len, block_size))
]
for i, final_len in enumerate(final_prompt_lens)
}
    seq_group_metadata_list = []
for i, (prompt_token_ids,
cont_token_ids) in enumerate(zip(prompts, continuations)):
data = SequenceData.from_seqs(prompt_token_ids, cont_token_ids)
data.update_num_computed_tokens(
len(prompt_token_ids) + len(cont_token_ids) - 1)
seq_data = {i: data}
        seq_group_metadata_list.append(
SequenceGroupMetadata(
request_id=str(i),
is_prompt=len(cont_token_ids) == 0,
seq_data=seq_data,
sampling_params=SamplingParams(temperature=0.0),
block_tables={i: block_allocations[i][:]},
))
    return seq_group_metadata_list
def create_chunked_seq_group_metadata_from_prompt(
prompt: list[int],
num_gpu_blocks: int,
chunk_size: int,
block_size: int,
seq_id: Optional[int] = None) -> list[SequenceGroupMetadata]:
if seq_id is None:
seq_id = 0
free_gpu_blocks = list(range(num_gpu_blocks))
block_allocations = [
free_gpu_blocks.pop()
for _ in range(round_up_to_next_block(len(prompt), block_size))
]
seq_group_metadata_list = []
for i, idx in enumerate(range(0, len(prompt), chunk_size)):
chunk_ids = prompt[idx:idx + chunk_size]
data = SequenceData.from_seqs(prompt)
data.update_num_computed_tokens(idx)
seq_data = {i: data}
seq_group_metadata_list.append(
SequenceGroupMetadata(
request_id=str(seq_id),
is_prompt=True,
do_sample=idx + chunk_size >= len(prompt), # terminal chunk
seq_data=seq_data,
sampling_params=SamplingParams(temperature=0.0),
block_tables={i: block_allocations},
token_chunk_size=len(chunk_ids)))
return seq_group_metadata_list
def create_batch(batch_size,
k,
prompt_len: Union[int, list[int]] = 10,
prev_output_token_len: int = 10,
seq_ids: Optional[list[int]] = None,
num_gpu_blocks: Optional[int] = None,
block_size: Optional[int] = None,
prefill_chunk_size: Optional[int] = None):
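    # Build SequenceGroupMetadata for a batch: either chunked prompts (when
    # prefill_chunk_size is set) or prompt + prev_output groups whose block
    # tables leave room for k + 1 further tokens per sequence.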
if block_size is None:
block_size = 8
if num_gpu_blocks is None:
num_gpu_blocks = 2048 // block_size
iterator = count()
if isinstance(prompt_len, int):
prompt_lens = [prompt_len for _ in range(batch_size)]
else:
prompt_lens = prompt_len
prompts = [[next(iterator) for _ in range(p_len)] for p_len in prompt_lens]
if prefill_chunk_size:
# Create a batch of chunked prompts.
if not seq_ids:
seq_ids = list(range(len(prompts)))
seq_group_metadata_list = []
for p, sid in zip(prompts, seq_ids):
seq_group_metadata_list += \
create_chunked_seq_group_metadata_from_prompt(
p, num_gpu_blocks, prefill_chunk_size, block_size, sid)
seq_group_metadata_list = seq_group_metadata_list[:batch_size]
prev_output_tokens = []
else:
prev_output_tokens = [[
next(iterator) for _ in range(prev_output_token_len)
] for _ in range(batch_size)]
final_prompt_lens = [
len(prompt) + len(prev_output_token) + k + 1
for prompt, prev_output_token in zip(prompts, prev_output_tokens)
]
seq_group_metadata_list = create_seq_group_metadata_from_prompts(
prompts, num_gpu_blocks, block_size, final_prompt_lens,
prev_output_tokens, seq_ids)
return seq_group_metadata_list, prompts, prev_output_tokens