Skip to content

Commit

Permalink
Store indices and shards health based on cluster health level param
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <gupthasg@amazon.com>
  • Loading branch information
Swetha Guptha committed Aug 29, 2024
1 parent ed65482 commit 4ed3b18
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,57 @@ public ClusterHealthResponse(
this.clusterHealthStatus = clusterStateHealth.getStatus();
}

public ClusterHealthResponse(
String clusterName,
String[] concreteIndices,
ClusterHealthRequest.Level healthLevel,
ClusterState clusterState,
int numberOfPendingTasks,
int numberOfInFlightFetch,
int delayedUnassignedShards,
TimeValue taskMaxWaitingTime
) {
this.clusterName = clusterName;
this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch;
this.delayedUnassignedShards = delayedUnassignedShards;
this.taskMaxWaitingTime = taskMaxWaitingTime;
this.clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices, healthLevel);
this.clusterHealthStatus = clusterStateHealth.getStatus();
}

// Awareness Attribute health
public ClusterHealthResponse(
String clusterName,
ClusterState clusterState,
ClusterSettings clusterSettings,
String[] concreteIndices,
String awarenessAttributeName,
int numberOfPendingTasks,
int numberOfInFlightFetch,
int delayedUnassignedShards,
TimeValue taskMaxWaitingTime
) {
this(
clusterName,
concreteIndices,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
delayedUnassignedShards,
taskMaxWaitingTime
);
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
}

// Awareness Attribute health
public ClusterHealthResponse(
String clusterName,
ClusterState clusterState,
ClusterSettings clusterSettings,
String[] concreteIndices,
String awarenessAttributeName,
ClusterHealthRequest.Level healthLevel,
int numberOfPendingTasks,
int numberOfInFlightFetch,
int delayedUnassignedShards,
Expand All @@ -252,6 +296,7 @@ public ClusterHealthResponse(
this(
clusterName,
concreteIndices,
healthLevel,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ private ClusterHealthResponse clusterHealth(
clusterService.getClusterSettings(),
concreteIndices,
awarenessAttribute,
request.level(),
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
Expand All @@ -514,6 +515,7 @@ private ClusterHealthResponse clusterHealth(
ClusterHealthResponse response = new ClusterHealthResponse(
clusterState.getClusterName().value(),
Strings.EMPTY_ARRAY,
request.level(),
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
Expand All @@ -527,6 +529,7 @@ private ClusterHealthResponse clusterHealth(
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
concreteIndices,
request.level(),
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.CommonStats;
Expand Down Expand Up @@ -209,7 +210,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq

ClusterHealthStatus clusterStatus = null;
if (clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
clusterStatus = new ClusterStateHealth(clusterService.state()).getStatus();
clusterStatus = new ClusterStateHealth(clusterService.state(), ClusterHealthRequest.Level.CLUSTER).getStatus();
}

return new ClusterStatsNodeResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.health;

import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
Expand Down Expand Up @@ -191,6 +192,57 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT
this.unassignedShards = computeUnassignedShards;
}

public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingTable indexRoutingTable, final ClusterHealthRequest.Level healthLevel) {
this.index = indexMetadata.getIndex().getName();
this.numberOfShards = indexMetadata.getNumberOfShards();
this.numberOfReplicas = indexMetadata.getNumberOfReplicas();

shards = new HashMap<>();
boolean isShardLevelHealthInfo = healthLevel == ClusterHealthRequest.Level.SHARDS;

// update the index status
ClusterHealthStatus computeStatus = ClusterHealthStatus.GREEN;
int computeActivePrimaryShards = 0;
int computeActiveShards = 0;
int computeRelocatingShards = 0;
int computeInitializingShards = 0;
int computeUnassignedShards = 0;
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
int shardId = shardRoutingTable.shardId().id();
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, shardRoutingTable);

if (shardHealth.isPrimaryActive()) {
computeActivePrimaryShards++;
}
computeActiveShards += shardHealth.getActiveShards();
computeRelocatingShards += shardHealth.getRelocatingShards();
computeInitializingShards += shardHealth.getInitializingShards();
computeUnassignedShards += shardHealth.getUnassignedShards();

if (shardHealth.getStatus() == ClusterHealthStatus.RED) {
computeStatus = ClusterHealthStatus.RED;
} else if (shardHealth.getStatus() == ClusterHealthStatus.YELLOW && computeStatus != ClusterHealthStatus.RED) {
// do not override an existing red
computeStatus = ClusterHealthStatus.YELLOW;
}

if (isShardLevelHealthInfo) {
shards.put(shardId, shardHealth);
}
}

if (indexRoutingTable.shards().isEmpty()) { // might be since none has been created yet (two phase index creation)
computeStatus = ClusterHealthStatus.RED;
}

this.status = computeStatus;
this.activePrimaryShards = computeActivePrimaryShards;
this.activeShards = computeActiveShards;
this.relocatingShards = computeRelocatingShards;
this.initializingShards = computeInitializingShards;
this.unassignedShards = computeUnassignedShards;
}

public ClusterIndexHealth(final StreamInput in) throws IOException {
index = in.readString();
numberOfShards = in.readVInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.cluster.health;

import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -76,6 +77,10 @@ public ClusterStateHealth(final ClusterState clusterState) {
this(clusterState, clusterState.metadata().getConcreteAllIndices());
}

public ClusterStateHealth(final ClusterState clusterState, final ClusterHealthRequest.Level clusterHealthLevel) {
this(clusterState, clusterState.metadata().getConcreteAllIndices(), clusterHealthLevel);
}

/**
* Creates a new <code>ClusterStateHealth</code> instance considering the current cluster state and the provided index names.
*
Expand Down Expand Up @@ -145,6 +150,70 @@ public ClusterStateHealth(final ClusterState clusterState, final String[] concre
}
}

public ClusterStateHealth(final ClusterState clusterState, final String[] concreteIndices, final ClusterHealthRequest.Level healthLevel) {
numberOfNodes = clusterState.nodes().getSize();
numberOfDataNodes = clusterState.nodes().getDataNodes().size();
hasDiscoveredClusterManager = clusterState.nodes().getClusterManagerNodeId() != null;
indices = new HashMap<>();
boolean isIndexLevelHealthInfo = healthLevel == ClusterHealthRequest.Level.INDICES || healthLevel == ClusterHealthRequest.Level.SHARDS;

ClusterHealthStatus computeStatus = ClusterHealthStatus.GREEN;
int computeActivePrimaryShards = 0;
int computeActiveShards = 0;
int computeRelocatingShards = 0;
int computeInitializingShards = 0;
int computeUnassignedShards = 0;
for (String index : concreteIndices) {
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
IndexMetadata indexMetadata = clusterState.metadata().index(index);
if (indexRoutingTable == null) {
continue;
}

ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable);
computeActivePrimaryShards += indexHealth.getActivePrimaryShards();
computeActiveShards += indexHealth.getActiveShards();
computeRelocatingShards += indexHealth.getRelocatingShards();
computeInitializingShards += indexHealth.getInitializingShards();
computeUnassignedShards += indexHealth.getUnassignedShards();
if (indexHealth.getStatus() == ClusterHealthStatus.RED) {
computeStatus = ClusterHealthStatus.RED;
} else if (indexHealth.getStatus() == ClusterHealthStatus.YELLOW && computeStatus != ClusterHealthStatus.RED) {
computeStatus = ClusterHealthStatus.YELLOW;
}

if (isIndexLevelHealthInfo) {
// Store ClusterIndexHealth only when the health is requested at Index or Shard level
indices.put(indexHealth.getIndex(), indexHealth);
}
}

if (clusterState.blocks().hasGlobalBlockWithStatus(RestStatus.SERVICE_UNAVAILABLE)) {
computeStatus = ClusterHealthStatus.RED;
}

this.status = computeStatus;
this.activePrimaryShards = computeActivePrimaryShards;
this.activeShards = computeActiveShards;
this.relocatingShards = computeRelocatingShards;
this.initializingShards = computeInitializingShards;
this.unassignedShards = computeUnassignedShards;

// shortcut on green
if (computeStatus.equals(ClusterHealthStatus.GREEN)) {
this.activeShardsPercent = 100;
} else {
List<ShardRouting> shardRoutings = clusterState.getRoutingTable().allShards();
int activeShardCount = 0;
int totalShardCount = 0;
for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.active()) activeShardCount++;
totalShardCount++;
}
this.activeShardsPercent = (((double) activeShardCount) / totalShardCount) * 100;
}
}

public ClusterStateHealth(final StreamInput in) throws IOException {
activePrimaryShards = in.readVInt();
activeShards = in.readVInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -196,7 +197,7 @@ public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRout
protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, RoutingAllocation allocation, String reason) {
ClusterState newState = buildResult(oldState, allocation);

logClusterHealthStateChange(new ClusterStateHealth(oldState), new ClusterStateHealth(newState), reason);
logClusterHealthStateChange(new ClusterStateHealth(oldState, ClusterHealthRequest.Level.CLUSTER), new ClusterStateHealth(newState, ClusterHealthRequest.Level.CLUSTER), reason);

return newState;
}
Expand Down

0 comments on commit 4ed3b18

Please sign in to comment.