Skip to content

Commit 18390f9

Browse files
authored
Add remote refresh segment pressure service, settings and tracker (opensearch-project#7227)
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 44ebf81 commit 18390f9

10 files changed

+1914
-8
lines changed

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.index.IndexModule;
4040
import org.opensearch.index.IndexSettings;
4141
import org.opensearch.index.IndexingPressure;
42+
import org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings;
4243
import org.opensearch.index.SegmentReplicationPressureService;
4344
import org.opensearch.index.ShardIndexingPressureMemoryManager;
4445
import org.opensearch.index.ShardIndexingPressureSettings;
@@ -638,7 +639,17 @@ public void apply(Settings value, Settings current, Settings previous) {
638639
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,
639640

640641
// Settings related to Searchable Snapshots
641-
Node.NODE_SEARCH_CACHE_SIZE_SETTING
642+
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
643+
644+
// Settings related to Remote Refresh Segment Pressure
645+
RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,
646+
RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT,
647+
RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR,
648+
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR,
649+
RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT,
650+
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
651+
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
652+
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE
642653
)
643654
)
644655
);

server/src/main/java/org/opensearch/common/util/MovingAverage.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,47 @@ public class MovingAverage {
1717
private final int windowSize;
1818
private final long[] observations;
1919

20-
private long count = 0;
21-
private long sum = 0;
22-
private double average = 0;
20+
private volatile long count = 0;
21+
private volatile long sum = 0;
22+
private volatile double average = 0;
2323

2424
public MovingAverage(int windowSize) {
25-
if (windowSize <= 0) {
26-
throw new IllegalArgumentException("window size must be greater than zero");
27-
}
28-
25+
checkWindowSize(windowSize);
2926
this.windowSize = windowSize;
3027
this.observations = new long[windowSize];
3128
}
3229

30+
/**
31+
* Used for changing the window size of {@code MovingAverage}.
32+
*
33+
* @param newWindowSize new window size.
34+
* @return copy of original object with updated size.
35+
*/
36+
public MovingAverage copyWithSize(int newWindowSize) {
37+
MovingAverage copy = new MovingAverage(newWindowSize);
38+
// Start is inclusive, but end is exclusive
39+
long start, end = count;
40+
if (isReady() == false) {
41+
start = 0;
42+
} else {
43+
start = end - windowSize;
44+
}
45+
// If the newWindow Size is smaller than the elements eligible to be copied over, then we adjust the start value
46+
if (end - start > newWindowSize) {
47+
start = end - newWindowSize;
48+
}
49+
for (int i = (int) start; i < end; i++) {
50+
copy.record(observations[i % observations.length]);
51+
}
52+
return copy;
53+
}
54+
55+
private void checkWindowSize(int size) {
56+
if (size <= 0) {
57+
throw new IllegalArgumentException("window size must be greater than zero");
58+
}
59+
}
60+
3361
/**
3462
* Records a new observation and evicts the n-th last observation.
3563
*/

0 commit comments

Comments
 (0)