diff --git a/test/.gitignore b/test/.gitignore new file mode 100644 index 00000000..e6578117 --- /dev/null +++ b/test/.gitignore @@ -0,0 +1,9 @@ +reports/ +dataset/ +logs/ +$null +*__pycache__/ +.* +*.log +start.bat +!.gitignore \ No newline at end of file diff --git a/test/README.md b/test/README.md new file mode 100644 index 00000000..00aeb064 --- /dev/null +++ b/test/README.md @@ -0,0 +1,219 @@ +# UCM Pytest Testing Framework + +A unified cache management testing framework based on pytest, supporting multi-level testing, flexible marking, performance data collection, and beautiful Allure report generation. + +## Framework Features + +- [x] 🏗️ **Multi-level Testing**: UnitTest(0) → Smoke(1) → Feature(2) → E2E(3) +- [x] 🏷️ **Flexible Marking**: Support for feature tags, platform tags, and reliability tags +- [x] 📊 **Data Collection**: Integrated InfluxDB performance data pushing +- [x] 📋 **Beautiful Reports**: Allure test report integration, supporting both static HTML and dynamic server modes +- [x] 🔧 **Configuration Management**: Flexible YAML-based configuration system +- [x] 🚀 **Automation**: Support for parallel test execution and automatic cleanup + +## Test Level Definitions + +| Level | Name | Description | Execution Time | +|-----|------|------|----------| +| 0 | UnitTest | Unit Tests | Every code commit | +| 1 | Smoke | Smoke Tests | Build verification | +| 2 | Feature | Feature Tests | When features are completed | +| 3 | E2E | End-to-End Tests | Before version release | + +## Directory Structure + +``` +test/ +├── config.yaml # Test framework configuration file +├── conftest.py # pytest configuration and fixtures, main program entry +├── pytest.ini # pytest markers and basic configuration +├── requirements.txt # Dependency package list +├── common/ # Common utility library +│ ├── __init__.py +│ ├── config_utils.py # Configuration file reading tools +│ ├── influxdb_utils.py # InfluxDB writing tools +│ └── allure_utils.py # Allure reporting tools +├── suites/ # Test case directory +│ ├── UnitTest/ # Unit tests (stage 0) +│ ├── Smoke/ # Smoke tests (stage 1) +│ ├── Feature/ # Feature tests (stage 2) +│ ├── E2E/ # End-to-end tests (stage 3) +│ └── test_demo_function.py# Example test cases +├── reports/ # Test report directory +└── logs/ # Test log directory +``` + +## Quick Start + +### 1. Environment Setup +```bash +# Install dependencies +pip install -r requirements.txt + +# Ensure Allure CLI is installed (for report generation) +# Download from: https://github.com/allure-framework/allure2/releases +``` + +### 2. Configuration File +The main configuration file is `config.yaml`, containing the following configuration items: +- **reports**: Report generation configuration (HTML/Allure) +- **log**: Logging configuration +- **influxdb**: Performance data push configuration +- **llm_connection**: LLM connection configuration + +### 3. 
Running Tests +```bash +# Run all tests +pytest + +# Run specific level tests +pytest --stage=1 # Run smoke tests +pytest --stage=2+ # Run feature and end-to-end tests + +# Run specific tag tests +pytest --feature=performance # Run performance-related tests +pytest --platform=gpu # Run GPU platform tests +pytest --reliability=high # Run high reliability tests + +# Combined filtering +pytest --stage=1 --feature=performance,accuracy # Performance and accuracy tests in smoke tests +``` + +## Test Case Standards + +### Basic Structure +```python +import pytest +import allure +from common.config_utils import config_utils as config_instance + +class TestExample: + """Test example class""" + + @pytest.mark.stage(2) + @pytest.mark.feature("performance") + @pytest.mark.platform("gpu") + def test_gpu_performance(self): + """Test GPU performance""" + # Arrange + test_data = config_instance.get_config("test_data") + + # Act & Assert + with allure.step("Execute GPU computation"): + result = perform_gpu_calculation(test_data) + assert result.is_valid + + # Collect performance data + from common.influxdb_utils import push_to_influx + push_to_influx("gpu_compute_time", result.duration, { + "test_name": "test_gpu_performance", + "platform": "gpu" + }) +``` + +### Marking Usage Guidelines + +#### 1. Level Markers (Required) +```python +@pytest.mark.stage(0) # Unit tests +@pytest.mark.stage(1) # Smoke tests +@pytest.mark.stage(2) # Feature tests +@pytest.mark.stage(3) # End-to-end tests +``` + +#### 2. Feature Markers (Recommended) +```python +@pytest.mark.feature("performance") # Performance tests +@pytest.mark.feature("accuracy") # Accuracy tests +@pytest.mark.feature("memory") # Memory tests +``` + +#### 3. Platform Markers (Optional) +```python +@pytest.mark.platform("gpu") # GPU platform tests +@pytest.mark.platform("npu") # NPU platform tests +@pytest.mark.platform("cpu") # CPU platform tests +``` + +#### 4. Reliability Markers (Optional) +```python +@pytest.mark.reliability("high") # High reliability tests +@pytest.mark.reliability("medium") # Medium reliability tests +@pytest.mark.reliability("low") # Low reliability tests +``` + +## Allure Report Integration + +### 1. Basic Usage +```python +import allure + +@allure.feature('User Authentication') +@allure.story('Login Function') +def test_user_login(): + """Test user login functionality""" + with allure.step("Enter username and password"): + login_page.enter_credentials("user", "pass") + + with allure.step("Click login button"): + login_page.click_login() + + with allure.step("Verify successful login"): + assert dashboard_page.is_displayed() + + # Add attachment + allure.attach("Screenshot data", name="Login Screenshot", + attachment_type=allure.attachment_type.PNG) +``` + +### 2. Report Configuration +Configure Allure reports in `config.yaml`: +```yaml +reports: + allure: + enabled: true + html_enable: true + serve_mode: true # Use dynamic server mode + serve_host: "localhost" + serve_port: 8081 + directory: "allure-results" +``` + +### 3. 
Report Viewing +- **Static HTML Mode**: Automatically generates static HTML reports after test completion +- **Dynamic Server Mode**: Starts Allure server, providing interactive report interface + +## Performance Data Collection + +### InfluxDB Integration +```python +from common.influxdb_utils import push_to_influx + +# Collect performance data in tests +def test_performance_metrics(): + start_time = time.time() + + # Execute test logic + result = perform_operation() + + # Push performance data to InfluxDB + push_to_influx("operation_duration", time.time() - start_time, { + "test_name": "test_performance_metrics", + "operation_type": "calculation", + "success": str(result.success) + }) +``` + +## Extensions and Customization + +### Adding New Markers +1. Add new marker definitions in the `markers` section of `pytest.ini` +2. Keep the `markers =` and `# end of markers` lines unchanged +3. Re-run tests to use new markers + +### Custom Configuration +Customize through `config.yaml`: +- Report format and storage location +- Log level and output format +- InfluxDB connection parameters +- LLM service configuration diff --git a/test/README_zh.md b/test/README_zh.md new file mode 100644 index 00000000..56c68815 --- /dev/null +++ b/test/README_zh.md @@ -0,0 +1,227 @@ +# UCM Pytest 测试框架 + +基于pytest的统一缓存管理测试框架,支持多级别测试、灵活标记、性能数据收集和Allure精美报告生成。 + +## 框架特性 + +- [x] 🏗️ **多级别测试**: UnitTest(0) → Smoke(1) → Feature(2) → E2E(3) +- [x] 🏷️ **灵活标记**: 支持功能标签、平台标签和可靠性标签 +- [x] 📊 **数据收集**: 集成InfluxDB性能数据推送 +- [x] 📋 **精美报告**: Allure测试报告集成,支持静态HTML和动态服务模式 +- [x] 🔧 **配置管理**: 基于YAML的灵活配置系统 +- [x] 🚀 **自动化**: 支持并行测试执行和自动清理 + +## 测试级别定义 + +| 级别 | 名称 | 说明 | 执行时机 | +|-----|------|------|----------| +| 0 | UnitTest | 单元测试 | 每次代码提交 | +| 1 | Smoke | 冒烟测试 | 构建验证 | +| 2 | Feature | 功能测试 | 特性完成时 | +| 3 | E2E | 端到端测试 | 版本发布前 | + +## 目录结构 + +``` +test/ +├── config.yaml # 测试框架配置文件 +├── conftest.py # pytest配置和fixtures,程序主入口 +├── pytest.ini # pytest标记和基础配置 +├── requirements.txt # 依赖包列表 +├── common/ # 通用工具库 +│ ├── __init__.py +│ ├── config_utils.py # 配置文件读取工具 +│ ├── influxdb_utils.py # InfluxDB写入工具 +│ └── allure_utils.py # Allure报告工具 +├── suites/ # 测试用例目录 +│ ├── UnitTest/ # 单元测试 (stage 0) +│ ├── Smoke/ # 冒烟测试 (stage 1) +│ ├── Feature/ # 功能测试 (stage 2) +│ ├── E2E/ # 端到端测试 (stage 3) +│ └── test_demo_function.py# 示例测试用例 +├── reports/ # 测试报告目录 +└── logs/ # 日志目录 +``` + +## 快速开始 + +### 1. 环境准备 +```bash +# 安装依赖 +pip install -r requirements.txt + +# 确保Allure CLI已安装(用于生成报告) +# 下载地址: https://github.com/allure-framework/allure2/releases +``` + +### 2. 配置文件 +主要配置文件为 `config.yaml`,包含以下配置项: +- **reports**: 报告生成配置(HTML/Allure) +- **log**: 日志配置 +- **influxdb**: 性能数据推送配置 +- **llm_connection**: LLM连接配置 + +### 3. 
运行测试 +```bash +# 运行所有测试 +pytest + +# 运行特定级别的测试 +pytest --stage=1 # 运行冒烟测试 +pytest --stage=2+ # 运行功能测试和端到端测试 + +# 运行特定标签的测试 +pytest --feature=performance # 运行性能相关测试 +pytest --platform=gpu # 运行GPU平台测试 +pytest --reliability=high # 运行高可靠性测试 + +# 组合过滤 +pytest --stage=1 --feature=performance,accuracy # 冒烟测试中的性能和准确性测试 +``` + +## 测试用例标准 + +### 基本结构 +```python +import pytest +import allure +from common.config_utils import config_utils as config_instance + +class TestExample: + """测试示例类""" + + @pytest.mark.stage(2) + @pytest.mark.feature("performance") + @pytest.mark.platform("gpu") + def test_gpu_performance(self): + """测试GPU性能""" + # Arrange + test_data = config_instance.get_config("test_data") + + # Act & Assert + with allure.step("执行GPU计算"): + result = perform_gpu_calculation(test_data) + assert result.is_valid + + # 收集性能数据 + from common.influxdb_utils import push_to_influx + push_to_influx("gpu_compute_time", result.duration, { + "test_name": "test_gpu_performance", + "platform": "gpu" + }) +``` + +### 标记使用规范 + +#### 1. 级别标记 (必需) +```python +@pytest.mark.stage(0) # 单元测试 +@pytest.mark.stage(1) # 冒烟测试 +@pytest.mark.stage(2) # 功能测试 +@pytest.mark.stage(3) # 端到端测试 +``` + +#### 2. 功能标记 (推荐) +```python +@pytest.mark.feature("performance") # 性能测试 +@pytest.mark.feature("accuracy") # 准确性测试 +@pytest.mark.feature("memory") # 内存测试 +``` + +#### 3. 平台标记 (可选) +```python +@pytest.mark.platform("gpu") # GPU平台测试 +@pytest.mark.platform("npu") # NPU平台测试 +@pytest.mark.platform("cpu") # CPU平台测试 +``` + +#### 4. 可靠性标记 (可选) +```python +@pytest.mark.reliability("high") # 高可靠性测试 +@pytest.mark.reliability("medium") # 中等可靠性测试 +@pytest.mark.reliability("low") # 低可靠性测试 +``` + +## Allure 报告集成 + +### 1. 基本用法 +```python +import allure + +@allure.feature('用户认证') +@allure.story('登录功能') +def test_user_login(): + """测试用户登录功能""" + with allure.step("输入用户名和密码"): + login_page.enter_credentials("user", "pass") + + with allure.step("点击登录按钮"): + login_page.click_login() + + with allure.step("验证登录成功"): + assert dashboard_page.is_displayed() + + # 添加附件 + allure.attach("Screenshot data", name="登录截图", + attachment_type=allure.attachment_type.PNG) +``` + +### 2. 报告配置 +在 `config.yaml` 中配置Allure报告: +```yaml +reports: + allure: + enabled: true + html_enable: true + serve_mode: true # 使用动态服务模式 + serve_host: "localhost" + serve_port: 8081 + directory: "allure-results" +``` + +### 3. 报告查看 +- **静态HTML模式**: 测试完成后自动生成静态HTML报告 +- **动态服务模式**: 启动Allure服务器,提供交互式报告界面 + +## 性能数据收集 + +### InfluxDB 集成 +```python +from common.influxdb_utils import push_to_influx + +# 在测试中收集性能数据 +def test_performance_metrics(): + start_time = time.time() + + # 执行测试逻辑 + result = perform_operation() + + # 推送性能数据到InfluxDB + push_to_influx("operation_duration", time.time() - start_time, { + "test_name": "test_performance_metrics", + "operation_type": "calculation", + "success": str(result.success) + }) +``` + +## 扩展和自定义 + +### 添加新标记 +1. 在 `pytest.ini` 的 `markers` 部分添加新标记定义 +2. 保持 `markers =` 和 `# end of markers` 两行不变 +3. 重新运行测试即可使用新标记 + +### 自定义配置 +通过修改 `config.yaml` 可以自定义: +- 报告格式和存储位置 +- 日志级别和输出格式 +- InfluxDB连接参数 +- LLM服务配置 + +## 最佳实践 + +1. **测试命名**: 使用描述性的测试方法名 +2. **标记使用**: 为每个测试添加适当的级别和功能标记 +3. **步骤分解**: 使用Allure步骤将复杂测试分解为可读的步骤 +4. **数据驱动**: 使用参数化测试减少重复代码 +5. 
**环境隔离**: 使用fixtures确保测试环境的一致性
diff --git a/test/common/__init__.py b/test/common/__init__.py new file mode 100644 index 00000000..e69de29b
diff --git a/test/common/allure_utils.py b/test/common/allure_utils.py new file mode 100644 index 00000000..80bbd1d2 --- /dev/null +++ b/test/common/allure_utils.py @@ -0,0 +1,196 @@
+"""
+Allure Report Utility
+Provides convenient Allure reporting functionality and decorators
+"""
+
+import allure
+import os
+import pytest
+import subprocess
+import shutil
+import time
+import platform
+import sys
+from pathlib import Path
+from typing import Dict, Any, ContextManager, Optional, Union, List
+
+
+def setup_allure(config: Dict[str, Any]) -> Optional[Path]:
+    """Configure Allure results directory and write environment.properties."""
+    allure_cfg = config.get("allure", {})
+    if not allure_cfg.get("enabled", False):
+        return None
+
+    # 1. Reuse the existing report directory logic
+    base_dir = Path(config.get("base_dir", "reports"))
+    if config.get("use_timestamp", False) and base_dir.exists():
+        timestamp_dirs = [
+            d for d in base_dir.iterdir()
+            if d.is_dir() and d.name.startswith(config.get("directory_prefix", "pytest"))
+        ]
+        if timestamp_dirs:
+            timestamp_dirs.sort(key=lambda x: x.stat().st_mtime, reverse=True)
+            base_dir = timestamp_dirs[0]
+
+    allure_dir = base_dir / allure_cfg.get("directory", "allure-results")
+    allure_dir.mkdir(parents=True, exist_ok=True)
+    os.environ["ALLURE_REPORT_DIR"] = str(allure_dir)
+
+    # 2. Write environment information for the Allure report
+    env_info = _get_system_info()  # collect system information
+    custom_env = allure_cfg.get("environment", {})  # user-supplied entries extend/override defaults
+    env_info.update(custom_env)
+    _create_environment_properties(allure_dir, env_info)
+
+    return allure_dir
+
+
+def check_allure_available() -> bool:
+    """Check if Allure CLI is installed and working."""
+    try:
+        allure_path = shutil.which("allure")
+        if not allure_path:
+            return False
+        result = subprocess.run(
+            [allure_path, "--version"],
+            capture_output=True,
+            text=True,
+            timeout=10
+        )
+        return result.returncode == 0
+    except Exception:
+        return False
+
+
+def serve_allure_report(
+    allure_results_dir: Union[str, Path],
+    host: str = "localhost",
+    port: int = 8080,
+    auto_open: bool = True
+) -> Optional[subprocess.Popen]:
+    """Start Allure server and optionally open browser."""
+    if not check_allure_available():
+        print("Allure CLI not found. 
Install from https://github.com/allure-framework/allure2/releases") + return None + + allure_results_dir = Path(allure_results_dir) + if not allure_results_dir.exists() or not any(allure_results_dir.iterdir()): + print(f"Allure results directory missing or empty: {allure_results_dir}") + return None + + allure_path = shutil.which("allure") + cmd = [allure_path, "serve", str(allure_results_dir), "--host", host] + if port > 0: + cmd.extend(["--port", str(port)]) + + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + universal_newlines=True + ) + print(f"Allure server starting at http://{host}:{port} (PID: {process.pid})") + print("Please press Ctrl+C to stop the server") + time.sleep(3) + + if process.poll() is not None: + print("Allure server failed to start") + return None + + try: + while process.poll() is None: + time.sleep(0.5) + except KeyboardInterrupt: + print("\nStopping Allure server...") + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + return process + + +def generate_allure_html( + allure_results_dir: Union[str, Path], + html_output_dir: Optional[Union[str, Path]] = None, + clean: bool = False, + auto_serve: bool = False +) -> Optional[Union[Path, subprocess.Popen]]: + """Generate static HTML report or serve dynamically.""" + if not check_allure_available(): + print("Allure CLI not found. Install from https://github.com/allure-framework/allure2/releases") + return None + + allure_results_dir = Path(allure_results_dir) + if not allure_results_dir.exists() or not any(allure_results_dir.iterdir()): + print(f"Allure results directory missing or empty: {allure_results_dir}") + return None + + if auto_serve: + return serve_allure_report(allure_results_dir) + + html_output_dir = Path(html_output_dir or allure_results_dir.parent / "allure-report") + if clean and html_output_dir.exists(): + shutil.rmtree(html_output_dir) + html_output_dir.mkdir(parents=True, exist_ok=True) + + allure_path = shutil.which("allure") + cmd = f'{allure_path} generate "{allure_results_dir}" -o "{html_output_dir}" --clean' + result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=60) + + if result.returncode == 0: + print(f"Allure HTML report generated: {html_output_dir}") + return html_output_dir + else: + print(f"HTML generation failed: {result.stderr}") + return None + + +def _create_environment_properties(allure_results_dir: Union[str, Path], + environment_info: Dict[str, str]) -> None: + allure_results_dir = Path(allure_results_dir) + allure_results_dir.mkdir(parents=True, exist_ok=True) + + env_file = allure_results_dir / "environment.properties" + + with open(env_file, 'w', encoding='utf-8') as f: + for key, value in environment_info.items(): + f.write(f"{key}={value}\n") + + print(f"Environment properties file created: {env_file}") + + +def _get_system_info() -> Dict[str, str]: + """Human-readable system information (English only).""" + info: Dict[str, str] = {} + + # ---------- OS ---------- + os_name = platform.system() + info["OS"] = os_name + + # ---------- Architecture ---------- + arch = platform.architecture()[0] # '64bit' / '32bit' + info["Architecture"] = "64-bit" if "64" in arch else "32-bit" + + # ---------- Python ---------- + # info["Python Implementation"] = platform.python_implementation() + info["Python"] = sys.version.split()[0].replace("Version=", "") + + # ---------- Hardware ---------- + machine = platform.machine() + 
info["Machine"] = "x86-64" if machine == "AMD64" else machine + proc = platform.processor() + if "Intel" in proc: + info["Processor"] = "Intel" + elif "AMD" in proc: + info["Processor"] = "AMD" + else: + info["Processor"] = proc.split()[0] if proc else "Kunpeng" + + return info \ No newline at end of file diff --git a/test/common/config_utils.py b/test/common/config_utils.py new file mode 100644 index 00000000..3cdc427b --- /dev/null +++ b/test/common/config_utils.py @@ -0,0 +1,80 @@ +import yaml +import os +import threading +from typing import Dict, Any + + +class ConfigUtils: + """ + Singleton Configuration Utility + Provides methods to read and access YAML configuration files. + """ + + _instance = None + _lock = threading.Lock() # Ensure thread-safe singleton creation + + def __new__(cls, config_file: str = None): + # Double-checked locking + if cls._instance is None: + with cls._lock: + if cls._instance is None: + instance = super().__new__(cls) + instance._init_config(config_file) + cls._instance = instance + return cls._instance + + def _init_config(self, config_file: str = None): + """Initialize configuration file path and load config""" + if config_file is None: + current_dir = os.path.dirname(os.path.abspath(__file__)) + config_file = os.path.join(current_dir, "..", "config.yaml") + + self.config_file = os.path.abspath(config_file) + self._config = None # Lazy load + + def _load_config(self) -> Dict[str, Any]: + """Internal method to read configuration from file""" + try: + with open(self.config_file, "r", encoding="utf-8") as f: + return yaml.safe_load(f) or {} + except FileNotFoundError: + print(f"[WARN] Config file not found: {self.config_file}") + return {} + except yaml.YAMLError as e: + print(f"[ERROR] Failed to parse YAML config: {e}") + return {} + + def read_config(self) -> Dict[str, Any]: + """Read configuration file (lazy load)""" + if self._config is None: + self._config = self._load_config() + return self._config + + def reload_config(self): + """Force reload configuration file""" + self._config = self._load_config() + + def get_config(self, key: str, default: Any = None) -> Any: + """Get top-level configuration item""" + config = self.read_config() + return config.get(key, default) + + def get_nested_config(self, key_path: str, default: Any = None) -> Any: + """Get nested configuration, e.g., 'influxdb.host'""" + config = self.read_config() + keys = key_path.split(".") + value = config + try: + for k in keys: + value = value[k] + return value + except (KeyError, TypeError): + return default + + +# Global instance +config_utils = ConfigUtils() + +if __name__ == "__main__": + print("InfluxDB config:", config_utils.get_config("influxdb")) + print("InfluxDB host:", config_utils.get_nested_config("influxdb.host", "localhost")) diff --git a/test/common/influxdb_utils.py b/test/common/influxdb_utils.py new file mode 100644 index 00000000..5d564061 --- /dev/null +++ b/test/common/influxdb_utils.py @@ -0,0 +1,58 @@ +""" +InfluxDB Data Push Utility +Provides convenient InfluxDB data writing functionality +""" + +from datetime import datetime +from typing import Dict, Any, Optional, Union +from influxdb_client import InfluxDBClient, Point, WritePrecision +from influxdb_client.client.write_api import SYNCHRONOUS +from config_utils import config_utils as config_instance + +class InfluxDBUtils: + """InfluxDB Utility Class""" + + def __init__(self): + """Initialize InfluxDB connection""" + self.config = config_instance.get_config("influxdb") + + +# Global InfluxDB utility instance 
+influxdb_utils = InfluxDBUtils()
+
+
+def push_to_influx(measurement: str,
+                   value: Union[int, float, str],
+                   tags: Optional[Dict[str, str]] = None,
+                   fields: Optional[Dict[str, Union[int, float, str]]] = None,
+                   timestamp: Optional[datetime] = None) -> bool:
+    """Push a data point to InfluxDB. Placeholder: not implemented yet, always returns False."""
+    print("Push to InfluxDB: to be implemented.")
+    return False
+
+
+def push_test_metric(test_name: str,
+                     metric_name: str,
+                     value: Union[int, float],
+                     additional_tags: Optional[Dict[str, str]] = None) -> bool:
+    """Push a named test metric to InfluxDB. Placeholder: not implemented yet, always returns False."""
+    print("Push to InfluxDB: to be implemented.")
+    return False
+
+
+if __name__ == "__main__":
+    # Simple data push
+    push_to_influx("response_time", 0.123)
+
+    # Data push with tags
+    push_to_influx("accuracy", 0.95, {
+        "model": "v1.0",
+        "platform": "gpu",
+        "test_case": "classification"
+    })
+
+    # Test metric push
+    push_test_metric("test_calculation_accuracy", "calculation_time", 0.001, {
+        "feature": "accuracy"
+    })
+
+    # Data push with timestamp
+    push_to_influx("memory_usage", 1024, {"test": "memory"}, timestamp=datetime.now())
\ No newline at end of file
diff --git a/test/common/llmperf/__init__.py b/test/common/llmperf/__init__.py new file mode 100644 index 00000000..e69de29b
diff --git a/test/common/llmperf/run_inference.py b/test/common/llmperf/run_inference.py new file mode 100644 index 00000000..801163de --- /dev/null +++ b/test/common/llmperf/run_inference.py @@ -0,0 +1,169 @@
+import json
+import os
+import random
+from pathlib import Path
+from typing import List, Dict, Any
+
+import yaml
+
+from common.llmperf.utils.token_benchmark import run_token_benchmark
+from common.llmperf.utils.utils import reset_prefill_cache
+
+
+def run_test_cases(test_cases, timestamp_dir, model, server_url, tokenizer_path):
+    """
+    Execute all test cases and return the list of failed case indices and hit_rate mapping for each case. 
+ Parameters: + test_cases — List of test cases read from the configuration file + timestamp_dir — Directory Path to save results + model — Model name + server_url — Base URL of the service + tokenizer_path— Path to the tokenizer + Returns: + failed_cases — List of failed case indices + case_hit_rate_map — Mapping of {case_idx: hit_rate} + """ + print(f"[INFO] Total {len(test_cases)} test cases to be executed") + failed_case = [] + + # Clear proxy environment variables + env = os.environ.copy() + env.pop('http_proxy', None) + env.pop('https_proxy', None) + + # Store hit_rate for each case_idx (to export to Excel later) + case_hit_rate_map = {} + + for i, case in enumerate(test_cases): + print(f"\n>>> Executing test case {i + 1} <<<") + reset_prefill_cache(env, server_url) + # Use a fixed random_seed for each test to control PC hit_rate + random_seed = random.randint(1, 100000) + + # Read parameters from configuration file + mean_input = case.get("mean_input_tokens", 5000) + stddev_input = case.get("stddev_input_tokens", 0) + mean_output = case.get("mean_output_tokens", 1000) + stddev_output = case.get("stddev_output_tokens", 0) + max_completed = case.get("max_num_completed_requests", 1) + concurrent = case.get("num_concurrent_requests", 1) + llm_api = case.get("llm_api", "openai") + additional_sampling_params = case.get("additional_sampling_params", "{}") + timeout = case.get("timeout", 60000) + hit_rate = case.get("hit_rate", 0) + + # Record hit_rate for this case + case_hit_rate_map[i] = hit_rate + try: + # Determine if two runs are needed (PC hit_rate test) + if hit_rate == 0: + run_token_benchmark( + llm_api=llm_api, + model=model, + test_timeout_s=timeout, + max_num_completed_requests=max_completed, + num_concurrent_requests=concurrent, + mean_input_tokens=mean_input, + stddev_input_tokens=stddev_input, + mean_output_tokens=mean_output, + stddev_output_tokens=stddev_output, + additional_sampling_params=additional_sampling_params, + results_dir=str(timestamp_dir), + random_seed=random_seed, + openai_api_base=server_url + "/v1", + tokenizer_path=tokenizer_path, + user_metadata={"case_idx": i} + ) + else: + print("[INFO] hit_rate > 0 detected, entering prefill mode") + # hit_rate > 0: first prefill mode + prefill_mean_input = int(mean_input * hit_rate / 100) + print(f"[INFO] Prefill execution: mean_input_tokens={prefill_mean_input}") + run_token_benchmark( + llm_api=llm_api, + model=model, + test_timeout_s=timeout, + max_num_completed_requests=max_completed, + num_concurrent_requests=concurrent, + mean_input_tokens=prefill_mean_input, + stddev_input_tokens=stddev_input, + mean_output_tokens=2, + stddev_output_tokens=stddev_output, + additional_sampling_params=additional_sampling_params, + results_dir=str(timestamp_dir), + random_seed=random_seed, + openai_api_base=server_url + "/v1", + tokenizer_path=tokenizer_path, + user_metadata={"case_idx": i, "phase": "prefill"} + ) + # Then run normal mode + print("[INFO] Prefill completed, switching to normal mode execution") + run_token_benchmark( + llm_api=llm_api, + model=model, + test_timeout_s=timeout, + max_num_completed_requests=max_completed, + num_concurrent_requests=concurrent, + mean_input_tokens=mean_input, + stddev_input_tokens=stddev_input, + mean_output_tokens=mean_output, + stddev_output_tokens=stddev_output, + additional_sampling_params=additional_sampling_params, + results_dir=str(timestamp_dir), + random_seed=random_seed, + openai_api_base=server_url + "/v1", + tokenizer_path=tokenizer_path, + user_metadata={"case_idx": i, 
"phase": "normal"} + ) + except Exception as e: + failed_case.append(i) + + return failed_case, case_hit_rate_map + +def getResult(performance_name: str): + results_dir = Path("result_outputs") + matched_values: List[Dict[str, Any]] = [] + for idx, fname in enumerate(os.listdir(results_dir)): + if not fname.lower().endswith(".json"): + continue + + file_path = os.path.join(results_dir, fname) + try: + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) + except Exception as e: + print(f"[ERROR] Failed to read {file_path}: {e}") + continue + + # Iterate over each key in the dictionary + for key, value in data.items(): + if isinstance(key, str) and performance_name.lower() in key.lower(): + matched_values.append(value) + + print(f"[INFO] Found {len(matched_values)} matching values under {results_dir}, substring = '{performance_name}'") + return matched_values + +def inference_results(performance_name: str): + config_file = Path(__file__).parent.parent.parent / "config.yaml" + results_dir = Path("result_outputs") + if os.path.exists(results_dir) and len(os.listdir(results_dir)) != 0: + print("Test results already exist!!!!!!!!!!!!!!!") + else: + print("[INFO] Initialization complete, starting main process") + print(f"[INFO] Reading configuration file: {config_file}") + with open(config_file, 'r', encoding='utf-8') as f: + config = yaml.safe_load(f) + model = config.get("llm_connection", {}).get("model", "") + server_url = config.get("llm_connection", {}).get("server_url", "") + tokenizer_path = config.get("llm_connection", {}).get("tokenizer_path", "") + test_cases = config.get("llmperf_test_cases", []) + timestamp_dir = Path("result_outputs") + timestamp_dir.mkdir(parents=True, exist_ok=True) + print(f"[INFO] Created results directory: {timestamp_dir}") + + failed_cases, case_hit_rate_map = run_test_cases(test_cases, timestamp_dir, model, server_url, tokenizer_path) + total = len(test_cases) + print(f"\n[INFO] All tests completed! 
Success: {total - len(failed_cases)}/{total}") + if failed_cases: + print(f"[WARN] Failed case indices: {failed_cases}") + return getResult(performance_name) \ No newline at end of file diff --git a/test/common/llmperf/utils/__init__.py b/test/common/llmperf/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/common/llmperf/utils/common_metrics.py b/test/common/llmperf/utils/common_metrics.py new file mode 100644 index 00000000..3b05b437 --- /dev/null +++ b/test/common/llmperf/utils/common_metrics.py @@ -0,0 +1,17 @@ +# TODO (Avnishn): compute metrics in class +INTER_TOKEN_LAT = "inter_token_latency_s" +TTFT = "ttft_s" +E2E_LAT = "end_to_end_latency_s" +NUM_INPUT_TOKENS = "number_input_tokens" +NUM_OUTPUT_TOKENS = "number_output_tokens" +NUM_TOTAL_TOKENS = "number_total_tokens" +REQ_OUTPUT_THROUGHPUT = "request_output_throughput_token_per_s" +ERROR_MSG = "error_msg" +ERROR_CODE = "error_code" +ERROR_CODE_FREQ = "error_code_frequency" +NUM_ERRORS = "number_errors" +OUTPUT_THROUGHPUT = "mean_output_throughput_token_per_s" +NUM_COMPLETED_REQUESTS = "num_completed_requests" +COMPLETED_REQUESTS_PER_MIN = "num_completed_requests_per_min" +ERROR_RATE = "error_rate" +NUM_REQ_STARTED = "num_requests_started" \ No newline at end of file diff --git a/test/common/llmperf/utils/models.py b/test/common/llmperf/utils/models.py new file mode 100644 index 00000000..f70e8a7e --- /dev/null +++ b/test/common/llmperf/utils/models.py @@ -0,0 +1,22 @@ +from typing import Any, Dict, Optional, Tuple +from pydantic import BaseModel + + +class RequestConfig(BaseModel): + """The configuration for a request to the LLM API. + + Args: + model: The model to use. + prompt: The prompt to provide to the LLM API. + sampling_params: Additional sampling parameters to send with the request. + For more information see the Router app's documentation for the completions + llm_api: The name of the LLM API to send the request to. + metadata: Additional metadata to attach to the request for logging or validation purposes. + """ + + model: str + prompt: Tuple[str, int] + sampling_params: Optional[Dict[str, Any]] = None + llm_api: Optional[str] = None + metadata: Optional[Dict[str, Any]] = None + openai_api_base: Optional[str] = "" \ No newline at end of file diff --git a/test/common/llmperf/utils/openai_chat_completions_client.py b/test/common/llmperf/utils/openai_chat_completions_client.py new file mode 100644 index 00000000..b24320d0 --- /dev/null +++ b/test/common/llmperf/utils/openai_chat_completions_client.py @@ -0,0 +1,122 @@ +import json +import os +import time +from typing import Any, Dict, Tuple + +import requests + +from common.llmperf.utils.models import RequestConfig + +from common.llmperf.utils import common_metrics + + +class OpenAIChatCompletionsClient(): + """ + used for sending HTTP requests, receiving token streams, measuring latency, etc. 
+ """ + def llm_request(self, request_config: RequestConfig) -> Tuple[Dict[str, Any], str, RequestConfig]: + prompt, prompt_len = request_config.prompt + + message = [ + {"role": "system", "content": ""}, + {"role": "user", "content": prompt}, + ] + model = request_config.model + body = { + "model": model, + "messages": message, + "stream": True, + "ignore_eos": True, + } + sampling_params = request_config.sampling_params + body.update(sampling_params or {}) + + time_to_next_token = [] + tokens_received = 0 + ttft = 0.0 + error_response_code = None + generated_text = "" + error_msg = "" + output_throughput = 0.0 + total_request_time = 0.0 + flag = False + + metrics: Dict[str, Any] = {} + + metrics[common_metrics.ERROR_CODE] = None + metrics[common_metrics.ERROR_MSG] = "" + + start_time = time.monotonic() + most_recent_received_token_time = start_time + + address = request_config.openai_api_base + + if not address: + raise ValueError("the environment variable OPENAI_API_BASE must be set.") + key = os.environ.get("OPENAI_API_KEY", "secret_abcdefg") + if not key: + raise ValueError("the environment variable OPENAI_API_KEY must be set.") + headers = {"Authorization": f"Bearer {key}"} + if not address.endswith("/"): + address = address + "/" + address += "chat/completions" + try: + with requests.post( + address, + json=body, + stream=True, + timeout=180, + headers=headers, + ) as response: + if response.status_code != 200: + error_msg = response.text + error_response_code = response.status_code + response.raise_for_status() + + for chunk in response.iter_lines(chunk_size=None): + if not chunk: + continue + stem = b"data: " + if chunk.startswith(stem): + chunk = chunk[len(stem):] + # Data might already be bytes or str + if isinstance(chunk, bytes): + chunk = chunk.decode("utf-8", errors="ignore") + if chunk.strip() == "[DONE]": + continue + tokens_received += 1 + data = json.loads(chunk) + if "error" in data: + error_msg = data["error"]["message"] + error_response_code = data["error"]["code"] + raise RuntimeError(error_msg) + delta = data["choices"][0]["delta"] + content = delta.get("content", None) or delta.get("reasoning_content", "") + if content: + if tokens_received != 0 and flag == False: + ttft = time.monotonic() - start_time + flag = True + else: + time_to_next_token.append(time.monotonic() - most_recent_received_token_time) + most_recent_received_token_time = time.monotonic() + generated_text += content + + total_request_time = time.monotonic() - start_time + if total_request_time > 0: + output_throughput = tokens_received / total_request_time + + except Exception as e: + metrics[common_metrics.ERROR_MSG] = error_msg + metrics[common_metrics.ERROR_CODE] = error_response_code + print(f"Warning Or Error: {e}") + print(error_response_code) + + metrics[common_metrics.INTER_TOKEN_LAT] = sum(time_to_next_token) + metrics[common_metrics.TTFT] = ttft + metrics[common_metrics.E2E_LAT] = total_request_time + metrics[common_metrics.REQ_OUTPUT_THROUGHPUT] = output_throughput + metrics[common_metrics.NUM_TOTAL_TOKENS] = tokens_received + prompt_len + metrics[common_metrics.NUM_OUTPUT_TOKENS] = tokens_received + metrics[common_metrics.NUM_INPUT_TOKENS] = prompt_len + + return metrics, generated_text, request_config \ No newline at end of file diff --git a/test/common/llmperf/utils/sonnet.txt b/test/common/llmperf/utils/sonnet.txt new file mode 100644 index 00000000..9f13ead4 --- /dev/null +++ b/test/common/llmperf/utils/sonnet.txt @@ -0,0 +1,84 @@ +Shall I compare thee to a summer's day? 
+Thou art more lovely and more temperate: +Rough winds do shake the darling buds of May, +And summer's lease hath all too short a date: +Sometime too hot the eye of heaven shines, +And often is his gold complexion dimm'd; +And every fair from fair sometime declines, +By chance or nature's changing course untrimm'd; +But thy eternal summer shall not fade +Nor lose possession of that fair thou owest; +Nor shall Death brag thou wander'st in his shade, +When in eternal lines to time thou growest: +So long as men can breathe or eyes can see, +So long lives this and this gives life to thee. +Then let not winter's ragged hand deface +In thee thy summer, ere thou be distill'd: +Make sweet some vial; treasure thou some place +With beauty's treasure, ere it be self-kill'd. +That use is not forbidden usury, +Which happies those that pay the willing loan; +That's for thyself to breed another thee, +Or ten times happier, be it ten for one; +Ten times thyself were happier than thou art, +If ten of thine ten times refigured thee: +Then what could death do, if thou shouldst depart, +Leaving thee living in posterity? +Be not self-will'd, for thou art much too fair +To be death's conquest and make worms thine heir. +Where art thou, Muse, that thou forget'st so long +To speak of that which gives thee all thy might? +Spend'st thou thy fury on some worthless song, +Darkening thy power to lend base subjects light? +Return, forgetful Muse, and straight redeem +In gentle numbers time so idly spent; +Sing to the ear that doth thy lays esteem +And gives thy pen both skill and argument. +Rise, resty Muse, my love's sweet face survey, +If Time have any wrinkle graven there; +If any, be a satire to decay, +And make Time's spoils despised every where. +Give my love fame faster than Time wastes life; +So thou prevent'st his scythe and crooked knife. +My glass shall not persuade me I am old, +So long as youth and thou are of one date; +But when in thee time's furrows I behold, +Then look I death my days should expiate. +For all that beauty that doth cover thee +Is but the seemly raiment of my heart, +Which in thy breast doth live, as thine in me: +How can I then be elder than thou art? +O, therefore, love, be of thyself so wary +As I, not for myself, but for thee will; +Bearing thy heart, which I will keep so chary +As tender nurse her babe from faring ill. +Presume not on thy heart when mine is slain; +Thou gavest me thine, not to give back again. +So am I as the rich, whose blessed key +Can bring him to his sweet up-locked treasure, +The which he will not every hour survey, +For blunting the fine point of seldom pleasure. +Therefore are feasts so solemn and so rare, +Since, seldom coming, in the long year set, +Like stones of worth they thinly placed are, +Or captain jewels in the carcanet. +So is the time that keeps you as my chest, +Or as the wardrobe which the robe doth hide, +To make some special instant special blest, +By new unfolding his imprison'd pride. +Blessed are you, whose worthiness gives scope, +Being had, to triumph, being lack'd, to hope. +If there be nothing new, but that which is +Hath been before, how are our brains beguiled, +Which, labouring for invention, bear amiss +The second burden of a former child! +O, that record could with a backward look, +Even of five hundred courses of the sun, +Show me your image in some antique book, +Since mind at first in character was done! 
+That I might see what the old world could say +To this composed wonder of your frame; +Whether we are mended, or whether better they, +Or whether revolution be the same. +O, sure I am, the wits of former days +To subjects worse have given admiring praise. \ No newline at end of file diff --git a/test/common/llmperf/utils/token_benchmark.py b/test/common/llmperf/utils/token_benchmark.py new file mode 100644 index 00000000..5f514267 --- /dev/null +++ b/test/common/llmperf/utils/token_benchmark.py @@ -0,0 +1,327 @@ +import logging +from collections.abc import Iterable +import json +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +import re +import time +import random +from typing import Any, Dict, List, Optional, Tuple + +import pandas as pd + + +from transformers import AutoTokenizer + +from common.llmperf.utils import common_metrics +from common.llmperf.utils.models import RequestConfig +from common.llmperf.utils.openai_chat_completions_client import OpenAIChatCompletionsClient +from common.llmperf.utils.utils import ( + randomly_sample_sonnet_lines_prompt, + LLMPerfResults, + sample_random_positive_int, ) + + +def get_token_throughput_latencies( + model: str, + mean_input_tokens: int, + stddev_input_tokens: int, + mean_output_tokens: int, + stddev_output_tokens: int, + additional_sampling_params: Optional[Dict[str, Any]] = None, + num_concurrent_requests: int = 1, + max_num_completed_requests: int = 500, + test_timeout_s=90, + llm_api="openai", + random_seed: int = None, + openai_api_base: str = "", + tokenizer_path: str = None, +) -> Tuple[Dict[str, Any], List[Dict[str, Any]], float, float]: + """Get the token throughput and latencies for the given model. + + Args: + model: The name of the model to query. + mean_input_tokens: The mean number of tokens to send in the prompt for the request. + stddev_input_tokens: The standard deviation of the number of tokens to send in the prompt for the request. + mean_output_tokens: The mean number of tokens to generate per request. + stddev_output_tokens: The standard deviation of the number of tokens to generate per request. + additional_sampling_params: Additional sampling parameters to send with the request. + For more information see the LLM APIs documentation for the completions + num_concurrent_requests: The number of concurrent requests to make. Increase + this to increase the amount of load and vice versa. + test_timeout_s: The amount of time to run the test for before reporting results. + llm_api: The name of the llm api to use. Either "openai" or "litellm". + + Returns: + A summary of the performance metrics collected across all completed requests + (e.g. throughput, latencies, etc.) + The individual metrics for each request. + """ + random.seed(random_seed) + + print(f"Using tokenizer:{tokenizer_path}") + tokenizer = AutoTokenizer.from_pretrained(tokenizer_path) + get_token_length = lambda text: len(tokenizer.encode(text)) + + if not additional_sampling_params: + additional_sampling_params = {} + + # 1. 
create prompts + prompts: List[Tuple[str, int]] = [] + num_output_tokens_list: List[int] = [] + for i in range(max_num_completed_requests): + num_output = sample_random_positive_int(mean_output_tokens, stddev_output_tokens) + num_output_tokens_list.append(num_output) + prompts.append(randomly_sample_sonnet_lines_prompt( + prompt_tokens_mean=mean_input_tokens, + prompt_tokens_stddev=stddev_input_tokens, + tokenizer=tokenizer + )) + start_time = time.monotonic() + completed_requests: List[Dict[str, Any]] = [] + incremental_time_delay = 0.0 + client = OpenAIChatCompletionsClient() + futures = [] + + # 2. Submitting tasks using a thread pool + with ThreadPoolExecutor(max_workers=num_concurrent_requests) as executor: + for idx in range(max_num_completed_requests): + sampling = {"max_tokens": num_output_tokens_list[idx]} + sampling.update(additional_sampling_params) + cfg = RequestConfig( + model=model, + prompt=prompts[idx], + sampling_params=sampling, + llm_api=llm_api, + openai_api_base=openai_api_base + ) + futures.append(executor.submit(client.llm_request, cfg)) + # 3. Waiting for completion or timeout + for future in as_completed(futures, timeout=test_timeout_s): + try: + metrics, gen_text, req_cfg = future.result() + except Exception as e: + logging.warning(f"[WARN] Future raised exception: {e}") + continue + num_output_tokens = get_token_length(gen_text) + if num_output_tokens: + metrics[common_metrics.INTER_TOKEN_LAT] /= (metrics[common_metrics.NUM_OUTPUT_TOKENS] - 1) if ( + metrics[common_metrics.NUM_OUTPUT_TOKENS] - 1) else 1 + metrics[common_metrics.NUM_OUTPUT_TOKENS] = num_output_tokens + metrics[common_metrics.NUM_TOTAL_TOKENS] = metrics[ + common_metrics.NUM_INPUT_TOKENS] + num_output_tokens + try: + metrics[common_metrics.REQ_OUTPUT_THROUGHPUT] = num_output_tokens / metrics[ + common_metrics.E2E_LAT] + except ZeroDivisionError: + logging.error("Division by zero in throughput calculation.") + + completed_requests.append(metrics) + + incremental_time_delay += metrics.get(common_metrics.INTER_TOKEN_LAT, 0.0) + + end_time = time.monotonic() + + print(f"Results for token benchmark for {model} queried with the {llm_api} api.\n") + if mean_output_tokens == 2: + print(f"[INFO] First token sending pre-embedding completed\n") + return {}, [], 0.0, 0.0 + + ret = metrics_summary(completed_requests, start_time, end_time) + + metadata = { + "model": model, + "mean_input_tokens": mean_input_tokens, + "stddev_input_tokens": stddev_input_tokens, + "mean_output_tokens": mean_output_tokens, + "stddev_output_tokens": stddev_output_tokens, + "num_concurrent_requests": num_concurrent_requests, + "additional_sampling_params": additional_sampling_params, + } + + metadata["results"] = ret + elapsed_time = end_time - start_time + return metadata, completed_requests, elapsed_time, incremental_time_delay + + +def metrics_summary( + metrics: List[Dict[str, Any]], start_time: int, end_time: int +) -> Dict[str, Any]: + """Generate a summary over metrics generated from potentially multiple instances of this client. + + Args: + metrics: The metrics to summarize. + start_time: The time the test started. + end_time: The time the test ended. 
+ + Returns: + A summary with the following information: + - Overall throughput (generated tokens / total test time) + - Number of completed requests + - Error rate + - Error code frequency + - Quantiles (p25-p99) for the following metrics: + - Inter token latency + - Time to first token + - User total request time + - Number of tokens processed per request + - Number of tokens generated per request + - User throughput (tokens / s) + """ + ret = {} + + def flatten(item): + for sub_item in item: + if isinstance(sub_item, Iterable) and not isinstance(sub_item, str): + yield from flatten(sub_item) + else: + yield sub_item + + df = pd.DataFrame(metrics) + df_without_errored_req = df[df[common_metrics.ERROR_CODE].isna()] + + for key in [ + common_metrics.INTER_TOKEN_LAT, + common_metrics.TTFT, + common_metrics.E2E_LAT, + common_metrics.REQ_OUTPUT_THROUGHPUT, + common_metrics.NUM_INPUT_TOKENS, + common_metrics.NUM_OUTPUT_TOKENS + ]: + print(key) + ret[key] = {} + series = pd.Series(list(flatten(df_without_errored_req[key]))).dropna() + quantiles = series.quantile([0.25, 0.5, 0.75, 0.9, 0.95, 0.99]).to_dict() + quantiles_reformatted_keys = {} + for quantile, value in quantiles.items(): + reformatted_key = f"p{int(quantile * 100)}" + print(f" {reformatted_key} = {value}") + quantiles_reformatted_keys[reformatted_key] = value + ret[key]["quantiles"] = quantiles_reformatted_keys + mean = series.mean() + print(f" mean = {mean}") + ret[key]["mean"] = mean + print(f" min = {series.min()}") + ret[key]["min"] = series.min() + print(f" max = {series.max()}") + ret[key]["max"] = series.max() + print(f" stddev = {series.std()}") + ret[key]["stddev"] = series.std() + + ret[common_metrics.NUM_REQ_STARTED] = len(metrics) + + error_codes = df[common_metrics.ERROR_CODE].dropna() + num_errors = len(error_codes) + ret[common_metrics.ERROR_RATE] = num_errors / len(metrics) if len(metrics) else 0 + ret[common_metrics.NUM_ERRORS] = num_errors + print(f"Number Of Errored Requests: {num_errors}") + error_code_frequency = dict(error_codes.value_counts()) + if num_errors: + error_code_frequency = dict(error_codes.value_counts()) + print("Error Code Frequency") + print(error_code_frequency) + ret[common_metrics.ERROR_CODE_FREQ] = str(error_code_frequency) + + overall_output_throughput = df_without_errored_req[ + common_metrics.NUM_OUTPUT_TOKENS + ].sum() / (end_time - start_time) + + print(f"Overall Output Throughput: {overall_output_throughput}") + ret[common_metrics.OUTPUT_THROUGHPUT] = overall_output_throughput + + num_completed_requests = len(df_without_errored_req) + num_completed_requests_per_min = ( + num_completed_requests / (end_time - start_time) * 60 + ) + print(f"Number Of Completed Requests: {num_completed_requests}") + print(f"Completed Requests Per Minute: {num_completed_requests_per_min}") + + ret[common_metrics.NUM_COMPLETED_REQUESTS] = num_completed_requests + ret[common_metrics.COMPLETED_REQUESTS_PER_MIN] = num_completed_requests_per_min + + return ret + + +def run_token_benchmark( + llm_api: str, + model: str, + test_timeout_s: int, + max_num_completed_requests: int, + num_concurrent_requests: int, + mean_input_tokens: int, + stddev_input_tokens: int, + mean_output_tokens: int, + stddev_output_tokens: int, + additional_sampling_params: str, + results_dir: str, + random_seed: int, + openai_api_base: str, + tokenizer_path: str, + user_metadata: Dict[str, Any], +): + """ + Args: + llm_api: The name of the llm api to use. + model: The name of the model to query. 
+ max_num_completed_requests: The number of requests to complete before finishing the test. + test_timeout_s: The amount of time to run the test for before reporting results. + num_concurrent_requests: The number of concurrent requests to make. Increase + this to increase the amount of load and vice versa. + mean_input_tokens: The mean number of tokens to send in the prompt for the request. + stddev_input_tokens: The standard deviation of the number of tokens to send in the prompt for the request. + mean_output_tokens: The mean number of tokens to generate per request. + stddev_output_tokens: The standard deviation of the number of tokens to generate per request. + additional_sampling_params: Additional sampling parameters to send with the request. + For more information see the LLM APIs documentation for the completions. + results_dir: The directory to save the results to. + user_metadata: Additional metadata to include in the results. + """ + if mean_input_tokens < 40: + print( + "the minimum number of input tokens that will be sent is 41" + " because of the prompting logic right now" + ) + + summary, individual_responses, elapsed_time, incremental_time_delay = get_token_throughput_latencies( + model=model, + llm_api=llm_api, + test_timeout_s=test_timeout_s, + max_num_completed_requests=max_num_completed_requests, + mean_input_tokens=mean_input_tokens, + stddev_input_tokens=stddev_input_tokens, + mean_output_tokens=mean_output_tokens, + stddev_output_tokens=stddev_output_tokens, + num_concurrent_requests=num_concurrent_requests, + additional_sampling_params=json.loads(additional_sampling_params), + random_seed=random_seed, + openai_api_base=openai_api_base, + tokenizer_path=tokenizer_path, + ) + if mean_output_tokens == 2: + return summary, individual_responses, elapsed_time, incremental_time_delay + + timestamp = int(time.time() * 1000) + if results_dir: + filename = f"{model}_{mean_input_tokens}_{mean_output_tokens}_{timestamp}" + filename = re.sub(r"[^\w\d-]+", "-", filename) + filename = re.sub(r"-{2,}", "-", filename) + summary_filename = f"{filename}_summary" + + # Update to metadata. 
+ summary.update(user_metadata) + summary["elapsed_time"] = elapsed_time + summary["incremental_time_delay"] = incremental_time_delay + + results = LLMPerfResults(name=summary_filename, metadata=summary) + results_dir = Path(results_dir) + if not results_dir.exists(): + results_dir.mkdir(parents=True) + elif not results_dir.is_dir(): + raise ValueError(f"{results_dir} is not a directory") + + try: + with open(results_dir / f"{summary_filename}.json", "w") as f: + json.dump(results.to_dict(), f, indent=4, default=str) + except Exception as e: + print(results.to_dict()) + raise e \ No newline at end of file diff --git a/test/common/llmperf/utils/utils.py b/test/common/llmperf/utils/utils.py new file mode 100644 index 00000000..e68078b4 --- /dev/null +++ b/test/common/llmperf/utils/utils.py @@ -0,0 +1,168 @@ +import json +import math +import os +import hashlib +import pathlib +import random +import subprocess +import time +from typing import Any, Dict, Tuple + +from transformers import LlamaTokenizerFast + + +RESULTS_VERSION = "2025-10-30" + + +class LLMPerfResults: + def __init__( + self, + name: str, + metadata: Dict[str, Any] = None, + ): + self.name = name + self.metadata = metadata or {} + self.timestamp = int(time.time()) + self.metadata["timestamp"] = self.timestamp + self.version = RESULTS_VERSION + + def to_dict(self): + data = { + "version": self.version, + "name": self.name, + } + data.update(self.metadata) + data = flatten_dict(data) + return data + + def json(self): + data = self.to_dict() + return json.dumps(data) + + +def upload_to_s3(results_path: str, s3_path: str) -> None: + """Upload the results to s3. + + Args: + results_path: The path to the results file. + s3_path: The s3 path to upload the results to. + + """ + + command = ["aws", "s3", "sync", results_path, f"{s3_path}/"] + result = subprocess.run(command) + if result.returncode == 0: + print("Files uploaded successfully!") + else: + print("An error occurred:") + print(result.stderr) + +def randomly_sample_sonnet_lines_prompt( + prompt_tokens_mean: int = 550, + prompt_tokens_stddev: int = 250, + tokenizer: LlamaTokenizerFast = None, +) -> Tuple[str, int]: + """Generate a prompt that randomly samples lines from a the shakespeare sonnet at sonnet.txt. + + Args: + prompt_length_mean: The mean length of the prompt to generate. + prompt_len_stddev: The standard deviation of the length of the prompt to generate. + expect_output_tokens: The number of tokens to expect in the output. This is used to + determine the length of the prompt. The prompt will be generated such that the output + will be approximately this many tokens. + + Note: + tokens will be counted from the sonnet using the Llama tokenizer. Using one tokenizer + ensures a fairer comparison across different LLMs. For example, if gpt 3.5 tokenizes + a prompt in less tokens than Llama2, then this will be reflected in the results since + they will be fed identical prompts. + + Returns: + A tuple of the prompt and the length of the prompt. 
+ """ + get_token_length = lambda text: len(tokenizer.encode(text)) + + prompt = ( + "Randomly stream lines from the following text " + "Don't generate eos tokens:\n\n" + ) + # get a prompt length that is at least as long as the base + num_prompt_tokens = sample_random_positive_int( + prompt_tokens_mean, prompt_tokens_stddev + ) + while num_prompt_tokens < get_token_length(prompt): + num_prompt_tokens = sample_random_positive_int( + prompt_tokens_mean, prompt_tokens_stddev + ) + remaining_prompt_tokens = num_prompt_tokens - get_token_length(prompt) + sonnet_path = pathlib.Path(__file__).parent.resolve() / "sonnet.txt" + with open(sonnet_path, "r") as f: + sonnet_lines = f.readlines() + random.shuffle(sonnet_lines) + sampling_lines = True + while sampling_lines: + for line in sonnet_lines: + line_to_add = line + if remaining_prompt_tokens - get_token_length(line_to_add) < 0: + # This will cut off a line in the middle of a word, but that's ok since an + # llm should be able to handle that. + line_to_add = line_to_add[: int(math.ceil(remaining_prompt_tokens))] + sampling_lines = False + prompt += line_to_add + break + prompt += line_to_add + remaining_prompt_tokens -= get_token_length(line_to_add) + print(hashlib.sha256(prompt.encode("utf-8")).hexdigest()) + return (prompt, num_prompt_tokens) + + +def sample_random_positive_int(mean: int, stddev: int) -> int: + """Sample random numbers from a gaussian distribution until a positive number is sampled. + + Args: + mean: The mean of the gaussian distribution to sample from. + stddev: The standard deviation of the gaussian distribution to sample from. + + Returns: + A random positive integer sampled from the gaussian distribution. + """ + ret = -1 + while ret <= 0: + ret = int(random.gauss(mean, stddev)) + return ret + + +def flatten_dict(d, parent_key="", sep="_"): + items = [] + for k, v in d.items(): + new_key = f"{parent_key}{sep}{k}" if parent_key else k + if isinstance(v, dict): + items.extend(flatten_dict(v, new_key, sep=sep).items()) + else: + items.append((new_key, v)) + return dict(items) + +def reset_prefill_cache(env, server_url): + """ + prefix cache / HBM + Param: + env + server_url + """ + reset_url = f"{server_url}/reset_prefix_cache" + print(f"[INFO] Resetting prefix cache: {reset_url}") + try: + result = subprocess.run( + ["curl", "-X", "POST", reset_url, "-s", "-f"], + env=env, + check=False, + capture_output=True, + text=True, + timeout=10 + ) + if result.returncode == 0: + print("[INFO] Prefix cache successfully reset") + else: + print(f"[ERROR] Unsuccessfully reset prefix cache,error code: {result.returncode}") + except Exception as e: + print(f"[ERROR] Exception in resetting prefix cache: {e}") \ No newline at end of file diff --git a/test/config.yaml b/test/config.yaml new file mode 100644 index 00000000..df1bb6a7 --- /dev/null +++ b/test/config.yaml @@ -0,0 +1,50 @@ +reports: + base_dir: "reports" + use_timestamp: true + directory_prefix: "pytest" + html: # pytest-html + enabled: false + filename: "report.html" + title: "UCM Pytest Test Report" + allure: + enabled: true + html_enable: true + serve_mode: true # 使用allure serve mode + serve_host: "localhost" + serve_port: 8081 + directory: "allure-results" + +log: + enabled: true + path: "logs" + filename: "pytest.log" + use_timestamp: false + +# InfluxDB Configuration +influxdb: + host: localhost + port: 8086 + token: your-influxdb-token-here + org: your-organization + bucket: test-metrics + timeout: 10 + +# LLM Connection Configuration +llm_connection: + model: "qwen3" + 
server_url: "http://141.111.32.70:9382" + tokenizer_path: "/home/models/QwQ-32B" +# Performance Test Configuration +llmperf_test_cases: + - mean_input_tokens: 600 + mean_output_tokens: 300 + max_num_completed_requests: 1 + num_concurrent_requests: 1 + additional_sampling_params: "{}" + hit_rate: 0 + - mean_input_tokens: 600 + mean_output_tokens: 200 + max_num_completed_requests: 3 + num_concurrent_requests: 1 + additional_sampling_params: "{}" + hit_rate: 0 diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 00000000..65ace924 --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,388 @@ +from __future__ import annotations +import logging +from math import log +import shutil +import sys +import re +import pytest +import tempfile +import datetime as dt +import platform as pf +from pathlib import Path +from typing import Dict, Any, List +from common.config_utils import config_utils as config_instance +from common.allure_utils import setup_allure, generate_allure_html, serve_allure_report + + +# ---------------- Constants ---------------- +PRJ_ROOT = Path(__file__).resolve().parent +REPORT_DIR = PRJ_ROOT / "reports" +sys.path.insert(0, str(PRJ_ROOT)) + +# Global variables for Allure configuration +ALLURE_DIR = None +ALLURE_CONFIG = None + + +# ---------------- Logging ---------------- +# TODO:Unified log +def _init_logger() -> logging.Logger: + """Initialize and configure test logger.""" + log_config = config_instance.get_config("log", {}) + if not log_config.get("enabled", True): + return logging.getLogger("UCM_TEST") + + log = logging.getLogger("UCM_TEST") + log.setLevel(logging.DEBUG) + log.handlers.clear() + + log_path = Path(log_config.get("path", "logs")) + log_path.mkdir(parents=True, exist_ok=True) + + filename = config_instance.get_nested_config("log.filename", "pytest.log") + use_timestamp = config_instance.get_nested_config("log.use_timestamp", True) + if use_timestamp: + ts = dt.datetime.now().strftime("%Y%m%d-%H%M%S") + stem, ext = Path(filename).stem, Path(filename).suffix + filename = f"{stem}_{ts}{ext}" + + log_file = log_path / filename + + # Common formatter + console_fmt = logging.Formatter("[%(levelname)s] %(name)s: %(message)s") + + # File handler + fh = logging.FileHandler(log_file, encoding="utf-8") + fh.setLevel(logging.INFO) + fh.setFormatter(console_fmt) + log.addHandler(fh) + + # Console handler + ch = logging.StreamHandler() + ch.setLevel(logging.INFO) + ch.setFormatter(console_fmt) + log.addHandler(ch) + + log.propagate = False + return log + + +logger = _init_logger() +reports_config = config_instance.get_config("reports") + + +# ---------------- pytest Hooks ---------------- +def _prepare_report_dir(config: pytest.Config) -> Path: + """Prepare report directory based on config.yaml.""" + cfg = config_instance.get_config("reports", {}) + base_dir = Path(cfg.get("base_dir", "reports")) + prefix = cfg.get("directory_prefix", "pytest") + if cfg.get("use_timestamp", False): + ts = dt.datetime.now().strftime("%Y%m%d_%H%M%S") + report_dir = base_dir / f"{prefix}_{ts}" + else: + report_dir = base_dir + report_dir.mkdir(parents=True, exist_ok=True) + return report_dir + + +def _setup_html_report(config: pytest.Config, report_dir: Path) -> None: + """Configure pytest-html if enabled.""" + html_cfg = reports_config.get("html", {}) + if not html_cfg.get("enabled", True): + if hasattr(config.option, "htmlpath"): + config.option.htmlpath = None + logger.info("HTML report disabled according to config.yaml") + return + + html_filename = html_cfg.get("filename", 
"report.html") + html_path = report_dir / html_filename + config.option.htmlpath = str(html_path) + config.option.self_contained_html = True + logger.info(f"HTML report enabled → {html_path}") + + +def pytest_configure(config: pytest.Config) -> None: + """Pytest entry hook: configure logging and reports.""" + logger.info(f"Starting Test Session: {dt.datetime.now():%Y-%m-%d %H:%M:%S}") + global REPORT_DIR, ALLURE_DIR, ALLURE_CONFIG + REPORT_DIR = _prepare_report_dir(config) + _setup_html_report(config, REPORT_DIR) + reports_cfg = config_instance.get_config("reports", {}) + + # Save Allure configuration globally + ALLURE_CONFIG = reports_cfg + allure_dir = setup_allure(reports_cfg) + ALLURE_DIR = allure_dir + + # Configure allure-pytest plugin if enabled + if allure_dir: + # Set allure results directory for pytest-allure plugin + if hasattr(config.option, 'allure_report_dir'): + config.option.allure_report_dir = str(allure_dir) + # Also set as environment variable + import os + os.environ["ALLURE_REPORT_DIR"] = str(allure_dir) + logger.info(f"Allure results will be stored at {allure_dir}") + else: + logger.info("Allure report disabled according to config.yaml") + + +# ---------------- Marker & Filter Logic ---------------- +def _load_markers_from_ini() -> Dict[str, Dict[str, Any]]: + """Parse pytest.ini markers section.""" + ini_path = Path(__file__).with_name("pytest.ini") + if not ini_path.exists(): + return {} + + markers: Dict[str, Dict[str, Any]] = {} + in_markers = False + + for raw in ini_path.read_text(encoding="utf-8").splitlines(): + line = raw.strip() + if line.startswith("markers"): + in_markers = True + continue + if not in_markers or not line or line.startswith("#"): + continue + if line == "# end of markers": + break + + m = re.match(r"(\w+)(?:\((\w+)\))?\s*:\s*(.+)", line) + if m: + name, arg, help_txt = m.groups() + markers[name] = {"name": name, "arg": arg, "help": help_txt.strip()} + return markers + + +_MARKER_DEFS = _load_markers_from_ini() + + +def pytest_addoption(parser: pytest.Parser) -> None: + """Add CLI options dynamically from marker definitions.""" + for info in _MARKER_DEFS.values(): + parser.addoption( + f"--{info['name']}", + action="store", + default="", + help=( + f"Filter by {info['name']} marker. " + "Syntax: val1,val2,... | all | empty(no filter). 
" + f"({info['help']})" + ), + ) + + +def _get_marker_values(item: pytest.Item, name: str) -> List[str]: + """Extract marker values from test item.""" + vals: List[str] = [] + mark_infos = [] + + for mark in item.iter_markers(name=name): + mark_val_list = [str(a) for a in mark.args] + + if name in mark.kwargs: + mark_val_list.append(str(mark.kwargs[name])) + + vals.extend(mark_val_list) + mark_infos.append(f"{name}: {', '.join(mark_val_list) if mark_val_list else 'None'}") + + return vals + + +@pytest.hookimpl(hookwrapper=True, tryfirst=True) +def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo): + """Attach test reports to item for access in fixtures.""" + outcome = yield + rep = outcome.get_result() + setattr(item, f"rep_{rep.when}", rep) + + +def pytest_collection_modifyitems(config: pytest.Config, items: List[pytest.Item]) -> None: + """Filter test collection based on CLI options.""" + # Store marker information for later use in test execution + for item in items: + markers_info = [] + for mark in item.iter_markers(): + # Skip pytest's built-in markers + if mark.name in ['parametrize', 'usefixtures', 'skip', 'skipif', 'xfail']: + continue + markers_info.append({ + 'name': mark.name, + 'args': mark.args + }) + # Store marker info in the item for later use + item._pytest_markers_info = markers_info + + # Original filtering logic + kept = items[:] + + for name, info in _MARKER_DEFS.items(): + opt = config.getoption(f"--{name}", "").strip() + if not opt: + continue + + # all means any marker value with the marker + if opt == "all": + kept = [it for it in kept if _get_marker_values(it, name)] + continue + + # 特殊处理 stage + if name == "stage": + if opt.endswith("+"): + min_stage = int(opt[:-1]) + kept = [ + it for it in kept + if any(int(v) >= min_stage for v in _get_marker_values(it, "stage")) + ] + else: + wanted = {x.strip() for x in opt.split(",") if x.strip()} + kept = [ + it for it in kept + if any(v in wanted for v in _get_marker_values(it, "stage")) + ] + else: + wanted = {x.strip() for x in opt.split(",") if x.strip()} + kept = [ + it for it in kept + if any(v in wanted for v in _get_marker_values(it, name)) + ] + + if not kept: + logger.warning( + "No tests matched filter conditions: %s", + {m: config.getoption(f"--{m}") for m in _MARKER_DEFS}, + ) + else: + logger.info( + "Filter %d / %d tests after applying markers %s", + len(kept), len(items), + {m: config.getoption(f'--{m}') for m in _MARKER_DEFS if config.getoption(f'--{m}')} + ) + + items[:] = kept + + +@pytest.hookimpl(tryfirst=True) +def pytest_runtest_setup(item): + """Add pytest markers as Allure labels during test setup.""" + # Add pytest markers as Allure labels + if hasattr(item, '_pytest_markers_info'): + import allure + for marker_info in item._pytest_markers_info: + marker_name = marker_info['name'] + marker_args = marker_info['args'] + + # Add marker as Allure label + label_name = f"pytest_{marker_name}" + if marker_args: + # If marker has arguments, add each as a separate label + for arg in marker_args: + allure.dynamic.label(label_name, str(arg)) + else: + # If marker has no arguments, just add the marker name + allure.dynamic.label(label_name, marker_name) + + +# ---------------- Fixtures ---------------- +@pytest.fixture(scope="session", autouse=True) +def session_logger() -> None: + """Session-level setup and teardown with system info logging.""" + logger.info("-" * 60) + logger.info(f"{'Python':<10} │ {pf.python_version()}") + logger.info(f"{'Platform':<10} │ {pf.system()} {pf.release()}") + 
logger.info("-" * 60) + yield + logger.info("-" * 60) + logger.info(f"{'Reports at':<10} │ {REPORT_DIR}") + logger.info("Test session ended") + logger.info("-" * 60) + + +@pytest.fixture(scope="function", autouse=True) +def test_logger(request): + """Function-level logging before and after each test.""" + node = request.node + klass = f"{node.cls.__name__}::" if node.cls else "" + identifier = f"{node.path.relative_to(Path.cwd())}::{klass}{node.name}" + print() + logger.info("-" * 60) + logger.info(f"[TEST_CLASS] {identifier}") + logger.info(f"[START] {node.name}") + yield + + result = getattr(node, "rep_call", None) + status = "PASSED" if result and result.outcome == "passed" else "FAILED" + logger.info(f"[ END ] {node.name} - {status}") + if result and getattr(result, "longrepr", None): + logger.error(f"Error details: {result.longrepr}") + + +@pytest.hookimpl(hookwrapper=True, tryfirst=True) +def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo): + """Attach test reports to item for access in fixtures.""" + outcome = yield + rep = outcome.get_result() + setattr(item, f"rep_{rep.when}", rep) + + +@pytest.fixture(scope="session", autouse=True) +def cleanup() -> None: + """Cleanup temporary pytest directories after test session.""" + yield + tmp_root = Path(tempfile.gettempdir()) + for d in tmp_root.iterdir(): + if d.is_dir() and d.name.startswith(("pytest_", "test_")): + shutil.rmtree(d, ignore_errors=True) + + +def pytest_unconfigure(config: pytest.Config) -> None: + """Pytest cleanup hook: generate Allure HTML report or start server if configured.""" + global ALLURE_DIR, ALLURE_CONFIG + + if ALLURE_DIR and ALLURE_CONFIG: + allure_cfg = ALLURE_CONFIG.get("allure", {}) + + # Check if HTML generation is enabled + if allure_cfg.get("html_enable", False): + serve_mode = allure_cfg.get("serve_mode", False) + + if serve_mode: + # Start Allure server + serve_host = allure_cfg.get("serve_host", "localhost") + serve_port = allure_cfg.get("serve_port", 8080) + + logger.info("Starting Allure server...") + logger.info(f"Server will be available at http://{serve_host}:{serve_port}") + + server_process = serve_allure_report( + ALLURE_DIR, + host=serve_host, + port=serve_port, + + ) + + if server_process: + logger.info("Allure server started successfully") + else: + logger.warning("Failed to start Allure server, falling back to static HTML generation...") + # Fallback to static HTML + html_dir = generate_allure_html(ALLURE_DIR, clean=True) + if html_dir: + logger.info(f"Static HTML report generated: {html_dir}") + else: + logger.warning("Failed to generate static HTML report") + else: + # Generate static HTML report + logger.info("Generating Allure HTML report...") + html_dir = generate_allure_html(ALLURE_DIR, clean=True) + + if html_dir: + logger.info(f"Allure HTML report generated: {html_dir}") + logger.info("Tip: If the report doesn't load properly, enable serve_mode in config.yaml") + else: + logger.warning("Failed to generate Allure HTML report") + else: + logger.info("Allure HTML generation disabled in configuration") + else: + logger.info("Allure not configured, skipping HTML generation") diff --git a/test/pytest.ini b/test/pytest.ini new file mode 100644 index 00000000..d5ff2635 --- /dev/null +++ b/test/pytest.ini @@ -0,0 +1,26 @@ +[pytest] +# 0. 
Test Discovery Rules +testpaths = suites +python_files = test_*.py +python_classes = Test* +python_functions = test_* + + +addopts = + -ra + --strict-markers + --capture=no + +log_cli = 1 +log_cli_level = INFO +log_cli_format = [%(levelname)s] %(name)s: %(message)s +norecursedirs = .git venv env __pycache__ *.egg + +markers = + # -------- Levels (Required) -------- + stage(n): Unit/Smoke/Regression/Release (0=Unit 1=Smoke 2=Regression 3=Release) + # -------- Features (Recommended) -------- + feature: Feature tag + platform(name): Platform tag(gpu/npu) + reliability: Reliability tag +# end of markers diff --git a/test/requirements.txt b/test/requirements.txt new file mode 100644 index 00000000..2d2f2d19 --- /dev/null +++ b/test/requirements.txt @@ -0,0 +1,9 @@ +pytest>=7.0.0 +pytest-xdist>=3.0.0 +pytest-html>=3.1.1 +pytest-json-report>=1.5.0 +allure-pytest>=2.12.0 +influxdb-client>=1.36.0 +PyYAML>=6.0 +python-dotenv>=1.0.0 +requests>=2.28.0 \ No newline at end of file diff --git a/test/suites/test_demo_function.py b/test/suites/test_demo_function.py new file mode 100644 index 00000000..67433ebb --- /dev/null +++ b/test/suites/test_demo_function.py @@ -0,0 +1,185 @@ +# tests/test_demo.py +import pytest +import allure + +@pytest.mark.stage(1) +@pytest.mark.feature("mark") +@pytest.mark.platform("gpu") +def test_gpu_smoke(): + assert 1 == 1 + +@pytest.mark.stage(1) +@pytest.mark.feature("mark") +def test_regress_accuracy(): + assert 2 + 2 <= 5 + +@pytest.mark.stage(1) +@pytest.mark.feature("mark") +@pytest.mark.platform("npu") +def test_performance_accuracy(): + assert 2 + 2 <= 5 + +# Example of new mark +@pytest.mark.feature("mark") +@pytest.mark.reliability("high") +def test_llm_reliability(): + assert True + + +# Example of importing configuration file parameters +from common.config_utils import config_utils as config_instance +@pytest.mark.feature("config") +def test_llm_config(): + llm_config = config_instance.get_config("llm_connection") + assert llm_config["type"] == "openai" + assert config_instance.get_nested_config("llm_connection.model") == "gpt-3.5-turbo" + assert config_instance.get_nested_config("llm_connection.models", "gpt-3.5-turbo") == "gpt-3.5-turbo" + + + +# Example of using allure +@pytest.mark.feature("allure1") +@allure.feature('test_success') +def test_success(): + """this test succeeds""" + assert True + +@allure.feature('test_failure') +@pytest.mark.feature("allure1") +def test_failure(): + """this test fails""" + assert False + +@allure.feature('test_skip') +@pytest.mark.feature("allure1") +def test_skip(): + """this test is skipped""" + pytest.skip('for a reason!') + +@allure.feature('test_broken') +@pytest.mark.feature("allure1") +def test_broken(): + raise Exception('oops') + +@pytest.mark.feature("allure2") +@pytest.mark.parametrize('param1', ["Hello", "World"]) +@pytest.mark.parametrize('param2', ['Hello', "Hello"]) +def test_parametrize_with_two_parameters(param1, param2): + assert param1 == param2 + +@pytest.mark.feature("allure3") +@allure.description_html(""" +

+<h1>This is HTML description</h1>
+
+<table>
+  <tr>
+    <th>Firstname</th>
+    <th>Lastname</th>
+    <th>Age</th>
+  </tr>
+  <tr>
+    <td>jade</td>
+    <td>mr</td>
+    <td>18</td>
+  </tr>
+  <tr>
+    <td>road</td>
+    <td>Tester</td>
+    <td>18</td>
+  </tr>
+</table>
+""") +def test_html_description(): + assert True + +@pytest.mark.feature("allure3") +@allure.description("""Multi-line description""") +def test_description_from_decorator(): + assert 42 == int(6 * 7) + +@pytest.mark.feature("allure3") +def test_unicode_in_docstring_description(): + """Description can also be below the function""" + assert 42 == int(6 * 7) + +@pytest.mark.feature("allure4") +@allure.title("Assert that 2+2=4") +def test_with_a_title(): + assert 2 + 2 == 4 + +@pytest.mark.feature("allure4") +@allure.title("Dynamic title: {param1} + {param2} = {expected}") +@pytest.mark.parametrize('param1,param2,expected', [(2, 2, 4),(1, 2, 5)]) +def test_with_parameterized_title(param1, param2, expected): + assert param1 + param2 == expected + +@pytest.mark.feature("allure4") +@allure.title("This is a dynamic title that will be replaced") +def test_with_dynamic_title(): + assert 2 + 2 == 4 + allure.dynamic.title('Test completed, used as title') + + +@pytest.mark.feature("allure5") +def test_with_steps(): + """Example test case with steps""" + with allure.step("Step 1: Initialize variables"): + a = 2 + b = 3 + + with allure.step("Step 2: Perform addition"): + result = a + b + + with allure.step("Step 3: Verify result"): + assert result == 5 + +import tempfile +import os +@pytest.mark.feature("allure6") +def test_with_attachment(): + """Example test case with attachment""" + # Create some data to attach + data = "This is sample data for attachment\nLine 2\nLine 3" + + # Attach text data + allure.attach(data, name="Sample Data", attachment_type=allure.attachment_type.TEXT) + + # Create and attach a simple file + with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as f: + f.write("Sample file content\nFor testing attachment feature") + temp_file_path = f.name + + # Attach the file + allure.attach.file(temp_file_path, name="Attached File", + attachment_type=allure.attachment_type.TEXT) + + # Clean up temporary file + os.unlink(temp_file_path) + + assert True + +@pytest.mark.feature("allure7") +def test_mixed_steps_and_attachments(): + """Example test case combining steps and attachments""" + with allure.step("Initialize test data"): + test_data = {"name": "John", "age": 30, "city": "New York"} + + with allure.step("Convert data to JSON string"): + import json + json_data = json.dumps(test_data, indent=2) + allure.attach(json_data, name="JSON Data", attachment_type=allure.attachment_type.JSON) + + with allure.step("Validate data"): + assert test_data["name"] == "John" + assert test_data["age"] == 30 + + with allure.step("Create and attach report"): + report_content = f""" + Test Report + =========== + Name: {test_data['name']} + Age: {test_data['age']} + City: {test_data['city']} + Status: PASSED + """ + allure.attach(report_content, name="Test Report", + attachment_type=allure.attachment_type.TEXT) \ No newline at end of file diff --git a/test/suites/test_uc_performance.py b/test/suites/test_uc_performance.py new file mode 100644 index 00000000..7fe425c7 --- /dev/null +++ b/test/suites/test_uc_performance.py @@ -0,0 +1,159 @@ +import pytest + +from common.llmperf.run_inference import inference_results + +mean_output_tokens = [] +num_completed_requests = [] +total_e2e_latency_s = [] +total_generation_time_s = [] + +@pytest.mark.feature("mean_input_tokens") +def test_mean_input_tokens(): + result = inference_results("mean_input_tokens") + assert len(result) > 0, "result list is empty! Please check data source or inference process." 
+ non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("mean_output_tokens") +def test_mean_output_tokens(): + global mean_output_tokens + result = inference_results("mean_output_tokens") + mean_output_tokens = result[:] + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_inter_token_latency_s_quantiles_p50") +def test_inter_token_latency_s_quantiles_p50(): + result = inference_results("results_inter_token_latency_s_quantiles_p50") + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_inter_token_latency_s_quantiles_p90") +def test_inter_token_latency_s_quantiles_p90(): + result = inference_results("results_inter_token_latency_s_quantiles_p90") + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_inter_token_latency_s_quantiles_p99") +def test_inter_token_latency_s_quantiles_p99(): + result = inference_results("results_inter_token_latency_s_quantiles_p99") + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_inter_token_latency_s_mean") +def test_inter_token_latency_s_mean(): + result = inference_results("results_inter_token_latency_s_mean") + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_ttft_s_quantiles_p50") +def test_ttft_s_quantiles_p50(): + result = inference_results("results_ttft_s_quantiles_p50") + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_ttft_s_quantiles_p90") +def test_ttft_s_quantiles_p90(): + result = inference_results("results_ttft_s_quantiles_p90") + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_ttft_s_quantiles_p99") +def test_ttft_s_quantiles_p99(): + result = inference_results("results_ttft_s_quantiles_p99") + assert len(result) > 0, "result list is empty! Please check data source or inference process." 
+ non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_ttft_s_mean") +def test_ttft_s_mean(): + result = inference_results("results_ttft_s_mean") + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_end_to_end_latency_s_quantiles_p50") +def test_end_to_end_latency_s_quantiles_p50(): + result = inference_results("results_end_to_end_latency_s_quantiles_p50") + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_end_to_end_latency_s_quantiles_p90") +def test_end_to_end_latency_s_quantiles_p90(): + result = inference_results("results_end_to_end_latency_s_quantiles_p90") + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_end_to_end_latency_s_quantiles_p99") +def test_end_to_end_latency_s_quantiles_p99(): + result = inference_results("results_end_to_end_latency_s_quantiles_p99") + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_end_to_end_latency_s_mean") +def test_end_to_end_latency_s_mean(): + result = inference_results("results_end_to_end_latency_s_mean") + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("results_num_completed_requests") +def test_num_completed_requests(): + global num_completed_requests + result = inference_results("results_num_completed_requests") + num_completed_requests = result[:] + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("elapsed_time") +def test_elapsed_time(): + global total_e2e_latency_s + result = inference_results("elapsed_time") + total_e2e_latency_s = result[:] + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("incremental_time_delay") +def test_incremental_time_delay(): + global total_generation_time_s + result = inference_results("incremental_time_delay") + total_generation_time_s = result[:] + assert len(result) > 0, "result list is empty! Please check data source or inference process." 
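+    # NOTE: this test and the earlier mean_output_tokens / num_completed_requests /
+    # elapsed_time tests fill module-level globals that the throughput tests below
+    # combine, so this module relies on pytest's default file-order execution.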
+ non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("total_throughput") +def test_total_throughput(): + result = [] + n = len(mean_output_tokens) + for i in range(n): + total_throughput = (mean_output_tokens[i] * num_completed_requests[i] / total_e2e_latency_s[i] + if total_e2e_latency_s[i] > 0 else 0.0) + result.append(total_throughput) + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" + +@pytest.mark.feature("incremental_throughput") +def test_incremental_throughput(): + result = [] + n = len(mean_output_tokens) + for i in range(n): + incremental_throughput = (mean_output_tokens[i] * num_completed_requests[i] / total_generation_time_s[i] + if total_generation_time_s[i] > 0 else 0.0) + result.append(incremental_throughput) + assert len(result) > 0, "result list is empty! Please check data source or inference process." + non_positive = [x for x in result if x <= 0] + assert all(x > 0 for x in result), f"Non-positive values found in list: {non_positive}" \ No newline at end of file diff --git a/ucm/integration/vllm/uc_connector.py b/ucm/integration/vllm/uc_connector.py index ddba78d6..dac3d8a9 100644 --- a/ucm/integration/vllm/uc_connector.py +++ b/ucm/integration/vllm/uc_connector.py @@ -334,9 +334,9 @@ def wait_for_layer_load(self, layer_name: str) -> None: if self.layerwise_load_tasks: logger.debug(f"Waiting for layer {self.current_layer} to be loaded") - assert ( - self.current_layer < self.num_layers - ), "The current layer should be less than total layers!" 
+ if self.current_layer >= self.num_layers: + return + for request_id, layer_to_task in self.layerwise_load_tasks.items(): if request_id in self._load_failed_reqs: continue @@ -384,6 +384,9 @@ def save_kv_layer( if not self.use_layerwise: return + if self.current_layer > self.num_layers: + return + metadata = self._get_connector_metadata() assert isinstance(metadata, UCConnectorV1Metadata) diff --git a/ucm/store/device/ibuffered_device.h b/ucm/store/device/ibuffered_device.h index 4c1ac2bb..a56ce67a 100644 --- a/ucm/store/device/ibuffered_device.h +++ b/ucm/store/device/ibuffered_device.h @@ -25,11 +25,37 @@ #define UNIFIEDCACHE_IBUFFERED_DEVICE_H #include "idevice.h" -#include "thread/index_pool.h" namespace UC { class IBufferedDevice : public IDevice { + class LinearBuffer { + std::shared_ptr addr_{nullptr}; + size_t index_{0}; + size_t number_{0}; + size_t size_{0}; + + public: + void Setup(std::shared_ptr addr, const size_t number, const size_t size) + { + this->addr_ = addr; + this->number_ = number; + this->size_ = size; + this->Reset(); + } + void Reset() noexcept { this->index_ = 0; } + bool Full() const noexcept { return this->index_ == this->number_; } + bool Available(const size_t size) const noexcept { return this->size_ >= size; } + std::shared_ptr Get() noexcept + { + auto addr = this->addr_.get(); + auto buffer = addr + this->size_ * this->index_; + ++this->index_; + return std::shared_ptr(buffer, [](auto) {}); + } + }; + LinearBuffer buffer_; + public: IBufferedDevice(const int32_t deviceId, const size_t bufferSize, const size_t bufferNumber) : IDevice{deviceId, bufferSize, bufferNumber} @@ -39,26 +65,20 @@ class IBufferedDevice : public IDevice { { auto totalSize = this->bufferSize * this->bufferNumber; if (totalSize == 0) { return Status::OK(); } - this->_addr = this->MakeBuffer(totalSize); - if (!this->_addr) { return Status::OutOfMemory(); } - this->_indexPool.Setup(this->bufferNumber); + auto addr = this->MakeBuffer(totalSize); + if (!addr) { return Status::OutOfMemory(); } + this->buffer_.Setup(addr, this->bufferNumber, this->bufferSize); return Status::OK(); } virtual std::shared_ptr GetBuffer(const size_t size) override { - if (!this->_addr || size > this->bufferSize) { return this->MakeBuffer(size); } - auto idx = this->_indexPool.Acquire(); - if (idx != IndexPool::npos) { - auto ptr = this->_addr.get() + this->bufferSize * idx; - return std::shared_ptr(ptr, - [this, idx](auto) { this->_indexPool.Release(idx); }); + if (this->buffer_.Full()) { + auto status = this->Synchronized(); + if (status.Failure()) { return nullptr; } + this->buffer_.Reset(); } - return this->MakeBuffer(size); + return this->buffer_.Available(size) ? 
this->buffer_.Get() : this->MakeBuffer(size); } - -private: - std::shared_ptr _addr{nullptr}; - IndexPool _indexPool; }; } // namespace UC diff --git a/ucm/store/test/e2e/nfsstore_embed.py b/ucm/store/test/e2e/nfsstore_embed.py index 8c76fcdb..0b6e2fc5 100644 --- a/ucm/store/test/e2e/nfsstore_embed.py +++ b/ucm/store/test/e2e/nfsstore_embed.py @@ -80,6 +80,39 @@ def embed(store: UcmKVStoreBase, hashes: List[str], tensors: List[List[torch.Ten store.commit(hashes, True) +def fetch(store: UcmKVStoreBase, hashes: List[str], tensors: List[List[torch.Tensor]]): + founds = store.lookup(hashes) + for found in founds: + assert found + block_ids = [] + offsets = [] + layers = [] + for hash_id, block in zip(hashes, tensors): + offset = 0 + for layer in block: + block_ids.append(hash_id) + offsets.append(offset) + layers.append(layer) + offset += layer.untyped_storage().size() + task = store.load(block_ids, offsets, layers) + assert task.task_id > 0 + ret = store.wait(task) + assert ret == 0 + + +def cmp_and_print_diff(a, b, rtol=0.0, atol=0.0): + for r, (row_a, row_b) in enumerate(zip(a, b)): + for c, (ta, tb) in enumerate(zip(row_a, row_b)): + if not torch.allclose(ta, tb, rtol=rtol, atol=atol): + mask = ~torch.isclose(ta, tb, rtol=rtol, atol=atol) + diff_a = ta[mask].cpu() + diff_b = tb[mask].cpu() + print(f"DIFF at [{r}][{c}] total {mask.sum().item()} element(s)") + print(" a val:", diff_a.flatten()) + print(" b val:", diff_b.flatten()) + assert False + + def store_all_hashes(hashes): kvcache_block_hashes_file = "kvcache_block_hashes.txt" current_directory = os.path.dirname(__file__) @@ -108,7 +141,10 @@ def main(): for batch in range(total_batches): start = batch_size * batch end = min(start + batch_size, block_number) + tensors2 = [[torch.empty_like(t) for t in row] for row in tensors] embed(store, hashes[start:end], tensors) + fetch(store, hashes[start:end], tensors2) + cmp_and_print_diff(tensors, tensors2) store_all_hashes(hashes)
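
Reviewer note: the new `fetch` and `cmp_and_print_diff` helpers turn `nfsstore_embed.py` into a write/read round-trip check. The sketch below shows the same pattern in isolation; the in-memory `FakeStore`, its method names, and the tensor shapes are illustrative assumptions only and do not mirror the real `UcmKVStoreBase` API (which returns tasks that must be waited on and requires an explicit `commit`).

```python
# Minimal sketch of the embed -> fetch -> compare round trip added to
# nfsstore_embed.py. FakeStore is an illustrative stand-in for the real store;
# only the flow (write every layer, read it back into fresh tensors, compare)
# mirrors the patch.
import torch


class FakeStore:
    def __init__(self):
        self._blobs = {}  # (block_hash, offset) -> saved tensor copy

    def dump(self, block_ids, offsets, layers):
        for bid, off, t in zip(block_ids, offsets, layers):
            self._blobs[(bid, off)] = t.detach().clone()

    def load(self, block_ids, offsets, layers):
        for bid, off, t in zip(block_ids, offsets, layers):
            t.copy_(self._blobs[(bid, off)])


def flatten(hashes, blocks):
    """Flatten [block][layer] tensors into parallel id/offset/layer lists,
    the same argument layout the patch builds for load/dump."""
    block_ids, offsets, layers = [], [], []
    for hash_id, block in zip(hashes, blocks):
        offset = 0
        for layer in block:
            block_ids.append(hash_id)
            offsets.append(offset)
            layers.append(layer)
            offset += layer.untyped_storage().size()
    return block_ids, offsets, layers


def roundtrip_check():
    store = FakeStore()
    hashes = ["blk-0", "blk-1"]
    src = [[torch.randn(2, 4) for _ in range(3)] for _ in hashes]
    dst = [[torch.empty_like(t) for t in row] for row in src]

    store.dump(*flatten(hashes, src))   # "embed" side: write every layer
    store.load(*flatten(hashes, dst))   # "fetch" side: read into fresh tensors

    # Same comparison idea as cmp_and_print_diff, minus the diff printout.
    for row_a, row_b in zip(src, dst):
        for ta, tb in zip(row_a, row_b):
            assert torch.allclose(ta, tb, rtol=0.0, atol=0.0)
    print("round trip OK")


if __name__ == "__main__":
    roundtrip_check()
```

Loading into freshly allocated `empty_like` tensors (the `tensors2` copies built in `main()`) is what makes the comparison meaningful: a match proves the data came back from the store rather than from the buffers that were just written.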