Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(graphql/lineage): Support including ghost entities #11510

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.services.RestrictedService;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -60,14 +61,16 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
final LineageInput input = bindArgument(environment.getArgument("input"), LineageInput.class);

final LineageDirection lineageDirection = input.getDirection();
@Nullable final Integer start = input.getStart(); // Optional!
@Nullable final Integer count = input.getCount(); // Optional!
@Nullable final Boolean separateSiblings = input.getSeparateSiblings(); // Optional!
@Nullable final Long startTimeMillis = input.getStartTimeMillis(); // Optional!
// All inputs are optional
@Nullable final Integer start = input.getStart();
@Nullable final Integer count = input.getCount();
@Nullable final Boolean separateSiblings = input.getSeparateSiblings();
@Nullable final Long startTimeMillis = input.getStartTimeMillis();
@Nullable
final Long endTimeMillis =
ResolverUtils.getLineageEndTimeMillis(
input.getStartTimeMillis(), input.getEndTimeMillis()); // Optional!
ResolverUtils.getLineageEndTimeMillis(input.getStartTimeMillis(), input.getEndTimeMillis());
final Boolean includeGhostEntities =
Optional.ofNullable(input.getIncludeGhostEntities()).orElse(false);

com.linkedin.metadata.graph.LineageDirection resolvedDirection =
com.linkedin.metadata.graph.LineageDirection.valueOf(lineageDirection.toString());
Expand All @@ -80,6 +83,8 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
_siblingGraphService.getLineage(
context
.getOperationContext()
.withSearchFlags(
searchFlags -> searchFlags.setIncludeSoftDeleted(includeGhostEntities))
.withLineageFlags(
flags ->
flags
Expand All @@ -91,6 +96,7 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
count != null ? count : 100,
1,
separateSiblings != null ? input.getSeparateSiblings() : false,
input.getIncludeGhostEntities(),
new HashSet<>());

Set<Urn> restrictedUrns = new HashSet<>();
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,11 @@ input LineageInput {
An optional ending time to filter on
"""
endTimeMillis: Long

"""
If enabled, include entities that do not exist or are soft deleted.
"""
includeGhostEntities: Boolean = false
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public static LineageSearchResult validateLineageSearchResult(
public static EntityLineageResult validateEntityLineageResult(
@Nonnull OperationContext opContext,
@Nullable final EntityLineageResult entityLineageResult,
@Nonnull final EntityService<?> entityService) {
@Nonnull final EntityService<?> entityService,
boolean includeGhostEntities) {
if (entityLineageResult == null) {
return null;
}
Expand All @@ -223,8 +224,8 @@ public static EntityLineageResult validateEntityLineageResult(
entityLineageResult.getRelationships(),
LineageRelationship::getEntity,
entityService,
true,
false)
!includeGhostEntities,
includeGhostEntities)
.collect(Collectors.toCollection(LineageRelationshipArray::new));

validatedEntityLineageResult.setFiltered(
Expand Down Expand Up @@ -280,6 +281,8 @@ private static <T> Stream<T> validateSearchUrns(
boolean includeSoftDeleted) {

if (enforceSQLExistence) {
// TODO: Always set includeSoftDeleted to true once 0.3.7 OSS merge occurs, as soft deleted
// results will be filtered by graph service
Set<Urn> existingUrns =
entityService.exists(
opContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,29 @@ public EntityLineageResult getLineage(
int offset,
int count,
int maxHops) {
return ValidationUtils.validateEntityLineageResult(
return getLineage(opContext, entityUrn, direction, offset, count, maxHops, false, false);
}

@Nonnull
public EntityLineageResult getLineage(
@Nonnull OperationContext opContext,
@Nonnull Urn entityUrn,
@Nonnull LineageDirection direction,
int offset,
int count,
int maxHops,
boolean separateSiblings,
boolean includeGhostEntities) {
return getLineage(
opContext,
getLineage(opContext, entityUrn, direction, offset, count, maxHops, false, new HashSet<>()),
_entityService);
entityUrn,
direction,
offset,
count,
maxHops,
separateSiblings,
includeGhostEntities,
new HashSet<>());
}

/**
Expand All @@ -60,12 +79,14 @@ public EntityLineageResult getLineage(
int count,
int maxHops,
boolean separateSiblings,
boolean includeGhostEntities,
@Nonnull Set<Urn> visitedUrns) {
if (separateSiblings) {
return ValidationUtils.validateEntityLineageResult(
opContext,
_graphService.getLineage(opContext, entityUrn, direction, offset, count, maxHops),
_entityService);
_entityService,
includeGhostEntities);
}

if (maxHops > 1) {
Expand All @@ -89,7 +110,7 @@ public EntityLineageResult getLineage(
// remove your siblings from your lineage
entityLineage =
filterLineageResultFromSiblings(
opContext, entityUrn, allSiblingsInGroup, entityLineage, null);
opContext, entityUrn, allSiblingsInGroup, entityLineage, null, includeGhostEntities);

// Update offset and count to fetch the correct number of edges from the next sibling node
offset = Math.max(0, offset - entityLineage.getTotal());
Expand All @@ -109,8 +130,17 @@ public EntityLineageResult getLineage(
siblingUrn,
allSiblingsInGroup,
getLineage(
opContext, siblingUrn, direction, offset, count, maxHops, false, visitedUrns),
entityLineage);
opContext,
siblingUrn,
direction,
offset,
count,
maxHops,
false,
includeGhostEntities,
visitedUrns),
entityLineage,
includeGhostEntities);

// Update offset and count to fetch the correct number of edges from the next sibling node
offset = Math.max(0, offset - nextEntityLineage.getTotal());
Expand All @@ -122,7 +152,8 @@ public EntityLineageResult getLineage(
;
}

return ValidationUtils.validateEntityLineageResult(opContext, entityLineage, _entityService);
return ValidationUtils.validateEntityLineageResult(
opContext, entityLineage, _entityService, includeGhostEntities);
}

private int getFiltered(@Nullable EntityLineageResult entityLineageResult) {
Expand All @@ -138,7 +169,8 @@ private EntityLineageResult filterLineageResultFromSiblings(
@Nonnull final Urn urn,
@Nonnull final Set<Urn> allSiblingsInGroup,
@Nonnull final EntityLineageResult entityLineageResult,
@Nullable final EntityLineageResult existingResult) {
@Nullable final EntityLineageResult existingResult,
boolean includeGhostEntities) {
int numFiltered = 0;

// 1) remove the source entities siblings from this entity's downstreams
Expand Down Expand Up @@ -231,6 +263,6 @@ private EntityLineageResult filterLineageResultFromSiblings(
combinedLineageResult.setFiltered(
numFiltered + getFiltered(existingResult) + getFiltered(entityLineageResult));
return ValidationUtils.validateEntityLineageResult(
opContext, combinedLineageResult, _entityService);
opContext, combinedLineageResult, _entityService, includeGhostEntities);
}
}
Loading
Loading