Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve dataset facets access #2407

Merged
merged 3 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.30.0...HEAD)

* Improve dataset facets access [`#2407`](https://github.com/MarquezProject/marquez/pull/2407) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Improves database query performance for accessing datasets and datasets' versions.*

## [0.30.0](https://github.com/MarquezProject/marquez/compare/0.29.0...0.30.0) - 2023-01-31

### Added
Expand Down
82 changes: 22 additions & 60 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,8 @@ void updateLastModifiedAt(

@SqlQuery(
"""
WITH selected_datasets AS (
SELECT d.*
FROM datasets_view d
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid
) df ON df.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
FROM datasets_view d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -104,11 +82,15 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
FROM dataset_runs AS d2
WHERE d2.run_uuid = d2.run_uuid AND d2.facet IS NOT NULL
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid""")
SELECT
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
""")
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);

default Optional<Dataset> findWithTags(String namespaceName, String datasetName) {
Expand Down Expand Up @@ -137,32 +119,8 @@ default void setFields(Dataset ds) {

@SqlQuery(
"""
WITH selected_datasets AS (
SELECT d.*
FROM datasets_view d
WHERE d.namespace_name = :namespaceName
ORDER BY d.name
LIMIT :limit OFFSET :offset
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
FROM datasets_view d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -172,13 +130,17 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
FROM dataset_runs AS d2
WHERE d2.run_uuid = d2.run_uuid
AND d2.facet IS NOT NULL
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid
ORDER BY d.name""")
SELECT
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE d.namespace_name = :namespaceName
ORDER BY d.name
LIMIT :limit OFFSET :offset
""")
List<Dataset> findAll(String namespaceName, int limit, int offset);

@SqlQuery("SELECT count(*) FROM datasets_view")
Expand Down
78 changes: 28 additions & 50 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,10 @@ default void updateDatasetVersionMetric(

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.version = :version
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM selected_dataset_versions dv
FROM dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -178,28 +169,21 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.version = :version
""")
Optional<DatasetVersion> findBy(UUID version);

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.uuid = :uuid
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM selected_dataset_versions dv
FROM dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -208,12 +192,14 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
LEFT JOIN (
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.uuid = :uuid
""")
Optional<DatasetVersion> findByUuid(UUID uuid);

default Optional<DatasetVersion> findByWithRun(UUID version) {
Expand Down Expand Up @@ -244,22 +230,10 @@ default Optional<DatasetVersion> findByWithRun(UUID version) {

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.namespace_name = :namespaceName
AND dv.dataset_name = :datasetName
ORDER BY dv.created_at DESC
LIMIT :limit OFFSET :offset
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM selected_dataset_versions dv
FROM dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -269,12 +243,16 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid
ORDER BY dv.created_at DESC""")
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.namespace_name = :namespaceName
AND dv.dataset_name = :datasetName
ORDER BY dv.created_at DESC
LIMIT :limit OFFSET :offset
""")
List<DatasetVersion> findAll(String namespaceName, String datasetName, int limit, int offset);

default List<DatasetVersion> findAllWithRun(
Expand Down