From fe18a67f025d645daf9399d84bb01c57768ea18a Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 17 Aug 2023 09:49:03 +0100 Subject: [PATCH] Make TransportAddVotingConfigExclusionsAction retryable (#98568) The docs for this API say the following: > If the API fails, you can safely retry it. Only a successful response > guarantees that the node has been removed from the voting > configuration and will not be reinstated. Unfortunately this isn't true today: if the request adds no exclusions then we do not wait before responding. This commit makes the API wait until all exclusions are really applied. Backport of #98386, plus the test changes from #98146 and #98356. --- ...nsportAddVotingConfigExclusionsAction.java | 33 +- ...tAddVotingConfigExclusionsActionTests.java | 378 ++++++++++-------- .../org/elasticsearch/test/ESTestCase.java | 10 + 3 files changed, 227 insertions(+), 194 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java index 722cdc7b8c983..61d6e543865ce 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java @@ -36,9 +36,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.HashSet; import java.util.Set; import java.util.function.Predicate; -import java.util.stream.Collectors; public class TransportAddVotingConfigExclusionsAction extends TransportMasterNodeAction< AddVotingConfigExclusionsRequest, @@ -99,13 +99,14 @@ protected void masterOperation( clusterService.submitStateUpdateTask("add-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT) { - private Set resolvedExclusions; - @Override public ClusterState execute(ClusterState currentState) { - assert resolvedExclusions == null : resolvedExclusions; final int finalMaxVotingConfigExclusions = TransportAddVotingConfigExclusionsAction.this.maxVotingConfigExclusions; - resolvedExclusions = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, finalMaxVotingConfigExclusions); + final Set resolvedExclusions = resolveVotingConfigExclusionsAndCheckMaximum( + request, + currentState, + finalMaxVotingConfigExclusions + ); final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(currentState.coordinationMetadata()); resolvedExclusions.forEach(builder::addVotingConfigExclusion); @@ -130,13 +131,13 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS threadPool.getThreadContext() ); - final Set excludedNodeIds = resolvedExclusions.stream() - .map(VotingConfigExclusion::getNodeId) - .collect(Collectors.toSet()); - final Predicate allNodesRemoved = clusterState -> { - final Set votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); - return excludedNodeIds.stream().noneMatch(votingConfigNodeIds::contains); + final Set votingConfigNodeIds = new HashSet<>(); + votingConfigNodeIds.addAll(clusterState.getLastCommittedConfiguration().getNodeIds()); + votingConfigNodeIds.addAll(clusterState.getLastAcceptedConfiguration().getNodeIds()); + return clusterState.getVotingConfigExclusions() + .stream() + .noneMatch(votingConfigExclusion -> votingConfigNodeIds.contains(votingConfigExclusion.getNodeId())); }; final Listener clusterStateListener = new Listener() { @@ -148,20 +149,14 @@ public void onNewClusterState(ClusterState state) { @Override public void onClusterServiceClose() { listener.onFailure( - new ElasticsearchException( - "cluster service closed while waiting for voting config exclusions " - + resolvedExclusions - + " to take effect" - ) + new ElasticsearchException("cluster service closed while waiting for voting config exclusions to take effect") ); } @Override public void onTimeout(TimeValue timeout) { listener.onFailure( - new ElasticsearchTimeoutException( - "timed out waiting for voting config exclusions " + resolvedExclusions + " to take effect" - ) + new ElasticsearchTimeoutException("timed out waiting for voting config exclusions to take effect") ); } }; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java index b25b073116b20..c9b5a0c8e2399 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java @@ -7,7 +7,6 @@ */ package org.elasticsearch.action.admin.cluster.configuration; -import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; @@ -25,6 +24,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes.Builder; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.ClusterSettings; @@ -45,7 +45,6 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static java.util.Collections.emptyMap; @@ -58,9 +57,10 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; -import static org.hamcrest.Matchers.startsWith; public class TransportAddVotingConfigExclusionsActionTests extends ESTestCase { @@ -165,109 +165,112 @@ public void setupForTest() { clusterStateObserver = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); } - public void testWithdrawsVoteFromANode() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(2); + private static void assertAllExclusionsApplied(ClusterState clusterState) { + final VotingConfiguration lastAcceptedConfiguration = clusterState.coordinationMetadata().getLastAcceptedConfiguration(); + final VotingConfiguration lastCommittedConfiguration = clusterState.coordinationMetadata().getLastCommittedConfiguration(); + for (final VotingConfigExclusion votingConfigExclusion : clusterState.getVotingConfigExclusions()) { + assertThat(lastAcceptedConfiguration.getNodeIds(), not(hasItem(votingConfigExclusion.getNodeId()))); + assertThat(lastCommittedConfiguration.getNodeIds(), not(hasItem(votingConfigExclusion.getNodeId()))); + } + } - clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch)); + public void testWithdrawsVoteFromANode() { + final CountDownLatch countDownLatch = new CountDownLatch(1); + clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions()); transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, new AddVotingConfigExclusionsRequest("other1"), expectSuccess(r -> { assertNotNull(r); + final ClusterState state = clusterService.getClusterApplierService().state(); + assertThat(state.getVotingConfigExclusions(), contains(otherNode1Exclusion)); + assertAllExclusionsApplied(state); countDownLatch.countDown(); }) ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(otherNode1Exclusion)); + safeAwait(countDownLatch); } - public void testWithdrawsVotesFromMultipleNodes() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(2); - - clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch)); + public void testWithdrawsVotesFromMultipleNodes() { + final CountDownLatch countDownLatch = new CountDownLatch(1); + clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions()); transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, new AddVotingConfigExclusionsRequest("other1", "other2"), expectSuccess(r -> { assertNotNull(r); + final ClusterState state = clusterService.getClusterApplierService().state(); + assertThat(state.getVotingConfigExclusions(), containsInAnyOrder(otherNode1Exclusion, otherNode2Exclusion)); + assertAllExclusionsApplied(state); countDownLatch.countDown(); }) ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat( - clusterService.getClusterApplierService().state().getVotingConfigExclusions(), - containsInAnyOrder(otherNode1Exclusion, otherNode2Exclusion) - ); + safeAwait(countDownLatch); } - public void testWithdrawsVotesFromNodesMatchingWildcard() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(2); - - clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch)); + public void testWithdrawsVotesFromNodesMatchingWildcard() { + final CountDownLatch countDownLatch = new CountDownLatch(1); + clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions()); transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, makeRequestWithNodeDescriptions("other*"), expectSuccess(r -> { assertNotNull(r); + final ClusterState state = clusterService.getClusterApplierService().state(); + assertThat(state.getVotingConfigExclusions(), containsInAnyOrder(otherNode1Exclusion, otherNode2Exclusion)); + assertAllExclusionsApplied(state); countDownLatch.countDown(); }) ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat( - clusterService.getClusterApplierService().state().getVotingConfigExclusions(), - containsInAnyOrder(otherNode1Exclusion, otherNode2Exclusion) - ); + safeAwait(countDownLatch); assertWarnings(AddVotingConfigExclusionsRequest.DEPRECATION_MESSAGE); } - public void testWithdrawsVotesFromAllMasterEligibleNodes() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(2); - - clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch)); + public void testWithdrawsVotesFromAllMasterEligibleNodes() { + final CountDownLatch countDownLatch = new CountDownLatch(1); + clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions()); transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, makeRequestWithNodeDescriptions("_all"), expectSuccess(r -> { assertNotNull(r); + final ClusterState state = clusterService.getClusterApplierService().state(); + assertThat( + state.getVotingConfigExclusions(), + containsInAnyOrder(localNodeExclusion, otherNode1Exclusion, otherNode2Exclusion) + ); + assertAllExclusionsApplied(state); countDownLatch.countDown(); }) ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat( - clusterService.getClusterApplierService().state().getVotingConfigExclusions(), - containsInAnyOrder(localNodeExclusion, otherNode1Exclusion, otherNode2Exclusion) - ); + safeAwait(countDownLatch); assertWarnings(AddVotingConfigExclusionsRequest.DEPRECATION_MESSAGE); } - public void testWithdrawsVoteFromLocalNode() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(2); - - clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch)); + public void testWithdrawsVoteFromLocalNode() { + final CountDownLatch countDownLatch = new CountDownLatch(1); + clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions()); transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, makeRequestWithNodeDescriptions("_local"), expectSuccess(r -> { assertNotNull(r); + final ClusterState state = clusterService.getClusterApplierService().state(); + assertThat(state.getVotingConfigExclusions(), contains(localNodeExclusion)); + assertAllExclusionsApplied(state); countDownLatch.countDown(); }) ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(localNodeExclusion)); + safeAwait(countDownLatch); assertWarnings(AddVotingConfigExclusionsRequest.DEPRECATION_MESSAGE); } - public void testReturnsImmediatelyIfVoteAlreadyWithdrawn() throws InterruptedException { + public void testReturnsImmediatelyIfVoteAlreadyWithdrawn() { final ClusterState state = clusterService.state(); setState( clusterService, @@ -291,66 +294,57 @@ public void testReturnsImmediatelyIfVoteAlreadyWithdrawn() throws InterruptedExc new AddVotingConfigExclusionsRequest("other1"), expectSuccess(r -> { assertNotNull(r); + final ClusterState finalState = clusterService.getClusterApplierService().state(); + assertThat(finalState.getVotingConfigExclusions(), contains(otherNode1Exclusion)); + assertAllExclusionsApplied(finalState); countDownLatch.countDown(); }) ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(otherNode1Exclusion)); + safeAwait(countDownLatch); } - public void testReturnsErrorIfNoMatchingNodeDescriptions() throws InterruptedException { + public void testReturnsErrorIfNoMatchingNodeDescriptions() { final CountDownLatch countDownLatch = new CountDownLatch(1); - final SetOnce exceptionHolder = new SetOnce<>(); - transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, makeRequestWithNodeDescriptions("not-a-node"), expectError(e -> { - exceptionHolder.set(e); + final Throwable rootCause = e.getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat( + rootCause.getMessage(), + equalTo("add voting config exclusions request for [not-a-node] matched no master-eligible nodes") + ); countDownLatch.countDown(); }) ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - final Throwable rootCause = exceptionHolder.get().getRootCause(); - assertThat(rootCause, instanceOf(IllegalArgumentException.class)); - assertThat( - rootCause.getMessage(), - equalTo("add voting config exclusions request for [not-a-node] matched no master-eligible nodes") - ); + safeAwait(countDownLatch); assertWarnings(AddVotingConfigExclusionsRequest.DEPRECATION_MESSAGE); } - public void testOnlyMatchesMasterEligibleNodes() throws InterruptedException { + public void testOnlyMatchesMasterEligibleNodes() { final CountDownLatch countDownLatch = new CountDownLatch(1); - final SetOnce exceptionHolder = new SetOnce<>(); - transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, makeRequestWithNodeDescriptions("_all", "master:false"), expectError(e -> { - exceptionHolder.set(e); + final Throwable rootCause = e.getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat( + rootCause.getMessage(), + equalTo("add voting config exclusions request for [_all, master:false] matched no master-eligible nodes") + ); countDownLatch.countDown(); }) ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - final Throwable rootCause = exceptionHolder.get().getRootCause(); - assertThat(rootCause, instanceOf(IllegalArgumentException.class)); - assertThat( - rootCause.getMessage(), - equalTo("add voting config exclusions request for [_all, master:false] matched no master-eligible nodes") - ); + safeAwait(countDownLatch); assertWarnings(AddVotingConfigExclusionsRequest.DEPRECATION_MESSAGE); } - public void testExcludeAbsentNodesByNodeIds() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(2); - - clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch)); + public void testExcludeAbsentNodesByNodeIds() { + final CountDownLatch countDownLatch = new CountDownLatch(1); transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, @@ -360,20 +354,22 @@ public void testExcludeAbsentNodesByNodeIds() throws InterruptedException { Strings.EMPTY_ARRAY, TimeValue.timeValueSeconds(30) ), - expectSuccess(e -> { countDownLatch.countDown(); }) - ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertEquals( - singleton(new VotingConfigExclusion("absent_id", VotingConfigExclusion.MISSING_VALUE_MARKER)), - clusterService.getClusterApplierService().state().getVotingConfigExclusions() + expectSuccess(r -> { + final ClusterState state = clusterService.getClusterApplierService().state(); + assertEquals( + singleton(new VotingConfigExclusion("absent_id", VotingConfigExclusion.MISSING_VALUE_MARKER)), + state.getVotingConfigExclusions() + ); + assertAllExclusionsApplied(state); + countDownLatch.countDown(); + }) ); + safeAwait(countDownLatch); } - public void testExcludeExistingNodesByNodeIds() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(2); - - clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch)); + public void testExcludeExistingNodesByNodeIds() { + final CountDownLatch countDownLatch = new CountDownLatch(1); + clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions()); transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, @@ -385,63 +381,99 @@ public void testExcludeExistingNodesByNodeIds() throws InterruptedException { ), expectSuccess(r -> { assertNotNull(r); + final ClusterState state = clusterService.getClusterApplierService().state(); + assertThat(state.getVotingConfigExclusions(), containsInAnyOrder(otherNode1Exclusion, otherNode2Exclusion)); + assertAllExclusionsApplied(state); countDownLatch.countDown(); }) ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat( - clusterService.getClusterApplierService().state().getVotingConfigExclusions(), - containsInAnyOrder(otherNode1Exclusion, otherNode2Exclusion) - ); + safeAwait(countDownLatch); } - public void testExcludeAbsentNodesByNodeNames() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(2); - - clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch)); + public void testExcludeAbsentNodesByNodeNames() { + final CountDownLatch countDownLatch = new CountDownLatch(1); transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, new AddVotingConfigExclusionsRequest("absent_node"), - expectSuccess(e -> { countDownLatch.countDown(); }) - ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertEquals( - singleton(new VotingConfigExclusion(VotingConfigExclusion.MISSING_VALUE_MARKER, "absent_node")), - clusterService.getClusterApplierService().state().getVotingConfigExclusions() + expectSuccess(r -> { + final ClusterState state = clusterService.getClusterApplierService().state(); + assertEquals( + singleton(new VotingConfigExclusion(VotingConfigExclusion.MISSING_VALUE_MARKER, "absent_node")), + state.getVotingConfigExclusions() + ); + assertAllExclusionsApplied(state); + countDownLatch.countDown(); + }) ); + safeAwait(countDownLatch); } - public void testExcludeExistingNodesByNodeNames() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(2); - - clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch)); + public void testExcludeExistingNodesByNodeNames() { + final CountDownLatch countDownLatch = new CountDownLatch(1); + clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions()); transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, new AddVotingConfigExclusionsRequest("other1", "other2"), expectSuccess(r -> { assertNotNull(r); + final ClusterState state = clusterService.getClusterApplierService().state(); + assertThat(state.getVotingConfigExclusions(), containsInAnyOrder(otherNode1Exclusion, otherNode2Exclusion)); + assertAllExclusionsApplied(state); countDownLatch.countDown(); }) ); + safeAwait(countDownLatch); + } - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat( - clusterService.getClusterApplierService().state().getVotingConfigExclusions(), - containsInAnyOrder(otherNode1Exclusion, otherNode2Exclusion) + public void testTriggersReconfigurationEvenIfAllExclusionsAlreadyAddedButStillInConfiguration() { + final ClusterState state = clusterService.state(); + final ClusterState.Builder builder = builder(state); + builder.metadata( + Metadata.builder(state.metadata()) + .coordinationMetadata( + CoordinationMetadata.builder(state.coordinationMetadata()).addVotingConfigExclusion(otherNode1Exclusion).build() + ) + ); + setState(clusterService, builder); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions()); + transportService.sendRequest( + localNode, + AddVotingConfigExclusionsAction.NAME, + randomFrom( + new AddVotingConfigExclusionsRequest("other1"), + new AddVotingConfigExclusionsRequest( + Strings.EMPTY_ARRAY, + new String[] { "other1" }, + Strings.EMPTY_ARRAY, + TimeValue.timeValueSeconds(30) + ) + ), + expectSuccess(r -> { + assertNotNull(r); + final ClusterState finalState = clusterService.getClusterApplierService().state(); + assertThat(finalState.getVotingConfigExclusions(), contains(otherNode1Exclusion)); + assertAllExclusionsApplied(finalState); + countDownLatch.countDown(); + }) ); + safeAwait(countDownLatch); } - public void testSucceedsEvenIfAllExclusionsAlreadyAdded() throws InterruptedException { + public void testExcludeByNodeDescriptionSucceedsEvenIfAllExclusionsAlreadyAdded() { final ClusterState state = clusterService.state(); final ClusterState.Builder builder = builder(state); builder.metadata( Metadata.builder(state.metadata()) .coordinationMetadata( - CoordinationMetadata.builder(state.coordinationMetadata()).addVotingConfigExclusion(otherNode1Exclusion).build() + CoordinationMetadata.builder(state.coordinationMetadata()) + .lastCommittedConfiguration(VotingConfiguration.of(localNode, otherNode2)) + .lastAcceptedConfiguration(VotingConfiguration.of(localNode, otherNode2)) + .addVotingConfigExclusion(otherNode1Exclusion) + .build() ) ); setState(clusterService, builder); @@ -454,21 +486,26 @@ public void testSucceedsEvenIfAllExclusionsAlreadyAdded() throws InterruptedExce new AddVotingConfigExclusionsRequest("other1"), expectSuccess(r -> { assertNotNull(r); + final ClusterState finalState = clusterService.getClusterApplierService().state(); + assertThat(finalState.getVotingConfigExclusions(), contains(otherNode1Exclusion)); + assertAllExclusionsApplied(finalState); countDownLatch.countDown(); }) ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(otherNode1Exclusion)); + safeAwait(countDownLatch); } - public void testExcludeByNodeIdSucceedsEvenIfAllExclusionsAlreadyAdded() throws InterruptedException { + public void testExcludeByNodeIdSucceedsEvenIfAllExclusionsAlreadyAdded() { final ClusterState state = clusterService.state(); final ClusterState.Builder builder = builder(state); builder.metadata( Metadata.builder(state.metadata()) .coordinationMetadata( - CoordinationMetadata.builder(state.coordinationMetadata()).addVotingConfigExclusion(otherNode1Exclusion).build() + CoordinationMetadata.builder(state.coordinationMetadata()) + .lastCommittedConfiguration(VotingConfiguration.of(localNode, otherNode2)) + .lastAcceptedConfiguration(VotingConfiguration.of(localNode, otherNode2)) + .addVotingConfigExclusion(otherNode1Exclusion) + .build() ) ); setState(clusterService, builder); @@ -486,21 +523,26 @@ public void testExcludeByNodeIdSucceedsEvenIfAllExclusionsAlreadyAdded() throws ), expectSuccess(r -> { assertNotNull(r); + final ClusterState finalState = clusterService.getClusterApplierService().state(); + assertThat(finalState.getVotingConfigExclusions(), contains(otherNode1Exclusion)); + assertAllExclusionsApplied(finalState); countDownLatch.countDown(); }) ); - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(otherNode1Exclusion)); + safeAwait(countDownLatch); } - public void testExcludeByNodeNameSucceedsEvenIfAllExclusionsAlreadyAdded() throws InterruptedException { + public void testExcludeByNodeNameSucceedsEvenIfAllExclusionsAlreadyAdded() { final ClusterState state = clusterService.state(); final ClusterState.Builder builder = builder(state); builder.metadata( Metadata.builder(state.metadata()) .coordinationMetadata( - CoordinationMetadata.builder(state.coordinationMetadata()).addVotingConfigExclusion(otherNode1Exclusion).build() + CoordinationMetadata.builder(state.coordinationMetadata()) + .lastCommittedConfiguration(VotingConfiguration.of(localNode, otherNode2)) + .lastAcceptedConfiguration(VotingConfiguration.of(localNode, otherNode2)) + .addVotingConfigExclusion(otherNode1Exclusion) + .build() ) ); setState(clusterService, builder); @@ -513,15 +555,17 @@ public void testExcludeByNodeNameSucceedsEvenIfAllExclusionsAlreadyAdded() throw new AddVotingConfigExclusionsRequest("other1"), expectSuccess(r -> { assertNotNull(r); + final ClusterState finalState = clusterService.getClusterApplierService().state(); + assertThat(finalState.getVotingConfigExclusions(), contains(otherNode1Exclusion)); + assertAllExclusionsApplied(finalState); countDownLatch.countDown(); }) ); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(otherNode1Exclusion)); + safeAwait(countDownLatch); } - public void testReturnsErrorIfMaximumExclusionCountExceeded() throws InterruptedException { + public void testReturnsErrorIfMaximumExclusionCountExceeded() { final Metadata.Builder metadataBuilder = Metadata.builder(clusterService.state().metadata()); CoordinationMetadata.Builder coordinationMetadataBuilder = CoordinationMetadata.builder( clusterService.state().coordinationMetadata() @@ -562,40 +606,35 @@ public void testReturnsErrorIfMaximumExclusionCountExceeded() throws Interrupted setState(clusterService, builder); final CountDownLatch countDownLatch = new CountDownLatch(1); - final SetOnce exceptionHolder = new SetOnce<>(); - transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, makeRequestWithNodeDescriptions("other*"), expectError(e -> { - exceptionHolder.set(e); + final Throwable rootCause = e.getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat( + rootCause.getMessage(), + equalTo( + "add voting config exclusions request for [other*] would add [" + + newCount + + "] exclusions to the existing [" + + existingCount + + "] which would exceed the maximum of [" + + actualMaximum + + "] set by [cluster.max_voting_config_exclusions]" + ) + ); countDownLatch.countDown(); }) ); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - final Throwable rootCause = exceptionHolder.get().getRootCause(); - assertThat(rootCause, instanceOf(IllegalArgumentException.class)); - assertThat( - rootCause.getMessage(), - equalTo( - "add voting config exclusions request for [other*] would add [" - + newCount - + "] exclusions to the existing [" - + existingCount - + "] which would exceed the maximum of [" - + actualMaximum - + "] set by [cluster.max_voting_config_exclusions]" - ) - ); + safeAwait(countDownLatch); assertWarnings(AddVotingConfigExclusionsRequest.DEPRECATION_MESSAGE); } - public void testTimesOut() throws InterruptedException { + public void testTimesOut() { final CountDownLatch countDownLatch = new CountDownLatch(1); - final SetOnce exceptionHolder = new SetOnce<>(); - transportService.sendRequest( localNode, AddVotingConfigExclusionsAction.NAME, @@ -606,17 +645,15 @@ public void testTimesOut() throws InterruptedException { TimeValue.timeValueMillis(100) ), expectError(e -> { - exceptionHolder.set(e); + final Throwable rootCause = e.getRootCause(); + assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class)); + assertThat(rootCause.getMessage(), equalTo("timed out waiting for voting config exclusions to take effect")); countDownLatch.countDown(); }) ); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - final Throwable rootCause = exceptionHolder.get().getRootCause(); - assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class)); - assertThat(rootCause.getMessage(), startsWith("timed out waiting for voting config exclusions [{other1}")); + safeAwait(countDownLatch); assertWarnings(AddVotingConfigExclusionsRequest.DEPRECATION_MESSAGE); - } private TransportResponseHandler expectSuccess(Consumer onResponse) { @@ -632,6 +669,11 @@ private TransportResponseHandler responseHandler( Consumer onException ) { return new TransportResponseHandler() { + @Override + public ActionResponse.Empty read(StreamInput in) { + return ActionResponse.Empty.INSTANCE; + } + @Override public void handleResponse(ActionResponse.Empty response) { onResponse.accept(response); @@ -641,28 +683,19 @@ public void handleResponse(ActionResponse.Empty response) { public void handleException(TransportException exp) { onException.accept(exp); } - - @Override - public ActionResponse.Empty read(StreamInput in) { - return ActionResponse.Empty.INSTANCE; - } }; } private static class AdjustConfigurationForExclusions implements Listener { - - final CountDownLatch doneLatch; - - AdjustConfigurationForExclusions(CountDownLatch latch) { - this.doneLatch = latch; - } - @Override public void onNewClusterState(ClusterState state) { - clusterService.getMasterService().submitStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() { + final Priority prio = randomFrom(Priority.values()); + clusterService.getMasterService().submitStateUpdateTask("reconfiguration", new ClusterStateUpdateTask(prio) { @Override public ClusterState execute(ClusterState currentState) { - assertThat(currentState, sameInstance(state)); + if (prio.compareTo(Priority.URGENT) <= 0) { + assertThat(currentState, sameInstance(state)); + } final Set votingNodeIds = new HashSet<>(); currentState.nodes().forEach(n -> votingNodeIds.add(n.getId())); currentState.getVotingConfigExclusions().forEach(t -> votingNodeIds.remove(t.getNodeId())); @@ -682,11 +715,6 @@ public ClusterState execute(ClusterState currentState) { public void onFailure(String source, Exception e) { throw new AssertionError("unexpected failure", e); } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - doneLatch.countDown(); - } }); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 27496ee6edc68..68d95ca398e30 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -139,6 +139,7 @@ import java.util.Set; import java.util.TimeZone; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -1836,4 +1837,13 @@ public static void safeAwait(CyclicBarrier barrier) { throw new AssertionError("unexpected", e); } } + + public static void safeAwait(CountDownLatch countDownLatch) { + try { + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("unexpected", e); + } + } }