diff --git a/benchmarks/profiler/profile_sla.py b/benchmarks/profiler/profile_sla.py
index 7b887e8fbf..1c6a2d2707 100644
--- a/benchmarks/profiler/profile_sla.py
+++ b/benchmarks/profiler/profile_sla.py
@@ -21,19 +21,22 @@
 import numpy as np
 import yaml
 
-from utils.config import CONFIG_MODIFIERS, WORKER_COMPONENT_NAMES
-from utils.defaults import DECODE_NUM_REQUESTS_RANGE
-from utils.genai_perf import benchmark_decode, benchmark_prefill
-from utils.plot import plot_decode_performance, plot_prefill_performance
-from utils.profile_cache import (
+
+from benchmarks.profiler.utils.config import CONFIG_MODIFIERS, WORKER_COMPONENT_NAMES
+from benchmarks.profiler.utils.defaults import DECODE_NUM_REQUESTS_RANGE
+from benchmarks.profiler.utils.genai_perf import benchmark_decode, benchmark_prefill
+from benchmarks.profiler.utils.plot import (
+    plot_decode_performance,
+    plot_prefill_performance,
+)
+from benchmarks.profiler.utils.profile_cache import (
     check_decode_results_exist,
     check_prefill_results_exist,
     load_existing_decode_results,
     load_existing_prefill_results,
 )
-from utils.profile_decode import profile_decode
-from utils.profile_prefill import profile_prefill
-
+from benchmarks.profiler.utils.profile_decode import profile_decode
+from benchmarks.profiler.utils.profile_prefill import profile_prefill
 from deploy.utils.dynamo_deployment import (
     DynamoDeploymentClient,
     cleanup_remaining_deployments,
@@ -57,18 +60,6 @@ async def run_profile(args):
     try:
         config_modifier = CONFIG_MODIFIERS[args.backend]
 
-        if args.example_dir is None:
-            logger.info(
-                "Example directory not provided, inferring from config file location..."
-            )
-            try:
-                args.example_dir = os.path.dirname(os.path.dirname(args.config))
-            except Exception:
-                logger.error(
-                    "Failed to infer example directory, please provide explicitly using --example-dir "
-                )
-                exit(1)
-
         with open(args.config, "r") as f:
             config = yaml.safe_load(f)
 
@@ -134,47 +125,50 @@ async def run_profile(args):
             with open(prefill_config_fn, "w") as f:
                 yaml.dump(prefill_config, f)
 
-            client = DynamoDeploymentClient(
-                namespace=args.namespace,
-                base_log_dir=work_dir,
-                model_name=model_name,
-                service_name=args.service_name,
-                frontend_port=frontend_port,
-                deployment_name=prefill_config["metadata"]["name"],
-            )
-            logger.info(f"Created client with service_name: {client.service_name}")
-            deployment_clients.append(client)  # Track for cleanup
-            await client.create_deployment(prefill_config_fn)
-            logger.info("Waiting for deployment to be ready...")
-            await client.wait_for_deployment_ready()
-            logger.info("Deployment is ready")
-
-            logger.info("Getting deployment logs...")
-            await client.get_deployment_logs()
-            logger.info(
-                f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
-            )
+            if args.dry_run:
+                logger.info("Skipping deployment creation in dry run mode")
+            else:
+                client = DynamoDeploymentClient(
+                    namespace=args.namespace,
+                    base_log_dir=work_dir,
+                    model_name=model_name,
+                    service_name=args.service_name,
+                    frontend_port=frontend_port,
+                    deployment_name=prefill_config["metadata"]["name"],
+                )
+                logger.info(f"Created client with service_name: {client.service_name}")
+                deployment_clients.append(client)  # Track for cleanup
+                await client.create_deployment(prefill_config_fn)
+                logger.info("Waiting for deployment to be ready...")
+                await client.wait_for_deployment_ready()
+                logger.info("Deployment is ready")
+
+                logger.info("Getting deployment logs...")
+                await client.get_deployment_logs()
+                logger.info(
+                    f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
+                )
 
-            # run genai-perf
-            base_url = client.get_service_url()
-            genai_perf_artifact_dir = f"{work_dir}/gap_isl{args.isl}"
-            gap_result = benchmark_prefill(
-                args.isl,
-                genai_perf_artifact_dir,
-                model_name,
-                model_name,
-                base_url=base_url,
-            )
-            if gap_result is not None:
-                ttft = gap_result["time_to_first_token"]["avg"]
-                prefill_tp_size.append(tp_size)
-                prefill_ttft.append(ttft)
-                prefill_thpt_per_gpu.append(args.isl / ttft / tp_size * 1000)
+                # run genai-perf
+                base_url = client.get_service_url()
+                genai_perf_artifact_dir = f"{work_dir}/gap_isl{args.isl}"
+                gap_result = benchmark_prefill(
+                    args.isl,
+                    genai_perf_artifact_dir,
+                    model_name,
+                    model_name,
+                    base_url=base_url,
+                )
+                if gap_result is not None:
+                    ttft = gap_result["time_to_first_token"]["avg"]
+                    prefill_tp_size.append(tp_size)
+                    prefill_ttft.append(ttft)
+                    prefill_thpt_per_gpu.append(args.isl / ttft / tp_size * 1000)
 
-            logger.info("Cleaning up deployment...")
-            await client.delete_deployment()
-            deployment_clients.remove(client)
-            logger.info("Deployment deleted")
+                logger.info("Cleaning up deployment...")
+                await client.delete_deployment()
+                deployment_clients.remove(client)
+                logger.info("Deployment deleted")
 
     # Plot the results as a 2D scatter plot
     if prefill_tp_size and prefill_ttft and prefill_thpt_per_gpu:
@@ -246,149 +240,163 @@ async def run_profile(args):
             with open(decode_config_fn, "w") as f:
                 yaml.dump(decode_config, f)
 
-            client = DynamoDeploymentClient(
-                namespace=args.namespace,
-                base_log_dir=work_dir,
-                model_name=model_name,
-                service_name=args.service_name,
-                frontend_port=frontend_port,
-                deployment_name=decode_config["metadata"]["name"],
-            )
-            deployment_clients.append(client)  # Track for cleanup
-            await client.create_deployment(decode_config_fn)
-            logger.info("Waiting for deployment to be ready...")
-            await client.wait_for_deployment_ready()
-            logger.info("Deployment is ready")
-
-            logger.info("Getting deployment logs...")
-            await client.get_deployment_logs()
-            logger.info(
-                f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
-            )
-
-            max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
-                f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log"
-            )
-            max_concurrency = max_kv_tokens // (args.isl + args.osl)
-            sweep_num_request = [
-                num for num in DECODE_NUM_REQUESTS_RANGE if num <= max_concurrency
-            ]
-            logger.info(
-                f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}"
-            )
+            if args.dry_run:
+                logger.info("Skipping deployment creation in dry run mode")
+            else:
+                client = DynamoDeploymentClient(
+                    namespace=args.namespace,
+                    base_log_dir=work_dir,
+                    model_name=model_name,
+                    service_name=args.service_name,
+                    frontend_port=frontend_port,
+                    deployment_name=decode_config["metadata"]["name"],
+                )
+                deployment_clients.append(client)  # Track for cleanup
+                await client.create_deployment(decode_config_fn)
+                logger.info("Waiting for deployment to be ready...")
+                await client.wait_for_deployment_ready()
+                logger.info("Deployment is ready")
+
+                logger.info("Getting deployment logs...")
+                await client.get_deployment_logs()
+                logger.info(
+                    f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
+                )
 
-            engine_decode_itl = []
-            engine_decode_thpt_per_gpu = []
-            base_url = client.get_service_url()
-            for num_request in sweep_num_request:
f"{work_dir}/gap_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}" - gap_result = benchmark_decode( - args.isl, - args.osl, - num_request, - genai_perf_artifact_dir, - model_name, - model_name, - base_url=base_url, + max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log( + f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log" ) - if gap_result is not None: - itl = gap_result["inter_token_latency"]["avg"] - thpt_per_gpu = ( - gap_result["output_token_throughput"]["avg"] / tp_size + max_concurrency = max_kv_tokens // (args.isl + args.osl) + sweep_num_request = [ + num for num in DECODE_NUM_REQUESTS_RANGE if num <= max_concurrency + ] + logger.info( + f"Sweeping num_request range based on maximum number of kv tokens: {sweep_num_request}" + ) + + engine_decode_itl = [] + engine_decode_thpt_per_gpu = [] + base_url = client.get_service_url() + for num_request in sweep_num_request: + genai_perf_artifact_dir = f"{work_dir}/gap_request{num_request}_isl{args.isl}_osl{args.osl}_n{num_request}" + gap_result = benchmark_decode( + args.isl, + args.osl, + num_request, + genai_perf_artifact_dir, + model_name, + model_name, + base_url=base_url, ) - engine_decode_itl.append(itl) - engine_decode_thpt_per_gpu.append(thpt_per_gpu) - decode_tp_size.append(tp_size) - decode_itl.append(itl) - decode_thpt_per_gpu.append(thpt_per_gpu) - decode_concurrency.append(num_request) - decode_kv_cache_size.append(max_kv_tokens) + if gap_result is not None: + itl = gap_result["inter_token_latency"]["avg"] + thpt_per_gpu = ( + gap_result["output_token_throughput"]["avg"] / tp_size + ) + engine_decode_itl.append(itl) + engine_decode_thpt_per_gpu.append(thpt_per_gpu) + decode_tp_size.append(tp_size) + decode_itl.append(itl) + decode_thpt_per_gpu.append(thpt_per_gpu) + decode_concurrency.append(num_request) + decode_kv_cache_size.append(max_kv_tokens) - logger.info("Cleaning up deployment...") - await client.delete_deployment() - deployment_clients.remove(client) - logger.info("Deployment deleted") + logger.info("Cleaning up deployment...") + await client.delete_deployment() + deployment_clients.remove(client) + logger.info("Deployment deleted") - # Store partial results for plotting later - decode_results.append( - (tp_size, engine_decode_itl, engine_decode_thpt_per_gpu) - ) + # Store partial results for plotting later + decode_results.append( + (tp_size, engine_decode_itl, engine_decode_thpt_per_gpu) + ) # Plot all decode results after profiling is complete if decode_results: plot_decode_performance(decode_results, args.itl, args.output_dir) - logger.info("Analyzing results and generate recommendations...") - # Safety guards: no results → exit early with a clear message - if not (prefill_tp_size and prefill_ttft and prefill_thpt_per_gpu): - logger.error("No prefill results produced; skipping recommendations.") - return + if args.dry_run: + logger.info("Skipping recommendations in dry run mode") + else: + logger.info("Analyzing results and generate recommendations...") + # Safety guards: no results → exit early with a clear message + if not (prefill_tp_size and prefill_ttft and prefill_thpt_per_gpu): + logger.error("No prefill results produced; skipping recommendations.") + + # select best tp size for prefill + if min(prefill_ttft) > args.ttft: + logger.info( + "No TP size satisfies the TTFT requirement, please try a smaller model or a more powerful GPU SKU" + ) + selected_prefill_idx = int(np.argmin(np.array(prefill_ttft))) + else: + 
+                valid_indices = [
+                    i for i, ttft in enumerate(prefill_ttft) if ttft <= args.ttft
+                ]
+                # Among valid TP sizes, select the one with highest throughput per GPU
+                valid_thpts = [prefill_thpt_per_gpu[i] for i in valid_indices]
+                max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
+                selected_prefill_idx = max_thpt_idx
+            logger.info(
+                f"Suggested prefill TP:{prefill_tp_size[selected_prefill_idx]} (TTFT {prefill_ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)"
+            )
 
-        # select best tp size for prefill
-        if min(prefill_ttft) > args.ttft:
+            # scale up if estimated TTFT is 120% of target TTFT
+            prefill_queue_size_upper_bound = max(
+                0.1, args.ttft * 1.2 / prefill_ttft[selected_prefill_idx] - 1
+            )
+            # scale down if estimated TTFT is 80% of target TTFT
+            prefill_queue_size_lower_bound = max(
+                0.1, args.ttft * 0.8 / prefill_ttft[selected_prefill_idx] - 1
+            )
             logger.info(
-                "No TP size satisfies the TTFT requirement, please try a smaller model or a more powerful GPU SKU"
+                f"Suggested planner upper/lower bound for prefill queue size: {prefill_queue_size_upper_bound:.2f}/{prefill_queue_size_lower_bound:.2f}"
             )
-            selected_prefill_idx = int(np.argmin(np.array(prefill_ttft)))
-        else:
-            valid_indices = [
-                i for i, ttft in enumerate(prefill_ttft) if ttft <= args.ttft
-            ]
-            # Among valid TP sizes, select the one with highest throughput per GPU
-            valid_thpts = [prefill_thpt_per_gpu[i] for i in valid_indices]
-            max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
-            selected_prefill_idx = max_thpt_idx
-        logger.info(
-            f"Suggested prefill TP:{prefill_tp_size[selected_prefill_idx]} (TTFT {prefill_ttft[selected_prefill_idx]:.2f} ms, throughput {prefill_thpt_per_gpu[selected_prefill_idx]:.2f} tokens/s/GPU)"
-        )
-        # scale up if estimated TTFT is 120% of target TTFT
-        prefill_queue_size_upper_bound = max(
-            0.1, args.ttft * 1.2 / prefill_ttft[selected_prefill_idx] - 1
-        )
-        # scale down if estimated TTFT is 80% of target TTFT
-        prefill_queue_size_lower_bound = max(
-            0.1, args.ttft * 0.8 / prefill_ttft[selected_prefill_idx] - 1
-        )
-        logger.info(
-            f"Suggested planner upper/lower bound for prefill queue size: {prefill_queue_size_upper_bound:.2f}/{prefill_queue_size_lower_bound:.2f}"
-        )
+            # select best tp size for decode
+            if not (
+                decode_tp_size
+                and decode_itl
+                and decode_thpt_per_gpu
+                and decode_concurrency
+                and decode_kv_cache_size
+            ):
+                logger.error("No decode results produced; skipping recommendations.")
+                return
+            if min(decode_itl) > args.itl:
+                logger.info(
+                    "No TP size satisfies the ITL requirement, please try a smaller model or a more powerful GPU SKU"
+                )
+                selected_decode_idx = int(np.argmin(np.array(decode_itl)))
+            else:
+                valid_indices = [
+                    i for i, itl in enumerate(decode_itl) if itl <= args.itl
+                ]
+                # Among valid TP sizes, select the one with highest throughput per GPU
+                valid_thpts = [decode_thpt_per_gpu[i] for i in valid_indices]
+                max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
+                selected_decode_idx = max_thpt_idx
+            logger.info(
+                f"Suggested decode TP:{decode_tp_size[selected_decode_idx]} (ITL {decode_itl[selected_decode_idx]:.2f} ms, throughput {decode_thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)"
+            )
 
-        # select best tp size for decode
-        if not (
-            decode_tp_size
-            and decode_itl
-            and decode_thpt_per_gpu
-            and decode_concurrency
-            and decode_kv_cache_size
-        ):
-            logger.error("No decode results produced; skipping recommendations.")
-            return
-        if min(decode_itl) > args.itl:
+            # calculate kv cache utilization for the selected TP and concurrency
+            selected_decode_kv_cache_utilization = (
+                decode_concurrency[selected_decode_idx]
+                * (args.isl + (args.osl / 2))
+                / decode_kv_cache_size[selected_decode_idx]
+            )
+            # set a +- 20% range for the kv cache utilization
             logger.info(
-                "No TP size satisfies the ITL requirement, please try a smaller model or a more powerful GPU SKU"
+                f"Suggested planner upper/lower bound for decode kv cache utilization: {min(1, selected_decode_kv_cache_utilization + 0.2):.2f}/{max(0.1, selected_decode_kv_cache_utilization - 0.2):.2f}"
             )
-            selected_decode_idx = int(np.argmin(np.array(decode_itl)))
-        else:
-            valid_indices = [i for i, itl in enumerate(decode_itl) if itl <= args.itl]
-            # Among valid TP sizes, select the one with highest throughput per GPU
-            valid_thpts = [decode_thpt_per_gpu[i] for i in valid_indices]
-            max_thpt_idx = valid_indices[int(np.argmax(valid_thpts))]
-            selected_decode_idx = max_thpt_idx
-        logger.info(
-            f"Suggested decode TP:{decode_tp_size[selected_decode_idx]} (ITL {decode_itl[selected_decode_idx]:.2f} ms, throughput {decode_thpt_per_gpu[selected_decode_idx]:.2f} tokens/s/GPU)"
-        )
-        # calculate kv cache utlization for the selected TP and concurrency
-        selected_decode_kv_cache_utilization = (
-            decode_concurrency[selected_decode_idx]
-            * (args.isl + (args.osl / 2))
-            / decode_kv_cache_size[selected_decode_idx]
-        )
-        # set a +- 20% range for the kv cache utilization
-        logger.info(
-            f"Suggested planner upper/lower bound for decode kv cache utilization: {min(1, selected_decode_kv_cache_utilization + 0.2):.2f}/{max(0.1, selected_decode_kv_cache_utilization - 0.2):.2f}"
-        )
+        if args.dry_run:
+            # use min value for prefill and decode TP sizes
+            prefill_tp_size = [args.min_num_gpus_per_engine]
+            decode_tp_size = [args.min_num_gpus_per_engine]
+            selected_prefill_idx = 0
+            selected_decode_idx = 0
 
         # interpolate ISL - TTFT with best prefill TP
         best_prefill_tp = prefill_tp_size[selected_prefill_idx]
@@ -408,50 +416,53 @@ async def run_profile(args):
         with open(prefill_config_fn, "w") as f:
             yaml.dump(prefill_config, f)
 
-        client = DynamoDeploymentClient(
-            namespace=args.namespace,
-            base_log_dir=work_dir,
-            model_name=model_name,
-            service_name=args.service_name,
-            frontend_port=frontend_port,
-            deployment_name=prefill_config["metadata"]["name"],
-        )
-        deployment_clients.append(client)  # Track for cleanup
-        await client.create_deployment(prefill_config_fn)
-        logger.info("Waiting for deployment to be ready...")
-        try:
-            await client.wait_for_deployment_ready()
-            logger.info("Deployment is ready")
-            skip_profile = False
-        except TimeoutError:
-            logger.error(
-                "Deployment failed to become ready within timeout, skipping profiling"
+        if args.dry_run:
+            logger.info("Skipping deployment creation in dry run mode")
+        else:
+            client = DynamoDeploymentClient(
+                namespace=args.namespace,
+                base_log_dir=work_dir,
+                model_name=model_name,
+                service_name=args.service_name,
+                frontend_port=frontend_port,
+                deployment_name=prefill_config["metadata"]["name"],
             )
-            skip_profile = True
+            deployment_clients.append(client)  # Track for cleanup
+            await client.create_deployment(prefill_config_fn)
+            logger.info("Waiting for deployment to be ready...")
+            try:
+                await client.wait_for_deployment_ready()
+                logger.info("Deployment is ready")
+                skip_profile = False
+            except TimeoutError:
+                logger.error(
+                    "Deployment failed to become ready within timeout, skipping profiling"
+                )
+                skip_profile = True
 
-        if not skip_profile:
-            logger.info("Getting deployment logs...")
-            await client.get_deployment_logs()
-            logger.info(
-                f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
-            )
+            if not skip_profile:
+                logger.info("Getting deployment logs...")
+                await client.get_deployment_logs()
+                logger.info(
+                    f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
+                )
 
-            base_url = client.get_service_url()
+                base_url = client.get_service_url()
 
-            profile_prefill(
-                work_dir,
-                model_name,
-                model_name,
-                base_url,
-                best_prefill_tp,
-                args.max_context_length,
-                args.prefill_interpolation_granularity,
-            )
+                profile_prefill(
+                    work_dir,
+                    model_name,
+                    model_name,
+                    base_url,
+                    best_prefill_tp,
+                    args.max_context_length,
+                    args.prefill_interpolation_granularity,
+                )
 
-        logger.info("Cleaning up deployment...")
-        await client.delete_deployment()
-        deployment_clients.remove(client)
-        logger.info("Deployment deleted")
+            logger.info("Cleaning up deployment...")
+            await client.delete_deployment()
+            deployment_clients.remove(client)
+            logger.info("Deployment deleted")
 
         # interpolate ITL - Active_KV_Cache - Decode_Context_Length with best decode TP
         best_decode_tp = decode_tp_size[selected_decode_idx]
@@ -468,47 +479,50 @@ async def run_profile(args):
         with open(decode_config_fn, "w") as f:
             yaml.dump(decode_config, f)
 
-        client = DynamoDeploymentClient(
-            namespace=args.namespace,
-            base_log_dir=work_dir,
-            model_name=model_name,
-            service_name=args.service_name,
-            frontend_port=frontend_port,
-            deployment_name=decode_config["metadata"]["name"],
-        )
-        deployment_clients.append(client)  # Track for cleanup
-        await client.create_deployment(decode_config_fn)
-        logger.info("Waiting for deployment to be ready...")
-        await client.wait_for_deployment_ready()
-        logger.info("Deployment is ready")
-
-        logger.info("Getting deployment logs...")
-        await client.get_deployment_logs()
-        logger.info(
-            f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
-        )
+        if args.dry_run:
+            logger.info("Skipping deployment creation in dry run mode")
+        else:
+            client = DynamoDeploymentClient(
+                namespace=args.namespace,
+                base_log_dir=work_dir,
+                model_name=model_name,
+                service_name=args.service_name,
+                frontend_port=frontend_port,
+                deployment_name=decode_config["metadata"]["name"],
+            )
+            deployment_clients.append(client)  # Track for cleanup
+            await client.create_deployment(decode_config_fn)
+            logger.info("Waiting for deployment to be ready...")
+            await client.wait_for_deployment_ready()
+            logger.info("Deployment is ready")
 
-        max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
-            f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log"
-        )
+            logger.info("Getting deployment logs...")
+            await client.get_deployment_logs()
+            logger.info(
+                f"Logs have been saved to {client.base_log_dir / client.deployment_name}"
+            )
 
-        base_url = client.get_service_url()
-
-        profile_decode(
-            work_dir,
-            model_name,
-            model_name,
-            base_url,
-            best_decode_tp,
-            max_kv_tokens,
-            args.max_context_length,
-            args.decode_interpolation_granularity,
-        )
+            max_kv_tokens = config_modifier.get_kv_cache_size_from_dynamo_log(
+                f"{work_dir}/{client.deployment_name}/{WORKER_COMPONENT_NAMES[args.backend].decode_worker_k8s_name.lower()}/0.log"
+            )
+
+            base_url = client.get_service_url()
+
+            profile_decode(
+                work_dir,
+                model_name,
+                model_name,
+                base_url,
+                best_decode_tp,
+                max_kv_tokens,
+                args.max_context_length,
+                args.decode_interpolation_granularity,
+            )
 
-        logger.info("Cleaning up deployment...")
-        await client.delete_deployment()
-        deployment_clients.remove(client)
-        logger.info("Deployment deleted")
+            logger.info("Cleaning up deployment...")
+            await client.delete_deployment()
+            deployment_clients.remove(client)
+            logger.info("Deployment deleted")
 
     except Exception as e:
         logger.error(f"Profile job failed with error: {e}")
@@ -543,12 +557,6 @@
         required=True,
         help="Path to the DynamoGraphDeployment config file",
     )
-    parser.add_argument(
-        "--example-dir",
-        type=str,
-        default=None,
-        help="path to the example directory, if not provided, will try to infer from config file location",
-    )
     parser.add_argument(
         "--output-dir",
         type=str,
@@ -614,6 +622,11 @@
        default="",
        help="Service name for port forwarding (default: {deployment_name}-frontend)",
    )
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="Dry run the profile job",
+    )
 
     args = parser.parse_args()
     asyncio.run(run_profile(args))
diff --git a/benchmarks/profiler/utils/config.py b/benchmarks/profiler/utils/config.py
index 762fbf91ae..13055cf9d8 100644
--- a/benchmarks/profiler/utils/config.py
+++ b/benchmarks/profiler/utils/config.py
@@ -15,11 +15,14 @@
 import logging
 import re
-from typing import Literal, Optional, cast
+from typing import Literal, Optional, Protocol
 
 from pydantic import BaseModel
 
-from utils.defaults import DEFAULT_MODEL_NAME, DYNAMO_RUN_DEFAULT_PORT
+from benchmarks.profiler.utils.defaults import (
+    DEFAULT_MODEL_NAME,
+    DYNAMO_RUN_DEFAULT_PORT,
+)
 from dynamo.planner.defaults import WORKER_COMPONENT_NAMES
 
 logger = logging.getLogger(__name__)
@@ -34,27 +37,30 @@ class Container(BaseModel):
-    args: list[str] = []
+    args: Optional[list[str]] = None
+    model_config = {"extra": "allow"}
 
 
 class PodSpec(BaseModel):
-    mainContainer: Container
+    mainContainer: Optional[Container] = None
+    model_config = {"extra": "allow"}
 
 
 class ServiceResources(BaseModel):
-    requests: dict[str, str]
+    requests: Optional[dict[str, str]] = None
     limits: Optional[dict[str, str]] = None
 
 
 class Service(BaseModel):
-    replicas: int
-    resources: ServiceResources
-    extraPodSpec: PodSpec
+    replicas: Optional[int] = None
+    resources: Optional[ServiceResources] = None
+    extraPodSpec: Optional[PodSpec] = None
+    model_config = {"extra": "allow"}
 
 
 class Services(BaseModel):
     Frontend: Service
-    __root__: dict[str, Service]
+    model_config = {"extra": "allow"}
 
 
 class Spec(BaseModel):
@@ -68,15 +74,19 @@ class Metadata(BaseModel):
 class Config(BaseModel):
     metadata: Metadata
     spec: Spec
+    model_config = {"extra": "allow"}
 
 
-def break_arguments(args: list[str]) -> list[str]:
-    ans = []
+def break_arguments(args: list[str] | None) -> list[str]:
+    ans: list[str] = []
+    if args is None:
+        return ans
     if isinstance(args, str):
         ans = re.split(r"[ =]", args)
     else:
         for arg in args:
-            ans.extend(arg.split(" "))
+            if arg is not None:
+                ans.extend(arg.split(" "))
     return ans
@@ -122,6 +132,28 @@ def find_arg_index(args: list[str]) -> int:
     return idx
 
 
+class ConfigModifierProtocol(Protocol):
+    @classmethod
+    def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> dict:
+        ...
+
+    @classmethod
+    def set_config_tp_size(cls, config: dict, tp_size: int) -> dict:
+        ...
+
+    @classmethod
+    def get_model_name(cls, config: dict) -> str:
+        ...
+
+    @classmethod
+    def get_port(cls, config: dict) -> int:
+        ...
+
+    @classmethod
+    def get_kv_cache_size_from_dynamo_log(cls, dynamo_log_fn: str) -> int:
+        ...
+
+
 class VllmV1ConfigModifier:
     @classmethod
     def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> dict:
@@ -145,9 +177,17 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d
                 WORKER_COMPONENT_NAMES["vllm"].prefill_worker_k8s_name
             ]
 
-            args = cfg.spec.services[
+            worker_service = cfg.spec.services[
                 WORKER_COMPONENT_NAMES["vllm"].decode_worker_k8s_name
-            ].extraPodSpec.mainContainer.args
+            ]
+            if (
+                not worker_service.extraPodSpec
+                or not worker_service.extraPodSpec.mainContainer
+            ):
+                raise ValueError(
+                    "Missing extraPodSpec or mainContainer in worker service"
+                )
+            args = worker_service.extraPodSpec.mainContainer.args
 
             args = break_arguments(args)
@@ -160,9 +200,7 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d
             if "--no-enable-prefix-caching" not in args:
                 args = append_argument(args, "--no-enable-prefix-caching")
 
-            cfg.spec.services[
-                WORKER_COMPONENT_NAMES["vllm"].decode_worker_k8s_name
-            ].extraPodSpec.mainContainer.args = join_arguments(args)
+            worker_service.extraPodSpec.mainContainer.args = join_arguments(args)
 
         elif target == "decode":
             # delete prefill worker
             del cfg.spec.services[
                 WORKER_COMPONENT_NAMES["vllm"].prefill_worker_k8s_name
             ]
@@ -170,9 +208,17 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d
-            args = cfg.spec.services[
+            worker_service = cfg.spec.services[
                 WORKER_COMPONENT_NAMES["vllm"].decode_worker_k8s_name
-            ].extraPodSpec.mainContainer.args
+            ]
+            if (
+                not worker_service.extraPodSpec
+                or not worker_service.extraPodSpec.mainContainer
+            ):
+                raise ValueError(
+                    "Missing extraPodSpec or mainContainer in worker service"
+                )
+            args = worker_service.extraPodSpec.mainContainer.args
 
             args = break_arguments(args)
@@ -182,9 +228,7 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d
             if "--no-enable-prefix-caching" in args:
                 args.remove("--no-enable-prefix-caching")
 
-            cfg.spec.services[
-                WORKER_COMPONENT_NAMES["vllm"].decode_worker_k8s_name
-            ].extraPodSpec.mainContainer.args = join_arguments(args)
+            worker_service.extraPodSpec.mainContainer.args = join_arguments(args)
 
         # set num workers to 1
         decode_worker_config = cfg.spec.services[
@@ -198,27 +242,30 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d
     def set_config_tp_size(cls, config: dict, tp_size: int):
         cfg = Config.model_validate(config)
 
-        cfg.spec.services[
+        worker_service = cfg.spec.services[
             WORKER_COMPONENT_NAMES["vllm"].decode_worker_k8s_name
-        ].resources.requests["gpu"] = str(tp_size)
+        ]
+
+        # Ensure resources exists
+        if worker_service.resources is None:
+            worker_service.resources = ServiceResources()
+
+        # Ensure requests exists
+        if worker_service.resources.requests is None:
+            worker_service.resources.requests = {}
+
+        worker_service.resources.requests["gpu"] = str(tp_size)
+
+        # Update limits if they exist
+        if worker_service.resources.limits is not None:
+            worker_service.resources.limits["gpu"] = str(tp_size)
+
         if (
-            cfg.spec.services[
-                WORKER_COMPONENT_NAMES["vllm"].decode_worker_k8s_name
-            ].resources.limits
-            is not None
+            not worker_service.extraPodSpec
+            or not worker_service.extraPodSpec.mainContainer
         ):
-            # Explicitly cast `limits` as the typecheck cannot determine that
-            # limits is not None here
-            cast(
-                dict[str, str],
-                cfg.spec.services[
-                    WORKER_COMPONENT_NAMES["vllm"].decode_worker_k8s_name
-                ].resources.limits,
-            )["gpu"] = str(tp_size)
-
-        args = cfg.spec.services[
-            WORKER_COMPONENT_NAMES["vllm"].decode_worker_k8s_name
-        ].extraPodSpec.mainContainer.args
+            raise ValueError("Missing extraPodSpec or mainContainer in worker service")
+        args = worker_service.extraPodSpec.mainContainer.args
 
         args = break_arguments(args)
@@ -228,9 +275,7 @@ def set_config_tp_size(cls, config: dict, tp_size: int):
         except ValueError:
             args = append_argument(args, ["--tensor-parallel-size", str(tp_size)])
 
-        cfg.spec.services[
-            WORKER_COMPONENT_NAMES["vllm"].decode_worker_k8s_name
-        ].extraPodSpec.mainContainer.args = join_arguments(args)
+        worker_service.extraPodSpec.mainContainer.args = join_arguments(args)
 
         return cfg.model_dump()
@@ -238,7 +283,16 @@ def set_config_tp_size(cls, config: dict, tp_size: int):
     def get_model_name(cls, config: dict) -> str:
         cfg = Config.model_validate(config)
         worker_name = WORKER_COMPONENT_NAMES["vllm"].decode_worker_k8s_name
-        args = cfg.spec.services[worker_name].extraPodSpec.mainContainer.args
+        worker_service = cfg.spec.services[worker_name]
+        if (
+            not worker_service.extraPodSpec
+            or not worker_service.extraPodSpec.mainContainer
+        ):
+            logger.warning(
+                f"Worker service missing extraPodSpec or mainContainer, using default model name: {DEFAULT_MODEL_NAME}"
+            )
+            return DEFAULT_MODEL_NAME
+        args = worker_service.extraPodSpec.mainContainer.args
         args = break_arguments(args)
 
         for i, arg in enumerate(args):
@@ -253,12 +307,29 @@ def get_model_name(cls, config: dict) -> str:
     @classmethod
     def get_port(cls, config: dict) -> int:
         cfg = Config.model_validate(config)
-        args = cfg.spec.services["Frontend"].extraPodSpec.mainContainer.args
+        frontend_service = cfg.spec.services.get("Frontend")
+        if (
+            not frontend_service
+            or not frontend_service.extraPodSpec
+            or not frontend_service.extraPodSpec.mainContainer
+        ):
+            logger.warning(
+                f"Frontend service or container not found, using default port: {DYNAMO_RUN_DEFAULT_PORT}"
+            )
+            return DYNAMO_RUN_DEFAULT_PORT
+
+        args = frontend_service.extraPodSpec.mainContainer.args
+        if not args:
+            logger.warning(
+                f"No args found in Frontend configuration, using default port: {DYNAMO_RUN_DEFAULT_PORT}"
+            )
+            return DYNAMO_RUN_DEFAULT_PORT
+
         args = break_arguments(args)
         try:
             idx = args.index("--http-port")
             return int(args[idx + 1])
-        except ValueError:
+        except (ValueError, IndexError):
             logger.warning(
                 f"Port not found in configuration args, using default port: {DYNAMO_RUN_DEFAULT_PORT}"
             )
@@ -311,9 +382,17 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d
                 WORKER_COMPONENT_NAMES["sglang"].prefill_worker_k8s_name
             ]
 
-            args = cfg.spec.services[
+            worker_service = cfg.spec.services[
                 WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
-            ].extraPodSpec.mainContainer.args
+            ]
+            if (
+                not worker_service.extraPodSpec
+                or not worker_service.extraPodSpec.mainContainer
+            ):
+                raise ValueError(
+                    "Missing extraPodSpec or mainContainer in worker service"
+                )
+            args = worker_service.extraPodSpec.mainContainer.args
 
             args = break_arguments(args)
@@ -325,9 +404,7 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d
             if "--disable-radix-cache" not in args:
                 args = append_argument(args, "--disable-radix-cache")
 
-            cfg.spec.services[
-                WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
-            ].extraPodSpec.mainContainer.args = join_arguments(args)
+            worker_service.extraPodSpec.mainContainer.args = join_arguments(args)
 
         elif target == "decode":
             # delete prefill worker
             del cfg.spec.services[
                 WORKER_COMPONENT_NAMES["sglang"].prefill_worker_k8s_name
             ]
 
-            args = cfg.spec.services[
+            worker_service = cfg.spec.services[
                 WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
-            ].extraPodSpec.mainContainer.args
+            ]
+            if (
+                not worker_service.extraPodSpec
+                or not worker_service.extraPodSpec.mainContainer
+            ):
+                raise ValueError(
+                    "Missing extraPodSpec or mainContainer in worker service"
+                )
+            args = worker_service.extraPodSpec.mainContainer.args
 
             args = break_arguments(args)
 
-            # call `dynamo.sglang.worker` instead of `dynamo.sglang.decode_worker`
-            idx = args.index("dynamo.sglang.decode_worker")
-            args[idx] = "dynamo.sglang.worker"
-
             # remove `--disaggregation-mode` and `--disaggregation-transfer-backend`
             args = remove_valued_arguments(args, "--disaggregation-mode")
             args = remove_valued_arguments(args, "--disaggregation-transfer-backend")
@@ -353,9 +434,7 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d
             if "--disable-radix-cache" in args:
                 args.remove("--disable-radix-cache")
 
-            cfg.spec.services[
-                WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
-            ].extraPodSpec.mainContainer.args = join_arguments(args)
+            worker_service.extraPodSpec.mainContainer.args = join_arguments(args)
 
         # set num workers to 1
         decode_worker_config = config["spec"]["services"][
@@ -369,27 +448,30 @@ def convert_config(cls, config: dict, target: Literal["prefill", "decode"]) -> d
     def set_config_tp_size(cls, config: dict, tp_size: int):
         cfg = Config.model_validate(config)
 
-        cfg.spec.services[
+        worker_service = cfg.spec.services[
             WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
-        ].resources.requests["gpu"] = str(tp_size)
+        ]
+
+        # Ensure resources exists
+        if worker_service.resources is None:
+            worker_service.resources = ServiceResources()
+
+        # Ensure requests exists
+        if worker_service.resources.requests is None:
+            worker_service.resources.requests = {}
+
+        worker_service.resources.requests["gpu"] = str(tp_size)
+
+        # Update limits if they exist
+        if worker_service.resources.limits is not None:
+            worker_service.resources.limits["gpu"] = str(tp_size)
+
         if (
-            cfg.spec.services[
-                WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
-            ].resources.limits
-            is not None
+            not worker_service.extraPodSpec
+            or not worker_service.extraPodSpec.mainContainer
        ):
-            # Explicitly cast `limits` as the typecheck cannot determine that
-            # limits is not None here
-            cast(
-                dict[str, str],
-                cfg.spec.services[
-                    WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
-                ].resources.limits,
-            )["gpu"] = str(tp_size)
-
-        args = cfg.spec.services[
-            WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
-        ].extraPodSpec.mainContainer.args
+            raise ValueError("Missing extraPodSpec or mainContainer in worker service")
+        args = worker_service.extraPodSpec.mainContainer.args
 
         args = break_arguments(args)
@@ -399,9 +481,7 @@ def set_config_tp_size(cls, config: dict, tp_size: int):
         except ValueError:
             args = append_argument(args, ["--tp", str(tp_size)])
 
-        cfg.spec.services[
-            WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
-        ].extraPodSpec.mainContainer.args = join_arguments(args)
+        worker_service.extraPodSpec.mainContainer.args = join_arguments(args)
 
         return cfg.model_dump()
@@ -409,7 +489,16 @@ def set_config_tp_size(cls, config: dict, tp_size: int):
     def get_model_name(cls, config: dict) -> str:
         cfg = Config.model_validate(config)
         worker_name = WORKER_COMPONENT_NAMES["sglang"].decode_worker_k8s_name
-        args = cfg.spec.services[worker_name].extraPodSpec.mainContainer.args
+        worker_service = cfg.spec.services[worker_name]
+        if (
+            not worker_service.extraPodSpec
+            or not worker_service.extraPodSpec.mainContainer
+        ):
+            logger.warning(
+                f"Worker service missing extraPodSpec or mainContainer, using default model name: {DEFAULT_MODEL_NAME}"
+            )
+            return DEFAULT_MODEL_NAME
+        args = worker_service.extraPodSpec.mainContainer.args
         args = break_arguments(args)
 
         for i, arg in enumerate(args):
@@ -424,12 +513,29 @@ def get_model_name(cls, config: dict) -> str:
     @classmethod
     def get_port(cls, config: dict) -> int:
         cfg = Config.model_validate(config)
-        args = cfg.spec.services["Frontend"].extraPodSpec.mainContainer.args
+        frontend_service = cfg.spec.services.get("Frontend")
+        if (
+            not frontend_service
+            or not frontend_service.extraPodSpec
+            or not frontend_service.extraPodSpec.mainContainer
+        ):
+            logger.warning(
+                f"Frontend service or container not found, using default port: {DYNAMO_RUN_DEFAULT_PORT}"
+            )
+            return DYNAMO_RUN_DEFAULT_PORT
+
+        args = frontend_service.extraPodSpec.mainContainer.args
+        if not args:
+            logger.warning(
+                f"No args found in Frontend configuration, using default port: {DYNAMO_RUN_DEFAULT_PORT}"
+            )
+            return DYNAMO_RUN_DEFAULT_PORT
+
         args = break_arguments(args)
         try:
             idx = args.index("--http-port")
             return int(args[idx + 1])
-        except ValueError:
+        except (ValueError, IndexError):
             logger.warning(
                 f"Port not found in configuration args, using default port: {DYNAMO_RUN_DEFAULT_PORT}"
             )
@@ -451,7 +557,10 @@ def get_kv_cache_size_from_dynamo_log(cls, dynamo_log_fn: str) -> int:
         return 0
 
 
-CONFIG_MODIFIERS = {
+CONFIG_MODIFIERS: dict[str, type[ConfigModifierProtocol]] = {
     "vllm": VllmV1ConfigModifier,
     "sglang": SGLangConfigModifier,
 }
+
+# Re-export WORKER_COMPONENT_NAMES for profile_sla.py
+__all__ = ["CONFIG_MODIFIERS", "WORKER_COMPONENT_NAMES"]
diff --git a/benchmarks/profiler/utils/profile_decode.py b/benchmarks/profiler/utils/profile_decode.py
index a3b330220d..6ef60490ea 100644
--- a/benchmarks/profiler/utils/profile_decode.py
+++ b/benchmarks/profiler/utils/profile_decode.py
@@ -4,8 +4,9 @@
 import logging
 
 import numpy as np
-from utils.genai_perf import benchmark_decode
-from utils.plot import plot_decode_3d_surface
+
+from benchmarks.profiler.utils.genai_perf import benchmark_decode
+from benchmarks.profiler.utils.plot import plot_decode_3d_surface
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
diff --git a/benchmarks/profiler/utils/profile_prefill.py b/benchmarks/profiler/utils/profile_prefill.py
index 6cbdec3cff..68c493495b 100644
--- a/benchmarks/profiler/utils/profile_prefill.py
+++ b/benchmarks/profiler/utils/profile_prefill.py
@@ -4,8 +4,9 @@
 import logging
 
 import numpy as np
-from utils.genai_perf import benchmark_prefill
-from utils.plot import plot_prefill_interpolation
+
+from benchmarks.profiler.utils.genai_perf import benchmark_prefill
+from benchmarks.profiler.utils.plot import plot_prefill_interpolation
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
diff --git a/tests/profiler/__init__.py b/tests/profiler/__init__.py
new file mode 100644
index 0000000000..2b1d83fc67
--- /dev/null
+++ b/tests/profiler/__init__.py
@@ -0,0 +1,16 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the profiler module."""
diff --git a/tests/profiler/test_profile_sla_dryrun.py b/tests/profiler/test_profile_sla_dryrun.py
new file mode 100644
index 0000000000..80b32af99f
--- /dev/null
+++ b/tests/profiler/test_profile_sla_dryrun.py
@@ -0,0 +1,88 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Test suite for profile_sla dry-run functionality.
+
+This test ensures that the profile_sla script can successfully run in dry-run mode
+for both vllm and sglang backends with their respective disagg.yaml configurations.
+"""
+
+import sys
+from pathlib import Path
+
+import pytest
+
+# Add the project root to sys.path to enable imports
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from benchmarks.profiler.profile_sla import run_profile  # noqa: E402
+
+
+class TestProfileSLADryRun:
+    """Test class for profile_sla dry-run functionality."""
+
+    @pytest.fixture
+    def vllm_args(self):
+        """Create arguments for vllm backend dry-run test."""
+
+        class Args:
+            backend = "vllm"
+            config = "components/backends/vllm/deploy/disagg.yaml"
+            output_dir = "/tmp/test_profiling_results"
+            namespace = "test-namespace"
+            min_num_gpus_per_engine = 1
+            max_num_gpus_per_engine = 8
+            skip_existing_results = False
+            force_rerun = False
+            isl = 3000
+            osl = 500
+            ttft = 50
+            itl = 10
+            max_context_length = 16384
+            prefill_interpolation_granularity = 16
+            decode_interpolation_granularity = 6
+            service_name = ""
+            dry_run = True
+
+        return Args()
+
+    @pytest.fixture
+    def sglang_args(self):
+        """Create arguments for sglang backend dry-run test."""
+
+        class Args:
+            backend = "sglang"
+            config = "components/backends/sglang/deploy/disagg.yaml"
+            output_dir = "/tmp/test_profiling_results"
+            namespace = "test-namespace"
+            min_num_gpus_per_engine = 1
+            max_num_gpus_per_engine = 8
+            skip_existing_results = False
+            force_rerun = False
+            isl = 3000
+            osl = 500
+            ttft = 50
+            itl = 10
+            max_context_length = 16384
+            prefill_interpolation_granularity = 16
+            decode_interpolation_granularity = 6
+            service_name = ""
+            dry_run = True
+
+        return Args()
+
+    @pytest.mark.pre_merge
+    @pytest.mark.asyncio
+    async def test_vllm_dryrun(self, vllm_args):
+        """Test that profile_sla dry-run works for vllm backend with disagg.yaml config."""
+        # Run the profile in dry-run mode - should complete without errors
+        await run_profile(vllm_args)
+
+    @pytest.mark.pre_merge
+    @pytest.mark.asyncio
+    async def test_sglang_dryrun(self, sglang_args):
+        """Test that profile_sla dry-run works for sglang backend with disagg.yaml config."""
+        # Run the profile in dry-run mode - should complete without errors
+        await run_profile(sglang_args)
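
Usage sketch (editorial note, not part of the patch): the new `--dry-run` flag makes `run_profile` validate the prefill/decode configs and walk the whole sweep and recommendation flow without creating any Kubernetes deployments, which is what the tests above rely on. A minimal way to drive it from Python, mirroring the vllm test fixture; the `SimpleNamespace` stands in for the argparse namespace, and every field value is the test's assumption rather than a recommended setting:

    import asyncio
    from types import SimpleNamespace

    from benchmarks.profiler.profile_sla import run_profile

    # Field-for-field copy of the vllm fixture in test_profile_sla_dryrun.py;
    # adjust the config path and SLA targets (ttft/itl in ms) for your setup.
    args = SimpleNamespace(
        backend="vllm",
        config="components/backends/vllm/deploy/disagg.yaml",
        output_dir="/tmp/test_profiling_results",
        namespace="test-namespace",
        min_num_gpus_per_engine=1,
        max_num_gpus_per_engine=8,
        skip_existing_results=False,
        force_rerun=False,
        isl=3000,
        osl=500,
        ttft=50,
        itl=10,
        max_context_length=16384,
        prefill_interpolation_granularity=16,
        decode_interpolation_granularity=6,
        service_name="",
        dry_run=True,  # skip deployment creation and recommendations, per the diff
    )

    asyncio.run(run_profile(args))

An equivalent CLI invocation is presumably `python -m benchmarks.profiler.profile_sla --backend vllm --config components/backends/vllm/deploy/disagg.yaml --dry-run`; the module-style call is an assumption based on this patch's switch to absolute `benchmarks.profiler.*` imports.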