Skip to content

Commit

Permalink
Update handling of cancellation state in reactive result (#1583)
Browse files Browse the repository at this point in the history
  • Loading branch information
injectives authored Nov 14, 2024
1 parent 5a7afbb commit e611993
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public CompletionStage<ResultCursor> runAsync(Query query, TransactionConfig con
public CompletionStage<RxResultCursor> runRx(
Query query, TransactionConfig config, CompletionStage<RxResultCursor> cursorPublishStage) {
ensureSessionIsOpen();
return ensureNoOpenTxBeforeRunningQuery()
var newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
.thenCompose(ignore -> acquireConnection(mode))
.thenCompose(connection -> {
var parameters = query.parameters().asMap(Values::value);
Expand Down Expand Up @@ -244,9 +244,12 @@ public CompletionStage<RxResultCursor> runRx(
}
})
.thenCompose(Function.identity());
resultCursorStage = cursorStage.exceptionally(error -> null);
return cursorStage.thenApply(Function.identity());
});
resultCursorStage = newResultCursorStage
.thenCompose(cursor -> cursor == null ? CompletableFuture.completedFuture(null) : cursorPublishStage)
.exceptionally(throwable -> null);
return newResultCursorStage;
}

public CompletionStage<UnmanagedTransaction> beginTransactionAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,18 @@ public CompletionStage<ResultSummary> summaryAsync() {

@Override
public CompletionStage<Void> rollback() {
log.trace("[%d] Rolling back unpublished result", hashCode());
synchronized (this) {
state = State.SUCCEEDED;
log.trace("[%d] Rolling back unpublished result %s state", hashCode(), state);
switch (state) {
case READY -> state = State.SUCCEEDED;
case STREAMING, DISCARDING -> {
return summaryFuture.thenApply(ignored -> null);
}
case FAILED, SUCCEEDED -> {
return CompletableFuture.completedFuture(null);
}
}
}
completeSummaryFuture(null, null);
var resetFuture = new CompletableFuture<Void>();
boltConnection
.reset()
Expand All @@ -319,14 +326,17 @@ public void onComplete() {
resetFuture.completeExceptionally(throwable);
}
});
return resetFuture.thenCompose(ignored -> boltConnection.close()).exceptionally(throwable -> null);
return resetFuture
.thenCompose(ignored -> boltConnection.close())
.whenComplete((ignored, throwable) -> completeSummaryFuture(null, null))
.exceptionally(throwable -> null);
}

@Override
public void onComplete() {
log.trace("[%d] onComplete", hashCode());
Runnable runnable;
synchronized (this) {
log.trace("[%d] onComplete", hashCode());
var throwable = interruptSupplier.get();
if (throwable != null) {
handleError(throwable, true);
Expand Down

0 comments on commit e611993

Please sign in to comment.