Skip to content

Commit

Permalink
Cleanup lineage on pipeline and store procedure removal (#19133)
Browse files Browse the repository at this point in the history
  • Loading branch information
mohityadav766 authored Dec 19, 2024
1 parent a0c33cc commit f9de2b9
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1180,10 +1180,7 @@ void deleteLineageBySource(
+ "AND json->>'source' = :source",
connectionType = POSTGRES)
void deleteLineageBySourcePipeline(
@BindUUID("toId") UUID toId,
@Bind("toEntity") String toEntity,
@Bind("source") String source,
@Bind("relation") int relation);
@BindUUID("toId") UUID toId, @Bind("source") String source, @Bind("relation") int relation);

class FromRelationshipMapper implements RowMapper<EntityRelationshipRecord> {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ public void deleteLineageBySource(UUID toId, String toEntity, String source) {
.findLineageBySourcePipeline(toId, toEntity, source, Relationship.UPSTREAM.ordinal());
// Finally, delete lineage relationship
dao.relationshipDAO()
.deleteLineageBySourcePipeline(toId, toEntity, source, Relationship.UPSTREAM.ordinal());
.deleteLineageBySourcePipeline(toId, toEntity, Relationship.UPSTREAM.ordinal());
} else {
relations =
dao.relationshipDAO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.LineageDetails;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.Status;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.Task;
Expand Down Expand Up @@ -296,6 +298,18 @@ public void storeEntity(Pipeline pipeline, boolean update) {
pipeline.withService(service).withTasks(taskWithTagsAndOwners);
}

@Override
protected void cleanup(Pipeline pipeline) {
// When a pipeline is removed , the linege needs to be removed
daoCollection
.relationshipDAO()
.deleteLineageBySourcePipeline(
pipeline.getId(),
LineageDetails.Source.PIPELINE_LINEAGE.value(),
Relationship.UPSTREAM.ordinal());
super.cleanup(pipeline);
}

@Override
public void storeRelationships(Pipeline pipeline) {
addServiceRelationship(pipeline, pipeline.getService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.openmetadata.schema.entity.data.StoredProcedure;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.LineageDetails;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.databases.StoredProcedureResource;
Expand Down Expand Up @@ -69,6 +70,18 @@ public void storeRelationships(StoredProcedure storedProcedure) {
Relationship.CONTAINS);
}

@Override
protected void cleanup(StoredProcedure storedProcedure) {
// When a pipeline is removed , the linege needs to be removed
daoCollection
.relationshipDAO()
.deleteLineageBySourcePipeline(
storedProcedure.getId(),
LineageDetails.Source.QUERY_LINEAGE.value(),
Relationship.UPSTREAM.ordinal());
super.cleanup(storedProcedure);
}

@Override
public void setInheritedFields(StoredProcedure storedProcedure, EntityUtil.Fields fields) {
DatabaseSchema schema =
Expand Down

0 comments on commit f9de2b9

Please sign in to comment.