Skip to content

Commit

Permalink
Handle replication cases where the remote store metadata file is not …
Browse files Browse the repository at this point in the history
…present

Signed-off-by: Ankit Kala <ankikala@amazon.com>
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ankitkala committed Jul 4, 2023
1 parent 64c0871 commit 264bc7b
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.util.Collections;
Expand Down Expand Up @@ -53,24 +54,28 @@ public void getCheckpointMetadata(
// TODO: Need to figure out a way to pass this information for segment metadata via remote store.
final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion();
try {
metadataMap = remoteDirectory.readLatestMetadataFile()
.getMetadata()
.entrySet()
.stream()
.collect(
Collectors.toMap(
e -> e.getKey(),
e -> new StoreFileMetadata(
e.getValue().getOriginalFilename(),
e.getValue().getLength(),
Store.digestToString(Long.valueOf(e.getValue().getChecksum())),
version,
null
RemoteSegmentMetadata mdFile = remoteDirectory.readLatestMetadataFile();
if (mdFile == null) {
listener.onResponse(new CheckpointInfoResponse(checkpoint, Collections.EMPTY_MAP, null));
} else {
metadataMap = mdFile.getMetadata()
.entrySet()
.stream()
.collect(
Collectors.toMap(
e -> e.getKey(),
e -> new StoreFileMetadata(
e.getValue().getOriginalFilename(),
e.getValue().getLength(),
Store.digestToString(Long.valueOf(e.getValue().getChecksum())),
version,
null
)
)
)
);
// TODO: GET current checkpoint from remote store.
listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null));
);
// TODO: GET current checkpoint from remote store.
listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null));
}
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class RemoteStoreReplicationSourceTests extends OpenSearchIndexLevelRepli

private IndexShard mockShard;

private Store remoteStore;

@Override
public void setUp() throws Exception {
super.setUp();
Expand All @@ -59,7 +61,7 @@ public void setUp() throws Exception {
Store store = mock(Store.class);
when(mockShard.store()).thenReturn(store);
when(store.directory()).thenReturn(indexShard.store().directory());
Store remoteStore = mock(Store.class);
remoteStore = mock(Store.class);
when(mockShard.remoteStore()).thenReturn(remoteStore);
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
(RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate())
Expand Down Expand Up @@ -113,6 +115,40 @@ public void testGetCheckpointMetadataFailure() {
});
}

public void testGetCheckpointMetadataEmpty() throws ExecutionException, InterruptedException, IOException {
when(mockShard.getSegmentInfosSnapshot()).thenReturn(indexShard.getSegmentInfosSnapshot());
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
VERSION,
Codec.getDefault().getName()
);
IndexShard emptyIndexShard = null;
try {
emptyIndexShard = newStartedShard(
true,
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(),
new InternalEngineFactory()
);
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
(RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) emptyIndexShard.remoteStore().directory()).getDelegate())
.getDelegate();
FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory(
new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory)
);
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);

final PlainActionFuture<CheckpointInfoResponse> res = PlainActionFuture.newFuture();
replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res);
CheckpointInfoResponse response = res.get();
assert (response.getCheckpoint().equals(checkpoint));
assert (response.getMetadataMap().isEmpty());
} finally {
closeShards(emptyIndexShard);
}
}

public void testGetSegmentFiles() throws ExecutionException, InterruptedException {
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
Expand Down

0 comments on commit 264bc7b

Please sign in to comment.