Skip to content

Commit

Permalink
Adds counter metrics for leader and follower check failures (opensear…
Browse files Browse the repository at this point in the history
…ch-project#12439)

* Adds counter metrics for leader and follower check failures

Signed-off-by: Harsh Garg <gkharsh@amazon.com>
  • Loading branch information
gargharsh3134 authored and wangdongyu.danny committed Aug 22, 2024
1 parent 2b4646b commit c66be2c
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add leader and follower check failure counter metrics ([#12439](https://github.com/opensearch-project/OpenSearch/pull/12439))
- Add latency metrics for instrumenting critical clusterManager code paths ([#12333](https://github.com/opensearch-project/OpenSearch/pull/12333))
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.cluster;

import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
Expand All @@ -23,13 +24,17 @@
public final class ClusterManagerMetrics {

private static final String LATENCY_METRIC_UNIT_MS = "ms";
private static final String COUNTER_METRICS_UNIT = "1";

public final Histogram clusterStateAppliersHistogram;
public final Histogram clusterStateListenersHistogram;
public final Histogram rerouteHistogram;
public final Histogram clusterStateComputeHistogram;
public final Histogram clusterStatePublishHistogram;

public final Counter leaderCheckFailureCounter;
public final Counter followerChecksFailureCounter;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
"cluster.state.appliers.latency",
Expand All @@ -56,6 +61,16 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
"Histogram for recording time taken to publish a new cluster state",
LATENCY_METRIC_UNIT_MS
);
followerChecksFailureCounter = metricsRegistry.createCounter(
"followers.checker.failure.count",
"Counter for number of failed follower checks",
COUNTER_METRICS_UNIT
);
leaderCheckFailureCounter = metricsRegistry.createCounter(
"leader.checker.failure.count",
"Counter for number of failed leader checks",
COUNTER_METRICS_UNIT
);
}

public void recordLatency(Histogram histogram, Double value) {
Expand All @@ -69,4 +84,16 @@ public void recordLatency(Histogram histogram, Double value, Optional<Tags> tags
}
histogram.record(value, tags.get());
}

public void incrementCounter(Counter counter, Double value) {
incrementCounter(counter, value, Optional.empty());
}

public void incrementCounter(Counter counter, Double value, Optional<Tags> tags) {
if (Objects.isNull(tags) || tags.isEmpty()) {
counter.add(value);
return;
}
counter.add(value, tags.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateTaskConfig;
Expand Down Expand Up @@ -207,7 +208,8 @@ public Coordinator(
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -261,14 +263,22 @@ public Coordinator(
this::handlePublishRequest,
this::handleApplyCommit
);
this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService);
this.leaderChecker = new LeaderChecker(
settings,
clusterSettings,
transportService,
this::onLeaderFailure,
nodeHealthService,
clusterManagerMetrics
);
this.followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
this::onFollowerCheckRequest,
this::removeNode,
nodeHealthService
nodeHealthService,
clusterManagerMetrics
);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -127,14 +128,16 @@ public class FollowersChecker {
private final TransportService transportService;
private final NodeHealthService nodeHealthService;
private volatile FastResponseState fastResponseState;
private ClusterManagerMetrics clusterManagerMetrics;

public FollowersChecker(
Settings settings,
ClusterSettings clusterSettings,
TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
this.transportService = transportService;
Expand All @@ -161,6 +164,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
handleDisconnectedNode(node);
}
});
this.clusterManagerMetrics = clusterManagerMetrics;
}

private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
Expand Down Expand Up @@ -413,6 +417,7 @@ public String executor() {
}

void failNode(String reason) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.followerChecksFailureCounter, 1.0);
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -119,17 +120,17 @@ public class LeaderChecker {
private final TransportService transportService;
private final Consumer<Exception> onLeaderFailure;
private final NodeHealthService nodeHealthService;

private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();

private volatile DiscoveryNodes discoveryNodes;
private final ClusterManagerMetrics clusterManagerMetrics;

LeaderChecker(
final Settings settings,
final ClusterSettings clusterSettings,
final TransportService transportService,
final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
final ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
Expand All @@ -138,6 +139,7 @@ public class LeaderChecker {
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
this.nodeHealthService = nodeHealthService;
this.clusterManagerMetrics = clusterManagerMetrics;
clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout);

transportService.registerRequestHandler(
Expand Down Expand Up @@ -293,7 +295,6 @@ public void handleResponse(Empty response) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}

failureCountSinceLastSuccess.set(0);
scheduleNextWakeUp(); // logs trace message indicating success
}
Expand All @@ -304,7 +305,6 @@ public void handleException(TransportException exp) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}

if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
Expand Down Expand Up @@ -355,6 +355,7 @@ public String executor() {

void leaderFailed(Exception e) {
if (isClosed.compareAndSet(false, true)) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.leaderCheckFailureCounter, 1.0);
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.coordination.ElectionStrategy;
Expand Down Expand Up @@ -133,7 +134,8 @@ public DiscoveryModule(
RerouteService rerouteService,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
ClusterManagerMetrics clusterManagerMetrics
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -211,7 +213,8 @@ public DiscoveryModule(
electionStrategy,
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService
remoteStoreNodeService,
clusterManagerMetrics
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,8 @@ protected Node(
rerouteService,
fsHealthService,
persistedStateRegistry,
remoteStoreNodeService
remoteStoreNodeService,
clusterManagerMetrics
);
final SearchPipelineService searchPipelineService = new SearchPipelineService(
clusterService,
Expand Down
Loading

0 comments on commit c66be2c

Please sign in to comment.