# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import time
from collections import deque
from typing import Optional
from unittest.mock import MagicMock

import pytest  # noqa
import torch

from vllm.config import CacheConfig, SchedulerConfig
from vllm.config.lora import LoRAConfig
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler, SchedulingBudget
from vllm.lora.request import LoRARequest
from vllm.sequence import SequenceGroup, SequenceStatus

from .utils import (append_new_token, append_new_token_seq,
                    append_new_token_seq_group, create_dummy_prompt,
                    get_sequence_groups, schedule_and_update_computed_tokens)


def test_scheduler_add_seq_group():
    block_size = 4
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=100,
        max_num_seqs=64,
        max_model_len=1,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, cache_dtype="auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add seq group to scheduler.
    num_seq_group = 4
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i),
                                           block_size,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
        assert scheduler.get_num_unfinished_seq_groups() == i + 1


def test_scheduler_abort_seq_group():
    block_size = 4
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=100,
        max_num_seqs=64,
        max_model_len=1,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add multiple seq groups to scheduler.
    num_seq_group = 4
    request_ids: set[str] = set()
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i), block_size)
        scheduler.add_seq_group(seq_group)
        request_ids.add(str(i))

    # Abort all added seq groups.
    assert scheduler.get_num_unfinished_seq_groups() == num_seq_group
    scheduler.abort_seq_group(request_ids)
    assert scheduler.get_num_unfinished_seq_groups() == 0
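

# Hedged sketch (not part of the original suite): the abort test above drops
# every request at once. As an extra illustration, this assumes
# `abort_seq_group` also accepts a subset of request ids (it is called with a
# full set above and with a single id string later in this file) and checks
# that only the aborted groups leave the unfinished count.
def test_scheduler_abort_seq_group_subset_sketch():
    block_size = 4
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=100,
        max_num_seqs=64,
        max_model_len=1,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    num_seq_group = 4
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i), block_size)
        scheduler.add_seq_group(seq_group)

    # Abort only half of the added seq groups; the rest stay unfinished.
    scheduler.abort_seq_group({"0", "1"})
    assert scheduler.get_num_unfinished_seq_groups() == num_seq_group - 2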


def test_scheduler_schedule_simple():
    block_size = 4
    num_seq_group = 4
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=64,
        max_num_seqs=num_seq_group,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)
    running: list[SequenceGroup] = []

    # Add seq groups to scheduler.
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=block_size,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
        running.append(seq_group)

    # Schedule seq groups prompts.
    num_tokens = block_size * num_seq_group
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(running)
    assert out.num_batched_tokens == num_tokens
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == num_seq_group
    append_new_token(out, 1)

    # Schedule seq groups generation.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(running)
    assert out.num_batched_tokens == num_seq_group
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == num_seq_group
    append_new_token(out, 1)


def test_scheduler_prefill_prioritized():
    """Verify running batched tokens are not applied to prefill requests."""
    block_size = 4
    max_model_len = 30
    max_batched_num_tokens = 30
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=max_batched_num_tokens,
        max_num_seqs=2,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 16
    cache_config.num_gpu_blocks = 16
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add seq groups to scheduler.
    _, seq_group_a = create_dummy_prompt("1", 1, block_size=block_size)
    scheduler.add_seq_group(seq_group_a)

    # Schedule seq groups prompts.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]

    # Add a new prefill request B.
    _, seq_group_b = create_dummy_prompt("2", 30, block_size=block_size)
    scheduler.add_seq_group(seq_group_b)

    # Verify prefill requests are prioritized. The new prefill request B is
    # scheduled on its own, ahead of A's running decode.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]


def test_scheduler_schedule_preempt_abort():
    block_size = 4
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=64,
        max_num_seqs=2,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 2
    cache_config.num_gpu_blocks = 2
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add seq groups to scheduler.
    seq_a, seq_group_a = create_dummy_prompt("1",
                                             block_size,
                                             block_size=block_size)
    seq_b, seq_group_b = create_dummy_prompt("2",
                                             block_size,
                                             block_size=block_size)
    scheduler.add_seq_group(seq_group_a)
    scheduler.add_seq_group(seq_group_b)

    # Schedule seq groups prompts.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a, seq_group_b]
    assert out.num_batched_tokens == block_size * 2  # seq_a and seq_b
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == 2
    assert scheduler.get_num_unfinished_seq_groups() == 2

    # Append "generated" tokens, allowing the sequence to mark prompt tokens
    # as processed.
    append_new_token(out, 1)

    # Schedule seq groups generation and preempt seq group b.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]
    assert out.num_batched_tokens == 1
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 2
    assert out.preempted == 1

    # Abort seq group a. Re-schedule seq group b prompt with recomputation.
    scheduler.abort_seq_group("1")
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
    assert out.num_batched_tokens == 5  # 4 prompt + 1 generation.
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 1


def test_scheduler_max_seqs():
    block_size = 4
    num_seq_group = 4
    max_seq_group = 2
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=64,
        max_num_seqs=max_seq_group,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    all_seq_groups: list[SequenceGroup] = []
    # Add seq groups to scheduler.
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=block_size,
                                           block_size=block_size)
        all_seq_groups.append(seq_group)

    # Append 1 seq group.
    scheduler.add_seq_group(all_seq_groups[0])

    # Schedule seq groups prompts.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set([all_seq_groups[0]])
    append_new_token(out, 1)

    # Schedule seq groups generation.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set([all_seq_groups[0]])
    append_new_token(out, 1)

    # Append 2 more seq groups.
    scheduler.add_seq_group(all_seq_groups[1])
    scheduler.add_seq_group(all_seq_groups[2])

    # Schedule seq groups prompts.
    # Only 1 seq group should be scheduled since max_seq_group is 2
    # and one is already running.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set([all_seq_groups[1]])


def test_scheduler_delay_factor():
    block_size = 4
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=100,
        max_num_seqs=64,
        max_model_len=16,
        delay_factor=0.5,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # schedule first prompt
    seq_group_meta, seq_group = create_dummy_prompt("0",
                                                    prompt_length=block_size,
                                                    block_size=block_size)
    scheduler.add_seq_group(seq_group)
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert seq_group_meta[0].request_id == '0'
    append_new_token(out, 1)

    # wait for a second before scheduling next prompt
    time.sleep(1)
    seq_group_meta, seq_group = create_dummy_prompt("1",
                                                    prompt_length=block_size,
                                                    block_size=block_size)
    scheduler.add_seq_group(seq_group)

    # second prompt should *not* be scheduled
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups == 0
    assert seq_group_meta[0].request_id == '0'
    append_new_token(out, 1)

    # wait for more than 0.5 second and try again
    time.sleep(0.6)
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert seq_group_meta[0].request_id == '1'
    append_new_token(out, 1)


def initialize_scheduler(
    *,
    max_num_seqs=1000,
    max_token_budget=1000,
    max_model_len=1000,
    lora_config=None,
    block_size=4,
    num_cpu_blocks=8,
    num_gpu_blocks=8,
    enable_prefix_caching=False,
    enable_chunked_prefill=False,
):
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=max_token_budget,
        max_num_seqs=max_num_seqs,
        max_model_len=max_model_len,
        enable_chunked_prefill=enable_chunked_prefill,
    )
    cache_config = CacheConfig(
        block_size,
        1.0,
        1,
        "auto",
        enable_prefix_caching=enable_prefix_caching,
    )
    cache_config.num_cpu_blocks = num_cpu_blocks
    cache_config.num_gpu_blocks = num_gpu_blocks
    scheduler = Scheduler(scheduler_config, cache_config, lora_config)
    return scheduler


def create_token_budget(token_budget: int = 10000,
                        max_num_seqs: int = 10000) -> SchedulingBudget:
    return SchedulingBudget(
        token_budget=token_budget,
        max_num_seqs=max_num_seqs,
    )


def add_token_budget(budget: SchedulingBudget,
                     num_batched_tokens: int = 0,
                     num_curr_seqs: int = 0):
    mock_seq_group = create_dummy_prompt('10', prompt_length=60)[1]
    budget.add_num_batched_tokens(mock_seq_group.request_id,
                                  num_batched_tokens)
    budget.add_num_seqs(mock_seq_group.request_id, num_curr_seqs)


def test_prefill_schedule_max_prompt_len():
    """
    Test prompt longer than max_prompt_len is aborted.
    """
    block_size = 4
    scheduler = initialize_scheduler(max_model_len=30, block_size=block_size)
    _, seq_group = create_dummy_prompt("0",
                                       prompt_length=60,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    budget = create_token_budget()
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 1
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 0


def test_prefill_schedule_token_budget():
    """
    Test token budget respected.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=0)
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)

    # 0 token budget == nothing is scheduled.
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 2

    # 60 token budget == 1 request scheduled.
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 60
    assert budget.num_curr_seqs == 1
    assert len(remaining_waiting) == 1

    # Test when current_batched_tokens respected.
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    budget = create_token_budget(token_budget=60)
    add_token_budget(budget, 30, 0)
    _, seq_group = create_dummy_prompt(str(i),
                                       prompt_length=60,
                                       block_size=block_size)
    # Cannot schedule a prompt that doesn't fit the budget.
    scheduler.add_seq_group(seq_group)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 30
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 1

    budget = create_token_budget(token_budget=90)
    add_token_budget(budget, 30, 0)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 90
    assert budget.num_curr_seqs == 1
    assert len(remaining_waiting) == 0


def test_prefill_schedule_max_seqs():
    """
    Test max seq respected.
""" block_size = 4 scheduler = initialize_scheduler(block_size=block_size, num_cpu_blocks=64, num_gpu_blocks=64) budget = create_token_budget(max_num_seqs=2) for i in range(3): _, seq_group = create_dummy_prompt(str(i), prompt_length=60, block_size=block_size) scheduler.add_seq_group(seq_group) output = scheduler._schedule_prefills(budget, None) remaining_waiting = scheduler.waiting assert len(output.ignored_seq_groups) == 0 assert len(output.seq_groups) == 2 assert budget.num_batched_tokens == 120 assert budget.num_curr_seqs == 2 assert len(remaining_waiting) == 1 # Verify curr_num_seqs respected. scheduler.waiting = deque() budget = create_token_budget(max_num_seqs=2) add_token_budget(budget, 0, 2) _, seq_group = create_dummy_prompt(str(i), prompt_length=60, block_size=block_size) scheduler.add_seq_group(seq_group) output = scheduler._schedule_prefills(budget, None) remaining_waiting = scheduler.waiting assert len(output.ignored_seq_groups) == 0 assert len(output.seq_groups) == 0 assert budget.num_batched_tokens == 0 assert budget.num_curr_seqs == 2 assert len(remaining_waiting) == 1 def test_prefill_schedule_max_lora(): """ Test max lora is respected and prioritized. """ block_size = 4 lora_config = LoRAConfig(max_lora_rank=8, max_loras=1) scheduler = initialize_scheduler(lora_config=lora_config, block_size=block_size, num_cpu_blocks=64, num_gpu_blocks=64) budget = create_token_budget(token_budget=120) curr_loras: set[int] = set() for i in range(2): _, seq_group = create_dummy_prompt(str(i), prompt_length=60, block_size=block_size, lora_request=LoRARequest( lora_name=str(i), lora_int_id=i + 1, lora_path="abc")) scheduler.add_seq_group(seq_group) # Add two more requests to verify lora is prioritized. # 0: LoRA, 1: LoRA, 2: regular, 3: regular # In the first iteration, index 0, 2 is scheduled. # If a request is not scheduled because it hits max lora, it is # prioritized. Verify that. for i in range(2, 4): _, seq_group = create_dummy_prompt(str(i), prompt_length=60, block_size=block_size) scheduler.add_seq_group(seq_group) # Schedule 2 requests (0 and 2) output = scheduler._schedule_prefills(budget, curr_loras) remaining_waiting = scheduler.waiting assert len(output.ignored_seq_groups) == 0 assert len(output.seq_groups) == 2 assert budget.num_batched_tokens == 120 assert budget.num_curr_seqs == 2 assert len(remaining_waiting) == 2 assert len(curr_loras) == 1 # The second lora request is scheduled next as FCFS policy. # Reset curr_loras so that it can be scheduled. curr_loras = set() budget = create_token_budget(token_budget=60) output = scheduler._schedule_prefills(budget, curr_loras) remaining_waiting = scheduler.waiting assert len(output.seq_groups) == 1 assert output.seq_groups[0].seq_group.request_id == "1" assert len(remaining_waiting) == 1 assert len(curr_loras) == 1 assert budget.num_batched_tokens == 60 def test_prefill_schedule_no_block_manager_capacity(): """ Test sequence cannot be scheduled due to block manager has no capacity. 
""" block_size = 4 scheduler = initialize_scheduler(block_size=block_size, num_gpu_blocks=128, num_cpu_blocks=128) budget = create_token_budget() for i in range(3): _, seq_group = create_dummy_prompt(str(i), prompt_length=60, block_size=block_size) scheduler.add_seq_group(seq_group) scheduler.block_manager.can_allocate = MagicMock() scheduler.block_manager.can_allocate.return_value = AllocStatus.LATER output = scheduler._schedule_prefills(budget, None) remaining_waiting = scheduler.waiting assert len(output.ignored_seq_groups) == 0 assert len(output.seq_groups) == 0 assert budget.num_batched_tokens == 0 assert budget.num_curr_seqs == 0 assert len(remaining_waiting) == 3 scheduler = initialize_scheduler() budget = create_token_budget() for i in range(3): _, seq_group = create_dummy_prompt(str(i), prompt_length=60, block_size=block_size) scheduler.add_seq_group(seq_group) scheduler.block_manager.can_allocate = MagicMock() scheduler.block_manager.can_allocate.return_value = AllocStatus.NEVER output = scheduler._schedule_prefills(budget, None) remaining_waiting = scheduler.waiting assert len(output.ignored_seq_groups) == 3 assert len(output.seq_groups) == 0 assert budget.num_batched_tokens == 0 assert budget.num_curr_seqs == 0 assert len(remaining_waiting) == 0 def test_decode_schedule_preempted(): """ Test decodes cannot be scheduled and preempted. """ block_size = 4 scheduler = initialize_scheduler(block_size=block_size, num_cpu_blocks=64, num_gpu_blocks=64) curr_loras = None for i in range(3): _, seq_group = create_dummy_prompt(str(i), prompt_length=60, block_size=block_size) scheduler._allocate_and_set_running(seq_group) append_new_token_seq_group(60, seq_group, 1) scheduler._add_seq_group_to_running(seq_group) scheduler.block_manager.can_append_slots = MagicMock() def cannot_append_second_group(seq_group, num_lookahead_slots): return seq_group.request_id != "1" scheduler.block_manager.can_append_slots.side_effect = ( cannot_append_second_group) # 1 cannot be scheduled, and the lowest priority (request 2) # should be preempted. 1 will also be preempted. budget = create_token_budget() output = scheduler._schedule_running(budget, curr_loras) remaining_running = scheduler.running assert len(remaining_running) == 0 assert len(output.decode_seq_groups) == 1 assert len(output.prefill_seq_groups) == 0 assert output.decode_seq_groups[0].seq_group.request_id == "0" assert len(output.preempted) == 2 # Verify budgets are updated. assert budget.num_batched_tokens == 1 # NOTE: When enable_chunk is False, num_seqs budget is not updated. # assert budget.num_curr_seqs == 1 # Both should be preempted, not swapped. assert output.blocks_to_swap_out == [] # Nothing is copied. assert output.blocks_to_copy == [] def test_schedule_decode_blocks_to_copy_update(): """ Verify blocks_to_copy is updated. """ block_size = 4 scheduler = initialize_scheduler(block_size=4, num_cpu_blocks=16, num_gpu_blocks=16) _, seq_group = create_dummy_prompt("1", prompt_length=60, block_size=block_size) curr_loras = None scheduler._allocate_and_set_running(seq_group) append_new_token_seq_group(60, seq_group, 1) scheduler._add_seq_group_to_running(seq_group) # The last request should be swapped out. 


def test_schedule_decode_blocks_to_copy_update():
    """
    Verify blocks_to_copy is updated.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=4,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       block_size=block_size)
    curr_loras = None
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    scheduler._add_seq_group_to_running(seq_group)

    # Mock append_slots so that it reports a block to copy.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 0
    # Nothing is preempted.
    assert output.blocks_to_swap_out == []
    # Since append_slots returns the source -> dest mapping, it should
    # be applied.
    assert output.blocks_to_copy == [(2, 3)]


def test_schedule_swapped_max_loras():
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras: set[int] = set()
    blocks_to_swap_out: list[tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size,
                                           lora_request=LoRARequest(
                                               lora_name=str(i),
                                               lora_int_id=i + 1,
                                               lora_path="abc"))
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 1
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert len(curr_loras) == 1


def test_schedule_swapped_cannot_swap_in():
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: list[tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Both requests are now in the swapped queue.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER
    # Since we cannot swap in, none of the requests are swapped in.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


def test_infeasible_swap():
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: list[tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Both requests are now in the swapped queue.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.NEVER
    # Since we cannot swap in, none of the requests are swapped in.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.infeasible_seq_groups) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0
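

# Hedged sketch (not used by the tests in this file): the swapped-queue tests
# above all move a group to the swapped queue with the same four calls. This
# illustrative helper only restates that pattern with the same private
# Scheduler methods the tests call directly.
def swap_out_group_sketch(scheduler, seq_group, blocks_to_swap_out,
                          prompt_length=60):
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(prompt_length, seq_group, 1)
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)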


def test_schedule_swapped_blocks_to_copy():
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       block_size=block_size)
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    blocks_to_swap_out: list[tuple[int, int]] = []
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)

    # The request is now in the swapped queue.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.blocks_to_copy == [(2, 3)]


def test_scheduling_budget():
    TOKEN_BUDGET = 4
    MAX_SEQS = 4
    budget = SchedulingBudget(token_budget=TOKEN_BUDGET, max_num_seqs=MAX_SEQS)
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=1)
    assert budget.can_schedule(num_new_tokens=4, num_new_seqs=4)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=5)
    assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=5)
    assert budget.remaining_token_budget() == TOKEN_BUDGET

    # Verify add/subtract num batched tokens.
    _, seq_group = create_dummy_prompt("1", 3)
    budget.add_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    assert budget.can_schedule(num_new_tokens=2, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=3, num_new_seqs=1)
    # Verify adding the same seq group again is a no-op.
    budget.add_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    budget.subtract_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0
    budget.subtract_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0

    # Verify add/subtract max seqs.
    _, seq_group = create_dummy_prompt("1", 3)
    budget.add_num_seqs(seq_group.request_id, 2)
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=2)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=3)
    assert budget.num_curr_seqs == 2
    # Verify adding the same seq group again is a no-op.
    budget.add_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 2
    budget.subtract_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 0
    budget.subtract_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 0


@pytest.mark.parametrize("enable_prefix_caching", [True, False])
def test_prefix_caching_aware_prefills(enable_prefix_caching):
    """
    Test the below scenario: three sequences, seqA, seqB, and seqC, share the
    first block as a prefix.

    The test verifies the below scenarios:
    1. SeqA is scheduled first.
    2. SeqB and SeqC can be prefilled together in a single schedule round,
    even though there is not enough token budget to prefill both without
    considering prefix caching.
    """
    block_size = 4
    max_num_batched_tokens = 12
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_token_budget=max_num_batched_tokens,
        max_num_seqs=max_seq_group,
        max_model_len=max_num_batched_tokens,
        enable_prefix_caching=enable_prefix_caching,
    )

    seqA_tokens = list(range(8))
    num_shared_tokens = 4
    seqB_tokens = seqA_tokens[:num_shared_tokens] + list(range(
        12, 16))  # Shared prefix first 4.
    seqC_tokens = seqA_tokens[:num_shared_tokens] + list(range(
        16, 20))  # Shared prefix first 4.

    seqA, seqA_group = create_dummy_prompt("0",
                                           prompt_tokens=seqA_tokens,
                                           block_size=block_size)
    seqB, seqB_group = create_dummy_prompt("1",
                                           prompt_tokens=seqB_tokens,
                                           block_size=block_size)
    seqC, seqC_group = create_dummy_prompt("2",
                                           prompt_tokens=seqC_tokens,
                                           block_size=block_size)

    # Schedule seqA prefill.
    scheduler.add_seq_group(seqA_group)
    metas, out, _ = scheduler.schedule()
    assert (len(out.scheduled_seq_groups) == 1
            and out.scheduled_seq_groups[0].seq_group == seqA_group)
    assert out.scheduled_seq_groups[0].token_chunk_size == len(seqA_tokens)

    # Schedule seqA decode.
    append_new_token_seq_group(len(seqA_tokens), seqA_group, 999)
    metas, out, _ = scheduler.schedule()
    assert len(out.scheduled_seq_groups) == 1
    assert out.scheduled_seq_groups[0].seq_group == seqA_group
    assert out.scheduled_seq_groups[0].token_chunk_size == 1

    # Scheduling seqB and seqC prefills should work with prefix caching.
    scheduler.add_seq_group(seqB_group)
    scheduler.add_seq_group(seqC_group)
    metas, out, _ = scheduler.schedule()
    if enable_prefix_caching:
        assert len(out.scheduled_seq_groups) == 2
        assert set([
            out.scheduled_seq_groups[0].seq_group,
            out.scheduled_seq_groups[1].seq_group,
        ]) == set([seqB_group, seqC_group])
        assert len(metas) == 2
        for meta in metas:
            assert meta.token_chunk_size == 8
            # 1 block for the 4 shared prefix tokens.
            assert (len(meta.computed_block_nums) ==
                    num_shared_tokens // block_size)
    else:
        assert len(out.scheduled_seq_groups) == 1
        assert len(metas) == 1
        assert metas[0].token_chunk_size == 8
        assert len(metas[0].computed_block_nums) == 0  # No blocks computed.


def test_no_multiple_partial_prefills_with_chunked_prefill_and_prefix_caching(
):
    """
    This test verifies that we don't schedule new prefills if there's already
    a chunked prefill in progress, even though the new prefills with a shared
    prefix could fit in the token budget:

    - SeqA is being chunk-prefilled.
    - SeqB with the same prompt shouldn't be scheduled for prefill even
      though there's enough token budget to prefill the cached tokens.
    - Neither should seqC be scheduled.
    - When seqA is in the decoding phase, seqB and seqC can be scheduled.
    - The entire seqB should be prefilled since it's a full prefix cache hit.
    - SeqC would be partially prefilled, with the shared prefix cached and
      the remaining unique tokens prefilled (rounded down to be block-size
      aligned).
""" block_size = 2 max_num_batched_tokens = 4 max_seq_group = 3 scheduler = initialize_scheduler( block_size=block_size, num_cpu_blocks=16, num_gpu_blocks=16, max_token_budget=max_num_batched_tokens, max_num_seqs=max_seq_group, max_model_len=100, enable_prefix_caching=True, enable_chunked_prefill=True, ) seqA_tokens = list(range(8)) seqB_tokens = seqA_tokens seqC_shared_prefix_len = 4 seqC_tokens = seqA_tokens[:seqC_shared_prefix_len] + list(range(12, 20)) seqA, seqA_group = create_dummy_prompt("0", prompt_tokens=seqA_tokens, block_size=block_size) seqB, seqB_group = create_dummy_prompt("1", prompt_tokens=seqB_tokens, block_size=block_size) # Chunked prefill seqA. scheduler.add_seq_group(seqA_group) metas, out = schedule_and_update_computed_tokens(scheduler) assert len(out.scheduled_seq_groups) == 1 assert out.scheduled_seq_groups[0].seq_group == seqA_group assert out.scheduled_seq_groups[0].token_chunk_size == 4 # seqB should not be scheduled with ongoing prefills. scheduler.add_seq_group(seqB_group) metas, out = schedule_and_update_computed_tokens(scheduler) assert len(out.scheduled_seq_groups) == 1 assert out.scheduled_seq_groups[0].seq_group == seqA_group assert out.scheduled_seq_groups[0].token_chunk_size == 4 # both seqB and seqC can now be scheduled with seqA is over. # seqA is in decoding phase. append_new_token_seq(seqA, 999) seqC, seqC_group = create_dummy_prompt("2", prompt_tokens=seqC_tokens, block_size=block_size) scheduler.add_seq_group(seqC_group) metas, out = schedule_and_update_computed_tokens(scheduler) assert len(out.scheduled_seq_groups) == 3 metas = {meta.request_id: meta for meta in metas} assert metas[seqA_group.request_id].token_chunk_size == 1 # Decode assert (metas[seqB_group.request_id].token_chunk_size == 8 ) # Fully cached prefill assert ( metas[seqC_group.request_id].token_chunk_size == 6 ), "A partial prefix of C (4 tokens) should be prefilled, with the " "remaining tokens fit into 3 token budget (4-1 from the seqA). It will " "then be rounded down to 2 tokens on block size, thus 6 tokens in total." def test_no_batches_mixed_with_prompt_tokens_and_prompt_embeds(): """ Test that the scheduler does not schedule batches with prompt tokens and prompt embeddings co-mingled. 
""" block_size = 2 max_seq_group = 3 scheduler = initialize_scheduler( block_size=block_size, num_cpu_blocks=16, num_gpu_blocks=16, max_num_seqs=max_seq_group, max_model_len=100, enable_prefix_caching=True, ) # the odd indexed inputs should be passed in via embeddings, # evens via token_ids seq_length = 7 embedding_size = 5 num_seqs = 11 seq_tokens: list[list[int]] = [] seq_embeds: list[Optional[torch.Tensor]] = [] for i in range(num_seqs): if i % 2: seq_tokens.append(list(range(seq_length))) seq_embeds.append(None) else: seq_tokens.append([0] * seq_length) seq_embeds.append(torch.rand(embedding_size)) seq_and_seq_groups = [ create_dummy_prompt(f"{i}", prompt_tokens=seq_tokens[i], prompt_embeds=seq_embeds[i], block_size=block_size) for i in range(len(seq_tokens)) ] for _, seq_group in seq_and_seq_groups: scheduler.add_seq_group(seq_group) while not all(seq.is_finished() for seq, _ in seq_and_seq_groups): unfinished_seq_groups = [ seq_group for _, seq_group in seq_and_seq_groups if not seq_group.is_finished() ] _, out = schedule_and_update_computed_tokens(scheduler) assert len(out.scheduled_seq_groups) > 0 batch_is_prompt_embeds = out.scheduled_seq_groups[ 0].seq_group.uses_prompt_embeds() expected_scheduled_seq_groups = [ seq_group for seq_group in unfinished_seq_groups if seq_group.uses_prompt_embeds() == batch_is_prompt_embeds ] # We should have as many scheduled groups as possible, without mixing assert len(out.scheduled_seq_groups) == min( max_seq_group, len(expected_scheduled_seq_groups)) assert all(scheduled_seq_group.seq_group.uses_prompt_embeds() == batch_is_prompt_embeds for scheduled_seq_group in out.scheduled_seq_groups) # Finish the scheduled groups for scheduled_seq_group in out.scheduled_seq_groups: for seq in scheduled_seq_group.seq_group.seqs: seq.status = SequenceStatus.FINISHED_STOPPED scheduler.free_finished_seq_groups() def test_remove_seq_from_computed_blocks_tracker(): """ Test that computed_blocks_tracker correctly removes stale sequences during scheduling. The test covers 9 scheduling branches where stale seqs are removed: - 1 in _schedule_swapped - 1 in _schedule_priority_preemption - 7 in _schedule_prefill Each branch is tested to ensure proper cleanup of _seq_id_to_num_tokens_computed. """ # Budget can not schedule in swapped block_size = 2 max_seq_group = 3 seq_tokens_with_swapped: list[list[int]] = [] blocks_to_swap_out: list[tuple[int, int]] = [] curr_loras: set[int] = set() scheduler = initialize_scheduler( block_size=block_size, num_cpu_blocks=64, num_gpu_blocks=16, max_num_seqs=max_seq_group, enable_prefix_caching=True, ) budget = create_token_budget(token_budget=15) seq_length = 16 num_seqs = 3 for i in range(num_seqs): seq_tokens_with_swapped.append([i] * seq_length) seq_and_seq_groups = [ create_dummy_prompt(f"{i}", prompt_tokens=seq_tokens_with_swapped[i], block_size=block_size) for i in range(len(seq_tokens_with_swapped)) ] for _, seq_group in seq_and_seq_groups: scheduler._allocate_and_set_running(seq_group) scheduler._swap_out(seq_group, blocks_to_swap_out) scheduler._add_seq_group_to_swapped(seq_group) scheduler._schedule_swapped(budget, curr_loras) seq_id_to_num_tokens_computed = ( scheduler.block_manager._computed_blocks_tracker. _seq_id_to_num_tokens_computed.get(1)) assert seq_id_to_num_tokens_computed is None # Prefill schedule don't have a space for another LoRA, so # we ignore this request for now. 
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64,
                                     enable_prefix_caching=True)
    budget = create_token_budget(token_budget=120)
    num_seqs = 2
    for i in range(num_seqs):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=seq_length,
                                           block_size=block_size,
                                           lora_request=LoRARequest(
                                               lora_name=str(i),
                                               lora_int_id=i + 1,
                                               lora_path="abc"))
        scheduler.add_seq_group(seq_group)
    scheduler._schedule_prefills(budget, curr_loras)
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(1))
    assert seq_id_to_num_tokens_computed is None

    # Priority preemption schedule.
    scheduler._schedule_priority_preemption(budget)
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(1))
    assert seq_id_to_num_tokens_computed is None

    # Prefill scheduler does not schedule batches with prompt tokens and
    # prompt embeddings co-mingled.
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=100,
        enable_prefix_caching=True,
    )
    seq_length = 7
    embedding_size = 5
    seq_tokens_with_embedding: list[list[int]] = []
    seq_embeds: list[Optional[torch.Tensor]] = []

    seq_tokens_with_embedding.append(list(range(seq_length)))
    seq_embeds.append(None)
    seq_tokens_with_embedding.append([0] * seq_length)
    seq_embeds.append(torch.rand(embedding_size))

    seq_and_seq_groups = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=seq_tokens_with_embedding[i],
                            prompt_embeds=seq_embeds[i],
                            block_size=block_size)
        for i in range(len(seq_tokens_with_embedding))
    ]

    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(1))
    assert seq_id_to_num_tokens_computed is None

    # Prefill scheduler budget num_batched_tokens
    # >= scheduler_config max_num_batched_tokens
    block_size = 2
    max_seq_group = 3
    seq_tokens_prefill_budget: list[list[int]] = []

    scheduler = initialize_scheduler(
        block_size=block_size,
        max_token_budget=8,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=5,
        enable_prefix_caching=True,
    )
    seq_length = 4
    num_seqs = 3
    for i in range(num_seqs):
        seq_tokens_prefill_budget.append([i] * seq_length)

    seq_and_seq_groups = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=seq_tokens_prefill_budget[i],
                            block_size=block_size)
        for i in range(len(seq_tokens_prefill_budget))
    ]

    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(2))
    assert seq_id_to_num_tokens_computed is None

    # Budget can not schedule in waiting.
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        max_token_budget=30,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=30,
        enable_prefix_caching=True,
    )
    seq_length = 16
    num_seqs = 3
    seq_tokens_prefill_budget_waiting: list[list[int]] = []
    for i in range(num_seqs):
        seq_tokens_prefill_budget_waiting.append(list(range(seq_length)))

    seq_and_seq_groups = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=seq_tokens_prefill_budget_waiting[i],
                            block_size=block_size)
        for i in range(len(seq_tokens_prefill_budget_waiting))
    ]

    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(1))
    assert seq_id_to_num_tokens_computed is None

    # Sequence num_new_tokens > prompt_limit marked FINISHED_IGNORED.
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=30,
        enable_prefix_caching=True,
    )
    seq_length = 31
    seq_tokens_prompt_limit: list[list[int]] = []
    seq_tokens_prompt_limit.append(list(range(seq_length)))
    seq_and_seq_groups = [
        create_dummy_prompt("0",
                            prompt_tokens=seq_tokens_prompt_limit[0],
                            block_size=block_size)
    ]
    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(0))
    assert seq_id_to_num_tokens_computed is None

    # Can not allocate, AllocStatus is NEVER, marked FINISHED_IGNORED.
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=160,
        num_gpu_blocks=160,
        max_num_seqs=max_seq_group,
        max_model_len=320,
        enable_prefix_caching=True,
    )
    seq_length = 320
    num_seqs = 1
    seq_tokens_never: list[list[int]] = []
    for i in range(num_seqs):
        seq_tokens_never.append(list(range(seq_length)))

    seq_and_seq_groups = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=seq_tokens_never[i],
                            block_size=block_size)
        for i in range(len(seq_tokens_never))
    ]

    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(0))
    assert seq_id_to_num_tokens_computed is None

    # Can not allocate, AllocStatus is LATER.
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=160,
        num_gpu_blocks=160,
        max_num_seqs=max_seq_group,
        max_model_len=320,
        enable_prefix_caching=True,
    )
    seq_length = 160
    num_seqs = 2
    seq_tokens_later: list[list[int]] = []
    for i in range(num_seqs):
        seq_tokens_later.append(list(range(seq_length)))

    seq_and_seq_groups = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=seq_tokens_later[i],
                            block_size=block_size)
        for i in range(len(seq_tokens_later))
    ]

    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(1))
    assert seq_id_to_num_tokens_computed is None
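

# Hedged sketch (not part of the original suite): a minimal end-to-end smoke
# test assembled purely from helpers already used above (initialize_scheduler,
# create_dummy_prompt, schedule_and_update_computed_tokens, append_new_token),
# illustrating the prefill-then-decode flow the earlier tests exercise in
# more detail.
def test_scheduler_prefill_then_decode_sketch():
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=block_size,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)

    # Prefill round: both prompts fit the default budget.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert len(out.scheduled_seq_groups) == 2
    assert out.num_batched_tokens == 2 * block_size
    append_new_token(out, 1)

    # Decode round: one token per sequence group.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert len(out.scheduled_seq_groups) == 2
    assert out.num_batched_tokens == 2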