From 55645ac5be14023e830902af7817ab9061b81e83 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Mon, 31 Jul 2023 12:25:20 -0700 Subject: [PATCH] route to pri shards Signed-off-by: Poojita Raj --- .../replication/SegmentReplicationIT.java | 37 +++++++++++++++++++ .../action/get/TransportGetAction.java | 24 ++++++++---- 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 4b314ef1ae27b..56fa15f400672 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -22,9 +22,11 @@ import org.apache.lucene.util.BytesRef; import org.junit.Before; import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; @@ -86,6 +88,7 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.equalTo; import static org.opensearch.action.search.PitTestsUtil.assertSegments; import static org.opensearch.action.search.SearchContextId.decode; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; @@ -109,6 +112,10 @@ private void setup() { internalCluster().startClusterManagerOnlyNode(); } + static String indexOrAlias() { + return randomBoolean() ? INDEX_NAME : "alias"; + } + public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); @@ -1425,4 +1432,34 @@ public void testIndexWhileRecoveringReplica() throws Exception { .get(); assertNoFailures(response); } + + /** + * Tests whether segment replication supports realtime get requests and reads and parses source from the translog to serve strong reads. + */ + public void testRealtimeGetRequests() { + final String primary = internalCluster().startDataOnlyNode(); + final String replica = internalCluster().startDataOnlyNode(); + + assertAcked( + prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings())) + .addAlias(new Alias("alias")) + ); + ensureGreen(INDEX_NAME); + + GetResponse response = client().prepareGet(indexOrAlias(), "1").get(); + assertThat(response.isExists(), equalTo(false)); + + logger.info("--> index doc 1"); + client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").get(); + + logger.info("--> non realtime get 1"); + response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get(); + assertThat(response.isExists(), equalTo(false)); + + logger.info("--> realtime get 1"); + response = client().prepareGet(indexOrAlias(), "1").get(); + assertThat(response.isExists(), equalTo(true)); + assertThat(response.getIndex(), equalTo(INDEX_NAME)); + assertThat(response.getSourceAsMap().get("foo").toString(), equalTo("bar")); + } } diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index 07546034665b3..e71cf4208586b 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -37,7 +37,9 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.single.shard.TransportSingleShardAction; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -47,6 +49,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -90,14 +93,21 @@ protected boolean resolveIndex(GetRequest request) { @Override protected ShardIterator shards(ClusterState state, InternalRequest request) { + String preference = request.request().preference(); + // route realtime GET requests when segment replication is enabled to primary shards, + // iff there are no other preferences/routings enabled for routing to a specific shard + if (state.getMetadata() + .index(request.concreteIndex()) + .getSettings() + .get(IndexMetadata.SETTING_REPLICATION_TYPE) + .equals(ReplicationType.SEGMENT.toString()) + && request.request().realtime() + && request.request().routing() == null + && request.request().preference() == null) { + preference = Preference.PRIMARY.type(); + } return clusterService.operationRouting() - .getShards( - clusterService.state(), - request.concreteIndex(), - request.request().id(), - request.request().routing(), - request.request().preference() - ); + .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), preference); } @Override