Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup lineage on pipeline and store procedure removal #19133

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1180,10 +1180,7 @@ void deleteLineageBySource(
+ "AND json->>'source' = :source",
connectionType = POSTGRES)
void deleteLineageBySourcePipeline(
@BindUUID("toId") UUID toId,
@Bind("toEntity") String toEntity,
@Bind("source") String source,
@Bind("relation") int relation);
@BindUUID("toId") UUID toId, @Bind("source") String source, @Bind("relation") int relation);

class FromRelationshipMapper implements RowMapper<EntityRelationshipRecord> {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ public void deleteLineageBySource(UUID toId, String toEntity, String source) {
.findLineageBySourcePipeline(toId, toEntity, source, Relationship.UPSTREAM.ordinal());
// Finally, delete lineage relationship
dao.relationshipDAO()
.deleteLineageBySourcePipeline(toId, toEntity, source, Relationship.UPSTREAM.ordinal());
.deleteLineageBySourcePipeline(toId, toEntity, Relationship.UPSTREAM.ordinal());
} else {
relations =
dao.relationshipDAO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.LineageDetails;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.Status;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.Task;
Expand Down Expand Up @@ -296,6 +298,18 @@ public void storeEntity(Pipeline pipeline, boolean update) {
pipeline.withService(service).withTasks(taskWithTagsAndOwners);
}

@Override
protected void cleanup(Pipeline pipeline) {
// When a pipeline is removed , the linege needs to be removed
daoCollection
.relationshipDAO()
.deleteLineageBySourcePipeline(
pipeline.getId(),
LineageDetails.Source.PIPELINE_LINEAGE.value(),
Relationship.UPSTREAM.ordinal());
super.cleanup(pipeline);
}

@Override
public void storeRelationships(Pipeline pipeline) {
addServiceRelationship(pipeline, pipeline.getService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.openmetadata.schema.entity.data.StoredProcedure;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.LineageDetails;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.databases.StoredProcedureResource;
Expand Down Expand Up @@ -69,6 +70,18 @@ public void storeRelationships(StoredProcedure storedProcedure) {
Relationship.CONTAINS);
}

@Override
protected void cleanup(StoredProcedure storedProcedure) {
// When a pipeline is removed , the linege needs to be removed
daoCollection
.relationshipDAO()
.deleteLineageBySourcePipeline(
storedProcedure.getId(),
LineageDetails.Source.QUERY_LINEAGE.value(),
Relationship.UPSTREAM.ordinal());
super.cleanup(storedProcedure);
}

@Override
public void setInheritedFields(StoredProcedure storedProcedure, EntityUtil.Fields fields) {
DatabaseSchema schema =
Expand Down
Loading