diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 4b314ef1ae27b..56fa15f400672 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -22,9 +22,11 @@ import org.apache.lucene.util.BytesRef; import org.junit.Before; import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; @@ -86,6 +88,7 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.equalTo; import static org.opensearch.action.search.PitTestsUtil.assertSegments; import static org.opensearch.action.search.SearchContextId.decode; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; @@ -109,6 +112,10 @@ private void setup() { internalCluster().startClusterManagerOnlyNode(); } + static String indexOrAlias() { + return randomBoolean() ? INDEX_NAME : "alias"; + } + public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); @@ -1425,4 +1432,34 @@ public void testIndexWhileRecoveringReplica() throws Exception { .get(); assertNoFailures(response); } + + /** + * Tests whether segment replication supports realtime get requests and reads and parses source from the translog to serve strong reads. + */ + public void testRealtimeGetRequests() { + final String primary = internalCluster().startDataOnlyNode(); + final String replica = internalCluster().startDataOnlyNode(); + + assertAcked( + prepareCreate(INDEX_NAME).setSettings(Settings.builder().put("index.refresh_interval", -1).put(indexSettings())) + .addAlias(new Alias("alias")) + ); + ensureGreen(INDEX_NAME); + + GetResponse response = client().prepareGet(indexOrAlias(), "1").get(); + assertThat(response.isExists(), equalTo(false)); + + logger.info("--> index doc 1"); + client().prepareIndex(indexOrAlias()).setId("1").setSource("foo", "bar").get(); + + logger.info("--> non realtime get 1"); + response = client().prepareGet(indexOrAlias(), "1").setRealtime(false).get(); + assertThat(response.isExists(), equalTo(false)); + + logger.info("--> realtime get 1"); + response = client().prepareGet(indexOrAlias(), "1").get(); + assertThat(response.isExists(), equalTo(true)); + assertThat(response.getIndex(), equalTo(INDEX_NAME)); + assertThat(response.getSourceAsMap().get("foo").toString(), equalTo("bar")); + } } diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index 07546034665b3..29f9762536611 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -37,7 +37,9 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.single.shard.TransportSingleShardAction; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -47,6 +49,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -90,14 +93,20 @@ protected boolean resolveIndex(GetRequest request) { @Override protected ShardIterator shards(ClusterState state, InternalRequest request) { + String preference = request.request().preference(); + // route realtime GET requests when segment replication is enabled to primary shards, + // iff there are no other preferences enabled for shard routing + if (state.getMetadata() + .index(request.concreteIndex()) + .getSettings() + .get(IndexMetadata.SETTING_REPLICATION_TYPE) + .equals(ReplicationType.SEGMENT.toString()) + && request.request().realtime() + && request.request().routing() == null) { + preference = Preference.PRIMARY.type(); + } return clusterService.operationRouting() - .getShards( - clusterService.state(), - request.concreteIndex(), - request.request().id(), - request.request().routing(), - request.request().preference() - ); + .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), preference); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index 24e035e3db643..0086e6608569a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -32,6 +32,8 @@ package org.opensearch.cluster.routing; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; @@ -63,6 +65,8 @@ */ public class OperationRouting { + private static final Logger logger = LogManager.getLogger(OperationRouting.class); + public static final Setting USE_ADAPTIVE_REPLICA_SELECTION_SETTING = Setting.boolSetting( "cluster.routing.use_adaptive_replica_selection", true, @@ -351,10 +355,12 @@ private ShardIterator preferenceActiveShardIterator( case LOCAL: return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId)); case PRIMARY: + logger.info("pri shards"); return indexShard.primaryActiveInitializingShardIt(); case REPLICA: return indexShard.replicaActiveInitializingShardIt(); case PRIMARY_FIRST: + logger.info("pri first shards"); return indexShard.primaryFirstActiveInitializingShardsIt(); case REPLICA_FIRST: return indexShard.replicaFirstActiveInitializingShardsIt(); diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index f1cd0d1d1f291..4d50b66c7286b 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -291,7 +291,7 @@ public class IndicesService extends AbstractLifecycleComponent private final ScriptService scriptService; private final ClusterService clusterService; private final Client client; - private volatile Map indices = emptyMap(); + public volatile Map indices = emptyMap(); private final Map> pendingDeletes = new HashMap<>(); private final AtomicInteger numUncompletedDeletes = new AtomicInteger(); private final OldShardsStats oldShardsStats = new OldShardsStats(); diff --git a/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java b/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java new file mode 100644 index 0000000000000..db618d5331725 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/get/TransportGetActionTests.java @@ -0,0 +1,189 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.get; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.OperationRouting; +import org.opensearch.cluster.routing.ShardIterator; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskManager; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportService; + +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptySet; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.common.UUIDs.randomBase64UUID; + +public class TransportGetActionTests extends OpenSearchTestCase { + + private static ThreadPool threadPool; + private static TransportService transportService; + private static ClusterService clusterService; + private static TransportMultiGetAction transportAction; + private static TransportShardMultiGetAction shardAction; + + @BeforeClass + public static void beforeClass() throws Exception { + threadPool = new TestThreadPool(TransportMultiGetActionTests.class.getSimpleName()); + + transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal( + Settings.builder().put("node.name", "node1").build(), + boundAddress.publishAddress(), + randomBase64UUID() + ), + null, + emptySet() + ) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + + final Index index1 = new Index("index1", randomBase64UUID()); + final Index index2 = new Index("index2", randomBase64UUID()); + final ClusterState clusterState = ClusterState.builder(new ClusterName(TransportMultiGetActionTests.class.getSimpleName())) + .metadata( + new Metadata.Builder().put( + new IndexMetadata.Builder(index1.getName()).settings( + Settings.builder() + .put("index.version.created", Version.CURRENT) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID()) + ) + .putMapping( + XContentHelper.convertToJson( + BytesReference.bytes( + XContentFactory.jsonBuilder() + .startObject() + .startObject("_doc") + .startObject("_routing") + .field("required", false) + .endObject() + .endObject() + .endObject() + ), + true, + XContentType.JSON + ) + ) + ) + .put( + new IndexMetadata.Builder(index2.getName()).settings( + Settings.builder() + .put("index.version.created", Version.CURRENT) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID()) + ) + .putMapping( + XContentHelper.convertToJson( + BytesReference.bytes( + XContentFactory.jsonBuilder() + .startObject() + .startObject("_doc") + .startObject("_routing") + .field("required", true) + .endObject() + .endObject() + .endObject() + ), + true, + XContentType.JSON + ) + ) + ) + ) + .build(); + + final ShardIterator index1ShardIterator = mock(ShardIterator.class); + when(index1ShardIterator.shardId()).thenReturn(new ShardId(index1, randomInt())); + + final ShardIterator index2ShardIterator = mock(ShardIterator.class); + when(index2ShardIterator.shardId()).thenReturn(new ShardId(index2, randomInt())); + + final OperationRouting operationRouting = mock(OperationRouting.class); + when( + operationRouting.getShards(eq(clusterState), eq(index1.getName()), anyString(), nullable(String.class), nullable(String.class)) + ).thenReturn(index1ShardIterator); + when(operationRouting.shardId(eq(clusterState), eq(index1.getName()), nullable(String.class), nullable(String.class))).thenReturn( + new ShardId(index1, randomInt()) + ); + when( + operationRouting.getShards(eq(clusterState), eq(index2.getName()), anyString(), nullable(String.class), nullable(String.class)) + ).thenReturn(index2ShardIterator); + when(operationRouting.shardId(eq(clusterState), eq(index2.getName()), nullable(String.class), nullable(String.class))).thenReturn( + new ShardId(index2, randomInt()) + ); + + clusterService = mock(ClusterService.class); + when(clusterService.localNode()).thenReturn(transportService.getLocalNode()); + when(clusterService.state()).thenReturn(clusterState); + when(clusterService.operationRouting()).thenReturn(operationRouting); + + shardAction = new TransportShardMultiGetAction( + clusterService, + transportService, + mock(IndicesService.class), + threadPool, + new ActionFilters(emptySet()), + new TransportMultiGetActionTests.Resolver() + ) { + @Override + protected void doExecute(Task task, MultiGetShardRequest request, ActionListener listener) {} + }; + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + transportService = null; + clusterService = null; + transportAction = null; + shardAction = null; + } + + public void testShards() { + + } +}