Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Handle translog upload during primary relocation for remote-backed indexes #6062

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@

import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -100,6 +101,7 @@ public final class EngineConfig {
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -200,6 +202,7 @@ private EngineConfig(Builder builder) {
this.primaryTermSupplier = builder.primaryTermSupplier;
this.tombstoneDocSupplier = builder.tombstoneDocSupplier;
this.isReadOnlyReplica = builder.isReadOnlyReplica;
this.primaryModeSupplier = builder.primaryModeSupplier;
this.translogFactory = builder.translogFactory;
}

Expand Down Expand Up @@ -405,6 +408,14 @@ public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
}

/**
 * Exposes the {@link BooleanSupplier} that this config received from its builder as the
 * primary mode supplier.
 *
 * @return the primary mode supplier held by this config
 */
public BooleanSupplier getPrimaryModeSupplier() {
    return this.primaryModeSupplier;
}

/**
* Returns the underlying translog factory
* @return the translog factory
Expand Down Expand Up @@ -470,6 +481,7 @@ public static class Builder {
private TombstoneDocSupplier tombstoneDocSupplier;
private TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
private boolean isReadOnlyReplica;
private BooleanSupplier primaryModeSupplier;
private TranslogFactory translogFactory = new InternalTranslogFactory();

public Builder shardId(ShardId shardId) {
Expand Down Expand Up @@ -592,6 +604,11 @@ public Builder readOnlyReplica(boolean isReadOnlyReplica) {
return this;
}

/**
 * Stores the given primary mode supplier on this builder.
 *
 * @param primaryModeSupplier the primary mode supplier to store
 * @return this builder, so that calls can be chained
 */
public Builder primaryModeSupplier(BooleanSupplier primaryModeSupplier) {
this.primaryModeSupplier = primaryModeSupplier;
return this;
}

public Builder translogFactory(TranslogFactory translogFactory) {
this.translogFactory = translogFactory;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -149,6 +150,7 @@ public EngineConfig newEngineConfig(
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica,
BooleanSupplier primaryModeSupplier,
TranslogFactory translogFactory
) {
CodecService codecServiceToUse = codecService;
Expand Down Expand Up @@ -180,6 +182,7 @@ public EngineConfig newEngineConfig(
.primaryTermSupplier(primaryTermSupplier)
.tombstoneDocSupplier(tombstoneDocSupplier)
.readOnlyReplica(isReadOnlyReplica)
.primaryModeSupplier(primaryModeSupplier)
.translogFactory(translogFactory)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,12 @@ public void onFailure(String reason, Exception ex) {
translogDeletionPolicy,
shardId,
readLock,
() -> getLocalCheckpointTracker(),
this::getLocalCheckpointTracker,
translogUUID,
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
this::ensureOpen,
engineConfig.getTranslogFactory()
engineConfig.getTranslogFactory(),
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
this.softDeletesPolicy = newSoftDeletesPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public void onAfterTranslogSync() {
}
},
this,
engineConfig.getTranslogFactory()
engineConfig.getTranslogFactory(),
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {}
seqNo -> {},
engineConfig.getPrimaryModeSupplier()
)
) {
translog.trimUnreferencedReaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
translogDeletionPolicy,
config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(),
seqNo -> {}
seqNo -> {},
config.getPrimaryModeSupplier()
)
) {
return translog.stats();
Expand Down
26 changes: 23 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;
import static org.opensearch.index.translog.Translog.Durability;

/**
* An OpenSearch index shard
Expand Down Expand Up @@ -770,6 +771,14 @@ public void relocated(
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
forceRefreshes.close();

boolean syncTranslog = isRemoteTranslogEnabled() && Durability.ASYNC == indexSettings.getTranslogDurability();
// Since all the index permits are acquired at this point, the translog buffer will not change.
// It is safe to sync the translog now; for remote-backed indexes this ensures that the
// translogs have been uploaded to the remote store.
if (syncTranslog) {
maybeSync();
}
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "in-flight operations in progress while moving shard state to relocated";
Expand Down Expand Up @@ -806,6 +815,16 @@ public void relocated(
}
}

/**
 * Syncs the translog when {@link #isSyncNeeded()} reports that a sync is required.
 * <p>
 * This is best-effort: an {@link IOException} raised while checking or syncing is logged
 * at warn level and is not propagated to the caller.
 */
private void maybeSync() {
    try {
        // Nothing pending, so there is nothing to sync.
        if (isSyncNeeded() == false) {
            return;
        }
        sync();
    } catch (IOException e) {
        logger.warn("failed to sync translog", e);
    }
}

private void verifyRelocatingState() {
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
Expand Down Expand Up @@ -2932,7 +2951,7 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
assert assertPrimaryMode();
// only sync if there are no operations in flight, or when using async durability
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC;
final boolean asyncDurability = indexSettings().getTranslogDurability() == Durability.ASYNC;
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) {
final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
final long globalCheckpoint = replicationTracker.getGlobalCheckpoint();
Expand Down Expand Up @@ -3025,7 +3044,7 @@ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext p
+ routingEntry()
+ "]";
assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint()
|| indexSettings().getTranslogDurability() == Translog.Durability.ASYNC : "local checkpoint ["
|| indexSettings().getTranslogDurability() == Durability.ASYNC : "local checkpoint ["
+ getLocalCheckpoint()
+ "] does not match checkpoint from primary context ["
+ primaryContext
Expand Down Expand Up @@ -3445,6 +3464,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
isReadOnlyReplica,
replicationTracker::isPrimaryMode,
translogFactorySupplier.apply(indexSettings, shardRouting)
);
}
Expand Down Expand Up @@ -3850,7 +3870,7 @@ public boolean isSyncNeeded() {
/**
* Returns the current translog durability mode
*/
public Translog.Durability getTranslogDurability() {
public Durability getTranslogDurability() {
return indexSettings.getTranslogDurability();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void beforeRefresh() throws IOException {
public void afterRefresh(boolean didRefresh) {
synchronized (this) {
try {
if (indexShard.shardRouting.primary()) {
if (indexShard.getReplicationTracker().isPrimaryMode()) {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.remoteDirectory.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import java.io.IOException;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

Expand All @@ -26,7 +27,8 @@ public Translog newTranslog(
TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer
LongConsumer persistedSequenceNumberConsumer,
BooleanSupplier primaryModeSupplier
) throws IOException {

return new LocalTranslog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -59,7 +60,8 @@ public InternalTranslogManager(
String translogUUID,
TranslogEventListener translogEventListener,
LifecycleAware engineLifeCycleAware,
TranslogFactory translogFactory
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
) throws IOException {
this.shardId = shardId;
this.readLock = readLock;
Expand All @@ -72,7 +74,7 @@ public InternalTranslogManager(
if (tracker != null) {
tracker.markSeqNoAsPersisted(seqNo);
}
}, translogUUID, translogFactory);
}, translogUUID, translogFactory, primaryModeSupplier);
assert translog.getGeneration() != null;
this.translog = translog;
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
Expand Down Expand Up @@ -344,16 +346,17 @@ protected Translog openTranslog(
LongSupplier globalCheckpointSupplier,
LongConsumer persistedSequenceNumberConsumer,
String translogUUID,
TranslogFactory translogFactory
TranslogFactory translogFactory,
BooleanSupplier primaryModeSupplier
) throws IOException {

return translogFactory.newTranslog(
translogConfig,
translogUUID,
translogDeletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer
persistedSequenceNumberConsumer,
primaryModeSupplier
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -53,7 +54,8 @@ public Translog newTranslog(
TranslogDeletionPolicy deletionPolicy,
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer
LongConsumer persistedSequenceNumberConsumer,
BooleanSupplier primaryModeSupplier
) throws IOException {

assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
Expand All @@ -66,7 +68,8 @@ public Translog newTranslog(
primaryTermSupplier,
persistedSequenceNumberConsumer,
blobStoreRepository,
executorService
executorService,
primaryModeSupplier
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

Expand All @@ -45,6 +46,7 @@ public class RemoteFsTranslog extends Translog {
private final BlobStoreRepository blobStoreRepository;
private final TranslogTransferManager translogTransferManager;
private final FileTransferTracker fileTransferTracker;
private final BooleanSupplier primaryModeSupplier;
private volatile long maxRemoteTranslogGenerationUploaded;

private volatile long minSeqNoToKeep;
Expand All @@ -57,10 +59,12 @@ public RemoteFsTranslog(
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer,
BlobStoreRepository blobStoreRepository,
ExecutorService executorService
ExecutorService executorService,
BooleanSupplier primaryModeSupplier
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
this.blobStoreRepository = blobStoreRepository;
this.primaryModeSupplier = primaryModeSupplier;
fileTransferTracker = new FileTransferTracker(shardId);
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, executorService, shardId, fileTransferTracker);

Expand Down Expand Up @@ -198,7 +202,17 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
}

private boolean upload(Long primaryTerm, Long generation) throws IOException {
logger.trace("uploading translog for {} {} ", primaryTerm, generation);
// During primary relocation (primary-primary peer recovery), both the old and the new primary have an engine
// created with the RemoteFsTranslog, so both primaries are equipped to upload the translogs. The primary mode
// check below ensures that only the real primary uploads. Before the primary mode is set to true for the new
// primary, the engine is reset to InternalEngine, which also initialises the RemoteFsTranslog, which in turn
// downloads all the translogs from the remote store and does a flush before the relocation finishes.
if (primaryModeSupplier.getAsBoolean() == false) {
logger.trace("skipped uploading translog for {} {}", primaryTerm, generation);
// NO-OP
return true;
}
logger.trace("uploading translog for {} {}", primaryTerm, generation);
try (
TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder(
primaryTerm,
Expand Down Expand Up @@ -262,7 +276,7 @@ public void sync() throws IOException {
}

/**
* Returns <code>true</code> if an fsync and/or remote transfer is required to ensure durability of the translogs operations or its metadata.
* Returns <code>true</code> if an fsync and/or remote transfer is required to ensure durability of the translogs operations or its metadata.
*/
public boolean syncNeeded() {
try (ReleasableLock lock = readLock.acquire()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.translog;

import java.io.IOException;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

Expand All @@ -27,6 +28,7 @@ Translog newTranslog(
final TranslogDeletionPolicy deletionPolicy,
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer
final LongConsumer persistedSequenceNumberConsumer,
final BooleanSupplier primaryModeSupplier
) throws IOException;
}
Loading