Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time-bound deletion of snapshots in retention delete function #45065

Merged
merged 7 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class LifecycleSettings {

public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled";
public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule";
public static final String SLM_RETENTION_DURATION = "slm.retention_duration";


public static final Setting<TimeValue> LIFECYCLE_POLL_INTERVAL_SETTING = Setting.timeSetting(LIFECYCLE_POLL_INTERVAL,
Expand All @@ -42,4 +43,6 @@ public class LifecycleSettings {
SLM_RETENTION_SCHEDULE + "]", e);
}
}, Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<TimeValue> SLM_RETENTION_DURATION_SETTING = Setting.timeSetting(SLM_RETENTION_DURATION,
TimeValue.timeValueHours(1), TimeValue.timeValueMillis(500), Setting.Property.Dynamic, Setting.Property.NodeScope);
}
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ public void testDeleteDuringSnapshot() throws Exception {
// index document so snapshot actually does something
indexDocument();
// start snapshot
request = new Request("PUT", "/_snapshot/repo/snapshot");
String snapName = "snapshot-" + randomAlphaOfLength(6).toLowerCase(Locale.ROOT);
request = new Request("PUT", "/_snapshot/repo/" + snapName);
request.addParameter("wait_for_completion", "false");
request.setJsonEntity("{\"indices\": \"" + index + "\"}");
assertOK(client().performRequest(request));
Expand All @@ -392,8 +393,8 @@ public void testDeleteDuringSnapshot() throws Exception {
// assert that index was deleted
assertBusy(() -> assertFalse(indexExists(index)), 2, TimeUnit.MINUTES);
// assert that snapshot is still in progress and clean up
assertThat(getSnapshotState("snapshot"), equalTo("SUCCESS"));
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot")));
assertThat(getSnapshotState(snapName), equalTo("SUCCESS"));
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/" + snapName)));
}

public void testReadOnly() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -293,12 +294,13 @@ public void testBasicTimeBasedRetenion() throws Exception {
assertBusy(() -> {
// We expect a failed response because the snapshot should not exist
try {
logger.info("--> checking to see if snapshot has been deleted...");
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception"));
} catch (ResponseException e) {
assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception"));
}
});
}, 60, TimeUnit.SECONDS);

Request delReq = new Request("DELETE", "/_slm/policy/" + policyName);
assertOK(client().performRequest(delReq));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
Expand Down Expand Up @@ -64,6 +68,7 @@ public void triggered(SchedulerEngine.Event event) {
try {
logger.info("starting SLM retention snapshot cleanup task");
final ClusterState state = clusterService.state();
final TimeValue maxDeletionTime = LifecycleSettings.SLM_RETENTION_DURATION_SETTING.get(state.metaData().settings());

// Find all SLM policies that have retention enabled
final Map<String, SnapshotLifecyclePolicy> policiesWithRetention = getAllPoliciesWithRetentionEnabled(state);
Expand All @@ -74,7 +79,7 @@ public void triggered(SchedulerEngine.Event event) {
.map(SnapshotLifecyclePolicy::getRepository)
.collect(Collectors.toSet());

getAllSnapshots(repositioriesToFetch, new ActionListener<>() {
getAllSuccessfulSnapshots(repositioriesToFetch, new ActionListener<>() {
@Override
public void onResponse(Map<String, List<SnapshotInfo>> allSnapshots) {
// Find all the snapshots that are past their retention date
Expand All @@ -85,7 +90,7 @@ public void onResponse(Map<String, List<SnapshotInfo>> allSnapshots) {
.collect(Collectors.toList())));

// Finally, delete the snapshots that need to be deleted
deleteSnapshots(snapshotsToBeDeleted);
deleteSnapshots(snapshotsToBeDeleted, maxDeletionTime);
}

@Override
Expand Down Expand Up @@ -160,8 +165,8 @@ static boolean snapshotEligibleForDeletion(SnapshotInfo snapshot, Map<String, Li
return eligible;
}

void getAllSnapshots(Collection<String> repositories, ActionListener<Map<String, List<SnapshotInfo>>> listener,
Consumer<Exception> errorHandler) {
void getAllSuccessfulSnapshots(Collection<String> repositories, ActionListener<Map<String, List<SnapshotInfo>>> listener,
Consumer<Exception> errorHandler) {
if (repositories.isEmpty()) {
// Skip retrieving anything if there are no repositories to fetch
listener.onResponse(Collections.emptyMap());
Expand All @@ -175,7 +180,11 @@ void getAllSnapshots(Collection<String> repositories, ActionListener<Map<String,
public void onResponse(final GetSnapshotsResponse resp) {
Map<String, List<SnapshotInfo>> snapshots = new HashMap<>();
repositories.forEach(repo -> {
snapshots.put(repo, resp.getSnapshots(repo));
snapshots.put(repo,
// Only return snapshots in the SUCCESS state
resp.getSnapshots(repo).stream()
.filter(info -> info.state() == SnapshotState.SUCCESS)
.collect(Collectors.toList()));
});
listener.onResponse(snapshots);
}
Expand All @@ -188,42 +197,63 @@ public void onFailure(Exception e) {
});
}

void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete) {
// TODO: make this more resilient and possibly only delete for a certain amount of time
void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete, TimeValue maximumTime) {
int count = snapshotsToDelete.values().stream().mapToInt(List::size).sum();
if (count == 0) {
logger.debug("no snapshots are eligible for deletion");
return;
}

logger.info("starting snapshot retention deletion for [{}] snapshots", count);
snapshotsToDelete.forEach((repo, snapshots) -> {
snapshots.forEach(info -> {
logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, info.snapshotId());
CountDownLatch latch = new CountDownLatch(1);
client.admin().cluster().prepareDeleteSnapshot(repo, info.snapshotId().getName())
.execute(new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
logger.debug("[{}] snapshot [{}] deleted successfully", repo, info.snapshotId());
}
}

@Override
public void onFailure(Exception e) {
logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention",
repo, info.snapshotId()), e);
}
}, latch));
try {
// Deletes cannot occur simultaneously, so wait for this
// deletion to complete before attempting the next one
latch.await();
} catch (InterruptedException e) {
logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted",
repo, info.snapshotId()), e);
long startTime = System.nanoTime();
int deleted = 0;
for (Map.Entry<String, List<SnapshotInfo>> entry : snapshotsToDelete.entrySet()) {
String repo = entry.getKey();
List<SnapshotInfo> snapshots = entry.getValue();
for (SnapshotInfo info : snapshots) {
deleteSnapshot(repo, info.snapshotId());
deleted++;
// Check whether we have exceeded the maximum time allowed to spend deleting
// snapshots, if we have, short-circuit the rest of the deletions
TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(System.nanoTime() - startTime);
dakrone marked this conversation as resolved.
Show resolved Hide resolved
if (elapsedDeletionTime.compareTo(maximumTime) > 0) {
logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," +
" maximum allowed time: [{}], deleted {} out of {} snapshots scheduled for deletion",
elapsedDeletionTime, maximumTime, deleted, count);
return;
}
});
});
}
}
}

/**
* Delete the given snapshot from the repository in blocking manner
*/
void deleteSnapshot(String repo, SnapshotId snapshot) {
logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot);
CountDownLatch latch = new CountDownLatch(1);
client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.getName())
.execute(new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
if (acknowledgedResponse.isAcknowledged()) {
logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot);
}
}

@Override
public void onFailure(Exception e) {
logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention",
repo, snapshot), e);
}
}, latch));
try {
// Deletes cannot occur simultaneously, so wait for this
// deletion to complete before attempting the next one
latch.await();
} catch (InterruptedException e) {
logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted",
repo, snapshot), e);
}
}
}
Loading