Skip to content

Commit

Permalink
dataset symlinks provided
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Sep 8, 2022
1 parent 4b09c74 commit fda3425
Show file tree
Hide file tree
Showing 18 changed files with 355 additions and 40 deletions.
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public interface BaseDao extends SqlObject {
@CreateSqlObject
NamespaceDao createNamespaceDao();

@CreateSqlObject
DatasetSymlinkDao createDatasetSymlinkDao();

@CreateSqlObject
RunDao createRunDao();

Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ private Columns() {}
public static final String FIELD_UUIDS = "field_uuids";
public static final String LIFECYCLE_STATE = "lifecycle_state";

/* DATASET SYMLINK ROW COLUMNS */
public static final String IS_PRIMARY = "is_primary";

/* STREAM VERSION ROW COLUMNS */
public static final String SCHEMA_LOCATION = "schema_location";

Expand Down
34 changes: 16 additions & 18 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import marquez.db.mappers.DatasetMapper;
import marquez.db.mappers.DatasetRowMapper;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSymlinkRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.SourceRow;
Expand Down Expand Up @@ -73,8 +74,7 @@ void updateLastModifiedAt(
WITH selected_datasets AS (
SELECT d.*
FROM datasets_view d
WHERE d.namespace_name = :namespaceName
AND d.name = :datasetName
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_names)
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
FROM selected_datasets d
Expand Down Expand Up @@ -210,7 +210,6 @@ INSERT INTO datasets (
namespace_name,
source_uuid,
source_name,
name,
physical_name,
description,
is_deleted,
Expand All @@ -224,12 +223,11 @@ INSERT INTO datasets (
:namespaceName,
:sourceUuid,
:sourceName,
:name,
:physicalName,
:description,
:isDeleted,
false
) ON CONFLICT (namespace_uuid, name)
) ON CONFLICT (uuid)
DO UPDATE SET
type = EXCLUDED.type,
updated_at = EXCLUDED.updated_at,
Expand All @@ -247,7 +245,6 @@ DatasetRow upsert(
String namespaceName,
UUID sourceUuid,
String sourceName,
String name,
String physicalName,
String description,
boolean isDeleted);
Expand All @@ -262,7 +259,6 @@ DatasetRow upsert(
+ "namespace_name, "
+ "source_uuid, "
+ "source_name, "
+ "name, "
+ "physical_name "
+ ") VALUES ( "
+ ":uuid, "
Expand All @@ -273,9 +269,8 @@ DatasetRow upsert(
+ ":namespaceName, "
+ ":sourceUuid, "
+ ":sourceName, "
+ ":name, "
+ ":physicalName) "
+ "ON CONFLICT (namespace_uuid, name) "
+ "ON CONFLICT (uuid) "
+ "DO UPDATE SET "
+ "type = EXCLUDED.type, "
+ "updated_at = EXCLUDED.updated_at, "
Expand All @@ -289,15 +284,16 @@ DatasetRow upsert(
String namespaceName,
UUID sourceUuid,
String sourceName,
String name,
String physicalName);

@SqlQuery(
"""
UPDATE datasets
SET is_hidden = true
WHERE namespace_name = :namespaceName
AND name = :name
FROM dataset_symlinks, namespaces
WHERE dataset_symlinks.dataset_uuid = datasets.uuid
AND namespaces.uuid = dataset_symlinks.namespace_uuid
AND namespaces.name=:namespaceName AND dataset_symlinks.name=:name
RETURNING *
""")
Optional<DatasetRow> delete(String namespaceName, String name);
Expand All @@ -310,6 +306,10 @@ default Dataset upsertDatasetMeta(
createNamespaceDao()
.upsertNamespaceRow(
UUID.randomUUID(), now, namespaceName.getValue(), DEFAULT_NAMESPACE_OWNER);
DatasetSymlinkRow symlinkRow =
createDatasetSymlinkDao()
.upsertDatasetSymlinkRow(
UUID.randomUUID(), datasetName.getValue(), namespaceRow.getUuid(), true, now);
SourceRow sourceRow =
createSourceDao()
.upsertOrDefault(
Expand All @@ -318,38 +318,36 @@ default Dataset upsertDatasetMeta(
now,
datasetMeta.getSourceName().getValue(),
"");
UUID newDatasetUuid = UUID.randomUUID();
DatasetRow datasetRow;

if (datasetMeta.getDescription().isPresent()) {
datasetRow =
upsert(
newDatasetUuid,
symlinkRow.getUuid(),
datasetMeta.getType(),
now,
namespaceRow.getUuid(),
namespaceRow.getName(),
sourceRow.getUuid(),
sourceRow.getName(),
datasetName.getValue(),
datasetMeta.getPhysicalName().getValue(),
datasetMeta.getDescription().orElse(null),
false);
} else {
datasetRow =
upsert(
newDatasetUuid,
symlinkRow.getUuid(),
datasetMeta.getType(),
now,
namespaceRow.getUuid(),
namespaceRow.getName(),
sourceRow.getUuid(),
sourceRow.getName(),
datasetName.getValue(),
datasetMeta.getPhysicalName().getValue());
}

updateDatasetMetric(namespaceName, datasetMeta.getType(), newDatasetUuid, datasetRow.getUuid());
updateDatasetMetric(
namespaceName, datasetMeta.getType(), symlinkRow.getUuid(), datasetRow.getUuid());

TagDao tagDao = createTagDao();
List<DatasetTagMapping> datasetTagMappings = new ArrayList<>();
Expand Down
48 changes: 48 additions & 0 deletions api/src/main/java/marquez/db/DatasetSymlinkDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import marquez.db.mappers.DatasetSymlinksRowMapper;
import marquez.db.models.DatasetSymlinkRow;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(DatasetSymlinksRowMapper.class)
public interface DatasetSymlinkDao extends BaseDao {

default DatasetSymlinkRow upsertDatasetSymlinkRow(
UUID uuid, String name, UUID namespaceUuid, boolean isPrimary, Instant now) {
doUpsertDatasetSymlinkRow(uuid, name, namespaceUuid, isPrimary, now);
return findDatasetSymlinkByNamespaceUidAndName(namespaceUuid, name).orElseThrow();
}

@SqlQuery("SELECT * FROM dataset_symlinks WHERE namespace_uuid = :namespaceUuid and name = :name")
Optional<DatasetSymlinkRow> findDatasetSymlinkByNamespaceUidAndName(
UUID namespaceUuid, String name);

@SqlUpdate(
"INSERT INTO dataset_symlinks ("
+ "dataset_uuid, "
+ "name, "
+ "namespace_uuid, "
+ "is_primary, "
+ "created_at, "
+ "updated_at"
+ ") VALUES ( "
+ ":uuid, "
+ ":name, "
+ ":namespaceUuid, "
+ ":isPrimary, "
+ ":now, "
+ ":now) "
+ "ON CONFLICT (name, namespace_uuid) DO NOTHING")
void doUpsertDatasetSymlinkRow(
UUID uuid, String name, UUID namespaceUuid, boolean isPrimary, Instant now);
}
33 changes: 30 additions & 3 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import marquez.db.mappers.LineageEventMapper;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSymlinkRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
Expand Down Expand Up @@ -99,6 +100,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map

default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) {
NamespaceDao namespaceDao = createNamespaceDao();
DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao();
DatasetDao datasetDao = createDatasetDao();
SourceDao sourceDao = createSourceDao();
JobDao jobDao = createJobDao();
Expand Down Expand Up @@ -295,6 +297,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
true,
namespaceDao,
datasetSymlinkDao,
sourceDao,
datasetDao,
datasetVersionDao,
Expand All @@ -316,6 +319,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
false,
namespaceDao,
datasetSymlinkDao,
sourceDao,
datasetDao,
datasetVersionDao,
Expand Down Expand Up @@ -504,6 +508,7 @@ default DatasetRecord upsertLineageDataset(
UUID runUuid,
boolean isInput,
NamespaceDao namespaceDao,
DatasetSymlinkDao datasetSymlinkDao,
SourceDao sourceDao,
DatasetDao datasetDao,
DatasetVersionDao datasetVersionDao,
Expand Down Expand Up @@ -540,6 +545,29 @@ default DatasetRecord upsertLineageDataset(
formatNamespaceName(ds.getNamespace()),
DEFAULT_NAMESPACE_OWNER);

DatasetSymlinkRow symlink =
datasetSymlinkDao.upsertDatasetSymlinkRow(
UUID.randomUUID(), formatDatasetName(ds.getName()), dsNamespace.getUuid(), true, now);

Optional.ofNullable(ds.getFacets())
.map(facets -> facets.getSymlinks())
.ifPresent(
el ->
el.getIdentifiers().stream()
.forEach(
id ->
datasetSymlinkDao.doUpsertDatasetSymlinkRow(
symlink.getUuid(),
id.getName(),
namespaceDao
.upsertNamespaceRow(
UUID.randomUUID(),
now,
id.getNamespace(),
DEFAULT_NAMESPACE_OWNER)
.getUuid(),
false,
now)));
String dslifecycleState =
Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getLifecycleStateChange)
Expand All @@ -548,15 +576,14 @@ default DatasetRecord upsertLineageDataset(

DatasetRow datasetRow =
datasetDao.upsert(
UUID.randomUUID(),
symlink.getUuid(),
getDatasetType(ds),
now,
datasetNamespace.getUuid(),
datasetNamespace.getName(),
source.getUuid(),
source.getName(),
formatDatasetName(ds.getName()),
ds.getName(),
dsDescription,
dslifecycleState.equalsIgnoreCase("DROP"));

Expand All @@ -581,7 +608,7 @@ default DatasetRecord upsertLineageDataset(
dsNamespace.getName(),
source.getName(),
dsRow.getPhysicalName(),
dsRow.getName(),
symlink.getName(),
dslifecycleState,
fields,
runUuid)
Expand Down
1 change: 0 additions & 1 deletion api/src/main/java/marquez/db/mappers/DatasetRowMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public DatasetRow map(@NonNull ResultSet results, @NonNull StatementContext cont
timestampOrThrow(results, Columns.UPDATED_AT),
uuidOrThrow(results, Columns.NAMESPACE_UUID),
uuidOrThrow(results, Columns.SOURCE_UUID),
stringOrThrow(results, Columns.NAME),
stringOrThrow(results, Columns.PHYSICAL_NAME),
timestampOrNull(results, Columns.LAST_MODIFIED_AT),
stringOrNull(results, Columns.DESCRIPTION),
Expand Down
36 changes: 36 additions & 0 deletions api/src/main/java/marquez/db/mappers/DatasetSymlinksRowMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.booleanOrDefault;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.NonNull;
import marquez.db.Columns;
import marquez.db.models.DatasetSymlinkRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public class DatasetSymlinksRowMapper implements RowMapper<DatasetSymlinkRow> {

@Override
public DatasetSymlinkRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return new DatasetSymlinkRow(
uuidOrThrow(results, Columns.DATASET_UUID),
stringOrThrow(results, Columns.NAME),
uuidOrThrow(results, Columns.NAMESPACE_UUID),
stringOrNull(results, Columns.TYPE),
booleanOrDefault(results, Columns.IS_PRIMARY, false),
timestampOrThrow(results, Columns.CREATED_AT),
timestampOrThrow(results, Columns.UPDATED_AT));
}
}
1 change: 0 additions & 1 deletion api/src/main/java/marquez/db/models/DatasetRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class DatasetRow {
@Getter @NonNull private final Instant updatedAt;
@Getter @NonNull private final UUID namespaceUuid;
@Getter @NonNull private final UUID sourceUuid;
@Getter @NonNull private final String name;
@Getter @NonNull private final String physicalName;
@Nullable private final Instant lastModifiedAt;
@Nullable private final String description;
Expand Down
33 changes: 33 additions & 0 deletions api/src/main/java/marquez/db/models/DatasetSymlinkRow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.models;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Value;

@AllArgsConstructor
@EqualsAndHashCode
@Value
public class DatasetSymlinkRow {
@NonNull UUID uuid;
@NonNull String name;
@NonNull UUID namespaceUuid;
@Nullable String type;
@NonNull boolean isPrimary;
@Getter @NonNull private final Instant createdAt;
@Getter @NonNull private final Instant updatedAt;

public Optional<String> getType() {
return Optional.ofNullable(type);
}
}
Loading

0 comments on commit fda3425

Please sign in to comment.