Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revisit write operations #725

Merged
merged 17 commits into from
May 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-Present Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,14 +44,10 @@ ext {
}
}

gradleVersion = '3.3'
gradleScriptDir = "${rootProject.projectDir}/gradle"

reactorCoreVersion = "3.2.9.BUILD-SNAPSHOT"

// Languages
groovyVersion = '2.4.1'

// Logging
slf4jVersion = '1.7.12'
logbackVersion = '1.1.2'
Expand Down
5 changes: 0 additions & 5 deletions gradle/setup.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@
* limitations under the License.
*/

task wrapper(type: Wrapper, description: "Create a Gradle self-download wrapper") {
group = 'Project Setup'
gradleVersion = "$gradleVersion"
}

configure(rootProject) { subproject ->
apply from: "$gradleScriptDir/doc.gradle"
apply plugin: 'propdeps-maven'
Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
#
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.4-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
18 changes: 17 additions & 1 deletion gradlew
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
#!/usr/bin/env sh

#
# Copyright 2015 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

##############################################################################
##
## Gradle start up script for UN*X
Expand Down Expand Up @@ -28,7 +44,7 @@ APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`

# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS=""
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'

# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
Expand Down
18 changes: 17 additions & 1 deletion gradlew.bat
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem http://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem

@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
Expand All @@ -14,7 +30,7 @@ set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"

@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
Expand Down
186 changes: 26 additions & 160 deletions src/main/java/reactor/netty/FutureMono.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,17 @@
package reactor.netty;

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

Expand Down Expand Up @@ -79,82 +71,51 @@ public static <F extends Future<Void>> Mono<Void> deferFuture(Supplier<F> deferr
}

/**
* Write the passed {@link Publisher} and return a disposable {@link Mono}.
* <p>
* Convert a supplied {@link Future} for each subscriber into {@link Mono}.
* {@link Mono#subscribe(Subscriber)}
* will bridge to {@link Future#addListener(GenericFutureListener)}.
*
* In addition, current method allows interaction with downstream context, so it
* may be transferred to implicitly connected upstream
* <p>
*
* Example:
* <p>
*
* <pre><code>
* Flux&lt;String&gt; dataStream = Flux.just("a", "b", "c");
* FutureMono.deferFutureWithContext((subscriberContext) ->
* context().channel()
* .writeAndFlush(PublisherContext.withContext(dataStream, subscriberContext)));
* FutureMono.deferFutureWithContext((subscriberContext) -> channel.writeAndFlush(subscriberContext.get("data"));
* </code></pre>
*
* @param dataStream the publisher to write
* @param deferredFuture the future to evaluate and convert from
* @param <F> the future type
*
* @return A {@link Mono} forwarding {@link Future} success, failure and cancel
* @return A {@link Mono} forwarding {@link Future} success or failure
*/
public static Mono<Void> disposableWriteAndFlush(Channel channel,
Publisher<?> dataStream) {
return new DeferredWriteMono(channel, dataStream);
public static <F extends Future<Void>> Mono<Void> deferFutureWithContext(Function<Context, F> deferredFuture) {
return new DeferredContextFutureMono<>(deferredFuture);
}

/**
* Convert a supplied {@link Future} for each subscriber into {@link Mono}.
* {@link Mono#subscribe(Subscriber)}
* will bridge to {@link Future#addListener(GenericFutureListener)}.
*
* Write the passed {@link Publisher} and return a disposable {@link Mono}.
* <p>
* In addition, current method allows interaction with downstream context, so it
* may be transferred to implicitly connected upstream
*
* <p>
* Example:
*
* <p>
* <pre><code>
* Flux&lt;String&gt; dataStream = Flux.just("a", "b", "c");
* FutureMono.deferFutureWithContext((subscriberContext) ->
* context().channel()
* context().channel()
* .writeAndFlush(PublisherContext.withContext(dataStream, subscriberContext)));
* </code></pre>
*
* @param deferredFuture the future to evaluate and convert from
* @param <F> the future type
* @param dataStream the publisher to write
*
* @return A {@link Mono} forwarding {@link Future} success or failure
* @return A {@link Mono} forwarding {@link Future} success, failure and cancel
* @deprecated use {@link NettyOutbound#send(Publisher)}
*/
public static <F extends Future<Void>> Mono<Void> deferFutureWithContext(Function<Context, F> deferredFuture) {
return new DeferredContextFutureMono<>(deferredFuture);
}

static <T> Publisher<T> wrapContextAndDispose(Publisher<T> publisher, ChannelFutureSubscription cfs) {
if (publisher instanceof Callable) {
return publisher;
}
else if (publisher instanceof Flux) {
return new FluxOperator<T, T>((Flux<T>) publisher) {

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
cfs.ioActual = actual;
source.subscribe(actual);
}
};
}
else if (publisher instanceof Mono) {
return new MonoOperator<T, T>((Mono<T>) publisher) {

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
cfs.ioActual = actual;
source.subscribe(actual);
}
};
}
else {
return publisher;
}
@Deprecated
public static Mono<Void> disposableWriteAndFlush(Channel channel, Publisher<?> dataStream) {
return Mono.error(new UnsupportedOperationException("deprecated: use NettyOutbound#send(Publisher)"));
}

final static class ImmediateFutureMono<F extends Future<Void>> extends FutureMono {
Expand Down Expand Up @@ -244,11 +205,11 @@ public void subscribe(CoreSubscriber<? super Void> s) {
return;
}

if(f.isDone()){
if(f.isSuccess()){
if (f.isDone()) {
if (f.isSuccess()) {
Operators.complete(s);
}
else{
else {
Operators.error(s, f.cause());
}
return;
Expand All @@ -261,8 +222,6 @@ public void subscribe(CoreSubscriber<? super Void> s) {
}
}



final static class FutureSubscription<F extends Future<Void>>
implements GenericFutureListener<F>, Subscription, Supplier<Context> {

Expand Down Expand Up @@ -303,97 +262,4 @@ public void operationComplete(F future) {
}

}

final static class DeferredWriteMono extends FutureMono {

final Channel channel;
final Publisher<?> dataStream;

DeferredWriteMono(Channel channel, Publisher<?> dataStream) {
this.channel = Objects.requireNonNull(channel, "channel");
this.dataStream = Objects.requireNonNull(dataStream, "dataStream");
}

@Override
public void subscribe(CoreSubscriber<? super Void> s) {
ChannelFutureSubscription cfs = new ChannelFutureSubscription(channel, s);

s.onSubscribe(cfs);

channel.writeAndFlush(wrapContextAndDispose(dataStream, cfs), cfs);
}
}

final static class ChannelFutureSubscription extends DefaultChannelPromise
implements Subscription, Function<Void, Context> {

final CoreSubscriber<? super Void> actual;
CoreSubscriber<?> ioActual;

ChannelFutureSubscription(Channel channel, CoreSubscriber<? super Void> actual) {
super(channel, channel.eventLoop());
this.actual = actual;
}

@Override
public void request(long n) {
//noop
}

@Override
public Context apply(Void aVoid) {
return actual.currentContext();
}

@Override
@SuppressWarnings("unchecked")
public void cancel() {
if (!executor().inEventLoop()) {
//must defer to be sure about ioActual field (assigned on event loop)
executor().execute(this::cancel);
return;
}
CoreSubscriber<?> ioActual = this.ioActual;
this.ioActual = null;
if (ioActual instanceof Consumer) {
((Consumer<ChannelFuture>)ioActual).accept(this);
}
}

@Override
public boolean trySuccess(Void result) {
this.ioActual = null;
boolean r = super.trySuccess(result);
actual.onComplete();
return r;
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public ChannelPromise setSuccess(Void result) {
this.ioActual = null;
// Returned value is deliberately ignored
super.setSuccess(result);
actual.onComplete();
return this;
}

@Override
public boolean tryFailure(Throwable cause) {
this.ioActual = null;
boolean r = super.tryFailure(cause);
actual.onError(cause);
return r;
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public ChannelPromise setFailure(Throwable cause) {
this.ioActual = null;
// Returned value is deliberately ignored
super.setFailure(cause);
actual.onError(cause);
return this;
}
}
}
12 changes: 3 additions & 9 deletions src/main/java/reactor/netty/NettyOutbound.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,7 @@ default Mono<Void> neverComplete() {
*
* @return this {@link NettyOutbound}
*/
default NettyOutbound options(Consumer<? super NettyPipeline.SendOptions> configurator) {
return withConnection(c -> c.channel()
.pipeline()
.fireUserEventTriggered(new NettyPipeline.SendOptionsChangeEvent(configurator)));
}
NettyOutbound options(Consumer<? super NettyPipeline.SendOptions> configurator);

/**
* Sends data to the peer, listens for any error on write and closes on terminal signal
Expand All @@ -90,9 +86,7 @@ default NettyOutbound options(Consumer<? super NettyPipeline.SendOptions> config
* @return A new {@link NettyOutbound} to append further send. It will emit a complete
* signal successful sequence write (e.g. after "flush") or any error during write.
*/
default NettyOutbound send(Publisher<? extends ByteBuf> dataStream) {
return sendObject(dataStream);
}
NettyOutbound send(Publisher<? extends ByteBuf> dataStream);

/**
* Sends bytes to the peer, listens for any error on write and closes on terminal
Expand Down Expand Up @@ -277,7 +271,7 @@ default NettyOutbound sendString(Publisher<? extends String> dataStream) {
*/
default NettyOutbound sendString(Publisher<? extends String> dataStream,
Charset charset) {
return sendObject(ReactorNetty.publisherOrScalarMap(
return send(ReactorNetty.publisherOrScalarMap(
dataStream, s -> {
ByteBuf buffer = alloc().buffer();
buffer.writeCharSequence(s, charset);
Expand Down
Loading