Skip to content

Commit

Permalink
Refactoring UTs to assert counters
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <gkharsh@amazon.com>
  • Loading branch information
Harsh Garg committed Mar 15, 2024
1 parent 75185f9 commit bf79851
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti

private void initializeMetrics(MetricsRegistry metricsRegistry) {
this.followerChecksFailureCounter = metricsRegistry.createCounter(
"follower.checker.failure.count",
"followers.checker.failure.count",
"Counter for number of failed follower checks",
UNIT
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.core.transport.TransportResponse.Empty;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
Expand Down Expand Up @@ -77,8 +78,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.mockito.Mockito;

import static java.util.Collections.emptySet;
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_ACTION_NAME;
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
Expand All @@ -95,6 +94,12 @@
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class FollowersCheckerTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -144,7 +149,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
assert false : node;
},
() -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"),
Mockito.mock(MetricsRegistry.class)
NoopMetricsRegistry.INSTANCE
);

followersChecker.setCurrentNodes(discoveryNodesHolder[0]);
Expand Down Expand Up @@ -313,7 +318,7 @@ public String toString() {
assertThat(reason, equalTo("disconnected"));
},
() -> new StatusInfo(HEALTHY, "healthy-info"),
Mockito.mock(MetricsRegistry.class)
NoopMetricsRegistry.INSTANCE
);

DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build();
Expand Down Expand Up @@ -393,6 +398,10 @@ public String toString() {

final AtomicBoolean nodeFailed = new AtomicBoolean();

final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
final Counter counter = mock(Counter.class);
when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(counter);

final FollowersChecker followersChecker = new FollowersChecker(
settings,
clusterSettings,
Expand All @@ -403,7 +412,7 @@ public String toString() {
assertThat(reason, equalTo(failureReason));
},
nodeHealthService,
NoopMetricsRegistry.INSTANCE
metricsRegistry
);

DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build();
Expand Down Expand Up @@ -449,6 +458,8 @@ public String toString() {
deterministicTaskQueue.runAllTasksInTimeOrder();
assertTrue(nodeFailed.get());
assertThat(followersChecker.getFaultyNodes(), contains(otherNode));

verify(counter, atLeastOnce()).add(anyDouble());
}

public void testFollowerCheckRequestEqualsHashCodeSerialization() {
Expand Down Expand Up @@ -508,11 +519,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
if (exception != null) {
throw exception;
}
},
(node, reason) -> { assert false : node; },
() -> new StatusInfo(UNHEALTHY, "unhealthy-info"),
Mockito.mock(MetricsRegistry.class)
);
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(UNHEALTHY, "unhealthy-info"), NoopMetricsRegistry.INSTANCE);

final long leaderTerm = randomLongBetween(2, Long.MAX_VALUE);
final long followerTerm = randomLongBetween(1, leaderTerm - 1);
Expand Down Expand Up @@ -585,7 +592,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
if (exception != null) {
throw exception;
}
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), Mockito.mock(MetricsRegistry.class));
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE);

{
// Does not call into the coordinator in the normal case
Expand Down Expand Up @@ -732,7 +739,7 @@ public void testPreferClusterManagerNodes() {
);
final FollowersChecker followersChecker = new FollowersChecker(Settings.EMPTY, clusterSettings, transportService, fcr -> {
assert false : fcr;
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), Mockito.mock(MetricsRegistry.class));
}, (node, reason) -> { assert false : node; }, () -> new StatusInfo(HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE);
followersChecker.setCurrentNodes(discoveryNodes);
List<DiscoveryNode> followerTargets = Stream.of(capturingTransport.getCapturedRequestsAndClear())
.map(cr -> cr.node)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.core.transport.TransportResponse.Empty;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.EqualsHashCodeTestUtils;
import org.opensearch.test.EqualsHashCodeTestUtils.CopyFunction;
Expand Down Expand Up @@ -79,6 +80,13 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.matchesRegex;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

public class LeaderCheckerTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -176,11 +184,14 @@ public String toString() {
transportService.acceptIncomingRequests();

final AtomicBoolean leaderFailed = new AtomicBoolean();
final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
final Counter leaderCheckFailedCounter = mock(Counter.class);
when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderCheckFailedCounter);

final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> {
assertThat(e.getMessage(), matchesRegex("node \\[.*\\] failed \\[[1-9][0-9]*\\] consecutive checks"));
assertTrue(leaderFailed.compareAndSet(false, true));
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE);
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry);

logger.info("--> creating first checker");
leaderChecker.updateLeader(leader1);
Expand Down Expand Up @@ -230,6 +241,8 @@ public String toString() {
);
}
leaderChecker.updateLeader(null);
verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString());
verify(leaderCheckFailedCounter, times(1)).add(anyDouble());
}

enum Response {
Expand Down Expand Up @@ -294,10 +307,14 @@ public String toString() {
transportService.acceptIncomingRequests();

final AtomicBoolean leaderFailed = new AtomicBoolean();
final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
final Counter leaderCheckFailedCounter = mock(Counter.class);
when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderCheckFailedCounter);

final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> {
assertThat(e.getMessage(), anyOf(endsWith("disconnected"), endsWith("disconnected during check")));
assertTrue(leaderFailed.compareAndSet(false, true));
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE);
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry);

leaderChecker.updateLeader(leader);
{
Expand Down Expand Up @@ -352,6 +369,8 @@ public String toString() {
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(leaderFailed.get());
}
verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString());
verify(leaderCheckFailedCounter, times(2)).add(anyDouble());
}

public void testFollowerFailsImmediatelyOnHealthCheckFailure() {
Expand Down Expand Up @@ -408,10 +427,13 @@ public String toString() {
transportService.acceptIncomingRequests();

final AtomicBoolean leaderFailed = new AtomicBoolean();
final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
final Counter leaderChecksFailedCounter = mock(Counter.class);
when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderChecksFailedCounter);
final LeaderChecker leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, e -> {
assertThat(e.getMessage(), endsWith("failed health checks"));
assertTrue(leaderFailed.compareAndSet(false, true));
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), NoopMetricsRegistry.INSTANCE);
}, () -> new StatusInfo(StatusInfo.Status.HEALTHY, "healthy-info"), metricsRegistry);

leaderChecker.updateLeader(leader);

Expand All @@ -431,6 +453,8 @@ public String toString() {

assertTrue(leaderFailed.get());
}
verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString());
verify(leaderChecksFailedCounter, times(1)).add(anyDouble());
}

public void testLeaderBehaviour() {
Expand All @@ -454,13 +478,16 @@ public void testLeaderBehaviour() {
transportService.start();
transportService.acceptIncomingRequests();

final MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
final Counter leaderChecksFailedCounter = mock(Counter.class);
when(metricsRegistry.createCounter(anyString(), anyString(), anyString())).thenReturn(leaderChecksFailedCounter);
final LeaderChecker leaderChecker = new LeaderChecker(
settings,
clusterSettings,
transportService,
e -> fail("shouldn't be checking anything"),
() -> nodeHealthServiceStatus.get(),
NoopMetricsRegistry.INSTANCE
metricsRegistry
);

final DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
Expand Down Expand Up @@ -525,6 +552,8 @@ public void testLeaderBehaviour() {
equalTo("rejecting leader check from [" + otherNode + "] sent to a node that is no longer the cluster-manager")
);
}
verify(metricsRegistry, times(1)).createCounter(anyString(), anyString(), anyString());
verifyNoInteractions(leaderChecksFailedCounter);
}

private class CapturingTransportResponseHandler implements TransportResponseHandler<Empty> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
Expand Down Expand Up @@ -272,7 +272,7 @@ protected void onSendRequest(
nodeHealthService,
persistedStateRegistry,
Mockito.mock(RemoteStoreNodeService.class),
Mockito.mock(MetricsRegistry.class)
NoopMetricsRegistry.INSTANCE
);
transportService.start();
transportService.acceptIncomingRequests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.plugins.DiscoveryPlugin;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand All @@ -67,8 +67,6 @@
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import org.mockito.Mockito;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -132,7 +130,7 @@ private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugi
null,
new PersistedStateRegistry(),
remoteStoreNodeService,
Mockito.mock(MetricsRegistry.class)
NoopMetricsRegistry.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@
import org.opensearch.search.query.QueryPhase;
import org.opensearch.snapshots.mockstore.MockEventuallyConsistentRepository;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
Expand Down Expand Up @@ -2533,7 +2532,7 @@ public void start(ClusterState initialState) {
() -> new StatusInfo(HEALTHY, "healthy-info"),
persistedStateRegistry,
remoteStoreNodeService,
Mockito.mock(MetricsRegistry.class)
NoopMetricsRegistry.INSTANCE
);
clusterManagerService.setClusterStatePublisher(coordinator);
coordinator.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.disruption.DisruptableMockTransport;
Expand Down Expand Up @@ -127,8 +127,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.mockito.Mockito;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
Expand Down Expand Up @@ -1177,7 +1175,7 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService,
Mockito.mock(MetricsRegistry.class)
NoopMetricsRegistry.INSTANCE
);
clusterManagerService.setClusterStatePublisher(coordinator);
final GatewayService gatewayService = new GatewayService(
Expand Down

0 comments on commit bf79851

Please sign in to comment.