Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Revert parTupledN and parMapN derivation #249

Merged
merged 6 commits into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions arrow-docs/docs/fx/async/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ data class ThreadInfo(

suspend fun main(): Unit {
val (threadA: String, threadB: String) =
parMapN(::threadName, ::threadName) { (a, b) ->
ThreadInfo(a, b)
}
parMapN(::threadName, ::threadName, ::ThreadInfo)

println(threadA)
println(threadB)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,6 @@ suspend fun <A, B> Iterable<A>.parTraverse(ctx: CoroutineContext, f: suspend (A)
if (ctx === EmptyCoroutineContext || ctx[ContinuationInterceptor] == null) map { a -> f(a) }
else toList().foldRight(suspend { emptyList<B>() }) { a, acc ->
suspend {
parMapN(ctx, { f(a) }, { acc.invoke() }) { (a, b) -> listOf(a) + b }
parMapN(ctx, { f(a) }, { acc.invoke() }) { a, b -> listOf(a) + b }
}
}.invoke()
132 changes: 61 additions & 71 deletions arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/ParTupledN.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,48 @@ import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

/**
* Tuples [fa], [fb] in parallel on [ComputationPool].
* Cancelling this operation cancels both operations running in parallel.
*
* @see parTupledN for the same function that can race on any [CoroutineContext].
*/
suspend fun <A, B> parTupledN(fa: suspend () -> A, fb: suspend () -> B): Pair<A, B> =
parTupledN(ComputationPool, fa, fb)

/**
* Tuples [fa], [fb], [fc] in parallel on [ComputationPool].
* Cancelling this operation cancels both tasks running in parallel.
*
* @see parTupledN for the same function that can race on any [CoroutineContext].
*/
suspend fun <A, B, C> parTupledN(fa: suspend () -> A, fb: suspend () -> B, fc: suspend () -> C): Triple<A, B, C> =
parTupledN(ComputationPool, fa, fb, fc)

/**
* Tuples [fa], [fb] on the provided [CoroutineContext].
* Cancelling this operation cancels both tasks running in parallel.
*
* **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext].
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
*
* @see parTupledN for a function that ensures it runs in parallel on the [ComputationPool].
*/
suspend fun <A, B> parTupledN(ctx: CoroutineContext, fa: suspend () -> A, fb: suspend () -> B): Pair<A, B> =
parMapN(ctx, fa, fb, ::Pair)

/**
* Tuples [fa], [fb] & [fc] on the provided [CoroutineContext].
* Cancelling this operation cancels both tasks running in parallel.
*
* **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext].
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
*
* @see parTupledN for a function that ensures it runs in parallel on the [ComputationPool].
*/
suspend fun <A, B, C> parTupledN(ctx: CoroutineContext, fa: suspend () -> A, fb: suspend () -> B, fc: suspend () -> C): Triple<A, B, C> =
parMapN(ctx, fa, fb, fc, ::Triple)

/**
* Parallel maps [fa], [fb] in parallel on [ComputationPool].
* Cancelling this operation cancels both operations running in parallel.
Expand All @@ -21,7 +63,7 @@ import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
* val result = parMapN(
* { "First one is on ${Thread.currentThread().name}" },
* { "Second one is on ${Thread.currentThread().name}" }
* ) { (a, b) ->
* ) { a, b ->
* "$a\n$b"
* }
* //sampleEnd
Expand All @@ -36,8 +78,8 @@ import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
*
* @see parMapN for the same function that can race on any [CoroutineContext].
*/
suspend fun <A, B, C> parMapN(fa: suspend () -> A, fb: suspend () -> B, f: (Pair<A, B>) -> C): C =
f(parTupledN(ComputationPool, fa, fb))
suspend fun <A, B, C> parMapN(fa: suspend () -> A, fb: suspend () -> B, f: (A, B) -> C): C =
parMapN(ComputationPool, fa, fb, f)

/**
* Parallel maps [fa], [fb], [fc] in parallel on [ComputationPool].
Expand All @@ -49,9 +91,8 @@ suspend fun <A, B, C, D> parMapN(
fa: suspend () -> A,
fb: suspend () -> B,
fc: suspend () -> C,
f: (Triple<A, B, C>) -> D
): D =
f(parTupledN(ComputationPool, fa, fb, fc))
f: (A, B, C) -> D
): D = parMapN(ComputationPool, fa, fb, fc, f)

/**
* Parallel maps [fa], [fb] on the provided [CoroutineContext].
Expand All @@ -63,66 +104,13 @@ suspend fun <A, B, C, D> parMapN(
*
* @see parMapN for a function that ensures it runs in parallel on the [ComputationPool].
*/
@Suppress("UNCHECKED_CAST")
suspend fun <A, B, C> parMapN(
ctx: CoroutineContext,
fa: suspend () -> A,
fb: suspend () -> B,
f: (Pair<A, B>) -> C
f: (A, B) -> C
): C =
f(parTupledN(ctx, fa, fb))

/**
* Parallel maps [fa], [fb], [fc] on the provided [CoroutineContext].
* Cancelling this operation cancels both tasks running in parallel.
*
* **WARNING** this function forks [fa], [fb] & [fc] but if it runs in parallel depends
* on the capabilities of the provided [CoroutineContext].
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
*
* @see parMapN for a function that ensures it runs in parallel on the [ComputationPool].
*/
suspend fun <A, B, C, D> parMapN(
ctx: CoroutineContext,
fa: suspend () -> A,
fb: suspend () -> B,
fc: suspend () -> C,
f: (Triple<A, B, C>) -> D
): D =
f(parTupledN(ctx, fa, fb, fc))

/**
* Tuples [fa], [fb] in parallel on [ComputationPool].
* Cancelling this operation cancels both operations running in parallel.
*
* @see parTupledN for the same function that can race on any [CoroutineContext].
*/
suspend fun <A, B> parTupledN(fa: suspend () -> A, fb: suspend () -> B): Pair<A, B> =
parTupledN(ComputationPool, fa, fb)

/**
* Tuples [fa], [fb], [fc] in parallel on [ComputationPool].
* Cancelling this operation cancels both tasks running in parallel.
*
* @see parTupledN for the same function that can race on any [CoroutineContext].
*/
suspend fun <A, B, C> parTupledN(fa: suspend () -> A, fb: suspend () -> B, fc: suspend () -> C): Triple<A, B, C> =
parTupledN(ComputationPool, fa, fb, fc)

/**
* Tuples [fa], [fb] on the provided [CoroutineContext].
* Cancelling this operation cancels both tasks running in parallel.
*
* **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext].
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
*
* @see parTupledN for a function that ensures it runs in parallel on the [ComputationPool].
*/
@Suppress("UNCHECKED_CAST")
suspend fun <A, B> parTupledN(
ctx: CoroutineContext,
fa: suspend () -> A,
fb: suspend () -> B
): Pair<A, B> =
suspendCoroutineUninterceptedOrReturn { cont ->
val conn = cont.context.connection()
val cont = cont.intercepted()
Expand All @@ -140,9 +128,9 @@ suspend fun <A, B> parTupledN(
conn.pop()
cb(
try {
Result.success(Pair(a, b))
Result.success(f(a, b))
} catch (e: Throwable) {
Result.failure<Pair<A, B>>(e.nonFatalOrThrow())
Result.failure<C>(e.nonFatalOrThrow())
}
)
}
Expand All @@ -151,7 +139,7 @@ suspend fun <A, B> parTupledN(
is Throwable -> Unit // Do nothing we already finished
else -> other.cancelToken().cancel.startCoroutine(Continuation(EmptyCoroutineContext) { r ->
conn.pop()
cb(Result.failure<Pair<A, B>>(r.fold({ e }, { e2 -> Platform.composeErrors(e, e2) })))
cb(Result.failure(r.fold({ e }, { e2 -> Platform.composeErrors(e, e2) })))
})
}

Expand Down Expand Up @@ -185,20 +173,22 @@ suspend fun <A, B> parTupledN(
}

/**
* Tuples [fa], [fb] & [fc] on the provided [CoroutineContext].
* Parallel maps [fa], [fb], [fc] on the provided [CoroutineContext].
* Cancelling this operation cancels both tasks running in parallel.
*
* **WARNING** it runs in parallel depending on the capabilities of the provided [CoroutineContext].
* **WARNING** this function forks [fa], [fb] & [fc] but if it runs in parallel depends
* on the capabilities of the provided [CoroutineContext].
* We ensure they start in sequence so it's guaranteed to finish on a single threaded context.
*
* @see parTupledN for a function that ensures it runs in parallel on the [ComputationPool].
* @see parMapN for a function that ensures it runs in parallel on the [ComputationPool].
*/
suspend fun <A, B, C> parTupledN(
suspend fun <A, B, C, D> parMapN(
ctx: CoroutineContext,
fa: suspend () -> A,
fb: suspend () -> B,
fc: suspend () -> C
): Triple<A, B, C> =
fc: suspend () -> C,
f: (A, B, C) -> D
): D =
suspendCoroutineUninterceptedOrReturn { cont ->
val conn = cont.context.connection()
val cont = cont.intercepted()
Expand All @@ -217,7 +207,7 @@ suspend fun <A, B, C> parTupledN(

fun complete(a: A, b: B, c: C) {
conn.pop()
cb(Result.success(Triple(a, b, c)))
cb(Result.success(f(a, b, c)))
}

fun tryComplete(result: Triple<A?, B?, C?>?): Unit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class InterruptionTest : StreamSpec(spec = {

latch.get()
f.cancel()
timeOutOrNull(50.milliseconds) { exit.get() } shouldBe ExitCase.Cancelled
exit.get() shouldBe ExitCase.Cancelled
}
}

Expand Down