Skip to content

Commit

Permalink
[Remote Store] Translog metadata filename change
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna committed Jun 30, 2023
1 parent 6dd3fe0 commit 3178356
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
Expand Down Expand Up @@ -140,4 +141,9 @@ public void listFoldersAsync(String threadpoolName, Iterable<String> path, Actio
}
});
}

public void listBlobsInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) throws IOException {
blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInLexicographicOrder("", limit, listener);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog.transfer;

import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

import java.io.IOException;
Expand Down Expand Up @@ -114,4 +115,6 @@ void uploadBlobAsync(
*/
InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException;

void listBlobsInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
Expand Down Expand Up @@ -185,15 +187,39 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
}

public TranslogTransferMetadata readMetadata() throws IOException {
return transferService.listAll(remoteMetadataTransferPath).stream().max(METADATA_FILENAME_COMPARATOR).map(filename -> {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
return metadataStreamWrapper.readStream(indexInput);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
return null;
}
}).orElse(null);
SetOnce<TranslogTransferMetadata> metadataSetOnce = new SetOnce<>();
SetOnce<IOException> exceptionSetOnce = new SetOnce<>();
final CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<List<BlobMetadata>> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(blobMetadataList -> {
if (blobMetadataList.isEmpty()) return;
String filename = blobMetadataList.get(0).name();
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput));
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
exceptionSetOnce.set(e);
}
}, e -> {
logger.error(() -> new ParameterizedMessage("Exception while listing metadata files "), e);
exceptionSetOnce.set((IOException) e);
}),
latch
);

try {
transferService.listBlobsInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener);
latch.await();
} catch (InterruptedException | IOException e) {
throw new IOException("Exception while reading/downloading metadafile", e);
}

if (exceptionSetOnce.get() != null) {
throw exceptionSetOnce.get();
}

return metadataSetOnce.get();
}

private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,16 @@ public Map<String, String> getGenerationToPrimaryTermMapper() {
}

public static String getFileName(long primaryTerm, long generation) {
return String.join(METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation)));
return String.join(
METADATA_SEPARATOR,
Arrays.asList(
String.valueOf(Long.MAX_VALUE - primaryTerm),
String.valueOf(Long.MAX_VALUE - generation),
String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()),
String.valueOf(primaryTerm),
String.valueOf(generation)
)
);
}

@Override
Expand All @@ -95,11 +104,11 @@ public boolean equals(Object o) {
private static class MetadataFilenameComparator implements Comparator<String> {
@Override
public int compare(String first, String second) {
// Format of metadata filename is <Primary Term>__<Generation>
// Format of metadata filename is <Inv Primary Term>__<Inv Generation>__<Inv Timestamp>__<Primary Term>__<Generation>
String[] filenameTokens1 = first.split(METADATA_SEPARATOR);
String[] filenameTokens2 = second.split(METADATA_SEPARATOR);
// Here, we are comparing only primary term and generation.
for (int i = 0; i < filenameTokens1.length; i++) {
// Here, we are comparing only inverted primary term and inv generation.
for (int i = 0; i < 2; i++) {
if (filenameTokens1[i].equals(filenameTokens2[i]) == false) {
return Long.compare(Long.parseLong(filenameTokens1[i]), Long.parseLong(filenameTokens2[i]));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import org.apache.lucene.tests.util.LuceneTestCase;
import org.mockito.Mockito;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot;
Expand All @@ -31,10 +33,12 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
Expand Down Expand Up @@ -186,68 +190,90 @@ public void testReadMetadataNoFile() throws IOException {
remoteBaseTransferPath,
null
);
doAnswer(invocation -> {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
List<BlobMetadata> bmList = new LinkedList<>();
latchedActionListener.onResponse(bmList);
return null;
}).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));

when(transferService.listAll(remoteBaseTransferPath)).thenReturn(Sets.newHashSet());
assertNull(translogTransferManager.readMetadata());
}

public void testReadMetadataSingleFile() throws IOException {
public void testReadMetadataHappy() throws IOException {
TranslogTransferManager translogTransferManager = new TranslogTransferManager(
shardId,
transferService,
remoteBaseTransferPath,
null
);

// BlobPath does not have equals method, so we can't use the instance directly in when
when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234"));
doAnswer(invocation -> {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
List<BlobMetadata> bmList = new LinkedList<>();
bmList.add(new PlainBlobMetadata("0__3__3__13__235", 1));
latchedActionListener.onResponse(bmList);
return null;
}).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));

TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata();
when(transferService.downloadBlob(any(BlobPath.class), eq("12__234"))).thenReturn(
when(transferService.downloadBlob(any(BlobPath.class), eq("0__3__3__13__235"))).thenReturn(
new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata))
);

assertEquals(metadata, translogTransferManager.readMetadata());
}

public void testReadMetadataMultipleFiles() throws IOException {
public void testReadMetadataReadException() throws IOException {
TranslogTransferManager translogTransferManager = new TranslogTransferManager(
shardId,
transferService,
remoteBaseTransferPath,
null
);

when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234", "12__235", "12__233"));
doAnswer(invocation -> {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
List<BlobMetadata> bmList = new LinkedList<>();
bmList.add(new PlainBlobMetadata("0__3__3__13__235", 1));
latchedActionListener.onResponse(bmList);
return null;
}).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));

TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata();
when(transferService.downloadBlob(any(BlobPath.class), eq("12__235"))).thenReturn(
new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata))
);
when(transferService.downloadBlob(any(BlobPath.class), eq("0__3__3__13__235"))).thenThrow(new IOException("Something went wrong"));

assertEquals(metadata, translogTransferManager.readMetadata());
assertThrows(IOException.class, translogTransferManager::readMetadata);
}

public void testReadMetadataException() throws IOException {
public void testReadMetadataListException() throws IOException {
TranslogTransferManager translogTransferManager = new TranslogTransferManager(
shardId,
transferService,
remoteBaseTransferPath,
null
);

when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234", "12__235", "12__233"));
doAnswer(invocation -> {
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
latchedActionListener.onFailure(new IOException("Issue while listing"));
return null;
}).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));

when(transferService.downloadBlob(any(BlobPath.class), eq("12__235"))).thenThrow(new IOException("Something went wrong"));
when(transferService.downloadBlob(any(BlobPath.class), eq("1__3__3__12__235"))).thenThrow(new IOException("Something went wrong"));

assertNull(translogTransferManager.readMetadata());
assertThrows(IOException.class, translogTransferManager::readMetadata);
}

public void testReadMetadataSamePrimaryTermGeneration() throws IOException {
List<String> metadataFiles = Arrays.asList("12__234", "12__235", "12__234");
public void testReadMetadataComparatorIllegal() {
List<String> metadataFiles = Arrays.asList("1__4__4__12__234", "1__4__3__12__235", "1__4__4__12__234");
assertThrows(IllegalArgumentException.class, () -> metadataFiles.sort(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR));
}

public void testReadMetadataComparator() {
List<String> metadataFiles = Arrays.asList("1__4__4__12__234", "0__3__3__13__235", "1__5__5__12__233");
metadataFiles.sort(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR);
assertEquals(Arrays.asList("0__3__3__13__235", "1__4__4__12__234", "1__5__5__12__233"), metadataFiles);
}

public void testDownloadTranslog() throws IOException {
Path location = createTempDir();
TranslogTransferManager translogTransferManager = new TranslogTransferManager(
Expand Down

0 comments on commit 3178356

Please sign in to comment.