Skip to content

Commit cb54acc

Browse files
author
Harsh Kothari
committed
Addition of fileCache activeUsage evaluator to DiskThresholdMonitor
Signed-off-by: Harsh Kothari <techarsh@amazon.com>
1 parent 292407b commit cb54acc

File tree

9 files changed

+635
-83
lines changed

9 files changed

+635
-83
lines changed

server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 138 additions & 60 deletions
Large diffs are not rendered by default.
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing.allocation;
10+
11+
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
12+
13+
/**
14+
* Evaluates file cache active usage thresholds for warm nodes in the cluster.
15+
* This class provides methods to check if nodes are exceeding various file cache watermark levels
16+
*/
17+
public class FileCacheEvaluator implements FileCacheThresholdEvaluator {
18+
19+
private final FileCacheThresholdSettings fileCacheThresholdSettings;
20+
21+
public FileCacheEvaluator(FileCacheThresholdSettings fileCacheThresholdSettings) {
22+
this.fileCacheThresholdSettings = fileCacheThresholdSettings;
23+
}
24+
25+
@Override
26+
public boolean isNodeExceedingHighWatermark(AggregateFileCacheStats aggregateFileCacheStats) {
27+
return aggregateFileCacheStats.getActivePercentFromTotalCapacity() >= fileCacheThresholdSettings.getFreeFileCacheThresholdHigh();
28+
}
29+
30+
@Override
31+
public boolean isNodeExceedingFloodStageWatermark(AggregateFileCacheStats aggregateFileCacheStats) {
32+
return aggregateFileCacheStats.getActivePercentFromTotalCapacity() >= fileCacheThresholdSettings
33+
.getFreeFileCacheThresholdFloodStage();
34+
}
35+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing.allocation;
10+
11+
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
12+
13+
/**
14+
* Base interface for file cache threshold evaluation logic.
15+
* This interface defines methods for evaluating whether a node exceeds
16+
* various watermarks based on file cache active usage.
17+
*
18+
* @opensearch.internal
19+
*/
20+
public interface FileCacheThresholdEvaluator {
21+
22+
/**
23+
* Checks if a node is exceeding the high watermark threshold
24+
*
25+
* @param aggregateFileCacheStats disk usage for the node
26+
* @return true if the node is exceeding the high watermark, false otherwise
27+
*/
28+
boolean isNodeExceedingHighWatermark(AggregateFileCacheStats aggregateFileCacheStats);
29+
30+
/**
31+
* Checks if a node is exceeding the flood stage watermark threshold
32+
*
33+
* @param aggregateFileCacheStats disk usage for the node
34+
* @return true if the node is exceeding the flood stage watermark, false otherwise
35+
*/
36+
boolean isNodeExceedingFloodStageWatermark(AggregateFileCacheStats aggregateFileCacheStats);
37+
38+
}
Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Licensed to Elasticsearch under one or more contributor
11+
* license agreements. See the NOTICE file distributed with
12+
* this work for additional information regarding copyright
13+
* ownership. Elasticsearch licenses this file to you under
14+
* the Apache License, Version 2.0 (the "License"); you may
15+
* not use this file except in compliance with the License.
16+
* You may obtain a copy of the License at
17+
*
18+
* http://www.apache.org/licenses/LICENSE-2.0
19+
*
20+
* Unless required by applicable law or agreed to in writing,
21+
* software distributed under the License is distributed on an
22+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23+
* KIND, either express or implied. See the License for the
24+
* specific language governing permissions and limitations
25+
* under the License.
26+
*/
27+
28+
/*
29+
* Modifications Copyright OpenSearch Contributors. See
30+
* GitHub history for details.
31+
*/
32+
33+
package org.opensearch.cluster.routing.allocation;
34+
35+
import org.opensearch.OpenSearchParseException;
36+
import org.opensearch.common.settings.ClusterSettings;
37+
import org.opensearch.common.settings.Setting;
38+
import org.opensearch.common.settings.Settings;
39+
import org.opensearch.common.unit.RatioValue;
40+
import org.opensearch.core.common.Strings;
41+
import org.opensearch.core.common.unit.ByteSizeValue;
42+
43+
import java.util.Arrays;
44+
import java.util.Iterator;
45+
import java.util.List;
46+
import java.util.Locale;
47+
import java.util.Map;
48+
49+
/**
50+
* A container to keep settings for file cache thresholds up to date with cluster setting changes.
51+
*
52+
* @opensearch.internal
53+
*/
54+
public class FileCacheThresholdSettings {
55+
56+
public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_HIGH_FILECACHE_WATERMARK_SETTING = new Setting<>(
57+
"cluster.routing.allocation.filecache.watermark.high",
58+
"90%",
59+
(s) -> validWatermarkSetting(s, "cluster.routing.allocation.filecache.watermark.high"),
60+
new HighDiskWatermarkValidator(),
61+
Setting.Property.Dynamic,
62+
Setting.Property.NodeScope
63+
);
64+
public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_FILECACHE_FLOOD_STAGE_WATERMARK_SETTING = new Setting<>(
65+
"cluster.routing.allocation.filecache.watermark.flood_stage",
66+
"100%",
67+
(s) -> validWatermarkSetting(s, "cluster.routing.allocation.filecache.watermark.flood_stage"),
68+
new FloodStageValidator(),
69+
Setting.Property.Dynamic,
70+
Setting.Property.NodeScope
71+
);
72+
73+
private volatile Double freeFileCacheThresholdHigh;
74+
private volatile ByteSizeValue freeBytesThresholdHigh;
75+
private volatile Double freeFileCacheThresholdFloodStage;
76+
private volatile ByteSizeValue freeBytesThresholdFloodStage;
77+
78+
public FileCacheThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
79+
final String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_FILECACHE_WATERMARK_SETTING.get(settings);
80+
final String floodStage = CLUSTER_ROUTING_ALLOCATION_FILECACHE_FLOOD_STAGE_WATERMARK_SETTING.get(settings);
81+
setHighWatermark(highWatermark);
82+
setFloodStage(floodStage);
83+
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_FILECACHE_WATERMARK_SETTING, this::setHighWatermark);
84+
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_FILECACHE_FLOOD_STAGE_WATERMARK_SETTING, this::setFloodStage);
85+
}
86+
87+
/**
88+
* Validates a high file cache watermark.
89+
*
90+
* @opensearch.internal
91+
*/
92+
static final class HighDiskWatermarkValidator implements Setting.Validator<String> {
93+
@Override
94+
public void validate(final String value) {}
95+
96+
@Override
97+
public void validate(final String value, final Map<Setting<?>, Object> settings) {
98+
final String floodStageRaw = (String) settings.get(CLUSTER_ROUTING_ALLOCATION_FILECACHE_FLOOD_STAGE_WATERMARK_SETTING);
99+
doValidate(value, floodStageRaw);
100+
}
101+
102+
@Override
103+
public Iterator<Setting<?>> settings() {
104+
final List<Setting<?>> settings = Arrays.asList(CLUSTER_ROUTING_ALLOCATION_FILECACHE_FLOOD_STAGE_WATERMARK_SETTING);
105+
return settings.iterator();
106+
}
107+
}
108+
109+
/**
110+
* Validates the flood stage.
111+
*
112+
* @opensearch.internal
113+
*/
114+
static final class FloodStageValidator implements Setting.Validator<String> {
115+
116+
@Override
117+
public void validate(final String value) {}
118+
119+
@Override
120+
public void validate(final String value, final Map<Setting<?>, Object> settings) {
121+
final String highWatermarkRaw = (String) settings.get(CLUSTER_ROUTING_ALLOCATION_HIGH_FILECACHE_WATERMARK_SETTING);
122+
doValidate(highWatermarkRaw, value);
123+
}
124+
125+
@Override
126+
public Iterator<Setting<?>> settings() {
127+
final List<Setting<?>> settings = Arrays.asList(CLUSTER_ROUTING_ALLOCATION_HIGH_FILECACHE_WATERMARK_SETTING);
128+
return settings.iterator();
129+
}
130+
}
131+
132+
private static void doValidate(String high, String flood) {
133+
try {
134+
doValidateAsPercentage(high, flood);
135+
return; // early return so that we do not try to parse as bytes
136+
} catch (final OpenSearchParseException e) {
137+
// swallow as we are now going to try to parse as bytes
138+
}
139+
try {
140+
doValidateAsBytes(high, flood);
141+
} catch (final OpenSearchParseException e) {
142+
final String message = String.format(
143+
Locale.ROOT,
144+
"unable to consistently parse [%s=%s], [%s=%s], and [%s=%s] as percentage or bytes",
145+
CLUSTER_ROUTING_ALLOCATION_HIGH_FILECACHE_WATERMARK_SETTING.getKey(),
146+
high,
147+
CLUSTER_ROUTING_ALLOCATION_FILECACHE_FLOOD_STAGE_WATERMARK_SETTING.getKey(),
148+
flood
149+
);
150+
throw new IllegalArgumentException(message, e);
151+
}
152+
}
153+
154+
private static void doValidateAsPercentage(final String high, final String flood) {
155+
final double highWatermarkThreshold = thresholdPercentageFromWatermark(high, false);
156+
final double floodThreshold = thresholdPercentageFromWatermark(flood, false);
157+
if (highWatermarkThreshold > floodThreshold) {
158+
throw new IllegalArgumentException(
159+
"high file cache watermark [" + high + "] more than flood stage file cache watermark [" + flood + "]"
160+
);
161+
}
162+
}
163+
164+
private static void doValidateAsBytes(final String high, final String flood) {
165+
final ByteSizeValue highWatermarkBytes = thresholdBytesFromWatermark(
166+
high,
167+
CLUSTER_ROUTING_ALLOCATION_HIGH_FILECACHE_WATERMARK_SETTING.getKey(),
168+
false
169+
);
170+
final ByteSizeValue floodStageBytes = thresholdBytesFromWatermark(
171+
flood,
172+
CLUSTER_ROUTING_ALLOCATION_FILECACHE_FLOOD_STAGE_WATERMARK_SETTING.getKey(),
173+
false
174+
);
175+
if (highWatermarkBytes.getBytes() < floodStageBytes.getBytes()) {
176+
throw new IllegalArgumentException(
177+
"high file cache watermark [" + high + "] less than flood stage file cache watermark [" + flood + "]"
178+
);
179+
}
180+
}
181+
182+
private void setHighWatermark(String highWatermark) {
183+
// Watermark is expressed in terms of used data, but we need "free" data watermark
184+
this.freeFileCacheThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
185+
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(
186+
highWatermark,
187+
CLUSTER_ROUTING_ALLOCATION_HIGH_FILECACHE_WATERMARK_SETTING.getKey()
188+
);
189+
}
190+
191+
private void setFloodStage(String floodStageRaw) {
192+
// Watermark is expressed in terms of used data, but we need "free" data watermark
193+
this.freeFileCacheThresholdFloodStage = 100.0 - thresholdPercentageFromWatermark(floodStageRaw);
194+
this.freeBytesThresholdFloodStage = thresholdBytesFromWatermark(
195+
floodStageRaw,
196+
CLUSTER_ROUTING_ALLOCATION_FILECACHE_FLOOD_STAGE_WATERMARK_SETTING.getKey()
197+
);
198+
}
199+
200+
public Double getFreeFileCacheThresholdHigh() {
201+
return freeFileCacheThresholdHigh;
202+
}
203+
204+
public ByteSizeValue getFreeBytesThresholdHigh() {
205+
return freeBytesThresholdHigh;
206+
}
207+
208+
public Double getFreeFileCacheThresholdFloodStage() {
209+
return freeFileCacheThresholdFloodStage;
210+
}
211+
212+
public ByteSizeValue getFreeBytesThresholdFloodStage() {
213+
return freeBytesThresholdFloodStage;
214+
}
215+
216+
String describeHighThreshold() {
217+
return freeBytesThresholdHigh.equals(ByteSizeValue.ZERO)
218+
? Strings.format1Decimals(100.0 - freeFileCacheThresholdHigh, "%")
219+
: freeBytesThresholdHigh.toString();
220+
}
221+
222+
String describeFloodStageThreshold() {
223+
return freeBytesThresholdFloodStage.equals(ByteSizeValue.ZERO)
224+
? Strings.format1Decimals(100.0 - freeFileCacheThresholdFloodStage, "%")
225+
: freeBytesThresholdFloodStage.toString();
226+
}
227+
228+
/**
229+
* Attempts to parse the watermark into a percentage, returning 100.0% if
230+
* it cannot be parsed.
231+
*/
232+
private static double thresholdPercentageFromWatermark(String watermark) {
233+
return thresholdPercentageFromWatermark(watermark, true);
234+
}
235+
236+
/**
237+
* Attempts to parse the watermark into a percentage, returning 100.0% if it can not be parsed and the specified lenient parameter is
238+
* true, otherwise throwing an {@link OpenSearchParseException}.
239+
*
240+
* @param watermark the watermark to parse as a percentage
241+
* @param lenient true if lenient parsing should be applied
242+
* @return the parsed percentage
243+
*/
244+
private static double thresholdPercentageFromWatermark(String watermark, boolean lenient) {
245+
try {
246+
return RatioValue.parseRatioValue(watermark).getAsPercent();
247+
} catch (OpenSearchParseException ex) {
248+
// NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
249+
// cases separately
250+
if (lenient) {
251+
return 100.0;
252+
}
253+
throw ex;
254+
}
255+
}
256+
257+
/**
258+
* Attempts to parse the watermark into a {@link ByteSizeValue}, returning
259+
* a ByteSizeValue of 0 bytes if the value cannot be parsed.
260+
*/
261+
private static ByteSizeValue thresholdBytesFromWatermark(String watermark, String settingName) {
262+
return thresholdBytesFromWatermark(watermark, settingName, true);
263+
}
264+
265+
/**
266+
* Attempts to parse the watermark into a {@link ByteSizeValue}, returning zero bytes if it can not be parsed and the specified lenient
267+
* parameter is true, otherwise throwing an {@link OpenSearchParseException}.
268+
*
269+
* @param watermark the watermark to parse as a byte size
270+
* @param settingName the name of the setting
271+
* @param lenient true if lenient parsing should be applied
272+
* @return the parsed byte size value
273+
*/
274+
private static ByteSizeValue thresholdBytesFromWatermark(String watermark, String settingName, boolean lenient) {
275+
try {
276+
return ByteSizeValue.parseBytesSizeValue(watermark, settingName);
277+
} catch (OpenSearchParseException ex) {
278+
// NOTE: this is not end-user leniency, since up above we check that it's a valid byte or percentage, and then store the two
279+
// cases separately
280+
if (lenient) {
281+
return ByteSizeValue.parseBytesSizeValue("0b", settingName);
282+
}
283+
throw ex;
284+
}
285+
}
286+
287+
/**
288+
* Checks if a watermark string is a valid percentage or byte size value,
289+
* @return the watermark value given
290+
*/
291+
private static String validWatermarkSetting(String watermark, String settingName) {
292+
try {
293+
RatioValue.parseRatioValue(watermark);
294+
} catch (OpenSearchParseException e) {
295+
try {
296+
ByteSizeValue.parseBytesSizeValue(watermark, settingName);
297+
} catch (OpenSearchParseException ex) {
298+
ex.addSuppressed(e);
299+
throw ex;
300+
}
301+
}
302+
return watermark;
303+
}
304+
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
6666
import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
6767
import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator;
68+
import org.opensearch.cluster.routing.allocation.FileCacheThresholdSettings;
6869
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
6970
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
7071
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
@@ -343,6 +344,8 @@ public void apply(Settings value, Settings current, Settings previous) {
343344
DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE,
344345
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
345346
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
347+
FileCacheThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_FILECACHE_WATERMARK_SETTING,
348+
FileCacheThresholdSettings.CLUSTER_ROUTING_ALLOCATION_FILECACHE_FLOOD_STAGE_WATERMARK_SETTING,
346349
SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,
347350
ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING,
348351
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING,

0 commit comments

Comments
 (0)