Include file cache stats in node stats
Also add an event listener to clean up cache entries when the index is deleted

Signed-off-by: Rabi Panda <adnapibar@gmail.com>
adnapibar committed Feb 16, 2023
1 parent 6b35c32 commit 881de81
Showing 16 changed files with 493 additions and 13 deletions.
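For context on how the new section is consumed, here is a minimal caller-side sketch against the Java client API, mirroring what the integration tests below do. It assumes an existing Client handle; the class and method names are illustrative only, and the getter is null-checked because nodes that predate this change report no file cache stats.

import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.client.Client;
import org.opensearch.index.store.remote.filecache.FileCacheStats;

final class FileCacheStatsExample {
    // Sketch: fetch node stats and print the file cache usage reported by each node.
    static void printFileCacheUsage(Client client) {
        NodesStatsResponse response = client.admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
        for (NodeStats nodeStats : response.getNodes()) {
            FileCacheStats fileCacheStats = nodeStats.getFileCacheStats();
            if (fileCacheStats != null) { // null on nodes that do not report the section
                System.out.println(
                    nodeStats.getNode().getName() + " file cache used=" + fileCacheStats.getUsed() + " active=" + fileCacheStats.getActive()
                );
            }
        }
    }
}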
@@ -6,6 +6,8 @@

import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.hamcrest.MatcherAssert;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
@@ -27,6 +29,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.repositories.fs.FsRepository;

@@ -410,4 +413,71 @@ private void assertIndexDirectoryDoesNotExist(String... indexNames) {
            }
        }
    }

    public void testPruneFileCacheOnIndexDeletion() throws Exception {
        final String snapshotName = "test-snap";
        final String repoName = "test-repo";
        final String indexName1 = "test-idx-1";
        final String restoredIndexName1 = indexName1 + "-copy";
        final Client client = client();
        final int numNodes = 2;

        internalCluster().ensureAtLeastNumSearchNodes(numNodes);
        createIndexWithDocsAndEnsureGreen(1, 100, indexName1);

        createRepositoryWithSettings(null, repoName);
        takeSnapshot(client, snapshotName, repoName, indexName1);
        deleteIndicesAndEnsureGreen(client, indexName1);

        restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
        assertNodesFileCacheNonEmpty(numNodes);

        deleteIndicesAndEnsureGreen(client, restoredIndexName1);
        assertAllNodesFileCacheEmpty();
    }

    public void testFileCacheStats() throws Exception {
        final String snapshotName = "test-snap";
        final String repoName = "test-repo";
        final String indexName1 = "test-idx-1";
        final Client client = client();
        final int numNodes = 2;

        internalCluster().ensureAtLeastNumSearchNodes(numNodes);
        createIndexWithDocsAndEnsureGreen(1, 100, indexName1);

        createRepositoryWithSettings(null, repoName);
        takeSnapshot(client, snapshotName, repoName, indexName1);
        deleteIndicesAndEnsureGreen(client, indexName1);
        assertAllNodesFileCacheEmpty();

        restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
        assertNodesFileCacheNonEmpty(numNodes);
    }

    private void assertAllNodesFileCacheEmpty() {
        NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
        for (NodeStats stats : response.getNodes()) {
            FileCacheStats fcstats = stats.getFileCacheStats();
            assertNotNull(fcstats);
            assertTrue(isFileCacheEmpty(fcstats));
        }
    }

    private void assertNodesFileCacheNonEmpty(int numNodes) {
        NodesStatsResponse response = client().admin().cluster().nodesStats(new NodesStatsRequest().all()).actionGet();
        int nonEmptyFileCacheNodes = 0;
        for (NodeStats stats : response.getNodes()) {
            FileCacheStats fcstats = stats.getFileCacheStats();
            assertNotNull(fcstats);
            if (!isFileCacheEmpty(fcstats)) {
                nonEmptyFileCacheNodes++;
            }
        }
        assertEquals(numNodes, nonEmptyFileCacheNodes);
    }

    private boolean isFileCacheEmpty(FileCacheStats stats) {
        return stats.getUsed().getBytes() == 0L && stats.getActive().getBytes() == 0L;
    }
}
@@ -47,6 +47,7 @@
import org.opensearch.http.HttpStats;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.breaker.AllCircuitBreakerStats;
import org.opensearch.ingest.IngestStats;
@@ -130,6 +131,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
    @Nullable
    private WeightedRoutingStats weightedRoutingStats;

    @Nullable
    private FileCacheStats fileCacheStats;

    public NodeStats(StreamInput in) throws IOException {
        super(in);
        timestamp = in.readVLong();
@@ -171,6 +175,11 @@ public NodeStats(StreamInput in) throws IOException {
        } else {
            weightedRoutingStats = null;
        }
        if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
            fileCacheStats = in.readOptionalWriteable(FileCacheStats::new);
        } else {
            fileCacheStats = null;
        }
    }

    public NodeStats(
@@ -194,7 +203,8 @@ public NodeStats(
        @Nullable ShardIndexingPressureStats shardIndexingPressureStats,
        @Nullable SearchBackpressureStats searchBackpressureStats,
        @Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
        @Nullable WeightedRoutingStats weightedRoutingStats
        @Nullable WeightedRoutingStats weightedRoutingStats,
        @Nullable FileCacheStats fileCacheStats
    ) {
        super(node);
        this.timestamp = timestamp;
@@ -217,6 +227,7 @@ public NodeStats(
        this.searchBackpressureStats = searchBackpressureStats;
        this.clusterManagerThrottlingStats = clusterManagerThrottlingStats;
        this.weightedRoutingStats = weightedRoutingStats;
        this.fileCacheStats = fileCacheStats;
    }

    public long getTimestamp() {
@@ -340,6 +351,10 @@ public WeightedRoutingStats getWeightedRoutingStats() {
        return weightedRoutingStats;
    }

    public FileCacheStats getFileCacheStats() {
        return fileCacheStats;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
@@ -374,6 +389,9 @@ public void writeTo(StreamOutput out) throws IOException {
        if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
            out.writeOptionalWriteable(weightedRoutingStats);
        }
        if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
            out.writeOptionalWriteable(fileCacheStats);
        }
    }

    @Override
@@ -455,6 +473,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
        if (getWeightedRoutingStats() != null) {
            getWeightedRoutingStats().toXContent(builder, params);
        }
        if (getFileCacheStats() != null) {
            getFileCacheStats().toXContent(builder, params);
        }

        return builder;
    }
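A note on the serialization above: the reader and the writer gate on the same version constant, so in a mixed-version cluster a 2.6.0+ node never sends bytes an older node cannot parse and never expects bytes an older node did not send, while the optional-writeable calls handle a null stats object. The same pattern restated with the reasoning as comments (field and version names are taken from the diff above; the surrounding class is omitted):

// Writer side: only emit the new section when the receiving node understands it.
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
    out.writeOptionalWriteable(fileCacheStats); // optional: also copes with a null stats object
}

// Reader side: mirror the exact same version check, otherwise the two sides disagree
// on the stream layout and everything deserialized afterwards is corrupted.
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
    fileCacheStats = in.readOptionalWriteable(FileCacheStats::new);
} else {
    fileCacheStats = null; // older senders never include the section
}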
@@ -209,7 +209,8 @@ public enum Metric {
        SHARD_INDEXING_PRESSURE("shard_indexing_pressure"),
        SEARCH_BACKPRESSURE("search_backpressure"),
        CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
        WEIGHTED_ROUTING_STATS("weighted_routing");
        WEIGHTED_ROUTING_STATS("weighted_routing"),
        FILE_CACHE_STATS("file_cache");

        private String metricName;

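With FILE_CACHE_STATS registered as a metric, callers can request just this section instead of all node stats. A small sketch reusing the imports from the example above, and assuming the standard NodesStatsRequest clear()/addMetric(String) API that the existing metrics already go through; the metric name "file_cache" comes from the enum entry added here, and over REST this should correspond to GET /_nodes/stats/file_cache.

// Sketch: restrict a node stats call to the file cache metric of a single node.
static FileCacheStats fileCacheStatsOf(Client client, String nodeId) {
    NodesStatsRequest request = new NodesStatsRequest(nodeId).clear().addMetric("file_cache");
    NodesStatsResponse response = client.admin().cluster().nodesStats(request).actionGet();
    return response.getNodes().isEmpty() ? null : response.getNodes().get(0).getFileCacheStats();
}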
@@ -121,7 +121,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
            NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics),
            NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics),
            NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
            NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics)
            NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
            NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics)
        );
    }

@@ -165,6 +165,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
            false,
            false,
            false,
            false,
            false
        );
        List<ShardStats> shardsStats = new ArrayList<>();
@@ -0,0 +1,87 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index.store.remote.filecache;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexModule;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardStateMetadata;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;

/**
 * IndexEventListener to clean up file cache when the index is deleted
 *
 * @opensearch.internal
 */
public class FileCacheCleaner implements IndexEventListener {
    private static final Logger log = LogManager.getLogger(FileCacheCleaner.class);

    private final NodeEnvironment nodeEnvironment;
    private final FileCache fileCache;

    public FileCacheCleaner(NodeEnvironment nodeEnvironment, FileCache fileCache) {
        this.nodeEnvironment = nodeEnvironment;
        this.fileCache = fileCache;
    }

    private Path getShardPath(ShardId shardId) throws IOException {
        final Path[] paths = nodeEnvironment.availableShardPaths(shardId);
        for (Path path : paths) {
            ShardStateMetadata metadata = ShardStateMetadata.FORMAT.loadLatestState(log, NamedXContentRegistry.EMPTY, path);
            if (metadata != null) {
                return path;
            }
        }
        return null;
    }

    /**
     * Called before the shard is deleted and after it is closed; cleans up the corresponding
     * index file path entries from the file cache.
     * @param shardId the shard id
     * @param settings the shard's index settings
     */
    @Override
    public void beforeIndexShardDeleted(ShardId shardId, Settings settings) {
        try {
            String storeType = settings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey());
            if (IndexModule.Type.REMOTE_SNAPSHOT.match(storeType)) {
                Path localShardPath = getShardPath(shardId);
                if (localShardPath != null) {
                    Path localStorePath = localShardPath.resolve(LOCAL_STORE_LOCATION);
                    try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
                        for (Path subPath : ds) {
                            fileCache.remove(subPath.toRealPath());
                        }
                    }
                }
            }
        } catch (IOException ioe) {
            log.error(
                (Supplier<?>) () -> new ParameterizedMessage(
                    "failed to remove file cache entries for deleted shard {}",
                    shardId
                ),
                ioe
            );
        }
    }
}
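The cleaner only takes effect once it is registered as an index event listener; the registration site is in one of the changed files not shown in this excerpt. A hypothetical sketch of the usual wiring through a plugin's onIndexModule hook (the placement and the fileCacheCleaner field are assumptions, not taken from this commit; IndexModule#addIndexEventListener is the standard hook for IndexEventListener implementations):

// Hypothetical wiring: attach the cleaner to each index module so that
// beforeIndexShardDeleted fires when shards of a remote snapshot index are removed.
@Override
public void onIndexModule(IndexModule indexModule) {
    // fileCacheCleaner would be built once per node, e.g. new FileCacheCleaner(nodeEnvironment, fileCache),
    // and kept as a field by the component doing the registration.
    indexModule.addIndexEventListener(fileCacheCleaner);
}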
Diffs for the remaining changed files are not shown.

