Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Force merge API supports performing only on primary shards (#11269) #12609

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix error in RemoteSegmentStoreDirectory when debug logging is enabled ([#12328](https://github.com/opensearch-project/OpenSearch/pull/12328))
- Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626))
- Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063))
- Force merge API supports performing on primary shards only ([#11269](https://github.com/opensearch-project/OpenSearch/pull/11269))
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
"wait_for_completion": {
"type" : "boolean",
"description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true."
},
"primary_only": {
"type" : "boolean",
"description" : "Specify whether the operation should only perform on primary shards. Defaults to false."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,23 @@
index: test
max_num_segments: 10
only_expunge_deletes: true

---
"Test primary_only parameter":
- skip:
version: " - 2.12.99"
reason: "primary_only is available in 2.13.0+"

- do:
indices.create:
index: test
body:
settings:
index.number_of_shards: 2
index.number_of_replicas: 1

- do:
indices.forcemerge:
index: test
primary_only: true
- match: { _shards.total: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
# will return a task immediately and the merge process will run in background.

- skip:
version: " - 2.6.99"
reason: "only available in 2.7+"
version: " - 2.6.99, 2.13.0 - "
reason: "wait_for_completion was introduced in in 2.7.0 and task description was changed in 2.13.0"
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
features: allowed_warnings

- do:
Expand All @@ -27,6 +27,28 @@
- match: { task.action: "indices:admin/forcemerge" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true]" }

---
"Force merge index with wait_for_completion after task description changed":
- skip:
version: " - 2.12.99 "
reason: "task description was changed in 2.13.0"
features: allowed_warnings
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved

- do:
indices.forcemerge:
index: test_index
wait_for_completion: false
max_num_segments: 1
- match: { task: /^.+$/ }
- set: { task: taskId }

- do:
tasks.get:
wait_for_completion: true
task_id: $taskId
- match: { task.action: "indices:admin/forcemerge" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]" }

# .tasks index is created when the force-merge operation completes, so we should delete .tasks index finally,
# if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail.
# Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ public void testForceMergeUUIDConsistent() throws IOException {
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
}

public void testForceMergeOnlyOnPrimaryShards() throws IOException {
internalCluster().ensureAtLeastNumDataNodes(2);
final String index = "test-index";
createIndex(
index,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen(index);
final ForceMergeResponse forceMergeResponse = client().admin()
.indices()
.prepareForceMerge(index)
.setMaxNumSegments(1)
.setPrimaryOnly(true)
.get();
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(1));
}

private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
try (GatedCloseable<IndexCommit> wrappedIndexCommit = indexShard.acquireLastIndexCommit(true)) {
return wrappedIndexCommit.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.get.GetResponse;
Expand Down Expand Up @@ -400,6 +401,14 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
performReplicationAfterForceMerge(false, SHARD_COUNT * (1 + REPLICA_COUNT));
}

public void testReplicationAfterForceMergeOnPrimaryShardsOnly() throws Exception {
performReplicationAfterForceMerge(true, SHARD_COUNT);
}

private void performReplicationAfterForceMerge(boolean primaryOnly, int expectedSuccessfulShards) throws Exception {
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -430,8 +439,16 @@ public void testReplicationAfterForceMerge() throws Exception {
waitForDocs(expectedHitCount, indexer);
waitForSearchableDocs(expectedHitCount, nodeA, nodeB);

// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
// Perform force merge only on the primary shards.
final ForceMergeResponse forceMergeResponse = client().admin()
.indices()
.prepareForceMerge(INDEX_NAME)
.setPrimaryOnly(primaryOnly)
.setMaxNumSegments(1)
.setFlush(false)
.get();
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
refresh(INDEX_NAME);
verifyStoreContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ public static final class Defaults {
public static final int MAX_NUM_SEGMENTS = -1;
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final boolean PRIMARY_ONLY = false;
}

private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
private boolean primaryOnly = Defaults.PRIMARY_ONLY;

private static final Version FORCE_MERGE_UUID_VERSION = LegacyESVersion.V_7_7_0;

Expand Down Expand Up @@ -102,6 +104,9 @@ public ForceMergeRequest(StreamInput in) throws IOException {
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_2_13_0)) {
primaryOnly = in.readBoolean();
}
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
forceMergeUUID = in.readOptionalString();
} else {
Expand Down Expand Up @@ -167,6 +172,21 @@ public ForceMergeRequest flush(boolean flush) {
return this;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public boolean primaryOnly() {
return primaryOnly;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public ForceMergeRequest primaryOnly(boolean primaryOnly) {
this.primaryOnly = primaryOnly;
return this;
}

/**
* Should this task store its result after it has finished?
*/
Expand All @@ -189,6 +209,8 @@ public String getDescription() {
+ onlyExpungeDeletes
+ "], flush["
+ flush
+ "], primaryOnly["
+ primaryOnly
+ "]";
}

Expand All @@ -198,6 +220,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(Version.V_2_13_0)) {
out.writeBoolean(primaryOnly);
}
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
out.writeOptionalString(forceMergeUUID);
}
Expand All @@ -212,6 +237,8 @@ public String toString() {
+ onlyExpungeDeletes
+ ", flush="
+ flush
+ ", primaryOnly="
+ primaryOnly
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,12 @@
request.flush(flush);
return this;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public ForceMergeRequestBuilder setPrimaryOnly(boolean primaryOnly) {
request.primaryOnly(primaryOnly);
return this;

Check warning on line 90 in server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestBuilder.java#L89-L90

Added lines #L89 - L90 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,16 @@
}

/**
* The refresh request works against *all* shards.
* The force merge request works against *all* shards by default, but it can work against all primary shards only
* by setting primary_only to true.
*/
@Override
protected ShardsIterator shards(ClusterState clusterState, ForceMergeRequest request, String[] concreteIndices) {
return clusterState.routingTable().allShards(concreteIndices);
if (request.primaryOnly()) {
return clusterState.routingTable().allShardsSatisfyingPredicate(concreteIndices, ShardRouting::primary);

Check warning on line 124 in server/src/main/java/org/opensearch/action/admin/indices/forcemerge/TransportForceMergeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/forcemerge/TransportForceMergeAction.java#L124

Added line #L124 was not covered by tests
} else {
return clusterState.routingTable().allShards(concreteIndices);

Check warning on line 126 in server/src/main/java/org/opensearch/action/admin/indices/forcemerge/TransportForceMergeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/forcemerge/TransportForceMergeAction.java#L126

Added line #L126 was not covered by tests
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,16 @@ public ShardsIterator allShardsSatisfyingPredicate(Predicate<ShardRouting> predi
return allShardsSatisfyingPredicate(indices, predicate, false);
}

/**
* All the shards for the provided indices on the node which match the predicate
* @param indices indices to return all the shards.
* @param predicate condition to match
* @return iterator over shards matching the predicate for the specific indices
*/
public ShardsIterator allShardsSatisfyingPredicate(String[] indices, Predicate<ShardRouting> predicate) {
return allShardsSatisfyingPredicate(indices, predicate, false);
}

private ShardsIterator allShardsSatisfyingPredicate(
String[] indices,
Predicate<ShardRouting> predicate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
mergeRequest.maxNumSegments(request.paramAsInt("max_num_segments", mergeRequest.maxNumSegments()));
mergeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", mergeRequest.onlyExpungeDeletes()));
mergeRequest.flush(request.paramAsBoolean("flush", mergeRequest.flush()));
mergeRequest.primaryOnly(request.paramAsBoolean("primary_only", mergeRequest.primaryOnly()));
if (mergeRequest.onlyExpungeDeletes() && mergeRequest.maxNumSegments() != ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS) {
deprecationLogger.deprecate(
"force_merge_expunge_deletes_and_max_num_segments_deprecation",
Expand Down
Loading
Loading