Skip to content

Commit

Permalink
Make sure Driver interrupt always handled properly
Browse files Browse the repository at this point in the history
An interrupt might occur when isFinishedInternal is executed. If it
occurs at that step it should still be post-processed correctly.
  • Loading branch information
arhimondr committed May 13, 2022
1 parent 1c4c2e5 commit 35c4f1c
Showing 1 changed file with 93 additions and 95 deletions.
188 changes: 93 additions & 95 deletions core/trino-main/src/main/java/io/trino/operator/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,22 @@ public ListenableFuture<Void> process(Duration maxRuntime, int maxIterations)
}
while (System.nanoTime() - start < maxRuntimeInNanos && iterations < maxIterations && !isFinishedInternal());
}
catch (Throwable t) {
List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
if (interrupterStack == null) {
driverContext.failed(t);
throw t;
}

// Driver thread was interrupted which should only happen if the task is already finished.
// If this becomes the actual cause of a failed query there is a bug in the task state machine.
Exception exception = new Exception("Interrupted By");
exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new));
TrinoException newException = new TrinoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception);
newException.addSuppressed(t);
driverContext.failed(newException);
throw newException;
}
finally {
driverContext.getYieldSignal().reset();
driverContext.recordProcessed(operationTimer);
Expand Down Expand Up @@ -364,119 +380,101 @@ private ListenableFuture<Void> processInternal(OperationTimer operationTimer)

handleMemoryRevoke();

try {
processNewSources();

// If there is only one operator, finish it
// Some operators (LookupJoinOperator and HashBuildOperator) are broken and requires finish to be called continuously
// TODO remove the second part of the if statement, when these operators are fixed
// Note: finish should not be called on the natural source of the pipeline as this could cause the task to finish early
if (!activeOperators.isEmpty() && activeOperators.size() != allOperators.size()) {
Operator rootOperator = activeOperators.get(0);
rootOperator.finish();
rootOperator.getOperatorContext().recordFinish(operationTimer);
}
processNewSources();

boolean movedPage = false;
for (int i = 0; i < activeOperators.size() - 1 && !driverContext.isDone(); i++) {
Operator current = activeOperators.get(i);
Operator next = activeOperators.get(i + 1);
// If there is only one operator, finish it
// Some operators (LookupJoinOperator and HashBuildOperator) are broken and requires finish to be called continuously
// TODO remove the second part of the if statement, when these operators are fixed
// Note: finish should not be called on the natural source of the pipeline as this could cause the task to finish early
if (!activeOperators.isEmpty() && activeOperators.size() != allOperators.size()) {
Operator rootOperator = activeOperators.get(0);
rootOperator.finish();
rootOperator.getOperatorContext().recordFinish(operationTimer);
}

// skip blocked operator
if (getBlockedFuture(current).isPresent()) {
continue;
}
boolean movedPage = false;
for (int i = 0; i < activeOperators.size() - 1 && !driverContext.isDone(); i++) {
Operator current = activeOperators.get(i);
Operator next = activeOperators.get(i + 1);

// if the current operator is not finished and next operator isn't blocked and needs input...
if (!current.isFinished() && getBlockedFuture(next).isEmpty() && next.needsInput()) {
// get an output page from current operator
Page page = current.getOutput();
current.getOperatorContext().recordGetOutput(operationTimer, page);

// if we got an output page, add it to the next operator
if (page != null && page.getPositionCount() != 0) {
next.addInput(page);
next.getOperatorContext().recordAddInput(operationTimer, page);
movedPage = true;
}
// skip blocked operator
if (getBlockedFuture(current).isPresent()) {
continue;
}

if (current instanceof SourceOperator) {
movedPage = true;
}
// if the current operator is not finished and next operator isn't blocked and needs input...
if (!current.isFinished() && getBlockedFuture(next).isEmpty() && next.needsInput()) {
// get an output page from current operator
Page page = current.getOutput();
current.getOperatorContext().recordGetOutput(operationTimer, page);

// if we got an output page, add it to the next operator
if (page != null && page.getPositionCount() != 0) {
next.addInput(page);
next.getOperatorContext().recordAddInput(operationTimer, page);
movedPage = true;
}

// if current operator is finished...
if (current.isFinished()) {
// let next operator know there will be no more data
next.finish();
next.getOperatorContext().recordFinish(operationTimer);
if (current instanceof SourceOperator) {
movedPage = true;
}
}

for (int index = activeOperators.size() - 1; index >= 0; index--) {
if (activeOperators.get(index).isFinished()) {
// close and remove this operator and all source operators
List<Operator> finishedOperators = this.activeOperators.subList(0, index + 1);
Throwable throwable = closeAndDestroyOperators(finishedOperators);
finishedOperators.clear();
if (throwable != null) {
throwIfUnchecked(throwable);
throw new RuntimeException(throwable);
}
// Finish the next operator, which is now the first operator.
if (!activeOperators.isEmpty()) {
Operator newRootOperator = activeOperators.get(0);
newRootOperator.finish();
newRootOperator.getOperatorContext().recordFinish(operationTimer);
}
break;
}
// if current operator is finished...
if (current.isFinished()) {
// let next operator know there will be no more data
next.finish();
next.getOperatorContext().recordFinish(operationTimer);
}
}

// if we did not move any pages, check if we are blocked
if (!movedPage) {
List<Operator> blockedOperators = new ArrayList<>();
List<ListenableFuture<Void>> blockedFutures = new ArrayList<>();
for (Operator operator : activeOperators) {
Optional<ListenableFuture<Void>> blocked = getBlockedFuture(operator);
if (blocked.isPresent()) {
blockedOperators.add(operator);
blockedFutures.add(blocked.get());
}
for (int index = activeOperators.size() - 1; index >= 0; index--) {
if (activeOperators.get(index).isFinished()) {
// close and remove this operator and all source operators
List<Operator> finishedOperators = this.activeOperators.subList(0, index + 1);
Throwable throwable = closeAndDestroyOperators(finishedOperators);
finishedOperators.clear();
if (throwable != null) {
throwIfUnchecked(throwable);
throw new RuntimeException(throwable);
}

if (!blockedFutures.isEmpty()) {
// unblock when the first future is complete
ListenableFuture<Void> blocked = firstFinishedFuture(blockedFutures);
// driver records serial blocked time
driverContext.recordBlocked(blocked);
// each blocked operator is responsible for blocking the execution
// until one of the operators can continue
for (Operator operator : blockedOperators) {
operator.getOperatorContext().recordBlocked(blocked);
}
return blocked;
// Finish the next operator, which is now the first operator.
if (!activeOperators.isEmpty()) {
Operator newRootOperator = activeOperators.get(0);
newRootOperator.finish();
newRootOperator.getOperatorContext().recordFinish(operationTimer);
}
break;
}

return NOT_BLOCKED;
}
catch (Throwable t) {
List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
if (interrupterStack == null) {
driverContext.failed(t);
throw t;

// if we did not move any pages, check if we are blocked
if (!movedPage) {
List<Operator> blockedOperators = new ArrayList<>();
List<ListenableFuture<Void>> blockedFutures = new ArrayList<>();
for (Operator operator : activeOperators) {
Optional<ListenableFuture<Void>> blocked = getBlockedFuture(operator);
if (blocked.isPresent()) {
blockedOperators.add(operator);
blockedFutures.add(blocked.get());
}
}

// Driver thread was interrupted which should only happen if the task is already finished.
// If this becomes the actual cause of a failed query there is a bug in the task state machine.
Exception exception = new Exception("Interrupted By");
exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new));
TrinoException newException = new TrinoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception);
newException.addSuppressed(t);
driverContext.failed(newException);
throw newException;
if (!blockedFutures.isEmpty()) {
// unblock when the first future is complete
ListenableFuture<Void> blocked = firstFinishedFuture(blockedFutures);
// driver records serial blocked time
driverContext.recordBlocked(blocked);
// each blocked operator is responsible for blocking the execution
// until one of the operators can continue
for (Operator operator : blockedOperators) {
operator.getOperatorContext().recordBlocked(blocked);
}
return blocked;
}
}

return NOT_BLOCKED;
}

@GuardedBy("exclusiveLock")
Expand Down

0 comments on commit 35c4f1c

Please sign in to comment.