Skip to content

Commit

Permalink
Register settings and add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Apr 18, 2023
1 parent 8002b76 commit 807ccef
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.RemoteRefreshSegmentPressureSettings;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
Expand Down Expand Up @@ -635,7 +636,17 @@ public void apply(Settings value, Settings current, Settings previous) {
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING
Node.NODE_SEARCH_CACHE_SIZE_SETTING,

// Settings related to Remote Refresh Segment Pressure
RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,
RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT,
RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_THRESHOLD,
RemoteRefreshSegmentPressureSettings.TIME_LAG_VARIANCE_THRESHOLD,
RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE
)
)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

public class RemoteRefreshSegmentPressureSettingsTest extends OpenSearchTestCase {

private ClusterService clusterService;

private ThreadPool threadPool;

@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool("remote_refresh_segment_pressure_settings_test");
clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
);
}

@Override
public void tearDown() throws Exception {
super.tearDown();
threadPool.shutdownNow();
}

public void testGetDefaultSettings() {
RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings(
clusterService,
Settings.EMPTY,
mock(RemoteRefreshSegmentPressureService.class)
);

// Check remote refresh segment pressure enabled is false
assertFalse(pressureSettings.isRemoteRefreshSegmentPressureEnabled());

// Check min sequence number lag limit default value
assertEquals(5L, pressureSettings.getMinSeqNoLagLimit());

// Check bytes lag variance threshold default value
assertEquals(2.0, pressureSettings.getBytesLagVarianceThreshold(), 0.0d);

// Check time lag variance threshold default value
assertEquals(2.0, pressureSettings.getTimeLagVarianceThreshold(), 0.0d);

// Check minimum consecutive failures limit default value
assertEquals(10, pressureSettings.getMinConsecutiveFailuresLimit());

// Check upload bytes moving average window size default value
assertEquals(20, pressureSettings.getUploadBytesMovingAverageWindowSize());

// Check upload bytes per sec moving average window size default value
assertEquals(20, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize());

// Check upload time moving average window size default value
assertEquals(20, pressureSettings.getUploadTimeMovingAverageWindowSize());
}

public void testGetConfiguredSettings() {
Settings settings = Settings.builder()
.put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true)
.put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100)
.put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_THRESHOLD.getKey(), 50.0)
.put(RemoteRefreshSegmentPressureSettings.TIME_LAG_VARIANCE_THRESHOLD.getKey(), 60.0)
.put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104)
.build();
RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings(
clusterService,
settings,
mock(RemoteRefreshSegmentPressureService.class)
);

// Check remote refresh segment pressure enabled is true
assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled());

// Check min sequence number lag limit configured value
assertEquals(100L, pressureSettings.getMinSeqNoLagLimit());

// Check bytes lag variance threshold configured value
assertEquals(50.0, pressureSettings.getBytesLagVarianceThreshold(), 0.0d);

// Check time lag variance threshold configured value
assertEquals(60.0, pressureSettings.getTimeLagVarianceThreshold(), 0.0d);

// Check minimum consecutive failures limit configured value
assertEquals(121, pressureSettings.getMinConsecutiveFailuresLimit());

// Check upload bytes moving average window size configured value
assertEquals(102, pressureSettings.getUploadBytesMovingAverageWindowSize());

// Check upload bytes per sec moving average window size configured value
assertEquals(103, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize());

// Check upload time moving average window size configured value
assertEquals(104, pressureSettings.getUploadTimeMovingAverageWindowSize());
}

public void testUpdateAfterGetDefaultSettings() {
RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings(
clusterService,
Settings.EMPTY,
mock(RemoteRefreshSegmentPressureService.class)
);

Settings newSettings = Settings.builder()
.put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true)
.put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100)
.put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_THRESHOLD.getKey(), 50.0)
.put(RemoteRefreshSegmentPressureSettings.TIME_LAG_VARIANCE_THRESHOLD.getKey(), 60.0)
.put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104)
.build();
clusterService.getClusterSettings().applySettings(newSettings);

// Check updated remote refresh segment pressure enabled is false
assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled());

// Check min sequence number lag limit
assertEquals(100L, pressureSettings.getMinSeqNoLagLimit());

// Check bytes lag variance threshold updated
assertEquals(50.0, pressureSettings.getBytesLagVarianceThreshold(), 0.0d);

// Check time lag variance threshold updated
assertEquals(60.0, pressureSettings.getTimeLagVarianceThreshold(), 0.0d);

// Check minimum consecutive failures limit updated
assertEquals(121, pressureSettings.getMinConsecutiveFailuresLimit());

// Check upload bytes moving average window size updated
assertEquals(102, pressureSettings.getUploadBytesMovingAverageWindowSize());

// Check upload bytes per sec moving average window size updated
assertEquals(103, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize());

// Check upload time moving average window size updated
assertEquals(104, pressureSettings.getUploadTimeMovingAverageWindowSize());
}

public void testUpdateAfterGetConfiguredSettings() {
Settings settings = Settings.builder()
.put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true)
.put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100)
.put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_THRESHOLD.getKey(), 50.0)
.put(RemoteRefreshSegmentPressureSettings.TIME_LAG_VARIANCE_THRESHOLD.getKey(), 60.0)
.put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104)
.build();
RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings(
clusterService,
settings,
mock(RemoteRefreshSegmentPressureService.class)
);

Settings newSettings = Settings.builder()
.put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 80)
.put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_THRESHOLD.getKey(), 40.0)
.put(RemoteRefreshSegmentPressureSettings.TIME_LAG_VARIANCE_THRESHOLD.getKey(), 50.0)
.put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 111)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 112)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 113)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 114)
.build();

clusterService.getClusterSettings().applySettings(newSettings);

// Check updated remote refresh segment pressure enabled is true
assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled());

// Check min sequence number lag limit
assertEquals(80L, pressureSettings.getMinSeqNoLagLimit());

// Check bytes lag variance threshold updated
assertEquals(40.0, pressureSettings.getBytesLagVarianceThreshold(), 0.0d);

// Check time lag variance threshold updated
assertEquals(50.0, pressureSettings.getTimeLagVarianceThreshold(), 0.0d);

// Check minimum consecutive failures limit updated
assertEquals(111, pressureSettings.getMinConsecutiveFailuresLimit());

// Check upload bytes moving average window size updated
assertEquals(112, pressureSettings.getUploadBytesMovingAverageWindowSize());

// Check upload bytes per sec moving average window size updated
assertEquals(113, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize());

// Check upload time moving average window size updated
assertEquals(114, pressureSettings.getUploadTimeMovingAverageWindowSize());
}

public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() {

int toUpdateVal1 = 1121, toUpdateVal2 = 1123, toUpdateVal3 = 1125;

AtomicInteger updatedUploadBytesWindowSize = new AtomicInteger();
AtomicInteger updatedUploadBytesPerSecWindowSize = new AtomicInteger();
AtomicInteger updatedUploadTimeWindowSize = new AtomicInteger();


RemoteRefreshSegmentPressureService pressureService = mock(RemoteRefreshSegmentPressureService.class);

// Upload bytes
doAnswer(invocation -> {
updatedUploadBytesWindowSize.set(invocation.getArgument(0));
return null;
}).when(pressureService).updateUploadBytesMovingAverageWindowSize(anyInt());

// Upload bytes per sec
doAnswer(invocation -> {
updatedUploadBytesPerSecWindowSize.set(invocation.getArgument(0));
return null;
}).when(pressureService).updateUploadBytesPerSecMovingAverageWindowSize(anyInt());

// Upload time
doAnswer(invocation -> {
updatedUploadTimeWindowSize.set(invocation.getArgument(0));
return null;
}).when(pressureService).updateUploadTimeMsMovingAverageWindowSize(anyInt());

RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings(
clusterService,
Settings.EMPTY,
pressureService
);
Settings newSettings = Settings.builder()
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal1)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal2)
.put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal3)
.build();
clusterService.getClusterSettings().applySettings(newSettings);

// Assertions
assertEquals(toUpdateVal1, pressureSettings.getUploadBytesMovingAverageWindowSize());
assertEquals(toUpdateVal1, updatedUploadBytesWindowSize.get());
assertEquals(toUpdateVal2, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize());
assertEquals(toUpdateVal2, updatedUploadBytesPerSecWindowSize.get());
assertEquals(toUpdateVal3, pressureSettings.getUploadTimeMovingAverageWindowSize());
assertEquals(toUpdateVal3, updatedUploadTimeWindowSize.get());
}
}

0 comments on commit 807ccef

Please sign in to comment.