Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport -2.x] Cluster health call to throw decommissioned exception for local decommissioned node #6059

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725))
- [Refactor] Use local opensearch.common.SetOnce instead of lucene's utility class ([#5947](https://github.com/opensearch-project/OpenSearch/pull/5947))
- Cluster health call to throw decommissioned exception for local decommissioned node ([#6008](https://github.com/opensearch-project/OpenSearch/pull/6008))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@
"awareness_attribute":{
"type":"string",
"description":"The awareness attribute for which the health is required"
},
"ensure_node_commissioned":{
"type":"boolean",
"description": "Checks whether the local node is commissioned. If set to true on a local call, an exception is thrown if the node is decommissioned (default: false)"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,36 @@ public boolean innerMatch(LogEvent event) {
Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, decommissionedNode);
assertFalse(coordinator.localNodeCommissioned());

// Check cluster health API for decommissioned and active node
ClusterHealthResponse activeNodeLocalHealth = client(activeNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeCommissioned(true)
.execute()
.actionGet();
assertFalse(activeNodeLocalHealth.isTimedOut());

ClusterHealthResponse decommissionedNodeLocalHealth = client(decommissionedNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.execute()
.actionGet();
assertFalse(decommissionedNodeLocalHealth.isTimedOut());

NodeDecommissionedException ex = expectThrows(
NodeDecommissionedException.class,
() -> client(decommissionedNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeCommissioned(true)
.execute()
.actionGet()
);
assertTrue(ex.getMessage().contains("local node is decommissioned"));

// Recommission the zone so that the test can complete gracefully once the above checks succeed
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(activeNode).execute(
DeleteDecommissionStateAction.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest<ClusterH
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
private String waitForNodes = "";
private Priority waitForEvents = null;
private boolean ensureNodeCommissioned = false;
/**
* Only used by the high-level REST Client. Controls the details level of the health information returned.
* The default value is 'cluster'.
Expand Down Expand Up @@ -103,6 +104,9 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
awarenessAttribute = in.readOptionalString();
level = in.readEnum(Level.class);
}
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
ensureNodeCommissioned = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -137,6 +141,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(awarenessAttribute);
out.writeEnum(level);
}
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeBoolean(ensureNodeCommissioned);
}
}

@Override
Expand Down Expand Up @@ -321,13 +328,30 @@ public String getAwarenessAttribute() {
return awarenessAttribute;
}

/**
* Sets whether this request must verify that the local node is commissioned.
* {@link #validate()} rejects the request when this flag is {@code true} and the request is not local.
*
* @param ensureNodeCommissioned {@code true} to enable the commission check; {@code false} (the default) to skip it
* @return this request, so that calls can be chained
*/
public final ClusterHealthRequest ensureNodeCommissioned(boolean ensureNodeCommissioned) {
this.ensureNodeCommissioned = ensureNodeCommissioned;
return this;
}

/**
* Tells whether a local request must verify that the local node is commissioned (default: {@code false}).
*
* @return {@code true} if the commission status of the local node is to be checked for a local request;
* {@code false} if the commission status of the local node is not checked
*/
public final boolean ensureNodeCommissioned() {
return ensureNodeCommissioned;
}

/**
 * Checks that the request parameters form a supported combination.
 * <p>
 * The awareness-attribute rules are evaluated first; the commission check is only
 * reached when those rules raise no error.
 *
 * @return a validation exception describing the first violated rule, or {@code null} if the request is valid
 */
@Override
public ActionRequestValidationException validate() {
    final boolean awarenessLevelRequested = level.equals(Level.AWARENESS_ATTRIBUTES);
    if (awarenessLevelRequested) {
        // Awareness-attribute health cannot be combined with per-index health.
        if (indices.length > 0) {
            return addValidationError("awareness_attribute is not a supported parameter with index health", null);
        }
    } else if (awarenessAttribute != null) {
        // An awareness attribute only makes sense at the awareness_attributes level.
        return addValidationError("level=awareness_attributes is required with awareness_attribute parameter", null);
    }

    // The commission check inspects the local node, so it is only meaningful on a local request.
    final boolean commissionCheckOnNonLocalRequest = ensureNodeCommissioned && local == false;
    return commissionCheckOnNonLocalRequest
        ? addValidationError("not a local request to ensure local node commissioned", null)
        : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,12 @@ public ClusterHealthRequestBuilder setLevel(String level) {
request.setLevel(level);
return this;
}

/**
* Specifies whether the local request should ensure that the local node is commissioned.
*
* @param ensureNodeCommissioned value forwarded to {@link ClusterHealthRequest#ensureNodeCommissioned(boolean)}
* @return this builder, so that calls can be chained
*/
public final ClusterHealthRequestBuilder setEnsureNodeCommissioned(boolean ensureNodeCommissioned) {
request.ensureNodeCommissioned(ensureNodeCommissioned);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.cluster.LocalClusterUpdateTask;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -57,6 +59,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CollectionUtils;
import org.opensearch.discovery.Discovery;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
Expand All @@ -77,6 +80,7 @@ public class TransportClusterHealthAction extends TransportClusterManagerNodeRea
private static final Logger logger = LogManager.getLogger(TransportClusterHealthAction.class);

private final AllocationService allocationService;
private final Discovery discovery;

@Inject
public TransportClusterHealthAction(
Expand All @@ -85,7 +89,8 @@ public TransportClusterHealthAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AllocationService allocationService
AllocationService allocationService,
Discovery discovery
) {
super(
ClusterHealthAction.NAME,
Expand All @@ -98,6 +103,7 @@ public TransportClusterHealthAction(
indexNameExpressionResolver
);
this.allocationService = allocationService;
this.discovery = discovery;
}

@Override
Expand Down Expand Up @@ -134,7 +140,12 @@ protected void clusterManagerOperation(
final ClusterState unusedState,
final ActionListener<ClusterHealthResponse> listener
) {

if (request.ensureNodeCommissioned()
&& discovery instanceof Coordinator
&& ((Coordinator) discovery).localNodeCommissioned() == false) {
listener.onFailure(new NodeDecommissionedException("local node is decommissioned"));
return;
}
final int waitCount = getWaitCount(request);

if (request.waitForEvents() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1438,8 +1438,7 @@ synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) {
peerFinder.onNodeCommissionStatusChange(localNodeCommissioned);
}

// package-visible for testing
boolean localNodeCommissioned() {
/**
* Returns the current value of the {@code localNodeCommissioned} flag.
* Public so that code outside this package can query it.
*
* @return the value of the {@code localNodeCommissioned} field
*/
public boolean localNodeCommissioned() {
return localNodeCommissioned;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

import java.io.IOException;

Expand All @@ -28,4 +29,9 @@ public NodeDecommissionedException(String msg, Object... args) {
public NodeDecommissionedException(StreamInput in) throws IOException {
super(in);
}

/**
* Maps this exception to {@link RestStatus#FAILED_DEPENDENCY}.
*
* @return {@link RestStatus#FAILED_DEPENDENCY}
*/
@Override
public RestStatus status() {
return RestStatus.FAILED_DEPENDENCY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public static ClusterHealthRequest fromRequest(final RestRequest request) {
final ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index")));
clusterHealthRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterHealthRequest.indicesOptions()));
clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local()));
clusterHealthRequest.ensureNodeCommissioned(
request.paramAsBoolean("ensure_node_commissioned", clusterHealthRequest.ensureNodeCommissioned())
);
clusterHealthRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", clusterHealthRequest.clusterManagerNodeTimeout())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.admin.cluster.health;

import org.opensearch.LegacyESVersion;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -152,6 +153,23 @@ public void testBwcSerialization() throws Exception {
}
}

/**
 * Verifies that {@code ensureNodeCommissioned} is rejected on non-local requests and accepted otherwise.
 */
public void testValidation() {
    ClusterHealthRequest clusterHealthRequest = randomRequest();
    {
        // A non-local request that asks for the commission check must be rejected.
        clusterHealthRequest.local(false);
        clusterHealthRequest.ensureNodeCommissioned(true);
        ActionRequestValidationException e = clusterHealthRequest.validate();
        assertNotNull(e);
        assertTrue(e.getMessage().contains("not a local request to ensure local node commissioned"));
    }
    {
        // A local request without the commission check is valid.
        clusterHealthRequest.local(true);
        clusterHealthRequest.ensureNodeCommissioned(false);
        ActionRequestValidationException e = clusterHealthRequest.validate();
        assertNull(e);
    }
    {
        // A local request with the commission check is the supported combination and must pass validation.
        clusterHealthRequest.local(true);
        clusterHealthRequest.ensureNodeCommissioned(true);
        ActionRequestValidationException e = clusterHealthRequest.validate();
        assertNull(e);
    }
    {
        // A non-local request without the commission check is unaffected by the new rule.
        clusterHealthRequest.local(false);
        clusterHealthRequest.ensureNodeCommissioned(false);
        ActionRequestValidationException e = clusterHealthRequest.validate();
        assertNull(e);
    }
}

private ClusterHealthRequest randomRequest() {
ClusterHealthRequest request = new ClusterHealthRequest();
request.waitForStatus(randomFrom(ClusterHealthStatus.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte
threadPool,
new ActionFilters(new HashSet<>()),
indexNameExpressionResolver,
new AllocationService(null, new TestGatewayAllocator(), null, null, null)
new AllocationService(null, new TestGatewayAllocator(), null, null, null),
null
);
PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>();
action.execute(new ClusterHealthRequest().waitForGreenStatus(), listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public void testFromRequest() {
Map<String, String> params = new HashMap<>();
String index = "index";
boolean local = randomBoolean();
boolean ensureLocalNodeCommissioned = false;
if (local) {
ensureLocalNodeCommissioned = randomBoolean();
}
String clusterManagerTimeout = randomTimeValue();
String timeout = randomTimeValue();
ClusterHealthStatus waitForStatus = randomFrom(ClusterHealthStatus.values());
Expand All @@ -63,6 +67,7 @@ public void testFromRequest() {

params.put("index", index);
params.put("local", String.valueOf(local));
params.put("ensure_node_commissioned", String.valueOf(ensureLocalNodeCommissioned));
params.put("cluster_manager_timeout", clusterManagerTimeout);
params.put("timeout", timeout);
params.put("wait_for_status", waitForStatus.name());
Expand All @@ -81,6 +86,7 @@ public void testFromRequest() {
assertThat(clusterHealthRequest.indices().length, equalTo(1));
assertThat(clusterHealthRequest.indices()[0], equalTo(index));
assertThat(clusterHealthRequest.local(), equalTo(local));
assertThat(clusterHealthRequest.ensureNodeCommissioned(), equalTo(ensureLocalNodeCommissioned));
assertThat(clusterHealthRequest.clusterManagerNodeTimeout(), equalTo(TimeValue.parseTimeValue(clusterManagerTimeout, "test")));
assertThat(clusterHealthRequest.timeout(), equalTo(TimeValue.parseTimeValue(timeout, "test")));
assertThat(clusterHealthRequest.waitForStatus(), equalTo(waitForStatus));
Expand Down