diff --git a/aws/lambda/benchmark_regression_summary_report/.gitignore b/aws/lambda/benchmark_regression_summary_report/.gitignore new file mode 100644 index 0000000000..bd92f6376a --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/.gitignore @@ -0,0 +1,3 @@ +*.zip +deployment/ +venv/ diff --git a/aws/lambda/benchmark_regression_summary_report/Makefile b/aws/lambda/benchmark_regression_summary_report/Makefile new file mode 100644 index 0000000000..3db1a588ca --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/Makefile @@ -0,0 +1,20 @@ +all: run-local + +clean: + rm -rf deployment + rm -rf venv + rm -rf deployment.zip + +venv/bin/python: + virtualenv venv + venv/bin/pip install -r requirements.txt + +deployment.zip: + mkdir -p deployment + cp lambda_function.py lib ./deployment/. + + pip3.10 install -r requirements.txt -t ./deployment/. --platform manylinux2014_x86_64 --only-binary=:all: --implementation cp --python-version 3.10 --upgrade + cd ./deployment && zip -q -r ../deployment.zip . + +.PHONY: create-deployment-package +create-deployment-package: deployment.zip diff --git a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py new file mode 100644 index 0000000000..7aad397aa3 --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py @@ -0,0 +1,60 @@ +from dataclasses import dataclass, field +from typing import Optional, List, Dict, Any +import requests + + +@dataclass +class TimeRange: + start: str + end: str + + +@dataclass +class BenchmarkTimeSeriesItem: + group_info: Dict[str, Any] + num_of_dp: int + data: List[Dict[str, Any]] = field(default_factory=list) + + +@dataclass +class BenchmarkTimeSeriesApiData: + time_series: List[BenchmarkTimeSeriesItem] + time_range: TimeRange + + +@dataclass +class BenchmarkTimeSeriesApiResponse: + data: BenchmarkTimeSeriesApiData + + @classmethod + def from_request( + cls, url: str, query: dict, timeout: int = 180 + ) -> "BenchmarkTimeSeriesApiResponse": + """ + Send a POST request and parse into BenchmarkTimeSeriesApiResponse. 
+ + Args: + url: API endpoint + query: JSON payload must + timeout: max seconds to wait for connect + response (default: 30) + Returns: + ApiResponse + Raises: + requests.exceptions.RequestException if network/timeout/HTTP error + RuntimeError if the API returns an "error" field or malformed data + """ + resp = requests.post(url, json=query, timeout=timeout) + resp.raise_for_status() + payload = resp.json() + + if "error" in payload: + raise RuntimeError(f"API error: {payload['error']}") + try: + tr = TimeRange(**payload["data"]["time_range"]) + ts = [ + BenchmarkTimeSeriesItem(**item) + for item in payload["data"]["time_series"] + ] + except Exception as e: + raise RuntimeError(f"Malformed API payload: {e}") + return cls(data=BenchmarkTimeSeriesApiData(time_series=ts, time_range=tr)) diff --git a/aws/lambda/benchmark_regression_summary_report/common/config.py b/aws/lambda/benchmark_regression_summary_report/common/config.py new file mode 100644 index 0000000000..d894c5c544 --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/config.py @@ -0,0 +1,79 @@ +from common.config_model import ( + BenchmarkApiSource, + BenchmarkConfig, + BenchmarkRegressionConfigBook, + DayRangeWindow, + Frequency, + RegressionPolicy, + Policy, + RangeConfig, +) + +# Compiler benchmark regression config +# todo(elainewy): eventually each team should configure their own benchmark regression config, currenlty place here for lambda +COMPILER_BENCHMARK_CONFIG = BenchmarkConfig( + name="Compiler Benchmark Regression", + id="compiler_regression", + source=BenchmarkApiSource( + api_query_url="http://localhost:3000/api/benchmark/get_time_series", + type="benchmark_time_series_api", + # currently we only detect the regression for h100 with dtype bfloat16, and mode inference + # we can extend this to other devices, dtypes and mode in the future + api_endpoint_params_template=""" + { + "name": "compiler_precompute", + "query_params": { + "commits": [], + "compilers": [], + "arch": "h100", + "device": "cuda", + "dtype": "bfloat16", + "granularity": "hour", + "mode": "inference", + "startTime": "{{ startTime }}", + "stopTime": "{{ stopTime }}", + "suites": ["torchbench", "huggingface", "timm_models"], + "workflowId": 0, + "branches": ["main"] + } + } + """, + ), + # set baseline from past 7 days using avg, and compare with the last 1 day + policy=Policy( + frequency=Frequency(value=1, unit="days"), + range=RangeConfig( + baseline=DayRangeWindow(value=7), + comparison=DayRangeWindow(value=1), + ), + metrics={ + "passrate": RegressionPolicy( + name="passrate", condition="greater_equal", threshold=0.9, baseline_aggregation="max", + ), + "geomean": RegressionPolicy( + name="geomean", condition="greater_equal", threshold=0.95,baseline_aggregation="max", + ), + "compression_ratio": RegressionPolicy( + name="compression_ratio", condition="greater_equal", threshold=0.9, baseline_aggregation="max", + ), + }, + notification_config={ + "type": "github", + "repo": "pytorch/test-infra", + "issue": "7081", + }, + ), +) + +BENCHMARK_REGRESSION_CONFIG = BenchmarkRegressionConfigBook( + configs={ + "compiler_regression": COMPILER_BENCHMARK_CONFIG, + } +) + + +def get_benchmark_regression_config(config_id: str) -> BenchmarkConfig: + try: + return BENCHMARK_REGRESSION_CONFIG[config_id] + except KeyError: + raise ValueError(f"Invalid config id: {config_id}") diff --git a/aws/lambda/benchmark_regression_summary_report/common/config_model.py b/aws/lambda/benchmark_regression_summary_report/common/config_model.py new file mode 
100644 index 0000000000..59c2f86d9a --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/config_model.py @@ -0,0 +1,224 @@ +from __future__ import annotations +from dataclasses import dataclass, field, fields +from typing import Any, ClassVar, Dict, Literal, Optional, Set, Type, Union +from datetime import datetime, timedelta +from jinja2 import Environment, Template, meta +import requests +import json + + +# -------- Frequency -------- +@dataclass(frozen=True) +class Frequency: + """ + The frequency of how often the report should be generated. + The minimum frequency we support is 1 day. + Attributes: + value: Number of units (e.g., 7 for 7 days). + unit: Unit of time, either "days" or "weeks". + + Methods: + to_timedelta: Convert frequency into a datetime.timedelta. + get_text: return the frequency in text format + """ + value: int + unit: Literal["days", "weeks"] + def to_timedelta(self) -> timedelta: + """Convert frequency N days or M weeks into a datetime.timedelta.""" + if self.unit == "days": + return timedelta(days=self.value) + elif self.unit == "weeks": + return timedelta(weeks=self.value) + else: + raise ValueError(f"Unsupported unit: {self.unit}") + + def get_text(self): + return f"{self.value} {self.unit}" + + +# -------- Source -------- +_JINJA_ENV = Environment(autoescape=False) + +@dataclass +class BenchmarkApiSource: + """ + Defines the source of the benchmark data we want to query + api_query_url: the url of the api to query + api_endpoint_params_template: the jinjia2 template of the api endpoint's query params + default_ctx: the default context to use when rendering the api_endpoint_params_template + """ + api_query_url: str + api_endpoint_params_template: str + type: Literal["benchmark_time_series_api", "other"] = "benchmark_time_series_api" + default_ctx: Dict[str, Any] = field(default_factory=dict) + + def required_template_vars(self) -> set[str]: + ast = _JINJA_ENV.parse(self.api_endpoint_params_template) + return set(meta.find_undeclared_variables(ast)) + + def render(self, ctx: Dict[str, Any], strict: bool = True) -> dict: + """Render with caller-supplied context (no special casing for start/end).""" + merged = {**self.default_ctx, **ctx} + + if strict: + required = self.required_template_vars() + missing = required - merged.keys() + if missing: + raise ValueError(f"Missing required vars: {missing}") + rendered = Template(self.api_endpoint_params_template).render(**merged) + return json.loads(rendered) + + +# -------- Policy: range windows -------- +@dataclass +class DayRangeWindow: + value: int + # raw indicates fetch from the source data + source: Literal["raw"] = "raw" + +@dataclass +class RangeConfig: + """ + Defines the range of baseline and comparison windows for a given policy. + - baseline: the baseline window that build the baseline value + - comparison: the comparison window that we fetch data from to compare against the baseline value + """ + baseline: DayRangeWindow + comparison: DayRangeWindow + + def total_timedelta(self) -> timedelta: + return timedelta(days=self.baseline.value + self.comparison.value) + def comparison_timedelta(self) -> timedelta: + return timedelta(days=self.comparison.value) + def baseline_timedelta(self) -> timedelta: + return timedelta(days=self.baseline.value) + +# -------- Policy: metrics -------- +@dataclass +class RegressionPolicy: + """ + Defines the policy for a given metric. 
+    - the new value must satisfy {condition} relative to the baseline value:
+      - "greater_than": higher is better; new value must be strictly greater than baseline
+      - "less_than": lower is better; new value must be strictly lower than baseline
+      - "equal_to": new value should be ~= baseline * threshold within rel_tol
+      - "greater_equal": higher is better; new value must be greater than or equal to baseline
+      - "less_equal": lower is better; new value must be less than or equal to baseline
+    """
+
+    name: str
+    condition: Literal[
+        "greater_than", "less_than", "equal_to", "greater_equal", "less_equal"
+    ]
+    threshold: float
+    baseline_aggregation: Literal[
+        "avg", "max", "min", "p50", "p90", "p95", "latest", "earliest"
+    ] = "max"
+    rel_tol: float = 1e-3  # used only for "equal_to"
+
+    def is_violation(self, value: float, baseline: float) -> bool:
+        target = baseline * self.threshold
+
+        if self.condition == "greater_than":
+            # value must be strictly greater than target
+            return value <= target
+
+        if self.condition == "greater_equal":
+            # value must be greater than or equal to target
+            return value < target
+
+        if self.condition == "less_than":
+            # value must be strictly less than target
+            return value >= target
+
+        if self.condition == "less_equal":
+            # value must be less than or equal to target
+            return value > target
+
+        if self.condition == "equal_to":
+            # |value - target| should be within rel_tol * max(1, |target|)
+            denom = max(1.0, abs(target))
+            return abs(value - target) > self.rel_tol * denom
+
+        raise ValueError(f"Unknown condition: {self.condition}")
+
+
+# -------- Policy: notifications --------
+class BaseNotificationConfig:
+    # every subclass must override this
+    type_tag: ClassVar[str]
+
+    @classmethod
+    def from_dict(cls, d: Dict[str, Any]) -> "BaseNotificationConfig":
+        # pick only the fields known to this dataclass
+        kwargs = {f.name: d.get(f.name) for f in fields(cls)}
+        return cls(**kwargs)  # type: ignore
+
+    @classmethod
+    def matches(cls, d: Dict[str, Any]) -> bool:
+        return d.get("type") == cls.type_tag
+
+
+@dataclass
+class GitHubNotificationConfig(BaseNotificationConfig):
+    type: str = "github"
+    repo: str = ""
+    issue: str = ""
+    type_tag: ClassVar[str] = "github"
+
+    def create_github_comment(self, body: str, github_token: str) -> Dict[str, Any]:
+        """
+        Create a new comment on the GitHub issue referenced by this config
+        (repo is "owner/repo", issue is the issue number).
+        Args:
+            body: text of the comment
+            github_token: GitHub personal access token or GitHub Actions token
+
+        Returns:
+            The GitHub API response as a dict (JSON).
+        """
+        url = f"https://api.github.com/repos/{self.repo}/issues/{self.issue}/comments"
+        headers = {
+            "Authorization": f"token {github_token}",
+            "Accept": "application/vnd.github+json",
+            "User-Agent": "bench-reporter/1.0",
+        }
+        resp = requests.post(url, headers=headers, json={"body": body})
+        resp.raise_for_status()
+        return resp.json()
+
+
+@dataclass
+class Policy:
+    frequency: Frequency
+    range: RangeConfig
+    metrics: Dict[str, RegressionPolicy]
+    notification_config: Optional[Dict[str, Any]] = None
+
+    def get_github_notification_config(self) -> Optional[GitHubNotificationConfig]:
+        if not self.notification_config:
+            return None
+        if GitHubNotificationConfig.matches(self.notification_config):
+            return GitHubNotificationConfig.from_dict(self.notification_config)  # type: ignore[return-value]
+        return None
+
+
+# -------- Top-level benchmark regression config --------
+@dataclass
+class BenchmarkConfig:
+    """
+    Represents a single benchmark regression configuration.
+
+    - BenchmarkConfig defines the benchmark regression config for a given benchmark.
+ - source: defines the source of the benchmark data we want to query + - policy: defines the policy for the benchmark regressions + - name: the name of the benchmark + - id: the id of the benchmark, this must be unique for each benchmark, and cannot be changed once set + """ + name: str + id: str + source: BenchmarkApiSource + policy: Policy + + +@dataclass +class BenchmarkRegressionConfigBook: + configs: Dict[str, BenchmarkConfig] = field(default_factory=dict) + + def __getitem__(self, key: str) -> BenchmarkConfig: + config = self.configs.get(key, None) + if not config: + raise KeyError(f"Config {key} not found") + return config diff --git a/aws/lambda/benchmark_regression_summary_report/common/regression_utils.py b/aws/lambda/benchmark_regression_summary_report/common/regression_utils.py new file mode 100644 index 0000000000..10091192b9 --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/regression_utils.py @@ -0,0 +1,268 @@ +import logging +import math +from typing import Any, Dict, List, Literal, Optional, Tuple, TypedDict +import statistics +from dateutil.parser import isoparse +from common.config_model import BenchmarkConfig, RegressionPolicy +from common.benchmark_time_series_api_model import ( + BenchmarkTimeSeriesApiData, + BenchmarkTimeSeriesItem, +) +import pprint + +logger = logging.getLogger() + +RegressionClassifyLabel = Literal[ + "regression", "suspicious", "no_regression", "insufficient_data" +] + + +class BaselineItem(TypedDict): + group_info: Dict[str, Any] + value: float + + +class BenchmarkValueItem(TypedDict): + group_info: Dict[str, Any] + values: List[Dict[str, Any]] + + +class PerGroupResult(TypedDict, total=True): + group_info: Dict[str, Any] + baseline: Optional[float] + points: List[Any] + label: RegressionClassifyLabel + policy: Optional["RegressionPolicy"] + + +def percentile(values: list[float], q: float): + v = sorted(values) + k = (len(v) - 1) * q + f = math.floor(k) + c = math.ceil(k) + if f == c: + return v[int(k)] + return v[f] + (v[c] - v[f]) * (k - f) + + +class BenchmarkRegressionReportGenerator: + def __init__( + self, + config: BenchmarkConfig, + latest_ts: BenchmarkTimeSeriesApiData, + baseline_ts: BenchmarkTimeSeriesApiData, + ) -> None: + self.metric_policies = config.policy.metrics + self.latest_ts = self._to_data_map(latest_ts) + self.baseline_raw = self._to_data_map(baseline_ts) + + def generate(self) -> Tuple[List[PerGroupResult], bool]: + return self.detect_regressions_with_policies( + self.baseline_raw, + self.latest_ts, + metric_policies=self.metric_policies, + ) + + def detect_regressions_with_policies( + self, + baseline_map: Dict[tuple, BenchmarkValueItem], + dp_map: Dict[tuple, BenchmarkValueItem], + *, + metric_policies: Dict[str, RegressionPolicy], + min_points: int = 2, + ) -> Tuple[List[PerGroupResult], bool]: + """ + For each group: + - choose policy by group_info['metric'] + - compute flags via policy.is_violation(value, baseline) + - classify with classify_flags + Returns a list of {group_info, baseline, values, flags, label, policy} + """ + results: List[PerGroupResult] = [] + + is_any_regression = False + + for key in sorted(dp_map.keys()): + cur_item = dp_map.get(key) + gi = cur_item["group_info"] if cur_item else {} + points: List[Any] = cur_item["values"] if cur_item else [] + + base_item = baseline_map.get(key) + if not base_item: + logger.warning("Skip. 
No baseline item found for %s", gi) + results.append( + PerGroupResult( + group_info=gi, + baseline=None, + points=[], + label="insufficient_data", + policy=None, + ) + ) + continue + policy = self._resolve_policy(metric_policies, gi.get("metric", "")) + if not policy: + logger.warning("No policy for %s", gi) + results.append( + PerGroupResult( + group_info=gi, + baseline=None, + points=[], + label="insufficient_data", + policy=None, + ) + ) + continue + + baseline_aggre_mode = policy.baseline_aggregation + baseline_value = self._get_baseline(base_item,baseline_aggre_mode) + if baseline_value is None or len(points) == 0: + logger.warning("baseline_value is %s, len(points) == %s", baseline_value,len(points)) + results.append( + PerGroupResult( + group_info=gi, + baseline=None, + points=[], + label="insufficient_data", + policy=policy, + ) + ) + continue + + # Per-point violations (True = regression) + flags: List[bool] = [ + policy.is_violation(p["value"], baseline_value["value"]) for p in points + ] + label = self.classify_flags(flags, min_points=min_points) + + enriched_points = [{**p, "flag": f} for p, f in zip(points, flags)] + results.append( + PerGroupResult( + group_info=gi, + baseline= baseline_value["value"], + points=enriched_points, + label=label, + policy=policy, + ) + ) + if label == "regression": + is_any_regression = True + return results, is_any_regression + + def _to_data_map( + self, data: "BenchmarkTimeSeriesApiData", field: str = "value" + ) -> Dict[tuple, BenchmarkValueItem]: + result: Dict[tuple, BenchmarkValueItem] = {} + for ts_group in data.time_series: + group_keys = tuple(sorted(ts_group.group_info.items())) + points: List[Dict[str, Any]] = [] + for d in sorted( + ts_group.data, key=lambda d: isoparse(d["granularity_bucket"]) + ): + if field not in d: + continue + points.append( + { + "value": float(d[field]), + "commit": d.get("commit"), + "branch": d.get("branch"), + "timestamp": isoparse(d["granularity_bucket"]), + } + ) + result[group_keys] = { + "group_info": ts_group.group_info, + "values": points, + } + return result + + def _get_baseline( + self, + data: BenchmarkValueItem, + mode: str = "mean", + field: str = "value", + ) -> Optional[BaselineItem]: + values = [float(d[field]) for d in data["values"] if field in d] + if not values: + return None + + if mode == "mean": + val = statistics.fmean(values) + elif mode == "p90": + val = percentile(values, 0.9) + elif mode == "max": + val = max(values) + elif mode == "min": + val = min(values) + elif mode == "latest": + val = values[-1] + elif mode == "earliest": + val = values[0] + elif mode == "p50": + val = percentile(values, 0.5) + elif mode == "p95": + val = percentile(values, 0.95) + else: + logger.warning("Unknown mode: %s", mode) + return None + result:BaselineItem = { + "group_info": data["group_info"], + "value": val, + } + return result + + def classify_flags( + self, flags: list[bool], min_points: int = 3 + ) -> RegressionClassifyLabel: + """ + Classify a sequence of boolean flags to detect regression. 
+ + - regression: last run has >= 2 consecutive True values + - suspicious: there is a run of >= 3 consecutive True values, but not at the end + - no_regression: all other cases + - insufficient_data: not enough data points (< min_points) + + Special case: + - If min_points == 1, then just look at the last flag: + True -> regression + False -> no_regression + """ + n = len(flags) + if n == 0: + return "insufficient_data" + + if min_points == 1: + return "regression" if flags[-1] else "no_regression" + + if n < min_points: + return "insufficient_data" + + # trailing run length + t = 0 + for v in reversed(flags): + if v: + t += 1 + else: + break + if t >= 2: + return "regression" + + # longest run anywhere + longest = cur = 0 + for v in flags: + cur = cur + 1 if v else 0 + longest = max(longest, cur) + + if longest >= 3: + return "suspicious" + + return "no_regression" + + def _resolve_policy( + self, + metric_policies: Dict[str, RegressionPolicy], + metric: str, + ) -> Optional[RegressionPolicy]: + if not metric: + return None + m = metric.lower() + return metric_policies.get(m) diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py new file mode 100644 index 0000000000..4c8a0f5863 --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -0,0 +1,425 @@ +#!/usr/bin/env python +import argparse +from concurrent.futures import ThreadPoolExecutor, as_completed +import json +import logging +import os +import threading +import requests +import datetime as dt +from typing import Any, Optional +from common.regression_utils import BenchmarkRegressionReportGenerator +import clickhouse_connect +from common.benchmark_time_series_api_model import ( + BenchmarkTimeSeriesApiResponse, +) +from common.config_model import ( + BenchmarkApiSource, + BenchmarkConfig, + Frequency, +) +from common.config import get_benchmark_regression_config +from dateutil.parser import isoparse + +logging.basicConfig( + level=logging.INFO, +) +logger = logging.getLogger() +logger.setLevel("INFO") + +ENVS = { + "GITHUB_ACCESS_TOKEN": os.getenv("GITHUB_ACCESS_TOKEN", ""), + "CLICKHOUSE_ENDPOINT": os.getenv("CLICKHOUSE_ENDPOINT", ""), + "CLICKHOUSE_PASSWORD": os.getenv("CLICKHOUSE_PASSWORD", ""), + "CLICKHOUSE_USERNAME": os.getenv("CLICKHOUSE_USERNAME", ""), +} + +BENCHMARK_REGRESSION_REPORT_TABLE = "fortesting.benchmark_regression_report" + +BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS = ["compiler_regression"] + + +def truncate_to_hour(ts: dt.datetime) -> dt.datetime: + return ts.replace(minute=0, second=0, microsecond=0) + + +def get_clickhouse_client( + host: str, user: str, password: str +) -> clickhouse_connect.driver.client.Client: + # for local testing only, disable SSL verification + logger.info("trying to connect with clickhouse") + return clickhouse_connect.get_client(host=host, user=user, password=password,secure=True, verify=False) + + return clickhouse_connect.get_client( + host=host, user=user, password=password, secure=True + ) + + +def get_clickhouse_client_environment() -> clickhouse_connect.driver.client.Client: + for name, env_val in ENVS.items(): + if not env_val: + raise ValueError(f"Missing environment variable {name}") + return get_clickhouse_client( + host=ENVS["CLICKHOUSE_ENDPOINT"], + user=ENVS["CLICKHOUSE_USERNAME"], + password=ENVS["CLICKHOUSE_PASSWORD"], + ) + +BENCHMARK_REGRESSION_SUMMARY_REPORT_TABLE = ( + "fortesting.benchmark_regression_summary_report" +) + + +class 
BenchmarkSummaryProcessor:
+    """
+    Generates the benchmark regression summary report for a single benchmark
+    regression config id.
+    """
+
+    def __init__(
+        self,
+        is_dry_run: bool = False,
+    ) -> None:
+        self.is_dry_run = is_dry_run
+
+    def process(
+        self,
+        config_id: str,
+        end_time: dt.datetime,
+        cc: Optional[clickhouse_connect.driver.client.Client] = None,
+        args: Optional[argparse.Namespace] = None,
+    ):
+        # the clickhouse client is not thread-safe, so each worker call creates
+        # its own client when one is not passed in.
+        if cc is None:
+            if args:
+                cc = get_clickhouse_client(
+                    args.clickhouse_endpoint,
+                    args.clickhouse_username,
+                    args.clickhouse_password,
+                )
+            else:
+                cc = get_clickhouse_client_environment()
+        try:
+            config = get_benchmark_regression_config(config_id)
+            logger.info("found config for config_id %s", config_id)
+        except ValueError as e:
+            logger.error("Skip process, invalid config: %s", e)
+            return
+        except Exception as e:
+            logger.error(
+                "Unexpected error when calling get_benchmark_regression_config: %s", e
+            )
+            return
+
+        # check if the current time is > policy's time_delta + previous record_ts from summary_table
+        report_freq = config.policy.frequency
+        should_generate = self._should_generate_report(
+            cc, end_time, config_id, report_freq
+        )
+        if not should_generate:
+            logger.info(
+                "[%s] Skip generating report for %s: a report was already generated within the last %s",
+                config_id,
+                end_time.isoformat(),
+                report_freq.get_text(),
+            )
+            return
+        else:
+            logger.info(
+                "[%s] Plan to generate report for time %s with frequency %s ...",
+                config_id,
+                end_time,
+                report_freq.get_text(),
+            )
+
+        latest = self.get_latest(config, end_time)
+        if not latest:
+            logger.info("no latest data found")
+            return
+        baseline = self.get_baseline(config, end_time)
+        if not baseline:
+            return
+
+        generator = BenchmarkRegressionReportGenerator(
+            config=config, latest_ts=latest, baseline_ts=baseline
+        )
+        result, regression_detected = generator.generate()
+        if self.is_dry_run:
+            print("regression_detected: ", regression_detected)
+            print(json.dumps(result, indent=2, default=str))
+            return
+
+    def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime):
+        data_range = config.policy.range
+        latest_s = end_time - data_range.comparison_timedelta()
+        latest_e = end_time
+        latest_data = self._fetch_from_benchmark_ts_api(
+            config_id=config.id,
+            start_time=latest_s,
+            end_time=latest_e,
+            source=config.source,
+        )
+        if not latest_data.time_range or not latest_data.time_range.end:
+            return None
+        if not self.should_use_data(latest_data.time_range.end, end_time):
+            return None
+        return latest_data
+
+    def get_baseline(self, config: BenchmarkConfig, end_time: dt.datetime):
+        data_range = config.policy.range
+        baseline_s = end_time - data_range.total_timedelta()
+        baseline_e = end_time - data_range.comparison_timedelta()
+        # fetch baseline from api
+        raw_data = self._fetch_from_benchmark_ts_api(
+            config_id=config.id,
+            start_time=baseline_s,
+            end_time=baseline_e,
+            source=config.source,
+        )
+        if not self.should_use_data(raw_data.time_range.end, end_time):
+            logger.info(
+                "[%s][get_baseline] Skip generating report, no data found during [%s,%s]",
+                config.id,
+                baseline_s.isoformat(),
+                baseline_e.isoformat(),
+            )
+            return None
+        return raw_data
+
+    def should_use_data(
+        self,
+        latest_ts_str: str,
+        end_time: dt.datetime,
+        min_delta: dt.timedelta = dt.timedelta(days=2),
+    ) -> bool:
+        # only use the data if its newest point is within min_delta of end_time
+        if not latest_ts_str:
+            return False
+        latest_dt = isoparse(latest_ts_str)
+        cutoff = end_time - min_delta
+        return
latest_dt >= cutoff + + def _fetch_from_benchmark_ts_api( + self, + config_id: str, + end_time: dt.datetime, + start_time: dt.datetime, + source: BenchmarkApiSource, + ): + str_end_time = end_time.strftime("%Y-%m-%dT%H:%M:%S") + str_start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S") + query = source.render( + ctx={ + "startTime": str_start_time, + "stopTime": str_end_time, + } + ) + url = source.api_query_url + + logger.info("[%s]trying to call %s, with query\n %s",config_id, url,query) + try: + resp: BenchmarkTimeSeriesApiResponse = ( + BenchmarkTimeSeriesApiResponse.from_request(url, query) + ) + return resp.data + except requests.exceptions.HTTPError as e: + logger.error("Server error message: %s", e.response.json().get("error")) + raise + except Exception as e: + raise RuntimeError(f"[{config_id}]Fetch failed: {e}") + + def _should_generate_report( + self, + cc: clickhouse_connect.driver.client.Client, + end_time: dt.datetime, + config_id: str, + f: Frequency, + ) -> bool: + def _get_latest_record_ts( + cc: clickhouse_connect.driver.Client, + config_id: str, + ) -> Optional[dt.datetime]: + table = BENCHMARK_REGRESSION_REPORT_TABLE + res = cc.query( + f""" + SELECT max(last_record_ts) + FROM {table} + WHERE report_id = {{config_id:String}} + """, + parameters={"config_id": config_id}, + ) + if not res.result_rows or res.result_rows[0][0] is None: + return None + latest: dt.datetime = res.result_rows[0][ + 0 + ] # typically tz-aware UTC from clickhouse_connect + # If not tz-aware, force UTC: + if latest.tzinfo is None: + latest = latest.replace(tzinfo=dt.timezone.utc) + return latest + + freq_delta = f.to_timedelta() + latest_record_ts = _get_latest_record_ts(cc, config_id) + + # No report exists yet, generate + if not latest_record_ts: + return True + + end_utc = ( + end_time if end_time.tzinfo else end_time.replace(tzinfo=dt.timezone.utc) + ) + end_utc = end_utc.astimezone(dt.timezone.utc) + cutoff = end_time - freq_delta + return latest_record_ts < cutoff + + +class WorkerPoolHandler: + """ + WorkerPoolHandler runs workers in parallel to generate benchmark regression report + and writes the results to the target destination. 
+ + """ + + def __init__( + self, + benchmark_summary_processor: BenchmarkSummaryProcessor, + max_workers: int = 6, + ): + self.benchmark_summary_processor = benchmark_summary_processor + self.max_workers = max_workers + + def start( + self, + config_ids: list[str], + args: Optional[argparse.Namespace] = None, + ) -> None: + logger.info( + "[WorkerPoolHandler] start to process benchmark " + "summary data with config_ids %s", + config_ids, + ) + end_time = dt.datetime.now(dt.timezone.utc).replace( + minute=0, second=0, microsecond=0 + ) + logger.info("current time with hour granularity(utc) %s", end_time) + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + futures = [] + for config_id in config_ids: + future = executor.submit( + self.benchmark_summary_processor.process, + config_id, + end_time, + cc=None, + args=args, + ) + futures.append(future) + results = [] + errors = [] + + # handle results from parallel processing + for future in as_completed(futures): + try: + result = future.result() + # This will raise an exception if one occurred + results.append(result) + except Exception as e: + logger.warning(f"Error processing future: {e}") + errors.append({"error": str(e)}) + + +def main( + args: Optional[argparse.Namespace] = None, + github_access_token: str = "", + is_dry_run: bool = False, +): + """ + Main method to run in both local environment and lambda handler. + 1. generate intervals[start_time,end_time] using latest timestamp from source table and target table + 2. call WorkerPoolHandler to geneterate and write histogram data for each interval in parallel + """ + if not github_access_token: + raise ValueError("Missing environment variable GITHUB_ACCESS_TOKEN") + + # get time intervals. + logger.info("[Main] start work ....") + + # get jobs in queue from clickhouse for list of time intervals, in parallel + handler = WorkerPoolHandler( + BenchmarkSummaryProcessor(is_dry_run=is_dry_run), + ) + handler.start(BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS, args) + logger.info(" [Main] Done. work completed.") + + +def lambda_handler(event: Any, context: Any) -> None: + """ + Main method to run in aws lambda environment + """ + main( + None, + github_access_token=ENVS["GITHUB_ACCESS_TOKEN"], + ) + return + + +def parse_args() -> argparse.Namespace: + """ + Parse command line args, this is mainly used for local test environment. + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--clickhouse-endpoint", + default=ENVS["CLICKHOUSE_ENDPOINT"], + type=str, + help="the clickhouse endpoint, the clickhouse_endpoint " + + "name is https://{clickhouse_endpoint}:{port} for full url ", + ) + parser.add_argument( + "--clickhouse-username", + type=str, + default=ENVS["CLICKHOUSE_USERNAME"], + help="the clickhouse username", + ) + parser.add_argument( + "--clickhouse-password", + type=str, + default=ENVS["CLICKHOUSE_PASSWORD"], + help="the clickhouse password for the user name", + ) + parser.add_argument( + "--github-access-token", + type=str, + default=ENVS["GITHUB_ACCESS_TOKEN"], + help="the github access token to access github api", + ) + parser.add_argument( + "--not-dry-run", + action="store_true", + help="when set, writing results to destination from local " + + "environment. 
By default, we run in dry-run mode for local "
+        + "environment",
+    )
+    args, _ = parser.parse_known_args()
+    return args
+
+
+def local_run() -> None:
+    """
+    Method to run in the local test environment.
+    """
+    args = parse_args()
+    logger.info("args: %s", args)
+
+    # always run in dry-run mode in local environment, unless it's disabled.
+    is_dry_run = not args.not_dry_run
+
+    main(
+        args,
+        args.github_access_token,
+        is_dry_run=is_dry_run,
+    )
+
+
+if __name__ == "__main__":
+    local_run()
diff --git a/aws/lambda/benchmark_regression_summary_report/requirements.txt b/aws/lambda/benchmark_regression_summary_report/requirements.txt
new file mode 100644
index 0000000000..87c33c2e7f
--- /dev/null
+++ b/aws/lambda/benchmark_regression_summary_report/requirements.txt
@@ -0,0 +1,5 @@
+clickhouse_connect==0.8.5
+boto3==1.35.33
+PyGithub==1.59.0
+python-dateutil==2.8.2
+PyYAML==6.0.1
diff --git a/torchci/components/metrics/panels/TablePanel.tsx b/torchci/components/metrics/panels/TablePanel.tsx
index d53d0ab8d6..c53b29f4e3 100644
--- a/torchci/components/metrics/panels/TablePanel.tsx
+++ b/torchci/components/metrics/panels/TablePanel.tsx
@@ -1,6 +1,7 @@
 import HelpIcon from "@mui/icons-material/Help";
 import { Box, Skeleton, Typography } from "@mui/material";
 import IconButton from "@mui/material/IconButton";
+import { Box } from "@mui/system";
 import { DataGrid, GridColDef } from "@mui/x-data-grid";
 import { CSSProperties } from "react";
 import useSWR from "swr";
diff --git a/torchci/lib/benchmark/compilerUtils.ts b/torchci/lib/benchmark/compilerUtils.ts
index 06e4542269..00212177b3 100644
--- a/torchci/lib/benchmark/compilerUtils.ts
+++ b/torchci/lib/benchmark/compilerUtils.ts
@@ -456,3 +456,51 @@ export function convertToCompilerPerformanceData(data: BenchmarkData[]) {
   return Object.values(convertData);
 }
+
+export function computePassrateSimple(data: any[]) {
+  if (!Array.isArray(data) || data.length === 0) return [];
+
+  const blocked = new Set(BLOCKLIST_COMPILERS);
+  const passingAcc = new Set(PASSING_ACCURACY);
+  const toDisplay = (c: string) => COMPILER_NAMES_TO_DISPLAY_NAMES[c] ?? c;
+
+  const totalCount = new Map();
+  const passCount = new Map();
+
+  for (const r of data) {
+    const compilerDisp = toDisplay(r.compiler);
+    if (blocked.has(compilerDisp)) continue;
+
+    const key = `${r.granularity_bucket}+${r.workflow_id}+${r.suite}+${compilerDisp}`;
+
+    // count total rows per key
+    totalCount.set(key, (totalCount.get(key) ?? 0) + 1);
+
+    const acc = r.accuracy ?? "";
+    const speed = r.speedup ?? 0;
+    const pass =
+      (passingAcc.has(acc) && (speed !== 0 || compilerDisp === "export")) ||
+      acc === "pass_due_to_skip";
+
+    if (pass) passCount.set(key, (passCount.get(key) ?? 0) + 1);
+  }
+  const out: any[] = [];
+  for (const [key, tc] of totalCount) {
+    const pc = passCount.get(key) ?? 0;
+    const p = tc > 0 ? pc / tc : 0;
+
+    const [bucket, wfStr, suite, compiler] = key.split("+");
+    out.push({
+      metric: "passrate",
+      granularity_bucket: bucket,
+      workflow_id: Number(wfStr),
+      suite,
+      compiler,
+      passrate: p,
+      pass_count: pc,
+      total_count: tc,
+      passrate_display: `${(p * 100).toFixed(0)}%, ${pc}/${tc}`,
+    });
+  }
+  return out;
+}
diff --git a/torchci/lib/clickhouse.ts b/torchci/lib/clickhouse.ts
index b48673ad8c..f8c720caea 100644
--- a/torchci/lib/clickhouse.ts
+++ b/torchci/lib/clickhouse.ts
@@ -18,13 +18,13 @@ export function getClickhouseClient() {
     request_timeout: 180_000, // 3 mins
   });
 }
-//
 export function getClickhouseClientWritable() {
   return createClient({
     host: process.env.CLICKHOUSE_HUD_USER_URL ?? "http://localhost:8123",
     username: process.env.CLICKHOUSE_HUD_USER_WRITE_USERNAME ?? "default",
     password: process.env.CLICKHOUSE_HUD_USER_WRITE_PASSWORD ?? "",
+    request_timeout: 180_000, // 3 minutes
   });
 }
diff --git a/torchci/pages/api/benchmark/group_data.ts b/torchci/pages/api/benchmark/group_data.ts
index 713ed480d9..030d79cc24 100644
--- a/torchci/pages/api/benchmark/group_data.ts
+++ b/torchci/pages/api/benchmark/group_data.ts
@@ -16,7 +16,7 @@ const DEFAULT_TABLE_GROUP = [
 const DEFAULT_ROW_GROUP = ["workflow_id", "job_id", "metadata_info.timestamp"];
 const BENCNMARK_TABLE_NAME = "oss_ci_benchmark_llms";
 
-function getNestedField(obj: any, path: string): any {
+export function getNestedField(obj: any, path: string): any {
   return path.split(".").reduce((o, key) => (o && key in o ? o[key] : ""), obj);
 }
 
@@ -32,6 +32,7 @@ export default async function handler(
       details: formatZodError(request.error),
     });
   }
+
   const qp = request.data;
   const groupTableByFields =
     qp.group_table_by_fields || deepClone(DEFAULT_TABLE_GROUP);
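
For reviewers: below is a minimal, standalone sketch of the flag-classification rule that BenchmarkRegressionReportGenerator.classify_flags (common/regression_utils.py above) applies to each benchmark group, where True marks a per-point policy violation. The classify helper and sample inputs are illustrative only and not part of this patch; the thresholds (trailing run of >= 2 violations means regression, an earlier run of >= 3 means suspicious) come from the classify_flags docstring.

    from typing import List

    def classify(flags: List[bool], min_points: int = 3) -> str:
        # not enough data to decide
        if not flags:
            return "insufficient_data"
        if min_points == 1:
            return "regression" if flags[-1] else "no_regression"
        if len(flags) < min_points:
            return "insufficient_data"

        # trailing run of violations -> active regression
        trailing = 0
        for f in reversed(flags):
            if not f:
                break
            trailing += 1
        if trailing >= 2:
            return "regression"

        # a long run that already recovered -> suspicious
        longest = current = 0
        for f in flags:
            current = current + 1 if f else 0
            longest = max(longest, current)
        return "suspicious" if longest >= 3 else "no_regression"

    if __name__ == "__main__":
        print(classify([False, True, True]))          # regression
        print(classify([True, True, True, False]))    # suspicious
        print(classify([False, False, True, False]))  # no_regression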