Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(graph): graph index soft-delete support #11453

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
import com.linkedin.gms.factory.kafka.common.TopicConventionFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.aspect.GraphRetriever;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityServiceAspectRetriever;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.SystemGraphRetriever;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.SearchServiceSearchRetriever;
Expand Down Expand Up @@ -145,7 +146,7 @@ protected OperationContext javaSystemOperationContext(
@Nonnull final EntityRegistry entityRegistry,
@Nonnull final EntityService<?> entityService,
@Nonnull final RestrictedService restrictedService,
@Nonnull final GraphRetriever graphRetriever,
@Nonnull final GraphService graphService,
@Nonnull final SearchService searchService,
@Qualifier("baseElasticSearchComponents")
BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) {
Expand All @@ -159,6 +160,9 @@ protected OperationContext javaSystemOperationContext(
SearchServiceSearchRetriever searchServiceSearchRetriever =
SearchServiceSearchRetriever.builder().searchService(searchService).build();

SystemGraphRetriever systemGraphRetriever =
SystemGraphRetriever.builder().graphService(graphService).build();

OperationContext systemOperationContext =
OperationContext.asSystem(
operationContextConfig,
Expand All @@ -168,11 +172,12 @@ protected OperationContext javaSystemOperationContext(
components.getIndexConvention(),
RetrieverContext.builder()
.aspectRetriever(entityServiceAspectRetriever)
.graphRetriever(graphRetriever)
.graphRetriever(systemGraphRetriever)
.searchRetriever(searchServiceSearchRetriever)
.build());

entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext);
systemGraphRetriever.setSystemOperationContext(systemOperationContext);
searchServiceSearchRetriever.setSystemOperationContext(systemOperationContext);

return systemOperationContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.linkedin.datahub.upgrade.config;
package com.linkedin.datahub.upgrade.config.graph;

import com.linkedin.datahub.upgrade.config.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.datahub.upgrade.system.graph.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.linkedin.datahub.upgrade.config.graph;

import com.linkedin.datahub.upgrade.config.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.graph.edgestatus.ReindexEdgeStatus;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes the {@link ReindexEdgeStatus} upgrade as a {@link
 * NonBlockingSystemUpgrade} bean.
 *
 * <p>The configuration is conditional on {@link
 * SystemUpdateCondition.NonBlockingSystemUpdateCondition}.
 */
@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class ReindexEdgeStatusConfig {

  /**
   * Creates the edge-status reindex upgrade.
   *
   * <p>The upgrade is constructed as enabled only when both the graph status feature flag
   * ({@code elasticsearch.search.graph.graphStatusEnabled}) and the upgrade's own switch ({@code
   * systemUpdate.edgeStatus.enabled}) are {@code true}.
   *
   * @param opContext operation context handed to the upgrade
   * @param entityService entity service handed to the upgrade
   * @param aspectDao aspect DAO handed to the upgrade
   * @param featureEnabled value of {@code elasticsearch.search.graph.graphStatusEnabled}
   * @param enabled value of {@code systemUpdate.edgeStatus.enabled}
   * @param batchSize value of {@code systemUpdate.edgeStatus.batchSize}
   * @param delayMs value of {@code systemUpdate.edgeStatus.delayMs}
   * @param limit value of {@code systemUpdate.edgeStatus.limit}
   * @return the configured upgrade
   */
  @Bean
  public NonBlockingSystemUpgrade reindexEdgeStatus(
      final OperationContext opContext,
      final EntityService<?> entityService,
      final AspectDao aspectDao,
      @Value("${elasticsearch.search.graph.graphStatusEnabled}") final boolean featureEnabled,
      @Value("${systemUpdate.edgeStatus.enabled}") final boolean enabled,
      @Value("${systemUpdate.edgeStatus.batchSize}") final Integer batchSize,
      @Value("${systemUpdate.edgeStatus.delayMs}") final Integer delayMs,
      @Value("${systemUpdate.edgeStatus.limit}") final Integer limit) {
    // Both switches must be on for the upgrade to contribute any steps.
    final boolean upgradeActive = featureEnabled && enabled;
    return new ReindexEdgeStatus(
        opContext, entityService, aspectDao, upgradeActive, batchSize, delayMs, limit);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.linkedin.datahub.upgrade.system.graph.edgestatus;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

/**
 * Non-blocking system upgrade that reindexes all status aspects so that graph edges carry status
 * information. Running it ensures status information written before this feature existed is
 * present in the graph index.
 *
 * <p>When constructed as disabled the upgrade exposes no steps.
 */
@Slf4j
public class ReindexEdgeStatus implements NonBlockingSystemUpgrade {

  private final List<UpgradeStep> _steps;

  /**
   * @param opContext operation context passed to the reindex step
   * @param entityService entity service passed to the reindex step
   * @param aspectDao aspect DAO passed to the reindex step
   * @param enabled whether the reindex step should be registered at all
   * @param batchSize batch size passed to the reindex step
   * @param batchDelayMs per-batch delay (milliseconds) passed to the reindex step
   * @param limit limit passed to the reindex step
   */
  public ReindexEdgeStatus(
      @Nonnull OperationContext opContext,
      EntityService<?> entityService,
      AspectDao aspectDao,
      boolean enabled,
      Integer batchSize,
      Integer batchDelayMs,
      Integer limit) {
    // The step is only instantiated when the upgrade is enabled.
    _steps =
        enabled
            ? ImmutableList.<UpgradeStep>of(
                new ReindexReindexEdgeStatusStep(
                    opContext, entityService, aspectDao, batchSize, batchDelayMs, limit))
            : ImmutableList.<UpgradeStep>of();
  }

  /** Returns the fully-qualified runtime class name as the upgrade identifier. */
  @Override
  public String id() {
    return getClass().getName();
  }

  /** Returns the steps chosen at construction time (empty when disabled). */
  @Override
  public List<UpgradeStep> steps() {
    return _steps;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.datahub.upgrade.system.graph.edgestatus;

import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;

import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.system.AbstractMCLStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;

/**
 * Upgrade step used by {@code ReindexEdgeStatus}; it configures {@link AbstractMCLStep} to operate
 * on the status aspect ({@code STATUS_ASPECT_NAME}).
 *
 * <p>The step can be suppressed at runtime by setting the {@code SKIP_REINDEX_EDGE_STATUS}
 * environment variable to {@code true}.
 */
@Slf4j
public class ReindexReindexEdgeStatusStep extends AbstractMCLStep {

  /**
   * Forwards all arguments unchanged to {@link AbstractMCLStep}.
   *
   * @param opContext operation context for the step
   * @param entityService entity service for the step
   * @param aspectDao aspect DAO for the step
   * @param batchSize batch size forwarded to the parent step
   * @param batchDelayMs per-batch delay (milliseconds) forwarded to the parent step
   * @param limit limit forwarded to the parent step
   */
  public ReindexReindexEdgeStatusStep(
      OperationContext opContext,
      EntityService<?> entityService,
      AspectDao aspectDao,
      Integer batchSize,
      Integer batchDelayMs,
      Integer limit) {
    super(opContext, entityService, aspectDao, batchSize, batchDelayMs, limit);
  }

  /** Returns the fixed identifier of this step, {@code "edge-status-reindex-v1"}. */
  @Override
  public String id() {
    return "edge-status-reindex-v1";
  }

  /** Returns the name of the aspect this step processes: the status aspect. */
  @Nonnull
  @Override
  protected String getAspectName() {
    return STATUS_ASPECT_NAME;
  }

  /**
   * Returns {@code null}, i.e. no URN pattern is supplied.
   *
   * <p>NOTE(review): presumably a {@code null} pattern means "do not restrict by URN" in {@link
   * AbstractMCLStep} - confirm against the parent class.
   */
  @Nullable
  @Override
  protected String getUrnLike() {
    return null;
  }

  /**
   * Returns whether the upgrade should be skipped. Uses previous run history or the environment
   * variable to determine whether to skip.
   *
   * <p>The parent's {@code skip} decision is always evaluated; the step is additionally skipped
   * when the {@code SKIP_REINDEX_EDGE_STATUS} environment variable parses as {@code true}.
   *
   * @param context the current upgrade context, passed through to the parent check
   * @return {@code true} if either the parent or the environment variable requests a skip
   */
  @Override
  public boolean skip(UpgradeContext context) {
    // Boolean.parseBoolean yields false for an unset (null) variable, so absence means "run".
    boolean envFlagRecommendsSkip = Boolean.parseBoolean(System.getenv("SKIP_REINDEX_EDGE_STATUS"));
    if (envFlagRecommendsSkip) {
      log.info("Environment variable SKIP_REINDEX_EDGE_STATUS is set to true. Skipping.");
    }
    return (super.skip(context) || envFlagRecommendsSkip);
  }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.system.vianodes;
package com.linkedin.datahub.upgrade.system.graph.vianodes;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.system.vianodes;
package com.linkedin.datahub.upgrade.system.graph.vianodes;

import static com.linkedin.metadata.Constants.*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.datahub.upgrade.system.graph.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand All @@ -36,6 +37,10 @@ public class Edge {
@EqualsAndHashCode.Include private Urn lifecycleOwner;
// An entity through which the edge between source and destination is created
@EqualsAndHashCode.Include private Urn via;
@EqualsAndHashCode.Exclude @Nullable private Boolean sourceStatus;
@EqualsAndHashCode.Exclude @Nullable private Boolean destinationStatus;
@EqualsAndHashCode.Exclude @Nullable private Boolean viaStatus;
@EqualsAndHashCode.Exclude @Nullable private Boolean lifecycleOwnerStatus;

// For backwards compatibility
public Edge(
Expand All @@ -57,6 +62,38 @@ public Edge(
updatedActor,
properties,
null,
null,
null,
null,
null,
null);
}

/**
 * Creates an edge from its ten core attributes, delegating to a 14-argument constructor and
 * passing {@code null} for the four trailing arguments.
 *
 * <p>NOTE(review): the trailing arguments presumably correspond to {@code sourceStatus}, {@code
 * destinationStatus}, {@code viaStatus} and {@code lifecycleOwnerStatus} (the target constructor
 * is not visible here) - confirm.
 */
public Edge(
Urn source,
Urn destination,
String relationshipType,
Long createdOn,
Urn createdActor,
Long updatedOn,
Urn updatedActor,
Map<String, Object> properties,
Urn lifecycleOwner,
Urn via) {
this(
source,
destination,
relationshipType,
createdOn,
createdActor,
updatedOn,
updatedActor,
properties,
lifecycleOwner,
via,
// No status information is supplied by this overload.
null,
null,
null,
null);
}

Expand Down Expand Up @@ -91,6 +128,10 @@ public String toDocId(@Nonnull String idHashAlgo) {
public static final String EDGE_FIELD_LIFECYCLE_OWNER = "lifecycleOwner";
public static final String EDGE_SOURCE_URN_FIELD = "source.urn";
public static final String EDGE_DESTINATION_URN_FIELD = "destination.urn";
public static final String EDGE_SOURCE_STATUS = "source.removed";
public static final String EDGE_DESTINATION_STATUS = "destination.removed";
public static final String EDGE_FIELD_VIA_STATUS = "viaRemoved";
public static final String EDGE_FIELD_LIFECYCLE_OWNER_STATUS = "lifecycleOwnerRemoved";

public static final List<Pair<String, SortOrder>> KEY_SORTS =
ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.linkedin.metadata.aspect.models.graph;

/**
 * Names the URN roles of a graph edge.
 *
 * <p>NOTE(review): the constants appear to mirror the {@code source}, {@code destination}, {@code
 * via} and {@code lifecycleOwner} fields of {@code Edge} - confirm against usages.
 */
public enum EdgeUrnType {
  /** The source role. */
  SOURCE,

  /** The destination role. */
  DESTINATION,

  /** The via role. */
  VIA,

  /** The lifecycle-owner role. */
  LIFECYCLE_OWNER
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import lombok.EqualsAndHashCode;

/** Class representing an authenticated actor accessing DataHub. */
@EqualsAndHashCode
public class Authentication {

private final Actor authenticatedActor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.search.utils.QueryUtils;
import io.datahubproject.metadata.context.OperationContext;
import java.net.URISyntaxException;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -19,10 +20,13 @@
@Slf4j
public class JavaGraphClient implements GraphClient {

GraphService _graphService;
private final OperationContext systemOpContext;
private final GraphService graphService;

public JavaGraphClient(@Nonnull GraphService graphService) {
this._graphService = graphService;
public JavaGraphClient(
@Nonnull OperationContext systemOpContext, @Nonnull GraphService graphService) {
this.systemOpContext = systemOpContext;
this.graphService = graphService;
}

/**
Expand All @@ -43,7 +47,8 @@ public EntityRelationships getRelatedEntities(
count = count == null ? DEFAULT_PAGE_SIZE : count;

RelatedEntitiesResult relatedEntitiesResult =
_graphService.findRelatedEntities(
graphService.findRelatedEntities(
systemOpContext,
null,
QueryUtils.newFilter("urn", rawUrn),
null,
Expand Down Expand Up @@ -91,7 +96,8 @@ public EntityLineageResult getLineageEntities(
@Nullable Integer count,
int maxHops,
String actor) {
return _graphService.getLineage(
return graphService.getLineage(
systemOpContext,
UrnUtils.getUrn(rawUrn),
direction,
start != null ? start : 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,7 @@ public EntityLineageResult getLineage(
if (separateSiblings) {
return ValidationUtils.validateEntityLineageResult(
opContext,
_graphService.getLineage(
entityUrn,
direction,
offset,
count,
maxHops,
opContext.getSearchContext().getLineageFlags()),
_graphService.getLineage(opContext, entityUrn, direction, offset, count, maxHops),
_entityService);
}

Expand All @@ -81,13 +75,7 @@ public EntityLineageResult getLineage(
}

EntityLineageResult entityLineage =
_graphService.getLineage(
entityUrn,
direction,
offset,
count,
maxHops,
opContext.getSearchContext().getLineageFlags());
_graphService.getLineage(opContext, entityUrn, direction, offset, count, maxHops);

Siblings siblingAspectOfEntity =
(Siblings) _entityService.getLatestAspect(opContext, entityUrn, SIBLINGS_ASPECT_NAME);
Expand Down
Loading
Loading