Skip to content

Commit

Permalink
Downgrade writelock to readlock during translog upload
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 20, 2023
1 parent 3369069 commit e97609f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SetOnce;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
Expand All @@ -37,6 +36,7 @@
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -237,7 +237,7 @@ public static TranslogTransferManager buildTranslogTransferManager(

@Override
public boolean ensureSynced(Location location) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
try {
assert location.generation <= current.getGeneration();
if (location.generation == current.getGeneration()) {
ensureOpen();
Expand All @@ -260,7 +260,8 @@ public void rollGeneration() throws IOException {
}

private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException {
try (Releasable ignored = writeLock.acquire()) {
Releasable uploadReadLock = null;
try (ReleasableLock updateWriteLock = writeLock.acquire()) {
if (generation == null || generation == current.getGeneration()) {
try {
final TranslogReader reader = current.closeIntoReader();
Expand All @@ -270,6 +271,11 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
logger.trace("Creating new writer for gen: [{}]", current.getGeneration() + 1);
current = createWriter(current.getGeneration() + 1);
}
assert updateWriteLock.isHeldByCurrentThread() : "Write lock must be held before we acquire the read lock";
// Here we downgrade the write lock by acquiring the read lock before the write lock is released.
// This ensures that other threads can still acquire the read lock, while also protecting the
// readers and the writer from being mutated any further.
uploadReadLock = readLock.acquire();
} catch (final Exception e) {
tragedy.setTragicException(e);
closeOnTragicEvent(e);
Expand All @@ -278,7 +284,13 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
} else if (generation < current.getGeneration()) {
return false;
}
}

assert Objects.nonNull(uploadReadLock);
try (
Releasable ignoredReadLock = uploadReadLock;
Releasable ignoredTranslogGenLock = deletionPolicy.acquireTranslogGen(getMinFileGeneration())
) {
// Do we need remote writes to happen synchronously?
// If we don't, we should swallow FileAlreadyExistsException while writing to the remote store
// and also verify the same during primary-primary relocation.
Expand Down Expand Up @@ -317,10 +329,9 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException {
Translog::getCommitCheckpointFileName
).build()
) {
Releasable transferReleasable = Releasables.wrap(deletionPolicy.acquireTranslogGen(getMinFileGeneration()));
return translogTransferManager.transferSnapshot(
transferSnapshotProvider,
new RemoteFsTranslogTransferListener(transferReleasable, generation, primaryTerm)
new RemoteFsTranslogTransferListener(generation, primaryTerm)
);
}

Expand Down Expand Up @@ -499,16 +510,17 @@ protected void onDelete() {
translogTransferManager.delete();
}

/**
 * Reports whether the permit count currently available on {@code remoteGenerationDeletionPermits}
 * equals {@code REMOTE_DELETION_PERMITS}. Visible for testing.
 *
 * @return {@code true} when the available permit count matches {@code REMOTE_DELETION_PERMITS}
 */
boolean isRemoteGenerationDeletionPermitsAvailable() {
    final int availablePermits = remoteGenerationDeletionPermits.availablePermits();
    return REMOTE_DELETION_PERMITS == availablePermits;
}

/**
* TranslogTransferListener implementation for RemoteFsTranslog
*
* @opensearch.internal
*/
private class RemoteFsTranslogTransferListener implements TranslogTransferListener {
/**
* Releasable instance for the translog
*/
private final Releasable transferReleasable;

/**
* Generation for the translog
Expand All @@ -520,25 +532,20 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen
*/
private final Long primaryTerm;

RemoteFsTranslogTransferListener(Releasable transferReleasable, Long generation, Long primaryTerm) {
this.transferReleasable = transferReleasable;
RemoteFsTranslogTransferListener(Long generation, Long primaryTerm) {
this.generation = generation;
this.primaryTerm = primaryTerm;
}

@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();
maxRemoteTranslogGenerationUploaded = generation;
minRemoteGenReferenced = getMinFileGeneration();
logger.trace("uploaded translog for {} {} ", primaryTerm, generation);
}

@Override
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException {
transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();
if (ex instanceof IOException) {
throw (IOException) ex;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.opensearch.index.translog;

import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.tests.mockfile.FilterFileChannel;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TimeUnits;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
Expand Down Expand Up @@ -107,6 +109,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@TimeoutSuite(millis = 60 * TimeUnits.MINUTE)
@LuceneTestCase.SuppressFileSystems("ExtrasFS")

public class RemoteFsTranslogTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -621,6 +624,7 @@ public void testSimpleOperationsUpload() throws Exception {

translog.setMinSeqNoToKeep(2);

assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable()));
translog.trimUnreferencedReaders();
assertEquals(1, translog.readers.size());
assertEquals(1, translog.stats().estimatedNumberOfOperations());
Expand Down

0 comments on commit e97609f

Please sign in to comment.