From 72312a17a46db574ed8dd4d04cec2f56137e0634 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sun, 1 Sep 2024 22:16:59 -0700 Subject: [PATCH 1/4] Add routing preference to route requests only to search replicas. This adds SEARCH_REPLICA routing preference and defaults to this preference for indices that have search replicas. Signed-off-by: Marc Handalian --- .../indices/settings/SearchOnlyReplicaIT.java | 37 ++++++++- .../routing/IndexShardRoutingTable.java | 17 +++++ .../cluster/routing/OperationRouting.java | 12 +++ .../cluster/routing/Preference.java | 7 ++ .../routing/OperationRoutingTests.java | 76 +++++++++++++++++++ .../ClusterStateCreationUtils.java | 26 +++++++ 6 files changed, 174 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java index 5fc8e30ed2c7a..6bd91df1de66f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java @@ -8,14 +8,17 @@ package org.opensearch.indices.settings; +import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -110,7 +113,6 @@ public void 
testFailoverWithSearchReplica_WithWriterReplicas() throws IOExceptio // add back a node internalCluster().startDataOnlyNode(); ensureGreen(TEST_INDEX); - } public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException { @@ -175,6 +177,39 @@ public void testSearchReplicaScaling() { assertActiveSearchShards(0); } + public void testSearchReplicaRoutingPreference() throws IOException { + int numSearchReplicas = 1; + int numWriterReplicas = 1; + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + createIndex( + TEST_INDEX, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas) + .build() + ); + ensureYellow(TEST_INDEX); + client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + // add 2 nodes for the replicas + internalCluster().startDataOnlyNodes(2); + ensureGreen(TEST_INDEX); + + assertActiveShardCounts(numSearchReplicas, numWriterReplicas); + + // set the preference explicitly: SEARCH_REPLICA is the default when search replicas exist, + // but the test framework randomizes the preference whenever it is left unset + SearchResponse response = client().prepareSearch(TEST_INDEX) + .setPreference(Preference.SEARCH_REPLICA.type()) + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + + String nodeId = response.getHits().getAt(0).getShard().getNodeId(); + IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable(); + assertEquals(indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId(), nodeId); + } + /** * Helper to assert counts of active shards for each type. 
*/ diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 00ddef507a162..f068963267144 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -676,6 +676,23 @@ public ShardIterator replicaFirstActiveInitializingShardsIt() { return new PlainShardIterator(shardId, ordered); } + /** + * Returns an iterator on replica shards. + */ + public ShardIterator searchReplicaActiveInitializingShardIt() { + LinkedList ordered = new LinkedList<>(); + for (ShardRouting replica : shuffler.shuffle(replicas)) { + if (replica.isSearchOnly()) { + if (replica.active()) { + ordered.addFirst(replica); + } else if (replica.initializing()) { + ordered.addLast(replica); + } + } + } + return new PlainShardIterator(shardId, ordered); + } + /** * Returns an iterator on active and initializing shards residing on the provided nodeId. 
*/ diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index 6242247f34a93..fe9e00b250e70 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -121,6 +121,7 @@ public class OperationRouting { private volatile boolean isFailOpenEnabled; private volatile boolean isStrictWeightedShardRouting; private volatile boolean ignoreWeightedRouting; + private final boolean isReaderWriterSplitEnabled; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { // whether to ignore awareness attributes when routing requests @@ -141,6 +142,7 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) { clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled); clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting); clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting); + this.isReaderWriterSplitEnabled = FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings); } void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { @@ -254,6 +256,14 @@ public GroupShardsIterator searchShards( preference = Preference.PRIMARY_FIRST.type(); } + if (isReaderWriterSplitEnabled) { + if (preference == null || preference.isEmpty()) { + if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0) { + preference = Preference.SEARCH_REPLICA.type(); + } + } + } + ShardIterator iterator = preferenceActiveShardIterator( shard, clusterState.nodes().getLocalNodeId(), @@ -366,6 +376,8 @@ private ShardIterator preferenceActiveShardIterator( return indexShard.primaryFirstActiveInitializingShardsIt(); case REPLICA_FIRST: return 
indexShard.replicaFirstActiveInitializingShardsIt(); + case SEARCH_REPLICA: + return indexShard.searchReplicaActiveInitializingShardIt(); case ONLY_LOCAL: return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId); case ONLY_NODES: diff --git a/server/src/main/java/org/opensearch/cluster/routing/Preference.java b/server/src/main/java/org/opensearch/cluster/routing/Preference.java index a1ea01afa118f..093e3d5fd45f8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/Preference.java +++ b/server/src/main/java/org/opensearch/cluster/routing/Preference.java @@ -73,6 +73,11 @@ public enum Preference { */ REPLICA_FIRST("_replica_first"), + /** + * Route to search replica shards + */ + SEARCH_REPLICA("_search_replica"), + /** * Route to the local shard only */ @@ -127,6 +132,8 @@ public static Preference parse(String preference) { return ONLY_LOCAL; case "_only_nodes": return ONLY_NODES; + case "_search_replica": + return SEARCH_REPLICA; default: throw new IllegalArgumentException("no Preference for [" + preferenceType + "]"); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index ad8b48d56c417..aaeeb52ab5709 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -1118,6 +1118,82 @@ public void testPartialIndexPrimaryDefault() throws Exception { } } + public void testSearchReplicaDefaultRouting() throws Exception { + final int numShards = 1; + final int numReplicas = 2; + final int numSearchReplicas = 2; + final String indexName = "test"; + final String[] indexNames = new String[] { indexName }; + + ClusterService clusterService = null; + ThreadPool threadPool = null; + + try { + OperationRouting opRouting = new OperationRouting( + Settings.builder().put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, 
"true").build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas( + indexNames, + numShards, + numReplicas, + numSearchReplicas + ); + IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index(indexName).getShards().get(0); + ShardId shardId = indexShardRoutingTable.searchOnlyReplicas().get(0).shardId(); + + threadPool = new TestThreadPool("testSearchReplicaDefaultRouting"); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + + // add a search replica in initializing state: + DiscoveryNode node = new DiscoveryNode( + "node_initializing", + OpenSearchTestCase.buildNewFakeTransportAddress(), + Collections.emptyMap(), + new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES), + Version.CURRENT + ); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(Settings.builder().put(state.metadata().index(indexName).getSettings()).build()) + .numberOfSearchReplicas(3) + .numberOfReplicas(2) + .build(); + Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexShardRoutingBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + indexShardRoutingBuilder.addIndexShard(indexShardRoutingTable); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(shardId, node.getId(), null, false, true, ShardRoutingState.INITIALIZING, null) + ); + state = ClusterState.builder(state) + .routingTable(RoutingTable.builder().add(indexShardRoutingBuilder).build()) + .metadata(metadataBuilder.build()) + .build(); + + // Verify that with no preference supplied, routing defaults to search replicas only (active first, initializing last) + GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state, indexNames, null, null); + assertThat("one group per shard", groupIterator.size(), equalTo(numShards)); + for (ShardIterator shardIterator : groupIterator) { + 
assertEquals("We should have 3 shards returned", 3, shardIterator.size()); + int i = 0; + for (ShardRouting shardRouting : shardIterator) { + assertTrue( + "Only search replicas should exist with preference SEARCH_REPLICA", + shardRouting.isSearchOnly() + ); + if (++i == shardIterator.size()) { + assertTrue("Initializing shard should appear last", shardRouting.initializing()); + assertFalse("Initializing shard should appear last", shardRouting.active()); + } + } + } + } finally { + IOUtils.close(clusterService); + terminate(threadPool); + } + } + private DiscoveryNode[] setupNodes() { // Sets up two data nodes in zone-a and one data node in zone-b List zones = Arrays.asList("a", "a", "b"); diff --git a/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java b/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java index 8650500df8e95..0c4e871b1330c 100644 --- a/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java +++ b/test/framework/src/main/java/org/opensearch/action/support/replication/ClusterStateCreationUtils.java @@ -63,6 +63,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; import static org.opensearch.test.OpenSearchTestCase.randomFrom; @@ -325,7 +326,18 @@ public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index, * Creates cluster state with several indexes, shards and replicas and all shards STARTED. 
*/ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) { + return stateWithAssignedPrimariesAndReplicas(indices, numberOfShards, numberOfReplicas, 0); + } + /** + * Creates cluster state with several indexes, shards and replicas and all shards STARTED. + */ + public static ClusterState stateWithAssignedPrimariesAndReplicas( + String[] indices, + int numberOfShards, + int numberOfReplicas, + int numberOfSearchReplicas + ) { int numberOfDataNodes = numberOfReplicas + 1; DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); for (int i = 0; i < numberOfDataNodes + 1; i++) { @@ -347,6 +359,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice .put(SETTING_VERSION_CREATED, Version.CURRENT) .put(SETTING_NUMBER_OF_SHARDS, numberOfShards) .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, numberOfSearchReplicas) .put(SETTING_CREATION_DATE, System.currentTimeMillis()) ) .build(); @@ -363,6 +376,19 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice TestShardRouting.newShardRouting(index, i, newNode(replica + 1).getId(), null, false, ShardRoutingState.STARTED) ); } + for (int replica = numberOfReplicas; replica < numberOfSearchReplicas + numberOfReplicas; replica++) { + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting( + new ShardId(index, IndexMetadata.INDEX_UUID_NA_VALUE, i), + newNode(replica + 1).getId(), + null, + false, + true, + ShardRoutingState.STARTED, + null + ) + ); + } indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); } routingTableBuilder.add(indexRoutingTableBuilder.build()); From 5e08f77fd321bd9962668a32a233542f80a3ee9f Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Mon, 2 Sep 2024 10:09:50 -0700 Subject: [PATCH 2/4] add changelog entry Signed-off-by: Marc Handalian --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff 
--git a/CHANGELOG.md b/CHANGELOG.md index 453f32ee74878..f61d9b09676b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410)) - [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527)) - Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426)) +- [Reader Writer Separation] Add routing preference for search replicas ([#15563](https://github.com/opensearch-project/OpenSearch/pull/15563)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) From 5f020948a17c71275fe328f3a276ec0d027c23af Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 3 Sep 2024 12:47:46 -0700 Subject: [PATCH 3/4] PR feedback - extract a private method for replica filtering Signed-off-by: Marc Handalian --- .../routing/IndexShardRoutingTable.java | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 042c46f4b9378..f25cb14f65eca 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -647,15 +647,11 @@ public ShardIterator replicaActiveInitializingShardIt() { return new PlainShardIterator(shardId, Collections.emptyList()); } - LinkedList ordered = new LinkedList<>(); - for (ShardRouting replica : shuffler.shuffle(replicas)) { - if (replica.active()) { - ordered.addFirst(replica); - } else if 
(replica.initializing()) { - ordered.addLast(replica); - } - } - return new PlainShardIterator(shardId, ordered); + return filterAndOrderShards(replica -> true); + } + + public ShardIterator searchReplicaActiveInitializingShardIt() { + return filterAndOrderShards(ShardRouting::isSearchOnly); } /** @@ -686,13 +682,10 @@ public ShardIterator replicaFirstActiveInitializingShardsIt() { return new PlainShardIterator(shardId, ordered); } - /** - * Returns an iterator on replica shards. - */ - public ShardIterator searchReplicaActiveInitializingShardIt() { + private ShardIterator filterAndOrderShards(Predicate filter) { LinkedList ordered = new LinkedList<>(); for (ShardRouting replica : shuffler.shuffle(replicas)) { - if (replica.isSearchOnly()) { + if (filter.test(replica)) { if (replica.active()) { ordered.addFirst(replica); } else if (replica.initializing()) { From c8e59b44d538521b345bb45fd8db9e0c8ebd750e Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 4 Sep 2024 11:09:04 -0700 Subject: [PATCH 4/4] remove changelog entry Signed-off-by: Marc Handalian --- CHANGELOG.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24c419791febb..f67e9a4c257f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054)) - [Workload Management] Add Get QueryGroup API Logic ([14709](https://github.com/opensearch-project/OpenSearch/pull/14709)) - [Workload Management] Add Settings for Workload Management feature ([#15028](https://github.com/opensearch-project/OpenSearch/pull/15028)) +- [Workload Management] Add Update QueryGroup API Logic ([#14775](https://github.com/opensearch-project/OpenSearch/pull/14775)) - [Workload Management] QueryGroup resource tracking framework changes 
([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897)) - Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774)) - Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153)) @@ -38,17 +39,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428))) - Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494)) - Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290)) -- Add fieldType to AbstractQueryBuilder and FieldSortBuilder ([#15328](https://github.com/opensearch-project/OpenSearch/pull/15328))) - [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410)) - [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788)) - [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527)) +- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363)) - Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426)) - [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291))) - Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409)) - Add prefix support to 
hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557)) - Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568)) - [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218)) -- [Reader Writer Separation] Add routing preference for search replicas ([#15563](https://github.com/opensearch-project/OpenSearch/pull/15563)) +- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010)) +- ClusterManagerTaskThrottler Improvements ([#15508](https://github.com/opensearch-project/OpenSearch/pull/15508)) +- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131)) +- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))