From 96bb37969e03b53f7ac72896a6f9d18aea311913 Mon Sep 17 00:00:00 2001 From: pranikum <109206473+pranikum@users.noreply.github.com> Date: Sun, 25 Sep 2022 20:47:20 +0530 Subject: [PATCH] Fix spotless java Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com> --- .../wrr/put/ClusterPutWRRWeightsAction.java | 2 +- .../wrr/put/ClusterPutWRRWeightsRequest.java | 14 +- .../wrr/put/ClusterPutWRRWeightsResponse.java | 2 +- .../decommission/DecommissionController.java | 165 ++++++++++++++++++ .../decommission/DecommissionService.java | 25 ++- .../opensearch/cluster/metadata/Metadata.java | 8 +- .../metadata/WeightedRoundRobinMetadata.java | 2 +- .../cluster/routing/WRRWeights.java | 2 +- 8 files changed, 199 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsAction.java index 5379f1654456d..08a46dacf41bf 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsAction.java @@ -24,4 +24,4 @@ private ClusterPutWRRWeightsAction() { super(NAME, ClusterPutWRRWeightsResponse::new); } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsRequest.java index 49370cd32c679..d023076ffc807 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsRequest.java @@ -80,12 +80,12 @@ public void setWRRWeight(Map source) { public void setWRRWeight(BytesReference source, XContentType contentType) { try ( - XContentParser parser = XContentHelper.createParser( - NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - source, - contentType - ) + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + source, + contentType + ) ) { String attrValue = null; Map weights = new HashMap<>(); @@ -161,4 +161,4 @@ public void writeTo(StreamOutput out) throws IOException { public String toString() { return "ClusterPutWRRWeightsRequest{" + "wrrWeight= " + wrrWeight.toString() + "}"; } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsResponse.java index 7cd29c8579a0f..cc90c34b086d5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/wrr/put/ClusterPutWRRWeightsResponse.java @@ -27,4 +27,4 @@ public ClusterPutWRRWeightsResponse(boolean acknowledged) { public ClusterPutWRRWeightsResponse(StreamInput in) throws IOException { super(in); } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 7719012f2f3d7..42ffd32505e96 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -18,6 +18,13 @@ import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsAction; +import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsRequest; +import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -32,6 +39,7 @@ import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.unit.TimeValue; +import org.opensearch.http.HttpStats; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportResponseHandler; @@ -39,7 +47,9 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; @@ -267,4 +277,159 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } }); } + + public void handleNodesDecommissionRequest( + Set nodesToBeDecommissioned, + List zones, + String reason, + TimeValue timeout, + TimeValue timeoutForNodeDecommission, + ActionListener nodesRemovedListener + ) { + setWeightForDecommissionedZone(zones); + checkHttpStatsForDecommissionedNodes(nodesToBeDecommissioned, reason, timeout, timeoutForNodeDecommission, nodesRemovedListener); + } + + private void setWeightForDecommissionedZone(List zones) { + ClusterState clusterState = clusterService.getClusterApplierService().state(); + + DecommissionAttributeMetadata decommissionAttributeMetadata = clusterState.metadata().custom(DecommissionAttributeMetadata.TYPE); + assert decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT) + : "unexpected status encountered while decommissioning nodes"; + DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + + Map weights = new HashMap<>(); + zones.forEach(zone -> { + if (zone.equalsIgnoreCase(decommissionAttribute.attributeValue())) { + weights.put(zone, "0"); + } else { + weights.put(zone, "1"); + } + }); + + // WRR API will validate invalid weights + final ClusterPutWRRWeightsRequest clusterWeightRequest = new ClusterPutWRRWeightsRequest(); + clusterWeightRequest.attributeName("zone"); + clusterWeightRequest.setWRRWeight(weights); + + transportService.sendRequest( + transportService.getLocalNode(), + ClusterPutWRRWeightsAction.NAME, + clusterWeightRequest, + new TransportResponseHandler() { + @Override + public void handleResponse(ClusterPutWRRWeightsResponse response) { + logger.info("Weights are successfully set."); + } + + @Override + public void handleException(TransportException exp) { + // Logging warn message on failure. Should we do Retry? If weights are not set should we fail? + logger.warn("Exception occurred while setting weights.Exception Messages - [{}]", exp.unwrapCause().getMessage()); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public ClusterPutWRRWeightsResponse read(StreamInput in) throws IOException { + return new ClusterPutWRRWeightsResponse(in); + } + } + ); + } + + public void checkHttpStatsForDecommissionedNodes( + Set decommissionedNodes, + String reason, + TimeValue timeout, + TimeValue timeoutForNodeDecommission, + ActionListener listener + ) { + + if (timeoutForNodeDecommission.getSeconds() > 0) { + // Wait for timeout to happen. Log the active connection before decommissioning of nodes. + scheduleDecommissionNodesRequestCheck(decommissionedNodes, reason, timeout, listener, timeoutForNodeDecommission); + } else { + getActiveRequestCountOnDecommissionNodes(decommissionedNodes); + removeDecommissionedNodes(decommissionedNodes, reason, timeout, listener); + } + } + + private void logActiveConnections(NodesStatsResponse nodesStatsResponse) { + Map nodeActiveConnectionMap = new HashMap<>(); + List responseNodes = nodesStatsResponse.getNodes(); + for (int i = 0; i < responseNodes.size(); i++) { + HttpStats httpStats = responseNodes.get(i).getHttp(); + DiscoveryNode node = responseNodes.get(i).getNode(); + nodeActiveConnectionMap.put(node.getId(), httpStats.getServerOpen()); + } + logger.info("Decommissioning node with connections : [{}]", nodeActiveConnectionMap); + } + + private void scheduleDecommissionNodesRequestCheck( + Set decommissionedNodes, + String reason, + TimeValue timeout, + ActionListener nodesRemovedListener, + TimeValue timeoutForNodeDecommission + ) { + transportService.getThreadPool().schedule(new Runnable() { + @Override + public void run() { + // Check for active connections. + getActiveRequestCountOnDecommissionNodes(decommissionedNodes); + removeDecommissionedNodes(decommissionedNodes, reason, timeout, nodesRemovedListener); + } + + @Override + public String toString() { + return ""; + } + }, timeoutForNodeDecommission, org.opensearch.threadpool.ThreadPool.Names.SAME); + } + + private void getActiveRequestCountOnDecommissionNodes(Set decommissionedNodes) { + if (decommissionedNodes == null || decommissionedNodes.isEmpty()) { + return; + } + String[] nodes = decommissionedNodes.stream().map(DiscoveryNode::getId).toArray(String[]::new); + + if (nodes.length == 0) { + return; + } + + final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(nodes); + nodesStatsRequest.clear(); + nodesStatsRequest.addMetric(NodesStatsRequest.Metric.HTTP.metricName()); + + transportService.sendRequest( + transportService.getLocalNode(), + NodesStatsAction.NAME, + nodesStatsRequest, + new TransportResponseHandler() { + @Override + public void handleResponse(NodesStatsResponse response) { + logActiveConnections(response); + } + + @Override + public void handleException(TransportException exp) { + logger.warn("Failure occurred while dumping connection for decommission nodes. [{}]", exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public NodesStatsResponse read(StreamInput in) throws IOException { + return new NodesStatsResponse(in); + } + } + ); + } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 1a0704c5a4ac2..17f72b895dbf7 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -117,7 +117,8 @@ private void setForcedAwarenessAttributes(Settings forceSettings) { */ public void startDecommissionAction( final DecommissionAttribute decommissionAttribute, - final ActionListener listener + final ActionListener listener, + final TimeValue timeOutForNodeDecommission ) { // register the metadata with status as INIT as first step clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) { @@ -156,14 +157,19 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS decommissionAttributeMetadata.decommissionAttribute(), decommissionAttributeMetadata.status() ); - decommissionClusterManagerNodes(decommissionAttributeMetadata.decommissionAttribute(), listener); + decommissionClusterManagerNodes( + decommissionAttributeMetadata.decommissionAttribute(), + listener, + timeOutForNodeDecommission + ); } }); } private synchronized void decommissionClusterManagerNodes( final DecommissionAttribute decommissionAttribute, - ActionListener listener + ActionListener listener, + TimeValue timeOutForNodeDecommission ) { ClusterState state = clusterService.getClusterApplierService().state(); // since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further @@ -206,7 +212,7 @@ public void onResponse(Void unused) { // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more to-be-decommission // nodes can be part of Voting Config listener.onResponse(new ClusterStateUpdateResponse(true)); - failDecommissionedNodes(clusterService.getClusterApplierService().state()); + failDecommissionedNodes(clusterService.getClusterApplierService().state(), timeOutForNodeDecommission); } } else { // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager @@ -303,19 +309,26 @@ public void onFailure(Exception e) { } } - private void failDecommissionedNodes(ClusterState state) { + private void failDecommissionedNodes(ClusterState state, TimeValue timeOutForNodeDecommission) { // this method ensures no matter what, we always exit from this function after clearing the voting config exclusion DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata(); DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + + // Awareness values refers to all zones in the cluster + List awarenessValues = forcedAwarenessAttributes.get(decommissionAttribute.attributeName()); + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.IN_PROGRESS, new ActionListener<>() { @Override public void onResponse(DecommissionStatus status) { logger.info("updated the decommission status to [{}]", status); // execute nodes decommissioning - decommissionController.removeDecommissionedNodes( + + decommissionController.handleNodesDecommissionRequest( filterNodesWithDecommissionAttribute(clusterService.getClusterApplierService().state(), decommissionAttribute, false), + awarenessValues, "nodes-decommissioned", TimeValue.timeValueSeconds(120L), + timeOutForNodeDecommission, new ActionListener() { @Override public void onResponse(Void unused) { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index a7cfb938ac6f0..eb5e8bbc2d49b 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -1188,10 +1188,10 @@ public IndexMetadata getSafe(Index index) { return indexMetadata; } throw new IndexNotFoundException( - index, - new IllegalStateException( - "index uuid doesn't match expected: [" + index.getUUID() + "] but got: [" + indexMetadata.getIndexUUID() + "]" - ) + index, + new IllegalStateException( + "index uuid doesn't match expected: [" + index.getUUID() + "] but got: [" + indexMetadata.getIndexUUID() + "]" + ) ); } throw new IndexNotFoundException(index); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoundRobinMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoundRobinMetadata.java index e092054d50ffb..9b4f2a39ac5ed 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoundRobinMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoundRobinMetadata.java @@ -156,4 +156,4 @@ public static void toXContent(WRRWeights wrrWeight, XContentBuilder builder) thr public String toString() { return Strings.toString(this); } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/WRRWeights.java b/server/src/main/java/org/opensearch/cluster/routing/WRRWeights.java index 19c4142971205..daf2986ea8b71 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WRRWeights.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WRRWeights.java @@ -74,4 +74,4 @@ public Map weights() { public String attributeName() { return this.attributeName; } -} \ No newline at end of file +}