diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
index 04057b581e8d9..091d6a6009d6c 100644
--- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
+++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java
@@ -385,6 +385,7 @@ public void trimUnreferencedReaders() throws IOException {
         }
         if (generationsToDelete.isEmpty() == false) {
             deleteRemoteGeneration(generationsToDelete);
+            translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release);
             deleteStaleRemotePrimaryTermsAndMetadataFiles();
         } else {
             remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
@@ -418,8 +419,6 @@ private void deleteStaleRemotePrimaryTermsAndMetadataFiles() {
             assert readers.isEmpty() == false : "Expected non-empty readers";
             long minimumReferencedPrimaryTerm = readers.stream().map(BaseTranslogReader::getPrimaryTerm).min(Long::compare).get();
             translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
-            // Second we delete all stale metadata files from remote store
- 
translogTransferManager.deleteStaleTranslogMetadataFilesAsync(); } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 744ca47543858..a44fc18fa8f23 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -115,17 +115,6 @@ public Set listAll(Iterable path) throws IOException { return blobStore.blobContainer((BlobPath) path).listBlobs().keySet(); } - @Override - public void listAllAsync(String threadpoolName, Iterable path, ActionListener> listener) { - threadPool.executor(threadpoolName).execute(() -> { - try { - listener.onResponse(listAll(path)); - } catch (IOException e) { - listener.onFailure(e); - } - }); - } - @Override public Set listFolders(Iterable path) throws IOException { return blobStore.blobContainer((BlobPath) path).children().keySet(); @@ -142,8 +131,18 @@ public void listFoldersAsync(String threadpoolName, Iterable path, Actio }); } - public void listBlobsInSortedOrder(Iterable path, int limit, ActionListener> listener) throws IOException { + public void listAllInSortedOrder(Iterable path, int limit, ActionListener> listener) throws IOException { blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInLexicographicOrder("", limit, listener); } + public void listAllInSortedOrderAsync(String threadpoolName, Iterable path, int limit, ActionListener> listener) { + threadPool.executor(threadpoolName).execute(() -> { + try { + listAllInSortedOrder(path, limit, listener); + } catch (IOException e) { + listener.onFailure(e); + } + }); + } + } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 4039924af5bf5..c553b7b040827 100644 --- 
a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java
@@ -81,14 +81,6 @@ void uploadBlobAsync(
      */
     Set<String> listAll(Iterable<String> path) throws IOException;
 
-    /**
-     * Lists the files and invokes the listener on success or failure
-     * @param threadpoolName threadpool type which will be used to list all files asynchronously.
-     * @param path the path to list
-     * @param listener the callback to be invoked once list operation completes successfully/fails.
-     */
-    void listAllAsync(String threadpoolName, Iterable<String> path, ActionListener<Set<String>> listener);
-
     /**
      * Lists the folders inside the path.
      * @param path : the path
@@ -115,6 +107,22 @@ void uploadBlobAsync(
      */
     InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException;
 
-    void listBlobsInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) throws IOException;
+    /**
+     * Lists the blobs under the path in sorted order and passes the result to the listener.
+     * @param path the path to list
+     * @param limit upper bound on the number of entries to list
+     * @param listener the callback to be invoked once list operation completes successfully/fails.
+     * @throws IOException if the underlying listing throws
+     */
+    void listAllInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) throws IOException;
+
+    /**
+     * Lists the blobs under the path in sorted order on the given threadpool and invokes the listener on success or failure
+     * @param threadpoolName threadpool type which will be used to list all files asynchronously.
+     * @param path the path to list
+     * @param limit upper bound on the number of entries to list
+     * @param listener the callback to be invoked once list operation completes successfully/fails.
+     */
+    void listAllInSortedOrderAsync(String threadpoolName, Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener);
 
 }
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
index 75258b304e29b..9106ba213bfa8 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
@@ -44,8 +44,6 @@
 
 import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
 import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
-import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR;
-import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.getFileName;
 
 /**
  * The class 
responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService} @@ -209,7 +207,7 @@ public TranslogTransferMetadata readMetadata() throws IOException { ); try { - transferService.listBlobsInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener); + transferService.listAllInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener); latch.await(); } catch (InterruptedException | IOException e) { throw new IOException("Exception while reading/downloading metadafile", e); @@ -237,7 +235,7 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); return new TransferFileSnapshot( - getFileName(translogTransferMetadata.getPrimaryTerm(), translogTransferMetadata.getGeneration()), + translogTransferMetadata.getFileName(), getMetadataBytes(translogTransferMetadata), translogTransferMetadata.getPrimaryTerm() ); @@ -256,7 +254,7 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( "translog transfer metadata " + metadata.getPrimaryTerm(), - getFileName(metadata.getPrimaryTerm(), metadata.getGeneration()), + metadata.getFileName(), output, TranslogTransferMetadata.BUFFER_SIZE ) @@ -279,20 +277,14 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep */ public void deleteGenerationAsync(long primaryTerm, Set generations, Runnable onCompletion) { List translogFiles = new ArrayList<>(); - List metadataFiles = new ArrayList<>(); generations.forEach(generation -> { // Add .ckp and .tlog file to translog file list which is located in basePath/ String ckpFileName = Translog.getCommitCheckpointFileName(generation); String translogFileName = Translog.getFilename(generation); translogFiles.addAll(List.of(ckpFileName, translogFileName)); - // Add metadata file tio metadata file 
list which is located in basePath/metadata
-            String metadataFileName = TranslogTransferMetadata.getFileName(primaryTerm, generation);
-            metadataFiles.add(metadataFileName);
         });
         // Delete the translog and checkpoint files asynchronously
         deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
-        // Delete the metadata files asynchronously
-        deleteMetadataFilesAsync(metadataFiles, onCompletion);
     }
 
     /**
@@ -367,24 +359,43 @@ public void onFailure(Exception e) {
         });
     }
 
-    public void deleteStaleTranslogMetadataFilesAsync() {
-        transferService.listAllAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, new ActionListener<>() {
-            @Override
-            public void onResponse(Set<String> metadataFiles) {
-                List<String> sortedMetadataFiles = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList());
-                if (sortedMetadataFiles.size() <= 1) {
-                    logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
-                    return;
+    /**
+     * Lists the remote translog metadata files in sorted order on the {@code REMOTE_PURGE} threadpool and
+     * deletes all of them except the first entry of the listing.
+     * <p>
+     * {@code onCompletion} is run on every terminal path: when there is nothing to delete, when the listing
+     * fails, and (by forwarding it to {@code deleteMetadataFilesAsync}) when a deletion is started. Callers
+     * pass a permit release here, so skipping it on any path would leak that permit.
+     *
+     * @param onCompletion callback to run once this cleanup attempt is finished
+     */
+    public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
+        try {
+            transferService.listAllInSortedOrderAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, Integer.MAX_VALUE, new ActionListener<>() {
+                @Override
+                public void onResponse(List<BlobMetadata> blobMetadata) {
+                    List<String> sortedMetadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
+                    if (sortedMetadataFiles.size() <= 1) {
+                        logger.trace("Remote Metadata file count is {}, so skipping deletion", sortedMetadataFiles.size());
+                        // Nothing to delete, but the caller still has to be told this attempt is over.
+                        onCompletion.run();
+                        return;
+                    }
+                    List<String> metadataFilesToDelete = sortedMetadataFiles.subList(1, sortedMetadataFiles.size());
+                    logger.trace("Deleting remote translog metadata files {}", metadataFilesToDelete);
+                    deleteMetadataFilesAsync(metadataFilesToDelete, onCompletion);
                 }
-                List<String> metadataFilesToDelete = sortedMetadataFiles.subList(0, sortedMetadataFiles.size() - 1);
-                deleteMetadataFilesAsync(metadataFilesToDelete);
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                logger.error("Exception occurred while listing translog metadata files from remote store", e);
-            }
-        });
+
+                @Override
+                public void onFailure(Exception e) {
+                    logger.error("Exception occurred while listing translog metadata files from remote store", e);
+                    onCompletion.run();
+                }
+            });
+        } catch (Exception e) {
+            logger.error("Exception occurred while listing translog metadata files from remote store", e);
+            onCompletion.run();
+        }
     }
 
     public void deleteTranslogFiles() throws IOException {
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java
index 62a1f4aa6ab86..9778450737eae 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java
@@ -9,9 +9,9 @@
 package org.opensearch.index.translog.transfer;
 
 import org.opensearch.common.SetOnce;
+import org.opensearch.index.remote.RemoteStoreUtils;
 
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Map;
 import java.util.Objects;
 
@@ -42,13 +42,15 @@ public class TranslogTransferMetadata {
 
     static final String METADATA_CODEC = "md";
 
-    public static final Comparator METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator();
+    private final long createdAt;
+
 
     public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) {
         this.primaryTerm = primaryTerm;
         this.generation = generation;
         this.minTranslogGeneration = minTranslogGeneration;
         this.count = count;
+        this.createdAt = System.currentTimeMillis();
     }
 
     public long getPrimaryTerm() {
@@ -75,13 +77,16 @@ public Map getGenerationToPrimaryTermMapper() {
         return generationToPrimaryTermMapper.get();
     }
 
-    public static String getFileName(long primaryTerm, long generation) {
+    /*
+    This should be used only at the time of creation. 
+ */ + public String getFileName() { return String.join( METADATA_SEPARATOR, Arrays.asList( - String.valueOf(Long.MAX_VALUE - primaryTerm), - String.valueOf(Long.MAX_VALUE - generation), - String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()), + RemoteStoreUtils.invertLong(primaryTerm), + RemoteStoreUtils.invertLong(generation), + RemoteStoreUtils.invertLong(createdAt), String.valueOf(primaryTerm), String.valueOf(generation) ) @@ -100,22 +105,4 @@ public boolean equals(Object o) { TranslogTransferMetadata other = (TranslogTransferMetadata) o; return Objects.equals(this.primaryTerm, other.primaryTerm) && Objects.equals(this.generation, other.generation); } - - private static class MetadataFilenameComparator implements Comparator { - @Override - public int compare(String first, String second) { - // Format of metadata filename is ________ - String[] filenameTokens1 = first.split(METADATA_SEPARATOR); - String[] filenameTokens2 = second.split(METADATA_SEPARATOR); - // Here, we are comparing only inverted primary term and inv generation. 
- for (int i = 0; i < 2; i++) { - if (filenameTokens1[i].equals(filenameTokens2[i]) == false) { - return Long.compare(Long.parseLong(filenameTokens1[i]), Long.parseLong(filenameTokens2[i])); - } - } - throw new IllegalArgumentException( - "TranslogTransferMetadata files " + first + " and " + second + " have same primary term and generation" - ); - } - } } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index d963830e9e736..84f1374bc29e4 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -570,7 +570,7 @@ public void testMetadataFileDeletion() throws Exception { assertEquals(1, translog.readers.size()); } assertBusy(() -> assertEquals(4, translog.allUploaded().size())); - assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int moreDocs = randomIntBetween(3, 10); logger.info("numDocs={} moreDocs={}", numDocs, moreDocs); for (int i = numDocs; i < numDocs + moreDocs; i++) { @@ -579,7 +579,7 @@ public void testMetadataFileDeletion() throws Exception { translog.trimUnreferencedReaders(); assertEquals(1 + moreDocs, translog.readers.size()); assertBusy(() -> assertEquals(2 + 2L * moreDocs, translog.allUploaded().size())); - assertBusy(() -> assertEquals(1 + moreDocs, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(1 , blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int totalDocs = numDocs + moreDocs; translog.setMinSeqNoToKeep(totalDocs - 1); @@ -592,7 +592,7 @@ public void testMetadataFileDeletion() throws Exception { ); 
translog.setMinSeqNoToKeep(totalDocs); translog.trimUnreferencedReaders(); - assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); // Change primary term and test the deletion of older primaries String translogUUID = translog.translogUUID; @@ -607,10 +607,6 @@ public void testMetadataFileDeletion() throws Exception { long oldPrimaryTerm = primaryTerm.get(); long newPrimaryTerm = primaryTerm.incrementAndGet(); - // Check all metadata files corresponds to old primary term - Set mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); - assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(oldPrimaryTerm).concat("__")))); - // Creating RemoteFsTranslog with the same location Translog newTranslog = create(translogDir, repository, translogUUID); int newPrimaryTermDocs = randomIntBetween(5, 10); @@ -621,10 +617,6 @@ public void testMetadataFileDeletion() throws Exception { newTranslog.trimUnreferencedReaders(); } - // Check that all metadata files are belonging now to the new primary - mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); - assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(newPrimaryTerm).concat("__")))); - try { newTranslog.close(); } catch (Exception e) { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index ef3dead137c44..078eacc8025e4 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -32,7 +32,6 @@ import 
java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -195,7 +194,7 @@ public void testReadMetadataNoFile() throws IOException { List bmList = new LinkedList<>(); latchedActionListener.onResponse(bmList); return null; - }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); assertNull(translogTransferManager.readMetadata()); } @@ -207,16 +206,18 @@ public void testReadMetadataHappy() throws IOException { remoteBaseTransferPath, null ); + TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); + String mdFilename = tm.getFileName(); doAnswer(invocation -> { LatchedActionListener> latchedActionListener = invocation.getArgument(2); List bmList = new LinkedList<>(); - bmList.add(new PlainBlobMetadata("0__3__3__13__235", 1)); + bmList.add(new PlainBlobMetadata(mdFilename, 1)); latchedActionListener.onResponse(bmList); return null; - }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata(); - when(transferService.downloadBlob(any(BlobPath.class), eq("0__3__3__13__235"))).thenReturn( + when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenReturn( new ByteArrayInputStream(translogTransferManager.getMetadataBytes(metadata)) ); @@ -231,15 +232,18 @@ public void testReadMetadataReadException() throws IOException { null ); + TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2); + String mdFilename = tm.getFileName(); + doAnswer(invocation -> { LatchedActionListener> 
latchedActionListener = invocation.getArgument(2); List bmList = new LinkedList<>(); - bmList.add(new PlainBlobMetadata("0__3__3__13__235", 1)); + bmList.add(new PlainBlobMetadata(mdFilename, 1)); latchedActionListener.onResponse(bmList); return null; - }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); - when(transferService.downloadBlob(any(BlobPath.class), eq("0__3__3__13__235"))).thenThrow(new IOException("Something went wrong")); + when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenThrow(new IOException("Something went wrong")); assertThrows(IOException.class, translogTransferManager::readMetadata); } @@ -256,24 +260,13 @@ public void testReadMetadataListException() throws IOException { LatchedActionListener> latchedActionListener = invocation.getArgument(2); latchedActionListener.onFailure(new IOException("Issue while listing")); return null; - }).when(transferService).listBlobsInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); + }).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class)); - when(transferService.downloadBlob(any(BlobPath.class), eq("1__3__3__12__235"))).thenThrow(new IOException("Something went wrong")); + when(transferService.downloadBlob(any(BlobPath.class), any(String.class))).thenThrow(new IOException("Something went wrong")); assertThrows(IOException.class, translogTransferManager::readMetadata); } - public void testReadMetadataComparatorIllegal() { - List metadataFiles = Arrays.asList("1__4__4__12__234", "1__4__3__12__235", "1__4__4__12__234"); - assertThrows(IllegalArgumentException.class, () -> metadataFiles.sort(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR)); - } - - public void testReadMetadataComparator() { - List metadataFiles = Arrays.asList("1__4__4__12__234", 
"0__3__3__13__235", "1__5__5__12__233"); - metadataFiles.sort(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR); - assertEquals(Arrays.asList("0__3__3__13__235", "1__4__4__12__234", "1__5__5__12__233"), metadataFiles); - } - public void testDownloadTranslog() throws IOException { Path location = createTempDir(); TranslogTransferManager translogTransferManager = new TranslogTransferManager(