Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(upgrade): common base for mcl upgrades #10429

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
Expand All @@ -15,13 +16,14 @@ public class ReindexDataJobViaNodesCLLConfig {

@Bean
public NonBlockingSystemUpgrade reindexDataJobViaNodesCLL(
final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
@Value("${systemUpdate.dataJobNodeCLL.enabled}") final boolean enabled,
@Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize,
@Value("${systemUpdate.dataJobNodeCLL.delayMs}") final Integer delayMs,
@Value("${systemUpdate.dataJobNodeCLL.limit}") final Integer limit) {
return new ReindexDataJobViaNodesCLL(
entityService, aspectDao, enabled, batchSize, delayMs, limit);
opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package com.linkedin.datahub.upgrade.system;

import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

/**
 * Generic upgrade step that re-emits MCLs (Metadata Change Logs) for every stored row of a single
 * aspect so that downstream consumers rebuild their ES documents for that aspect.
 *
 * <p>Subclasses select the aspect via {@link #getAspectName()} and may narrow the scanned rows
 * with a SQL {@code LIKE} pattern via {@link #getUrnLike()}.
 */
@Slf4j
public abstract class AbstractMCLStep implements UpgradeStep {
  private final OperationContext opContext;
  private final EntityService<?> entityService;
  private final AspectDao aspectDao;

  private final int batchSize;
  private final int batchDelayMs;
  private final int limit;

  /**
   * @param opContext operation context supplying the entity registry and retriever context
   * @param entityService used to produce MCLs and to read/write the upgrade-result marker
   * @param aspectDao source of the raw aspect rows to replay
   * @param batchSize number of rows fetched per batch
   * @param batchDelayMs pause between batches in milliseconds; values &lt;= 0 disable throttling
   * @param limit maximum number of rows to scan overall
   */
  public AbstractMCLStep(
      OperationContext opContext,
      EntityService<?> entityService,
      AspectDao aspectDao,
      Integer batchSize,
      Integer batchDelayMs,
      Integer limit) {
    this.opContext = opContext;
    this.entityService = entityService;
    this.aspectDao = aspectDao;
    this.batchSize = batchSize;
    this.batchDelayMs = batchDelayMs;
    this.limit = limit;
  }

  /** Name of the aspect whose rows should be replayed as MCLs. */
  @Nonnull
  protected abstract String getAspectName();

  /** Urn under which this upgrade records its completion marker. */
  protected Urn getUpgradeIdUrn() {
    return BootstrapStep.getUpgradeUrn(id());
  }

  /** Optionally apply an urn-like sql filter, otherwise all urns */
  @Nullable
  protected abstract String getUrnLike();

  @Override
  public Function<UpgradeContext, UpgradeStepResult> executable() {
    return (context) -> {

      // re-using RestoreIndicesArgs for configuring the sql scan
      RestoreIndicesArgs args =
          new RestoreIndicesArgs().aspectName(getAspectName()).batchSize(batchSize).limit(limit);

      if (getUrnLike() != null) {
        args = args.urnLike(getUrnLike());
      }

      final AspectSpec aspectSpec =
          opContext.getEntityRegistry().getAspectSpecs().get(getAspectName());

      aspectDao
          .streamAspectBatches(args)
          .forEach(
              batch -> {
                log.info("Processing batch({}) of size {}.", getAspectName(), batchSize);

                // NOTE(review): getRetrieverContext() returns an Optional; the bare .get() throws
                // if it is absent — assumes the system-update context always populates it. Confirm.
                List<Pair<Future<?>, Boolean>> futures =
                    EntityUtils.toSystemAspectFromEbeanAspects(
                            opContext.getRetrieverContext().get(),
                            batch.collect(Collectors.toList()))
                        .stream()
                        .map(
                            systemAspect ->
                                entityService.alwaysProduceMCLAsync(
                                    opContext,
                                    systemAspect.getUrn(),
                                    systemAspect.getUrn().getEntityType(),
                                    getAspectName(),
                                    aspectSpec,
                                    null,
                                    systemAspect.getRecordTemplate(),
                                    null,
                                    // stamp each MCL with this upgrade's id for traceability
                                    systemAspect
                                        .getSystemMetadata()
                                        .setRunId(id())
                                        .setLastObserved(System.currentTimeMillis()),
                                    AuditStampUtils.createDefaultAuditStamp(),
                                    ChangeType.UPSERT))
                        .collect(Collectors.toList());

                // Block until every MCL of this batch has been produced before moving on.
                futures.forEach(
                    f -> {
                      try {
                        f.getFirst().get();
                      } catch (InterruptedException e) {
                        // restore the interrupt status before propagating
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                      } catch (ExecutionException e) {
                        throw new RuntimeException(e);
                      }
                    });

                // Optional throttle between batches to limit load on Kafka/ES.
                if (batchDelayMs > 0) {
                  log.info("Sleeping for {} ms", batchDelayMs);
                  try {
                    Thread.sleep(batchDelayMs);
                  } catch (InterruptedException e) {
                    // restore the interrupt status before propagating
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                  }
                }
              });

      // NOTE(review): this second pass re-reads the same rows via streamRestoreIndices — confirm
      // it is intentional and does not duplicate the MCL emission performed above.
      entityService
          .streamRestoreIndices(opContext, args, x -> context.report().addLine((String) x))
          .forEach(
              result -> {
                context.report().addLine("Rows migrated: " + result.rowsMigrated);
                context.report().addLine("Rows ignored: " + result.ignored);
              });

      // Persist the completion marker so skip() short-circuits on subsequent runs.
      BootstrapStep.setUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
      context.report().addLine("State updated: " + getUpgradeIdUrn());

      return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
    };
  }

  /**
   * Returns whether the upgrade should be skipped: true when a completion marker already exists
   * under {@link #getUpgradeIdUrn()}.
   */
  @Override
  public boolean skip(UpgradeContext context) {
    boolean previouslyRun =
        entityService.exists(
            opContext, getUpgradeIdUrn(), DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
    if (previouslyRun) {
      log.info("{} was already run. Skipping.", id());
    }
    return previouslyRun;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -18,6 +20,7 @@ public class ReindexDataJobViaNodesCLL implements NonBlockingSystemUpgrade {
private final List<UpgradeStep> _steps;

public ReindexDataJobViaNodesCLL(
@Nonnull OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
boolean enabled,
Expand All @@ -28,7 +31,7 @@ public ReindexDataJobViaNodesCLL(
_steps =
ImmutableList.of(
new ReindexDataJobViaNodesCLLStep(
entityService, aspectDao, batchSize, batchDelayMs, limit));
opContext, entityService, aspectDao, batchSize, batchDelayMs, limit));
} else {
_steps = ImmutableList.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,148 +2,43 @@

import static com.linkedin.metadata.Constants.*;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.datahub.upgrade.system.AbstractMCLStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.util.Pair;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import io.datahubproject.metadata.context.OperationContext;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;

@Slf4j
public class ReindexDataJobViaNodesCLLStep implements UpgradeStep {

public static final String UPGRADE_ID = "via-node-cll-reindex-datajob-v3";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);

private final EntityService<?> entityService;
private final AspectDao aspectDao;
private final int batchSize;
private final int batchDelayMs;
private final int limit;
public class ReindexDataJobViaNodesCLLStep extends AbstractMCLStep {

public ReindexDataJobViaNodesCLLStep(
OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
this.entityService = entityService;
this.aspectDao = aspectDao;
this.batchSize = batchSize != null ? batchSize : 200;
this.batchDelayMs = batchDelayMs;
this.limit = limit;
super(opContext, entityService, aspectDao, batchSize, batchDelayMs, limit);
}

@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {

// re-using for configuring the sql scan
RestoreIndicesArgs args =
new RestoreIndicesArgs()
.aspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)
.urnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%")
.batchSize(batchSize)
.limit(limit);

final AspectSpec aspectSpec =
context
.opContext()
.getEntityRegistry()
.getAspectSpecs()
.get(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME);

aspectDao
.streamAspectBatches(args)
.forEach(
batch -> {
log.info("Processing batch of size {}.", batchSize);

List<Pair<Future<?>, Boolean>> futures =
EntityUtils.toSystemAspectFromEbeanAspects(
context.opContext().getRetrieverContext().get(),
batch.collect(Collectors.toList()))
.stream()
.map(
systemAspect ->
entityService.alwaysProduceMCLAsync(
context.opContext(),
systemAspect.getUrn(),
systemAspect.getUrn().getEntityType(),
DATA_JOB_INPUT_OUTPUT_ASPECT_NAME,
aspectSpec,
null,
systemAspect.getRecordTemplate(),
null,
systemAspect
.getSystemMetadata()
.setRunId(UPGRADE_ID)
.setLastObserved(System.currentTimeMillis()),
AuditStampUtils.createDefaultAuditStamp(),
ChangeType.UPSERT))
.collect(Collectors.toList());

futures.forEach(
f -> {
try {
f.getFirst().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

if (batchDelayMs > 0) {
log.info("Sleeping for {} ms", batchDelayMs);
try {
Thread.sleep(batchDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});

entityService
.streamRestoreIndices(
context.opContext(), args, x -> context.report().addLine((String) x))
.forEach(
result -> {
context.report().addLine("Rows migrated: " + result.rowsMigrated);
context.report().addLine("Rows ignored: " + result.ignored);
});

BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService);
context.report().addLine("State updated: " + UPGRADE_ID_URN);

return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
public String id() {
return "via-node-cll-reindex-datajob-v3";
}

@Nonnull
@Override
public String id() {
return UPGRADE_ID;
protected String getAspectName() {
return DATA_JOB_INPUT_OUTPUT_ASPECT_NAME;
}

/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum
* retries.
*/
@Nullable
@Override
public boolean isOptional() {
return false;
protected String getUrnLike() {
return "urn:li:" + DATA_JOB_ENTITY_NAME + ":%";
}

@Override
Expand All @@ -152,17 +47,11 @@ public boolean isOptional() {
* variable SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT to determine whether to skip.
*/
public boolean skip(UpgradeContext context) {
boolean previouslyRun =
entityService.exists(
context.opContext(), UPGRADE_ID_URN, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
boolean envFlagRecommendsSkip =
Boolean.parseBoolean(System.getenv("SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT"));
if (previouslyRun) {
log.info("{} was already run. Skipping.", id());
}
if (envFlagRecommendsSkip) {
log.info("Environment variable SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT is set to true. Skipping.");
}
return (previouslyRun || envFlagRecommendsSkip);
return (super.skip(context) || envFlagRecommendsSkip);
}
}
Loading
Loading