From bd25364145579aceec29cfbd1d0421f7aaf128af Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Thu, 29 Jun 2023 11:23:03 +0530 Subject: [PATCH 1/2] Handle replication cases where the remote store metadata file is not present Signed-off-by: Ankit Kala Signed-off-by: Ashish Singh --- CHANGELOG.md | 1 + .../RemoteStoreReplicationSource.java | 39 +++++++++++-------- .../RemoteStoreReplicationSourceTests.java | 38 +++++++++++++++++- 3 files changed, 60 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ffc9e6d2783ea..bff5deee3a786 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support transport action names when registering NamedRoutes ([#7957](https://github.com/opensearch-project/OpenSearch/pull/7957)) - Create concept of persistent ThreadContext headers that are unstashable ([#8291]()https://github.com/opensearch-project/OpenSearch/pull/8291) - Enable Partial Flat Object ([#7997](https://github.com/opensearch-project/OpenSearch/pull/7997)) +- Handle replication cases where the remote store metadata file is not present ([#8433](https://github.com/opensearch-project/OpenSearch/pull/8433)) ### Dependencies - Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814) diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index b8941737a7c57..8dcafccd9cb47 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -17,6 +17,7 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.util.Collections; @@ -53,24 +54,28 @@ public void getCheckpointMetadata( // TODO: Need to figure out a way to pass this information for segment metadata via remote store. final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); try { - metadataMap = remoteDirectory.readLatestMetadataFile() - .getMetadata() - .entrySet() - .stream() - .collect( - Collectors.toMap( - e -> e.getKey(), - e -> new StoreFileMetadata( - e.getValue().getOriginalFilename(), - e.getValue().getLength(), - Store.digestToString(Long.valueOf(e.getValue().getChecksum())), - version, - null + RemoteSegmentMetadata mdFile = remoteDirectory.readLatestMetadataFile(); + if (mdFile == null) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, Collections.emptyMap(), null)); + } else { + metadataMap = mdFile.getMetadata() + .entrySet() + .stream() + .collect( + Collectors.toMap( + e -> e.getKey(), + e -> new StoreFileMetadata( + e.getValue().getOriginalFilename(), + e.getValue().getLength(), + Store.digestToString(Long.valueOf(e.getValue().getChecksum())), + version, + null + ) ) - ) - ); - // TODO: GET current checkpoint from remote store. - listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null)); + ); + // TODO: GET current checkpoint from remote store. + listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null)); + } } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java index 04f821a5fc48c..b1ae4c8203e72 100644 --- a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java @@ -41,6 +41,8 @@ public class RemoteStoreReplicationSourceTests extends OpenSearchIndexLevelRepli private IndexShard mockShard; + private Store remoteStore; + @Override public void setUp() throws Exception { super.setUp(); @@ -59,7 +61,7 @@ public void setUp() throws Exception { Store store = mock(Store.class); when(mockShard.store()).thenReturn(store); when(store.directory()).thenReturn(indexShard.store().directory()); - Store remoteStore = mock(Store.class); + remoteStore = mock(Store.class); when(mockShard.remoteStore()).thenReturn(remoteStore); RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()) @@ -113,6 +115,40 @@ public void testGetCheckpointMetadataFailure() { }); } + public void testGetCheckpointMetadataEmpty() throws ExecutionException, InterruptedException, IOException { + when(mockShard.getSegmentInfosSnapshot()).thenReturn(indexShard.getSegmentInfosSnapshot()); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); + IndexShard emptyIndexShard = null; + try { + emptyIndexShard = newStartedShard( + true, + Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(), + new InternalEngineFactory() + ); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) emptyIndexShard.remoteStore().directory()).getDelegate()) + .getDelegate(); + FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( + new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) + ); + when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); + + final PlainActionFuture res = PlainActionFuture.newFuture(); + replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res); + CheckpointInfoResponse response = res.get(); + assert (response.getCheckpoint().equals(checkpoint)); + assert (response.getMetadataMap().isEmpty()); + } finally { + closeShards(emptyIndexShard); + } + } + public void testGetSegmentFiles() throws ExecutionException, InterruptedException { final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( indexShard.shardId(), From 041e614889c11f13716ad57894ee2bb1262a56ff Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Fri, 7 Jul 2023 13:35:58 +0530 Subject: [PATCH 2/2] Allow null metadata file only if shard is not started Signed-off-by: Ankit Kala --- .../RemoteStoreReplicationSource.java | 41 ++++++++++--------- .../RemoteStoreReplicationSourceTests.java | 11 ++++- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 8dcafccd9cb47..c5be7635782af 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -14,6 +14,7 @@ import org.apache.lucene.util.Version; import org.opensearch.action.ActionListener; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; @@ -55,27 +56,29 @@ public void getCheckpointMetadata( final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); try { RemoteSegmentMetadata mdFile = remoteDirectory.readLatestMetadataFile(); - if (mdFile == null) { + // During initial recovery flow, the remote store might not have metadata as primary hasn't uploaded anything yet. + if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) { listener.onResponse(new CheckpointInfoResponse(checkpoint, Collections.emptyMap(), null)); - } else { - metadataMap = mdFile.getMetadata() - .entrySet() - .stream() - .collect( - Collectors.toMap( - e -> e.getKey(), - e -> new StoreFileMetadata( - e.getValue().getOriginalFilename(), - e.getValue().getLength(), - Store.digestToString(Long.valueOf(e.getValue().getChecksum())), - version, - null - ) - ) - ); - // TODO: GET current checkpoint from remote store. - listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null)); + return; } + assert mdFile != null : "Remote metadata file can't be null if shard is active " + indexShard.state(); + metadataMap = mdFile.getMetadata() + .entrySet() + .stream() + .collect( + Collectors.toMap( + e -> e.getKey(), + e -> new StoreFileMetadata( + e.getValue().getOriginalFilename(), + e.getValue().getLength(), + Store.digestToString(Long.valueOf(e.getValue().getChecksum())), + version, + null + ) + ) + ); + // TODO: GET current checkpoint from remote store. + listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null)); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java index b1ae4c8203e72..7a703f8ec69b5 100644 --- a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java @@ -17,6 +17,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.RemoteStoreRefreshListenerTests; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; @@ -70,7 +71,6 @@ public void setUp() throws Exception { new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) ); when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); - replicationSource = new RemoteStoreReplicationSource(mockShard); } @@ -140,10 +140,19 @@ public void testGetCheckpointMetadataEmpty() throws ExecutionException, Interrup when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); final PlainActionFuture res = PlainActionFuture.newFuture(); + when(mockShard.state()).thenReturn(IndexShardState.RECOVERING); + // Recovering shard should just do a noop and return empty metadata map. replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res); CheckpointInfoResponse response = res.get(); assert (response.getCheckpoint().equals(checkpoint)); assert (response.getMetadataMap().isEmpty()); + + when(mockShard.state()).thenReturn(IndexShardState.STARTED); + // Started shard should fail with assertion error. + expectThrows(AssertionError.class, () -> { + final PlainActionFuture res2 = PlainActionFuture.newFuture(); + replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res2); + }); } finally { closeShards(emptyIndexShard); }