Skip to content

Commit

Permalink
Unit test updates
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Jul 19, 2023
1 parent 3614e09 commit 50c3a6c
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4630,7 +4630,6 @@ public List<StoreFileMetadata> syncSegmentsFromRemoteSegmentStore(List<StoreFile
store.incRef();
remoteStore.incRef();
final Directory storeDirectory = store.directory();
logger.info("--> storeDirectory {}", storeDirectory.getClass());
String segmentNFile = null;
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
Expand Down Expand Up @@ -4658,8 +4657,7 @@ public List<StoreFileMetadata> syncSegmentsFromRemoteSegmentStore(List<StoreFile
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync)
throws IOException {
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ private boolean isRefreshAfterCommit() throws IOException {
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint) throws IOException {
void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
throws IOException {
final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
Map<String, String> userData = segmentInfosSnapshot.getUserData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,8 @@ public boolean containsFile(String localFilename, String checksum) {
* @param segmentFiles segment files that are part of the shard at the time of the latest refresh
* @param segmentInfosSnapshot SegmentInfos bytes to store as part of metadata file
* @param storeDirectory instance of local directory to temporarily create metadata file before upload
* @param primaryTerm primary term to be used in the name of metadata file
* @param translogGeneration translog generation
* @param replicationCheckpoint ReplicationCheckpoint of primary shard
* @throws IOException in case of I/O error while uploading the metadata file
*/
public void uploadMetadata(
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ public void buildInfosFromBytes(
byte[] infosBytes,
long segmentsGen,
CheckedConsumer<SegmentInfos, IOException> finalizeConsumer,
CheckedConsumer<Map<String,String>, IOException> renameConsumer
CheckedConsumer<Map<String, String>, IOException> renameConsumer
) throws IOException {
metadataLock.writeLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ public class RemoteSegmentMetadata {

private final byte[] segmentInfosBytes;

// private final long primaryTerm;
// private final long generation;
//
// private final long version;
//
// private final long length;
//
// private final String codec;
// private final long primaryTerm;
// private final long generation;
//
// private final long version;
//
// private final long length;
//
// private final String codec;

public ReplicationCheckpoint getReplicationCheckpoint() {
return replicationCheckpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void getCheckpointMetadata(
)
)
);
logger.info("--> Sending empty checkpoint");
listener.onResponse(new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes()));
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
Expand All @@ -36,7 +32,6 @@
import org.opensearch.indices.replication.common.ReplicationTarget;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -198,7 +193,8 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
return diff.missing;
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, GetSegmentFilesResponse getSegmentFilesResponse) throws OpenSearchCorruptionException {
private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, GetSegmentFilesResponse getSegmentFilesResponse)
throws OpenSearchCorruptionException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
Store store = null;
Expand All @@ -207,17 +203,22 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
store.incRef();
Map<String, String> tempFileNames = null;
if (this.indexShard.indexSettings().isRemoteStoreEnabled() == true) {
tempFileNames = getSegmentFilesResponse.getFiles() != null ? getSegmentFilesResponse.getFiles().stream().collect(Collectors.toMap(StoreFileMetadata::name, StoreFileMetadata::name)) : Collections.emptyMap();
tempFileNames = getSegmentFilesResponse.getFiles() != null
? getSegmentFilesResponse.getFiles()
.stream()
.collect(Collectors.toMap(StoreFileMetadata::name, StoreFileMetadata::name))
: Collections.emptyMap();
} else {
tempFileNames = multiFileWriter.getTempFileNames();
}
logger.info("--> tempFileNames {} checkpointInfoResponse.getCheckpoint().getSegmentsGen() {}", tempFileNames, checkpointInfoResponse.getCheckpoint().getSegmentsGen());
store.buildInfosFromBytes(
tempFileNames,
checkpointInfoResponse.getInfosBytes(),
checkpointInfoResponse.getCheckpoint().getSegmentsGen(),
indexShard::finalizeReplication,
this.indexShard.indexSettings().isRemoteStoreEnabled() == true ? (files) -> {}: (files) -> indexShard.store().renameTempFilesSafe(files)
this.indexShard.indexSettings().isRemoteStoreEnabled() == true
? (files) -> {}
: (files) -> indexShard.store().renameTempFilesSafe(files)
);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.index.store;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
Expand Down Expand Up @@ -44,6 +43,7 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
Expand All @@ -56,8 +56,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.Collection;
import java.util.concurrent.ExecutorService;

import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -102,7 +100,10 @@ public void setup() throws IOException {
);
testUploadTracker = new TestUploadListener();

Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build();
Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService();

indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory());
Expand Down Expand Up @@ -225,7 +226,7 @@ private Map<String, String> getDummyMetadata(String prefix, int commitGeneration
* @return ByteArrayIndexInput: metadata file bytes with header and footer
* @throws IOException IOException
*/
private ByteArrayIndexInput createMetadataFileBytes(Map<String, String> segmentFilesMap, long generation, long primaryTerm)
private ByteArrayIndexInput createMetadataFileBytes(Map<String, String> segmentFilesMap, ReplicationCheckpoint replicationCheckpoint)
throws IOException {
ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput();
segmentInfos.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "", ""));
Expand All @@ -235,8 +236,7 @@ private ByteArrayIndexInput createMetadataFileBytes(Map<String, String> segmentF
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096);
CodecUtil.writeHeader(indexOutput, RemoteSegmentMetadata.METADATA_CODEC, RemoteSegmentMetadata.CURRENT_VERSION);
indexOutput.writeMapOfStrings(segmentFilesMap);
indexOutput.writeLong(generation);
indexOutput.writeLong(primaryTerm);
replicationCheckpoint.writeTo(indexOutput);
indexOutput.writeLong(byteArray.length);
indexOutput.writeBytes(byteArray, byteArray.length);
CodecUtil.writeFooter(indexOutput);
Expand Down Expand Up @@ -274,13 +274,13 @@ private Map<String, Map<String, String>> populateMetadata() throws IOException {
);

when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenAnswer(
I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename), 23, 12)
I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename), indexShard.getLatestReplicationCheckpoint())
);
when(remoteMetadataDirectory.openInput(metadataFilename2, IOContext.DEFAULT)).thenAnswer(
I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename2), 13, 12)
I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename2), indexShard.getLatestReplicationCheckpoint())
);
when(remoteMetadataDirectory.openInput(metadataFilename3, IOContext.DEFAULT)).thenAnswer(
I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename3), 38, 10)
I -> createMetadataFileBytes(metadataFilenameContentMapping.get(metadataFilename3), indexShard.getLatestReplicationCheckpoint())
);

return metadataFilenameContentMapping;
Expand Down Expand Up @@ -616,7 +616,9 @@ public void testContainsFile() throws IOException {
metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512");
metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024");

when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(createMetadataFileBytes(metadata, 1, 5));
when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(
createMetadataFileBytes(metadata, indexShard.getLatestReplicationCheckpoint())
);

remoteSegmentStoreDirectory.init();

Expand All @@ -641,12 +643,19 @@ public void testContainsFile() throws IOException {
public void testUploadMetadataEmpty() throws IOException {
Directory storeDirectory = mock(Directory.class);
IndexOutput indexOutput = mock(IndexOutput.class);
when(storeDirectory.createOutput(startsWith("metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput);
final long primaryTerm = indexShard.getOperationPrimaryTerm();
when(storeDirectory.createOutput(startsWith("metadata__" + primaryTerm + "__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput);

Collection<String> segmentFiles = List.of("s1", "s2", "s3");
assertThrows(
NoSuchFileException.class,
() -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, ReplicationCheckpoint.empty(indexShard.shardId(), Codec.getDefault().getName()))
() -> remoteSegmentStoreDirectory.uploadMetadata(
segmentFiles,
segmentInfos,
storeDirectory,
34L,
indexShard.getLatestReplicationCheckpoint()
)
);
}

Expand All @@ -658,14 +667,20 @@ public void testUploadMetadataNonEmpty() throws IOException {
BytesStreamOutput output = new BytesStreamOutput();
IndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096);

String generation = RemoteStoreUtils.invertLong(segmentInfos.getGeneration());
String primaryTerm = RemoteStoreUtils.invertLong(12);
String generation = RemoteStoreUtils.invertLong(indexShard.getLatestReplicationCheckpoint().getSegmentsGen());
String primaryTerm = RemoteStoreUtils.invertLong(indexShard.getLatestReplicationCheckpoint().getPrimaryTerm());
when(storeDirectory.createOutput(startsWith("metadata__" + primaryTerm + "__" + generation), eq(IOContext.DEFAULT))).thenReturn(
indexOutput
);

Collection<String> segmentFiles = List.of("_0.si", "_0.cfe", "_0.cfs", "segments_1");
remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, indexShard.getLatestReplicationCheckpoint());
remoteSegmentStoreDirectory.uploadMetadata(
segmentFiles,
segmentInfos,
storeDirectory,
12L,
indexShard.getLatestReplicationCheckpoint()
);

verify(remoteMetadataDirectory).copyFrom(
eq(storeDirectory),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,7 @@ public void tearDown() throws Exception {

public void testGetCheckpointMetadata() throws ExecutionException, InterruptedException {
when(mockShard.getSegmentInfosSnapshot()).thenReturn(indexShard.getSegmentInfosSnapshot());
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
VERSION,
Codec.getDefault().getName()
);
final ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();

final PlainActionFuture<CheckpointInfoResponse> res = PlainActionFuture.newFuture();
replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res);
Expand All @@ -101,13 +95,7 @@ public void testGetCheckpointMetadata() throws ExecutionException, InterruptedEx
}

public void testGetCheckpointMetadataFailure() {
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
VERSION,
Codec.getDefault().getName()
);
final ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();

when(mockShard.getSegmentInfosSnapshot()).thenThrow(new RuntimeException("test"));

Expand Down Expand Up @@ -174,21 +162,15 @@ public void testGetSegmentFiles() throws ExecutionException, InterruptedExceptio
replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), indexShard, res);
GetSegmentFilesResponse response = res.get();
assert (response.files.isEmpty());
assertEquals("remote store", replicationSource.getDescription());
assertEquals("RemoteStoreReplicationSource", replicationSource.getDescription());

}

public void testGetSegmentFilesFailure() throws IOException {
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
indexShard.shardId(),
PRIMARY_TERM,
SEGMENTS_GEN,
VERSION,
Codec.getDefault().getName()
);
final ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
Mockito.doThrow(new RuntimeException("testing"))
.when(mockShard)
.syncSegmentsFromRemoteSegmentStore(Mockito.anyBoolean(), Mockito.anyBoolean());
.syncSegmentsFromRemoteSegmentStore(Mockito.any());
assertThrows(ExecutionException.class, () -> {
final PlainActionFuture<GetSegmentFilesResponse> res = PlainActionFuture.newFuture();
replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), mockShard, res);
Expand Down

0 comments on commit 50c3a6c

Please sign in to comment.