Skip to content

Commit

Permalink
[RW Separation] Add routing preference to route requests only to sear…
Browse files Browse the repository at this point in the history
…ch replicas. (#15563)

* Add routing preference to route requests only to search replicas.

This adds SEARCH_REPLICA routing preference and defaults to this preference
for indices that have search replicas.

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

* add changelog entry

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

* PR feedback - extract a private method for replica filtering

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

* remove changelog entry

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>

---------

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
(cherry picked from commit 3681b52)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Sep 4, 2024
1 parent d2ae53e commit c62aeda
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@

package org.opensearch.indices.settings;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -110,7 +113,6 @@ public void testFailoverWithSearchReplica_WithWriterReplicas() throws IOExceptio
// add back a node
internalCluster().startDataOnlyNode();
ensureGreen(TEST_INDEX);

}

public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOException {
Expand Down Expand Up @@ -175,6 +177,39 @@ public void testSearchReplicaScaling() {
assertActiveSearchShards(0);
}

public void testSearchReplicaRoutingPreference() throws IOException {
int numSearchReplicas = 1;
int numWriterReplicas = 1;
internalCluster().startClusterManagerOnlyNode();
String primaryNodeName = internalCluster().startDataOnlyNode();
createIndex(
TEST_INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
.build()
);
ensureYellow(TEST_INDEX);
client().prepareIndex(TEST_INDEX).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
// add 2 nodes for the replicas
internalCluster().startDataOnlyNodes(2);
ensureGreen(TEST_INDEX);

assertActiveShardCounts(numSearchReplicas, numWriterReplicas);

// set preference to search replica here - we default to this when there are
// search replicas but tests will randomize this value if unset
SearchResponse response = client().prepareSearch(TEST_INDEX)
.setPreference(Preference.SEARCH_REPLICA.type())
.setQuery(QueryBuilders.matchAllQuery())
.get();

String nodeId = response.getHits().getAt(0).getShard().getNodeId();
IndexShardRoutingTable indexShardRoutingTable = getIndexShardRoutingTable();
assertEquals(nodeId, indexShardRoutingTable.searchOnlyReplicas().get(0).currentNodeId());
}

/**
* Helper to assert counts of active shards for each type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,15 +648,11 @@ public ShardIterator replicaActiveInitializingShardIt() {
return new PlainShardIterator(shardId, Collections.emptyList());
}

LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
return new PlainShardIterator(shardId, ordered);
return filterAndOrderShards(replica -> true);
}

public ShardIterator searchReplicaActiveInitializingShardIt() {
return filterAndOrderShards(ShardRouting::isSearchOnly);
}

/**
Expand Down Expand Up @@ -687,6 +683,20 @@ public ShardIterator replicaFirstActiveInitializingShardsIt() {
return new PlainShardIterator(shardId, ordered);
}

private ShardIterator filterAndOrderShards(Predicate<ShardRouting> filter) {
LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (filter.test(replica)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
}
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns an iterator on active and initializing shards residing on the provided nodeId.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public class OperationRouting {
private volatile boolean isFailOpenEnabled;
private volatile boolean isStrictWeightedShardRouting;
private volatile boolean ignoreWeightedRouting;
private final boolean isReaderWriterSplitEnabled;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
Expand All @@ -141,6 +142,7 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting);
this.isReaderWriterSplitEnabled = FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings);
}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
Expand Down Expand Up @@ -254,6 +256,14 @@ public GroupShardsIterator<ShardIterator> searchShards(
preference = Preference.PRIMARY_FIRST.type();
}

if (isReaderWriterSplitEnabled) {
if (preference == null || preference.isEmpty()) {
if (indexMetadataForShard.getNumberOfSearchOnlyReplicas() > 0) {
preference = Preference.SEARCH_REPLICA.type();
}
}
}

ShardIterator iterator = preferenceActiveShardIterator(
shard,
clusterState.nodes().getLocalNodeId(),
Expand Down Expand Up @@ -365,6 +375,8 @@ private ShardIterator preferenceActiveShardIterator(
return indexShard.primaryFirstActiveInitializingShardsIt();
case REPLICA_FIRST:
return indexShard.replicaFirstActiveInitializingShardsIt();
case SEARCH_REPLICA:
return indexShard.searchReplicaActiveInitializingShardIt();
case ONLY_LOCAL:
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
case ONLY_NODES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public enum Preference {
*/
REPLICA_FIRST("_replica_first"),

/**
* Route to search replica shards
*/
SEARCH_REPLICA("_search_replica"),

/**
* Route to the local shard only
*/
Expand Down Expand Up @@ -127,6 +132,8 @@ public static Preference parse(String preference) {
return ONLY_LOCAL;
case "_only_nodes":
return ONLY_NODES;
case "_search_replica":
return SEARCH_REPLICA;
default:
throw new IllegalArgumentException("no Preference for [" + preferenceType + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,82 @@ public void testPartialIndexPrimaryDefault() throws Exception {
}
}

public void testSearchReplicaDefaultRouting() throws Exception {
final int numShards = 1;
final int numReplicas = 2;
final int numSearchReplicas = 2;
final String indexName = "test";
final String[] indexNames = new String[] { indexName };

ClusterService clusterService = null;
ThreadPool threadPool = null;

try {
OperationRouting opRouting = new OperationRouting(
Settings.builder().put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, "true").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);

ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(
indexNames,
numShards,
numReplicas,
numSearchReplicas
);
IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index(indexName).getShards().get(0);
ShardId shardId = indexShardRoutingTable.searchOnlyReplicas().get(0).shardId();

threadPool = new TestThreadPool("testSearchReplicaDefaultRouting");
clusterService = ClusterServiceUtils.createClusterService(threadPool);

// add a search replica in initializing state:
DiscoveryNode node = new DiscoveryNode(
"node_initializing",
OpenSearchTestCase.buildNewFakeTransportAddress(),
Collections.emptyMap(),
new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES),
Version.CURRENT
);

IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
.settings(Settings.builder().put(state.metadata().index(indexName).getSettings()).build())
.numberOfSearchReplicas(3)
.numberOfReplicas(2)
.build();
Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(indexMetadata, false).generateClusterUuidIfNeeded();
IndexRoutingTable.Builder indexShardRoutingBuilder = IndexRoutingTable.builder(indexMetadata.getIndex());
indexShardRoutingBuilder.addIndexShard(indexShardRoutingTable);
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRouting(shardId, node.getId(), null, false, true, ShardRoutingState.INITIALIZING, null)
);
state = ClusterState.builder(state)
.routingTable(RoutingTable.builder().add(indexShardRoutingBuilder).build())
.metadata(metadataBuilder.build())
.build();

// Verify default preference is primary only
GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state, indexNames, null, null);
assertThat("one group per shard", groupIterator.size(), equalTo(numShards));
for (ShardIterator shardIterator : groupIterator) {
assertEquals("We should have 3 shards returned", shardIterator.size(), 3);
int i = 0;
for (ShardRouting shardRouting : shardIterator) {
assertTrue(
"Only search replicas should exist with preference SEARCH_REPLICA",
shardIterator.nextOrNull().isSearchOnly()
);
if (i == shardIterator.size()) {
assertTrue("Initializing shard should appear last", shardRouting.initializing());
assertFalse("Initializing shard should appear last", shardRouting.active());
}
}
}
} finally {
IOUtils.close(clusterService);
terminate(threadPool);
}
}

private DiscoveryNode[] setupNodes() {
// Sets up two data nodes in zone-a and one data node in zone-b
List<String> zones = Arrays.asList("a", "a", "b");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.opensearch.test.OpenSearchTestCase.randomFrom;
Expand Down Expand Up @@ -325,7 +326,18 @@ public static ClusterState stateWithAssignedPrimariesAndOneReplica(String index,
* Creates cluster state with several indexes, shards and replicas and all shards STARTED.
*/
public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indices, int numberOfShards, int numberOfReplicas) {
return stateWithAssignedPrimariesAndReplicas(indices, numberOfShards, numberOfReplicas, 0);
}

/**
* Creates cluster state with several indexes, shards and replicas and all shards STARTED.
*/
public static ClusterState stateWithAssignedPrimariesAndReplicas(
String[] indices,
int numberOfShards,
int numberOfReplicas,
int numberOfSearchReplicas
) {
int numberOfDataNodes = numberOfReplicas + 1;
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
for (int i = 0; i < numberOfDataNodes + 1; i++) {
Expand All @@ -347,6 +359,7 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(SETTING_NUMBER_OF_SEARCH_REPLICAS, numberOfSearchReplicas)
.put(SETTING_CREATION_DATE, System.currentTimeMillis())
)
.build();
Expand All @@ -363,6 +376,19 @@ public static ClusterState stateWithAssignedPrimariesAndReplicas(String[] indice
TestShardRouting.newShardRouting(index, i, newNode(replica + 1).getId(), null, false, ShardRoutingState.STARTED)
);
}
for (int replica = numberOfReplicas; replica < numberOfSearchReplicas + numberOfReplicas; replica++) {
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRouting(
new ShardId(index, IndexMetadata.INDEX_UUID_NA_VALUE, i),
newNode(replica + 1).getId(),
null,
false,
true,
ShardRoutingState.STARTED,
null
)
);
}
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
}
routingTableBuilder.add(indexRoutingTableBuilder.build());
Expand Down

0 comments on commit c62aeda

Please sign in to comment.