-
Notifications
You must be signed in to change notification settings - Fork 325
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix symlink display on marquez #2736
Changes from all commits
a03621f
40bbca1
005928a
e3e83b5
ec44bab
e70b1a6
00a7d34
93b4363
a6dde9b
a4bfbe1
0178bb3
e832a6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,16 +102,21 @@ INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.job_symlink_target_uu | |
""" | ||
SELECT ds.*, dv.fields, dv.lifecycle_state | ||
FROM datasets_view ds | ||
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid | ||
WHERE ds.uuid IN (<dsUuids>)""") | ||
LEFT JOIN dataset_versions dv ON dv.uuid = ds.current_version_uuid | ||
LEFT JOIN dataset_symlinks dsym ON dsym.namespace_uuid = ds.namespace_uuid and dsym.name = ds.name | ||
WHERE dsym.is_primary = true | ||
AND ds.uuid IN (<dsUuids>)""") | ||
Comment on lines
+105
to
+108
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So here since the view datasets_views can have several rows with the same uuid we choose the one flagged as primary. |
||
Set<DatasetData> getDatasetData(@BindList Set<UUID> dsUuids); | ||
|
||
@SqlQuery( | ||
""" | ||
SELECT ds.*, dv.fields, dv.lifecycle_state | ||
FROM datasets_view ds | ||
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid | ||
WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName""") | ||
LEFT JOIN dataset_symlinks dsym ON dsym.namespace_uuid = ds.namespace_uuid and dsym.name = ds.name | ||
INNER JOIN datasets_view AS d ON d.uuid = ds.uuid | ||
WHERE dsym.is_primary is true | ||
AND CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)""") | ||
DatasetData getDatasetData(String namespaceName, String datasetName); | ||
|
||
@SqlQuery( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,15 +114,20 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) { | |
if (!datasetIds.isEmpty()) { | ||
datasets.addAll(this.getDatasetData(datasetIds)); | ||
} | ||
if (nodeId.isDatasetType() | ||
&& datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) { | ||
log.warn( | ||
"Found jobs {} which no longer share lineage with dataset '{}' - discarding", | ||
jobData.stream().map(JobData::getId).toList(), | ||
nodeId.getValue()); | ||
return toLineageWithOrphanDataset(nodeId.asDatasetId()); | ||
} | ||
|
||
if (nodeId.isDatasetType()) { | ||
DatasetId datasetId = nodeId.asDatasetId(); | ||
DatasetData datasetData = | ||
this.getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue()); | ||
|
||
if (!datasetIds.contains(datasetData.getUuid())) { | ||
log.warn( | ||
"Found jobs {} which no longer share lineage with dataset '{}' - discarding", | ||
jobData.stream().map(JobData::getId).toList(), | ||
nodeId.getValue()); | ||
return toLineageWithOrphanDataset(nodeId.asDatasetId()); | ||
} | ||
} | ||
Comment on lines
+118
to
+130
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now we check if the uuid of the node and not the namespace+name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! Thanks for adding the |
||
return toLineage(jobData, datasets); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,33 @@ | ||
DROP VIEW IF EXISTS datasets_view; | ||
CREATE VIEW datasets_view | ||
AS | ||
CREATE VIEW datasets_view AS | ||
SELECT d.uuid, | ||
d.type, | ||
d.created_at, | ||
d.updated_at, | ||
d.namespace_uuid, | ||
d.source_uuid, | ||
d.name, | ||
array_agg(CAST((namespaces.name, symlinks.name) AS DATASET_NAME)) AS dataset_symlinks, | ||
d.physical_name, | ||
d.description, | ||
d.current_version_uuid, | ||
d.last_modified_at, | ||
d.namespace_name, | ||
d.source_name, | ||
d.is_deleted | ||
FROM datasets d | ||
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid | ||
INNER JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid | ||
WHERE d.is_hidden IS FALSE | ||
GROUP BY d.uuid; | ||
d.type, | ||
d.created_at, | ||
d.updated_at, | ||
CASE | ||
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_uuid | ||
ELSE namespaces.uuid | ||
END | ||
AS namespace_uuid , | ||
d.source_uuid, | ||
CASE | ||
WHEN (d.namespace_name = namespaces.name and d.name = symlinks.name) THEN d.name | ||
ELSE symlinks.name | ||
END | ||
AS name, | ||
array(SELECT ROW(namespaces.name::character varying(255), symlinks.name::character varying(255))::dataset_name) AS dataset_symlinks, | ||
d.physical_name, | ||
d.description, | ||
d.current_version_uuid, | ||
d.last_modified_at, | ||
CASE | ||
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_name | ||
ELSE namespaces.name | ||
END | ||
AS namespace_name, | ||
d.source_name, | ||
d.is_deleted | ||
FROM datasets d | ||
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid | ||
JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid | ||
WHERE d.is_hidden is false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating the changelog 💯