Skip to content

Commit

Permalink
[Transform] Consider task cancelled exceptions as recoverable
Browse files Browse the repository at this point in the history
A task cancelled exception has REST status 400, which makes it
irrecoverable as far as transforms are concerned. This means that
a transform that suffers such an exception will fail without
doing any retries. This is bad, because a search can fail with
a task cancelled exception if one of its lower level phases
suffers a circuit breaker exception. We want transforms to retry
when there is temporarily insufficient memory
for a search.
  • Loading branch information
droberts195 committed Oct 13, 2023
1 parent bfc87d4 commit 427f847
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private void handleBulkIndexingException(BulkIndexingException bulkIndexingExcep
* @param numFailureRetries the number of configured retries
*/
private void handleElasticsearchException(ElasticsearchException elasticsearchException, boolean unattended, int numFailureRetries) {
if (unattended == false && ExceptionRootCauseFinder.IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) {
if (unattended == false && ExceptionRootCauseFinder.isExceptionIrrecoverable(elasticsearchException)) {
String message = "task encountered irrecoverable failure: " + elasticsearchException.getDetailedMessage();
fail(message);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskCancelledException;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/**
Expand All @@ -24,17 +23,15 @@ public final class ExceptionRootCauseFinder {
/**
* Set of REST statuses that we consider irrecoverable
*/
public static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = new HashSet<>(
Arrays.asList(
RestStatus.GONE,
RestStatus.NOT_IMPLEMENTED,
RestStatus.NOT_FOUND,
RestStatus.BAD_REQUEST,
RestStatus.UNAUTHORIZED,
RestStatus.FORBIDDEN,
RestStatus.METHOD_NOT_ALLOWED,
RestStatus.NOT_ACCEPTABLE
)
static final Set<RestStatus> IRRECOVERABLE_REST_STATUSES = Set.of(
RestStatus.GONE,
RestStatus.NOT_IMPLEMENTED,
RestStatus.NOT_FOUND,
RestStatus.BAD_REQUEST,
RestStatus.UNAUTHORIZED,
RestStatus.FORBIDDEN,
RestStatus.METHOD_NOT_ALLOWED,
RestStatus.NOT_ACCEPTABLE
);

/**
Expand Down Expand Up @@ -65,7 +62,7 @@ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collecti
}

if (unwrappedThrowable instanceof ElasticsearchException elasticsearchException) {
if (IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status())) {
if (isExceptionIrrecoverable(elasticsearchException)) {
return elasticsearchException;
}
}
Expand All @@ -74,6 +71,21 @@ public static Throwable getFirstIrrecoverableExceptionFromBulkResponses(Collecti
return null;
}

private ExceptionRootCauseFinder() {}
/**
 * Decides whether the given exception should be treated as irrecoverable.
 *
 * An exception is irrecoverable when its REST status is one of
 * {@link #IRRECOVERABLE_REST_STATUSES}, unless it is an exception type that is
 * known to be worth retrying despite carrying such a status.
 *
 * @param elasticsearchException the exception to classify
 * @return {@code true} if the exception is considered irrecoverable, {@code false} otherwise
 */
public static boolean isExceptionIrrecoverable(ElasticsearchException elasticsearchException) {
    final boolean statusIsIrrecoverable = IRRECOVERABLE_REST_STATUSES.contains(elasticsearchException.status());
    if (statusIsIrrecoverable == false) {
        return false;
    }

    // The status alone is not decisive: some exceptions that carry one of these
    // statuses are still worth retrying on.
    //
    // A TaskCancelledException occurs if a sub-action of a search encounters a circuit
    // breaker exception, in which case the overall search task is cancelled.
    final boolean worthRetrying = elasticsearchException instanceof TaskCancelledException;
    return worthRetrying == false;
}

private ExceptionRootCauseFinder() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.DocWriteRequest.OpType;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DocumentParsingException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentLocation;

Expand Down Expand Up @@ -149,6 +155,20 @@ public void testGetFirstIrrecoverableExceptionFromBulkResponses() {
assertNull(ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses.values()));
}

/**
 * Verifies the classification made by {@code ExceptionRootCauseFinder.isExceptionIrrecoverable}
 * for a handful of exception types: three that are expected to be reported as recoverable,
 * followed by two that are expected to be reported as irrecoverable.
 */
public void testIsIrrecoverable() {
    // Expected to be classified as recoverable (the transform should retry).
    MapperException mappingFailure = new MapperException("mappings problem");
    assertFalse(ExceptionRootCauseFinder.isExceptionIrrecoverable(mappingFailure));

    TaskCancelledException cancelledTask = new TaskCancelledException("cancelled task");
    assertFalse(ExceptionRootCauseFinder.isExceptionIrrecoverable(cancelledTask));

    CircuitBreakingException brokenCircuit = new CircuitBreakingException("circuit broken", CircuitBreaker.Durability.TRANSIENT);
    assertFalse(ExceptionRootCauseFinder.isExceptionIrrecoverable(brokenCircuit));

    // Expected to be classified as irrecoverable (the transform should fail).
    IndexClosedException closedIndex = new IndexClosedException(new Index("index", "1234"));
    assertTrue(ExceptionRootCauseFinder.isExceptionIrrecoverable(closedIndex));

    DocumentParsingException parseFailure = new DocumentParsingException(new XContentLocation(1, 2), "parse error");
    assertTrue(ExceptionRootCauseFinder.isExceptionIrrecoverable(parseFailure));
}

private static void assertFirstException(Collection<BulkItemResponse> bulkItemResponses, Class<?> expectedClass, String message) {
Throwable t = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses);
assertNotNull(t);
Expand Down

0 comments on commit 427f847

Please sign in to comment.