diff --git a/CHANGELOG.md b/CHANGELOG.md index dec5c18472bc4..343a791a33f54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544)) - Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517)) - [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563)) +- [Remote Store] Integrate remote segment store in peer recovery flow ([#6664](https://github.com/opensearch-project/OpenSearch/pull/6664)) - [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791)) ### Dependencies diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 6d5e6a250383b..86e4e50a08a38 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -11,13 +11,16 @@ import org.junit.After; import org.junit.Before; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; +import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalTestCluster; @@ -30,6 +33,8 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -207,4 +212,93 @@ public void testRemoteTranslogRestoreWithRefreshedData() throws IOException { public void testRemoteTranslogRestoreWithCommittedData() throws IOException { testRestoreFlow(true, randomIntBetween(2, 5), true); } + + private void testPeerRecovery(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws Exception { + internalCluster().startDataOnlyNodes(3); + if (remoteTranslog) { + createIndex(INDEX_NAME, remoteTranslogIndexSettings(0)); + } else { + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + } + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + Map indexStats = indexData(numberOfIterations, invokeFlush); + + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + .get(); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + refresh(INDEX_NAME); + String replicaNodeName = replicaNodeName(INDEX_NAME); + assertBusy( + () -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)), + 30, + TimeUnit.SECONDS + ); + + RecoveryResponse recoveryResponse = client(replicaNodeName).admin().indices().prepareRecoveries().get(); + + Optional recoverySource = recoveryResponse.shardRecoveryStates() + .get(INDEX_NAME) + .stream() + .filter(rs -> rs.getRecoverySource().getType() == RecoverySource.Type.PEER) + .findFirst(); + assertFalse(recoverySource.isEmpty()); + if (numberOfIterations == 1 && invokeFlush) { + // segments_N file is copied to new replica + assertEquals(1, recoverySource.get().getIndex().recoveredFileCount()); + } else { + assertEquals(0, recoverySource.get().getIndex().recoveredFileCount()); + } + + IndexResponse response = indexSingleDoc(); + assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo()); + refresh(INDEX_NAME); + assertBusy( + () -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1), + 30, + TimeUnit.SECONDS + ); + } + + public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataFlush() throws Exception { + testPeerRecovery(false, 1, true); + } + + public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogFlush() throws Exception { + testPeerRecovery(false, randomIntBetween(2, 5), true); + } + + public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataRefresh() throws Exception { + testPeerRecovery(false, 1, false); + } + + public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogRefresh() throws Exception { + testPeerRecovery(false, randomIntBetween(2, 5), false); + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataFlush() throws Exception { + testPeerRecovery(true, 1, true); + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Exception { + testPeerRecovery(true, randomIntBetween(2, 5), true); + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception { + testPeerRecovery(true, 1, false); + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception { + testPeerRecovery(true, randomIntBetween(2, 5), false); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index e71606d623a62..4a2d0f0198051 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4372,11 +4372,22 @@ public void close() throws IOException { } /** - * Downloads segments from remote segment store. + * Downloads segments from remote segment store. This method will download segments till + * last refresh checkpoint. * @param overrideLocal flag to override local segment files with those in remote store * @throws IOException if exception occurs while reading segments from remote store */ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException { + syncSegmentsFromRemoteSegmentStore(overrideLocal, true); + } + + /** + * Downloads segments from remote segment store. + * @param overrideLocal flag to override local segment files with those in remote store + * @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise + * @throws IOException if exception occurs while reading segments from remote store + */ + public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException { assert indexSettings.isRemoteStoreEnabled(); logger.info("Downloading segments from remote segment store"); assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; @@ -4415,7 +4426,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE skippedSegments.add(file); } } - if (segmentInfosSnapshotFilename != null) { + if (refreshLevelSegmentSync && segmentInfosSnapshotFilename != null) { try ( ChecksumIndexInput indexInput = new BufferedChecksumIndexInput( storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT) diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 1c5025ad834f7..b54171bb9a7d7 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -243,9 +243,13 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); + final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); + if (hasRemoteSegmentStore) { + indexShard.syncSegmentsFromRemoteSegmentStore(false, false); + } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot(); - final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog) == false; + final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog || hasRemoteSegmentStore) == false; final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!hasRemoteTranslog); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index e0a6990ad4445..7c074b07f5f3d 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2449,4 +2449,11 @@ protected String primaryNodeName(String indexName) { String nodeId = clusterState.getRoutingTable().index(indexName).shard(0).primaryShard().currentNodeId(); return clusterState.getRoutingNodes().node(nodeId).node().getName(); } + + protected String replicaNodeName(String indexName) { + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + String nodeId = clusterState.getRoutingTable().index(indexName).shard(0).replicaShards().get(0).currentNodeId(); + return clusterState.getRoutingNodes().node(nodeId).node().getName(); + } + }