Skip to content

Commit c4d9de6

Browse files
authored
FS stats for warm nodes based on addressable space (#18767)
Signed-off-by: Gagan Singh Saini <gagasa@amazon.com>
1 parent 592f4c8 commit c4d9de6

File tree

8 files changed

+677
-9
lines changed

8 files changed

+677
-9
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
## [Unreleased 3.x]
77
### Added
88
- Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375))
9+
- FS stats for warm nodes based on addressable space ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
910
- Add support for custom index name resolver from cluster plugin ([#18593](https://github.com/opensearch-project/OpenSearch/pull/18593))
1011
- Rename WorkloadGroupTestUtil to WorkloadManagementTestUtil ([#18709](https://github.com/opensearch-project/OpenSearch/pull/18709))
1112
- Disallow resize for Warm Index, add Parameterized ITs for close in remote store ([#18686](https://github.com/opensearch-project/OpenSearch/pull/18686))
@@ -38,6 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3839
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
3940
- Update SecureAuxTransportSettingsProvider to distinguish between aux transport types ([#18616](https://github.com/opensearch-project/OpenSearch/pull/18616))
4041
- Make node duress values cacheable ([#18649](https://github.com/opensearch-project/OpenSearch/pull/18649))
42+
- Change the default value of remote_data_ratio, which is used in Searchable Snapshots and Writable Warm, from 0 to 5 and the minimum allowed value to 1 ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
4143
- Making multi rate limiters in repository dynamic ([#18069](https://github.com/opensearch-project/OpenSearch/pull/18069))
4244

4345
### Dependencies

server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,12 @@
4545
import org.opensearch.common.Priority;
4646
import org.opensearch.common.settings.Settings;
4747
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
48+
import org.opensearch.core.common.unit.ByteSizeUnit;
4849
import org.opensearch.core.common.unit.ByteSizeValue;
4950
import org.opensearch.gateway.GatewayService;
51+
import org.opensearch.monitor.fs.FsInfo;
5052
import org.opensearch.monitor.os.OsStats;
53+
import org.opensearch.node.Node;
5154
import org.opensearch.node.NodeRoleSettings;
5255
import org.opensearch.test.OpenSearchIntegTestCase;
5356
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
@@ -74,6 +77,20 @@
7477
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
7578
public class ClusterStatsIT extends OpenSearchIntegTestCase {
7679

80+
@Override
81+
protected boolean addMockIndexStorePlugin() {
82+
return false;
83+
}
84+
85+
@Override
86+
protected Settings nodeSettings(int nodeOrdinal) {
87+
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
88+
return Settings.builder()
89+
.put(super.nodeSettings(nodeOrdinal))
90+
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
91+
.build();
92+
}
93+
7794
private void assertCounts(ClusterStatsNodes.Counts counts, int total, Map<String, Integer> roles) {
7895
assertThat(counts.getTotal(), equalTo(total));
7996
assertThat(counts.getRoles(), equalTo(roles));
@@ -831,6 +848,35 @@ public void testClusterStatsWithSelectiveMetricsFilterAndNoIndex() {
831848
assertEquals(0, statsResponseWithAllIndicesMetrics.getIndicesStats().getSegments().getVersionMapMemoryInBytes());
832849
}
833850

851+
public void testWarmNodeFSStats() {
852+
internalCluster().startClusterManagerOnlyNode();
853+
internalCluster().startWarmOnlyNodes(1);
854+
ensureGreen();
855+
856+
ClusterStatsResponse statsResponseWarmFSMetrics = client().admin()
857+
.cluster()
858+
.prepareClusterStats()
859+
.useAggregatedNodeLevelResponses(randomBoolean())
860+
.requestMetrics(Set.of(Metric.FS))
861+
.computeAllMetrics(false)
862+
.get();
863+
assertNotNull(statsResponseWarmFSMetrics);
864+
assertNotNull(statsResponseWarmFSMetrics.getNodesStats());
865+
validateNodeStatsOutput(Set.of(Metric.FS), statsResponseWarmFSMetrics);
866+
FsInfo warmFsInfo = statsResponseWarmFSMetrics.getNodes()
867+
.stream()
868+
.filter(nodeResponse -> nodeResponse.getNode().isWarmNode())
869+
.findFirst()
870+
.map(nodeResponse -> nodeResponse.nodeStats().getFs())
871+
.orElseThrow(() -> new IllegalStateException("No warm node found"));
872+
873+
for (FsInfo.Path path : warmFsInfo) {
874+
assertEquals(path.getPath(), "/warm");
875+
assertEquals(path.getFileCacheReserved(), new ByteSizeValue(16, ByteSizeUnit.GB));
876+
assertEquals(path.getTotal(), new ByteSizeValue(16 * 5, ByteSizeUnit.GB));
877+
}
878+
}
879+
834880
private void validateNodeStatsOutput(Set<ClusterStatsRequest.Metric> expectedMetrics, ClusterStatsResponse clusterStatsResponse) {
835881
// Ingest, network types, discovery types and packaging types stats are not included here as they don't have a get method exposed.
836882
Set<Metric> NodeMetrics = Set.of(Metric.OS, Metric.JVM, Metric.FS, Metric.PROCESS, Metric.PLUGINS);

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheSettings.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,11 @@ public class FileCacheSettings {
2323
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
2424
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
2525
* This is designed to be a safeguard to prevent oversubscribing a cluster.
26-
* Specify a value of zero for no limit, which is the default for compatibility reasons.
2726
*/
2827
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
2928
"cluster.filecache.remote_data_ratio",
30-
0.0,
31-
0.0,
29+
5.0,
30+
1.0,
3231
Setting.Property.NodeScope,
3332
Setting.Property.Dynamic
3433
);

server/src/main/java/org/opensearch/monitor/MonitorService.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,8 @@
3434

3535
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
3636
import org.opensearch.common.settings.Settings;
37-
import org.opensearch.env.NodeEnvironment;
38-
import org.opensearch.index.store.remote.filecache.FileCache;
3937
import org.opensearch.monitor.fs.FsService;
38+
import org.opensearch.monitor.fs.FsServiceProvider;
4039
import org.opensearch.monitor.jvm.JvmGcMonitorService;
4140
import org.opensearch.monitor.jvm.JvmService;
4241
import org.opensearch.monitor.os.OsService;
@@ -58,13 +57,12 @@ public class MonitorService extends AbstractLifecycleComponent {
5857
private final JvmService jvmService;
5958
private final FsService fsService;
6059

61-
public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool, FileCache fileCache)
62-
throws IOException {
60+
public MonitorService(Settings settings, ThreadPool threadPool, FsServiceProvider fsServiceProvider) throws IOException {
6361
this.jvmGcMonitorService = new JvmGcMonitorService(settings, threadPool);
6462
this.osService = new OsService(settings);
6563
this.processService = new ProcessService(settings);
6664
this.jvmService = new JvmService(settings);
67-
this.fsService = new FsService(settings, nodeEnvironment, fileCache);
65+
this.fsService = fsServiceProvider.createFsService();
6866
}
6967

7068
public OsService osService() {
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.monitor.fs;
10+
11+
import org.opensearch.cluster.node.DiscoveryNode;
12+
import org.opensearch.common.settings.ClusterSettings;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.env.NodeEnvironment;
15+
import org.opensearch.index.store.remote.filecache.FileCache;
16+
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
17+
import org.opensearch.indices.IndicesService;
18+
19+
/**
20+
* Factory for creating appropriate FsService implementations based on node type.
21+
*
22+
* @opensearch.internal
23+
*/
24+
public class FsServiceProvider {
25+
26+
private final Settings settings;
27+
private final NodeEnvironment nodeEnvironment;
28+
private final FileCache fileCache;
29+
private final FileCacheSettings fileCacheSettings;
30+
private final IndicesService indicesService;
31+
32+
public FsServiceProvider(
33+
Settings settings,
34+
NodeEnvironment nodeEnvironment,
35+
FileCache fileCache,
36+
ClusterSettings clusterSettings,
37+
IndicesService indicesService
38+
) {
39+
this.settings = settings;
40+
this.nodeEnvironment = nodeEnvironment;
41+
this.fileCache = fileCache;
42+
this.fileCacheSettings = new FileCacheSettings(settings, clusterSettings);
43+
this.indicesService = indicesService;
44+
}
45+
46+
/**
47+
* Creates the appropriate FsService implementation based on node type.
48+
*
49+
* @return FsService instance
50+
*/
51+
public FsService createFsService() {
52+
if (DiscoveryNode.isWarmNode(settings)) {
53+
return new WarmFsService(settings, nodeEnvironment, fileCacheSettings, indicesService, fileCache);
54+
}
55+
return new FsService(settings, nodeEnvironment, fileCache);
56+
}
57+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.monitor.fs;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.env.NodeEnvironment;
15+
import org.opensearch.index.IndexService;
16+
import org.opensearch.index.shard.IndexShard;
17+
import org.opensearch.index.store.remote.filecache.FileCache;
18+
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
19+
import org.opensearch.indices.IndicesService;
20+
21+
import static org.opensearch.monitor.fs.FsProbe.adjustForHugeFilesystems;
22+
23+
/**
24+
* FileSystem service implementation for warm nodes that calculates disk usage
25+
* based on file cache size and remote data ratio instead of actual physical disk usage.
26+
*
27+
* @opensearch.internal
28+
*/
29+
public class WarmFsService extends FsService {
30+
31+
private static final Logger logger = LogManager.getLogger(WarmFsService.class);
32+
33+
private final FileCacheSettings fileCacheSettings;
34+
private final IndicesService indicesService;
35+
private final FileCache fileCache;
36+
37+
public WarmFsService(
38+
Settings settings,
39+
NodeEnvironment nodeEnvironment,
40+
FileCacheSettings fileCacheSettings,
41+
IndicesService indicesService,
42+
FileCache fileCache
43+
) {
44+
super(settings, nodeEnvironment, fileCache);
45+
this.fileCacheSettings = fileCacheSettings;
46+
this.indicesService = indicesService;
47+
this.fileCache = fileCache;
48+
}
49+
50+
@Override
51+
public FsInfo stats() {
52+
// Calculate total addressable space
53+
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
54+
final long nodeCacheSize = fileCache != null ? fileCache.capacity() : 0;
55+
final long totalBytes = (long) (dataToFileCacheSizeRatio * nodeCacheSize);
56+
57+
// Calculate used bytes from primary shards
58+
long usedBytes = 0;
59+
if (indicesService != null) {
60+
for (IndexService indexService : indicesService) {
61+
for (IndexShard shard : indexService) {
62+
if (shard.routingEntry() != null && shard.routingEntry().primary() && shard.routingEntry().active()) {
63+
try {
64+
usedBytes += shard.store().stats(0).getSizeInBytes();
65+
} catch (Exception e) {
66+
logger.error("Unable to get store size for shard {} with error: {}", shard.shardId(), e.getMessage());
67+
}
68+
}
69+
}
70+
}
71+
}
72+
73+
long freeBytes = Math.max(0, totalBytes - usedBytes);
74+
75+
FsInfo.Path warmPath = new FsInfo.Path();
76+
warmPath.path = "/warm";
77+
warmPath.mount = "warm";
78+
warmPath.type = "warm";
79+
warmPath.total = adjustForHugeFilesystems(totalBytes);
80+
warmPath.free = adjustForHugeFilesystems(freeBytes);
81+
warmPath.available = adjustForHugeFilesystems(freeBytes);
82+
if (fileCache != null) {
83+
warmPath.fileCacheReserved = adjustForHugeFilesystems(fileCache.capacity());
84+
warmPath.fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage());
85+
}
86+
87+
logger.trace("Warm node disk usage - total: {}, used: {}, free: {}", totalBytes, usedBytes, freeBytes);
88+
89+
FsInfo nodeFsInfo = super.stats();
90+
return new FsInfo(System.currentTimeMillis(), nodeFsInfo.getIoStats(), new FsInfo.Path[] { warmPath });
91+
}
92+
}

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@
194194
import org.opensearch.monitor.MonitorService;
195195
import org.opensearch.monitor.fs.FsHealthService;
196196
import org.opensearch.monitor.fs.FsProbe;
197+
import org.opensearch.monitor.fs.FsServiceProvider;
197198
import org.opensearch.monitor.jvm.JvmInfo;
198199
import org.opensearch.node.remotestore.RemoteStoreNodeService;
199200
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
@@ -794,7 +795,6 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
794795
);
795796
// File cache will be initialized by the node once circuit breakers are in place.
796797
initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
797-
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, fileCache);
798798

799799
pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
800800
CircuitBreaker breaker = circuitBreakerService.getBreaker(plugin.getCircuitBreaker(settings).getName());
@@ -1008,6 +1008,15 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
10081008
new SystemIngestPipelineCache()
10091009
);
10101010

1011+
final FsServiceProvider fsServiceProvider = new FsServiceProvider(
1012+
settings,
1013+
nodeEnvironment,
1014+
fileCache,
1015+
settingsModule.getClusterSettings(),
1016+
indicesService
1017+
);
1018+
final MonitorService monitorService = new MonitorService(settings, threadPool, fsServiceProvider);
1019+
10111020
final AliasValidator aliasValidator = new AliasValidator();
10121021

10131022
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);

0 commit comments

Comments
 (0)