From 92ee7baaf9a5bf6c8132dde56e4056933c61f50f Mon Sep 17 00:00:00 2001
From: Kuntai Du
Date: Mon, 30 Jun 2025 21:03:55 -0700
Subject: [PATCH] [Example] add one-click runnable example for P2P NCCL XpYd (#20246)

Signed-off-by: KuntaiDu
---
 .../disagg_example_p2p_nccl_xpyd.sh | 245 ++++++++++++++++++
 .../disagg_proxy_p2p_nccl_xpyd.py}  |   0
 2 files changed, 245 insertions(+)
 create mode 100644 examples/online_serving/disaggregated_serving_p2p_nccl_xpyd/disagg_example_p2p_nccl_xpyd.sh
 rename examples/online_serving/{disagg_xpyd/disagg_prefill_proxy_xpyd.py => disaggregated_serving_p2p_nccl_xpyd/disagg_proxy_p2p_nccl_xpyd.py} (100%)

diff --git a/examples/online_serving/disaggregated_serving_p2p_nccl_xpyd/disagg_example_p2p_nccl_xpyd.sh b/examples/online_serving/disaggregated_serving_p2p_nccl_xpyd/disagg_example_p2p_nccl_xpyd.sh
new file mode 100644
index 000000000000..2966f386c93a
--- /dev/null
+++ b/examples/online_serving/disaggregated_serving_p2p_nccl_xpyd/disagg_example_p2p_nccl_xpyd.sh
@@ -0,0 +1,245 @@
+#!/bin/bash
+
+# =============================================================================
+# vLLM Disaggregated Serving Script - P2P NCCL XpYd Architecture
+# =============================================================================
+# This script demonstrates disaggregated prefill and decode serving using
+# P2P NCCL communication. The architecture supports various XpYd configurations:
+#
+# - 1P3D: 1 Prefill server + 3 Decode servers (current default)
+# - 3P1D: 3 Prefill servers + 1 Decode server
+# - etc.
+#
+# Configuration can be customized via environment variables:
+# MODEL: Model to serve
+# PREFILL_GPUS: Comma-separated GPU IDs for prefill servers
+# DECODE_GPUS: Comma-separated GPU IDs for decode servers
+# PREFILL_PORTS: Comma-separated ports for prefill servers
+# DECODE_PORTS: Comma-separated ports for decode servers
+# PROXY_PORT: Proxy server port used to set up the XpYd connection.
+# TIMEOUT_SECONDS: Server startup timeout
+# =============================================================================
+
+# Configuration - can be overridden via environment variables
+MODEL=${MODEL:-meta-llama/Llama-3.1-8B-Instruct}
+TIMEOUT_SECONDS=${TIMEOUT_SECONDS:-1200}
+PROXY_PORT=${PROXY_PORT:-30001}
+
+# Default 1P3D configuration (1 Prefill + 3 Decode)
+PREFILL_GPUS=${PREFILL_GPUS:-0}
+DECODE_GPUS=${DECODE_GPUS:-1,2,3}
+PREFILL_PORTS=${PREFILL_PORTS:-20003}
+DECODE_PORTS=${DECODE_PORTS:-20005,20007,20009}
+
+echo "Warning: P2P NCCL disaggregated prefill XpYd support for vLLM v1 is experimental and subject to change."
+echo ""
+echo "Architecture Configuration:"
+echo " Model: $MODEL"
+echo " Prefill GPUs: $PREFILL_GPUS, Ports: $PREFILL_PORTS"
+echo " Decode GPUs: $DECODE_GPUS, Ports: $DECODE_PORTS"
+echo " Proxy Port: $PROXY_PORT"
+echo " Timeout: ${TIMEOUT_SECONDS}s"
+echo ""
+
+PIDS=()
+
+# Switch to the directory of the current script
+cd "$(dirname "${BASH_SOURCE[0]}")"
+
+check_required_files() {
+    local files=("disagg_proxy_p2p_nccl_xpyd.py")
+    for file in "${files[@]}"; do
+        if [[ ! -f "$file" ]]; then
+            echo "Required file $file not found in $(pwd)"
+            exit 1
+        fi
+    done
+}
+
+check_hf_token() {
+    if [ -z "$HF_TOKEN" ]; then
+        echo "HF_TOKEN is not set. Please set it to your Hugging Face token."
+        echo "Example: export HF_TOKEN=your_token_here"
+        exit 1
+    fi
+    if [[ "$HF_TOKEN" != hf_* ]]; then
+        echo "HF_TOKEN does not look like a valid Hugging Face token (it should start with 'hf_')."
+        exit 1
+    fi
+    echo "HF_TOKEN is set and valid."
+}
+
+check_num_gpus() {
+    # Check via nvidia-smi that at least 2 GPUs are available
+    num_gpus=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l)
+    if [ "$num_gpus" -lt 2 ]; then
+        echo "You need at least 2 GPUs to run disaggregated prefill."
+        exit 1
+    else
+        echo "Found $num_gpus GPUs."
+    fi
+}
+
+ensure_python_library_installed() {
+    echo "Checking if $1 is installed..."
+    if ! python3 -c "import $1" > /dev/null 2>&1; then
+        echo "$1 is not installed. Please install it via pip install $1."
+        exit 1
+    else
+        echo "$1 is installed."
+    fi
+}
+
+cleanup() {
+    echo "Stopping everything…"
+    trap - INT TERM   # prevent re-entrancy
+    kill -- -$$       # negative PID == "this whole process-group"
+    wait              # reap children so we don't leave zombies
+    exit 0
+}
+
+wait_for_server() {
+    local port=$1
+    local timeout_seconds=$TIMEOUT_SECONDS
+    local start_time=$(date +%s)
+
+    echo "Waiting for server on port $port..."
+
+    while true; do
+        if curl -s "localhost:${port}/v1/completions" > /dev/null; then
+            echo "Server on port $port is ready."
+            return 0
+        fi
+
+        local now=$(date +%s)
+        if (( now - start_time >= timeout_seconds )); then
+            echo "Timeout waiting for server on port $port"
+            return 1
+        fi
+
+        sleep 1
+    done
+}
+
+main() {
+    check_required_files
+    check_hf_token
+    check_num_gpus
+    ensure_python_library_installed pandas
+    ensure_python_library_installed datasets
+    ensure_python_library_installed vllm
+    ensure_python_library_installed quart
+
+    trap cleanup INT
+    trap cleanup USR1
+    trap cleanup TERM
+
+    echo "Launching disaggregated serving components..."
+    echo "Please check the log files for detailed output:"
+    echo " - prefill*.log: Prefill server logs"
+    echo " - decode*.log: Decode server logs"
+    echo " - proxy.log: Proxy server log"
+
+    # =============================================================================
+    # Launch Proxy Server
+    # =============================================================================
+    echo ""
+    echo "Starting proxy server on port $PROXY_PORT..."
+    python3 disagg_proxy_p2p_nccl_xpyd.py > proxy.log 2>&1 &
+    PIDS+=($!)
+
+    # Parse GPU and port arrays
+    IFS=',' read -ra PREFILL_GPU_ARRAY <<< "$PREFILL_GPUS"
+    IFS=',' read -ra DECODE_GPU_ARRAY <<< "$DECODE_GPUS"
+    IFS=',' read -ra PREFILL_PORT_ARRAY <<< "$PREFILL_PORTS"
+    IFS=',' read -ra DECODE_PORT_ARRAY <<< "$DECODE_PORTS"
+
+    # =============================================================================
+    # Launch Prefill Servers (X Producers)
+    # =============================================================================
+    echo ""
+    echo "Starting ${#PREFILL_GPU_ARRAY[@]} prefill server(s)..."
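+    # Each prefill server is registered with the proxy as a KV producer via the
+    # --kv-transfer-config JSON passed below. For illustration only, with the
+    # default 1P3D values in this script (PROXY_PORT=30001, first prefill port
+    # 20003, first KV port 21001), that JSON expands to roughly:
+    #
+    #   {
+    #     "kv_connector": "P2pNcclConnector",
+    #     "kv_role": "kv_producer",
+    #     "kv_buffer_size": "1e1",
+    #     "kv_port": "21001",
+    #     "kv_connector_extra_config": {
+    #       "proxy_ip": "0.0.0.0", "proxy_port": "30001", "http_port": "20003",
+    #       "send_type": "PUT_ASYNC", "nccl_num_channels": "16"
+    #     }
+    #   }
+    #
+    # The decode servers further below use the same structure with kv_role
+    # "kv_consumer" and a larger kv_buffer_size.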
+    for i in "${!PREFILL_GPU_ARRAY[@]}"; do
+        local gpu_id=${PREFILL_GPU_ARRAY[$i]}
+        local port=${PREFILL_PORT_ARRAY[$i]}
+        local kv_port=$((21001 + i))
+
+        echo " Prefill server $((i+1)): GPU $gpu_id, Port $port, KV Port $kv_port"
+        CUDA_VISIBLE_DEVICES=$gpu_id VLLM_USE_V1=1 vllm serve $MODEL \
+            --enforce-eager \
+            --host 0.0.0.0 \
+            --port $port \
+            --tensor-parallel-size 1 \
+            --seed 1024 \
+            --dtype float16 \
+            --max-model-len 10000 \
+            --max-num-batched-tokens 10000 \
+            --max-num-seqs 256 \
+            --trust-remote-code \
+            --gpu-memory-utilization 0.9 \
+            --disable-log-requests \
+            --kv-transfer-config \
+            "{\"kv_connector\":\"P2pNcclConnector\",\"kv_role\":\"kv_producer\",\"kv_buffer_size\":\"1e1\",\"kv_port\":\"$kv_port\",\"kv_connector_extra_config\":{\"proxy_ip\":\"0.0.0.0\",\"proxy_port\":\"$PROXY_PORT\",\"http_port\":\"$port\",\"send_type\":\"PUT_ASYNC\",\"nccl_num_channels\":\"16\"}}" > prefill$((i+1)).log 2>&1 &
+        PIDS+=($!)
+    done
+
+    # =============================================================================
+    # Launch Decode Servers (Y Decoders)
+    # =============================================================================
+    echo ""
+    echo "Starting ${#DECODE_GPU_ARRAY[@]} decode server(s)..."
+    for i in "${!DECODE_GPU_ARRAY[@]}"; do
+        local gpu_id=${DECODE_GPU_ARRAY[$i]}
+        local port=${DECODE_PORT_ARRAY[$i]}
+        local kv_port=$((22001 + i))
+
+        echo " Decode server $((i+1)): GPU $gpu_id, Port $port, KV Port $kv_port"
+        VLLM_USE_V1=1 CUDA_VISIBLE_DEVICES=$gpu_id vllm serve $MODEL \
+            --enforce-eager \
+            --host 0.0.0.0 \
+            --port $port \
+            --tensor-parallel-size 1 \
+            --seed 1024 \
+            --dtype float16 \
+            --max-model-len 10000 \
+            --max-num-batched-tokens 10000 \
+            --max-num-seqs 256 \
+            --trust-remote-code \
+            --gpu-memory-utilization 0.7 \
+            --disable-log-requests \
+            --kv-transfer-config \
+            "{\"kv_connector\":\"P2pNcclConnector\",\"kv_role\":\"kv_consumer\",\"kv_buffer_size\":\"8e9\",\"kv_port\":\"$kv_port\",\"kv_connector_extra_config\":{\"proxy_ip\":\"0.0.0.0\",\"proxy_port\":\"$PROXY_PORT\",\"http_port\":\"$port\",\"send_type\":\"PUT_ASYNC\",\"nccl_num_channels\":\"16\"}}" > decode$((i+1)).log 2>&1 &
+        PIDS+=($!)
+    done
+
+    # =============================================================================
+    # Wait for All Servers to Start
+    # =============================================================================
+    echo ""
+    echo "Waiting for all servers to start..."
+    for port in "${PREFILL_PORT_ARRAY[@]}" "${DECODE_PORT_ARRAY[@]}"; do
+        if ! wait_for_server $port; then
+            echo "Failed to start server on port $port"
+            cleanup
+            exit 1
+        fi
+    done
+
+    echo ""
+    echo "All servers are up. Starting benchmark..."
+
+    # =============================================================================
+    # Run Benchmark
+    # =============================================================================
+    # Note: the benchmark sends requests to port 10001, the client-facing port of
+    # the proxy (see disagg_proxy_p2p_nccl_xpyd.py); PROXY_PORT above is only used
+    # for setting up the XpYd connection.
+    cd ../../../benchmarks/
+    python3 benchmark_serving.py --port 10001 --seed $(date +%s) \
+        --model $MODEL \
+        --dataset-name random --random-input-len 7500 --random-output-len 200 \
+        --num-prompts 200 --burstiness 100 --request-rate 2 | tee benchmark.log
+
+    echo "Benchmarking done. Cleaning up..."
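+    # Optional sanity check (left commented out so it does not change the run):
+    # before cleanup, a single completion request can be sent through the proxy.
+    # This assumes the proxy exposes an OpenAI-compatible /v1/completions route
+    # on its client-facing port 10001, as in disagg_proxy_p2p_nccl_xpyd.py.
+    #
+    #   curl -s http://localhost:10001/v1/completions \
+    #     -H "Content-Type: application/json" \
+    #     -d "{\"model\": \"$MODEL\", \"prompt\": \"Hello\", \"max_tokens\": 16}"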
+
+    cleanup
+}
+
+main
\ No newline at end of file
diff --git a/examples/online_serving/disagg_xpyd/disagg_prefill_proxy_xpyd.py b/examples/online_serving/disaggregated_serving_p2p_nccl_xpyd/disagg_proxy_p2p_nccl_xpyd.py
similarity index 100%
rename from examples/online_serving/disagg_xpyd/disagg_prefill_proxy_xpyd.py
rename to examples/online_serving/disaggregated_serving_p2p_nccl_xpyd/disagg_proxy_p2p_nccl_xpyd.py