From 34148bf7f45180b5c918ff5bdce34aeb074de30d Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 31 Jul 2019 14:22:38 -0600 Subject: [PATCH 1/5] Time-bound deletion of snapshots in retention delete function With a cluster that has a large number of snapshots, it's possible that snapshot deletion can take a very long time (especially since deletes currently have to happen in a serial fashion). To prevent snapshot deletion from taking forever in a cluster and blocking other operations, this commit adds a setting to allow configuring a maximum time to spend deleting snapshots during retention. This dynamic setting defaults to 1 hour and is best-effort, meaning that it doesn't hard stop a deletion at an hour mark, but ensures that once the time has passed, all subsequent deletions are deferred until the next retention cycle. Relates to #43663 --- .../xpack/core/ilm/LifecycleSettings.java | 3 + .../ilm/TimeSeriesLifecycleActionsIT.java | 7 +- .../xpack/slm/SnapshotLifecycleIT.java | 4 +- .../xpack/slm/SnapshotRetentionTask.java | 100 +++++++++----- .../xpack/slm/SnapshotRetentionTaskTests.java | 127 ++++++++++++++++-- 5 files changed, 189 insertions(+), 52 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java index 6e3ac760bfc32..3e2623c35250c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/LifecycleSettings.java @@ -20,6 +20,7 @@ public class LifecycleSettings { public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled"; public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule"; + public static final String SLM_RETENTION_DURATION = "slm.retention_duration"; public static final Setting LIFECYCLE_POLL_INTERVAL_SETTING = 
Setting.timeSetting(LIFECYCLE_POLL_INTERVAL, @@ -42,4 +43,6 @@ public class LifecycleSettings { SLM_RETENTION_SCHEDULE + "]", e); } }, Setting.Property.Dynamic, Setting.Property.NodeScope); + public static final Setting SLM_RETENTION_DURATION_SETTING = Setting.timeSetting(SLM_RETENTION_DURATION, + TimeValue.timeValueHours(1), TimeValue.timeValueMillis(500), Setting.Property.Dynamic, Setting.Property.NodeScope); } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 575d75aa10336..e442e70a36631 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -383,7 +383,8 @@ public void testDeleteDuringSnapshot() throws Exception { // index document so snapshot actually does something indexDocument(); // start snapshot - request = new Request("PUT", "/_snapshot/repo/snapshot"); + String snapName = "snapshot-" + randomAlphaOfLength(6).toLowerCase(Locale.ROOT); + request = new Request("PUT", "/_snapshot/repo/" + snapName); request.addParameter("wait_for_completion", "false"); request.setJsonEntity("{\"indices\": \"" + index + "\"}"); assertOK(client().performRequest(request)); @@ -392,8 +393,8 @@ public void testDeleteDuringSnapshot() throws Exception { // assert that index was deleted assertBusy(() -> assertFalse(indexExists(index)), 2, TimeUnit.MINUTES); // assert that snapshot is still in progress and clean up - assertThat(getSnapshotState("snapshot"), equalTo("SUCCESS")); - assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot"))); + assertThat(getSnapshotState(snapName), equalTo("SUCCESS")); + assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/" + snapName))); } 
public void testReadOnly() throws Exception { diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index d93cbe74c41fa..5ad8dbf11510d 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -36,6 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; @@ -293,12 +294,13 @@ public void testBasicTimeBasedRetenion() throws Exception { assertBusy(() -> { // We expect a failed response because the snapshot should not exist try { + logger.info("--> checking to see if snapshot has been deleted..."); Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception")); } catch (ResponseException e) { assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); } - }); + }, 30, TimeUnit.SECONDS); Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); assertOK(client().performRequest(delReq)); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 2a604bc6845f3..fe6fdc93d1f99 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -18,8 +18,12 @@ import org.elasticsearch.cluster.ClusterState; 
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; @@ -64,6 +68,7 @@ public void triggered(SchedulerEngine.Event event) { try { logger.info("starting SLM retention snapshot cleanup task"); final ClusterState state = clusterService.state(); + final TimeValue maxDeletionTime = LifecycleSettings.SLM_RETENTION_DURATION_SETTING.get(state.metaData().settings()); // Find all SLM policies that have retention enabled final Map policiesWithRetention = getAllPoliciesWithRetentionEnabled(state); @@ -74,7 +79,7 @@ public void triggered(SchedulerEngine.Event event) { .map(SnapshotLifecyclePolicy::getRepository) .collect(Collectors.toSet()); - getAllSnapshots(repositioriesToFetch, new ActionListener<>() { + getAllSuccessfulSnapshots(repositioriesToFetch, new ActionListener<>() { @Override public void onResponse(Map> allSnapshots) { // Find all the snapshots that are past their retention date @@ -85,7 +90,7 @@ public void onResponse(Map> allSnapshots) { .collect(Collectors.toList()))); // Finally, delete the snapshots that need to be deleted - deleteSnapshots(snapshotsToBeDeleted); + deleteSnapshots(snapshotsToBeDeleted, maxDeletionTime); } @Override @@ -160,8 +165,8 @@ static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map repositories, ActionListener>> listener, - Consumer errorHandler) { + void getAllSuccessfulSnapshots(Collection repositories, ActionListener>> listener, + Consumer errorHandler) { if (repositories.isEmpty()) 
{ // Skip retrieving anything if there are no repositories to fetch listener.onResponse(Collections.emptyMap()); @@ -175,7 +180,11 @@ void getAllSnapshots(Collection repositories, ActionListener> snapshots = new HashMap<>(); repositories.forEach(repo -> { - snapshots.put(repo, resp.getSnapshots(repo)); + snapshots.put(repo, + // Only return snapshots in the SUCCESS state + resp.getSnapshots(repo).stream() + .filter(info -> info.state() == SnapshotState.SUCCESS) + .collect(Collectors.toList())); }); listener.onResponse(snapshots); } @@ -188,42 +197,63 @@ public void onFailure(Exception e) { }); } - void deleteSnapshots(Map> snapshotsToDelete) { - // TODO: make this more resilient and possibly only delete for a certain amount of time + void deleteSnapshots(Map> snapshotsToDelete, TimeValue maximumTime) { int count = snapshotsToDelete.values().stream().mapToInt(List::size).sum(); if (count == 0) { logger.debug("no snapshots are eligible for deletion"); return; } + logger.info("starting snapshot retention deletion for [{}] snapshots", count); - snapshotsToDelete.forEach((repo, snapshots) -> { - snapshots.forEach(info -> { - logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, info.snapshotId()); - CountDownLatch latch = new CountDownLatch(1); - client.admin().cluster().prepareDeleteSnapshot(repo, info.snapshotId().getName()) - .execute(new LatchedActionListener<>(new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - if (acknowledgedResponse.isAcknowledged()) { - logger.debug("[{}] snapshot [{}] deleted successfully", repo, info.snapshotId()); - } - } - - @Override - public void onFailure(Exception e) { - logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", - repo, info.snapshotId()), e); - } - }, latch)); - try { - // Deletes cannot occur simultaneously, so wait for this - // deletion to complete before attempting the next one - latch.await(); - } catch 
(InterruptedException e) { - logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", - repo, info.snapshotId()), e); + long startTime = System.nanoTime(); + int deleted = 0; + for (Map.Entry> entry : snapshotsToDelete.entrySet()) { + String repo = entry.getKey(); + List snapshots = entry.getValue(); + for (SnapshotInfo info : snapshots) { + deleteSnapshot(repo, info.snapshotId()); + deleted++; + // Check whether we have exceeded the maximum time allowed to spend deleting + // snapshots, if we have, short-circuit the rest of the deletions + TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(System.nanoTime() - startTime); + if (elapsedDeletionTime.compareTo(maximumTime) > 0) { + logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," + + " maximum allowed time: [{}], deleted {} out of {} snapshots scheduled for deletion", + elapsedDeletionTime, maximumTime, deleted, count); + return; } - }); - }); + } + } + } + + /** + * Delete the given snapshot from the repository in blocking manner + */ + void deleteSnapshot(String repo, SnapshotId snapshot) { + logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot); + CountDownLatch latch = new CountDownLatch(1); + client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.getName()) + .execute(new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + if (acknowledgedResponse.isAcknowledged()) { + logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot); + } + } + + @Override + public void onFailure(Exception e) { + logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", + repo, snapshot), e); + } + }, latch)); + try { + // Deletes cannot occur simultaneously, so wait for this + // deletion to complete before attempting the next one + latch.await(); + } catch (InterruptedException e) { + logger.error(new 
ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", + repo, snapshot), e); + } } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 7800fbbb3b69b..186f2613d4d16 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -12,14 +12,17 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.OperationMode; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata; @@ -33,16 +36,21 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.containsInAnyOrder; 
import static org.hamcrest.Matchers.equalTo; +@TestLogging(value = "org.elasticsearch.xpack.slm:TRACE", reason = "I want to log") public class SnapshotRetentionTaskTests extends ESTestCase { public void testGetAllPoliciesWithRetentionEnabled() { @@ -87,34 +95,34 @@ public void testSnapshotEligibleForDeletion() { // Test when user metadata is null SnapshotInfo info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), - 0L, "reason", 1L, 1, Collections.emptyList(), true, null); + 0L, null, 1L, 1, Collections.emptyList(), true, null); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); // Test when no retention is configured info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), - 0L, "reason", 1L, 1, Collections.emptyList(), true, null); + 0L, null, 1L, 1, Collections.emptyList(), true, null); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyWithNoRetentionMap), equalTo(false)); // Test when user metadata is a map that doesn't contain "policy" info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), - 0L, "reason", 1L, 1, Collections.emptyList(), true, Collections.singletonMap("foo", "bar")); + 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("foo", "bar")); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); // Test with an ancient snapshot that should be expunged info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), - 0L, "reason", 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(true)); // 
Test with a snapshot that's start date is old enough to be expunged (but the finish date is not) long time = System.currentTimeMillis() - TimeValue.timeValueDays(30).millis() - 1; info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), - time, "reason", time + TimeValue.timeValueDays(4).millis(), 1, Collections.emptyList(), + time, null, time + TimeValue.timeValueDays(4).millis(), 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(true)); // Test with a fresh snapshot that should not be expunged info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), - System.currentTimeMillis(), "reason", System.currentTimeMillis() + 1, + System.currentTimeMillis(), null, System.currentTimeMillis() + 1, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); } @@ -131,9 +139,9 @@ public void testRetentionTask() throws Exception { ClusterServiceUtils.setState(clusterService, state); final SnapshotInfo eligibleSnapshot = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), - 0L, "reason", 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); final SnapshotInfo ineligibleSnapshot = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"), - System.currentTimeMillis(), "reason", System.currentTimeMillis() + 1, 1, + System.currentTimeMillis(), null, System.currentTimeMillis() + 1, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); AtomicReference> deleted = new AtomicReference<>(); @@ -166,6 +174,73 @@ public void 
testRetentionTask() throws Exception { } } + public void testTimeBoundedDeletion() throws Exception { + try (ThreadPool threadPool = new TestThreadPool("slm-test"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + Client noOpClient = new NoOpClient("slm-test")) { + + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy("policy", "snap", "1 * * * * ?", + "repo", null, new SnapshotRetentionConfiguration(null, null,1)); + + ClusterState state = createState(policy); + state = ClusterState.builder(state) + .metaData(MetaData.builder(state.metaData()) + .transientSettings(Settings.builder() + .put(LifecycleSettings.SLM_RETENTION_DURATION, "500ms") + .build())).build(); + ClusterServiceUtils.setState(clusterService, state); + + final SnapshotInfo snap1 = new SnapshotInfo(new SnapshotId("name1", "uuid1"), Collections.singletonList("index"), + 0L, null, 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + final SnapshotInfo snap2 = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"), + 1L, null, 2L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + final SnapshotInfo snap3 = new SnapshotInfo(new SnapshotId("name3", "uuid3"), Collections.singletonList("index"), + 2L, null, 3L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + final SnapshotInfo snap4 = new SnapshotInfo(new SnapshotId("name4", "uuid4"), Collections.singletonList("index"), + 3L, null, 4L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + final SnapshotInfo snap5 = new SnapshotInfo(new SnapshotId("name5", "uuid5"), Collections.singletonList("index"), + 4L, null, 5L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + + final Set deleted = ConcurrentHashMap.newKeySet(); + // We're expected two deletions before they hit the "taken too long" test, so have a latch of 2 + 
CountDownLatch latch = new CountDownLatch(2); + OverrideDeleteSnapshotRetentionTask retentionTask = new OverrideDeleteSnapshotRetentionTask(noOpClient, clusterService, + () -> { + List snaps = Arrays.asList(snap1, snap2, snap3, snap4, snap5); + logger.info("--> retrieving snapshots [{}]", snaps); + return Collections.singletonMap("repo", snaps); + }, + (repo, snapshotId) -> { + // Don't pause until snapshot 2 + if (snapshotId.equals(snap2.snapshotId())) { + try { + logger.info("--> pausing for 501ms while deleting snap2 to simulate deletion past a threshold"); + Thread.sleep(501); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + deleted.add(snapshotId); + latch.countDown(); + }); + + long time = System.currentTimeMillis(); + retentionTask.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time)); + + boolean success = latch.await(10, TimeUnit.SECONDS); + + assertThat("expected 2 snapshot deletions within 10 seconds, deleted: " + deleted, success, equalTo(true)); + + assertNotNull("something should have been deleted", deleted); + assertThat("two snapshots should have been deleted", deleted.size(), equalTo(2)); + assertThat(deleted, containsInAnyOrder(snap1.snapshotId(), snap2.snapshotId())); + + threadPool.shutdownNow(); + threadPool.awaitTermination(10, TimeUnit.SECONDS); + } + + } + public ClusterState createState(SnapshotLifecyclePolicy... policies) { Map policyMetadataMap = Arrays.stream(policies) .map(policy -> SnapshotLifecyclePolicyMetadata.builder() @@ -184,7 +259,7 @@ public ClusterState createState(SnapshotLifecyclePolicy... 
policies) { .build(); } - private class MockSnapshotRetentionTask extends SnapshotRetentionTask { + private static class MockSnapshotRetentionTask extends SnapshotRetentionTask { private final Supplier>> snapshotRetriever; private final Consumer>> snapshotDeleter; @@ -199,15 +274,41 @@ private class MockSnapshotRetentionTask extends SnapshotRetentionTask { } @Override - void getAllSnapshots(Collection repositories, - ActionListener>> listener, - Consumer errorHandler) { + void getAllSuccessfulSnapshots(Collection repositories, + ActionListener>> listener, + Consumer errorHandler) { listener.onResponse(this.snapshotRetriever.get()); } @Override - void deleteSnapshots(Map> snapshotsToDelete) { + void deleteSnapshots(Map> snapshotsToDelete, TimeValue maxDeleteTime) { this.snapshotDeleter.accept(snapshotsToDelete); } } + + private static class OverrideDeleteSnapshotRetentionTask extends SnapshotRetentionTask { + private final Supplier>> snapshotRetriever; + private final BiConsumer deleteRunner; + + OverrideDeleteSnapshotRetentionTask(Client client, + ClusterService clusterService, + Supplier>> snapshotRetriever, + BiConsumer deleteRunner) { + super(client, clusterService); + this.snapshotRetriever = snapshotRetriever; + this.deleteRunner = deleteRunner; + } + + @Override + void getAllSuccessfulSnapshots(Collection repositories, + ActionListener>> listener, + Consumer errorHandler) { + listener.onResponse(this.snapshotRetriever.get()); + } + + @Override + void deleteSnapshot(String repo, SnapshotId snapshot) { + deleteRunner.accept(repo, snapshot); + } + } } From a5e1c10aae0d5e60e1446dbb74dffdaf3833d677 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 31 Jul 2019 16:29:42 -0600 Subject: [PATCH 2/5] Wow snapshots suuuure can take a long time. 
--- .../java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index 5ad8dbf11510d..4ebac8728e527 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -300,7 +300,7 @@ public void testBasicTimeBasedRetenion() throws Exception { } catch (ResponseException e) { assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); } - }, 30, TimeUnit.SECONDS); + }, 60, TimeUnit.SECONDS); Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); assertOK(client().performRequest(delReq)); From 70fdf9f2096826bd73e7e355a81ff7fb5d3d04f5 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 6 Aug 2019 09:27:47 -0600 Subject: [PATCH 3/5] Use a LongSupplier instead of actually sleeping --- .../xpack/ilm/IndexLifecycle.java | 3 ++- .../xpack/slm/SnapshotRetentionTask.java | 10 ++++++--- .../slm/SnapshotRetentionServiceTests.java | 2 +- .../xpack/slm/SnapshotRetentionTaskTests.java | 22 ++++++++++--------- 4 files changed, 22 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index cedc69bba0252..f466f34ea0228 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -154,7 +154,8 @@ public Collection createComponents(Client client, ClusterService cluster snapshotHistoryStore.set(new SnapshotHistoryStore(settings, client, 
getClock().getZone())); snapshotLifecycleService.set(new SnapshotLifecycleService(settings, () -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock())); - snapshotRetentionService.set(new SnapshotRetentionService(settings, () -> new SnapshotRetentionTask(client, clusterService), + snapshotRetentionService.set(new SnapshotRetentionService(settings, + () -> new SnapshotRetentionTask(client, clusterService, System::nanoTime), clusterService, getClock())); return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(), snapshotRetentionService.get()); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index fe6fdc93d1f99..89e8ee621022d 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -39,6 +39,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.function.LongSupplier; import java.util.stream.Collectors; /** @@ -54,10 +55,12 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { private final Client client; private final ClusterService clusterService; + private final LongSupplier nowNanoSupplier; - public SnapshotRetentionTask(Client client, ClusterService clusterService) { + public SnapshotRetentionTask(Client client, ClusterService clusterService, LongSupplier nowNanoSupplier) { this.client = new OriginSettingClient(client, ClientHelper.INDEX_LIFECYCLE_ORIGIN); this.clusterService = clusterService; + this.nowNanoSupplier = nowNanoSupplier; } @Override @@ -205,7 +208,7 @@ void deleteSnapshots(Map> snapshotsToDelete, TimeValu } logger.info("starting snapshot 
retention deletion for [{}] snapshots", count); - long startTime = System.nanoTime(); + long startTime = nowNanoSupplier.getAsLong(); int deleted = 0; for (Map.Entry> entry : snapshotsToDelete.entrySet()) { String repo = entry.getKey(); @@ -215,7 +218,8 @@ void deleteSnapshots(Map> snapshotsToDelete, TimeValu deleted++; // Check whether we have exceeded the maximum time allowed to spend deleting // snapshots, if we have, short-circuit the rest of the deletions - TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(System.nanoTime() - startTime); + TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime); + logger.trace("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime); if (elapsedDeletionTime.compareTo(maximumTime) > 0) { logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," + " maximum allowed time: [{}], deleted {} out of {} snapshots scheduled for deletion", diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java index 683f2eaff07eb..257dcb518662f 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java @@ -67,7 +67,7 @@ public void testJobsAreScheduled() { private static class FakeRetentionTask extends SnapshotRetentionTask { FakeRetentionTask() { - super(mock(Client.class), null); + super(mock(Client.class), null, System::nanoTime); } @Override diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 186f2613d4d16..50167438d6693 100644 --- 
a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -40,10 +40,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -204,6 +206,7 @@ public void testTimeBoundedDeletion() throws Exception { final Set deleted = ConcurrentHashMap.newKeySet(); // We're expected two deletions before they hit the "taken too long" test, so have a latch of 2 CountDownLatch latch = new CountDownLatch(2); + AtomicLong nanos = new AtomicLong(System.nanoTime()); OverrideDeleteSnapshotRetentionTask retentionTask = new OverrideDeleteSnapshotRetentionTask(noOpClient, clusterService, () -> { List snaps = Arrays.asList(snap1, snap2, snap3, snap4, snap5); @@ -211,18 +214,16 @@ public void testTimeBoundedDeletion() throws Exception { return Collections.singletonMap("repo", snaps); }, (repo, snapshotId) -> { + logger.info("--> deleting {}", snapshotId); // Don't pause until snapshot 2 if (snapshotId.equals(snap2.snapshotId())) { - try { - logger.info("--> pausing for 501ms while deleting snap2 to simulate deletion past a threshold"); - Thread.sleep(501); - } catch (InterruptedException e) { - throw new AssertionError(e); - } + logger.info("--> pausing for 501ms while deleting snap2 to simulate deletion past a threshold"); + nanos.addAndGet(TimeValue.timeValueMillis(501).nanos()); } deleted.add(snapshotId); latch.countDown(); - }); + }, + nanos::get); long time = System.currentTimeMillis(); retentionTask.triggered(new 
SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time)); @@ -268,7 +269,7 @@ private static class MockSnapshotRetentionTask extends SnapshotRetentionTask { ClusterService clusterService, Supplier>> snapshotRetriever, Consumer>> snapshotDeleter) { - super(client, clusterService); + super(client, clusterService, System::nanoTime); this.snapshotRetriever = snapshotRetriever; this.snapshotDeleter = snapshotDeleter; } @@ -293,8 +294,9 @@ private static class OverrideDeleteSnapshotRetentionTask extends SnapshotRetenti OverrideDeleteSnapshotRetentionTask(Client client, ClusterService clusterService, Supplier>> snapshotRetriever, - BiConsumer deleteRunner) { - super(client, clusterService); + BiConsumer deleteRunner, + LongSupplier nanoSupplier) { + super(client, clusterService, nanoSupplier); this.snapshotRetriever = snapshotRetriever; this.deleteRunner = deleteRunner; } From 33175afcad8d9ff069e5b2d964c22fad95794303 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 6 Aug 2019 09:28:03 -0600 Subject: [PATCH 4/5] Remove TestLogging annotation --- .../org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 50167438d6693..a56dd55d8a404 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -19,7 +19,6 @@ import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import 
org.elasticsearch.xpack.core.ilm.LifecycleSettings; @@ -52,7 +51,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; -@TestLogging(value = "org.elasticsearch.xpack.slm:TRACE", reason = "I want to log") public class SnapshotRetentionTaskTests extends ESTestCase { public void testGetAllPoliciesWithRetentionEnabled() { From 77b8de672ac27b488c8f1cc83f3d78740cc9540f Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 6 Aug 2019 14:21:35 -0600 Subject: [PATCH 5/5] Remove rate limiting --- .../java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index 4ebac8728e527..86c04ba29a316 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -426,7 +426,6 @@ private void inializeRepo(String repoName) throws IOException { .startObject("settings") .field("compress", randomBoolean()) .field("location", System.getProperty("tests.path.repo")) - .field("max_snapshot_bytes_per_sec", "256b") .endObject() .endObject())); assertOK(client().performRequest(request));