Skip to content

Commit

Permalink
Making Recovery Chunk Size setting dynamic
Browse files Browse the repository at this point in the history
Signed-off-by: Shubh Sahu <shubhvs@amazon.com>
  • Loading branch information
Shubh Sahu committed Jun 5, 2024
1 parent 581fcd2 commit 3013bac
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private void restoreRecoverySpeed() {
.setTransientSettings(
Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb")
.put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
.put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY))
)
.get()
.isAcknowledged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,14 @@ public class RecoverySettings {
);

// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);
public static final Setting<ByteSizeValue> INDICES_RECOVERY_CHUNK_SIZE_SETTING = Setting.byteSizeSetting(
"indices.recovery.chunk_size",
new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES),
new ByteSizeValue(0, ByteSizeUnit.BYTES),
new ByteSizeValue(100 * 1024 * 1024, ByteSizeUnit.BYTES),
Property.Dynamic,
Property.NodeScope
);

private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile ByteSizeValue replicationMaxBytesPerSec;
Expand All @@ -193,7 +200,7 @@ public class RecoverySettings {
private volatile TimeValue internalActionRetryTimeout;
private volatile TimeValue internalActionLongTimeout;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
private volatile ByteSizeValue chunkSize;
private volatile TimeValue internalRemoteUploadTimeout;

public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -221,6 +228,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {

logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);
this.chunkSize = INDICES_RECOVERY_CHUNK_SIZE_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
Expand All @@ -239,7 +247,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CHUNK_SIZE_SETTING, this::setChunkSize);
}

public RateLimiter recoveryRateLimiter() {
Expand Down Expand Up @@ -282,10 +290,7 @@ public ByteSizeValue getChunkSize() {
return chunkSize;
}

public void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
if (chunkSize.bytesAsInt() <= 0) {
throw new IllegalArgumentException("chunkSize must be > 0");
}
public void setChunkSize(ByteSizeValue chunkSize) {
this.chunkSize = chunkSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,13 @@ public void testInternalLongActionTimeout() {
);
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout());
}

public void testChunkSize() {
ByteSizeValue chunkSize = new ByteSizeValue(between(1, 1000), ByteSizeUnit.BYTES);
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), chunkSize).build()
);
assertEquals(chunkSize, recoverySettings.getChunkSize());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ public final void recoverUnstartedReplica(
startingSeqNo
);
long fileChunkSizeInBytes = randomBoolean()
? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes()
? RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes()
: randomIntBetween(1, 10 * 1024 * 1024);
final Settings settings = Settings.builder()
.put("indices.recovery.max_concurrent_file_chunks", Integer.toString(between(1, 4)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class RecoverySettingsChunkSizePlugin extends Plugin {
*/
public static final Setting<ByteSizeValue> CHUNK_SIZE_SETTING = Setting.byteSizeSetting(
"indices.recovery.chunk_size",
RecoverySettings.DEFAULT_CHUNK_SIZE,
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING,
Property.Dynamic,
Property.NodeScope
);
Expand Down

0 comments on commit 3013bac

Please sign in to comment.