diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java
index 3c3c4040ca..34008f1a0c 100644
--- a/src/main/java/io/reactivex/Completable.java
+++ b/src/main/java/io/reactivex/Completable.java
@@ -908,6 +908,28 @@ public final Completable andThen(CompletableSource next) {
return concatWith(next);
}
+ /**
+ * Calls the specified converter function during assembly time and returns its resulting value.
+ *
+ * This allows fluent conversion to any other type.
+ *
+ *
Scheduler:
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the resulting object type
+ * @param converter the function that receives the current Completable instance and returns a value
+ * @return the converted value
+ * @throws NullPointerException if converter is null
+ * @since 2.1.7 - experimental
+ */
+ @Experimental
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final R as(@NonNull CompletableConverter extends R> converter) {
+ return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
+ }
+
/**
* Subscribes to and awaits the termination of this Completable instance in a blocking manner and
* rethrows any exception emitted.
diff --git a/src/main/java/io/reactivex/CompletableConverter.java b/src/main/java/io/reactivex/CompletableConverter.java
new file mode 100644
index 0000000000..39ec9b452b
--- /dev/null
+++ b/src/main/java/io/reactivex/CompletableConverter.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex;
+
+import io.reactivex.annotations.*;
+
+/**
+ * Convenience interface and callback used by the {@link Completable#as} operator to turn a Completable into another
+ * value fluently.
+ *
+ * @param the output type
+ * @since 2.1.7 - experimental
+ */
+@Experimental
+public interface CompletableConverter {
+ /**
+ * Applies a function to the upstream Completable and returns a converted value of type {@code R}.
+ *
+ * @param upstream the upstream Completable instance
+ * @return the converted value
+ */
+ @NonNull
+ R apply(@NonNull Completable upstream);
+}
diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java
index 7bc45def95..b2da55c6fa 100644
--- a/src/main/java/io/reactivex/Flowable.java
+++ b/src/main/java/io/reactivex/Flowable.java
@@ -5237,6 +5237,31 @@ public final Single any(Predicate super T> predicate) {
return RxJavaPlugins.onAssembly(new FlowableAnySingle(this, predicate));
}
+ /**
+ * Calls the specified converter function during assembly time and returns its resulting value.
+ *
+ * This allows fluent conversion to any other type.
+ *
+ *
Backpressure:
+ *
The backpressure behavior depends on what happens in the {@code converter} function.
+ *
Scheduler:
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the resulting object type
+ * @param converter the function that receives the current Flowable instance and returns a value
+ * @return the converted value
+ * @throws NullPointerException if converter is null
+ * @since 2.1.7 - experimental
+ */
+ @Experimental
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.SPECIAL)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final R as(@NonNull FlowableConverter converter) {
+ return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
+ }
+
/**
* Returns the first item emitted by this {@code Flowable}, or throws
* {@code NoSuchElementException} if it emits no items.
diff --git a/src/main/java/io/reactivex/FlowableConverter.java b/src/main/java/io/reactivex/FlowableConverter.java
new file mode 100644
index 0000000000..541e335bcd
--- /dev/null
+++ b/src/main/java/io/reactivex/FlowableConverter.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex;
+
+import io.reactivex.annotations.*;
+
+/**
+ * Convenience interface and callback used by the {@link Flowable#as} operator to turn a Flowable into another
+ * value fluently.
+ *
+ * @param the upstream type
+ * @param the output type
+ * @since 2.1.7 - experimental
+ */
+@Experimental
+public interface FlowableConverter {
+ /**
+ * Applies a function to the upstream Flowable and returns a converted value of type {@code R}.
+ *
+ * @param upstream the upstream Flowable instance
+ * @return the converted value
+ */
+ @NonNull
+ R apply(@NonNull Flowable upstream);
+}
diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java
index 13922ed8d3..c2a88aacb3 100644
--- a/src/main/java/io/reactivex/Maybe.java
+++ b/src/main/java/io/reactivex/Maybe.java
@@ -1989,6 +1989,28 @@ public final Maybe ambWith(MaybeSource extends T> other) {
return ambArray(this, other);
}
+ /**
+ * Calls the specified converter function during assembly time and returns its resulting value.
+ *
+ * This allows fluent conversion to any other type.
+ *
+ *
Scheduler:
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the resulting object type
+ * @param converter the function that receives the current Maybe instance and returns a value
+ * @return the converted value
+ * @throws NullPointerException if converter is null
+ * @since 2.1.7 - experimental
+ */
+ @Experimental
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final R as(@NonNull MaybeConverter converter) {
+ return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
+ }
+
/**
* Waits in a blocking fashion until the current Maybe signals a success value (which is returned),
* null if completed or an exception (which is propagated).
diff --git a/src/main/java/io/reactivex/MaybeConverter.java b/src/main/java/io/reactivex/MaybeConverter.java
new file mode 100644
index 0000000000..e156ed5944
--- /dev/null
+++ b/src/main/java/io/reactivex/MaybeConverter.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex;
+
+import io.reactivex.annotations.*;
+
+/**
+ * Convenience interface and callback used by the {@link Maybe#as} operator to turn a Maybe into another
+ * value fluently.
+ *
+ * @param the upstream type
+ * @param the output type
+ * @since 2.1.7 - experimental
+ */
+@Experimental
+public interface MaybeConverter {
+ /**
+ * Applies a function to the upstream Maybe and returns a converted value of type {@code R}.
+ *
+ * @param upstream the upstream Maybe instance
+ * @return the converted value
+ */
+ @NonNull
+ R apply(@NonNull Maybe upstream);
+}
diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java
index a124159e07..bfa3a19e4e 100644
--- a/src/main/java/io/reactivex/Observable.java
+++ b/src/main/java/io/reactivex/Observable.java
@@ -4800,6 +4800,28 @@ public final Single any(Predicate super T> predicate) {
return RxJavaPlugins.onAssembly(new ObservableAnySingle(this, predicate));
}
+ /**
+ * Calls the specified converter function during assembly time and returns its resulting value.
+ *
+ * This allows fluent conversion to any other type.
+ *
+ *
Scheduler:
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the resulting object type
+ * @param converter the function that receives the current Observable instance and returns a value
+ * @return the converted value
+ * @throws NullPointerException if converter is null
+ * @since 2.1.7 - experimental
+ */
+ @Experimental
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final R as(@NonNull ObservableConverter converter) {
+ return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
+ }
+
/**
* Returns the first item emitted by this {@code Observable}, or throws
* {@code NoSuchElementException} if it emits no items.
diff --git a/src/main/java/io/reactivex/ObservableConverter.java b/src/main/java/io/reactivex/ObservableConverter.java
new file mode 100644
index 0000000000..b413de69de
--- /dev/null
+++ b/src/main/java/io/reactivex/ObservableConverter.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex;
+
+import io.reactivex.annotations.*;
+
+/**
+ * Convenience interface and callback used by the {@link Observable#as} operator to turn an Observable into another
+ * value fluently.
+ *
+ * @param the upstream type
+ * @param the output type
+ * @since 2.1.7 - experimental
+ */
+@Experimental
+public interface ObservableConverter {
+ /**
+ * Applies a function to the upstream Observable and returns a converted value of type {@code R}.
+ *
+ * @param upstream the upstream Observable instance
+ * @return the converted value
+ */
+ @NonNull
+ R apply(@NonNull Observable upstream);
+}
diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java
index 03b659a71a..72ee2872c6 100644
--- a/src/main/java/io/reactivex/Single.java
+++ b/src/main/java/io/reactivex/Single.java
@@ -1522,6 +1522,28 @@ public final Single ambWith(SingleSource extends T> other) {
return ambArray(this, other);
}
+ /**
+ * Calls the specified converter function during assembly time and returns its resulting value.
+ *
+ * This allows fluent conversion to any other type.
+ *
+ *
Scheduler:
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the resulting object type
+ * @param converter the function that receives the current Single instance and returns a value
+ * @return the converted value
+ * @throws NullPointerException if converter is null
+ * @since 2.1.7 - experimental
+ */
+ @Experimental
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final R as(@NonNull SingleConverter converter) {
+ return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
+ }
+
/**
* Hides the identity of the current Single, including the Disposable that is sent
* to the downstream via {@code onSubscribe()}.
diff --git a/src/main/java/io/reactivex/SingleConverter.java b/src/main/java/io/reactivex/SingleConverter.java
new file mode 100644
index 0000000000..9938b22cc7
--- /dev/null
+++ b/src/main/java/io/reactivex/SingleConverter.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex;
+
+import io.reactivex.annotations.*;
+
+/**
+ * Convenience interface and callback used by the {@link Single#as} operator to turn a Single into another
+ * value fluently.
+ *
+ * @param the upstream type
+ * @param the output type
+ * @since 2.1.7 - experimental
+ */
+@Experimental
+public interface SingleConverter {
+ /**
+ * Applies a function to the upstream Single and returns a converted value of type {@code R}.
+ *
+ * @param upstream the upstream Single instance
+ * @return the converted value
+ */
+ @NonNull
+ R apply(@NonNull Single upstream);
+}
diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java
index b1f6d60322..4dd6dfd93d 100644
--- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java
+++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java
@@ -122,6 +122,24 @@ public static ParallelFlowable from(@NonNull Publisher extends T> sourc
return RxJavaPlugins.onAssembly(new ParallelFromPublisher(source, parallelism, prefetch));
}
+ /**
+ * Calls the specified converter function during assembly time and returns its resulting value.
+ *
+ * This allows fluent conversion to any other type.
+ *
+ * @param the resulting object type
+ * @param converter the function that receives the current ParallelFlowable instance and returns a value
+ * @return the converted value
+ * @throws NullPointerException if converter is null
+ * @since 2.1.7 - experimental
+ */
+ @Experimental
+ @CheckReturnValue
+ @NonNull
+ public final R as(@NonNull ParallelFlowableConverter converter) {
+ return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
+ }
+
/**
* Maps the source values on each 'rail' to another value.
*