Skip to content

Commit

Permalink
Merge pull request #460 from lutovich/1.5-close-tx-in-session-close
Browse files Browse the repository at this point in the history
Close open transaction when session is closed
  • Loading branch information
zhenlineo authored Jan 23, 2018
2 parents bdb51bf + b0d0935 commit 5864fb6
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -361,28 +362,12 @@ private BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursor
{
return ( ignore, commitOrRollbackError ) ->
{
if ( cursorFailure != null && commitOrRollbackError != null )
CompletionException combinedError = Futures.combineErrors( cursorFailure, commitOrRollbackError );
if ( combinedError != null )
{
Throwable cause1 = Futures.completionExceptionCause( cursorFailure );
Throwable cause2 = Futures.completionExceptionCause( commitOrRollbackError );
if ( cause1 != cause2 )
{
cause1.addSuppressed( cause2 );
}
throw Futures.asCompletionException( cause1 );
}
else if ( cursorFailure != null )
{
throw Futures.asCompletionException( cursorFailure );
}
else if ( commitOrRollbackError != null )
{
throw Futures.asCompletionException( commitOrRollbackError );
}
else
{
return null;
throw combinedError;
}
return null;
};
}

Expand Down
49 changes: 23 additions & 26 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,23 @@ public CompletionStage<Void> closeAsync()
{
return resultCursorStage.thenCompose( cursor ->
{
if ( cursor == null )
if ( cursor != null )
{
return completedWithNull();
// there exists a cursor with potentially unconsumed error, try to extract and propagate it
return cursor.failureAsync();
}
return cursor.failureAsync();
} ).thenCompose( error -> releaseResources().thenApply( ignore ->
// no result cursor exists so no error exists
return completedWithNull();
} ).thenCompose( cursorError -> closeTransactionAndReleaseConnection().thenApply( txCloseError ->
{
if ( error != null )
{
// connection has been acquired and there is an unconsumed error in result cursor
throw Futures.asCompletionException( error );
}
else
// now we have cursor error, active transaction has been closed and connection has been released
// back to the pool; try to propagate cursor and transaction close errors, if any
CompletionException combinedError = Futures.combineErrors( cursorError, txCloseError );
if ( combinedError != null )
{
// either connection acquisition failed or
// there are no unconsumed errors in the result cursor
return null;
throw combinedError;
}
return null;
} ) );
}
return completedWithNull();
Expand Down Expand Up @@ -520,26 +519,22 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )
return newConnectionStage;
}

private CompletionStage<Void> releaseResources()
{
return rollbackTransaction().thenCompose( ignore -> releaseConnection() );
}

private CompletionStage<Void> rollbackTransaction()
private CompletionStage<Throwable> closeTransactionAndReleaseConnection()
{
return existingTransactionOrNull().thenCompose( tx ->
{
if ( tx != null )
{
return tx.rollbackAsync();
// there exists an open transaction, let's close it and propagate the error, if any
return tx.closeAsync()
.thenApply( ignore -> (Throwable) null )
.exceptionally( error -> error );
}
// no open transaction so nothing to close
return completedWithNull();
} ).exceptionally( error ->
{
Throwable cause = Futures.completionExceptionCause( error );
logger.warn( "Active transaction rolled back with an error", cause );
return null;
} );
} ).thenCompose( txCloseError ->
// then release the connection and propagate transaction close error, if any
releaseConnection().thenApply( ignore -> txCloseError ) );
}

private CompletionStage<Void> releaseConnection()
Expand All @@ -548,8 +543,10 @@ private CompletionStage<Void> releaseConnection()
{
if ( connection != null )
{
// there exists connection, try to release it back to the pool
return connection.release();
}
// no connection so return null
return completedWithNull();
} );
}
Expand Down
34 changes: 34 additions & 0 deletions driver/src/main/java/org/neo4j/driver/internal/util/Futures.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,40 @@ public static CompletionException asCompletionException( Throwable error )
return new CompletionException( error );
}

/**
* Combine given errors into a single {@link CompletionException} to be rethrown from inside a
* {@link CompletionStage} chain.
*
* @param error1 the first error or {@code null}.
* @param error2 the second error or {@code null}.
* @return {@code null} if both errors are null, {@link CompletionException} otherwise.
*/
public static CompletionException combineErrors( Throwable error1, Throwable error2 )
{
if ( error1 != null && error2 != null )
{
Throwable cause1 = completionExceptionCause( error1 );
Throwable cause2 = completionExceptionCause( error2 );
if ( cause1 != cause2 )
{
cause1.addSuppressed( cause2 );
}
return asCompletionException( cause1 );
}
else if ( error1 != null )
{
return asCompletionException( error1 );
}
else if ( error2 != null )
{
return asCompletionException( error2 );
}
else
{
return null;
}
}

private static void safeRun( Runnable runnable )
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.neo4j.driver.internal.async.EventLoopGroupFactory;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -360,4 +361,43 @@ public void shouldKeepCompletionExceptionAsIs()
CompletionException error = new CompletionException( new RuntimeException( "Hello" ) );
assertEquals( error, Futures.asCompletionException( error ) );
}

@Test
public void shouldCombineTwoErrors()
{
RuntimeException error1 = new RuntimeException( "Error1" );
RuntimeException error2Cause = new RuntimeException( "Error2" );
CompletionException error2 = new CompletionException( error2Cause );

CompletionException combined = Futures.combineErrors( error1, error2 );

assertEquals( error1, combined.getCause() );
assertArrayEquals( new Throwable[]{error2Cause}, combined.getCause().getSuppressed() );
}

@Test
public void shouldCombineErrorAndNull()
{
RuntimeException error1 = new RuntimeException( "Error1" );

CompletionException combined = Futures.combineErrors( error1, null );

assertEquals( error1, combined.getCause() );
}

@Test
public void shouldCombineNullAndError()
{
RuntimeException error2 = new RuntimeException( "Error2" );

CompletionException combined = Futures.combineErrors( null, error2 );

assertEquals( error2, combined.getCause() );
}

@Test
public void shouldCombineNullAndNullErrors()
{
assertNull( Futures.combineErrors( null, null ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,19 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

import org.neo4j.driver.internal.DriverFactory;
import org.neo4j.driver.internal.cluster.RoutingContext;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.logging.ConsoleLogging;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic;
import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread;
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.AuthToken;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
Expand All @@ -62,6 +65,7 @@
import org.neo4j.driver.v1.exceptions.TransientException;
import org.neo4j.driver.v1.summary.ResultSummary;
import org.neo4j.driver.v1.summary.StatementType;
import org.neo4j.driver.v1.util.StubServer;
import org.neo4j.driver.v1.util.TestNeo4j;
import org.neo4j.driver.v1.util.TestUtil;

Expand Down Expand Up @@ -1296,6 +1300,50 @@ public void shouldNotAllowStartingMultipleTransactions()
}
}

@Test
public void shouldCloseOpenTransactionWhenClosed()
{
try ( Session session = neo4j.driver().session() )
{
Transaction tx = session.beginTransaction();
tx.run( "CREATE (:Node {id: 123})" );
tx.run( "CREATE (:Node {id: 456})" );

tx.success();
}

assertEquals( 1, countNodesWithId( 123 ) );
assertEquals( 1, countNodesWithId( 456 ) );
}

@Test
public void shouldRollbackOpenTransactionWhenClosed()
{
try ( Session session = neo4j.driver().session() )
{
Transaction tx = session.beginTransaction();
tx.run( "CREATE (:Node {id: 123})" );
tx.run( "CREATE (:Node {id: 456})" );

tx.failure();
}

assertEquals( 0, countNodesWithId( 123 ) );
assertEquals( 0, countNodesWithId( 456 ) );
}

@Test
public void shouldPropagateTransactionCommitErrorWhenClosed() throws Exception
{
testTransactionCloseErrorPropagationWhenSessionClosed( "commit_error.script", true, "Unable to commit" );
}

@Test
public void shouldPropagateTransactionRollbackErrorWhenClosed() throws Exception
{
testTransactionCloseErrorPropagationWhenSessionClosed( "rollback_error.script", false, "Unable to rollback" );
}

private void testExecuteReadTx( AccessMode sessionMode )
{
Driver driver = neo4j.driver();
Expand Down Expand Up @@ -1501,6 +1549,52 @@ private static void await( CountDownLatch latch )
}
}

private static void testTransactionCloseErrorPropagationWhenSessionClosed( String script, boolean commit,
String expectedErrorMessage ) throws Exception
{
StubServer server = StubServer.start( script, 9001 );
try
{
Config config = Config.build()
.withLogging( DEV_NULL_LOGGING )
.withLogging( new ConsoleLogging( Level.INFO ) )
.withoutEncryption()
.toConfig();
try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", AuthTokens.none(), config ) )
{
Session session = driver.session();

Transaction tx = session.beginTransaction();
StatementResult result = tx.run( "CREATE (n {name:'Alice'}) RETURN n.name AS name" );
assertEquals( "Alice", result.single().get( "name" ).asString() );

if ( commit )
{
tx.success();
}
else
{
tx.failure();
}

try
{
session.close();
fail( "Exception expected" );
}
catch ( TransientException e )
{
assertEquals( "Neo.TransientError.General.DatabaseUnavailable", e.code() );
assertEquals( expectedErrorMessage, e.getMessage() );
}
}
}
finally
{
assertEquals( 0, server.exitStatus() );
}
}

private static class ThrowingWork implements TransactionWork<Record>
{
final String query;
Expand Down

0 comments on commit 5864fb6

Please sign in to comment.