Skip to content

Commit

Permalink
ILM action to wait for SLM policy execution (elastic#50454)
Browse files Browse the repository at this point in the history
This change adds a new ILM action that waits for an SLM policy to be executed, ensuring that an index has a snapshot before it is deleted.

Closes elastic#45067
  • Loading branch information
probakowski committed Jan 13, 2020
1 parent 86fb06a commit 7744c7e
Show file tree
Hide file tree
Showing 14 changed files with 540 additions and 2 deletions.
35 changes: 35 additions & 0 deletions docs/reference/ilm/policy-definitions.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ policy definition.
- <<ilm-allocate-action,Allocate>>
- <<ilm-freeze-action,Freeze>>
* Delete
- <<ilm-wait-for-snapshot-action,Wait For Snapshot>>
- <<ilm-delete-action,Delete>>

[[ilm-allocate-action]]
Expand Down Expand Up @@ -224,6 +225,40 @@ PUT _ilm/policy/my_policy
}
--------------------------------------------------

[[ilm-wait-for-snapshot-action]]
==== Wait For Snapshot

Phases allowed: delete.

The Wait For Snapshot Action waits for the configured SLM policy to be executed, ensuring that a snapshot of the index exists
before the index is deleted.

[[ilm-wait-for-snapshot-options]]
.Wait For Snapshot
[options="header"]
|======
| Name | Required | Default | Description
| `policy` | yes | - | SLM policy name that this action should wait for
|======

[source,console]
--------------------------------------------------
PUT _ilm/policy/my_policy
{
"policy": {
"phases": {
"delete": {
"actions": {
"wait_for_snapshot" : {
"policy": "slm-policy-name"
}
}
}
}
}
}
--------------------------------------------------

[[ilm-delete-action]]
==== Delete

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction;
import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType;
import org.elasticsearch.xpack.core.ilm.UnfollowAction;
import org.elasticsearch.xpack.core.ilm.action.DeleteLifecycleAction;
Expand Down Expand Up @@ -588,6 +589,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
// Data Frame
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.TRANSFORM, TransformFeatureSetUsage::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TransformField.TASK_NAME, TransformTaskParams::new),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
static final List<String> ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
FreezeAction.NAME);
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(DeleteAction.NAME);
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(WaitForSnapshotAction.NAME, DeleteAction.NAME);
static final Set<String> VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS);
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * A {@link LifecycleAction} that makes an index wait until a snapshot has been taken by the configured SLM policy.
 * The action is described by a single required string field, {@code policy}.
 */
public class WaitForSnapshotAction implements LifecycleAction {

    public static final String NAME = "wait_for_snapshot";
    public static final ParseField POLICY_FIELD = new ParseField("policy");

    private static final ConstructingObjectParser<WaitForSnapshotAction, Void> PARSER =
        new ConstructingObjectParser<>(NAME, args -> new WaitForSnapshotAction((String) args[0]));

    static {
        // The policy name is the one and only constructor argument.
        PARSER.declareString(ConstructingObjectParser.constructorArg(), POLICY_FIELD);
    }

    private final String policy;

    /**
     * Builds an action from its x-content representation.
     *
     * @param parser parser positioned on the action object
     * @return the parsed action
     */
    public static WaitForSnapshotAction parse(XContentParser parser) {
        return PARSER.apply(parser, null);
    }

    /**
     * @param policy name of the SLM policy to wait for
     * @throws IllegalArgumentException if {@code policy} has no text
     */
    public WaitForSnapshotAction(String policy) {
        if (Strings.hasText(policy)) {
            this.policy = policy;
        } else {
            throw new IllegalArgumentException("policy name must be specified");
        }
    }

    /**
     * Reads the action from the wire; the stream carries only the policy name.
     *
     * @param in stream to read from
     * @throws IOException if the stream cannot be read
     */
    public WaitForSnapshotAction(StreamInput in) throws IOException {
        this(in.readString());
    }

    /**
     * Produces the single {@link WaitForSnapshotStep} that implements this action.
     *
     * @param client      not used by this action
     * @param phase       phase the step key is created for
     * @param nextStepKey key of the step that follows this action
     * @return a one-element list containing the wait step
     */
    @Override
    public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
        Step waitStep = new WaitForSnapshotStep(new StepKey(phase, NAME, WaitForSnapshotStep.NAME), nextStepKey, policy);
        return Collections.singletonList(waitStep);
    }

    /** Always reports this action as safe. */
    @Override
    public boolean isSafeAction() {
        return true;
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    /** Writes the policy name, mirroring what the {@link StreamInput} constructor reads. */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(policy);
    }

    /** Renders the action as an object with a single {@code policy} field. */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        return builder.startObject()
            .field(POLICY_FIELD.getPreferredName(), policy)
            .endObject();
    }

    /** Two actions are equal when they are of the same class and name the same policy. */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        return policy.equals(((WaitForSnapshotAction) o).policy);
    }

    @Override
    public int hashCode() {
        return Objects.hash(policy);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata;

import java.util.Date;
import java.util.Locale;
import java.util.Objects;

/**
 * A step that waits for a snapshot to be taken by SLM, so that a backup exists before the index is deleted.
 * It throws an {@link IllegalStateException} when the data needed for the check is missing (index metadata,
 * the ILM phase time, or the configured SLM policy), and only reports success once the policy's last
 * successful execution is timestamped no earlier than the moment the index entered its current phase.
 */
public class WaitForSnapshotStep extends ClusterStateWaitStep {

    static final String NAME = "wait-for-snapshot";

    private static final String MESSAGE_FIELD = "message";
    private static final String POLICY_NOT_EXECUTED_MESSAGE = "waiting for policy '%s' to be executed since %s";
    private static final String POLICY_NOT_FOUND_MESSAGE = "configured policy '%s' not found";
    private static final String NO_INDEX_METADATA_MESSAGE = "no index metadata found for index '%s'";
    private static final String NO_PHASE_TIME_MESSAGE = "no information about ILM phase start in index metadata for index '%s'";

    private final String policy;

    /**
     * @param key         key identifying this step
     * @param nextStepKey key of the step to run once the condition is met
     * @param policy      name of the SLM policy whose execution is awaited
     */
    WaitForSnapshotStep(StepKey key, StepKey nextStepKey, String policy) {
        super(key, nextStepKey);
        this.policy = policy;
    }

    /**
     * Checks whether the configured SLM policy has succeeded since the index entered its current phase.
     *
     * @param index        index being evaluated
     * @param clusterState cluster state to read index and SLM metadata from
     * @return a completed result when a sufficiently recent successful execution is recorded, otherwise an
     *         incomplete result carrying an explanatory message
     * @throws IllegalStateException if the index metadata, the phase time or the SLM policy cannot be found
     */
    @Override
    public Result isConditionMet(Index index, ClusterState clusterState) {
        IndexMetaData indexMetaData = clusterState.metaData().index(index);
        if (indexMetaData == null) {
            throw error(NO_INDEX_METADATA_MESSAGE, index.getName());
        }

        Long phaseTime = LifecycleExecutionState.fromIndexMetadata(indexMetaData).getPhaseTime();
        if (phaseTime == null) {
            throw error(NO_PHASE_TIME_MESSAGE, index.getName());
        }

        SnapshotLifecycleMetadata slmMetadata = clusterState.metaData().custom(SnapshotLifecycleMetadata.TYPE);
        if (slmMetadata == null || slmMetadata.getSnapshotConfigurations().containsKey(policy) == false) {
            throw error(POLICY_NOT_FOUND_MESSAGE, policy);
        }

        SnapshotLifecyclePolicyMetadata policyMetadata = slmMetadata.getSnapshotConfigurations().get(policy);
        // NOTE(review): the comparison uses getTimestamp() of the last success as-is; whether that is the
        // snapshot's start or completion time is not visible here - confirm it matches the intended guarantee.
        boolean executedSincePhaseStart = policyMetadata.getLastSuccess() != null
            && policyMetadata.getLastSuccess().getTimestamp() >= phaseTime;

        return executedSincePhaseStart
            ? new Result(true, null)
            : new Result(false, notExecutedMessage(phaseTime));
    }

    /** @return name of the SLM policy this step waits for */
    public String getPolicy() {
        return policy;
    }

    /** This step always reports itself as retryable. */
    @Override
    public boolean isRetryable() {
        return true;
    }

    /**
     * Creates the informational payload returned while the policy has not yet run.
     * The text is formatted each time the payload is rendered.
     *
     * @param time phase time, in epoch milliseconds, since which an execution is awaited
     */
    private ToXContentObject notExecutedMessage(long time) {
        return (builder, params) -> {
            String text = String.format(Locale.ROOT, POLICY_NOT_EXECUTED_MESSAGE, policy, new Date(time));
            return builder.startObject().field(MESSAGE_FIELD, text).endObject();
        };
    }

    /** Builds (but does not throw) the exception used for every "missing data" failure. */
    private IllegalStateException error(String message, Object... args) {
        return new IllegalStateException(String.format(Locale.ROOT, message, args));
    }

    /** Equal when the superclass considers the steps equal and both wait on the same policy. */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        return super.equals(o) && policy.equals(((WaitForSnapshotStep) o).policy);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), policy);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() {
new NamedWriteableRegistry.Entry(LifecycleType.class, TimeseriesLifecycleType.TYPE,
(in) -> TimeseriesLifecycleType.INSTANCE),
new NamedWriteableRegistry.Entry(LifecycleAction.class, AllocateAction.NAME, AllocateAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReadOnlyAction.NAME, ReadOnlyAction::new),
Expand All @@ -57,6 +58,8 @@ protected NamedXContentRegistry xContentRegistry() {
new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE),
(p) -> TimeseriesLifecycleType.INSTANCE),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class,
new ParseField(WaitForSnapshotAction.NAME), WaitForSnapshotAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() {
new NamedWriteableRegistry.Entry(LifecycleType.class, TimeseriesLifecycleType.TYPE,
(in) -> TimeseriesLifecycleType.INSTANCE),
new NamedWriteableRegistry.Entry(LifecycleAction.class, AllocateAction.NAME, AllocateAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReadOnlyAction.NAME, ReadOnlyAction::new),
Expand All @@ -66,6 +67,8 @@ protected NamedXContentRegistry xContentRegistry() {
new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE),
(p) -> TimeseriesLifecycleType.INSTANCE),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class,
new ParseField(WaitForSnapshotAction.NAME), WaitForSnapshotAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse),
Expand Down Expand Up @@ -110,6 +113,8 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicyWithAllPhases(@Null
return AllocateActionTests.randomInstance();
case DeleteAction.NAME:
return new DeleteAction();
case WaitForSnapshotAction.NAME:
return WaitForSnapshotActionTests.randomInstance();
case ForceMergeAction.NAME:
return ForceMergeActionTests.randomInstance();
case ReadOnlyAction.NAME:
Expand Down Expand Up @@ -160,6 +165,8 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String l
switch (action) {
case AllocateAction.NAME:
return AllocateActionTests.randomInstance();
case WaitForSnapshotAction.NAME:
return WaitForSnapshotActionTests.randomInstance();
case DeleteAction.NAME:
return new DeleteAction();
case ForceMergeAction.NAME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
private static final AllocateAction TEST_ALLOCATE_ACTION =
new AllocateAction(2, Collections.singletonMap("node", "node1"),null, null);
private static final DeleteAction TEST_DELETE_ACTION = new DeleteAction();
private static final WaitForSnapshotAction TEST_WAIT_FOR_SNAPSHOT_ACTION = new WaitForSnapshotAction("policy");
private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(1);
private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction(new ByteSizeValue(1), null, null);
private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction(1);
Expand Down Expand Up @@ -556,6 +557,8 @@ private LifecycleAction getTestAction(String actionName) {
switch (actionName) {
case AllocateAction.NAME:
return TEST_ALLOCATE_ACTION;
case WaitForSnapshotAction.NAME:
return TEST_WAIT_FOR_SNAPSHOT_ACTION;
case DeleteAction.NAME:
return TEST_DELETE_ACTION;
case ForceMergeAction.NAME:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.List;

/**
 * Tests for {@link WaitForSnapshotAction}: step generation plus the parsing and wire-serialization hooks
 * required by {@link AbstractActionTestCase}.
 */
public class WaitForSnapshotActionTests extends AbstractActionTestCase<WaitForSnapshotAction> {

    /**
     * The action must expand to exactly one {@link WaitForSnapshotStep} whose key is built from the given
     * phase, the action name and the step name, and which points at the supplied next step.
     */
    @Override
    public void testToSteps() {
        WaitForSnapshotAction action = createTestInstance();
        Step.StepKey nextStep = new Step.StepKey("", "", "");
        // The client argument is not used by this action, so null is sufficient here.
        List<Step> steps = action.toSteps(null, "delete", nextStep);
        assertEquals(1, steps.size());
        Step step = steps.get(0);
        assertTrue(step instanceof WaitForSnapshotStep);
        assertEquals(nextStep, step.getNextStepKey());

        Step.StepKey key = step.getKey();
        assertEquals("delete", key.getPhase());
        assertEquals(WaitForSnapshotAction.NAME, key.getAction());
        assertEquals(WaitForSnapshotStep.NAME, key.getName());
    }

    @Override
    protected WaitForSnapshotAction doParseInstance(XContentParser parser) throws IOException {
        return WaitForSnapshotAction.parse(parser);
    }

    @Override
    protected WaitForSnapshotAction createTestInstance() {
        return randomInstance();
    }

    @Override
    protected Writeable.Reader<WaitForSnapshotAction> instanceReader() {
        return WaitForSnapshotAction::new;
    }

    /**
     * Returns an instance that is guaranteed to differ from {@code instance}.
     * A freshly generated random policy name can, by chance, equal the original one, so keep drawing
     * until the result is not equal to the input.
     */
    @Override
    protected WaitForSnapshotAction mutateInstance(WaitForSnapshotAction instance) throws IOException {
        WaitForSnapshotAction mutated = randomInstance();
        while (mutated.equals(instance)) {
            mutated = randomInstance();
        }
        return mutated;
    }

    /** Creates an action with a random alphabetic policy name of 5 to 10 characters. */
    static WaitForSnapshotAction randomInstance() {
        return new WaitForSnapshotAction(randomAlphaOfLengthBetween(5, 10));
    }

}
Loading

0 comments on commit 7744c7e

Please sign in to comment.