Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record history of SLM retention actions #45513

Merged
merged 16 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -40,7 +39,10 @@ public class SnapshotHistoryItem implements Writeable, ToXContentObject {
static final ParseField SNAPSHOT_NAME = new ParseField("snapshot_name");
static final ParseField OPERATION = new ParseField("operation");
static final ParseField SUCCESS = new ParseField("success");
private static final String CREATE_OPERATION = "CREATE";

public static final String CREATE_OPERATION = "CREATE";
public static final String DELETE_OPERATION = "DELETE";

protected final long timestamp;
protected final String policyId;
protected final String repository;
Expand Down Expand Up @@ -98,25 +100,34 @@ public static SnapshotHistoryItem parse(XContentParser parser, String name) {
this.errorDetails = errorDetails;
}

public static SnapshotHistoryItem successRecord(long timestamp, SnapshotLifecyclePolicy policy, String snapshotName) {
/**
 * Builds a history item describing a successful snapshot creation.
 * The item is tagged with {@link #CREATE_OPERATION}, flagged as successful, carries the
 * policy's id, repository and configuration, and has no error details.
 *
 * @param timestamp    value stored as the item's timestamp
 * @param policy       policy whose id, repository and config are copied into the item
 * @param snapshotName name of the snapshot the item refers to
 * @return the new history item
 */
public static SnapshotHistoryItem creationSuccessRecord(long timestamp, SnapshotLifecyclePolicy policy, String snapshotName) {
    final String id = policy.getId();
    final String repo = policy.getRepository();
    return new SnapshotHistoryItem(timestamp, id, repo, snapshotName, CREATE_OPERATION, true, policy.getConfig(), null);
}

public static SnapshotHistoryItem failureRecord(long timeStamp, SnapshotLifecyclePolicy policy, String snapshotName,
Exception exception) throws IOException {
ToXContent.Params stacktraceParams = new ToXContent.MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false"));
String exceptionString;
try (XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder()) {
causeXContentBuilder.startObject();
ElasticsearchException.generateThrowableXContent(causeXContentBuilder, stacktraceParams, exception);
causeXContentBuilder.endObject();
exceptionString = BytesReference.bytes(causeXContentBuilder).utf8ToString();
}
/**
 * Builds a history item describing a failed snapshot creation.
 * The item is tagged with {@link #CREATE_OPERATION}, flagged as unsuccessful, and stores the
 * string form of {@code exception} (as produced by {@code exceptionToString}) as its error details.
 *
 * @param timeStamp    value stored as the item's timestamp
 * @param policy       policy whose id, repository and config are copied into the item
 * @param snapshotName name of the snapshot the item refers to
 * @param exception    the failure to record
 * @return the new history item
 * @throws IOException if the exception cannot be serialized
 */
public static SnapshotHistoryItem creationFailureRecord(long timeStamp, SnapshotLifecyclePolicy policy, String snapshotName,
                                                        Exception exception) throws IOException {
    // Serialize the failure first, before touching the policy, exactly as callers have come to expect.
    final String serializedFailure = exceptionToString(exception);
    final String id = policy.getId();
    final String repo = policy.getRepository();
    return new SnapshotHistoryItem(timeStamp, id, repo, snapshotName, CREATE_OPERATION, false, policy.getConfig(),
        serializedFailure);
}

/**
 * Builds a history item describing a successful snapshot deletion.
 * The item is tagged with {@link #DELETE_OPERATION}, flagged as successful, and the two
 * trailing constructor arguments (configuration and error details) are left {@code null}.
 *
 * @param timestamp    value stored as the item's timestamp
 * @param snapshotName name of the snapshot the item refers to
 * @param policyId     id of the policy recorded on the item
 * @param repository   repository recorded on the item
 * @return the new history item
 */
public static SnapshotHistoryItem deletionSuccessRecord(long timestamp, String snapshotName, String policyId, String repository) {
    return new SnapshotHistoryItem(
        timestamp,
        policyId,
        repository,
        snapshotName,
        DELETE_OPERATION,
        true,
        null,
        null);
}

/**
 * Builds a history item for a snapshot deletion that is recorded as successful but still
 * carries a free-form {@code details} string in the error-details slot.
 * NOTE(review): presumably used when the outcome of the delete could not be confirmed - verify against callers.
 *
 * @param timestamp    value stored as the item's timestamp
 * @param snapshotName name of the snapshot the item refers to
 * @param policyId     id of the policy recorded on the item
 * @param repository   repository recorded on the item
 * @param details      text stored as the item's details
 * @return the new history item, tagged with {@link #DELETE_OPERATION} and flagged as successful
 */
public static SnapshotHistoryItem deletionPossibleSuccessRecord(long timestamp, String snapshotName, String policyId, String repository,
                                                                String details) {
    return new SnapshotHistoryItem(
        timestamp,
        policyId,
        repository,
        snapshotName,
        DELETE_OPERATION,
        true,
        null,
        details);
}

/**
 * Builds a history item describing a failed snapshot deletion.
 * The item is tagged with {@link #DELETE_OPERATION}, flagged as unsuccessful, has a {@code null}
 * configuration, and stores the string form of {@code exception} as its error details.
 *
 * @param timestamp    value stored as the item's timestamp
 * @param snapshotName name of the snapshot the item refers to
 * @param policyId     id of the policy recorded on the item
 * @param repository   repository recorded on the item
 * @param exception    the failure to record
 * @return the new history item
 * @throws IOException if the exception cannot be serialized
 */
public static SnapshotHistoryItem deletionFailureRecord(long timestamp, String snapshotName, String policyId, String repository,
                                                        Exception exception) throws IOException {
    return new SnapshotHistoryItem(
        timestamp,
        policyId,
        repository,
        snapshotName,
        DELETE_OPERATION,
        false,
        null,
        exceptionToString(exception));
}

public SnapshotHistoryItem(StreamInput in) throws IOException {
this.timestamp = in.readVLong();
this.policyId = in.readString();
Expand Down Expand Up @@ -220,4 +231,16 @@ public int hashCode() {
public String toString() {
return Strings.toString(this);
}

/**
 * Serializes an exception into a JSON string.
 * The exception is written via {@code ElasticsearchException.generateThrowableXContent} inside a
 * single JSON object, with the {@code REST_EXCEPTION_SKIP_STACK_TRACE} parameter set to
 * {@code "false"}.
 *
 * @param exception the exception to serialize
 * @return the UTF-8 string content of the JSON builder
 * @throws IOException if writing to or closing the builder fails
 */
private static String exceptionToString(Exception exception) throws IOException {
    final Params renderParams = new MapParams(Collections.singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false"));
    try (XContentBuilder json = JsonXContent.contentBuilder()) {
        json.startObject();
        ElasticsearchException.generateThrowableXContent(json, renderParams, exception);
        json.endObject();
        // The string is materialized before the builder is closed by try-with-resources.
        return BytesReference.bytes(json).utf8ToString();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testNoActionIfDisabled() {
final long timestamp = randomNonNegativeLong();
SnapshotLifecyclePolicy.ResolverContext context = new SnapshotLifecyclePolicy.ResolverContext(timestamp);
String snapshotId = policy.generateSnapshotName(context);
SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId);
SnapshotHistoryItem record = SnapshotHistoryItem.creationSuccessRecord(timestamp, policy, snapshotId);

client.setVerifier((a,r,l) -> {
fail("the history store is disabled, no action should have been taken");
Expand All @@ -76,7 +76,7 @@ public void testPut() throws Exception {
SnapshotLifecyclePolicy.ResolverContext context = new SnapshotLifecyclePolicy.ResolverContext(timestamp);
String snapshotId = policy.generateSnapshotName(context);
{
SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId);
SnapshotHistoryItem record = SnapshotHistoryItem.creationSuccessRecord(timestamp, policy, snapshotId);

AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> {
Expand Down Expand Up @@ -111,7 +111,7 @@ public void testPut() throws Exception {
{
final String cause = randomAlphaOfLength(9);
Exception failureException = new RuntimeException(cause);
SnapshotHistoryItem record = SnapshotHistoryItem.failureRecord(timestamp, policy, snapshotId, failureException);
SnapshotHistoryItem record = SnapshotHistoryItem.creationFailureRecord(timestamp, policy, snapshotId, failureException);

AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.CREATE_OPERATION;
import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -104,7 +106,7 @@ public void testFullPolicySnapshot() throws Exception {
Map<String, Object> metadata = (Map<String, Object>) snapResponse.get(0).get("metadata");
assertNotNull(metadata);
assertThat(metadata.get("policy"), equalTo(policyName));
assertHistoryIsPresent(policyName, true, repoId);
assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION);

// Check that the last success date was written to the cluster state
Request getReq = new Request("GET", "/_slm/policy/" + policyName);
Expand All @@ -125,7 +127,7 @@ public void testFullPolicySnapshot() throws Exception {
String lastSnapshotName = (String) lastSuccessObject.get("snapshot_name");
assertThat(lastSnapshotName, startsWith("snap-"));

assertHistoryIsPresent(policyName, true, repoId);
assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION);

Map<String, Object> stats = getSLMStats();
Map<String, Object> policyStats = (Map<String, Object>) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName());
Expand Down Expand Up @@ -174,7 +176,7 @@ public void testPolicyFailure() throws Exception {
assertNotNull(snapshotName);
assertThat(snapshotName, startsWith("snap-"));
}
assertHistoryIsPresent(policyName, false, repoName);
assertHistoryIsPresent(policyName, false, repoName, CREATE_OPERATION);

Map<String, Object> stats = getSLMStats();
Map<String, Object> policyStats = (Map<String, Object>) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName());
Expand Down Expand Up @@ -224,7 +226,7 @@ public void testPolicyManualExecution() throws Exception {
final Map<String, Object> metadata = extractMetadata(snapshotResponseMap, snapshotName);
assertNotNull(metadata);
assertThat(metadata.get("policy"), equalTo(policyName));
assertHistoryIsPresent(policyName, true, repoId);
assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION);
} catch (ResponseException e) {
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
}
Expand Down Expand Up @@ -278,7 +280,7 @@ public void testBasicTimeBasedRetenion() throws Exception {
final Map<String, Object> metadata = extractMetadata(snapshotResponseMap, snapshotName);
assertNotNull(metadata);
assertThat(metadata.get("policy"), equalTo(policyName));
assertHistoryIsPresent(policyName, true, repoId);
assertHistoryIsPresent(policyName, true, repoId, CREATE_OPERATION);
} catch (ResponseException e) {
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
}
Expand Down Expand Up @@ -306,6 +308,7 @@ public void testBasicTimeBasedRetenion() throws Exception {
} catch (ResponseException e) {
assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception"));
}
assertHistoryIsPresent(policyName, true, repoId, DELETE_OPERATION);

Map<String, Object> stats = getSLMStats();
Map<String, Object> policyStats = (Map<String, Object>) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName());
Expand Down Expand Up @@ -414,7 +417,7 @@ private Map<String, Object> getSLMStats() {
}

// This method should be called inside an assertBusy, it has no retry logic of its own
private void assertHistoryIsPresent(String policyName, boolean success, String repository) throws IOException {
private void assertHistoryIsPresent(String policyName, boolean success, String repository, String operation) throws IOException {
final Request historySearchRequest = new Request("GET", ".slm-history*/_search");
historySearchRequest.setJsonEntity("{\n" +
" \"query\": {\n" +
Expand All @@ -437,7 +440,7 @@ private void assertHistoryIsPresent(String policyName, boolean success, String r
" },\n" +
" {\n" +
" \"term\": {\n" +
" \"operation\": \"CREATE\"\n" +
" \"operation\": \"" + operation + "\"\n" +
" }\n" +
" }\n" +
" ]\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
() -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock()));
snapshotRetentionService.set(new SnapshotRetentionService(settings,
() -> new SnapshotRetentionTask(client, clusterService, System::nanoTime),
() -> new SnapshotRetentionTask(client, clusterService, System::nanoTime, snapshotHistoryStore.get()),
clusterService, getClock()));
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(),
snapshotRetentionService.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public void onResponse(CreateSnapshotResponse createSnapshotResponse) {
final long timestamp = Instant.now().toEpochMilli();
clusterService.submitStateUpdateTask("slm-record-success-" + policyMetadata.getPolicy().getId(),
WriteJobStatus.success(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp));
historyStore.putAsync(SnapshotHistoryItem.successRecord(timestamp, policyMetadata.getPolicy(), request.snapshot()));
historyStore.putAsync(SnapshotHistoryItem.creationSuccessRecord(timestamp, policyMetadata.getPolicy(),
request.snapshot()));
}

@Override
Expand All @@ -106,7 +107,8 @@ public void onFailure(Exception e) {
WriteJobStatus.failure(policyMetadata.getPolicy().getId(), request.snapshot(), timestamp, e));
final SnapshotHistoryItem failureRecord;
try {
failureRecord = SnapshotHistoryItem.failureRecord(timestamp, policyMetadata.getPolicy(), request.snapshot(), e);
failureRecord = SnapshotHistoryItem.creationFailureRecord(timestamp, policyMetadata.getPolicy(),
request.snapshot(), e);
historyStore.putAsync(failureRecord);
} catch (IOException ex) {
// This shouldn't happen unless there's an issue with serializing the original exception, which shouldn't happen
Expand Down
Loading