Skip to content

Commit

Permalink
Upload translog only if primaryMode is true
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Jan 10, 2023
1 parent ff8a3af commit 1f7afe8
Show file tree
Hide file tree
Showing 16 changed files with 87 additions and 28 deletions.
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -100,6 +101,7 @@ public final class EngineConfig {
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -200,6 +202,7 @@ private EngineConfig(Builder builder) {
this.primaryTermSupplier = builder.primaryTermSupplier;
this.tombstoneDocSupplier = builder.tombstoneDocSupplier;
this.isReadOnlyReplica = builder.isReadOnlyReplica;
this.primaryModeSupplier = builder.primaryModeSupplier;
this.translogFactory = builder.translogFactory;
}

Expand Down Expand Up @@ -405,6 +408,14 @@ public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
}

/**
* Returns the underlying primaryModeSupplier.
* @return the primary mode supplier.
*/
public BooleanSupplier getPrimaryModeSupplier() {
return primaryModeSupplier;
}

/**
* Returns the underlying translog factory
* @return the translog factory
Expand Down Expand Up @@ -470,6 +481,7 @@ public static class Builder {
private TombstoneDocSupplier tombstoneDocSupplier;
private TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
private boolean isReadOnlyReplica;
private BooleanSupplier primaryModeSupplier;
private TranslogFactory translogFactory = new InternalTranslogFactory();

public Builder shardId(ShardId shardId) {
Expand Down Expand Up @@ -592,6 +604,11 @@ public Builder readOnlyReplica(boolean isReadOnlyReplica) {
return this;
}

public Builder primaryModeSupplier(BooleanSupplier primaryModeSupplier) {
this.primaryModeSupplier = primaryModeSupplier;
return this;
}

public Builder translogFactory(TranslogFactory translogFactory) {
this.translogFactory = translogFactory;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -149,6 +150,7 @@ public EngineConfig newEngineConfig(
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica,
BooleanSupplier primaryModeSupplier,
TranslogFactory translogFactory
) {
CodecService codecServiceToUse = codecService;
Expand Down Expand Up @@ -180,6 +182,7 @@ public EngineConfig newEngineConfig(
.primaryTermSupplier(primaryTermSupplier)
.tombstoneDocSupplier(tombstoneDocSupplier)
.readOnlyReplica(isReadOnlyReplica)
.primaryModeSupplier(primaryModeSupplier)
.translogFactory(translogFactory)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,12 @@ public void onFailure(String reason, Exception ex) {
translogDeletionPolicy,
shardId,
readLock,
() -> getLocalCheckpointTracker(),
this::getLocalCheckpointTracker,
translogUUID,
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
this::ensureOpen,
engineConfig.getTranslogFactory()
this,
engineConfig.getTranslogFactory(),
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
this.softDeletesPolicy = newSoftDeletesPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public void onAfterTranslogSync() {
}
},
this,
engineConfig.getTranslogFactory()
engineConfig.getTranslogFactory(),
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {}
seqNo -> {},
engineConfig.getPrimaryModeSupplier()
)
) {
translog.trimUnreferencedReaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
translogDeletionPolicy,
config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(),
seqNo -> {}
seqNo -> {},
config.getPrimaryModeSupplier()
)
) {
return translog.stats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3421,6 +3421,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
isReadOnlyReplica,
replicationTracker::isPrimaryMode,
translogFactorySupplier.apply(indexSettings, shardRouting)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import java.io.IOException;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

Expand All @@ -26,7 +27,8 @@ public Translog newTranslog(
TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer
LongConsumer persistedSequenceNumberConsumer,
BooleanSupplier primaryModeSupplier
) throws IOException {

return new LocalTranslog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -55,7 +56,8 @@ public InternalTranslogManager(
String translogUUID,
TranslogEventListener translogEventListener,
LifecycleAware engineLifeCycleAware,
TranslogFactory translogFactory
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
) throws IOException {
this.shardId = shardId;
this.readLock = readLock;
Expand All @@ -68,7 +70,7 @@ public InternalTranslogManager(
if (tracker != null) {
tracker.markSeqNoAsPersisted(seqNo);
}
}, translogUUID, translogFactory);
}, translogUUID, translogFactory, primaryModeSupplier);
assert translog.getGeneration() != null;
this.translog = translog;
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
Expand Down Expand Up @@ -345,16 +347,17 @@ protected Translog openTranslog(
LongSupplier globalCheckpointSupplier,
LongConsumer persistedSequenceNumberConsumer,
String translogUUID,
TranslogFactory translogFactory
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
) throws IOException {

return translogFactory.newTranslog(
translogConfig,
translogUUID,
translogDeletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer
persistedSequenceNumberConsumer,
primaryModeSupplier
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -53,7 +54,8 @@ public Translog newTranslog(
TranslogDeletionPolicy deletionPolicy,
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer
LongConsumer persistedSequenceNumberConsumer,
BooleanSupplier primaryModeSupplier
) throws IOException {

assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
Expand All @@ -66,7 +68,8 @@ public Translog newTranslog(
primaryTermSupplier,
persistedSequenceNumberConsumer,
blobStoreRepository,
executorService
executorService,
primaryModeSupplier
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

Expand All @@ -45,6 +46,7 @@ public class RemoteFsTranslog extends Translog {
private final BlobStoreRepository blobStoreRepository;
private final TranslogTransferManager translogTransferManager;
private final FileTransferTracker fileTransferTracker;
private final BooleanSupplier primaryModeSupplier;
private volatile long maxRemoteTranslogGenerationUploaded;

private volatile long minSeqNoToKeep;
Expand All @@ -57,10 +59,12 @@ public RemoteFsTranslog(
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer,
BlobStoreRepository blobStoreRepository,
ExecutorService executorService
ExecutorService executorService,
BooleanSupplier primaryModeSupplier
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
this.blobStoreRepository = blobStoreRepository;
this.primaryModeSupplier = primaryModeSupplier;
fileTransferTracker = new FileTransferTracker(shardId);
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, executorService, shardId, fileTransferTracker);

Expand Down Expand Up @@ -198,7 +202,13 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
}

private boolean upload(Long primaryTerm, Long generation) throws IOException {
logger.trace("uploading translog for {} {} ", primaryTerm, generation);
boolean primaryMode = primaryModeSupplier.getAsBoolean();
if (primaryMode) {
logger.trace("skipped uploading translog for {} {}", primaryTerm, generation);
// NO-OP
return true;
}
logger.trace("uploading translog for {} {}", primaryTerm, generation);
try (
TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder(
primaryTerm,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import java.io.IOException;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

Expand All @@ -27,6 +28,7 @@ Translog newTranslog(
final TranslogDeletionPolicy deletionPolicy,
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer
final LongConsumer persistedSequenceNumberConsumer,
final BooleanSupplier primaryModeSupplier
) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.index.translog.listener.TranslogEventListener;

import java.io.IOException;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand All @@ -36,7 +37,8 @@ public WriteOnlyTranslogManager(
String translogUUID,
TranslogEventListener translogEventListener,
LifecycleAware engineLifecycleAware,
TranslogFactory translogFactory
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
) throws IOException {
super(
translogConfig,
Expand All @@ -49,7 +51,8 @@ public WriteOnlyTranslogManager(
translogUUID,
translogEventListener,
engineLifecycleAware,
translogFactory
translogFactory,
primaryModeSupplier
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void testCreateEngineConfigFromFactory() {
null,
null,
false,
() -> Boolean.TRUE,
new InternalTranslogFactory()
);

Expand Down Expand Up @@ -146,6 +147,7 @@ public void testCreateCodecServiceFromFactory() {
null,
null,
false,
() -> Boolean.TRUE,
new InternalTranslogFactory()
);
assertNotNull(config.getCodec());
Expand Down
Loading

0 comments on commit 1f7afe8

Please sign in to comment.