Skip to content

Commit

Permalink
Handle RejectedExecutionException in ShardFollowTasksExecutor (#65648) (
Browse files Browse the repository at this point in the history
#65653)

Follow-up to #65415. We can't have this exception bubble up in an exception
handler any longer due to the new assertion so we must handle it here.
  • Loading branch information
original-brownbear authored Dec 1, 2020
1 parent 6bbeedc commit 16642f1
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -544,7 +545,12 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll
if (ShardFollowNodeTask.shouldRetry(e)) {
logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number",
shardFollowNodeTask), e);
threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME);
try {
threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME);
} catch (EsRejectedExecutionException rex) {
rex.addSuppressed(e);
shardFollowNodeTask.onFatalFailure(rex);
}
} else {
shardFollowNodeTask.onFatalFailure(e);
}
Expand Down

0 comments on commit 16642f1

Please sign in to comment.