Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce retention lease expiration #37195

Merged
merged 7 commits into from
Jan 8, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -133,6 +133,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_GC_DELETES_SETTING,
IndexSettings.INDEX_SOFT_DELETES_SETTING,
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING,
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
24 changes: 24 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
@@ -256,6 +256,17 @@ public final class IndexSettings {
Setting.longSetting("index.soft_deletes.retention.operations", 0, 0,
Property.IndexScope, Property.Dynamic);

/**
* Controls the maximum length of time since a retention lease is created or renewed before it is considered expired.
* <p>
* The setting key is {@code index.soft_deletes.retention.lease}. The default is twelve hours and {@link TimeValue#ZERO} is supplied
* as the minimum value. The setting is declared as dynamic and index-scoped.
*/
public static final Setting<TimeValue> INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING =
Setting.timeSetting(
"index.soft_deletes.retention.lease",
TimeValue.timeValueHours(12),
TimeValue.ZERO,
Property.Dynamic,
Property.IndexScope);

/**
* The maximum number of refresh listeners allowed on this shard.
*/
@@ -316,6 +327,18 @@ public final class IndexSettings {
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private final boolean softDeleteEnabled;
private volatile long softDeleteRetentionOperations;

private volatile long retentionLeaseMillis;

/**
* The maximum age of a retention lease before it is considered expired, expressed in milliseconds. The value is taken from
* {@link #INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING} when these index settings are constructed.
*
* @return the maximum age in milliseconds
*/
public long getRetentionLeaseMillis() {
// NOTE(review): the setting is declared dynamic, but no settings-update consumer that refreshes this field is visible in this
// change - confirm that one is registered, otherwise dynamic updates to the setting will not be reflected here
return retentionLeaseMillis;
}

private volatile boolean warmerEnabled;
private volatile int maxResultWindow;
private volatile int maxInnerResultWindow;
@@ -431,6 +454,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
softDeleteEnabled = version.onOrAfter(Version.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
retentionLeaseMillis = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING).millis();
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING);
maxInnerResultWindow = scopedSettings.get(MAX_INNER_RESULT_WINDOW_SETTING);
Original file line number Diff line number Diff line change
@@ -35,7 +35,6 @@
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -137,6 +136,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private final LongConsumer onGlobalCheckpointUpdated;

/**
* A supplier of the current time. This supplier is used to add a timestamp to retention leases, and to determine retention lease
* expiration.
*/
private final LongSupplier currentTimeMillisSupplier;

/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
* current global checkpoint.
@@ -151,12 +156,21 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private final Map<String, RetentionLease> retentionLeases = new HashMap<>();

/**
* Get all retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned.
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned.
*
* @return the retention leases
*/
public synchronized Collection<RetentionLease> getRetentionLeases() {
return Collections.unmodifiableCollection(new ArrayList<>(retentionLeases.values()));
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
final Collection<RetentionLease> nonExpiredRetentionLeases = retentionLeases
.values()
.stream()
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() <= retentionLeaseMillis)
.collect(Collectors.toList());
retentionLeases.clear();
retentionLeases.putAll(nonExpiredRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, lease -> lease)));
return Collections.unmodifiableCollection(nonExpiredRetentionLeases);
}

/**
@@ -168,7 +182,7 @@ public synchronized Collection<RetentionLease> getRetentionLeases() {
*/
public synchronized void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert primaryMode;
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, source));
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source));
}

public static class CheckpointState implements Writeable {
@@ -425,7 +439,8 @@ public ReplicationTracker(
final String allocationId,
final IndexSettings indexSettings,
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated) {
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
@@ -435,6 +450,7 @@ public ReplicationTracker(
this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
Original file line number Diff line number Diff line change
@@ -23,9 +23,9 @@
* A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such
* that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could
* otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
* number, and the source of the retention lease (e.g., "ccr").
* number, the timestamp of when the lease was created or renewed, and the source of the retention lease (e.g., "ccr").
*/
public class RetentionLease {
public final class RetentionLease {

private final String id;

@@ -50,6 +50,17 @@ public long retainingSequenceNumber() {
return retainingSequenceNumber;
}

private final long timestamp;

/**
* The timestamp of when this retention lease was created or renewed. This is the value that was passed to the constructor; the
* replication tracker passes the reading of its current-time-in-milliseconds supplier when it adds or updates a lease.
*
* @return the timestamp used as a basis for determining lease expiration
*/
public long timestamp() {
return timestamp;
}

private final String source;

/**
@@ -66,19 +77,22 @@ public String source() {
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param timestamp the timestamp of when the retention lease was created or renewed
* @param source the source of the retention lease
*/
public RetentionLease(final String id, final long retainingSequenceNumber, final String source) {
public RetentionLease(final String id, final long retainingSequenceNumber, final long timestamp, final String source) {
this.id = id;
this.retainingSequenceNumber = retainingSequenceNumber;
this.timestamp = timestamp;
this.source = source;
}

@Override
public String toString() {
return "ShardHistoryRetentionLease{" +
return "RetentionLease{" +
"id='" + id + '\'' +
", retainingSequenceNumber=" + retainingSequenceNumber +
", timestamp=" + timestamp +
", source='" + source + '\'' +
'}';
}
Original file line number Diff line number Diff line change
@@ -305,7 +305,13 @@ public IndexShard(
this.globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger);
this.replicationTracker =
new ReplicationTracker(shardId, aId, indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated);
new ReplicationTracker(
shardId,
aId,
indexSettings,
UNASSIGNED_SEQ_NO,
globalCheckpointListeners::globalCheckpointUpdated,
threadPool::absoluteTimeInMillis);

// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ public void testSoftDeletesRetentionLock() {
() -> {
final Set<RetentionLease> leases = new HashSet<>(retainingSequenceNumbers.length);
for (int i = 0; i < retainingSequenceNumbers.length; i++) {
leases.add(new RetentionLease(Integer.toString(i), retainingSequenceNumbers[i].get(), "test"));
leases.add(new RetentionLease(Integer.toString(i), retainingSequenceNumbers[i].get(), 0L, "test"));
}
return leases;
};
Original file line number Diff line number Diff line change
@@ -21,18 +21,23 @@

import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.IndexSettingsModule;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTestCase {

@@ -43,7 +48,8 @@ public void testAddOrUpdateRetentionLease() {
id.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO,
value -> {});
value -> {},
() -> 0L);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(id.getId()),
@@ -55,19 +61,73 @@ public void testAddOrUpdateRetentionLease() {
for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers);
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L);
}

for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers);
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L);
}

}

/**
 * Verifies that a retention lease is stamped with the current time when it is created and when it is renewed, and that the lease is
 * no longer returned by the tracker once its age exceeds the period configured through
 * {@link IndexSettings#INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING}.
 */
public void testExpiration() {
    final AllocationId id = AllocationId.newInitializing();
    // a controllable clock so that the test decides exactly how old a lease is
    final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
    final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
    final Settings settings = Settings
        .builder()
        .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
        .build();
    final ReplicationTracker replicationTracker = new ReplicationTracker(
        new ShardId("test", "_na", 0),
        id.getId(),
        IndexSettingsModule.newIndexSettings("test", settings),
        UNASSIGNED_SEQ_NO,
        value -> {},
        currentTimeMillis::get);
    replicationTracker.updateFromMaster(
        randomNonNegativeLong(),
        Collections.singleton(id.getId()),
        routingTable(Collections.emptySet(), id),
        Collections.emptySet());
    replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
    final long[] retainingSequenceNumbers = new long[1];
    retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
    replicationTracker.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0");

    {
        // the freshly created lease carries the time at which it was added
        final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
        assertThat(retentionLeases, hasSize(1));
        final RetentionLease retentionLease = retentionLeases.iterator().next();
        assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
        assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
    }

    // renew the lease
    currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024));
    retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE);
    replicationTracker.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0");

    {
        // renewing replaces the lease, so its timestamp must now be the renewal time
        final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
        assertThat(retentionLeases, hasSize(1));
        final RetentionLease retentionLease = retentionLeases.iterator().next();
        assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
        assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
    }

    // now force the lease to expire; the tracker keeps a lease while its age is less than or equal to the retention period, so the
    // clock has to advance by strictly more than the retention period (advancing by exactly the period would leave the lease live)
    currentTimeMillis.set(
        currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis + 1, Long.MAX_VALUE - currentTimeMillis.get()));
    assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get);
}

private void assertRetentionLeases(
final ReplicationTracker replicationTracker, final int size, final long[] minimumRetainingSequenceNumbers) {
final ReplicationTracker replicationTracker,
final int size,
final long[] minimumRetainingSequenceNumbers,
final LongSupplier currentTimeMillisSupplier) {
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
for (final RetentionLease retentionLease : retentionLeases) {
@@ -79,6 +139,9 @@ private void assertRetentionLeases(
assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
assertThat(
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis()));
assertThat(retentionLease.source(), equalTo("test-" + i));
}
}
Original file line number Diff line number Diff line change
@@ -31,18 +31,23 @@

import java.util.Set;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

public abstract class ReplicationTrackerTestCase extends ESTestCase {

ReplicationTracker newTracker(final AllocationId allocationId, final LongConsumer updatedGlobalCheckpoint) {
ReplicationTracker newTracker(
final AllocationId allocationId,
final LongConsumer updatedGlobalCheckpoint,
final LongSupplier currentTimeMillisSupplier) {
return new ReplicationTracker(
new ShardId("test", "_na_", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO,
updatedGlobalCheckpoint);
updatedGlobalCheckpoint,
currentTimeMillisSupplier);
}

static IndexShardRoutingTable routingTable(final Set<AllocationId> initializingIds, final AllocationId primaryId) {
Original file line number Diff line number Diff line change
@@ -406,7 +406,7 @@ public void testWaitForAllocationIdToBeInSync() throws Exception {
private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO);

private ReplicationTracker newTracker(final AllocationId allocationId) {
return newTracker(allocationId, updatedGlobalCheckpoint::set);
return newTracker(allocationId, updatedGlobalCheckpoint::set, () -> 0L);
}

public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException {
@@ -683,10 +683,10 @@ public void testPrimaryContextHandoff() throws IOException {
final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId();
final LongConsumer onUpdate = updatedGlobalCheckpoint -> {};
final long globalCheckpoint = UNASSIGNED_SEQ_NO;
ReplicationTracker oldPrimary =
new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate);
ReplicationTracker newPrimary =
new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate);
ReplicationTracker oldPrimary = new ReplicationTracker(
shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L);
ReplicationTracker newPrimary = new ReplicationTracker(
shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L);

Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId));

Loading