Skip to content

Commit

Permalink
Throws ElasticsearchException from cluster state tasks if something
Browse files Browse the repository at this point in the history
really bad happens

If the `onFailure()` method is called on the cluster state task then
something bad happened that we can't really deal with so this change
throws an ElasticsearchException in that case and the step will be
re-executed when the policy is next triggered. If we can't submit a
cluster state task then we can't move to the error state so there isn't
really anything else we can do here. If the cluster state task fails
like this there are probably bigger issues witht he cluster anyway.

x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde
xlifecycle/ExecuteStepsUpdateTask.java
x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde
xlifecycle/MoveToErrorStepUpdateTask.java
x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/inde
xlifecycle/MoveToNextStepUpdateTask.java
x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde
xlifecycle/ClusterStateUpdateStepTests.java
x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde
xlifecycle/ExecuteStepsUpdateTaskTests.java
x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde
xlifecycle/MoveToErrorStepUpdateTaskTests.java
x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/inde
xlifecycle/MoveToNextStepUpdateTaskTests.java
  • Loading branch information
colings86 authored and jasontedor committed Aug 17, 2018
1 parent c1be0ac commit 09eae9e
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.indexlifecycle;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.logging.ESLoggerFactory;
Expand Down Expand Up @@ -98,6 +99,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e); // NORELEASE implement error handling
throw new ElasticsearchException(
"policy [" + policy + "] for index [" + index.getName() + "] failed on step [" + startStep.getKey() + "].", e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.indexlifecycle;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -56,6 +57,7 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e); // NORELEASE implement error handling
throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName()
+ "] failed trying to move from step [" + currentStepKey + "] to the ERROR step.", e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.indexlifecycle;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -74,7 +75,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

@Override
public void onFailure(String source, Exception e) {
throw new RuntimeException(e); // NORELEASE implement error handling
throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName() + "] failed trying to move from step ["
+ currentStepKey + "] to step [" + nextStepKey + "].", e);
}

@FunctionalInterface
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.indexlifecycle;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -167,6 +168,19 @@ public void testExecuteIncompleteWaitStep() {
assertThat(LifecycleSettings.LIFECYCLE_ACTION_TIME_SETTING.get(newState.metaData().index(index).getSettings()), equalTo(-1L));
}

public void testOnFailure() {
setStateToKey(secondStepKey);
Step startStep = policyStepsRegistry.getStep(mixedPolicyName, secondStepKey);
long now = randomNonNegativeLong();
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now);
Exception expectedException = new RuntimeException();
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
assertEquals("policy [" + mixedPolicyName + "] for index [" + index.getName() + "] failed on step [" + startStep.getKey() + "].",
exception.getMessage());
assertSame(expectedException, exception.getCause());
}

private void setStateToKey(StepKey stepKey) {
clusterState = ClusterState.builder(clusterState)
.metaData(MetaData.builder(clusterState.metaData())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.indexlifecycle;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -79,6 +80,21 @@ public void testExecuteNoopDifferentPolicy() {
assertThat(newState, sameInstance(clusterState));
}

public void testOnFailure() {
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
long now = randomNonNegativeLong();

setStateToKey(currentStepKey);

MoveToErrorStepUpdateTask task = new MoveToErrorStepUpdateTask(index, policy, currentStepKey, () -> now);
Exception expectedException = new RuntimeException();
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
assertEquals("policy [" + policy + "] for index [" + index.getName() + "] failed trying to move from step [" + currentStepKey
+ "] to the ERROR step.", exception.getMessage());
assertSame(expectedException, exception.getCause());
}

private void setStatePolicy(String policy) {
clusterState = ClusterState.builder(clusterState)
.metaData(MetaData.builder(clusterState.metaData())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.indexlifecycle;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -96,6 +97,24 @@ public void testClusterProcessedWithNoChange() {
assertNull(changed.get());
}

public void testOnFailure() {
StepKey currentStepKey = new StepKey("current-phase", "current-action", "current-name");
StepKey nextStepKey = new StepKey("next-phase", "next-action", "next-name");
long now = randomNonNegativeLong();

setStateToKey(currentStepKey);

SetOnce<Boolean> changed = new SetOnce<>();
MoveToNextStepUpdateTask.Listener listener = (c) -> changed.set(true);
MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now, listener);
Exception expectedException = new RuntimeException();
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
assertEquals("policy [" + policy + "] for index [" + index.getName() + "] failed trying to move from step [" + currentStepKey
+ "] to step [" + nextStepKey + "].", exception.getMessage());
assertSame(expectedException, exception.getCause());
}

private void setStatePolicy(String policy) {
clusterState = ClusterState.builder(clusterState)
.metaData(MetaData.builder(clusterState.metaData())
Expand Down

0 comments on commit 09eae9e

Please sign in to comment.