From 563122ed66805a3e7518ec8b4f2e47ac8f49297b Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Thu, 6 Oct 2022 12:29:52 +0530 Subject: [PATCH] Add GET api to get shard routing weights (#4275) * Weighted round-robin scheduling policy for shard coordination traffic routing Signed-off-by: Anshu Agarwal --- CHANGELOG.md | 1 + .../client/RestHighLevelClientTests.java | 1 + .../api/cluster.get_weighted_routing.json | 25 ++ .../cluster/routing/WeightedRoutingIT.java | 245 ++++++++++++++++ .../org/opensearch/action/ActionModule.java | 5 + .../get/ClusterGetWeightedRoutingAction.java | 25 ++ .../get/ClusterGetWeightedRoutingRequest.java | 62 +++++ ...usterGetWeightedRoutingRequestBuilder.java | 37 +++ .../ClusterGetWeightedRoutingResponse.java | 125 +++++++++ .../TransportGetWeightedRoutingAction.java | 111 ++++++++ .../routing/weighted/get/package-info.java | 10 + .../TransportAddWeightedRoutingAction.java | 43 +-- .../opensearch/client/ClusterAdminClient.java | 18 ++ .../java/org/opensearch/client/Requests.java | 10 + .../client/support/AbstractClient.java | 22 ++ .../routing/WeightedRoutingService.java | 41 ++- .../RestClusterGetWeightedRoutingAction.java | 52 ++++ ...ClusterGetWeightedRoutingRequestTests.java | 39 +++ ...lusterGetWeightedRoutingResponseTests.java | 38 +++ ...ransportGetWeightedRoutingActionTests.java | 262 ++++++++++++++++++ .../routing/WeightedRoutingServiceTests.java | 19 +- 21 files changed, 1152 insertions(+), 39 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_weighted_routing.json create mode 100644 server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingRequestBuilder.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/package-info.java create mode 100644 server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterGetWeightedRoutingAction.java create mode 100644 server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingRequestTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b7b7b29cc904..868ce6ed7bf9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) - [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318)) - PUT api for weighted shard routing ([#4272](https://github.com/opensearch-project/OpenSearch/pull/4272)) +- GET api for weighted shard routing([#4275](https://github.com/opensearch-project/OpenSearch/pull/4275/)) - Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580)) - Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084)) - Further simplification of the ZIP publication implementation ([#4360](https://github.com/opensearch-project/OpenSearch/pull/4360)) diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java index 07bf98a8ff0a3..1b9c7d4aead12 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java @@ -890,6 +890,7 @@ public void testApiNamingConventions() throws Exception { "search_shards", "remote_store.restore", "cluster.put_weighted_routing", + "cluster.get_weighted_routing", "cluster.put_decommission_awareness", "cluster.get_decommission_awareness", }; List booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password"); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_weighted_routing.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_weighted_routing.json new file mode 100644 index 0000000000000..45eb3d2b62a84 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.get_weighted_routing.json @@ -0,0 +1,25 @@ +{ + "cluster.get_weighted_routing": { + "documentation": { + "url": "https://opensearch.org/docs/latest/opensearch/rest-api/weighted-routing/get", + "description": "Fetches weighted shard routing weights" + }, + "stability": "stable", + "url": { + "paths": [ + { + "path": "/_cluster/routing/awareness/{attribute}/weights", + "methods": [ + "GET" + ], + "parts": { + "attribute": { + "type": "string", + "description": "Awareness attribute name" + } + } + } + ] + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java new file mode 100644 index 0000000000000..6cf8292095c6a --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -0,0 +1,245 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3) +public class WeightedRoutingIT extends OpenSearchIntegTestCase { + + public void testPutWeightedRouting() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> starting 6 nodes on different zones"); + int nodeCountPerAZ = 2; + + logger.info("--> starting a dedicated cluster manager node"); + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); + List nodes_in_zone_a = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List nodes_in_zone_b = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List nodes_in_zone_c = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertEquals(response.isAcknowledged(), true); + + // put call made on a data node in zone a + response = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1))) + .admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertEquals(response.isAcknowledged(), true); + } + + public void testPutWeightedRouting_InvalidAwarenessAttribute() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + internalCluster().startNodes( + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone1", weights); + + assertThrows( + IllegalArgumentException.class, + () -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).get() + ); + } + + public void testPutWeightedRouting_MoreThanOneZoneHasZeroWeight() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + internalCluster().startNodes( + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 0.0, "c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone1", weights); + + assertThrows( + IllegalArgumentException.class, + () -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).get() + ); + } + + public void testGetWeightedRouting_WeightsNotSet() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + internalCluster().startNodes( + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareGetWeightedRouting() + .setAwarenessAttribute("zone") + .get(); + assertNull(weightedRoutingResponse.weights()); + } + + public void testGetWeightedRouting_WeightsAreSet() throws IOException { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + int nodeCountPerAZ = 2; + + logger.info("--> starting a dedicated cluster manager node"); + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 2 nodes on zones 'a' & 'b' & 'c'"); + List nodes_in_zone_a = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List nodes_in_zone_b = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List nodes_in_zone_c = internalCluster().startDataOnlyNodes( + nodeCountPerAZ, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + // put api call to set weights + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertEquals(response.isAcknowledged(), true); + + // get api call to fetch weights + ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareGetWeightedRouting() + .setAwarenessAttribute("zone") + .get(); + assertEquals(weightedRouting, weightedRoutingResponse.weights()); + + // get api to fetch local node weight for a node in zone a + weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1))) + .admin() + .cluster() + .prepareGetWeightedRouting() + .setAwarenessAttribute("zone") + .setRequestLocal(true) + .get(); + assertEquals(weightedRouting, weightedRoutingResponse.weights()); + assertEquals("1.0", weightedRoutingResponse.getLocalNodeWeight()); + + // get api to fetch local node weight for a node in zone b + weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_b.get(0), nodes_in_zone_b.get(1))) + .admin() + .cluster() + .prepareGetWeightedRouting() + .setAwarenessAttribute("zone") + .setRequestLocal(true) + .get(); + assertEquals(weightedRouting, weightedRoutingResponse.weights()); + assertEquals("2.0", weightedRoutingResponse.getLocalNodeWeight()); + + // get api to fetch local node weight for a node in zone c + weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_c.get(0), nodes_in_zone_c.get(1))) + .admin() + .cluster() + .prepareGetWeightedRouting() + .setAwarenessAttribute("zone") + .setRequestLocal(true) + .get(); + assertEquals(weightedRouting, weightedRoutingResponse.weights()); + assertEquals("3.0", weightedRoutingResponse.getLocalNodeWeight()); + } +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index ac9fd8e5fea3e..d959d6828a46b 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -83,6 +83,8 @@ import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.TransportGetWeightedRoutingAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.TransportAddWeightedRoutingAction; import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction; @@ -299,6 +301,7 @@ import org.opensearch.rest.action.admin.cluster.RestCloneSnapshotAction; import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction; import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction; +import org.opensearch.rest.action.admin.cluster.RestClusterGetWeightedRoutingAction; import org.opensearch.rest.action.admin.cluster.RestClusterHealthAction; import org.opensearch.rest.action.admin.cluster.RestClusterPutWeightedRoutingAction; import org.opensearch.rest.action.admin.cluster.RestClusterRerouteAction; @@ -573,6 +576,7 @@ public void reg actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class); + actions.register(ClusterGetWeightedRoutingAction.INSTANCE, TransportGetWeightedRoutingAction.class); actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); @@ -759,6 +763,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestOpenIndexAction()); registerHandler.accept(new RestAddIndexBlockAction()); registerHandler.accept(new RestClusterPutWeightedRoutingAction()); + registerHandler.accept(new RestClusterGetWeightedRoutingAction()); registerHandler.accept(new RestUpdateSettingsAction()); registerHandler.accept(new RestGetSettingsAction()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingAction.java new file mode 100644 index 0000000000000..7662b7cc6fcc8 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingAction.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.get; + +import org.opensearch.action.ActionType; + +/** + * Action to get weights for weighted round-robin search routing policy. + * + * @opensearch.internal + */ +public class ClusterGetWeightedRoutingAction extends ActionType { + public static final ClusterGetWeightedRoutingAction INSTANCE = new ClusterGetWeightedRoutingAction(); + public static final String NAME = "cluster:admin/routing/awareness/weights/get"; + + private ClusterGetWeightedRoutingAction() { + super(NAME, ClusterGetWeightedRoutingResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingRequest.java new file mode 100644 index 0000000000000..aaa000baa95f3 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingRequest.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.get; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.opensearch.action.ValidateActions.addValidationError; + +/** + * Request to get weights for weighted round-robin search routing policy. + * + * @opensearch.internal + */ +public class ClusterGetWeightedRoutingRequest extends ClusterManagerNodeReadRequest { + + String awarenessAttribute; + + public String getAwarenessAttribute() { + return awarenessAttribute; + } + + public void setAwarenessAttribute(String awarenessAttribute) { + this.awarenessAttribute = awarenessAttribute; + } + + public ClusterGetWeightedRoutingRequest(String awarenessAttribute) { + this.awarenessAttribute = awarenessAttribute; + } + + public ClusterGetWeightedRoutingRequest() {} + + public ClusterGetWeightedRoutingRequest(StreamInput in) throws IOException { + super(in); + awarenessAttribute = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(awarenessAttribute); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (awarenessAttribute == null || awarenessAttribute.isEmpty()) { + validationException = addValidationError("Awareness attribute is missing", validationException); + } + return validationException; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingRequestBuilder.java new file mode 100644 index 0000000000000..82f4c1106461d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingRequestBuilder.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.get; + +import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadOperationRequestBuilder; +import org.opensearch.client.OpenSearchClient; + +/** + * Request builder to get weights for weighted round-robin search routing policy. + * + * @opensearch.internal + */ +public class ClusterGetWeightedRoutingRequestBuilder extends ClusterManagerNodeReadOperationRequestBuilder< + ClusterGetWeightedRoutingRequest, + ClusterGetWeightedRoutingResponse, + ClusterGetWeightedRoutingRequestBuilder> { + + public ClusterGetWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterGetWeightedRoutingAction action) { + super(client, action, new ClusterGetWeightedRoutingRequest()); + } + + public ClusterGetWeightedRoutingRequestBuilder setRequestLocal(boolean local) { + request.local(local); + return this; + } + + public ClusterGetWeightedRoutingRequestBuilder setAwarenessAttribute(String attribute) { + request.setAwarenessAttribute(attribute); + return this; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java new file mode 100644 index 0000000000000..bb77576b63d20 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.get; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.ActionResponse; + +import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * Response from fetching weights for weighted round-robin search routing policy. + * + * @opensearch.internal + */ +public class ClusterGetWeightedRoutingResponse extends ActionResponse implements ToXContentObject { + private WeightedRouting weightedRouting; + private String localNodeWeight; + private static final String NODE_WEIGHT = "node_weight"; + + public String getLocalNodeWeight() { + return localNodeWeight; + } + + ClusterGetWeightedRoutingResponse() { + this.weightedRouting = null; + } + + public ClusterGetWeightedRoutingResponse(String localNodeWeight, WeightedRouting weightedRouting) { + this.localNodeWeight = localNodeWeight; + this.weightedRouting = weightedRouting; + } + + ClusterGetWeightedRoutingResponse(StreamInput in) throws IOException { + if (in.available() != 0) { + this.weightedRouting = new WeightedRouting(in); + } + } + + /** + * List of weights to return + * + * @return list or weights + */ + public WeightedRouting weights() { + return this.weightedRouting; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (weightedRouting != null) { + weightedRouting.writeTo(out); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (this.weightedRouting != null) { + for (Map.Entry entry : weightedRouting.weights().entrySet()) { + builder.field(entry.getKey(), entry.getValue().toString()); + } + if (localNodeWeight != null) { + builder.field(NODE_WEIGHT, localNodeWeight); + } + } + builder.endObject(); + return builder; + } + + public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + XContentParser.Token token; + String attrKey = null, attrValue = null; + String localNodeWeight = null; + Map weights = new HashMap<>(); + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrKey = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrValue = parser.text(); + if (attrKey != null && attrKey.equals(NODE_WEIGHT)) { + localNodeWeight = attrValue; + } else if (attrKey != null) { + weights.put(attrKey, Double.parseDouble(attrValue)); + } + } else { + throw new OpenSearchParseException("failed to parse weighted routing response"); + } + } + WeightedRouting weightedRouting = new WeightedRouting("", weights); + return new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterGetWeightedRoutingResponse that = (ClusterGetWeightedRoutingResponse) o; + return weightedRouting.equals(that.weightedRouting) && localNodeWeight.equals(that.localNodeWeight); + } + + @Override + public int hashCode() { + return Objects.hash(weightedRouting, localNodeWeight); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java new file mode 100644 index 0000000000000..9421967a5df26 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java @@ -0,0 +1,111 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.get; + +import org.opensearch.action.ActionListener; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; + +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.cluster.routing.WeightedRoutingService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; + +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +/** + * Transport action for getting weights for weighted round-robin search routing policy + * + * @opensearch.internal + */ +public class TransportGetWeightedRoutingAction extends TransportClusterManagerNodeReadAction< + ClusterGetWeightedRoutingRequest, + ClusterGetWeightedRoutingResponse> { + private static final Logger logger = LogManager.getLogger(TransportGetWeightedRoutingAction.class); + private final WeightedRoutingService weightedRoutingService; + + @Inject + public TransportGetWeightedRoutingAction( + TransportService transportService, + ClusterService clusterService, + WeightedRoutingService weightedRoutingService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + ClusterGetWeightedRoutingAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + ClusterGetWeightedRoutingRequest::new, + indexNameExpressionResolver + ); + this.weightedRoutingService = weightedRoutingService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected ClusterGetWeightedRoutingResponse read(StreamInput in) throws IOException { + return new ClusterGetWeightedRoutingResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(ClusterGetWeightedRoutingRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected void clusterManagerOperation( + final ClusterGetWeightedRoutingRequest request, + ClusterState state, + final ActionListener listener + ) { + try { + weightedRoutingService.verifyAwarenessAttribute(request.getAwarenessAttribute()); + WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().custom(WeightedRoutingMetadata.TYPE); + ClusterGetWeightedRoutingResponse clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(); + String weight = null; + if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting() != null) { + WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); + if (request.local()) { + DiscoveryNode localNode = state.getNodes().getLocalNode(); + if (localNode.getAttributes().get(request.getAwarenessAttribute()) != null) { + String attrVal = localNode.getAttributes().get(request.getAwarenessAttribute()); + if (weightedRouting.weights().containsKey(attrVal)) { + weight = weightedRouting.weights().get(attrVal).toString(); + } + } + } + clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(weight, weightedRouting); + } + listener.onResponse(clusterGetWeightedRoutingResponse); + } catch (Exception ex) { + listener.onFailure(ex); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/package-info.java new file mode 100644 index 0000000000000..45e5b32b72e50 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** get weighted-round robin shard routing weights. */ +package org.opensearch.action.admin.cluster.shards.routing.weighted.get; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/TransportAddWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/TransportAddWeightedRoutingAction.java index 8c29ab2199848..249e313c1f53b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/TransportAddWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/TransportAddWeightedRoutingAction.java @@ -17,20 +17,13 @@ import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.routing.WeightedRoutingService; -import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.List; -import java.util.Locale; - -import static org.opensearch.action.ValidateActions.addValidationError; /** * Transport action for updating weights for weighted round-robin search routing policy @@ -42,12 +35,9 @@ public class TransportAddWeightedRoutingAction extends TransportClusterManagerNo ClusterPutWeightedRoutingResponse> { private final WeightedRoutingService weightedRoutingService; - private volatile List awarenessAttributes; @Inject public TransportAddWeightedRoutingAction( - Settings settings, - ClusterSettings clusterSettings, TransportService transportService, ClusterService clusterService, WeightedRoutingService weightedRoutingService, @@ -65,19 +55,6 @@ public TransportAddWeightedRoutingAction( indexNameExpressionResolver ); this.weightedRoutingService = weightedRoutingService; - this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer( - AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, - this::setAwarenessAttributes - ); - } - - List getAwarenessAttributes() { - return awarenessAttributes; - } - - private void setAwarenessAttributes(List awarenessAttributes) { - this.awarenessAttributes = awarenessAttributes; } @Override @@ -96,7 +73,12 @@ protected void clusterManagerOperation( ClusterState state, ActionListener listener ) throws Exception { - verifyAwarenessAttribute(request.getWeightedRouting().attributeName()); + try { + weightedRoutingService.verifyAwarenessAttribute(request.getWeightedRouting().attributeName()); + } catch (ActionRequestValidationException ex) { + listener.onFailure(ex); + return; + } weightedRoutingService.registerWeightedRoutingMetadata( request, ActionListener.delegateFailure( @@ -108,21 +90,8 @@ protected void clusterManagerOperation( ); } - private void verifyAwarenessAttribute(String attributeName) { - if (getAwarenessAttributes().contains(attributeName) == false) { - ActionRequestValidationException validationException = null; - - validationException = addValidationError( - String.format(Locale.ROOT, "invalid awareness attribute %s requested for updating weighted routing", attributeName), - validationException - ); - throw validationException; - } - } - @Override protected ClusterBlockException checkBlock(ClusterPutWeightedRoutingRequest request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } - } diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index eacd7ff16e6af..c811b788d9cf6 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -92,6 +92,9 @@ import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequestBuilder; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequestBuilder; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; @@ -816,6 +819,21 @@ public interface ClusterAdminClient extends OpenSearchClient { */ ClusterPutWeightedRoutingRequestBuilder prepareWeightedRouting(); + /** + * Gets weights for weighted round-robin search routing policy. + */ + ActionFuture getWeightedRouting(ClusterGetWeightedRoutingRequest request); + + /** + * Gets weights for weighted round-robin search routing policy. + */ + void getWeightedRouting(ClusterGetWeightedRoutingRequest request, ActionListener listener); + + /** + * Gets weights for weighted round-robin search routing policy. + */ + ClusterGetWeightedRoutingRequestBuilder prepareGetWeightedRouting(); + /** * Decommission awareness attribute */ diff --git a/server/src/main/java/org/opensearch/client/Requests.java b/server/src/main/java/org/opensearch/client/Requests.java index a033933cf6696..c8729a0d498f4 100644 --- a/server/src/main/java/org/opensearch/client/Requests.java +++ b/server/src/main/java/org/opensearch/client/Requests.java @@ -49,6 +49,7 @@ import org.opensearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequest; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; @@ -561,6 +562,15 @@ public static ClusterPutWeightedRoutingRequest putWeightedRoutingRequest(String return new ClusterPutWeightedRoutingRequest(attributeName); } + /** + * Gets weights for weighted round-robin search routing policy + * + * @return get weight request + */ + public static ClusterGetWeightedRoutingRequest getWeightedRoutingRequest(String attributeName) { + return new ClusterGetWeightedRoutingRequest(attributeName); + } + /** * Creates a new decommission request. * diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index f74acd01f107c..65199b22e8e72 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -118,6 +118,10 @@ import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequestBuilder; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequestBuilder; @@ -1302,6 +1306,24 @@ public ClusterPutWeightedRoutingRequestBuilder prepareWeightedRouting() { return new ClusterPutWeightedRoutingRequestBuilder(this, ClusterAddWeightedRoutingAction.INSTANCE); } + @Override + public ActionFuture getWeightedRouting(ClusterGetWeightedRoutingRequest request) { + return execute(ClusterGetWeightedRoutingAction.INSTANCE, request); + } + + @Override + public void getWeightedRouting( + ClusterGetWeightedRoutingRequest request, + ActionListener listener + ) { + execute(ClusterGetWeightedRoutingAction.INSTANCE, request, listener); + } + + @Override + public ClusterGetWeightedRoutingRequestBuilder prepareGetWeightedRouting() { + return new ClusterGetWeightedRoutingRequestBuilder(this, ClusterGetWeightedRoutingAction.INSTANCE); + } + @Override public void deleteDanglingIndex(DeleteDanglingIndexRequest request, ActionListener listener) { execute(DeleteDanglingIndexAction.INSTANCE, request, listener); diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index da454865ac866..7ff2b23630a3c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -12,18 +12,27 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.threadpool.ThreadPool; +import java.util.List; +import java.util.Locale; + +import static org.opensearch.action.ValidateActions.addValidationError; + /** * * Service responsible for updating cluster state metadata with weighted routing weights */ @@ -31,11 +40,22 @@ public class WeightedRoutingService { private static final Logger logger = LogManager.getLogger(WeightedRoutingService.class); private final ClusterService clusterService; private final ThreadPool threadPool; + private volatile List awarenessAttributes; @Inject - public WeightedRoutingService(ClusterService clusterService, ThreadPool threadPool) { + public WeightedRoutingService( + ClusterService clusterService, + ThreadPool threadPool, + Settings settings, + ClusterSettings clusterSettings + ) { this.clusterService = clusterService; this.threadPool = threadPool; + this.awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, + this::setAwarenessAttributes + ); } public void registerWeightedRoutingMetadata( @@ -85,4 +105,23 @@ private boolean checkIfSameWeightsInMetadata( ) { return newWeightedRoutingMetadata.getWeightedRouting().equals(oldWeightedRoutingMetadata.getWeightedRouting()); } + + List getAwarenessAttributes() { + return awarenessAttributes; + } + + private void setAwarenessAttributes(List awarenessAttributes) { + this.awarenessAttributes = awarenessAttributes; + } + + public void verifyAwarenessAttribute(String attributeName) { + if (getAwarenessAttributes().contains(attributeName) == false) { + ActionRequestValidationException validationException = null; + validationException = addValidationError( + String.format(Locale.ROOT, "invalid awareness attribute %s requested for updating weighted routing", attributeName), + validationException + ); + throw validationException; + } + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterGetWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterGetWeightedRoutingAction.java new file mode 100644 index 0000000000000..7c9d1190f0b1d --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterGetWeightedRoutingAction.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequest; +import org.opensearch.client.Requests; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Fetch Weighted Round Robin based shard routing weights + * + * @opensearch.api + * + */ +public class RestClusterGetWeightedRoutingAction extends BaseRestHandler { + + private static final Logger logger = LogManager.getLogger(RestClusterGetWeightedRoutingAction.class); + + @Override + public List routes() { + return singletonList(new Route(GET, "/_cluster/routing/awareness/{attribute}/weights")); + } + + @Override + public String getName() { + return "get_weighted_routing_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + ClusterGetWeightedRoutingRequest getWeightedRoutingRequest = Requests.getWeightedRoutingRequest(request.param("attribute")); + getWeightedRoutingRequest.local(request.paramAsBoolean("local", getWeightedRoutingRequest.local())); + return channel -> client.admin().cluster().getWeightedRouting(getWeightedRoutingRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingRequestTests.java new file mode 100644 index 0000000000000..0a4dad4cbc597 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingRequestTests.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.action.shard.routing.weighted.get; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequest; +import org.opensearch.test.OpenSearchTestCase; + +public class ClusterGetWeightedRoutingRequestTests extends OpenSearchTestCase { + + public void testValidate_AwarenessAttributeIsSet() { + ClusterGetWeightedRoutingRequest request = new ClusterGetWeightedRoutingRequest(); + request.setAwarenessAttribute("zone"); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNull(actionRequestValidationException); + } + + public void testValidate_AwarenessAttributeNotSet() { + ClusterGetWeightedRoutingRequest request = new ClusterGetWeightedRoutingRequest(); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("Awareness attribute is missing")); + } + + public void testValidate_AwarenessAttributeIsEmpty() { + ClusterGetWeightedRoutingRequest request = new ClusterGetWeightedRoutingRequest(); + request.setAwarenessAttribute(""); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("Awareness attribute is missing")); + } + +} diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java new file mode 100644 index 0000000000000..e9add55ca774b --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.action.shard.routing.weighted.get; + +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; +import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.Map; + +public class ClusterGetWeightedRoutingResponseTests extends AbstractXContentTestCase { + @Override + protected ClusterGetWeightedRoutingResponse createTestInstance() { + Map weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0); + WeightedRouting weightedRouting = new WeightedRouting("", weights); + ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting); + return response; + } + + @Override + protected ClusterGetWeightedRoutingResponse doParseInstance(XContentParser parser) throws IOException { + return ClusterGetWeightedRoutingResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + +} diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java new file mode 100644 index 0000000000000..f28e932e068ac --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java @@ -0,0 +1,262 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.action.shard.routing.weighted.get; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequestBuilder; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.get.TransportGetWeightedRoutingAction; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.ActionTestUtils; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.cluster.routing.WeightedRoutingService; +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.ClusterServiceUtils; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.MockTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static java.util.Collections.emptySet; +import static org.mockito.Mockito.mock; + +public class TransportGetWeightedRoutingActionTests extends OpenSearchTestCase { + + private ThreadPool threadPool; + private ClusterService clusterService; + private TransportService transportService; + private WeightedRoutingService weightedRoutingService; + private TransportGetWeightedRoutingAction transportGetWeightedRoutingAction; + private ClusterSettings clusterSettings; + NodeClient client; + + final private static Set CLUSTER_MANAGER_ROLE = Collections.unmodifiableSet( + new HashSet<>(Collections.singletonList(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + ); + + final private static Set DATA_ROLE = Collections.unmodifiableSet( + new HashSet<>(Collections.singletonList(DiscoveryNodeRole.DATA_ROLE)) + ); + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("test", Settings.EMPTY); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + } + + @Before + public void setUpService() { + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); + clusterState = addClusterManagerNodes(clusterState); + clusterState = addDataNodes(clusterState); + clusterState = setLocalNode(clusterState, "nodeA1"); + + ClusterState.Builder builder = ClusterState.builder(clusterState); + ClusterServiceUtils.setState(clusterService, builder); + + final MockTransport transport = new MockTransport(); + transportService = transport.createTransportService( + Settings.EMPTY, + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundTransportAddress -> clusterService.state().nodes().get("nodes1"), + null, + Collections.emptySet() + + ); + + Settings.Builder settingsBuilder = Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone"); + + clusterSettings = new ClusterSettings(settingsBuilder.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + transportService.start(); + transportService.acceptIncomingRequests(); + + this.weightedRoutingService = new WeightedRoutingService(clusterService, threadPool, settingsBuilder.build(), clusterSettings); + + this.transportGetWeightedRoutingAction = new TransportGetWeightedRoutingAction( + transportService, + clusterService, + weightedRoutingService, + threadPool, + new ActionFilters(emptySet()), + mock(IndexNameExpressionResolver.class) + ); + client = new NodeClient(Settings.EMPTY, threadPool); + } + + private ClusterState addDataNodes(ClusterState clusterState) { + clusterState = addDataNodeForAZone(clusterState, "zone_A", "nodeA1", "nodeA2", "nodeA3"); + clusterState = addDataNodeForAZone(clusterState, "zone_B", "nodeB1", "nodeB2", "nodeB3"); + clusterState = addDataNodeForAZone(clusterState, "zone_C", "nodeC1", "nodeC2", "nodeC3"); + return clusterState; + } + + private ClusterState addClusterManagerNodes(ClusterState clusterState) { + clusterState = addClusterManagerNodeForAZone(clusterState, "zone_A", "nodeMA"); + clusterState = addClusterManagerNodeForAZone(clusterState, "zone_B", "nodeMB"); + clusterState = addClusterManagerNodeForAZone(clusterState, "zone_C", "nodeMC"); + return clusterState; + } + + private ClusterState addDataNodeForAZone(ClusterState clusterState, String zone, String... nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + org.opensearch.common.collect.List.of(nodeIds) + .forEach( + nodeId -> nodeBuilder.add( + new DiscoveryNode( + nodeId, + buildNewFakeTransportAddress(), + Collections.singletonMap("zone", zone), + DATA_ROLE, + Version.CURRENT + ) + ) + ); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState addClusterManagerNodeForAZone(ClusterState clusterState, String zone, String... nodeIds) { + + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + org.opensearch.common.collect.List.of(nodeIds) + .forEach( + nodeId -> nodeBuilder.add( + new DiscoveryNode( + nodeId, + buildNewFakeTransportAddress(), + Collections.singletonMap("zone", zone), + CLUSTER_MANAGER_ROLE, + Version.CURRENT + ) + ) + ); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState setLocalNode(ClusterState clusterState, String nodeId) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + nodeBuilder.localNodeId(nodeId); + nodeBuilder.clusterManagerNodeId(nodeId); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + return clusterState; + } + + public void testGetWeightedRouting_WeightsNotSetInMetadata() { + + final ClusterGetWeightedRoutingRequestBuilder request = new ClusterGetWeightedRoutingRequestBuilder( + client, + ClusterGetWeightedRoutingAction.INSTANCE + ); + request.setAwarenessAttribute("zone"); + ClusterState state = clusterService.state(); + + ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request()); + assertEquals(response.getLocalNodeWeight(), null); + assertEquals(response.weights(), null); + } + + public void testGetWeightedRouting_WeightsSetInMetadata() { + ClusterGetWeightedRoutingRequestBuilder request = new ClusterGetWeightedRoutingRequestBuilder( + client, + ClusterGetWeightedRoutingAction.INSTANCE + ); + request.setAwarenessAttribute("zone"); + + ClusterState state = clusterService.state(); + state = setLocalNode(state, "nodeB1"); + Map weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0); + state = setWeightedRoutingWeights(state, weights); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request()); + assertEquals(weights, response.weights().weights()); + } + + public void testGetWeightedRoutingLocalWeight_WeightsSetInMetadata() { + + ClusterGetWeightedRoutingRequestBuilder request = new ClusterGetWeightedRoutingRequestBuilder( + client, + ClusterGetWeightedRoutingAction.INSTANCE + ); + + request.setRequestLocal(true); + request.setAwarenessAttribute("zone"); + + ClusterState state = clusterService.state(); + state = setLocalNode(state, "nodeB1"); + Map weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0); + state = setWeightedRoutingWeights(state, weights); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request()); + assertEquals("0.0", response.getLocalNodeWeight()); + } + + public void testGetWeightedRoutingLocalWeight_WeightsNotSetInMetadata() { + + ClusterGetWeightedRoutingRequestBuilder request = new ClusterGetWeightedRoutingRequestBuilder( + client, + ClusterGetWeightedRoutingAction.INSTANCE + ); + + request.setRequestLocal(true); + request.setAwarenessAttribute("zone"); + + ClusterState state = clusterService.state(); + state = setLocalNode(state, "nodeB1"); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request()); + assertEquals(null, response.getLocalNodeWeight()); + } + + @After + public void shutdown() { + clusterService.stop(); + threadPool.shutdown(); + } + +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index e5cca998d3f06..557c5c7ac910d 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -12,6 +12,7 @@ import org.junit.Before; import org.opensearch.Version; import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequestBuilder; import org.opensearch.client.node.NodeClient; @@ -92,7 +93,7 @@ public void setUpService() { transportService.start(); transportService.acceptIncomingRequests(); - this.weightedRoutingService = new WeightedRoutingService(clusterService, threadPool); + this.weightedRoutingService = new WeightedRoutingService(clusterService, threadPool, settingsBuilder.build(), clusterSettings); client = new NodeClient(Settings.EMPTY, threadPool); } @@ -231,4 +232,20 @@ public void onFailure(Exception e) { weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + + public void testVerifyAwarenessAttribute_InvalidAttributeName() { + assertThrows( + "invalid awareness attribute %s requested for updating weighted routing", + ActionRequestValidationException.class, + () -> weightedRoutingService.verifyAwarenessAttribute("zone2") + ); + } + + public void testVerifyAwarenessAttribute_ValidAttributeName() { + try { + weightedRoutingService.verifyAwarenessAttribute("zone"); + } catch (Exception e) { + fail("verify awareness attribute should not fail"); + } + } }