Skip to content

Commit

Permalink
[PERF] Refactor run DAO SQL query (#2685)
Browse files Browse the repository at this point in the history
* refactor run DAO SQL query

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* add comments on filters

Signed-off-by: sophiely <ly.sophie200@gmail.com>

---------

Signed-off-by: sophiely <ly.sophie200@gmail.com>
  • Loading branch information
sophiely authored Nov 20, 2023
1 parent dddac31 commit 1019fb9
Showing 1 changed file with 76 additions and 46 deletions.
122 changes: 76 additions & 46 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,54 +140,84 @@ LEFT OUTER JOIN (

@SqlQuery(
"""
SELECT r.*, ra.args, f.facets,
j.namespace_name, j.name, jv.version AS job_version,
ri.input_versions, ro.output_versions, df.dataset_facets
FROM runs_view AS r
INNER JOIN jobs_view j ON r.job_uuid=j.uuid
LEFT JOIN LATERAL
(
SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets
FROM run_facets_view rf
WHERE rf.run_uuid=r.uuid
GROUP BY rf.run_uuid
) AS f ON r.uuid=f.run_uuid
LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
LEFT OUTER JOIN (
SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,
'name', dv.dataset_name,
'version', dv.version,
'dataset_version_uuid', uuid
)) AS input_versions
FROM runs_input_mapping im
INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid
GROUP BY im.run_uuid
) ri ON ri.run_uuid=r.uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
'name', dataset_name,
'version', version,
'dataset_version_uuid', uuid
)) AS output_versions
FROM dataset_versions
WITH filtered_jobs AS (
SELECT
jv.uuid,
jv.namespace_name,
jv.name
FROM jobs_view jv
WHERE jv.namespace_name=:namespace AND (jv.name=:jobName OR :jobName = ANY(jv.aliases))
),
run_facets_agg AS (
SELECT
run_uuid,
JSON_AGG(facet ORDER BY lineage_event_time ASC) AS facets
FROM run_facets_view
-- Performance filter: aggregate JSON only for the runs that belong to the filtered jobs
WHERE
run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
GROUP BY run_uuid
) ro ON ro.run_uuid=r.uuid
LEFT OUTER JOIN (
),
input_versions_agg AS (
SELECT
im.run_uuid,
JSON_AGG(json_build_object('namespace', dv.namespace_name,
'name', dv.dataset_name,
'version', dv.version,
'dataset_version_uuid', dv.uuid
)) AS input_versions
FROM runs_input_mapping im
INNER JOIN dataset_versions dv ON im.dataset_version_uuid = dv.uuid
-- Performance filter: aggregate JSON only for the runs that belong to the filtered jobs
WHERE
im.run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
GROUP BY im.run_uuid
),
output_versions_agg AS (
SELECT
dv.run_uuid,
JSON_AGG(json_build_object('namespace', namespace_name,
'name', dataset_name,
'version', version,
'dataset_version_uuid', uuid
)) AS output_versions
FROM dataset_versions dv
-- Performance filter: aggregate JSON only for the runs that belong to the filtered jobs
WHERE dv.run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
GROUP BY dv.run_uuid
),
dataset_facets_agg AS (
SELECT
run_uuid,
JSON_AGG(json_build_object(
'dataset_version_uuid', dataset_version_uuid,
'name', name,
'type', type,
'facet', facet
) ORDER BY created_at ASC) as dataset_facets
FROM dataset_facets_view
-- Performance filter: aggregate JSON only for the runs that belong to the filtered jobs
WHERE run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
AND (type ILIKE 'output' OR type ILIKE 'input')
GROUP BY run_uuid
)
SELECT
run_uuid,
JSON_AGG(json_build_object(
'dataset_version_uuid', dataset_version_uuid,
'name', name,
'type', type,
'facet', facet
) ORDER BY created_at ASC) as dataset_facets
FROM dataset_facets_view
WHERE (type ILIKE 'output' OR type ILIKE 'input')
GROUP BY run_uuid
) AS df ON r.uuid = df.run_uuid
WHERE j.namespace_name=:namespace AND (j.name=:jobName OR :jobName = ANY(j.aliases))
ORDER BY STARTED_AT DESC NULLS LAST
r.*,
ra.args,
f.facets,
jv.version AS job_version,
ri.input_versions,
ro.output_versions,
df.dataset_facets
FROM runs_view r
INNER JOIN filtered_jobs fj ON r.job_uuid = fj.uuid
LEFT JOIN run_facets_agg f ON r.uuid = f.run_uuid
LEFT JOIN run_args ra ON ra.uuid = r.run_args_uuid
LEFT JOIN job_versions jv ON jv.uuid = r.job_version_uuid
LEFT JOIN input_versions_agg ri ON r.uuid = ri.run_uuid
LEFT JOIN output_versions_agg ro ON r.uuid = ro.run_uuid
LEFT JOIN dataset_facets_agg df ON r.uuid = df.run_uuid
ORDER BY r.started_at DESC NULLS LAST
LIMIT :limit OFFSET :offset
""")
List<Run> findAll(String namespace, String jobName, int limit, int offset);
Expand Down

0 comments on commit 1019fb9

Please sign in to comment.