Skip to content

Commit

Permalink
Make TransportAddVotingConfigExclusionsAction retryable (#98386)
Browse files Browse the repository at this point in the history
The docs for this API say the following:

> If the API fails, you can safely retry it. Only a successful response
> guarantees that the node has been removed from the voting
> configuration and will not be reinstated.

Unfortunately this isn't true today: if the request adds no exclusions
then we do not wait before responding. This commit makes the API wait
until all exclusions are really applied.
  • Loading branch information
DaveCTurner authored Aug 14, 2023
1 parent 1640e2f commit 1fef466
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 35 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/98386.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 98386
summary: Make `TransportAddVotingConfigExclusionsAction` retryable
area: Cluster Coordination
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class TransportAddVotingConfigExclusionsAction extends TransportMasterNodeAction<
AddVotingConfigExclusionsRequest,
Expand Down Expand Up @@ -107,13 +107,14 @@ protected void masterOperation(

submitUnbatchedTask("add-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT) {

private Set<VotingConfigExclusion> resolvedExclusions;

@Override
public ClusterState execute(ClusterState currentState) {
assert resolvedExclusions == null : resolvedExclusions;
final int finalMaxVotingConfigExclusions = TransportAddVotingConfigExclusionsAction.this.maxVotingConfigExclusions;
resolvedExclusions = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, finalMaxVotingConfigExclusions);
final var resolvedExclusions = resolveVotingConfigExclusionsAndCheckMaximum(
request,
currentState,
finalMaxVotingConfigExclusions
);

final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(currentState.coordinationMetadata());
resolvedExclusions.forEach(builder::addVotingConfigExclusion);
Expand All @@ -138,13 +139,13 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
threadPool.getThreadContext()
);

final Set<String> excludedNodeIds = resolvedExclusions.stream()
.map(VotingConfigExclusion::getNodeId)
.collect(Collectors.toSet());

final Predicate<ClusterState> allNodesRemoved = clusterState -> {
final Set<String> votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds();
return excludedNodeIds.stream().noneMatch(votingConfigNodeIds::contains);
final Set<String> votingConfigNodeIds = new HashSet<>();
votingConfigNodeIds.addAll(clusterState.getLastCommittedConfiguration().getNodeIds());
votingConfigNodeIds.addAll(clusterState.getLastAcceptedConfiguration().getNodeIds());
return clusterState.getVotingConfigExclusions()
.stream()
.noneMatch(votingConfigExclusion -> votingConfigNodeIds.contains(votingConfigExclusion.getNodeId()));
};

final Listener clusterStateListener = new Listener() {
Expand All @@ -156,20 +157,14 @@ public void onNewClusterState(ClusterState state) {
@Override
public void onClusterServiceClose() {
listener.onFailure(
new ElasticsearchException(
"cluster service closed while waiting for voting config exclusions "
+ resolvedExclusions
+ " to take effect"
)
new ElasticsearchException("cluster service closed while waiting for voting config exclusions to take effect")
);
}

@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(
new ElasticsearchTimeoutException(
"timed out waiting for voting config exclusions " + resolvedExclusions + " to take effect"
)
new ElasticsearchTimeoutException("timed out waiting for voting config exclusions to take effect")
);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -245,7 +246,7 @@ public void testExcludeAbsentNodesByNodeIds() {
localNode,
AddVotingConfigExclusionsAction.NAME,
new AddVotingConfigExclusionsRequest(new String[] { "absent_id" }, Strings.EMPTY_ARRAY, TimeValue.timeValueSeconds(30)),
expectSuccess(e -> {
expectSuccess(r -> {
final var state = clusterService.getClusterApplierService().state();
assertEquals(
Set.of(new VotingConfigExclusion("absent_id", VotingConfigExclusion.MISSING_VALUE_MARKER)),
Expand Down Expand Up @@ -282,7 +283,7 @@ public void testExcludeAbsentNodesByNodeNames() {
localNode,
AddVotingConfigExclusionsAction.NAME,
new AddVotingConfigExclusionsRequest("absent_node"),
expectSuccess(e -> {
expectSuccess(r -> {
final var state = clusterService.getClusterApplierService().state();
assertEquals(
Set.of(new VotingConfigExclusion(VotingConfigExclusion.MISSING_VALUE_MARKER, "absent_node")),
Expand All @@ -297,8 +298,7 @@ public void testExcludeAbsentNodesByNodeNames() {

public void testExcludeExistingNodesByNodeNames() {
final var countDownLatch = new CountDownLatch(1);
final var configurationAdjuster = new AdjustConfigurationForExclusions();
clusterStateObserver.waitForNextChange(configurationAdjuster);
clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
transportService.sendRequest(
localNode,
AddVotingConfigExclusionsAction.NAME,
Expand All @@ -314,7 +314,7 @@ public void testExcludeExistingNodesByNodeNames() {
safeAwait(countDownLatch);
}

public void testSucceedsEvenIfAllExclusionsAlreadyAdded() {
public void testTriggersReconfigurationEvenIfAllExclusionsAlreadyAddedButStillInConfiguration() {
final ClusterState state = clusterService.state();
final ClusterState.Builder builder = builder(state);
builder.metadata(
Expand All @@ -326,14 +326,19 @@ public void testSucceedsEvenIfAllExclusionsAlreadyAdded() {
setState(clusterService, builder);

final CountDownLatch countDownLatch = new CountDownLatch(1);

clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
transportService.sendRequest(
localNode,
AddVotingConfigExclusionsAction.NAME,
new AddVotingConfigExclusionsRequest("other1"),
randomFrom(
new AddVotingConfigExclusionsRequest("other1"),
new AddVotingConfigExclusionsRequest(new String[] { "other1" }, Strings.EMPTY_ARRAY, TimeValue.timeValueSeconds(30))
),
expectSuccess(r -> {
assertNotNull(r);
assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(otherNode1Exclusion));
final var finalState = clusterService.getClusterApplierService().state();
assertThat(finalState.getVotingConfigExclusions(), contains(otherNode1Exclusion));
assertAllExclusionsApplied(finalState);
countDownLatch.countDown();
})
);
Expand All @@ -346,20 +351,26 @@ public void testExcludeByNodeIdSucceedsEvenIfAllExclusionsAlreadyAdded() {
builder.metadata(
Metadata.builder(state.metadata())
.coordinationMetadata(
CoordinationMetadata.builder(state.coordinationMetadata()).addVotingConfigExclusion(otherNode1Exclusion).build()
CoordinationMetadata.builder(state.coordinationMetadata())
.lastCommittedConfiguration(VotingConfiguration.of(localNode, otherNode2))
.lastAcceptedConfiguration(VotingConfiguration.of(localNode, otherNode2))
.addVotingConfigExclusion(otherNode1Exclusion)
.build()
)
);
setState(clusterService, builder);

final CountDownLatch countDownLatch = new CountDownLatch(1);

clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions());
transportService.sendRequest(
localNode,
AddVotingConfigExclusionsAction.NAME,
new AddVotingConfigExclusionsRequest(new String[] { "other1" }, Strings.EMPTY_ARRAY, TimeValue.timeValueSeconds(30)),
expectSuccess(r -> {
assertNotNull(r);
assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(otherNode1Exclusion));
final var finalState = clusterService.getClusterApplierService().state();
assertThat(finalState.getVotingConfigExclusions(), contains(otherNode1Exclusion));
assertAllExclusionsApplied(finalState);
countDownLatch.countDown();
})
);
Expand All @@ -372,7 +383,11 @@ public void testExcludeByNodeNameSucceedsEvenIfAllExclusionsAlreadyAdded() {
builder.metadata(
Metadata.builder(state.metadata())
.coordinationMetadata(
CoordinationMetadata.builder(state.coordinationMetadata()).addVotingConfigExclusion(otherNode1Exclusion).build()
CoordinationMetadata.builder(state.coordinationMetadata())
.lastCommittedConfiguration(VotingConfiguration.of(localNode, otherNode2))
.lastAcceptedConfiguration(VotingConfiguration.of(localNode, otherNode2))
.addVotingConfigExclusion(otherNode1Exclusion)
.build()
)
);
setState(clusterService, builder);
Expand All @@ -385,7 +400,9 @@ public void testExcludeByNodeNameSucceedsEvenIfAllExclusionsAlreadyAdded() {
new AddVotingConfigExclusionsRequest("other1"),
expectSuccess(r -> {
assertNotNull(r);
assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), contains(otherNode1Exclusion));
final var finalState = clusterService.getClusterApplierService().state();
assertThat(finalState.getVotingConfigExclusions(), contains(otherNode1Exclusion));
assertAllExclusionsApplied(finalState);
countDownLatch.countDown();
})
);
Expand Down Expand Up @@ -469,7 +486,7 @@ public void testTimesOut() {
expectError(e -> {
final Throwable rootCause = e.getRootCause();
assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class));
assertThat(rootCause.getMessage(), startsWith("timed out waiting for voting config exclusions [{other1}"));
assertThat(rootCause.getMessage(), equalTo("timed out waiting for voting config exclusions to take effect"));
countDownLatch.countDown();
})
);
Expand Down Expand Up @@ -533,10 +550,13 @@ public void handleException(TransportException exp) {
private static class AdjustConfigurationForExclusions implements Listener {
@Override
public void onNewClusterState(ClusterState state) {
clusterService.getMasterService().submitUnbatchedStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() {
final var prio = randomFrom(Priority.values());
clusterService.getMasterService().submitUnbatchedStateUpdateTask("reconfiguration", new ClusterStateUpdateTask(prio) {
@Override
public ClusterState execute(ClusterState currentState) {
assertThat(currentState, sameInstance(state));
if (prio.compareTo(Priority.URGENT) <= 0) {
assertThat(currentState, sameInstance(state));
}
final Set<String> votingNodeIds = new HashSet<>();
currentState.nodes().forEach(n -> votingNodeIds.add(n.getId()));
currentState.getVotingConfigExclusions().forEach(t -> votingNodeIds.remove(t.getNodeId()));
Expand Down

0 comments on commit 1fef466

Please sign in to comment.