Skip to content

Commit

Permalink
[Backport 2.x] Add recovery chunk size setting (opensearch-project#14161
Browse files Browse the repository at this point in the history
)

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>
Signed-off-by: kkewwei <kkewwei@163.com>
  • Loading branch information
astute-decipher authored and kkewwei committed Jul 24, 2024
1 parent c04d612 commit 80c8230
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 92 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022))
- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- Add recovery chunk size setting ([#13997](https://github.com/opensearch-project/OpenSearch/pull/13997))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
- Move Remote Store Migration from DocRep to GA and modify remote migration settings name ([#14100](https://github.com/opensearch-project/OpenSearch/pull/14100))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.AnalysisPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -156,7 +155,7 @@
import static java.util.stream.Collectors.toList;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -187,7 +186,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(
MockTransportService.TestPlugin.class,
MockFSIndexStore.TestPlugin.class,
RecoverySettingsChunkSizePlugin.class,
TestAnalysisPlugin.class,
InternalSettingsPlugin.class,
MockEngineFactoryPlugin.class
Expand Down Expand Up @@ -263,7 +261,7 @@ private void slowDownRecovery(ByteSizeValue shardSize) {
// one chunk per sec..
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES)
// small chunks
.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
)
.get()
.isAcknowledged()
Expand All @@ -278,7 +276,10 @@ private void restoreRecoverySpeed() {
.setTransientSettings(
Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb")
.put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
.put(
INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(),
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY)
)
)
.get()
.isAcknowledged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
Expand All @@ -61,7 +60,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand All @@ -81,7 +80,7 @@ public static Collection<Object[]> parameters() {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, RecoverySettingsChunkSizePlugin.class);
return Arrays.asList(MockTransportService.TestPlugin.class);
}

/**
Expand All @@ -96,7 +95,8 @@ public void testCancelRecoveryAndResume() throws Exception {
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
Settings.builder()
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
)
.get()
.isAcknowledged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ public class RecoverySettings {
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

public static final Setting<ByteSizeValue> INDICES_RECOVERY_CHUNK_SIZE_SETTING = Setting.byteSizeSetting(
"indices.recovery.chunk_size",
DEFAULT_CHUNK_SIZE,
new ByteSizeValue(1, ByteSizeUnit.BYTES),
new ByteSizeValue(100, ByteSizeUnit.MB),
Property.Dynamic,
Property.NodeScope
);

private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile ByteSizeValue replicationMaxBytesPerSec;
private volatile int maxConcurrentFileChunks;
Expand All @@ -193,7 +202,7 @@ public class RecoverySettings {
private volatile TimeValue internalActionRetryTimeout;
private volatile TimeValue internalActionLongTimeout;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
private volatile ByteSizeValue chunkSize;
private volatile TimeValue internalRemoteUploadTimeout;

public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -221,6 +230,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {

logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);
this.chunkSize = INDICES_RECOVERY_CHUNK_SIZE_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
Expand All @@ -239,11 +249,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CHUNK_SIZE_SETTING, this::setChunkSize);
clusterSettings.addSettingsUpdateConsumer(
INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING,
this::setInternalActionRetryTimeout
);

}

/**
Expand Down Expand Up @@ -296,10 +306,7 @@ public ByteSizeValue getChunkSize() {
return chunkSize;
}

public void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
if (chunkSize.bytesAsInt() <= 0) {
throw new IllegalArgumentException("chunkSize must be > 0");
}
public void setChunkSize(ByteSizeValue chunkSize) {
this.chunkSize = chunkSize;
}

Expand Down
5 changes: 0 additions & 5 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,6 @@ protected Node(
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class)
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class)
Expand Down Expand Up @@ -1443,10 +1442,6 @@ protected TransportService newTransportService(
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer);
}

protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
// Noop in production, overridden by tests
}

/**
* The settings that are used by this node. Contains original settings as well as additional settings provided by plugins.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public void testInternalLongActionTimeout() {
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout());
}

public void testChunkSize() {
ByteSizeValue chunkSize = new ByteSizeValue(between(1, 1000), ByteSizeUnit.BYTES);
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), chunkSize).build()
);
assertEquals(chunkSize, recoverySettings.getChunkSize());
}

public void testInternalActionRetryTimeout() {
long duration = between(1, 1000);
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ public final void recoverUnstartedReplica(
startingSeqNo
);
long fileChunkSizeInBytes = randomBoolean()
? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes()
? RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes()
: randomIntBetween(1, 10 * 1024 * 1024);
final Settings settings = Settings.builder()
.put("indices.recovery.max_concurrent_file_chunks", Integer.toString(between(1, 4)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.env.Environment;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.script.MockScriptService;
import org.opensearch.script.ScriptContext;
Expand Down Expand Up @@ -236,13 +235,6 @@ protected TransportService newTransportService(
}
}

@Override
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
if (false == getPluginsService().filterPlugins(RecoverySettingsChunkSizePlugin.class).isEmpty()) {
clusterSettings.addSettingsUpdateConsumer(RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING, recoverySettings::setChunkSize);
}
}

@Override
protected ClusterInfoService newClusterInfoService(
Settings settings,
Expand Down

This file was deleted.

0 comments on commit 80c8230

Please sign in to comment.