From 1dd07a1e9f68c4b6c468363c4378b60563fa391e Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Mon, 6 Feb 2023 13:26:16 +0100 Subject: [PATCH] improve dataset facets access Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 3 + api/src/main/java/marquez/db/DatasetDao.java | 82 +++++-------------- .../java/marquez/db/DatasetVersionDao.java | 78 +++++++----------- 3 files changed, 53 insertions(+), 110 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 30199f2e0f..9937f40f62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.30.0...HEAD) +* Improve dataset facets access [`#2407`](https://github.com/MarquezProject/marquez/pull/2407) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) + * Improves database query performance for accessing datasets and datasets' versions.* + ## [0.30.0](https://github.com/MarquezProject/marquez/compare/0.29.0...0.30.0) - 2023-01-31 ### Added diff --git a/api/src/main/java/marquez/db/DatasetDao.java b/api/src/main/java/marquez/db/DatasetDao.java index 7439813d63..64da665b3f 100644 --- a/api/src/main/java/marquez/db/DatasetDao.java +++ b/api/src/main/java/marquez/db/DatasetDao.java @@ -71,30 +71,8 @@ void updateLastModifiedAt( @SqlQuery( """ - WITH selected_datasets AS ( - SELECT d.* - FROM datasets_view d - WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks) - ), dataset_runs AS ( - SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet - FROM selected_datasets d - INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid - LEFT JOIN LATERAL ( - SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view - WHERE dataset_uuid = dv.dataset_uuid - ) df ON df.run_uuid = dv.run_uuid - UNION - SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet - FROM selected_datasets d - INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid - LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid - LEFT JOIN LATERAL ( - SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view - WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid - ) df ON df.run_uuid = rim.run_uuid - ) SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets - FROM selected_datasets d + FROM datasets_view d LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid LEFT JOIN ( @@ -104,11 +82,15 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = d.uuid LEFT JOIN ( - SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets - FROM dataset_runs AS d2 - WHERE d2.run_uuid = d2.run_uuid AND d2.facet IS NOT NULL - GROUP BY d2.uuid - ) f ON f.dataset_uuid = d.uuid""") + SELECT + df.dataset_version_uuid, + JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets + FROM dataset_facets_view AS df + WHERE df.facet IS NOT NULL + GROUP BY df.dataset_version_uuid + ) f ON f.dataset_version_uuid = d.current_version_uuid + WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks) + """) Optional findDatasetByName(String namespaceName, String datasetName); default Optional findWithTags(String namespaceName, String datasetName) { @@ -137,32 +119,8 @@ default void setFields(Dataset ds) { @SqlQuery( """ - WITH selected_datasets AS ( - SELECT d.* - FROM datasets_view d - WHERE d.namespace_name = :namespaceName - ORDER BY d.name - LIMIT :limit OFFSET :offset - ), dataset_runs AS ( - SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet - FROM selected_datasets d - INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid - LEFT JOIN LATERAL ( - SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view - WHERE dataset_uuid = dv.dataset_uuid - ) df ON df.run_uuid = dv.run_uuid - UNION - SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet - FROM selected_datasets d - INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid - LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid - LEFT JOIN LATERAL ( - SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view - WHERE dataset_uuid = dv.dataset_uuid - ) df ON df.run_uuid = rim.run_uuid - ) SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets - FROM selected_datasets d + FROM datasets_view d LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid LEFT JOIN ( @@ -172,13 +130,17 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = d.uuid LEFT JOIN ( - SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets - FROM dataset_runs AS d2 - WHERE d2.run_uuid = d2.run_uuid - AND d2.facet IS NOT NULL - GROUP BY d2.uuid - ) f ON f.dataset_uuid = d.uuid - ORDER BY d.name""") + SELECT + df.dataset_version_uuid, + JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets + FROM dataset_facets_view AS df + WHERE df.facet IS NOT NULL + GROUP BY df.dataset_version_uuid + ) f ON f.dataset_version_uuid = d.current_version_uuid + WHERE d.namespace_name = :namespaceName + ORDER BY d.name + LIMIT :limit OFFSET :offset + """) List findAll(String namespaceName, int limit, int offset); @SqlQuery("SELECT count(*) FROM datasets_view") diff --git a/api/src/main/java/marquez/db/DatasetVersionDao.java b/api/src/main/java/marquez/db/DatasetVersionDao.java index 00f26eb0b6..f08b2f3903 100644 --- a/api/src/main/java/marquez/db/DatasetVersionDao.java +++ b/api/src/main/java/marquez/db/DatasetVersionDao.java @@ -156,19 +156,10 @@ default void updateDatasetVersionMetric( @SqlQuery( """ - WITH selected_dataset_versions AS ( - SELECT dv.* - FROM dataset_versions dv - WHERE dv.version = :version - ), selected_dataset_version_facets AS ( - SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet - FROM selected_dataset_versions dv - LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid - ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, t.tags, f.facets - FROM selected_dataset_versions dv + FROM dataset_versions dv LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid LEFT JOIN ( @@ -178,28 +169,21 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = dv.dataset_uuid LEFT JOIN ( - SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets - FROM selected_dataset_version_facets dvf - WHERE dvf.run_uuid = dvf.run_uuid - GROUP BY dvf.uuid - ) f ON f.dataset_uuid = dv.uuid""") + SELECT dvf.dataset_version_uuid, + JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets + FROM dataset_facets_view dvf + GROUP BY dataset_version_uuid + ) f ON f.dataset_version_uuid = dv.uuid + WHERE dv.version = :version + """) Optional findBy(UUID version); @SqlQuery( """ - WITH selected_dataset_versions AS ( - SELECT dv.* - FROM dataset_versions dv - WHERE dv.uuid = :uuid - ), selected_dataset_version_facets AS ( - SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet - FROM selected_dataset_versions dv - LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid - ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, t.tags, f.facets - FROM selected_dataset_versions dv + FROM dataset_versions dv LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid LEFT JOIN ( @@ -208,12 +192,14 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = dv.dataset_uuid - LEFT JOIN ( - SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets - FROM selected_dataset_version_facets dvf - WHERE dvf.run_uuid = dvf.run_uuid - GROUP BY dvf.uuid - ) f ON f.dataset_uuid = dv.uuid""") + LEFT JOIN ( + SELECT dvf.dataset_version_uuid, + JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets + FROM dataset_facets_view dvf + GROUP BY dataset_version_uuid + ) f ON f.dataset_version_uuid = dv.uuid + WHERE dv.uuid = :uuid + """) Optional findByUuid(UUID uuid); default Optional findByWithRun(UUID version) { @@ -244,22 +230,10 @@ default Optional findByWithRun(UUID version) { @SqlQuery( """ - WITH selected_dataset_versions AS ( - SELECT dv.* - FROM dataset_versions dv - WHERE dv.namespace_name = :namespaceName - AND dv.dataset_name = :datasetName - ORDER BY dv.created_at DESC - LIMIT :limit OFFSET :offset - ), selected_dataset_version_facets AS ( - SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet - FROM selected_dataset_versions dv - LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid - ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state, dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, t.tags, f.facets - FROM selected_dataset_versions dv + FROM dataset_versions dv LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid LEFT JOIN ( @@ -269,12 +243,16 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = dv.dataset_uuid LEFT JOIN ( - SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets - FROM selected_dataset_version_facets dvf - WHERE dvf.run_uuid = dvf.run_uuid - GROUP BY dvf.uuid - ) f ON f.dataset_uuid = dv.uuid - ORDER BY dv.created_at DESC""") + SELECT dvf.dataset_version_uuid, + JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets + FROM dataset_facets_view dvf + GROUP BY dataset_version_uuid + ) f ON f.dataset_version_uuid = dv.uuid + WHERE dv.namespace_name = :namespaceName + AND dv.dataset_name = :datasetName + ORDER BY dv.created_at DESC + LIMIT :limit OFFSET :offset + """) List findAll(String namespaceName, String datasetName, int limit, int offset); default List findAllWithRun(