Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SnapshotRetentionTask's snapshot filtering and deletion #44764

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
public class SnapshotLifecyclePolicy extends AbstractDiffable<SnapshotLifecyclePolicy>
implements Writeable, Diffable<SnapshotLifecyclePolicy>, ToXContentObject {

public static final String POLICY_ID_METADATA_FIELD = "policy";

private final String id;
private final String name;
private final String schedule;
Expand All @@ -61,7 +63,6 @@ public class SnapshotLifecyclePolicy extends AbstractDiffable<SnapshotLifecycleP
private static final ParseField RETENTION = new ParseField("retention");
private static final IndexNameExpressionResolver.DateMathExpressionResolver DATE_MATH_RESOLVER =
new IndexNameExpressionResolver.DateMathExpressionResolver();
private static final String POLICY_ID_METADATA_FIELD = "policy";
private static final String METADATA_FIELD_NAME = "metadata";

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;

public class SnapshotRetentionConfiguration implements ToXContentObject, Writeable {

Expand Down Expand Up @@ -56,6 +59,26 @@ public TimeValue getExpireAfter() {
return this.expireAfter;
}

/**
* Return a predicate by which a SnapshotInfo can be tested to see
* whether it should be deleted according to this retention policy.
* @param allSnapshots a list of all snapshot pertaining to this SLM policy and repository
*/
public Predicate<SnapshotInfo> getSnapshotDeletionPredicate(final List<SnapshotInfo> allSnapshots) {
return si -> {
if (this.expireAfter != null) {
TimeValue snapshotAge = new TimeValue(System.currentTimeMillis() - si.startTime());
if (snapshotAge.compareTo(this.expireAfter) > 0) {
return true;
} else {
return false;
}
}
// If nothing matched, the snapshot is not eligible for deletion
return false;
};
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
package org.elasticsearch.xpack.slm;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
Expand All @@ -22,6 +25,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotRetentionConfiguration;

Expand Down Expand Up @@ -222,6 +226,93 @@ public void testPolicyManualExecution() throws Exception {
});
}

public void testBasicTimeBasedRetenion() throws Exception {
final String indexName = "test";
final String policyName = "test-policy";
final String repoId = "my-repo";
int docCount = randomIntBetween(10, 50);
List<IndexRequestBuilder> indexReqs = new ArrayList<>();
for (int i = 0; i < docCount; i++) {
index(client(), indexName, "" + i, "foo", "bar");
}

// Create a snapshot repo
inializeRepo(repoId);

// Create a policy with a retention period of 1 millisecond
createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true,
new SnapshotRetentionConfiguration(TimeValue.timeValueMillis(1)));

// Manually create a snapshot
Response executeResp = client().performRequest(new Request("PUT", "/_slm/policy/" + policyName + "/_execute"));

final String snapshotName;
try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(executeResp.getEntity()))) {
snapshotName = parser.mapStrings().get("snapshot_name");

// Check that the executed snapshot is created
assertBusy(() -> {
try {
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
Map<String, Object> snapshotResponseMap;
try (InputStream is = response.getEntity().getContent()) {
snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
}
assertThat(snapshotResponseMap.size(), greaterThan(0));
final Map<String, Object> metadata = extractMetadata(snapshotResponseMap, snapshotName);
assertNotNull(metadata);
assertThat(metadata.get("policy"), equalTo(policyName));
assertHistoryIsPresent(policyName, true, repoId);
} catch (ResponseException e) {
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
}
});
}

// Run retention every second
ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest();
req.transientSettings(Settings.builder().put(LifecycleSettings.SLM_RETENTION_SCHEDULE, "*/1 * * * * ?"));
try (XContentBuilder builder = jsonBuilder()) {
req.toXContent(builder, ToXContent.EMPTY_PARAMS);
Request r = new Request("PUT", "/_cluster/settings");
r.setJsonEntity(Strings.toString(builder));
Response updateSettingsResp = client().performRequest(r);
}

try {
// Check that the snapshot created by the policy has been removed by retention
assertBusy(() -> {
// We expect a failed response because the snapshot should not exist
try {
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName));
assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception"));
gwbrown marked this conversation as resolved.
Show resolved Hide resolved
} catch (ResponseException e) {
assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception"));
}
});

Request delReq = new Request("DELETE", "/_slm/policy/" + policyName);
assertOK(client().performRequest(delReq));
gwbrown marked this conversation as resolved.
Show resolved Hide resolved

// It's possible there could have been a snapshot in progress when the
// policy is deleted, so wait for it to be finished
assertBusy(() -> {
assertThat(wipeSnapshots().size(), equalTo(0));
});
gwbrown marked this conversation as resolved.
Show resolved Hide resolved
} finally {
// Unset retention
ClusterUpdateSettingsRequest unsetRequest = new ClusterUpdateSettingsRequest();
unsetRequest.transientSettings(Settings.builder().put(LifecycleSettings.SLM_RETENTION_SCHEDULE, (String) null));
try (XContentBuilder builder = jsonBuilder()) {
unsetRequest.toXContent(builder, ToXContent.EMPTY_PARAMS);
Request r = new Request("PUT", "/_cluster/settings");
r.setJsonEntity(Strings.toString(builder));
client().performRequest(r);
}
}
}

@SuppressWarnings("unchecked")
private static Map<String, Object> extractMetadata(Map<String, Object> snapshotResponseMap, String snapshotPrefix) {
List<Map<String, Object>> snapResponse = ((List<Map<String, Object>>) snapshotResponseMap.get("responses")).stream()
Expand Down Expand Up @@ -284,6 +375,13 @@ private void assertHistoryIsPresent(String policyName, boolean success, String r

private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId,
String indexPattern, boolean ignoreUnavailable) throws IOException {
createSnapshotPolicy(policyName, snapshotNamePattern, schedule, repoId, indexPattern,
ignoreUnavailable, SnapshotRetentionConfiguration.EMPTY);
}

private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String repoId,
String indexPattern, boolean ignoreUnavailable,
SnapshotRetentionConfiguration retention) throws IOException {
Map<String, Object> snapConfig = new HashMap<>();
snapConfig.put("indices", Collections.singletonList(indexPattern));
snapConfig.put("ignore_unavailable", ignoreUnavailable);
Expand All @@ -295,8 +393,8 @@ private void createSnapshotPolicy(String policyName, String snapshotNamePattern,
() -> randomAlphaOfLength(5)), randomAlphaOfLength(4));
}
}
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyName, snapshotNamePattern, schedule, repoId, snapConfig,
SnapshotRetentionConfiguration.EMPTY);
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyName, snapshotNamePattern, schedule,
repoId, snapConfig, retention);

Request putLifecycle = new Request("PUT", "/_slm/policy/" + policyName);
XContentBuilder lifecycleBuilder = JsonXContent.contentBuilder();
Expand Down
Loading