Skip to content

Commit

Permalink
Merge branch 'main' into indices/run-states
Browse files Browse the repository at this point in the history
  • Loading branch information
phixMe authored Jul 17, 2023
2 parents 8546b0c + 52b70a7 commit f8e35d2
Showing 1 changed file with 59 additions and 55 deletions.
114 changes: 59 additions & 55 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,28 @@ void updateLastModifiedAt(

@SqlQuery(
"""
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM datasets_view d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
FROM tags AS t
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
""")
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM datasets_view d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
FROM tags AS t
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets AS df
WHERE df.facet IS NOT NULL AND
(df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') AND
df.dataset_uuid = (SELECT uuid FROM datasets WHERE name = :datasetName AND namespace_name = :namespaceName)
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
""")
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);

default Optional<Dataset> findWithTags(String namespaceName, String datasetName) {
Expand Down Expand Up @@ -119,28 +121,30 @@ default void setFields(Dataset ds) {

@SqlQuery(
"""
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM datasets_view d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
FROM tags AS t
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE d.namespace_name = :namespaceName
ORDER BY d.name
LIMIT :limit OFFSET :offset
""")
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM datasets_view d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
FROM tags AS t
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets AS df
WHERE df.facet IS NOT NULL AND
(df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') AND
df.dataset_uuid IN (SELECT uuid FROM datasets_view WHERE namespace_name = :namespaceName ORDER BY name LIMIT :limit OFFSET :offset)
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE d.namespace_name = :namespaceName
ORDER BY d.name
LIMIT :limit OFFSET :offset
""")
List<Dataset> findAll(String namespaceName, int limit, int offset);

@SqlQuery("SELECT count(*) FROM datasets_view")
Expand Down Expand Up @@ -249,23 +253,23 @@ DatasetRow upsert(

@SqlUpdate(
"""
UPDATE datasets d
SET is_hidden = true
FROM namespaces n
WHERE n.uuid=d.namespace_uuid
AND n.name=:namespaceName
""")
UPDATE datasets d
SET is_hidden = true
FROM namespaces n
WHERE n.uuid=d.namespace_uuid
AND n.name=:namespaceName
""")
void deleteByNamespaceName(String namespaceName);

@SqlQuery(
"""
UPDATE datasets d
SET is_hidden = true
FROM namespaces n
WHERE n.uuid = d.namespace_uuid
AND n.name=:namespaceName AND d.name=:name
RETURNING *
""")
UPDATE datasets d
SET is_hidden = true
FROM namespaces n
WHERE n.uuid = d.namespace_uuid
AND n.name=:namespaceName AND d.name=:name
RETURNING *
""")
Optional<DatasetRow> delete(String namespaceName, String name);

@Transaction
Expand Down

0 comments on commit f8e35d2

Please sign in to comment.