change from clickhouse to sqlfluff using the clickhouse dialect
Camyll committed Jan 22, 2025
1 parent c788173 commit d913008
Showing 9 changed files with 234 additions and 248 deletions.
9 changes: 3 additions & 6 deletions .lintrunner.toml
@@ -346,17 +346,14 @@ exclude_patterns = [
 command = [
     'python3',
     'tools/linter/adapters/clickhouse_sql_linter.py',
-    '--binary=.lintbin/clickhouse',
     '@{{PATHSFILE}}',
 ]
 init_command = [
     'python3',
-    'tools/linter/adapters/s3_init.py',
-    '--config-json=tools/linter/adapters/s3_init_config.json',
-    '--linter=clickhouse',
-    '--output-dir=.lintbin',
-    '--output-name=clickhouse',
+    'tools/linter/adapters/pip_init.py',
+    '--dry-run={{DRYRUN}}',
+    'sqlfluff',
 ]
 is_formatter = true
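For reference, the new init step reduces to a pip install of sqlfluff. A minimal sketch of what pip_init.py plausibly does with these arguments (the real adapter may pin versions or add flags):

import subprocess
import sys


def pip_init(packages: list[str], dry_run: bool = False) -> None:
    # equivalent of `python3 tools/linter/adapters/pip_init.py --dry-run=... sqlfluff`
    cmd = [sys.executable, "-m", "pip", "install", *packages]
    if dry_run:
        # --dry-run={{DRYRUN}} lets lintrunner preview the install
        print("would run:", " ".join(cmd))
    else:
        subprocess.check_call(cmd)


pip_init(["sqlfluff"], dry_run=True)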
48 changes: 9 additions & 39 deletions tools/linter/adapters/clickhouse_sql_linter.py
@@ -61,20 +61,16 @@ def run_command(


 def check_file(
-    binary: str,
     filename: str,
 ) -> List[LintMessage]:
-    with open(filename) as f:
-        original = f.read()
-
     try:
         proc = run_command(
             [
-                binary,
-                "--format",
-                "--comments",
-                "--query",
-                original,
+                "sqlfluff",
+                "format",
+                "--dialect",
+                "clickhouse",
+                filename,
             ]
         )
     except OSError as err:
@@ -92,9 +88,7 @@ def check_file(
             )
         ]

-    replacement = proc.stdout
-    if original == replacement:
-        return []
+    lint_message = proc.stdout

     return [
         LintMessage(
@@ -104,9 +98,9 @@ def check_file(
             code=LINTER_CODE,
             severity=LintSeverity.WARNING,
             name="format",
-            original=original,
-            replacement=replacement.decode("utf-8"),
-            description="See https://clickhouse.com/docs/en/operations/utilities/clickhouse-format.\nRun `lintrunner -a` to apply this patch.",
+            original=None,
+            replacement=None,
+            description=lint_message.decode("utf-8"),
         )
     ]

@@ -121,40 +115,16 @@ def main() -> None:
         nargs="+",
         help="paths to lint",
     )
-    parser.add_argument(
-        "--binary",
-        required=True,
-        help="clickhouse binary path",
-    )

     args = parser.parse_args()

-    if not os.path.exists(args.binary):
-        err_msg = LintMessage(
-            path="<none>",
-            line=None,
-            char=None,
-            code=LINTER_CODE,
-            severity=LintSeverity.ERROR,
-            name="command-failed",
-            original=None,
-            replacement=None,
-            description=(
-                f"Could not find clickhouse binary at {args.binary},"
-                " you may need to run `lintrunner init`."
-            ),
-        )
-        print(json.dumps(err_msg._asdict()), flush=True)
-        exit(0)
-
     with concurrent.futures.ThreadPoolExecutor(
         max_workers=os.cpu_count(),
         thread_name_prefix="Thread",
     ) as executor:
         futures = {
             executor.submit(
                 check_file,
-                args.binary,
                 filename,
             ): filename
             for filename in args.filenames
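The rewritten check_file boils down to one subprocess call per file. A rough standalone equivalent, assuming sqlfluff is installed and query.sql exists (the file name is illustrative):

import subprocess

# sqlfluff format rewrites the file in place; the adapter now surfaces
# sqlfluff's stdout as the lint description instead of computing an
# original/replacement pair itself
proc = subprocess.run(
    ["sqlfluff", "format", "--dialect", "clickhouse", "query.sql"],
    capture_output=True,
)
print(proc.stdout.decode("utf-8"))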
80 changes: 42 additions & 38 deletions torchci/clickhouse_queries/queued_jobs/query.sql
@@ -1,43 +1,47 @@
 --- This query is used by HUD metrics page to get the list of queued jobs
 with possible_queued_jobs as (
-    select id, run_id
-    from default.workflow_job -- FINAL not needed since we just use this to filter a table that has already been FINALed
-    where status = 'queued'
-        AND created_at < (CURRENT_TIMESTAMP() - INTERVAL 5 MINUTE)
-        AND created_at > (CURRENT_TIMESTAMP() - INTERVAL 1 WEEK)
+    select
+        id,
+        run_id
+    from default.workflow_job -- FINAL not needed since we just use this to filter a table that has already been FINALed
+    where
+        status = 'queued'
+        and created_at < (CURRENT_TIMESTAMP() - interval 5 minute)
+        and created_at > (CURRENT_TIMESTAMP() - interval 1 week)
 )
-SELECT
-    DATE_DIFF(
-        'second',
-        job.created_at,
-        CURRENT_TIMESTAMP()
-    ) AS queue_s,
-    CONCAT(workflow.name, ' / ', job.name) AS name,
-    job.html_url,
-    IF(
-        LENGTH(job.labels) = 0,
-        'N/A',
-        IF(
-            LENGTH(job.labels) > 1,
-            job.labels[2],
-            job.labels[1]
-        )
-    ) AS machine_type
-FROM
-    default.workflow_job job final
-    JOIN default.workflow_run workflow final ON workflow.id = job.run_id
-WHERE
-    job.id in (select id from possible_queued_jobs)
-    and workflow.id in (select run_id from possible_queued_jobs)
-    and workflow.repository.'full_name' = 'pytorch/pytorch'
-    AND job.status = 'queued'
-    /* These two conditions are workarounds for GitHub's broken API. Sometimes */
-    /* jobs get stuck in a permanently "queued" state but definitely ran. We can */
-    /* detect this by looking at whether any steps executed (if there were, */
-    /* obviously the job started running), and whether the workflow was marked as */
-    /* complete (somehow more reliable than the job-level API) */
-    AND LENGTH(job.steps) = 0
-    AND workflow.status != 'completed'
-ORDER BY
-    queue_s DESC
+
+select
+    DATE_DIFF(
+        'second',
+        job.created_at,
+        CURRENT_TIMESTAMP()
+    ) as queue_s,
+    CONCAT(workflow.name, ' / ', job.name) as name,
+    job.html_url,
+    IF(
+        LENGTH(job.labels) = 0,
+        'N/A',
+        IF(
+            LENGTH(job.labels) > 1,
+            job.labels[2],
+            job.labels[1]
+        )
+    ) as machine_type
+from
+    default.workflow_job job final
+join default.workflow_run workflow final on workflow.id = job.run_id
+where
+    job.id in (select id from possible_queued_jobs)
+    and workflow.id in (select run_id from possible_queued_jobs)
+    and workflow.repository.'full_name' = 'pytorch/pytorch'
+    and job.status = 'queued'
+    /* These two conditions are workarounds for GitHub's broken API. Sometimes */
+    /* jobs get stuck in a permanently "queued" state but definitely ran. We can */
+    /* detect this by looking at whether any steps executed (if there were, */
+    /* obviously the job started running), and whether the workflow was marked as */
+    /* complete (somehow more reliable than the job-level API) */
+    and LENGTH(job.steps) = 0
+    and workflow.status != 'completed'
+order by
+    queue_s desc
 settings allow_experimental_analyzer = 1;
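The nested IF() that picks machine_type is easy to misread because ClickHouse arrays are 1-indexed. The same logic restated in Python, with hypothetical label values:

def machine_type(labels: list[str]) -> str:
    # job.labels[2] in ClickHouse is the second element, labels[1] here
    if len(labels) == 0:
        return "N/A"
    return labels[1] if len(labels) > 1 else labels[0]


assert machine_type([]) == "N/A"
assert machine_type(["linux.large"]) == "linux.large"
assert machine_type(["self-hosted", "linux.large"]) == "linux.large"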
110 changes: 57 additions & 53 deletions torchci/clickhouse_queries/queued_jobs_aggregate/query.sql
@@ -6,66 +6,70 @@
 --- additional runners to spin up.

 with possible_queued_jobs as (
-    select id, run_id
-    from default.workflow_job
-    where
-        status = 'queued'
-        AND created_at < (
+    select
+        id,
+        run_id
+    from default.workflow_job
+    where
+        status = 'queued'
+        and created_at < (
         -- Only consider jobs that have been queued for a significant period of time
-            CURRENT_TIMESTAMP() - INTERVAL 30 MINUTE
-        )
-        AND created_at > (
+            CURRENT_TIMESTAMP() - interval 30 minute
+        )
+        and created_at > (
         -- Queued jobs are automatically cancelled after this long. Any allegedly pending
         -- jobs older than this are actually bad data
-            CURRENT_TIMESTAMP() - INTERVAL 3 DAY
-        )
+            CURRENT_TIMESTAMP() - interval 3 day
+        )
 ),
-queued_jobs as (
-    SELECT
-        DATE_DIFF(
-            'minute',
-            job.created_at,
-            CURRENT_TIMESTAMP()
-        ) AS queue_m,
-        workflow.repository.owner.login as org,
-        workflow.repository.name as repo,
-        CONCAT(workflow.name, ' / ', job.name) AS name,
-        job.html_url,
-        IF(
-            LENGTH(job.labels) = 0,
-            'N/A',
-            IF(
-                LENGTH(job.labels) > 1,
-                job.labels[2],
-                job.labels[1]
-            )
-        ) AS runner_label
-    FROM
-        default.workflow_job job final
-        JOIN default.workflow_run workflow final ON workflow.id = job.run_id
-    WHERE
-        job.id in (select id from possible_queued_jobs)
-        and workflow.id in (select run_id from possible_queued_jobs)
-        and workflow.repository.owner.login in ('pytorch', 'pytorch-labs')
-        AND job.status = 'queued'
-        /* These two conditions are workarounds for GitHub's broken API. Sometimes */
-        /* jobs get stuck in a permanently "queued" state but definitely ran. We can */
-        /* detect this by looking at whether any steps executed (if there were, */
-        /* obviously the job started running), and whether the workflow was marked as */
-        /* complete (somehow more reliable than the job-level API) */
-        AND LENGTH(job.steps) = 0
-        AND workflow.status != 'completed'
-    ORDER BY
-        queue_m DESC
+
+queued_jobs as (
+    select
+        DATE_DIFF(
+            'minute',
+            job.created_at,
+            CURRENT_TIMESTAMP()
+        ) as queue_m,
+        workflow.repository.owner.login as org,
+        workflow.repository.name as repo,
+        CONCAT(workflow.name, ' / ', job.name) as name,
+        job.html_url,
+        IF(
+            LENGTH(job.labels) = 0,
+            'N/A',
+            IF(
+                LENGTH(job.labels) > 1,
+                job.labels[2],
+                job.labels[1]
+            )
+        ) as runner_label
+    from
+        default.workflow_job job final
+    join default.workflow_run workflow final on workflow.id = job.run_id
+    where
+        job.id in (select id from possible_queued_jobs)
+        and workflow.id in (select run_id from possible_queued_jobs)
+        and workflow.repository.owner.login in ('pytorch', 'pytorch-labs')
+        and job.status = 'queued'
+        /* These two conditions are workarounds for GitHub's broken API. Sometimes */
+        /* jobs get stuck in a permanently "queued" state but definitely ran. We can */
+        /* detect this by looking at whether any steps executed (if there were, */
+        /* obviously the job started running), and whether the workflow was marked as */
+        /* complete (somehow more reliable than the job-level API) */
+        and LENGTH(job.steps) = 0
+        and workflow.status != 'completed'
+    order by
+        queue_m desc
 )

 select
-    runner_label,
-    org,
-    repo,
-    count(*) as num_queued_jobs,
-    min(queue_m) as min_queue_time_minutes,
-    max(queue_m) as max_queue_time_minutes
+    runner_label,
+    org,
+    repo,
+    COUNT(*) as num_queued_jobs,
+    MIN(queue_m) as min_queue_time_minutes,
+    MAX(queue_m) as max_queue_time_minutes
 from queued_jobs
 group by runner_label, org, repo
 order by max_queue_time_minutes desc
-settings allow_experimental_analyzer = 1;
\ No newline at end of file
+settings allow_experimental_analyzer = 1;
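The closing SELECT aggregates per (runner_label, org, repo). Restated in Python over hypothetical rows from the queued_jobs CTE:

from collections import defaultdict

# (runner_label, org, repo, queue_m) sample rows -- illustrative only
rows = [
    ("linux.large", "pytorch", "pytorch", 42),
    ("linux.large", "pytorch", "pytorch", 7),
    ("linux.arm64", "pytorch", "pytorch", 31),
]
groups: dict[tuple[str, str, str], list[int]] = defaultdict(list)
for runner_label, org, repo, queue_m in rows:
    groups[(runner_label, org, repo)].append(queue_m)

# num_queued_jobs, min/max_queue_time_minutes, ordered by max desc as in the SQL
for key, times in sorted(groups.items(), key=lambda kv: -max(kv[1])):
    print(*key, len(times), min(times), max(times))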