Skip to content

Commit

Permalink
Add transaction interruption support for internal use (#1242)
Browse files Browse the repository at this point in the history
* Add transaction interruption support for internal use

This update is for Neo4j internal use only.

* Add additional warning to documentation
  • Loading branch information
injectives authored Jun 9, 2022
1 parent 7818086 commit 30f5ed3
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private enum State {
private CompletableFuture<Void> commitFuture;
private CompletableFuture<Void> rollbackFuture;
private Throwable causeOfTermination;
private CompletionStage<Void> interruptStage;

public UnmanagedTransaction(Connection connection, BookmarksHolder bookmarksHolder, long fetchSize) {
this(connection, bookmarksHolder, fetchSize, new ResultCursorsHolder());
Expand Down Expand Up @@ -303,4 +304,21 @@ private CompletionStage<Void> closeAsync(boolean commit, boolean completeWithNul

return stage;
}

/**
* Marks transaction as terminated and sends {@code RESET} message over allocated connection.
* <p>
* <b>THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time.</b>
*
* @return {@code RESET} response stage
*/
public CompletionStage<Void> interruptAsync() {
return executeWithLock(lock, () -> {
if (interruptStage == null) {
markTerminated(null);
interruptStage = connection.reset();
}
return interruptStage;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,15 @@ public Publisher<ReactiveResult> run(Query query) {
})
.map(InternalReactiveResult::new);
}

/**
* Marks transaction as terminated and sends {@code RESET} message over allocated connection.
* <p>
* <b>THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time.</b>
*
* @return {@code RESET} response publisher
*/
public Publisher<Void> interrupt() {
return Mono.fromCompletionStage(tx.interruptAsync());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,33 @@ void shouldReturnCompletedWithNullStageOnClosingInactiveTransactionExceptCommitt
assertNull(closeStage.toCompletableFuture().join());
}

@Test
void shouldInterruptOnInterruptAsync() {
// Given
Connection connection = connectionMock(BoltProtocolV4.INSTANCE);
UnmanagedTransaction tx = beginTx(connection);

// When
await(tx.interruptAsync());

// Then
then(connection).should().reset();
}

@Test
void shouldServeTheSameStageOnInterruptAsync() {
// Given
Connection connection = connectionMock(BoltProtocolV4.INSTANCE);
UnmanagedTransaction tx = beginTx(connection);

// When
CompletionStage<Void> stage0 = tx.interruptAsync();
CompletionStage<Void> stage1 = tx.interruptAsync();

// Then
assertEquals(stage0, stage1);
}

private static UnmanagedTransaction beginTx(Connection connection) {
return beginTx(connection, Collections.emptySet());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.reactive;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.neo4j.driver.internal.util.Futures.failedFuture;

import org.junit.jupiter.api.Test;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import reactor.test.StepVerifier;

public class InternalReactiveTransactionTest {
private InternalReactiveTransaction tx;

@Test
void shouldDelegateInterrupt() {
// Given
UnmanagedTransaction utx = mock(UnmanagedTransaction.class);
given(utx.interruptAsync()).willReturn(completedFuture(null));
tx = new InternalReactiveTransaction(utx);

// When
StepVerifier.create(tx.interrupt()).expectComplete().verify();

// Then
then(utx).should().interruptAsync();
}

@Test
void shouldDelegateInterruptAndReportError() {
// Given
UnmanagedTransaction utx = mock(UnmanagedTransaction.class);
RuntimeException e = mock(RuntimeException.class);
given(utx.interruptAsync()).willReturn(failedFuture(e));
tx = new InternalReactiveTransaction(utx);

// When
StepVerifier.create(tx.interrupt()).expectErrorMatches(ar -> ar == e).verify();

// Then
then(utx).should().interruptAsync();
}
}

0 comments on commit 30f5ed3

Please sign in to comment.