Skip to content

Commit

Permalink
Adds counter metrics for leader and follower check failures (#12439)
Browse files Browse the repository at this point in the history
* Adds counter metrics for leader and follower check failures

Signed-off-by: Harsh Garg <gkharsh@amazon.com>
  • Loading branch information
gargharsh3134 authored and Harsh Garg committed Jun 10, 2024
1 parent 9d11dbb commit 6d78faa
Show file tree
Hide file tree
Showing 16 changed files with 404 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add leader and follower check failure counter metrics ([#12439](https://github.com/opensearch-project/OpenSearch/pull/12439))
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster;

import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.Objects;
import java.util.Optional;

/**
* Class containing metrics (counters/latency) specific to ClusterManager.
*
* @opensearch.internal
*/
public final class ClusterManagerMetrics {

private static final String LATENCY_METRIC_UNIT_MS = "ms";
private static final String COUNTER_METRICS_UNIT = "1";

public final Histogram clusterStateAppliersHistogram;
public final Histogram clusterStateListenersHistogram;
public final Histogram rerouteHistogram;
public final Histogram clusterStateComputeHistogram;
public final Histogram clusterStatePublishHistogram;

public final Counter leaderCheckFailureCounter;
public final Counter followerChecksFailureCounter;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
"cluster.state.appliers.latency",
"Histogram for tracking the latency of cluster state appliers",
LATENCY_METRIC_UNIT_MS
);
clusterStateListenersHistogram = metricsRegistry.createHistogram(
"cluster.state.listeners.latency",
"Histogram for tracking the latency of cluster state listeners",
LATENCY_METRIC_UNIT_MS
);
rerouteHistogram = metricsRegistry.createHistogram(
"allocation.reroute.latency",
"Histogram for recording latency of shard re-routing",
LATENCY_METRIC_UNIT_MS
);
clusterStateComputeHistogram = metricsRegistry.createHistogram(
"cluster.state.new.compute.latency",
"Histogram for recording time taken to compute new cluster state",
LATENCY_METRIC_UNIT_MS
);
clusterStatePublishHistogram = metricsRegistry.createHistogram(
"cluster.state.publish.success.latency",
"Histogram for recording time taken to publish a new cluster state",
LATENCY_METRIC_UNIT_MS
);
followerChecksFailureCounter = metricsRegistry.createCounter(
"followers.checker.failure.count",
"Counter for number of failed follower checks",
COUNTER_METRICS_UNIT
);
leaderCheckFailureCounter = metricsRegistry.createCounter(
"leader.checker.failure.count",
"Counter for number of failed leader checks",
COUNTER_METRICS_UNIT
);
}

public void recordLatency(Histogram histogram, Double value) {
histogram.record(value);
}

public void recordLatency(Histogram histogram, Double value, Optional<Tags> tags) {
if (Objects.isNull(tags) || tags.isEmpty()) {
histogram.record(value);
return;
}
histogram.record(value, tags.get());
}

public void incrementCounter(Counter counter, Double value) {
incrementCounter(counter, value, Optional.empty());
}

public void incrementCounter(Counter counter, Double value, Optional<Tags> tags) {
if (Objects.isNull(tags) || tags.isEmpty()) {
counter.add(value);
return;
}
counter.add(value, tags.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.LegacyESVersion;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateTaskConfig;
Expand Down Expand Up @@ -208,7 +209,8 @@ public Coordinator(
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -262,14 +264,22 @@ public Coordinator(
this::handlePublishRequest,
this::handleApplyCommit
);
this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService);
this.leaderChecker = new LeaderChecker(
settings,
clusterSettings,
transportService,
this::onLeaderFailure,
nodeHealthService,
clusterManagerMetrics
);
this.followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
this::onFollowerCheckRequest,
this::removeNode,
nodeHealthService
nodeHealthService,
clusterManagerMetrics
);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -127,14 +128,16 @@ public class FollowersChecker {
private final TransportService transportService;
private final NodeHealthService nodeHealthService;
private volatile FastResponseState fastResponseState;
private ClusterManagerMetrics clusterManagerMetrics;

public FollowersChecker(
Settings settings,
ClusterSettings clusterSettings,
TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
this.transportService = transportService;
Expand All @@ -161,6 +164,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
handleDisconnectedNode(node);
}
});
this.clusterManagerMetrics = clusterManagerMetrics;
}

private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
Expand Down Expand Up @@ -413,6 +417,7 @@ public String executor() {
}

void failNode(String reason) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.followerChecksFailureCounter, 1.0);
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -119,17 +120,17 @@ public class LeaderChecker {
private final TransportService transportService;
private final Consumer<Exception> onLeaderFailure;
private final NodeHealthService nodeHealthService;

private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();

private volatile DiscoveryNodes discoveryNodes;
private final ClusterManagerMetrics clusterManagerMetrics;

LeaderChecker(
final Settings settings,
final ClusterSettings clusterSettings,
final TransportService transportService,
final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
final ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
Expand All @@ -138,6 +139,7 @@ public class LeaderChecker {
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
this.nodeHealthService = nodeHealthService;
this.clusterManagerMetrics = clusterManagerMetrics;
clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout);

transportService.registerRequestHandler(
Expand Down Expand Up @@ -293,7 +295,6 @@ public void handleResponse(Empty response) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}

failureCountSinceLastSuccess.set(0);
scheduleNextWakeUp(); // logs trace message indicating success
}
Expand All @@ -304,7 +305,6 @@ public void handleException(TransportException exp) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}

if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
Expand Down Expand Up @@ -355,6 +355,7 @@ public String executor() {

void leaderFailed(Exception e) {
if (isClosed.compareAndSet(false, true)) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.leaderCheckFailureCounter, 1.0);
transportService.getThreadPool().generic().execute(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.coordination.ElectionStrategy;
Expand Down Expand Up @@ -133,7 +134,8 @@ public DiscoveryModule(
RerouteService rerouteService,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
ClusterManagerMetrics clusterManagerMetrics
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -211,7 +213,8 @@ public DiscoveryModule(
electionStrategy,
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService
remoteStoreNodeService,
clusterManagerMetrics
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
34 changes: 21 additions & 13 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -606,21 +607,10 @@ protected Node(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class))
);

List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addStateApplier(scriptService);
resourcesToClose.add(clusterService);
final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
if (consistentSettings.isEmpty() == false) {
clusterService.addLocalNodeMasterListener(
new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher()
);
}

TracerFactory tracerFactory;
MetricsRegistryFactory metricsRegistryFactory;
if (FeatureFlags.isEnabled(TELEMETRY)) {
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings());
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, settingsModule.getClusterSettings());
if (telemetrySettings.isTracingFeatureEnabled() || telemetrySettings.isMetricsFeatureEnabled()) {
List<TelemetryPlugin> telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class);
List<TelemetryPlugin> telemetryPluginsImplementingTelemetryAware = telemetryPlugins.stream()
Expand Down Expand Up @@ -660,6 +650,23 @@ protected Node(
resourcesToClose.add(tracer::close);
resourcesToClose.add(metricsRegistry::close);

final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry);

List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(
settings,
settingsModule.getClusterSettings(),
threadPool
);
clusterService.addStateApplier(scriptService);
resourcesToClose.add(clusterService);
final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
if (consistentSettings.isEmpty() == false) {
clusterService.addLocalNodeMasterListener(
new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher()
);
}

final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();

Expand Down Expand Up @@ -1186,7 +1193,8 @@ protected Node(
rerouteService,
fsHealthService,
persistedStateRegistry,
remoteStoreNodeService
remoteStoreNodeService,
clusterManagerMetrics
);
final SearchPipelineService searchPipelineService = new SearchPipelineService(
clusterService,
Expand Down
Loading

0 comments on commit 6d78faa

Please sign in to comment.