feat(restoreIndices): update restore indices args and docs #12529

Merged
@@ -28,6 +28,11 @@ public class RestoreIndices implements Upgrade {
public static final String URN_BASED_PAGINATION_ARG_NAME = "urnBasedPagination";

public static final String STARTING_OFFSET_ARG_NAME = "startingOffset";
public static final String LAST_URN_ARG_NAME = "lastUrn";
public static final String LAST_ASPECT_ARG_NAME = "lastAspect";
public static final String GE_PIT_EPOCH_MS_ARG_NAME = "gePitEpochMs";
public static final String LE_PIT_EPOCH_MS_ARG_NAME = "lePitEpochMs";
public static final String ASPECT_NAMES_ARG_NAME = "aspectNames";

private final List<UpgradeStep> _steps;

@@ -14,6 +14,7 @@
import io.ebean.Database;
import io.ebean.ExpressionList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -123,6 +124,30 @@
} else {
context.report().addLine("No urnLike arg present");
}
if (containsKey(context.parsedArgs(), RestoreIndices.LE_PIT_EPOCH_MS_ARG_NAME)) {
result.lePitEpochMs =
Long.parseLong(context.parsedArgs().get(RestoreIndices.LE_PIT_EPOCH_MS_ARG_NAME).get());
context.report().addLine(String.format("lePitEpochMs is %s", result.lePitEpochMs));

}
if (containsKey(context.parsedArgs(), RestoreIndices.GE_PIT_EPOCH_MS_ARG_NAME)) {
result.gePitEpochMs =
Long.parseLong(context.parsedArgs().get(RestoreIndices.GE_PIT_EPOCH_MS_ARG_NAME).get());
context.report().addLine(String.format("gePitEpochMs is %s", result.gePitEpochMs));

}
if (containsKey(context.parsedArgs(), RestoreIndices.LAST_URN_ARG_NAME)) {
result.lastUrn = context.parsedArgs().get(RestoreIndices.LAST_URN_ARG_NAME).get();
context.report().addLine(String.format("lastUrn is %s", result.lastUrn));

}
if (containsKey(context.parsedArgs(), RestoreIndices.LAST_ASPECT_ARG_NAME)) {
result.lastAspect = context.parsedArgs().get(RestoreIndices.LAST_ASPECT_ARG_NAME).get();
context.report().addLine(String.format("lastAspect is %s", result.lastAspect));

}
if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAMES_ARG_NAME)) {
result.aspectNames =
Arrays.asList(
context.parsedArgs().get(RestoreIndices.ASPECT_NAMES_ARG_NAME).get().split(","));
context.report().addLine(String.format("aspectNames is %s", result.aspectNames));

}
return result;
}
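
The optional-argument pattern in this hunk can be exercised in isolation. The following is a minimal sketch, not the code from this PR: it assumes the parsed args arrive as a `Map<String, Optional<String>>` (as the `.get(...).get()` calls suggest), and `RestoreArgsSketch`, `ParsedRestoreArgs`, and the default values are hypothetical. Trimming whitespace and skipping empty entries in `aspectNames` is a choice of this sketch; the diff above splits on `,` without trimming.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class RestoreArgsSketch {
  /** Hypothetical holder mirroring the fields assigned in the diff. */
  static final class ParsedRestoreArgs {
    long gePitEpochMs;      // default 0 is an assumption of this sketch
    long lePitEpochMs;      // default 0 is an assumption of this sketch
    String lastUrn = "";
    String lastAspect = "";
    List<String> aspectNames = new ArrayList<>();
  }

  /** Returns the value for an argument, or empty if it is absent or has no value. */
  static Optional<String> arg(Map<String, Optional<String>> parsedArgs, String name) {
    Optional<String> value = parsedArgs.get(name);
    return value == null ? Optional.empty() : value;
  }

  static ParsedRestoreArgs parse(Map<String, Optional<String>> parsedArgs) {
    ParsedRestoreArgs result = new ParsedRestoreArgs();
    // Long.parseLong throws NumberFormatException on non-numeric input.
    arg(parsedArgs, "lePitEpochMs").ifPresent(v -> result.lePitEpochMs = Long.parseLong(v.trim()));
    arg(parsedArgs, "gePitEpochMs").ifPresent(v -> result.gePitEpochMs = Long.parseLong(v.trim()));
    arg(parsedArgs, "lastUrn").ifPresent(v -> result.lastUrn = v);
    arg(parsedArgs, "lastAspect").ifPresent(v -> result.lastAspect = v);
    arg(parsedArgs, "aspectNames").ifPresent(v -> {
      for (String name : v.split(",")) {
        String trimmed = name.trim();
        if (!trimmed.isEmpty()) {
          result.aspectNames.add(trimmed);
        }
      }
    });
    return result;
  }

  public static void main(String[] args) {
    Map<String, Optional<String>> parsed = new HashMap<>();
    parsed.put("aspectNames", Optional.of("ownership, status"));
    parsed.put("lePitEpochMs", Optional.of("1700000000000"));
    ParsedRestoreArgs result = parse(parsed);
    System.out.println("aspectNames is " + result.aspectNames);
    System.out.println("lePitEpochMs is " + result.lePitEpochMs);
  }
}
```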

32 changes: 32 additions & 0 deletions docs/how/restore-indices.md
@@ -11,6 +11,38 @@ By default, restoring the indices from the local database will not remove any ex
the search and graph indices that no longer exist in the local database, potentially leading to inconsistencies
between the search and graph indices and the local database.

## Configuration

Upgrade jobs take job-specific configuration as command-line arguments to the job itself rather than as environment variables. The RestoreIndices job is selected with the `-u RestoreIndices` upgrade ID parameter, and additional parameters are passed as `-a` arguments, for example `-a batchSize=1000`.
The following configurations are available:

### Time-Based Filtering

* `lePitEpochMs`: Restore records created at or before this timestamp (epoch milliseconds)
* `gePitEpochMs`: Restore records created at or after this timestamp (epoch milliseconds)
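
Both bounds are plain epoch-millisecond values, so a time window has to be converted before it is passed to the job. A minimal sketch, assuming the window is known as UTC instants; `RestoreWindowArgs` and `windowArgs` are hypothetical helper names, and only the `-u RestoreIndices` and `-a key=value` conventions come from this page:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class RestoreWindowArgs {
  /** Builds job arguments that restrict the restore to the window [from, to]. */
  static List<String> windowArgs(Instant from, Instant to) {
    if (to.isBefore(from)) {
      throw new IllegalArgumentException("window end precedes window start");
    }
    List<String> args = new ArrayList<>();
    args.add("-u");
    args.add("RestoreIndices");
    args.add("-a");
    args.add("gePitEpochMs=" + from.toEpochMilli());
    args.add("-a");
    args.add("lePitEpochMs=" + to.toEpochMilli());
    return args;
  }

  public static void main(String[] unused) {
    List<String> args =
        windowArgs(Instant.parse("2024-01-01T00:00:00Z"), Instant.parse("2024-01-02T00:00:00Z"));
    // Prints the arguments as one space-separated line.
    System.out.println(String.join(" ", args));
  }
}
```

For the window shown in `main`, the two values are `gePitEpochMs=1704067200000` and `lePitEpochMs=1704153600000`.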

### Pagination and Performance Options

* `urnBasedPagination`: Enable key-based pagination instead of offset-based pagination. Recommended for large datasets as it's typically more efficient.
* `startingOffset`: When using default pagination, start from this offset
* `lastUrn`: Resume from a specific URN when using URN-based pagination
* `lastAspect`: Used with lastUrn to resume from a specific aspect, preventing reprocessing
* `numThreads`: Number of concurrent threads used for the restore; only applies to the default offset-based pagination
* `batchSize`: Configures the size of each batch as the job pages through rows
* `batchDelayMs`: Adds a delay in between each batch to avoid overloading backend systems
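
To illustrate why `lastUrn` and `lastAspect` together make a run resumable, here is a minimal in-memory sketch of key-based (keyset) pagination. It is not the query the job runs, which executes against the SQL database; `KeysetPagingSketch`, `Row`, and `nextBatch` are hypothetical names, and the rows are assumed to be sorted by URN and then aspect. Treating a missing `lastAspect` as "reprocess every aspect of `lastUrn`" is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class KeysetPagingSketch {
  static final class Row {
    final String urn;
    final String aspect;

    Row(String urn, String aspect) {
      this.urn = urn;
      this.aspect = aspect;
    }
  }

  /** True if the row sorts strictly after the (lastUrn, lastAspect) cursor. */
  static boolean isAfterCursor(Row row, String lastUrn, String lastAspect) {
    if (lastUrn == null) {
      return true; // no cursor: start from the beginning
    }
    int cmp = row.urn.compareTo(lastUrn);
    if (cmp != 0) {
      return cmp > 0;
    }
    // Same URN: without lastAspect every aspect of that URN is returned again.
    return lastAspect == null || row.aspect.compareTo(lastAspect) > 0;
  }

  /** Returns up to batchSize rows after the cursor; sortedRows must be ordered by (urn, aspect). */
  static List<Row> nextBatch(List<Row> sortedRows, String lastUrn, String lastAspect, int batchSize) {
    List<Row> batch = new ArrayList<>();
    for (Row row : sortedRows) {
      if (batch.size() == batchSize) {
        break;
      }
      if (isAfterCursor(row, lastUrn, lastAspect)) {
        batch.add(row);
      }
    }
    return batch;
  }

  public static void main(String[] args) {
    List<Row> rows = List.of(
        new Row("urn:li:dataset:a", "ownership"),
        new Row("urn:li:dataset:a", "status"),
        new Row("urn:li:dataset:b", "ownership"),
        new Row("urn:li:dataset:c", "status"));
    String lastUrn = null;
    String lastAspect = null;
    while (true) {
      List<Row> batch = nextBatch(rows, lastUrn, lastAspect, 2);
      if (batch.isEmpty()) {
        break;
      }
      for (Row row : batch) {
        System.out.println(row.urn + " " + row.aspect);
      }
      // The last row of each batch becomes the cursor for the next one.
      Row last = batch.get(batch.size() - 1);
      lastUrn = last.urn;
      lastAspect = last.aspect;
    }
  }
}
```

Unlike an offset, the cursor stays valid when rows are inserted or deleted before it, and a database can seek to it through an index instead of scanning and discarding the skipped rows.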

### Content Filtering

* `aspectNames`: Comma-separated list of aspects to restore (e.g., "ownership,status")
* `urnLike`: SQL LIKE pattern to filter URNs (e.g., "urn:li:dataset%")
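
Because `urnLike` is a SQL LIKE pattern, `%` matches any sequence of characters and `_` matches exactly one character, which matters for URNs that contain underscores. The sketch below previews which URNs a pattern would select by translating it to a Java regular expression. `UrnLikePreview` is a hypothetical helper, it ignores LIKE escape characters, and it matches case-sensitively, whereas the database's behavior depends on its collation.

```java
import java.util.regex.Pattern;

final class UrnLikePreview {
  /** Translates a SQL LIKE pattern (no escape character support) into a regex. */
  static Pattern likeToRegex(String like) {
    StringBuilder regex = new StringBuilder();
    for (char c : like.toCharArray()) {
      if (c == '%') {
        regex.append(".*"); // any sequence, including the empty one
      } else if (c == '_') {
        regex.append('.'); // exactly one character
      } else {
        regex.append(Pattern.quote(String.valueOf(c))); // literal character
      }
    }
    return Pattern.compile(regex.toString(), Pattern.DOTALL);
  }

  static boolean matches(String like, String urn) {
    return likeToRegex(like).matcher(urn).matches();
  }

  public static void main(String[] args) {
    String pattern = "urn:li:dataset%";
    System.out.println(matches(pattern, "urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)"));
    System.out.println(matches(pattern, "urn:li:chart:(looker,1)"));
  }
}
```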

### Nuclear Option

* `clean`: Wipes the current indices by deleting all of their documents before restoring, which guarantees a state consistent with SQL. This is generally not recommended unless the instance has significant data corruption.

### Helm

These arguments can also be set in the Helm charts for Kubernetes deployments under the `datahubUpgrade.restoreIndices.args` path, which passes them as args to the pod command.

## Quickstart

If you're using the quickstart images, you can use the `datahub` cli to restore the indices.