Skip to content

Commit

Permalink
route to pri shards
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <poojiraj@amazon.com>
  • Loading branch information
Poojita-Raj committed Aug 9, 2023
1 parent 7278f43 commit 55645ac
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.apache.lucene.util.BytesRef;
import org.junit.Before;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
Expand Down Expand Up @@ -86,6 +88,7 @@
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.action.search.PitTestsUtil.assertSegments;
import static org.opensearch.action.search.SearchContextId.decode;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -109,6 +112,10 @@ private void setup() {
internalCluster().startClusterManagerOnlyNode();
}

static String indexOrAlias() {
return randomBoolean() ? INDEX_NAME : "alias";
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -1425,4 +1432,34 @@ public void testIndexWhileRecoveringReplica() throws Exception {
.get();
assertNoFailures(response);
}

/**
* Tests whether segment replication supports realtime get requests and reads and parses source from the translog to serve strong reads.
*/
public void testRealtimeGetRequests() {
final String primary = internalCluster().startDataOnlyNode();
final String replica = internalCluster().startDataOnlyNode();

assertAcked(
prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings()))
.addAlias(new Alias("alias"))
);
ensureGreen(INDEX_NAME);

GetResponse response = client().prepareGet(indexOrAlias(), "1").get();
assertThat(response.isExists(), equalTo(false));

logger.info("--> index doc 1");
client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").get();

logger.info("--> non realtime get 1");
response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get();
assertThat(response.isExists(), equalTo(false));

logger.info("--> realtime get 1");
response = client().prepareGet(indexOrAlias(), "1").get();
assertThat(response.isExists(), equalTo(true));
assertThat(response.getIndex(), equalTo(INDEX_NAME));
assertThat(response.getSourceAsMap().get("foo").toString(), equalTo("bar"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.single.shard.TransportSingleShardAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand All @@ -47,6 +49,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -90,14 +93,21 @@ protected boolean resolveIndex(GetRequest request) {

@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
String preference = request.request().preference();
// route realtime GET requests when segment replication is enabled to primary shards,
// iff there are no other preferences/routings enabled for routing to a specific shard
if (state.getMetadata()
.index(request.concreteIndex())
.getSettings()
.get(IndexMetadata.SETTING_REPLICATION_TYPE)
.equals(ReplicationType.SEGMENT.toString())
&& request.request().realtime()
&& request.request().routing() == null
&& request.request().preference() == null) {
preference = Preference.PRIMARY.type();
}
return clusterService.operationRouting()
.getShards(
clusterService.state(),
request.concreteIndex(),
request.request().id(),
request.request().routing(),
request.request().preference()
);
.getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), preference);
}

@Override
Expand Down

0 comments on commit 55645ac

Please sign in to comment.