diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java index dea98c5cbcb13..8b33e4e7c2164 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java @@ -12,12 +12,13 @@ import com.linkedin.gms.factory.kafka.common.TopicConventionFactory; import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory; import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory; -import com.linkedin.metadata.aspect.GraphRetriever; import com.linkedin.metadata.config.kafka.KafkaConfiguration; import com.linkedin.metadata.dao.producer.KafkaEventProducer; import com.linkedin.metadata.dao.producer.KafkaHealthChecker; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.EntityServiceAspectRetriever; +import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.graph.SystemGraphRetriever; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.search.SearchService; import com.linkedin.metadata.search.SearchServiceSearchRetriever; @@ -145,7 +146,7 @@ protected OperationContext javaSystemOperationContext( @Nonnull final EntityRegistry entityRegistry, @Nonnull final EntityService entityService, @Nonnull final RestrictedService restrictedService, - @Nonnull final GraphRetriever graphRetriever, + @Nonnull final GraphService graphService, @Nonnull final SearchService searchService, @Qualifier("baseElasticSearchComponents") BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) { @@ -159,6 +160,9 @@ protected OperationContext javaSystemOperationContext( SearchServiceSearchRetriever searchServiceSearchRetriever = SearchServiceSearchRetriever.builder().searchService(searchService).build(); + SystemGraphRetriever systemGraphRetriever = + SystemGraphRetriever.builder().graphService(graphService).build(); + OperationContext systemOperationContext = OperationContext.asSystem( operationContextConfig, @@ -168,11 +172,12 @@ protected OperationContext javaSystemOperationContext( components.getIndexConvention(), RetrieverContext.builder() .aspectRetriever(entityServiceAspectRetriever) - .graphRetriever(graphRetriever) + .graphRetriever(systemGraphRetriever) .searchRetriever(searchServiceSearchRetriever) .build()); entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext); + systemGraphRetriever.setSystemOperationContext(systemOperationContext); searchServiceSearchRetriever.setSystemOperationContext(systemOperationContext); return systemOperationContext; diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/graph/ReindexDataJobViaNodesCLLConfig.java similarity index 85% rename from datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java rename to datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/graph/ReindexDataJobViaNodesCLLConfig.java index 4956254062ff9..a973876c6715f 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/graph/ReindexDataJobViaNodesCLLConfig.java @@ -1,7 +1,8 @@ -package com.linkedin.datahub.upgrade.config; +package com.linkedin.datahub.upgrade.config.graph; +import com.linkedin.datahub.upgrade.config.SystemUpdateCondition; import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; -import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL; +import com.linkedin.datahub.upgrade.system.graph.vianodes.ReindexDataJobViaNodesCLL; import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; import io.datahubproject.metadata.context.OperationContext; diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/graph/ReindexEdgeStatusConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/graph/ReindexEdgeStatusConfig.java new file mode 100644 index 0000000000000..97715573eb51f --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/graph/ReindexEdgeStatusConfig.java @@ -0,0 +1,31 @@ +package com.linkedin.datahub.upgrade.config.graph; + +import com.linkedin.datahub.upgrade.config.SystemUpdateCondition; +import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; +import com.linkedin.datahub.upgrade.system.graph.edgestatus.ReindexEdgeStatus; +import com.linkedin.metadata.entity.AspectDao; +import com.linkedin.metadata.entity.EntityService; +import io.datahubproject.metadata.context.OperationContext; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class) +public class ReindexEdgeStatusConfig { + + @Bean + public NonBlockingSystemUpgrade reindexEdgeStatus( + final OperationContext opContext, + final EntityService entityService, + final AspectDao aspectDao, + @Value("${elasticsearch.search.graph.graphStatusEnabled}") final boolean featureEnabled, + @Value("${systemUpdate.edgeStatus.enabled}") final boolean enabled, + @Value("${systemUpdate.edgeStatus.batchSize}") final Integer batchSize, + @Value("${systemUpdate.edgeStatus.delayMs}") final Integer delayMs, + @Value("${systemUpdate.edgeStatus.limit}") final Integer limit) { + return new ReindexEdgeStatus( + opContext, entityService, aspectDao, featureEnabled && enabled, batchSize, delayMs, limit); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/edgestatus/ReindexEdgeStatus.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/edgestatus/ReindexEdgeStatus.java new file mode 100644 index 0000000000000..6b7286a6a0639 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/edgestatus/ReindexEdgeStatus.java @@ -0,0 +1,50 @@ +package com.linkedin.datahub.upgrade.system.graph.edgestatus; + +import com.google.common.collect.ImmutableList; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade; +import com.linkedin.metadata.entity.AspectDao; +import com.linkedin.metadata.entity.EntityService; +import io.datahubproject.metadata.context.OperationContext; +import java.util.List; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; + +/** + * A job that reindexes all status aspects as part of the graph edges containing status information. + * This is required to make sure previously written status information is present in the graph + * index. + */ +@Slf4j +public class ReindexEdgeStatus implements NonBlockingSystemUpgrade { + + private final List _steps; + + public ReindexEdgeStatus( + @Nonnull OperationContext opContext, + EntityService entityService, + AspectDao aspectDao, + boolean enabled, + Integer batchSize, + Integer batchDelayMs, + Integer limit) { + if (enabled) { + _steps = + ImmutableList.of( + new ReindexReindexEdgeStatusStep( + opContext, entityService, aspectDao, batchSize, batchDelayMs, limit)); + } else { + _steps = ImmutableList.of(); + } + } + + @Override + public String id() { + return this.getClass().getName(); + } + + @Override + public List steps() { + return _steps; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/edgestatus/ReindexReindexEdgeStatusStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/edgestatus/ReindexReindexEdgeStatusStep.java new file mode 100644 index 0000000000000..6543f82e74563 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/edgestatus/ReindexReindexEdgeStatusStep.java @@ -0,0 +1,56 @@ +package com.linkedin.datahub.upgrade.system.graph.edgestatus; + +import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.system.AbstractMCLStep; +import com.linkedin.metadata.entity.AspectDao; +import com.linkedin.metadata.entity.EntityService; +import io.datahubproject.metadata.context.OperationContext; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.Nullable; + +@Slf4j +public class ReindexReindexEdgeStatusStep extends AbstractMCLStep { + + public ReindexReindexEdgeStatusStep( + OperationContext opContext, + EntityService entityService, + AspectDao aspectDao, + Integer batchSize, + Integer batchDelayMs, + Integer limit) { + super(opContext, entityService, aspectDao, batchSize, batchDelayMs, limit); + } + + @Override + public String id() { + return "edge-status-reindex-v1"; + } + + @Nonnull + @Override + protected String getAspectName() { + return STATUS_ASPECT_NAME; + } + + @Nullable + @Override + protected String getUrnLike() { + return null; + } + + @Override + /** + * Returns whether the upgrade should be skipped. Uses previous run history or the environment + * variable to determine whether to skip. + */ + public boolean skip(UpgradeContext context) { + boolean envFlagRecommendsSkip = Boolean.parseBoolean(System.getenv("SKIP_REINDEX_EDGE_STATUS")); + if (envFlagRecommendsSkip) { + log.info("Environment variable SKIP_REINDEX_EDGE_STATUS is set to true. Skipping."); + } + return (super.skip(context) || envFlagRecommendsSkip); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLL.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/vianodes/ReindexDataJobViaNodesCLL.java similarity index 95% rename from datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLL.java rename to datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/vianodes/ReindexDataJobViaNodesCLL.java index fc0b44f57ab49..7a4ca9586f155 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLL.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/vianodes/ReindexDataJobViaNodesCLL.java @@ -1,4 +1,4 @@ -package com.linkedin.datahub.upgrade.system.vianodes; +package com.linkedin.datahub.upgrade.system.graph.vianodes; import com.google.common.collect.ImmutableList; import com.linkedin.datahub.upgrade.UpgradeStep; diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLLStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/vianodes/ReindexDataJobViaNodesCLLStep.java similarity index 96% rename from datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLLStep.java rename to datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/vianodes/ReindexDataJobViaNodesCLLStep.java index cf580670ee3a9..e3e07f99bb1ee 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/vianodes/ReindexDataJobViaNodesCLLStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/graph/vianodes/ReindexDataJobViaNodesCLLStep.java @@ -1,4 +1,4 @@ -package com.linkedin.datahub.upgrade.system.vianodes; +package com.linkedin.datahub.upgrade.system.graph.vianodes; import static com.linkedin.metadata.Constants.*; diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java index 55a52f072a0ca..df27d33f3a117 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java @@ -9,7 +9,7 @@ import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager; import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking; -import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL; +import com.linkedin.datahub.upgrade.system.graph.vianodes.ReindexDataJobViaNodesCLL; import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer; import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer; import com.linkedin.metadata.config.kafka.KafkaConfiguration; diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/models/graph/Edge.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/models/graph/Edge.java index 8777be57e1bd8..e999471488dd7 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/models/graph/Edge.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/models/graph/Edge.java @@ -14,6 +14,7 @@ import java.util.Optional; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -36,6 +37,10 @@ public class Edge { @EqualsAndHashCode.Include private Urn lifecycleOwner; // An entity through which the edge between source and destination is created @EqualsAndHashCode.Include private Urn via; + @EqualsAndHashCode.Exclude @Nullable private Boolean sourceStatus; + @EqualsAndHashCode.Exclude @Nullable private Boolean destinationStatus; + @EqualsAndHashCode.Exclude @Nullable private Boolean viaStatus; + @EqualsAndHashCode.Exclude @Nullable private Boolean lifecycleOwnerStatus; // For backwards compatibility public Edge( @@ -57,6 +62,38 @@ public Edge( updatedActor, properties, null, + null, + null, + null, + null, + null); + } + + public Edge( + Urn source, + Urn destination, + String relationshipType, + Long createdOn, + Urn createdActor, + Long updatedOn, + Urn updatedActor, + Map properties, + Urn lifecycleOwner, + Urn via) { + this( + source, + destination, + relationshipType, + createdOn, + createdActor, + updatedOn, + updatedActor, + properties, + lifecycleOwner, + via, + null, + null, + null, null); } @@ -91,6 +128,10 @@ public String toDocId(@Nonnull String idHashAlgo) { public static final String EDGE_FIELD_LIFECYCLE_OWNER = "lifecycleOwner"; public static final String EDGE_SOURCE_URN_FIELD = "source.urn"; public static final String EDGE_DESTINATION_URN_FIELD = "destination.urn"; + public static final String EDGE_SOURCE_STATUS = "source.removed"; + public static final String EDGE_DESTINATION_STATUS = "destination.removed"; + public static final String EDGE_FIELD_VIA_STATUS = "viaRemoved"; + public static final String EDGE_FIELD_LIFECYCLE_OWNER_STATUS = "lifecycleOwnerRemoved"; public static final List> KEY_SORTS = ImmutableList.of( diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/models/graph/EdgeUrnType.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/models/graph/EdgeUrnType.java new file mode 100644 index 0000000000000..2fc2f4b588e8b --- /dev/null +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/models/graph/EdgeUrnType.java @@ -0,0 +1,8 @@ +package com.linkedin.metadata.aspect.models.graph; + +public enum EdgeUrnType { + SOURCE, + DESTINATION, + VIA, + LIFECYCLE_OWNER +} diff --git a/metadata-auth/auth-api/src/main/java/com/datahub/authentication/Authentication.java b/metadata-auth/auth-api/src/main/java/com/datahub/authentication/Authentication.java index b53d868e6e878..7583a4efd6425 100644 --- a/metadata-auth/auth-api/src/main/java/com/datahub/authentication/Authentication.java +++ b/metadata-auth/auth-api/src/main/java/com/datahub/authentication/Authentication.java @@ -4,8 +4,10 @@ import java.util.Map; import java.util.Objects; import javax.annotation.Nonnull; +import lombok.EqualsAndHashCode; /** Class representing an authenticated actor accessing DataHub. */ +@EqualsAndHashCode public class Authentication { private final Actor authenticatedActor; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/JavaGraphClient.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/JavaGraphClient.java index c54ba4a222b73..12c59324e3f7c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/JavaGraphClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/JavaGraphClient.java @@ -9,6 +9,7 @@ import com.linkedin.common.urn.UrnUtils; import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.search.utils.QueryUtils; +import io.datahubproject.metadata.context.OperationContext; import java.net.URISyntaxException; import java.util.List; import java.util.stream.Collectors; @@ -19,10 +20,13 @@ @Slf4j public class JavaGraphClient implements GraphClient { - GraphService _graphService; + private final OperationContext systemOpContext; + private final GraphService graphService; - public JavaGraphClient(@Nonnull GraphService graphService) { - this._graphService = graphService; + public JavaGraphClient( + @Nonnull OperationContext systemOpContext, @Nonnull GraphService graphService) { + this.systemOpContext = systemOpContext; + this.graphService = graphService; } /** @@ -43,7 +47,8 @@ public EntityRelationships getRelatedEntities( count = count == null ? DEFAULT_PAGE_SIZE : count; RelatedEntitiesResult relatedEntitiesResult = - _graphService.findRelatedEntities( + graphService.findRelatedEntities( + systemOpContext, null, QueryUtils.newFilter("urn", rawUrn), null, @@ -91,7 +96,8 @@ public EntityLineageResult getLineageEntities( @Nullable Integer count, int maxHops, String actor) { - return _graphService.getLineage( + return graphService.getLineage( + systemOpContext, UrnUtils.getUrn(rawUrn), direction, start != null ? start : 0, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/SiblingGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/SiblingGraphService.java index 0dff287080842..f9287ab34cf19 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/SiblingGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/SiblingGraphService.java @@ -64,13 +64,7 @@ public EntityLineageResult getLineage( if (separateSiblings) { return ValidationUtils.validateEntityLineageResult( opContext, - _graphService.getLineage( - entityUrn, - direction, - offset, - count, - maxHops, - opContext.getSearchContext().getLineageFlags()), + _graphService.getLineage(opContext, entityUrn, direction, offset, count, maxHops), _entityService); } @@ -81,13 +75,7 @@ public EntityLineageResult getLineage( } EntityLineageResult entityLineage = - _graphService.getLineage( - entityUrn, - direction, - offset, - count, - maxHops, - opContext.getSearchContext().getLineageFlags()); + _graphService.getLineage(opContext, entityUrn, direction, offset, count, maxHops); Siblings siblingAspectOfEntity = (Siblings) _entityService.getLatestAspect(opContext, entityUrn, SIBLINGS_ASPECT_NAME); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/SystemGraphRetriever.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/SystemGraphRetriever.java new file mode 100644 index 0000000000000..33cb1a7130f14 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/SystemGraphRetriever.java @@ -0,0 +1,48 @@ +package com.linkedin.metadata.graph; + +import com.linkedin.metadata.aspect.GraphRetriever; +import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult; +import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.query.filter.RelationshipFilter; +import com.linkedin.metadata.query.filter.SortCriterion; +import io.datahubproject.metadata.context.OperationContext; +import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Setter; + +@Builder +public class SystemGraphRetriever implements GraphRetriever { + @Setter private OperationContext systemOperationContext; + @Nonnull private final GraphService graphService; + + @Nonnull + @Override + public RelatedEntitiesScrollResult scrollRelatedEntities( + @Nullable List sourceTypes, + @Nonnull Filter sourceEntityFilter, + @Nullable List destinationTypes, + @Nonnull Filter destinationEntityFilter, + @Nonnull List relationshipTypes, + @Nonnull RelationshipFilter relationshipFilter, + @Nonnull List sortCriteria, + @Nullable String scrollId, + int count, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis) { + return graphService.scrollRelatedEntities( + systemOperationContext, + sourceTypes, + sourceEntityFilter, + destinationTypes, + destinationEntityFilter, + relationshipTypes, + relationshipFilter, + sortCriteria, + scrollId, + count, + startTimeMillis, + endTimeMillis); + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/dgraph/DgraphGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/dgraph/DgraphGraphService.java index 6703e07bfd915..352e89baefc25 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/dgraph/DgraphGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/dgraph/DgraphGraphService.java @@ -19,6 +19,7 @@ import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.query.filter.RelationshipFilter; import com.linkedin.metadata.query.filter.SortCriterion; +import io.datahubproject.metadata.context.OperationContext; import io.dgraph.DgraphClient; import io.dgraph.DgraphProto.Mutation; import io.dgraph.DgraphProto.NQuad; @@ -453,6 +454,7 @@ public void removeEdge(final Edge edge) { @Nonnull @Override public RelatedEntitiesResult findRelatedEntities( + @Nonnull final OperationContext opContext, @Nullable List sourceTypes, @Nonnull Filter sourceEntityFilter, @Nullable List destinationTypes, @@ -662,7 +664,7 @@ protected static List getRelatedEntitiesFromResponseData( } @Override - public void removeNode(@Nonnull Urn urn) { + public void removeNode(@Nonnull final OperationContext opContext, @Nonnull Urn urn) { String query = String.format("query {\n" + " node as var(func: eq(urn, \"%s\"))\n" + "}", urn); String deletion = "uid(node) * * ."; @@ -679,6 +681,7 @@ public void removeNode(@Nonnull Urn urn) { @Override public void removeEdgesFromNode( + @Nonnull final OperationContext opContext, @Nonnull Urn urn, @Nonnull List relationshipTypes, @Nonnull RelationshipFilter relationshipFilter) { @@ -782,6 +785,7 @@ public void clear() { @Nonnull @Override public RelatedEntitiesScrollResult scrollRelatedEntities( + @Nonnull OperationContext opContext, @Nullable List sourceTypes, @Nonnull Filter sourceEntityFilter, @Nullable List destinationTypes, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index 50e5aa6ba893d..40fa79a0ef171 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -14,6 +14,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.IntegerArray; +import com.linkedin.metadata.aspect.models.graph.EdgeUrnType; import com.linkedin.metadata.config.search.GraphQueryConfiguration; import com.linkedin.metadata.graph.GraphFilters; import com.linkedin.metadata.graph.LineageDirection; @@ -34,14 +35,17 @@ import com.linkedin.metadata.utils.DataPlatformInstanceUtils; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.metadata.utils.metrics.MetricUtils; +import io.datahubproject.metadata.context.OperationContext; import io.opentelemetry.extension.annotations.WithSpan; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -105,8 +109,7 @@ public class ESGraphQueryDAO { static final String GROUP_BY_DESTINATION_AGG = "group_by_destination"; static final String TOP_DOCUMENTS_AGG = "top_documents"; - @Nonnull - public static void addFilterToQueryBuilder( + private static void addFilterToQueryBuilder( @Nonnull Filter filter, @Nullable String node, BoolQueryBuilder rootQuery) { BoolQueryBuilder orQuery = new BoolQueryBuilder(); for (ConjunctiveCriterion conjunction : filter.getOr()) { @@ -231,7 +234,7 @@ private SearchResponse executeGroupByLineageSearchQuery( } } - private BoolQueryBuilder getAggregationFilter( + private static BoolQueryBuilder getAggregationFilter( Pair pair, RelationshipDirection direction) { BoolQueryBuilder subFilter = QueryBuilders.boolQuery(); TermQueryBuilder relationshipTypeTerm = @@ -258,6 +261,7 @@ private BoolQueryBuilder getAggregationFilter( } public SearchResponse getSearchResponse( + @Nonnull final OperationContext opContext, @Nullable final List sourceTypes, @Nonnull final Filter sourceEntityFilter, @Nullable final List destinationTypes, @@ -268,6 +272,8 @@ public SearchResponse getSearchResponse( final int count) { BoolQueryBuilder finalQuery = buildQuery( + opContext, + graphQueryConfiguration, sourceTypes, sourceEntityFilter, destinationTypes, @@ -279,6 +285,8 @@ public SearchResponse getSearchResponse( } public static BoolQueryBuilder buildQuery( + @Nonnull final OperationContext opContext, + @Nonnull final GraphQueryConfiguration graphQueryConfiguration, @Nullable final List sourceTypes, @Nullable final Filter sourceEntityFilter, @Nullable final List destinationTypes, @@ -286,6 +294,8 @@ public static BoolQueryBuilder buildQuery( @Nonnull final List relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter) { return buildQuery( + opContext, + graphQueryConfiguration, sourceTypes, sourceEntityFilter, destinationTypes, @@ -296,6 +306,8 @@ public static BoolQueryBuilder buildQuery( } public static BoolQueryBuilder buildQuery( + @Nonnull final OperationContext opContext, + @Nonnull final GraphQueryConfiguration graphQueryConfiguration, @Nullable final List sourceTypes, @Nonnull final Filter sourceEntityFilter, @Nullable final List destinationTypes, @@ -345,19 +357,23 @@ public static BoolQueryBuilder buildQuery( if (lifecycleOwner != null) { finalQuery.filter(QueryBuilders.termQuery(EDGE_FIELD_LIFECYCLE_OWNER, lifecycleOwner)); } + if (!Optional.ofNullable(opContext.getSearchContext().getSearchFlags().isIncludeSoftDeleted()) + .orElse(false)) { + applyExcludeSoftDelete(graphQueryConfiguration, finalQuery); + } return finalQuery; } @WithSpan public LineageResponse getLineage( + @Nonnull final OperationContext opContext, @Nonnull Urn entityUrn, @Nonnull LineageDirection direction, GraphFilters graphFilters, int offset, int count, - int maxHops, - @Nullable LineageFlags lineageFlags) { + int maxHops) { Map result = new HashMap<>(); long currentTime = System.currentTimeMillis(); long remainingTime = graphQueryConfiguration.getTimeoutSeconds() * 1000; @@ -388,6 +404,7 @@ public LineageResponse getLineage( // Do one hop on the lineage graph Stream intermediateStream = processOneHopLineage( + opContext, currentLevel, remainingTime, direction, @@ -398,7 +415,6 @@ public LineageResponse getLineage( existingPaths, exploreMultiplePaths, result, - lineageFlags, i); currentLevel = intermediateStream.collect(Collectors.toList()); currentTime = System.currentTimeMillis(); @@ -421,6 +437,7 @@ public LineageResponse getLineage( } private Stream processOneHopLineage( + @Nonnull OperationContext opContext, List currentLevel, Long remainingTime, LineageDirection direction, @@ -431,7 +448,6 @@ private Stream processOneHopLineage( Map existingPaths, boolean exploreMultiplePaths, Map result, - LineageFlags lineageFlags, int i) { // Do one hop on the lineage graph @@ -439,6 +455,7 @@ private Stream processOneHopLineage( int remainingHops = maxHops - numHops; List oneHopRelationships = getLineageRelationshipsInBatches( + opContext, currentLevel, direction, graphFilters, @@ -448,8 +465,10 @@ private Stream processOneHopLineage( remainingHops, remainingTime, existingPaths, - exploreMultiplePaths, - lineageFlags); + exploreMultiplePaths); + + final LineageFlags lineageFlags = opContext.getSearchContext().getLineageFlags(); + for (LineageRelationship oneHopRelnship : oneHopRelationships) { if (result.containsKey(oneHopRelnship.getEntity())) { log.debug("Urn encountered again during graph walk {}", oneHopRelnship.getEntity()); @@ -487,6 +506,7 @@ private Stream processOneHopLineage( if (!additionalCurrentLevel.isEmpty()) { Stream ignoreAsHopUrns = processOneHopLineage( + opContext, additionalCurrentLevel, remainingTime, direction, @@ -497,7 +517,6 @@ private Stream processOneHopLineage( existingPaths, exploreMultiplePaths, result, - lineageFlags, i); intermediateStream = Stream.concat(intermediateStream, ignoreAsHopUrns); } @@ -560,6 +579,7 @@ private LineageRelationship mergeLineageRelationships( // Get 1-hop lineage relationships asynchronously in batches with timeout @WithSpan public List getLineageRelationshipsInBatches( + @Nonnull final OperationContext opContext, @Nonnull List entityUrns, @Nonnull LineageDirection direction, GraphFilters graphFilters, @@ -569,8 +589,7 @@ public List getLineageRelationshipsInBatches( int remainingHops, long remainingTime, Map existingPaths, - boolean exploreMultiplePaths, - @Nullable LineageFlags lineageFlags) { + boolean exploreMultiplePaths) { List> batches = Lists.partition(entityUrns, graphQueryConfiguration.getBatchSize()); return ConcurrencyUtils.getAllCompleted( batches.stream() @@ -579,6 +598,7 @@ public List getLineageRelationshipsInBatches( CompletableFuture.supplyAsync( () -> getLineageRelationships( + opContext, batchUrns, direction, graphFilters, @@ -587,8 +607,7 @@ public List getLineageRelationshipsInBatches( numHops, remainingHops, existingPaths, - exploreMultiplePaths, - lineageFlags))) + exploreMultiplePaths))) .collect(Collectors.toList()), remainingTime, TimeUnit.MILLISECONDS) @@ -600,6 +619,7 @@ public List getLineageRelationshipsInBatches( // Get 1-hop lineage relationships @WithSpan private List getLineageRelationships( + @Nonnull final OperationContext opContext, @Nonnull List entityUrns, @Nonnull LineageDirection direction, GraphFilters graphFilters, @@ -608,8 +628,8 @@ private List getLineageRelationships( int numHops, int remainingHops, Map existingPaths, - boolean exploreMultiplePaths, - @Nullable LineageFlags lineageFlags) { + boolean exploreMultiplePaths) { + final LineageFlags lineageFlags = opContext.getSearchContext().getLineageFlags(); Map> urnsPerEntityType = entityUrns.stream().collect(Collectors.groupingBy(Urn::getEntityType)); Map> edgesPerEntityType = @@ -628,7 +648,7 @@ private List getLineageRelationships( .collect(Collectors.toSet()); QueryBuilder finalQuery = - getLineageQuery(urnsPerEntityType, edgesPerEntityType, graphFilters, lineageFlags); + getLineageQuery(opContext, urnsPerEntityType, edgesPerEntityType, graphFilters); SearchResponse response; if (lineageFlags != null && lineageFlags.getEntitiesExploredPerHopLimit() != null) { response = @@ -660,11 +680,12 @@ private List getLineageRelationships( } @VisibleForTesting - public QueryBuilder getLineageQuery( + public static QueryBuilder getLineageQuery( + @Nonnull OperationContext opContext, @Nonnull Map> urnsPerEntityType, @Nonnull Map> edgesPerEntityType, - @Nonnull GraphFilters graphFilters, - @Nullable LineageFlags lineageFlags) { + @Nonnull GraphFilters graphFilters) { + final LineageFlags lineageFlags = opContext.getSearchContext().getLineageFlags(); BoolQueryBuilder entityTypeQueries = QueryBuilders.boolQuery(); // Get all relation types relevant to the set of urns to hop from urnsPerEntityType.forEach( @@ -690,7 +711,7 @@ public QueryBuilder getLineageQuery( && lineageFlags.getStartTimeMillis() != null && lineageFlags.getEndTimeMillis() != null) { finalQuery.filter( - TimeFilterUtils.getEdgeTimeFilterQuery( + GraphFilterUtils.getEdgeTimeFilterQuery( lineageFlags.getStartTimeMillis(), lineageFlags.getEndTimeMillis())); } else { log.debug("Empty time filter range provided. Skipping application of time filters"); @@ -700,7 +721,7 @@ public QueryBuilder getLineageQuery( } @VisibleForTesting - public QueryBuilder getLineageQueryForEntityType( + static QueryBuilder getLineageQueryForEntityType( @Nonnull List urns, @Nonnull List lineageEdges, @Nonnull GraphFilters graphFilters) { @@ -769,7 +790,7 @@ private void addViaNodeBoostQuery(final SearchSourceBuilder sourceBuilder) { * the Graph Store. */ @VisibleForTesting - public static void addEdgeToPaths( + static void addEdgeToPaths( @Nonnull final Map existingPaths, @Nonnull final Urn parentUrn, @Nonnull final Urn childUrn) { @@ -782,7 +803,7 @@ private static boolean containsCycle(final UrnArray path) { return (path.size() != urnSet.size()); } - public static boolean addEdgeToPaths( + static boolean addEdgeToPaths( @Nonnull final Map existingPaths, @Nonnull final Urn parentUrn, final Urn viaUrn, @@ -1317,6 +1338,7 @@ public static class LineageResponse { } public SearchResponse getSearchResponse( + @Nonnull final OperationContext opContext, @Nullable final List sourceTypes, @Nullable final Filter sourceEntityFilter, @Nullable final List destinationTypes, @@ -1329,6 +1351,8 @@ public SearchResponse getSearchResponse( BoolQueryBuilder finalQuery = buildQuery( + opContext, + graphQueryConfiguration, sourceTypes, sourceEntityFilter, destinationTypes, @@ -1371,4 +1395,17 @@ private SearchResponse executeScrollSearchQuery( throw new ESQueryException("Search query failed:", e); } } + + private static void applyExcludeSoftDelete( + GraphQueryConfiguration graphQueryConfiguration, BoolQueryBuilder boolQueryBuilder) { + if (graphQueryConfiguration.isGraphStatusEnabled()) { + Arrays.stream(EdgeUrnType.values()) + .map( + edgeUrnType -> + QueryBuilders.termsQuery( + GraphFilterUtils.getUrnStatusFieldName(edgeUrnType), "true")) + .filter(statusQuery -> !boolQueryBuilder.mustNot().contains(statusQuery)) + .forEach(boolQueryBuilder::mustNot); + } + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java index ddbd00f90ef68..ba481bdfa109f 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java @@ -4,10 +4,12 @@ import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.INDEX_NAME; import com.google.common.collect.ImmutableList; +import com.linkedin.metadata.config.search.GraphQueryConfiguration; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.RelationshipFilter; import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import io.datahubproject.metadata.context.OperationContext; import java.util.List; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -17,7 +19,9 @@ import org.opensearch.action.update.UpdateRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.reindex.BulkByScrollResponse; +import org.opensearch.script.Script; @Slf4j @RequiredArgsConstructor @@ -25,8 +29,7 @@ public class ESGraphWriteDAO { private final IndexConvention indexConvention; private final ESBulkProcessor bulkProcessor; private final int numRetries; - - private static final String ES_WRITES_METRIC = "num_elasticSearch_writes"; + private final GraphQueryConfiguration graphQueryConfiguration; /** * Updates or inserts the given search document. @@ -56,6 +59,7 @@ public void deleteDocument(@Nonnull String docId) { } public BulkByScrollResponse deleteByQuery( + @Nonnull final OperationContext opContext, @Nullable final String sourceType, @Nonnull final Filter sourceEntityFilter, @Nullable final String destinationType, @@ -64,6 +68,8 @@ public BulkByScrollResponse deleteByQuery( @Nonnull final RelationshipFilter relationshipFilter) { BoolQueryBuilder finalQuery = buildQuery( + opContext, + graphQueryConfiguration, sourceType == null ? ImmutableList.of() : ImmutableList.of(sourceType), sourceEntityFilter, destinationType == null ? ImmutableList.of() : ImmutableList.of(destinationType), @@ -77,6 +83,7 @@ public BulkByScrollResponse deleteByQuery( } public BulkByScrollResponse deleteByQuery( + @Nonnull final OperationContext opContext, @Nullable final String sourceType, @Nonnull final Filter sourceEntityFilter, @Nullable final String destinationType, @@ -86,6 +93,8 @@ public BulkByScrollResponse deleteByQuery( String lifecycleOwner) { BoolQueryBuilder finalQuery = buildQuery( + opContext, + graphQueryConfiguration, sourceType == null ? ImmutableList.of() : ImmutableList.of(sourceType), sourceEntityFilter, destinationType == null ? ImmutableList.of() : ImmutableList.of(destinationType), @@ -98,4 +107,12 @@ public BulkByScrollResponse deleteByQuery( .deleteByQuery(finalQuery, indexConvention.getIndexName(INDEX_NAME)) .orElse(null); } + + @Nullable + public BulkByScrollResponse updateByQuery( + @Nonnull Script script, @Nonnull final QueryBuilder query) { + return bulkProcessor + .updateByQuery(script, query, indexConvention.getIndexName(INDEX_NAME)) + .orElse(null); + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index e1532ea4e26c0..1769c53e4cd9b 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -1,12 +1,16 @@ package com.linkedin.metadata.graph.elastic; import static com.linkedin.metadata.aspect.models.graph.Edge.*; +import static com.linkedin.metadata.graph.elastic.GraphFilterUtils.getUrnStatusFieldName; +import static com.linkedin.metadata.graph.elastic.GraphFilterUtils.getUrnStatusQuery; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.StringArray; import com.linkedin.metadata.aspect.models.graph.Edge; +import com.linkedin.metadata.aspect.models.graph.EdgeUrnType; import com.linkedin.metadata.aspect.models.graph.RelatedEntities; import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult; import com.linkedin.metadata.aspect.models.graph.RelatedEntity; @@ -17,7 +21,6 @@ import com.linkedin.metadata.graph.LineageRelationshipArray; import com.linkedin.metadata.graph.RelatedEntitiesResult; import com.linkedin.metadata.models.registry.LineageRegistry; -import com.linkedin.metadata.query.LineageFlags; import com.linkedin.metadata.query.filter.Condition; import com.linkedin.metadata.query.filter.ConjunctiveCriterion; import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; @@ -35,6 +38,7 @@ import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.structured.StructuredPropertyDefinition; import com.linkedin.util.Pair; +import io.datahubproject.metadata.context.OperationContext; import io.opentelemetry.extension.annotations.WithSpan; import java.io.IOException; import java.util.ArrayList; @@ -51,13 +55,15 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opensearch.action.search.SearchResponse; +import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.script.Script; +import org.opensearch.script.ScriptType; import org.opensearch.search.SearchHit; @Slf4j @RequiredArgsConstructor public class ElasticSearchGraphService implements GraphService, ElasticSearchIndexed { - private final LineageRegistry _lineageRegistry; private final ESBulkProcessor _esBulkProcessor; private final IndexConvention _indexConvention; @@ -68,7 +74,7 @@ public class ElasticSearchGraphService implements GraphService, ElasticSearchInd public static final String INDEX_NAME = "graph_service_v1"; private static final Map EMPTY_HASH = new HashMap<>(); - private String toDocument(@Nonnull final Edge edge) { + private static String toDocument(@Nonnull final Edge edge) { final ObjectNode searchDocument = JsonNodeFactory.instance.objectNode(); final ObjectNode sourceObject = JsonNodeFactory.instance.objectNode(); @@ -114,6 +120,18 @@ private String toDocument(@Nonnull final Edge edge) { if (edge.getVia() != null) { searchDocument.put(EDGE_FIELD_VIA, edge.getVia().toString()); } + if (edge.getViaStatus() != null) { + searchDocument.put(EDGE_FIELD_VIA_STATUS, edge.getViaStatus()); + } + if (edge.getLifecycleOwnerStatus() != null) { + searchDocument.put(EDGE_FIELD_LIFECYCLE_OWNER_STATUS, edge.getLifecycleOwnerStatus()); + } + if (edge.getSourceStatus() != null) { + searchDocument.put(EDGE_SOURCE_STATUS, edge.getSourceStatus()); + } + if (edge.getDestinationStatus() != null) { + searchDocument.put(EDGE_DESTINATION_STATUS, edge.getDestinationStatus()); + } log.debug("Search doc for write {}", searchDocument); return searchDocument.toString(); @@ -142,8 +160,10 @@ public void removeEdge(@Nonnull final Edge edge) { _graphWriteDAO.deleteDocument(docId); } + @Override @Nonnull public RelatedEntitiesResult findRelatedEntities( + @Nonnull final OperationContext opContext, @Nullable final List sourceTypes, @Nonnull final Filter sourceEntityFilter, @Nullable final List destinationTypes, @@ -161,6 +181,7 @@ public RelatedEntitiesResult findRelatedEntities( SearchResponse response = _graphReadDAO.getSearchResponse( + opContext, sourceTypes, sourceEntityFilter, destinationTypes, @@ -188,35 +209,16 @@ public RelatedEntitiesResult findRelatedEntities( @Override @Deprecated public EntityLineageResult getLineage( + @Nonnull final OperationContext opContext, @Nonnull Urn entityUrn, @Nonnull LineageDirection direction, GraphFilters graphFilters, int offset, int count, int maxHops) { - ESGraphQueryDAO.LineageResponse lineageResponse = - _graphReadDAO.getLineage(entityUrn, direction, graphFilters, offset, count, maxHops, null); - return new EntityLineageResult() - .setRelationships(new LineageRelationshipArray(lineageResponse.getLineageRelationships())) - .setStart(offset) - .setCount(count) - .setTotal(lineageResponse.getTotal()); - } - - @Nonnull - @WithSpan - @Override - public EntityLineageResult getLineage( - @Nonnull Urn entityUrn, - @Nonnull LineageDirection direction, - GraphFilters graphFilters, - int offset, - int count, - int maxHops, - @Nullable LineageFlags lineageFlags) { ESGraphQueryDAO.LineageResponse lineageResponse = _graphReadDAO.getLineage( - entityUrn, direction, graphFilters, offset, count, maxHops, lineageFlags); + opContext, entityUrn, direction, graphFilters, offset, count, maxHops); return new EntityLineageResult() .setRelationships(new LineageRelationshipArray(lineageResponse.getLineageRelationships())) .setStart(offset) @@ -224,13 +226,14 @@ public EntityLineageResult getLineage( .setTotal(lineageResponse.getTotal()); } - private Filter createUrnFilter(@Nonnull final Urn urn) { + private static Filter createUrnFilter(@Nonnull final Urn urn) { Filter filter = new Filter(); CriterionArray criterionArray = new CriterionArray(); Criterion criterion = new Criterion(); criterion.setCondition(Condition.EQUAL); criterion.setField("urn"); criterion.setValue(urn.toString()); + criterion.setValues(new StringArray(urn.toString())); criterionArray.add(criterion); filter.setOr( new ConjunctiveCriterionArray( @@ -239,7 +242,7 @@ private Filter createUrnFilter(@Nonnull final Urn urn) { return filter; } - public void removeNode(@Nonnull final Urn urn) { + public void removeNode(@Nonnull final OperationContext opContext, @Nonnull final Urn urn) { Filter urnFilter = createUrnFilter(urn); Filter emptyFilter = new Filter().setOr(new ConjunctiveCriterionArray()); List relationshipTypes = new ArrayList<>(); @@ -250,19 +253,47 @@ public void removeNode(@Nonnull final Urn urn) { new RelationshipFilter().setDirection(RelationshipDirection.INCOMING); _graphWriteDAO.deleteByQuery( - null, urnFilter, null, emptyFilter, relationshipTypes, outgoingFilter); + opContext, null, urnFilter, null, emptyFilter, relationshipTypes, outgoingFilter); _graphWriteDAO.deleteByQuery( - null, urnFilter, null, emptyFilter, relationshipTypes, incomingFilter); + opContext, null, urnFilter, null, emptyFilter, relationshipTypes, incomingFilter); // Delete all edges where this entity is a lifecycle owner _graphWriteDAO.deleteByQuery( - null, emptyFilter, null, emptyFilter, relationshipTypes, incomingFilter, urn.toString()); + opContext, + null, + emptyFilter, + null, + emptyFilter, + relationshipTypes, + incomingFilter, + urn.toString()); + } - return; + @Override + public void setEdgeStatus( + @Nonnull Urn urn, boolean removed, @Nonnull EdgeUrnType... edgeUrnTypes) { + + for (EdgeUrnType edgeUrnType : edgeUrnTypes) { + // Update the graph status fields per urn type which do not match target state + QueryBuilder negativeQuery = getUrnStatusQuery(edgeUrnType, urn, !removed); + + // Set up the script to update the boolean field + String scriptContent = + "ctx._source." + getUrnStatusFieldName(edgeUrnType) + " = params.newValue"; + Script script = + new Script( + ScriptType.INLINE, + "painless", + scriptContent, + Collections.singletonMap("newValue", removed)); + + _graphWriteDAO.updateByQuery(script, negativeQuery); + } } public void removeEdgesFromNode( + @Nonnull final OperationContext opContext, @Nonnull final Urn urn, @Nonnull final List relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter) { @@ -271,7 +302,7 @@ public void removeEdgesFromNode( Filter emptyFilter = new Filter().setOr(new ConjunctiveCriterionArray()); _graphWriteDAO.deleteByQuery( - null, urnFilter, null, emptyFilter, relationshipTypes, relationshipFilter); + opContext, null, urnFilter, null, emptyFilter, relationshipTypes, relationshipFilter); } @Override @@ -308,8 +339,8 @@ public boolean supportsMultiHop() { } @Nonnull - @Override public RelatedEntitiesScrollResult scrollRelatedEntities( + @Nonnull final OperationContext opContext, @Nullable List sourceTypes, @Nullable Filter sourceEntityFilter, @Nullable List destinationTypes, @@ -326,6 +357,7 @@ public RelatedEntitiesScrollResult scrollRelatedEntities( SearchResponse response = _graphReadDAO.getSearchResponse( + opContext, sourceTypes, sourceEntityFilter, destinationTypes, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/TimeFilterUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphFilterUtils.java similarity index 67% rename from metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/TimeFilterUtils.java rename to metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphFilterUtils.java index 7ee84ce834cfa..982bcae9b5fd9 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/TimeFilterUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphFilterUtils.java @@ -1,14 +1,81 @@ package com.linkedin.metadata.graph.elastic; +import static com.linkedin.metadata.aspect.models.graph.Edge.EDGE_DESTINATION_STATUS; +import static com.linkedin.metadata.aspect.models.graph.Edge.EDGE_DESTINATION_URN_FIELD; +import static com.linkedin.metadata.aspect.models.graph.Edge.EDGE_FIELD_LIFECYCLE_OWNER; +import static com.linkedin.metadata.aspect.models.graph.Edge.EDGE_FIELD_LIFECYCLE_OWNER_STATUS; +import static com.linkedin.metadata.aspect.models.graph.Edge.EDGE_FIELD_VIA; +import static com.linkedin.metadata.aspect.models.graph.Edge.EDGE_FIELD_VIA_STATUS; +import static com.linkedin.metadata.aspect.models.graph.Edge.EDGE_SOURCE_STATUS; +import static com.linkedin.metadata.aspect.models.graph.Edge.EDGE_SOURCE_URN_FIELD; import static com.linkedin.metadata.graph.elastic.ESGraphQueryDAO.*; +import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.aspect.models.graph.EdgeUrnType; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; @Slf4j -public class TimeFilterUtils { +public class GraphFilterUtils { + + public static QueryBuilder getUrnStatusQuery( + @Nonnull EdgeUrnType edgeUrnType, @Nonnull final Urn urn, @Nonnull Boolean removed) { + + final String urnField = getUrnFieldName(edgeUrnType); + final String statusField = getUrnStatusFieldName(edgeUrnType); + + // Create a BoolQueryBuilder + BoolQueryBuilder finalQuery = QueryBuilders.boolQuery(); + + // urn filter + finalQuery.filter(QueryBuilders.termQuery(urnField, urn.toString())); + + // status filter + if (removed) { + finalQuery.filter(QueryBuilders.termQuery(statusField, removed.toString())); + } else { + finalQuery.minimumShouldMatch(1); + finalQuery.should(QueryBuilders.termQuery(statusField, removed.toString())); + finalQuery.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery(statusField))); + } + + return finalQuery; + } + + public static String getUrnStatusFieldName(EdgeUrnType edgeUrnType) { + switch (edgeUrnType) { + case SOURCE: + return EDGE_SOURCE_STATUS; + case DESTINATION: + return EDGE_DESTINATION_STATUS; + case VIA: + return EDGE_FIELD_VIA_STATUS; + case LIFECYCLE_OWNER: + return EDGE_FIELD_LIFECYCLE_OWNER_STATUS; + default: + throw new IllegalStateException( + String.format("Unhandled EdgeUrnType. Found: %s", edgeUrnType)); + } + } + + public static String getUrnFieldName(EdgeUrnType edgeUrnType) { + switch (edgeUrnType) { + case SOURCE: + return EDGE_SOURCE_URN_FIELD; + case DESTINATION: + return EDGE_DESTINATION_URN_FIELD; + case VIA: + return EDGE_FIELD_VIA; + case LIFECYCLE_OWNER: + return EDGE_FIELD_LIFECYCLE_OWNER; + default: + throw new IllegalStateException( + String.format("Unhandled EdgeUrnType. Found: %s", edgeUrnType)); + } + } /** * In order to filter for edges that fall into a specific filter window, we perform a @@ -141,5 +208,5 @@ private static QueryBuilder buildManualLineageFilter() { return QueryBuilders.termQuery(String.format("%s.%s", PROPERTIES, SOURCE), UI); } - private TimeFilterUtils() {} + private GraphFilterUtils() {} } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphRelationshipMappingsBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphRelationshipMappingsBuilder.java index 7a6c7701fde5f..164bf3ad17d8c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphRelationshipMappingsBuilder.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/GraphRelationshipMappingsBuilder.java @@ -20,6 +20,8 @@ public static Map getMappings() { mappings.put(EDGE_FIELD_PROPERTIES, getMappingsForEdgeProperties()); mappings.put(EDGE_FIELD_LIFECYCLE_OWNER, getMappingsForKeyword()); mappings.put(EDGE_FIELD_VIA, getMappingsForKeyword()); + mappings.put(EDGE_FIELD_LIFECYCLE_OWNER_STATUS, getMappingsForBoolean()); + mappings.put(EDGE_FIELD_VIA_STATUS, getMappingsForBoolean()); return ImmutableMap.of("properties", mappings); } @@ -27,12 +29,17 @@ private static Map getMappingsForKeyword() { return ImmutableMap.builder().put("type", "keyword").build(); } + private static Map getMappingsForBoolean() { + return ImmutableMap.builder().put("type", "boolean").build(); + } + private static Map getMappingsForEntity() { Map mappings = ImmutableMap.builder() .put("urn", getMappingsForKeyword()) .put("entityType", getMappingsForKeyword()) + .put("removed", getMappingsForBoolean()) .build(); return ImmutableMap.of("properties", mappings); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java index 9fe9c242fe48c..75d993f52680a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java @@ -32,7 +32,7 @@ import com.linkedin.metadata.search.elasticsearch.query.request.SearchAfterWrapper; import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.util.Pair; -import io.opentelemetry.extension.annotations.WithSpan; +import io.datahubproject.metadata.context.OperationContext; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; @@ -250,33 +250,24 @@ public void removeEdge(final Edge edge) { } @Nonnull - @WithSpan @Override public EntityLineageResult getLineage( + @Nonnull final OperationContext opContext, @Nonnull Urn entityUrn, @Nonnull LineageDirection direction, GraphFilters graphFilters, int offset, int count, int maxHops) { - return getLineage(entityUrn, direction, graphFilters, offset, count, maxHops, null); - } - - @Nonnull - @Override - public EntityLineageResult getLineage( - @Nonnull Urn entityUrn, - @Nonnull LineageDirection direction, - GraphFilters graphFilters, - int offset, - int count, - int maxHops, - @Nullable LineageFlags lineageFlags) { log.debug(String.format("Neo4j getLineage maxHops = %d", maxHops)); final var statementAndParams = generateLineageStatementAndParameters( - entityUrn, direction, graphFilters, maxHops, lineageFlags); + entityUrn, + direction, + graphFilters, + maxHops, + opContext.getSearchContext().getLineageFlags()); final var statement = statementAndParams.getFirst(); final var parameters = statementAndParams.getSecond(); @@ -457,6 +448,7 @@ private Pair> generateLineageStatementAndParameters( @Nonnull public RelatedEntitiesResult findRelatedEntities( + @Nonnull final OperationContext opContext, @Nullable final List sourceTypes, @Nonnull final Filter sourceEntityFilter, @Nullable final List destinationTypes, @@ -600,7 +592,7 @@ private String computeEntityTypeWhereClause( return whereClause; } - public void removeNode(@Nonnull final Urn urn) { + public void removeNode(@Nonnull final OperationContext opContext, @Nonnull final Urn urn) { log.debug(String.format("Removing Neo4j node with urn: %s", urn)); final String srcNodeLabel = urn.getEntityType(); @@ -627,6 +619,7 @@ public void removeNode(@Nonnull final Urn urn) { * @param relationshipFilter Query relationship filter */ public void removeEdgesFromNode( + @Nonnull final OperationContext opContext, @Nonnull final Urn urn, @Nonnull final List relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter) { @@ -915,6 +908,7 @@ private boolean isSourceDestReversed( @Nonnull @Override public RelatedEntitiesScrollResult scrollRelatedEntities( + @Nonnull OperationContext opContext, @Nullable List sourceTypes, @Nonnull Filter sourceEntityFilter, @Nullable List destinationTypes, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java index 435731a3f9d04..ec9c44e42f7f4 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/LineageSearchService.java @@ -173,13 +173,7 @@ public LineageSearchResult searchAcrossLineage( if (cachedLineageResult == null || finalOpContext.getSearchContext().getSearchFlags().isSkipCache()) { lineageResult = - _graphService.getLineage( - sourceUrn, - direction, - 0, - MAX_RELATIONSHIPS, - maxHops, - opContext.getSearchContext().getLineageFlags()); + _graphService.getLineage(opContext, sourceUrn, direction, 0, MAX_RELATIONSHIPS, maxHops); if (cacheEnabled) { try { cache.put( @@ -210,12 +204,7 @@ public LineageSearchResult searchAcrossLineage( // we have to refetch EntityLineageResult result = _graphService.getLineage( - sourceUrn, - direction, - 0, - MAX_RELATIONSHIPS, - finalMaxHops, - opContext.getSearchContext().getLineageFlags()); + opContext, sourceUrn, direction, 0, MAX_RELATIONSHIPS, finalMaxHops); cache.put(cacheKey, result); log.debug("Refilled Cached lineage entry for: {}.", sourceUrn); } else { @@ -770,13 +759,7 @@ public LineageScrollResult scrollAcrossLineage( if (cachedLineageResult == null) { maxHops = maxHops != null ? maxHops : 1000; lineageResult = - _graphService.getLineage( - sourceUrn, - direction, - 0, - MAX_RELATIONSHIPS, - maxHops, - opContext.getSearchContext().getLineageFlags()); + _graphService.getLineage(opContext, sourceUrn, direction, 0, MAX_RELATIONSHIPS, maxHops); if (cacheEnabled) { cache.put( cacheKey, new CachedEntityLineageResult(lineageResult, System.currentTimeMillis())); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java index fc29aca411784..63a9c731a2d39 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java @@ -23,6 +23,8 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.reindex.BulkByScrollResponse; import org.opensearch.index.reindex.DeleteByQueryRequest; +import org.opensearch.index.reindex.UpdateByQueryRequest; +import org.opensearch.script.Script; @Slf4j @Builder(builderMethodName = "hiddenBuilder") @@ -30,6 +32,7 @@ public class ESBulkProcessor implements Closeable { private static final String ES_WRITES_METRIC = "num_elasticSearch_writes"; private static final String ES_BATCHES_METRIC = "num_elasticSearch_batches_submitted"; private static final String ES_DELETE_EXCEPTION_METRIC = "delete_by_query"; + private static final String ES_UPDATE_EXCEPTION_METRIC = "update_by_query"; private static final String ES_SUBMIT_DELETE_EXCEPTION_METRIC = "submit_delete_by_query_task"; private static final String ES_SUBMIT_REINDEX_METRIC = "reindex_submit"; private static final String ES_REINDEX_SUCCESS_METRIC = "reindex_success"; @@ -97,6 +100,26 @@ public Optional deleteByQuery( return deleteByQuery(queryBuilder, refresh, bulkRequestsLimit, defaultTimeout, indices); } + public Optional updateByQuery( + Script script, QueryBuilder queryBuilder, String... indices) { + // Create an UpdateByQueryRequest + UpdateByQueryRequest updateByQuery = new UpdateByQueryRequest(indices); + updateByQuery.setQuery(queryBuilder); + updateByQuery.setScript(script); + + try { + final BulkByScrollResponse updateResponse = + searchClient.updateByQuery(updateByQuery, RequestOptions.DEFAULT); + MetricUtils.counter(this.getClass(), ES_WRITES_METRIC).inc(updateResponse.getTotal()); + return Optional.of(updateResponse); + } catch (Exception e) { + log.error("ERROR: Failed to update by query. See stacktrace for a more detailed error:", e); + MetricUtils.exceptionCounter(ESBulkProcessor.class, ES_UPDATE_EXCEPTION_METRIC, e); + } + + return Optional.empty(); + } + public Optional deleteByQuery( QueryBuilder queryBuilder, boolean refresh, int limit, TimeValue timeout, String... indices) { DeleteByQueryRequest deleteByQueryRequest = diff --git a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java new file mode 100644 index 0000000000000..7549aea2007da --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java @@ -0,0 +1,452 @@ +package com.linkedin.metadata.service; + +import static com.linkedin.metadata.Constants.FORCE_INDEXING_KEY; +import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; +import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter; +import static com.linkedin.metadata.search.utils.QueryUtils.newRelationshipFilter; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.linkedin.common.InputField; +import com.linkedin.common.InputFields; +import com.linkedin.common.Status; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.datajob.DataJobInputOutput; +import com.linkedin.dataset.FineGrainedLineage; +import com.linkedin.dataset.FineGrainedLineageArray; +import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.aspect.batch.MCLItem; +import com.linkedin.metadata.aspect.models.graph.Edge; +import com.linkedin.metadata.aspect.models.graph.EdgeUrnType; +import com.linkedin.metadata.entity.SearchIndicesService; +import com.linkedin.metadata.entity.ebean.batch.MCLItemImpl; +import com.linkedin.metadata.graph.GraphIndexUtils; +import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.graph.dgraph.DgraphGraphService; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.RelationshipFieldSpec; +import com.linkedin.metadata.models.extractor.FieldExtractor; +import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; +import com.linkedin.metadata.query.filter.Filter; +import com.linkedin.metadata.query.filter.RelationshipDirection; +import com.linkedin.metadata.utils.SchemaFieldUtils; +import com.linkedin.mxe.MetadataChangeLog; +import com.linkedin.mxe.SystemMetadata; +import com.linkedin.util.Pair; +import io.datahubproject.metadata.context.OperationContext; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class UpdateGraphIndicesService implements SearchIndicesService { + private static final String DOWNSTREAM_OF = "DownstreamOf"; + + public static UpdateGraphIndicesService withService(GraphService graphService) { + return new UpdateGraphIndicesService(graphService); + } + + private final GraphService graphService; + + @Getter private final boolean graphStatusEnabled; + + @Getter @Setter @VisibleForTesting private boolean graphDiffMode; + + private static final Set UPDATE_CHANGE_TYPES = + ImmutableSet.of( + ChangeType.CREATE, + ChangeType.CREATE_ENTITY, + ChangeType.UPSERT, + ChangeType.RESTATE, + ChangeType.PATCH); + + public UpdateGraphIndicesService(GraphService graphService) { + this(graphService, true, true); + } + + public UpdateGraphIndicesService( + GraphService graphService, boolean graphDiffMode, boolean graphStatusEnabled) { + this.graphService = graphService; + this.graphDiffMode = graphDiffMode; + this.graphStatusEnabled = graphStatusEnabled; + } + + @Override + public void handleChangeEvent( + @Nonnull OperationContext opContext, @Nonnull final MetadataChangeLog event) { + try { + MCLItemImpl mclItem = + MCLItemImpl.builder().build(event, opContext.getAspectRetrieverOpt().get()); + + if (UPDATE_CHANGE_TYPES.contains(event.getChangeType())) { + handleUpdateChangeEvent(opContext, mclItem); + + if (graphStatusEnabled && mclItem.getAspectName().equals(STATUS_ASPECT_NAME)) { + handleStatusUpdateChangeEvent(opContext, mclItem); + } + } else if (event.getChangeType() == ChangeType.DELETE) { + handleDeleteChangeEvent(opContext, mclItem); + + if (graphStatusEnabled && mclItem.getAspectName().equals(STATUS_ASPECT_NAME)) { + handleStatusUpdateChangeEvent(opContext, mclItem); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void handleStatusUpdateChangeEvent( + @Nonnull final OperationContext opContext, @Nonnull final MCLItem item) { + final Boolean removed; + if (ChangeType.DELETE.equals(item.getChangeType())) { + removed = false; + } else if (ChangeType.RESTATE.equals(item.getChangeType()) + || item.getPreviousRecordTemplate() == null + || !item.getPreviousAspect(Status.class).equals(item.getAspect(Status.class))) { + removed = item.getAspect(Status.class).isRemoved(); + } else { + removed = null; + } + + if (removed != null) { + graphService.setEdgeStatus(item.getUrn(), removed, EdgeUrnType.values()); + } + } + + /** + * This very important method processes {@link MetadataChangeLog} events that represent changes to + * the Metadata Graph. + * + *

In particular, it handles updating the Search, Graph, Timeseries, and System Metadata stores + * in response to a given change type to reflect the changes present in the new aspect. + * + * @param event the change event to be processed. + */ + private void handleUpdateChangeEvent( + @Nonnull final OperationContext opContext, @Nonnull final MCLItem event) throws IOException { + + final AspectSpec aspectSpec = event.getAspectSpec(); + final Urn urn = event.getUrn(); + + RecordTemplate aspect = event.getRecordTemplate(); + RecordTemplate previousAspect = event.getPreviousRecordTemplate(); + + // For all aspects, attempt to update Graph + SystemMetadata systemMetadata = event.getSystemMetadata(); + if (graphDiffMode + && !(graphService instanceof DgraphGraphService) + && (systemMetadata == null + || systemMetadata.getProperties() == null + || !Boolean.parseBoolean(systemMetadata.getProperties().get(FORCE_INDEXING_KEY)))) { + updateGraphServiceDiff(urn, aspectSpec, previousAspect, aspect, event.getMetadataChangeLog()); + } else { + updateGraphService(opContext, urn, aspectSpec, aspect, event.getMetadataChangeLog()); + } + } + + /** + * This very important method processes {@link MetadataChangeLog} deletion events to cleanup the + * Metadata Graph when an aspect or entity is removed. + * + *

In particular, it handles updating the Search, Graph, Timeseries, and System Metadata stores + * to reflect the deletion of a particular aspect. + * + *

Note that if an entity's key aspect is deleted, the entire entity will be purged from + * search, graph, timeseries, etc. + * + * @param event the change event to be processed. + */ + private void handleDeleteChangeEvent( + @Nonnull final OperationContext opContext, @Nonnull final MCLItem event) { + + final EntitySpec entitySpec = event.getEntitySpec(); + final Urn urn = event.getUrn(); + + AspectSpec aspectSpec = entitySpec.getAspectSpec(event.getAspectName()); + if (aspectSpec == null) { + throw new RuntimeException( + String.format( + "Failed to retrieve Aspect Spec for entity with name %s, aspect with name %s. Cannot update indices for MCL.", + urn.getEntityType(), event.getAspectName())); + } + + RecordTemplate aspect = event.getRecordTemplate(); + Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName()); + + if (!aspectSpec.isTimeseries()) { + deleteGraphData( + opContext, urn, aspectSpec, aspect, isDeletingKey, event.getMetadataChangeLog()); + } + } + + // TODO: remove this method once we implement sourceOverride when creating graph edges + private void updateFineGrainedEdgesAndRelationships( + Urn entity, + FineGrainedLineageArray fineGrainedLineageArray, + List edgesToAdd, + HashMap> urnToRelationshipTypesBeingAdded) { + if (fineGrainedLineageArray != null) { + for (FineGrainedLineage fineGrainedLineage : fineGrainedLineageArray) { + if (!fineGrainedLineage.hasDownstreams() || !fineGrainedLineage.hasUpstreams()) { + break; + } + // Fine grained lineage array is present either on datajob (datajob input/output) or dataset + // We set the datajob as the viaEntity in scenario 1, and the query (if present) as the + // viaEntity in scenario 2 + Urn viaEntity = + entity.getEntityType().equals("dataJob") ? entity : fineGrainedLineage.getQuery(); + // for every downstream, create an edge with each of the upstreams + for (Urn downstream : fineGrainedLineage.getDownstreams()) { + for (Urn upstream : fineGrainedLineage.getUpstreams()) { + // TODO: add edges uniformly across aspects + edgesToAdd.add( + new Edge( + downstream, + upstream, + DOWNSTREAM_OF, + null, + null, + null, + null, + null, + entity, + viaEntity)); + Set relationshipTypes = + urnToRelationshipTypesBeingAdded.getOrDefault(downstream, new HashSet<>()); + relationshipTypes.add(DOWNSTREAM_OF); + urnToRelationshipTypesBeingAdded.put(downstream, relationshipTypes); + } + } + } + } + } + + // TODO: remove this method once we implement sourceOverride and update inputFields aspect + private void updateInputFieldEdgesAndRelationships( + @Nonnull final Urn urn, + @Nonnull final InputFields inputFields, + @Nonnull final List edgesToAdd, + @Nonnull final HashMap> urnToRelationshipTypesBeingAdded) { + if (inputFields.hasFields()) { + for (final InputField field : inputFields.getFields()) { + if (field.hasSchemaFieldUrn() + && field.hasSchemaField() + && field.getSchemaField().hasFieldPath()) { + final Urn sourceFieldUrn = + SchemaFieldUtils.generateSchemaFieldUrn(urn, field.getSchemaField().getFieldPath()); + // TODO: add edges uniformly across aspects + edgesToAdd.add( + new Edge( + sourceFieldUrn, + field.getSchemaFieldUrn(), + DOWNSTREAM_OF, + null, + null, + null, + null, + null)); + final Set relationshipTypes = + urnToRelationshipTypesBeingAdded.getOrDefault(sourceFieldUrn, new HashSet<>()); + relationshipTypes.add(DOWNSTREAM_OF); + urnToRelationshipTypesBeingAdded.put(sourceFieldUrn, relationshipTypes); + } + } + } + } + + private Pair, HashMap>> getEdgesAndRelationshipTypesFromAspect( + @Nonnull final Urn urn, + @Nonnull final AspectSpec aspectSpec, + @Nonnull final RecordTemplate aspect, + @Nonnull final MetadataChangeLog event, + final boolean isNewAspectVersion) { + final List edgesToAdd = new ArrayList<>(); + final HashMap> urnToRelationshipTypesBeingAdded = new HashMap<>(); + + // we need to manually set schemaField <-> schemaField edges for fineGrainedLineage and + // inputFields + // since @Relationship only links between the parent entity urn and something else. + if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) { + UpstreamLineage upstreamLineage = new UpstreamLineage(aspect.data()); + updateFineGrainedEdgesAndRelationships( + urn, + upstreamLineage.getFineGrainedLineages(), + edgesToAdd, + urnToRelationshipTypesBeingAdded); + } else if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) { + final InputFields inputFields = new InputFields(aspect.data()); + updateInputFieldEdgesAndRelationships( + urn, inputFields, edgesToAdd, urnToRelationshipTypesBeingAdded); + } else if (aspectSpec.getName().equals(Constants.DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)) { + DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(aspect.data()); + updateFineGrainedEdgesAndRelationships( + urn, + dataJobInputOutput.getFineGrainedLineages(), + edgesToAdd, + urnToRelationshipTypesBeingAdded); + } + + Map> extractedFields = + FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs()); + + for (Map.Entry> entry : extractedFields.entrySet()) { + Set relationshipTypes = + urnToRelationshipTypesBeingAdded.getOrDefault(urn, new HashSet<>()); + relationshipTypes.add(entry.getKey().getRelationshipName()); + urnToRelationshipTypesBeingAdded.put(urn, relationshipTypes); + final List newEdges = + GraphIndexUtils.extractGraphEdges(entry, aspect, urn, event, isNewAspectVersion); + edgesToAdd.addAll(newEdges); + } + return Pair.of(edgesToAdd, urnToRelationshipTypesBeingAdded); + } + + /** Process snapshot and update graph index */ + private void updateGraphService( + @Nonnull final OperationContext opContext, + @Nonnull final Urn urn, + @Nonnull final AspectSpec aspectSpec, + @Nonnull final RecordTemplate aspect, + @Nonnull final MetadataChangeLog event) { + Pair, HashMap>> edgeAndRelationTypes = + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true); + + final List edgesToAdd = edgeAndRelationTypes.getFirst(); + final HashMap> urnToRelationshipTypesBeingAdded = + edgeAndRelationTypes.getSecond(); + + log.debug("Here's the relationship types found {}", urnToRelationshipTypesBeingAdded); + if (!urnToRelationshipTypesBeingAdded.isEmpty()) { + for (Map.Entry> entry : urnToRelationshipTypesBeingAdded.entrySet()) { + graphService.removeEdgesFromNode( + opContext, + entry.getKey(), + new ArrayList<>(entry.getValue()), + newRelationshipFilter( + new Filter().setOr(new ConjunctiveCriterionArray()), + RelationshipDirection.OUTGOING)); + } + edgesToAdd.forEach(graphService::addEdge); + } + } + + private void updateGraphServiceDiff( + @Nonnull final Urn urn, + @Nonnull final AspectSpec aspectSpec, + @Nullable final RecordTemplate oldAspect, + @Nonnull final RecordTemplate newAspect, + @Nonnull final MetadataChangeLog event) { + Pair, HashMap>> oldEdgeAndRelationTypes = null; + if (oldAspect != null) { + oldEdgeAndRelationTypes = + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, oldAspect, event, false); + } + + final List oldEdges = + oldEdgeAndRelationTypes != null + ? oldEdgeAndRelationTypes.getFirst() + : Collections.emptyList(); + final Set oldEdgeSet = new HashSet<>(oldEdges); + + Pair, HashMap>> newEdgeAndRelationTypes = + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, newAspect, event, true); + + final List newEdges = newEdgeAndRelationTypes.getFirst(); + final Set newEdgeSet = new HashSet<>(newEdges); + + // Edges to add + final List additiveDifference = + newEdgeSet.stream().filter(edge -> !oldEdgeSet.contains(edge)).collect(Collectors.toList()); + + // Edges to remove + final List subtractiveDifference = + oldEdgeSet.stream().filter(edge -> !newEdgeSet.contains(edge)).collect(Collectors.toList()); + + // Edges to update + final List mergedEdges = getMergedEdges(oldEdgeSet, newEdgeSet); + + // Remove any old edges that no longer exist first + if (subtractiveDifference.size() > 0) { + log.debug("Removing edges: {}", subtractiveDifference); + subtractiveDifference.forEach(graphService::removeEdge); + } + + // Then add new edges + if (additiveDifference.size() > 0) { + log.debug("Adding edges: {}", additiveDifference); + additiveDifference.forEach(graphService::addEdge); + } + + // Then update existing edges + if (mergedEdges.size() > 0) { + log.debug("Updating edges: {}", mergedEdges); + mergedEdges.forEach(graphService::upsertEdge); + } + } + + private static List getMergedEdges(final Set oldEdgeSet, final Set newEdgeSet) { + final Map oldEdgesMap = + oldEdgeSet.stream() + .map(edge -> Pair.of(edge.hashCode(), edge)) + .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond)); + + final List mergedEdges = new ArrayList<>(); + if (!oldEdgesMap.isEmpty()) { + for (Edge newEdge : newEdgeSet) { + if (oldEdgesMap.containsKey(newEdge.hashCode())) { + final Edge oldEdge = oldEdgesMap.get(newEdge.hashCode()); + final Edge mergedEdge = GraphIndexUtils.mergeEdges(oldEdge, newEdge); + mergedEdges.add(mergedEdge); + } + } + } + + return mergedEdges; + } + + private void deleteGraphData( + @Nonnull final OperationContext opContext, + @Nonnull final Urn urn, + @Nonnull final AspectSpec aspectSpec, + @Nonnull final RecordTemplate aspect, + @Nonnull final Boolean isKeyAspect, + @Nonnull final MetadataChangeLog event) { + if (isKeyAspect) { + graphService.removeNode(opContext, urn); + return; + } + + Pair, HashMap>> edgeAndRelationTypes = + getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true); + + final HashMap> urnToRelationshipTypesBeingAdded = + edgeAndRelationTypes.getSecond(); + if (urnToRelationshipTypesBeingAdded.size() > 0) { + for (Map.Entry> entry : urnToRelationshipTypesBeingAdded.entrySet()) { + graphService.removeEdgesFromNode( + opContext, + entry.getKey(), + new ArrayList<>(entry.getValue()), + createRelationshipFilter( + new Filter().setOr(new ConjunctiveCriterionArray()), + RelationshipDirection.OUTGOING)); + } + } + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java index 2274b0a7c1cd8..3795fd19316b1 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/service/UpdateIndicesService.java @@ -2,59 +2,37 @@ import static com.linkedin.metadata.Constants.*; import static com.linkedin.metadata.search.transformer.SearchDocumentTransformer.withSystemCreated; -import static com.linkedin.metadata.search.utils.QueryUtils.*; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; -import com.linkedin.common.InputField; -import com.linkedin.common.InputFields; import com.linkedin.common.Status; import com.linkedin.common.UrnArray; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; -import com.linkedin.datajob.DataJobInputOutput; -import com.linkedin.dataset.FineGrainedLineage; -import com.linkedin.dataset.FineGrainedLineageArray; -import com.linkedin.dataset.UpstreamLineage; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; import com.linkedin.metadata.aspect.batch.AspectsBatch; import com.linkedin.metadata.aspect.batch.MCLItem; -import com.linkedin.metadata.aspect.models.graph.Edge; import com.linkedin.metadata.entity.SearchIndicesService; import com.linkedin.metadata.entity.ebean.batch.MCLItemImpl; -import com.linkedin.metadata.graph.GraphIndexUtils; -import com.linkedin.metadata.graph.GraphService; -import com.linkedin.metadata.graph.dgraph.DgraphGraphService; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; -import com.linkedin.metadata.models.RelationshipFieldSpec; -import com.linkedin.metadata.models.extractor.FieldExtractor; -import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; -import com.linkedin.metadata.query.filter.Filter; -import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders; import com.linkedin.metadata.search.transformer.SearchDocumentTransformer; import com.linkedin.metadata.systemmetadata.SystemMetadataService; import com.linkedin.metadata.timeseries.TimeseriesAspectService; import com.linkedin.metadata.timeseries.transformer.TimeseriesAspectTransformer; -import com.linkedin.metadata.utils.SchemaFieldUtils; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.SystemMetadata; import com.linkedin.structured.StructuredPropertyDefinition; -import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -63,33 +41,25 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; @Slf4j public class UpdateIndicesService implements SearchIndicesService { - private static final String DOWNSTREAM_OF = "DownstreamOf"; - - private final GraphService _graphService; - private final EntitySearchService _entitySearchService; - private final TimeseriesAspectService _timeseriesAspectService; - private final SystemMetadataService _systemMetadataService; - private final SearchDocumentTransformer _searchDocumentTransformer; - private final EntityIndexBuilders _entityIndexBuilders; - @Nonnull private final String idHashAlgo; - @Value("${featureFlags.graphServiceDiffModeEnabled:true}") - private boolean _graphDiffMode; + @VisibleForTesting @Getter private final UpdateGraphIndicesService updateGraphIndicesService; + private final EntitySearchService entitySearchService; + private final TimeseriesAspectService timeseriesAspectService; + private final SystemMetadataService systemMetadataService; + private final SearchDocumentTransformer searchDocumentTransformer; + private final EntityIndexBuilders entityIndexBuilders; + @Nonnull private final String idHashAlgo; - @Value("${featureFlags.searchServiceDiffModeEnabled:true}") - private boolean _searchDiffMode; + @Getter private final boolean searchDiffMode; - @Value("${structuredProperties.enabled}") - private boolean _structuredPropertiesHookEnabled; + @Getter private final boolean structuredPropertiesHookEnabled; - @Value("${structuredProperties.writeEnabled}") - private boolean _structuredPropertiesWriteEnabled; + @Getter private final boolean structuredPropertiesWriteEnabled; private static final Set UPDATE_CHANGE_TYPES = ImmutableSet.of( @@ -99,31 +69,48 @@ public class UpdateIndicesService implements SearchIndicesService { ChangeType.RESTATE, ChangeType.PATCH); - @VisibleForTesting - public void setGraphDiffMode(boolean graphDiffMode) { - _graphDiffMode = graphDiffMode; - } - - @VisibleForTesting - public void setSearchDiffMode(boolean searchDiffMode) { - _searchDiffMode = searchDiffMode; + public UpdateIndicesService( + UpdateGraphIndicesService updateGraphIndicesService, + EntitySearchService entitySearchService, + TimeseriesAspectService timeseriesAspectService, + SystemMetadataService systemMetadataService, + SearchDocumentTransformer searchDocumentTransformer, + EntityIndexBuilders entityIndexBuilders, + @Nonnull String idHashAlgo) { + this( + updateGraphIndicesService, + entitySearchService, + timeseriesAspectService, + systemMetadataService, + searchDocumentTransformer, + entityIndexBuilders, + idHashAlgo, + true, + true, + true); } public UpdateIndicesService( - GraphService graphService, + UpdateGraphIndicesService updateGraphIndicesService, EntitySearchService entitySearchService, TimeseriesAspectService timeseriesAspectService, SystemMetadataService systemMetadataService, SearchDocumentTransformer searchDocumentTransformer, EntityIndexBuilders entityIndexBuilders, - @Nonnull String idHashAlgo) { - _graphService = graphService; - _entitySearchService = entitySearchService; - _timeseriesAspectService = timeseriesAspectService; - _systemMetadataService = systemMetadataService; - _searchDocumentTransformer = searchDocumentTransformer; - _entityIndexBuilders = entityIndexBuilders; + @Nonnull String idHashAlgo, + boolean searchDiffMode, + boolean structuredPropertiesHookEnabled, + boolean structuredPropertiesWriteEnabled) { + this.updateGraphIndicesService = updateGraphIndicesService; + this.entitySearchService = entitySearchService; + this.timeseriesAspectService = timeseriesAspectService; + this.systemMetadataService = systemMetadataService; + this.searchDocumentTransformer = searchDocumentTransformer; + this.entityIndexBuilders = entityIndexBuilders; this.idHashAlgo = idHashAlgo; + this.searchDiffMode = searchDiffMode; + this.structuredPropertiesHookEnabled = structuredPropertiesHookEnabled; + this.structuredPropertiesWriteEnabled = structuredPropertiesWriteEnabled; } @Override @@ -144,6 +131,9 @@ public void handleChangeEvent( } else if (hookEvent.getChangeType() == ChangeType.DELETE) { handleDeleteChangeEvent(opContext, mclItem); } + + // graph update + updateGraphIndicesService.handleChangeEvent(opContext, event); } } catch (IOException e) { throw new RuntimeException(e); @@ -191,18 +181,6 @@ private void handleUpdateChangeEvent( // Step 2. For all aspects, attempt to update Search updateSearchService(opContext, event); - - // Step 3. For all aspects, attempt to update Graph - SystemMetadata systemMetadata = event.getSystemMetadata(); - if (_graphDiffMode - && !(_graphService instanceof DgraphGraphService) - && (systemMetadata == null - || systemMetadata.getProperties() == null - || !Boolean.parseBoolean(systemMetadata.getProperties().get(FORCE_INDEXING_KEY)))) { - updateGraphServiceDiff(urn, aspectSpec, previousAspect, aspect, event.getMetadataChangeLog()); - } else { - updateGraphService(urn, aspectSpec, aspect, event.getMetadataChangeLog()); - } } public void updateIndexMappings( @@ -210,9 +188,8 @@ public void updateIndexMappings( EntitySpec entitySpec, AspectSpec aspectSpec, RecordTemplate newValue, - RecordTemplate oldValue) - throws IOException { - if (_structuredPropertiesHookEnabled + RecordTemplate oldValue) { + if (structuredPropertiesHookEnabled && STRUCTURED_PROPERTY_ENTITY_NAME.equals(entitySpec.getName()) && STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME.equals(aspectSpec.getName())) { @@ -228,7 +205,7 @@ public void updateIndexMappings( newDefinition.getEntityTypes().removeAll(oldEntityTypes); if (newDefinition.getEntityTypes().size() > 0) { - _entityIndexBuilders + entityIndexBuilders .buildReindexConfigsWithNewStructProp(urn, newDefinition) .forEach( reindexState -> { @@ -237,7 +214,7 @@ public void updateIndexMappings( "Applying new structured property {} to index {}", newDefinition, reindexState.name()); - _entityIndexBuilders.getIndexBuilder().applyMappings(reindexState, false); + entityIndexBuilders.getIndexBuilder().applyMappings(reindexState, false); } catch (IOException e) { throw new RuntimeException(e); } @@ -277,236 +254,10 @@ private void handleDeleteChangeEvent( if (!aspectSpec.isTimeseries()) { deleteSystemMetadata(urn, aspectSpec, isDeletingKey); - deleteGraphData(urn, aspectSpec, aspect, isDeletingKey, event.getMetadataChangeLog()); deleteSearchData(opContext, urn, entitySpec.getName(), aspectSpec, aspect, isDeletingKey); } } - // TODO: remove this method once we implement sourceOverride when creating graph edges - private void updateFineGrainedEdgesAndRelationships( - Urn entity, - FineGrainedLineageArray fineGrainedLineageArray, - List edgesToAdd, - HashMap> urnToRelationshipTypesBeingAdded) { - if (fineGrainedLineageArray != null) { - for (FineGrainedLineage fineGrainedLineage : fineGrainedLineageArray) { - if (!fineGrainedLineage.hasDownstreams() || !fineGrainedLineage.hasUpstreams()) { - break; - } - // Fine grained lineage array is present either on datajob (datajob input/output) or dataset - // We set the datajob as the viaEntity in scenario 1, and the query (if present) as the - // viaEntity in scenario 2 - Urn viaEntity = - entity.getEntityType().equals("dataJob") ? entity : fineGrainedLineage.getQuery(); - // for every downstream, create an edge with each of the upstreams - for (Urn downstream : fineGrainedLineage.getDownstreams()) { - for (Urn upstream : fineGrainedLineage.getUpstreams()) { - // TODO: add edges uniformly across aspects - edgesToAdd.add( - new Edge( - downstream, - upstream, - DOWNSTREAM_OF, - null, - null, - null, - null, - null, - entity, - viaEntity)); - Set relationshipTypes = - urnToRelationshipTypesBeingAdded.getOrDefault(downstream, new HashSet<>()); - relationshipTypes.add(DOWNSTREAM_OF); - urnToRelationshipTypesBeingAdded.put(downstream, relationshipTypes); - } - } - } - } - } - - // TODO: remove this method once we implement sourceOverride and update inputFields aspect - private void updateInputFieldEdgesAndRelationships( - @Nonnull final Urn urn, - @Nonnull final InputFields inputFields, - @Nonnull final List edgesToAdd, - @Nonnull final HashMap> urnToRelationshipTypesBeingAdded) { - if (inputFields.hasFields()) { - for (final InputField field : inputFields.getFields()) { - if (field.hasSchemaFieldUrn() - && field.hasSchemaField() - && field.getSchemaField().hasFieldPath()) { - final Urn sourceFieldUrn = - SchemaFieldUtils.generateSchemaFieldUrn(urn, field.getSchemaField().getFieldPath()); - // TODO: add edges uniformly across aspects - edgesToAdd.add( - new Edge( - sourceFieldUrn, - field.getSchemaFieldUrn(), - DOWNSTREAM_OF, - null, - null, - null, - null, - null)); - final Set relationshipTypes = - urnToRelationshipTypesBeingAdded.getOrDefault(sourceFieldUrn, new HashSet<>()); - relationshipTypes.add(DOWNSTREAM_OF); - urnToRelationshipTypesBeingAdded.put(sourceFieldUrn, relationshipTypes); - } - } - } - } - - private Pair, HashMap>> getEdgesAndRelationshipTypesFromAspect( - @Nonnull final Urn urn, - @Nonnull final AspectSpec aspectSpec, - @Nonnull final RecordTemplate aspect, - @Nonnull final MetadataChangeLog event, - final boolean isNewAspectVersion) { - final List edgesToAdd = new ArrayList<>(); - final HashMap> urnToRelationshipTypesBeingAdded = new HashMap<>(); - - // we need to manually set schemaField <-> schemaField edges for fineGrainedLineage and - // inputFields - // since @Relationship only links between the parent entity urn and something else. - if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) { - UpstreamLineage upstreamLineage = new UpstreamLineage(aspect.data()); - updateFineGrainedEdgesAndRelationships( - urn, - upstreamLineage.getFineGrainedLineages(), - edgesToAdd, - urnToRelationshipTypesBeingAdded); - } else if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) { - final InputFields inputFields = new InputFields(aspect.data()); - updateInputFieldEdgesAndRelationships( - urn, inputFields, edgesToAdd, urnToRelationshipTypesBeingAdded); - } else if (aspectSpec.getName().equals(Constants.DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)) { - DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(aspect.data()); - updateFineGrainedEdgesAndRelationships( - urn, - dataJobInputOutput.getFineGrainedLineages(), - edgesToAdd, - urnToRelationshipTypesBeingAdded); - } - - Map> extractedFields = - FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs()); - - for (Map.Entry> entry : extractedFields.entrySet()) { - Set relationshipTypes = - urnToRelationshipTypesBeingAdded.getOrDefault(urn, new HashSet<>()); - relationshipTypes.add(entry.getKey().getRelationshipName()); - urnToRelationshipTypesBeingAdded.put(urn, relationshipTypes); - final List newEdges = - GraphIndexUtils.extractGraphEdges(entry, aspect, urn, event, isNewAspectVersion); - edgesToAdd.addAll(newEdges); - } - return Pair.of(edgesToAdd, urnToRelationshipTypesBeingAdded); - } - - /** Process snapshot and update graph index */ - private void updateGraphService( - @Nonnull final Urn urn, - @Nonnull final AspectSpec aspectSpec, - @Nonnull final RecordTemplate aspect, - @Nonnull final MetadataChangeLog event) { - Pair, HashMap>> edgeAndRelationTypes = - getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true); - - final List edgesToAdd = edgeAndRelationTypes.getFirst(); - final HashMap> urnToRelationshipTypesBeingAdded = - edgeAndRelationTypes.getSecond(); - - log.debug("Here's the relationship types found {}", urnToRelationshipTypesBeingAdded); - if (!urnToRelationshipTypesBeingAdded.isEmpty()) { - for (Map.Entry> entry : urnToRelationshipTypesBeingAdded.entrySet()) { - _graphService.removeEdgesFromNode( - entry.getKey(), - new ArrayList<>(entry.getValue()), - newRelationshipFilter( - new Filter().setOr(new ConjunctiveCriterionArray()), - RelationshipDirection.OUTGOING)); - } - edgesToAdd.forEach(_graphService::addEdge); - } - } - - private void updateGraphServiceDiff( - @Nonnull final Urn urn, - @Nonnull final AspectSpec aspectSpec, - @Nullable final RecordTemplate oldAspect, - @Nonnull final RecordTemplate newAspect, - @Nonnull final MetadataChangeLog event) { - Pair, HashMap>> oldEdgeAndRelationTypes = null; - if (oldAspect != null) { - oldEdgeAndRelationTypes = - getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, oldAspect, event, false); - } - - final List oldEdges = - oldEdgeAndRelationTypes != null - ? oldEdgeAndRelationTypes.getFirst() - : Collections.emptyList(); - final Set oldEdgeSet = new HashSet<>(oldEdges); - - Pair, HashMap>> newEdgeAndRelationTypes = - getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, newAspect, event, true); - - final List newEdges = newEdgeAndRelationTypes.getFirst(); - final Set newEdgeSet = new HashSet<>(newEdges); - - // Edges to add - final List additiveDifference = - newEdgeSet.stream().filter(edge -> !oldEdgeSet.contains(edge)).collect(Collectors.toList()); - - // Edges to remove - final List subtractiveDifference = - oldEdgeSet.stream().filter(edge -> !newEdgeSet.contains(edge)).collect(Collectors.toList()); - - // Edges to update - final List mergedEdges = getMergedEdges(oldEdgeSet, newEdgeSet); - - // Remove any old edges that no longer exist first - if (subtractiveDifference.size() > 0) { - log.debug("Removing edges: {}", subtractiveDifference); - subtractiveDifference.forEach(_graphService::removeEdge); - } - - // Then add new edges - if (additiveDifference.size() > 0) { - log.debug("Adding edges: {}", additiveDifference); - additiveDifference.forEach(_graphService::addEdge); - } - - // Then update existing edges - if (mergedEdges.size() > 0) { - log.debug("Updating edges: {}", mergedEdges); - mergedEdges.forEach(_graphService::upsertEdge); - } - } - - private static List getMergedEdges(final Set oldEdgeSet, final Set newEdgeSet) { - final Map oldEdgesMap = - oldEdgeSet.stream() - .map(edge -> Pair.of(edge.hashCode(), edge)) - .collect(Collectors.toMap(Pair::getFirst, Pair::getSecond)); - - final List mergedEdges = new ArrayList<>(); - if (!oldEdgesMap.isEmpty()) { - for (com.linkedin.metadata.aspect.models.graph.Edge newEdge : newEdgeSet) { - if (oldEdgesMap.containsKey(newEdge.hashCode())) { - final com.linkedin.metadata.aspect.models.graph.Edge oldEdge = - oldEdgesMap.get(newEdge.hashCode()); - final com.linkedin.metadata.aspect.models.graph.Edge mergedEdge = - GraphIndexUtils.mergeEdges(oldEdge, newEdge); - mergedEdges.add(mergedEdge); - } - } - } - - return mergedEdges; - } - /** Process snapshot and update search index */ private void updateSearchService(@Nonnull OperationContext opContext, MCLItem event) { Urn urn = event.getUrn(); @@ -520,7 +271,7 @@ private void updateSearchService(@Nonnull OperationContext opContext, MCLItem ev Optional previousSearchDocument = Optional.empty(); try { searchDocument = - _searchDocumentTransformer + searchDocumentTransformer .transformAspect(opContext, urn, aspect, aspectSpec, false) .map( objectNode -> @@ -540,16 +291,16 @@ private void updateSearchService(@Nonnull OperationContext opContext, MCLItem ev return; } - final String docId = _entityIndexBuilders.getIndexConvention().getEntityDocumentId(urn); + final String docId = entityIndexBuilders.getIndexConvention().getEntityDocumentId(urn); - if (_searchDiffMode + if (searchDiffMode && (systemMetadata == null || systemMetadata.getProperties() == null || !Boolean.parseBoolean(systemMetadata.getProperties().get(FORCE_INDEXING_KEY)))) { if (previousAspect != null) { try { previousSearchDocument = - _searchDocumentTransformer.transformAspect( + searchDocumentTransformer.transformAspect( opContext, urn, previousAspect, aspectSpec, false); } catch (Exception e) { log.error( @@ -572,7 +323,7 @@ private void updateSearchService(@Nonnull OperationContext opContext, MCLItem ev searchDocument.get(), previousSearchDocument.orElse(null)) .toString(); - _entitySearchService.upsertDocument(opContext, entityName, finalDocument, docId); + entitySearchService.upsertDocument(opContext, entityName, finalDocument, docId); } /** Process snapshot and update time-series index */ @@ -597,18 +348,18 @@ private void updateTimeseriesFields( .entrySet() .forEach( document -> { - _timeseriesAspectService.upsertDocument( + timeseriesAspectService.upsertDocument( opContext, entityType, aspectName, document.getKey(), document.getValue()); }); } private void updateSystemMetadata( SystemMetadata systemMetadata, Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) { - _systemMetadataService.insert(systemMetadata, urn.toString(), aspectSpec.getName()); + systemMetadataService.insert(systemMetadata, urn.toString(), aspectSpec.getName()); // If processing status aspect update all aspects for this urn to removed if (aspectSpec.getName().equals(Constants.STATUS_ASPECT_NAME)) { - _systemMetadataService.setDocStatus(urn.toString(), ((Status) aspect).isRemoved()); + systemMetadataService.setDocStatus(urn.toString(), ((Status) aspect).isRemoved()); } } @@ -616,41 +367,13 @@ private void deleteSystemMetadata(Urn urn, AspectSpec aspectSpec, Boolean isKeyA if (isKeyAspect) { // Delete all aspects log.debug(String.format("Deleting all system metadata for urn: %s", urn)); - _systemMetadataService.deleteUrn(urn.toString()); + systemMetadataService.deleteUrn(urn.toString()); } else { // Delete all aspects from system metadata service log.debug( String.format( "Deleting system metadata for urn: %s, aspect: %s", urn, aspectSpec.getName())); - _systemMetadataService.deleteAspect(urn.toString(), aspectSpec.getName()); - } - } - - private void deleteGraphData( - @Nonnull final Urn urn, - @Nonnull final AspectSpec aspectSpec, - @Nonnull final RecordTemplate aspect, - @Nonnull final Boolean isKeyAspect, - @Nonnull final MetadataChangeLog event) { - if (isKeyAspect) { - _graphService.removeNode(urn); - return; - } - - Pair, HashMap>> edgeAndRelationTypes = - getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true); - - final HashMap> urnToRelationshipTypesBeingAdded = - edgeAndRelationTypes.getSecond(); - if (urnToRelationshipTypesBeingAdded.size() > 0) { - for (Map.Entry> entry : urnToRelationshipTypesBeingAdded.entrySet()) { - _graphService.removeEdgesFromNode( - entry.getKey(), - new ArrayList<>(entry.getValue()), - createRelationshipFilter( - new Filter().setOr(new ConjunctiveCriterionArray()), - RelationshipDirection.OUTGOING)); - } + systemMetadataService.deleteAspect(urn.toString(), aspectSpec.getName()); } } @@ -670,14 +393,14 @@ private void deleteSearchData( } if (isKeyAspect) { - _entitySearchService.deleteDocument(opContext, entityName, docId); + entitySearchService.deleteDocument(opContext, entityName, docId); return; } Optional searchDocument; try { searchDocument = - _searchDocumentTransformer + searchDocumentTransformer .transformAspect(opContext, urn, aspect, aspectSpec, true) .map(Objects::toString); // TODO } catch (Exception e) { @@ -690,18 +413,6 @@ private void deleteSearchData( return; } - _entitySearchService.upsertDocument(opContext, entityName, searchDocument.get(), docId); - } - - private EntitySpec getEventEntitySpec( - @Nonnull OperationContext opContext, @Nonnull final MetadataChangeLog event) { - try { - return opContext.getEntityRegistry().getEntitySpec(event.getEntityType()); - } catch (IllegalArgumentException e) { - throw new RuntimeException( - String.format( - "Failed to retrieve Entity Spec for entity with name %s. Cannot update indices for MCL.", - event.getEntityType())); - } + entitySearchService.upsertDocument(opContext, entityName, searchDocument.get(), docId); } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java index fe3608a2cf71d..d585ff1ce8383 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/DeleteEntityServiceTest.java @@ -37,6 +37,7 @@ import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; import java.sql.Timestamp; +import java.util.List; import java.util.Map; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -79,14 +80,15 @@ public void testDeleteUniqueRefGeneratesValidMCP() { Mockito.when( _graphService.findRelatedEntities( - null, - newFilter("urn", container.toString()), - null, - EMPTY_FILTER, - ImmutableList.of(), - newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING), - 0, - 10000)) + any(OperationContext.class), + nullable(List.class), + eq(newFilter("urn", container.toString())), + nullable(List.class), + eq(EMPTY_FILTER), + eq(ImmutableList.of()), + eq(newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING)), + eq(0), + eq((10000)))) .thenReturn(mockRelatedEntities); final EntityResponse entityResponse = new EntityResponse(); @@ -195,14 +197,15 @@ public void testDeleteSearchReferences() { new RelatedEntitiesResult(0, 0, 0, ImmutableList.of()); Mockito.when( _graphService.findRelatedEntities( - null, - newFilter("urn", form.toString()), - null, - EMPTY_FILTER, - ImmutableList.of(), - newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING), - 0, - 10000)) + any(OperationContext.class), + nullable(List.class), + eq(newFilter("urn", form.toString())), + nullable(List.class), + eq(EMPTY_FILTER), + eq(ImmutableList.of()), + eq(newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING)), + eq(0), + eq((10000)))) .thenReturn(mockRelatedEntities); final DeleteReferencesResponse response = @@ -249,14 +252,15 @@ public void testDeleteNoSearchReferences() { new RelatedEntitiesResult(0, 0, 0, ImmutableList.of()); Mockito.when( _graphService.findRelatedEntities( - null, - newFilter("urn", form.toString()), - null, - EMPTY_FILTER, - ImmutableList.of(), - newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING), - 0, - 10000)) + any(OperationContext.class), + nullable(List.class), + eq(newFilter("urn", form.toString())), + nullable(List.class), + eq(EMPTY_FILTER), + eq(ImmutableList.of()), + eq(newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING)), + eq(0), + eq((10000)))) .thenReturn(mockRelatedEntities); final DeleteReferencesResponse response = @@ -308,14 +312,15 @@ public void testDeleteSearchReferencesDryRun() { new RelatedEntitiesResult(0, 0, 0, ImmutableList.of()); Mockito.when( _graphService.findRelatedEntities( - null, - newFilter("urn", form.toString()), - null, - EMPTY_FILTER, - ImmutableList.of(), - newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING), - 0, - 10000)) + any(OperationContext.class), + nullable(List.class), + eq(newFilter("urn", form.toString())), + nullable(List.class), + eq(EMPTY_FILTER), + eq(ImmutableList.of()), + eq(newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING)), + eq(0), + eq((10000)))) .thenReturn(mockRelatedEntities); final DeleteReferencesResponse response = diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java index 5d9a5079f2a3b..64ab95b5c6843 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java @@ -11,16 +11,16 @@ import com.linkedin.common.urn.DataJobUrn; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; -import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor; import com.linkedin.metadata.aspect.models.graph.Edge; import com.linkedin.metadata.aspect.models.graph.RelatedEntity; import com.linkedin.metadata.config.search.GraphQueryConfiguration; import com.linkedin.metadata.graph.dgraph.DgraphGraphService; import com.linkedin.metadata.graph.neo4j.Neo4jGraphService; -import com.linkedin.metadata.query.LineageFlags; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.query.filter.RelationshipFilter; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; import java.net.URISyntaxException; import java.time.Duration; import java.util.ArrayList; @@ -47,7 +47,6 @@ import javax.annotation.Nullable; import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.Assert; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -273,20 +272,16 @@ public int compare(RelatedEntity left, RelatedEntity right) { /** Any source and destination type value. */ protected static @Nullable List anyType = null; - protected final GraphQueryConfiguration _graphQueryConfiguration = getGraphQueryConfiguration(); + protected static final GraphQueryConfiguration _graphQueryConfiguration = + getGraphQueryConfiguration(); + protected static final OperationContext operationContext = + TestOperationContexts.systemContextNoSearchAuthorization(); /** Timeout used to test concurrent ops in doTestConcurrentOp. */ protected Duration getTestConcurrentOpTimeout() { return Duration.ofMinutes(1); } - @BeforeMethod - public void disableAssert() { - PathSpecBasedSchemaAnnotationVisitor.class - .getClassLoader() - .setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false); - } - @Test public void testStaticUrns() { assertNotNull(dataset1Urn); @@ -502,6 +497,7 @@ public void testAddEdge( RelatedEntitiesResult relatedOutgoing = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -514,6 +510,7 @@ public void testAddEdge( RelatedEntitiesResult relatedIncoming = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -531,6 +528,7 @@ public void testPopulatedGraphService() throws Exception { RelatedEntitiesResult relatedOutgoingEntitiesBeforeRemove = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -549,6 +547,7 @@ public void testPopulatedGraphService() throws Exception { downstreamOfSchemaFieldTwoVia, downstreamOfSchemaFieldTwo)); RelatedEntitiesResult relatedIncomingEntitiesBeforeRemove = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -574,13 +573,13 @@ public void testPopulatedGraphService() throws Exception { downstreamOfSchemaFieldOne)); EntityLineageResult viaNodeResult = service.getLineage( + operationContext, schemaFieldUrnOne, LineageDirection.UPSTREAM, new GraphFilters(List.of("schemaField")), 0, 1000, - 100, - null); + 100); // Multi-path enabled assertEquals(viaNodeResult.getRelationships().size(), 2); // First one is via node @@ -589,13 +588,13 @@ public void testPopulatedGraphService() throws Exception { EntityLineageResult viaNodeResultNoMulti = getGraphService(false) .getLineage( + operationContext, schemaFieldUrnOne, LineageDirection.UPSTREAM, new GraphFilters(List.of("schemaField")), 0, 1000, - 100, - null); + 100); // Multi-path disabled, still has two because via flow creates both edges in response assertEquals(viaNodeResultNoMulti.getRelationships().size(), 2); @@ -612,12 +611,12 @@ public void testPopulatedGraphServiceGetLineage() throws Exception { GraphService service = getLineagePopulatedGraphService(); EntityLineageResult upstreamLineage = - service.getLineage(dataset1Urn, LineageDirection.UPSTREAM, 0, 1000, 1); + service.getLineage(operationContext, dataset1Urn, LineageDirection.UPSTREAM, 0, 1000, 1); assertEquals(upstreamLineage.getTotal().intValue(), 0); assertEquals(upstreamLineage.getRelationships().size(), 0); EntityLineageResult downstreamLineage = - service.getLineage(dataset1Urn, LineageDirection.DOWNSTREAM, 0, 1000, 1); + service.getLineage(operationContext, dataset1Urn, LineageDirection.DOWNSTREAM, 0, 1000, 1); assertEquals(downstreamLineage.getTotal().intValue(), 3); assertEquals(downstreamLineage.getRelationships().size(), 3); Map relationships = @@ -630,7 +629,8 @@ public void testPopulatedGraphServiceGetLineage() throws Exception { assertTrue(relationships.containsKey(dataJobTwoUrn)); assertEquals(relationships.get(dataJobTwoUrn).getType(), consumes); - upstreamLineage = service.getLineage(dataset3Urn, LineageDirection.UPSTREAM, 0, 1000, 1); + upstreamLineage = + service.getLineage(operationContext, dataset3Urn, LineageDirection.UPSTREAM, 0, 1000, 1); assertEquals(upstreamLineage.getTotal().intValue(), 2); assertEquals(upstreamLineage.getRelationships().size(), 2); relationships = @@ -641,11 +641,13 @@ public void testPopulatedGraphServiceGetLineage() throws Exception { assertTrue(relationships.containsKey(dataJobOneUrn)); assertEquals(relationships.get(dataJobOneUrn).getType(), produces); - downstreamLineage = service.getLineage(dataset3Urn, LineageDirection.DOWNSTREAM, 0, 1000, 1); + downstreamLineage = + service.getLineage(operationContext, dataset3Urn, LineageDirection.DOWNSTREAM, 0, 1000, 1); assertEquals(downstreamLineage.getTotal().intValue(), 0); assertEquals(downstreamLineage.getRelationships().size(), 0); - upstreamLineage = service.getLineage(dataJobOneUrn, LineageDirection.UPSTREAM, 0, 1000, 1); + upstreamLineage = + service.getLineage(operationContext, dataJobOneUrn, LineageDirection.UPSTREAM, 0, 1000, 1); assertEquals(upstreamLineage.getTotal().intValue(), 2); assertEquals(upstreamLineage.getRelationships().size(), 2); relationships = @@ -656,7 +658,9 @@ public void testPopulatedGraphServiceGetLineage() throws Exception { assertTrue(relationships.containsKey(dataset2Urn)); assertEquals(relationships.get(dataset2Urn).getType(), consumes); - downstreamLineage = service.getLineage(dataJobOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 1); + downstreamLineage = + service.getLineage( + operationContext, dataJobOneUrn, LineageDirection.DOWNSTREAM, 0, 1000, 1); assertEquals(downstreamLineage.getTotal().intValue(), 3); assertEquals(downstreamLineage.getRelationships().size(), 3); relationships = @@ -834,6 +838,7 @@ private void doTestFindRelatedEntities( RelatedEntitiesResult relatedEntities = service.findRelatedEntities( + operationContext, anyType, sourceEntityFilter, anyType, @@ -1118,6 +1123,7 @@ private void doTestFindRelatedEntities( RelatedEntitiesResult relatedEntities = service.findRelatedEntities( + operationContext, sourceType, EMPTY_FILTER, destinationType, @@ -1139,6 +1145,7 @@ private void doTestFindRelatedEntitiesEntityType( @Nonnull RelatedEntity... expectedEntities) { RelatedEntitiesResult actualEntities = service.findRelatedEntities( + operationContext, sourceType, EMPTY_FILTER, destinationType, @@ -1244,6 +1251,7 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { RelatedEntitiesResult allOutgoingRelatedEntities = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -1263,6 +1271,7 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { RelatedEntitiesResult allIncomingRelatedEntities = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -1289,6 +1298,7 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { RelatedEntitiesResult allUnknownRelationshipTypeRelatedEntities = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -1301,6 +1311,7 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { RelatedEntitiesResult someUnknownRelationshipTypeRelatedEntities = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -1325,6 +1336,7 @@ public void testFindRelatedEntitiesNoRelationshipTypes() throws Exception { RelatedEntitiesResult relatedEntities = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -1340,6 +1352,7 @@ public void testFindRelatedEntitiesNoRelationshipTypes() throws Exception { // did not get any related urns? RelatedEntitiesResult relatedEntitiesAll = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -1358,6 +1371,7 @@ public void testFindRelatedEntitiesAllFilters() throws Exception { RelatedEntitiesResult relatedEntities = service.findRelatedEntities( + operationContext, ImmutableList.of(datasetType), newFilter("urn", dataset1UrnString), ImmutableList.of(userType), @@ -1371,6 +1385,7 @@ public void testFindRelatedEntitiesAllFilters() throws Exception { relatedEntities = service.findRelatedEntities( + operationContext, ImmutableList.of(datasetType), newFilter("urn", dataset1UrnString), ImmutableList.of(userType), @@ -1389,6 +1404,7 @@ public void testFindRelatedEntitiesMultipleEntityTypes() throws Exception { RelatedEntitiesResult relatedEntities = service.findRelatedEntities( + operationContext, ImmutableList.of(datasetType, userType), newFilter("urn", dataset1UrnString), ImmutableList.of(datasetType, userType), @@ -1402,6 +1418,7 @@ public void testFindRelatedEntitiesMultipleEntityTypes() throws Exception { relatedEntities = service.findRelatedEntities( + operationContext, ImmutableList.of(datasetType, userType), newFilter("urn", dataset1UrnString), ImmutableList.of(datasetType, userType), @@ -1421,6 +1438,7 @@ public void testFindRelatedEntitiesOffsetAndCount() throws Exception { // populated graph asserted in testPopulatedGraphService RelatedEntitiesResult allRelatedEntities = service.findRelatedEntities( + operationContext, ImmutableList.of(datasetType), EMPTY_FILTER, anyType, @@ -1436,6 +1454,7 @@ public void testFindRelatedEntitiesOffsetAndCount() throws Exception { idx -> individualRelatedEntities.addAll( service.findRelatedEntities( + operationContext, ImmutableList.of(datasetType), EMPTY_FILTER, anyType, @@ -1540,6 +1559,7 @@ public void testRemoveEdgesFromNode( RelatedEntitiesResult actualOutgoingRelatedUrnsBeforeRemove = service.findRelatedEntities( + operationContext, anyType, newFilter("urn", nodeToRemoveFrom.toString()), anyType, @@ -1550,6 +1570,7 @@ public void testRemoveEdgesFromNode( 100); RelatedEntitiesResult actualIncomingRelatedUrnsBeforeRemove = service.findRelatedEntities( + operationContext, anyType, newFilter("urn", nodeToRemoveFrom.toString()), anyType, @@ -1566,6 +1587,7 @@ public void testRemoveEdgesFromNode( // we expect these do not change RelatedEntitiesResult relatedEntitiesOfOtherOutgoingRelationTypesBeforeRemove = service.findRelatedEntities( + operationContext, anyType, newFilter("urn", nodeToRemoveFrom.toString()), anyType, @@ -1576,6 +1598,7 @@ public void testRemoveEdgesFromNode( 100); RelatedEntitiesResult relatedEntitiesOfOtherIncomingRelationTypesBeforeRemove = service.findRelatedEntities( + operationContext, anyType, newFilter("urn", nodeToRemoveFrom.toString()), anyType, @@ -1585,11 +1608,13 @@ public void testRemoveEdgesFromNode( 0, 100); - service.removeEdgesFromNode(nodeToRemoveFrom, relationTypes, relationshipFilter); + service.removeEdgesFromNode( + operationContext, nodeToRemoveFrom, relationTypes, relationshipFilter); syncAfterWrite(); RelatedEntitiesResult actualOutgoingRelatedUrnsAfterRemove = service.findRelatedEntities( + operationContext, anyType, newFilter("urn", nodeToRemoveFrom.toString()), anyType, @@ -1600,6 +1625,7 @@ public void testRemoveEdgesFromNode( 100); RelatedEntitiesResult actualIncomingRelatedUrnsAfterRemove = service.findRelatedEntities( + operationContext, anyType, newFilter("urn", nodeToRemoveFrom.toString()), anyType, @@ -1616,6 +1642,7 @@ public void testRemoveEdgesFromNode( // assert these did not change RelatedEntitiesResult relatedEntitiesOfOtherOutgoingRelationTypesAfterRemove = service.findRelatedEntities( + operationContext, anyType, newFilter("urn", nodeToRemoveFrom.toString()), anyType, @@ -1626,6 +1653,7 @@ public void testRemoveEdgesFromNode( 100); RelatedEntitiesResult relatedEntitiesOfOtherIncomingRelationTypesAfterRemove = service.findRelatedEntities( + operationContext, anyType, newFilter("urn", nodeToRemoveFrom.toString()), anyType, @@ -1650,6 +1678,7 @@ public void testRemoveEdgesFromNodeNoRelationshipTypes() throws Exception { // populated graph asserted in testPopulatedGraphService RelatedEntitiesResult relatedOutgoingEntitiesBeforeRemove = service.findRelatedEntities( + operationContext, anyType, newFilter("urn", nodeToRemoveFrom.toString()), anyType, @@ -1661,12 +1690,15 @@ public void testRemoveEdgesFromNodeNoRelationshipTypes() throws Exception { // can be replaced with a single removeEdgesFromNode and undirectedRelationships once supported // by all implementations - service.removeEdgesFromNode(nodeToRemoveFrom, Collections.emptyList(), outgoingRelationships); - service.removeEdgesFromNode(nodeToRemoveFrom, Collections.emptyList(), incomingRelationships); + service.removeEdgesFromNode( + operationContext, nodeToRemoveFrom, Collections.emptyList(), outgoingRelationships); + service.removeEdgesFromNode( + operationContext, nodeToRemoveFrom, Collections.emptyList(), incomingRelationships); syncAfterWrite(); RelatedEntitiesResult relatedOutgoingEntitiesAfterRemove = service.findRelatedEntities( + operationContext, anyType, newFilter("urn", nodeToRemoveFrom.toString()), anyType, @@ -1680,13 +1712,20 @@ public void testRemoveEdgesFromNodeNoRelationshipTypes() throws Exception { // does the test actually test something? is the Collections.emptyList() the only reason why we // did not see changes? service.removeEdgesFromNode( - nodeToRemoveFrom, Arrays.asList(downstreamOf, hasOwner, knowsUser), outgoingRelationships); + operationContext, + nodeToRemoveFrom, + Arrays.asList(downstreamOf, hasOwner, knowsUser), + outgoingRelationships); service.removeEdgesFromNode( - nodeToRemoveFrom, Arrays.asList(downstreamOf, hasOwner, knowsUser), incomingRelationships); + operationContext, + nodeToRemoveFrom, + Arrays.asList(downstreamOf, hasOwner, knowsUser), + incomingRelationships); syncAfterWrite(); RelatedEntitiesResult relatedOutgoingEntitiesAfterRemoveAll = service.findRelatedEntities( + operationContext, anyType, newFilter("urn", nodeToRemoveFrom.toString()), anyType, @@ -1706,6 +1745,7 @@ public void testRemoveEdgesFromUnknownNode() throws Exception { // populated graph asserted in testPopulatedGraphService RelatedEntitiesResult relatedOutgoingEntitiesBeforeRemove = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -1718,13 +1758,20 @@ public void testRemoveEdgesFromUnknownNode() throws Exception { // can be replaced with a single removeEdgesFromNode and undirectedRelationships once supported // by all implementations service.removeEdgesFromNode( - nodeToRemoveFrom, Arrays.asList(downstreamOf, hasOwner, knowsUser), outgoingRelationships); + operationContext, + nodeToRemoveFrom, + Arrays.asList(downstreamOf, hasOwner, knowsUser), + outgoingRelationships); service.removeEdgesFromNode( - nodeToRemoveFrom, Arrays.asList(downstreamOf, hasOwner, knowsUser), incomingRelationships); + operationContext, + nodeToRemoveFrom, + Arrays.asList(downstreamOf, hasOwner, knowsUser), + incomingRelationships); syncAfterWrite(); RelatedEntitiesResult relatedOutgoingEntitiesAfterRemove = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -1740,13 +1787,14 @@ public void testRemoveEdgesFromUnknownNode() throws Exception { public void testRemoveNode() throws Exception { GraphService service = getPopulatedGraphService(); - service.removeNode(dataset2Urn); + service.removeNode(operationContext, dataset2Urn); syncAfterWrite(); // assert the modified graph // All downstreamOf, hasOwner, knowsUser relationships minus datasetTwo's, outgoing assertEqualsAnyOrder( service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -1768,6 +1816,7 @@ public void testRemoveUnknownNode() throws Exception { // populated graph asserted in testPopulatedGraphService RelatedEntitiesResult entitiesBeforeRemove = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -1777,11 +1826,12 @@ public void testRemoveUnknownNode() throws Exception { 0, 100); - service.removeNode(unknownUrn); + service.removeNode(operationContext, unknownUrn); syncAfterWrite(); RelatedEntitiesResult entitiesAfterRemove = service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, anyType, @@ -1806,6 +1856,7 @@ public void testClear() throws Exception { // again assertEqualsAnyOrder( service.findRelatedEntities( + operationContext, ImmutableList.of(datasetType), EMPTY_FILTER, anyType, @@ -1817,6 +1868,7 @@ public void testClear() throws Exception { Collections.emptyList()); assertEqualsAnyOrder( service.findRelatedEntities( + operationContext, ImmutableList.of(userType), EMPTY_FILTER, anyType, @@ -1828,6 +1880,7 @@ public void testClear() throws Exception { Collections.emptyList()); assertEqualsAnyOrder( service.findRelatedEntities( + operationContext, anyType, EMPTY_FILTER, ImmutableList.of(userType), @@ -1891,6 +1944,7 @@ public void testConcurrentAddEdge() throws Exception { RelatedEntitiesResult relatedEntities = service.findRelatedEntities( + operationContext, null, EMPTY_FILTER, null, @@ -1937,6 +1991,7 @@ public void testConcurrentRemoveEdgesFromNode() throws Exception { // assert the graph is there RelatedEntitiesResult relatedEntities = service.findRelatedEntities( + operationContext, null, EMPTY_FILTER, null, @@ -1956,6 +2011,7 @@ public void testConcurrentRemoveEdgesFromNode() throws Exception { edge -> () -> service.removeEdgesFromNode( + operationContext, edge.getSource(), Collections.singletonList(edge.getRelationshipType()), outgoingRelationships)); @@ -1965,6 +2021,7 @@ public void testConcurrentRemoveEdgesFromNode() throws Exception { // assert the graph is gone RelatedEntitiesResult relatedEntitiesAfterDeletion = service.findRelatedEntities( + operationContext, null, EMPTY_FILTER, null, @@ -1998,6 +2055,7 @@ public void testConcurrentRemoveNodes() throws Exception { // assert the graph is there RelatedEntitiesResult relatedEntities = service.findRelatedEntities( + operationContext, null, EMPTY_FILTER, null, @@ -2013,13 +2071,14 @@ public void testConcurrentRemoveNodes() throws Exception { // remove all nodes concurrently // nodes will be removed multiple times Stream operations = - edges.stream().map(edge -> () -> service.removeNode(edge.getSource())); + edges.stream().map(edge -> () -> service.removeNode(operationContext, edge.getSource())); doTestConcurrentOp(operations); syncAfterWrite(); // assert the graph is gone RelatedEntitiesResult relatedEntitiesAfterDeletion = service.findRelatedEntities( + operationContext, null, EMPTY_FILTER, null, @@ -2094,12 +2153,12 @@ public void testPopulatedGraphServiceGetLineageMultihop(Boolean attemptMultiPath (!((service instanceof Neo4jGraphService) || (service instanceof DgraphGraphService))); EntityLineageResult upstreamLineage = - service.getLineage(dataset1Urn, LineageDirection.UPSTREAM, 0, 1000, 2); + service.getLineage(operationContext, dataset1Urn, LineageDirection.UPSTREAM, 0, 1000, 2); assertEquals(upstreamLineage.getTotal().intValue(), 0); assertEquals(upstreamLineage.getRelationships().size(), 0); EntityLineageResult downstreamLineage = - service.getLineage(dataset1Urn, LineageDirection.DOWNSTREAM, 0, 1000, 2); + service.getLineage(operationContext, dataset1Urn, LineageDirection.DOWNSTREAM, 0, 1000, 2); assertEquals(downstreamLineage.getTotal().intValue(), 5); assertEquals(downstreamLineage.getRelationships().size(), 5); @@ -2124,7 +2183,8 @@ public void testPopulatedGraphServiceGetLineageMultihop(Boolean attemptMultiPath assertTrue(relationships.containsKey(dataJobTwoUrn)); assertEquals(relationships.get(dataJobTwoUrn).getDegree(), 1); - upstreamLineage = service.getLineage(dataset3Urn, LineageDirection.UPSTREAM, 0, 1000, 2); + upstreamLineage = + service.getLineage(operationContext, dataset3Urn, LineageDirection.UPSTREAM, 0, 1000, 2); assertEquals(upstreamLineage.getTotal().intValue(), 3); assertEquals(upstreamLineage.getRelationships().size(), 3); relationships = @@ -2137,7 +2197,8 @@ public void testPopulatedGraphServiceGetLineageMultihop(Boolean attemptMultiPath assertTrue(relationships.containsKey(dataJobOneUrn)); assertEquals(relationships.get(dataJobOneUrn).getDegree(), 1); - downstreamLineage = service.getLineage(dataset3Urn, LineageDirection.DOWNSTREAM, 0, 1000, 2); + downstreamLineage = + service.getLineage(operationContext, dataset3Urn, LineageDirection.DOWNSTREAM, 0, 1000, 2); assertEquals(downstreamLineage.getTotal().intValue(), 0); assertEquals(downstreamLineage.getRelationships().size(), 0); } @@ -2156,6 +2217,7 @@ public void testHighlyConnectedGraphWalk() throws Exception { Set expectedRelatedEntities = convertEdgesToRelatedEntities(edges); RelatedEntitiesResult relatedEntities = service.findRelatedEntities( + operationContext, null, EMPTY_FILTER, null, @@ -2169,9 +2231,13 @@ public void testHighlyConnectedGraphWalk() throws Exception { expectedRelatedEntities); Urn root = dataset1Urn; + OperationContext limitedHopOpContext = + operationContext.withLineageFlags(f -> f.setEntitiesExploredPerHopLimit(5)); + EntityLineageResult lineageResult = getGraphService(false) .getLineage( + limitedHopOpContext, root, LineageDirection.UPSTREAM, new GraphFilters( @@ -2183,8 +2249,7 @@ public void testHighlyConnectedGraphWalk() throws Exception { .collect(Collectors.toList())), 0, 1000, - 100, - new LineageFlags().setEntitiesExploredPerHopLimit(5)); + 100); // Unable to explore all paths because multi is disabled, but will be at least 5 since it will // explore 5 edges assertTrue( @@ -2201,6 +2266,7 @@ public void testHighlyConnectedGraphWalk() throws Exception { EntityLineageResult lineageResultMulti = getGraphService(true) .getLineage( + limitedHopOpContext, root, LineageDirection.UPSTREAM, new GraphFilters( @@ -2212,8 +2278,7 @@ public void testHighlyConnectedGraphWalk() throws Exception { .collect(Collectors.toList())), 0, 1000, - 100, - new LineageFlags().setEntitiesExploredPerHopLimit(5)); + 100); assertTrue( lineageResultMulti.getRelationships().size() >= 5 diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBaseNoVia.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBaseNoVia.java index e4cefaa1feaa1..a4a93b29f50c6 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBaseNoVia.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBaseNoVia.java @@ -1,8 +1,10 @@ package com.linkedin.metadata.graph; import static com.linkedin.metadata.search.utils.QueryUtils.*; +import static org.mockito.Mockito.mock; import static org.testng.Assert.*; +import io.datahubproject.metadata.context.OperationContext; import java.util.Arrays; import java.util.Collections; import org.testng.annotations.DataProvider; @@ -220,6 +222,7 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { RelatedEntitiesResult allOutgoingRelatedEntities = service.findRelatedEntities( + mock(OperationContext.class), anyType, EMPTY_FILTER, anyType, @@ -243,6 +246,7 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { RelatedEntitiesResult allIncomingRelatedEntities = service.findRelatedEntities( + mock(OperationContext.class), anyType, EMPTY_FILTER, anyType, @@ -269,6 +273,7 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { RelatedEntitiesResult allUnknownRelationshipTypeRelatedEntities = service.findRelatedEntities( + mock(OperationContext.class), anyType, EMPTY_FILTER, anyType, @@ -281,6 +286,7 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { RelatedEntitiesResult someUnknownRelationshipTypeRelatedEntities = service.findRelatedEntities( + mock(OperationContext.class), anyType, EMPTY_FILTER, anyType, @@ -306,6 +312,7 @@ public void testPopulatedGraphService() throws Exception { RelatedEntitiesResult relatedOutgoingEntitiesBeforeRemove = service.findRelatedEntities( + mock(OperationContext.class), anyType, EMPTY_FILTER, anyType, @@ -328,6 +335,7 @@ public void testPopulatedGraphService() throws Exception { downstreamOfSchemaFieldTwo)); RelatedEntitiesResult relatedIncomingEntitiesBeforeRemove = service.findRelatedEntities( + mock(OperationContext.class), anyType, EMPTY_FILTER, anyType, @@ -360,13 +368,14 @@ public void testPopulatedGraphService() throws Exception { public void testRemoveNode() throws Exception { GraphService service = getPopulatedGraphService(); - service.removeNode(dataset2Urn); + service.removeNode(mock(OperationContext.class), dataset2Urn); syncAfterWrite(); // assert the modified graph // All downstreamOf, hasOwner, knowsUser relationships minus datasetTwo's, outgoing assertEqualsAnyOrder( service.findRelatedEntities( + mock(OperationContext.class), anyType, EMPTY_FILTER, anyType, diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/ESGraphQueryDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAOTest.java similarity index 92% rename from metadata-io/src/test/java/com/linkedin/metadata/graph/search/ESGraphQueryDAOTest.java rename to metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAOTest.java index 0bf7df1fc8e7c..b8e3a6e107128 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/ESGraphQueryDAOTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAOTest.java @@ -1,4 +1,4 @@ -package com.linkedin.metadata.graph.search; +package com.linkedin.metadata.graph.elastic; import com.google.common.collect.ImmutableList; import com.google.common.io.Resources; @@ -9,10 +9,11 @@ import com.linkedin.metadata.Constants; import com.linkedin.metadata.config.search.GraphQueryConfiguration; import com.linkedin.metadata.graph.GraphFilters; -import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO; import com.linkedin.metadata.models.registry.LineageRegistry; import com.linkedin.metadata.query.LineageFlags; import com.linkedin.metadata.query.filter.RelationshipDirection; +import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.test.metadata.context.TestOperationContexts; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -21,6 +22,7 @@ import java.util.Map; import org.opensearch.index.query.QueryBuilder; import org.testng.Assert; +import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; public class ESGraphQueryDAOTest { @@ -34,8 +36,15 @@ public class ESGraphQueryDAOTest { private static final String TEST_QUERY_FILE_FULL_MULTIPLE_FILTERS = "elasticsearch/sample_filters/lineage_query_filters_full_multiple_filters.json"; + private OperationContext operationContext; + + @BeforeTest + public void init() { + operationContext = TestOperationContexts.systemContextNoSearchAuthorization(); + } + @Test - private static void testGetQueryForLineageFullArguments() throws Exception { + private void testGetQueryForLineageFullArguments() throws Exception { URL urlLimited = Resources.getResource(TEST_QUERY_FILE_LIMITED); String expectedQueryLimited = Resources.toString(urlLimited, StandardCharsets.UTF_8); @@ -108,21 +117,26 @@ private static void testGetQueryForLineageFullArguments() throws Exception { QueryBuilder fullBuilder = graphQueryDAO.getLineageQuery( + operationContext.withLineageFlags( + f -> new LineageFlags().setEndTimeMillis(endTime).setStartTimeMillis(startTime)), urnsPerEntityType, edgesPerEntityType, - graphFilters, - new LineageFlags().setEndTimeMillis(endTime).setStartTimeMillis(startTime)); + graphFilters); QueryBuilder fullBuilderEmptyFilters = graphQueryDAO.getLineageQuery( - urnsPerEntityType, edgesPerEntityType, GraphFilters.emptyGraphFilters, null); + operationContext, + urnsPerEntityType, + edgesPerEntityType, + GraphFilters.emptyGraphFilters); QueryBuilder fullBuilderMultipleFilters = graphQueryDAO.getLineageQuery( + operationContext.withLineageFlags( + f -> new LineageFlags().setEndTimeMillis(endTime).setStartTimeMillis(startTime)), urnsPerEntityTypeMultiple, edgesPerEntityTypeMultiple, - graphFiltersMultiple, - new LineageFlags().setEndTimeMillis(endTime).setStartTimeMillis(startTime)); + graphFiltersMultiple); Assert.assertEquals(limitedBuilder.toString(), expectedQueryLimited); Assert.assertEquals(fullBuilder.toString(), expectedQueryFull); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAOTest.java new file mode 100644 index 0000000000000..ac96257e8ec41 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAOTest.java @@ -0,0 +1,33 @@ +package com.linkedin.metadata.graph.elastic; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.linkedin.metadata.config.search.GraphQueryConfiguration; +import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.script.Script; +import org.testng.annotations.Test; + +public class ESGraphWriteDAOTest { + public static final IndexConvention TEST_INDEX_CONVENTION = IndexConventionImpl.noPrefix("md5"); + + @Test + public void testUpdateByQuery() { + ESBulkProcessor mockBulkProcess = mock(ESBulkProcessor.class); + GraphQueryConfiguration config = new GraphQueryConfiguration(); + config.setGraphStatusEnabled(true); + ESGraphWriteDAO test = new ESGraphWriteDAO(TEST_INDEX_CONVENTION, mockBulkProcess, 0, config); + + test.updateByQuery(new Script("test"), QueryBuilders.boolQuery()); + + verify(mockBulkProcess) + .updateByQuery( + eq(new Script("test")), eq(QueryBuilders.boolQuery()), eq("graph_service_v1")); + verifyNoMoreInteractions(mockBulkProcess); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java new file mode 100644 index 0000000000000..1f53b9c4e999e --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java @@ -0,0 +1,98 @@ +package com.linkedin.metadata.graph.elastic; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.metadata.aspect.models.graph.EdgeUrnType; +import com.linkedin.metadata.entity.TestEntityRegistry; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.models.registry.LineageRegistry; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; +import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; +import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; +import java.util.Set; +import org.mockito.ArgumentCaptor; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.ExistsQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.script.Script; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class ElasticSearchGraphServiceTest { + + private ElasticSearchGraphService test; + private ESBulkProcessor mockESBulkProcessor; + private ESGraphWriteDAO mockWriteDAO; + private ESGraphQueryDAO mockReadDAO; + + @BeforeTest + public void beforeTest() { + EntityRegistry entityRegistry = new TestEntityRegistry(); + mockESBulkProcessor = mock(ESBulkProcessor.class); + mockWriteDAO = mock(ESGraphWriteDAO.class); + mockReadDAO = mock(ESGraphQueryDAO.class); + test = + new ElasticSearchGraphService( + new LineageRegistry(entityRegistry), + mockESBulkProcessor, + IndexConventionImpl.noPrefix("md5"), + mockWriteDAO, + mockReadDAO, + mock(ESIndexBuilder.class), + "md5"); + } + + @BeforeMethod + public void beforeMethod() { + reset(mockESBulkProcessor, mockWriteDAO, mockReadDAO); + } + + @Test + public void testSetEdgeStatus() { + final Urn testUrn = UrnUtils.getUrn("urn:li:container:test"); + for (boolean removed : Set.of(true, false)) { + test.setEdgeStatus(testUrn, removed, EdgeUrnType.values()); + + ArgumentCaptor