diff --git a/.github/workflows/auto-update-dev.yml b/.github/workflows/auto-update-dev.yml index bf6ccfa219..f6423e5887 100644 --- a/.github/workflows/auto-update-dev.yml +++ b/.github/workflows/auto-update-dev.yml @@ -12,7 +12,6 @@ jobs: strategy: matrix: branch: [ "dev", "docs" ] - runs-on: ubuntu-latest permissions: diff --git a/multimodal/vl2l/README.md b/multimodal/vl2l/README.md index 7ca8b93395..ffc3ef5e92 100644 --- a/multimodal/vl2l/README.md +++ b/multimodal/vl2l/README.md @@ -2,19 +2,7 @@ ## Quick Start -### Get the source code - -Clone the MLPerf Inference repo via: - -```bash -git clone --recurse-submodules https://github.com/mlcommons/inference.git mlperf-inference -``` - -Then enter the repo: - -```bash -cd mlperf-inference/ -``` +This guide demonstrates how you can run the benchmark on your local machine. ### Create a Conda environment @@ -26,43 +14,31 @@ environment via: conda create -n mlperf-inf-mm-vl2l python=3.12 ``` -### Install LoadGen +### Install the VL2L benchmarking CLI -Update `libstdc++` in the conda environment: +#### For users -```bash -conda install -c conda-forge libstdcxx-ng -``` - -Install `absl-py` and `numpy`: +Install `mlperf-inf-mm-vl2l` with: ```bash -conda install absl-py numpy +pip install git+https://github.com/mlcommons/inference.git#subdirectory=multimodal/vl2l/ ``` -Build and install LoadGen from source: +#### For developers -```bash -cd loadgen/ -CFLAGS="-std=c++14 -O3" python -m pip install . -cd ../ -``` - -Run a quick test to validate that LoadGen was installed correctly: +Clone the MLPerf Inference repo via: ```bash -python loadgen/demos/token_metrics/py_demo_server.py +git clone --recurse-submodules https://github.com/mlcommons/inference.git mlperf-inference ``` -### Install the VL2L benchmarking CLI - -For users, install `mlperf-inf-mm-vl2l` with: +Then enter the repo: ```bash -pip install multimodal/vl2l/ +cd mlperf-inference/ ``` -For developers, install `mlperf-inf-mm-vl2l` and the development tools with: +Install `mlperf-inf-mm-vl2l` and the development tools with: - On Bash ```bash @@ -73,6 +49,8 @@ pip install multimodal/vl2l/[dev] pip install multimodal/vl2l/"[dev]" ``` +### Post VL2L benchmarking CLI installation + After installation, you can check the CLI flags that `mlperf-inf-mm-vl2l` can take with: ```bash @@ -107,13 +85,13 @@ docker run --gpus all \ # Use all the GPUs on th Performance only mode: ```bash -mlperf-inf-mm-vl2l --settings.test.scenario offline --settings.test.mode performance_only +mlperf-inf-mm-vl2l benchmark endpoint --settings.test.scenario offline --settings.test.mode performance_only ``` Accuracy only mode: ```bash -mlperf-inf-mm-vl2l --settings.test.scenario offline --settings.test.mode accuracy_only +mlperf-inf-mm-vl2l benchmark endpoint --settings.test.scenario offline --settings.test.mode accuracy_only ``` ### Run the benchmark for the Server scenario @@ -121,13 +99,45 @@ mlperf-inf-mm-vl2l --settings.test.scenario offline --settings.test.mode accurac Performance only mode: ```bash -mlperf-inf-mm-vl2l --settings.test.scenario server --settings.test.mode performance_only +mlperf-inf-mm-vl2l benchmark endpoint --settings.test.scenario server --settings.test.mode performance_only ``` Accuracy only mode: ```bash -mlperf-inf-mm-vl2l --settings.test.scenario server --settings.test.mode accuracy_only +mlperf-inf-mm-vl2l benchmark endpoint --settings.test.scenario server --settings.test.mode accuracy_only +``` + +## Docker + +[docker/](docker/) provides examples of Dockerfiles that install the VL2L benchmarking 
+CLI into the container images of the inference engine. This is useful when you have to +run both the inference engine and the VL2L benchmarking CLI inside the same container, +for example, in a situation where you must use a GPU cluster managed by +[Slurm](https://slurm.schedmd.com/) with [enroot](https://github.com/nvidia/enroot) and +[pyxis](https://github.com/NVIDIA/pyxis). + +### Benchmark against vLLM inside the container + +If you are running `mlperf-inf-mm-vl2l` inside a local environment that has access to +vLLM (such as inside a container that was created using the +[docker/vllm-cuda.Dockerfile](docker/vllm-cuda.Dockerfile)), you can use a single +`mlperf-inf-mm-vl2l benchmark vllm` command to achieve: + +1. Deploy an endpoint using vLLM. +2. Wait for the endpoint to be healthy. +3. Run the benchmark against that endpoint. + +For example, inside the container, you can run the Offline scenario Performance only +mode with: + +```bash +mlperf-inf-mm-vl2l benchmark vllm \ + --vllm.model.repo_id Qwen/Qwen3-VL-235B-A22B-Instruct \ + --vllm.arg=--tensor-parallel-size=8 \ + --vllm.arg=--limit-mm-per-prompt.video=0 \ + --settings.test.scenario offline \ + --settings.test.mode performance_only ``` ## Developer Guide diff --git a/multimodal/vl2l/docker/vllm-cuda.Dockerfile b/multimodal/vl2l/docker/vllm-cuda.Dockerfile new file mode 100644 index 0000000000..0c7597ce76 --- /dev/null +++ b/multimodal/vl2l/docker/vllm-cuda.Dockerfile @@ -0,0 +1,105 @@ +# Start from a vLLM image that was built from +# https://github.com/vllm-project/vllm/blob/main/docker/Dockerfile +# +# ============================================================================ +# USAGE EXAMPLES +# ============================================================================ +# +# 1. Install from default git URL (remote): +# docker build -t myimage . +# +# 2. Install from a different git URL or branch: +# docker build --build-arg MLPERF_INF_MM_VL2L_INSTALL_URL=git+https://github.com/USER/REPO.git@BRANCH#subdirectory=multimodal/vl2l \ +# -t myimage . +# +# 3. Install from local directory (build from repo root with git auto-detection): +# (Version number will be auto-detected from git if the build context includes .git) +# docker build --build-arg MLPERF_INF_MM_VL2L_INSTALL_URL=multimodal/vl2l \ +# -f multimodal/vl2l/docker/vllm-cuda.Dockerfile \ +# -t myimage . +# +# 4. Install from local directory (build from multimodal/vl2l subdirectory): +# (No .git in subdirectory, will use fallback version "0.0.0.dev0") +# docker build --build-arg MLPERF_INF_MM_VL2L_INSTALL_URL=. \ +# -f multimodal/vl2l/docker/vllm-cuda.Dockerfile \ +# -t myimage multimodal/vl2l +# +# 5. Install from local directory when pwd is already multimodal/vl2l: +# (No .git in subdirectory, will use fallback version "0.0.0.dev0") +# cd multimodal/vl2l +# docker build --build-arg MLPERF_INF_MM_VL2L_INSTALL_URL=. \ +# -f docker/vllm-cuda.Dockerfile \ +# -t myimage . +# +# 6. Install from local directory with a custom fallback version: +# (Override the default "0.0.0.dev0" version when git is not available) +# cd multimodal/vl2l +# docker build --build-arg MLPERF_INF_MM_VL2L_INSTALL_URL=. \ +# --build-arg MLPERF_INF_MM_VL2L_VERSION=1.0.0 \ +# -f docker/vllm-cuda.Dockerfile \ +# -t myimage . +# +# 7. Use a custom vLLM base image: +# docker build --build-arg BASE_IMAGE_URL=my-custom-vllm:latest \ +# -t myimage . 
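+#
+# 8. Run the built image (example; assumes the "myimage" tag from above and a
+#    host with the NVIDIA Container Toolkit). The entrypoint is /bin/bash, so
+#    this opens a shell where both `vllm serve` and `mlperf-inf-mm-vl2l` are
+#    available (e.g., in separate tmux sessions):
+#    docker run --gpus all -it --rm myimage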
+# +# ============================================================================ + +ARG BASE_IMAGE_URL=vllm/vllm-openai:nightly +FROM ${BASE_IMAGE_URL} + +# MLPERF_INF_MM_VL2L_INSTALL_URL can be either: +# 1. A git URL (default): git+https://github.com/... +# 2. A local directory path relative to the build context (e.g., multimodal/vl2l) +# Note: The build context is the directory you pass to `docker build` (the final arg) +# MLPERF_INF_MM_VL2L_INSTALL_URL must be a valid path inside that build context +ARG MLPERF_INF_MM_VL2L_INSTALL_URL=git+https://github.com/mlcommons/inference.git#subdirectory=multimodal/vl2l + +# Temporary directory inside the container where the build context will be copied +# Only used when installing from a local directory path +ARG BUILD_CONTEXT_DIR=/tmp/mm_vl2l_build_context + +# Fallback version to use when building from local directory without git metadata +# setuptools-scm will first try to detect version from .git, and use this as fallback +# Must be a valid PEP 440 version string (e.g., "0.0.0.dev0", "1.0.0", "0.1.0.dev1") +# Can be overridden at build time with --build-arg +ARG MLPERF_INF_MM_VL2L_VERSION=0.0.0.dev0 + +# Install +# - git (required for installing "git+..." dependencies to work) +# - tmux (for `vllm serve` and `mlperf-inf-mm-vl2l` in different tmux sessions) +# - vim (for editing files in the container) +RUN apt-get update && \ + apt-get install -y git tmux vim && \ + rm -rf /var/lib/apt/lists/* + +# Set environment variables. +# Setting LD_LIBRARY_PATH here ensures it works during the build process +# and persists when you run the container later. +#ENV LD_LIBRARY_PATH=/usr/local/lib/python3.12/dist-packages/torch/lib:$LD_LIBRARY_PATH + +# Copy build context. +# This will be used only if MLPERF_INF_MM_VL2L_INSTALL_URL is a local path. +COPY . ${BUILD_CONTEXT_DIR}/ + +# Install the mlperf-inference-multimodal-vl2l package. +# We use --system to install into the container's global python environment. 
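+# Note on versioning: when no .git metadata is present, setuptools-scm honors the
+# per-distribution override variable SETUPTOOLS_SCM_PRETEND_VERSION_FOR_<DIST_NAME>
+# (the distribution name "mlperf-inference-multimodal-vl2l" normalizes to
+# MLPERF_INFERENCE_MULTIMODAL_VL2L), which the local-path branch below exports
+# with the fallback version from MLPERF_INF_MM_VL2L_VERSION.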
+# Detect if MLPERF_INF_MM_VL2L_INSTALL_URL is a git URL or a local path: +RUN if echo "${MLPERF_INF_MM_VL2L_INSTALL_URL}" | grep -q "^git+"; then \ + echo "Installing from git URL: ${MLPERF_INF_MM_VL2L_INSTALL_URL}"; \ + uv pip install --system --no-cache --verbose "${MLPERF_INF_MM_VL2L_INSTALL_URL}"; \ + else \ + echo "Installing from local path: ${MLPERF_INF_MM_VL2L_INSTALL_URL}"; \ + # Check if the package directory is inside a git repository \ + if cd "${BUILD_CONTEXT_DIR}/${MLPERF_INF_MM_VL2L_INSTALL_URL}" && git rev-parse --git-dir > /dev/null 2>&1; then \ + echo "Git repository detected, setuptools-scm will detect version automatically"; \ + else \ + echo "Not in a git repository, using fallback version: ${MLPERF_INF_MM_VL2L_VERSION}"; \ + export SETUPTOOLS_SCM_PRETEND_VERSION_FOR_MLPERF_INFERENCE_MULTIMODAL_VL2L="${MLPERF_INF_MM_VL2L_VERSION}"; \ + fi; \ + uv pip install --system --no-cache --verbose "${BUILD_CONTEXT_DIR}/${MLPERF_INF_MM_VL2L_INSTALL_URL}"; \ + fi && \ + rm -rf "${BUILD_CONTEXT_DIR}" + +# Set the entrypoint to bash so it opens a shell by default +ENTRYPOINT ["/bin/bash"] \ No newline at end of file diff --git a/multimodal/vl2l/pyproject.toml b/multimodal/vl2l/pyproject.toml index d4644d6099..df75744b04 100644 --- a/multimodal/vl2l/pyproject.toml +++ b/multimodal/vl2l/pyproject.toml @@ -19,6 +19,9 @@ dependencies = [ "pydantic-typer @ git+https://github.com/CentML/pydantic-typer.git@wangshangsam/preserve-full-annotated-type", "pympler", "typer", + "scikit-learn", + "tabulate", + "hiclass", ] dynamic = ["version"] @@ -57,6 +60,9 @@ max-args = 10 [tool.ruff.lint.pydocstyle] convention = "google" +[tool.ruff.lint.flake8-type-checking] +runtime-evaluated-base-classes = ["pydantic.BaseModel"] + [tool.mypy] check_untyped_defs = true plugins = ['pydantic.mypy'] diff --git a/multimodal/vl2l/scripts/linters.sh b/multimodal/vl2l/scripts/linters.sh index e33cd8b1d4..73a433f2a6 100644 --- a/multimodal/vl2l/scripts/linters.sh +++ b/multimodal/vl2l/scripts/linters.sh @@ -46,4 +46,4 @@ docker run --rm \ -it \ -v "${PROJECT_ROOT}":/to-scan \ trufflesecurity/trufflehog:latest \ - filesystem /to-scan \ No newline at end of file + filesystem /to-scan diff --git a/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/cli.py b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/cli.py index ede0188607..4fe9d8bbf3 100644 --- a/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/cli.py +++ b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/cli.py @@ -2,415 +2,48 @@ from __future__ import annotations -import sys -from datetime import timedelta -from enum import StrEnum, auto -from pathlib import Path from typing import Annotated import mlperf_loadgen as lg from loguru import logger -from pydantic import BaseModel, DirectoryPath, Field, field_validator +from pydantic import FilePath # noqa: TC002 from pydantic_typer import Typer from typer import Option +from .deploy import LocalVllmDeployer +from .evaluation import run_evaluation +from .log import setup_loguru_for_benchmark +from .schema import Dataset, Endpoint, Settings, Verbosity, VllmEndpoint from .task import ShopifyGlobalCatalogue app = Typer() +benchmark_app = Typer() +app.add_typer( + benchmark_app, + name="benchmark", + help="Main CLI for running the VL2L benchmark.", +) -class TestScenario(StrEnum): - """The test scenario for the MLPerf inference LoadGen.""" - - SERVER = auto() - """Run the benchmark in server/interactive scenario.""" - - OFFLINE = auto() - """Run the benchmark in offline/batch scenario.""" - - def 
to_lgtype(self) -> lg.TestScenario: - """Convert the test scenario to its corresponding LoadGen type.""" - match self: - case TestScenario.SERVER: - return lg.TestScenario.Server - case TestScenario.OFFLINE: - return lg.TestScenario.Offline - case _: - raise UnknownTestScenarioValueError(self) - - -class UnknownTestScenarioValueError(ValueError): - """The exception raised when an unknown test scenario is encountered.""" - - def __init__(self, test_scenario: TestScenario) -> None: - """Initialize the exception.""" - super().__init__(f"Unknown test scenario: {test_scenario}") - - -class TestMode(StrEnum): - """The test mode for the MLPerf inference LoadGen.""" - - PERFORMANCE_ONLY = auto() - """Run the benchmark to evaluate performance.""" - - ACCURACY_ONLY = auto() - """Run the benchmark to evaluate model quality.""" - - def to_lgtype(self) -> lg.TestMode: - """Convert the test mode to its corresponding LoadGen type.""" - match self: - case TestMode.PERFORMANCE_ONLY: - return lg.TestMode.PerformanceOnly - case TestMode.ACCURACY_ONLY: - return lg.TestMode.AccuracyOnly - case _: - raise UnknownTestModeValueError(self) - - -class UnknownTestModeValueError(ValueError): - """The exception raised when an unknown test mode is encountered.""" - - def __init__(self, test_mode: TestMode) -> None: - """Initialize the exception.""" - super().__init__(f"Unknown test mode: {test_mode}") - - -class LoggingMode(StrEnum): - """Specifies when logging should be sampled and stringified.""" - - ASYNC_POLL = auto() - """ Logs are serialized and output on an IOThread that polls for new logs - at a fixed interval. This is the only mode currently implemented.""" - - END_OF_TEST_ONLY = auto() - """ Not implemented """ - - SYNCHRONOUS = auto() - """ Not implemented """ - - def to_lgtype(self) -> lg.LoggingMode: - """Convert logging mode to its corresponding LoadGen type.""" - match self: - case LoggingMode.ASYNC_POLL: - return lg.LoggingMode.AsyncPoll - case _: - raise UnknownLoggingModeValueError - - -class UnknownLoggingModeValueError(ValueError): - """The exception raised when an unknown logging mode is encountered.""" - - def __init__(self, test_mode: TestMode) -> None: - """Initialize the exception.""" - super().__init__(f"Unknown logging mode: {test_mode}") - - -class TestSettings(BaseModel): - """The test settings for the MLPerf inference LoadGen.""" - - scenario: Annotated[ - TestScenario, - Field( - description=( - "The MLPerf inference benchmarking scenario to run the benchmark in." - ), - ), - ] = TestScenario.OFFLINE - - mode: Annotated[ - TestMode, - Field( - description=( - "Whether you want to run the benchmark for performance or accuracy." - ), - ), - ] = TestMode.PERFORMANCE_ONLY - - offline_expected_qps: Annotated[ - float, - Field( - description="The expected QPS for the offline scenario.", - ), - ] = 100 - - server_expected_qps: Annotated[ - float, - Field( - description="The expected QPS for the server scenario. " - "Loadgen will try to send as many request as necessary " - "to achieve this value.", - ), - ] = 1 - - server_target_latency: Annotated[ - timedelta, - Field(description="""Expected latency constraint for Server scenario. - This is a constraint that we expect depending - on the argument server_expected_qps. - When server_expected_qps increases, we expect the latency to also increase. 
- When server_expected_qps decreases, we expect the latency to also decrease."""), - ] = timedelta(seconds=1) - - server_ttft_latency: Annotated[ - timedelta, - Field(description="""Time to First Token (TTFT) - latency constraint result validation" - (used when use_token_latencies is enabled)."""), - ] = timedelta(seconds=1) - - server_tpot_latency: Annotated[ - timedelta, - Field(description="""Time per Output Token (TPOT) - latency constraint result validation" - (used when use_token_latencies is enabled)."""), - ] = timedelta(seconds=1) - - min_duration: Annotated[ - timedelta, - Field( - description="""The minimum testing duration - (in seconds or ISO 8601 format like PT5S). - The benchmark runs until this value has been met.""", - ), - ] = timedelta(seconds=5) - - min_query_count: Annotated[ - int, - Field( - description="""The minimum testing query count. - The benchmark runs until this value has been met.""", - ), - ] = 100 - - use_token_latencies: Annotated[ - bool, - Field( - description="""By default, - the Server scenario will use server_target_latency as the constraint. - When set to True, the Server scenario will use server_ttft_latency - and server_tpot_latency as the constraint.""", - ), - ] = False - - @field_validator("server_target_latency", - "server_ttft_latency", - "server_tpot_latency", - "min_duration", - mode="before") - @classmethod - def parse_timedelta(cls, value: timedelta | - float | str) -> timedelta | str: - """Parse timedelta from seconds (int/float/str) or ISO 8601 format.""" - if isinstance(value, timedelta): - return value - if isinstance(value, (int, float)): - return timedelta(seconds=value) - if isinstance(value, str): - # Try to parse as a number first - try: - return timedelta(seconds=float(value)) - except ValueError: - # If it fails, it might be ISO 8601 format - # Let pydantic's default parser handle it - pass - return value - - def to_lgtype(self) -> lg.TestSettings: - """Convert the test settings to its corresponding LoadGen type.""" - settings = lg.TestSettings() - settings.scenario = self.scenario.to_lgtype() - settings.mode = self.mode.to_lgtype() - settings.offline_expected_qps = self.offline_expected_qps - settings.server_target_qps = self.server_expected_qps - settings.server_target_latency_ns = round( - self.server_target_latency.total_seconds() * 1e9) - settings.ttft_latency = round( - self.server_ttft_latency.total_seconds() * 1e9) - settings.tpot_latency = round( - self.server_tpot_latency.total_seconds() * 1e9) - settings.min_duration_ms = round( - self.min_duration.total_seconds() * 1000) - settings.min_query_count = self.min_query_count - settings.use_token_latencies = self.use_token_latencies - return settings - - -class LogOutputSettings(BaseModel): - """The test log output settings for the MLPerf inference LoadGen.""" - outdir: Annotated[ - DirectoryPath, - Field( - description="Where to save the output files from the benchmark.", - ), - ] = DirectoryPath("output") - prefix: Annotated[ - str, - Field( - description="Modify the filenames of the logs with a prefix.", - ), - ] = "mlperf_log_" - suffix: Annotated[ - str, - Field( - description="Modify the filenames of the logs with a suffix.", - ), - ] = "" - prefix_with_datetime: Annotated[ - bool, - Field( - description="Modify the filenames of the logs with a datetime.", - ), - ] = False - copy_detail_to_stdout: Annotated[ - bool, - Field( - description="Print details of performance test to stdout.", - ), - ] = False - copy_summary_to_stdout: Annotated[ - bool, - Field( - 
description="Print results of performance test to terminal.", - ), - ] = True - - @field_validator("outdir", mode="before") - @classmethod - def parse_directory_field(cls, value: str) -> None: - """Verify and create the output directory to store log files.""" - path = Path(value) - path.mkdir(exist_ok=True) - return path - - def to_lgtype(self) -> lg.LogOutputSettings: - """Convert the log output settings to its corresponding LoadGen type.""" - log_output_settings = lg.LogOutputSettings() - log_output_settings.outdir = self.outdir.as_posix() - log_output_settings.prefix = self.prefix - log_output_settings.suffix = self.suffix - log_output_settings.prefix_with_datetime = self.prefix_with_datetime - log_output_settings.copy_detail_to_stdout = self.copy_detail_to_stdout - log_output_settings.copy_summary_to_stdout = self.copy_summary_to_stdout - return log_output_settings - - -class LogSettings(BaseModel): - """The test log settings for the MLPerf inference LoadGen.""" - log_output: Annotated[ - LogOutputSettings, - Field( - description="Log output settings", - ), - ] = LogOutputSettings - log_mode: Annotated[ - LoggingMode, - Field( - description="""How and when logging should be - sampled and stringified at runtime""", - ), - ] = LoggingMode.ASYNC_POLL - enable_trace: Annotated[ - bool, - Field( - description="Enable trace", - ), - ] = True - - def to_lgtype(self) -> lg.LogSettings: - """Convert log settings to its corresponding LoadGen type.""" - log_settings = lg.LogSettings() - log_settings.log_output = self.log_output.to_lgtype() - log_settings.log_mode = self.log_mode.to_lgtype() - log_settings.enable_trace = self.enable_trace - return log_settings - - -class Settings(BaseModel): - """Combine the settings for the test and logging of LoadGen.""" - test: Annotated[ - TestSettings, - Field( - description="Test settings parameters.", - ), - ] = TestSettings - - logging: Annotated[ - LogSettings, - Field( - description="Test logging parameters", - ), - ] = LogSettings - - def to_lgtype(self) -> tuple[lg.TestSettings, lg.LogSettings]: - """Return test and log settings for LoadGen.""" - test_settings = self.test.to_lgtype() - log_settings = self.logging.to_lgtype() - return (test_settings, log_settings) - - -class Model(BaseModel): - """Specifies the model to use for the VL2L benchmark.""" - - repo_id: Annotated[ - str, - Field(description="The HuggingFace repository ID of the model."), - ] = "Qwen/Qwen3-VL-235B-A22B-Instruct" - - -class Dataset(BaseModel): - """Specifies a dataset on HuggingFace.""" - - repo_id: Annotated[ - str, - Field(description="The HuggingFace repository ID of the dataset."), - ] = "Shopify/the-catalogue-public-beta" - - token: Annotated[ - str | None, - Field( - description=( - "The token to access the HuggingFace repository of the dataset." - ), - ), - ] = None - - -class Verbosity(StrEnum): - """The verbosity level of the logger.""" - - TRACE = auto() - """The trace verbosity level.""" - - DEBUG = auto() - """The debug verbosity level.""" - - INFO = auto() - """The info verbosity level (default).""" - - -class Endpoint(BaseModel): - """Specifies the OpenAI API endpoint to use for the VL2L benchmark.""" - - url: Annotated[ - str, - Field( - description=( - "The URL of the OpenAI API endpoint that the inference requests will be" - " sent to." 
- ), +@app.command() +def evaluate( + filename: Annotated[ + FilePath, + Option( + help="Location of the accuracy file.", ), - ] = "http://localhost:8000/v1" - api_key: Annotated[ - str, - Field(description="The API key to authenticate the inference requests."), - ] = "" + ], + dataset: Dataset, +) -> None: + """Evaluate the accuracy of the VLM responses.""" + logger.info("Evaluating the accuracy file") + run_evaluation(filename=filename, dataset=dataset) -@app.command() -def main( +@benchmark_app.command(name="endpoint") +def benchmark_endpoint( *, settings: Settings, - model: Model, dataset: Dataset, endpoint: Endpoint, random_seed: Annotated[ @@ -422,11 +55,28 @@ def main( Option(help="The verbosity level of the logger."), ] = Verbosity.INFO, ) -> None: - """Main CLI for running the VL2L benchmark.""" - logger.remove() - logger.add(sys.stdout, level=verbosity.value.upper()) + """Benchmark an already deployed OpenAI API endpoint. + + This is suitable when you have already deployed an OpenAI API endpoint that is + accessible via a URL (and an API key, if applicable). + """ + setup_loguru_for_benchmark(settings=settings, verbosity=verbosity) + _run_benchmark( + settings=settings, + dataset=dataset, + endpoint=endpoint, + random_seed=random_seed, + ) + + +def _run_benchmark( + settings: Settings, + dataset: Dataset, + endpoint: Endpoint, + random_seed: int, +) -> None: + """Run the VL2L benchmark.""" logger.info("Running VL2L benchmark with settings: {}", settings) - logger.info("Running VL2L benchmark with model: {}", model) logger.info("Running VL2L benchmark with dataset: {}", dataset) logger.info( "Running VL2L benchmark with OpenAI API endpoint: {}", @@ -434,10 +84,9 @@ def main( logger.info("Running VL2L benchmark with random seed: {}", random_seed) test_settings, log_settings = settings.to_lgtype() task = ShopifyGlobalCatalogue( - dataset_cli=dataset, - model_cli=model, - endpoint_cli=endpoint, - scenario=settings.test.scenario, + dataset=dataset, + endpoint=endpoint, + settings=settings.test, random_seed=random_seed, ) sut = task.construct_sut() @@ -447,3 +96,33 @@ def main( logger.info("The VL2L benchmark with LoadGen completed.") lg.DestroyQSL(qsl) lg.DestroySUT(sut) + + +@benchmark_app.command(name="vllm") +def benchmark_vllm( + *, + settings: Settings, + dataset: Dataset, + vllm: VllmEndpoint, + random_seed: Annotated[ + int, + Option(help="The seed for the random number generator used by the benchmark."), + ] = 12345, + verbosity: Annotated[ + Verbosity, + Option(help="The verbosity level of the logger."), + ] = Verbosity.INFO, +) -> None: + """Deploy the endpoint using vLLM into a healthy state and then benchmark it. + + This is suitable when you have access to the `vllm serve` command in the local + environment where this benchmarking CLI is running. 
+ """ + setup_loguru_for_benchmark(settings=settings, verbosity=verbosity) + with LocalVllmDeployer(endpoint=vllm, settings=settings): + _run_benchmark( + settings=settings, + dataset=dataset, + endpoint=vllm, + random_seed=random_seed, + ) diff --git a/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/deploy.py b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/deploy.py new file mode 100644 index 0000000000..778492f6f1 --- /dev/null +++ b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/deploy.py @@ -0,0 +1,272 @@ +"""Endpoint deployers for deploying and managing the lifecycles of VLM endpoints.""" + +from __future__ import annotations + +import os +import subprocess +import time +from abc import ABC, abstractmethod +from datetime import timedelta +from typing import TYPE_CHECKING, Self +from urllib.parse import urlparse + +import requests +from loguru import logger + +from .log import get_log_file_path + +if TYPE_CHECKING: + from types import TracebackType + + from .schema import EndpointToDeploy, Settings, VllmEndpoint + + +# HTTP status code constants +HTTP_OK = 200 + + +class EndpointStartupTimeoutError(RuntimeError): + """The exception raised when the endpoint fails to start within the timeout.""" + + def __init__(self, timeout: timedelta) -> None: + """Initialize the exception. + + Args: + timeout: The timeout duration that was exceeded. + """ + super().__init__( + f"Endpoint failed to start within the timeout of {timeout}.", + ) + + +class EndpointDeployer(ABC): + """Abstract base class for deploying and managing VLM endpoints. + + Subclasses should implement the deployment and cleanup logic for specific + inference frameworks (e.g., vLLM, TensorRT-LLM, etc.). + + This class is designed to be used as a context manager: + + ```python + with EndpointDeployer(...): + # Endpoint is ready to use + _run_benchmark(...) + # Endpoint is shut down + ``` + """ + + def __init__(self, endpoint: EndpointToDeploy, settings: Settings) -> None: + """Initialize the endpoint deployer. + + Args: + endpoint: The endpoint configuration. + settings: The benchmark settings. + """ + self.endpoint = endpoint + self.settings = settings + + def __enter__(self) -> Self: + """Enter the context manager and deploy the endpoint. + + Returns: + The deployer instance. + """ + self._startup() + self._wait_for_ready() + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Exit the context manager and shut down the endpoint. + + Args: + exc_type: The exception type if an exception was raised. + exc_val: The exception value if an exception was raised. + exc_tb: The exception traceback if an exception was raised. + """ + logger.info("Shutting down endpoint: {}", self.endpoint) + self._shutdown() + logger.info("Endpoint shut down successfully") + + @abstractmethod + def _startup(self) -> None: + """Start up the endpoint. + + This method should start the endpoint. 
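+        It does not need to block until the endpoint is ready: readiness is
+        verified separately by `_wait_for_ready` after this method returns.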
+ """ + raise NotImplementedError + + def _wait_for_ready(self) -> None: + """Wait for the endpoint to be ready.""" + health_url = self.endpoint.url.rstrip("/v1") + "/health" + start_time = time.time() + while time.time() - start_time < self.endpoint.startup_timeout.total_seconds(): + logger.info( + "Waiting {:0.2f} seconds for endpoint to be ready...", + time.time() - start_time, + ) + try: + response = requests.get( + health_url, + timeout=self.endpoint.healthcheck_timeout.total_seconds(), + ) + if response.status_code == HTTP_OK: + logger.info("Endpoint is healthy and ready!") + return + except requests.exceptions.RequestException: + pass + + time.sleep(self.endpoint.poll_interval.total_seconds()) + + raise EndpointStartupTimeoutError(self.endpoint.startup_timeout) + + @abstractmethod + def _shutdown(self) -> None: + """Shut down the endpoint and clean up resources. + + This method should gracefully terminate the endpoint and clean up + any resources (processes, files, etc.). + """ + raise NotImplementedError + + +class LocalProcessDeployer(EndpointDeployer): + """Deploy and manage an endpoint that is powered by a local process.""" + + def __init__(self, endpoint: EndpointToDeploy, settings: Settings) -> None: + """Initialize the local process deployer. + + Args: + endpoint: The endpoint configuration. + settings: The benchmark settings. + """ + super().__init__(endpoint=endpoint, settings=settings) + self._process: subprocess.Popen | None = None + + @abstractmethod + def _build_command(self) -> list[str]: + """Build the command to start the local process.""" + raise NotImplementedError + + @property + @abstractmethod + def _stdout_log_file_key(self) -> str: + """Get the log file key for the stdout log.""" + raise NotImplementedError + + @property + @abstractmethod + def _stderr_log_file_key(self) -> str: + """Get the log file key for the stderr log.""" + raise NotImplementedError + + def _startup(self) -> None: + """Start the local process.""" + cmd = self._build_command() + logger.info("Starting local process with command: {}", cmd) + logger.info( + "Starting local process with environment variables: {}", + os.environ) + + # Get log file paths + stdout_file_path = get_log_file_path( + key=self._stdout_log_file_key, + settings=self.settings, + ) + stderr_file_path = get_log_file_path( + key=self._stderr_log_file_key, + settings=self.settings, + ) + + # Start the server + process = subprocess.Popen( # noqa: S603 + cmd, + stdout=stdout_file_path.open("w"), + stderr=stderr_file_path.open("w"), + text=True, + ) + + logger.info("Started local process with PID: {}", process.pid) + logger.info( + "Local process stdout will be logged to: {}", + stdout_file_path) + logger.info( + "Local process stderr will be logged to: {}", + stderr_file_path) + + self._process = process + + def _shutdown(self) -> None: + """Shut down the local process gracefully.""" + if self._process is None: + logger.warning("No local process to shut down") + return + + # Try graceful termination first + self._process.terminate() + try: + self._process.wait( + timeout=self.endpoint.shutdown_timeout.total_seconds()) + logger.info("Local process terminated gracefully") + except subprocess.TimeoutExpired: + logger.warning( + "Local process did not terminate within timeout, forcefully killing", + ) + self._process.kill() + self._process.wait() + logger.info("Local process killed") + + +class LocalVllmDeployer(LocalProcessDeployer): + """Deploy and manage an endpoint that is powered by a local vLLM server.""" + + def 
__init__(self, endpoint: VllmEndpoint, settings: Settings) -> None: + """Initialize the endpoint deployer. + + Args: + endpoint: The endpoint configuration. + settings: The benchmark settings. + """ + super().__init__(endpoint=endpoint, settings=settings) + self.endpoint: VllmEndpoint + + @property + def _stdout_log_file_key(self) -> str: + """Get the log file key for the stdout log.""" + return "vllm-stdout" + + @property + def _stderr_log_file_key(self) -> str: + """Get the log file key for the stderr log.""" + return "vllm-stderr" + + def _build_command(self) -> list[str]: + """Build the command to start the vLLM server.""" + # Parse the URL to extract host and port + parsed_url = urlparse(self.endpoint.url) + host = parsed_url.hostname or "localhost" + port = parsed_url.port or 8000 + + # Build the command + cmd = [ + "vllm", + "serve", + self.endpoint.model.repo_id, + "--host", + host, + "--port", + str(port), + ] + + # Add API key if provided + if self.endpoint.api_key: + cmd.extend(["--api-key", self.endpoint.api_key]) + + # Add any additional arguments from the VllmEndpoint.cli + cmd.extend(self.endpoint.cli) + + return cmd diff --git a/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/evaluation.py b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/evaluation.py new file mode 100644 index 0000000000..203b48d09a --- /dev/null +++ b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/evaluation.py @@ -0,0 +1,262 @@ +"""Task definitions for the VL2L benchmark.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import TYPE_CHECKING + +import numpy as np +from datasets import load_dataset +from hiclass.metrics import f1 # type: ignore[import-untyped] +from loguru import logger +from pydantic import ValidationError +from sklearn.metrics import f1_score # type: ignore[import-untyped] +from tabulate import tabulate + +if TYPE_CHECKING: + from pydantic import FilePath + + from .cli import Dataset as DatasetCLI + +from .schema import ProductMetadata + +# Initialize the Generator +# As of NumPy 1.17+, +# this isolates the random state, +# which is safer for reproducibility and parallel processing. +rng = np.random.default_rng() + + +def get_hierarchical_components( + predicted_path: str, + true_path: str, + separator: str = " > ", +) -> tuple[int, int, int]: + """Calculates the components for Hierarchical Precision. + + Args: + predicted_path: Categories predicted by the VLM. + true_path: Ground truth categories. + separator: String used to separate each category. + + Returns: + Tuple of number of intersections, + correctly predicted categories and + ground truth categories. + """ + # 1. Split the paths into categories (nodes) + predicted_categories = [c.strip() for c in predicted_path.split(separator)] + true_categories = [c.strip() for c in true_path.split(separator)] + + # Check for empty paths + if not predicted_categories or not true_categories: + return 0, len(predicted_categories), len(true_categories) + + # 2. 
Count the intersection (longest common prefix) + intersection_count = 0 + + # Iterate through the paths simultaneously + for pred_cat, true_cat in zip( + predicted_categories, true_categories, strict=False): + if pred_cat == true_cat: + intersection_count += 1 + else: + # Stop as soon as a mismatch is found (enforces hierarchical match) + break + + pred_length = len(predicted_categories) + true_length = len(true_categories) + + return intersection_count, pred_length, true_length + + +def calculate_hierarchical_f1(data: list[tuple[str, str]]) -> float: + """Calculates the aggregate hF scores for a list of samples. + + Args: + data: A list of tuples, where each tuple is + (predicted_path_str, true_path_str). + + Returns: + F1 score + """ + total_intersection = 0 + total_predicted_length = 0 + total_true_length = 0 + + # 1. Aggregate the components across all samples + for pred_path, true_path in data: + intersection, pred_len, true_len = get_hierarchical_components( + pred_path, + true_path, + ) + + total_intersection += intersection + total_predicted_length += pred_len + total_true_length += true_len + + # 2. Calculate hP and hR + hp = ( + total_intersection / total_predicted_length + if total_predicted_length > 0 + else 0.0 + ) + hr = total_intersection / total_true_length if total_true_length > 0 else 0.0 + + return 0.0 if hp + hr == 0 else 2 * (hp * hr) / (hp + hr) + + +def calculate_exact_match(generated_text: str, original_text: str) -> float: + """Calculates binary Exact Match (EM) score. + + We clean the text (lowercase, strip whitespace) for a fairer comparison. + + Args: + generated_text: Output from the VLM. + original_text: Ground truth information from the dataset. + + Returns: + 1 if the values match or 0 otherwise + """ + gen = generated_text.strip().lower() + orig = original_text.strip().lower() + + return 1.0 if gen == orig else 0.0 + + +def calculate_secondhand_f1(data: list[tuple[bool, bool]]) -> float: + """Calculate F1 score of is_secondhand field. + + Args: + data: List of tuples of predicted and true values + Returs: + f1 score + """ + y_pred = [] + y_src = [] + for pred, src in data: + y_pred.append(pred) + y_src.append(src) + + return f1_score(y_src, y_pred) + + +def calculate_hiclass_f1(data: list[tuple[str, str]]) -> float: + """Alt method to calculate hierarchical F1. + + Args: + data: List of tuples of predicted and true values + Returs: + f1 score + """ + y_pred_raw = [] + y_true_raw = [] + + for pred, src in data: + path1 = pred.split(" > ") + path2 = src.split(" > ") + + y_pred_raw.append(path1) + y_true_raw.append(path2) + + # 2. Find the global maximum length across ALL samples + # We check the longest path in both true and pred lists + max_len = max(len(p) for p in y_true_raw + y_pred_raw) + + # 3. Pad all lists to the global max_len + for i in range(len(y_true_raw)): + # Pad Truth + pad_len_true = max_len - len(y_true_raw[i]) + y_true_raw[i] += [""] * pad_len_true + + # Pad Prediction + pad_len_pred = max_len - len(y_pred_raw[i]) + y_pred_raw[i] += [""] * pad_len_pred + + # 4. Convert to numpy arrays + y_true = np.array(y_true_raw) + y_pred = np.array(y_pred_raw) + + # 5. 
Calculate Score + return f1(y_true, y_pred) + + +def run_evaluation(filename: FilePath, dataset: DatasetCLI) -> None: + """Main function to run the evaluation.""" + with Path.open(filename) as f: + model_output = json.load(f) + + original_data = load_dataset( + dataset.repo_id, + token=dataset.token, + split="+".join(dataset.split), + ) + + category_dataset_pred_src = [] + category_rand_pred_src = [] + is_secondhand_pred_src = [] + is_secondhand_rand_pred_src = [] + + for elem in model_output: + idx = elem["qsl_idx"] + response = bytes.fromhex(elem["data"]).decode("utf-8") + try: + pred_item = ProductMetadata.model_validate_json(response) + except ValidationError: + logger.exception( + "Response\n{}\n(for the sample at index {}) cannot be validated against" + " the expected schema\n{}\n. Thus, this submission result is invalid.", + response, + idx, + json.dumps(ProductMetadata.model_json_schema(), indent=2), + ) + ground_truth_item = original_data[idx] + category_dataset_pred_src.append( + (pred_item.category, ground_truth_item["ground_truth_category"]), + ) + is_secondhand_pred_src.append( + ( + pred_item.is_secondhand, + ground_truth_item["ground_truth_is_secondhand"], + ), + ) + # random category selection + # Uniform distribution is the default + rand_cat = rng.choice(ground_truth_item["potential_product_categories"], + size=1).tolist()[0] + category_rand_pred_src.append((rand_cat, + ground_truth_item["ground_truth_category"])) + + # random is_secondhand selection + rand_is_secondhand = rng.choice([True, False], size=1).tolist()[0] + is_secondhand_rand_pred_src.append((rand_is_secondhand, + ground_truth_item["ground_truth_is_secondhand"])) + + category_f1_score = calculate_hierarchical_f1(category_dataset_pred_src) + hiclass_f1_score = calculate_hiclass_f1(category_dataset_pred_src) + is_secondhand_f1_score = calculate_secondhand_f1(is_secondhand_pred_src) + + rand_cat_f1_score = calculate_hierarchical_f1(category_rand_pred_src) + rand_hiclass_f1_score = calculate_hierarchical_f1(category_rand_pred_src) + rand_is_seconhand_f1_score = calculate_secondhand_f1( + is_secondhand_rand_pred_src) + + data = [ + ["category", category_f1_score, hiclass_f1_score, + rand_cat_f1_score, rand_hiclass_f1_score], + ["is_secondhand", is_secondhand_f1_score, 0, + rand_is_seconhand_f1_score, 0], + ] + + logger.info( + "Results:\n{}", + tabulate( + data, + headers=["Fields", "F1 Score", + "HiClass F1 Score", + "F1 Score Random Selection", + "HiClass F1 Score Random Selection"], + tablefmt="fancy_grid", + ), + ) diff --git a/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/log.py b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/log.py new file mode 100644 index 0000000000..a24eb514f0 --- /dev/null +++ b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/log.py @@ -0,0 +1,43 @@ +"""Logging utilities for the VL2L benchmark.""" + +from __future__ import annotations + +import sys +from datetime import UTC, datetime +from pathlib import Path +from typing import TYPE_CHECKING + +from loguru import logger + +if TYPE_CHECKING: + from .schema import Settings, Verbosity + + +def get_log_file_path(key: str, settings: Settings) -> Path: + """Get the log file path for a given key based on MLPerf LoadGen's convention.""" + datetime_str_in_log_filename = ( + datetime.now(tz=UTC).astimezone().strftime("%FT%TZ_") + if settings.logging.log_output.prefix_with_datetime + else "" + ) + return Path( + settings.logging.log_output.outdir + / ( + f"{settings.logging.log_output.prefix}" + 
f"{datetime_str_in_log_filename}" + f"{key}" + f"{settings.logging.log_output.suffix}" + ".txt" + ), + ) + + +def setup_loguru_for_benchmark( + settings: Settings, verbosity: Verbosity) -> None: + """Setup the loguru logger for running the benchmark.""" + logger.remove() + logger.add(sys.stdout, level=verbosity.value.upper()) + logger.add( + get_log_file_path(key="benchmark", settings=settings), + level=verbosity.value.upper(), + ) diff --git a/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/schema.py b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/schema.py new file mode 100644 index 0000000000..46c13d8dee --- /dev/null +++ b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/schema.py @@ -0,0 +1,484 @@ +"""Schema definitions of various data structures in the VL2L benchmark.""" + +from __future__ import annotations + +from datetime import timedelta +from enum import StrEnum, auto +from pathlib import Path +from typing import Annotated, ClassVar, Self + +import mlperf_loadgen as lg +from openai.types import ResponseFormatJSONSchema +from openai.types.chat import ChatCompletionMessageParam +from pydantic import ( + BaseModel, + ConfigDict, + DirectoryPath, + Field, + NonNegativeInt, + field_validator, + model_validator, +) + +MAX_NUM_ESTIMATION_PERFORMANCE_SAMPLES = 100 +ALLOWED_MEMORY_FOOTPRINT_PERFORMANCE_SAMPLES = 1 * 1024 * 1024 * 1024 # 1GB + + +class TestScenario(StrEnum): + """The test scenario for the MLPerf inference LoadGen.""" + + SERVER = auto() + """Run the benchmark in server/interactive scenario.""" + + OFFLINE = auto() + """Run the benchmark in offline/batch scenario.""" + + def to_lgtype(self) -> lg.TestScenario: + """Convert the test scenario to its corresponding LoadGen type.""" + match self: + case TestScenario.SERVER: + return lg.TestScenario.Server + case TestScenario.OFFLINE: + return lg.TestScenario.Offline + case _: + raise UnknownTestScenarioValueError(self) + + +class UnknownTestScenarioValueError(ValueError): + """The exception raised when an unknown test scenario is encountered.""" + + def __init__(self, test_scenario: TestScenario) -> None: + """Initialize the exception.""" + super().__init__(f"Unknown test scenario: {test_scenario}") + + +class TestMode(StrEnum): + """The test mode for the MLPerf inference LoadGen.""" + + PERFORMANCE_ONLY = auto() + """Run the benchmark to evaluate performance.""" + + ACCURACY_ONLY = auto() + """Run the benchmark to evaluate model quality.""" + + def to_lgtype(self) -> lg.TestMode: + """Convert the test mode to its corresponding LoadGen type.""" + match self: + case TestMode.PERFORMANCE_ONLY: + return lg.TestMode.PerformanceOnly + case TestMode.ACCURACY_ONLY: + return lg.TestMode.AccuracyOnly + case _: + raise UnknownTestModeValueError(self) + + +class UnknownTestModeValueError(ValueError): + """The exception raised when an unknown test mode is encountered.""" + + def __init__(self, test_mode: TestMode) -> None: + """Initialize the exception.""" + super().__init__(f"Unknown test mode: {test_mode}") + + +class LoggingMode(StrEnum): + """Specifies when logging should be sampled and stringified.""" + + ASYNC_POLL = auto() + """ Logs are serialized and output on an IOThread that polls for new logs + at a fixed interval. 
This is the only mode currently implemented.""" + + END_OF_TEST_ONLY = auto() + """ Not implemented """ + + SYNCHRONOUS = auto() + """ Not implemented """ + + def to_lgtype(self) -> lg.LoggingMode: + """Convert logging mode to its corresponding LoadGen type.""" + match self: + case LoggingMode.ASYNC_POLL: + return lg.LoggingMode.AsyncPoll + case _: + raise UnknownLoggingModeValueError(self) + + +class UnknownLoggingModeValueError(ValueError): + """The exception raised when an unknown logging mode is encountered.""" + + def __init__(self, logging_mode: LoggingMode) -> None: + """Initialize the exception.""" + super().__init__(f"Unknown logging mode: {logging_mode}") + + +class BaseModelWithAttributeDescriptionsFromDocstrings(BaseModel): + """Base model that automatically adds attribute descriptions from docstrings.""" + + model_config = ConfigDict(use_attribute_docstrings=True, extra="forbid") + """Pydantic settings for + - Automatically add the attribute descriptions from docstrings. + - Forbid extra attributes. + """ + + +class TestSettings(BaseModelWithAttributeDescriptionsFromDocstrings): + """The test settings for the MLPerf inference LoadGen.""" + + scenario: TestScenario = TestScenario.OFFLINE + """The MLPerf inference benchmarking scenario to run the benchmark in.""" + + mode: TestMode = TestMode.PERFORMANCE_ONLY + """Whether you want to run the benchmark for performance measurement or accuracy + evaluation. + """ + + offline_expected_qps: float = 100 + """The expected QPS for the offline scenario.""" + + server_expected_qps: float = 1 + """The expected QPS for the server scenario. Loadgen will try to send as many + request as necessary to achieve this value. + """ + + server_target_latency: timedelta = timedelta(seconds=1) + """Expected latency constraint for Server scenario. This is a constraint that we + expect depending on the argument server_expected_qps. When server_expected_qps + increases, we expect the latency to also increase. When server_expected_qps + decreases, we expect the latency to also decrease. + """ + + server_ttft_latency: timedelta = timedelta(seconds=1) + """Time to First Token (TTFT) latency constraint result validation (used when + use_token_latencies is enabled). + """ + + server_tpot_latency: timedelta = timedelta(seconds=1) + """Time per Output Token (TPOT) latency constraint result validation (used when + use_token_latencies is enabled). + """ + + min_duration: timedelta = timedelta(seconds=5) + """The minimum testing duration (in seconds or ISO 8601 format like `PT5S`). The + benchmark runs until this value has been met. + """ + + min_query_count: int = 100 + """The minimum testing query count. The benchmark runs until this value has been + met. If min_query_count is less than the total number of samples in the dataset, + only the first min_query_count samples will be used during testing. + """ + + performance_sample_count_override: Annotated[ + NonNegativeInt, + Field( + description="The number of samples to use for the performance test. In the " # noqa: S608 + "performance mode, the benchmark will select P random samples from the " + "dataset, then send enough queries using these P samples (and repeating " + "them if necessary) to reach the min_duration and min_query_count. If a " + "non-zero value is passed to this flag, the P will be this value. 
" + "Otherwise, the benchmark will estimate how many samples can be loaded into" + f" {ALLOWED_MEMORY_FOOTPRINT_PERFORMANCE_SAMPLES} bytes of memory " + "based on the memory footprint of randomly selected " + f"{MAX_NUM_ESTIMATION_PERFORMANCE_SAMPLES} samples (at most), and then" + " use this estimation as the value P.", + ), + ] = 0 + + use_token_latencies: bool = False + """By default, the Server scenario will use `server_target_latency` as the + constraint. When set to True, the Server scenario will use `server_ttft_latency` and + `server_tpot_latency` as the constraint. + """ + + @field_validator( + "server_target_latency", + "server_ttft_latency", + "server_tpot_latency", + "min_duration", + mode="before", + ) + @classmethod + def parse_timedelta(cls, value: timedelta | float | + str) -> timedelta | str: + """Parse timedelta from seconds (int/float/str) or ISO 8601 format.""" + if isinstance(value, timedelta): + return value + if isinstance(value, (int, float)): + return timedelta(seconds=value) + if isinstance(value, str): + # Try to parse as a number first + try: + return timedelta(seconds=float(value)) + except ValueError: + # If it fails, it might be ISO 8601 format + # Let pydantic's default parser handle it + pass + return value + + def to_lgtype(self) -> lg.TestSettings: + """Convert the test settings to its corresponding LoadGen type.""" + settings = lg.TestSettings() + settings.scenario = self.scenario.to_lgtype() + settings.mode = self.mode.to_lgtype() + settings.offline_expected_qps = self.offline_expected_qps + settings.server_target_qps = self.server_expected_qps + settings.server_target_latency_ns = round( + self.server_target_latency.total_seconds() * 1e9, + ) + settings.ttft_latency = round( + self.server_ttft_latency.total_seconds() * 1e9) + settings.tpot_latency = round( + self.server_tpot_latency.total_seconds() * 1e9) + settings.min_duration_ms = round( + self.min_duration.total_seconds() * 1000) + settings.min_query_count = self.min_query_count + settings.performance_sample_count_override = ( + self.performance_sample_count_override + ) + settings.use_token_latencies = self.use_token_latencies + return settings + + +class LogOutputSettings(BaseModelWithAttributeDescriptionsFromDocstrings): + """The test log output settings for the MLPerf inference LoadGen.""" + + outdir: DirectoryPath = DirectoryPath("./output") + """Where to save the output files from the benchmark.""" + + prefix: str = "mlperf_log_" + """Modify the filenames of the logs with a prefix.""" + + suffix: str = "" + """Modify the filenames of the logs with a suffix.""" + + prefix_with_datetime: bool = False + """Modify the filenames of the logs with a datetime.""" + + copy_detail_to_stdout: bool = False + """Print details of performance test to stdout.""" + + copy_summary_to_stdout: bool = True + """Print results of performance test to terminal.""" + + @field_validator("outdir", mode="before") + @classmethod + def parse_directory_field(cls, value: str) -> Path: + """Verify and create the output directory to store log files.""" + path = Path(value) + path.mkdir(exist_ok=True) + return path + + def to_lgtype(self) -> lg.LogOutputSettings: + """Convert the log output settings to its corresponding LoadGen type.""" + log_output_settings = lg.LogOutputSettings() + log_output_settings.outdir = self.outdir.as_posix() + log_output_settings.prefix = self.prefix + log_output_settings.suffix = self.suffix + log_output_settings.prefix_with_datetime = self.prefix_with_datetime + 
log_output_settings.copy_detail_to_stdout = self.copy_detail_to_stdout + log_output_settings.copy_summary_to_stdout = self.copy_summary_to_stdout + return log_output_settings + + +class LogSettings(BaseModelWithAttributeDescriptionsFromDocstrings): + """The test log settings for the MLPerf inference LoadGen.""" + + log_output: LogOutputSettings = LogOutputSettings() + """Log output settings""" + + log_mode: LoggingMode = LoggingMode.ASYNC_POLL + """How and when logging should be sampled and stringified at runtime""" + + enable_trace: bool = True + """Enable trace""" + + def to_lgtype(self) -> lg.LogSettings: + """Convert log settings to its corresponding LoadGen type.""" + log_settings = lg.LogSettings() + log_settings.log_output = self.log_output.to_lgtype() + log_settings.log_mode = self.log_mode.to_lgtype() + log_settings.enable_trace = self.enable_trace + return log_settings + + +class Settings(BaseModelWithAttributeDescriptionsFromDocstrings): + """Combine the settings for the test and logging of LoadGen.""" + + test: TestSettings + """Test settings parameters.""" + + logging: LogSettings + """Test logging parameters.""" + + def to_lgtype(self) -> tuple[lg.TestSettings, lg.LogSettings]: + """Return test and log settings for LoadGen.""" + test_settings = self.test.to_lgtype() + log_settings = self.logging.to_lgtype() + return (test_settings, log_settings) + + +class Model(BaseModelWithAttributeDescriptionsFromDocstrings): + """Specifies the model to use for the VL2L benchmark.""" + + repo_id: str = "Qwen/Qwen3-VL-235B-A22B-Instruct" + """The HuggingFace repository ID of the model.""" + + +class Dataset(BaseModelWithAttributeDescriptionsFromDocstrings): + """Specifies a dataset on HuggingFace.""" + + repo_id: str = "Shopify/the-catalogue-public-beta" + """The HuggingFace repository ID of the dataset.""" + + token: str | None = None + """The token to access the HuggingFace repository of the dataset.""" + + split: list[str] = ["train", "test"] + """Dataset splits to use for the benchmark, e.g., "train" and "test". You can add + multiple splits by repeating the same CLI flag multiple times, e.g.: + --dataset.split test --dataset.split train + The testing dataset is a concatenation of these splits in the same order. + """ + + +class Verbosity(StrEnum): + """The verbosity level of the logger.""" + + TRACE = auto() + """The trace verbosity level.""" + + DEBUG = auto() + """The debug verbosity level.""" + + INFO = auto() + """The info verbosity level (default).""" + + +class Endpoint(BaseModelWithAttributeDescriptionsFromDocstrings): + """Specifies the OpenAI API endpoint to use for the VL2L benchmark.""" + + url: str = "http://localhost:8000/v1" + """The URL of the OpenAI API endpoint that the inference requests are sent to.""" + + api_key: str = "" + """The API key to authenticate the inference requests.""" + + model: Model + """The model to use for the VL2L benchmark, i.e., the model that was deployed behind + this OpenAI API endpoint. + """ + + use_guided_decoding: bool = True + """If True, the benchmark will enable guided decoding for the requests. This + requires the endpoint (and the inference engine behind it) to support guided + decoding. If False, the response from the endpoint might not be directly parsable + by the response JSON schema (e.g., the JSON object might be fenced in a + ```json ... ``` code block). 
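+    Note that the accuracy evaluation validates each raw response against the
+    `ProductMetadata` JSON schema, so responses that are not plain JSON are
+    reported as invalid.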
+ """ + + +class EndpointToDeploy(Endpoint): + """Specifies the endpoint to deploy for the VL2L benchmark.""" + + startup_timeout: timedelta = timedelta(minutes=20) + """The timeout for the endpoint to start up.""" + + shutdown_timeout: timedelta = timedelta(minutes=1) + """The timeout for the endpoint to shut down.""" + + poll_interval: timedelta = timedelta(seconds=60) + """The interval to poll the endpoint for readiness.""" + + healthcheck_timeout: timedelta = timedelta(seconds=5) + """The timeout for the healthcheck request to the endpoint.""" + + +class VllmEndpoint(EndpointToDeploy): + """Specifies how to deploy an OpenAI API endpoint in vLLM for benchmarking.""" + + cli: list[str] = [] + """The CLI arguments to pass to `vllm serve`. This excludes vllm's `--host`, + `--port`, --api-key` and `--model` CLI arguments which will be determined by + the `url`, `api_key` and `model` fields of this schema.""" + + @model_validator(mode="after") + def validate_cli(self) -> Self: + """Validate the vllm CLI arguments.""" + for flag in self.cli: + if not flag.startswith(("--", "-")): + raise PositionalVllmCliFlagError(flag) + if flag.split("=", 1)[0] in BlacklistedVllmCliFlagError.BLACKLIST: + raise BlacklistedVllmCliFlagError(flag) + return self + + +class PositionalVllmCliFlagError(ValueError): + """The exception raised when a positional vllm CLI flag is encountered.""" + + def __init__(self, flag: str) -> None: + """Initialize the exception.""" + super().__init__( + f"Positional vllm CLI flag: {flag} is not allowed. Only optional flags are " + "allowed to be passed to `--vllm.cli`.", + ) + + +class BlacklistedVllmCliFlagError(ValueError): + """The exception raised when a blacklisted vllm CLI flag is encountered.""" + + BLACKLIST: ClassVar[list[str]] = [ + "--model", "--host", "--port", "--api-key"] + + def __init__(self, flag: str) -> None: + """Initialize the exception.""" + super().__init__( + f"Blacklisted vllm CLI flag: {flag} is not allowed. The blacklisted flags" + f"are {self.BLACKLIST}.", + ) + + +class ProductMetadata(BaseModelWithAttributeDescriptionsFromDocstrings): + """Json format for the expected responses from the VLM.""" + + category: str + """The complete category of the product, e.g., + "Clothing & Accessories > Clothing > Shirts > Polo Shirts". + Each categorical level is separated by " > ". + """ + + brands: list[str] + """The brands of the product, e.g., ["giorgio armani", "hugo boss"].""" + + is_secondhand: bool + """True if the product is second-hand, False otherwise.""" + + +class LoadedSample(BaseModelWithAttributeDescriptionsFromDocstrings): + """Sample format to be used by LoadGen.""" + + messages: list[ChatCompletionMessageParam] + """The messages to be sent for chat completion to the VLM inference endpoint.""" + + response_format: ResponseFormatJSONSchema | None = None + """The response format to be used during guided decoding.""" + + @field_validator("messages", mode="after") + @classmethod + def ensure_content_is_list( + cls, + messages: list[ChatCompletionMessageParam], + ) -> list[ChatCompletionMessageParam]: + """If the content is a `ValidatorIterator`, convert it back to a list. + + This is to workaround a Pydantic bug. See + https://github.com/pydantic/pydantic/issues/9467 for more details. 
+ """ + for message in messages: + if ( + "content" in message + and message["content"].__class__.__module__ + == "pydantic_core._pydantic_core" + and message["content"].__class__.__name__ == "ValidatorIterator" + ): + message["content"] = list( + message["content"]) # type: ignore[arg-type] + return messages diff --git a/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/task.py b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/task.py index 6f28d2e941..9ac1c7a2fc 100644 --- a/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/task.py +++ b/multimodal/vl2l/src/mlperf_inference_multimodal_vl2l/task.py @@ -5,12 +5,13 @@ import array import asyncio import base64 +import json import random import threading import time from abc import ABC, abstractmethod from io import BytesIO -from typing import TYPE_CHECKING, Any +from typing import Any import mlperf_loadgen as lg from datasets import load_dataset @@ -18,15 +19,16 @@ from openai import AsyncOpenAI, DefaultAioHttpClient from pympler import asizeof -if TYPE_CHECKING: - from openai.types.chat.chat_completion_message_param import ( - ChatCompletionMessageParam, - ) - - from .cli import Dataset as DatasetCLI - from .cli import Endpoint as EndpointCLI - from .cli import Model as ModelCLI - from .cli import TestScenario +from .schema import ( + ALLOWED_MEMORY_FOOTPRINT_PERFORMANCE_SAMPLES, + MAX_NUM_ESTIMATION_PERFORMANCE_SAMPLES, + Dataset, + Endpoint, + LoadedSample, + ProductMetadata, + TestScenario, + TestSettings, +) class Task(ABC): @@ -34,37 +36,41 @@ class Task(ABC): def __init__( self, - dataset_cli: DatasetCLI, - model_cli: ModelCLI, - endpoint_cli: EndpointCLI, - scenario: TestScenario, + dataset: Dataset, + endpoint: Endpoint, + settings: TestSettings, random_seed: int = 12345, ) -> None: """Initialize the task. Args: - dataset_cli: The dataset configuration passed in from the CLI. - model_cli: The model configuration passed in from the CLI. - endpoint_cli: The endpoint configuration passed in from the CLI. - scenario: Declare if the benchmark is for performance or accuracy scenario + dataset: The dataset configuration passed in from the CLI. + endpoint: The endpoint configuration passed in from the CLI. + settings: Parameters of the current benchmark. random_seed: The random seed to use for the task. """ random.seed(random_seed) - self.scenario = scenario self.dataset = load_dataset( - dataset_cli.repo_id, - token=dataset_cli.token, + dataset.repo_id, + token=dataset.token, + split="+".join(dataset.split), + ) + logger.debug( + "Loaded {} samples from the dataset splits {}.", + len(self.dataset), + dataset.split, ) - self.model_cli = model_cli + self.endpoint = endpoint self.openai_api_client = AsyncOpenAI( - base_url=endpoint_cli.url, + base_url=endpoint.url, http_client=DefaultAioHttpClient(), - api_key=endpoint_cli.api_key, + api_key=endpoint.api_key, ) self.event_loop, self.event_loop_thread = ( self._create_event_loop_in_separate_thread() ) - self.loaded_messages: dict[int, list[ChatCompletionMessageParam]] = {} + self.loaded_samples: dict[int, LoadedSample] = {} + self.settings = settings def __del__(self) -> None: """Clean up the resources used by the task.""" @@ -100,17 +106,15 @@ def _run_event_loop_forever() -> None: event_loop_thread.start() return event_loop, event_loop_thread - @staticmethod @abstractmethod - def formulate_messages( - sample: dict[str, Any]) -> list[ChatCompletionMessageParam]: - """Formulate the messages for chat completion. 
+ def formulate_loaded_sample(self, sample: dict[str, Any]) -> LoadedSample: + """Formulate the sample to be loaded into host memory before testing. Args: - sample: The sample to formulate the messages for. + sample: The sample from the dataset to be formulated into a loaded sample. Returns: - The messages for chat completion. + The loaded sample to be used for issuing queries to the inference endpoint. """ raise NotImplementedError @@ -121,40 +125,54 @@ def total_num_samples(self) -> int: This is used to set the `total_sample_count` parameter in the LoadGen QSL constructor. """ - return len(self.dataset) + return min(len(self.dataset), self.settings.min_query_count) @property - def max_num_samples_in_host_memory(self) -> int: - """The maximum number of samples that are guaranteed to fit in host memory. + def estimated_num_performance_samples(self) -> int: + """The estimated number of performance samples. This is used to set the `performance_sample_count` parameter in the LoadGen QSL - constructor. - - We estimate this value by assuming that we reserve 1GB of host memory for - storing the samples, and we try to estimate how many samples can fit in that - 1GB of memory. If this value is bigger than the total number of samples, we will - just load all samples into host memory. + constructor. In performance mode, the performance samples will be loaded into + host memory before testing, and LoadGen will keep sending queries using these + performance samples (and repeating them if necessary) until the min_duration + and min_query_count are met. + + We estimate this value by assuming that we reserve + `ALLOWED_MEMORY_FOOTPRINT_PERFORMANCE_SAMPLES` bytes of host memory for + storing the performance samples, and we try to estimate how many samples can fit + in that amount of memory. If this value is bigger than the total number of + samples, we will just load all samples into host memory. """ - num_estimation_samples = 10 - allowed_memory_footprint = 1024 * 1024 * 1024 # 1GB estimation_indices = random.sample( range(self.total_num_samples), - k=num_estimation_samples, + k=min( + MAX_NUM_ESTIMATION_PERFORMANCE_SAMPLES, + self.total_num_samples), ) estimation_samples = [ - self.formulate_messages(self.dataset[i]) for i in estimation_indices + self.formulate_loaded_sample(self.dataset[i]) for i in estimation_indices ] avg_messages_footprint = sum( asizeof.asizeof(m) for m in estimation_samples ) / len(estimation_samples) result = min( - round(allowed_memory_footprint / avg_messages_footprint), + round( + ALLOWED_MEMORY_FOOTPRINT_PERFORMANCE_SAMPLES / avg_messages_footprint, + ), self.total_num_samples, ) - logger.info( - "Estimated maximum number of samples to load into the host memory is {}.", + logger.debug( + "Estimated number of performance samples that will be loaded into the host" + " memory before testing is {}.", result, ) + if self.settings.performance_sample_count_override > 0: + logger.debug( + "However, performance_sample_count_override is set to {} and will " + "override the estimated number of performance samples inside the " + "LoadGen.", + self.settings.performance_sample_count_override, + ) return result def construct_qsl(self) -> int: @@ -167,7 +185,7 @@ def _load_samples_to_ram(query_sample_indices: list[int]) -> None: query_sample_indices: The indices of the samples to load to host memory. 
""" for index in query_sample_indices: - self.loaded_messages[index] = self.formulate_messages( + self.loaded_samples[index] = self.formulate_loaded_sample( self.dataset[index], ) @@ -179,76 +197,215 @@ def _unload_samples_from_ram(query_sample_indices: list[int]) -> None: memory. """ for index in query_sample_indices: - sample_to_unload = self.loaded_messages.pop(index, None) + sample_to_unload = self.loaded_samples.pop(index, None) del sample_to_unload return lg.ConstructQSL( self.total_num_samples, - self.max_num_samples_in_host_memory, + self.estimated_num_performance_samples, _load_samples_to_ram, _unload_samples_from_ram, ) - def construct_sut(self) -> int: - """Construct the LoadGen SUT for the task.""" - # avoid circular import - from .cli import TestScenario - - def _issue_queries(query_samples: list[lg.QuerySample]) -> None: - """Called by the LoadGen to issue queries to the inference endpoint. - - Args: - query_samples: The list of query samples to issue to the inference - endpoint. Each query sample contains (1) `id`: `lg.ResponseId` - (i.e., unique identifier for the response), and (2) `index`: - `lg.QuerySampleIndex` (i.e., the sample index into the dataset). - """ + async def _query_endpoint_async_batch( + self, query_sample: lg.QuerySample) -> None: + """Query the endpoint through the async OpenAI API client.""" + try: + sample = self.loaded_samples[query_sample.index] + logger.debug( + "Issuing query sample index: {} with response ID: {}", + query_sample.index, + query_sample.id, + ) + logger.trace( + "The sample (query sample index: {}, response ID: {}) to be " + "issued to the endpoint is: {}", + query_sample.index, + query_sample.id, + sample, + ) + tic = time.perf_counter() + response = await self.openai_api_client.chat.completions.create( # type: ignore[call-overload] + model=self.endpoint.model.repo_id, + messages=sample.messages, + response_format=( + sample.response_format.model_dump( + mode="json", + by_alias=True, + ) + if sample.response_format is not None + else None + ), + ) + logger.debug( + "Received response (ID: {}) from endpoint after {} seconds.", + query_sample.id, + time.perf_counter() - tic, + ) + logger.trace( + "The response (ID: {}) from the endpoint is: {}", + query_sample.id, + response, + ) + content = response.choices[0].message.content + if content is None: + content = "" + bytes_array = array.array("B", content.encode("utf-8")) + address, length = bytes_array.buffer_info() + size_in_bytes = length * bytes_array.itemsize + lg.QuerySamplesComplete( + [ + lg.QuerySampleResponse( + query_sample.id, + address, + size_in_bytes, + int(response.usage.completion_tokens), + ), + ], + ) + except Exception: # noqa: BLE001 + logger.exception( + "Error processing query sample index {} with response ID {}.", + query_sample.index, + query_sample.id, + ) + # Send empty response to LoadGen to avoid hanging. 
+ empty_content = "" + bytes_array = array.array("B", empty_content.encode("utf-8")) + address, length = bytes_array.buffer_info() + size_in_bytes = length * bytes_array.itemsize + lg.QuerySamplesComplete( + [ + lg.QuerySampleResponse( + query_sample.id, + address, + size_in_bytes, + 0, + ), + ], + ) - async def _query_endpoint_async( - query_sample: lg.QuerySample) -> None: - """Query the endpoint through the async OpenAI API client.""" - messages = self.loaded_messages[query_sample.index] - logger.trace( - "Issuing query sample index: {} with response ID: {}", - query_sample.index, - query_sample.id, - ) - tic = time.perf_counter() - response = await self.openai_api_client.chat.completions.create( - model=self.model_cli.repo_id, - messages=messages, - ) - logger.trace( - "Received response (ID: {}) from endpoint after {} seconds: {}", - query_sample.id, - time.perf_counter() - tic, - response, - ) - content = response.choices[0].message.content - if content is None: - content = "" - bytes_array = array.array("B", content.encode("utf-8")) - address, length = bytes_array.buffer_info() - size_in_bytes = length * bytes_array.itemsize - lg.QuerySamplesComplete( + async def _query_endpoint_async_stream( + self, query_sample: lg.QuerySample) -> None: + """Query the endpoint through the async OpenAI API client.""" + ttft_set = False + try: + sample = self.loaded_samples[query_sample.index] + logger.debug( + "Issuing query sample index: {} with response ID: {}", + query_sample.index, + query_sample.id, + ) + logger.trace( + "The sample (query sample index: {}, response ID: {}) to be " + "issued to the endpoint is: {}", + query_sample.index, + query_sample.id, + sample, + ) + word_array = [] + stream = await self.openai_api_client.chat.completions.create( # type: ignore[call-overload] + stream=True, + model=self.endpoint.model.repo_id, + messages=sample.messages, + stream_options={"include_usage": True}, + response_format=( + sample.response_format.model_dump( + mode="json", + by_alias=True, + ) + if sample.response_format is not None + else None + ), + ) + # iterate asynchronously + total_tokens = 0 + async for chunk in stream: + + # This is the final chunk and will not have 'choices' + if chunk.usage is not None: + total_tokens = int(chunk.usage.completion_tokens) + + # If it's not the usage chunk, process it as a content + # chunk + choices = getattr(chunk, "choices", None) + if not choices: + continue + # first non-empty token -> TTFT + delta = choices[0].delta + text = getattr(delta, "content", None) + if not text: + continue + if ttft_set is False: + bytes_array = array.array("B", text.encode("utf-8")) + address, length = bytes_array.buffer_info() + size_in_bytes = length * bytes_array.itemsize + lg.FirstTokenComplete( + [ + lg.QuerySampleResponse( + query_sample.id, + address, + size_in_bytes, + 1, + ), + ], + ) + ttft_set = True + word_array.append(text) + + # when the stream ends, total latency + content = "".join(word_array) + bytes_array = array.array("B", content.encode("utf-8")) + address, length = bytes_array.buffer_info() + size_in_bytes = length * bytes_array.itemsize + lg.QuerySamplesComplete( + [ + lg.QuerySampleResponse( + query_sample.id, + address, + size_in_bytes, + total_tokens, + ), + ], + ) + except Exception: # noqa: BLE001 + logger.exception( + "Error processing query sample index {} with response ID {}.", + query_sample.index, + query_sample.id, + ) + # Send empty response to LoadGen to avoid hanging. 
+ empty_content = "" + bytes_array = array.array("B", empty_content.encode("utf-8")) + address, length = bytes_array.buffer_info() + size_in_bytes = length * bytes_array.itemsize + # If TTFT was not set, we still need to complete that. + if not ttft_set: + lg.FirstTokenComplete( [ lg.QuerySampleResponse( query_sample.id, address, size_in_bytes, - int(response.usage.completion_tokens), + 0, ), ], ) + lg.QuerySamplesComplete( + [ + lg.QuerySampleResponse( + query_sample.id, + address, + size_in_bytes, + 0, + ), + ], + ) - for query_sample in query_samples: - asyncio.run_coroutine_threadsafe( - _query_endpoint_async(query_sample), - self.event_loop, - ) + def construct_sut(self) -> int: + """Construct the LoadGen SUT for the task.""" - def _issue_streaming_queries( - query_samples: list[lg.QuerySample]) -> None: + def _issue_queries(query_samples: list[lg.QuerySample]) -> None: """Called by the LoadGen to issue queries to the inference endpoint. Args: @@ -257,75 +414,13 @@ def _issue_streaming_queries( (i.e., unique identifier for the response), and (2) `index`: `lg.QuerySampleIndex` (i.e., the sample index into the dataset). """ - - async def _query_endpoint_async( - query_sample: lg.QuerySample) -> None: - """Query the endpoint through the async OpenAI API client.""" - messages = self.loaded_messages[query_sample.index] - logger.trace( - "Issuing query sample index: {} with response ID: {}", - query_sample.index, - query_sample.id, - ) - ttft_set = False - word_array = [] - stream = await self.openai_api_client.chat.completions.create( - stream=True, - model=self.model_cli.repo_id, - messages=messages, - stream_options={"include_usage": True}, - ) - # iterate asynchronously - total_tokens = 0 - async for chunk in stream: - - # This is the final chunk and will not have 'choices' - if chunk.usage is not None: - total_tokens = int(chunk.usage.completion_tokens) - - # If it's not the usage chunk, process it as a content - # chunk - choices = getattr(chunk, "choices", None) - if not choices: - continue - # first non-empty token -> TTFT - delta = choices[0].delta - text = getattr(delta, "content", None) - if not text: - continue - if ttft_set is False: - bytes_array = array.array( - "B", text.encode("utf-8")) - address, length = bytes_array.buffer_info() - size_in_bytes = length * bytes_array.itemsize - lg.FirstTokenComplete([ - lg.QuerySampleResponse(query_sample.id, - address, - size_in_bytes, - 1), - ]) - ttft_set = True - word_array.append(text) - - # when the stream ends, total latency - content = "".join(word_array) - bytes_array = array.array("B", content.encode("utf-8")) - address, length = bytes_array.buffer_info() - size_in_bytes = length * bytes_array.itemsize - lg.QuerySamplesComplete( - [ - lg.QuerySampleResponse( - query_sample.id, - address, - size_in_bytes, - total_tokens, - ), - ], - ) - for query_sample in query_samples: asyncio.run_coroutine_threadsafe( - _query_endpoint_async(query_sample), + ( + self._query_endpoint_async_stream(query_sample) + if self.settings.scenario is TestScenario.SERVER + else self._query_endpoint_async_batch(query_sample) + ), self.event_loop, ) @@ -354,9 +449,7 @@ async def _wait_for_pending_queries_async() -> None: ) future.result() - return lg.ConstructSUT(_issue_streaming_queries - if self.scenario is TestScenario.SERVER - else _issue_queries, _flush_queries) + return lg.ConstructSUT(_issue_queries, _flush_queries) class ShopifyGlobalCatalogue(Task): @@ -364,77 +457,104 @@ class ShopifyGlobalCatalogue(Task): def __init__( self, - dataset_cli: 
DatasetCLI, - model_cli: ModelCLI, - endpoint_cli: EndpointCLI, - scenario: TestScenario, + dataset: Dataset, + endpoint: Endpoint, + settings: TestSettings, random_seed: int = 12345, ) -> None: """Initialize the task. Args: - dataset_cli: The dataset configuration passed in from the CLI. - model_cli: The model configuration passed in from the CLI. - endpoint_cli: The endpoint configuration passed in from the CLI. - scenario: Declare if the benchmark is for performance or accuracy scenario + dataset: The dataset configuration passed in from the CLI. + endpoint: The endpoint configuration passed in from the CLI. + settings: Parameters of the current benchmark. random_seed: The random seed to use for the task. """ super().__init__( - dataset_cli=dataset_cli, - model_cli=model_cli, - endpoint_cli=endpoint_cli, - scenario=scenario, + dataset=dataset, + endpoint=endpoint, + settings=settings, random_seed=random_seed, ) - # Shopify only released the train split so far. - self.dataset = self.dataset["train"] - @staticmethod - def formulate_messages( - sample: dict[str, Any]) -> list[ChatCompletionMessageParam]: - """Formulate the messages for chat completion. + def formulate_loaded_sample(self, sample: dict[str, Any]) -> LoadedSample: + """Formulate the sample to be loaded into host memory before testing. Args: - sample: The sample to formulate the messages for. + sample: The sample from the dataset to be formulated into a loaded sample. Returns: - The messages for chat completion. + The loaded sample to be used for issuing queries to the inference endpoint. """ image_file = BytesIO() - sample["image"].save(image_file, format="PNG") + image_format = sample["product_image"].format + sample["product_image"].save(image_file, format=image_format) image_bytes = image_file.getvalue() image_base64 = base64.b64encode(image_bytes) image_base64_string = image_base64.decode("utf-8") - return [ + messages = [ { "role": "system", - "content": ( - "Please analyze the following product and provide the following " - "fields in JSON format:\n" - "- category\n" - "- standardized_title\n" - "- standardized_description\n" - "- brands\n" - "- is_secondhand\n" - ), + "content": f"""Please analyze the product from the user prompt +and provide the following fields in a valid JSON object: +- category +- brands +- is_secondhand + +You must choose only one, which is the most appropriate, correct, and specific +category out of the list of possible product categories. + +Your response should only contain a valid JSON object and nothing more. 
+The JSON object should match the following JSON schema: +```json +{json.dumps(ProductMetadata.model_json_schema(), indent=2)} +``` +""", }, { "role": "user", "content": [ { "type": "text", - "text": ( - f"The title of the product is: {sample['title']}\n\n" - f"The description of the product is: " - f"{sample['description']}" - ), + "text": f"""The title of the product is the following: +```text +{sample['product_title']} +``` + +The description of the product is the following: +```text +{sample['product_description']} +``` + +The following are the possible product categories: +```json +{sample['potential_product_categories']} +``` +""", }, { "type": "image_url", "image_url": { - "url": f"data:image/{image_format};base64," + f"{image_base64_string}", }, }, ], }, ] + + return LoadedSample( + messages=messages, + response_format=( + { + "type": "json_schema", + "json_schema": { + "name": "product_metadata", + "schema": ProductMetadata.model_json_schema(), + "strict": True, + }, + } + if self.endpoint.use_guided_decoding + else None + ), + )
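The sketch below is illustrative only and not part of the patch above: it shows, under stated assumptions, how a chat-completion response produced by this prompt could be parsed into `ProductMetadata`. When `use_guided_decoding` is disabled, the response may arrive wrapped in a fenced JSON code block (as noted in the `use_guided_decoding` docstring), so the sketch strips such a fence before validating. The `parse_product_metadata` helper name and the simplified, standalone `ProductMetadata` re-declaration are assumptions introduced for this example; the benchmark itself defines `ProductMetadata` on `BaseModelWithAttributeDescriptionsFromDocstrings` as shown above.

```python
# Illustrative sketch only (not part of the patch): parse and validate a VLM
# chat-completion response against a simplified ProductMetadata model.
# `parse_product_metadata` is a hypothetical helper name for this example.
import json
import re

from pydantic import BaseModel


class ProductMetadata(BaseModel):
    """Simplified stand-in for the ProductMetadata schema defined above."""

    category: str
    brands: list[str]
    is_secondhand: bool


def parse_product_metadata(content: str) -> ProductMetadata:
    """Parse a response body into ProductMetadata.

    With guided decoding the content should already be a bare JSON object;
    without it, the object may be wrapped in a fenced JSON code block, so
    strip the fence before validating.
    """
    match = re.search(r"```(?:json)?\s*(\{.*\})\s*```", content, flags=re.DOTALL)
    raw = match.group(1) if match else content
    return ProductMetadata.model_validate(json.loads(raw))


if __name__ == "__main__":
    fenced = (
        "```json\n"
        '{"category": "Clothing > Shirts", "brands": ["acme"], "is_secondhand": false}\n'
        "```"
    )
    print(parse_product_metadata(fenced))
```

This only illustrates the response-format contract implied by the system prompt and the `response_format` field; it does not claim to reproduce how the benchmark's own accuracy path consumes responses.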