diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index c488127857ed5..a6b1d668f763b 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.common.SetOnce; import org.opensearch.common.lease.Releasable; -import org.opensearch.common.lease.Releasables; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; @@ -37,6 +36,7 @@ import java.util.HashSet; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.function.BooleanSupplier; @@ -237,7 +237,7 @@ public static TranslogTransferManager buildTranslogTransferManager( @Override public boolean ensureSynced(Location location) throws IOException { - try (ReleasableLock ignored = writeLock.acquire()) { + try { assert location.generation <= current.getGeneration(); if (location.generation == current.getGeneration()) { ensureOpen(); @@ -260,7 +260,8 @@ public void rollGeneration() throws IOException { } private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException { - try (Releasable ignored = writeLock.acquire()) { + Releasable uploadReadLock = null; + try (ReleasableLock updateWriteLock = writeLock.acquire()) { if (generation == null || generation == current.getGeneration()) { try { final TranslogReader reader = current.closeIntoReader(); @@ -270,6 +271,11 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc logger.trace("Creating new writer for gen: [{}]", current.getGeneration() + 1); current = createWriter(current.getGeneration() + 1); } + assert updateWriteLock.isHeldByCurrentThread() : "Write lock must be held before we acquire the read lock"; + // Here we are downgrading the write lock by acquiring the read lock and releasing the write lock + // This ensures that other threads can still acquire the read locks while also protecting the + // readers and writer to not be mutated any further. + uploadReadLock = readLock.acquire(); } catch (final Exception e) { tragedy.setTragicException(e); closeOnTragicEvent(e); @@ -278,7 +284,13 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc } else if (generation < current.getGeneration()) { return false; } + } + assert Objects.nonNull(uploadReadLock); + try ( + Releasable ignoredReadLock = uploadReadLock; + Releasable ignoredTranslogGenLock = deletionPolicy.acquireTranslogGen(getMinFileGeneration()) + ) { // Do we need remote writes in sync fashion ? // If we don't , we should swallow FileAlreadyExistsException while writing to remote store // and also verify for same during primary-primary relocation @@ -317,10 +329,9 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException { Translog::getCommitCheckpointFileName ).build() ) { - Releasable transferReleasable = Releasables.wrap(deletionPolicy.acquireTranslogGen(getMinFileGeneration())); return translogTransferManager.transferSnapshot( transferSnapshotProvider, - new RemoteFsTranslogTransferListener(transferReleasable, generation, primaryTerm) + new RemoteFsTranslogTransferListener(generation, primaryTerm) ); } @@ -499,16 +510,17 @@ protected void onDelete() { translogTransferManager.delete(); } + // Visible for testing + boolean isRemoteGenerationDeletionPermitsAvailable() { + return remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS; + } + /** * TranslogTransferListener implementation for RemoteFsTranslog * * @opensearch.internal */ private class RemoteFsTranslogTransferListener implements TranslogTransferListener { - /** - * Releasable instance for the translog - */ - private final Releasable transferReleasable; /** * Generation for the translog @@ -520,16 +532,13 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen */ private final Long primaryTerm; - RemoteFsTranslogTransferListener(Releasable transferReleasable, Long generation, Long primaryTerm) { - this.transferReleasable = transferReleasable; + RemoteFsTranslogTransferListener(Long generation, Long primaryTerm) { this.generation = generation; this.primaryTerm = primaryTerm; } @Override public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException { - transferReleasable.close(); - closeFilesIfNoPendingRetentionLocks(); maxRemoteTranslogGenerationUploaded = generation; minRemoteGenReferenced = getMinFileGeneration(); logger.trace("uploaded translog for {} {} ", primaryTerm, generation); @@ -537,8 +546,6 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti @Override public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException { - transferReleasable.close(); - closeFilesIfNoPendingRetentionLocks(); if (ex instanceof IOException) { throw (IOException) ex; } else { diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index de1b2990f0a50..f7a6eeff9dae2 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -621,6 +621,7 @@ public void testSimpleOperationsUpload() throws Exception { translog.setMinSeqNoToKeep(2); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); translog.trimUnreferencedReaders(); assertEquals(1, translog.readers.size()); assertEquals(1, translog.stats().estimatedNumberOfOperations());