Decommission controlled discovery/pr #49

Open

wants to merge 88 commits into base: decommission/pr
Commits (88)
9b864e2
Add Executor to decommission node attribute
imRishN Aug 2, 2022
6f188e8
Add DecommissionHelper
imRishN Aug 8, 2022
c57bc77
Decommission service implementation with metadata
imRishN Aug 17, 2022
d348085
Fixes
imRishN Aug 17, 2022
9cccc44
Master abdication
imRishN Aug 17, 2022
4923f0a
Fixes
imRishN Aug 17, 2022
38c5930
Update join validator to validate decommissioned node join request
imRishN Aug 17, 2022
42050fa
Clear voting config after decommissioning
imRishN Aug 18, 2022
30dbfdc
Resolving comments
imRishN Aug 23, 2022
52a8e6b
Fixes
imRishN Aug 23, 2022
1561024
Fixes
imRishN Aug 23, 2022
53057ac
Some refactoring
imRishN Aug 23, 2022
5c2b91c
Updates
imRishN Aug 23, 2022
fc95ac3
Fix to abdication
imRishN Aug 23, 2022
ac9c372
Remove cluster state variable from service
imRishN Aug 23, 2022
825092a
Log node string
imRishN Aug 23, 2022
6a3157c
Fix conflict
imRishN Aug 24, 2022
109459b
Changes in Service
imRishN Aug 24, 2022
4cd06da
Fix spotless check
imRishN Aug 24, 2022
a28e6fd
Update the join validator for decommissioned attribute
imRishN Aug 24, 2022
763f241
Add UTs for metadata
imRishN Aug 24, 2022
1464bbb
Add UTs for JoinTaskExecutor changes
imRishN Aug 24, 2022
f3d49a3
Fix
imRishN Aug 24, 2022
fd0bbe8
Test files
imRishN Aug 25, 2022
efdad78
Move observer logic to helper
imRishN Aug 25, 2022
48def4b
fix msg
imRishN Aug 25, 2022
23f59b1
Move predicate to helper
imRishN Aug 26, 2022
d99dcac
test
imRishN Aug 29, 2022
5ee6f44
Add UT
imRishN Aug 29, 2022
20b465e
Add UT for DecommissionController
imRishN Aug 29, 2022
3b5dd14
Improvements and UTs
imRishN Aug 30, 2022
4fa02f0
Add UT
imRishN Aug 30, 2022
447084f
Fix decommission initiation
imRishN Aug 30, 2022
55238ee
Changes
imRishN Aug 30, 2022
0d850a3
Move DecommissionAttributeMetadata to decommission package
imRishN Aug 30, 2022
a6e542d
Update exception name
imRishN Aug 30, 2022
1b98cf5
Fix spotless and precommit checks
imRishN Aug 30, 2022
ebf7e6e
Update enum
imRishN Aug 30, 2022
c3755a2
Fix spotless and precommit checks
imRishN Aug 30, 2022
9c7cd3f
Add package-info and Changelog
imRishN Aug 30, 2022
0bb70e2
Add checks for quorum
imRishN Sep 1, 2022
d976865
Bug fix
imRishN Sep 1, 2022
ba5c572
Resolving PR comments
imRishN Sep 2, 2022
5cc5c9c
Update awareness attribute decommission status check
imRishN Sep 6, 2022
a356e46
Update quorum loss check logic
imRishN Sep 6, 2022
585c37c
Update status assertion and clear voting config for failed init
imRishN Sep 6, 2022
406950a
Refactoring
imRishN Sep 6, 2022
29013a9
Fix spotless check
imRishN Sep 6, 2022
1e6a3ff
Resolve comments
imRishN Sep 6, 2022
9006455
Fix spotless check
imRishN Sep 6, 2022
985317d
Updating states and flow
imRishN Sep 7, 2022
c851909
Trigger exclusion after init
imRishN Sep 7, 2022
798f0a2
Updates
imRishN Sep 7, 2022
3fe2b66
Resolving comments
imRishN Sep 7, 2022
e24048d
Fixes
imRishN Sep 7, 2022
c4038d8
Fix spotless check
imRishN Sep 7, 2022
cba93d5
Resolve comments
imRishN Sep 8, 2022
74769be
Precheck for retry
imRishN Sep 8, 2022
9119cbd
Add logging
imRishN Sep 8, 2022
807bd3a
Fix spotless check
imRishN Sep 8, 2022
ad4b227
Fix controller tests
imRishN Sep 9, 2022
871784e
Fix Decommission Service test
imRishN Sep 9, 2022
f3cf714
Fix spotless check
imRishN Sep 9, 2022
a6619b5
Empty-Commit
imRishN Sep 9, 2022
06048a8
Add getHistoryOperationsFromTranslog method to fetch the history snap…
ankitkala Sep 13, 2022
3a1dbc8
Add package-info and Changelog
imRishN Aug 30, 2022
418c079
Empty-Commit
imRishN Sep 9, 2022
ac28c4d
Address Comments
imRishN Sep 13, 2022
4b37726
Fix tests
imRishN Sep 13, 2022
a268fd3
Fix spotless check
imRishN Sep 13, 2022
0d420b1
Update logic for exclusion response
imRishN Sep 16, 2022
84443be
Update Changelog
imRishN Sep 16, 2022
0bcf6b3
Addressing minor comments
imRishN Sep 16, 2022
e114e85
Update request eligibility check
imRishN Sep 16, 2022
ad9f040
Update metadata usage
imRishN Sep 16, 2022
7a65770
Remove fromStage method and update withUpdatedStatus method in metadata
imRishN Sep 16, 2022
453b1b1
Fix spotless check
imRishN Sep 16, 2022
1b42670
Add observer to ensure abdication
imRishN Sep 16, 2022
a30fa9d
Refactor node removal observer
imRishN Sep 16, 2022
32481f5
Fix spotless check
imRishN Sep 16, 2022
9f6e1de
Update state transitions
imRishN Sep 19, 2022
32de492
Small fixes
imRishN Sep 19, 2022
779bdf8
Fixes
imRishN Sep 19, 2022
0d81ae2
Fix spotless check
imRishN Sep 19, 2022
b8bd931
Control peer discovery during decommission
imRishN Aug 31, 2022
828167f
Change runnable to action listener
imRishN Aug 31, 2022
e73f4f7
skip prevoting for decommissioned nodes irrespective of cluster state
imRishN Sep 1, 2022
e55b588
Merge branch 'decommission/pr' into decommission-controlled-discovery/pr
imRishN Sep 26, 2022
server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java
@@ -208,19 +208,6 @@ public Coordinator(
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings);
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(
settings,
allocationService,
clusterManagerService,
transportService,
this::getCurrentTerm,
this::getStateForClusterManagerService,
this::handleJoinRequest,
this::joinLeaderInTerm,
this.onJoinValidators,
rerouteService,
nodeHealthService
);
this.persistedStateSupplier = persistedStateSupplier;
this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
@@ -244,6 +231,20 @@ public Coordinator(
new HandshakingTransportAddressConnector(settings, transportService),
configuredHostsResolver
);
this.joinHelper = new JoinHelper(
settings,
allocationService,
clusterManagerService,
transportService,
this::getCurrentTerm,
this::getStateForClusterManagerService,
this::handleJoinRequest,
this::joinLeaderInTerm,
this.onJoinValidators,
rerouteService,
nodeHealthService,
peerFinder.nodeCommissionedListener()
);
this.publicationHandler = new PublicationTransportHandler(
transportService,
namedWriteableRegistry,
@@ -1438,6 +1439,11 @@ private void startElectionScheduler() {
public void run() {
synchronized (mutex) {
if (mode == Mode.CANDIDATE) {
if (peerFinder.localNodeDecommissioned()) {
logger.debug("skip prevoting as local node is decommissioned");
return;
}

final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();

if (localNodeMayWinElection(lastAcceptedState) == false) {
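The guard added above reduces to a simple predicate — a decommissioned candidate never starts prevoting, whatever the last-accepted state says. A minimal standalone model (class and method names are illustrative, not OpenSearch API):

```java
// Minimal model of the election-scheduler guard: a candidate that is
// locally decommissioned skips prevoting regardless of cluster state.
public class PrevoteGuard {
    public static boolean shouldStartPrevoting(boolean isCandidate, boolean localNodeDecommissioned) {
        return isCandidate && !localNodeDecommissioned;
    }
}
```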
server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java
@@ -42,6 +42,7 @@
import org.opensearch.cluster.ClusterStateTaskListener;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.coordination.Coordinator.Mode;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RerouteService;
@@ -57,6 +58,7 @@
import org.opensearch.monitor.StatusInfo;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportRequest;
@@ -113,6 +115,9 @@ public class JoinHelper {
private final TimeValue joinTimeout; // only used for Zen1 joining
private final NodeHealthService nodeHealthService;

public boolean isDecommissioned;
private final ActionListener<Void> nodeCommissionedListener;

private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());

private final AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();
@@ -130,12 +135,14 @@ public class JoinHelper {
Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators,
RerouteService rerouteService,
NodeHealthService nodeHealthService
NodeHealthService nodeHealthService,
ActionListener<Void> nodeCommissionedListener
) {
this.clusterManagerService = clusterManagerService;
this.transportService = transportService;
this.nodeHealthService = nodeHealthService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.nodeCommissionedListener = nodeCommissionedListener;
this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService, transportService) {

private final long term = currentTermSupplier.getAsLong();
@@ -343,11 +350,22 @@ public void handleResponse(Empty response) {
logger.debug("successfully joined {} with {}", destination, joinRequest);
lastFailedJoinAttempt.set(null);
onCompletion.run();
if (isDecommissioned) {
isDecommissioned = false;
nodeCommissionedListener.onResponse(null);
}
}

@Override
public void handleException(TransportException exp) {
pendingOutgoingJoins.remove(dedupKey);
if (exp instanceof RemoteTransportException && (exp.getCause() instanceof NodeDecommissionedException)) {
logger.info("local node is decommissioned. Will not be able to join the cluster");
if (!isDecommissioned) {
isDecommissioned = true;
nodeCommissionedListener.onFailure(exp);

Review comment: nit: this doesn't look like a failure but expected behavior; I would prefer just an Executor over an ActionListener for the same reason. But overall it looks good.

}
}
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
attempt.logNow();
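Taken together, handleResponse and handleException implement a small edge-triggered toggle: the listener fires once when the first NodeDecommissionedException arrives, and once more on the next successful join. A standalone model of just that logic (the marker exception and event log are illustrative stand-ins for the OpenSearch types):

```java
import java.util.ArrayList;
import java.util.List;

// Models the isDecommissioned toggle added to JoinHelper: notify once on
// the first decommission rejection, once again on the next successful join.
public class JoinDecommissionTracker {
    /** Illustrative stand-in for NodeDecommissionedException. */
    public static class DecommissionRejection extends Exception {}

    public boolean isDecommissioned = false;
    public final List<String> events = new ArrayList<>();

    // Mirrors handleException: only the first rejection flips the flag.
    public void onJoinFailure(Exception exp) {
        if (exp instanceof DecommissionRejection && !isDecommissioned) {
            isDecommissioned = true;
            events.add("decommissioned"); // nodeCommissionedListener.onFailure(exp)
        }
    }

    // Mirrors handleResponse: a successful join clears the flag.
    public void onJoinSuccess() {
        if (isDecommissioned) {
            isDecommissioned = false;
            events.add("recommissioned"); // nodeCommissionedListener.onResponse(null)
        }
    }
}
```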
server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -534,6 +534,7 @@ public void apply(Settings value, Settings current, Settings previous) {
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING,
PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING,
ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING,
ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING,
39 changes: 38 additions & 1 deletion server/src/main/java/org/opensearch/discovery/PeerFinder.java
@@ -84,14 +84,23 @@ public abstract class PeerFinder {
Setting.Property.NodeScope
);

// the time between attempts to find all peers when the node is in a decommissioned state; defaults to 2 minutes
public static final Setting<TimeValue> DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING = Setting.timeSetting(
"discovery.find_peers_interval_during_decommission",
TimeValue.timeValueMinutes(2L),
TimeValue.timeValueMillis(1000),
Setting.Property.NodeScope
);

public static final Setting<TimeValue> DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING = Setting.timeSetting(
"discovery.request_peers_timeout",
TimeValue.timeValueMillis(3000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
);

private final TimeValue findPeersInterval;
private final Settings settings;
private TimeValue findPeersInterval;
private final TimeValue requestPeersTimeout;

private final Object mutex = new Object();
@@ -101,6 +110,7 @@ public abstract class PeerFinder {

private volatile long currentTerm;
private boolean active;
private boolean localNodeDecommissioned = false;
private DiscoveryNodes lastAcceptedNodes;
private final Map<TransportAddress, Peer> peersByAddress = new LinkedHashMap<>();
private Optional<DiscoveryNode> leader = Optional.empty();
@@ -112,6 +122,7 @@ public PeerFinder(
TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver
) {
this.settings = settings;
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
this.transportService = transportService;
@@ -128,6 +139,32 @@ public PeerFinder(
);
}

public ActionListener<Void> nodeCommissionedListener() {
return new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
logger.info(
"setting findPeersInterval to [{}], due to recommissioning",
DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings)
);
assert localNodeDecommissioned; // TODO: Do we need this?
localNodeDecommissioned = false;
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
}

@Override
public void onFailure(Exception e) {
logger.info("setting findPeersInterval to [{}], due to decommissioning",
DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING.get(settings));
assert !localNodeDecommissioned;
localNodeDecommissioned = true;
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING.get(settings);
}
};
}

public boolean localNodeDecommissioned() {
return localNodeDecommissioned;
}
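The listener above simply switches findPeersInterval between the two settings. A standalone model of that toggle (the 2-minute decommission default comes from this diff; the 1-second regular default is an assumption for illustration, not taken from the diff):

```java
import java.time.Duration;

// Models the PeerFinder interval toggle: slow discovery to the
// decommission interval while decommissioned, restore it on recommission.
public class PeerFinderIntervalModel {
    // Assumed regular default (1s); the decommission default (2m) is from the PR.
    static final Duration FIND_PEERS_INTERVAL = Duration.ofSeconds(1);
    static final Duration FIND_PEERS_INTERVAL_DURING_DECOMMISSION = Duration.ofMinutes(2);

    private boolean localNodeDecommissioned = false;
    private Duration findPeersInterval = FIND_PEERS_INTERVAL;

    // Mirrors nodeCommissionedListener.onFailure
    public void onDecommissioned() {
        localNodeDecommissioned = true;
        findPeersInterval = FIND_PEERS_INTERVAL_DURING_DECOMMISSION;
    }

    // Mirrors nodeCommissionedListener.onResponse
    public void onRecommissioned() {
        localNodeDecommissioned = false;
        findPeersInterval = FIND_PEERS_INTERVAL;
    }

    public boolean localNodeDecommissioned() {
        return localNodeDecommissioned;
    }

    public Duration findPeersInterval() {
        return findPeersInterval;
    }
}
```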

public void activate(final DiscoveryNodes lastAcceptedNodes) {
logger.trace("activating with {}", lastAcceptedNodes);

server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java
@@ -33,6 +33,7 @@

import org.apache.logging.log4j.Level;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterName;
@@ -55,6 +56,7 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Mockito.mock;
import static org.opensearch.monitor.StatusInfo.Status.HEALTHY;
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
import static org.opensearch.node.Node.NODE_NAME_SETTING;
@@ -90,7 +92,8 @@ public void testJoinDeduplication() {
startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(),
(s, p, r) -> {},
() -> new StatusInfo(HEALTHY, "info")
() -> new StatusInfo(HEALTHY, "info"),
mock(ActionListener.class)
);
transportService.start();

@@ -230,7 +233,8 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName,
startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(),
(s, p, r) -> {},
null
null,
mock(ActionListener.class)
); // registers request handler
transportService.start();
transportService.acceptIncomingRequests();
@@ -284,7 +288,8 @@ public void testJoinFailureOnUnhealthyNodes() {
startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(),
(s, p, r) -> {},
() -> nodeHealthServiceStatus.get()
() -> nodeHealthServiceStatus.get(),
mock(ActionListener.class)
);
transportService.start();
