From 1f7afe8321264967e430cb49506f87184fb9c8d9 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 11 Jan 2023 01:53:58 +0530 Subject: [PATCH] Upload translog only if primaryMode is true Signed-off-by: Ashish Singh --- .../opensearch/index/engine/EngineConfig.java | 17 +++++++++++++++ .../index/engine/EngineConfigFactory.java | 3 +++ .../index/engine/InternalEngine.java | 7 ++++--- .../index/engine/NRTReplicationEngine.java | 3 ++- .../opensearch/index/engine/NoOpEngine.java | 3 ++- .../index/engine/ReadOnlyEngine.java | 3 ++- .../opensearch/index/shard/IndexShard.java | 1 + .../translog/InternalTranslogFactory.java | 4 +++- .../translog/InternalTranslogManager.java | 13 +++++++----- ...emoteBlobStoreInternalTranslogFactory.java | 7 +++++-- .../index/translog/RemoteFsTranslog.java | 14 +++++++++++-- .../index/translog/TranslogFactory.java | 4 +++- .../translog/WriteOnlyTranslogManager.java | 7 +++++-- .../engine/EngineConfigFactoryTests.java | 2 ++ .../InternalTranslogManagerTests.java | 21 ++++++++++++------- .../index/translog/RemoteFSTranslogTests.java | 6 ++++-- 16 files changed, 87 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index 1e741725749cc..fe003405fd3f8 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -61,6 +61,7 @@ import java.util.List; import java.util.Objects; +import java.util.function.BooleanSupplier; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -100,6 +101,7 @@ public final class EngineConfig { private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; private final boolean isReadOnlyReplica; + private final BooleanSupplier primaryModeSupplier; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -200,6 +202,7 @@ private EngineConfig(Builder builder) { this.primaryTermSupplier = builder.primaryTermSupplier; this.tombstoneDocSupplier = builder.tombstoneDocSupplier; this.isReadOnlyReplica = builder.isReadOnlyReplica; + this.primaryModeSupplier = builder.primaryModeSupplier; this.translogFactory = builder.translogFactory; } @@ -405,6 +408,14 @@ public boolean isReadOnlyReplica() { return indexSettings.isSegRepEnabled() && isReadOnlyReplica; } + /** + * Returns the underlying primaryModeSupplier. + * @return the primary mode supplier. + */ + public BooleanSupplier getPrimaryModeSupplier() { + return primaryModeSupplier; + } + /** * Returns the underlying translog factory * @return the translog factory @@ -470,6 +481,7 @@ public static class Builder { private TombstoneDocSupplier tombstoneDocSupplier; private TranslogDeletionPolicyFactory translogDeletionPolicyFactory; private boolean isReadOnlyReplica; + private BooleanSupplier primaryModeSupplier; private TranslogFactory translogFactory = new InternalTranslogFactory(); public Builder shardId(ShardId shardId) { @@ -592,6 +604,11 @@ public Builder readOnlyReplica(boolean isReadOnlyReplica) { return this; } + public Builder primaryModeSupplier(BooleanSupplier primaryModeSupplier) { + this.primaryModeSupplier = primaryModeSupplier; + return this; + } + public Builder translogFactory(TranslogFactory translogFactory) { this.translogFactory = translogFactory; return this; diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java index 71d3e6d0f71a0..f5a5d50e11220 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.function.BooleanSupplier; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -149,6 +150,7 @@ public EngineConfig newEngineConfig( LongSupplier primaryTermSupplier, EngineConfig.TombstoneDocSupplier tombstoneDocSupplier, boolean isReadOnlyReplica, + BooleanSupplier primaryModeSupplier, TranslogFactory translogFactory ) { CodecService codecServiceToUse = codecService; @@ -180,6 +182,7 @@ public EngineConfig newEngineConfig( .primaryTermSupplier(primaryTermSupplier) .tombstoneDocSupplier(tombstoneDocSupplier) .readOnlyReplica(isReadOnlyReplica) + .primaryModeSupplier(primaryModeSupplier) .translogFactory(translogFactory) .build(); } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index d25e7679332f3..2a89c3a931fcb 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -286,11 +286,12 @@ public void onFailure(String reason, Exception ex) { translogDeletionPolicy, shardId, readLock, - () -> getLocalCheckpointTracker(), + this::getLocalCheckpointTracker, translogUUID, new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId), - this::ensureOpen, - engineConfig.getTranslogFactory() + this, + engineConfig.getTranslogFactory(), + engineConfig.getPrimaryModeSupplier() ); this.translogManager = translogManagerRef; this.softDeletesPolicy = newSoftDeletesPolicy(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 41abbce91c48c..1d28921fdf2a6 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -107,7 +107,8 @@ public void onAfterTranslogSync() { } }, this, - engineConfig.getTranslogFactory() + engineConfig.getTranslogFactory(), + engineConfig.getPrimaryModeSupplier() ); this.translogManager = translogManagerRef; } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index b796db8d4ca44..2b126e627bd3d 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -202,7 +202,8 @@ public void trimUnreferencedTranslogFiles() throws TranslogException { translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), - seqNo -> {} + seqNo -> {}, + engineConfig.getPrimaryModeSupplier() ) ) { translog.trimUnreferencedReaders(); diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index dceb26bc33aa7..cda21f5b05569 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -259,7 +259,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm translogDeletionPolicy, config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), - seqNo -> {} + seqNo -> {}, + config.getPrimaryModeSupplier() ) ) { return translog.stats(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 4be11badd0879..c347c66ebb603 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3421,6 +3421,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro () -> getOperationPrimaryTerm(), tombstoneDocSupplier(), isReadOnlyReplica, + replicationTracker::isPrimaryMode, translogFactorySupplier.apply(indexSettings, shardRouting) ); } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java index a363992203721..d7be1250c0b5b 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import java.io.IOException; +import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -26,7 +27,8 @@ public Translog newTranslog( TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier, - LongConsumer persistedSequenceNumberConsumer + LongConsumer persistedSequenceNumberConsumer, + BooleanSupplier primaryModeSupplier ) throws IOException { return new LocalTranslog( diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 80966fbd5bd96..679b9d7f2f88b 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -55,7 +56,8 @@ public InternalTranslogManager( String translogUUID, TranslogEventListener translogEventListener, LifecycleAware engineLifeCycleAware, - TranslogFactory translogFactory + TranslogFactory translogFactory, + BooleanSupplier primaryModeSupplier ) throws IOException { this.shardId = shardId; this.readLock = readLock; @@ -68,7 +70,7 @@ public InternalTranslogManager( if (tracker != null) { tracker.markSeqNoAsPersisted(seqNo); } - }, translogUUID, translogFactory); + }, translogUUID, translogFactory, primaryModeSupplier); assert translog.getGeneration() != null; this.translog = translog; assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; @@ -345,16 +347,17 @@ protected Translog openTranslog( LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer, String translogUUID, - TranslogFactory translogFactory + TranslogFactory translogFactory, + BooleanSupplier primaryModeSupplier ) throws IOException { - return translogFactory.newTranslog( translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, primaryTermSupplier, - persistedSequenceNumberConsumer + persistedSequenceNumberConsumer, + primaryModeSupplier ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index 0d9e01aef4891..272e560991386 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.util.concurrent.ExecutorService; +import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -53,7 +54,8 @@ public Translog newTranslog( TranslogDeletionPolicy deletionPolicy, LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier, - LongConsumer persistedSequenceNumberConsumer + LongConsumer persistedSequenceNumberConsumer, + BooleanSupplier primaryModeSupplier ) throws IOException { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; @@ -66,7 +68,8 @@ public Translog newTranslog( primaryTermSupplier, persistedSequenceNumberConsumer, blobStoreRepository, - executorService + executorService, + primaryModeSupplier ); } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index e7d6b509b1c4c..618a84656f128 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -45,6 +46,7 @@ public class RemoteFsTranslog extends Translog { private final BlobStoreRepository blobStoreRepository; private final TranslogTransferManager translogTransferManager; private final FileTransferTracker fileTransferTracker; + private final BooleanSupplier primaryModeSupplier; private volatile long maxRemoteTranslogGenerationUploaded; private volatile long minSeqNoToKeep; @@ -57,10 +59,12 @@ public RemoteFsTranslog( LongSupplier primaryTermSupplier, LongConsumer persistedSequenceNumberConsumer, BlobStoreRepository blobStoreRepository, - ExecutorService executorService + ExecutorService executorService, + BooleanSupplier primaryModeSupplier ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); this.blobStoreRepository = blobStoreRepository; + this.primaryModeSupplier = primaryModeSupplier; fileTransferTracker = new FileTransferTracker(shardId); this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, executorService, shardId, fileTransferTracker); @@ -198,7 +202,13 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc } private boolean upload(Long primaryTerm, Long generation) throws IOException { - logger.trace("uploading translog for {} {} ", primaryTerm, generation); + boolean primaryMode = primaryModeSupplier.getAsBoolean(); + if (primaryMode) { + logger.trace("skipped uploading translog for {} {}", primaryTerm, generation); + // NO-OP + return true; + } + logger.trace("uploading translog for {} {}", primaryTerm, generation); try ( TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder( primaryTerm, diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java index 5500bda99808d..ab8e2b7752e66 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import java.io.IOException; +import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -27,6 +28,7 @@ Translog newTranslog( final TranslogDeletionPolicy deletionPolicy, final LongSupplier globalCheckpointSupplier, final LongSupplier primaryTermSupplier, - final LongConsumer persistedSequenceNumberConsumer + final LongConsumer persistedSequenceNumberConsumer, + final BooleanSupplier primaryModeSupplier ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java index 60abdcb0dcb57..0f7340a6a5a8b 100644 --- a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -15,6 +15,7 @@ import org.opensearch.index.translog.listener.TranslogEventListener; import java.io.IOException; +import java.util.function.BooleanSupplier; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -36,7 +37,8 @@ public WriteOnlyTranslogManager( String translogUUID, TranslogEventListener translogEventListener, LifecycleAware engineLifecycleAware, - TranslogFactory translogFactory + TranslogFactory translogFactory, + BooleanSupplier primaryModeSupplier ) throws IOException { super( translogConfig, @@ -49,7 +51,8 @@ public WriteOnlyTranslogManager( translogUUID, translogEventListener, engineLifecycleAware, - translogFactory + translogFactory, + primaryModeSupplier ); } diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java index 269d89352fb18..2db3cd24da80d 100644 --- a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java @@ -68,6 +68,7 @@ public void testCreateEngineConfigFromFactory() { null, null, false, + () -> Boolean.TRUE, new InternalTranslogFactory() ); @@ -146,6 +147,7 @@ public void testCreateCodecServiceFromFactory() { null, null, false, + () -> Boolean.TRUE, new InternalTranslogFactory() ); assertNotNull(config.getCodec()); diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java index 234abfba66622..4b2cd23a677a0 100644 --- a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -48,7 +48,8 @@ public void testRecoveryFromTranslog() throws IOException { translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, - new InternalTranslogFactory() + new InternalTranslogFactory(), + () -> Boolean.TRUE ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -87,7 +88,8 @@ public void onBeginTranslogRecovery() { } }, () -> {}, - new InternalTranslogFactory() + new InternalTranslogFactory(), + () -> Boolean.TRUE ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -125,7 +127,8 @@ public void testTranslogRollsGeneration() throws IOException { translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, - new InternalTranslogFactory() + new InternalTranslogFactory(), + () -> Boolean.TRUE ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -154,7 +157,8 @@ public void testTranslogRollsGeneration() throws IOException { translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, - new InternalTranslogFactory() + new InternalTranslogFactory(), + () -> Boolean.TRUE ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -188,7 +192,8 @@ public void testTrimOperationsFromTranslog() throws IOException { translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, - new InternalTranslogFactory() + new InternalTranslogFactory(), + () -> Boolean.TRUE ); final int docs = randomIntBetween(1, 100); for (int i = 0; i < docs; i++) { @@ -219,7 +224,8 @@ public void testTrimOperationsFromTranslog() throws IOException { translogUUID, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER, () -> {}, - new InternalTranslogFactory() + new InternalTranslogFactory(), + () -> Boolean.TRUE ); AtomicInteger opsRecovered = new AtomicInteger(); int opsRecoveredFromTranslog = translogManager.recoverFromTranslog((snapshot) -> { @@ -267,7 +273,8 @@ public void onAfterTranslogSync() { } }, () -> {}, - new InternalTranslogFactory() + new InternalTranslogFactory(), + () -> Boolean.TRUE ); translogManagerAtomicReference.set(translogManager); Engine.Index index = indexForDoc(doc); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 79887a4fe92c2..4437012f15430 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -170,7 +170,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin primaryTerm::get, getPersistedSeqNoConsumer(), repository, - threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER) + threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER), + () -> Boolean.TRUE ); } @@ -1120,7 +1121,8 @@ public int write(ByteBuffer src) throws IOException { primaryTerm::get, persistedSeqNos::add, repository, - threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER) + threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER), + () -> Boolean.TRUE ) { @Override ChannelFactory getChannelFactory() {