From 9879a4b6fe6cb0b42d7221b183962754f27b6754 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 27 Jun 2019 07:05:42 -0600 Subject: [PATCH] Add base framework for snapshot retention (#43605) * Add base framework for snapshot retention This adds a basic `SnapshotRetentionService` and `SnapshotRetentionTask` to start as the basis for SLM's retention implementation. Relates to #38461 * Remove extraneous 'public' * Use a local var instead of reading class var repeatedly --- .../indexlifecycle/LifecycleSettings.java | 15 +++ .../xpack/indexlifecycle/IndexLifecycle.java | 13 ++- .../SnapshotRetentionService.java | 100 ++++++++++++++++++ .../SnapshotRetentionTask.java | 99 +++++++++++++++++ .../SnapshotLifecycleServiceTests.java | 2 +- .../SnapshotRetentionServiceTests.java | 76 +++++++++++++ 6 files changed, 301 insertions(+), 4 deletions(-) create mode 100644 x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionService.java create mode 100644 x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionTask.java create mode 100644 x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionServiceTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java index 0a157b8197a10..cfbd56a7d9898 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.xpack.core.indexlifecycle; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.scheduler.CronSchedule; /** * Class encapsulating settings 
related to Index Lifecycle Management X-Pack Plugin @@ -17,6 +19,8 @@ public class LifecycleSettings { public static final String LIFECYCLE_INDEXING_COMPLETE = "index.lifecycle.indexing_complete"; public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled"; + public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule"; + public static final Setting LIFECYCLE_POLL_INTERVAL_SETTING = Setting.timeSetting(LIFECYCLE_POLL_INTERVAL, TimeValue.timeValueMinutes(10), TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope); @@ -27,4 +31,15 @@ public class LifecycleSettings { public static final Setting SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true, Setting.Property.NodeScope); + public static final Setting SLM_RETENTION_SCHEDULE_SETTING = Setting.simpleString(SLM_RETENTION_SCHEDULE, str -> { + try { + if (Strings.hasText(str)) { + // Test that the setting is a valid cron syntax + new CronSchedule(str); + } + } catch (Exception e) { + throw new IllegalArgumentException("invalid cron expression [" + str + "] for SLM retention schedule [" + + SLM_RETENTION_SCHEDULE + "]", e); + } + }, Setting.Property.Dynamic, Setting.Property.NodeScope); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index f346267f16b98..4c2f3e4d89473 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -89,6 +89,8 @@ import org.elasticsearch.xpack.indexlifecycle.action.TransportStopILMAction; import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService; import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleTask; +import 
org.elasticsearch.xpack.snapshotlifecycle.SnapshotRetentionService; +import org.elasticsearch.xpack.snapshotlifecycle.SnapshotRetentionTask; import org.elasticsearch.xpack.snapshotlifecycle.action.RestDeleteSnapshotLifecycleAction; import org.elasticsearch.xpack.snapshotlifecycle.action.RestExecuteSnapshotLifecycleAction; import org.elasticsearch.xpack.snapshotlifecycle.action.RestGetSnapshotLifecycleAction; @@ -111,6 +113,7 @@ public class IndexLifecycle extends Plugin implements ActionPlugin { private final SetOnce indexLifecycleInitialisationService = new SetOnce<>(); private final SetOnce snapshotLifecycleService = new SetOnce<>(); + private final SetOnce snapshotRetentionService = new SetOnce<>(); private final SetOnce snapshotHistoryStore = new SetOnce<>(); private Settings settings; private boolean enabled; @@ -132,7 +135,8 @@ public List> getSettings() { LifecycleSettings.LIFECYCLE_NAME_SETTING, LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING, RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING, - LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING); + LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING, + LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING); } @Override @@ -150,7 +154,10 @@ public Collection createComponents(Client client, ClusterService cluster snapshotHistoryStore.set(new SnapshotHistoryStore(settings, client, getClock().getZone())); snapshotLifecycleService.set(new SnapshotLifecycleService(settings, () -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock())); - return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get()); + snapshotRetentionService.set(new SnapshotRetentionService(settings, () -> new SnapshotRetentionTask(client, clusterService), + clusterService, getClock())); + return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(), + 
snapshotRetentionService.get()); } @Override @@ -240,7 +247,7 @@ public List getRestHandlers(Settings settings, RestController restC @Override public void close() { try { - IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get()); + IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotRetentionService.get()); } catch (IOException e) { throw new ElasticsearchException("unable to close index lifecycle services", e); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionService.java new file mode 100644 index 0000000000000..2e6a4dd20be24 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionService.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.snapshotlifecycle; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.LocalNodeMasterListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; +import org.elasticsearch.xpack.core.scheduler.CronSchedule; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; + +import java.io.Closeable; +import java.time.Clock; +import java.util.function.Supplier; + +/** + * The {@code SnapshotRetentionService} is responsible for scheduling the periodic kickoff of SLM's + * snapshot retention. This means that when the retention schedule setting is configured, the + * scheduler schedules a job that, when triggered, will delete snapshots according to the retention + * policy configured in the {@link SnapshotLifecyclePolicy}. 
+ */ +public class SnapshotRetentionService implements LocalNodeMasterListener, Closeable { + + static final String SLM_RETENTION_JOB_ID = "slm-retention-job"; + + private static final Logger logger = LogManager.getLogger(SnapshotRetentionService.class); + + private final SchedulerEngine scheduler; + + private volatile String slmRetentionSchedule; + + public SnapshotRetentionService(Settings settings, + Supplier taskSupplier, + ClusterService clusterService, + Clock clock) { + this.scheduler = new SchedulerEngine(settings, clock); + this.scheduler.register(taskSupplier.get()); + this.slmRetentionSchedule = LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING.get(settings); + clusterService.addLocalNodeMasterListener(this); + clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING, + this::setUpdateSchedule); + } + + void setUpdateSchedule(String retentionSchedule) { + this.slmRetentionSchedule = retentionSchedule; + // The schedule has changed, so reschedule the retention job + rescheduleRetentionJob(); + } + + // Only used for testing + SchedulerEngine getScheduler() { + return this.scheduler; + } + + @Override + public void onMaster() { + rescheduleRetentionJob(); + } + + @Override + public void offMaster() { + cancelRetentionJob(); + } + + private void rescheduleRetentionJob() { + final String schedule = this.slmRetentionSchedule; + if (Strings.hasText(schedule)) { + final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID, + new CronSchedule(schedule)); + logger.debug("scheduling SLM retention job for [{}]", schedule); + this.scheduler.add(retentionJob); + } else { + // The schedule has been unset, so cancel the scheduled retention job + cancelRetentionJob(); + } + } + + private void cancelRetentionJob() { + this.scheduler.scheduledJobIds().forEach(this.scheduler::remove); + } + + @Override + public String executorName() { + return ThreadPool.Names.SNAPSHOT; + } + + @Override + 
public void close() { + this.scheduler.stop(); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionTask.java new file mode 100644 index 0000000000000..ad51d5ff93d5f --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionTask.java @@ -0,0 +1,99 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +/** + * The {@code SnapshotRetentionTask} is invoked by the scheduled job from the + * {@link SnapshotRetentionService}. It is responsible for retrieving the snapshots for repositories + * that have an SLM policy configured, and then deleting the snapshots that fall outside the + * retention policy. 
+ */ +public class SnapshotRetentionTask implements SchedulerEngine.Listener { + + private static final Logger logger = LogManager.getLogger(SnapshotRetentionTask.class); + private static final AtomicBoolean running = new AtomicBoolean(false); + + private final Client client; + private final ClusterService clusterService; + + public SnapshotRetentionTask(Client client, ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + } + + @Override + public void triggered(SchedulerEngine.Event event) { + assert event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_JOB_ID) : + "expected id to be " + SnapshotRetentionService.SLM_RETENTION_JOB_ID + " but it was " + event.getJobName(); + if (running.compareAndSet(false, true)) { + try { + logger.info("starting SLM retention snapshot cleanup task"); + final ClusterState state = clusterService.state(); + + // Find all SLM policies that have retention enabled + final Map policiesWithRetention = getAllPoliciesWithRetentionEnabled(state); + + // For those policies (there may be more than one for the same repo), + // return the repos that we need to get the snapshots for + final Set repositioriesToFetch = policiesWithRetention.values().stream() + .map(SnapshotLifecyclePolicy::getRepository) + .collect(Collectors.toSet()); + + // Find all the snapshots that are past their retention date + // TODO: include min/max snapshot count as a criteria for deletion also + final List snapshotsToBeDeleted = getAllSnapshots(repositioriesToFetch).stream() + .filter(snapshot -> snapshotEligibleForDeletion(snapshot, policiesWithRetention)) + .collect(Collectors.toList()); + + // Finally, delete the snapshots that need to be deleted + deleteSnapshots(snapshotsToBeDeleted); + + } finally { + running.set(false); + } + } else { + logger.debug("snapshot lifecycle retention task started, but a task is already running, skipping"); + } + } + + static Map getAllPoliciesWithRetentionEnabled(final 
ClusterState state) { + // TODO: fill me in + return Collections.emptyMap(); + } + + static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map policies) { + // TODO: fill me in + return false; + } + + List getAllSnapshots(Collection repositories) { + // TODO: fill me in + return Collections.emptyList(); + } + + void deleteSnapshots(List snapshotsToDelete) { + // TODO: fill me in + logger.info("deleting {}", snapshotsToDelete); + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java index 801b774d418df..395aef7ee761e 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotLifecycleServiceTests.java @@ -331,7 +331,7 @@ public static SnapshotLifecyclePolicy createPolicy(String id, String schedule) { return new SnapshotLifecyclePolicy(id, randomAlphaOfLength(4), schedule, randomAlphaOfLength(4), config); } - private static String randomSchedule() { + public static String randomSchedule() { return randomIntBetween(0, 59) + " " + randomIntBetween(0, 59) + " " + randomIntBetween(0, 12) + " * * ?"; diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionServiceTests.java new file mode 100644 index 0000000000000..ea5d932d18df2 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/snapshotlifecycle/SnapshotRetentionServiceTests.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.snapshotlifecycle; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.core.watcher.watch.ClockMock; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; + +public class SnapshotRetentionServiceTests extends ESTestCase { + + private static final ClusterSettings clusterSettings; + static { + Set> internalSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + internalSettings.add(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING); + clusterSettings = new ClusterSettings(Settings.EMPTY, internalSettings); + } + + public void testJobsAreScheduled() { + final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), + Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT); + ClockMock clock = new ClockMock(); + + try (ThreadPool threadPool = new TestThreadPool("test"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings); + SnapshotRetentionService service = new 
SnapshotRetentionService(Settings.EMPTY, + FakeRetentionTask::new, clusterService, clock)) { + assertThat(service.getScheduler().jobCount(), equalTo(0)); + + service.setUpdateSchedule(SnapshotLifecycleServiceTests.randomSchedule()); + assertThat(service.getScheduler().scheduledJobIds(), containsInAnyOrder(SnapshotRetentionService.SLM_RETENTION_JOB_ID)); + + service.offMaster(); + assertThat(service.getScheduler().jobCount(), equalTo(0)); + + service.onMaster(); + assertThat(service.getScheduler().scheduledJobIds(), containsInAnyOrder(SnapshotRetentionService.SLM_RETENTION_JOB_ID)); + + service.setUpdateSchedule(""); + assertThat(service.getScheduler().jobCount(), equalTo(0)); + threadPool.shutdownNow(); + } + } + + private static class FakeRetentionTask extends SnapshotRetentionTask { + FakeRetentionTask() { + super(null, null); + } + + @Override + public void triggered(SchedulerEngine.Event event) { + super.triggered(event); + } + } +}