From 61b437105a55c7e5cab5360936031d008bfd296f Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Thu, 24 Mar 2022 12:05:15 +0000 Subject: [PATCH] Update managed transactions --- driver/clirr-ignored-differences.xml | 146 +++++++++++++ .../java/org/neo4j/driver/QueryRunner.java | 140 +------------ .../main/java/org/neo4j/driver/Session.java | 192 ++++++++++++++--- .../org/neo4j/driver/SimpleQueryRunner.java | 148 +++++++++++++ .../org/neo4j/driver/TransactionCallback.java | 24 +++ .../org/neo4j/driver/TransactionContext.java | 23 ++ .../org/neo4j/driver/TransactionWork.java | 1 + .../org/neo4j/driver/async/AsyncSession.java | 182 +++++++++++++++- .../async/AsyncTransactionCallback.java | 24 +++ .../driver/async/AsyncTransactionContext.java | 23 ++ .../driver/async/AsyncTransactionWork.java | 1 + .../DelegatingTransactionContext.java | 68 ++++++ .../driver/internal/InternalSession.java | 13 ++ .../DelegatingAsyncTransactionContext.java | 69 ++++++ .../internal/async/InternalAsyncSession.java | 13 ++ .../DelegatingRxTransactionContext.java | 68 ++++++ .../internal/reactive/InternalRxSession.java | 15 +- .../org/neo4j/driver/reactive/RxSession.java | 197 +++++++++++++++++- .../reactive/RxTransactionCallback.java | 24 +++ .../driver/reactive/RxTransactionContext.java | 23 ++ .../driver/reactive/RxTransactionWork.java | 1 + .../neo4j/driver/util/SessionExtension.java | 17 +- 22 files changed, 1235 insertions(+), 177 deletions(-) create mode 100644 driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java create mode 100644 driver/src/main/java/org/neo4j/driver/TransactionCallback.java create mode 100644 driver/src/main/java/org/neo4j/driver/TransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java create mode 100644 driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java create mode 100644 driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java diff --git a/driver/clirr-ignored-differences.xml b/driver/clirr-ignored-differences.xml index ff46e0640a..66893cb60c 100644 --- a/driver/clirr-ignored-differences.xml +++ b/driver/clirr-ignored-differences.xml @@ -122,4 +122,150 @@ org.reactivestreams.Publisher isOpen() + + org/neo4j/driver/Session + 7012 + java.lang.Object executeRead(org.neo4j.driver.TransactionCallback) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeRead(org.neo4j.driver.TransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + void executeReadWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/Session + 7012 + void executeReadWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeWrite(org.neo4j.driver.TransactionCallback) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeWrite(org.neo4j.driver.TransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + void executeWriteWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/Session + 7012 + void executeWriteWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadWithoutResultAsync(java.util.function.Consumer) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadWithoutResultAsync(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteWithoutResultAsync(java.util.function.Consumer) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteWithoutResultAsync(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeRead(org.neo4j.driver.reactive.RxTransactionCallback) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeRead(org.neo4j.driver.reactive.RxTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeReadWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeReadWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWrite(org.neo4j.driver.reactive.RxTransactionCallback) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWrite(org.neo4j.driver.reactive.RxTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWriteWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWriteWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + diff --git a/driver/src/main/java/org/neo4j/driver/QueryRunner.java b/driver/src/main/java/org/neo4j/driver/QueryRunner.java index a27f767107..7492cc1af9 100644 --- a/driver/src/main/java/org/neo4j/driver/QueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/QueryRunner.java @@ -18,147 +18,11 @@ */ package org.neo4j.driver; -import java.util.Map; - /** - * Common interface for components that can execute Neo4j queries. - * - *

Important notes on semantics

- *

- * queries run in the same {@link QueryRunner} are guaranteed - * to execute in order, meaning changes made by one query will be seen - * by all subsequent queries in the same {@link QueryRunner}. - *

- * However, to allow handling very large results, and to improve performance, - * result streams are retrieved lazily from the network. - * This means that when any of {@link #run(Query)} - * methods return a result, the query has only started executing - it may not - * have completed yet. Most of the time, you will not notice this, because the - * driver automatically waits for queries to complete at specific points to - * fulfill its contracts. - *

- * Specifically, the driver will ensure all outstanding queries are completed - * whenever you: + * An {@link AutoCloseable} extension of the {@link SimpleQueryRunner}. * - *

- *

- * As noted, most of the time, you will not need to consider this - your writes will - * always be durably stored as long as you either use the results, explicitly commit - * {@link Transaction transactions} or close the session you used using {@link Session#close()}. - *

- * While these semantics introduce some complexity, it gives the driver the ability - * to handle infinite result streams (like subscribing to events), significantly lowers - * the memory overhead for your application and improves performance. - * - * @see Session - * @see Transaction * @since 1.0 */ -public interface QueryRunner extends AutoCloseable +public interface QueryRunner extends SimpleQueryRunner, AutoCloseable { - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This particular method takes a {@link Value} as its input. This is useful - * if you want to take a map-like value that you've gotten from a prior result - * and send it back as parameters. - *

- * If you are creating parameters programmatically, {@link #run(String, Map)} - * might be more helpful, it converts your map to a {@link Value} for you. - * - *

Example

- *
-     * {@code
-     *
-     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
-     *                                       Values.parameters( "myNameParam", "Bob" ) );
-     * }
-     * 
- * - * @param query text of a Neo4j query - * @param parameters input parameters, should be a map Value, see {@link Values#parameters(Object...)}. - * @return a stream of result values and associated metadata - */ - Result run(String query, Value parameters ); - - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This version of run takes a {@link Map} of parameters. The values in the map - * must be values that can be converted to Neo4j types. See {@link Values#parameters(Object...)} for - * a list of allowed types. - * - *

Example

- *
-     * {@code
-     *
-     * Map parameters = new HashMap();
-     * parameters.put("myNameParam", "Bob");
-     *
-     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
-     *                                       parameters );
-     * }
-     * 
- * - * @param query text of a Neo4j query - * @param parameters input data for the query - * @return a stream of result values and associated metadata - */ - Result run(String query, Map parameters ); - - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This version of run takes a {@link Record} of parameters, which can be useful - * if you want to use the output of one query as input for another. - * - * @param query text of a Neo4j query - * @param parameters input data for the query - * @return a stream of result values and associated metadata - */ - Result run(String query, Record parameters ); - - /** - * Run a query and return a result stream. - * - * @param query text of a Neo4j query - * @return a stream of result values and associated metadata - */ - Result run(String query ); - - /** - * Run a query and return a result stream. - *

Example

- *
-     * {@code
-     *
-     * Query query = new Query( "MATCH (n) WHERE n.name = $myNameParam RETURN n.age" );
-     * Result result = session.run( query.withParameters( Values.parameters( "myNameParam", "Bob" )  ) );
-     * }
-     * 
- * - * @param query a Neo4j query - * @return a stream of result values and associated metadata - */ - Result run(Query query); } diff --git a/driver/src/main/java/org/neo4j/driver/Session.java b/driver/src/main/java/org/neo4j/driver/Session.java index d4823b4969..225835b69e 100644 --- a/driver/src/main/java/org/neo4j/driver/Session.java +++ b/driver/src/main/java/org/neo4j/driver/Session.java @@ -19,6 +19,7 @@ package org.neo4j.driver; import java.util.Map; +import java.util.function.Consumer; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.util.Resource; @@ -87,67 +88,208 @@ public interface Session extends Resource, QueryRunner * @param work the {@link TransactionWork} to be applied to a new read transaction. * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeRead(TransactionCallback)}. */ + @Deprecated T readTransaction( TransactionWork work ); /** - * Execute a unit of work in a managed {@link AccessMode#READ read} transaction - * with the specified {@link TransactionConfig configuration}. + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * The provided unit of work should not return {@link Result} object. * - * @param work the {@link TransactionWork} to be applied to a new read transaction. + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + default T executeRead( TransactionCallback callback ) + { + return executeRead( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param contextConsumer the consumer representing the unit of work. + */ + default void executeReadWithoutResult( Consumer contextConsumer ) + { + executeRead( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + + /** + * Execute a unit of work in a managed {@link AccessMode#READ read} transaction with the specified {@link TransactionConfig configuration}. + *

+ * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. + *

+ * Managed transactions should not generally be explicitly committed (via {@link Transaction#commit()}). + * + * @param work the {@link TransactionWork} to be applied to a new read transaction. * @param config configuration for all transactions started to execute the unit of work. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeRead(TransactionCallback, TransactionConfig)}. */ + @Deprecated T readTransaction( TransactionWork work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param callback the callback representing the unit of work. + * @param config the transaction configuration for the managed transaction. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + T executeRead( TransactionCallback callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param contextConsumer the consumer representing the unit of work. + * @param config the transaction configuration for the managed transaction. + */ + default void executeReadWithoutResult( Consumer contextConsumer, TransactionConfig config ) + { + executeRead( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + /** * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * Managed transactions should not generally be explicitly committed (via {@link Transaction#commit()}). * * @param work the {@link TransactionWork} to be applied to a new write transaction. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeWrite(TransactionCallback)}. */ + @Deprecated T writeTransaction( TransactionWork work ); /** - * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction - * with the specified {@link TransactionConfig configuration}. + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * The provided unit of work should not return {@link Result} object. * - * @param work the {@link TransactionWork} to be applied to a new write transaction. + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + default T executeWrite( TransactionCallback callback ) + { + return executeWrite( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param contextConsumer the consumer representing the unit of work. + */ + default void executeWriteWithoutResult( Consumer contextConsumer ) + { + executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + + /** + * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction with the specified {@link TransactionConfig configuration}. + *

+ * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. + *

+ * Managed transactions should not generally be explicitly committed (via {@link Transaction#commit()}). + * + * @param work the {@link TransactionWork} to be applied to a new write transaction. * @param config configuration for all transactions started to execute the unit of work. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeWrite(TransactionCallback, TransactionConfig)}. */ + @Deprecated T writeTransaction( TransactionWork work, TransactionConfig config ); /** - * Run a query in a managed auto-commit transaction with the specified - * {@link TransactionConfig configuration}, and return a result stream. + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. * - * @param query text of a Neo4j query. + * @param callback the callback representing the unit of work. + * @param config the transaction configuration for the managed transaction. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + T executeWrite( TransactionCallback callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param contextConsumer the consumer representing the unit of work. + * @param config the transaction configuration for the managed transaction. + */ + default void executeWriteWithoutResult( Consumer contextConsumer, TransactionConfig config ) + { + executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + + /** + * Run a query in a managed auto-commit transaction with the specified {@link TransactionConfig configuration}, and return a result stream. + * + * @param query text of a Neo4j query. * @param config configuration for the new transaction. * @return a stream of result values and associated metadata. */ - Result run(String query, TransactionConfig config ); + Result run( String query, TransactionConfig config ); /** * Run a query with parameters in a managed auto-commit transaction with the diff --git a/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java b/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java new file mode 100644 index 0000000000..8e3101e766 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +import java.util.Map; + +/** + * Common interface for components that can execute Neo4j queries. + * + *

Important notes on semantics

+ *

+ * queries run in the same {@link QueryRunner} are guaranteed to execute in order, meaning changes made by one query will be seen by all subsequent queries in + * the same {@link QueryRunner}. + *

+ * However, to allow handling very large results, and to improve performance, result streams are retrieved lazily from the network. This means that when any of + * {@link #run(Query)} methods return a result, the query has only started executing - it may not have completed yet. Most of the time, you will not notice + * this, because the driver automatically waits for queries to complete at specific points to fulfill its contracts. + *

+ * Specifically, the driver will ensure all outstanding queries are completed whenever you: + * + *

    + *
  • Read from or discard a result, for instance via + * {@link Result#next()} or {@link Result#consume()}
  • + *
  • Explicitly commit/rollback a transaction using blocking {@link Transaction#close()}
  • + *
  • Close a session using blocking {@link Session#close()}
  • + *
+ *

+ * As noted, most of the time, you will not need to consider this - your writes will + * always be durably stored as long as you either use the results, explicitly commit + * {@link Transaction transactions} or close the session you utilised using {@link Session#close()}. + *

+ * While these semantics introduce some complexity, it gives the driver the ability + * to handle infinite result streams (like subscribing to events), significantly lowers + * the memory overhead for your application and improves performance. + * + * @see Session + * @see Transaction + * @since 5.0 + */ +public interface SimpleQueryRunner +{ + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This particular method takes a {@link Value} as its input. This is useful if you want to take a map-like value that you've gotten from a prior result and + * send it back as parameters. + *

+ * If you are creating parameters programmatically, {@link #run(String, Map)} might be more helpful, it converts your map to a {@link Value} for you. + * + *

Example

+ *
+     * {@code
+     *
+     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
+     *                                       Values.parameters( "myNameParam", "Bob" ) );
+     * }
+     * 
+ * + * @param query text of a Neo4j query + * @param parameters input parameters, should be a map Value, see {@link Values#parameters(Object...)}. + * @return a stream of result values and associated metadata + */ + Result run( String query, Value parameters ); + + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This version of run takes a {@link Map} of parameters. The values in the map must be values that can be converted to Neo4j types. See {@link + * Values#parameters(Object...)} for a list of allowed types. + * + *

Example

+ *
+     * {@code
+     *
+     * Map parameters = new HashMap();
+     * parameters.put("myNameParam", "Bob");
+     *
+     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
+     *                                       parameters );
+     * }
+     * 
+ * + * @param query text of a Neo4j query + * @param parameters input data for the query + * @return a stream of result values and associated metadata + */ + Result run( String query, Map parameters ); + + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This version of run takes a {@link Record} of parameters, which can be useful if you want to use the output of one query as input for another. + * + * @param query text of a Neo4j query + * @param parameters input data for the query + * @return a stream of result values and associated metadata + */ + Result run( String query, Record parameters ); + + /** + * Run a query and return a result stream. + * + * @param query text of a Neo4j query + * @return a stream of result values and associated metadata + */ + Result run( String query ); + + /** + * Run a query and return a result stream. + *

Example

+ *
+     * {@code
+     *
+     * Query query = new Query( "MATCH (n) WHERE n.name = $myNameParam RETURN n.age" );
+     * Result result = session.run( query.withParameters( Values.parameters( "myNameParam", "Bob" )  ) );
+     * }
+     * 
+ * + * @param query a Neo4j query + * @return a stream of result values and associated metadata + */ + Result run( Query query ); +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionCallback.java b/driver/src/main/java/org/neo4j/driver/TransactionCallback.java new file mode 100644 index 0000000000..1378957831 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/TransactionCallback.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +public interface TransactionCallback +{ + T execute( TransactionContext context ); +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionContext.java b/driver/src/main/java/org/neo4j/driver/TransactionContext.java new file mode 100644 index 0000000000..42d7e06746 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/TransactionContext.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +public interface TransactionContext extends SimpleQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionWork.java b/driver/src/main/java/org/neo4j/driver/TransactionWork.java index f78b0a7cc7..eba2913e24 100644 --- a/driver/src/main/java/org/neo4j/driver/TransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/TransactionWork.java @@ -25,6 +25,7 @@ * * @param the return type of this work. */ +@Deprecated public interface TransactionWork { /** diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java b/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java index a925612066..e391f54e2c 100644 --- a/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java @@ -22,14 +22,16 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.function.Consumer; import java.util.function.Function; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; +import org.neo4j.driver.Result; import org.neo4j.driver.Transaction; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Values; -import org.neo4j.driver.Bookmark; /** * Provides a context of work for database interactions. @@ -133,9 +135,54 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeReadAsync(AsyncTransactionCallback)}. */ + @Deprecated CompletionStage readTransactionAsync( AsyncTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + default CompletionStage executeReadAsync( AsyncTransactionCallback> callback ) + { + return executeReadAsync( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the consumer representing the unit of work. + * @return a completion stage that completes successfully on success or exceptionally otherwise. + */ + default CompletionStage executeReadWithoutResultAsync( Consumer contextConsumer ) + { + return executeReadAsync( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + /** * Execute given unit of asynchronous work in a {@link AccessMode#READ read} asynchronous transaction with * the specified {@link TransactionConfig configuration}. @@ -158,9 +205,53 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeReadAsync(AsyncTransactionCallback, TransactionConfig)}. */ + @Deprecated CompletionStage readTransactionAsync( AsyncTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + CompletionStage executeReadAsync( AsyncTransactionCallback> callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the consumer representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @return a completion stage that completes successfully on success or exceptionally otherwise. + */ + default CompletionStage executeReadWithoutResultAsync( Consumer contextConsumer, TransactionConfig config ) + { + return executeReadAsync( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + /** * Execute given unit of asynchronous work in a {@link AccessMode#WRITE write} asynchronous transaction. *

@@ -181,9 +272,54 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeWriteAsync(AsyncTransactionCallback)}. */ + @Deprecated CompletionStage writeTransactionAsync( AsyncTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + default CompletionStage executeWriteAsync( AsyncTransactionCallback> callback ) + { + return executeWriteAsync( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the consumer representing the unit of work. + * @return a completion stage that completes successfully on success or exceptionally otherwise. + */ + default CompletionStage executeWriteWithoutResultAsync( Consumer contextConsumer ) + { + return executeWriteAsync( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + /** * Execute given unit of asynchronous work in a {@link AccessMode#WRITE write} asynchronous transaction with * the specified {@link TransactionConfig configuration}. @@ -206,9 +342,53 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeWriteAsync(AsyncTransactionCallback, TransactionConfig)}. */ + @Deprecated CompletionStage writeTransactionAsync( AsyncTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + CompletionStage executeWriteAsync( AsyncTransactionCallback> callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the consumer representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @return a completion stage that completes successfully on success or exceptionally otherwise. + */ + default CompletionStage executeWriteWithoutResultAsync( Consumer contextConsumer, TransactionConfig config ) + { + return executeWriteAsync( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + /** * Run a query asynchronously in an auto-commit transaction with the specified {@link TransactionConfig configuration} and return a * {@link CompletionStage} with a result cursor. diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java new file mode 100644 index 0000000000..b93f214ff2 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.async; + +public interface AsyncTransactionCallback +{ + T execute( AsyncTransactionContext tx ); +} diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java new file mode 100644 index 0000000000..f9acb7d4c6 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.async; + +public interface AsyncTransactionContext extends AsyncQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java index fc9a29cb91..4197922b91 100644 --- a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java @@ -26,6 +26,7 @@ * @param the return type of this work. * @since 4.0 */ +@Deprecated public interface AsyncTransactionWork { /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java new file mode 100644 index 0000000000..874d6e36f1 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Result; +import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionContext; +import org.neo4j.driver.Value; + +final class DelegatingTransactionContext implements TransactionContext +{ + private final Transaction delegate; + + public DelegatingTransactionContext( Transaction delegate ) + { + this.delegate = delegate; + } + + @Override + public Result run( String query, Value parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query, Map parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query, Record parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query ) + { + return delegate.run( query ); + } + + @Override + public Result run( Query query ) + { + return delegate.run( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java index e11133ddee..33008f71c4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java @@ -26,6 +26,7 @@ import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionCallback; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.async.ResultCursor; @@ -112,6 +113,12 @@ public T readTransaction( TransactionWork work, TransactionConfig config return transaction( AccessMode.READ, work, config ); } + @Override + public T executeRead( TransactionCallback callback, TransactionConfig config ) + { + return readTransaction( tx -> callback.execute( new DelegatingTransactionContext( tx ) ), config ); + } + @Override public T writeTransaction( TransactionWork work ) { @@ -124,6 +131,12 @@ public T writeTransaction( TransactionWork work, TransactionConfig config return transaction( AccessMode.WRITE, work, config ); } + @Override + public T executeWrite( TransactionCallback callback, TransactionConfig config ) + { + return writeTransaction( tx -> callback.execute( new DelegatingTransactionContext( tx ) ), config ); + } + @Override public Bookmark lastBookmark() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java new file mode 100644 index 0000000000..f422df85f6 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import java.util.Map; +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.AsyncTransactionContext; +import org.neo4j.driver.async.ResultCursor; + +final class DelegatingAsyncTransactionContext implements AsyncTransactionContext +{ + private final AsyncTransaction delegate; + + public DelegatingAsyncTransactionContext( AsyncTransaction delegate ) + { + this.delegate = delegate; + } + + @Override + public CompletionStage runAsync( String query, Value parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query, Map parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query, Record parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query ) + { + return delegate.runAsync( query ); + } + + @Override + public CompletionStage runAsync( Query query ) + { + return delegate.runAsync( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java index efc291933b..23c12fded6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java @@ -28,6 +28,7 @@ import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.AsyncTransactionCallback; import org.neo4j.driver.async.AsyncTransactionWork; import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.internal.util.Futures; @@ -99,6 +100,12 @@ public CompletionStage readTransactionAsync( AsyncTransactionWork CompletionStage executeReadAsync( AsyncTransactionCallback> callback, TransactionConfig config ) + { + return readTransactionAsync( tx -> callback.execute( new DelegatingAsyncTransactionContext( tx ) ), config ); + } + @Override public CompletionStage writeTransactionAsync( AsyncTransactionWork> work ) { @@ -111,6 +118,12 @@ public CompletionStage writeTransactionAsync( AsyncTransactionWork CompletionStage executeWriteAsync( AsyncTransactionCallback> callback, TransactionConfig config ) + { + return writeTransactionAsync( tx -> callback.execute( new DelegatingAsyncTransactionContext( tx ) ), config ); + } + @Override public Bookmark lastBookmark() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java new file mode 100644 index 0000000000..d78116230b --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactive; + +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxTransaction; +import org.neo4j.driver.reactive.RxTransactionContext; + +final class DelegatingRxTransactionContext implements RxTransactionContext +{ + private final RxTransaction delegate; + + public DelegatingRxTransactionContext( RxTransaction delegate ) + { + this.delegate = delegate; + } + + @Override + public RxResult run( String query, Value parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query, Map parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query, Record parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query ) + { + return delegate.run( query ); + } + + @Override + public RxResult run( Query query ) + { + return delegate.run( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index 1907ace63f..0403a736b9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -35,6 +35,7 @@ import org.neo4j.driver.reactive.RxResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; +import org.neo4j.driver.reactive.RxTransactionCallback; import org.neo4j.driver.reactive.RxTransactionWork; import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; @@ -115,6 +116,12 @@ public Publisher readTransaction( RxTransactionWork Publisher executeRead( RxTransactionCallback> callback, TransactionConfig config ) + { + return readTransaction( tx -> callback.execute( new DelegatingRxTransactionContext( tx ) ), config ); + } + @Override public Publisher writeTransaction( RxTransactionWork> work ) { @@ -127,6 +134,12 @@ public Publisher writeTransaction( RxTransactionWork Publisher executeWrite( RxTransactionCallback> callback, TransactionConfig config ) + { + return writeTransaction( tx -> callback.execute( new DelegatingRxTransactionContext( tx ) ), config ); + } + private Publisher runTransaction( AccessMode mode, RxTransactionWork> work, TransactionConfig config ) { Flux repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute, @@ -135,7 +148,7 @@ private Publisher runTransaction( AccessMode mode, RxTransactionWork the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeRead(RxTransactionCallback)}. * */ + @Deprecated Publisher readTransaction( RxTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + default Publisher executeRead( RxTransactionCallback> callback ) + { + return executeRead( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the callback representing the unit of work. + * @return a publisher that emits success signal on success or error otherwise. + */ + default Publisher executeReadWithoutResult( Consumer contextConsumer ) + { + return executeRead( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + /** * Execute given unit of reactive work in a {@link AccessMode#READ read} reactive transaction with * the specified {@link TransactionConfig configuration}. @@ -104,10 +152,53 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeRead(RxTransactionCallback, TransactionConfig)}. */ + @Deprecated Publisher readTransaction( RxTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + Publisher executeRead( RxTransactionCallback> callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @return a publisher that emits success signal on success or error otherwise. + */ + default Publisher executeReadWithoutResult( Consumer contextConsumer, TransactionConfig config ) + { + return executeRead( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + /** * Execute given unit of reactive work in a {@link AccessMode#WRITE write} reactive transaction.

@@ -125,10 +216,54 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeWrite(RxTransactionCallback)}. */ + @Deprecated Publisher writeTransaction( RxTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + default Publisher executeWrite( RxTransactionCallback> callback ) + { + return executeWrite( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the callback representing the unit of work. + * @return a publisher that emits success signal on success or error otherwise. + */ + default Publisher executeWriteWithoutResult( Consumer contextConsumer ) + { + return executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + /** * Execute given unit of reactive work in a {@link AccessMode#WRITE write} reactive transaction with * the specified {@link TransactionConfig configuration}. @@ -148,20 +283,62 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeWrite(RxTransactionCallback, TransactionConfig)}. */ + @Deprecated Publisher writeTransaction( RxTransactionWork> work, TransactionConfig config ); /** - * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. - * The query is not executed when the reactive result is returned. - * Instead, the publishers in the result will actually start the execution of the query. + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. * - * @param query text of a Neo4j query. + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + Publisher executeWrite( RxTransactionCallback> callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @return a publisher that emits success signal on success or error otherwise. + */ + default Publisher executeWriteWithoutResult( Consumer contextConsumer, TransactionConfig config ) + { + return executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + + /** + * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. The query is not + * executed when the reactive result is returned. Instead, the publishers in the result will actually start the execution of the query. + * + * @param query text of a Neo4j query. * @param config configuration for the new transaction. * @return a reactive result. */ - RxResult run(String query, TransactionConfig config ); + RxResult run( String query, TransactionConfig config ); /** * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java new file mode 100644 index 0000000000..e26ad842c7 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactive; + +public interface RxTransactionCallback +{ + T execute( RxTransactionContext tx ); +} diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java new file mode 100644 index 0000000000..6e5d762407 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactive; + +public interface RxTransactionContext extends RxQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java index ef68a8118f..e7b4980853 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java @@ -26,6 +26,7 @@ * @param the return type of this work. * @since 4.0 */ +@Deprecated public interface RxTransactionWork { /** diff --git a/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java b/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java index e1c85057ea..f0d94b968d 100644 --- a/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java +++ b/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java @@ -30,6 +30,7 @@ import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionCallback; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.Value; @@ -94,6 +95,12 @@ public T readTransaction( TransactionWork work, TransactionConfig config return realSession.readTransaction( work, config ); } + @Override + public T executeRead( TransactionCallback callback, TransactionConfig config ) + { + return realSession.executeRead( callback, config ); + } + @Override public T writeTransaction( TransactionWork work ) { @@ -106,6 +113,12 @@ public T writeTransaction( TransactionWork work, TransactionConfig config return realSession.writeTransaction( work, config ); } + @Override + public T executeWrite( TransactionCallback callback, TransactionConfig config ) + { + return realSession.executeWrite( callback, config ); + } + @Override public Bookmark lastBookmark() { @@ -113,9 +126,9 @@ public Bookmark lastBookmark() } @Override - public Result run(String query, Map parameters) + public Result run( String query, Map parameters ) { - return realSession.run(query, parameters); + return realSession.run( query, parameters ); } @Override