Skip to content

Commit

Permalink
Fix weighted shard routing state across search requests (#6004) (#6060)
Browse files Browse the repository at this point in the history
* Fix maintaining state across search requests with weighted shard routing


(cherry picked from commit cd860ec)

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
3 people authored Feb 3, 2023
1 parent 3c91c92 commit 3d6fe9c
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Fixed
- [Segment Replication] Fix for peer recovery ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344))
- Fix weighted shard routing state across search requests([#6004](https://github.com/opensearch-project/OpenSearch/pull/6004))
- [Segment Replication] Fix bug where inaccurate sequence numbers are sent during replication ([#6122](https://github.com/opensearch-project/OpenSearch/pull/6122))

### Security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
public class IndexShardRoutingTable implements Iterable<ShardRouting> {

final ShardShuffler shuffler;
// Shuffler for weighted round-robin shard routing. This uses rotation to permute shards.
final ShardShuffler shufflerForWeightedRouting;
final ShardId shardId;

final ShardRouting primary;
Expand Down Expand Up @@ -105,6 +107,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
IndexShardRoutingTable(ShardId shardId, List<ShardRouting> shards) {
this.shardId = shardId;
this.shuffler = new RotationShardShuffler(Randomness.get().nextInt());
this.shufflerForWeightedRouting = new RotationShardShuffler(Randomness.get().nextInt());
this.shards = Collections.unmodifiableList(shards);

ShardRouting primary = null;
Expand Down Expand Up @@ -323,11 +326,11 @@ public ShardIterator activeInitializingShardsWeightedIt(
double defaultWeight,
boolean isFailOpenEnabled
) {
final int seed = shuffler.nextSeed();
final int seed = shufflerForWeightedRouting.nextSeed();
List<ShardRouting> ordered = new ArrayList<>();
List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight);
List<ShardRouting> orderedListWithDistinctShards;
ordered.addAll(shuffler.shuffle(orderedActiveShards, seed));
ordered.addAll(shufflerForWeightedRouting.shuffle(orderedActiveShards, seed));
if (!allInitializingShards.isEmpty()) {
List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight);
ordered.addAll(orderedInitializingShards);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.threadpool.TestThreadPool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -768,4 +770,94 @@ public void testWeightedRoutingShardState() {
terminate(threadPool);
}
}

/**
* Test to validate that shard routing state is maintained across requests, requests are assigned to nodes
* according to assigned routing weights
*/
public void testWeightedRoutingShardStateWithDifferentWeights() {
TestThreadPool threadPool = null;
try {
Settings.Builder settings = Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.awareness.attributes", "zone");
AllocationService strategy = createAllocationService(settings.build());

Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
.build();

RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();

ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.build();

threadPool = new TestThreadPool("testThatOnlyNodesSupport");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);

Map<String, String> node1Attributes = new HashMap<>();
node1Attributes.put("zone", "zone1");
Map<String, String> node2Attributes = new HashMap<>();
node2Attributes.put("zone", "zone2");
Map<String, String> node3Attributes = new HashMap<>();
node3Attributes.put("zone", "zone3");
clusterState = ClusterState.builder(clusterState)
.nodes(
DiscoveryNodes.builder()
.add(newNode("node1", unmodifiableMap(node1Attributes)))
.add(newNode("node2", unmodifiableMap(node2Attributes)))
.add(newNode("node3", unmodifiableMap(node3Attributes)))
.localNodeId("node1")
)
.build();
clusterState = strategy.reroute(clusterState, "reroute");

clusterState = startInitializingShardsAndReroute(strategy, clusterState);
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
List<Map<String, Double>> weightsList = new ArrayList<>();
Map<String, Double> weights1 = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 0.0);
weightsList.add(weights1);

Map<String, Double> weights2 = Map.of("zone1", 1.0, "zone2", 0.0, "zone3", 1.0);
weightsList.add(weights2);

Map<String, Double> weights3 = Map.of("zone1", 0.0, "zone2", 1.0, "zone3", 1.0);
weightsList.add(weights3);

Map<String, Double> weights4 = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 0.0);
weightsList.add(weights4);

for (int i = 0; i < weightsList.size(); i++) {
WeightedRouting weightedRouting = new WeightedRouting("zone", weightsList.get(i));
ShardIterator shardIterator = clusterState.routingTable()
.index("test")
.shard(0)
.activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true);

ShardRouting shardRouting1 = shardIterator.nextOrNull();

shardIterator = clusterState.routingTable()
.index("test")
.shard(0)
.activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true);

ShardRouting shardRouting2 = shardIterator.nextOrNull();

shardIterator = clusterState.routingTable()
.index("test")
.shard(0)
.activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true);

ShardRouting shardRouting3 = shardIterator.nextOrNull();

assertEquals(shardRouting1.currentNodeId(), shardRouting3.currentNodeId());
assertNotEquals(shardRouting1.currentNodeId(), shardRouting2.currentNodeId());
}

} finally {
terminate(threadPool);
}
}
}

0 comments on commit 3d6fe9c

Please sign in to comment.