Add base framework for snapshot retention (#43605)
* Add base framework for snapshot retention

This adds a basic `SnapshotRetentionService` and `SnapshotRetentionTask`
to start as the basis for SLM's retention implementation.

Relates to #38461

* Remove extraneous 'public'

* Use a local var instead of reading class var repeatedly
dakrone authored Jun 27, 2019
1 parent 6393ac8 commit 9879a4b
Showing 6 changed files with 301 additions and 4 deletions.
@@ -5,8 +5,10 @@
*/
package org.elasticsearch.xpack.core.indexlifecycle;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.scheduler.CronSchedule;

/**
* Class encapsulating settings related to Index Lifecycle Management X-Pack Plugin
@@ -17,6 +19,8 @@ public class LifecycleSettings {
public static final String LIFECYCLE_INDEXING_COMPLETE = "index.lifecycle.indexing_complete";

public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled";
public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule";


public static final Setting<TimeValue> LIFECYCLE_POLL_INTERVAL_SETTING = Setting.timeSetting(LIFECYCLE_POLL_INTERVAL,
TimeValue.timeValueMinutes(10), TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope);
@@ -27,4 +31,15 @@ public class LifecycleSettings {

public static final Setting<Boolean> SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true,
Setting.Property.NodeScope);
public static final Setting<String> SLM_RETENTION_SCHEDULE_SETTING = Setting.simpleString(SLM_RETENTION_SCHEDULE, str -> {
try {
if (Strings.hasText(str)) {
// Test that the setting is a valid cron syntax
new CronSchedule(str);
}
} catch (Exception e) {
throw new IllegalArgumentException("invalid cron expression [" + str + "] for SLM retention schedule [" +
SLM_RETENTION_SCHEDULE + "]", e);
}
}, Setting.Property.Dynamic, Setting.Property.NodeScope);
}
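
For context, the validator registered on SLM_RETENTION_SCHEDULE_SETTING above simply tries to construct a CronSchedule and converts any failure into an IllegalArgumentException. Below is a minimal standalone sketch of the same check, using only the Strings and CronSchedule classes that appear in this diff; the helper class, method name, and main method are illustrative, not part of the commit.

import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.core.scheduler.CronSchedule;

class RetentionScheduleCheck {
    // Mirrors the setting's validator: a blank value disables retention scheduling,
    // anything else must parse as a cron expression.
    static void validateRetentionSchedule(String schedule) {
        try {
            if (Strings.hasText(schedule)) {
                new CronSchedule(schedule); // throws on invalid cron syntax
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("invalid cron expression [" + schedule + "]", e);
        }
    }

    public static void main(String[] args) {
        validateRetentionSchedule("0 30 1 * * ?"); // accepted: every day at 01:30
        validateRetentionSchedule("");             // accepted: retention job disabled
        validateRetentionSchedule("not-a-cron");   // throws IllegalArgumentException
    }
}

Because the setting is also marked Dynamic, the same validation applies when the schedule is updated at runtime through the cluster settings update consumer registered by SnapshotRetentionService below.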
@@ -89,6 +89,8 @@
import org.elasticsearch.xpack.indexlifecycle.action.TransportStopILMAction;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleTask;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotRetentionService;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotRetentionTask;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestDeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestExecuteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.RestGetSnapshotLifecycleAction;
@@ -111,6 +113,7 @@
public class IndexLifecycle extends Plugin implements ActionPlugin {
private final SetOnce<IndexLifecycleService> indexLifecycleInitialisationService = new SetOnce<>();
private final SetOnce<SnapshotLifecycleService> snapshotLifecycleService = new SetOnce<>();
private final SetOnce<SnapshotRetentionService> snapshotRetentionService = new SetOnce<>();
private final SetOnce<SnapshotHistoryStore> snapshotHistoryStore = new SetOnce<>();
private Settings settings;
private boolean enabled;
@@ -132,7 +135,8 @@ public List<Setting<?>> getSettings() {
LifecycleSettings.LIFECYCLE_NAME_SETTING,
LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING,
RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING,
LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING);
LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING,
LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING);
}

@Override
@@ -150,7 +154,10 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
snapshotHistoryStore.set(new SnapshotHistoryStore(settings, client, getClock().getZone()));
snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
() -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock()));
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get());
snapshotRetentionService.set(new SnapshotRetentionService(settings, () -> new SnapshotRetentionTask(client, clusterService),
clusterService, getClock()));
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(),
snapshotRetentionService.get());
}

@Override
@@ -240,7 +247,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
@Override
public void close() {
try {
IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get());
IOUtils.close(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotRetentionService.get());
} catch (IOException e) {
throw new ElasticsearchException("unable to close index lifecycle services", e);
}
@@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.snapshotlifecycle;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.scheduler.CronSchedule;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy;

import java.io.Closeable;
import java.time.Clock;
import java.util.function.Supplier;

/**
 * The {@code SnapshotRetentionService} is responsible for scheduling the periodic kickoff of SLM's
* snapshot retention. This means that when the retention schedule setting is configured, the
* scheduler schedules a job that, when triggered, will delete snapshots according to the retention
* policy configured in the {@link SnapshotLifecyclePolicy}.
*/
public class SnapshotRetentionService implements LocalNodeMasterListener, Closeable {

static final String SLM_RETENTION_JOB_ID = "slm-retention-job";

private static final Logger logger = LogManager.getLogger(SnapshotRetentionService.class);

private final SchedulerEngine scheduler;

private volatile String slmRetentionSchedule;

public SnapshotRetentionService(Settings settings,
Supplier<SnapshotRetentionTask> taskSupplier,
ClusterService clusterService,
Clock clock) {
this.scheduler = new SchedulerEngine(settings, clock);
this.scheduler.register(taskSupplier.get());
this.slmRetentionSchedule = LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING.get(settings);
clusterService.addLocalNodeMasterListener(this);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING,
this::setUpdateSchedule);
}

void setUpdateSchedule(String retentionSchedule) {
this.slmRetentionSchedule = retentionSchedule;
// The schedule has changed, so reschedule the retention job
rescheduleRetentionJob();
}

// Only used for testing
SchedulerEngine getScheduler() {
return this.scheduler;
}

@Override
public void onMaster() {
rescheduleRetentionJob();
}

@Override
public void offMaster() {
cancelRetentionJob();
}

private void rescheduleRetentionJob() {
final String schedule = this.slmRetentionSchedule;
if (Strings.hasText(schedule)) {
final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID,
new CronSchedule(schedule));
logger.debug("scheduling SLM retention job for [{}]", schedule);
this.scheduler.add(retentionJob);
} else {
// The schedule has been unset, so cancel the scheduled retention job
cancelRetentionJob();
}
}

private void cancelRetentionJob() {
this.scheduler.scheduledJobIds().forEach(this.scheduler::remove);
}

@Override
public String executorName() {
return ThreadPool.Names.SNAPSHOT;
}

@Override
public void close() {
this.scheduler.stop();
}
}
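
The service above is essentially a thin wrapper around SchedulerEngine: it registers the retention task as a listener, adds a cron job under a fixed id whenever a schedule is configured, and removes all scheduled job ids otherwise. Here is a minimal sketch of that pattern using only the SchedulerEngine calls visible in this diff; the class name, hard-coded cron expression, and println listener are illustrative, and it assumes that add() with an existing job id replaces the previous job, which rescheduleRetentionJob() appears to rely on.

import java.time.Clock;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.scheduler.CronSchedule;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;

class RetentionSchedulingSketch {
    public static void main(String[] args) {
        SchedulerEngine scheduler = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC());
        // Registered listeners are invoked whenever a scheduled job fires.
        scheduler.register(event -> System.out.println("job fired: " + event.getJobName()));
        // Schedule the retention job under a fixed id, as rescheduleRetentionJob() does.
        scheduler.add(new SchedulerEngine.Job("slm-retention-job", new CronSchedule("0 30 1 * * ?")));
        System.out.println("scheduled job ids: " + scheduler.scheduledJobIds());
        // Cancel everything, as cancelRetentionJob() does when the schedule is unset.
        scheduler.scheduledJobIds().forEach(scheduler::remove);
        scheduler.stop();
    }
}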
@@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.snapshotlifecycle;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
* The {@code SnapshotRetentionTask} is invoked by the scheduled job from the
* {@link SnapshotRetentionService}. It is responsible for retrieving the snapshots for repositories
* that have an SLM policy configured, and then deleting the snapshots that fall outside the
* retention policy.
*/
public class SnapshotRetentionTask implements SchedulerEngine.Listener {

private static final Logger logger = LogManager.getLogger(SnapshotRetentionTask.class);
private static final AtomicBoolean running = new AtomicBoolean(false);

private final Client client;
private final ClusterService clusterService;

public SnapshotRetentionTask(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
}

@Override
public void triggered(SchedulerEngine.Event event) {
assert event.getJobName().equals(SnapshotRetentionService.SLM_RETENTION_JOB_ID) :
"expected id to be " + SnapshotRetentionService.SLM_RETENTION_JOB_ID + " but it was " + event.getJobName();
if (running.compareAndSet(false, true)) {
try {
logger.info("starting SLM retention snapshot cleanup task");
final ClusterState state = clusterService.state();

// Find all SLM policies that have retention enabled
final Map<String, SnapshotLifecyclePolicy> policiesWithRetention = getAllPoliciesWithRetentionEnabled(state);

// For those policies (there may be more than one for the same repo),
// return the repos that we need to get the snapshots for
                final Set<String> repositoriesToFetch = policiesWithRetention.values().stream()
.map(SnapshotLifecyclePolicy::getRepository)
.collect(Collectors.toSet());

// Find all the snapshots that are past their retention date
// TODO: include min/max snapshot count as a criteria for deletion also
                final List<SnapshotInfo> snapshotsToBeDeleted = getAllSnapshots(repositoriesToFetch).stream()
.filter(snapshot -> snapshotEligibleForDeletion(snapshot, policiesWithRetention))
.collect(Collectors.toList());

// Finally, delete the snapshots that need to be deleted
deleteSnapshots(snapshotsToBeDeleted);

} finally {
running.set(false);
}
} else {
logger.debug("snapshot lifecycle retention task started, but a task is already running, skipping");
}
}

static Map<String, SnapshotLifecyclePolicy> getAllPoliciesWithRetentionEnabled(final ClusterState state) {
// TODO: fill me in
return Collections.emptyMap();
}

static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map<String, SnapshotLifecyclePolicy> policies) {
// TODO: fill me in
return false;
}

List<SnapshotInfo> getAllSnapshots(Collection<String> repositories) {
// TODO: fill me in
return Collections.emptyList();
}

void deleteSnapshots(List<SnapshotInfo> snapshotsToDelete) {
// TODO: fill me in
logger.info("deleting {}", snapshotsToDelete);
}
}
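
The four methods above are intentionally left as stubs in this commit. Purely as an illustration of the kind of check snapshotEligibleForDeletion could eventually perform, here is a hedged age-based sketch; the maximum-age parameter is hypothetical, since SnapshotLifecyclePolicy carries no retention configuration yet.

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.snapshots.SnapshotInfo;

class RetentionEligibilitySketch {
    // Illustrative only: a snapshot would be eligible for deletion once it is older
    // than a (hypothetical) maximum age configured on its originating policy.
    static boolean olderThanMaxAge(SnapshotInfo snapshot, TimeValue maximumAge, long nowMillis) {
        long ageMillis = nowMillis - snapshot.startTime();
        return ageMillis > maximumAge.millis();
    }
}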
@@ -331,7 +331,7 @@ public static SnapshotLifecyclePolicy createPolicy(String id, String schedule) {
return new SnapshotLifecyclePolicy(id, randomAlphaOfLength(4), schedule, randomAlphaOfLength(4), config);
}

private static String randomSchedule() {
public static String randomSchedule() {
return randomIntBetween(0, 59) + " " +
randomIntBetween(0, 59) + " " +
randomIntBetween(0, 12) + " * * ?";
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.snapshotlifecycle;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;

public class SnapshotRetentionServiceTests extends ESTestCase {

private static final ClusterSettings clusterSettings;
static {
Set<Setting<?>> internalSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
internalSettings.add(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING);
clusterSettings = new ClusterSettings(Settings.EMPTY, internalSettings);
}

public void testJobsAreScheduled() {
final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(),
Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
ClockMock clock = new ClockMock();

try (ThreadPool threadPool = new TestThreadPool("test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
FakeRetentionTask::new, clusterService, clock)) {
assertThat(service.getScheduler().jobCount(), equalTo(0));

service.setUpdateSchedule(SnapshotLifecycleServiceTests.randomSchedule());
assertThat(service.getScheduler().scheduledJobIds(), containsInAnyOrder(SnapshotRetentionService.SLM_RETENTION_JOB_ID));

service.offMaster();
assertThat(service.getScheduler().jobCount(), equalTo(0));

service.onMaster();
assertThat(service.getScheduler().scheduledJobIds(), containsInAnyOrder(SnapshotRetentionService.SLM_RETENTION_JOB_ID));

service.setUpdateSchedule("");
assertThat(service.getScheduler().jobCount(), equalTo(0));
threadPool.shutdownNow();
}
}

private static class FakeRetentionTask extends SnapshotRetentionTask {
FakeRetentionTask() {
super(null, null);
}

@Override
public void triggered(SchedulerEngine.Event event) {
super.triggered(event);
}
}
}
