Skip to content

Commit

Permalink
Fill data in column lineage input nodes.
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
  • Loading branch information
JDarDagran committed Feb 2, 2024
1 parent 0ac3a83 commit dfef1ae
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
3 changes: 2 additions & 1 deletion api/src/main/java/marquez/db/models/InputFieldNodeData.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import marquez.service.models.NodeData;

@Getter
@AllArgsConstructor
@ToString
public class InputFieldNodeData {
public class InputFieldNodeData implements NodeData {
@NonNull String namespace;
@NonNull String dataset;
@Nullable UUID datasetVersion;
Expand Down
7 changes: 4 additions & 3 deletions api/src/main/java/marquez/service/ColumnLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ private Lineage toLineage(Set<ColumnLineageNodeData> lineageNodeData, boolean in
NodeId nodeId = toNodeId(columnLineageNodeData, includeVersion);
graphNodes.put(nodeId, Node.datasetField().data(columnLineageNodeData).id(nodeId));
columnLineageNodeData.getInputFields().stream()
.map(i -> toNodeId(i, includeVersion))
.forEach(
inputNodeId -> {
graphNodes.putIfAbsent(inputNodeId, Node.datasetField().id(inputNodeId));
inputNode -> {
NodeId inputNodeId = toNodeId(inputNode, includeVersion);
graphNodes.putIfAbsent(
inputNodeId, Node.datasetField().id(inputNodeId).data(inputNode));
Optional.ofNullable(outEdges.get(inputNodeId))
.ifPresentOrElse(
nodeEdges -> nodeEdges.add(nodeId),
Expand Down
4 changes: 3 additions & 1 deletion api/src/main/java/marquez/service/models/NodeData.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import marquez.db.models.ColumnLineageNodeData;
import marquez.db.models.InputFieldNodeData;

@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION)
@JsonSubTypes({
@JsonSubTypes.Type(DatasetData.class),
@JsonSubTypes.Type(JobData.class),
@JsonSubTypes.Type(ColumnLineageNodeData.class)
@JsonSubTypes.Type(ColumnLineageNodeData.class),
@JsonSubTypes.Type(InputFieldNodeData.class)
})
public interface NodeData {}

0 comments on commit dfef1ae

Please sign in to comment.