Skip to content

Commit

Permalink
implement inputFacets & outputFacets
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Mar 2, 2023
1 parent 8d28ed5 commit ce7d31e
Show file tree
Hide file tree
Showing 22 changed files with 649 additions and 93 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
* Improve dataset facets access [`#2407`](https://github.com/MarquezProject/marquez/pull/2407) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Improves database query performance for accessing datasets and datasets' versions.*

### Added

* Support `inputFacets` and `outputFacets` from Openlineage specificatio [`#2417`](https://github.com/MarquezProject/marquez/pull/2417) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Adds the ability to store `inputFacets` / `outputFacets` which are sent within datasets.*
*Expose them through Marquez API as a member of `Run` resource.*

## [0.30.0](https://github.com/MarquezProject/marquez/compare/0.29.0...0.30.0) - 2023-01-31

### Added
Expand Down
34 changes: 34 additions & 0 deletions api/src/main/java/marquez/common/models/RunDatasetFacets.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;

/**
* Class used to store `inputFacets` and `outputFacets` which are assigned to datasets within
* OpenLineage spec, but are exposed within Marquez api as a part of {@link
* marquez.service.models.Run}
*/
@EqualsAndHashCode
@ToString
@Getter
public class RunDatasetFacets {

private final DatasetVersionId datasetVersionId;
private final ImmutableMap<String, Object> facets;

public RunDatasetFacets(
@JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId,
@JsonProperty("facets") @NonNull ImmutableMap<String, Object> facets) {
this.datasetVersionId = datasetVersionId;
this.facets = facets;
}
}
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private Columns() {}
public static final String NAMESPACE_NAME = "namespace_name";
public static final String DATASET_NAME = "dataset_name";
public static final String FACETS = "facets";
public static final String DATASET_FACETS = "dataset_facets";
public static final String TAGS = "tags";
public static final String IS_HIDDEN = "is_hidden";

Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ LEFT JOIN (
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL
WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
Expand Down Expand Up @@ -134,7 +134,7 @@ LEFT JOIN (
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL
WHERE df.facet IS NOT NULL AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE d.namespace_name = :namespaceName
Expand Down
52 changes: 52 additions & 0 deletions api/src/main/java/marquez/db/DatasetFacetsDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,58 @@ default void insertDatasetFacetsFor(
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
}

default void insertInputDatasetFacetsFor(
@NonNull UUID datasetUuid,
@NonNull UUID datasetVersionUuid,
@NonNull UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
@NonNull LineageEvent.InputDatasetFacets inputFacets) {
final Instant now = Instant.now();

JsonNode jsonNode = Utils.getMapper().valueToTree(inputFacets);
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false)
.forEach(
fieldName ->
insertDatasetFacet(
now,
datasetUuid,
datasetVersionUuid,
runUuid,
lineageEventTime,
lineageEventType,
Type.INPUT,
fieldName,
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
}

default void insertOutputDatasetFacetsFor(
@NonNull UUID datasetUuid,
@NonNull UUID datasetVersionUuid,
@NonNull UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
@NonNull LineageEvent.OutputDatasetFacets outputFacets) {
final Instant now = Instant.now();

JsonNode jsonNode = Utils.getMapper().valueToTree(outputFacets);
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false)
.forEach(
fieldName ->
insertDatasetFacet(
now,
datasetUuid,
datasetVersionUuid,
runUuid,
lineageEventTime,
lineageEventType,
Type.OUTPUT,
fieldName,
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
}

record DatasetFacetRow(
Instant createdAt,
UUID datasetUuid,
Expand Down
49 changes: 32 additions & 17 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,19 @@ default void updateDatasetVersionMetric(

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.version = :version
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM dataset_versions dv
FROM selected_dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -169,21 +178,28 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.version = :version
""")
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
Optional<DatasetVersion> findBy(UUID version);

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.uuid = :uuid
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM dataset_versions dv
FROM selected_dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -192,14 +208,12 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.uuid = :uuid
""")
LEFT JOIN (
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
Optional<DatasetVersion> findByUuid(UUID uuid);

default Optional<DatasetVersion> findByWithRun(UUID version) {
Expand Down Expand Up @@ -246,6 +260,7 @@ LEFT JOIN (
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
WHERE (type ILIKE 'dataset' OR type ILIKE 'unknown')
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.namespace_name = :namespaceName
Expand Down
24 changes: 24 additions & 0 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
now,
event.getEventType(),
facets));

// InputFacets ...
Optional.ofNullable(dataset.getInputFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertInputDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
runUuid,
now,
event.getEventType(),
facets));
}
}
bag.setInputs(Optional.ofNullable(datasetInputs));
Expand Down Expand Up @@ -325,6 +337,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
now,
event.getEventType(),
facets));

// OutputFacets ...
Optional.ofNullable(dataset.getOutputFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertOutputDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
runUuid,
now,
event.getEventType(),
facets));
}
}

Expand Down
95 changes: 65 additions & 30 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,33 +74,51 @@ public interface RunDao extends BaseDao {
void updateEndState(UUID rowUuid, Instant transitionedAt, UUID endRunStateUuid);

String BASE_FIND_RUN_SQL =
"SELECT r.*, ra.args, f.facets,\n"
+ "jv.version AS job_version,\n"
+ "ri.input_versions, ro.output_versions\n"
+ "FROM runs_view AS r\n"
+ "LEFT OUTER JOIN\n"
+ "(\n"
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n"
+ " FROM run_facets_view rf\n"
+ " GROUP BY rf.run_uuid\n"
+ ") AS f ON r.uuid=f.run_uuid\n"
+ "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
+ "LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
+ "LEFT OUTER JOIN (\n"
+ " SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,\n"
+ " 'name', dv.dataset_name,\n"
+ " 'version', dv.version)) AS input_versions\n"
+ " FROM runs_input_mapping im\n"
+ " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n"
+ " GROUP BY im.run_uuid\n"
+ ") ri ON ri.run_uuid=r.uuid\n"
+ "LEFT OUTER JOIN (\n"
+ " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n"
+ " 'name', dataset_name,\n"
+ " 'version', version)) AS output_versions\n"
+ " FROM dataset_versions\n"
+ " GROUP BY run_uuid\n"
+ ") ro ON ro.run_uuid=r.uuid\n";
"""
SELECT r.*, ra.args, f.facets,
jv.version AS job_version,
ri.input_versions, ro.output_versions, df.dataset_facets
FROM runs_view AS r
LEFT OUTER JOIN
(
SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets
FROM run_facets_view rf
GROUP BY rf.run_uuid
) AS f ON r.uuid=f.run_uuid
LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
LEFT OUTER JOIN (
SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,
'name', dv.dataset_name,
'version', dv.version,
'dataset_version_uuid', uuid)) AS input_versions
FROM runs_input_mapping im
INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid
GROUP BY im.run_uuid
) ri ON ri.run_uuid=r.uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
'name', dataset_name,
'version', version,
'dataset_version_uuid', uuid
)) AS output_versions
FROM dataset_versions
GROUP BY run_uuid
) ro ON ro.run_uuid=r.uuid
LEFT OUTER JOIN (
SELECT
run_uuid,
JSON_AGG(json_build_object(
'dataset_version_uuid', dataset_version_uuid,
'name', name,
'type', type,
'facet', facet
) ORDER BY created_at ASC) as dataset_facets
FROM dataset_facets_view
WHERE (type ILIKE 'output' OR type ILIKE 'input')
GROUP BY run_uuid
) AS df ON r.uuid = df.run_uuid
""";

@SqlQuery(BASE_FIND_RUN_SQL + "WHERE r.uuid = :runUuid")
Optional<Run> findRunByUuid(UUID runUuid);
Expand All @@ -123,7 +141,7 @@ public interface RunDao extends BaseDao {
"""
SELECT r.*, ra.args, f.facets,
j.namespace_name, j.name, jv.version AS job_version,
ri.input_versions, ro.output_versions
ri.input_versions, ro.output_versions, df.dataset_facets
FROM runs_view AS r
INNER JOIN jobs_view j ON r.job_uuid=j.uuid
LEFT JOIN LATERAL
Expand All @@ -138,18 +156,35 @@ SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS fac
LEFT OUTER JOIN (
SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,
'name', dv.dataset_name,
'version', dv.version)) AS input_versions
'version', dv.version,
'dataset_version_uuid', uuid
)) AS input_versions
FROM runs_input_mapping im
INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid
GROUP BY im.run_uuid
) ri ON ri.run_uuid=r.uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
'name', dataset_name,
'version', version)) AS output_versions
'version', version,
'dataset_version_uuid', uuid
)) AS output_versions
FROM dataset_versions
GROUP BY run_uuid
) ro ON ro.run_uuid=r.uuid
LEFT OUTER JOIN (
SELECT
run_uuid,
JSON_AGG(json_build_object(
'dataset_version_uuid', dataset_version_uuid,
'name', name,
'type', type,
'facet', facet
) ORDER BY created_at ASC) as dataset_facets
FROM dataset_facets_view
WHERE (type ILIKE 'output' OR type ILIKE 'input')
GROUP BY run_uuid
) AS df ON r.uuid = df.run_uuid
WHERE j.namespace_name=:namespace AND (j.name=:jobName OR :jobName = ANY(j.aliases))
ORDER BY STARTED_AT DESC NULLS LAST
LIMIT :limit OFFSET :offset
Expand Down
Loading

0 comments on commit ce7d31e

Please sign in to comment.