From 50fc63d70e57ba5bb2be5c68ff18717cfb705c55 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Fri, 7 Jul 2023 22:55:29 +0530 Subject: [PATCH] [Remote Store] Update Translog Metadata file name (#8350) --------- Signed-off-by: Gaurav Bafna --- .../index/translog/RemoteFsTranslog.java | 7 +- .../transfer/BlobStoreTransferService.java | 28 +++-- .../translog/transfer/TransferService.java | 13 +- .../transfer/TranslogTransferManager.java | 111 ++++++++++------- .../transfer/TranslogTransferMetadata.java | 38 +++--- .../index/translog/RemoteFSTranslogTests.java | 14 +-- .../TranslogTransferManagerTests.java | 113 ++++++++++++++---- .../index/shard/IndexShardTestCase.java | 23 +++- 8 files changed, 221 insertions(+), 126 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 04057b581e8d9..1e565b97387d1 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -385,7 +385,8 @@ public void trimUnreferencedReaders() throws IOException { } if (generationsToDelete.isEmpty() == false) { deleteRemoteGeneration(generationsToDelete); - deleteStaleRemotePrimaryTermsAndMetadataFiles(); + translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release); + deleteStaleRemotePrimaryTerms(); } else { remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); } @@ -409,7 +410,7 @@ private void deleteRemoteGeneration(Set generations) { *
* This will also delete all stale translog metadata files from remote except the latest basis the metadata file comparator. */ - private void deleteStaleRemotePrimaryTermsAndMetadataFiles() { + private void deleteStaleRemotePrimaryTerms() { // The deletion of older translog files in remote store is on best-effort basis, there is a possibility that there // are older files that are no longer needed and should be cleaned up. In here, we delete all files that are part // of older primary term. @@ -418,8 +419,6 @@ private void deleteStaleRemotePrimaryTermsAndMetadataFiles() { assert readers.isEmpty() == false : "Expected non-empty readers"; long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get(); translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm); - // Second we delete all stale metadata files from remote store - translogTransferManager.deleteStaleTranslogMetadataFilesAsync(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 88fe816ccb462..d9feb1a832681 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; @@ -23,6 +24,8 @@ import java.util.List; import java.util.Set; +import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC; + /** * Service that handles remote transfer of translog and checkpoint files * @@ -114,17 +117,6 @@ public Set listAll(Iterable path) throws IOException { return blobStore.blobContainer((BlobPath) path).listBlobs().keySet(); } - @Override - public void listAllAsync(String threadpoolName, Iterable path, ActionListener> listener) { - threadPool.executor(threadpoolName).execute(() -> { - try { - listener.onResponse(listAll(path)); - } catch (IOException e) { - listener.onFailure(e); - } - }); - } - @Override public Set listFolders(Iterable path) throws IOException { return blobStore.blobContainer((BlobPath) path).children().keySet(); @@ -140,4 +132,18 @@ public void listFoldersAsync(String threadpoolName, Iterable path, Actio } }); } + + public void listAllInSortedOrder(Iterable path, int limit, ActionListener> listener) { + blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder("", limit, LEXICOGRAPHIC, listener); + } + + public void listAllInSortedOrderAsync( + String threadpoolName, + Iterable path, + int limit, + ActionListener> listener + ) { + threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, limit, listener); }); + } + } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 6aca3055a3f53..0e6496042e3d8 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog.transfer; import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import java.io.IOException; @@ -80,14 +81,6 @@ void uploadBlobAsync( */ Set listAll(Iterable path) throws IOException; - /** - * Lists the files and invokes the listener on success or failure - * @param threadpoolName threadpool type which will be used to list all files asynchronously. - * @param path the path to list - * @param listener the callback to be invoked once list operation completes successfully/fails. - */ - void listAllAsync(String threadpoolName, Iterable path, ActionListener> listener); - /** * Lists the folders inside the path. * @param path : the path @@ -114,4 +107,8 @@ void uploadBlobAsync( */ InputStream downloadBlob(Iterable path, String fileName) throws IOException; + void listAllInSortedOrder(Iterable path, int limit, ActionListener> listener); + + void listAllInSortedOrderAsync(String threadpoolName, Iterable path, int limit, ActionListener> listener); + } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index f6405bc9b5c82..6da0ee5521738 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -15,6 +15,8 @@ import org.apache.lucene.store.OutputStreamIndexOutput; import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.SetOnce; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.VersionedCodecStreamWrapper; @@ -42,8 +44,6 @@ import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; -import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR; -import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.getFileName; /** * The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService} @@ -185,15 +185,39 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th } public TranslogTransferMetadata readMetadata() throws IOException { - return transferService.listAll(remoteMetadataTransferPath).stream().max(METADATA_FILENAME_COMPARATOR).map(filename -> { - try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) { - IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes()); - return metadataStreamWrapper.readStream(indexInput); - } catch (IOException e) { - logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e); - return null; - } - }).orElse(null); + SetOnce metadataSetOnce = new SetOnce<>(); + SetOnce exceptionSetOnce = new SetOnce<>(); + final CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener> latchedActionListener = new LatchedActionListener<>( + ActionListener.wrap(blobMetadataList -> { + if (blobMetadataList.isEmpty()) return; + String filename = blobMetadataList.get(0).name(); + try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename)) { + IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes()); + metadataSetOnce.set(metadataStreamWrapper.readStream(indexInput)); + } catch (IOException e) { + logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e); + exceptionSetOnce.set(e); + } + }, e -> { + logger.error(() -> new ParameterizedMessage("Exception while listing metadata files "), e); + exceptionSetOnce.set((IOException) e); + }), + latch + ); + + try { + transferService.listAllInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener); + latch.await(); + } catch (InterruptedException e) { + throw new IOException("Exception while reading/downloading metadafile", e); + } + + if (exceptionSetOnce.get() != null) { + throw exceptionSetOnce.get(); + } + + return metadataSetOnce.get(); } private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException { @@ -211,7 +235,7 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); return new TransferFileSnapshot( - getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()), + translogTransferMetadata.getFileName(), getMetadataBytes(translogTransferMetadata), translogTransferMetadata.getPrimaryTerm() ); @@ -230,7 +254,7 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( "translog transfer metadata " + metadata.getPrimaryTerm(), - getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()), + metadata.getFileName(), output, TranslogTransferMetadata.BUFFER_SIZE ) @@ -253,20 +277,14 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep */ public void deleteGenerationAsync(long primaryTerm, Set generations, Runnable onCompletion) { List translogFiles = new ArrayList<>(); - List metadataFiles = new ArrayList<>(); generations.forEach(generation -> { // Add .ckp and .tlog file to translog file list which is located in basePath/ String ckpFileName = Translog.getCommitCheckpointFileName(generation); String translogFileName = Translog.getFilename(generation); translogFiles.addAll(List.of(ckpFileName, translogFileName)); - // Add metadata file tio metadata file list which is located in basePath/metadata - String metadataFileName = TranslogTransferMetadata.getFileName(primaryTerm, generation); - metadataFiles.add(metadataFileName); }); // Delete the translog and checkpoint files asynchronously deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion); - // Delete the metadata files asynchronously - deleteMetadataFilesAsync(metadataFiles, onCompletion); } /** @@ -341,24 +359,37 @@ public void onFailure(Exception e) { }); } - public void deleteStaleTranslogMetadataFilesAsync() { - transferService.listAllAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, new ActionListener<>() { - @Override - public void onResponse(Set metadataFiles) { - List sortedMetadataFiles = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList()); - if (sortedMetadataFiles.size() <= 1) { - logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size()); - return; - } - List metadataFilesToDelete = sortedMetadataFiles.subList(0, sortedMetadataFiles.size() - 1); - deleteMetadataFilesAsync(metadataFilesToDelete); - } + public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) { + try { + transferService.listAllInSortedOrderAsync( + ThreadPool.Names.REMOTE_PURGE, + remoteMetadataTransferPath, + Integer.MAX_VALUE, + new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + List sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + if (sortedMetadataFiles.size() <= 1) { + logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size()); + onCompletion.run(); + return; + } + List metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size()); + logger.trace("Deleting remote translog metadata files {}", metadataFilesToDelete); + deleteMetadataFilesAsync(metadataFilesToDelete, onCompletion); + } - @Override - public void onFailure(Exception e) { - logger.error("Exception occurred while listing translog metadata files from remote store", e); - } - }); + @Override + public void onFailure(Exception e) { + logger.error("Exception occurred while listing translog metadata files from remote store", e); + onCompletion.run(); + } + } + ); + } catch (Exception e) { + logger.error("Exception occurred while listing translog metadata files from remote store", e); + onCompletion.run(); + } } public void deleteTranslogFiles() throws IOException { @@ -407,14 +438,6 @@ public void onFailure(Exception e) { } } - /** - * Deletes metadata files asynchronously using the {@code REMOTE_PURGE} threadpool. - * @param metadataFilesToDelete list of metadata files to be deleted. - */ - private void deleteMetadataFilesAsync(List metadataFilesToDelete) { - deleteMetadataFilesAsync(metadataFilesToDelete, () -> {}); - } - /** * Deletes metadata files asynchronously using the {@code REMOTE_PURGE} threadpool. On success or failure, runs {@code onCompletion}. * diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 7a2fee9a69d5e..75d6549b23f1e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -9,9 +9,9 @@ package org.opensearch.index.translog.transfer; import org.opensearch.common.SetOnce; +import org.opensearch.index.remote.RemoteStoreUtils; import java.util.Arrays; -import java.util.Comparator; import java.util.Map; import java.util.Objects; @@ -42,13 +42,14 @@ public class TranslogTransferMetadata { static final String METADATA_CODEC = "md"; - public static final Comparator METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator(); + private final long createdAt; public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) { this.primaryTerm = primaryTerm; this.generation = generation; this.minTranslogGeneration = minTranslogGeneration; this.count = count; + this.createdAt = System.currentTimeMillis(); } public long getPrimaryTerm() { @@ -75,8 +76,19 @@ public Map getGenerationToPrimaryTermMapper() { return generationToPrimaryTermMapper.get(); } - public static String getFileName(long primaryTerm, long generation) { - return String.join(METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation))); + /* + This should be used only at the time of creation. + */ + public String getFileName() { + return String.join( + METADATA_SEPARATOR, + Arrays.asList( + RemoteStoreUtils.invertLong(primaryTerm), + RemoteStoreUtils.invertLong(generation), + RemoteStoreUtils.invertLong(createdAt), + String.valueOf(CURRENT_VERSION) + ) + ); } @Override @@ -91,22 +103,4 @@ public boolean equals(Object o) { TranslogTransferMetadata other = (TranslogTransferMetadata) o; return Objects.equals(this.primaryTerm, other.primaryTerm) && Objects.equals(this.generation, other.generation); } - - private static class MetadataFilenameComparator implements Comparator { - @Override - public int compare(String first, String second) { - // Format of metadata filename is __ - String[] filenameTokens1 = first.split(METADATA_SEPARATOR); - String[] filenameTokens2 = second.split(METADATA_SEPARATOR); - // Here, we are comparing only primary term and generation. - for (int i = 0; i < filenameTokens1.length; i++) { - if (filenameTokens1[i].equals(filenameTokens2[i]) == false) { - return Long.compare(Long.parseLong(filenameTokens1[i]), Long.parseLong(filenameTokens2[i])); - } - } - throw new IllegalArgumentException( - "TranslogTransferMetadata files " + first + " and " + second + " have same primary term and generation" - ); - } - } } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index d963830e9e736..30c04c731d1f8 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -570,7 +570,7 @@ public void testMetadataFileDeletion() throws Exception { assertEquals(1, translog.readers.size()); } assertBusy(() -> assertEquals(4, translog.allUploaded().size())); - assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int moreDocs = randomIntBetween(3, 10); logger.info("numDocs={} moreDocs={}", numDocs, moreDocs); for (int i = numDocs; i < numDocs + moreDocs; i++) { @@ -579,7 +579,7 @@ public void testMetadataFileDeletion() throws Exception { translog.trimUnreferencedReaders(); assertEquals(1 + moreDocs, translog.readers.size()); assertBusy(() -> assertEquals(2 + 2L * moreDocs, translog.allUploaded().size())); - assertBusy(() -> assertEquals(1 + moreDocs, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int totalDocs = numDocs + moreDocs; translog.setMinSeqNoToKeep(totalDocs - 1); @@ -592,7 +592,7 @@ public void testMetadataFileDeletion() throws Exception { ); translog.setMinSeqNoToKeep(totalDocs); translog.trimUnreferencedReaders(); - assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); // Change primary term and test the deletion of older primaries String translogUUID = translog.translogUUID; @@ -607,10 +607,6 @@ public void testMetadataFileDeletion() throws Exception { long oldPrimaryTerm = primaryTerm.get(); long newPrimaryTerm = primaryTerm.incrementAndGet(); - // Check all metadata files corresponds to old primary term - Set mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); - assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(oldPrimaryTerm).concat("__")))); - // Creating RemoteFsTranslog with the same location Translog newTranslog = create(translogDir, repository, translogUUID); int newPrimaryTermDocs = randomIntBetween(5, 10); @@ -621,10 +617,6 @@ public void testMetadataFileDeletion() throws Exception { newTranslog.trimUnreferencedReaders(); } - // Check that all metadata files are belonging now to the new primary - mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); - assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(newPrimaryTerm).concat("__")))); - try { newTranslog.close(); } catch (Exception e) { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 6f6b3622295b6..924a9d039da28 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -11,10 +11,12 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.mockito.Mockito; import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.util.set.Sets; +import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; @@ -31,10 +33,13 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -186,11 +191,17 @@ public void testReadMetadataNoFile() throws IOException { remoteBaseTransferPath, null ); + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(2); + List bmList = new LinkedList<>(); + latchedActionListener.onResponse(bmList); + return null; + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); - when(transferService.listAll(remoteBaseTransferPath)).thenReturn(Sets.newHashSet()); assertNull(translogTransferManager.readMetadata()); } + // This should happen most of the time - Just a single metadata file public void testReadMetadataSingleFile() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, @@ -198,19 +209,25 @@ public void testReadMetadataSingleFile() throws IOException { remoteBaseTransferPath, null ); - - // BlobPath does not have equals method, so we can't use the instance directly in when - when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234")); + TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); + String mdFilename = tm.getFileName(); + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(2); + List bmList = new LinkedList<>(); + bmList.add(new PlainBlobMetadata(mdFilename, 1)); + latchedActionListener.onResponse(bmList); + return null; + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); - when(transferService.downloadBlob(any(BlobPath.class), eq("12__234"))).thenReturn( + when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenReturn( new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)) ); assertEquals(metadata, translogTransferManager.readMetadata()); } - public void testReadMetadataMultipleFiles() throws IOException { + public void testReadMetadataReadException() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, @@ -218,17 +235,33 @@ public void testReadMetadataMultipleFiles() throws IOException { null ); - when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234", "12__235", "12__233")); + TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); + String mdFilename = tm.getFileName(); - TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); - when(transferService.downloadBlob(any(BlobPath.class), eq("12__235"))).thenReturn( - new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)) - ); + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(2); + List bmList = new LinkedList<>(); + bmList.add(new PlainBlobMetadata(mdFilename, 1)); + latchedActionListener.onResponse(bmList); + return null; + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); - assertEquals(metadata, translogTransferManager.readMetadata()); + when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenThrow(new IOException("Something went wrong")); + + assertThrows(IOException.class, translogTransferManager::readMetadata); + } + + public void testMetadataFileNameOrder() throws IOException { + // asserting that new primary followed new generation are lexicographically smallest + String mdFilenameGen1 = new TranslogTransferMetadata(1, 1, 1, 2).getFileName(); + String mdFilenameGen2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName(); + String mdFilenamePrimary2 = new TranslogTransferMetadata(2, 1, 1, 2).getFileName(); + List metadataFiles = Arrays.asList(mdFilenameGen1, mdFilenameGen2, mdFilenamePrimary2); + Collections.sort(metadataFiles); + assertEquals(Arrays.asList(mdFilenamePrimary2, mdFilenameGen2, mdFilenameGen1), metadataFiles); } - public void testReadMetadataException() throws IOException { + public void testReadMetadataListException() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( shardId, transferService, @@ -236,16 +269,15 @@ public void testReadMetadataException() throws IOException { null ); - when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234", "12__235", "12__233")); + doAnswer(invocation -> { + LatchedActionListener> latchedActionListener = invocation.getArgument(2); + latchedActionListener.onFailure(new IOException("Issue while listing")); + return null; + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); - when(transferService.downloadBlob(any(BlobPath.class), eq("12__235"))).thenThrow(new IOException("Something went wrong")); + when(transferService.downloadBlob(any(BlobPath.class), any(String.class))).thenThrow(new IOException("Something went wrong")); - assertNull(translogTransferManager.readMetadata()); - } - - public void testReadMetadataSamePrimaryTermGeneration() throws IOException { - List metadataFiles = Arrays.asList("12__234", "12__235", "12__234"); - assertThrows(IllegalArgumentException.class, () -> metadataFiles.sort(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR)); + assertThrows(IOException.class, translogTransferManager::readMetadata); } public void testDownloadTranslog() throws IOException { @@ -360,6 +392,43 @@ public void testDeleteTranslogSuccess() throws Exception { verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(files)); } + public void testDeleteStaleTranslogMetadata() { + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath, + null + ); + String tm1 = new TranslogTransferMetadata(1, 1, 1, 2).getFileName(); + String tm2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName(); + String tm3 = new TranslogTransferMetadata(2, 3, 1, 2).getFileName(); + doAnswer(invocation -> { + ActionListener> actionListener = invocation.getArgument(3); + List bmList = new LinkedList<>(); + bmList.add(new PlainBlobMetadata(tm1, 1)); + bmList.add(new PlainBlobMetadata(tm2, 1)); + bmList.add(new PlainBlobMetadata(tm3, 1)); + actionListener.onResponse(bmList); + return null; + }).when(transferService) + .listAllInSortedOrderAsync(eq(ThreadPool.Names.REMOTE_PURGE), any(BlobPath.class), anyInt(), any(ActionListener.class)); + List files = List.of(tm2, tm3); + translogTransferManager.deleteStaleTranslogMetadataFilesAsync(() -> { + verify(transferService).listAllInSortedOrderAsync( + eq(ThreadPool.Names.REMOTE_PURGE), + any(BlobPath.class), + eq(Integer.MAX_VALUE), + any() + ); + verify(transferService).deleteBlobsAsync( + eq(ThreadPool.Names.REMOTE_PURGE), + any(BlobPath.class), + eq(files), + any(ActionListener.class) + ); + }); + } + public void testDeleteTranslogFailure() throws Exception { FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); BlobStore blobStore = mock(BlobStore.class); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 7f3819563dcbd..f5af84bb9f128 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -42,6 +42,7 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; @@ -61,6 +62,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobContainer; @@ -68,14 +70,14 @@ import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.io.PathUtils; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.util.BigArrays; -import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.util.io.IOUtils; -import org.opensearch.common.lease.Releasable; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.Index; import org.opensearch.index.IndexSettings; @@ -98,18 +100,18 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; +import org.opensearch.index.store.RemoteBufferedOutputDirectory; import org.opensearch.index.store.RemoteDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; -import org.opensearch.index.store.RemoteBufferedOutputDirectory; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.Translog; -import org.opensearch.indices.IndicesService; import org.opensearch.index.translog.TranslogFactory; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.indices.recovery.AsyncRecoveryTarget; @@ -172,6 +174,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; @@ -645,6 +649,17 @@ protected RepositoriesService createRepositoriesService() { when(repository.basePath()).thenReturn(new BlobPath()); BlobStore blobStore = Mockito.mock(BlobStore.class); BlobContainer blobContainer = Mockito.mock(BlobContainer.class); + doAnswer(invocation -> { + LatchedActionListener> listener = invocation.getArgument(3); + listener.onResponse(new ArrayList<>()); + return null; + }).when(blobContainer) + .listBlobsByPrefixInSortedOrder( + any(String.class), + anyInt(), + any(BlobContainer.BlobNameSortOrder.class), + any(ActionListener.class) + ); when(blobStore.blobContainer(any())).thenReturn(blobContainer); when(repository.blobStore()).thenReturn(blobStore); when(repositoriesService.repository(any(String.class))).thenReturn(repository);