Skip to content

Commit

Permalink
Introduce new Reactive Streams session (#1325)
Browse files Browse the repository at this point in the history
This update introduces support for reactive session with Reactive Streams types. While it is similar to the deprecated `RxSession`, it includes the improvements introduced with the `ReactiveSession` that uses Flow API types.

Sample session creation:
```
var session = driver.reactiveSession(org.neo4j.driver.reactivestreams.ReactiveSession.class);
```
  • Loading branch information
injectives authored Oct 26, 2022
1 parent 93c7b95 commit 1fc6a41
Show file tree
Hide file tree
Showing 65 changed files with 1,719 additions and 39 deletions.
12 changes: 12 additions & 0 deletions driver/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,16 @@
<method>org.neo4j.driver.types.TypeSystem getDefault()</method>
</difference>

<difference>
<className>org/neo4j/driver/Driver</className>
<differenceType>7012</differenceType>
<method>org.neo4j.driver.BaseReactiveSession reactiveSession(java.lang.Class)</method>
</difference>

<difference>
<className>org/neo4j/driver/Driver</className>
<differenceType>7012</differenceType>
<method>org.neo4j.driver.BaseReactiveSession reactiveSession(java.lang.Class, org.neo4j.driver.SessionConfig)</method>
</difference>

</differences>
1 change: 1 addition & 0 deletions driver/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
exports org.neo4j.driver;
exports org.neo4j.driver.async;
exports org.neo4j.driver.reactive;
exports org.neo4j.driver.reactivestreams;
exports org.neo4j.driver.types;
exports org.neo4j.driver.summary;
exports org.neo4j.driver.net;
Expand Down
24 changes: 24 additions & 0 deletions driver/src/main/java/org/neo4j/driver/BaseReactiveSession.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver;

/**
* A base interface for reactive sessions, used by {@link Driver#reactiveSession(Class)} and {@link Driver#reactiveSession(Class, SessionConfig)}.
*/
public interface BaseReactiveSession {}
44 changes: 41 additions & 3 deletions driver/src/main/java/org/neo4j/driver/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ public interface Driver extends AutoCloseable {
*
* @return a new {@link Session} object.
*/
Session session();
default Session session() {
return session(SessionConfig.defaultConfig());
}

/**
* Create a new {@link Session} with a specified {@link SessionConfig session configuration}.
Expand Down Expand Up @@ -133,7 +135,41 @@ default ReactiveSession reactiveSession() {
* @param sessionConfig used to customize the session.
* @return a new {@link ReactiveSession} object.
*/
ReactiveSession reactiveSession(SessionConfig sessionConfig);
default ReactiveSession reactiveSession(SessionConfig sessionConfig) {
return reactiveSession(ReactiveSession.class, sessionConfig);
}

/**
* Create a new reactive session of supported type with default {@link SessionConfig session configuration}.
* <p>
* Supported types are:
* <ul>
* <li>{@link org.neo4j.driver.reactive.ReactiveSession} - reactive session using Flow API</li>
* <li>{@link org.neo4j.driver.reactivestreams.ReactiveSession} - reactive session using Reactive Streams API</li>
* </ul>
*
* @param sessionClass session type class
* @return session instance
* @param <T> session type
*/
default <T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass) {
return reactiveSession(sessionClass, SessionConfig.defaultConfig());
}

/**
* Create a new reactive session of supported type with a specified {@link SessionConfig session configuration}.
* <p>
* Supported types are:
* <ul>
* <li>{@link org.neo4j.driver.reactive.ReactiveSession} - reactive session using Flow API</li>
* <li>{@link org.neo4j.driver.reactivestreams.ReactiveSession} - reactive session using Reactive Streams API</li>
* </ul>
*
* @param sessionClass session type class
* @return session instance
* @param <T> session type
*/
<T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass, SessionConfig sessionConfig);

/**
* Create a new general purpose {@link AsyncSession} with default {@link SessionConfig session configuration}. The {@link AsyncSession} provides an
Expand All @@ -143,7 +179,9 @@ default ReactiveSession reactiveSession() {
*
* @return a new {@link AsyncSession} object.
*/
AsyncSession asyncSession();
default AsyncSession asyncSession() {
return asyncSession(SessionConfig.defaultConfig());
}

/**
* Create a new {@link AsyncSession} with a specified {@link SessionConfig session configuration}.
Expand Down
28 changes: 14 additions & 14 deletions driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
package org.neo4j.driver.internal;

import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.neo4j.driver.BaseReactiveSession;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
Expand All @@ -33,12 +35,10 @@
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.metrics.DevNullMetricsProvider;
import org.neo4j.driver.internal.metrics.MetricsProvider;
import org.neo4j.driver.internal.reactive.InternalReactiveSession;
import org.neo4j.driver.internal.reactive.InternalRxSession;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.ReactiveSession;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.types.TypeSystem;

Expand All @@ -61,11 +61,6 @@ public class InternalDriver implements Driver {
this.log = logging.getLog(getClass());
}

@Override
public Session session() {
return new InternalSession(newSession(SessionConfig.defaultConfig()));
}

@Override
public Session session(SessionConfig sessionConfig) {
return new InternalSession(newSession(sessionConfig));
Expand All @@ -77,14 +72,19 @@ public RxSession rxSession(SessionConfig sessionConfig) {
return new InternalRxSession(newSession(sessionConfig));
}

@SuppressWarnings({"deprecation", "unchecked"})
@Override
public ReactiveSession reactiveSession(SessionConfig sessionConfig) {
return new InternalReactiveSession(newSession(sessionConfig));
}

@Override
public AsyncSession asyncSession() {
return new InternalAsyncSession(newSession(SessionConfig.defaultConfig()));
public <T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass, SessionConfig sessionConfig) {
requireNonNull(sessionClass, "sessionClass must not be null");
requireNonNull(sessionClass, "sessionConfig must not be null");
if (org.neo4j.driver.reactive.ReactiveSession.class.isAssignableFrom(sessionClass)) {
return (T) new org.neo4j.driver.internal.reactive.InternalReactiveSession(newSession(sessionConfig));
} else if (org.neo4j.driver.reactivestreams.ReactiveSession.class.isAssignableFrom(sessionClass)) {
return (T) new org.neo4j.driver.internal.reactivestreams.InternalReactiveSession(newSession(sessionConfig));
} else {
throw new IllegalArgumentException(
String.format("Unsupported session type '%s'", sessionClass.getCanonicalName()));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

abstract class AbstractReactiveSession<S> {
public abstract class AbstractReactiveSession<S> {
protected final NetworkSession session;

public AbstractReactiveSession(NetworkSession session) {
Expand All @@ -45,15 +45,15 @@ public AbstractReactiveSession(NetworkSession session) {
this.session = session;
}

abstract S createTransaction(UnmanagedTransaction unmanagedTransaction);
protected abstract S createTransaction(UnmanagedTransaction unmanagedTransaction);

abstract Publisher<Void> closeTransaction(S transaction, boolean commit);
protected abstract Publisher<Void> closeTransaction(S transaction, boolean commit);

Publisher<S> doBeginTransaction(TransactionConfig config) {
return doBeginTransaction(config, null);
}

Publisher<S> doBeginTransaction(TransactionConfig config, String txType) {
protected Publisher<S> doBeginTransaction(TransactionConfig config, String txType) {
return createSingleItemPublisher(
() -> {
CompletableFuture<S> txFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -87,7 +87,7 @@ Publisher<S> beginTransaction(AccessMode mode, TransactionConfig config) {
"Unexpected condition, begin transaction call has completed successfully with transaction being null"));
}

<T> Publisher<T> runTransaction(
protected <T> Publisher<T> runTransaction(
AccessMode mode, Function<S, ? extends Publisher<T>> work, TransactionConfig config) {
Flux<T> repeatableWork = Flux.usingWhen(
beginTransaction(mode, config),
Expand Down Expand Up @@ -119,7 +119,7 @@ public Set<Bookmark> lastBookmarks() {
return session.lastBookmarks();
}

<T> Publisher<T> doClose() {
protected <T> Publisher<T> doClose() {
return createEmptyPublisher(session::closeAsync);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,30 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

abstract class AbstractReactiveTransaction {
public abstract class AbstractReactiveTransaction {
protected final UnmanagedTransaction tx;

protected AbstractReactiveTransaction(UnmanagedTransaction tx) {
this.tx = tx;
}

<T> Publisher<T> doCommit() {
protected <T> Publisher<T> doCommit() {
return createEmptyPublisher(tx::commitAsync);
}

<T> Publisher<T> doRollback() {
protected <T> Publisher<T> doRollback() {
return createEmptyPublisher(tx::rollbackAsync);
}

Publisher<Void> doClose() {
protected Publisher<Void> doClose() {
return close(false);
}

Publisher<Boolean> doIsOpen() {
protected Publisher<Boolean> doIsOpen() {
return Mono.just(tx.isOpen());
}

Publisher<Void> close(boolean commit) {
public Publisher<Void> close(boolean commit) {
return createEmptyPublisher(() -> tx.closeAsync(commit));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public InternalReactiveSession(NetworkSession session) {
}

@Override
ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
protected ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
return new InternalReactiveTransaction(unmanagedTransaction);
}

@Override
org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
protected org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
return ((InternalReactiveTransaction) transaction).close(commit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ public InternalRxSession(NetworkSession session) {
}

@Override
RxTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
protected RxTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
return new InternalRxTransaction(unmanagedTransaction);
}

@Override
Publisher<Void> closeTransaction(RxTransaction transaction, boolean commit) {
protected Publisher<Void> closeTransaction(RxTransaction transaction, boolean commit) {
return ((InternalRxTransaction) transaction).close(commit);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.reactivestreams;

import java.util.Map;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.internal.util.Extract;
import org.neo4j.driver.internal.value.MapValue;
import org.neo4j.driver.reactivestreams.ReactiveQueryRunner;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

interface BaseReactiveQueryRunner extends ReactiveQueryRunner {
@Override
default Publisher<ReactiveResult> run(String queryStr, Value parameters) {
try {
Query query = new Query(queryStr, parameters);
return run(query);
} catch (Throwable t) {
return Mono.error(t);
}
}

@Override
default Publisher<ReactiveResult> run(String query, Map<String, Object> parameters) {
return run(query, parameters(parameters));
}

@Override
default Publisher<ReactiveResult> run(String query, Record parameters) {
return run(query, parameters(parameters));
}

@Override
default Publisher<ReactiveResult> run(String queryStr) {
try {
Query query = new Query(queryStr);
return run(query);
} catch (Throwable t) {
return Mono.error(t);
}
}

static Value parameters(Record record) {
return record == null ? Values.EmptyMap : parameters(record.asMap());
}

static Value parameters(Map<String, Object> map) {
if (map == null || map.isEmpty()) {
return Values.EmptyMap;
}
return new MapValue(Extract.mapOfValues(map));
}
}
Loading

0 comments on commit 1fc6a41

Please sign in to comment.