Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ILM: add force-merge step to searchable snapshots action #60819

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ tasks.register("verifyVersions") {
* after the backport of the backcompat code is complete.
*/

boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/60819" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/ilm/actions/ilm-forcemerge.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ This action makes the index <<dynamic-index-settings,read-only>>.
To use the `forcemerge` action in the `hot` phase, the `rollover` action *must* be present.
If no rollover action is configured, {ilm-init} will reject the policy.

[NOTE]
The `forcemerge` action is best effort. It might happen that some of the
shards are relocating, in which case they will not be merged.

[[ilm-forcemerge-options]]
==== Options

Expand Down
14 changes: 14 additions & 0 deletions docs/reference/ilm/actions/ilm-searchable-snapshot.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ To keep the snapshot, set `delete_searchable_snapshot` to `false` in the delete
Specifies where to store the snapshot.
See <<snapshots-register-repository>> for more information.

`force_merge_index`::
(Optional, boolean)
Force merges the managed index to one segment.
Defaults to `true`.
If the managed index was already force merged using the
<<ilm-forcemerge, force merge action>> in a previous action
the `searchable snapshot` action force merge step will be a no-op.

[NOTE]
The `forcemerge` action is best effort. It might happen that some of
the shards are relocating, in which case they will not be merged.
The `searchable-snapshot` action will continue executing even if not all shards
are force merged.

[[ilm-searchable-snapshot-ex]]
==== Examples
[source,console]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ protected static void assertOK(Response response) {
* in an non green state
* @param index index to test for
**/
protected static void ensureGreen(String index) throws IOException {
public static void ensureGreen(String index) throws IOException {
ensureHealth(index, (request) -> {
request.addParameter("wait_for_status", "green");
request.addParameter("wait_for_no_relocating_shards", "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
Expand All @@ -18,7 +19,7 @@
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

Expand All @@ -31,38 +32,52 @@ public class SearchableSnapshotAction implements LifecycleAction {
public static final String NAME = "searchable_snapshot";

public static final ParseField SNAPSHOT_REPOSITORY = new ParseField("snapshot_repository");
public static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index");
public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";

public static final String RESTORED_INDEX_PREFIX = "restored-";

private static final ConstructingObjectParser<SearchableSnapshotAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new SearchableSnapshotAction((String) a[0]));
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), SNAPSHOT_REPOSITORY);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX);
}

public static SearchableSnapshotAction parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

private final String snapshotRepository;
private final boolean forceMergeIndex;

public SearchableSnapshotAction(String snapshotRepository) {
public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex) {
if (Strings.hasText(snapshotRepository) == false) {
throw new IllegalArgumentException("the snapshot repository must be specified");
}
this.snapshotRepository = snapshotRepository;
this.forceMergeIndex = forceMergeIndex;
}

public SearchableSnapshotAction(String snapshotRepository) {
this(snapshotRepository, true);
}

public SearchableSnapshotAction(StreamInput in) throws IOException {
this(in.readString());
this(in.readString(), in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readBoolean() : true);
}

boolean isForceMergeIndex() {
return forceMergeIndex;
}

@Override
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
StepKey checkNoWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME);
StepKey forceMergeStepKey = new StepKey(phase, NAME, ForceMergeStep.NAME);
StepKey waitForSegmentCountKey = new StepKey(phase, NAME, SegmentCountStep.NAME);
StepKey generateSnapshotNameKey = new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME);
StepKey cleanSnapshotKey = new StepKey(phase, NAME, CleanupSnapshotStep.NAME);
StepKey createSnapshotKey = new StepKey(phase, NAME, CreateSnapshotStep.NAME);
Expand All @@ -77,8 +92,14 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {

CheckNotDataStreamWriteIndexStep checkNoWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNoWriteIndex,
waitForNoFollowerStepKey);
WaitForNoFollowersStep waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, generateSnapshotNameKey,
client);
final WaitForNoFollowersStep waitForNoFollowersStep;
if (forceMergeIndex) {
waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, forceMergeStepKey, client);
} else {
waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, generateSnapshotNameKey, client);
}
ForceMergeStep forceMergeStep = new ForceMergeStep(forceMergeStepKey, waitForSegmentCountKey, client, 1);
SegmentCountStep segmentCountStep = new SegmentCountStep(waitForSegmentCountKey, generateSnapshotNameKey, client, 1);
GenerateSnapshotNameStep generateSnapshotNameStep = new GenerateSnapshotNameStep(generateSnapshotNameKey, cleanSnapshotKey,
snapshotRepository);
CleanupSnapshotStep cleanupSnapshotStep = new CleanupSnapshotStep(cleanSnapshotKey, createSnapshotKey, client);
Expand Down Expand Up @@ -108,9 +129,25 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
SwapAliasesAndDeleteSourceIndexStep swapAliasesAndDeleteSourceIndexStep = new SwapAliasesAndDeleteSourceIndexStep(swapAliasesKey,
null, client, RESTORED_INDEX_PREFIX);

return Arrays.asList(checkNoWriteIndexStep, waitForNoFollowersStep, generateSnapshotNameStep, cleanupSnapshotStep,
createSnapshotBranchingStep, mountSnapshotStep, waitForGreenIndexHealthStep, copyMetadataStep, copySettingsStep,
isDataStreamBranchingStep, replaceDataStreamBackingIndex, deleteSourceIndexStep, swapAliasesAndDeleteSourceIndexStep);
List<Step> steps = new ArrayList<>();
steps.add(checkNoWriteIndexStep);
steps.add(waitForNoFollowersStep);
if (forceMergeIndex) {
steps.add(forceMergeStep);
steps.add(segmentCountStep);
}
steps.add(generateSnapshotNameStep);
steps.add(cleanupSnapshotStep);
steps.add(createSnapshotBranchingStep);
steps.add(mountSnapshotStep);
steps.add(waitForGreenIndexHealthStep);
steps.add(copyMetadataStep);
steps.add(copySettingsStep);
steps.add(isDataStreamBranchingStep);
steps.add(replaceDataStreamBackingIndex);
steps.add(deleteSourceIndexStep);
steps.add(swapAliasesAndDeleteSourceIndexStep);
return steps;
}

@Override
Expand All @@ -126,12 +163,16 @@ public String getWriteableName() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(snapshotRepository);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(forceMergeIndex);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SNAPSHOT_REPOSITORY.getPreferredName(), snapshotRepository);
builder.field(FORCE_MERGE_INDEX.getPreferredName(), forceMergeIndex);
builder.endObject();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
logger.info("[{}] retrieval of segment counts after force merge did not succeed, " +
"there were {} shard failures. " +
"failures: {}",
"there were {} shard failures. failures: {}",
index.getName(),
response.getFailedShards(),
failures == null ? "n/a" : Strings.collectionToDelimitedString(Arrays.stream(failures)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,73 @@ public class SearchableSnapshotActionTests extends AbstractActionTestCase<Search
@Override
public void testToSteps() {
String phase = randomAlphaOfLengthBetween(1, 10);
StepKey expectedFirstStep = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
StepKey expectedSecondStep = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME);
StepKey expectedThirdStep = new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME);
StepKey expectedFourthStep = new StepKey(phase, NAME, CleanupSnapshotStep.NAME);
StepKey expectedFifthStep = new StepKey(phase, NAME, CreateSnapshotStep.NAME);
StepKey expectedSixthStep = new StepKey(phase, NAME, MountSnapshotStep.NAME);
StepKey expectedSeventhStep = new StepKey(phase, NAME, WaitForIndexColorStep.NAME);
StepKey expectedEighthStep = new StepKey(phase, NAME, CopyExecutionStateStep.NAME);
StepKey expectedNinthStep = new StepKey(phase, NAME, CopySettingsStep.NAME);
StepKey expectedTenthStep = new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY);
StepKey expectedElevenStep = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME);
StepKey expectedTwelveStep = new StepKey(phase, NAME, DeleteStep.NAME);
StepKey expectedThirteenStep = new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME);

SearchableSnapshotAction action = createTestInstance();
StepKey nextStepKey = new StepKey(phase, randomAlphaOfLengthBetween(1, 5), randomAlphaOfLengthBetween(1, 5));

List<Step> steps = action.toSteps(null, phase, nextStepKey);
assertThat(steps.size(), is(13));

assertThat(steps.get(0).getKey(), is(expectedFirstStep));
assertThat(steps.get(1).getKey(), is(expectedSecondStep));
assertThat(steps.get(2).getKey(), is(expectedThirdStep));
assertThat(steps.get(3).getKey(), is(expectedFourthStep));
assertThat(steps.get(4).getKey(), is(expectedFifthStep));
assertThat(steps.get(5).getKey(), is(expectedSixthStep));
assertThat(steps.get(6).getKey(), is(expectedSeventhStep));
assertThat(steps.get(7).getKey(), is(expectedEighthStep));
assertThat(steps.get(8).getKey(), is(expectedNinthStep));
assertThat(steps.get(9).getKey(), is(expectedTenthStep));
assertThat(steps.get(10).getKey(), is(expectedElevenStep));
assertThat(steps.get(11).getKey(), is(expectedTwelveStep));
assertThat(steps.get(12).getKey(), is(expectedThirteenStep));

AsyncActionBranchingStep branchStep = (AsyncActionBranchingStep) steps.get(4);
assertThat(branchStep.getNextKeyOnIncompleteResponse(), is(expectedFourthStep));
assertThat(steps.size(), is(action.isForceMergeIndex() ? 15 : 13));

List<StepKey> expectedSteps = action.isForceMergeIndex() ? expectedStepKeysWithForceMerge(phase) :
expectedStepKeysNoForceMerge(phase);

assertThat(steps.get(0).getKey(), is(expectedSteps.get(0)));
assertThat(steps.get(1).getKey(), is(expectedSteps.get(1)));
assertThat(steps.get(2).getKey(), is(expectedSteps.get(2)));
assertThat(steps.get(3).getKey(), is(expectedSteps.get(3)));
assertThat(steps.get(4).getKey(), is(expectedSteps.get(4)));
assertThat(steps.get(5).getKey(), is(expectedSteps.get(5)));
assertThat(steps.get(6).getKey(), is(expectedSteps.get(6)));
assertThat(steps.get(7).getKey(), is(expectedSteps.get(7)));
assertThat(steps.get(8).getKey(), is(expectedSteps.get(8)));
assertThat(steps.get(9).getKey(), is(expectedSteps.get(9)));
assertThat(steps.get(10).getKey(), is(expectedSteps.get(10)));
assertThat(steps.get(11).getKey(), is(expectedSteps.get(11)));
assertThat(steps.get(12).getKey(), is(expectedSteps.get(12)));

if (action.isForceMergeIndex()) {
assertThat(steps.get(13).getKey(), is(expectedSteps.get(13)));
AsyncActionBranchingStep branchStep = (AsyncActionBranchingStep) steps.get(6);
assertThat(branchStep.getNextKeyOnIncompleteResponse(), is(expectedSteps.get(5)));
} else {
AsyncActionBranchingStep branchStep = (AsyncActionBranchingStep) steps.get(4);
assertThat(branchStep.getNextKeyOnIncompleteResponse(), is(expectedSteps.get(3)));
}
}

private List<StepKey> expectedStepKeysWithForceMerge(String phase) {
return List.of(
new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME),
new StepKey(phase, NAME, WaitForNoFollowersStep.NAME),
new StepKey(phase, NAME, ForceMergeStep.NAME),
new StepKey(phase, NAME, SegmentCountStep.NAME),
new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME),
new StepKey(phase, NAME, CleanupSnapshotStep.NAME),
new StepKey(phase, NAME, CreateSnapshotStep.NAME),
new StepKey(phase, NAME, MountSnapshotStep.NAME),
new StepKey(phase, NAME, WaitForIndexColorStep.NAME),
new StepKey(phase, NAME, CopyExecutionStateStep.NAME),
new StepKey(phase, NAME, CopySettingsStep.NAME),
new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY),
new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME),
new StepKey(phase, NAME, DeleteStep.NAME),
new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME));
}

private List<StepKey> expectedStepKeysNoForceMerge(String phase) {
return List.of(
new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME),
new StepKey(phase, NAME, WaitForNoFollowersStep.NAME),
new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME),
new StepKey(phase, NAME, CleanupSnapshotStep.NAME),
new StepKey(phase, NAME, CreateSnapshotStep.NAME),
new StepKey(phase, NAME, MountSnapshotStep.NAME),
new StepKey(phase, NAME, WaitForIndexColorStep.NAME),
new StepKey(phase, NAME, CopyExecutionStateStep.NAME),
new StepKey(phase, NAME, CopySettingsStep.NAME),
new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY),
new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME),
new StepKey(phase, NAME, DeleteStep.NAME),
new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME));
}

@Override
Expand All @@ -79,6 +110,6 @@ protected SearchableSnapshotAction mutateInstance(SearchableSnapshotAction insta
}

static SearchableSnapshotAction randomInstance() {
return new SearchableSnapshotAction(randomAlphaOfLengthBetween(5, 10));
return new SearchableSnapshotAction(randomAlphaOfLengthBetween(5, 10), randomBoolean());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -36,12 +37,15 @@
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLengthBetween;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.rest.ESRestTestCase.ensureGreen;

/**
* This class provides the operational REST functions needed to control an ILM time series lifecycle.
Expand Down Expand Up @@ -204,4 +208,36 @@ public static Map<String, Object> getOnlyIndexSettings(RestClient client, String
}
}

public static void createIndexWithSettings(RestClient client, String index, String alias, Settings.Builder settings)
throws IOException {
createIndexWithSettings(client, index, alias, settings, randomBoolean());
}

public static void createIndexWithSettings(RestClient client, String index, String alias, Settings.Builder settings,
boolean useWriteIndex) throws IOException {
Request request = new Request("PUT", "/" + index);

String writeIndexSnippet = "";
if (useWriteIndex) {
writeIndexSnippet = "\"is_write_index\": true";
}
request.setJsonEntity("{\n \"settings\": " + Strings.toString(settings.build())
+ ", \"aliases\" : { \"" + alias + "\": { " + writeIndexSnippet + " } } }");
client.performRequest(request);
// wait for the shards to initialize
ensureGreen(index);
}

@SuppressWarnings("unchecked")
public static Integer getNumberOfSegments(RestClient client, String index) throws IOException {
Response response = client.performRequest(new Request("GET", index + "/_segments"));
XContentType entityContentType = XContentType.fromMediaTypeOrFormat(response.getEntity().getContentType().getValue());
Map<String, Object> responseEntity = XContentHelper.convertToMap(entityContentType.xContent(),
response.getEntity().getContent(), false);
responseEntity = (Map<String, Object>) responseEntity.get("indices");
responseEntity = (Map<String, Object>) responseEntity.get(index);
responseEntity = (Map<String, Object>) responseEntity.get("shards");
List<Map<String, Object>> shards = (List<Map<String, Object>>) responseEntity.get("0");
return (Integer) shards.get(0).get("num_search_segments");
}
}
Loading