From f5b124723a429ab27300dbb98ff013e69788fb1f Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Tue, 27 Jun 2023 10:23:52 +0530 Subject: [PATCH 1/6] [Remote Store] Translog metadata filename change Signed-off-by: Gaurav Bafna --- .../transfer/BlobStoreTransferService.java | 6 ++ .../translog/transfer/TransferService.java | 3 + .../transfer/TranslogTransferManager.java | 44 ++++++++++--- .../transfer/TranslogTransferMetadata.java | 17 +++-- .../TranslogTransferManagerTests.java | 66 +++++++++++++------ 5 files changed, 103 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 88fe816ccb462..744ca47543858 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; @@ -140,4 +141,9 @@ public void listFoldersAsync(String threadpoolName, Iterable path, Actio } }); } + + public void listBlobsInSortedOrder(Iterable path, int limit, ActionListener> listener) throws IOException { + blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInLexicographicOrder("", limit, listener); + } + } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 6aca3055a3f53..4039924af5bf5 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog.transfer; import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import java.io.IOException; @@ -114,4 +115,6 @@ void uploadBlobAsync( */ InputStream downloadBlob(Iterable path, String fileName) throws IOException; + void listBlobsInSortedOrder(Iterable path, int limit, ActionListener> listener) throws IOException; + } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index f6405bc9b5c82..75258b304e29b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -15,6 +15,8 @@ import org.apache.lucene.store.OutputStreamIndexOutput; import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.SetOnce; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.VersionedCodecStreamWrapper; @@ -185,15 +187,39 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th } public TranslogTransferMetadata readMetadata() throws IOException { - return transferService.listAll(remoteMetadataTransferPath).stream().max(METADATA_FILENAME_COMPARATOR).map(filename -> { - try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) { - IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes()); - return metadataStreamWrapper.readStream(indexInput); - } catch (IOException e) { - logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e); - return null; - } - }).orElse(null); + SetOnce metadataSetOnce = new SetOnce<>(); + SetOnce exceptionSetOnce = new SetOnce<>(); + final CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener> latchedActionListener = new LatchedActionListener<>( + ActionListener.wrap(blobMetadataList -> { + if (blobMetadataList.isEmpty()) return; + String filename = blobMetadataList.get(0).name(); + try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) { + IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes()); + metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput)); + } catch (IOException e) { + logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e); + exceptionSetOnce.set(e); + } + }, e -> { + logger.error(() -> new ParameterizedMessage("Exception while listing metadata files "), e); + exceptionSetOnce.set((IOException) e); + }), + latch + ); + + try { + transferService.listBlobsInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener); + latch.await(); + } catch (InterruptedException | IOException e) { + throw new IOException("Exception while reading/downloading metadafile", e); + } + + if (exceptionSetOnce.get() != null) { + throw exceptionSetOnce.get(); + } + + return metadataSetOnce.get(); } private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 7a2fee9a69d5e..62a1f4aa6ab86 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -76,7 +76,16 @@ public Map getGenerationToPrimaryTermMapper() { } public static String getFileName(long primaryTerm, long generation) { - return String.join(METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation))); + return String.join( + METADATA_SEPARATOR, + Arrays.asList( + String.valueOf(Long.MAX_VALUE - primaryTerm), + String.valueOf(Long.MAX_VALUE - generation), + String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()), + String.valueOf(primaryTerm), + String.valueOf(generation) + ) + ); } @Override @@ -95,11 +104,11 @@ public boolean equals(Object o) { private static class MetadataFilenameComparator implements Comparator { @Override public int compare(String first, String second) { - // Format of metadata filename is __ + // Format of metadata filename is ________ String[] filenameTokens1 = first.split(METADATA_SEPARATOR); String[] filenameTokens2 = second.split(METADATA_SEPARATOR); - // Here, we are comparing only primary term and generation. - for (int i = 0; i < filenameTokens1.length; i++) { + // Here, we are comparing only inverted primary term and inv generation. + for (int i = 0; i < 2; i++) { if (filenameTokens1[i].equals(filenameTokens2[i]) == false) { return Long.compare(Long.parseLong(filenameTokens1[i]), Long.parseLong(filenameTokens2[i])); } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 6f6b3622295b6..ef3dead137c44 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -11,10 +11,12 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.mockito.Mockito; import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.util.set.Sets; +import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; @@ -31,10 +33,12 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -186,31 +190,40 @@ public void testReadMetadataNoFile() throws IOException { remoteBaseTransferPath, null ); + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(2); + List bmList = new LinkedList<>(); + latchedActionListener.onResponse(bmList); + return null; + }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); - when(transferService.listAll(remoteBaseTransferPath)).thenReturn(Sets.newHashSet()); assertNull(translogTransferManager.readMetadata()); } - public void testReadMetadataSingleFile() throws IOException { + public void testReadMetadataHappy() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, remoteBaseTransferPath, null ); - - // BlobPath does not have equals method, so we can't use the instance directly in when - when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234")); + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(2); + List bmList = new LinkedList<>(); + bmList.add(new PlainBlobMetadata("0__3__3__13__235", 1)); + latchedActionListener.onResponse(bmList); + return null; + }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); - when(transferService.downloadBlob(any(BlobPath.class), eq("12__234"))).thenReturn( + when(transferService.downloadBlob(any(BlobPath.class), eq("0__3__3__13__235"))).thenReturn( new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)) ); assertEquals(metadata, translogTransferManager.readMetadata()); } - public void testReadMetadataMultipleFiles() throws IOException { + public void testReadMetadataReadException() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, @@ -218,17 +231,20 @@ public void testReadMetadataMultipleFiles() throws IOException { null ); - when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234", "12__235", "12__233")); + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(2); + List bmList = new LinkedList<>(); + bmList.add(new PlainBlobMetadata("0__3__3__13__235", 1)); + latchedActionListener.onResponse(bmList); + return null; + }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); - TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); - when(transferService.downloadBlob(any(BlobPath.class), eq("12__235"))).thenReturn( - new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)) - ); + when(transferService.downloadBlob(any(BlobPath.class), eq("0__3__3__13__235"))).thenThrow(new IOException("Something went wrong")); - assertEquals(metadata, translogTransferManager.readMetadata()); + assertThrows(IOException.class, translogTransferManager::readMetadata); } - public void testReadMetadataException() throws IOException { + public void testReadMetadataListException() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, @@ -236,18 +252,28 @@ public void testReadMetadataException() throws IOException { null ); - when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234", "12__235", "12__233")); + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(2); + latchedActionListener.onFailure(new IOException("Issue while listing")); + return null; + }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); - when(transferService.downloadBlob(any(BlobPath.class), eq("12__235"))).thenThrow(new IOException("Something went wrong")); + when(transferService.downloadBlob(any(BlobPath.class), eq("1__3__3__12__235"))).thenThrow(new IOException("Something went wrong")); - assertNull(translogTransferManager.readMetadata()); + assertThrows(IOException.class, translogTransferManager::readMetadata); } - public void testReadMetadataSamePrimaryTermGeneration() throws IOException { - List metadataFiles = Arrays.asList("12__234", "12__235", "12__234"); + public void testReadMetadataComparatorIllegal() { + List metadataFiles = Arrays.asList("1__4__4__12__234", "1__4__3__12__235", "1__4__4__12__234"); assertThrows(IllegalArgumentException.class, () -> metadataFiles.sort(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR)); } + public void testReadMetadataComparator() { + List metadataFiles = Arrays.asList("1__4__4__12__234", "0__3__3__13__235", "1__5__5__12__233"); + metadataFiles.sort(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR); + assertEquals(Arrays.asList("0__3__3__13__235", "1__4__4__12__234", "1__5__5__12__233"), metadataFiles); + } + public void testDownloadTranslog() throws IOException { Path location = createTempDir(); TranslogTransferManager translogTransferManager = new TranslogTransferManager( From 7dc5c2ac1ca639363672f884f5d6879c01920276 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Mon, 3 Jul 2023 08:54:10 +0530 Subject: [PATCH 2/6] More changes while reading md file and UTs Signed-off-by: Gaurav Bafna --- .../index/translog/RemoteFsTranslog.java | 3 +- .../transfer/BlobStoreTransferService.java | 30 ++++++---- .../translog/transfer/TransferService.java | 12 +--- .../transfer/TranslogTransferManager.java | 58 ++++++++++--------- .../transfer/TranslogTransferMetadata.java | 34 ++++------- .../index/translog/RemoteFSTranslogTests.java | 14 +---- .../TranslogTransferManagerTests.java | 35 +++++------ 7 files changed, 79 insertions(+), 107 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 04057b581e8d9..091d6a6009d6c 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -385,6 +385,7 @@ public void trimUnreferencedReaders() throws IOException { } if (generationsToDelete.isEmpty() == false) { deleteRemoteGeneration(generationsToDelete); + translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release); deleteStaleRemotePrimaryTermsAndMetadataFiles(); } else { remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); @@ -418,8 +419,6 @@ private void deleteStaleRemotePrimaryTermsAndMetadataFiles() { assert readers.isEmpty() == false : "Expected non-empty readers"; long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get(); translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm); - // Second we delete all stale metadata files from remote store - translogTransferManager.deleteStaleTranslogMetadataFilesAsync(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 744ca47543858..4aa6577cbf520 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Set; +import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC; + /** * Service that handles remote transfer of translog and checkpoint files * @@ -116,34 +118,38 @@ public Set listAll(Iterable path) throws IOException { } @Override - public void listAllAsync(String threadpoolName, Iterable path, ActionListener> listener) { + public Set listFolders(Iterable path) throws IOException { + return blobStore.blobContainer((BlobPath) path).children().keySet(); + } + + @Override + public void listFoldersAsync(String threadpoolName, Iterable path, ActionListener> listener) { threadPool.executor(threadpoolName).execute(() -> { try { - listener.onResponse(listAll(path)); + listener.onResponse(listFolders(path)); } catch (IOException e) { listener.onFailure(e); } }); } - @Override - public Set listFolders(Iterable path) throws IOException { - return blobStore.blobContainer((BlobPath) path).children().keySet(); + public void listAllInSortedOrder(Iterable path, int limit, ActionListener> listener) throws IOException { + blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder("", limit, LEXICOGRAPHIC, listener); } - @Override - public void listFoldersAsync(String threadpoolName, Iterable path, ActionListener> listener) { + public void listAllInSortedOrderAsync( + String threadpoolName, + Iterable path, + int limit, + ActionListener> listener + ) { threadPool.executor(threadpoolName).execute(() -> { try { - listener.onResponse(listFolders(path)); + listAllInSortedOrder(path, limit, listener); } catch (IOException e) { listener.onFailure(e); } }); } - public void listBlobsInSortedOrder(Iterable path, int limit, ActionListener> listener) throws IOException { - blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInLexicographicOrder("", limit, listener); - } - } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 4039924af5bf5..f20bb82ce259a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -81,14 +81,6 @@ void uploadBlobAsync( */ Set listAll(Iterable path) throws IOException; - /** - * Lists the files and invokes the listener on success or failure - * @param threadpoolName threadpool type which will be used to list all files asynchronously. - * @param path the path to list - * @param listener the callback to be invoked once list operation completes successfully/fails. - */ - void listAllAsync(String threadpoolName, Iterable path, ActionListener> listener); - /** * Lists the folders inside the path. * @param path : the path @@ -115,6 +107,8 @@ void uploadBlobAsync( */ InputStream downloadBlob(Iterable path, String fileName) throws IOException; - void listBlobsInSortedOrder(Iterable path, int limit, ActionListener> listener) throws IOException; + void listAllInSortedOrder(Iterable path, int limit, ActionListener> listener) throws IOException; + + void listAllInSortedOrderAsync(String threadpoolName, Iterable path, int limit, ActionListener> listener); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 75258b304e29b..709d62b6e1575 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -44,8 +44,6 @@ import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; -import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR; -import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.getFileName; /** * The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService} @@ -209,7 +207,7 @@ public TranslogTransferMetadata readMetadata() throws IOException { ); try { - transferService.listBlobsInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener); + transferService.listAllInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener); latch.await(); } catch (InterruptedException | IOException e) { throw new IOException("Exception while reading/downloading metadafile", e); @@ -237,7 +235,7 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); return new TransferFileSnapshot( - getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()), + translogTransferMetadata.getFileName(), getMetadataBytes(translogTransferMetadata), translogTransferMetadata.getPrimaryTerm() ); @@ -256,7 +254,7 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( "translog transfer metadata " + metadata.getPrimaryTerm(), - getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()), + metadata.getFileName(), output, TranslogTransferMetadata.BUFFER_SIZE ) @@ -279,20 +277,14 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep */ public void deleteGenerationAsync(long primaryTerm, Set generations, Runnable onCompletion) { List translogFiles = new ArrayList<>(); - List metadataFiles = new ArrayList<>(); generations.forEach(generation -> { // Add .ckp and .tlog file to translog file list which is located in basePath/ String ckpFileName = Translog.getCommitCheckpointFileName(generation); String translogFileName = Translog.getFilename(generation); translogFiles.addAll(List.of(ckpFileName, translogFileName)); - // Add metadata file tio metadata file list which is located in basePath/metadata - String metadataFileName = TranslogTransferMetadata.getFileName(primaryTerm, generation); - metadataFiles.add(metadataFileName); }); // Delete the translog and checkpoint files asynchronously deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion); - // Delete the metadata files asynchronously - deleteMetadataFilesAsync(metadataFiles, onCompletion); } /** @@ -367,24 +359,34 @@ public void onFailure(Exception e) { }); } - public void deleteStaleTranslogMetadataFilesAsync() { - transferService.listAllAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, new ActionListener<>() { - @Override - public void onResponse(Set metadataFiles) { - List sortedMetadataFiles = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList()); - if (sortedMetadataFiles.size() <= 1) { - logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size()); - return; - } - List metadataFilesToDelete = sortedMetadataFiles.subList(0, sortedMetadataFiles.size() - 1); - deleteMetadataFilesAsync(metadataFilesToDelete); - } + public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) { + try { + transferService.listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + remoteMetadataTransferPath, + Integer.MAX_VALUE, + new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + List sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + if (sortedMetadataFiles.size() <= 1) { + logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size()); + return; + } + List metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size()); + logger.trace("Deleting remote translog metadata files {}", metadataFilesToDelete); + deleteMetadataFilesAsync(metadataFilesToDelete, onCompletion); + } - @Override - public void onFailure(Exception e) { - logger.error("Exception occurred while listing translog metadata files from remote store", e); - } - }); + @Override + public void onFailure(Exception e) { + logger.error("Exception occurred while listing translog metadata files from remote store", e); + } + } + ); + } catch (Exception e) { + logger.error("Exception occurred while listing translog metadata files from remote store", e); + } } public void deleteTranslogFiles() throws IOException { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 62a1f4aa6ab86..b2eacec33256d 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -9,9 +9,9 @@ package org.opensearch.index.translog.transfer; import org.opensearch.common.SetOnce; +import org.opensearch.index.remote.RemoteStoreUtils; import java.util.Arrays; -import java.util.Comparator; import java.util.Map; import java.util.Objects; @@ -42,13 +42,14 @@ public class TranslogTransferMetadata { static final String METADATA_CODEC = "md"; - public static final Comparator METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator(); + private final long createdAt; public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) { this.primaryTerm = primaryTerm; this.generation = generation; this.minTranslogGeneration = minTranslogGeneration; this.count = count; + this.createdAt = System.currentTimeMillis(); } public long getPrimaryTerm() { @@ -75,13 +76,16 @@ public Map getGenerationToPrimaryTermMapper() { return generationToPrimaryTermMapper.get(); } - public static String getFileName(long primaryTerm, long generation) { + /* + This should be used only at the time of creation. + */ + public String getFileName() { return String.join( METADATA_SEPARATOR, Arrays.asList( - String.valueOf(Long.MAX_VALUE - primaryTerm), - String.valueOf(Long.MAX_VALUE - generation), - String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()), + RemoteStoreUtils.invertLong(primaryTerm), + RemoteStoreUtils.invertLong(generation), + RemoteStoreUtils.invertLong(createdAt), String.valueOf(primaryTerm), String.valueOf(generation) ) @@ -100,22 +104,4 @@ public boolean equals(Object o) { TranslogTransferMetadata other = (TranslogTransferMetadata) o; return Objects.equals(this.primaryTerm, other.primaryTerm) && Objects.equals(this.generation, other.generation); } - - private static class MetadataFilenameComparator implements Comparator { - @Override - public int compare(String first, String second) { - // Format of metadata filename is ________ - String[] filenameTokens1 = first.split(METADATA_SEPARATOR); - String[] filenameTokens2 = second.split(METADATA_SEPARATOR); - // Here, we are comparing only inverted primary term and inv generation. - for (int i = 0; i < 2; i++) { - if (filenameTokens1[i].equals(filenameTokens2[i]) == false) { - return Long.compare(Long.parseLong(filenameTokens1[i]), Long.parseLong(filenameTokens2[i])); - } - } - throw new IllegalArgumentException( - "TranslogTransferMetadata files " + first + " and " + second + " have same primary term and generation" - ); - } - } } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index d963830e9e736..30c04c731d1f8 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -570,7 +570,7 @@ public void testMetadataFileDeletion() throws Exception { assertEquals(1, translog.readers.size()); } assertBusy(() -> assertEquals(4, translog.allUploaded().size())); - assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int moreDocs = randomIntBetween(3, 10); logger.info("numDocs={} moreDocs={}", numDocs, moreDocs); for (int i = numDocs; i < numDocs + moreDocs; i++) { @@ -579,7 +579,7 @@ public void testMetadataFileDeletion() throws Exception { translog.trimUnreferencedReaders(); assertEquals(1 + moreDocs, translog.readers.size()); assertBusy(() -> assertEquals(2 + 2L * moreDocs, translog.allUploaded().size())); - assertBusy(() -> assertEquals(1 + moreDocs, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int totalDocs = numDocs + moreDocs; translog.setMinSeqNoToKeep(totalDocs - 1); @@ -592,7 +592,7 @@ public void testMetadataFileDeletion() throws Exception { ); translog.setMinSeqNoToKeep(totalDocs); translog.trimUnreferencedReaders(); - assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); // Change primary term and test the deletion of older primaries String translogUUID = translog.translogUUID; @@ -607,10 +607,6 @@ public void testMetadataFileDeletion() throws Exception { long oldPrimaryTerm = primaryTerm.get(); long newPrimaryTerm = primaryTerm.incrementAndGet(); - // Check all metadata files corresponds to old primary term - Set mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); - assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(oldPrimaryTerm).concat("__")))); - // Creating RemoteFsTranslog with the same location Translog newTranslog = create(translogDir, repository, translogUUID); int newPrimaryTermDocs = randomIntBetween(5, 10); @@ -621,10 +617,6 @@ public void testMetadataFileDeletion() throws Exception { newTranslog.trimUnreferencedReaders(); } - // Check that all metadata files are belonging now to the new primary - mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); - assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(newPrimaryTerm).concat("__")))); - try { newTranslog.close(); } catch (Exception e) { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index ef3dead137c44..078eacc8025e4 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -32,7 +32,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -195,7 +194,7 @@ public void testReadMetadataNoFile() throws IOException { List bmList = new LinkedList<>(); latchedActionListener.onResponse(bmList); return null; - }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); assertNull(translogTransferManager.readMetadata()); } @@ -207,16 +206,18 @@ public void testReadMetadataHappy() throws IOException { remoteBaseTransferPath, null ); + TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); + String mdFilename = tm.getFileName(); doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(2); List bmList = new LinkedList<>(); - bmList.add(new PlainBlobMetadata("0__3__3__13__235", 1)); + bmList.add(new PlainBlobMetadata(mdFilename, 1)); latchedActionListener.onResponse(bmList); return null; - }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); - when(transferService.downloadBlob(any(BlobPath.class), eq("0__3__3__13__235"))).thenReturn( + when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenReturn( new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)) ); @@ -231,15 +232,18 @@ public void testReadMetadataReadException() throws IOException { null ); + TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); + String mdFilename = tm.getFileName(); + doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(2); List bmList = new LinkedList<>(); - bmList.add(new PlainBlobMetadata("0__3__3__13__235", 1)); + bmList.add(new PlainBlobMetadata(mdFilename, 1)); latchedActionListener.onResponse(bmList); return null; - }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); - when(transferService.downloadBlob(any(BlobPath.class), eq("0__3__3__13__235"))).thenThrow(new IOException("Something went wrong")); + when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenThrow(new IOException("Something went wrong")); assertThrows(IOException.class, translogTransferManager::readMetadata); } @@ -256,24 +260,13 @@ public void testReadMetadataListException() throws IOException { LatchedActionListener> latchedActionListener = invocation.getArgument(2); latchedActionListener.onFailure(new IOException("Issue while listing")); return null; - }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); - when(transferService.downloadBlob(any(BlobPath.class), eq("1__3__3__12__235"))).thenThrow(new IOException("Something went wrong")); + when(transferService.downloadBlob(any(BlobPath.class), any(String.class))).thenThrow(new IOException("Something went wrong")); assertThrows(IOException.class, translogTransferManager::readMetadata); } - public void testReadMetadataComparatorIllegal() { - List metadataFiles = Arrays.asList("1__4__4__12__234", "1__4__3__12__235", "1__4__4__12__234"); - assertThrows(IllegalArgumentException.class, () -> metadataFiles.sort(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR)); - } - - public void testReadMetadataComparator() { - List metadataFiles = Arrays.asList("1__4__4__12__234", "0__3__3__13__235", "1__5__5__12__233"); - metadataFiles.sort(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR); - assertEquals(Arrays.asList("0__3__3__13__235", "1__4__4__12__234", "1__5__5__12__233"), metadataFiles); - } - public void testDownloadTranslog() throws IOException { Path location = createTempDir(); TranslogTransferManager translogTransferManager = new TranslogTransferManager( From 7b02799e1d7d118df6feaddcdabb46e3aa3cff96 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Wed, 5 Jul 2023 16:03:01 +0530 Subject: [PATCH 3/6] UT Fix Signed-off-by: Gaurav Bafna --- .../index/shard/IndexShardTestCase.java | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 7f3819563dcbd..f5af84bb9f128 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -42,6 +42,7 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; @@ -61,6 +62,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobContainer; @@ -68,14 +70,14 @@ import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.io.PathUtils; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.util.BigArrays; -import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.util.io.IOUtils; -import org.opensearch.common.lease.Releasable; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.Index; import org.opensearch.index.IndexSettings; @@ -98,18 +100,18 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; +import org.opensearch.index.store.RemoteBufferedOutputDirectory; import org.opensearch.index.store.RemoteDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; -import org.opensearch.index.store.RemoteBufferedOutputDirectory; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.Translog; -import org.opensearch.indices.IndicesService; import org.opensearch.index.translog.TranslogFactory; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.indices.recovery.AsyncRecoveryTarget; @@ -172,6 +174,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; @@ -645,6 +649,17 @@ protected RepositoriesService createRepositoriesService() { when(repository.basePath()).thenReturn(new BlobPath()); BlobStore blobStore = Mockito.mock(BlobStore.class); BlobContainer blobContainer = Mockito.mock(BlobContainer.class); + doAnswer(invocation -> { + LatchedActionListener> listener = invocation.getArgument(3); + listener.onResponse(new ArrayList<>()); + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder( + any(String.class), + anyInt(), + any(BlobContainer.BlobNameSortOrder.class), + any(ActionListener.class) + ); when(blobStore.blobContainer(any())).thenReturn(blobContainer); when(repository.blobStore()).thenReturn(blobStore); when(repositoriesService.repository(any(String.class))).thenReturn(repository); From b2880b3adbb03b6178c9611fe616f24fa1b6ac0b Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Thu, 6 Jul 2023 10:11:36 +0530 Subject: [PATCH 4/6] PR feedback Signed-off-by: Gaurav Bafna --- .../translog/transfer/TranslogTransferManager.java | 3 +++ .../translog/transfer/TranslogTransferMetadata.java | 3 +-- .../transfer/TranslogTransferManagerTests.java | 12 ++++++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 709d62b6e1575..3545a95a157a2 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -371,6 +371,7 @@ public void onResponse(List blobMetadata) { List sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); if (sortedMetadataFiles.size() <= 1) { logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size()); + onCompletion.run(); return; } List metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size()); @@ -381,11 +382,13 @@ public void onResponse(List blobMetadata) { @Override public void onFailure(Exception e) { logger.error("Exception occurred while listing translog metadata files from remote store", e); + onCompletion.run(); } } ); } catch (Exception e) { logger.error("Exception occurred while listing translog metadata files from remote store", e); + onCompletion.run(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index b2eacec33256d..75d6549b23f1e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -86,8 +86,7 @@ public String getFileName() { RemoteStoreUtils.invertLong(primaryTerm), RemoteStoreUtils.invertLong(generation), RemoteStoreUtils.invertLong(createdAt), - String.valueOf(primaryTerm), - String.valueOf(generation) + String.valueOf(CURRENT_VERSION) ) ); } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 078eacc8025e4..7016e4e4c8209 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -32,6 +32,8 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -248,6 +250,16 @@ public void testReadMetadataReadException() throws IOException { assertThrows(IOException.class, translogTransferManager::readMetadata); } + public void testMetadataFileNameOrder() throws IOException { + // asserting that new primary followed new generation are lexicographically smallest + String mdFilenameGen1 = new TranslogTransferMetadata(1, 1, 1, 2).getFileName(); + String mdFilenameGen2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName(); + String mdFilenamePrimary2 = new TranslogTransferMetadata(2, 1, 1, 2).getFileName(); + List metadataFiles = Arrays.asList(mdFilenameGen1, mdFilenameGen2, mdFilenamePrimary2); + Collections.sort(metadataFiles); + assertEquals(Arrays.asList(mdFilenamePrimary2, mdFilenameGen2, mdFilenameGen1), metadataFiles); + } + public void testReadMetadataListException() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, From 107a115ac5f552d2d675dcc7dbed0b9f7a2f126a Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Fri, 7 Jul 2023 09:35:10 +0530 Subject: [PATCH 5/6] PR comments Signed-off-by: Gaurav Bafna --- .../org/opensearch/index/translog/RemoteFsTranslog.java | 4 ++-- .../index/translog/transfer/TranslogTransferManager.java | 8 -------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 091d6a6009d6c..1e565b97387d1 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -386,7 +386,7 @@ public void trimUnreferencedReaders() throws IOException { if (generationsToDelete.isEmpty() == false) { deleteRemoteGeneration(generationsToDelete); translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release); - deleteStaleRemotePrimaryTermsAndMetadataFiles(); + deleteStaleRemotePrimaryTerms(); } else { remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); } @@ -410,7 +410,7 @@ private void deleteRemoteGeneration(Set generations) { *
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator. */ - private void deleteStaleRemotePrimaryTermsAndMetadataFiles() { + private void deleteStaleRemotePrimaryTerms() { // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part // of older primary term. diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 3545a95a157a2..a0d5df13edfc1 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -438,14 +438,6 @@ public void onFailure(Exception e) { } } - /** - * Deletes metadata files asynchronously using the {@code REMOTE_PURGE} threadpool. - * @param metadataFilesToDelete list of metadata files to be deleted. - */ - private void deleteMetadataFilesAsync(List metadataFilesToDelete) { - deleteMetadataFilesAsync(metadataFilesToDelete, () -> {}); - } - /** * Deletes metadata files asynchronously using the {@code REMOTE_PURGE} threadpool. On success or failure, runs {@code onCompletion}. * From c80a390297590ebf4d8010d01fd71ab18d26e38c Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Fri, 7 Jul 2023 18:59:00 +0530 Subject: [PATCH 6/6] PR comments Signed-off-by: Gaurav Bafna --- .../transfer/BlobStoreTransferService.java | 10 +---- .../translog/transfer/TransferService.java | 2 +- .../transfer/TranslogTransferManager.java | 2 +- .../TranslogTransferManagerTests.java | 40 ++++++++++++++++++- 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 4aa6577cbf520..d9feb1a832681 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -133,7 +133,7 @@ public void listFoldersAsync(String threadpoolName, Iterable path, Actio }); } - public void listAllInSortedOrder(Iterable path, int limit, ActionListener> listener) throws IOException { + public void listAllInSortedOrder(Iterable path, int limit, ActionListener> listener) { blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder("", limit, LEXICOGRAPHIC, listener); } @@ -143,13 +143,7 @@ public void listAllInSortedOrderAsync( int limit, ActionListener> listener ) { - threadPool.executor(threadpoolName).execute(() -> { - try { - listAllInSortedOrder(path, limit, listener); - } catch (IOException e) { - listener.onFailure(e); - } - }); + threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, limit, listener); }); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index f20bb82ce259a..0e6496042e3d8 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -107,7 +107,7 @@ void uploadBlobAsync( */ InputStream downloadBlob(Iterable path, String fileName) throws IOException; - void listAllInSortedOrder(Iterable path, int limit, ActionListener> listener) throws IOException; + void listAllInSortedOrder(Iterable path, int limit, ActionListener> listener); void listAllInSortedOrderAsync(String threadpoolName, Iterable path, int limit, ActionListener> listener); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index a0d5df13edfc1..6da0ee5521738 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -209,7 +209,7 @@ public TranslogTransferMetadata readMetadata() throws IOException { try { transferService.listAllInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener); latch.await(); - } catch (InterruptedException | IOException e) { + } catch (InterruptedException e) { throw new IOException("Exception while reading/downloading metadafile", e); } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 7016e4e4c8209..924a9d039da28 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -201,7 +201,8 @@ public void testReadMetadataNoFile() throws IOException { assertNull(translogTransferManager.readMetadata()); } - public void testReadMetadataHappy() throws IOException { + // This should happen most of the time - Just a single metadata file + public void testReadMetadataSingleFile() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, @@ -391,6 +392,43 @@ public void testDeleteTranslogSuccess() throws Exception { verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(files)); } + public void testDeleteStaleTranslogMetadata() { + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath, + null + ); + String tm1 = new TranslogTransferMetadata(1, 1, 1, 2).getFileName(); + String tm2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName(); + String tm3 = new TranslogTransferMetadata(2, 3, 1, 2).getFileName(); + doAnswer(invocation -> { + ActionListener> actionListener = invocation.getArgument(3); + List bmList = new LinkedList<>(); + bmList.add(new PlainBlobMetadata(tm1, 1)); + bmList.add(new PlainBlobMetadata(tm2, 1)); + bmList.add(new PlainBlobMetadata(tm3, 1)); + actionListener.onResponse(bmList); + return null; + }).when(transferService) + .listAllInSortedOrderAsync(eq(ThreadPool.Names.REMOTE_PURGE), any(BlobPath.class), anyInt(), any(ActionListener.class)); + List files = List.of(tm2, tm3); + translogTransferManager.deleteStaleTranslogMetadataFilesAsync(() -> { + verify(transferService).listAllInSortedOrderAsync( + eq(ThreadPool.Names.REMOTE_PURGE), + any(BlobPath.class), + eq(Integer.MAX_VALUE), + any() + ); + verify(transferService).deleteBlobsAsync( + eq(ThreadPool.Names.REMOTE_PURGE), + any(BlobPath.class), + eq(files), + any(ActionListener.class) + ); + }); + } + public void testDeleteTranslogFailure() throws Exception { FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); BlobStore blobStore = mock(BlobStore.class);