Skip to content

Commit

Permalink
dataset symlinks provided
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Sep 28, 2022
1 parent bb3d163 commit 2f57f19
Show file tree
Hide file tree
Showing 16 changed files with 348 additions and 29 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
* Update `OpenLineageDao` to handle Airflow run UUID conflicts [`#2097`](https://github.com/MarquezProject/marquez/pull/2097) [@collado-mike](https://github.com/collado-mike)
*Alleviates the problem for Airflow installations that will continue to publish events with the older OpenLineage library. This checks the namespace of the parent run and verifies that it matches the namespace in the `ParentRunFacet`. If not, it generates a new parent run ID that will be written with the correct namespace. (The Airflow integration was generating conflicting UUIDs based on the DAG name and the DagRun ID without accounting for different namespaces. In Marquez installations that have multiple Airflow deployments with duplicated DAG names, we generated jobs whose parents have the wrong namespace.)*

* Implement the dataset symlink feature, which allows providing multiple names for a dataset and adds edges to the lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

## [0.25.0](https://github.com/MarquezProject/marquez/compare/0.24.0...0.25.0) - 2022-08-08

### Fixed
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public interface BaseDao extends SqlObject {
@CreateSqlObject
NamespaceDao createNamespaceDao();

/**
 * Returns the DAO for the {@code dataset_symlinks} table.
 *
 * <p>NOTE(review): presumably {@code @CreateSqlObject} makes Jdbi build the returned DAO on the
 * same handle/transaction as this DAO, like the sibling {@code create*Dao()} accessors — confirm
 * against the Jdbi documentation.
 */
@CreateSqlObject
DatasetSymlinkDao createDatasetSymlinkDao();

@CreateSqlObject
RunDao createRunDao();

Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ private Columns() {}
public static final String FIELD_UUIDS = "field_uuids";
public static final String LIFECYCLE_STATE = "lifecycle_state";

/* DATASET SYMLINK ROW COLUMNS */
public static final String IS_PRIMARY = "is_primary";

/* STREAM VERSION ROW COLUMNS */
public static final String SCHEMA_LOCATION = "schema_location";

Expand Down
26 changes: 16 additions & 10 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import marquez.db.mappers.DatasetMapper;
import marquez.db.mappers.DatasetRowMapper;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSymlinkRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.SourceRow;
Expand Down Expand Up @@ -73,8 +74,7 @@ void updateLastModifiedAt(
WITH selected_datasets AS (
SELECT d.*
FROM datasets_view d
WHERE d.namespace_name = :namespaceName
AND d.name = :datasetName
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
FROM selected_datasets d
Expand Down Expand Up @@ -229,7 +229,7 @@ INSERT INTO datasets (
:description,
:isDeleted,
false
) ON CONFLICT (namespace_uuid, name)
) ON CONFLICT (uuid)
DO UPDATE SET
type = EXCLUDED.type,
updated_at = EXCLUDED.updated_at,
Expand Down Expand Up @@ -275,7 +275,7 @@ DatasetRow upsert(
+ ":sourceName, "
+ ":name, "
+ ":physicalName) "
+ "ON CONFLICT (namespace_uuid, name) "
+ "ON CONFLICT (uuid) "
+ "DO UPDATE SET "
+ "type = EXCLUDED.type, "
+ "updated_at = EXCLUDED.updated_at, "
Expand All @@ -296,8 +296,10 @@ DatasetRow upsert(
"""
UPDATE datasets
SET is_hidden = true
WHERE namespace_name = :namespaceName
AND name = :name
FROM dataset_symlinks, namespaces
WHERE dataset_symlinks.dataset_uuid = datasets.uuid
AND namespaces.uuid = dataset_symlinks.namespace_uuid
AND namespaces.name=:namespaceName AND dataset_symlinks.name=:name
RETURNING *
""")
Optional<DatasetRow> delete(String namespaceName, String name);
Expand All @@ -310,6 +312,10 @@ default Dataset upsertDatasetMeta(
createNamespaceDao()
.upsertNamespaceRow(
UUID.randomUUID(), now, namespaceName.getValue(), DEFAULT_NAMESPACE_OWNER);
DatasetSymlinkRow symlinkRow =
createDatasetSymlinkDao()
.upsertDatasetSymlinkRow(
UUID.randomUUID(), datasetName.getValue(), namespaceRow.getUuid(), true, null, now);
SourceRow sourceRow =
createSourceDao()
.upsertOrDefault(
Expand All @@ -318,13 +324,12 @@ default Dataset upsertDatasetMeta(
now,
datasetMeta.getSourceName().getValue(),
"");
UUID newDatasetUuid = UUID.randomUUID();
DatasetRow datasetRow;

if (datasetMeta.getDescription().isPresent()) {
datasetRow =
upsert(
newDatasetUuid,
symlinkRow.getUuid(),
datasetMeta.getType(),
now,
namespaceRow.getUuid(),
Expand All @@ -338,7 +343,7 @@ default Dataset upsertDatasetMeta(
} else {
datasetRow =
upsert(
newDatasetUuid,
symlinkRow.getUuid(),
datasetMeta.getType(),
now,
namespaceRow.getUuid(),
Expand All @@ -349,7 +354,8 @@ default Dataset upsertDatasetMeta(
datasetMeta.getPhysicalName().getValue());
}

updateDatasetMetric(namespaceName, datasetMeta.getType(), newDatasetUuid, datasetRow.getUuid());
updateDatasetMetric(
namespaceName, datasetMeta.getType(), symlinkRow.getUuid(), datasetRow.getUuid());

TagDao tagDao = createTagDao();
List<DatasetTagMapping> datasetTagMappings = new ArrayList<>();
Expand Down
51 changes: 51 additions & 0 deletions api/src/main/java/marquez/db/DatasetSymlinkDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import marquez.db.mappers.DatasetSymlinksRowMapper;
import marquez.db.models.DatasetSymlinkRow;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

/**
 * Data access object for the {@code dataset_symlinks} table, which stores the names (symlinks)
 * under which a dataset is known.
 */
@RegisterRowMapper(DatasetSymlinksRowMapper.class)
public interface DatasetSymlinkDao extends BaseDao {

  /**
   * Inserts a symlink row. The statement does nothing when a row with the same {@code (name,
   * namespace_uuid)} pair is already present, so an existing symlink is never modified.
   *
   * @param uuid value written to the {@code dataset_uuid} column
   * @param name symlink name
   * @param namespaceUuid UUID of the namespace the symlink name lives in
   * @param isPrimary value written to the {@code is_primary} column
   * @param type value written to the {@code type} column; callers in this change also pass
   *     {@code null}
   * @param now timestamp written to both {@code created_at} and {@code updated_at}
   */
  @SqlUpdate(
      """
      INSERT INTO dataset_symlinks (
      dataset_uuid,
      name,
      namespace_uuid,
      is_primary,
      type,
      created_at,
      updated_at
      ) VALUES (
      :uuid,
      :name,
      :namespaceUuid,
      :isPrimary,
      :type,
      :now,
      :now)
      ON CONFLICT (name, namespace_uuid) DO NOTHING""")
  void doUpsertDatasetSymlinkRow(
      UUID uuid, String name, UUID namespaceUuid, boolean isPrimary, String type, Instant now);

  /**
   * Looks up the symlink row stored for the given namespace UUID and symlink name.
   *
   * @return the matching row, or an empty {@link Optional} when none exists
   */
  @SqlQuery("SELECT * FROM dataset_symlinks WHERE namespace_uuid = :namespaceUuid and name = :name")
  Optional<DatasetSymlinkRow> findDatasetSymlinkByNamespaceUuidAndName(
      UUID namespaceUuid, String name);

  /**
   * Inserts the symlink if it is absent and then reads back the row stored for {@code
   * (namespaceUuid, name)}.
   *
   * <p>Because the insert does nothing on conflict, the returned row is the pre-existing one when
   * the symlink was already known; its dataset UUID may therefore differ from {@code uuid}.
   *
   * @return the row found for {@code (namespaceUuid, name)} after the insert attempt
   * @throws java.util.NoSuchElementException if no row can be read back
   */
  default DatasetSymlinkRow upsertDatasetSymlinkRow(
      UUID uuid, String name, UUID namespaceUuid, boolean isPrimary, String type, Instant now) {
    doUpsertDatasetSymlinkRow(uuid, name, namespaceUuid, isPrimary, type, now);
    final Optional<DatasetSymlinkRow> stored =
        findDatasetSymlinkByNamespaceUuidAndName(namespaceUuid, name);
    return stored.orElseThrow();
  }
}
38 changes: 36 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import marquez.db.mappers.LineageEventMapper;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSymlinkRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
Expand Down Expand Up @@ -120,6 +121,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map

default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) {
NamespaceDao namespaceDao = createNamespaceDao();
DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao();
DatasetDao datasetDao = createDatasetDao();
SourceDao sourceDao = createSourceDao();
JobDao jobDao = createJobDao();
Expand Down Expand Up @@ -316,6 +318,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
true,
namespaceDao,
datasetSymlinkDao,
sourceDao,
datasetDao,
datasetVersionDao,
Expand All @@ -337,6 +340,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
false,
namespaceDao,
datasetSymlinkDao,
sourceDao,
datasetDao,
datasetVersionDao,
Expand Down Expand Up @@ -532,6 +536,7 @@ default DatasetRecord upsertLineageDataset(
UUID runUuid,
boolean isInput,
NamespaceDao namespaceDao,
DatasetSymlinkDao datasetSymlinkDao,
SourceDao sourceDao,
DatasetDao datasetDao,
DatasetVersionDao datasetVersionDao,
Expand Down Expand Up @@ -568,6 +573,35 @@ default DatasetRecord upsertLineageDataset(
formatNamespaceName(ds.getNamespace()),
DEFAULT_NAMESPACE_OWNER);

DatasetSymlinkRow symlink =
datasetSymlinkDao.upsertDatasetSymlinkRow(
UUID.randomUUID(),
formatDatasetName(ds.getName()),
dsNamespace.getUuid(),
true,
null,
now);

Optional.ofNullable(ds.getFacets())
.map(facets -> facets.getSymlinks())
.ifPresent(
el ->
el.getIdentifiers().stream()
.forEach(
id ->
datasetSymlinkDao.doUpsertDatasetSymlinkRow(
symlink.getUuid(),
id.getName(),
namespaceDao
.upsertNamespaceRow(
UUID.randomUUID(),
now,
id.getNamespace(),
DEFAULT_NAMESPACE_OWNER)
.getUuid(),
false,
id.getType(),
now)));
String dslifecycleState =
Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getLifecycleStateChange)
Expand All @@ -576,7 +610,7 @@ default DatasetRecord upsertLineageDataset(

DatasetRow datasetRow =
datasetDao.upsert(
UUID.randomUUID(),
symlink.getUuid(),
getDatasetType(ds),
now,
datasetNamespace.getUuid(),
Expand Down Expand Up @@ -609,7 +643,7 @@ default DatasetRecord upsertLineageDataset(
dsNamespace.getName(),
source.getName(),
dsRow.getPhysicalName(),
dsRow.getName(),
symlink.getName(),
dslifecycleState,
fields,
runUuid)
Expand Down
36 changes: 36 additions & 0 deletions api/src/main/java/marquez/db/mappers/DatasetSymlinksRowMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.booleanOrDefault;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.NonNull;
import marquez.db.Columns;
import marquez.db.models.DatasetSymlinkRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

/** Jdbi row mapper that turns one result-set row into a {@link DatasetSymlinkRow}. */
public class DatasetSymlinksRowMapper implements RowMapper<DatasetSymlinkRow> {

  /**
   * Reads the symlink columns from the current row of {@code results}.
   *
   * <p>The {@code type} column is read with {@code stringOrNull} and {@code is_primary} with a
   * {@code false} default; every other column goes through an {@code ...OrThrow} helper.
   *
   * @param results result set positioned on the row to map
   * @param context Jdbi statement context (not used by this mapper)
   * @return the row model built from the column values
   * @throws SQLException if reading a column fails
   */
  @Override
  public DatasetSymlinkRow map(@NonNull ResultSet results, @NonNull StatementContext context)
      throws SQLException {
    // Columns are read in the same order as the DatasetSymlinkRow constructor arguments.
    final var datasetUuid = uuidOrThrow(results, Columns.DATASET_UUID);
    final var symlinkName = stringOrThrow(results, Columns.NAME);
    final var namespaceUuid = uuidOrThrow(results, Columns.NAMESPACE_UUID);
    final var symlinkType = stringOrNull(results, Columns.TYPE);
    final var primary = booleanOrDefault(results, Columns.IS_PRIMARY, false);
    final var createdAt = timestampOrThrow(results, Columns.CREATED_AT);
    final var updatedAt = timestampOrThrow(results, Columns.UPDATED_AT);

    return new DatasetSymlinkRow(
        datasetUuid, symlinkName, namespaceUuid, symlinkType, primary, createdAt, updatedAt);
  }
}
33 changes: 33 additions & 0 deletions api/src/main/java/marquez/db/models/DatasetSymlinkRow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.models;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Value;

/**
 * Immutable model of one row of the {@code dataset_symlinks} table: a single name under which a
 * dataset is known inside a namespace.
 */
@AllArgsConstructor
@EqualsAndHashCode
@Value
public class DatasetSymlinkRow {
  /** Dataset UUID; {@code DatasetSymlinksRowMapper} fills it from the {@code dataset_uuid} column. */
  @NonNull UUID uuid;

  /** Symlink name. */
  @NonNull String name;

  /** UUID of the namespace the symlink name belongs to. */
  @NonNull UUID namespaceUuid;

  /** Optional symlink type; exposed through {@link #getType()} as an {@link Optional}. */
  @Nullable String type;

  // Deliberately not annotated with @NonNull: a primitive can never be null, and Lombok only
  // reports a "meaningless on a primitive" warning for that combination.
  /** Value of the {@code is_primary} column. */
  boolean isPrimary;

  /** Creation timestamp of the row. */
  @Getter @NonNull private final Instant createdAt;

  /** Last-update timestamp of the row. */
  @Getter @NonNull private final Instant updatedAt;

  /**
   * Returns the symlink type.
   *
   * @return the type, or an empty {@link Optional} when the row has none
   */
  public Optional<String> getType() {
    return Optional.ofNullable(type);
  }
}
Loading

0 comments on commit 2f57f19

Please sign in to comment.