From 179f9c83f7c59bfa2363759af3ae80477b10af7f Mon Sep 17 00:00:00 2001 From: Camyll Harajli Date: Thu, 16 Jan 2025 15:17:26 -0800 Subject: [PATCH 01/14] initial commit for clickhouse-format linter --- .lintrunner.toml | 19 + .../linter/adapters/clickhouse_sql_linter.py | 442 ++++++++++++++++++ 2 files changed, 461 insertions(+) create mode 100644 tools/linter/adapters/clickhouse_sql_linter.py diff --git a/.lintrunner.toml b/.lintrunner.toml index 8e98d36816..785c12aef9 100644 --- a/.lintrunner.toml +++ b/.lintrunner.toml @@ -338,6 +338,25 @@ init_command = [ ] is_formatter = true +[[linter]] +code = 'CLICKHOUSE' +include_patterns = ['**/torchci/clickhouse_queries/**/*.sql'] +exclude_patterns = [ +] +command = [ + 'python3', + 'tools/linter/adapters/clickhouse_sql_linter.py', + '--', + '@{{PATHSFILE}}' +] +init_command = [ + 'python3', + 'tools/linter/adapters/pip_init.py', + '--dry-run={{DRYRUN}}', + 'clickhouse', +] +is_formatter = true + [[linter]] code = 'RUSTFMT' include_patterns = ['**/*.rs'] diff --git a/tools/linter/adapters/clickhouse_sql_linter.py b/tools/linter/adapters/clickhouse_sql_linter.py new file mode 100644 index 0000000000..6eb5ffa7e8 --- /dev/null +++ b/tools/linter/adapters/clickhouse_sql_linter.py @@ -0,0 +1,442 @@ +"""Adapter for https://github.com/charliermarsh/ruff.""" + +from __future__ import annotations + +import argparse +import concurrent.futures +import dataclasses +import enum +import json +import logging +import os +import subprocess +import sys +import time +from typing import Any, BinaryIO + +from tools.linter.adapters.ruff_linter import SYNTAX_ERROR + + +LINTER_CODE = "CLICKHOUSE" +SYNTAX_ERROR = "E999" +IS_WINDOWS: bool = os.name == "nt" + + +def eprint(*args: Any, **kwargs: Any) -> None: + """Print to stderr.""" + print(*args, file=sys.stderr, flush=True, **kwargs) + + +class LintSeverity(str, enum.Enum): + """Severity of a lint message.""" + + ERROR = "error" + WARNING = "warning" + ADVICE = "advice" + DISABLED = "disabled" + + +@dataclasses.dataclass(frozen=True) +class LintMessage: + """A lint message defined by https://docs.rs/lintrunner/latest/lintrunner/lint_message/struct.LintMessage.html.""" + + path: str | None + line: int | None + char: int | None + code: str + severity: LintSeverity + name: str + original: str | None + replacement: str | None + description: str | None + + def asdict(self) -> dict[str, Any]: + return dataclasses.asdict(self) + + def display(self) -> None: + """Print to stdout for lintrunner to consume.""" + print(json.dumps(self.asdict()), flush=True) + + +def as_posix(name: str) -> str: + return name.replace("\\", "/") if IS_WINDOWS else name + + +def _run_command( + args: list[str], + *, + timeout: int | None, + stdin: BinaryIO | None, + input: bytes | None, + check: bool, + cwd: os.PathLike[Any] | None, +) -> subprocess.CompletedProcess[bytes]: + logging.debug("$ %s", " ".join(args)) + start_time = time.monotonic() + try: + if input is not None: + return subprocess.run( + args, + capture_output=True, + shell=False, + input=input, + timeout=timeout, + check=check, + cwd=cwd, + ) + + return subprocess.run( + args, + stdin=stdin, + capture_output=True, + shell=False, + timeout=timeout, + check=check, + cwd=cwd, + ) + finally: + end_time = time.monotonic() + logging.debug("took %dms", (end_time - start_time) * 1000) + + +def run_command( + args: list[str], + *, + retries: int = 0, + timeout: int | None = None, + stdin: BinaryIO | None = None, + input: bytes | None = None, + check: bool = False, + cwd: os.PathLike[Any] | None = 
None, +) -> subprocess.CompletedProcess[bytes]: + remaining_retries = retries + while True: + try: + return _run_command( + args, timeout=timeout, stdin=stdin, input=input, check=check, cwd=cwd + ) + except subprocess.TimeoutExpired as err: + if remaining_retries == 0: + raise err + remaining_retries -= 1 + logging.warning( + "(%s/%s) Retrying because command failed with: %r", + retries - remaining_retries, + retries, + err, + ) + time.sleep(1) + + +def add_default_options(parser: argparse.ArgumentParser) -> None: + """Add default options to a parser. + + This should be called the last in the chain of add_argument calls. + """ + parser.add_argument( + "--retries", + type=int, + default=3, + help="number of times to retry if the linter times out.", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="verbose logging", + ) + parser.add_argument( + "filenames", + nargs="+", + help="paths to lint", + ) + + +def explain_rule(code: str) -> str: + proc = run_command( + ["ruff", "rule", "--output-format=json", code], + check=True, + ) + rule = json.loads(str(proc.stdout, "utf-8").strip()) + return f"\n{rule['linter']}: {rule['summary']}" + + +def get_issue_severity(code: str) -> LintSeverity: + # "B901": `return x` inside a generator + # "B902": Invalid first argument to a method + # "B903": __slots__ efficiency + # "B950": Line too long + # "C4": Flake8 Comprehensions + # "C9": Cyclomatic complexity + # "E2": PEP8 horizontal whitespace "errors" + # "E3": PEP8 blank line "errors" + # "E5": PEP8 line length "errors" + # "T400": type checking Notes + # "T49": internal type checker errors or unmatched messages + if any( + code.startswith(x) + for x in ( + "B9", + "C4", + "C9", + "E2", + "E3", + "E5", + "T400", + "T49", + "PLC", + "PLR", + ) + ): + return LintSeverity.ADVICE + + # "F821": Undefined name + # "E999": syntax error + if any(code.startswith(x) for x in ("F821", SYNTAX_ERROR, "PLE")): + return LintSeverity.ERROR + + # "F": PyFlakes Error + # "B": flake8-bugbear Error + # "E": PEP8 "Error" + # "W": PEP8 Warning + # possibly other plugins... 
+ return LintSeverity.WARNING + + +def format_lint_message( + message: str, code: str, rules: dict[str, str], show_disable: bool +) -> str: + if rules: + message += f".\n{rules.get(code) or ''}" + message += ".\nSee https://beta.ruff.rs/docs/rules/" + if show_disable: + message += f".\n\nTo disable, use ` # noqa: {code}`" + return message + + +def check_files( + filenames: list[str], + severities: dict[str, LintSeverity], + *, + config: str | None, + retries: int, + timeout: int, + explain: bool, + show_disable: bool, +) -> list[LintMessage]: + try: + proc = run_command( + [ + sys.executable, + "-m", + "clickhouse-format", + "--query", + '"', + *filenames, + '"', + ], + retries=retries, + timeout=timeout, + check=True, + ) + except (OSError, subprocess.CalledProcessError) as err: + return [ + LintMessage( + path=None, + line=None, + char=None, + code=LINTER_CODE, + severity=LintSeverity.ERROR, + name="command-failed", + original=None, + replacement=None, + description=( + f"Failed due to {err.__class__.__name__}:\n{err}" + if not isinstance(err, subprocess.CalledProcessError) + else ( + f"COMMAND (exit code {err.returncode})\n" + f"{' '.join(as_posix(x) for x in err.cmd)}\n\n" + f"STDERR\n{err.stderr.decode('utf-8').strip() or '(empty)'}\n\n" + f"STDOUT\n{err.stdout.decode('utf-8').strip() or '(empty)'}" + ) + ), + ) + ] + + stdout = str(proc.stdout, "utf-8").strip() + vulnerabilities = json.loads(stdout) + + if explain: + all_codes = {v["code"] for v in vulnerabilities} + rules = {code: explain_rule(code) for code in all_codes} + else: + rules = {} + + def lint_message(vuln: dict[str, Any]) -> LintMessage: + code = vuln["code"] or SYNTAX_ERROR + return LintMessage( + path=vuln["filename"], + name=code, + description=( + format_lint_message( + vuln["message"], + code, + rules, + show_disable and bool(vuln["code"]), + ) + ), + line=int(vuln["location"]["row"]), + char=int(vuln["location"]["column"]), + code=LINTER_CODE, + severity=severities.get(code, get_issue_severity(code)), + original=None, + replacement=None, + ) + + return [lint_message(v) for v in vulnerabilities] + + +def check_file_for_fixes( + filename: str, + *, + config: str | None, + retries: int, + timeout: int, +) -> list[LintMessage]: + try: + with open(filename, "rb") as f: + original = f.read() + with open(filename, "rb") as f: + proc_fix = run_command( + [ + sys.executable, + "-m", + "clickhouse-format", + "--query", + '"', + filename, + '"', + ], + stdin=f, + retries=retries, + timeout=timeout, + check=True, + ) + except (OSError, subprocess.CalledProcessError) as err: + return [ + LintMessage( + path=None, + line=None, + char=None, + code=LINTER_CODE, + severity=LintSeverity.ERROR, + name="command-failed", + original=None, + replacement=None, + description=( + f"Failed due to {err.__class__.__name__}:\n{err}" + if not isinstance(err, subprocess.CalledProcessError) + else ( + f"COMMAND (exit code {err.returncode})\n" + f"{' '.join(as_posix(x) for x in err.cmd)}\n\n" + f"STDERR\n{err.stderr.decode('utf-8').strip() or '(empty)'}\n\n" + f"STDOUT\n{err.stdout.decode('utf-8').strip() or '(empty)'}" + ) + ), + ) + ] + + replacement = proc_fix.stdout + if original == replacement: + return [] + + return [ + LintMessage( + path=filename, + name="format", + description="Run `lintrunner -a` to apply this patch.", + line=None, + char=None, + code=LINTER_CODE, + severity=LintSeverity.WARNING, + original=original.decode("utf-8"), + replacement=replacement.decode("utf-8"), + ) + ] + + +def main() -> None: + parser = 
argparse.ArgumentParser( + description=f"Clickhouse format linter. Linter code: {LINTER_CODE}. Use with CLICKHOUSE-FIX to auto-fix issues.", + fromfile_prefix_chars="@", + ) + parser.add_argument( + "--query", + default="store-true", + help="Format queries of any length and complexity.", + ) + parser.add_argument( + "--help", + help="produce help message", + ) + add_default_options(parser) + args = parser.parse_args() + + logging.basicConfig( + format="<%(threadName)s:%(levelname)s> %(message)s", + level=( + logging.NOTSET + if args.verbose + else logging.DEBUG if len(args.filenames) < 1000 else logging.INFO + ), + stream=sys.stderr, + ) + + severities: dict[str, LintSeverity] = {} + if args.severity: + for severity in args.severity: + parts = severity.split(":", 1) + assert len(parts) == 2, f"invalid severity `{severity}`" + severities[parts[0]] = LintSeverity(parts[1]) + + lint_messages = check_files( + args.filenames, + severities=severities, + config=args.config, + retries=args.retries, + timeout=args.timeout, + explain=args.explain, + show_disable=args.show_disable, + ) + for lint_message in lint_messages: + lint_message.display() + + if args.no_fix or not lint_messages: + # If we're not fixing, we can exit early + return + + files_with_lints = {lint.path for lint in lint_messages if lint.path is not None} + with concurrent.futures.ThreadPoolExecutor( + max_workers=os.cpu_count(), + thread_name_prefix="Thread", + ) as executor: + futures = { + executor.submit( + check_file_for_fixes, + path, + config=args.config, + retries=args.retries, + timeout=args.timeout, + ): path + for path in files_with_lints + } + for future in concurrent.futures.as_completed(futures): + try: + for lint_message in future.result(): + lint_message.display() + except Exception: # Catch all exceptions for lintrunner + logging.critical('Failed at "%s".', futures[future]) + raise + + +if __name__ == "__main__": + main() From cbbbbeb7e16fddcf3c861c728e37697bcde4864e Mon Sep 17 00:00:00 2001 From: Camyll Harajli Date: Fri, 17 Jan 2025 11:05:29 -0800 Subject: [PATCH 02/14] add lintrunner change --- .lintrunner.toml | 12 +- .../linter/adapters/clickhouse_sql_linter.py | 157 +++++------------- 2 files changed, 45 insertions(+), 124 deletions(-) diff --git a/.lintrunner.toml b/.lintrunner.toml index 785c12aef9..56220c866d 100644 --- a/.lintrunner.toml +++ b/.lintrunner.toml @@ -349,12 +349,12 @@ command = [ '--', '@{{PATHSFILE}}' ] -init_command = [ - 'python3', - 'tools/linter/adapters/pip_init.py', - '--dry-run={{DRYRUN}}', - 'clickhouse', -] +# init_command = [ +# 'python3', +# 'tools/linter/adapters/pip_init.py', +# '--dry-run={{DRYRUN}}', +# 'clickhouse==0.1.6', +# ] is_formatter = true [[linter]] diff --git a/tools/linter/adapters/clickhouse_sql_linter.py b/tools/linter/adapters/clickhouse_sql_linter.py index 6eb5ffa7e8..bfd9bd6726 100644 --- a/tools/linter/adapters/clickhouse_sql_linter.py +++ b/tools/linter/adapters/clickhouse_sql_linter.py @@ -1,5 +1,3 @@ -"""Adapter for https://github.com/charliermarsh/ruff.""" - from __future__ import annotations import argparse @@ -14,11 +12,8 @@ import time from typing import Any, BinaryIO -from tools.linter.adapters.ruff_linter import SYNTAX_ERROR - LINTER_CODE = "CLICKHOUSE" -SYNTAX_ERROR = "E999" IS_WINDOWS: bool = os.name == "nt" @@ -27,14 +22,6 @@ def eprint(*args: Any, **kwargs: Any) -> None: print(*args, file=sys.stderr, flush=True, **kwargs) -class LintSeverity(str, enum.Enum): - """Severity of a lint message.""" - - ERROR = "error" - WARNING = "warning" - ADVICE = 
"advice" - DISABLED = "disabled" - @dataclasses.dataclass(frozen=True) class LintMessage: @@ -44,7 +31,6 @@ class LintMessage: line: int | None char: int | None code: str - severity: LintSeverity name: str original: str | None replacement: str | None @@ -151,55 +137,13 @@ def add_default_options(parser: argparse.ArgumentParser) -> None: ) -def explain_rule(code: str) -> str: - proc = run_command( - ["ruff", "rule", "--output-format=json", code], - check=True, - ) - rule = json.loads(str(proc.stdout, "utf-8").strip()) - return f"\n{rule['linter']}: {rule['summary']}" - - -def get_issue_severity(code: str) -> LintSeverity: - # "B901": `return x` inside a generator - # "B902": Invalid first argument to a method - # "B903": __slots__ efficiency - # "B950": Line too long - # "C4": Flake8 Comprehensions - # "C9": Cyclomatic complexity - # "E2": PEP8 horizontal whitespace "errors" - # "E3": PEP8 blank line "errors" - # "E5": PEP8 line length "errors" - # "T400": type checking Notes - # "T49": internal type checker errors or unmatched messages - if any( - code.startswith(x) - for x in ( - "B9", - "C4", - "C9", - "E2", - "E3", - "E5", - "T400", - "T49", - "PLC", - "PLR", - ) - ): - return LintSeverity.ADVICE - - # "F821": Undefined name - # "E999": syntax error - if any(code.startswith(x) for x in ("F821", SYNTAX_ERROR, "PLE")): - return LintSeverity.ERROR - - # "F": PyFlakes Error - # "B": flake8-bugbear Error - # "E": PEP8 "Error" - # "W": PEP8 Warning - # possibly other plugins... - return LintSeverity.WARNING +# def explain_rule(code: str) -> str: +# proc = run_command( +# ["clickhouse", "rule", "--output-format=json", code], +# check=True, +# ) +# rule = json.loads(str(proc.stdout, "utf-8").strip()) +# return f"\n{rule['linter']}: {rule['summary']}" def format_lint_message( @@ -207,27 +151,26 @@ def format_lint_message( ) -> str: if rules: message += f".\n{rules.get(code) or ''}" - message += ".\nSee https://beta.ruff.rs/docs/rules/" + message += ( + ".\nSee https://clickhouse.com/docs/en/operations/utilities/clickhouse-format" + ) if show_disable: message += f".\n\nTo disable, use ` # noqa: {code}`" return message -def check_files( - filenames: list[str], - severities: dict[str, LintSeverity], - *, - config: str | None, - retries: int, - timeout: int, - explain: bool, - show_disable: bool, -) -> list[LintMessage]: +# def check_files( +# filenames: list[str], +# *, +# config: str | None, +# retries: int, +# timeout: int, +# explain: bool, +# show_disable: bool, +# ) -> list[LintMessage]: try: proc = run_command( [ - sys.executable, - "-m", "clickhouse-format", "--query", '"', @@ -245,7 +188,6 @@ def check_files( line=None, char=None, code=LINTER_CODE, - severity=LintSeverity.ERROR, name="command-failed", original=None, replacement=None, @@ -272,7 +214,7 @@ def check_files( rules = {} def lint_message(vuln: dict[str, Any]) -> LintMessage: - code = vuln["code"] or SYNTAX_ERROR + code = vuln["code"] return LintMessage( path=vuln["filename"], name=code, @@ -287,7 +229,6 @@ def lint_message(vuln: dict[str, Any]) -> LintMessage: line=int(vuln["location"]["row"]), char=int(vuln["location"]["column"]), code=LINTER_CODE, - severity=severities.get(code, get_issue_severity(code)), original=None, replacement=None, ) @@ -309,7 +250,6 @@ def check_file_for_fixes( proc_fix = run_command( [ sys.executable, - "-m", "clickhouse-format", "--query", '"', @@ -328,7 +268,6 @@ def check_file_for_fixes( line=None, char=None, code=LINTER_CODE, - severity=LintSeverity.ERROR, name="command-failed", original=None, 
replacement=None, @@ -357,7 +296,6 @@ def check_file_for_fixes( line=None, char=None, code=LINTER_CODE, - severity=LintSeverity.WARNING, original=original.decode("utf-8"), replacement=replacement.decode("utf-8"), ) @@ -369,15 +307,7 @@ def main() -> None: description=f"Clickhouse format linter. Linter code: {LINTER_CODE}. Use with CLICKHOUSE-FIX to auto-fix issues.", fromfile_prefix_chars="@", ) - parser.add_argument( - "--query", - default="store-true", - help="Format queries of any length and complexity.", - ) - parser.add_argument( - "--help", - help="produce help message", - ) + add_default_options(parser) args = parser.parse_args() @@ -391,30 +321,21 @@ def main() -> None: stream=sys.stderr, ) - severities: dict[str, LintSeverity] = {} - if args.severity: - for severity in args.severity: - parts = severity.split(":", 1) - assert len(parts) == 2, f"invalid severity `{severity}`" - severities[parts[0]] = LintSeverity(parts[1]) - - lint_messages = check_files( - args.filenames, - severities=severities, - config=args.config, - retries=args.retries, - timeout=args.timeout, - explain=args.explain, - show_disable=args.show_disable, - ) - for lint_message in lint_messages: - lint_message.display() - - if args.no_fix or not lint_messages: - # If we're not fixing, we can exit early - return + #trying this here since having issues with init_command' + run_command( + [ + sys.executable, + '-m', + 'pip', + 'install', + 'clickhouse', + ], + retries=0, + timeout=0, + check=True, + ) + # run_command(["python3 -m pip install clickhouse"]) - files_with_lints = {lint.path for lint in lint_messages if lint.path is not None} with concurrent.futures.ThreadPoolExecutor( max_workers=os.cpu_count(), thread_name_prefix="Thread", @@ -423,11 +344,11 @@ def main() -> None: executor.submit( check_file_for_fixes, path, - config=args.config, - retries=args.retries, - timeout=args.timeout, + config='', + retries=0, + timeout=90, ): path - for path in files_with_lints + for path in args.filenames } for future in concurrent.futures.as_completed(futures): try: From 0c0e4bac052f83d69f381e7ceed911cdeb03d683 Mon Sep 17 00:00:00 2001 From: Camyll Harajli Date: Fri, 17 Jan 2025 11:16:45 -0800 Subject: [PATCH 03/14] remove unused method --- .../linter/adapters/clickhouse_sql_linter.py | 77 ------------------- 1 file changed, 77 deletions(-) diff --git a/tools/linter/adapters/clickhouse_sql_linter.py b/tools/linter/adapters/clickhouse_sql_linter.py index bfd9bd6726..78afdf2e01 100644 --- a/tools/linter/adapters/clickhouse_sql_linter.py +++ b/tools/linter/adapters/clickhouse_sql_linter.py @@ -159,83 +159,6 @@ def format_lint_message( return message -# def check_files( -# filenames: list[str], -# *, -# config: str | None, -# retries: int, -# timeout: int, -# explain: bool, -# show_disable: bool, -# ) -> list[LintMessage]: - try: - proc = run_command( - [ - "clickhouse-format", - "--query", - '"', - *filenames, - '"', - ], - retries=retries, - timeout=timeout, - check=True, - ) - except (OSError, subprocess.CalledProcessError) as err: - return [ - LintMessage( - path=None, - line=None, - char=None, - code=LINTER_CODE, - name="command-failed", - original=None, - replacement=None, - description=( - f"Failed due to {err.__class__.__name__}:\n{err}" - if not isinstance(err, subprocess.CalledProcessError) - else ( - f"COMMAND (exit code {err.returncode})\n" - f"{' '.join(as_posix(x) for x in err.cmd)}\n\n" - f"STDERR\n{err.stderr.decode('utf-8').strip() or '(empty)'}\n\n" - f"STDOUT\n{err.stdout.decode('utf-8').strip() or '(empty)'}" 
- ) - ), - ) - ] - - stdout = str(proc.stdout, "utf-8").strip() - vulnerabilities = json.loads(stdout) - - if explain: - all_codes = {v["code"] for v in vulnerabilities} - rules = {code: explain_rule(code) for code in all_codes} - else: - rules = {} - - def lint_message(vuln: dict[str, Any]) -> LintMessage: - code = vuln["code"] - return LintMessage( - path=vuln["filename"], - name=code, - description=( - format_lint_message( - vuln["message"], - code, - rules, - show_disable and bool(vuln["code"]), - ) - ), - line=int(vuln["location"]["row"]), - char=int(vuln["location"]["column"]), - code=LINTER_CODE, - original=None, - replacement=None, - ) - - return [lint_message(v) for v in vulnerabilities] - - def check_file_for_fixes( filename: str, *, From 8ee56227aa85371a621e4f2863aa272778c5a0a3 Mon Sep 17 00:00:00 2001 From: Camyll Harajli Date: Fri, 17 Jan 2025 16:18:10 -0800 Subject: [PATCH 04/14] add clickhouse binaries and use s3 init --- .lintrunner.toml | 21 +- .../linter/adapters/clickhouse_sql_linter.py | 298 +++++------------- tools/linter/adapters/s3_init_config.json | 10 + 3 files changed, 107 insertions(+), 222 deletions(-) diff --git a/.lintrunner.toml b/.lintrunner.toml index 56220c866d..addbdbc59c 100644 --- a/.lintrunner.toml +++ b/.lintrunner.toml @@ -340,21 +340,24 @@ is_formatter = true [[linter]] code = 'CLICKHOUSE' -include_patterns = ['**/torchci/clickhouse_queries/**/*.sql'] +include_patterns = ['torchci/clickhouse_queries/**/*.sql'] exclude_patterns = [ ] command = [ 'python3', 'tools/linter/adapters/clickhouse_sql_linter.py', - '--', - '@{{PATHSFILE}}' + '--binary=.lintbin/clickhouse', + '@{{PATHSFILE}}', +] +init_command = [ + 'python3', + 'tools/linter/adapters/s3_init.py', + '--config-json=tools/linter/adapters/s3_init_config.json', + '--linter=clickhouse', + '--dry-run={{DRYRUN}}', + '--output-dir=.lintbin', + '--output-name=clickhouse', ] -# init_command = [ -# 'python3', -# 'tools/linter/adapters/pip_init.py', -# '--dry-run={{DRYRUN}}', -# 'clickhouse==0.1.6', -# ] is_formatter = true [[linter]] diff --git a/tools/linter/adapters/clickhouse_sql_linter.py b/tools/linter/adapters/clickhouse_sql_linter.py index 78afdf2e01..d27c42951c 100644 --- a/tools/linter/adapters/clickhouse_sql_linter.py +++ b/tools/linter/adapters/clickhouse_sql_linter.py @@ -1,263 +1,138 @@ -from __future__ import annotations - import argparse import concurrent.futures -import dataclasses -import enum import json import logging import os +import re import subprocess -import sys import time -from typing import Any, BinaryIO +from enum import Enum +from typing import List, NamedTuple, Optional, Pattern LINTER_CODE = "CLICKHOUSE" -IS_WINDOWS: bool = os.name == "nt" - -def eprint(*args: Any, **kwargs: Any) -> None: - """Print to stderr.""" - print(*args, file=sys.stderr, flush=True, **kwargs) +class LintSeverity(str, Enum): + ERROR = "error" + WARNING = "warning" + ADVICE = "advice" + DISABLED = "disabled" -@dataclasses.dataclass(frozen=True) -class LintMessage: - """A lint message defined by https://docs.rs/lintrunner/latest/lintrunner/lint_message/struct.LintMessage.html.""" - - path: str | None - line: int | None - char: int | None +class LintMessage(NamedTuple): + path: Optional[str] + line: Optional[int] + char: Optional[int] code: str + severity: LintSeverity name: str - original: str | None - replacement: str | None - description: str | None - - def asdict(self) -> dict[str, Any]: - return dataclasses.asdict(self) - - def display(self) -> None: - """Print to stdout for lintrunner to 
consume.""" - print(json.dumps(self.asdict()), flush=True) - - -def as_posix(name: str) -> str: - return name.replace("\\", "/") if IS_WINDOWS else name + original: Optional[str] + replacement: Optional[str] + description: Optional[str] + + +RESULTS_RE: Pattern[str] = re.compile( + r"""(?mx) + ^ + (?P.*?): + (?P\d+): + (?P\d+): + \s(?P.*) + \s(?P\[.*\]) + $ + """ +) -def _run_command( - args: list[str], - *, - timeout: int | None, - stdin: BinaryIO | None, - input: bytes | None, - check: bool, - cwd: os.PathLike[Any] | None, -) -> subprocess.CompletedProcess[bytes]: +def run_command( + args: List[str], +) -> "subprocess.CompletedProcess[bytes]": logging.debug("$ %s", " ".join(args)) start_time = time.monotonic() try: - if input is not None: - return subprocess.run( - args, - capture_output=True, - shell=False, - input=input, - timeout=timeout, - check=check, - cwd=cwd, - ) - return subprocess.run( args, - stdin=stdin, capture_output=True, - shell=False, - timeout=timeout, - check=check, - cwd=cwd, ) finally: end_time = time.monotonic() logging.debug("took %dms", (end_time - start_time) * 1000) -def run_command( - args: list[str], - *, - retries: int = 0, - timeout: int | None = None, - stdin: BinaryIO | None = None, - input: bytes | None = None, - check: bool = False, - cwd: os.PathLike[Any] | None = None, -) -> subprocess.CompletedProcess[bytes]: - remaining_retries = retries - while True: - try: - return _run_command( - args, timeout=timeout, stdin=stdin, input=input, check=check, cwd=cwd - ) - except subprocess.TimeoutExpired as err: - if remaining_retries == 0: - raise err - remaining_retries -= 1 - logging.warning( - "(%s/%s) Retrying because command failed with: %r", - retries - remaining_retries, - retries, - err, - ) - time.sleep(1) - - -def add_default_options(parser: argparse.ArgumentParser) -> None: - """Add default options to a parser. - - This should be called the last in the chain of add_argument calls. 
- """ - parser.add_argument( - "--retries", - type=int, - default=3, - help="number of times to retry if the linter times out.", - ) - parser.add_argument( - "--verbose", - action="store_true", - help="verbose logging", - ) - parser.add_argument( - "filenames", - nargs="+", - help="paths to lint", - ) - - -# def explain_rule(code: str) -> str: -# proc = run_command( -# ["clickhouse", "rule", "--output-format=json", code], -# check=True, -# ) -# rule = json.loads(str(proc.stdout, "utf-8").strip()) -# return f"\n{rule['linter']}: {rule['summary']}" - - -def format_lint_message( - message: str, code: str, rules: dict[str, str], show_disable: bool -) -> str: - if rules: - message += f".\n{rules.get(code) or ''}" - message += ( - ".\nSee https://clickhouse.com/docs/en/operations/utilities/clickhouse-format" - ) - if show_disable: - message += f".\n\nTo disable, use ` # noqa: {code}`" - return message - - -def check_file_for_fixes( - filename: str, - *, - config: str | None, - retries: int, - timeout: int, -) -> list[LintMessage]: +def check_file( + binary: str, + file: str, +) -> List[LintMessage]: try: - with open(filename, "rb") as f: - original = f.read() - with open(filename, "rb") as f: - proc_fix = run_command( - [ - sys.executable, - "clickhouse-format", - "--query", - '"', - filename, - '"', - ], - stdin=f, - retries=retries, - timeout=timeout, - check=True, - ) - except (OSError, subprocess.CalledProcessError) as err: + proc = run_command([binary, '--format','--query', file]) + except OSError as err: return [ LintMessage( path=None, line=None, char=None, code=LINTER_CODE, + severity=LintSeverity.ERROR, name="command-failed", original=None, replacement=None, - description=( - f"Failed due to {err.__class__.__name__}:\n{err}" - if not isinstance(err, subprocess.CalledProcessError) - else ( - f"COMMAND (exit code {err.returncode})\n" - f"{' '.join(as_posix(x) for x in err.cmd)}\n\n" - f"STDERR\n{err.stderr.decode('utf-8').strip() or '(empty)'}\n\n" - f"STDOUT\n{err.stdout.decode('utf-8').strip() or '(empty)'}" - ) - ), + description=(f"Failed due to {err.__class__.__name__}:\n{err}"), ) ] - - replacement = proc_fix.stdout - if original == replacement: - return [] - + stdout = str(proc.stdout, "utf-8").strip() return [ LintMessage( - path=filename, - name="format", - description="Run `lintrunner -a` to apply this patch.", - line=None, - char=None, + path=match["file"], + name=match["code"], + description=match["message"], + line=int(match["line"]), + char=int(match["char"]), code=LINTER_CODE, - original=original.decode("utf-8"), - replacement=replacement.decode("utf-8"), + severity=LintSeverity.ERROR, + original=None, + replacement='CAMYLL', ) + for match in RESULTS_RE.finditer(stdout) ] def main() -> None: parser = argparse.ArgumentParser( - description=f"Clickhouse format linter. Linter code: {LINTER_CODE}. 
Use with CLICKHOUSE-FIX to auto-fix issues.", + description=f"Clickhouse format linter for sql queries.", fromfile_prefix_chars="@", ) + parser.add_argument( + "filenames", + nargs="+", + help="paths to lint", + ) + parser.add_argument( + "--binary", + required=True, + help="clickhouse binary path", + ) - add_default_options(parser) args = parser.parse_args() - logging.basicConfig( - format="<%(threadName)s:%(levelname)s> %(message)s", - level=( - logging.NOTSET - if args.verbose - else logging.DEBUG if len(args.filenames) < 1000 else logging.INFO - ), - stream=sys.stderr, - ) - #trying this here since having issues with init_command' - run_command( - [ - sys.executable, - '-m', - 'pip', - 'install', - 'clickhouse', - ], - retries=0, - timeout=0, - check=True, - ) - # run_command(["python3 -m pip install clickhouse"]) + if not os.path.exists(args.binary): + err_msg = LintMessage( + path="", + line=None, + char=None, + code=LINTER_CODE, + severity=LintSeverity.ERROR, + name="command-failed", + original=None, + replacement=None, + description=( + f"Could not find clickhouse binary at {args.binary}," + " you may need to run `lintrunner init`." + ), + ) + print(json.dumps(err_msg._asdict()), flush=True) + exit(0) with concurrent.futures.ThreadPoolExecutor( max_workers=os.cpu_count(), @@ -265,22 +140,19 @@ def main() -> None: ) as executor: futures = { executor.submit( - check_file_for_fixes, - path, - config='', - retries=0, - timeout=90, - ): path - for path in args.filenames + check_file, + args.binary, + filename, + ): filename + for filename in args.filenames } for future in concurrent.futures.as_completed(futures): try: for lint_message in future.result(): - lint_message.display() - except Exception: # Catch all exceptions for lintrunner + print(json.dumps(lint_message._asdict()), flush=True) + except Exception: logging.critical('Failed at "%s".', futures[future]) raise - if __name__ == "__main__": main() diff --git a/tools/linter/adapters/s3_init_config.json b/tools/linter/adapters/s3_init_config.json index 85e2402061..d6e21370fa 100644 --- a/tools/linter/adapters/s3_init_config.json +++ b/tools/linter/adapters/s3_init_config.json @@ -49,5 +49,15 @@ "download_url": "https://raw.githubusercontent.com/bazelbuild/bazelisk/v1.16.0/bazelisk.py", "hash": "1f6d76d023ddd5f1625f34d934418e7334a267318d084f31be09df8a8835ed16" } + }, + "clickhouse": { + "Darwin": { + "download_url": "https://oss-clang-format.s3.us-east-2.amazonaws.com/clickhouse/25.1.1.3442/Darwin/clickhouse", + "hash": "8bc70b41a720e1573bfc30b6e506c91bee73fe920b191976385f6e44ad1b6b00" + }, + "Linux": { + "download_url": "https://oss-clang-format.s3.us-east-2.amazonaws.com/clickhouse/25.1.1.3442/Linux/clickhouse", + "hash": "3f63c35058c5a1fe69dcde9533f55a11c3d094a12aa6a1e9264e922975fd1a8e" + } } } From 1877be9447ccc08b159d975c0904258a022c987c Mon Sep 17 00:00:00 2001 From: Camyll Harajli Date: Fri, 17 Jan 2025 16:34:14 -0800 Subject: [PATCH 05/14] comment out MYPY and remove test code --- .lintrunner.toml | 72 +++++++++---------- .../linter/adapters/clickhouse_sql_linter.py | 2 +- 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/.lintrunner.toml b/.lintrunner.toml index addbdbc59c..38e88c9e74 100644 --- a/.lintrunner.toml +++ b/.lintrunner.toml @@ -33,42 +33,42 @@ init_command = [ 'torchfix==0.4.0 ; python_version >= "3.9" and python_version < "3.13"', ] -[[linter]] -code = 'MYPY' -include_patterns = [ - 'tools/**/*.py', - 'tools/**/*.pyi', - 'stats/**/*.py', - 'stats/**/*.pyi', - 'torchci/**/*.py', - 
'torchci/**/*.pyi', - '.github/scripts/*.py', - 'aws/lambda/whl_metadata_upload_pep658/**/*.py', -] -command = [ - 'python3', - 'tools/linter/adapters/mypy_linter.py', - '--config=mypy.ini', - '--', - '@{{PATHSFILE}}' -] -init_command = [ - 'python3', - 'tools/linter/adapters/pip_init.py', - '--dry-run={{DRYRUN}}', - 'numpy==1.24.3', - 'expecttest==0.1.3', - 'mypy==0.982', - 'types-requests==2.27.25', - 'types-PyYAML==6.0.7', - 'types-tabulate==0.8.8', - 'types-protobuf==3.19.18', - 'types-pkg-resources==0.1.3', - 'types-Jinja2==2.11.9', - 'junitparser==2.1.1', - 'rich==10.9.0', - 'pyyaml==6.0', -] +# [[linter]] +# code = 'MYPY' +# include_patterns = [ +# 'tools/**/*.py', +# 'tools/**/*.pyi', +# 'stats/**/*.py', +# 'stats/**/*.pyi', +# 'torchci/**/*.py', +# 'torchci/**/*.pyi', +# '.github/scripts/*.py', +# 'aws/lambda/whl_metadata_upload_pep658/**/*.py', +# ] +# command = [ +# 'python3', +# 'tools/linter/adapters/mypy_linter.py', +# '--config=mypy.ini', +# '--', +# '@{{PATHSFILE}}' +# ] +# init_command = [ +# 'python3', +# 'tools/linter/adapters/pip_init.py', +# '--dry-run={{DRYRUN}}', +# 'numpy==1.24.3', +# 'expecttest==0.1.3', +# 'mypy==0.982', +# 'types-requests==2.27.25', +# 'types-PyYAML==6.0.7', +# 'types-tabulate==0.8.8', +# 'types-protobuf==3.19.18', +# 'types-pkg-resources==0.1.3', +# 'types-Jinja2==2.11.9', +# 'junitparser==2.1.1', +# 'rich==10.9.0', +# 'pyyaml==6.0', +# ] [[linter]] code = 'TYPEIGNORE' diff --git a/tools/linter/adapters/clickhouse_sql_linter.py b/tools/linter/adapters/clickhouse_sql_linter.py index d27c42951c..c6e443423d 100644 --- a/tools/linter/adapters/clickhouse_sql_linter.py +++ b/tools/linter/adapters/clickhouse_sql_linter.py @@ -91,7 +91,7 @@ def check_file( code=LINTER_CODE, severity=LintSeverity.ERROR, original=None, - replacement='CAMYLL', + replacement=replacement, ) for match in RESULTS_RE.finditer(stdout) ] From 93548521e3a858b6b9cc5d6347632c0a3fef1cf1 Mon Sep 17 00:00:00 2001 From: Huy Do Date: Fri, 17 Jan 2025 17:23:27 -0800 Subject: [PATCH 06/14] It's working (kind of) --- .../linter/adapters/clickhouse_sql_linter.py | 44 +++++++++++++------ 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/tools/linter/adapters/clickhouse_sql_linter.py b/tools/linter/adapters/clickhouse_sql_linter.py index c6e443423d..ab910bf592 100644 --- a/tools/linter/adapters/clickhouse_sql_linter.py +++ b/tools/linter/adapters/clickhouse_sql_linter.py @@ -62,11 +62,26 @@ def run_command( def check_file( binary: str, - file: str, + filename: str, ) -> List[LintMessage]: + with open(filename) as f: + original = f.read() + try: - proc = run_command([binary, '--format','--query', file]) + proc = run_command( + [ + binary, + "--format", + "--comments", + "--max_line_length", + "80", + "--query", + original, + ] + ) except OSError as err: + with open("debug.txt", "a") as f: + print("HERE", file=f) return [ LintMessage( path=None, @@ -80,20 +95,23 @@ def check_file( description=(f"Failed due to {err.__class__.__name__}:\n{err}"), ) ] - stdout = str(proc.stdout, "utf-8").strip() + + replacement = proc.stdout + if original == replacement: + return [] + return [ LintMessage( - path=match["file"], - name=match["code"], - description=match["message"], - line=int(match["line"]), - char=int(match["char"]), + path=filename, + line=None, + char=None, code=LINTER_CODE, - severity=LintSeverity.ERROR, - original=None, - replacement=replacement, + severity=LintSeverity.WARNING, + name="format", + original=original, + replacement=replacement.decode("utf-8"), + description="See 
https://clickhouse.com/docs/en/operations/utilities/clickhouse-format.\nRun `lintrunner -a` to apply this patch.", ) - for match in RESULTS_RE.finditer(stdout) ] @@ -115,7 +133,6 @@ def main() -> None: args = parser.parse_args() - if not os.path.exists(args.binary): err_msg = LintMessage( path="", @@ -154,5 +171,6 @@ def main() -> None: logging.critical('Failed at "%s".', futures[future]) raise + if __name__ == "__main__": main() From 00926fcf94842049b89c58be34771299c1140426 Mon Sep 17 00:00:00 2001 From: Huy Do Date: Fri, 17 Jan 2025 17:25:05 -0800 Subject: [PATCH 07/14] This is not needed I think --- tools/linter/adapters/clickhouse_sql_linter.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/linter/adapters/clickhouse_sql_linter.py b/tools/linter/adapters/clickhouse_sql_linter.py index ab910bf592..f4b7215b9f 100644 --- a/tools/linter/adapters/clickhouse_sql_linter.py +++ b/tools/linter/adapters/clickhouse_sql_linter.py @@ -73,8 +73,6 @@ def check_file( binary, "--format", "--comments", - "--max_line_length", - "80", "--query", original, ] From 47d8a5e71d49e31253e3f9583612b59d65b8ac7b Mon Sep 17 00:00:00 2001 From: Huy Do Date: Fri, 17 Jan 2025 17:26:58 -0800 Subject: [PATCH 08/14] Remove debug log --- tools/linter/adapters/clickhouse_sql_linter.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/linter/adapters/clickhouse_sql_linter.py b/tools/linter/adapters/clickhouse_sql_linter.py index f4b7215b9f..2d3f864c50 100644 --- a/tools/linter/adapters/clickhouse_sql_linter.py +++ b/tools/linter/adapters/clickhouse_sql_linter.py @@ -78,8 +78,6 @@ def check_file( ] ) except OSError as err: - with open("debug.txt", "a") as f: - print("HERE", file=f) return [ LintMessage( path=None, From 4b36413f21529e6cfc17f482b99b3153ee4ea0d7 Mon Sep 17 00:00:00 2001 From: Huy Do Date: Tue, 21 Jan 2025 14:36:04 -0800 Subject: [PATCH 09/14] Revert "Correctly return 200 in api/flaky-tests/disable (#6184)" This reverts commit b972178769a64661dfb673b840cd2e22c48c8040. --- torchci/pages/api/flaky-tests/disable.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/torchci/pages/api/flaky-tests/disable.ts b/torchci/pages/api/flaky-tests/disable.ts index 171e4e9525..2952ac96f2 100644 --- a/torchci/pages/api/flaky-tests/disable.ts +++ b/torchci/pages/api/flaky-tests/disable.ts @@ -29,9 +29,8 @@ export default async function handler( if (authorization === process.env.FLAKY_TEST_BOT_KEY) { await disableFlakyTestsAndReenableNonFlakyTests(); res.status(200).end(); - } else { - res.status(403).end(); } + res.status(403).end(); } async function disableFlakyTestsAndReenableNonFlakyTests() { From 1615e6f0197d8a691b4f3d216cb51376a8c5f1b9 Mon Sep 17 00:00:00 2001 From: Huy Do Date: Tue, 21 Jan 2025 14:36:49 -0800 Subject: [PATCH 10/14] Revert "[Schema][Utilization] Add schema tables for job utilization (#6183)" This reverts commit abf801623ed49442dd32856e8f8e2066b0b22ab4. 
--- .../oss_ci_time_series_schema.sql | 35 ------------------ .../oss_ci_utilization_metadata_schema.sql | 37 ------------------- 2 files changed, 72 deletions(-) delete mode 100644 clickhouse_db_schema/oss_ci_utilization/oss_ci_time_series_schema.sql delete mode 100644 clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql diff --git a/clickhouse_db_schema/oss_ci_utilization/oss_ci_time_series_schema.sql b/clickhouse_db_schema/oss_ci_utilization/oss_ci_time_series_schema.sql deleted file mode 100644 index 7ec0b762aa..0000000000 --- a/clickhouse_db_schema/oss_ci_utilization/oss_ci_time_series_schema.sql +++ /dev/null @@ -1,35 +0,0 @@ --- This query creates the oss_ci_time_series table on ClickHouse -CREATE TABLE misc.oss_ci_time_series( - -- created_at DateTime when the record is processed in db. - `created_at` DateTime64(0,'UTC'), - -- type of time series, for instance, utilization log data is 'utilization'. - `type` String, - `tags` Array(String) DEFAULT [], - `time_stamp` DateTime64(0,'UTC'), - `repo` String DEFAULT 'pytorch/pytorch', - `workflow_id` UInt64, - `run_attempt` UInt32, - `job_id` UInt64, - `workflow_name` String, - `job_name` String, - -- the data stored as raw json string. - -- Notice in clickhouse the length of string type is not limited. - `json_data` String DEFAULT '{}', - -- The raw records on S3, this is populated by the s3 replicator - `_meta` Tuple(bucket String, key String), -)ENGINE = SharedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') -PARTITION BY toYYYYMM(time_stamp) -ORDER BY - ( - workflow_id, - job_id, - repo, - workflow_name, - job_name, - type, - time_stamp, - ) --- data exists in the db for a year. --- time to live is based on created_at which is when the record is inserted in db. -TTL toDate(created_at) + toIntervalYear(1) -SETTINGS index_granularity = 8192 diff --git a/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql b/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql deleted file mode 100644 index da20a1d02e..0000000000 --- a/clickhouse_db_schema/oss_ci_utilization/oss_ci_utilization_metadata_schema.sql +++ /dev/null @@ -1,37 +0,0 @@ --- This query creates the oss_ci_utilization_metadata table on ClickHouse -CREATE TABLE misc.oss_ci_utilization_metadata -( - `created_at` DateTime64(0, 'UTC'), - -- github info - `repo` String DEFAULT 'pytorch/pytorch', - `workflow_id` UInt64, - `run_attempt` UInt32, - `job_id` UInt64, - `workflow_name` String, - `job_name` String, - -- metadata - `usage_collect_interval` Float32, - `data_model_version` String, - `gpu_count` UInt32, - `cpu_count` UInt32, - `gpu_type` String DEFAULT 'None', - `start_at` DateTime64(0, 'UTC'), - `end_at` DateTime64(0, 'UTC'), - -- segments are post-job processed data to identify detected test intervals - `segments` Array(Tuple(level String, name String, start_at DateTime64(0, 'UTC'), end_at DateTime64(0, 'UTC'), extra_info Map(String, String))) DEFAULT [], - -- The raw records on S3, this is populated by the s3 replicator - `_meta` Tuple(bucket String, key String) -) -ENGINE = SharedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') -PARTITION BY toYYYYMM(start_at) -ORDER BY ( - workflow_id, - job_id, - repo, - workflow_name, - job_name, - start_at) --- data exists in the db for a year. --- time to live is based on created_at which is when the record is inserted in db. 
-TTL toDate(created_at) + toIntervalYear(1) -SETTINGS index_granularity = 8192 From 4cab69aa2257be67a08082d92d4640a0941e9020 Mon Sep 17 00:00:00 2001 From: Huy Do Date: Tue, 21 Jan 2025 14:37:02 -0800 Subject: [PATCH 11/14] Revert "Use oss_ci_benchmark_metadata materialized view (#6167)" This reverts commit 929e0fe17730b493ff8f327be89baeb5d70ae71c. --- .../oss_ci_benchmark_branches/query.sql | 77 ++++++++++------- .../oss_ci_benchmark_names/query.sql | 83 +++++++++++------- .../query.sql | 84 ------------------- 3 files changed, 101 insertions(+), 143 deletions(-) delete mode 100644 torchci/clickhouse_queries/oss_ci_benchmark_v3_materialized_views/query.sql diff --git a/torchci/clickhouse_queries/oss_ci_benchmark_branches/query.sql b/torchci/clickhouse_queries/oss_ci_benchmark_branches/query.sql index b073945ff6..23e9de5813 100644 --- a/torchci/clickhouse_queries/oss_ci_benchmark_branches/query.sql +++ b/torchci/clickhouse_queries/oss_ci_benchmark_branches/query.sql @@ -1,39 +1,60 @@ -- This query is used to get the list of branches and commits used by different -- OSS CI benchmark experiments. This powers HUD benchmarks dashboards +WITH benchmarks AS ( + SELECT + o.head_branch AS head_branch, + o.head_sha AS head_sha, + o.workflow_id AS id, + IF( + empty(o.runners), + tupleElement(o.benchmark, 'extra_info') [ 'device' ], + tupleElement(o.runners [ 1 ], 'name') + ) AS device, + IF( + empty(o.runners), + tupleElement(o.benchmark, 'extra_info') [ 'arch' ], + tupleElement(o.runners [ 1 ], 'type') + ) AS arch, + o.timestamp AS timestamp, + toStartOfDay(fromUnixTimestamp(o.timestamp)) AS event_time + FROM + benchmark.oss_ci_benchmark_v3 o + WHERE + o.timestamp >= toUnixTimestamp({startTime: DateTime64(3) }) + AND o.timestamp < toUnixTimestamp({stopTime: DateTime64(3) }) + AND o.repo = {repo: String } + AND ( + has({benchmarks: Array(String) }, o.benchmark.name) + OR empty({benchmarks: Array(String) }) + ) + AND ( + has({models: Array(String) }, o.model.name) + OR empty({models: Array(String) }) + ) + AND ( + has({backends: Array(String) }, o.model.backend) + OR empty({backends: Array(String) }) + ) + AND ( + has({dtypes: Array(String) }, o.benchmark.dtype) + OR empty({dtypes: Array(String) }) + ) + AND ( + NOT has({excludedMetrics: Array(String) }, o.metric.name) + OR empty({excludedMetrics: Array(String) }) + ) + AND notEmpty(o.metric.name) +) SELECT DISTINCT replaceOne(head_branch, 'refs/heads/', '') AS head_branch, head_sha, - workflow_id AS id, - toStartOfDay(fromUnixTimestamp(timestamp)) AS event_time + id, + event_time FROM - benchmark.oss_ci_benchmark_metadata + benchmarks WHERE - timestamp >= toUnixTimestamp({startTime: DateTime64(3) }) - AND timestamp < toUnixTimestamp({stopTime: DateTime64(3) }) - AND repo = {repo: String } - AND ( - has({benchmarks: Array(String) }, benchmark_name) - OR empty({benchmarks: Array(String) }) - ) - AND ( - has({models: Array(String) }, model_name) - OR empty({models: Array(String) }) - ) - AND ( - has({backends: Array(String) }, model_backend) - OR empty({backends: Array(String) }) - ) - AND ( - has({dtypes: Array(String) }, benchmark_dtype) - OR empty({dtypes: Array(String) }) - ) - AND ( - NOT has({excludedMetrics: Array(String) }, metric_name) - OR empty({excludedMetrics: Array(String) }) - ) - AND notEmpty(metric_name) -- NB: DEVICE (ARCH) is the display format used by HUD when grouping together these two fields - AND ( + ( CONCAT( device, ' (', diff --git a/torchci/clickhouse_queries/oss_ci_benchmark_names/query.sql 
b/torchci/clickhouse_queries/oss_ci_benchmark_names/query.sql index 74207c1763..400ab75c01 100644 --- a/torchci/clickhouse_queries/oss_ci_benchmark_names/query.sql +++ b/torchci/clickhouse_queries/oss_ci_benchmark_names/query.sql @@ -1,41 +1,62 @@ --- This query is used by HUD benchmarks dashboards to get the list of experiment names +WITH benchmarks AS ( + SELECT + o.benchmark.name AS benchmark, + o.model.name AS model, + o.model.backend AS backend, + o.metric.name AS metric, + o.benchmark.dtype AS dtype, + IF( + empty(o.runners), + tupleElement(o.benchmark, 'extra_info') [ 'device' ], + tupleElement(o.runners [ 1 ], 'name') + ) AS device, + IF( + empty(o.runners), + tupleElement(o.benchmark, 'extra_info') [ 'arch' ], + tupleElement(o.runners [ 1 ], 'type') + ) AS arch + FROM + benchmark.oss_ci_benchmark_v3 o + WHERE + o.timestamp >= toUnixTimestamp({startTime: DateTime64(3) }) + AND o.timestamp < toUnixTimestamp({stopTime: DateTime64(3) }) + AND o.repo = {repo: String } + AND ( + has({benchmarks: Array(String) }, o.benchmark.name) + OR empty({benchmarks: Array(String) }) + ) + AND ( + has({models: Array(String) }, o.model.name) + OR empty({models: Array(String) }) + ) + AND ( + has({backends: Array(String) }, o.model.backend) + OR empty({backends: Array(String) }) + ) + AND ( + has({dtypes: Array(String) }, o.benchmark.dtype) + OR empty({dtypes: Array(String) }) + ) + AND ( + NOT has({excludedMetrics: Array(String) }, o.metric.name) + OR empty({excludedMetrics: Array(String) }) + ) + AND notEmpty(o.metric.name) +) SELECT - DISTINCT benchmark_name AS benchmark, - model_name AS model, - model_backend AS backend, - metric_name AS metric, - benchmark_dtype AS dtype, + DISTINCT benchmark, + model, + backend, + metric, + dtype, device, arch FROM - benchmark.oss_ci_benchmark_metadata + benchmarks WHERE - timestamp >= toUnixTimestamp({startTime: DateTime64(3) }) - AND timestamp < toUnixTimestamp({stopTime: DateTime64(3) }) - AND repo = {repo: String } - AND ( - has({benchmarks: Array(String) }, benchmark_name) - OR empty({benchmarks: Array(String) }) - ) - AND ( - has({models: Array(String) }, model_name) - OR empty({models: Array(String) }) - ) - AND ( - has({backends: Array(String) }, model_backend) - OR empty({backends: Array(String) }) - ) - AND ( - has({dtypes: Array(String) }, benchmark_dtype) - OR empty({dtypes: Array(String) }) - ) - AND ( - NOT has({excludedMetrics: Array(String) }, metric_name) - OR empty({excludedMetrics: Array(String) }) - ) - AND notEmpty(metric_name) -- NB: DEVICE (ARCH) is the display format used by HUD when grouping together these two fields - AND ( + ( CONCAT( device, ' (', diff --git a/torchci/clickhouse_queries/oss_ci_benchmark_v3_materialized_views/query.sql b/torchci/clickhouse_queries/oss_ci_benchmark_v3_materialized_views/query.sql deleted file mode 100644 index dee43e0066..0000000000 --- a/torchci/clickhouse_queries/oss_ci_benchmark_v3_materialized_views/query.sql +++ /dev/null @@ -1,84 +0,0 @@ --- This table is used to speed-up the performance of oss_ci_benchmark_names and --- and oss_ci_benchmark_branches queries -CREATE TABLE benchmark.oss_ci_benchmark_metadata ( - `repo` String, - `benchmark_name` String, - `benchmark_dtype` String, - `model_name` String, - `model_backend` String, - `device` String, - `arch` String, - `metric_name` String, - `head_branch` String, - `head_sha` String, - `workflow_id` UInt64, - `timestamp` UInt64, -) ENGINE = MergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') -ORDER BY - ( - repo, - benchmark_name, - 
benchmark_dtype, - model_name, - model_backend, - device, - arch, - metric_name, - head_branch, - workflow_id, - timestamp - ) SETTINGS index_granularity = 8192; - -CREATE MATERIALIZED VIEW benchmark.oss_ci_benchmark_metadata_mv TO benchmark.oss_ci_benchmark_metadata AS -SELECT - repo AS repo, - tupleElement(benchmark, 'name') AS benchmark_name, - tupleElement(benchmark, 'dtype') AS benchmark_dtype, - tupleElement(model, 'name') AS model_name, - tupleElement(model, 'backend') AS model_backend, - IF( - empty(runners), - tupleElement(benchmark, 'extra_info') [ 'device' ], - tupleElement(runners [ 1 ], 'name') - ) AS device, - IF( - empty(runners), - tupleElement(benchmark, 'extra_info') [ 'arch' ], - tupleElement(runners [ 1 ], 'type') - ) AS arch, - tupleElement(metric, 'name') AS metric_name, - head_branch AS head_branch, - head_sha AS head_sha, - workflow_id AS workflow_id, - timestamp AS timestamp -FROM - benchmark.oss_ci_benchmark_v3 -WHERE - timestamp >= toUnixTimestamp(toDateTime('2025-01-20 22:45:00')); - --- Below is the SQL query to backfill the view with all data from 2024 onward -INSERT INTO - benchmark.oss_ci_benchmark_metadata -SELECT - repo AS repo, - tupleElement(benchmark, 'name') AS benchmark_name, - tupleElement(benchmark, 'dtype') AS benchmark_dtype, - tupleElement(model, 'name') AS model_name, - tupleElement(model, 'backend') AS model_backend, - IF( - empty(runners), - tupleElement(benchmark, 'extra_info') [ 'device' ], - tupleElement(runners [ 1 ], 'name') - ) AS device, - IF( - empty(runners), - tupleElement(benchmark, 'extra_info') [ 'arch' ], - tupleElement(runners [ 1 ], 'type') - ) AS arch, - tupleElement(metric, 'name') AS metric_name, - head_branch AS head_branch, - head_sha AS head_sha, - workflow_id AS workflow_id, - timestamp AS timestamp -FROM - benchmark.oss_ci_benchmark_v3 From 02ed717474750d2e32d1e81cf01a49f32b91ff38 Mon Sep 17 00:00:00 2001 From: Camyll Harajli Date: Wed, 22 Jan 2025 10:35:07 -0800 Subject: [PATCH 12/14] change from clickhouse to sql fluff using clickhouse dialect --- .lintrunner.toml | 9 +- .../linter/adapters/clickhouse_sql_linter.py | 48 ++------ .../clickhouse_queries/queued_jobs/query.sql | 80 +++++++------ .../queued_jobs_aggregate/query.sql | 110 +++++++++--------- .../test_time_per_class/query.sql | 43 +++---- .../query.sql | 75 ++++++------ .../test_time_per_file/query.sql | 41 ++++--- .../query.sql | 73 ++++++------ .../torchbench_list_userbenchmarks/query.sql | 3 +- 9 files changed, 234 insertions(+), 248 deletions(-) diff --git a/.lintrunner.toml b/.lintrunner.toml index 38e88c9e74..a2455f0e6b 100644 --- a/.lintrunner.toml +++ b/.lintrunner.toml @@ -346,17 +346,14 @@ exclude_patterns = [ command = [ 'python3', 'tools/linter/adapters/clickhouse_sql_linter.py', - '--binary=.lintbin/clickhouse', '@{{PATHSFILE}}', ] init_command = [ 'python3', - 'tools/linter/adapters/s3_init.py', - '--config-json=tools/linter/adapters/s3_init_config.json', - '--linter=clickhouse', + 'python3', + 'tools/linter/adapters/pip_init.py', '--dry-run={{DRYRUN}}', - '--output-dir=.lintbin', - '--output-name=clickhouse', + 'sqlfluff', ] is_formatter = true diff --git a/tools/linter/adapters/clickhouse_sql_linter.py b/tools/linter/adapters/clickhouse_sql_linter.py index 2d3f864c50..3c54d44172 100644 --- a/tools/linter/adapters/clickhouse_sql_linter.py +++ b/tools/linter/adapters/clickhouse_sql_linter.py @@ -61,20 +61,16 @@ def run_command( def check_file( - binary: str, filename: str, ) -> List[LintMessage]: - with open(filename) as f: - original = f.read() 
- try: proc = run_command( [ - binary, - "--format", - "--comments", - "--query", - original, + "sqlfluff", + "format", + "--dialect", + "clickhouse", + filename, ] ) except OSError as err: @@ -92,9 +88,7 @@ def check_file( ) ] - replacement = proc.stdout - if original == replacement: - return [] + lint_message = proc.stdout return [ LintMessage( @@ -104,9 +98,9 @@ def check_file( code=LINTER_CODE, severity=LintSeverity.WARNING, name="format", - original=original, - replacement=replacement.decode("utf-8"), - description="See https://clickhouse.com/docs/en/operations/utilities/clickhouse-format.\nRun `lintrunner -a` to apply this patch.", + original=None, + replacement=None, + description=lint_message.decode("utf-8"), ) ] @@ -121,32 +115,9 @@ def main() -> None: nargs="+", help="paths to lint", ) - parser.add_argument( - "--binary", - required=True, - help="clickhouse binary path", - ) args = parser.parse_args() - if not os.path.exists(args.binary): - err_msg = LintMessage( - path="", - line=None, - char=None, - code=LINTER_CODE, - severity=LintSeverity.ERROR, - name="command-failed", - original=None, - replacement=None, - description=( - f"Could not find clickhouse binary at {args.binary}," - " you may need to run `lintrunner init`." - ), - ) - print(json.dumps(err_msg._asdict()), flush=True) - exit(0) - with concurrent.futures.ThreadPoolExecutor( max_workers=os.cpu_count(), thread_name_prefix="Thread", @@ -154,7 +125,6 @@ def main() -> None: futures = { executor.submit( check_file, - args.binary, filename, ): filename for filename in args.filenames diff --git a/torchci/clickhouse_queries/queued_jobs/query.sql b/torchci/clickhouse_queries/queued_jobs/query.sql index 493222a58c..59fa310344 100644 --- a/torchci/clickhouse_queries/queued_jobs/query.sql +++ b/torchci/clickhouse_queries/queued_jobs/query.sql @@ -1,43 +1,47 @@ --- This query is used by HUD metrics page to get the list of queued jobs with possible_queued_jobs as ( - select id, run_id - from default.workflow_job -- FINAL not needed since we just use this to filter a table that has already been FINALed - where status = 'queued' - AND created_at < (CURRENT_TIMESTAMP() - INTERVAL 5 MINUTE) - AND created_at > (CURRENT_TIMESTAMP() - INTERVAL 1 WEEK) + select + id, + run_id + from default.workflow_job -- FINAL not needed since we just use this to filter a table that has already been FINALed + where + status = 'queued' + and created_at < (CURRENT_TIMESTAMP() - interval 5 minute) + and created_at > (CURRENT_TIMESTAMP() - interval 1 week) ) -SELECT - DATE_DIFF( - 'second', - job.created_at, - CURRENT_TIMESTAMP() - ) AS queue_s, - CONCAT(workflow.name, ' / ', job.name) AS name, - job.html_url, - IF( - LENGTH(job.labels) = 0, - 'N/A', + +select + DATE_DIFF( + 'second', + job.created_at, + CURRENT_TIMESTAMP() + ) as queue_s, + CONCAT(workflow.name, ' / ', job.name) as name, + job.html_url, IF( - LENGTH(job.labels) > 1, - job.labels[2], - job.labels[1] - ) - ) AS machine_type -FROM - default.workflow_job job final - JOIN default.workflow_run workflow final ON workflow.id = job.run_id -WHERE - job.id in (select id from possible_queued_jobs) - and workflow.id in (select run_id from possible_queued_jobs) - and workflow.repository.'full_name' = 'pytorch/pytorch' - AND job.status = 'queued' - /* These two conditions are workarounds for GitHub's broken API. Sometimes */ - /* jobs get stuck in a permanently "queued" state but definitely ran. 
We can */ - /* detect this by looking at whether any steps executed (if there were, */ - /* obviously the job started running), and whether the workflow was marked as */ - /* complete (somehow more reliable than the job-level API) */ - AND LENGTH(job.steps) = 0 - AND workflow.status != 'completed' -ORDER BY - queue_s DESC + LENGTH(job.labels) = 0, + 'N/A', + IF( + LENGTH(job.labels) > 1, + job.labels[2], + job.labels[1] + ) + ) as machine_type +from + default.workflow_job job final +join default.workflow_run workflow final on workflow.id = job.run_id +where + job.id in (select id from possible_queued_jobs) + and workflow.id in (select run_id from possible_queued_jobs) + and workflow.repository.'full_name' = 'pytorch/pytorch' + and job.status = 'queued' + /* These two conditions are workarounds for GitHub's broken API. Sometimes */ + /* jobs get stuck in a permanently "queued" state but definitely ran. We can */ + /* detect this by looking at whether any steps executed (if there were, */ + /* obviously the job started running), and whether the workflow was marked as */ + /* complete (somehow more reliable than the job-level API) */ + and LENGTH(job.steps) = 0 + and workflow.status != 'completed' +order by + queue_s desc settings allow_experimental_analyzer = 1; diff --git a/torchci/clickhouse_queries/queued_jobs_aggregate/query.sql b/torchci/clickhouse_queries/queued_jobs_aggregate/query.sql index 0d017679e0..19a4f30e3c 100644 --- a/torchci/clickhouse_queries/queued_jobs_aggregate/query.sql +++ b/torchci/clickhouse_queries/queued_jobs_aggregate/query.sql @@ -6,66 +6,70 @@ --- additional runners to spin up. with possible_queued_jobs as ( - select id, run_id - from default.workflow_job - where - status = 'queued' - AND created_at < ( + select + id, + run_id + from default.workflow_job + where + status = 'queued' + and created_at < ( -- Only consider jobs that have been queued for a significant period of time - CURRENT_TIMESTAMP() - INTERVAL 30 MINUTE - ) - AND created_at > ( + CURRENT_TIMESTAMP() - interval 30 minute + ) + and created_at > ( -- Queued jobs are automatically cancelled after this long. Any allegedly pending -- jobs older than this are actually bad data - CURRENT_TIMESTAMP() - INTERVAL 3 DAY - ) + CURRENT_TIMESTAMP() - interval 3 day + ) ), - queued_jobs as ( - SELECT - DATE_DIFF( - 'minute', - job.created_at, - CURRENT_TIMESTAMP() - ) AS queue_m, - workflow.repository.owner.login as org, - workflow.repository.name as repo, - CONCAT(workflow.name, ' / ', job.name) AS name, - job.html_url, - IF( - LENGTH(job.labels) = 0, - 'N/A', + +queued_jobs as ( + select + DATE_DIFF( + 'minute', + job.created_at, + CURRENT_TIMESTAMP() + ) as queue_m, + workflow.repository.owner.login as org, + workflow.repository.name as repo, + CONCAT(workflow.name, ' / ', job.name) as name, + job.html_url, IF( - LENGTH(job.labels) > 1, - job.labels[2], - job.labels[1] - ) - ) AS runner_label - FROM - default.workflow_job job final - JOIN default.workflow_run workflow final ON workflow.id = job.run_id - WHERE - job.id in (select id from possible_queued_jobs) - and workflow.id in (select run_id from possible_queued_jobs) - and workflow.repository.owner.login in ('pytorch', 'pytorch-labs') - AND job.status = 'queued' - /* These two conditions are workarounds for GitHub's broken API. Sometimes */ - /* jobs get stuck in a permanently "queued" state but definitely ran. 
We can */ - /* detect this by looking at whether any steps executed (if there were, */ - /* obviously the job started running), and whether the workflow was marked as */ - /* complete (somehow more reliable than the job-level API) */ - AND LENGTH(job.steps) = 0 - AND workflow.status != 'completed' - ORDER BY - queue_m DESC + LENGTH(job.labels) = 0, + 'N/A', + IF( + LENGTH(job.labels) > 1, + job.labels[2], + job.labels[1] + ) + ) as runner_label + from + default.workflow_job job final + join default.workflow_run workflow final on workflow.id = job.run_id + where + job.id in (select id from possible_queued_jobs) + and workflow.id in (select run_id from possible_queued_jobs) + and workflow.repository.owner.login in ('pytorch', 'pytorch-labs') + and job.status = 'queued' + /* These two conditions are workarounds for GitHub's broken API. Sometimes */ + /* jobs get stuck in a permanently "queued" state but definitely ran. We can */ + /* detect this by looking at whether any steps executed (if there were, */ + /* obviously the job started running), and whether the workflow was marked as */ + /* complete (somehow more reliable than the job-level API) */ + and LENGTH(job.steps) = 0 + and workflow.status != 'completed' + order by + queue_m desc ) + select - runner_label, - org, - repo, - count(*) as num_queued_jobs, - min(queue_m) as min_queue_time_minutes, - max(queue_m) as max_queue_time_minutes + runner_label, + org, + repo, + COUNT(*) as num_queued_jobs, + MIN(queue_m) as min_queue_time_minutes, + MAX(queue_m) as max_queue_time_minutes from queued_jobs group by runner_label, org, repo order by max_queue_time_minutes desc -settings allow_experimental_analyzer = 1; \ No newline at end of file +settings allow_experimental_analyzer = 1; diff --git a/torchci/clickhouse_queries/test_time_per_class/query.sql b/torchci/clickhouse_queries/test_time_per_class/query.sql index bc19426920..5314b2d284 100644 --- a/torchci/clickhouse_queries/test_time_per_class/query.sql +++ b/torchci/clickhouse_queries/test_time_per_class/query.sql @@ -1,8 +1,7 @@ WITH most_recent_strict_commits AS ( - SELECT - push.head_commit.id as sha + SELECT push.head_commit.id AS sha FROM - default.push final + default.push FINAL WHERE push.ref = 'refs/heads/viable/strict' AND push.repository.full_name = 'pytorch/pytorch' @@ -10,41 +9,44 @@ WITH most_recent_strict_commits AS ( push.head_commit.timestamp DESC LIMIT 3 -), workflow AS ( - SELECT - id +), + +workflow AS ( + SELECT id FROM materialized_views.workflow_run_by_head_sha w - where head_sha in (select sha from most_recent_strict_commits) + WHERE head_sha IN (SELECT sha FROM most_recent_strict_commits) ), + job AS ( SELECT j.name, j.id, j.run_id FROM - default.workflow_job j final - where j.id in ( - select id from materialized_views.workflow_job_by_head_sha - where head_sha in (select sha from most_recent_strict_commits) + default.workflow_job j FINAL + WHERE j.id IN ( + SELECT id FROM materialized_views.workflow_job_by_head_sha + WHERE head_sha IN (SELECT sha FROM most_recent_strict_commits) ) - and j.run_id in (select id from workflow) + AND j.run_id IN (SELECT id FROM workflow) ), + class_duration_per_job AS ( SELECT - test_run.invoking_file as file, - test_run.classname as classname, - SUM(time) as time, - REGEXP_EXTRACT(job.name, '^(.*) /', 1) as base_name, - REGEXP_EXTRACT(job.name, '/ test \(([\w-]*),', 1) as test_config + test_run.invoking_file AS file, + test_run.classname AS classname, + SUM(time) AS time, + REGEXP_EXTRACT(job.name, '^(.*) /', 1) AS base_name, + 
REGEXP_EXTRACT(job.name, '/ test \(([\w-]*),', 1) AS test_config FROM default.test_run_summary test_run - INNER JOIN job ON test_run.job_id = job.id + INNER JOIN job ON test_run.job_id = job.id WHERE /* cpp tests do not populate `file` for some reason. */ /* Exclude them as we don't include them in our slow test infra */ test_run.file != '' - and test_run.workflow_id in (select id from workflow) + AND test_run.workflow_id IN (SELECT id FROM workflow) GROUP BY test_run.invoking_file, test_run.classname, @@ -52,12 +54,13 @@ class_duration_per_job AS ( test_config, job.run_id ) + SELECT REPLACE(file, '.', '/') AS file, classname, base_name, test_config, - AVG(time) as time + AVG(time) AS time FROM class_duration_per_job GROUP BY diff --git a/torchci/clickhouse_queries/test_time_per_class_periodic_jobs/query.sql b/torchci/clickhouse_queries/test_time_per_class_periodic_jobs/query.sql index 1333b86404..62d54b55e4 100644 --- a/torchci/clickhouse_queries/test_time_per_class_periodic_jobs/query.sql +++ b/torchci/clickhouse_queries/test_time_per_class_periodic_jobs/query.sql @@ -1,67 +1,69 @@ -- same as test_time_per_file query except for the first select WITH good_periodic_sha AS ( - select - job.head_sha as sha - from - default.workflow_job job final - JOIN default.workflow_run workflow final on workflow.id = job.run_id - JOIN default.push on workflow.head_commit.'id' = push.head_commit.'id' - where + SELECT job.head_sha AS sha + FROM + default.workflow_job job FINAL + JOIN default.workflow_run workflow FINAL ON workflow.id = job.run_id + JOIN default.push ON workflow.head_commit.'id' = push.head_commit.'id' + WHERE workflow.name = 'periodic' AND workflow.head_branch LIKE 'main' - and workflow.repository.'full_name' = 'pytorch/pytorch' - group by + AND workflow.repository.'full_name' = 'pytorch/pytorch' + GROUP BY job.head_sha, push.head_commit.'timestamp' - having - groupBitAnd( + HAVING + GROUPBITAND( job.conclusion = 'success' - and job.conclusion is not null + AND job.conclusion IS NOT null ) = 1 - order by - push.head_commit.'timestamp' desc - limit + ORDER BY + push.head_commit.'timestamp' DESC + LIMIT 3 -), workflow AS ( - SELECT - id +), + +workflow AS ( + SELECT id FROM - default.workflow_run final - where - id in ( + default.workflow_run FINAL + WHERE + id IN ( SELECT id FROM materialized_views.workflow_run_by_head_sha w - where head_sha in (select sha from good_periodic_sha) + WHERE head_sha IN (SELECT sha FROM good_periodic_sha) ) - and name = 'periodic' + AND name = 'periodic' ), + job AS ( SELECT j.name, j.id, j.run_id FROM - default.workflow_job j final - where j.id in ( - select id from materialized_views.workflow_job_by_head_sha - where head_sha in (select sha from good_periodic_sha) + default.workflow_job j FINAL + WHERE j.id IN ( + SELECT id FROM materialized_views.workflow_job_by_head_sha + WHERE head_sha IN (SELECT sha FROM good_periodic_sha) ) - and j.run_id in (select id from workflow) + AND j.run_id IN (SELECT id FROM workflow) ), + class_duration_per_job AS ( SELECT - test_run.invoking_file as file, - test_run.classname as classname, - SUM(time) as time, - REGEXP_EXTRACT(job.name, '^(.*) /', 1) as base_name, - REGEXP_EXTRACT(job.name, '/ test \(([\w-]*),', 1) as test_config + test_run.invoking_file AS file, + test_run.classname AS classname, + SUM(time) AS time, + REGEXP_EXTRACT(job.name, '^(.*) /', 1) AS base_name, + REGEXP_EXTRACT(job.name, '/ test \(([\w-]*),', 1) AS test_config FROM default.test_run_summary test_run - INNER JOIN job ON test_run.job_id = job.id + INNER 
JOIN job ON test_run.job_id = job.id WHERE /* cpp tests do not populate `file` for some reason. */ /* Exclude them as we don't include them in our slow test infra */ test_run.file != '' - and test_run.workflow_id in (select id from workflow) + AND test_run.workflow_id IN (SELECT id FROM workflow) GROUP BY test_run.invoking_file, test_run.classname, @@ -69,12 +71,13 @@ class_duration_per_job AS ( test_config, job.run_id ) + SELECT REPLACE(file, '.', '/') AS file, classname, base_name, test_config, - AVG(time) as time + AVG(time) AS time FROM class_duration_per_job GROUP BY diff --git a/torchci/clickhouse_queries/test_time_per_file/query.sql b/torchci/clickhouse_queries/test_time_per_file/query.sql index 37c7ae2a7d..4b78c3846f 100644 --- a/torchci/clickhouse_queries/test_time_per_file/query.sql +++ b/torchci/clickhouse_queries/test_time_per_file/query.sql @@ -1,8 +1,7 @@ WITH most_recent_strict_commits AS ( - SELECT - push.head_commit.id as sha + SELECT push.head_commit.id AS sha FROM - default.push final + default.push FINAL WHERE push.ref = 'refs/heads/viable/strict' AND push.repository.full_name = 'pytorch/pytorch' @@ -10,51 +9,55 @@ WITH most_recent_strict_commits AS ( push.head_commit.timestamp DESC LIMIT 3 -), workflow AS ( - SELECT - id +), + +workflow AS ( + SELECT id FROM materialized_views.workflow_run_by_head_sha w - where head_sha in (select sha from most_recent_strict_commits) + WHERE head_sha IN (SELECT sha FROM most_recent_strict_commits) ), + job AS ( SELECT j.name, j.id, j.run_id FROM - default.workflow_job j final - where j.id in ( - select id from materialized_views.workflow_job_by_head_sha - where head_sha in (select sha from most_recent_strict_commits) + default.workflow_job j FINAL + WHERE j.id IN ( + SELECT id FROM materialized_views.workflow_job_by_head_sha + WHERE head_sha IN (SELECT sha FROM most_recent_strict_commits) ) - and j.run_id in (select id from workflow) + AND j.run_id IN (SELECT id FROM workflow) ), + file_duration_per_job AS ( SELECT - test_run.invoking_file as file, - SUM(time) as time, - REGEXP_EXTRACT(job.name, '^(.*) /', 1) as base_name, - REGEXP_EXTRACT(job.name, '/ test \(([\w-]*),', 1) as test_config + test_run.invoking_file AS file, + SUM(time) AS time, + REGEXP_EXTRACT(job.name, '^(.*) /', 1) AS base_name, + REGEXP_EXTRACT(job.name, '/ test \(([\w-]*),', 1) AS test_config FROM default.test_run_summary test_run - INNER JOIN job ON test_run.job_id = job.id + INNER JOIN job ON test_run.job_id = job.id WHERE /* cpp tests do not populate `file` for some reason. 
*/ /* Exclude them as we don't include them in our slow test infra */ test_run.file != '' - and test_run.workflow_id in (select id from workflow) + AND test_run.workflow_id IN (SELECT id FROM workflow) GROUP BY test_run.invoking_file, base_name, test_config, job.run_id ) + SELECT REPLACE(file, '.', '/') AS file, base_name, test_config, - AVG(time) as time + AVG(time) AS time FROM file_duration_per_job GROUP BY diff --git a/torchci/clickhouse_queries/test_time_per_file_periodic_jobs/query.sql b/torchci/clickhouse_queries/test_time_per_file_periodic_jobs/query.sql index 19f36b71d2..332b47c866 100644 --- a/torchci/clickhouse_queries/test_time_per_file_periodic_jobs/query.sql +++ b/torchci/clickhouse_queries/test_time_per_file_periodic_jobs/query.sql @@ -1,77 +1,80 @@ -- same as test_time_per_file query except for the first select WITH good_periodic_sha AS ( - select - job.head_sha as sha - from - default.workflow_job job final - JOIN default.workflow_run workflow final on workflow.id = job.run_id - JOIN default.push on workflow.head_commit.'id' = push.head_commit.'id' - where + SELECT job.head_sha AS sha + FROM + default.workflow_job job FINAL + JOIN default.workflow_run workflow FINAL ON workflow.id = job.run_id + JOIN default.push ON workflow.head_commit.'id' = push.head_commit.'id' + WHERE workflow.name = 'periodic' AND workflow.head_branch LIKE 'main' - and workflow.repository.'full_name' = 'pytorch/pytorch' - group by + AND workflow.repository.'full_name' = 'pytorch/pytorch' + GROUP BY job.head_sha, push.head_commit.'timestamp' - having - groupBitAnd( + HAVING + GROUPBITAND( job.conclusion = 'success' - and job.conclusion is not null + AND job.conclusion IS NOT null ) = 1 - order by - push.head_commit.'timestamp' desc - limit + ORDER BY + push.head_commit.'timestamp' DESC + LIMIT 3 -), workflow AS ( - SELECT - id +), + +workflow AS ( + SELECT id FROM - default.workflow_run final - where - id in ( + default.workflow_run FINAL + WHERE + id IN ( SELECT id FROM materialized_views.workflow_run_by_head_sha w - where head_sha in (select sha from good_periodic_sha) + WHERE head_sha IN (SELECT sha FROM good_periodic_sha) ) - and name = 'periodic' + AND name = 'periodic' ), + job AS ( SELECT j.name, j.id, j.run_id FROM - default.workflow_job j final - where j.id in ( - select id from materialized_views.workflow_job_by_head_sha - where head_sha in (select sha from good_periodic_sha) + default.workflow_job j FINAL + WHERE j.id IN ( + SELECT id FROM materialized_views.workflow_job_by_head_sha + WHERE head_sha IN (SELECT sha FROM good_periodic_sha) ) - and j.run_id in (select id from workflow) + AND j.run_id IN (SELECT id FROM workflow) ), + file_duration_per_job AS ( SELECT - test_run.invoking_file as file, - SUM(time) as time, - REGEXP_EXTRACT(job.name, '^(.*) /', 1) as base_name, - REGEXP_EXTRACT(job.name, '/ test \(([\w-]*),', 1) as test_config + test_run.invoking_file AS file, + SUM(time) AS time, + REGEXP_EXTRACT(job.name, '^(.*) /', 1) AS base_name, + REGEXP_EXTRACT(job.name, '/ test \(([\w-]*),', 1) AS test_config FROM default.test_run_summary test_run - INNER JOIN job ON test_run.job_id = job.id + INNER JOIN job ON test_run.job_id = job.id WHERE /* cpp tests do not populate `file` for some reason. 
*/ /* Exclude them as we don't include them in our slow test infra */ test_run.file != '' - and test_run.workflow_id in (select id from workflow) + AND test_run.workflow_id IN (SELECT id FROM workflow) GROUP BY test_run.invoking_file, base_name, test_config, job.run_id ) + SELECT REPLACE(file, '.', '/') AS file, base_name, test_config, - AVG(time) as time + AVG(time) AS time FROM file_duration_per_job GROUP BY diff --git a/torchci/clickhouse_queries/torchbench_list_userbenchmarks/query.sql b/torchci/clickhouse_queries/torchbench_list_userbenchmarks/query.sql index accbbf3d2b..136d37b319 100644 --- a/torchci/clickhouse_queries/torchbench_list_userbenchmarks/query.sql +++ b/torchci/clickhouse_queries/torchbench_list_userbenchmarks/query.sql @@ -1,4 +1,3 @@ -SELECT DISTINCT - name +SELECT DISTINCT name FROM benchmark.torchbench_userbenchmark From 2a3ebf9b2ef1ad5fabae6d24a1360107a4b16ef9 Mon Sep 17 00:00:00 2001 From: Camyll Harajli Date: Wed, 22 Jan 2025 13:33:35 -0800 Subject: [PATCH 13/14] add string replacment to account for sql variables --- .lintrunner.toml | 4 +- tools/linter/adapters/sqlfluff_linter.py | 156 +++++++++++++++++++++++ 2 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 tools/linter/adapters/sqlfluff_linter.py diff --git a/.lintrunner.toml b/.lintrunner.toml index a2455f0e6b..22b31d5042 100644 --- a/.lintrunner.toml +++ b/.lintrunner.toml @@ -339,13 +339,13 @@ init_command = [ is_formatter = true [[linter]] -code = 'CLICKHOUSE' +code = 'SQLFLUFF' include_patterns = ['torchci/clickhouse_queries/**/*.sql'] exclude_patterns = [ ] command = [ 'python3', - 'tools/linter/adapters/clickhouse_sql_linter.py', + 'tools/linter/adapters/sqlfluff_linter.py', '@{{PATHSFILE}}', ] init_command = [ diff --git a/tools/linter/adapters/sqlfluff_linter.py b/tools/linter/adapters/sqlfluff_linter.py new file mode 100644 index 0000000000..307b302ae7 --- /dev/null +++ b/tools/linter/adapters/sqlfluff_linter.py @@ -0,0 +1,156 @@ +import argparse +import concurrent.futures +import json +import logging +import os +import re +import subprocess +import time +from enum import Enum +from typing import List, NamedTuple, Optional, Pattern + + +LINTER_CODE = "SQLFLUFF" + + +class LintSeverity(str, Enum): + ERROR = "error" + WARNING = "warning" + ADVICE = "advice" + DISABLED = "disabled" + + +class LintMessage(NamedTuple): + path: Optional[str] + line: Optional[int] + char: Optional[int] + code: str + severity: LintSeverity + name: str + original: Optional[str] + replacement: Optional[str] + description: Optional[str] + + +RESULTS_RE: Pattern[str] = re.compile( + r"""(?mx) + ^ + (?P.*?): + (?P\d+): + (?P\d+): + \s(?P.*) + \s(?P\[.*\]) + $ + """ +) + + +def run_command( + args: List[str], +) -> "subprocess.CompletedProcess[bytes]": + logging.debug("$ %s", " ".join(args)) + start_time = time.monotonic() + try: + return subprocess.run( + args, + capture_output=True, + ) + finally: + end_time = time.monotonic() + logging.debug("took %dms", (end_time - start_time) * 1000) + + +def check_file( + filename: str, +) -> List[LintMessage]: + with open(filename, 'r') as f: + original = f.read() + original = original.replace('{', '\'{').replace('}', '}\'') + with open(filename, 'w') as f: + f.write(original) + + try: + # proc.run_command(sed -i -e "s/'{/{/g" -e "s/}'/}/g") + proc = run_command( + [ + "sqlfluff", + "format", + "--dialect", + "clickhouse", + filename, + ] + ) + except OSError as err: + return [ + LintMessage( + path=None, + line=None, + char=None, + code=LINTER_CODE, + 
severity=LintSeverity.ERROR, + name="command-failed", + original=None, + replacement=None, + description=(f"Failed due to {err.__class__.__name__}:\n{err}"), + ) + ] + + with open(filename, 'r') as f: + final = f.read() + final = final.replace('\'{', '{').replace('}\'', '}') + with open(filename, 'w') as f: + f.write(final) + + lint_message = proc.stdout + + + return [ + LintMessage( + path=filename, + line=None, + char=None, + code=LINTER_CODE, + severity=LintSeverity.WARNING, + name="format", + original=None, + replacement=None, + description=lint_message.decode("utf-8"), + ) + ] + + +def main() -> None: + parser = argparse.ArgumentParser( + description=f"sqlfluff format linter for sql queries.", + fromfile_prefix_chars="@", + ) + parser.add_argument( + "filenames", + nargs="+", + help="paths to lint", + ) + + args = parser.parse_args() + + with concurrent.futures.ThreadPoolExecutor( + max_workers=os.cpu_count(), + thread_name_prefix="Thread", + ) as executor: + futures = { + executor.submit( + check_file, + filename, + ): filename + for filename in args.filenames + } + for future in concurrent.futures.as_completed(futures): + try: + for lint_message in future.result(): + print(json.dumps(lint_message._asdict()), flush=True) + except Exception: + logging.critical('Failed at "%s".', futures[future]) + raise + + +if __name__ == "__main__": + main() From 7129dc629a823e74ae3567ba8e9caaa068548639 Mon Sep 17 00:00:00 2001 From: Camyll Harajli Date: Wed, 22 Jan 2025 13:38:47 -0800 Subject: [PATCH 14/14] remove old named file --- .../linter/adapters/clickhouse_sql_linter.py | 142 ------------------ 1 file changed, 142 deletions(-) delete mode 100644 tools/linter/adapters/clickhouse_sql_linter.py diff --git a/tools/linter/adapters/clickhouse_sql_linter.py b/tools/linter/adapters/clickhouse_sql_linter.py deleted file mode 100644 index 3c54d44172..0000000000 --- a/tools/linter/adapters/clickhouse_sql_linter.py +++ /dev/null @@ -1,142 +0,0 @@ -import argparse -import concurrent.futures -import json -import logging -import os -import re -import subprocess -import time -from enum import Enum -from typing import List, NamedTuple, Optional, Pattern - - -LINTER_CODE = "CLICKHOUSE" - - -class LintSeverity(str, Enum): - ERROR = "error" - WARNING = "warning" - ADVICE = "advice" - DISABLED = "disabled" - - -class LintMessage(NamedTuple): - path: Optional[str] - line: Optional[int] - char: Optional[int] - code: str - severity: LintSeverity - name: str - original: Optional[str] - replacement: Optional[str] - description: Optional[str] - - -RESULTS_RE: Pattern[str] = re.compile( - r"""(?mx) - ^ - (?P.*?): - (?P\d+): - (?P\d+): - \s(?P.*) - \s(?P\[.*\]) - $ - """ -) - - -def run_command( - args: List[str], -) -> "subprocess.CompletedProcess[bytes]": - logging.debug("$ %s", " ".join(args)) - start_time = time.monotonic() - try: - return subprocess.run( - args, - capture_output=True, - ) - finally: - end_time = time.monotonic() - logging.debug("took %dms", (end_time - start_time) * 1000) - - -def check_file( - filename: str, -) -> List[LintMessage]: - try: - proc = run_command( - [ - "sqlfluff", - "format", - "--dialect", - "clickhouse", - filename, - ] - ) - except OSError as err: - return [ - LintMessage( - path=None, - line=None, - char=None, - code=LINTER_CODE, - severity=LintSeverity.ERROR, - name="command-failed", - original=None, - replacement=None, - description=(f"Failed due to {err.__class__.__name__}:\n{err}"), - ) - ] - - lint_message = proc.stdout - - return [ - LintMessage( - path=filename, - 
line=None, - char=None, - code=LINTER_CODE, - severity=LintSeverity.WARNING, - name="format", - original=None, - replacement=None, - description=lint_message.decode("utf-8"), - ) - ] - - -def main() -> None: - parser = argparse.ArgumentParser( - description=f"Clickhouse format linter for sql queries.", - fromfile_prefix_chars="@", - ) - parser.add_argument( - "filenames", - nargs="+", - help="paths to lint", - ) - - args = parser.parse_args() - - with concurrent.futures.ThreadPoolExecutor( - max_workers=os.cpu_count(), - thread_name_prefix="Thread", - ) as executor: - futures = { - executor.submit( - check_file, - filename, - ): filename - for filename in args.filenames - } - for future in concurrent.futures.as_completed(futures): - try: - for lint_message in future.result(): - print(json.dumps(lint_message._asdict()), flush=True) - except Exception: - logging.critical('Failed at "%s".', futures[future]) - raise - - -if __name__ == "__main__": - main()
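
As context for the `sqlfluff_linter.py` diff above: the queries under `torchci/clickhouse_queries` contain `{param}`-style template variables that sqlfluff's ClickHouse dialect cannot parse, so `check_file` wraps the braces in single quotes before invoking `sqlfluff format` and strips them again afterwards (the change described in patch 13/14). The snippet below is a minimal standalone sketch of that round-trip, not the adapter itself: `format_clickhouse_query` is a hypothetical helper name used only for illustration, and the `finally` block is an addition here, whereas the adapter simply unquotes after the run.

```python
import subprocess
from pathlib import Path


def format_clickhouse_query(path: str) -> None:
    """Round-trip a templated ClickHouse query through `sqlfluff format`.

    Placeholders such as {sha} are not valid SQL, so they are temporarily
    wrapped in single quotes to look like string literals, the file is
    formatted in place, and the quotes are removed again.
    """
    sql_file = Path(path)
    original = sql_file.read_text()

    # Quote the template braces so sqlfluff's parser accepts them.
    sql_file.write_text(original.replace("{", "'{").replace("}", "}'"))

    try:
        # Same invocation the linter adapter uses.
        subprocess.run(
            ["sqlfluff", "format", "--dialect", "clickhouse", str(sql_file)],
            capture_output=True,
            check=False,
        )
    finally:
        # Strip the temporary quotes, whether or not formatting succeeded.
        formatted = sql_file.read_text()
        sql_file.write_text(formatted.replace("'{", "{").replace("}'", "}"))
```

Quoting the braces makes each placeholder look like an ordinary string literal to the parser, which is why the same two `replace` calls can undo the transformation afterwards; the trade-off is that any literal `{` or `}` elsewhere in a query would also be rewritten.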