Skip to content

Commit

Permalink
[Remote Translog] Handle translog upload during primary relocation fo…
Browse files Browse the repository at this point in the history
…r remote-backed indexes (opensearch-project#5804)

* Upload translog only if primaryMode is true

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Jan 30, 2023
1 parent 579f1a6 commit 83926d0
Show file tree
Hide file tree
Showing 18 changed files with 176 additions and 32 deletions.
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -100,6 +101,7 @@ public final class EngineConfig {
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -200,6 +202,7 @@ private EngineConfig(Builder builder) {
this.primaryTermSupplier = builder.primaryTermSupplier;
this.tombstoneDocSupplier = builder.tombstoneDocSupplier;
this.isReadOnlyReplica = builder.isReadOnlyReplica;
this.primaryModeSupplier = builder.primaryModeSupplier;
this.translogFactory = builder.translogFactory;
}

Expand Down Expand Up @@ -405,6 +408,14 @@ public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
}

/**
* Returns the underlying primaryModeSupplier.
* @return the primary mode supplier.
*/
public BooleanSupplier getPrimaryModeSupplier() {
return primaryModeSupplier;
}

/**
* Returns the underlying translog factory
* @return the translog factory
Expand Down Expand Up @@ -470,6 +481,7 @@ public static class Builder {
private TombstoneDocSupplier tombstoneDocSupplier;
private TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
private boolean isReadOnlyReplica;
private BooleanSupplier primaryModeSupplier;
private TranslogFactory translogFactory = new InternalTranslogFactory();

public Builder shardId(ShardId shardId) {
Expand Down Expand Up @@ -592,6 +604,11 @@ public Builder readOnlyReplica(boolean isReadOnlyReplica) {
return this;
}

public Builder primaryModeSupplier(BooleanSupplier primaryModeSupplier) {
this.primaryModeSupplier = primaryModeSupplier;
return this;
}

public Builder translogFactory(TranslogFactory translogFactory) {
this.translogFactory = translogFactory;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -149,6 +150,7 @@ public EngineConfig newEngineConfig(
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica,
BooleanSupplier primaryModeSupplier,
TranslogFactory translogFactory
) {
CodecService codecServiceToUse = codecService;
Expand Down Expand Up @@ -180,6 +182,7 @@ public EngineConfig newEngineConfig(
.primaryTermSupplier(primaryTermSupplier)
.tombstoneDocSupplier(tombstoneDocSupplier)
.readOnlyReplica(isReadOnlyReplica)
.primaryModeSupplier(primaryModeSupplier)
.translogFactory(translogFactory)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,12 @@ public void onFailure(String reason, Exception ex) {
translogDeletionPolicy,
shardId,
readLock,
() -> getLocalCheckpointTracker(),
this::getLocalCheckpointTracker,
translogUUID,
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
this::ensureOpen,
engineConfig.getTranslogFactory()
engineConfig.getTranslogFactory(),
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
this.softDeletesPolicy = newSoftDeletesPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public void onAfterTranslogSync() {
}
},
this,
engineConfig.getTranslogFactory()
engineConfig.getTranslogFactory(),
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {}
seqNo -> {},
engineConfig.getPrimaryModeSupplier()
)
) {
translog.trimUnreferencedReaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
translogDeletionPolicy,
config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(),
seqNo -> {}
seqNo -> {},
config.getPrimaryModeSupplier()
)
) {
return translog.stats();
Expand Down
26 changes: 23 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;
import static org.opensearch.index.translog.Translog.Durability;

/**
* An OpenSearch index shard
Expand Down Expand Up @@ -770,6 +771,14 @@ public void relocated(
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
forceRefreshes.close();

boolean syncTranslog = isRemoteTranslogEnabled() && Durability.ASYNC == indexSettings.getTranslogDurability();
// Since all the index permits are acquired at this point, the translog buffer will not change.
// It is safe to perform sync of translogs now as this will ensure for remote-backed indexes, the
// translogs has been uploaded to the remote store.
if (syncTranslog) {
maybeSync();
}
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "in-flight operations in progress while moving shard state to relocated";
Expand Down Expand Up @@ -806,6 +815,16 @@ public void relocated(
}
}

private void maybeSync() {
try {
if (isSyncNeeded()) {
sync();
}
} catch (IOException e) {
logger.warn("failed to sync translog", e);
}
}

private void verifyRelocatingState() {
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
Expand Down Expand Up @@ -2932,7 +2951,7 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
assert assertPrimaryMode();
// only sync if there are no operations in flight, or when using async durability
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC;
final boolean asyncDurability = indexSettings().getTranslogDurability() == Durability.ASYNC;
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) {
final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
final long globalCheckpoint = replicationTracker.getGlobalCheckpoint();
Expand Down Expand Up @@ -3025,7 +3044,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
+ routingEntry()
+ "]";
assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint()
|| indexSettings().getTranslogDurability() == Translog.Durability.ASYNC : "local checkpoint ["
|| indexSettings().getTranslogDurability() == Durability.ASYNC : "local checkpoint ["
+ getLocalCheckpoint()
+ "] does not match checkpoint from primary context ["
+ primaryContext
Expand Down Expand Up @@ -3445,6 +3464,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
isReadOnlyReplica,
replicationTracker::isPrimaryMode,
translogFactorySupplier.apply(indexSettings, shardRouting)
);
}
Expand Down Expand Up @@ -3850,7 +3870,7 @@ public boolean isSyncNeeded() {
/**
* Returns the current translog durability mode
*/
public Translog.Durability getTranslogDurability() {
public Durability getTranslogDurability() {
return indexSettings.getTranslogDurability();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void beforeRefresh() throws IOException {
public void afterRefresh(boolean didRefresh) {
synchronized (this) {
try {
if (indexShard.shardRouting.primary()) {
if (indexShard.getReplicationTracker().isPrimaryMode()) {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.remoteDirectory.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import java.io.IOException;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

Expand All @@ -26,7 +27,8 @@ public Translog newTranslog(
TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer
LongConsumer persistedSequenceNumberConsumer,
BooleanSupplier primaryModeSupplier
) throws IOException {

return new LocalTranslog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -59,7 +60,8 @@ public InternalTranslogManager(
String translogUUID,
TranslogEventListener translogEventListener,
LifecycleAware engineLifeCycleAware,
TranslogFactory translogFactory
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
) throws IOException {
this.shardId = shardId;
this.readLock = readLock;
Expand All @@ -72,7 +74,7 @@ public InternalTranslogManager(
if (tracker != null) {
tracker.markSeqNoAsPersisted(seqNo);
}
}, translogUUID, translogFactory);
}, translogUUID, translogFactory, primaryModeSupplier);
assert translog.getGeneration() != null;
this.translog = translog;
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
Expand Down Expand Up @@ -344,16 +346,17 @@ protected Translog openTranslog(
LongSupplier globalCheckpointSupplier,
LongConsumer persistedSequenceNumberConsumer,
String translogUUID,
TranslogFactory translogFactory
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
) throws IOException {

return translogFactory.newTranslog(
translogConfig,
translogUUID,
translogDeletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer
persistedSequenceNumberConsumer,
primaryModeSupplier
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -53,7 +54,8 @@ public Translog newTranslog(
TranslogDeletionPolicy deletionPolicy,
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer
LongConsumer persistedSequenceNumberConsumer,
BooleanSupplier primaryModeSupplier
) throws IOException {

assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
Expand All @@ -66,7 +68,8 @@ public Translog newTranslog(
primaryTermSupplier,
persistedSequenceNumberConsumer,
blobStoreRepository,
executorService
executorService,
primaryModeSupplier
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

Expand All @@ -45,6 +46,7 @@ public class RemoteFsTranslog extends Translog {
private final BlobStoreRepository blobStoreRepository;
private final TranslogTransferManager translogTransferManager;
private final FileTransferTracker fileTransferTracker;
private final BooleanSupplier primaryModeSupplier;
private volatile long maxRemoteTranslogGenerationUploaded;

private volatile long minSeqNoToKeep;
Expand All @@ -57,10 +59,12 @@ public RemoteFsTranslog(
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer,
BlobStoreRepository blobStoreRepository,
ExecutorService executorService
ExecutorService executorService,
BooleanSupplier primaryModeSupplier
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
this.blobStoreRepository = blobStoreRepository;
this.primaryModeSupplier = primaryModeSupplier;
fileTransferTracker = new FileTransferTracker(shardId);
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, executorService, shardId, fileTransferTracker);

Expand Down Expand Up @@ -198,7 +202,17 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
}

private boolean upload(Long primaryTerm, Long generation) throws IOException {
logger.trace("uploading translog for {} {} ", primaryTerm, generation);
// During primary relocation (primary-primary peer recovery), both the old and the new primary have engine
// created with the RemoteFsTranslog. Both primaries are equipped to upload the translogs. The primary mode check
// below ensures that the real primary only is uploading. Before the primary mode is set as true for the new
// primary, the engine is reset to InternalEngine which also initialises the RemoteFsTranslog which in turns
// downloads all the translogs from remote store and does a flush before the relocation finishes.
if (primaryModeSupplier.getAsBoolean() == false) {
logger.trace("skipped uploading translog for {} {}", primaryTerm, generation);
// NO-OP
return true;
}
logger.trace("uploading translog for {} {}", primaryTerm, generation);
try (
TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder(
primaryTerm,
Expand Down Expand Up @@ -262,7 +276,7 @@ public void sync() throws IOException {
}

/**
* Returns <code>true</code> if an fsync and/or remote transfer is required to ensure durability of the translogs operations or it's metadata.
* Returns <code>true</code> if an fsync and/or remote transfer is required to ensure durability of the translogs operations or it's metadata.
*/
public boolean syncNeeded() {
try (ReleasableLock lock = readLock.acquire()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import java.io.IOException;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

Expand All @@ -27,6 +28,7 @@ Translog newTranslog(
final TranslogDeletionPolicy deletionPolicy,
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer
final LongConsumer persistedSequenceNumberConsumer,
final BooleanSupplier primaryModeSupplier
) throws IOException;
}
Loading

0 comments on commit 83926d0

Please sign in to comment.