Skip to content

Commit

Permalink
Caching avg total bytes and avg free bytes inside ClusterInfo (opense…
Browse files Browse the repository at this point in the history
…arch-project#14851)

Signed-off-by: RS146BIJAY <rishavsagar4b1@gmail.com>
  • Loading branch information
RS146BIJAY authored and wangdongyu.danny committed Aug 22, 2024
1 parent 1f5f393 commit 51db1db
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 38 deletions.
37 changes: 37 additions & 0 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.cluster;

import org.opensearch.Version;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -68,6 +69,8 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;
private long avgTotalBytes;
private long avgFreeByte;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
Expand Down Expand Up @@ -97,6 +100,7 @@ public ClusterInfo(
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}

public ClusterInfo(StreamInput in) throws IOException {
Expand All @@ -117,6 +121,39 @@ public ClusterInfo(StreamInput in) throws IOException {
} else {
this.nodeFileCacheStats = Map.of();
}

calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}

/**
* Returns a {@link DiskUsage} for the {@link RoutingNode} using the
* average usage of other nodes in the disk usage map.
* @param usages Map of nodeId to DiskUsage for all known nodes
*/
private void calculateAvgFreeAndTotalBytes(final Map<String, DiskUsage> usages) {
if (usages == null || usages.isEmpty()) {
this.avgTotalBytes = 0;
this.avgFreeByte = 0;
return;
}

long totalBytes = 0;
long freeBytes = 0;
for (DiskUsage du : usages.values()) {
totalBytes += du.getTotalBytes();
freeBytes += du.getFreeBytes();
}

this.avgTotalBytes = totalBytes / usages.size();
this.avgFreeByte = freeBytes / usages.size();
}

public long getAvgFreeByte() {
return avgFreeByte;
}

public long getAvgTotalBytes() {
return avgTotalBytes;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ public static long sizeOfRelocatingShards(

// Where reserved space is unavailable (e.g. stats are out-of-sync) compute a conservative estimate for initialising shards
final List<ShardRouting> initializingShards = node.shardsWithState(ShardRoutingState.INITIALIZING);
initializingShards.removeIf(shardRouting -> reservedSpace.containsShardId(shardRouting.shardId()));
for (ShardRouting routing : initializingShards) {
if (routing.relocatingNodeId() == null) {
if (routing.relocatingNodeId() == null || reservedSpace.containsShardId(routing.shardId())) {
// in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created
// by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking
// any additional space and can be ignored here
Expand Down Expand Up @@ -230,7 +229,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

// subtractLeavingShards is passed as false here, because they still use disk space, and therefore we should be extra careful
// and take the size into account
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false);
final DiskUsageWithRelocations usage = getDiskUsage(
node,
allocation,
usages,
clusterInfo.getAvgFreeByte(),
clusterInfo.getAvgTotalBytes(),
false
);
// First, check that the node currently over the low watermark
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
// Cache the used disk percentage for displaying disk percentages consistent with documentation
Expand Down Expand Up @@ -492,7 +498,14 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl

// subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk
// since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check.
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true);
final DiskUsageWithRelocations usage = getDiskUsage(
node,
allocation,
usages,
clusterInfo.getAvgFreeByte(),
clusterInfo.getAvgTotalBytes(),
true
);
final String dataPath = clusterInfo.getDataPath(shardRouting);
// If this node is already above the high threshold, the shard cannot remain (get it off!)
final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
Expand Down Expand Up @@ -581,13 +594,15 @@ private DiskUsageWithRelocations getDiskUsage(
RoutingNode node,
RoutingAllocation allocation,
final Map<String, DiskUsage> usages,
final long avgFreeBytes,
final long avgTotalBytes,
boolean subtractLeavingShards
) {
DiskUsage usage = usages.get(node.nodeId());
if (usage == null) {
// If there is no usage, and we have other nodes in the cluster,
// use the average usage for all nodes as the usage for this node
usage = averageUsage(node, usages);
usage = new DiskUsage(node.nodeId(), node.node().getName(), "_na_", avgTotalBytes, avgFreeBytes);
if (logger.isDebugEnabled()) {
logger.debug(
"unable to determine disk usage for {}, defaulting to average across nodes [{} total] [{} free] [{}% free]",
Expand Down Expand Up @@ -619,26 +634,6 @@ private DiskUsageWithRelocations getDiskUsage(
return diskUsageWithRelocations;
}

/**
* Returns a {@link DiskUsage} for the {@link RoutingNode} using the
* average usage of other nodes in the disk usage map.
* @param node Node to return an averaged DiskUsage object for
* @param usages Map of nodeId to DiskUsage for all known nodes
* @return DiskUsage representing given node using the average disk usage
*/
DiskUsage averageUsage(RoutingNode node, final Map<String, DiskUsage> usages) {
if (usages.size() == 0) {
return new DiskUsage(node.nodeId(), node.node().getName(), "_na_", 0, 0);
}
long totalBytes = 0;
long freeBytes = 0;
for (DiskUsage du : usages.values()) {
totalBytes += du.getTotalBytes();
freeBytes += du.getFreeBytes();
}
return new DiskUsage(node.nodeId(), node.node().getName(), "_na_", totalBytes / usages.size(), freeBytes / usages.size());
}

/**
* Given the DiskUsage for a node and the size of the shard, return the
* percentage of free disk if the shard were to be allocated to the node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -863,19 +863,6 @@ public void testUnknownDiskUsage() {
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
}

public void testAverageUsage() {
RoutingNode rn = new RoutingNode("node1", newNode("node1"));
DiskThresholdDecider decider = makeDecider(Settings.EMPTY);

final Map<String, DiskUsage> usages = new HashMap<>();
usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used
usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used

DiskUsage node1Usage = decider.averageUsage(rn, usages);
assertThat(node1Usage.getTotalBytes(), equalTo(100L));
assertThat(node1Usage.getFreeBytes(), equalTo(25L));
}

public void testFreeDiskPercentageAfterShardAssigned() {
DiskThresholdDecider decider = makeDecider(Settings.EMPTY);

Expand Down

0 comments on commit 51db1db

Please sign in to comment.