Skip to content

Commit

Permalink
Refactor parMapN functions, add parTupledN (#80)
Browse files Browse the repository at this point in the history
* Deprecate parMapN ext fun, add explicit param

Add parTupledN versions

* Simplify Rx instance for Concurrent

Fix Concurrent docs for parMapN and parTupledN

Co-authored-by: danieh <danimontoya_86@hotmail.com>
Co-authored-by: Simon Vergauwen <nomisRev@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 5, 2020
1 parent 24fbb7a commit 90f876d
Show file tree
Hide file tree
Showing 15 changed files with 892 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package arrow.benchmarks
import arrow.core.extensions.list.foldable.foldLeft
import arrow.fx.IO
import arrow.fx.IODispatchers
import arrow.fx.extensions.io.concurrent.parMapN
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.CompilerControl
import org.openjdk.jmh.annotations.Fork
Expand All @@ -26,7 +25,7 @@ open class ParMap {

private fun ioHelper(): IO<Int> =
(0 until size).toList().foldLeft(IO { 0 }) { acc, i ->
IODispatchers.CommonPool.parMapN(acc, IO { i }) { a, b -> a + b }
IO.parMapN(IODispatchers.CommonPool, acc, IO { i }) { (a, b) -> a + b }
}

@Benchmark
Expand Down
3 changes: 2 additions & 1 deletion arrow-libs/fx/arrow-docs/docs/fx/async/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ data class ThreadInfo(
val program = IO.fx {
val (threadA: String, threadB: String) =
!dispatchers().default().parMapN(
!IO.parMapN(
dispatchers().default(),
effect { threadName() },
effect { threadName() },
::ThreadInfo
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package arrow.fx.rx2.extensions

import arrow.core.Tuple2
import arrow.core.Tuple3
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Function3

internal fun <A, B> tupled2(): BiFunction<A, B, Tuple2<A, B>> =
BiFunction { a: A, b: B -> Tuple2(a, b) }

internal fun <A, B, C> tupled3(): Function3<A, B, C, Tuple3<A, B, C>> =
Function3 { a: A, b: B, c: C -> Tuple3(a, b, c) }
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import arrow.core.Either
import arrow.core.Eval
import arrow.core.Option
import arrow.core.Tuple2
import arrow.core.Tuple3

import arrow.fx.RacePair
import arrow.fx.RaceTriple
Expand Down Expand Up @@ -43,7 +44,6 @@ import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
import io.reactivex.functions.BiFunction
import arrow.fx.rx2.asScheduler
import arrow.fx.rx2.extensions.flowablek.dispatchers.dispatchers
import arrow.fx.rx2.k
Expand Down Expand Up @@ -197,13 +197,11 @@ interface FlowableKConcurrent : Concurrent<ForFlowableK>, FlowableKAsync {
}, BS()).k()
}

override fun <A, B, C> CoroutineContext.parMapN(fa: FlowableKOf<A>, fb: FlowableKOf<B>, f: (A, B) -> C): FlowableK<C> =
FlowableK(fa.value().zipWith(fb.value(), BiFunction(f)).subscribeOn(asScheduler()))
override fun <A, B> parTupledN(ctx: CoroutineContext, fa: FlowableKOf<A>, fb: FlowableKOf<B>): FlowableK<Tuple2<A, B>> =
fa.value().zipWith(fb.value(), tupled2()).subscribeOn(ctx.asScheduler()).k()

override fun <A, B, C, D> CoroutineContext.parMapN(fa: FlowableKOf<A>, fb: FlowableKOf<B>, fc: FlowableKOf<C>, f: (A, B, C) -> D): FlowableK<D> =
FlowableK(fa.value().zipWith(fb.value().zipWith(fc.value(), BiFunction<B, C, Tuple2<B, C>> { b, c -> Tuple2(b, c) }), BiFunction { a: A, tuple: Tuple2<B, C> ->
f(a, tuple.a, tuple.b)
}).subscribeOn(asScheduler()))
override fun <A, B, C> parTupledN(ctx: CoroutineContext, fa: FlowableKOf<A>, fb: FlowableKOf<B>, fc: FlowableKOf<C>): FlowableK<Tuple3<A, B, C>> =
Flowable.zip(fa.value(), fb.value(), fc.value(), tupled3()).subscribeOn(ctx.asScheduler()).k()

override fun <A> cancellable(k: ((Either<Throwable, A>) -> Unit) -> CancelToken<ForFlowableK>): FlowableK<A> =
FlowableK.cancellable(k, BS())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import arrow.core.Either
import arrow.core.Eval
import arrow.core.Option
import arrow.core.Tuple2
import arrow.core.Tuple3
import arrow.extension
import arrow.fx.RacePair
import arrow.fx.RaceTriple
Expand Down Expand Up @@ -44,7 +45,6 @@ import arrow.typeclasses.MonadFilter
import arrow.typeclasses.MonadThrow
import arrow.unsafe
import io.reactivex.Maybe
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.ReplaySubject
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -171,8 +171,8 @@ interface MaybeKEffect :
}

interface MaybeKConcurrent : Concurrent<ForMaybeK>, MaybeKAsync {
override fun <A> Kind<ForMaybeK, A>.fork(coroutineContext: CoroutineContext): MaybeK<Fiber<ForMaybeK, A>> =
coroutineContext.asScheduler().let { scheduler ->
override fun <A> Kind<ForMaybeK, A>.fork(ctx: CoroutineContext): MaybeK<Fiber<ForMaybeK, A>> =
ctx.asScheduler().let { scheduler ->
Maybe.create<Fiber<ForMaybeK, A>> { emitter ->
if (!emitter.isDisposed) {
val s: ReplaySubject<A> = ReplaySubject.create()
Expand All @@ -184,13 +184,11 @@ interface MaybeKConcurrent : Concurrent<ForMaybeK>, MaybeKAsync {
}.k()
}

override fun <A, B, C> CoroutineContext.parMapN(fa: MaybeKOf<A>, fb: MaybeKOf<B>, f: (A, B) -> C): MaybeK<C> =
MaybeK(fa.value().zipWith(fb.value(), BiFunction(f)).subscribeOn(asScheduler()))
override fun <A, B> parTupledN(ctx: CoroutineContext, fa: MaybeKOf<A>, fb: MaybeKOf<B>): MaybeK<Tuple2<A, B>> =
fa.value().zipWith(fb.value(), tupled2()).subscribeOn(ctx.asScheduler()).k()

override fun <A, B, C, D> CoroutineContext.parMapN(fa: MaybeKOf<A>, fb: MaybeKOf<B>, fc: MaybeKOf<C>, f: (A, B, C) -> D): MaybeK<D> =
MaybeK(fa.value().zipWith(fb.value().zipWith(fc.value(), BiFunction<B, C, Tuple2<B, C>> { b, c -> Tuple2(b, c) }), BiFunction { a: A, tuple: Tuple2<B, C> ->
f(a, tuple.a, tuple.b)
}).subscribeOn(asScheduler()))
override fun <A, B, C> parTupledN(ctx: CoroutineContext, fa: MaybeKOf<A>, fb: MaybeKOf<B>, fc: MaybeKOf<C>): MaybeK<Tuple3<A, B, C>> =
Maybe.zip(fa.value(), fb.value(), fc.value(), tupled3()).subscribeOn(ctx.asScheduler()).k()

override fun <A> cancellable(k: ((Either<Throwable, A>) -> Unit) -> CancelToken<ForMaybeK>): MaybeK<A> =
MaybeK.cancellable(k)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import arrow.core.Either
import arrow.core.Eval
import arrow.core.Option
import arrow.core.Tuple2
import arrow.core.Tuple3
import arrow.extension
import arrow.fx.RacePair
import arrow.fx.RaceTriple
Expand Down Expand Up @@ -43,7 +44,6 @@ import arrow.typeclasses.Monad
import arrow.typeclasses.MonadError
import arrow.typeclasses.MonadThrow
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.ReplaySubject
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -168,13 +168,11 @@ interface ObservableKConcurrent : Concurrent<ForObservableK>, ObservableKAsync {
}.k()
}

override fun <A, B, C> CoroutineContext.parMapN(fa: ObservableKOf<A>, fb: ObservableKOf<B>, f: (A, B) -> C): ObservableK<C> =
ObservableK(fa.value().zipWith(fb.value(), BiFunction(f)).subscribeOn(asScheduler()))
override fun <A, B> parTupledN(ctx: CoroutineContext, fa: ObservableKOf<A>, fb: ObservableKOf<B>): ObservableK<Tuple2<A, B>> =
fa.value().zipWith(fb.value(), tupled2()).subscribeOn(ctx.asScheduler()).k()

override fun <A, B, C, D> CoroutineContext.parMapN(fa: ObservableKOf<A>, fb: ObservableKOf<B>, fc: ObservableKOf<C>, f: (A, B, C) -> D): ObservableK<D> =
ObservableK(fa.value().zipWith(fb.value().zipWith(fc.value(), BiFunction<B, C, Tuple2<B, C>> { b, c -> Tuple2(b, c) }), BiFunction { a: A, tuple: Tuple2<B, C> ->
f(a, tuple.a, tuple.b)
}).subscribeOn(asScheduler()))
override fun <A, B, C> parTupledN(ctx: CoroutineContext, fa: ObservableKOf<A>, fb: ObservableKOf<B>, fc: ObservableKOf<C>): ObservableK<Tuple3<A, B, C>> =
Observable.zip(fa.value(), fb.value(), fc.value(), tupled3()).subscribeOn(ctx.asScheduler()).k()

override fun <A> cancellable(k: ((Either<Throwable, A>) -> Unit) -> CancelToken<ForObservableK>): ObservableKOf<A> =
ObservableK.cancellable(k)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import arrow.Kind
import arrow.core.Either
import arrow.core.Eval
import arrow.core.Tuple2
import arrow.core.Tuple3

import arrow.fx.RacePair
import arrow.fx.RaceTriple
Expand Down Expand Up @@ -43,7 +44,6 @@ import arrow.typeclasses.MonadError
import arrow.typeclasses.MonadThrow
import arrow.unsafe
import io.reactivex.Single
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.ReplaySubject
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -150,8 +150,8 @@ interface SingleKEffect :
}

interface SingleKConcurrent : Concurrent<ForSingleK>, SingleKAsync {
override fun <A> Kind<ForSingleK, A>.fork(coroutineContext: CoroutineContext): SingleK<Fiber<ForSingleK, A>> =
coroutineContext.asScheduler().let { scheduler ->
override fun <A> Kind<ForSingleK, A>.fork(ctx: CoroutineContext): SingleK<Fiber<ForSingleK, A>> =
ctx.asScheduler().let { scheduler ->
Single.create<Fiber<ForSingleK, A>> { emitter ->
if (!emitter.isDisposed) {
val s: ReplaySubject<A> = ReplaySubject.create()
Expand All @@ -163,13 +163,11 @@ interface SingleKConcurrent : Concurrent<ForSingleK>, SingleKAsync {
}.k()
}

override fun <A, B, C> CoroutineContext.parMapN(fa: SingleKOf<A>, fb: SingleKOf<B>, f: (A, B) -> C): SingleK<C> =
SingleK(fa.value().zipWith(fb.value(), BiFunction(f)).subscribeOn(asScheduler()))
override fun <A, B> parTupledN(ctx: CoroutineContext, fa: SingleKOf<A>, fb: SingleKOf<B>): SingleK<Tuple2<A, B>> =
fa.value().zipWith(fb.value(), tupled2()).subscribeOn(ctx.asScheduler()).k()

override fun <A, B, C, D> CoroutineContext.parMapN(fa: SingleKOf<A>, fb: SingleKOf<B>, fc: SingleKOf<C>, f: (A, B, C) -> D): SingleK<D> =
SingleK(fa.value().zipWith(fb.value().zipWith(fc.value(), BiFunction<B, C, Tuple2<B, C>> { b, c -> Tuple2(b, c) }), BiFunction { a: A, tuple: Tuple2<B, C> ->
f(a, tuple.a, tuple.b)
}).subscribeOn(asScheduler()))
override fun <A, B, C> parTupledN(ctx: CoroutineContext, fa: SingleKOf<A>, fb: SingleKOf<B>, fc: SingleKOf<C>): SingleK<Tuple3<A, B, C>> =
Single.zip(fa.value(), fb.value(), fc.value(), tupled3()).subscribeOn(ctx.asScheduler()).k()

override fun <A> cancellable(k: ((Either<Throwable, A>) -> Unit) -> CancelToken<ForSingleK>): SingleK<A> =
SingleK.cancellable(k)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import arrow.core.Left
import arrow.core.ListK
import arrow.core.Right
import arrow.core.Tuple2
import arrow.core.Tuple6
import arrow.core.extensions.eq
import arrow.core.extensions.listk.traverse.traverse
import arrow.core.extensions.tuple2.eq.eq
import arrow.core.extensions.tuple6.eq.eq
import arrow.core.identity
import arrow.core.k
import arrow.core.toT
Expand Down Expand Up @@ -85,7 +87,7 @@ object ConcurrentLaws {
Law("Concurrent Laws: race mirrors right winner") { CF.raceMirrorsRightWinner(EQ, ctx) },
Law("Concurrent Laws: race cancels loser") { CF.raceCancelsLoser(EQ, ctx) },
Law("Concurrent Laws: race cancels both") { CF.raceCancelCancelsBoth(EQ, ctx) },
Law("Concurrent Laws: parallel execution with single threaded context makes all Fs start at the same time") { CF.parMapStartsAllAtSameTime(EQK.liftEq(Eq.any())) },
Law("Concurrent Laws: parallel execution with single threaded context makes all Fs start at the same time") { CF.parMapStartsAllAtSameTime(EQK.liftEq(Tuple6.eq(Int.eq(), Int.eq(), Int.eq(), Int.eq(), Int.eq(), Int.eq()))) },
Law("Concurrent Laws: parallel map cancels both") { CF.parMapCancelCancelsBoth(EQ, ctx) },
Law("Concurrent Laws: action concurrent with pure value is just action") { CF.actionConcurrentWithPureValueIsJustAction(EQ, ctx) },
Law("Concurrent Laws: parTraverse can traverse effectful computations") { CF.parTraverseCanTraverseEffectfullComputations(EQ) },
Expand Down Expand Up @@ -624,18 +626,18 @@ object ConcurrentLaws {
}.equalUnderTheLaw(just(a + b + c), EQ)
}

fun <F> Concurrent<F>.parMapStartsAllAtSameTime(EQ: Eq<Kind<F, List<Long>>>) {
val order = mutableListOf<Long>()
fun <F> Concurrent<F>.parMapStartsAllAtSameTime(EQ: Eq<Kind<F, Tuple6<Int, Int, Int, Int, Int, Int>>>) {
val order = mutableListOf<Int>()

fun makePar(num: Long) = sleep((num * 100).milliseconds).map {
fun makePar(num: Int) = sleep((num * 100).milliseconds).map {
order.add(num)
num
}

single.parMapN(
makePar(6), makePar(3), makePar(2), makePar(4), makePar(1), makePar(5)) { six, tree, two, four, one, five -> listOf(six, tree, two, four, one, five) }
.equalUnderTheLaw(just(listOf(6L, 3, 2, 4, 1, 5)), EQ)
order.toList() shouldBe listOf(1L, 2, 3, 4, 5, 6)
parTupledN(single,
makePar(6), makePar(3), makePar(2), makePar(4), makePar(1), makePar(5)
).equalUnderTheLaw(just(Tuple6(6, 3, 2, 4, 1, 5)), EQ) shouldBe true
order.toList() shouldBe listOf(1, 2, 3, 4, 5, 6)
}

fun <F> Concurrent<F>.parMapCancelCancelsBoth(EQ: Eq<Kind<F, Int>>, ctx: CoroutineContext) =
Expand All @@ -648,7 +650,7 @@ object ConcurrentLaws {
val loserA = s.release().bracket(use = { never<String>() }, release = { pa.complete(a) })
val loserB = s.release().bracket(use = { never<Int>() }, release = { pb.complete(b) })

val (_, cancelParMapN) = ctx.parMapN(loserA, loserB, ::Tuple2).fork(ctx).bind()
val (_, cancelParMapN) = parTupledN(ctx, loserA, loserB).fork(ctx).bind()
s.acquireN(2L).flatMap { cancelParMapN }.bind()
pa.get().bind() + pb.get().bind()
}.equalUnderTheLaw(just(a + b), EQ)
Expand Down Expand Up @@ -795,50 +797,50 @@ object ConcurrentLaws {
}

fun <F> Concurrent<F>.parMap2StackSafe(iterations: Int, EQ: Eq<Kind<F, Int>>, ctx: CoroutineContext) {
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> ctx.parMapN(acc, t) { a, b -> a + b } }
.shouldBeEq(just(iterations), EQ)
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> parMapN(ctx, acc, t) { (a, b) -> a + b } }
.shouldBeEq(just(iterations), EQ)
}

fun <F> Concurrent<F>.parMap3StackSafe(iterations: Int, EQ: Eq<Kind<F, Int>>, ctx: CoroutineContext) {
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> ctx.parMapN(acc, t, unit()) { a, b, _ -> a + b } }
.shouldBeEq(just(iterations), EQ)
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> parMapN(ctx, acc, t, unit()) { it.a + it.b } }
.shouldBeEq(just(iterations), EQ)
}

fun <F> Concurrent<F>.parMap4StackSafe(iterations: Int, EQ: Eq<Kind<F, Int>>, ctx: CoroutineContext) {
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> ctx.parMapN(acc, t, unit(), unit()) { a, b, _, _ -> a + b } }
.shouldBeEq(just(iterations), EQ)
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> parMapN(ctx, acc, t, unit(), unit()) { it.a + it.b } }
.shouldBeEq(just(iterations), EQ)
}

fun <F> Concurrent<F>.parMap5StackSafe(iterations: Int, EQ: Eq<Kind<F, Int>>, ctx: CoroutineContext) {
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> ctx.parMapN(acc, t, unit(), unit(), unit()) { a, b, _, _, _ -> a + b } }
.shouldBeEq(just(iterations), EQ)
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> parMapN(ctx, acc, t, unit(), unit(), unit()) { it.a + it.b } }
.shouldBeEq(just(iterations), EQ)
}

fun <F> Concurrent<F>.parMap6StackSafe(iterations: Int, EQ: Eq<Kind<F, Int>>, ctx: CoroutineContext) {
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> ctx.parMapN(acc, t, unit(), unit(), unit(), unit()) { a, b, _, _, _, _ -> a + b } }
.shouldBeEq(just(iterations), EQ)
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> parMapN(ctx, acc, t, unit(), unit(), unit(), unit()) { it.a + it.b } }
.shouldBeEq(just(iterations), EQ)
}

fun <F> Concurrent<F>.parMap7StackSafe(iterations: Int, EQ: Eq<Kind<F, Int>>, ctx: CoroutineContext) {
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> ctx.parMapN(acc, t, unit(), unit(), unit(), unit(), unit()) { a, b, _, _, _, _, _ -> a + b } }
.shouldBeEq(just(iterations), EQ)
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> parMapN(ctx, acc, t, unit(), unit(), unit(), unit(), unit()) { it.a + it.b } }
.shouldBeEq(just(iterations), EQ)
}

fun <F> Concurrent<F>.parMap8StackSafe(iterations: Int, EQ: Eq<Kind<F, Int>>, ctx: CoroutineContext) {
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> ctx.parMapN(acc, t, unit(), unit(), unit(), unit(), unit(), unit()) { a, b, _, _, _, _, _, _ -> a + b } }
.shouldBeEq(just(iterations), EQ)
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> parMapN(ctx, acc, t, unit(), unit(), unit(), unit(), unit(), unit()) { it.a + it.b } }
.shouldBeEq(just(iterations), EQ)
}

fun <F> Concurrent<F>.parMap9StackSafe(iterations: Int, EQ: Eq<Kind<F, Int>>, ctx: CoroutineContext) {
(0 until iterations).map { just(1) }
.fold(just(0)) { acc, t -> ctx.parMapN(acc, t, unit(), unit(), unit(), unit(), unit(), unit(), unit()) { a, b, _, _, _, _, _, _, _ -> a + b } }
.fold(just(0)) { acc, t -> parMapN(ctx, acc, t, unit(), unit(), unit(), unit(), unit(), unit(), unit()) { it.a + it.b } }
.shouldBeEq(just(iterations), EQ)
}

Expand Down
Loading

0 comments on commit 90f876d

Please sign in to comment.