Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: sophiely <ly.sophie200@gmail.com>
  • Loading branch information
sophiely committed Feb 7, 2024
1 parent 05c09ed commit b0e02fe
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 63 deletions.
12 changes: 3 additions & 9 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,9 @@ AND ds.uuid IN (<dsUuids>)""")
FROM datasets_view ds
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
LEFT JOIN dataset_symlinks dsym ON dsym.namespace_uuid = ds.namespace_uuid and dsym.name = ds.name
INNER JOIN (
SELECT uuid
FROM datasets_view as u
WHERE
u.name = :datasetName
AND u.namespace_name = :namespaceName
) as u
on u.uuid = ds.uuid
WHERE dsym.is_primary is true""")
INNER JOIN datasets_view AS d ON d.uuid = ds.uuid
WHERE dsym.is_primary is true
AND CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)""")
DatasetData getDatasetData(String namespaceName, String datasetName);

@SqlQuery(
Expand Down
52 changes: 31 additions & 21 deletions api/src/main/resources/marquez/db/migration/R__3_Datasets_view.sql
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
DROP VIEW IF EXISTS datasets_view;
CREATE VIEW datasets_view
AS
CREATE VIEW datasets_view AS
SELECT d.uuid,
d.type,
d.created_at,
d.updated_at,
d.namespace_uuid,
d.source_uuid,
d.name,
array_agg(CAST((namespaces.name, symlinks.name) AS DATASET_NAME)) AS dataset_symlinks,
d.physical_name,
d.description,
d.current_version_uuid,
d.last_modified_at,
d.namespace_name,
d.source_name,
d.is_deleted
FROM datasets d
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid
INNER JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid
WHERE d.is_hidden IS FALSE
GROUP BY d.uuid;
d.type,
d.created_at,
d.updated_at,
CASE
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_uuid
ELSE namespaces.uuid
END
AS namespace_uuid ,
d.source_uuid,
CASE
WHEN (d.namespace_name = namespaces.name and d.name = symlinks.name) THEN d.name
ELSE symlinks.name
END
AS name,
array(SELECT ROW(namespaces.name::character varying(255), symlinks.name::character varying(255))::dataset_name) AS dataset_symlinks,
d.physical_name,
d.description,
d.current_version_uuid,
d.last_modified_at,
CASE
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_name
ELSE namespaces.name
END
AS namespace_name,
d.source_name,
d.is_deleted
FROM datasets d
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid
JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid
WHERE d.is_hidden is false;

This file was deleted.

62 changes: 62 additions & 0 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -588,4 +590,64 @@ public void testLineageForOrphanedDataset() {
/** Reports whether the job name carried by {@code node}'s id equals {@code writeJob}. */
private boolean jobNameEquals(Node node, String writeJob) {
  final boolean sameName =
      node.getId()
          .asJobId()
          .getName()
          .getValue()
          .equals(writeJob);
  return sameName;
}

/**
 * Checks that a lineage query addressed to a dataset's symlink name yields a graph of three
 * nodes when one job references the dataset by its main name and a second job references it by
 * its symlink name.
 */
@Test
public void testSymlinkDatasetLineage() {
  // (1) Build the symlink facet for the main dataset: it points at "symlinkDataset" in NAMESPACE.
  // NOTE(review): the OpenLineage symlinks facet defines "identifiers" as a list of
  // identifier objects; a single map is supplied here -- confirm this is the shape the
  // ingestion code actually reads.
  Map<String, Object> symlinkIdentifiers = new HashMap<>();
  symlinkIdentifiers.put("name", "symlinkDataset");
  symlinkIdentifiers.put("namespace", NAMESPACE);
  symlinkIdentifiers.put("type", "DB_TABLE");

  Map<String, Object> symlinkInfo = new HashMap<>();
  symlinkInfo.put("producer", "https://github.com/OpenLineage/producer/");
  symlinkInfo.put("schemaURL", "https://openlineage.io/schema/url/");
  symlinkInfo.put("identifiers", symlinkIdentifiers);

  Map<String, Object> symlink = new HashMap<>();
  symlink.put("symlinks", symlinkInfo);

  // (2) Create the main dataset, which carries the symlink facet
  Dataset mainDataset =
      new Dataset(
          NAMESPACE,
          "mainDataset",
          newDatasetFacet(symlink, new SchemaField("firstname", "string", "the first name")));

  // (3) Create the dataset addressed directly by the symlink name (no symlink facet)
  Dataset symlinkDataset =
      new Dataset(
          NAMESPACE,
          "symlinkDataset",
          newDatasetFacet(new SchemaField("firstname", "string", "the first name")));

  // (4) Record a job that references the main dataset; the returned row is not needed here
  LineageTestUtils.createLineageRow(
      openLineageDao,
      "firstJob",
      "COMPLETE",
      jobFacet,
      Arrays.asList(mainDataset),
      Arrays.asList());

  // (5) Record a job that references the symlink dataset; the returned row is not needed here
  LineageTestUtils.createLineageRow(
      openLineageDao,
      "secondJob",
      "COMPLETE",
      jobFacet,
      Arrays.asList(symlinkDataset),
      Arrays.asList());

  // (6) Query lineage through the symlink name. We expect the first and second job to be
  // linked together because the main and symlink dataset are in fact the same dataset.
  Lineage lineage =
      lineageService.lineage(
          NodeId.of(
              new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("symlinkDataset"))),
          5,
          true);

  // Presumably the three nodes are the two jobs plus the single shared dataset -- confirm.
  assertThat(lineage.getGraph()).hasSize(3);
}
}

0 comments on commit b0e02fe

Please sign in to comment.