Skip to content

Commit

Permalink
Adding TestMetricsRegistry for UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <gkharsh@amazon.com>
  • Loading branch information
Harsh Garg committed Mar 26, 2024
1 parent 63eba93 commit 4fd90ac
Show file tree
Hide file tree
Showing 15 changed files with 297 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.Objects;
import java.util.Optional;

/**
* Class containing metrics (counters/latency) specific to ClusterManager.
*/
public final class ClusterManagerMetrics {

private static final String COUNTER_METRICS_UNIT = "1";

public final Counter leaderCheckFailureCounter;
public final Counter followerChecksFailureCounter;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
this.followerChecksFailureCounter = metricsRegistry.createCounter(
"followers.checker.failure.count",
"Counter for number of failed follower checks",
COUNTER_METRICS_UNIT
);
this.leaderCheckFailureCounter = metricsRegistry.createCounter(
"leader.checker.failure.count",
"Counter for number of failed leader checks",
COUNTER_METRICS_UNIT
);
}

public void incrementCounter(Counter counter, Double value) {
incrementCounter(counter, value, Optional.empty());
}

public void incrementCounter(Counter counter, Double value, Optional<Tags> tags) {
if (Objects.isNull(tags) || tags.isEmpty()) {
counter.add(value);
return;
}
counter.add(value, tags.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -209,7 +208,7 @@ public Coordinator(
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService,
MetricsRegistry metricsRegistry
ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -269,7 +268,7 @@ public Coordinator(
transportService,
this::onLeaderFailure,
nodeHealthService,
metricsRegistry
clusterManagerMetrics
);
this.followersChecker = new FollowersChecker(
settings,
Expand All @@ -278,7 +277,7 @@ public Coordinator(
this::onFollowerCheckRequest,
this::removeNode,
nodeHealthService,
metricsRegistry
clusterManagerMetrics
);
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
import org.opensearch.core.transport.TransportResponse.Empty;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.Transport;
Expand Down Expand Up @@ -114,8 +112,6 @@ public class FollowersChecker {
Setting.Property.NodeScope
);

private static final String UNIT = "1";

private final Settings settings;

private final TimeValue followerCheckInterval;
Expand All @@ -131,7 +127,7 @@ public class FollowersChecker {
private final TransportService transportService;
private final NodeHealthService nodeHealthService;
private volatile FastResponseState fastResponseState;
private Counter followerChecksFailureCounter;
private ClusterManagerMetrics clusterManagerMetrics;

public FollowersChecker(
Settings settings,
Expand All @@ -140,7 +136,7 @@ public FollowersChecker(
Consumer<FollowerCheckRequest> handleRequestAndUpdateState,
BiConsumer<DiscoveryNode, String> onNodeFailure,
NodeHealthService nodeHealthService,
MetricsRegistry metricsRegistry
ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
this.transportService = transportService;
Expand All @@ -167,15 +163,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
handleDisconnectedNode(node);
}
});
initializeMetrics(metricsRegistry);
}

private void initializeMetrics(MetricsRegistry metricsRegistry) {
this.followerChecksFailureCounter = metricsRegistry.createCounter(
"followers.checker.failure.count",
"Counter for number of failed follower checks",
UNIT
);
this.clusterManagerMetrics = clusterManagerMetrics;
}

private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
Expand Down Expand Up @@ -416,7 +404,6 @@ public void handleException(TransportException exp) {
return;
}

followerChecksFailureCounter.add(1);
failNode(reason);
}

Expand All @@ -442,6 +429,7 @@ public void run() {
followerCheckers.remove(discoveryNode);
}
onNodeFailure.accept(discoveryNode, reason);
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.followerChecksFailureCounter, 1.0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
import org.opensearch.core.transport.TransportResponse.Empty;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.NodeDisconnectedException;
Expand Down Expand Up @@ -113,8 +111,6 @@ public class LeaderChecker {
Setting.Property.NodeScope
);

private static final String UNIT = "1";

private final Settings settings;

private final TimeValue leaderCheckInterval;
Expand All @@ -125,15 +121,15 @@ public class LeaderChecker {
private final NodeHealthService nodeHealthService;
private AtomicReference<CheckScheduler> currentChecker = new AtomicReference<>();
private volatile DiscoveryNodes discoveryNodes;
private Counter leaderCheckFailureCounter;
private final ClusterManagerMetrics clusterManagerMetrics;

LeaderChecker(
final Settings settings,
final ClusterSettings clusterSettings,
final TransportService transportService,
final Consumer<Exception> onLeaderFailure,
NodeHealthService nodeHealthService,
final MetricsRegistry metricsRegistry
final ClusterManagerMetrics clusterManagerMetrics
) {
this.settings = settings;
leaderCheckInterval = LEADER_CHECK_INTERVAL_SETTING.get(settings);
Expand All @@ -142,6 +138,7 @@ public class LeaderChecker {
this.transportService = transportService;
this.onLeaderFailure = onLeaderFailure;
this.nodeHealthService = nodeHealthService;
this.clusterManagerMetrics = clusterManagerMetrics;
clusterSettings.addSettingsUpdateConsumer(LEADER_CHECK_TIMEOUT_SETTING, this::setLeaderCheckTimeout);

transportService.registerRequestHandler(
Expand All @@ -162,15 +159,6 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
handleDisconnectedNode(node);
}
});
initializeMetrics(metricsRegistry);
}

private void initializeMetrics(MetricsRegistry metricsRegistry) {
this.leaderCheckFailureCounter = metricsRegistry.createCounter(
"leader.checker.failure.count",
"Counter for number of failed leader checks",
UNIT
);
}

private void setLeaderCheckTimeout(TimeValue leaderCheckTimeout) {
Expand Down Expand Up @@ -319,12 +307,10 @@ public void handleException(TransportException exp) {
if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
logger.debug(new ParameterizedMessage("leader [{}] disconnected during check", leader), exp);
leaderFailed(new ConnectTransportException(leader, "disconnected during check", exp));
leaderCheckFailureCounter.add(1);
return;
} else if (exp.getCause() instanceof NodeHealthCheckFailureException) {
logger.debug(new ParameterizedMessage("leader [{}] health check failed", leader), exp);
leaderFailed(new NodeHealthCheckFailureException("node [" + leader + "] failed health checks", exp));
leaderCheckFailureCounter.add(1);
return;
}
long failureCount = failureCountSinceLastSuccess.incrementAndGet();
Expand All @@ -342,7 +328,6 @@ public void handleException(TransportException exp) {
leaderFailed(
new OpenSearchException("node [" + leader + "] failed [" + failureCount + "] consecutive checks", exp)
);
leaderCheckFailureCounter.add(1);
return;
}

Expand Down Expand Up @@ -373,6 +358,7 @@ void leaderFailed(Exception e) {
@Override
public void run() {
onLeaderFailure.accept(e);
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.leaderCheckFailureCounter, 1.0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.ClusterManagerMetrics;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.coordination.ElectionStrategy;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
Expand All @@ -55,7 +56,6 @@
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.plugins.DiscoveryPlugin;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -135,7 +135,7 @@ public DiscoveryModule(
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService,
MetricsRegistry metricsRegistry
ClusterManagerMetrics clusterManagerMetrics
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -214,7 +214,7 @@ public DiscoveryModule(
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService,
metricsRegistry
clusterManagerMetrics
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
4 changes: 3 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.cluster.InternalClusterInfoService;
import org.opensearch.cluster.NodeConnectionsService;
import org.opensearch.cluster.action.index.MappingUpdatedAction;
import org.opensearch.cluster.coordination.ClusterManagerMetrics;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.metadata.AliasValidator;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
Expand Down Expand Up @@ -644,6 +645,7 @@ protected Node(

final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();
final ClusterManagerMetrics clusterManagerMetrics = new ClusterManagerMetrics(metricsRegistry);

ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
Expand Down Expand Up @@ -1130,7 +1132,7 @@ protected Node(
fsHealthService,
persistedStateRegistry,
remoteStoreNodeService,
metricsRegistry
clusterManagerMetrics
);
final SearchPipelineService searchPipelineService = new SearchPipelineService(
clusterService,
Expand Down
Loading

0 comments on commit 4fd90ac

Please sign in to comment.