Skip to content

Commit 65d7da5

Browse files
committed
[INTERNAL] Add transaction termination
This update brings a PRIVATE API for transaction termination. It replaces the `InternalReactiveTransaction.interrupt()` and provides a similar functionality that is also available in the synchronous API.
1 parent c5bf627 commit 65d7da5

File tree

17 files changed

+154
-54
lines changed

17 files changed

+154
-54
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.exceptions;
20+
21+
import java.io.Serial;
22+
23+
/**
24+
* Indicates that the transaction has been terminated.
25+
* @since 5.10
26+
*/
27+
public class TransactionTerminatedException extends ClientException {
28+
@Serial
29+
private static final long serialVersionUID = 7639191706067500206L;
30+
31+
/**
32+
* Creates a new instance.
33+
* @param message the message
34+
*/
35+
public TransactionTerminatedException(String message) {
36+
super(message);
37+
}
38+
}

driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,22 @@ public boolean isOpen() {
6666
return tx.isOpen();
6767
}
6868

69+
/**
70+
* <b>THIS IS A PRIVATE API</b>
71+
* <p>
72+
* Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the
73+
* transaction has not already been terminated, is not closed or closing.
74+
*
75+
* @since 5.10
76+
* @throws org.neo4j.driver.exceptions.ClientException if the transaction is closed or is closing
77+
* @see org.neo4j.driver.exceptions.TransactionTerminatedException
78+
*/
79+
public void terminate() {
80+
Futures.blockingGet(
81+
tx.terminateAsync(),
82+
() -> terminateConnectionOnThreadInterrupt("Thread interrupted while terminating the transaction"));
83+
}
84+
6985
private void terminateConnectionOnThreadInterrupt(String reason) {
7086
tx.connection().terminateAndRelease(reason);
7187
}

driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.atomic.AtomicReference;
3333
import org.neo4j.driver.Logger;
3434
import org.neo4j.driver.Logging;
35+
import org.neo4j.driver.exceptions.Neo4jException;
3536
import org.neo4j.driver.internal.BoltServerAddress;
3637
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
3738
import org.neo4j.driver.internal.async.inbound.ConnectionReadTimeoutHandler;
@@ -146,9 +147,9 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me
146147
}
147148

148149
@Override
149-
public CompletionStage<Void> reset() {
150+
public CompletionStage<Void> reset(Neo4jException error) {
150151
CompletableFuture<Void> result = new CompletableFuture<>();
151-
ResetResponseHandler handler = new ResetResponseHandler(messageDispatcher, result);
152+
ResetResponseHandler handler = new ResetResponseHandler(messageDispatcher, result, error);
152153
writeResetMessageIfNeeded(handler, true);
153154
return result;
154155
}

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.neo4j.driver.async.ResultCursor;
4444
import org.neo4j.driver.exceptions.ClientException;
4545
import org.neo4j.driver.exceptions.TransactionNestingException;
46+
import org.neo4j.driver.exceptions.TransactionTerminatedException;
4647
import org.neo4j.driver.internal.DatabaseBookmark;
4748
import org.neo4j.driver.internal.DatabaseName;
4849
import org.neo4j.driver.internal.FailableCursor;
@@ -175,7 +176,8 @@ public CompletionStage<Void> resetAsync() {
175176
return existingTransactionOrNull()
176177
.thenAccept(tx -> {
177178
if (tx != null) {
178-
tx.markTerminated(null);
179+
tx.markTerminated(new TransactionTerminatedException(
180+
"The transaction has been explicitly terminated by the driver"));
179181
}
180182
})
181183
.thenCompose(ignore -> connectionStage)

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.async;
2020

21+
import static java.util.concurrent.CompletableFuture.completedFuture;
2122
import static org.neo4j.driver.internal.util.Futures.asCompletionException;
2223
import static org.neo4j.driver.internal.util.Futures.combineErrors;
2324
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
@@ -44,6 +45,7 @@
4445
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
4546
import org.neo4j.driver.exceptions.ClientException;
4647
import org.neo4j.driver.exceptions.ConnectionReadTimeoutException;
48+
import org.neo4j.driver.exceptions.TransactionTerminatedException;
4749
import org.neo4j.driver.internal.DatabaseBookmark;
4850
import org.neo4j.driver.internal.cursor.AsyncResultCursor;
4951
import org.neo4j.driver.internal.cursor.RxResultCursor;
@@ -319,20 +321,21 @@ private CompletionStage<Void> closeAsync(boolean commit, boolean completeWithNul
319321
return stage;
320322
}
321323

322-
/**
323-
* Marks transaction as terminated and sends {@code RESET} message over allocated connection.
324-
* <p>
325-
* <b>THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time.</b>
326-
*
327-
* @return {@code RESET} response stage
328-
*/
329-
public CompletionStage<Void> interruptAsync() {
324+
public CompletionStage<Void> terminateAsync() {
330325
return executeWithLock(lock, () -> {
331-
if (interruptStage == null) {
332-
markTerminated(null);
333-
interruptStage = connection.reset();
326+
if (!isOpen() || commitFuture != null || rollbackFuture != null) {
327+
return failedFuture(new ClientException("Can't terminate closed or closing transaction"));
328+
} else {
329+
if (state == State.TERMINATED) {
330+
return interruptStage != null ? interruptStage : completedFuture(null);
331+
} else {
332+
var error = new TransactionTerminatedException(
333+
"The transaction has been explicitly terminated by the driver");
334+
markTerminated(error);
335+
interruptStage = connection.reset(error);
336+
return interruptStage;
337+
}
334338
}
335-
return interruptStage;
336339
});
337340
}
338341
}

driver/src/main/java/org/neo4j/driver/internal/async/connection/DirectConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.concurrent.CompletionStage;
2222
import org.neo4j.driver.AccessMode;
23+
import org.neo4j.driver.exceptions.Neo4jException;
2324
import org.neo4j.driver.internal.BoltServerAddress;
2425
import org.neo4j.driver.internal.DatabaseName;
2526
import org.neo4j.driver.internal.DirectConnectionProvider;
@@ -84,8 +85,8 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me
8485
}
8586

8687
@Override
87-
public CompletionStage<Void> reset() {
88-
return delegate.reset();
88+
public CompletionStage<Void> reset(Neo4jException error) {
89+
return delegate.reset(error);
8990
}
9091

9192
@Override

driver/src/main/java/org/neo4j/driver/internal/async/connection/RoutingConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.concurrent.CompletionStage;
2222
import org.neo4j.driver.AccessMode;
23+
import org.neo4j.driver.exceptions.Neo4jException;
2324
import org.neo4j.driver.internal.BoltServerAddress;
2425
import org.neo4j.driver.internal.DatabaseName;
2526
import org.neo4j.driver.internal.RoutingErrorHandler;
@@ -84,8 +85,8 @@ public void writeAndFlush(Message message1, ResponseHandler handler1, Message me
8485
}
8586

8687
@Override
87-
public CompletionStage<Void> reset() {
88-
return delegate.reset();
88+
public CompletionStage<Void> reset(Neo4jException error) {
89+
return delegate.reset(error);
8990
}
9091

9192
@Override

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Arrays;
2929
import java.util.LinkedList;
3030
import java.util.Map;
31+
import java.util.Optional;
3132
import java.util.Queue;
3233
import org.neo4j.driver.Logger;
3334
import org.neo4j.driver.Logging;
@@ -146,22 +147,32 @@ public void handleFailureMessage(String code, String message) {
146147
public void handleIgnoredMessage() {
147148
log.debug("S: IGNORED");
148149

149-
ResponseHandler handler = removeHandler();
150+
var handler = removeHandler();
150151

151152
Throwable error;
152153
if (currentError != null) {
153154
error = currentError;
154155
} else {
155-
log.warn(
156-
"Received IGNORED message for handler %s but error is missing and RESET is not in progress. "
157-
+ "Current handlers %s",
158-
handler, handlers);
159-
160-
error = new ClientException("Database ignored the request");
156+
var resetHandlerOpt = getResetResponseHandler();
157+
if (resetHandlerOpt.isEmpty()) {
158+
log.warn(
159+
"Received IGNORED message for handler %s but error is missing and RESET is not in progress. Current handlers %s",
160+
handler, handlers);
161+
}
162+
error = resetHandlerOpt
163+
.flatMap(ResetResponseHandler::error)
164+
.orElseGet(() -> new ClientException("Database ignored the request"));
161165
}
162166
handler.onFailure(error);
163167
}
164168

169+
private Optional<ResetResponseHandler> getResetResponseHandler() {
170+
return handlers.stream()
171+
.filter(nextHandler -> nextHandler instanceof ResetResponseHandler)
172+
.map(nextHandler -> (ResetResponseHandler) nextHandler)
173+
.findFirst();
174+
}
175+
165176
public void handleChannelInactive(Throwable cause) {
166177
// report issue if the connection has not been terminated as a result of a graceful shutdown request from its
167178
// parent pool

driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,33 @@
1919
package org.neo4j.driver.internal.handlers;
2020

2121
import java.util.Map;
22+
import java.util.Optional;
2223
import java.util.concurrent.CompletableFuture;
2324
import org.neo4j.driver.Value;
25+
import org.neo4j.driver.exceptions.Neo4jException;
2426
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
2527
import org.neo4j.driver.internal.spi.ResponseHandler;
2628

2729
public class ResetResponseHandler implements ResponseHandler {
2830
private final InboundMessageDispatcher messageDispatcher;
2931
private final CompletableFuture<Void> completionFuture;
32+
private final Neo4jException error;
3033

3134
public ResetResponseHandler(InboundMessageDispatcher messageDispatcher) {
3235
this(messageDispatcher, null);
3336
}
3437

3538
public ResetResponseHandler(InboundMessageDispatcher messageDispatcher, CompletableFuture<Void> completionFuture) {
39+
this(messageDispatcher, completionFuture, null);
40+
}
41+
42+
public ResetResponseHandler(
43+
InboundMessageDispatcher messageDispatcher,
44+
CompletableFuture<Void> completionFuture,
45+
Neo4jException error) {
3646
this.messageDispatcher = messageDispatcher;
3747
this.completionFuture = completionFuture;
48+
this.error = error;
3849
}
3950

4051
@Override
@@ -52,6 +63,10 @@ public final void onRecord(Value[] fields) {
5263
throw new UnsupportedOperationException();
5364
}
5465

66+
public Optional<Neo4jException> error() {
67+
return Optional.ofNullable(error);
68+
}
69+
5570
private void resetCompleted(boolean success) {
5671
messageDispatcher.clearCurrentError();
5772
if (completionFuture != null) {

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,16 @@ public Publisher<ReactiveResult> run(Query query) {
6161
}
6262

6363
/**
64-
* Marks transaction as terminated and sends {@code RESET} message over allocated connection.
64+
* <b>THIS IS A PRIVATE API</b>
6565
* <p>
66-
* <b>THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time.</b>
66+
* Terminates the transaction by sending the Bolt {@code RESET} message and waiting for its response as long as the
67+
* transaction has not already been terminated, is not closed or closing.
6768
*
68-
* @return {@code RESET} response publisher
69+
* @return completion publisher (the {@code RESET} completion publisher if the message was sent)
70+
* @since 5.10
6971
*/
70-
public Publisher<Void> interrupt() {
71-
return publisherToFlowPublisher(Mono.fromCompletionStage(tx.interruptAsync()));
72+
public Publisher<Void> terminate() {
73+
return publisherToFlowPublisher(Mono.fromCompletionStage(tx.terminateAsync()));
7274
}
7375

7476
@Override

0 commit comments

Comments
 (0)