From 262594b74a5ae5e66873a9fc9a905372e3c6d024 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 17 Jul 2019 16:45:40 -0600 Subject: [PATCH 1/6] Implement SnapshotRetentionTask's snapshot filtering and deletion This commit implements the snapshot filtering and deletion for `SnapshotRetentionTask`. Currently only the expire-after age is used for determining whether a snapshot is eligible for deletion. Relates to #43663 --- .../SnapshotLifecyclePolicy.java | 3 +- .../SnapshotRetentionConfiguration.java | 23 ++ .../xpack/slm/SnapshotLifecycleIT.java | 90 +++++++- .../xpack/slm/SnapshotRetentionTask.java | 130 +++++++++-- .../xpack/slm/SnapshotRetentionTaskTests.java | 213 ++++++++++++++++++ 5 files changed, 437 insertions(+), 22 deletions(-) create mode 100644 x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/SnapshotLifecyclePolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/SnapshotLifecyclePolicy.java index a187ad7e9e076..755d389734bfe 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/SnapshotLifecyclePolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/snapshotlifecycle/SnapshotLifecyclePolicy.java @@ -47,6 +47,8 @@ public class SnapshotLifecyclePolicy extends AbstractDiffable implements Writeable, Diffable, ToXContentObject { + public static final String POLICY_ID_METADATA_FIELD = "policy"; + private final String id; private final String name; private final String schedule; @@ -61,7 +63,6 @@ public class SnapshotLifecyclePolicy extends AbstractDiffable getSnapshotDeletionPredicate(final List allSnapshots) { + return si -> { + if (this.expireAfter != null) { + TimeValue snapshotAge = new TimeValue(System.currentTimeMillis() - si.startTime()); + if (snapshotAge.compareTo(this.expireAfter) > 0) { + return true; + } else { + return false; + } + } + // If nothing matched, the snapshot is not eligible for deletion + return false; + }; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index 897358378debc..4c87e2084678e 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -7,12 +7,15 @@ package org.elasticsearch.xpack.slm; import org.apache.http.util.EntityUtils; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; @@ -22,6 +25,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotRetentionConfiguration; @@ -222,6 +226,81 @@ public void testPolicyManualExecution() throws Exception { }); } + public void testBasicTimeBasedRetenion() throws Exception { + final String indexName = "test"; + final String policyName = "test-policy"; + final String repoId = "my-repo"; + int docCount = randomIntBetween(10, 50); + List indexReqs = new ArrayList<>(); + for (int i = 0; i < docCount; i++) { + index(client(), indexName, "" + i, "foo", "bar"); + } + + // Create a snapshot repo + inializeRepo(repoId); + + // Create a policy with a retention period of 1 millisecond + createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true, + new SnapshotRetentionConfiguration(TimeValue.timeValueMillis(1))); + + // Manually create a snapshot + Response executeResp = client().performRequest(new Request("PUT", "/_slm/policy/" + policyName + "/_execute")); + + final String snapshotName; + try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(executeResp.getEntity()))) { + snapshotName = parser.mapStrings().get("snapshot_name"); + + // Check that the executed snapshot is created + assertBusy(() -> { + try { + Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); + Map snapshotResponseMap; + try (InputStream is = response.getEntity().getContent()) { + snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); + } + assertThat(snapshotResponseMap.size(), greaterThan(0)); + final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); + assertNotNull(metadata); + assertThat(metadata.get("policy"), equalTo(policyName)); + assertHistoryIsPresent(policyName, true, repoId); + } catch (ResponseException e) { + fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + } + }); + } + + // Run retention every second + ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest(); + req.transientSettings(Settings.builder().put(LifecycleSettings.SLM_RETENTION_SCHEDULE, "*/1 * * * * ?")); + try (XContentBuilder builder = jsonBuilder()) { + req.toXContent(builder, ToXContent.EMPTY_PARAMS); + Request r = new Request("PUT", "/_cluster/settings"); + r.setJsonEntity(Strings.toString(builder)); + Response updateSettingsResp = client().performRequest(r); + } + + // Check that the snapshot created by the policy has been removed by retention + assertBusy(() -> { + // We expect a failed response because the snapshot should not exist + try { + Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); + assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception")); + } catch (ResponseException e) { + assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); + } + }); + + Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); + assertOK(client().performRequest(delReq)); + + // It's possible there could have been a snapshot in progress when the + // policy is deleted, so wait for it to be finished + assertBusy(() -> { + assertThat(wipeSnapshots().size(), equalTo(0)); + }); + } + @SuppressWarnings("unchecked") private static Map extractMetadata(Map snapshotResponseMap, String snapshotPrefix) { List> snapResponse = ((List>) snapshotResponseMap.get("responses")).stream() @@ -284,6 +363,13 @@ private void assertHistoryIsPresent(String policyName, boolean success, String r private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId, String indexPattern, boolean ignoreUnavailable) throws IOException { + createSnapshotPolicy(policyName, snapshotNamePattern, schedule, repoId, indexPattern, + ignoreUnavailable, SnapshotRetentionConfiguration.EMPTY); + } + + private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId, + String indexPattern, boolean ignoreUnavailable, + SnapshotRetentionConfiguration retention) throws IOException { Map snapConfig = new HashMap<>(); snapConfig.put("indices", Collections.singletonList(indexPattern)); snapConfig.put("ignore_unavailable", ignoreUnavailable); @@ -295,8 +381,8 @@ private void createSnapshotPolicy(String policyName, String snapshotNamePattern, () -> randomAlphaOfLength(5)), randomAlphaOfLength(4)); } } - SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyName, snapshotNamePattern, schedule, repoId, snapConfig, - SnapshotRetentionConfiguration.EMPTY); + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyName, snapshotNamePattern, schedule, + repoId, snapConfig, retention); Request putLifecycle = new Request("PUT", "/_slm/policy/" + policyName); XContentBuilder lifecycleBuilder = JsonXContent.contentBuilder(); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index bee258964c5eb..8a71a912ca72f 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -8,20 +8,31 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotRetentionConfiguration; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * The {@code SnapshotRetentionTask} is invoked by the scheduled job from the @@ -60,40 +71,121 @@ public void triggered(SchedulerEngine.Event event) { .map(SnapshotLifecyclePolicy::getRepository) .collect(Collectors.toSet()); - // Find all the snapshots that are past their retention date - // TODO: include min/max snapshot count as a criteria for deletion also - final List snapshotsToBeDeleted = getAllSnapshots(repositioriesToFetch).stream() - .filter(snapshot -> snapshotEligibleForDeletion(snapshot, policiesWithRetention)) - .collect(Collectors.toList()); + getAllSnapshots(repositioriesToFetch, new ActionListener<>() { + @Override + public void onResponse(List> allSnapshots) { + // Find all the snapshots that are past their retention date + final List> snapshotsToBeDeleted = allSnapshots.stream() + .filter(snapshot -> snapshotEligibleForDeletion(snapshot.v2(), allSnapshots, policiesWithRetention)) + .collect(Collectors.toList()); - // Finally, delete the snapshots that need to be deleted - deleteSnapshots(snapshotsToBeDeleted); + // Finally, delete the snapshots that need to be deleted + deleteSnapshots(snapshotsToBeDeleted); + } + + @Override + public void onFailure(Exception e) { + running.set(false); + } + }, err -> running.set(false)); } finally { running.set(false); } } else { - logger.debug("snapshot lifecycle retention task started, but a task is already running, skipping"); + logger.trace("snapshot lifecycle retention task started, but a task is already running, skipping"); } } static Map getAllPoliciesWithRetentionEnabled(final ClusterState state) { - // TODO: fill me in - return Collections.emptyMap(); + final SnapshotLifecycleMetadata snapMeta = state.metaData().custom(SnapshotLifecycleMetadata.TYPE); + if (snapMeta == null) { + return Collections.emptyMap(); + } + return snapMeta.getSnapshotConfigurations().entrySet().stream() + .filter(e -> e.getValue().getPolicy().getRetentionPolicy() != null) + .filter(e -> e.getValue().getPolicy().getRetentionPolicy().equals(SnapshotRetentionConfiguration.EMPTY) == false) + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getPolicy())); } - static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map policies) { - // TODO: fill me in - return false; + static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, List> allSnapshots, + Map policies) { + if (snapshot.userMetadata() == null) { + // This snapshot has no metadata, it is not eligible for deletion + return false; + } + + final String policyId; + try { + policyId = (String) snapshot.userMetadata().get(SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD); + } catch (Exception e) { + logger.error("unable to retrieve policy id from snapshot metadata [" + snapshot.userMetadata() + "]", e); + throw e; + } + + SnapshotLifecyclePolicy policy = policies.get(policyId); + if (policy == null) { + // This snapshot was taking by a policy that doesn't exist, so it's not eligible + return false; + } + + SnapshotRetentionConfiguration retention = policy.getRetentionPolicy(); + if (retention == null || retention.equals(SnapshotRetentionConfiguration.EMPTY)) { + // Retention is not configured + return false; + } + + final String repository = policy.getRepository(); + // Retrieve the predicate based on the retention policy, passing in snapshots pertaining only to *this* policy and repository + boolean eligible = retention.getSnapshotDeletionPredicate( + allSnapshots.stream() + .filter(t -> t.v1().equals(repository)) + .filter(t -> Optional.ofNullable(t.v2().userMetadata()) + .map(meta -> meta.get(SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD)) + .map(pId -> pId.equals(policyId)) + .orElse(false)) + .map(Tuple::v2).collect(Collectors.toList())) + .test(snapshot); + logger.debug("[{}] testing snapshot [{}] deletion eligibility: {}", + repository, snapshot.snapshotId(), eligible ? "ELIGIBLE" : "INELIGIBLE"); + return eligible; } - List getAllSnapshots(Collection repositories) { - // TODO: fill me in - return Collections.emptyList(); + void getAllSnapshots(Collection repositories, ActionListener>> listener, + Consumer errorHandler) { + client.admin().cluster().prepareGetSnapshots(repositories.toArray(Strings.EMPTY_ARRAY)) + .setIgnoreUnavailable(true) + .execute(new ActionListener() { + @Override + public void onResponse(final GetSnapshotsResponse resp) { + listener.onResponse(repositories.stream() + .flatMap(repo -> { + try { + return resp.getSnapshots(repo).stream() + .map(si -> new Tuple<>(repo, si)); + } catch (Exception e) { + logger.debug(new ParameterizedMessage("unable to retrieve snapshots for [{}] repository", repo), e); + return Stream.empty(); + } + }) + .collect(Collectors.toList())); + } + + @Override + public void onFailure(Exception e) { + logger.debug(new ParameterizedMessage("unable to retrieve snapshots for [{}] repositories", repositories), e); + errorHandler.accept(e); + } + }); } - void deleteSnapshots(List snapshotsToDelete) { - // TODO: fill me in - logger.info("deleting {}", snapshotsToDelete); + void deleteSnapshots(List> snapshotsToDelete) { + // TODO: make this more resilient and possibly only delete for a certain amount of time + logger.info("starting snapshot retention deletion for [{}] snapshots", snapshotsToDelete.size()); + CountDownLatch latch = new CountDownLatch(snapshotsToDelete.size()); + snapshotsToDelete.forEach(snap -> { + logger.info("[{}] snapshot retention deleting snapshot [{}]", snap.v1(), snap.v2().snapshotId()); + client.admin().cluster().prepareDeleteSnapshot(snap.v1(), snap.v2().snapshotId().getName()).get(); + }); } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java new file mode 100644 index 0000000000000..6f3aabdc11781 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -0,0 +1,213 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.slm; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexlifecycle.OperationMode; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicyMetadata; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotRetentionConfiguration; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; + +public class SnapshotRetentionTaskTests extends ESTestCase { + + public void testGetAllPoliciesWithRetentionEnabled() { + SnapshotLifecyclePolicy policyWithout = new SnapshotLifecyclePolicy("policyWithout", "snap", "1 * * * * ?", + "repo", null, SnapshotRetentionConfiguration.EMPTY); + SnapshotLifecyclePolicy policyWithout2 = new SnapshotLifecyclePolicy("policyWithout2", "snap", "1 * * * * ?", + "repo", null, new SnapshotRetentionConfiguration(null)); + SnapshotLifecyclePolicy policyWith = new SnapshotLifecyclePolicy("policyWith", "snap", "1 * * * * ?", + "repo", null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30))); + + // Test with no SLM metadata + ClusterState state = ClusterState.builder(new ClusterName("cluster")).build(); + assertThat(SnapshotRetentionTask.getAllPoliciesWithRetentionEnabled(state), equalTo(Collections.emptyMap())); + + // Test with empty SLM metadata + MetaData metaData = MetaData.builder() + .putCustom(SnapshotLifecycleMetadata.TYPE, new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING)) + .build(); + state = ClusterState.builder(new ClusterName("cluster")).metaData(metaData).build(); + assertThat(SnapshotRetentionTask.getAllPoliciesWithRetentionEnabled(state), equalTo(Collections.emptyMap())); + + // Test with metadata containing only a policy without retention + state = createState(policyWithout); + assertThat(SnapshotRetentionTask.getAllPoliciesWithRetentionEnabled(state), equalTo(Collections.emptyMap())); + + // Test with metadata containing a couple of policies + state = createState(policyWithout, policyWithout2, policyWith); + Map policyMap = SnapshotRetentionTask.getAllPoliciesWithRetentionEnabled(state); + assertThat(policyMap.size(), equalTo(1)); + assertThat(policyMap.get("policyWith"), equalTo(policyWith)); + } + + public void testSnapshotEligibleForDeletion() { + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy("policy", "snap", "1 * * * * ?", + "repo", null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30))); + SnapshotLifecyclePolicy policyWithNoRetention = new SnapshotLifecyclePolicy("policy", "snap", "1 * * * * ?", + "repo", null, randomBoolean() ? null : SnapshotRetentionConfiguration.EMPTY); + Map policyMap = Collections.singletonMap("policy", policy); + Map policyWithNoRetentionMap = Collections.singletonMap("policy", policyWithNoRetention); + Function>> mkInfos = i -> Collections.singletonList(new Tuple<>("repo", i)); + + // Test when user metadata is null + SnapshotInfo info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + 0L, "reason", 1L, 1, Collections.emptyList(), true, null); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); + + // Test when no retention is configured + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + 0L, "reason", 1L, 1, Collections.emptyList(), true, null); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyWithNoRetentionMap), equalTo(false)); + + // Test when user metadata is a map that doesn't contain "policy" + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + 0L, "reason", 1L, 1, Collections.emptyList(), true, Collections.singletonMap("foo", "bar")); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); + + // Test with an ancient snapshot that should be expunged + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + 0L, "reason", 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(true)); + + // Test with a snapshot that's start date is old enough to be expunged (but the finish date is not) + long time = System.currentTimeMillis() - TimeValue.timeValueDays(30).millis() - 1; + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + time, "reason", time + TimeValue.timeValueDays(4).millis(), 1, Collections.emptyList(), + true, Collections.singletonMap("policy", "policy")); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(true)); + + // Test with a fresh snapshot that should not be expunged + info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + System.currentTimeMillis(), "reason", System.currentTimeMillis() + 1, + 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + assertThat(SnapshotRetentionTask.snapshotEligibleForDeletion(info, mkInfos.apply(info), policyMap), equalTo(false)); + } + + public void testRetentionTask() throws Exception { + try (ThreadPool threadPool = new TestThreadPool("slm-test"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + Client noOpClient = new NoOpClient("slm-test")) { + + SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy("policy", "snap", "1 * * * * ?", + "repo", null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30))); + + ClusterState state = createState(policy); + ClusterServiceUtils.setState(clusterService, state); + + final SnapshotInfo eligibleSnapshot = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), + 0L, "reason", 1L, 1, Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + final SnapshotInfo ineligibleSnapshot = new SnapshotInfo(new SnapshotId("name2", "uuid2"), Collections.singletonList("index"), + System.currentTimeMillis(), "reason", System.currentTimeMillis() + 1, 1, + Collections.emptyList(), true, Collections.singletonMap("policy", "policy")); + + AtomicReference> deleted = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + MockSnapshotRetentionTask retentionTask = new MockSnapshotRetentionTask(noOpClient, clusterService, + () -> { + List> snaps = new ArrayList<>(2); + snaps.add(new Tuple<>("repo", eligibleSnapshot)); + snaps.add(new Tuple<>("repo", ineligibleSnapshot)); + logger.info("--> retrieving snapshots [{}]", snaps); + return snaps; + }, + snapsToDelete -> { + logger.info("--> deleting {}", snapsToDelete); + deleted.set(snapsToDelete.stream().map(Tuple::v2).collect(Collectors.toList())); + latch.countDown(); + }); + + long time = System.currentTimeMillis(); + retentionTask.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time)); + + latch.await(10, TimeUnit.SECONDS); + + assertNotNull("something should have been deleted", deleted.get()); + assertThat("one snapshot should have been deleted", deleted.get().size(), equalTo(1)); + assertThat(deleted.get().get(0), equalTo(eligibleSnapshot)); + + threadPool.shutdownNow(); + threadPool.awaitTermination(10, TimeUnit.SECONDS); + } + } + + public ClusterState createState(SnapshotLifecyclePolicy... policies) { + Map policyMetadataMap = Arrays.stream(policies) + .map(policy -> SnapshotLifecyclePolicyMetadata.builder() + .setPolicy(policy) + .setHeaders(Collections.emptyMap()) + .setModifiedDate(randomNonNegativeLong()) + .setVersion(randomNonNegativeLong()) + .build()) + .collect(Collectors.toMap(pm -> pm.getPolicy().getId(), pm -> pm)); + + MetaData metaData = MetaData.builder() + .putCustom(SnapshotLifecycleMetadata.TYPE, new SnapshotLifecycleMetadata(policyMetadataMap, OperationMode.RUNNING)) + .build(); + return ClusterState.builder(new ClusterName("cluster")) + .metaData(metaData) + .build(); + } + + private class MockSnapshotRetentionTask extends SnapshotRetentionTask { + + private final Supplier>> snapshotRetriever; + private final Consumer>> snapshotDeleter; + + MockSnapshotRetentionTask(Client client, + ClusterService clusterService, + Supplier>> snapshotRetriever, + Consumer>> snapshotDeleter) { + super(client, clusterService); + this.snapshotRetriever = snapshotRetriever; + this.snapshotDeleter = snapshotDeleter; + } + + @Override + void getAllSnapshots(Collection repositories, + ActionListener>> listener, + Consumer errorHandler) { + listener.onResponse(this.snapshotRetriever.get()); + } + + @Override + void deleteSnapshots(List> snapshotsToDelete) { + this.snapshotDeleter.accept(snapshotsToDelete); + } + } +} From cd59fd324d276bd00c5df70fb09b06763967b248 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 23 Jul 2019 11:01:21 -0600 Subject: [PATCH 2/6] Fix deletes running on the wrong thread --- .../xpack/slm/SnapshotLifecycleIT.java | 46 ++++++++++++------- .../xpack/slm/SnapshotRetentionTask.java | 28 ++++++++++- 2 files changed, 55 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index 4c87e2084678e..daa53210ac277 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -280,25 +280,37 @@ public void testBasicTimeBasedRetenion() throws Exception { Response updateSettingsResp = client().performRequest(r); } - // Check that the snapshot created by the policy has been removed by retention - assertBusy(() -> { - // We expect a failed response because the snapshot should not exist - try { - Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); - assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception")); - } catch (ResponseException e) { - assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); - } - }); + try { + // Check that the snapshot created by the policy has been removed by retention + assertBusy(() -> { + // We expect a failed response because the snapshot should not exist + try { + Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); + assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception")); + } catch (ResponseException e) { + assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); + } + }); - Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); - assertOK(client().performRequest(delReq)); + Request delReq = new Request("DELETE", "/_slm/policy/" + policyName); + assertOK(client().performRequest(delReq)); - // It's possible there could have been a snapshot in progress when the - // policy is deleted, so wait for it to be finished - assertBusy(() -> { - assertThat(wipeSnapshots().size(), equalTo(0)); - }); + // It's possible there could have been a snapshot in progress when the + // policy is deleted, so wait for it to be finished + assertBusy(() -> { + assertThat(wipeSnapshots().size(), equalTo(0)); + }); + } finally { + // Unset retention + ClusterUpdateSettingsRequest unsetRequest = new ClusterUpdateSettingsRequest(); + unsetRequest.transientSettings(Settings.builder().put(LifecycleSettings.SLM_RETENTION_SCHEDULE, (String) null)); + try (XContentBuilder builder = jsonBuilder()) { + unsetRequest.toXContent(builder, ToXContent.EMPTY_PARAMS); + Request r = new Request("PUT", "/_cluster/settings"); + r.setJsonEntity(Strings.toString(builder)); + client().performRequest(r); + } + } } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 8a71a912ca72f..2e96719c427b3 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -10,7 +10,9 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; @@ -182,10 +184,32 @@ public void onFailure(Exception e) { void deleteSnapshots(List> snapshotsToDelete) { // TODO: make this more resilient and possibly only delete for a certain amount of time logger.info("starting snapshot retention deletion for [{}] snapshots", snapshotsToDelete.size()); - CountDownLatch latch = new CountDownLatch(snapshotsToDelete.size()); snapshotsToDelete.forEach(snap -> { logger.info("[{}] snapshot retention deleting snapshot [{}]", snap.v1(), snap.v2().snapshotId()); - client.admin().cluster().prepareDeleteSnapshot(snap.v1(), snap.v2().snapshotId().getName()).get(); + CountDownLatch latch = new CountDownLatch(1); + client.admin().cluster().prepareDeleteSnapshot(snap.v1(), snap.v2().snapshotId().getName()) + .execute(new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + if (acknowledgedResponse.isAcknowledged()) { + logger.debug("[{}] snapshot [{}] deleted successfully", snap.v1(), snap.v2().snapshotId()); + } + } + + @Override + public void onFailure(Exception e) { + logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", + snap.v1(), snap.v2().snapshotId()), e); + } + }, latch)); + try { + // Deletes cannot occur simultaneously, so wait for this + // deletion to complete before attempting the next one + latch.await(); + } catch (InterruptedException e) { + logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", + snap.v1(), snap.v2().snapshotId()), e); + } }); } } From 8d799955ebbee29d714112ca841417b4ac8ea527 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 24 Jul 2019 14:33:49 -0600 Subject: [PATCH 3/6] Handle missing or null policy in snap metadata differently --- .../elasticsearch/xpack/slm/SnapshotRetentionTask.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 2e96719c427b3..6bddc81187b8d 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -121,8 +121,13 @@ static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, List Date: Wed, 24 Jul 2019 14:58:43 -0600 Subject: [PATCH 4/6] Convert Tuple> to Map> --- .../xpack/slm/SnapshotRetentionTask.java | 99 ++++++++++--------- .../xpack/slm/SnapshotRetentionTaskTests.java | 26 ++--- 2 files changed, 63 insertions(+), 62 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 6bddc81187b8d..ebf5057485292 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -17,7 +17,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; @@ -26,6 +25,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * The {@code SnapshotRetentionTask} is invoked by the scheduled job from the @@ -75,11 +74,13 @@ public void triggered(SchedulerEngine.Event event) { getAllSnapshots(repositioriesToFetch, new ActionListener<>() { @Override - public void onResponse(List> allSnapshots) { + public void onResponse(Map> allSnapshots) { // Find all the snapshots that are past their retention date - final List> snapshotsToBeDeleted = allSnapshots.stream() - .filter(snapshot -> snapshotEligibleForDeletion(snapshot.v2(), allSnapshots, policiesWithRetention)) - .collect(Collectors.toList()); + final Map> snapshotsToBeDeleted = allSnapshots.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> e.getValue().stream() + .filter(snapshot -> snapshotEligibleForDeletion(snapshot, allSnapshots, policiesWithRetention)) + .collect(Collectors.toList()))); // Finally, delete the snapshots that need to be deleted deleteSnapshots(snapshotsToBeDeleted); @@ -110,7 +111,7 @@ static Map getAllPoliciesWithRetentionEnabled(f .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getPolicy())); } - static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, List> allSnapshots, + static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map> allSnapshots, Map policies) { if (snapshot.userMetadata() == null) { // This snapshot has no metadata, it is not eligible for deletion @@ -145,37 +146,35 @@ static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, List t.v1().equals(repository)) - .filter(t -> Optional.ofNullable(t.v2().userMetadata()) + allSnapshots.get(repository).stream() + .filter(info -> Optional.ofNullable(info.userMetadata()) .map(meta -> meta.get(SnapshotLifecyclePolicy.POLICY_ID_METADATA_FIELD)) .map(pId -> pId.equals(policyId)) .orElse(false)) - .map(Tuple::v2).collect(Collectors.toList())) + .collect(Collectors.toList())) .test(snapshot); logger.debug("[{}] testing snapshot [{}] deletion eligibility: {}", repository, snapshot.snapshotId(), eligible ? "ELIGIBLE" : "INELIGIBLE"); return eligible; } - void getAllSnapshots(Collection repositories, ActionListener>> listener, + void getAllSnapshots(Collection repositories, ActionListener>> listener, Consumer errorHandler) { + if (repositories.isEmpty()) { + // Skip retrieving anything if there are no repositories to fetch + listener.onResponse(Collections.emptyMap()); + } + client.admin().cluster().prepareGetSnapshots(repositories.toArray(Strings.EMPTY_ARRAY)) .setIgnoreUnavailable(true) - .execute(new ActionListener() { + .execute(new ActionListener<>() { @Override public void onResponse(final GetSnapshotsResponse resp) { - listener.onResponse(repositories.stream() - .flatMap(repo -> { - try { - return resp.getSnapshots(repo).stream() - .map(si -> new Tuple<>(repo, si)); - } catch (Exception e) { - logger.debug(new ParameterizedMessage("unable to retrieve snapshots for [{}] repository", repo), e); - return Stream.empty(); - } - }) - .collect(Collectors.toList())); + Map> snapshots = new HashMap<>(); + repositories.forEach(repo -> { + snapshots.put(repo, resp.getSnapshots(repo)); + }); + listener.onResponse(snapshots); } @Override @@ -186,35 +185,37 @@ public void onFailure(Exception e) { }); } - void deleteSnapshots(List> snapshotsToDelete) { + void deleteSnapshots(Map> snapshotsToDelete) { // TODO: make this more resilient and possibly only delete for a certain amount of time logger.info("starting snapshot retention deletion for [{}] snapshots", snapshotsToDelete.size()); - snapshotsToDelete.forEach(snap -> { - logger.info("[{}] snapshot retention deleting snapshot [{}]", snap.v1(), snap.v2().snapshotId()); - CountDownLatch latch = new CountDownLatch(1); - client.admin().cluster().prepareDeleteSnapshot(snap.v1(), snap.v2().snapshotId().getName()) - .execute(new LatchedActionListener<>(new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - if (acknowledgedResponse.isAcknowledged()) { - logger.debug("[{}] snapshot [{}] deleted successfully", snap.v1(), snap.v2().snapshotId()); + snapshotsToDelete.forEach((repo, snapshots) -> { + snapshots.forEach(info -> { + logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, info.snapshotId()); + CountDownLatch latch = new CountDownLatch(1); + client.admin().cluster().prepareDeleteSnapshot(repo, info.snapshotId().getName()) + .execute(new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + if (acknowledgedResponse.isAcknowledged()) { + logger.debug("[{}] snapshot [{}] deleted successfully", repo, info.snapshotId()); + } } - } - @Override - public void onFailure(Exception e) { - logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", - snap.v1(), snap.v2().snapshotId()), e); - } - }, latch)); - try { - // Deletes cannot occur simultaneously, so wait for this - // deletion to complete before attempting the next one - latch.await(); - } catch (InterruptedException e) { - logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", - snap.v1(), snap.v2().snapshotId()), e); - } + @Override + public void onFailure(Exception e) { + logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention", + repo, info.snapshotId()), e); + } + }, latch)); + try { + // Deletes cannot occur simultaneously, so wait for this + // deletion to complete before attempting the next one + latch.await(); + } catch (InterruptedException e) { + logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted", + repo, info.snapshotId()), e); + } + }); }); } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 6f3aabdc11781..fb560798907d6 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; @@ -83,7 +82,8 @@ public void testSnapshotEligibleForDeletion() { "repo", null, randomBoolean() ? null : SnapshotRetentionConfiguration.EMPTY); Map policyMap = Collections.singletonMap("policy", policy); Map policyWithNoRetentionMap = Collections.singletonMap("policy", policyWithNoRetention); - Function>> mkInfos = i -> Collections.singletonList(new Tuple<>("repo", i)); + Function>> mkInfos = i -> + Collections.singletonMap("repo", Collections.singletonList(i)); // Test when user metadata is null SnapshotInfo info = new SnapshotInfo(new SnapshotId("name", "uuid"), Collections.singletonList("index"), @@ -140,15 +140,15 @@ public void testRetentionTask() throws Exception { CountDownLatch latch = new CountDownLatch(1); MockSnapshotRetentionTask retentionTask = new MockSnapshotRetentionTask(noOpClient, clusterService, () -> { - List> snaps = new ArrayList<>(2); - snaps.add(new Tuple<>("repo", eligibleSnapshot)); - snaps.add(new Tuple<>("repo", ineligibleSnapshot)); + List snaps = new ArrayList<>(2); + snaps.add(eligibleSnapshot); + snaps.add(ineligibleSnapshot); logger.info("--> retrieving snapshots [{}]", snaps); - return snaps; + return Collections.singletonMap("repo", snaps); }, snapsToDelete -> { logger.info("--> deleting {}", snapsToDelete); - deleted.set(snapsToDelete.stream().map(Tuple::v2).collect(Collectors.toList())); + deleted.set(snapsToDelete.values().stream().flatMap(Collection::stream).collect(Collectors.toList())); latch.countDown(); }); @@ -186,13 +186,13 @@ public ClusterState createState(SnapshotLifecyclePolicy... policies) { private class MockSnapshotRetentionTask extends SnapshotRetentionTask { - private final Supplier>> snapshotRetriever; - private final Consumer>> snapshotDeleter; + private final Supplier>> snapshotRetriever; + private final Consumer>> snapshotDeleter; MockSnapshotRetentionTask(Client client, ClusterService clusterService, - Supplier>> snapshotRetriever, - Consumer>> snapshotDeleter) { + Supplier>> snapshotRetriever, + Consumer>> snapshotDeleter) { super(client, clusterService); this.snapshotRetriever = snapshotRetriever; this.snapshotDeleter = snapshotDeleter; @@ -200,13 +200,13 @@ private class MockSnapshotRetentionTask extends SnapshotRetentionTask { @Override void getAllSnapshots(Collection repositories, - ActionListener>> listener, + ActionListener>> listener, Consumer errorHandler) { listener.onResponse(this.snapshotRetriever.get()); } @Override - void deleteSnapshots(List> snapshotsToDelete) { + void deleteSnapshots(Map> snapshotsToDelete) { this.snapshotDeleter.accept(snapshotsToDelete); } } From ded1c608d3a4e4d8476074f6e5e2536aba324a1a Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 24 Jul 2019 16:41:53 -0600 Subject: [PATCH 5/6] Use the `OriginSettingClient` to work with security, enhance logging --- .../xpack/slm/SnapshotRetentionTask.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index ebf5057485292..b71366feea60e 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -14,10 +14,12 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecycleMetadata; import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; @@ -50,7 +52,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { private final ClusterService clusterService; public SnapshotRetentionTask(Client client, ClusterService clusterService) { - this.client = client; + this.client = new OriginSettingClient(client, ClientHelper.INDEX_LIFECYCLE_ORIGIN); this.clusterService = clusterService; } @@ -165,7 +167,8 @@ void getAllSnapshots(Collection repositories, ActionListener() { @Override @@ -187,7 +190,12 @@ public void onFailure(Exception e) { void deleteSnapshots(Map> snapshotsToDelete) { // TODO: make this more resilient and possibly only delete for a certain amount of time - logger.info("starting snapshot retention deletion for [{}] snapshots", snapshotsToDelete.size()); + int count = snapshotsToDelete.values().stream().mapToInt(List::size).sum(); + if (count == 0) { + logger.debug("no snapshots are eligible for deletion"); + return; + } + logger.info("starting snapshot retention deletion for [{}] snapshots", count); snapshotsToDelete.forEach((repo, snapshots) -> { snapshots.forEach(info -> { logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, info.snapshotId()); From 83a91e0795915e2a3b22d720931ad219d34f975a Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 24 Jul 2019 17:03:59 -0600 Subject: [PATCH 6/6] Prevent NPE in test by mocking Client --- .../xpack/slm/SnapshotRetentionServiceTests.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java index e17460f0d9cbd..04a763224f31b 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.slm; import org.elasticsearch.Version; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.service.ClusterService; @@ -27,6 +28,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; public class SnapshotRetentionServiceTests extends ESTestCase { @@ -65,7 +67,7 @@ public void testJobsAreScheduled() { private static class FakeRetentionTask extends SnapshotRetentionTask { FakeRetentionTask() { - super(null, null); + super(mock(Client.class), null); } @Override