Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add recovery chunk size setting #14161

Merged
merged 2 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022))
- Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- Add recovery chunk size setting ([#13997](https://github.com/opensearch-project/OpenSearch/pull/13997))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
- Move Remote Store Migration from DocRep to GA and modify remote migration settings name ([#14100](https://github.com/opensearch-project/OpenSearch/pull/14100))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.AnalysisPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -156,7 +155,7 @@
import static java.util.stream.Collectors.toList;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -187,7 +186,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(
MockTransportService.TestPlugin.class,
MockFSIndexStore.TestPlugin.class,
RecoverySettingsChunkSizePlugin.class,
TestAnalysisPlugin.class,
InternalSettingsPlugin.class,
MockEngineFactoryPlugin.class
Expand Down Expand Up @@ -263,7 +261,7 @@ private void slowDownRecovery(ByteSizeValue shardSize) {
// one chunk per sec..
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES)
// small chunks
.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
)
.get()
.isAcknowledged()
Expand All @@ -278,7 +276,10 @@ private void restoreRecoverySpeed() {
.setTransientSettings(
Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb")
.put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
.put(
INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(),
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY)
)
)
.get()
.isAcknowledged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
Expand All @@ -61,7 +60,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand All @@ -81,7 +80,7 @@ public static Collection<Object[]> parameters() {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, RecoverySettingsChunkSizePlugin.class);
return Arrays.asList(MockTransportService.TestPlugin.class);
}

/**
Expand All @@ -96,7 +95,8 @@ public void testCancelRecoveryAndResume() throws Exception {
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
Settings.builder()
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
)
.get()
.isAcknowledged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ public class RecoverySettings {
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

public static final Setting<ByteSizeValue> INDICES_RECOVERY_CHUNK_SIZE_SETTING = Setting.byteSizeSetting(
"indices.recovery.chunk_size",
DEFAULT_CHUNK_SIZE,
new ByteSizeValue(1, ByteSizeUnit.BYTES),
new ByteSizeValue(100, ByteSizeUnit.MB),
Property.Dynamic,
Property.NodeScope
);

private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile ByteSizeValue replicationMaxBytesPerSec;
private volatile int maxConcurrentFileChunks;
Expand All @@ -193,7 +202,7 @@ public class RecoverySettings {
private volatile TimeValue internalActionRetryTimeout;
private volatile TimeValue internalActionLongTimeout;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
private volatile ByteSizeValue chunkSize;
private volatile TimeValue internalRemoteUploadTimeout;

public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -221,6 +230,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {

logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);
this.chunkSize = INDICES_RECOVERY_CHUNK_SIZE_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
Expand All @@ -239,11 +249,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CHUNK_SIZE_SETTING, this::setChunkSize);
clusterSettings.addSettingsUpdateConsumer(
INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING,
this::setInternalActionRetryTimeout
);

}

/**
Expand Down Expand Up @@ -296,10 +306,7 @@ public ByteSizeValue getChunkSize() {
return chunkSize;
}

public void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
if (chunkSize.bytesAsInt() <= 0) {
throw new IllegalArgumentException("chunkSize must be > 0");
}
public void setChunkSize(ByteSizeValue chunkSize) {
this.chunkSize = chunkSize;
}

Expand Down
5 changes: 0 additions & 5 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,6 @@ protected Node(
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class)
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class)
Expand Down Expand Up @@ -1443,10 +1442,6 @@ protected TransportService newTransportService(
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer);
}

protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
// Noop in production, overridden by tests
}

/**
* The settings that are used by this node. Contains original settings as well as additional settings provided by plugins.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public void testInternalLongActionTimeout() {
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout());
}

public void testChunkSize() {
ByteSizeValue chunkSize = new ByteSizeValue(between(1, 1000), ByteSizeUnit.BYTES);
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), chunkSize).build()
);
assertEquals(chunkSize, recoverySettings.getChunkSize());
}

public void testInternalActionRetryTimeout() {
long duration = between(1, 1000);
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ public final void recoverUnstartedReplica(
startingSeqNo
);
long fileChunkSizeInBytes = randomBoolean()
? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes()
? RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes()
: randomIntBetween(1, 10 * 1024 * 1024);
final Settings settings = Settings.builder()
.put("indices.recovery.max_concurrent_file_chunks", Integer.toString(between(1, 4)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.env.Environment;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.script.MockScriptService;
import org.opensearch.script.ScriptContext;
Expand Down Expand Up @@ -236,13 +235,6 @@ protected TransportService newTransportService(
}
}

@Override
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
if (false == getPluginsService().filterPlugins(RecoverySettingsChunkSizePlugin.class).isEmpty()) {
clusterSettings.addSettingsUpdateConsumer(RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING, recoverySettings::setChunkSize);
}
}

@Override
protected ClusterInfoService newClusterInfoService(
Settings settings,
Expand Down

This file was deleted.

Loading