Skip to content

Commit

Permalink
Adds counter metrics for leader and follower check failures
Browse files Browse the repository at this point in the history
  • Loading branch information
Harsh Garg committed Feb 23, 2024
1 parent bb0b4b0 commit 67c3ca7
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -207,7 +208,8 @@ public Coordinator(
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
MetricsRegistry metricsRegistry
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -261,14 +263,22 @@ public Coordinator(
this::handlePublishRequest,
this::handleApplyCommit
);
this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService);
this.leaderChecker = new LeaderChecker(
settings,
clusterSettings,
transportService,
this::onLeaderFailure,
nodeHealthService,
metricsRegistry
);
this.followersChecker = new FollowersChecker(
settings,
clusterSettings,
transportService,
this::onFollowerCheckRequest,
this::removeNode,
nodeHealthService
nodeHealthService,
metricsRegistry
);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.opensearch.core.transport.TransportResponse.Empty;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.Transport;
Expand Down Expand Up @@ -112,6 +114,8 @@ public class FollowersChecker {
Setting.Property.NodeScope
);

private static final String UNIT = "1";

private final Settings settings;

private final TimeValue followerCheckInterval;
Expand All @@ -127,14 +131,16 @@ public class FollowersChecker {
private final TransportService transportService;
private final NodeHealthService nodeHealthService;
private volatile FastResponseState fastResponseState;
private Counter followerChecksFailureCounter;

public FollowersChecker(
Settings settings,
ClusterSettings clusterSettings,
TransportService transportService,
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
MetricsRegistry metricsRegistry
) {
this.settings = settings;
this.transportService = transportService;
Expand All @@ -161,6 +167,15 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
handleDisconnectedNode(node);
}
});
initializeMetrics(metricsRegistry);
}

/**
 * Creates the follower-check failure counter through the supplied registry and
 * stores it in {@code followerChecksFailureCounter}.
 *
 * @param metricsRegistry registry asked to create the counter
 */
private void initializeMetrics(MetricsRegistry metricsRegistry) {
    final String counterName = "follower.checker.failure.count";
    final String counterDescription = "Counter for number of failed follower checks";
    followerChecksFailureCounter = metricsRegistry.createCounter(counterName, counterDescription, UNIT);
}

private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
Expand Down Expand Up @@ -401,6 +416,7 @@ public void handleException(TransportException exp) {
return;
}

followerChecksFailureCounter.add(1);
failNode(reason);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.opensearch.core.transport.TransportResponse.Empty;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.NodeDisconnectedException;
Expand Down Expand Up @@ -111,6 +113,8 @@ public class LeaderChecker {
Setting.Property.NodeScope
);

private static final String UNIT = "1";

private final Settings settings;

private final TimeValue leaderCheckInterval;
Expand All @@ -119,17 +123,17 @@ public class LeaderChecker {
private final TransportService transportService;
private final Consumer<Exception> onLeaderFailure;
private final NodeHealthService nodeHealthService;

private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();

private volatile DiscoveryNodes discoveryNodes;
private Counter leaderCheckFailureCounter;

LeaderChecker(
final Settings settings,
final ClusterSettings clusterSettings,
final TransportService transportService,
final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
final MetricsRegistry metricsRegistry
) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
Expand Down Expand Up @@ -158,6 +162,15 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
handleDisconnectedNode(node);
}
});
initializeMetrics(metricsRegistry);
}

/**
 * Creates the leader-check failure counter through the supplied registry and
 * stores it in {@code leaderCheckFailureCounter}.
 *
 * @param metricsRegistry registry asked to create the counter
 */
private void initializeMetrics(MetricsRegistry metricsRegistry) {
    final String counterName = "leader.checker.failure.count";
    final String counterDescription = "Counter for number of failed leader checks";
    leaderCheckFailureCounter = metricsRegistry.createCounter(counterName, counterDescription, UNIT);
}

private void setLeaderCheckTimeout(TimeValue leaderCheckTimeout) {
Expand Down Expand Up @@ -293,7 +306,6 @@ public void handleResponse(Empty response) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}

failureCountSinceLastSuccess.set(0);
scheduleNextWakeUp(); // logs trace message indicating success
}
Expand All @@ -304,14 +316,15 @@ public void handleException(TransportException exp) {
logger.debug("closed check scheduler received a response, doing nothing");
return;
}

if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
leaderCheckFailureCounter.add(1);
return;
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.debug(new ParameterizedMessage("leader [{}] health check failed", leader), exp);
leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp));
leaderCheckFailureCounter.add(1);
return;
}
long failureCount = failureCountSinceLastSuccess.incrementAndGet();
Expand All @@ -329,6 +342,7 @@ public void handleException(TransportException exp) {
leaderFailed(
new OpenSearchException("node [" + leader + "] failed [" + failureCount + "] consecutive checks", exp)
);
leaderCheckFailureCounter.add(1);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.plugins.DiscoveryPlugin;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -133,7 +134,8 @@ public DiscoveryModule(
RerouteService rerouteService,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
MetricsRegistry metricsRegistry
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -211,7 +213,8 @@ public DiscoveryModule(
electionStrategy,
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService
remoteStoreNodeService,
metricsRegistry
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,8 @@ protected Node(
rerouteService,
fsHealthService,
persistedStateRegistry,
remoteStoreNodeService
remoteStoreNodeService,
metricsRegistry
);
final SearchPipelineService searchPipelineService = new SearchPipelineService(
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.opensearch.core.transport.TransportResponse.Empty;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.EqualsHashCodeTestUtils;
import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction;
Expand Down Expand Up @@ -75,6 +77,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.mockito.Mockito;

import static java.util.Collections.emptySet;
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME;
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
Expand Down Expand Up @@ -139,7 +143,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
(node, reason) -> {
assert false : node;
},
() -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info")
() -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"),
Mockito.mock(MetricsRegistry.class)
);

followersChecker.setCurrentNodes(discoveryNodesHolder[0]);
Expand Down Expand Up @@ -307,7 +312,8 @@ public String toString() {
assertTrue(nodeFailed.compareAndSet(false, true));
assertThat(reason, equalTo("disconnected"));
},
() -> new StatusInfo(HEALTHY, "healthy-info")
() -> new StatusInfo(HEALTHY, "healthy-info"),
Mockito.mock(MetricsRegistry.class)
);

DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build();
Expand Down Expand Up @@ -396,7 +402,8 @@ public String toString() {
assertTrue(nodeFailed.compareAndSet(false, true));
assertThat(reason, equalTo(failureReason));
},
nodeHealthService
nodeHealthService,
NoopMetricsRegistry.INSTANCE
);

DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build();
Expand Down Expand Up @@ -501,7 +508,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
if (exception != null) {
throw exception;
}
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(UNHEALTHY, "unhealthy-info"));
},
(node, reason) -> { assert false : node; },
() -> new StatusInfo(UNHEALTHY, "unhealthy-info"),
Mockito.mock(MetricsRegistry.class)
);

final long leaderTerm = randomLongBetween(2, Long.MAX_VALUE);
final long followerTerm = randomLongBetween(1, leaderTerm - 1);
Expand Down Expand Up @@ -574,7 +585,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
if (exception != null) {
throw exception;
}
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"));
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), Mockito.mock(MetricsRegistry.class));

{
// Does not call into the coordinator in the normal case
Expand Down Expand Up @@ -721,7 +732,7 @@ public void testPreferClusterManagerNodes() {
);
final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, clusterSettings, transportService, fcr -> {
assert false : fcr;
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"));
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), Mockito.mock(MetricsRegistry.class));
followersChecker.setCurrentNodes(discoveryNodes);
List<DiscoveryNode> followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear())
.map(cr -> cr.node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.core.transport.TransportResponse.Empty;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.EqualsHashCodeTestUtils;
import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction;
Expand Down Expand Up @@ -179,7 +180,7 @@ public String toString() {
final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> {
assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks"));
assertTrue(leaderFailed.compareAndSet(false, true));
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"));
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE);

logger.info("--> creating first checker");
leaderChecker.updateLeader(leader1);
Expand Down Expand Up @@ -296,7 +297,7 @@ public String toString() {
final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> {
assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check")));
assertTrue(leaderFailed.compareAndSet(false, true));
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"));
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE);

leaderChecker.updateLeader(leader);
{
Expand Down Expand Up @@ -410,7 +411,7 @@ public String toString() {
final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> {
assertThat(e.getMessage(), endsWith("failed health checks"));
assertTrue(leaderFailed.compareAndSet(false, true));
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"));
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE);

leaderChecker.updateLeader(leader);

Expand Down Expand Up @@ -458,7 +459,8 @@ public void testLeaderBehaviour() {
clusterSettings,
transportService,
e -> fail("shouldn't be checking anything"),
() -> nodeHealthServiceStatus.get()
() -> nodeHealthServiceStatus.get(),
NoopMetricsRegistry.INSTANCE
);

final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
Expand Down Expand Up @@ -270,7 +271,8 @@ protected void onSendRequest(
ElectionStrategy.DEFAULT_INSTANCE,
nodeHealthService,
persistedStateRegistry,
Mockito.mock(RemoteStoreNodeService.class)
Mockito.mock(RemoteStoreNodeService.class),
Mockito.mock(MetricsRegistry.class)
);
transportService.start();
transportService.acceptIncomingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.plugins.DiscoveryPlugin;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand All @@ -66,6 +67,8 @@
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import org.mockito.Mockito;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -128,7 +131,8 @@ private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugi
mock(RerouteService.class),
null,
new PersistedStateRegistry(),
remoteStoreNodeService
remoteStoreNodeService,
Mockito.mock(MetricsRegistry.class)
);
}

Expand Down
Loading

0 comments on commit 67c3ca7

Please sign in to comment.