Skip to content

Commit

Permalink
Enforce retention leases require soft deletes (#39922)
Browse files Browse the repository at this point in the history
If a primary on 6.7 and a replica on 5.6 are running more than 5 minutes
(retention leases background sync interval), the retention leases
background sync will be triggered, and it will trip 6.7 node due to the
illegal checkpoint value. We can fix the problem by making the returned
checkpoint depends on the node version. This PR, however, chooses to
enforce retention leases require soft deletes, and make retention leases
sync noop if soft deletes is disabled instead.

Closes #39914
  • Loading branch information
dnhatn committed Mar 12, 2019
1 parent 78c4d47 commit a19927e
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,9 @@ private void maybeSyncGlobalCheckpoints() {
}

private void syncRetentionLeases() {
sync(IndexShard::syncRetentionLeases, "retention lease");
if (indexSettings.isSoftDeleteEnabled()) {
sync(IndexShard::syncRetentionLeases, "retention lease");
}
}

private void sync(final Consumer<IndexShard> sync, final String source) {
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1897,6 +1897,14 @@ public void addGlobalCheckpointListener(
this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout);
}

private void ensureSoftDeletesEnabled(String feature) {
if (indexSettings.isSoftDeleteEnabled() == false) {
String message = feature + " requires soft deletes but " + indexSettings.getIndex() + " does not have soft deletes enabled";
assert false : message;
throw new IllegalStateException(message);
}
}

/**
* Get all retention leases tracked on this shard.
*
Expand Down Expand Up @@ -1943,6 +1951,7 @@ public RetentionLease addRetentionLease(
Objects.requireNonNull(listener);
assert assertPrimaryMode();
verifyNotClosed();
ensureSoftDeletesEnabled("retention leases");
try (Closeable ignore = acquireRetentionLock()) {
final long actualRetainingSequenceNumber =
retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
Expand All @@ -1964,6 +1973,7 @@ public RetentionLease addRetentionLease(
public RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert assertPrimaryMode();
verifyNotClosed();
ensureSoftDeletesEnabled("retention leases");
try (Closeable ignore = acquireRetentionLock()) {
final long actualRetainingSequenceNumber =
retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
Expand All @@ -1983,6 +1993,7 @@ public void removeRetentionLease(final String id, final ActionListener<Replicati
Objects.requireNonNull(listener);
assert assertPrimaryMode();
verifyNotClosed();
ensureSoftDeletesEnabled("retention leases");
replicationTracker.removeRetentionLease(id, listener);
}

Expand Down Expand Up @@ -2024,6 +2035,7 @@ public void persistRetentionLeases() throws WriteStateException {
public void syncRetentionLeases() {
assert assertPrimaryMode();
verifyNotClosed();
ensureSoftDeletesEnabled("retention leases");
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
if (retentionLeases.v1()) {
logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.util.ArrayList;
Expand Down Expand Up @@ -73,7 +75,7 @@ public List<Setting<?>> getSettings() {
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(
super.nodePlugins().stream(),
Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class))
Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class, MockTransportService.TestPlugin.class))
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -318,6 +320,36 @@ public void testBackgroundRetentionLeaseSync() throws Exception {
}
}

public void testRetentionLeasesBackgroundSyncWithSoftDeletesDisabled() throws Exception {
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
TimeValue syncIntervalSetting = TimeValue.timeValueMillis(between(1, 100));
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), syncIntervalSetting.getStringRep())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)
.build();
createIndex("index", settings);
final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
final MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance(
TransportService.class, primaryShardNodeName);
final AtomicBoolean backgroundSyncRequestSent = new AtomicBoolean();
primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.startsWith(RetentionLeaseBackgroundSyncAction.ACTION_NAME)) {
backgroundSyncRequestSent.set(true);
}
connection.sendRequest(requestId, action, request, options);
});
final long start = System.nanoTime();
ensureGreen("index");
final long syncEnd = System.nanoTime();
// We sleep long enough for the retention leases background sync to be triggered
Thread.sleep(Math.max(0, randomIntBetween(2, 3) * syncIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(syncEnd - start)));
assertFalse("retention leases background sync must be a noop if soft deletes is disabled", backgroundSyncRequestSent.get());
}

public void testRetentionLeasesSyncOnRecovery() throws Exception {
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
Expand All @@ -41,6 +42,7 @@ public void testRetentionLeaseStats() throws InterruptedException {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
createIndex("index", settings);
ensureGreen("index");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ protected void tearDownThreadPool() {
}

public void testAddOrRenewRetentionLease() throws IOException {
final IndexShard indexShard = newStartedShard(true);
final IndexShard indexShard = newStartedShard(true,
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
final long primaryTerm = indexShard.getOperationPrimaryTerm();
try {
final int length = randomIntBetween(0, 8);
Expand Down Expand Up @@ -102,7 +103,8 @@ public void testAddOrRenewRetentionLease() throws IOException {
}

public void testRemoveRetentionLease() throws IOException {
final IndexShard indexShard = newStartedShard(true);
final IndexShard indexShard = newStartedShard(true,
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
final long primaryTerm = indexShard.getOperationPrimaryTerm();
try {
final int length = randomIntBetween(0, 8);
Expand Down Expand Up @@ -143,6 +145,7 @@ private void runExpirationTest(final boolean primary) throws IOException {
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
final Settings settings = Settings
.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(),
TimeValue.timeValueMillis(retentionLeaseMillis))
Expand Down Expand Up @@ -268,7 +271,8 @@ public void testPersistence() throws IOException {
}

public void testRetentionLeaseStats() throws IOException {
final IndexShard indexShard = newStartedShard(true);
final IndexShard indexShard = newStartedShard(true,
Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
try {
final int length = randomIntBetween(0, 8);
final long[] minimumRetainingSequenceNumbers = new long[length];
Expand All @@ -289,6 +293,22 @@ public void testRetentionLeaseStats() throws IOException {
}
}

public void testRetentionLeasesActionsFailWithSoftDeletesDisabled() throws Exception {
IndexShard shard = newStartedShard(true, Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build());
assertThat(expectThrows(AssertionError.class, () -> shard.addRetentionLease(randomAlphaOfLength(10),
randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE), "test", ActionListener.wrap(() -> {}))).getMessage(),
equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled"));
assertThat(expectThrows(AssertionError.class, () -> shard.renewRetentionLease(
randomAlphaOfLength(10), randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE), "test")).getMessage(),
equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled"));
assertThat(expectThrows(AssertionError.class, () -> shard.removeRetentionLease(
randomAlphaOfLength(10), ActionListener.wrap(() -> {}))).getMessage(),
equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled"));
assertThat(expectThrows(AssertionError.class, shard::syncRetentionLeases).getMessage(),
equalTo("retention leases requires soft deletes but [index] does not have soft deletes enabled"));
closeShards(shard);
}

private void assertRetentionLeases(
final IndexShard indexShard,
final int size,
Expand Down

0 comments on commit a19927e

Please sign in to comment.