Skip to content

Commit

Permalink
Allow null metadata file only is shard is inactive
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <ankikala@amazon.com>
  • Loading branch information
ankitkala committed Jul 7, 2023
1 parent 69596ea commit 8fcd17d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.util.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand Down Expand Up @@ -55,27 +56,29 @@ public void getCheckpointMetadata(
final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion();
try {
RemoteSegmentMetadata mdFile = remoteDirectory.readLatestMetadataFile();
if (mdFile == null) {
// During initial recovery flow, the remote store might not have metadata as primary hasn't uploaded anything yet.
if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) {
listener.onResponse(new CheckpointInfoResponse(checkpoint, Collections.emptyMap(), null));
} else {
metadataMap = mdFile.getMetadata()
.entrySet()
.stream()
.collect(
Collectors.toMap(
e -> e.getKey(),
e -> new StoreFileMetadata(
e.getValue().getOriginalFilename(),
e.getValue().getLength(),
Store.digestToString(Long.valueOf(e.getValue().getChecksum())),
version,
null
)
)
);
// TODO: GET current checkpoint from remote store.
listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null));
return;
}
assert mdFile != null : "Remote metadata file can't be null if shard is active " + indexShard.state();
metadataMap = mdFile.getMetadata()
.entrySet()
.stream()
.collect(
Collectors.toMap(
e -> e.getKey(),
e -> new StoreFileMetadata(
e.getValue().getOriginalFilename(),
e.getValue().getLength(),
Store.digestToString(Long.valueOf(e.getValue().getChecksum())),
version,
null
)
)
);
// TODO: GET current checkpoint from remote store.
listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null));
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.RemoteStoreRefreshListenerTests;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
Expand Down Expand Up @@ -70,7 +71,6 @@ public void setUp() throws Exception {
new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory)
);
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);

replicationSource = new RemoteStoreReplicationSource(mockShard);
}

Expand Down Expand Up @@ -140,10 +140,19 @@ public void testGetCheckpointMetadataEmpty() throws ExecutionException, Interrup
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);

final PlainActionFuture<CheckpointInfoResponse> res = PlainActionFuture.newFuture();
when(mockShard.state()).thenReturn(IndexShardState.RECOVERING);
// Recovering shard should just do a noop and return empty metadata map.
replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res);
CheckpointInfoResponse response = res.get();
assert (response.getCheckpoint().equals(checkpoint));
assert (response.getMetadataMap().isEmpty());

when(mockShard.state()).thenReturn(IndexShardState.STARTED);
// Started shard should fail with assertion error.
expectThrows(AssertionError.class, () -> {
final PlainActionFuture<CheckpointInfoResponse> res2 = PlainActionFuture.newFuture();
replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res2);
});
} finally {
closeShards(emptyIndexShard);
}
Expand Down

0 comments on commit 8fcd17d

Please sign in to comment.