Skip to content

Commit

Permalink
fix symlink display on marquez (#2736)
Browse files Browse the repository at this point in the history
* fix symlink display on marquez

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* fix code formatting

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* update changelog

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* change dataset_views query

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* update changelog

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* rename migration file

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* rename migration file

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* fix formatting and add migration file

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* fix formatting

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* resolve comments

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* resolve tests

Signed-off-by: sophiely <ly.sophie200@gmail.com>

---------

Signed-off-by: sophiely <ly.sophie200@gmail.com>
Co-authored-by: Willy Lulciuc <willy@datakin.com>
  • Loading branch information
sophiely and wslulciuc authored Mar 5, 2024
1 parent dde3e28 commit b0683ad
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 32 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
* Web: minor UI enhancements [`#2727`](https://github.com/MarquezProject/marquez/pull/2727) [@phixMe](https://github.com/phixMe)
*Hygienic cleanup of project as a follow-up to [`#2725`](https://github.com/MarquezProject/marquez/pull/2725), including a fix for [`#2747`](https://github.com/MarquezProject/marquez/issues/2747).*

### Fixed

* API: fix dataset symlinks facet creating an empty namespace [`#2645`](https://github.com/MarquezProject/marquez/pull/2645) [@sophiely](https://github.com/sophiely)
*Display the symlink dataset in the previously empty namespace and link the symlink dataset's lineage to the main dataset.*

## [0.44.0](https://github.com/MarquezProject/marquez/compare/0.43.1...0.44.0) - 2024-01-22

### Added
Expand Down
11 changes: 8 additions & 3 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,21 @@ INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.job_symlink_target_uu
"""
SELECT ds.*, dv.fields, dv.lifecycle_state
FROM datasets_view ds
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
WHERE ds.uuid IN (<dsUuids>)""")
LEFT JOIN dataset_versions dv ON dv.uuid = ds.current_version_uuid
LEFT JOIN dataset_symlinks dsym ON dsym.namespace_uuid = ds.namespace_uuid and dsym.name = ds.name
WHERE dsym.is_primary = true
AND ds.uuid IN (<dsUuids>)""")
Set<DatasetData> getDatasetData(@BindList Set<UUID> dsUuids);

@SqlQuery(
"""
SELECT ds.*, dv.fields, dv.lifecycle_state
FROM datasets_view ds
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName""")
LEFT JOIN dataset_symlinks dsym ON dsym.namespace_uuid = ds.namespace_uuid and dsym.name = ds.name
INNER JOIN datasets_view AS d ON d.uuid = ds.uuid
WHERE dsym.is_primary is true
AND CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)""")
DatasetData getDatasetData(String namespaceName, String datasetName);

@SqlQuery(
Expand Down
21 changes: 13 additions & 8 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,20 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
if (!datasetIds.isEmpty()) {
datasets.addAll(this.getDatasetData(datasetIds));
}
if (nodeId.isDatasetType()
&& datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) {
log.warn(
"Found jobs {} which no longer share lineage with dataset '{}' - discarding",
jobData.stream().map(JobData::getId).toList(),
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}

if (nodeId.isDatasetType()) {
DatasetId datasetId = nodeId.asDatasetId();
DatasetData datasetData =
this.getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());

if (!datasetIds.contains(datasetData.getUuid())) {
log.warn(
"Found jobs {} which no longer share lineage with dataset '{}' - discarding",
jobData.stream().map(JobData::getId).toList(),
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}
}
return toLineage(jobData, datasets);
}

Expand Down
52 changes: 31 additions & 21 deletions api/src/main/resources/marquez/db/migration/R__3_Datasets_view.sql
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
DROP VIEW IF EXISTS datasets_view;
CREATE VIEW datasets_view
AS
CREATE VIEW datasets_view AS
SELECT d.uuid,
d.type,
d.created_at,
d.updated_at,
d.namespace_uuid,
d.source_uuid,
d.name,
array_agg(CAST((namespaces.name, symlinks.name) AS DATASET_NAME)) AS dataset_symlinks,
d.physical_name,
d.description,
d.current_version_uuid,
d.last_modified_at,
d.namespace_name,
d.source_name,
d.is_deleted
FROM datasets d
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid
INNER JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid
WHERE d.is_hidden IS FALSE
GROUP BY d.uuid;
d.type,
d.created_at,
d.updated_at,
CASE
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_uuid
ELSE namespaces.uuid
END
AS namespace_uuid ,
d.source_uuid,
CASE
WHEN (d.namespace_name = namespaces.name and d.name = symlinks.name) THEN d.name
ELSE symlinks.name
END
AS name,
array(SELECT ROW(namespaces.name::character varying(255), symlinks.name::character varying(255))::dataset_name) AS dataset_symlinks,
d.physical_name,
d.description,
d.current_version_uuid,
d.last_modified_at,
CASE
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_name
ELSE namespaces.name
END
AS namespace_name,
d.source_name,
d.is_deleted
FROM datasets d
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid
JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid
WHERE d.is_hidden is false;
62 changes: 62 additions & 0 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -588,4 +590,64 @@ public void testLineageForOrphanedDataset() {
/**
 * Checks whether the job name carried by the given node equals the expected name.
 *
 * @param node lineage node whose id is read as a job id
 * @param writeJob job name to compare against
 * @return {@code true} when the node's job name value equals {@code writeJob}
 */
private boolean jobNameEquals(Node node, String writeJob) {
  final var jobName = node.getId().asJobId().getName();
  return jobName.getValue().equals(writeJob);
}

/**
 * Requests lineage through the symlink name of a dataset and checks the size of the resulting
 * graph. One job is created with the main dataset (which declares the symlink through its
 * facet) and another with a dataset named after the symlink.
 */
@Test
public void testSymlinkDatasetLineage() {
  // (1) Create symlink facet for our main dataset
  Map<String, Object> symlink = new HashMap<>();
  Map<String, Object> symlinkInfo = new HashMap<>();
  Map<String, Object> symlinkIdentifiers = new HashMap<>();
  symlinkIdentifiers.put("name", "symlinkDataset");
  symlinkIdentifiers.put("namespace", NAMESPACE);
  symlinkIdentifiers.put("type", "DB_TABLE");
  symlinkInfo.put("producer", "https://github.com/OpenLineage/producer/");
  symlinkInfo.put("schemaURL", "https://openlineage.io/schema/url/");
  symlinkInfo.put("identifiers", symlinkIdentifiers);
  symlink.put("symlinks", symlinkInfo);

  // (2) Create main dataset with a symlink
  Dataset mainDataset =
      new Dataset(
          NAMESPACE,
          "mainDataset",
          newDatasetFacet(symlink, new SchemaField("firstname", "string", "the first name")));

  // (3) Create the symlink dataset
  Dataset symlinkDataset =
      new Dataset(
          NAMESPACE,
          "symlinkDataset",
          newDatasetFacet(new SchemaField("firstname", "string", "the first name")));

  // (4) Create a job with the main dataset; only the persisted rows matter, so the
  // returned row is intentionally not kept
  LineageTestUtils.createLineageRow(
      openLineageDao, "firstJob", "COMPLETE", jobFacet, Arrays.asList(mainDataset), Arrays.asList());

  // (5) Create a job with the symlink dataset
  LineageTestUtils.createLineageRow(
      openLineageDao,
      "secondJob",
      "COMPLETE",
      jobFacet,
      Arrays.asList(symlinkDataset),
      Arrays.asList());

  // (6) We expect the first and second job linked together because the main
  // and symlink dataset are in fact the same dataset
  Lineage lineage =
      lineageService.lineage(
          NodeId.of(
              new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("symlinkDataset"))),
          5,
          true);

  assertThat(lineage.getGraph()).hasSize(2);
}
}

0 comments on commit b0683ad

Please sign in to comment.