From d9bbebf7312be583780908c84ce8ae03f7378fd6 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Mon, 19 Jun 2023 15:06:56 +0530 Subject: [PATCH] =?UTF-8?q?[Backport=202.x][Remote=20Store]=20Deletion=20o?= =?UTF-8?q?f=20Remote=20Segments=20and=20Translog=20upon=20Index=20De?= =?UTF-8?q?=E2=80=A6=20(#8073)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [Remote Store] Deletion of Remote Segments and Translog upon Index Deletion (#7682) Signed-off-by: Gaurav Bafna * Translog deletion only for remote txlog enabled indices Signed-off-by: Gaurav Bafna --------- Signed-off-by: Gaurav Bafna --- .../shard/GlobalCheckpointListenersIT.java | 2 +- .../opensearch/index/shard/IndexShardIT.java | 4 +- .../RemoteStoreBaseIntegTestCase.java | 28 ++++++++++++- .../opensearch/remotestore/RemoteStoreIT.java | 36 +++++++++++++++++ .../org/opensearch/index/IndexService.java | 8 +++- .../opensearch/index/shard/IndexShard.java | 31 +++++++++----- .../index/store/RemoteDirectory.java | 4 ++ .../store/RemoteSegmentStoreDirectory.java | 40 ++++++++++++++++--- .../lockmanager/RemoteStoreLockManager.java | 7 +++- .../RemoteStoreMetadataLockManager.java | 4 ++ .../translog/InternalTranslogManager.java | 4 ++ .../index/translog/NoOpTranslogManager.java | 2 + .../index/translog/RemoteFsTranslog.java | 10 +++++ .../opensearch/index/translog/Translog.java | 2 + .../index/translog/TranslogManager.java | 5 +++ .../transfer/TranslogTransferManager.java | 23 +++++++++-- .../index/engine/NoOpEngineRecoveryTests.java | 2 +- .../RecoveryDuringReplicationTests.java | 10 ++--- .../index/shard/IndexShardTests.java | 12 +++--- .../SegmentReplicationIndexShardTests.java | 12 +++--- .../RemoteSegmentStoreDirectoryTests.java | 2 +- .../RemoteSegmentMetadataHandlerTests.java | 2 +- .../IndexingMemoryControllerTests.java | 2 +- .../PeerRecoveryTargetServiceTests.java | 2 +- .../indices/recovery/RecoveryStatusTests.java | 2 +- .../indices/recovery/RecoveryTests.java | 6 +-- .../BlobStoreRepositoryRestoreTests.java | 4 +- .../index/shard/IndexShardTestCase.java | 2 +- 28 files changed, 215 insertions(+), 53 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/GlobalCheckpointListenersIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/GlobalCheckpointListenersIT.java index a2c5c0333bbfe..76ff2f809cb83 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/GlobalCheckpointListenersIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/GlobalCheckpointListenersIT.java @@ -126,7 +126,7 @@ public void accept(final long g, final Exception e) { } }, null); - shard.close("closed", randomBoolean()); + shard.close("closed", randomBoolean(), false); assertBusy(() -> assertTrue(invoked.get())); } diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index e164aefa35d51..00158f5b57bb8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -597,7 +597,7 @@ public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable { client().prepareIndex("test").setId("1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).setRefreshPolicy(IMMEDIATE).get(); CheckedFunction wrapper = directoryReader -> directoryReader; - 
shard.close("simon says", false); + shard.close("simon says", false, false); AtomicReference shardRef = new AtomicReference<>(); List failures = new ArrayList<>(); IndexingOperationListener listener = new IndexingOperationListener() { @@ -635,7 +635,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul try { ExceptionsHelper.rethrowAndSuppress(failures); } finally { - newShard.close("just do it", randomBoolean()); + newShard.close("just do it", randomBoolean(), false); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 42850fc59c8ad..0ffa5ab23e0b6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -18,7 +18,13 @@ import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -27,6 +33,8 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final int SHARD_COUNT = 1; protected static final int REPLICA_COUNT = 1; + protected Path absolutePath; + @Override protected boolean addMockInternalEngine() { return false; @@ -73,7 +81,7 @@ protected Settings remoteTranslogIndexSettings(int numberOfReplicas) { @Before public void setup() { internalCluster().startClusterManagerOnlyNode(); - Path absolutePath = randomRepoPath().toAbsolutePath(); + absolutePath = randomRepoPath().toAbsolutePath(); assertAcked( clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) ); @@ -84,4 +92,22 @@ public void teardown() { assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); } + public int getFileCount(Path path) throws Exception { + final AtomicInteger filesExisting = new AtomicInteger(0); + Files.walkFileTree(path, new SimpleFileVisitor<>() { + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException impossible) throws IOException { + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + filesExisting.incrementAndGet(); + return FileVisitResult.CONTINUE; + } + }); + + return filesExisting.get(); + } + } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 290f0df591c64..70a41d74a57c5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -9,6 +9,7 @@ package org.opensearch.remotestore; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.index.IndexResponse; import 
org.opensearch.action.support.PlainActionFuture; @@ -23,6 +24,7 @@ import org.opensearch.test.transport.MockTransportService; import java.io.IOException; +import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -30,6 +32,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.comparesEqualTo; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -241,4 +244,37 @@ public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() thro public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception { testPeerRecovery(true, randomIntBetween(2, 5), false); } + + private void verifyRemoteStoreCleanup(boolean remoteTranslog) throws Exception { + internalCluster().startDataOnlyNodes(3); + if (remoteTranslog) { + createIndex(INDEX_NAME, remoteTranslogIndexSettings(1)); + } else { + createIndex(INDEX_NAME, remoteStoreIndexSettings(1)); + } + + indexData(5, randomBoolean()); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID); + assertTrue(getFileCount(indexPath) > 0); + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + // Delete is async. Give time for it + assertBusy(() -> { + try { + assertThat(getFileCount(indexPath), comparesEqualTo(0)); + } catch (Exception e) {} + }, 30, TimeUnit.SECONDS); + } + + public void testRemoteSegmentCleanup() throws Exception { + verifyRemoteStoreCleanup(false); + } + + public void testRemoteTranslogCleanup() throws Exception { + verifyRemoteStoreCleanup(true); + } } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 4e808ebb838e7..73797106bb66f 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -603,6 +603,7 @@ public synchronized void removeShard(int shardId, String reason) { private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store store, IndexEventListener listener) { final int shardId = sId.id(); final Settings indexSettings = this.getIndexSettings().getSettings(); + Store remoteStore = indexShard.remoteStore(); if (store != null) { store.beforeClose(); } @@ -616,7 +617,7 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store try { // only flush if we are closed (closed index or shutdown) and if we are not deleted final boolean flushEngine = deleted.get() == false && closed.get(); - indexShard.close(reason, flushEngine); + indexShard.close(reason, flushEngine, deleted.get()); } catch (Exception e) { logger.debug(() -> new ParameterizedMessage("[{}] failed to close index shard", shardId), e); // ignore @@ -632,6 +633,11 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store } else { logger.trace("[{}] store not initialized prior to closing shard, nothing to close", shardId); } + + if (remoteStore != null && indexShard.isPrimaryMode() && deleted.get()) { + remoteStore.close(); + } + } catch (Exception e) { logger.warn( () -> new ParameterizedMessage("[{}] failed to close store on shard removal (reason: [{}])", shardId, reason), diff --git 
a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f5e349eb54b99..a748d9b71a2ff 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1887,7 +1887,7 @@ public CacheHelper getReaderCacheHelper() { } - public void close(String reason, boolean flushEngine) throws IOException { + public void close(String reason, boolean flushEngine, boolean deleted) throws IOException { synchronized (engineMutex) { try { synchronized (mutex) { @@ -1903,12 +1903,31 @@ public void close(String reason, boolean flushEngine) throws IOException { // playing safe here and close the engine even if the above succeeds - close can be called multiple times // Also closing refreshListeners to prevent us from accumulating any more listeners IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions); + + if (deleted && engine != null && isPrimaryMode() && isRemoteTranslogEnabled()) { + // Translog Clean up + assert engine instanceof InternalEngine; + ((InternalEngine) engine).translogManager().onDelete(); + } + indexShardOperationPermits.close(); } } } } + /* + ToDo : Fix this https://github.com/opensearch-project/OpenSearch/issues/8003 + */ + private RemoteSegmentStoreDirectory getRemoteDirectory() { + assert indexSettings.isRemoteStoreEnabled(); + assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; + FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); + return ((RemoteSegmentStoreDirectory) remoteDirectory); + } + public void preRecovery() { final IndexShardState currentState = this.state; // single volatile read if (currentState == IndexShardState.CLOSED) { @@ -4527,16 +4546,10 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re throws IOException { assert indexSettings.isRemoteStoreEnabled(); logger.info("Downloading segments from remote segment store"); - assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; - FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); - assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory - : "Store.directory is not enclosing an instance of FilterDirectory"; - FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); - final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); + RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory(); // We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that // are uploaded to the remote segment store. 
- assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory"; + RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.init(); Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory) .getSegmentsUploadedToRemoteStore(); store.incRef(); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 5192fd49b91f6..be4b4e910bb4d 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -210,4 +210,8 @@ public void rename(String source, String dest) throws IOException { public Lock obtainLock(String name) throws IOException { throw new UnsupportedOperationException(); } + + public void delete() throws IOException { + blobContainer.delete(); + } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 93db8c191937d..15c6fbea99148 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -20,25 +20,25 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.opensearch.common.UUIDs; +import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; -import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; -import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.NoSuchFileException; -import java.util.Map; -import java.util.HashSet; -import java.util.Optional; -import java.util.HashMap; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -636,4 +636,32 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException } } } + + /* + Tries to delete the shard-level directory if it is empty. + Returns true if the deletion succeeded. + */ + private boolean deleteIfEmpty() throws IOException { + Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); + if (metadataFiles.size() != 0) { + logger.info("Remote directory still has files, not deleting the path"); + return false; + } + + try { + remoteDataDirectory.delete(); + remoteMetadataDirectory.delete(); + mdLockManager.delete(); + } catch (Exception e) { + logger.error("Exception occurred while deleting directory", e); + return false; + } + + return true; + } + + public void close() throws IOException {
deleteStaleSegments(0); + deleteIfEmpty(); + } } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java index ce657627fcfc6..c30be082b4795 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java @@ -22,7 +22,7 @@ public interface RemoteStoreLockManager { * @param lockInfo lock info instance for which we need to acquire lock. * @throws IOException throws exception in case there is a problem with acquiring lock. */ - public void acquire(LockInfo lockInfo) throws IOException; + void acquire(LockInfo lockInfo) throws IOException; /** * @@ -38,4 +38,9 @@ public interface RemoteStoreLockManager { * @throws IOException throws exception in case there is a problem in checking if a given file is locked or not. */ Boolean isAcquired(LockInfo lockInfo) throws IOException; + + /* + Deletes all lock related files and directories + */ + void delete() throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java index 41665ebe47600..7df20cae10664 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java @@ -83,4 +83,8 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException { Collection lockFiles = lockDirectory.listFilesByPrefix(((FileLockInfo) lockInfo).getLockPrefix()); return !lockFiles.isEmpty(); } + + public void delete() throws IOException { + lockDirectory.delete(); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 7a331e41cde2c..0105a54a9430d 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -295,6 +295,10 @@ public void setMinSeqNoToKeep(long seqNo) { translog.setMinSeqNoToKeep(seqNo); } + public void onDelete() { + translog.onDelete(); + } + /** * Reads operations from the translog * @param location location of translog diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 41fa3156c7ed3..3be63113cc667 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -115,4 +115,6 @@ public Translog.Operation readOperation(Translog.Location location) throws IOExc public Translog.Location add(Translog.Operation operation) throws IOException { return new Translog.Location(0, 0, 0); } + + public void onDelete() {} } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 20bf76c9305e0..6ad26989a44dd 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -422,4 +422,14 @@ private void 
deleteStaleRemotePrimaryTermsAndMetadataFiles() { translogTransferManager.deleteStaleTranslogMetadataFilesAsync(); } } + + protected void onDelete() { + if (primaryModeSupplier.getAsBoolean() == false) { + logger.trace("skipped delete translog"); + // NO-OP + return; + } + // clean up all remote translog files + translogTransferManager.delete(); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 814574855b85d..fb0be2b6aaf0e 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -1807,6 +1807,8 @@ protected long getMinReferencedGen() throws IOException { */ protected void setMinSeqNoToKeep(long seqNo) {} + protected void onDelete() {} + /** * deletes all files associated with a reader. package-private to be able to simulate node failures at this point */ diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 7f1f87d3409e7..5c91f6cb7b345 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -121,4 +121,9 @@ public interface TranslogManager { * This might be required when segments are persisted via other mechanism than flush. */ void setMinSeqNoToKeep(long seqNo); + + /* + Performs any clean-up needed on deletion of the index. + */ + void onDelete(); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 489c81f802695..58aca00d2e9d3 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -56,6 +56,7 @@ public class TranslogTransferManager { private final TransferService transferService; private final BlobPath remoteDataTransferPath; private final BlobPath remoteMetadataTransferPath; + private final BlobPath remoteBaseTransferPath; private final FileTransferTracker fileTransferTracker; private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; @@ -74,13 +75,14 @@ public class TranslogTransferManager { public TranslogTransferManager( ShardId shardId, TransferService transferService, - BlobPath remoteDataTransferPath, + BlobPath remoteBaseTransferPath, FileTransferTracker fileTransferTracker ) { this.shardId = shardId; this.transferService = transferService; - this.remoteDataTransferPath = remoteDataTransferPath.add(DATA_DIR); - this.remoteMetadataTransferPath = remoteDataTransferPath.add(METADATA_DIR); + this.remoteBaseTransferPath = remoteBaseTransferPath; + this.remoteDataTransferPath = remoteBaseTransferPath.add(DATA_DIR); + this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR); this.fileTransferTracker = fileTransferTracker; } @@ -324,6 +326,21 @@ public void onFailure(Exception e) { ); } + public void delete() { + // Cleans up all the translog contents asynchronously + transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("Deleted all remote translog data for {}", shardId); + } + + @Override + public void onFailure(Exception e) { + logger.error("Exception
occurred while cleaning translog ", e); + } + }); + } + public void deleteStaleTranslogMetadataFilesAsync() { transferService.listAllAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, new ActionListener<>() { @Override diff --git a/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java b/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java index 3162f7915c994..d8b55815b5b05 100644 --- a/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java @@ -50,7 +50,7 @@ public void testRecoverFromNoOp() throws IOException { for (int i = 0; i < nbDocs; i++) { indexDoc(indexShard, "_doc", String.valueOf(i)); } - indexShard.close("test", true); + indexShard.close("test", true, false); final ShardRouting shardRouting = indexShard.routingEntry(); IndexShard primary = reinitShard( diff --git a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java index 0b976154969dc..fa2a2f2cf6138 100644 --- a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java @@ -161,10 +161,10 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { new SourceToParse("index", "replica", new BytesArray("{}"), XContentType.JSON) ); shards.promoteReplicaToPrimary(promotedReplica).get(); - oldPrimary.close("demoted", randomBoolean()); + oldPrimary.close("demoted", randomBoolean(), false); oldPrimary.store().close(); shards.removeReplica(remainingReplica); - remainingReplica.close("disconnected", false); + remainingReplica.close("disconnected", false, false); remainingReplica.store().close(); // randomly introduce a conflicting document final boolean extra = randomBoolean(); @@ -289,7 +289,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { newPrimary.flush(new FlushRequest()); } - oldPrimary.close("demoted", false); + oldPrimary.close("demoted", false, false); oldPrimary.store().close(); IndexShard newReplica = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); @@ -335,7 +335,7 @@ public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception { shards.promoteReplicaToPrimary(newPrimary).get(); // Recover a replica should rollback the stale documents shards.removeReplica(replica); - replica.close("recover replica - first time", false); + replica.close("recover replica - first time", false, false); replica.store().close(); replica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); shards.recoverReplica(replica); @@ -346,7 +346,7 @@ public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception { assertThat(replica.getLastSyncedGlobalCheckpoint(), equalTo(replica.seqNoStats().getMaxSeqNo())); // Recover a replica again should also rollback the stale documents. 
shards.removeReplica(replica); - replica.close("recover replica - second time", false); + replica.close("recover replica - second time", false, false); replica.store().close(); IndexShard anotherReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); shards.recoverReplica(anotherReplica); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 5b6f2f60f55ad..839fa23161bbd 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -290,7 +290,7 @@ public void testFailShard() throws Exception { assertNotNull(shardPath); // fail shard shard.failShard("test shard fail", new CorruptIndexException("", "")); - shard.close("do not assert history", false); + shard.close("do not assert history", false, false); shard.store().close(); // check state file still exists ShardStateMetadata shardStateMetadata = load(logger, shardPath.getShardStatePath()); @@ -1553,7 +1553,7 @@ public void testSnapshotStore() throws IOException { snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); - newShard.close("test", false); + newShard.close("test", false, false); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); @@ -1813,7 +1813,7 @@ public void testIndexingOperationsListeners() throws IOException { AtomicInteger preDelete = new AtomicInteger(); AtomicInteger postDelete = new AtomicInteger(); AtomicInteger postDeleteException = new AtomicInteger(); - shard.close("simon says", true); + shard.close("simon says", true, false); shard = reinitShard(shard, new IndexingOperationListener() { @Override public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { @@ -1900,7 +1900,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) { assertEquals(1, postDelete.get()); assertEquals(0, postDeleteException.get()); - shard.close("Unexpected close", true); + shard.close("Unexpected close", true, false); shard.state = IndexShardState.STARTED; // It will generate exception try { @@ -4303,7 +4303,7 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover Thread closeShardThread = new Thread(() -> { try { readyToCloseLatch.await(); - shard.close("testing", false); + shard.close("testing", false, false); // in integration tests, this is done as a listener on IndexService. 
MockFSDirectoryFactory.checkIndex(logger, shard.store(), shard.shardId); } catch (InterruptedException | IOException e) { @@ -4755,7 +4755,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { recoveryThread.start(); try { warmerStarted.await(); - shard.close("testing", false); + shard.close("testing", false, false); assertThat(shard.state, equalTo(IndexShardState.CLOSED)); } finally { warmerBlocking.countDown(); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 8d4a2046b72d8..6755f354ac5e8 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -586,7 +586,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception { assertEqualCommittedSegments(primary, replica_1); shards.promoteReplicaToPrimary(replica_2).get(); - primary.close("demoted", false); + primary.close("demoted", false, false); primary.store().close(); IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); shards.recoverReplica(oldPrimary); @@ -625,7 +625,7 @@ public void testReplicaRestarts() throws Exception { // randomly resetart a replica final IndexShard replicaToRestart = getRandomReplica(shards); - replicaToRestart.close("restart", false); + replicaToRestart.close("restart", false, false); replicaToRestart.store().close(); shards.removeReplica(replicaToRestart); final IndexShard newReplica = shards.addReplicaWithExistingPath( @@ -723,7 +723,7 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush shards.promoteReplicaToPrimary(nextPrimary).get(); // close oldPrimary. - oldPrimary.close("demoted", false); + oldPrimary.close("demoted", false, false); oldPrimary.store().close(); assertEquals(InternalEngine.class, nextPrimary.getEngine().getClass()); @@ -790,7 +790,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception { shards.promoteReplicaToPrimary(nextPrimary); // close and start the oldPrimary as a replica. 
- oldPrimary.close("demoted", false); + oldPrimary.close("demoted", false, false); oldPrimary.store().close(); oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); shards.recoverReplica(oldPrimary); @@ -873,7 +873,7 @@ public void onFailure(Exception e) { assertEquals(nextPrimary.getEngine().getClass(), InternalEngine.class); nextPrimary.refresh("test"); - oldPrimary.close("demoted", false); + oldPrimary.close("demoted", false, false); oldPrimary.store().close(); IndexShard newReplica = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); shards.recoverReplica(newReplica); @@ -1082,7 +1082,7 @@ private IndexShard failAndPromoteRandomReplica(ReplicationGroup shards) throws I IndexShard primary = shards.getPrimary(); final IndexShard newPrimary = getRandomReplica(shards); shards.promoteReplicaToPrimary(newPrimary); - primary.close("demoted", true); + primary.close("demoted", true, false); primary.store().close(); primary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); shards.recoverReplica(primary); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index f5f24402c1646..fec9b04d6e371 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -84,7 +84,7 @@ public void setup() throws IOException { @After public void tearDown() throws Exception { - indexShard.close("test tearDown", true); + indexShard.close("test tearDown", true, false); super.tearDown(); } diff --git a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java index 3e824c0afee25..3bf7781fb909f 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java @@ -51,7 +51,7 @@ public void setup() throws IOException { @After public void tearDown() throws Exception { - indexShard.close("test tearDown", true); + indexShard.close("test tearDown", true, false); super.tearDown(); } diff --git a/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java index 7a362ce8ded74..4a9f15f7128ad 100644 --- a/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java @@ -369,7 +369,7 @@ public void testTranslogRecoveryWorksWithIMC() throws IOException { for (int i = 0; i < 100; i++) { indexDoc(shard, Integer.toString(i), "{\"foo\" : \"bar\"}", XContentType.JSON, null); } - shard.close("simon says", false); + shard.close("simon says", false, false); AtomicReference shardRef = new AtomicReference<>(); Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "50kb").build(); Iterable iterable = () -> (shardRef.get() == null) diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java 
b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 730d9b4215b73..9faa8ddff8183 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -380,7 +380,7 @@ public void testResetStartRequestIfTranslogIsCorrupted() throws Exception { ); IndexShard shard = newStartedShard(false); final SeqNoStats seqNoStats = populateRandomData(shard); - shard.close("test", false); + shard.close("test", false, false); if (randomBoolean()) { shard.store().associateIndexWithNewTranslog(UUIDs.randomBase64UUID()); } else if (randomBoolean()) { diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryStatusTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryStatusTests.java index 73caa611dbcdb..3038d11e6ad91 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryStatusTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryStatusTests.java @@ -94,7 +94,7 @@ public void testRenameTempFiles() throws IOException { } } assertNotNull(expectedFile); - indexShard.close("foo", false);// we have to close it here otherwise rename fails since the write.lock is held by the engine + indexShard.close("foo", false, false);// we have to close it here otherwise rename fails since the write.lock is held by the engine multiFileWriter.renameAllTempFiles(); strings = Sets.newHashSet(indexShard.store().directory().listAll()); assertTrue(strings.toString(), strings.contains("foo.bar")); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index eae070b98c4a1..97772564acc88 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -267,7 +267,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { final String historyUUID = replica.getHistoryUUID(); Translog.TranslogGeneration translogGeneration = getTranslog(replica).getGeneration(); shards.removeReplica(replica); - replica.close("test", false); + replica.close("test", false, false); IndexWriterConfig iwc = new IndexWriterConfig(null).setCommitOnClose(false) // we don't want merges to happen here - we call maybe merge on the engine // later once we stared it up otherwise we would need to wait for it here @@ -391,7 +391,7 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { if (randomBoolean()) { shards.flush(); } - replica.close("test", randomBoolean()); + replica.close("test", randomBoolean(), false); replica.store().close(); final IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); shards.recoverReplica(newReplica); @@ -509,7 +509,7 @@ public void testRecoveryTrimsLocalTranslog() throws Exception { } shards.syncGlobalCheckpoint(); shards.promoteReplicaToPrimary(randomFrom(shards.getReplicas())).get(); - oldPrimary.close("demoted", false); + oldPrimary.close("demoted", false, false); oldPrimary.store().close(); oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); shards.recoverReplica(oldPrimary); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java 
b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index c3bd4dcaf530d..6797a1db23b2d 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -162,7 +162,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException { } finally { if (shard != null && shard.state() != IndexShardState.CLOSED) { try { - shard.close("test", false); + shard.close("test", false, false); } finally { IOUtils.close(shard.store()); } @@ -228,7 +228,7 @@ public void testSnapshotWithConflictingName() throws Exception { } finally { if (shard != null && shard.state() != IndexShardState.CLOSED) { try { - shard.close("test", false); + shard.close("test", false, false); } finally { IOUtils.close(shard.store()); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 4f4ad6e90abfb..1c784622d0df3 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -806,7 +806,7 @@ protected void closeShard(IndexShard shard, boolean assertConsistencyBetweenTran EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(engine); } } finally { - IOUtils.close(() -> shard.close("test", false), shard.store()); + IOUtils.close(() -> shard.close("test", false, false), shard.store()); } }
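The patch above threads a new deleted flag from IndexService#closeShard into IndexShard#close(reason, flushEngine, deleted). When the flag is set and the shard is in primary mode, translog cleanup runs through TranslogManager#onDelete (RemoteFsTranslog delegates to TranslogTransferManager#delete), and closing the remote store drives RemoteSegmentStoreDirectory#close, which deletes all stale segments and then removes the shard-level path if it is empty. The Java sketch below is a minimal, self-contained model of that decision logic; every interface and class in it (RemoteTranslogStore, RemoteSegmentStore, Shard, closeShard) is a hypothetical stand-in for illustration, not the real OpenSearch API.

import java.util.ArrayList;
import java.util.List;

public class RemoteCleanupFlowSketch {

    interface RemoteTranslogStore {
        void deleteAllRemoteTranslogFiles();
    }

    interface RemoteSegmentStore {
        void deleteStaleSegments(int lastNMetadataFilesToKeep);
        boolean deleteIfEmpty();
    }

    static class Shard {
        final boolean primaryMode;
        final boolean remoteTranslogEnabled;
        final RemoteTranslogStore remoteTranslog;

        Shard(boolean primaryMode, boolean remoteTranslogEnabled, RemoteTranslogStore remoteTranslog) {
            this.primaryMode = primaryMode;
            this.remoteTranslogEnabled = remoteTranslogEnabled;
            this.remoteTranslog = remoteTranslog;
        }

        // Models the new three-argument close(reason, flushEngine, deleted): remote translog
        // files are purged only when the index was deleted and this shard is a primary with
        // remote translog enabled.
        void close(String reason, boolean flushEngine, boolean deleted) {
            if (deleted && primaryMode && remoteTranslogEnabled) {
                remoteTranslog.deleteAllRemoteTranslogFiles();
            }
        }
    }

    // Models IndexService#closeShard: flush only when the index is not deleted (the real code
    // also requires the index to be closed), and clean up the remote segment store only for a
    // deleted index on the shard that is in primary mode.
    static void closeShard(Shard shard, boolean deleted, RemoteSegmentStore remoteSegments) {
        boolean flushEngine = deleted == false;
        shard.close("shutdown or index removed", flushEngine, deleted);
        if (remoteSegments != null && shard.primaryMode && deleted) {
            remoteSegments.deleteStaleSegments(0); // keep no stale segment metadata
            remoteSegments.deleteIfEmpty();        // then drop the shard-level path if nothing is left
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Shard primary = new Shard(true, true, () -> log.add("remote translog deleted"));
        closeShard(primary, true, new RemoteSegmentStore() {
            @Override
            public void deleteStaleSegments(int keep) { log.add("stale segments deleted, keep=" + keep); }
            @Override
            public boolean deleteIfEmpty() { log.add("empty shard path removed"); return true; }
        });
        log.forEach(System.out::println);
    }
}

Running the sketch prints the three cleanup steps in order for a deleted primary; for a replica, or for an index that is closed but not deleted, nothing is printed, which mirrors the guards the patch adds in IndexShard#close and IndexService#closeShard.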