diff --git a/driver/clirr-ignored-differences.xml b/driver/clirr-ignored-differences.xml index 28c94b0a5a..4004ebf51a 100644 --- a/driver/clirr-ignored-differences.xml +++ b/driver/clirr-ignored-differences.xml @@ -397,4 +397,16 @@ org.neo4j.driver.types.TypeSystem getDefault() + + org/neo4j/driver/Driver + 7012 + org.neo4j.driver.BaseReactiveSession reactiveSession(java.lang.Class) + + + + org/neo4j/driver/Driver + 7012 + org.neo4j.driver.BaseReactiveSession reactiveSession(java.lang.Class, org.neo4j.driver.SessionConfig) + + diff --git a/driver/src/main/java/module-info.java b/driver/src/main/java/module-info.java index 368891ed49..ba5b0b1ce9 100644 --- a/driver/src/main/java/module-info.java +++ b/driver/src/main/java/module-info.java @@ -21,6 +21,7 @@ exports org.neo4j.driver; exports org.neo4j.driver.async; exports org.neo4j.driver.reactive; + exports org.neo4j.driver.reactivestreams; exports org.neo4j.driver.types; exports org.neo4j.driver.summary; exports org.neo4j.driver.net; diff --git a/driver/src/main/java/org/neo4j/driver/BaseReactiveSession.java b/driver/src/main/java/org/neo4j/driver/BaseReactiveSession.java new file mode 100644 index 0000000000..92ddb9f8e6 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/BaseReactiveSession.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +/** + * A base interface for reactive sessions, used by {@link Driver#reactiveSession(Class)} and {@link Driver#reactiveSession(Class, SessionConfig)}. + */ +public interface BaseReactiveSession {} diff --git a/driver/src/main/java/org/neo4j/driver/Driver.java b/driver/src/main/java/org/neo4j/driver/Driver.java index 05e4584a7e..1b0fdf08fd 100644 --- a/driver/src/main/java/org/neo4j/driver/Driver.java +++ b/driver/src/main/java/org/neo4j/driver/Driver.java @@ -77,7 +77,9 @@ public interface Driver extends AutoCloseable { * * @return a new {@link Session} object. */ - Session session(); + default Session session() { + return session(SessionConfig.defaultConfig()); + } /** * Create a new {@link Session} with a specified {@link SessionConfig session configuration}. @@ -133,7 +135,41 @@ default ReactiveSession reactiveSession() { * @param sessionConfig used to customize the session. * @return a new {@link ReactiveSession} object. */ - ReactiveSession reactiveSession(SessionConfig sessionConfig); + default ReactiveSession reactiveSession(SessionConfig sessionConfig) { + return reactiveSession(ReactiveSession.class, sessionConfig); + } + + /** + * Create a new reactive session of supported type with default {@link SessionConfig session configuration}. + *

+ * Supported types are: + *

+ * + * @param sessionClass session type class + * @return session instance + * @param session type + */ + default T reactiveSession(Class sessionClass) { + return reactiveSession(sessionClass, SessionConfig.defaultConfig()); + } + + /** + * Create a new reactive session of supported type with a specified {@link SessionConfig session configuration}. + *

+ * Supported types are: + *

    + *
  • {@link org.neo4j.driver.reactive.ReactiveSession} - reactive session using Flow API
  • + *
  • {@link org.neo4j.driver.reactivestreams.ReactiveSession} - reactive session using Reactive Streams API
  • + *
+ * + * @param sessionClass session type class + * @return session instance + * @param session type + */ + T reactiveSession(Class sessionClass, SessionConfig sessionConfig); /** * Create a new general purpose {@link AsyncSession} with default {@link SessionConfig session configuration}. The {@link AsyncSession} provides an @@ -143,7 +179,9 @@ default ReactiveSession reactiveSession() { * * @return a new {@link AsyncSession} object. */ - AsyncSession asyncSession(); + default AsyncSession asyncSession() { + return asyncSession(SessionConfig.defaultConfig()); + } /** * Create a new {@link AsyncSession} with a specified {@link SessionConfig session configuration}. diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java index 5e047c3219..2ca80ee182 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java @@ -18,10 +18,12 @@ */ package org.neo4j.driver.internal; +import static java.util.Objects.requireNonNull; import static org.neo4j.driver.internal.util.Futures.completedWithNull; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; +import org.neo4j.driver.BaseReactiveSession; import org.neo4j.driver.Driver; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; @@ -33,12 +35,10 @@ import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.metrics.DevNullMetricsProvider; import org.neo4j.driver.internal.metrics.MetricsProvider; -import org.neo4j.driver.internal.reactive.InternalReactiveSession; import org.neo4j.driver.internal.reactive.InternalRxSession; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.types.InternalTypeSystem; import org.neo4j.driver.internal.util.Futures; -import org.neo4j.driver.reactive.ReactiveSession; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.types.TypeSystem; @@ -61,11 +61,6 @@ public class InternalDriver implements Driver { this.log = logging.getLog(getClass()); } - @Override - public Session session() { - return new InternalSession(newSession(SessionConfig.defaultConfig())); - } - @Override public Session session(SessionConfig sessionConfig) { return new InternalSession(newSession(sessionConfig)); @@ -77,14 +72,19 @@ public RxSession rxSession(SessionConfig sessionConfig) { return new InternalRxSession(newSession(sessionConfig)); } + @SuppressWarnings({"deprecation", "unchecked"}) @Override - public ReactiveSession reactiveSession(SessionConfig sessionConfig) { - return new InternalReactiveSession(newSession(sessionConfig)); - } - - @Override - public AsyncSession asyncSession() { - return new InternalAsyncSession(newSession(SessionConfig.defaultConfig())); + public T reactiveSession(Class sessionClass, SessionConfig sessionConfig) { + requireNonNull(sessionClass, "sessionClass must not be null"); + requireNonNull(sessionClass, "sessionConfig must not be null"); + if (org.neo4j.driver.reactive.ReactiveSession.class.isAssignableFrom(sessionClass)) { + return (T) new org.neo4j.driver.internal.reactive.InternalReactiveSession(newSession(sessionConfig)); + } else if (org.neo4j.driver.reactivestreams.ReactiveSession.class.isAssignableFrom(sessionClass)) { + return (T) new org.neo4j.driver.internal.reactivestreams.InternalReactiveSession(newSession(sessionConfig)); + } else { + throw new IllegalArgumentException( + String.format("Unsupported session type '%s'", sessionClass.getCanonicalName())); + } } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java index 9f09995a5c..5821accf9d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java @@ -34,7 +34,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -abstract class AbstractReactiveSession { +public abstract class AbstractReactiveSession { protected final NetworkSession session; public AbstractReactiveSession(NetworkSession session) { @@ -45,15 +45,15 @@ public AbstractReactiveSession(NetworkSession session) { this.session = session; } - abstract S createTransaction(UnmanagedTransaction unmanagedTransaction); + protected abstract S createTransaction(UnmanagedTransaction unmanagedTransaction); - abstract Publisher closeTransaction(S transaction, boolean commit); + protected abstract Publisher closeTransaction(S transaction, boolean commit); Publisher doBeginTransaction(TransactionConfig config) { return doBeginTransaction(config, null); } - Publisher doBeginTransaction(TransactionConfig config, String txType) { + protected Publisher doBeginTransaction(TransactionConfig config, String txType) { return createSingleItemPublisher( () -> { CompletableFuture txFuture = new CompletableFuture<>(); @@ -87,7 +87,7 @@ Publisher beginTransaction(AccessMode mode, TransactionConfig config) { "Unexpected condition, begin transaction call has completed successfully with transaction being null")); } - Publisher runTransaction( + protected Publisher runTransaction( AccessMode mode, Function> work, TransactionConfig config) { Flux repeatableWork = Flux.usingWhen( beginTransaction(mode, config), @@ -119,7 +119,7 @@ public Set lastBookmarks() { return session.lastBookmarks(); } - Publisher doClose() { + protected Publisher doClose() { return createEmptyPublisher(session::closeAsync); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveTransaction.java index 97939c87db..74d2d9b24a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveTransaction.java @@ -24,30 +24,30 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; -abstract class AbstractReactiveTransaction { +public abstract class AbstractReactiveTransaction { protected final UnmanagedTransaction tx; protected AbstractReactiveTransaction(UnmanagedTransaction tx) { this.tx = tx; } - Publisher doCommit() { + protected Publisher doCommit() { return createEmptyPublisher(tx::commitAsync); } - Publisher doRollback() { + protected Publisher doRollback() { return createEmptyPublisher(tx::rollbackAsync); } - Publisher doClose() { + protected Publisher doClose() { return close(false); } - Publisher doIsOpen() { + protected Publisher doIsOpen() { return Mono.just(tx.isOpen()); } - Publisher close(boolean commit) { + public Publisher close(boolean commit) { return createEmptyPublisher(() -> tx.closeAsync(commit)); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java index b84fc49462..efea7b8490 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java @@ -46,12 +46,12 @@ public InternalReactiveSession(NetworkSession session) { } @Override - ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) { + protected ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) { return new InternalReactiveTransaction(unmanagedTransaction); } @Override - org.reactivestreams.Publisher closeTransaction(ReactiveTransaction transaction, boolean commit) { + protected org.reactivestreams.Publisher closeTransaction(ReactiveTransaction transaction, boolean commit) { return ((InternalReactiveTransaction) transaction).close(commit); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index 52aa03337e..7dd371c34a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -43,12 +43,12 @@ public InternalRxSession(NetworkSession session) { } @Override - RxTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) { + protected RxTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) { return new InternalRxTransaction(unmanagedTransaction); } @Override - Publisher closeTransaction(RxTransaction transaction, boolean commit) { + protected Publisher closeTransaction(RxTransaction transaction, boolean commit) { return ((InternalRxTransaction) transaction).close(commit); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/BaseReactiveQueryRunner.java b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/BaseReactiveQueryRunner.java new file mode 100644 index 0000000000..a16491aebf --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/BaseReactiveQueryRunner.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactivestreams; + +import java.util.Map; +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.Values; +import org.neo4j.driver.internal.util.Extract; +import org.neo4j.driver.internal.value.MapValue; +import org.neo4j.driver.reactivestreams.ReactiveQueryRunner; +import org.neo4j.driver.reactivestreams.ReactiveResult; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +interface BaseReactiveQueryRunner extends ReactiveQueryRunner { + @Override + default Publisher run(String queryStr, Value parameters) { + try { + Query query = new Query(queryStr, parameters); + return run(query); + } catch (Throwable t) { + return Mono.error(t); + } + } + + @Override + default Publisher run(String query, Map parameters) { + return run(query, parameters(parameters)); + } + + @Override + default Publisher run(String query, Record parameters) { + return run(query, parameters(parameters)); + } + + @Override + default Publisher run(String queryStr) { + try { + Query query = new Query(queryStr); + return run(query); + } catch (Throwable t) { + return Mono.error(t); + } + } + + static Value parameters(Record record) { + return record == null ? Values.EmptyMap : parameters(record.asMap()); + } + + static Value parameters(Map map) { + if (map == null || map.isEmpty()) { + return Values.EmptyMap; + } + return new MapValue(Extract.mapOfValues(map)); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/DelegatingReactiveTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/DelegatingReactiveTransactionContext.java new file mode 100644 index 0000000000..a69d5836ef --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/DelegatingReactiveTransactionContext.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactivestreams; + +import java.util.Map; +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.reactivestreams.ReactiveResult; +import org.neo4j.driver.reactivestreams.ReactiveTransaction; +import org.neo4j.driver.reactivestreams.ReactiveTransactionContext; +import org.reactivestreams.Publisher; + +public final class DelegatingReactiveTransactionContext implements ReactiveTransactionContext { + private final ReactiveTransaction delegate; + + public DelegatingReactiveTransactionContext(ReactiveTransaction delegate) { + this.delegate = delegate; + } + + @Override + public Publisher run(String query, Value parameters) { + return delegate.run(query, parameters); + } + + @Override + public Publisher run(String query, Map parameters) { + return delegate.run(query, parameters); + } + + @Override + public Publisher run(String query, Record parameters) { + return delegate.run(query, parameters); + } + + @Override + public Publisher run(String query) { + return delegate.run(query); + } + + @Override + public Publisher run(Query query) { + return delegate.run(query); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveResult.java b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveResult.java new file mode 100644 index 0000000000..42d596b784 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveResult.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactivestreams; + +import static org.neo4j.driver.internal.util.ErrorUtil.newResultConsumedError; +import static reactor.core.publisher.FluxSink.OverflowStrategy.IGNORE; + +import java.util.List; +import java.util.function.BiConsumer; +import org.neo4j.driver.Record; +import org.neo4j.driver.internal.cursor.RxResultCursor; +import org.neo4j.driver.internal.util.Futures; +import org.neo4j.driver.reactivestreams.ReactiveResult; +import org.neo4j.driver.summary.ResultSummary; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; + +public class InternalReactiveResult implements ReactiveResult { + private final RxResultCursor cursor; + + public InternalReactiveResult(RxResultCursor cursor) { + this.cursor = cursor; + } + + @Override + public List keys() { + return cursor.keys(); + } + + @Override + public Publisher records() { + return Flux.create( + sink -> { + if (cursor.isDone()) { + sink.error(newResultConsumedError()); + } else { + cursor.installRecordConsumer(createRecordConsumer(sink)); + sink.onCancel(cursor::cancel); + sink.onRequest(cursor::request); + } + }, + IGNORE); + } + + @Override + public Publisher consume() { + return Mono.create(sink -> cursor.summaryAsync().whenComplete((summary, summaryCompletionError) -> { + Throwable error = Futures.completionExceptionCause(summaryCompletionError); + if (summary != null) { + sink.success(summary); + } else { + sink.error(error); + } + })); + } + + @Override + public Publisher isOpen() { + return Mono.just(!cursor.isDone()); + } + + /** + * Defines how a subscriber shall consume records. A record consumer holds a reference to a subscriber. A publisher and/or a subscription who holds a + * reference to this consumer shall release the reference to this object after subscription is done or cancelled so that the subscriber can be garbage + * collected. + * + * @param sink the subscriber + * @return a record consumer. + */ + private BiConsumer createRecordConsumer(FluxSink sink) { + return (r, e) -> { + if (r != null) { + sink.next(r); + } else if (e != null) { + sink.error(e); + } else { + sink.complete(); + } + }; + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java new file mode 100644 index 0000000000..032377254c --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveSession.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactivestreams; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Bookmark; +import org.neo4j.driver.Query; +import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.internal.async.NetworkSession; +import org.neo4j.driver.internal.async.UnmanagedTransaction; +import org.neo4j.driver.internal.cursor.RxResultCursor; +import org.neo4j.driver.internal.reactive.AbstractReactiveSession; +import org.neo4j.driver.internal.util.Futures; +import org.neo4j.driver.reactivestreams.ReactiveResult; +import org.neo4j.driver.reactivestreams.ReactiveSession; +import org.neo4j.driver.reactivestreams.ReactiveTransaction; +import org.neo4j.driver.reactivestreams.ReactiveTransactionCallback; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +public class InternalReactiveSession extends AbstractReactiveSession + implements ReactiveSession, BaseReactiveQueryRunner { + public InternalReactiveSession(NetworkSession session) { + super(session); + } + + @Override + public ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) { + return new InternalReactiveTransaction(unmanagedTransaction); + } + + @Override + public Publisher closeTransaction(ReactiveTransaction transaction, boolean commit) { + return ((InternalReactiveTransaction) transaction).close(commit); + } + + @Override + public Publisher beginTransaction(TransactionConfig config) { + return beginTransaction(config, null); + } + + public Publisher beginTransaction(TransactionConfig config, String txType) { + return doBeginTransaction(config, txType); + } + + @Override + public Publisher executeRead( + ReactiveTransactionCallback> callback, TransactionConfig config) { + return runTransaction( + AccessMode.READ, tx -> callback.execute(new DelegatingReactiveTransactionContext(tx)), config); + } + + @Override + public Publisher executeWrite( + ReactiveTransactionCallback> callback, TransactionConfig config) { + return runTransaction( + AccessMode.WRITE, tx -> callback.execute(new DelegatingReactiveTransactionContext(tx)), config); + } + + @Override + public Publisher run(Query query) { + return run(query, TransactionConfig.empty()); + } + + @Override + public Publisher run(Query query, TransactionConfig config) { + CompletionStage cursorStage; + try { + cursorStage = session.runRx(query, config); + } catch (Throwable t) { + cursorStage = Futures.failedFuture(t); + } + + return Mono.fromCompletionStage(cursorStage) + .onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync()) + .onErrorMap(releaseError -> Futures.combineErrors(error, releaseError)) + .then(Mono.error(error))) + .flatMap(cursor -> { + Mono publisher; + Throwable runError = cursor.getRunError(); + if (runError != null) { + publisher = Mono.fromCompletionStage(session.releaseConnectionAsync()) + .onErrorMap(releaseError -> Futures.combineErrors(runError, releaseError)) + .then(Mono.error(runError)); + } else { + publisher = Mono.just(cursor); + } + return publisher; + }) + .map(InternalReactiveResult::new); + } + + @Override + public Set lastBookmarks() { + return new HashSet<>(session.lastBookmarks()); + } + + @Override + public Publisher close() { + return doClose(); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveTransaction.java new file mode 100644 index 0000000000..b91419e6cf --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/reactivestreams/InternalReactiveTransaction.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactivestreams; + +import java.util.concurrent.CompletionStage; +import org.neo4j.driver.Query; +import org.neo4j.driver.internal.async.UnmanagedTransaction; +import org.neo4j.driver.internal.cursor.RxResultCursor; +import org.neo4j.driver.internal.reactive.AbstractReactiveTransaction; +import org.neo4j.driver.internal.util.Futures; +import org.neo4j.driver.reactivestreams.ReactiveResult; +import org.neo4j.driver.reactivestreams.ReactiveTransaction; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +public class InternalReactiveTransaction extends AbstractReactiveTransaction + implements ReactiveTransaction, BaseReactiveQueryRunner { + protected InternalReactiveTransaction(UnmanagedTransaction tx) { + super(tx); + } + + @Override + public Publisher run(Query query) { + CompletionStage cursorStage; + try { + cursorStage = tx.runRx(query); + } catch (Throwable t) { + cursorStage = Futures.failedFuture(t); + } + + return Mono.fromCompletionStage(cursorStage) + .flatMap(cursor -> { + Mono publisher; + Throwable runError = cursor.getRunError(); + if (runError != null) { + publisher = Mono.error(runError); + tx.markTerminated(runError); + } else { + publisher = Mono.just(cursor); + } + return publisher; + }) + .map(InternalReactiveResult::new); + } + + /** + * Marks transaction as terminated and sends {@code RESET} message over allocated connection. + *

+ * THIS METHOD IS NOT PART OF PUBLIC API. This method may be changed or removed at any moment in time. + * + * @return {@code RESET} response publisher + */ + public Publisher interrupt() { + return Mono.fromCompletionStage(tx.interruptAsync()); + } + + @Override + public Publisher commit() { + return doCommit(); + } + + @Override + public Publisher rollback() { + return doRollback(); + } + + @Override + public Publisher close() { + return doClose(); + } + + @Override + public Publisher isOpen() { + return doIsOpen(); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java index 6b220a643e..82ca5ddb4a 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow.Publisher; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.BaseReactiveSession; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; import org.neo4j.driver.Result; @@ -39,7 +40,7 @@ * @see Publisher * @since 5.0 */ -public interface ReactiveSession extends ReactiveQueryRunner { +public interface ReactiveSession extends BaseReactiveSession, ReactiveQueryRunner { /** * Begin a new unmanaged {@linkplain ReactiveTransaction transaction}. At most one transaction may exist in a session at any point in time. To * maintain multiple concurrent transactions, use multiple concurrent sessions. diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxQueryRunner.java b/driver/src/main/java/org/neo4j/driver/reactive/RxQueryRunner.java index 2757f82e49..f529822d50 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxQueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxQueryRunner.java @@ -31,7 +31,7 @@ * @see RxSession * @see RxTransaction * @since 4.0 - * @deprecated superseded by {@link ReactiveQueryRunner} + * @deprecated superseded by {@link org.neo4j.driver.reactive.ReactiveQueryRunner} and {@link org.neo4j.driver.reactivestreams.ReactiveQueryRunner} */ @Deprecated public interface RxQueryRunner { diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java b/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java index b245c8fa27..e44dcf3a9f 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java @@ -38,7 +38,7 @@ * @see Subscriber * @see Subscription * @since 4.0 - * @deprecated superseded by {@link ReactiveResult} + * @deprecated superseded by {@link org.neo4j.driver.reactive.ReactiveResult} and {@link org.neo4j.driver.reactivestreams.ReactiveResult} */ @Deprecated public interface RxResult { diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java b/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java index 3d2583e4ca..abfa4825b4 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java @@ -35,7 +35,7 @@ * @see RxTransaction * @see Publisher * @since 4.0 - * @deprecated superseded by {@link ReactiveSession} + * @deprecated superseded by {@link org.neo4j.driver.reactive.ReactiveSession} and {@link org.neo4j.driver.reactivestreams.ReactiveSession} */ @Deprecated public interface RxSession extends RxQueryRunner { diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java index aa5f4cff7a..473b24f526 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java @@ -27,7 +27,7 @@ * @see RxSession * @see Publisher * @since 4.0 - * @deprecated superseded by {@link ReactiveTransaction} + * @deprecated superseded by {@link org.neo4j.driver.reactive.ReactiveTransaction} and {@link org.neo4j.driver.reactivestreams.ReactiveTransaction} */ @Deprecated public interface RxTransaction extends RxQueryRunner { diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java index 82bfbbca90..43a1640406 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java @@ -24,7 +24,7 @@ * * @param the return type of this work * @since 4.0 - * @deprecated superseded by {@link ReactiveTransactionCallback} + * @deprecated superseded by {@link org.neo4j.driver.reactive.ReactiveTransactionCallback} and {@link org.neo4j.driver.reactivestreams.ReactiveTransactionCallback} */ @Deprecated public interface RxTransactionWork { diff --git a/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveQueryRunner.java b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveQueryRunner.java new file mode 100644 index 0000000000..6b7e7c5c51 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveQueryRunner.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactivestreams; + +import java.util.Map; +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.Values; +import org.reactivestreams.Publisher; + +/** + * Common interface for components that can execute Neo4j queries using Reactive API. + * + * @see ReactiveSession + * @see ReactiveTransaction + * @since 5.2 + */ +public interface ReactiveQueryRunner { + /** + * Register running of a query and return a publisher of {@link ReactiveResult}. + *

+ * Invoking this method will result in a Bolt RUN message exchange with server and the returned publisher will either emit an instance of {@link + * ReactiveResult} on success or an error otherwise. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This particular method takes a {@link Value} as its input. This is useful if you want to take a map-like value that you've gotten from a prior result and + * send it back as parameters. + *

+ * If you are creating parameters programmatically, {@link #run(String, Map)} might be more helpful, it converts your map to a {@link Value} for you. + * + * @param query text of a Neo4j query + * @param parameters input parameters, should be a map Value, see {@link Values#parameters(Object...)}. + * @return a publisher of reactive result. + */ + Publisher run(String query, Value parameters); + + /** + * Register running of a query and return a publisher of {@link ReactiveResult}. + *

+ * Invoking this method will result in a Bolt RUN message exchange with server and the returned publisher will either emit an instance of {@link + * ReactiveResult} on success or an error otherwise. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This version of run takes a {@link Map} of parameters. The values in the map must be values that can be converted to Neo4j types. See {@link + * Values#parameters(Object...)} for a list of allowed types. + * + * @param query text of a Neo4j query + * @param parameters input data for the query + * @return a publisher of reactive result. + */ + Publisher run(String query, Map parameters); + + /** + * Register running of a query and return a publisher of {@link ReactiveResult}. + *

+ * Invoking this method will result in a Bolt RUN message exchange with server and the returned publisher will either emit an instance of {@link + * ReactiveResult} on success or an error otherwise. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This version of run takes a {@link Record} of parameters, which can be useful if you want to use the output of one query as input for another. + * + * @param query text of a Neo4j query + * @param parameters input data for the query + * @return a publisher of reactive result. + */ + Publisher run(String query, Record parameters); + + /** + * Register running of a query and return a publisher of {@link ReactiveResult}. + *

+ * Invoking this method will result in a Bolt RUN message exchange with server and the returned publisher will either emit an instance of {@link + * ReactiveResult} on success or an error otherwise. + * + * @param query text of a Neo4j query + * @return a publisher of reactive result. + */ + Publisher run(String query); + + /** + * Register running of a query and return a publisher of {@link ReactiveResult}. + *

+ * Invoking this method will result in a Bolt RUN message exchange with server and the returned publisher will either emit an instance of {@link + * ReactiveResult} on success or an error otherwise. + * + * @param query a Neo4j query + * @return a publisher of reactive result. + */ + Publisher run(Query query); +} diff --git a/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveResult.java b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveResult.java new file mode 100644 index 0000000000..1cb78ad552 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveResult.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactivestreams; + +import java.util.List; +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.exceptions.ResultConsumedException; +import org.neo4j.driver.summary.ResultSummary; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * A reactive result provides a reactive way to execute query on the server and receives records back. This reactive result consists of a result key publisher, + * a record publisher and a result summary publisher. The reactive result is created via {@link ReactiveSession#run(Query)} and {@link + * ReactiveTransaction#run(Query)} for example. On the creation of the result, the query submitted to create this result will not be executed until one of the + * publishers in this class is subscribed. The records or the summary stream has to be consumed and finished (completed or errored) to ensure the resources used + * by this result to be freed correctly. + * + * @see Publisher + * @see Subscriber + * @see Subscription + * @since 5.2 + */ +public interface ReactiveResult { + /** + * Returns a list of keys. + * + * @return a list of keys. + */ + List keys(); + + /** + * Returns a cold unicast publisher of records. + *

+ * When the record publisher is {@linkplain Publisher#subscribe(Subscriber) subscribed}, the query is executed and the result is streamed back as a record + * stream followed by a result summary. This record publisher publishes all records in the result and signals the completion. However before completion or + * error reporting if any, a cleanup of result resources such as network connection will be carried out automatically. + *

+ * Therefore the {@link Subscriber} of this record publisher shall wait for the termination signal (complete or error) to ensure that the resources used by + * this result are released correctly. Then the session is ready to be used to run more queries. + *

+ * Cancelling of the record streaming will immediately terminate the propagation of new records. But it will not cancel query execution on the server. When + * the execution is finished, the {@link Subscriber} will be notified with a termination signal (complete or error). + *

+ * The record publishing event by default runs in an Network IO thread, as a result no blocking operation is allowed in this thread. Otherwise network IO + * might be blocked by application logic. + *

+ * This publisher can only be subscribed by one {@link Subscriber} once. + *

+ * If this publisher is subscribed after {@link #keys()}, then the publish of records is carried out after the arrival of keys. If this publisher is + * subscribed after {@link #consume()}, then a {@link ResultConsumedException} will be thrown. + * + * @return a cold unicast publisher of records. + */ + Publisher records(); + + /** + * Returns a cold publisher of result summary which arrives after all records. + *

+ * {@linkplain Publisher#subscribe(Subscriber) Subscribing} the summary publisher results in the execution of the query followed by the result summary being + * returned. The summary publisher cancels record publishing if not yet subscribed and directly streams back the summary on query execution completion. As a + * result, the invocation of {@link #records()} after this method, would receive an {@link ResultConsumedException}. + *

+ * If subscribed after {@link #keys()}, then the result summary will be published after the query execution without streaming any record to client. If + * subscribed after {@link #records()}, then the result summary will be published after the query execution and the streaming of records. + *

+ * Usually, this method shall be chained after {@link #records()} to ensure that all records are processed before summary. + *

+ * This method can be subscribed multiple times. When the {@linkplain ResultSummary summary} arrives, it will be buffered locally for all subsequent calls. + * + * @return a cold publisher of result summary which only arrives after all records. + */ + Publisher consume(); + + /** + * Determine if result is open. + *

+ * Result is considered to be open if it has not been consumed ({@link #consume()}) and its creator object (e.g. session or transaction) has not been closed + * (including committed or rolled back). + *

+ * Attempts to access data on closed result will produce {@link ResultConsumedException}. + * + * @return a publisher emitting {@code true} if result is open and {@code false} otherwise. + */ + Publisher isOpen(); +} diff --git a/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveSession.java b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveSession.java new file mode 100644 index 0000000000..46a8156243 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveSession.java @@ -0,0 +1,252 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactivestreams; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.BaseReactiveSession; +import org.neo4j.driver.Bookmark; +import org.neo4j.driver.Query; +import org.neo4j.driver.Result; +import org.neo4j.driver.Session; +import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.Values; +import org.reactivestreams.Publisher; + +/** + * A reactive session is the same as {@link Session} except it provides a reactive API. + * + * @see Session + * @see ReactiveResult + * @see ReactiveTransaction + * @see Publisher + * @since 5.2 + */ +public interface ReactiveSession extends BaseReactiveSession, ReactiveQueryRunner { + /** + * Begin a new unmanaged {@linkplain ReactiveTransaction transaction}. At most one transaction may exist in a session at any point in time. To + * maintain multiple concurrent transactions, use multiple concurrent sessions. + *

+ * It by default is executed in a Network IO thread, as a result no blocking operation is allowed in this thread. + * + * @return a new {@link ReactiveTransaction} + */ + default Publisher beginTransaction() { + return beginTransaction(TransactionConfig.empty()); + } + + /** + * Begin a new unmanaged {@linkplain ReactiveTransaction transaction} with the specified {@link TransactionConfig configuration}. At most one + * transaction may exist in a session at any point in time. To maintain multiple concurrent transactions, use multiple concurrent sessions. + *

+ * It by default is executed in a Network IO thread, as a result no blocking operation is allowed in this thread. + * + * @param config configuration for the new transaction. + * @return a new {@link ReactiveTransaction} + */ + Publisher beginTransaction(TransactionConfig config); + + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. The transaction allows for one + * or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will + * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + default Publisher executeRead(ReactiveTransactionCallback> callback) { + return executeRead(callback, TransactionConfig.empty()); + } + + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. The transaction allows for one + * or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will + * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + Publisher executeRead( + ReactiveTransactionCallback> callback, TransactionConfig config); + + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. The transaction allows for + * one or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will + * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + default Publisher executeWrite(ReactiveTransactionCallback> callback) { + return executeWrite(callback, TransactionConfig.empty()); + } + + /** + * Execute a unit of work as a single, managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. The transaction allows for + * one or more statements to be run. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. Any exception emitted by the unit of work will + * result in a rollback attempt and abortion of execution unless exception is considered to be valid for retry attempt by the driver. + *

+ * The provided unit of work should not return {@link Result} object as it won't be valid outside the scope of the transaction. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + Publisher executeWrite( + ReactiveTransactionCallback> callback, TransactionConfig config); + + /** + * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a publisher of {@link ReactiveResult}. + *

+ * Invoking this method will result in a Bolt RUN message exchange with server and the returned publisher will either emit an instance of {@link + * ReactiveResult} on success or an error otherwise. + * + * @param query text of a Neo4j query. + * @param config configuration for the new transaction. + * @return a publisher of reactive result. + */ + default Publisher run(String query, TransactionConfig config) { + return run(new Query(query), config); + } + + /** + * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a publisher of {@link ReactiveResult}. + *

+ * Invoking this method will result in a Bolt RUN message exchange with server and the returned publisher will either emit an instance of {@link ReactiveResult} on success or an error otherwise. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This version of run takes a {@link Map} of parameters. The values in the map must be values that can be converted to Neo4j types. See {@link + * Values#parameters(Object...)} for a list of allowed types. + * + *

Example

+ *
+     * {@code
+     * Map metadata = new HashMap<>();
+     * metadata.put("type", "update name");
+     *
+     * TransactionConfig config = TransactionConfig.builder()
+     *                 .withTimeout(Duration.ofSeconds(3))
+     *                 .withMetadata(metadata)
+     *                 .build();
+     *
+     * Map parameters = new HashMap<>();
+     * parameters.put("myNameParam", "Bob");
+     *
+     * reactiveSession.run("MATCH (n) WHERE n.name = $myNameParam RETURN (n)", parameters, config);
+     * }
+     * 
+ * + * @param query text of a Neo4j query. + * @param parameters input data for the query. + * @param config configuration for the new transaction. + * @return a publisher of reactive result. + */ + default Publisher run(String query, Map parameters, TransactionConfig config) { + return run(new Query(query, parameters), config); + } + + /** + * Run a query in an auto-commit transaction with specified {@link TransactionConfig configuration} and return a publisher of {@link ReactiveResult}. + *

+ * Invoking this method will result in a Bolt RUN message exchange with server and the returned publisher will either emit an instance of {@link + * ReactiveResult} on success or an error otherwise. + * + *

Example

+ *
+     * {@code
+     * Map metadata = new HashMap<>();
+     * metadata.put("type", "update name");
+     *
+     * TransactionConfig config = TransactionConfig.builder()
+     *                 .withTimeout(Duration.ofSeconds(3))
+     *                 .withMetadata(metadata)
+     *                 .build();
+     *
+     * Query query = new Query("MATCH (n) WHERE n.name = $myNameParam RETURN n.age");
+     *
+     * reactiveSession.run(query.withParameters(Values.parameters("myNameParam", "Bob")));
+     * }
+     * 
+ * + * @param query a Neo4j query. + * @param config configuration for the new transaction. + * @return a publisher of reactive result. + */ + Publisher run(Query query, TransactionConfig config); + + /** + * Return a set of last bookmarks. + *

+ * When no new bookmark is received, the initial bookmarks are returned. This may happen when no work has been done using the session. Multivalued {@link + * Bookmark} instances will be mapped to distinct {@link Bookmark} instances. If no initial bookmarks have been provided, an empty set is returned. + * + * @return the immutable set of last bookmarks. + */ + Set lastBookmarks(); + + /** + * Signal that you are done using this session. In the default driver usage, closing and accessing sessions is very low cost. + *

+ * This operation is not needed if 1) all results created in the session have been fully consumed and 2) all transactions opened by this session have been + * either committed or rolled back. + *

+ * This method is a fallback if you failed to fulfill the two requirements above. This publisher is completed when all outstanding queries in the session + * have completed, meaning any writes you performed are guaranteed to be durably stored. It might be completed exceptionally when there are unconsumed + * errors from previous queries or transactions. + * + * @param makes it easier to be chained. + * @return an empty publisher that represents the reactive close. + */ + Publisher close(); +} diff --git a/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveTransaction.java new file mode 100644 index 0000000000..fc8a0700af --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveTransaction.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactivestreams; + +import org.neo4j.driver.Transaction; +import org.reactivestreams.Publisher; + +/** + * Same as {@link Transaction} except this reactive transaction exposes a reactive API. + * + * @see Transaction + * @see ReactiveSession + * @see Publisher + * @since 5.2 + */ +public interface ReactiveTransaction extends ReactiveQueryRunner { + /** + * Commits the transaction. It completes without publishing anything if transaction is committed successfully. Otherwise, errors when there is any error to + * commit. + * + * @param makes it easier to be chained after other publishers. + * @return an empty publisher. + */ + Publisher commit(); + + /** + * Rolls back the transaction. It completes without publishing anything if transaction is rolled back successfully. Otherwise, errors when there is any + * error to roll back. + * + * @param makes it easier to be chained after other publishers. + * @return an empty publisher. + */ + Publisher rollback(); + + /** + * Close the transaction. If the transaction has been {@link #commit() committed} or {@link #rollback() rolled back}, the close is optional and no operation + * is performed. Otherwise, the transaction will be rolled back by default by this method. + * + * @return new {@link Publisher} that gets completed when close is successful, otherwise an error is signalled. + */ + Publisher close(); + + /** + * Determine if transaction is open. + * + * @return a publisher emitting {@code true} if transaction is open and {@code false} otherwise. + */ + Publisher isOpen(); +} diff --git a/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveTransactionCallback.java b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveTransactionCallback.java new file mode 100644 index 0000000000..15742d5060 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveTransactionCallback.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactivestreams; + +/** + * Callback that executes operations against a given {@link ReactiveTransactionContext}. + * + * @param the return type of this work. + * @since 5.2 + */ +public interface ReactiveTransactionCallback { + /** + * Executes all given operations against the same transaction context. + * + * @param context the transaction context to use. + * @return result object or {@code null} if none. + */ + T execute(ReactiveTransactionContext context); +} diff --git a/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveTransactionContext.java b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveTransactionContext.java new file mode 100644 index 0000000000..e0247729e3 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactivestreams/ReactiveTransactionContext.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactivestreams; + +/** + * A context for running queries within transaction. + * + * @since 5.2 + */ +public interface ReactiveTransactionContext extends ReactiveQueryRunner {} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/ReactiveTransactionContextStreamsAdapter.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/ReactiveTransactionContextStreamsAdapter.java new file mode 100644 index 0000000000..009953c139 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/ReactiveTransactionContextStreamsAdapter.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend; + +import java.util.Map; +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.reactivestreams.ReactiveResult; +import org.neo4j.driver.reactivestreams.ReactiveTransaction; +import org.neo4j.driver.reactivestreams.ReactiveTransactionContext; +import org.reactivestreams.Publisher; + +public class ReactiveTransactionContextStreamsAdapter implements ReactiveTransaction { + private final ReactiveTransactionContext delegate; + + public ReactiveTransactionContextStreamsAdapter(ReactiveTransactionContext delegate) { + this.delegate = delegate; + } + + @Override + public Publisher run(String query, Value parameters) { + return delegate.run(query, parameters); + } + + @Override + public Publisher run(String query, Map parameters) { + return delegate.run(query, parameters); + } + + @Override + public Publisher run(String query, Record parameters) { + return delegate.run(query, parameters); + } + + @Override + public Publisher run(String query) { + return delegate.run(query); + } + + @Override + public Publisher run(Query query) { + return delegate.run(query); + } + + @Override + public Publisher commit() { + throw new UnsupportedOperationException("commit is not allowed on transaction context"); + } + + @Override + public Publisher rollback() { + throw new UnsupportedOperationException("rollback is not allowed on transaction context"); + } + + @Override + public Publisher close() { + throw new UnsupportedOperationException("close is not allowed on transaction context"); + } + + @Override + public Publisher isOpen() { + throw new UnsupportedOperationException("isOpen is not allowed on transaction context"); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java index 7f811f8c10..a81a75bf32 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java @@ -40,6 +40,8 @@ public static void main(String[] args) throws InterruptedException { backendMode = TestkitRequestProcessorHandler.BackendMode.REACTIVE_LEGACY; } else if ("reactive".equals(modeArg)) { backendMode = TestkitRequestProcessorHandler.BackendMode.REACTIVE; + } else if ("reactive-streams".equals(modeArg)) { + backendMode = TestkitRequestProcessorHandler.BackendMode.REACTIVE; } else { backendMode = TestkitRequestProcessorHandler.BackendMode.SYNC; } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java index 4dbac4bdc0..c91af5ebc7 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java @@ -29,8 +29,11 @@ import neo4j.org.testkit.backend.holder.AsyncTransactionHolder; import neo4j.org.testkit.backend.holder.DriverHolder; import neo4j.org.testkit.backend.holder.ReactiveResultHolder; +import neo4j.org.testkit.backend.holder.ReactiveResultStreamsHolder; import neo4j.org.testkit.backend.holder.ReactiveSessionHolder; +import neo4j.org.testkit.backend.holder.ReactiveSessionStreamsHolder; import neo4j.org.testkit.backend.holder.ReactiveTransactionHolder; +import neo4j.org.testkit.backend.holder.ReactiveTransactionStreamsHolder; import neo4j.org.testkit.backend.holder.ResultCursorHolder; import neo4j.org.testkit.backend.holder.ResultHolder; import neo4j.org.testkit.backend.holder.RxResultHolder; @@ -60,14 +63,18 @@ public class TestkitState { private final Map sessionIdToAsyncSessionHolder = new HashMap<>(); private final Map sessionIdToRxSessionHolder = new HashMap<>(); private final Map sessionIdToReactiveSessionHolder = new HashMap<>(); + private final Map sessionIdToReactiveSessionStreamsHolder = new HashMap<>(); private final Map resultIdToResultHolder = new HashMap<>(); private final Map resultIdToResultCursorHolder = new HashMap<>(); private final Map resultIdToRxResultHolder = new HashMap<>(); private final Map resultIdToReactiveResultHolder = new HashMap<>(); + private final Map resultIdToReactiveResultStreamsHolder = new HashMap<>(); private final Map transactionIdToTransactionHolder = new HashMap<>(); private final Map transactionIdToAsyncTransactionHolder = new HashMap<>(); private final Map transactionIdToRxTransactionHolder = new HashMap<>(); private final Map transactionIdToReactiveTransactionHolder = new HashMap<>(); + private final Map transactionIdToReactiveTransactionStreamsHolder = + new HashMap<>(); private final Map bookmarkManagerIdToBookmarkManager = new HashMap<>(); @Getter @@ -129,6 +136,14 @@ public Mono getReactiveSessionHolder(String id) { return getRx(id, sessionIdToReactiveSessionHolder, SESSION_NOT_FOUND_MESSAGE); } + public String addReactiveSessionStreamsHolder(ReactiveSessionStreamsHolder sessionHolder) { + return add(sessionHolder, sessionIdToReactiveSessionStreamsHolder); + } + + public Mono getReactiveSessionStreamsHolder(String id) { + return getRx(id, sessionIdToReactiveSessionStreamsHolder, SESSION_NOT_FOUND_MESSAGE); + } + public String addTransactionHolder(TransactionHolder transactionHolder) { return add(transactionHolder, transactionIdToTransactionHolder); } @@ -161,6 +176,14 @@ public Mono getReactiveTransactionHolder(String id) { return getRx(id, transactionIdToReactiveTransactionHolder, TRANSACTION_NOT_FOUND_MESSAGE); } + public String addReactiveTransactionStreamsHolder(ReactiveTransactionStreamsHolder transactionHolder) { + return add(transactionHolder, transactionIdToReactiveTransactionStreamsHolder); + } + + public Mono getReactiveTransactionStreamsHolder(String id) { + return getRx(id, transactionIdToReactiveTransactionStreamsHolder, TRANSACTION_NOT_FOUND_MESSAGE); + } + public String addResultHolder(ResultHolder resultHolder) { return add(resultHolder, resultIdToResultHolder); } @@ -193,6 +216,14 @@ public Mono getReactiveResultHolder(String id) { return getRx(id, resultIdToReactiveResultHolder, RESULT_NOT_FOUND_MESSAGE); } + public String addReactiveResultStreamsHolder(ReactiveResultStreamsHolder resultHolder) { + return add(resultHolder, resultIdToReactiveResultStreamsHolder); + } + + public Mono getReactiveResultStreamsHolder(String id) { + return getRx(id, resultIdToReactiveResultStreamsHolder, RESULT_NOT_FOUND_MESSAGE); + } + public void addBookmarkManager(String id, BookmarkManager bookmarkManager) { bookmarkManagerIdToBookmarkManager.put(id, bookmarkManager); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ReactiveResultStreamsHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ReactiveResultStreamsHolder.java new file mode 100644 index 0000000000..4edf583eff --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ReactiveResultStreamsHolder.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.Setter; +import neo4j.org.testkit.backend.RxBufferedSubscriber; +import org.neo4j.driver.Record; +import org.neo4j.driver.reactivestreams.ReactiveResult; + +public class ReactiveResultStreamsHolder + extends AbstractResultHolder { + @Setter + private RxBufferedSubscriber subscriber; + + @Getter + private final AtomicLong requestedRecordsCounter = new AtomicLong(); + + public ReactiveResultStreamsHolder(ReactiveSessionStreamsHolder sessionHolder, ReactiveResult result) { + super(sessionHolder, result); + } + + public ReactiveResultStreamsHolder(ReactiveTransactionStreamsHolder transactionHolder, ReactiveResult result) { + super(transactionHolder, result); + } + + public Optional> getSubscriber() { + return Optional.ofNullable(subscriber); + } + + @Override + protected ReactiveSessionStreamsHolder getSessionHolder(ReactiveTransactionStreamsHolder transactionHolder) { + return transactionHolder.getSessionHolder(); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ReactiveSessionStreamsHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ReactiveSessionStreamsHolder.java new file mode 100644 index 0000000000..fa598800af --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ReactiveSessionStreamsHolder.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import org.neo4j.driver.SessionConfig; +import org.neo4j.driver.reactivestreams.ReactiveSession; + +public class ReactiveSessionStreamsHolder extends AbstractSessionHolder { + public ReactiveSessionStreamsHolder(DriverHolder driverHolder, ReactiveSession session, SessionConfig config) { + super(driverHolder, session, config); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ReactiveTransactionStreamsHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ReactiveTransactionStreamsHolder.java new file mode 100644 index 0000000000..b98b321a0a --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/ReactiveTransactionStreamsHolder.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.holder; + +import org.neo4j.driver.reactivestreams.ReactiveTransaction; + +public class ReactiveTransactionStreamsHolder + extends AbstractTransactionHolder { + public ReactiveTransactionStreamsHolder( + ReactiveSessionStreamsHolder sessionHolder, ReactiveTransaction transaction) { + super(sessionHolder, transaction); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/AbstractResultNext.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/AbstractResultNext.java index 21c5033fb5..d3a0e1d0b7 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/AbstractResultNext.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/AbstractResultNext.java @@ -24,6 +24,7 @@ import neo4j.org.testkit.backend.RxBufferedSubscriber; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.ReactiveResultHolder; +import neo4j.org.testkit.backend.holder.ReactiveResultStreamsHolder; import neo4j.org.testkit.backend.holder.RxResultHolder; import neo4j.org.testkit.backend.messages.requests.TestkitRequest; import neo4j.org.testkit.backend.messages.responses.NullRecord; @@ -91,6 +92,25 @@ public Mono processReactive(TestkitState testkitState) { }); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState.getReactiveResultStreamsHolder(getResultId()).flatMap(resultHolder -> { + RxBufferedSubscriber subscriber = resultHolder + .getSubscriber() + .orElseGet(() -> { + RxBufferedSubscriber subscriberInstance = + new RxBufferedSubscriber<>(getFetchSize(resultHolder)); + resultHolder.setSubscriber(subscriberInstance); + resultHolder.getResult().records().subscribe(subscriberInstance); + return subscriberInstance; + }); + return subscriber + .next() + .map(this::createResponse) + .defaultIfEmpty(NullRecord.builder().build()); + }); + } + protected abstract neo4j.org.testkit.backend.messages.responses.TestkitResponse createResponse(Record record); protected abstract String getResultId(); @@ -124,4 +144,17 @@ private long getFetchSize(ReactiveResultHolder resultHolder) { .fetchSize()); return fetchSize == -1 ? Long.MAX_VALUE : fetchSize; } + + private long getFetchSize(ReactiveResultStreamsHolder resultHolder) { + long fetchSize = resultHolder + .getSessionHolder() + .getConfig() + .fetchSize() + .orElse(resultHolder + .getSessionHolder() + .getDriverHolder() + .getConfig() + .fetchSize()); + return fetchSize == -1 ? Long.MAX_VALUE : fetchSize; + } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/BookmarkManagerClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/BookmarkManagerClose.java index 55a8376532..c72352da83 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/BookmarkManagerClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/BookmarkManagerClose.java @@ -52,6 +52,11 @@ public Mono processReactive(TestkitState testkitState) { return Mono.just(removeBookmarkManagerAndCreateResponse(testkitState)); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return Mono.just(removeBookmarkManagerAndCreateResponse(testkitState)); + } + private BookmarkManager removeBookmarkManagerAndCreateResponse(TestkitState testkitState) { var id = data.getId(); testkitState.removeBookmarkManager(id); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckDriverIsEncrypted.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckDriverIsEncrypted.java index 32c350d043..6125959cc5 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckDriverIsEncrypted.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckDriverIsEncrypted.java @@ -53,6 +53,11 @@ public Mono processReactive(TestkitState testkitState) { return Mono.just(createResponse(testkitState)); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return processReactive(testkitState); + } + private DriverIsEncrypted createResponse(TestkitState testkitState) { DriverHolder driverHolder = testkitState.getDriverHolder(data.getDriverId()); return DriverIsEncrypted.builder() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java index 992fae71e0..e4cd94e9d1 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java @@ -57,6 +57,11 @@ public Mono processReactive(TestkitState testkitState) { return Mono.fromCompletionStage(processAsync(testkitState)); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return processReactive(testkitState); + } + private MultiDBSupport createResponse(boolean available) { return MultiDBSupport.builder() .data(MultiDBSupport.MultiDBSupportBody.builder() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java index 673cc91a28..950ade6158 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java @@ -56,6 +56,11 @@ public Mono processReactive(TestkitState testkitState) { return Mono.fromCompletionStage(processAsync(testkitState)); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return processReactive(testkitState); + } + private Driver createResponse() { return Driver.builder() .data(Driver.DriverBody.builder().id(data.getDriverId()).build()) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetConnectionPoolMetrics.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetConnectionPoolMetrics.java index d6786b6d61..ef521b8c4a 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetConnectionPoolMetrics.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetConnectionPoolMetrics.java @@ -58,6 +58,11 @@ public Mono processReactive(TestkitState testkitState) { return Mono.just(getConnectionPoolMetrics(testkitState)); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return processReactive(testkitState); + } + private ConnectionPoolMetrics getConnectionPoolMetrics(TestkitState testkitState) { DriverHolder driverHolder = testkitState.getDriverHolder(data.getDriverId()); Metrics metrics = driverHolder.getDriver().metrics(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java index 854dcdc154..035821944a 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java @@ -100,6 +100,11 @@ public Mono processReactive(TestkitState testkitState) { return Mono.just(createResponse(COMMON_FEATURES)); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return Mono.just(createResponse(COMMON_FEATURES)); + } + private FeatureList createResponse(Set features) { return FeatureList.builder() .data(FeatureList.FeatureListBody.builder().features(features).build()) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java index 0359df8c2e..433c0dc955 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java @@ -89,6 +89,11 @@ public Mono processReactive(TestkitState testkitState) { return Mono.just(process(testkitState)); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return processReactive(testkitState); + } + @Setter @Getter public static class GetRoutingTableBody { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewBookmarkManager.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewBookmarkManager.java index 8a4481c4b5..594fe78ecd 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewBookmarkManager.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewBookmarkManager.java @@ -60,6 +60,11 @@ public Mono processReactive(TestkitState testkitState) { return Mono.just(createBookmarkManagerAndResponse(testkitState)); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return Mono.just(createBookmarkManagerAndResponse(testkitState)); + } + private BookmarkManager createBookmarkManagerAndResponse(TestkitState testkitState) { var id = testkitState.newId(); var initialBookmarks = diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index 7b4bb52afa..62217f4f1c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -147,6 +147,11 @@ public Mono processReactive(TestkitState testkitState) { return Mono.fromCompletionStage(processAsync(testkitState)); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return processReactive(testkitState); + } + private ServerAddressResolver callbackResolver(TestkitState testkitState) { return address -> { String callbackId = testkitState.newId(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java index a4523ab94f..e0dcd6cd2d 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java @@ -31,6 +31,7 @@ import neo4j.org.testkit.backend.holder.AsyncSessionHolder; import neo4j.org.testkit.backend.holder.DriverHolder; import neo4j.org.testkit.backend.holder.ReactiveSessionHolder; +import neo4j.org.testkit.backend.holder.ReactiveSessionStreamsHolder; import neo4j.org.testkit.backend.holder.RxSessionHolder; import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.messages.responses.Session; @@ -68,6 +69,12 @@ public Mono processReactive(TestkitState testkitState) { testkitState, this::createReactiveSessionState, testkitState::addReactiveSessionHolder)); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return Mono.just(createSessionStateAndResponse( + testkitState, this::createReactiveSessionStreamsState, testkitState::addReactiveSessionStreamsHolder)); + } + protected TestkitResponse createSessionStateAndResponse( TestkitState testkitState, BiFunction sessionStateProducer, @@ -119,6 +126,16 @@ private ReactiveSessionHolder createReactiveSessionState(DriverHolder driverHold driverHolder, driverHolder.getDriver().reactiveSession(sessionConfig), sessionConfig); } + private ReactiveSessionStreamsHolder createReactiveSessionStreamsState( + DriverHolder driverHolder, SessionConfig sessionConfig) { + return new ReactiveSessionStreamsHolder( + driverHolder, + driverHolder + .getDriver() + .reactiveSession(org.neo4j.driver.reactivestreams.ReactiveSession.class, sessionConfig), + sessionConfig); + } + @Setter @Getter public static class NewSessionBody { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java index 962542788a..1f5e81acad 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java @@ -85,6 +85,15 @@ public Mono processReactive(TestkitState testkitState) { .map(this::createResponse); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState + .getReactiveResultStreamsHolder(data.getResultId()) + .flatMap( + resultHolder -> Mono.fromDirect(resultHolder.getResult().consume())) + .map(this::createResponse); + } + private Summary createResponse(org.neo4j.driver.summary.ResultSummary summary) { Summary.ServerInfo serverInfo = Summary.ServerInfo.builder() .address(summary.server().address()) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java index 70e0be8d95..8653560d10 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultList.java @@ -59,6 +59,11 @@ public Mono processReactive(TestkitState testkitState) { throw new UnsupportedOperationException("Operation not supported"); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + throw new UnsupportedOperationException("Operation not supported"); + } + private RecordList createResponse(List records) { List mappedRecords = records.stream() .map(record -> Record.RecordBody.builder().values(record).build()) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultPeek.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultPeek.java index c17cae0aa5..853f0edafa 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultPeek.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultPeek.java @@ -63,6 +63,11 @@ public Mono processReactive(TestkitState testkitState) { throw new UnsupportedOperationException("Operation not supported"); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + throw new UnsupportedOperationException("Operation not supported"); + } + private TestkitResponse createResponse(Record record) { return neo4j.org.testkit.backend.messages.responses.Record.builder() .data(neo4j.org.testkit.backend.messages.responses.Record.RecordBody.builder() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultSingle.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultSingle.java index a5d47307f0..0dc5b0d9ad 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultSingle.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultSingle.java @@ -58,6 +58,11 @@ public Mono processReactive(TestkitState testkitState) { throw new UnsupportedOperationException("Single method is not supported by reactive API"); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + throw new UnsupportedOperationException("Single method is not supported by reactive API"); + } + private neo4j.org.testkit.backend.messages.responses.TestkitResponse createResponse(Record record) { return neo4j.org.testkit.backend.messages.responses.Record.builder() .data(neo4j.org.testkit.backend.messages.responses.Record.RecordBody.builder() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java index 1734531b62..569b19e1c5 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java @@ -87,6 +87,20 @@ public Mono processReactive(TestkitState testkitState) { }); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState.getReactiveSessionStreamsHolder(data.getSessionId()).mapNotNull(sessionHolder -> { + Throwable throwable; + if (!"".equals(data.getErrorId())) { + throwable = testkitState.getErrors().get(data.getErrorId()); + } else { + throwable = new FrontendError(); + } + sessionHolder.getTxWorkFuture().completeExceptionally(throwable); + return null; + }); + } + @Setter @Getter public static class RetryableNegativeBody { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java index 46a31c5282..73adffd899 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java @@ -65,6 +65,14 @@ public Mono processReactive(TestkitState testkitState) { }); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState.getReactiveSessionStreamsHolder(data.getSessionId()).mapNotNull(sessionHolder -> { + sessionHolder.getTxWorkFuture().complete(null); + return null; + }); + } + @Setter @Getter public static class RetryablePositiveBody { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java index 4bbde0053c..7b8c1dcf34 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java @@ -30,6 +30,7 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.AsyncTransactionHolder; import neo4j.org.testkit.backend.holder.ReactiveTransactionHolder; +import neo4j.org.testkit.backend.holder.ReactiveTransactionStreamsHolder; import neo4j.org.testkit.backend.holder.RxTransactionHolder; import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.holder.TransactionHolder; @@ -120,6 +121,21 @@ public Mono processReactive(TestkitState testkitState) { }); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState.getReactiveSessionStreamsHolder(data.getSessionId()).flatMap(sessionHolder -> { + var session = sessionHolder.getSession(); + TransactionConfig.Builder builder = TransactionConfig.builder(); + Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata); + + configureTimeout(builder); + + return Mono.fromDirect(session.beginTransaction(builder.build())) + .map(tx -> transaction(testkitState.addReactiveTransactionStreamsHolder( + new ReactiveTransactionStreamsHolder(sessionHolder, tx)))); + }); + } + private Transaction transaction(String txId) { return Transaction.builder() .data(Transaction.TransactionBody.builder().id(txId).build()) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java index 269391fe56..560d4a64f2 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java @@ -65,6 +65,15 @@ public Mono processReactive(TestkitState testkitState) { .then(Mono.just(createResponse())); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState + .getReactiveSessionStreamsHolder(data.getSessionId()) + .flatMap(sessionHolder -> + Mono.fromDirect(sessionHolder.getSession().close())) + .then(Mono.just(createResponse())); + } + private Session createResponse() { return Session.builder() .data(Session.SessionBody.builder().id(data.getSessionId()).build()) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java index e1a5d8ecca..a4673cfe29 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java @@ -67,6 +67,14 @@ public Mono processReactive(TestkitState testkitState) { .map(this::createResponse); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState + .getReactiveSessionStreamsHolder(data.getSessionId()) + .map(sessionHolder -> sessionHolder.getSession().lastBookmarks()) + .map(this::createResponse); + } + private Bookmarks createResponse(Set bookmarks) { return Bookmarks.builder() .data(Bookmarks.BookmarksBody.builder() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java index b657d61ed8..2d13c08e81 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java @@ -27,9 +27,11 @@ import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.ReactiveTransactionContextAdapter; +import neo4j.org.testkit.backend.ReactiveTransactionContextStreamsAdapter; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.AsyncTransactionHolder; import neo4j.org.testkit.backend.holder.ReactiveTransactionHolder; +import neo4j.org.testkit.backend.holder.ReactiveTransactionStreamsHolder; import neo4j.org.testkit.backend.holder.RxTransactionHolder; import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.holder.TransactionHolder; @@ -120,6 +122,26 @@ public Mono processReactive(TestkitState testkitState) { .then(Mono.just(retryableDone())); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState + .getReactiveSessionStreamsHolder(data.getSessionId()) + .flatMap(sessionHolder -> { + org.neo4j.driver.reactivestreams.ReactiveTransactionCallback> workWrapper = tx -> { + String txId = + testkitState.addReactiveTransactionStreamsHolder(new ReactiveTransactionStreamsHolder( + sessionHolder, new ReactiveTransactionContextStreamsAdapter(tx))); + testkitState.getResponseWriter().accept(retryableTry(txId)); + CompletableFuture tryResult = new CompletableFuture<>(); + sessionHolder.setTxWorkFuture(tryResult); + return Mono.fromCompletionStage(tryResult); + }; + + return Mono.fromDirect(sessionHolder.getSession().executeRead(workWrapper)); + }) + .then(Mono.just(retryableDone())); + } + @SuppressWarnings("deprecation") private TransactionWork handle(TestkitState testkitState, SessionHolder sessionHolder) { return tx -> { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java index 967ee3453e..2e0eb237c4 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java @@ -31,6 +31,7 @@ import neo4j.org.testkit.backend.CustomDriverError; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.ReactiveResultHolder; +import neo4j.org.testkit.backend.holder.ReactiveResultStreamsHolder; import neo4j.org.testkit.backend.holder.ResultCursorHolder; import neo4j.org.testkit.backend.holder.ResultHolder; import neo4j.org.testkit.backend.holder.RxResultHolder; @@ -141,6 +142,26 @@ public Mono processReactive(TestkitState testkitState) { }); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState.getReactiveSessionStreamsHolder(data.getSessionId()).flatMap(sessionHolder -> { + var session = sessionHolder.getSession(); + Query query = Optional.ofNullable(data.params) + .map(params -> new Query(data.cypher, data.params)) + .orElseGet(() -> new Query(data.cypher)); + TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); + Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); + configureTimeout(transactionConfig); + + return Mono.fromDirect(session.run(query, transactionConfig.build())) + .map(result -> { + String id = testkitState.addReactiveResultStreamsHolder( + new ReactiveResultStreamsHolder(sessionHolder, result)); + return createResponse(id, result.keys()); + }); + }); + } + private Result createResponse(String resultId, List keys) { return Result.builder() .data(Result.ResultBody.builder().id(resultId).keys(keys).build()) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java index 7f26847b0a..88abf48685 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java @@ -28,9 +28,11 @@ import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.ReactiveTransactionContextAdapter; +import neo4j.org.testkit.backend.ReactiveTransactionContextStreamsAdapter; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.AsyncTransactionHolder; import neo4j.org.testkit.backend.holder.ReactiveTransactionHolder; +import neo4j.org.testkit.backend.holder.ReactiveTransactionStreamsHolder; import neo4j.org.testkit.backend.holder.RxTransactionHolder; import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.holder.TransactionHolder; @@ -121,6 +123,26 @@ public Mono processReactive(TestkitState testkitState) { .then(Mono.just(retryableDone())); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState + .getReactiveSessionStreamsHolder(data.getSessionId()) + .flatMap(sessionHolder -> { + org.neo4j.driver.reactivestreams.ReactiveTransactionCallback> workWrapper = tx -> { + String txId = + testkitState.addReactiveTransactionStreamsHolder(new ReactiveTransactionStreamsHolder( + sessionHolder, new ReactiveTransactionContextStreamsAdapter(tx))); + testkitState.getResponseWriter().accept(retryableTry(txId)); + CompletableFuture tryResult = new CompletableFuture<>(); + sessionHolder.setTxWorkFuture(tryResult); + return Mono.fromCompletionStage(tryResult); + }; + + return Mono.fromDirect(sessionHolder.getSession().executeWrite(workWrapper)); + }) + .then(Mono.just(retryableDone())); + } + @SuppressWarnings("deprecation") private TransactionWork handle(TestkitState testkitState, SessionHolder sessionHolder) { return tx -> { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartSubTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartSubTest.java index 86544c2994..8429828233 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartSubTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartSubTest.java @@ -175,6 +175,12 @@ public Mono processReactive(TestkitState testkitState) { return Mono.just(testkitResponse); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + TestkitResponse testkitResponse = createResponse(REACTIVE_LEGACY_SKIP_PATTERN_TO_CHECK); + return Mono.just(testkitResponse); + } + private TestkitResponse createResponse(Map skipPatternToCheck) { return skipPatternToCheck.entrySet().stream() .filter(entry -> data.getTestName().matches(entry.getKey())) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index 1521d141fa..7ecf5018cb 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -179,6 +179,15 @@ public Mono processReactive(TestkitState testkitState) { return Mono.just(testkitResponse); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + TestkitResponse testkitResponse = createSkipResponse(REACTIVE_SKIP_PATTERN_TO_REASON) + .orElseGet(() -> StartSubTest.decidePerSubTestReactive(data.getTestName()) + ? RunSubTests.builder().build() + : RunTest.builder().build()); + return Mono.just(testkitResponse); + } + private Optional createSkipResponse(Map skipPatternToReason) { return skipPatternToReason.entrySet().stream() .filter(entry -> data.getTestName().matches(entry.getKey())) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java index b09f444103..c83c1cc7e1 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java @@ -54,4 +54,10 @@ default Mono processReactive(TestkitState testkitState) { testkitState.getCallbackIdToFuture().get(getCallbackId()).complete(this); return Mono.empty(); } + + @Override + default Mono processReactiveStreams(TestkitState testkitState) { + testkitState.getCallbackIdToFuture().get(getCallbackId()).complete(this); + return Mono.empty(); + } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java index 73db952865..341894af75 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java @@ -71,4 +71,6 @@ public interface TestkitRequest { Mono processRx(TestkitState testkitState); Mono processReactive(TestkitState testkitState); + + Mono processReactiveStreams(TestkitState testkitState); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java index 9adb2e8999..eeb31e6b28 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java @@ -69,6 +69,15 @@ public Mono processReactive(TestkitState testkitState) { .then(Mono.just(createResponse(data.getTxId()))); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState + .getReactiveTransactionStreamsHolder(data.getTxId()) + .map(AbstractTransactionHolder::getTransaction) + .flatMap(tx -> Mono.fromDirect(tx.close())) + .then(Mono.just(createResponse(data.getTxId()))); + } + private Transaction createResponse(String txId) { return Transaction.builder() .data(Transaction.TransactionBody.builder().id(txId).build()) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java index 7299cd58e0..ec19585cc4 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java @@ -64,6 +64,14 @@ public Mono processReactive(TestkitState testkitState) { .then(Mono.just(createResponse(data.getTxId()))); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState + .getReactiveTransactionStreamsHolder(data.getTxId()) + .flatMap(tx -> Mono.fromDirect(tx.getTransaction().commit())) + .then(Mono.just(createResponse(data.getTxId()))); + } + private Transaction createResponse(String txId) { return Transaction.builder() .data(Transaction.TransactionBody.builder().id(txId).build()) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java index d4da1568df..4ca9b515a0 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java @@ -64,6 +64,14 @@ public Mono processReactive(TestkitState testkitState) { .then(Mono.just(createResponse(data.getTxId()))); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState + .getReactiveTransactionStreamsHolder(data.getTxId()) + .flatMap(tx -> Mono.fromDirect(tx.getTransaction().rollback())) + .then(Mono.just(createResponse(data.getTxId()))); + } + private Transaction createResponse(String txId) { return Transaction.builder() .data(Transaction.TransactionBody.builder().id(txId).build()) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java index 22d8da6bca..1e19c558df 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java @@ -29,6 +29,7 @@ import lombok.Setter; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.ReactiveResultHolder; +import neo4j.org.testkit.backend.holder.ReactiveResultStreamsHolder; import neo4j.org.testkit.backend.holder.ResultCursorHolder; import neo4j.org.testkit.backend.holder.ResultHolder; import neo4j.org.testkit.backend.holder.RxResultHolder; @@ -96,6 +97,20 @@ public Mono processReactive(TestkitState testkitState) { }); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return testkitState.getReactiveTransactionStreamsHolder(data.getTxId()).flatMap(transactionHolder -> { + var tx = transactionHolder.getTransaction(); + Map params = data.getParams() != null ? data.getParams() : Collections.emptyMap(); + + return Mono.fromDirect(tx.run(data.getCypher(), params)).map(result -> { + String id = testkitState.addReactiveResultStreamsHolder( + new ReactiveResultStreamsHolder(transactionHolder, result)); + return createResponse(id, result.keys()); + }); + }); + } + protected Result createResponse(String resultId, List keys) { return Result.builder() .data(Result.ResultBody.builder().id(resultId).keys(keys).build()) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java index cdc5b4e4f9..0789911895 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java @@ -58,6 +58,11 @@ public Mono processReactive(TestkitState testkitState) { return Mono.fromCompletionStage(processAsync(testkitState)); } + @Override + public Mono processReactiveStreams(TestkitState testkitState) { + return processReactive(testkitState); + } + private Driver createResponse(String id) { return Driver.builder().data(Driver.DriverBody.builder().id(id).build()).build(); } diff --git a/testkit-tests/pom.xml b/testkit-tests/pom.xml index 7437177ad6..50d24606d5 100644 --- a/testkit-tests/pom.xml +++ b/testkit-tests/pom.xml @@ -26,6 +26,7 @@ %a-a %a-rl %a-r + %a-rs @@ -204,6 +205,37 @@ + + + run-testkit-reactive-streams + integration-test + + + start + + + + + tklnchr + + ${testkit.reactive.streams.name.pattern} + + ${project.build.directory}/testkit-reactive + reactive-streams + + --configs 4.0-enterprise-neo4j 4.1-enterprise-neo4j 4.2-community-bolt 4.2-community-neo4j + 4.2-enterprise-bolt 4.2-enterprise-neo4j 4.2-enterprise-cluster-neo4j 4.3-community-bolt 4.3-community-neo4j + 4.3-enterprise-bolt 4.3-enterprise-neo4j 4.3-enterprise-cluster-neo4j ${testkit.args} + + + + ${testkit.reactive.streams.name.pattern}> + + + + + + remove-testkit-launcher post-integration-test