# vllm/tests/core/block/test_block_manager.py
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import pytest
from vllm.core.block_manager import SelfAttnBlockSpaceManager
from vllm.core.interfaces import AllocStatus
from vllm.sequence import Logprob, SequenceStatus
from vllm.utils import chunk_list
from ..utils import create_dummy_prompt, create_seq_group


@pytest.mark.parametrize("block_size", [16])
@pytest.mark.parametrize("num_gpu_blocks", [8, 40, 80])
@pytest.mark.parametrize("num_seqs_per_group", [1, 4])
@pytest.mark.parametrize("watermark", [0.0, 0.5])
def test_can_allocate_seq_group(block_size: int, num_seqs_per_group: int,
num_gpu_blocks: int, watermark: float):
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=1024,
watermark=watermark,
)
num_watermark_blocks = int(watermark * num_gpu_blocks)
num_output_blocks_per_seq = 1
    # NOTE: This should be num_output_blocks_per_seq * num_seqs_per_group, but
    # the current implementation assumes all seqs are new prompts and do not
    # have different output lens.
num_output_blocks = num_output_blocks_per_seq
for num_prompt_blocks in range(1, num_gpu_blocks - num_output_blocks):
seq_group = create_seq_group(
seq_prompt_len=block_size * num_prompt_blocks,
seq_output_lens=[
block_size * num_output_blocks_per_seq
for _ in range(num_seqs_per_group)
],
)
assert num_prompt_blocks + num_output_blocks <= num_gpu_blocks
can_allocate_result = block_manager.can_allocate(seq_group)
num_required_blocks = num_prompt_blocks + num_output_blocks
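        # Mirrors can_allocate: a request that would leave fewer than the
        # watermark blocks free even on an otherwise empty pool is NEVER
        # schedulable; otherwise it is OK if it fits now and LATER if it
        # doesn't.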
if num_gpu_blocks - num_required_blocks < num_watermark_blocks:
assert can_allocate_result == AllocStatus.NEVER
elif num_gpu_blocks >= num_required_blocks:
assert can_allocate_result == AllocStatus.OK
else:
assert can_allocate_result == AllocStatus.LATER
@pytest.mark.parametrize("block_size", [1, 8])
@pytest.mark.parametrize("prompt_len", [1, 7, 8])
@pytest.mark.parametrize("num_slots_to_append", [1, 8, 129])
@pytest.mark.parametrize("num_lookahead_slots", [0, 10])
def test_append_slots(block_size, prompt_len, num_slots_to_append,
num_lookahead_slots):
"""Verify append_slots consumes the correct number of blocks from the block
table.
"""
num_gpu_blocks = 1024
watermark = 0.1
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=0,
watermark=watermark,
)
seq_group = create_seq_group(
seq_prompt_len=prompt_len,
seq_output_lens=[0],
)
# Allocate seq
    assert block_manager.can_allocate(seq_group) == AllocStatus.OK
block_manager.allocate(seq_group)
    # Set seq to RUNNING
seq = seq_group.get_seqs()[0]
seq.status = SequenceStatus.RUNNING
    # Append tokens to the sequence
for token_id in range(num_slots_to_append):
seq.append_token_id(token_id, {token_id: Logprob(0.0)})
# Append slots for new tokens and lookahead slots.
free_blocks_before_append = block_manager.get_num_free_gpu_blocks()
block_manager.append_slots(seq, num_lookahead_slots)
num_consumed_blocks = (free_blocks_before_append -
block_manager.get_num_free_gpu_blocks())
# Expect consumed blocks to be new blocks required to support the new slots.
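    # chunk_list splits a list into block_size-sized chunks, so
    # len(list(chunk_list(range(n), block_size))) == ceil(n / block_size);
    # the expectation is ceil((prompt + appended + lookahead) / block_size)
    # minus the blocks already covered by the prompt alone.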
expected_consumed_blocks = len(
list(
chunk_list(
list(
range(prompt_len + num_slots_to_append +
num_lookahead_slots)),
block_size))) - len(
list(chunk_list(list(range(prompt_len)), block_size)))
assert num_consumed_blocks == expected_consumed_blocks
@pytest.mark.parametrize("block_size", [8])
@pytest.mark.parametrize("num_cpu_blocks", [4])
@pytest.mark.parametrize("num_gpu_blocks", [4])
@pytest.mark.parametrize("num_lookahead_slots", [0, 2, 10])
@pytest.mark.parametrize("enable_caching", [False, True])
def test_swap(block_size, num_cpu_blocks, num_gpu_blocks, num_lookahead_slots,
enable_caching):
"""Verify blocks number on src/desc device is correct after swapping in/out
sequence group (not missing or extra blocks).
"""
    block_manager = SelfAttnBlockSpaceManager(
        block_size=block_size,
        num_cpu_blocks=num_cpu_blocks,
        num_gpu_blocks=num_gpu_blocks,
        watermark=0,
        enable_caching=enable_caching,
    )
prompt, seq_group = create_dummy_prompt("1", prompt_length=block_size - 1)
prompt.status = SequenceStatus.WAITING
block_manager.allocate(seq_group)
# Emulate a forward pass by appending a single token.
# The block manager then knows how many unprocessed
# tokens will be written in the next forward pass.
token_id = 0
prompt.status = SequenceStatus.RUNNING
prompt.append_token_id(token_id, {token_id: Logprob(0.0)})
# Swap seq group from GPU -> CPU.
gpu_blocks = block_manager.get_block_table(prompt)
assert block_manager.can_swap_out(seq_group)
before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
mapping = block_manager.swap_out(seq_group)
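    # swap_out returns (src, dst) physical block number pairs; the GPU-side
    # source ids should cover exactly the sequence's block table.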
mapping_keys = [key for key, _ in mapping]
assert mapping_keys == gpu_blocks
after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
assert before_cpu_blocks == after_cpu_blocks + len(gpu_blocks)
assert before_gpu_blocks + len(gpu_blocks) == after_gpu_blocks
prompt.status = SequenceStatus.SWAPPED
# Swap seq group from CPU -> GPU.
    assert block_manager.can_swap_in(seq_group,
                                     num_lookahead_slots) == AllocStatus.OK
before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
mapping = block_manager.swap_in(seq_group)
cpu_blocks = block_manager.get_block_table(prompt)
mapping_keys = [key for key, _ in mapping]
assert mapping_keys == [cpu_blocks[0]]
after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
assert before_gpu_blocks == after_gpu_blocks + len(cpu_blocks)
@pytest.mark.parametrize("block_size", [8])
@pytest.mark.parametrize("num_gpu_blocks", [4])
@pytest.mark.parametrize("num_lookahead_slots", [3, 8, 10])
@pytest.mark.parametrize("enable_caching", [True, False])
def test_can_swap(block_size, num_gpu_blocks, num_lookahead_slots,
enable_caching):
""" Verify the block manager can correctly determine if a sequence group
can be swapped in/out.
"""
num_cpu_blocks = num_gpu_blocks
    block_manager = SelfAttnBlockSpaceManager(
        block_size=block_size,
        num_cpu_blocks=num_cpu_blocks,
        num_gpu_blocks=num_gpu_blocks,
        watermark=0,
        enable_caching=enable_caching,
    )
prompt, seq_group = create_dummy_prompt(
"1", prompt_length=(num_gpu_blocks - 1) * block_size - 1)
prompt.status = SequenceStatus.WAITING
block_manager.allocate(seq_group)
prompt.status = SequenceStatus.RUNNING
# Swap seq group from GPU -> CPU.
gpu_blocks = block_manager.get_block_table(prompt)
assert block_manager.can_swap_out(seq_group)
before_cpu_blocks = block_manager.get_num_free_cpu_blocks()
before_gpu_blocks = block_manager.get_num_free_gpu_blocks()
mapping = block_manager.swap_out(seq_group)
mapping_keys = [key for key, _ in mapping]
assert mapping_keys == gpu_blocks
after_cpu_blocks = block_manager.get_num_free_cpu_blocks()
after_gpu_blocks = block_manager.get_num_free_gpu_blocks()
assert before_cpu_blocks == after_cpu_blocks + len(gpu_blocks)
assert before_gpu_blocks + len(gpu_blocks) == after_gpu_blocks
prompt.status = SequenceStatus.SWAPPED
# At this moment, we still have enough free blocks to swap in the seq group.
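    # With num_gpu_blocks=4 and block_size=8, the prompt holds
    # (4 - 1) * 8 - 1 = 23 tokens (3 blocks). Swapping in with up to
    # block_size lookahead slots still fits within the 4 GPU blocks; a
    # larger lookahead can never fit, even on an empty pool.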
if num_lookahead_slots <= block_size:
assert block_manager.can_swap_in(seq_group,
num_lookahead_slots) == AllocStatus.OK
else:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.NEVER
    # During swap-out, 2 cached blocks were evicted from the GPU,
    # so prompt1 can't be swapped back in.
prompt2_len = 2 * block_size - 1
prompt2, seq_group2 = create_dummy_prompt(
"2",
prompt_length=prompt2_len,
prompt_tokens=[10000 + i for i in range(prompt2_len)])
prompt2.status = SequenceStatus.WAITING
block_manager.allocate(seq_group2)
# Swap seq group from CPU -> GPU.
if num_lookahead_slots <= block_size:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.LATER
else:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.NEVER
@pytest.mark.parametrize("num_lookahead_slots", [0, 2, 10])
@pytest.mark.parametrize("enable_caching", [False, True])
def test_swap_in_infeasible(num_lookahead_slots, enable_caching):
"""Verifies that swapping fails if there is not enough free blocks
to account for unseen tokens and lookahead_slots.
"""
block_size = 8
num_cpu_blocks = 1
num_gpu_blocks = 1
    block_manager = SelfAttnBlockSpaceManager(
        block_size=block_size,
        num_cpu_blocks=num_cpu_blocks,
        num_gpu_blocks=num_gpu_blocks,
        watermark=0,
        enable_caching=enable_caching,
    )
prompt_length = block_size - 3
assert prompt_length > 0
prompt, seq_group = create_dummy_prompt("1", prompt_length=prompt_length)
prompt.status = SequenceStatus.WAITING
block_manager.allocate(seq_group)
# Emulate a forward pass by appending a single token.
# The block manager then knows how many unprocessed
# tokens will be written in the next forward pass.
token_id = 0
prompt.status = SequenceStatus.RUNNING
prompt.append_token_id(token_id, {token_id: Logprob(0.0)})
# Swap seq group from GPU -> CPU.
assert block_manager.can_swap_out(seq_group)
block_manager.swap_out(seq_group)
prompt.status = SequenceStatus.SWAPPED
# Swap seq group from CPU -> GPU.
# The number of unseen tokens is 1. If the number of existing
# tokens plus the unseen ones and number of lookahead slots exceeds
# the total number of available GPU blocks then the swap
# should fail.
num_unseen_tokens = 1
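    # Concretely: prompt_length = 8 - 3 = 5 tokens, plus 1 unseen token.
    # With 0 or 2 lookahead slots the total (6 or 8) fits in the single
    # 8-slot GPU block; with 10 it cannot, no matter what is freed.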
if (num_lookahead_slots + num_unseen_tokens +
prompt_length) <= (block_size * num_gpu_blocks):
assert block_manager.can_swap_in(seq_group,
num_lookahead_slots) == AllocStatus.OK
else:
assert block_manager.can_swap_in(
seq_group, num_lookahead_slots) == AllocStatus.NEVER


# TODO(cade/kaiyang): add comprehensive tests for swapping at allocator level.
@pytest.mark.parametrize("block_size", [8, 16])
@pytest.mark.parametrize("prompt_len", [10, 300, 1000])
@pytest.mark.parametrize("num_slots_to_append", [50])
@pytest.mark.parametrize("sliding_window", [20, 32, 200, 512])
def test_sliding_window(block_size, prompt_len, num_slots_to_append,
sliding_window):
"""Verify append_slots consumes the correct number of blocks from the block
table.
"""
num_gpu_blocks = 1024
watermark = 0.1
block_manager = SelfAttnBlockSpaceManager(
block_size=block_size,
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=0,
watermark=watermark,
sliding_window=sliding_window,
)
def check_used(min_n, max_n=None):
if max_n is None:
max_n = min_n
used = num_gpu_blocks - block_manager.get_num_free_gpu_blocks()
assert min_n <= used
assert used <= max_n
def num_blocks(num_tokens):
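        # Ceiling division: blocks needed to hold num_tokens tokens.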
return (num_tokens + block_size - 1) // block_size
check_used(0)
seq_group = create_seq_group(
seq_prompt_len=prompt_len,
seq_output_lens=[0],
)
check_used(0)
# Allocate seq
    assert block_manager.can_allocate(seq_group) == AllocStatus.OK
block_manager.allocate(seq_group)
check_used(num_blocks(prompt_len))
    # Set seq to RUNNING
seq = seq_group.get_seqs()[0]
seq.status = SequenceStatus.RUNNING
seq.data.update_num_computed_tokens(prompt_len)
check_used(num_blocks(prompt_len))
    # This is how we compute it in SelfAttnBlockSpaceManager.__init__:
    # +1 because // rounds down, and +1 because the last block of the
    # window may be only partially filled.
    sliding_blocks = (sliding_window // block_size) + 2
    # Plus one block for the null block.
    sliding_blocks += 1
    # Append tokens to the sequence
for token_id in range(num_slots_to_append):
seq.append_token_id(token_id, {token_id: Logprob(0.0)})
seq.data.update_num_computed_tokens(1)
block_manager.append_slots(seq, num_lookahead_slots=0)
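    # Once the sequence has grown past the window, usage should plateau at
    # the sliding-window block budget (plus at most one block of slack).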
if prompt_len < sliding_window + 10:
check_used(0, sliding_blocks + 1)
else:
check_used(sliding_blocks, sliding_blocks + 1)