Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handled recovery to start from lowest seqno in the translog based on retention lease #1217

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
Expand Down
49 changes: 46 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ public final class IndexSettings {
settings -> Boolean.toString(IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(LegacyESVersion.V_7_0_0)),
Property.IndexScope, Property.Final);

/**
* Specifies if the index translog should prune based on retention leases.
*/
public static final Setting<Boolean> INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING =
Setting.boolSetting("index.translog.retention_lease.pruning.enabled", false,
Property.IndexScope, Property.Dynamic);

/**
* Controls how many soft-deleted documents will be kept around before being merged away. Keeping more deleted
* documents increases the chance of operation-based recoveries and allows querying a longer history of documents.
Expand All @@ -286,9 +293,11 @@ public final class IndexSettings {
* the chance of ops based recoveries for indices with soft-deletes disabled.
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
**/
private static final ByteSizeValue DEFAULT_TRANSLOG_RETENTION_SIZE = new ByteSizeValue(512, ByteSizeUnit.MB);

public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size",
settings -> shouldDisableTranslogRetention(settings) ? "-1" : "512MB",
settings -> DEFAULT_TRANSLOG_RETENTION_SIZE.getStringRep(),
Property.Dynamic, Property.IndexScope);

/**
Expand Down Expand Up @@ -389,6 +398,7 @@ public final class IndexSettings {
private final IndexScopedSettings scopedSettings;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private final boolean softDeleteEnabled;
private volatile boolean translogPruningByRetentionLease;
private volatile long softDeleteRetentionOperations;

private volatile long retentionLeaseMillis;
Expand Down Expand Up @@ -525,6 +535,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
softDeleteEnabled = version.onOrAfter(LegacyESVersion.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
translogPruningByRetentionLease = version.onOrAfter(Version.V_1_1_0) &&
scopedSettings.get(INDEX_SOFT_DELETES_SETTING) &&
scopedSettings.get(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING);
softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
retentionLeaseMillis = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING).millis();
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
Expand Down Expand Up @@ -593,6 +606,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING,
this::setTranslogPruningByRetentionLease);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset);
Expand Down Expand Up @@ -623,8 +638,17 @@ private void setFlushAfterMergeThresholdSize(ByteSizeValue byteSizeValue) {
this.flushAfterMergeThresholdSize = byteSizeValue;
}

private void setTranslogPruningByRetentionLease(boolean enabled) {
this.translogPruningByRetentionLease = this.softDeleteEnabled && enabled;
if(translogPruningByRetentionLease) {
setTranslogRetentionSize(DEFAULT_TRANSLOG_RETENTION_SIZE);
}
}

private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) {
if (shouldDisableTranslogRetention(settings) &&
!shouldPruneTranslogByRetentionLease(settings) &&
byteSizeValue.getBytes() >= 0) {
// ignore the translog retention settings if soft-deletes enabled
this.translogRetentionSize = new ByteSizeValue(-1);
} else {
Expand Down Expand Up @@ -826,7 +850,12 @@ public TimeValue getRefreshInterval() {
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/
public ByteSizeValue getTranslogRetentionSize() {
assert shouldDisableTranslogRetention(settings) == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
if(shouldDisableTranslogRetention(settings) && !shouldPruneTranslogByRetentionLease(settings)) {
return new ByteSizeValue(-1);
}
else if(shouldPruneTranslogByRetentionLease(settings) && translogRetentionSize.getBytes() == -1) {
return DEFAULT_TRANSLOG_RETENTION_SIZE;
}
return translogRetentionSize;
}

Expand Down Expand Up @@ -1071,6 +1100,20 @@ public void setRequiredPipeline(final String requiredPipeline) {
this.requiredPipeline = requiredPipeline;
}

/**
* Returns <code>true</code> if translog ops should be pruned based on retention lease
*/
public boolean shouldPruneTranslogByRetentionLease() {
return translogPruningByRetentionLease;
}

/**
* Returns <code>true</code> if translog ops should be pruned based on retention lease
*/
public static boolean shouldPruneTranslogByRetentionLease(Settings settings) {
return INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.get(settings);
}

/**
* Returns <code>true</code> if soft-delete is enabled.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,11 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran

}

public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize,
long softDeletesRetentionOps, boolean translogPruningByRetentionLease) {

}

/**
* Returns the timestamp of the last write in nanoseconds.
* Note: this time might not be absolutely accurate since the {@link Operation#startTime()} is used which might be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ public InternalEngine(EngineConfig engineConfig) {
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles(),
engineConfig.retentionLeasesSupplier()
);
store.incRef();
IndexWriter writer = null;
Expand Down Expand Up @@ -2572,7 +2573,8 @@ final void ensureCanFlush() {
}

@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize,
long softDeletesRetentionOps, boolean translogPruningByRetentionLease) {
mergeScheduler.refreshConfig();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
maybePruneDeletes();
Expand All @@ -2585,6 +2587,7 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
translogDeletionPolicy.shouldPruneTranslogByRetentionLease(translogPruningByRetentionLease);
softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1959,10 +1959,14 @@ public void onSettingsChanged() {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null) {
final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery;
logger.error("testIndex - shouldPruneTranslogByRetentionLease = {}", indexSettings.shouldPruneTranslogByRetentionLease());
logger.error("testIndex - retentionSizeInBytes = {}", indexSettings.getTranslogRetentionSize());
engineOrNull.onSettingsChanged(
disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(),
disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations()
disableTranslogRetention && !indexSettings.shouldPruneTranslogByRetentionLease() ?
new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations(),
indexSettings.shouldPruneTranslogByRetentionLease()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@

package org.opensearch.index.translog;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Counter;
import org.opensearch.Assertions;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.logging.Loggers;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.seqno.SequenceNumbers;

import java.io.IOException;
Expand All @@ -43,10 +47,13 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class TranslogDeletionPolicy {

private final Map<Object, RuntimeException> openTranslogRef;
private Supplier<RetentionLeases> retentionLeasesSupplier;
private static Logger log = Loggers.getLogger(TranslogDeletionPolicy.class, "TestTranslog");

public void assertNoOpenTranslogRefs() {
if (openTranslogRef.isEmpty() == false) {
Expand All @@ -69,6 +76,8 @@ public void assertNoOpenTranslogRefs() {

private int retentionTotalFiles;

private boolean shouldPruneTranslogByRetentionLease;

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
Expand All @@ -80,6 +89,12 @@ public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMill
}
}

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles,
Supplier<RetentionLeases> retentionLeasesSupplier) {
this(retentionSizeInBytes, retentionAgeInMillis, retentionTotalFiles);
this.retentionLeasesSupplier = retentionLeasesSupplier;
}

public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) {
if (newCheckpoint < this.localCheckpointOfSafeCommit) {
throw new IllegalArgumentException("local checkpoint of the safe commit can't go backwards: " +
Expand All @@ -100,6 +115,12 @@ synchronized void setRetentionTotalFiles(int retentionTotalFiles) {
this.retentionTotalFiles = retentionTotalFiles;
}

public synchronized void shouldPruneTranslogByRetentionLease(boolean translogPruneByRetentionLease) {
this.shouldPruneTranslogByRetentionLease = translogPruneByRetentionLease;
log.error("testIndex - shouldPruneTranslogByRetentionLease = {}", shouldPruneTranslogByRetentionLease);
log.error("testIndex - retentionSizeInBytes = {}", retentionSizeInBytes);
}

/**
* acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation
* will not be deleted until the returned {@link Releasable} is closed.
Expand Down Expand Up @@ -157,6 +178,12 @@ synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogW
long minByLocks = getMinTranslogGenRequiredByLocks();
long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime());
long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes);
long minByRetentionLeasesAndSize = Long.MAX_VALUE;
if(shouldPruneTranslogByRetentionLease) {
// If retention size is specified, size takes precedence.
long minByRetentionLeases = getMinTranslogGenByRetentionLease(readers, writer, retentionLeasesSupplier);
minByRetentionLeasesAndSize = Math.max(minBySize, minByRetentionLeases);
}
final long minByAgeAndSize;
if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) {
// both size and age are disabled;
Expand All @@ -165,7 +192,28 @@ synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogW
minByAgeAndSize = Math.max(minByAge, minBySize);
}
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks);
long minByTranslogGenSettings = Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks);
return Math.min(minByTranslogGenSettings, minByRetentionLeasesAndSize);
}

static long getMinTranslogGenByRetentionLease(List<TranslogReader> readers, TranslogWriter writer,
Supplier<RetentionLeases> retentionLeasesSupplier) {
long minGen = writer.getGeneration();
final long minimumRetainingSequenceNumber = retentionLeasesSupplier.get()
.leases()
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.min()
.orElse(Long.MAX_VALUE);

for (int i = readers.size() - 1; i >= 0; i--) {
final TranslogReader reader = readers.get(i);
if(reader.getCheckpoint().minSeqNo <= minimumRetainingSequenceNumber &&
reader.getCheckpoint().maxSeqNo >= minimumRetainingSequenceNumber) {
minGen = Math.min(minGen, reader.getGeneration());
}
}
return minGen;
}

static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,24 @@ && isTargetSameHistory()
// advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can
// always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled
// down.
startingSeqNo = softDeletesEnabled
? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L
: 0;
if(softDeletesEnabled) {
final long minimumRetainingSequenceNumber = shard.getRetentionLeases()
.leases()
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.min()
.orElse(Long.MAX_VALUE);
final long safeCommitSeqNo = Long.parseLong(safeCommitRef.getIndexCommit()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand this right, tlog recovery is a mechanism to optimize serialization. Why would start sequence number change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to have the same set of tlog ops from the primary shard onto the replica shard during the peer recovery. This ensures the ops are available in tlog as well in newly recovered shard

.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L;
if(shard.indexSettings().shouldPruneTranslogByRetentionLease()) {
startingSeqNo = SequenceNumbers.min(safeCommitSeqNo, minimumRetainingSequenceNumber);
} else {
startingSeqNo = safeCommitSeqNo;
}
} else {
startingSeqNo = 0;
}
logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);

try {
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo);
final Releasable releaseStore = acquireStore(shard.store());
Expand Down