Skip to content

Commit

Permalink
OL facets - PR1 - create & feed new tables while not reading them
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Jan 12, 2023
1 parent 9a481bd commit c89504d
Show file tree
Hide file tree
Showing 32 changed files with 1,580 additions and 46 deletions.
9 changes: 9 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,13 @@ public interface BaseDao extends SqlObject {

@CreateSqlObject
ColumnLineageDao createColumnLineageDao();

@CreateSqlObject
DatasetFacetsDao createDatasetFacetsDao();

@CreateSqlObject
JobFacetsDao createJobFacetsDao();

@CreateSqlObject
RunFacetsDao createRunFacetsDao();
}
14 changes: 14 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import marquez.common.Utils;
import org.postgresql.util.PGInterval;
import org.postgresql.util.PGobject;

@Slf4j
public final class Columns {
Expand Down Expand Up @@ -299,4 +300,17 @@ public static ImmutableMap<String, String> mapOrNull(final ResultSet results, fi
final String mapAsString = results.getString(column);
return Utils.fromJson(mapAsString, new TypeReference<>() {});
}

public static PGobject toPgObject(@NonNull final Object object) {
final PGobject jsonObject = new PGobject();
jsonObject.setType("jsonb");
final String json = Utils.toJson(object);
try {
jsonObject.setValue(json);
} catch (SQLException e) {
log.error("Error when ...", e);
return null;
}
return jsonObject;
}
}
161 changes: 161 additions & 0 deletions api/src/main/java/marquez/db/DatasetFacetsDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import com.fasterxml.jackson.databind.JsonNode;
import java.time.Instant;
import java.util.Arrays;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.UUID;
import java.util.stream.StreamSupport;
import lombok.NonNull;
import marquez.common.Utils;
import marquez.service.models.LineageEvent;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.postgresql.util.PGobject;

/** The DAO for {@code dataset} facets. */
public interface DatasetFacetsDao {
/* An {@code enum} used ... */
enum Type {
DATASET,
INPUT,
OUTPUT,
UNKNOWN;
}

/* An {@code enum} used to determine the dataset facet. */
enum DatasetFacet {
DOCUMENTATION(Type.DATASET, "documentation"),
DESCRIPTION(Type.DATASET, "description"),
SCHEMA(Type.DATASET, "schema"),
DATASOURCE(Type.DATASET, "dataSource"),
LIFECYCLE_STATE_CHANGE(Type.DATASET, "lifecycleStateChange"),
VERSION(Type.DATASET, "version"),
COLUMN_LINEAGE(Type.DATASET, "columnLineage"),
OWNERSHIP(Type.DATASET, "ownership"),
DATA_QUALITY_METRICS(Type.INPUT, "dataQualityMetrics"),
DATA_QUALITY_ASSERTIONS(Type.INPUT, "dataQualityAssertions"),
OUTPUT_STATISTICS(Type.OUTPUT, "outputStatistics");

final Type type;
final String name;

DatasetFacet(@NonNull final Type type, @NonNull final String name) {
this.type = type;
this.name = name;
}

Type getType() {
return type;
}

String getName() {
return name;
}

/** ... */
public static Type typeFromName(@NonNull final String name) {
return Arrays.stream(DatasetFacet.values())
.filter(facet -> facet.getName().equalsIgnoreCase(name))
.map(facet -> facet.getType())
.findFirst()
.orElse(Type.UNKNOWN);
}
}

/**
* @param uuid
* @param createdAt
* @param datasetUuid
* @param runUuid
* @param lineageEventTime
* @param lineageEventType
* @param type
* @param name
* @param facet
*/
@SqlUpdate(
"""
INSERT INTO dataset_facets (
uuid,
created_at,
dataset_uuid,
run_uuid,
lineage_event_time,
lineage_event_type,
type,
name,
facet
) VALUES (
:uuid,
:createdAt,
:datasetUuid,
:runUuid,
:lineageEventTime,
:lineageEventType,
:type,
:name,
:facet
)
""")
void insertDatasetFacet(
UUID uuid,
Instant createdAt,
UUID datasetUuid,
UUID runUuid,
Instant lineageEventTime,
String lineageEventType,
Type type,
String name,
PGobject facet);

/**
* @param datasetUuid
* @param runUuid
* @param lineageEventTime
* @param lineageEventType
* @param datasetFacets
*/
@Transaction
default void insertDatasetFacetsFor(
@NonNull UUID datasetUuid,
@NonNull UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
@NonNull LineageEvent.DatasetFacets datasetFacets) {
final Instant now = Instant.now();

JsonNode jsonNode = Utils.getMapper().valueToTree(datasetFacets);
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false)
.forEach(
fieldName ->
insertDatasetFacet(
UUID.randomUUID(),
now,
datasetUuid,
runUuid,
lineageEventTime,
lineageEventType,
DatasetFacet.typeFromName(fieldName),
fieldName,
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
}

record DatasetFacetRow(
UUID uuid,
Instant createdAt,
UUID datasetUuid,
UUID runUuid,
Instant lineageEventTime,
String lineageEventType,
DatasetFacetsDao.Type type,
String name,
PGobject facet) {}
}
24 changes: 24 additions & 0 deletions api/src/main/java/marquez/db/FacetUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.NonNull;
import marquez.common.Utils;
import org.postgresql.util.PGobject;

public class FacetUtils {

static ObjectNode asJson(@NonNull final String facetName, @NonNull Object facetValue) {
final ObjectNode facetAsJson = Utils.getMapper().createObjectNode();
facetAsJson.putPOJO(facetName, facetValue);
return facetAsJson;
}

static PGobject toPgObject(String name, Object o) {
return Columns.toPgObject(asJson(name, o));
}
}
90 changes: 90 additions & 0 deletions api/src/main/java/marquez/db/JobFacetsDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import com.fasterxml.jackson.databind.JsonNode;
import java.time.Instant;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.UUID;
import java.util.stream.StreamSupport;
import lombok.NonNull;
import marquez.common.Utils;
import marquez.service.models.LineageEvent;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.postgresql.util.PGobject;

/** The DAO for {@code job} facets. */
public interface JobFacetsDao {

@SqlUpdate(
"""
INSERT INTO job_facets (
uuid,
created_at,
job_uuid,
run_uuid,
lineage_event_time,
lineage_event_type,
name,
facet
) VALUES (
:uuid,
:createdAt,
:jobUuid,
:runUuid,
:lineageEventTime,
:lineageEventType,
:name,
:facet
)
""")
void insertJobFacet(
UUID uuid,
Instant createdAt,
UUID jobUuid,
UUID runUuid,
Instant lineageEventTime,
String lineageEventType,
String name,
PGobject facet);

@Transaction
default void insertJobFacetsFor(
@NonNull UUID jobUuid,
@NonNull UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
@NonNull LineageEvent.JobFacet jobFacet) {
final Instant now = Instant.now();

JsonNode jsonNode = Utils.getMapper().valueToTree(jobFacet);
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false)
.forEach(
fieldName ->
insertJobFacet(
UUID.randomUUID(),
now,
jobUuid,
runUuid,
lineageEventTime,
lineageEventType,
fieldName,
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
}

record JobFacetRow(
UUID uuid,
Instant createdAt,
UUID jobUuid,
UUID runUuid,
Instant lineageEventTime,
String lineageEventType,
String name,
PGobject facet) {}
}
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import marquez.db.mappers.JobDataMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.mappers.RunMapper;
import marquez.db.models.DatasetData;
import marquez.db.models.JobData;
import marquez.service.models.DatasetData;
import marquez.service.models.JobData;
import marquez.service.models.Run;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindList;
Expand Down
Loading

0 comments on commit c89504d

Please sign in to comment.