Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce parMap and deprecate parTraverseXXX #2863

Merged
merged 11 commits into from
Jan 24, 2023
30 changes: 30 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,21 @@ public final class arrow/fx/coroutines/FlowExtensions {
public static final fun retry (Lkotlinx/coroutines/flow/Flow;Larrow/fx/coroutines/Schedule;)Lkotlinx/coroutines/flow/Flow;
}

public final class arrow/fx/coroutines/ParMapKt {
public static final fun parMap (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parMap (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun parMap$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun parMap$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun parMapOrAccumulate (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parMapOrAccumulate (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parMapOrAccumulate (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parMapOrAccumulate (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun parMapOrAccumulate$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun parMapOrAccumulate$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun parMapOrAccumulate$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun parMapOrAccumulate$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class arrow/fx/coroutines/ParTraverse {
public static final fun parSequence (Ljava/lang/Iterable;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parSequence (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -537,6 +552,21 @@ public final class arrow/fx/coroutines/ScheduleKt {
public static final fun retryOrElseEither (Larrow/fx/coroutines/Schedule;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class arrow/fx/coroutines/ScopedRaise : arrow/core/continuations/EffectScope, kotlinx/coroutines/CoroutineScope {
public fun <init> (Larrow/core/continuations/EffectScope;Lkotlinx/coroutines/CoroutineScope;)V
public fun attempt (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun bind (Larrow/core/Either;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun bind (Larrow/core/Option;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun bind (Larrow/core/Validated;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun bind (Larrow/core/continuations/EagerEffect;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun bind (Larrow/core/continuations/Effect;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun bind (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun catch (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun ensure (ZLkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public fun shift (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class arrow/fx/coroutines/Use {
public static final synthetic fun box-impl (Lkotlin/jvm/functions/Function1;)Larrow/fx/coroutines/Use;
public static fun constructor-impl (Lkotlin/jvm/functions/Function1;)Lkotlin/jvm/functions/Function1;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package arrow.fx.coroutines

import arrow.core.Either
import arrow.core.NonEmptyList
import arrow.core.continuations.EffectScope
import arrow.core.continuations.either
import arrow.core.flattenOrAccumulate
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

public suspend fun <A, B> Iterable<A>.parMap(
ctx: CoroutineContext = EmptyCoroutineContext,
concurrency: Int,
f: suspend CoroutineScope.(A) -> B
): List<B> {
val semaphore = Semaphore(concurrency)
return parMap(ctx) {
semaphore.withPermit { f(it) }
}
}

public suspend fun <A, B> Iterable<A>.parMap(
context: CoroutineContext = EmptyCoroutineContext,
transform: suspend CoroutineScope.(A) -> B
): List<B> = coroutineScope {
map { async(context) { transform.invoke(this, it) } }.awaitAll()
}

/** Temporary intersection type, until we have context receivers */
public class ScopedRaise<Error>(
raise: EffectScope<Error>,
scope: CoroutineScope
) : CoroutineScope by scope, EffectScope<Error> by raise

public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
context: CoroutineContext = EmptyCoroutineContext,
concurrency: Int,
combine: (Error, Error) -> Error,
transform: suspend ScopedRaise<Error>.(A) -> B
): Either<Error, List<B>> =
coroutineScope {
val semaphore = Semaphore(concurrency)
map {
async(context) {
either {
semaphore.withPermit {
transform(ScopedRaise(this, this@coroutineScope), it)
}
}
}
}.awaitAll().flattenOrAccumulate(combine)
}

public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
context: CoroutineContext = EmptyCoroutineContext,
combine: (Error, Error) -> Error,
transform: suspend ScopedRaise<Error>.(A) -> B
): Either<Error, List<B>> =
coroutineScope {
map {
async(context) {
either {
transform(ScopedRaise(this, this@coroutineScope), it)
}
}
}.awaitAll().flattenOrAccumulate(combine)
}

public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
context: CoroutineContext = EmptyCoroutineContext,
concurrency: Int,
transform: suspend ScopedRaise<Error>.(A) -> B
): Either<NonEmptyList<Error>, List<B>> =
coroutineScope {
val semaphore = Semaphore(concurrency)
map {
async(context) {
either {
semaphore.withPermit {
transform(ScopedRaise(this, this@coroutineScope), it)
}
}
}
}.awaitAll().flattenOrAccumulate()
}

public suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
context: CoroutineContext = EmptyCoroutineContext,
transform: suspend ScopedRaise<Error>.(A) -> B
): Either<NonEmptyList<Error>, List<B>> =
coroutineScope {
map {
async(context) {
either {
transform(ScopedRaise(this, this@coroutineScope), it)
}
}
}.awaitAll().flattenOrAccumulate()
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,51 @@ package arrow.fx.coroutines

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName

@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith(
"parMap(Dispatchers.Default, n) { it() }",
"arrow.fx.coroutines.parMap",
"kotlinx.coroutines.Dispatchers"
)
)
public suspend fun <A> Iterable<suspend () -> A>.parSequenceN(n: Int): List<A> =
parSequenceN(Dispatchers.Default, n)
parMap(Dispatchers.Default, n) { it() }

/**
* Sequences all tasks in [n] parallel processes on [Dispatchers.Default] and return the result.
*
* Cancelling this operation cancels all running tasks
*/
@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith(
"parMap(Dispatchers.Default, n) { it() }",
"arrow.fx.coroutines.parMap",
"kotlinx.coroutines.Dispatchers"
)
)
@JvmName("parSequenceNScoped")
public suspend fun <A> Iterable<suspend CoroutineScope.() -> A>.parSequenceN(n: Int): List<A> =
parSequenceN(Dispatchers.Default, n)
parMap(Dispatchers.Default, n) { it() }

public suspend fun <A> Iterable<suspend () -> A>.parSequenceN(ctx: CoroutineContext = EmptyCoroutineContext, n: Int): List<A> {
val s = Semaphore(n)
return parTraverse(ctx) {
s.withPermit { it.invoke() }
}
}
@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith(
"parMap(ctx, n) { it() }",
"arrow.fx.coroutines.parMap"
)
)
public suspend fun <A> Iterable<suspend () -> A>.parSequenceN(
ctx: CoroutineContext = EmptyCoroutineContext,
n: Int
): List<A> = parMap(ctx, n) { it() }

/**
* Sequences all tasks in [n] parallel processes and return the result.
Expand All @@ -44,16 +60,28 @@ public suspend fun <A> Iterable<suspend () -> A>.parSequenceN(ctx: CoroutineCont
*
* Cancelling this operation cancels all running tasks
*/
@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith(
"parMap(ctx, n) { it() }",
"arrow.fx.coroutines.parMap"
)
)
@JvmName("parSequenceNScoped")
public suspend fun <A> Iterable<suspend CoroutineScope.() -> A>.parSequenceN(ctx: CoroutineContext = EmptyCoroutineContext, n: Int): List<A> {
val s = Semaphore(n)
return parTraverse(ctx) {
s.withPermit { it.invoke(this) }
}
}

public suspend fun <A> Iterable<suspend CoroutineScope.() -> A>.parSequenceN(
ctx: CoroutineContext = EmptyCoroutineContext,
n: Int
): List<A> = parMap(ctx, n) { it() }
@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith(
"parMap(Dispatchers.Default) { it() }",
"arrow.fx.coroutines.parMap",
"kotlinx.coroutines.Dispatchers"
)
)
public suspend fun <A> Iterable<suspend () -> A>.parSequence(): List<A> =
parSequence(Dispatchers.Default)
parMap(Dispatchers.Default) { it() }

/**
* Sequences all tasks in parallel on [Dispatchers.Default] and return the result
Expand All @@ -79,12 +107,27 @@ public suspend fun <A> Iterable<suspend () -> A>.parSequence(): List<A> =
* ```
* <!--- KNIT example-partraverse-01.kt -->
*/
@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith(
"parMap(Dispatchers.Default) { it() }",
"arrow.fx.coroutines.parMap",
"kotlinx.coroutines.Dispatchers"
)
)
@JvmName("parSequenceScoped")
public suspend fun <A> Iterable<suspend CoroutineScope.() -> A>.parSequence(): List<A> =
parSequence(Dispatchers.Default)
parMap(Dispatchers.Default) { it() }

@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith(
"parMap(ctx) { it() }",
"arrow.fx.coroutines.parMap"
)
)
public suspend fun <A> Iterable<suspend () -> A>.parSequence(ctx: CoroutineContext = EmptyCoroutineContext): List<A> =
parTraverse(ctx) { it.invoke() }
parMap(ctx) { it() }

/**
* Sequences all tasks in parallel and return the result
Expand Down Expand Up @@ -115,17 +158,31 @@ public suspend fun <A> Iterable<suspend () -> A>.parSequence(ctx: CoroutineConte
* ```
* <!--- KNIT example-partraverse-02.kt -->
*/
@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith(
"parMap(ctx) { it() }",
"arrow.fx.coroutines.parMap"
)
)
@JvmName("parSequenceScoped")
public suspend fun <A> Iterable<suspend CoroutineScope.() -> A>.parSequence(ctx: CoroutineContext = EmptyCoroutineContext): List<A> =
parTraverse(ctx) { it.invoke(this) }
parMap(ctx) { it() }

/**
* Traverses this [Iterable] and runs [f] in [n] parallel operations on [Dispatchers.Default].
* Cancelling this operation cancels all running tasks.
*/

@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith(
"parMap(Dispatchers.Default, n, f)",
"arrow.fx.coroutines.parMap",
"kotlinx.coroutines.Dispatchers"
)
)
public suspend fun <A, B> Iterable<A>.parTraverseN(n: Int, f: suspend CoroutineScope.(A) -> B): List<B> =
parTraverseN(Dispatchers.Default, n, f)
parMap(Dispatchers.Default, n, f)

/**
* Traverses this [Iterable] and runs [f] in [n] parallel operations on [ctx].
Expand All @@ -136,16 +193,18 @@ public suspend fun <A, B> Iterable<A>.parTraverseN(n: Int, f: suspend CoroutineS
*
* Cancelling this operation cancels all running tasks.
*/
@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith(
"parMap(ctx, n, f)",
"arrow.fx.coroutines.parMap"
)
)
public suspend fun <A, B> Iterable<A>.parTraverseN(
ctx: CoroutineContext = EmptyCoroutineContext,
n: Int,
f: suspend CoroutineScope.(A) -> B
): List<B> {
val s = Semaphore(n)
return parTraverse(ctx) { a ->
s.withPermit { f(a) }
}
}
): List<B> = parMap(ctx, n, f)

/**
* Traverses this [Iterable] and runs all mappers [f] on [Dispatchers.Default].
Expand All @@ -169,8 +228,16 @@ public suspend fun <A, B> Iterable<A>.parTraverseN(
* ```
* <!--- KNIT example-partraverse-03.kt -->
*/
@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith(
"parMap(Dispatchers.Default, f)",
"arrow.fx.coroutines.parMap",
"kotlinx.coroutines.Dispatchers"
)
)
public suspend fun <A, B> Iterable<A>.parTraverse(f: suspend CoroutineScope.(A) -> B): List<B> =
parTraverse(Dispatchers.Default, f)
parMap(Dispatchers.Default, f)

/**
* Traverses this [Iterable] and runs all mappers [f] on [CoroutineContext].
Expand Down Expand Up @@ -200,9 +267,11 @@ public suspend fun <A, B> Iterable<A>.parTraverse(f: suspend CoroutineScope.(A)
* ```
* <!--- KNIT example-partraverse-04.kt -->
*/
@Deprecated(
"Function is being renamed to parMap in 2.x.x",
ReplaceWith("parMap(ctx, f)", "arrow.fx.coroutines.parMap")
)
public suspend fun <A, B> Iterable<A>.parTraverse(
ctx: CoroutineContext = EmptyCoroutineContext,
f: suspend CoroutineScope.(A) -> B
): List<B> = coroutineScope {
map { async(ctx) { f.invoke(this, it) } }.awaitAll()
}
): List<B> = parMap(ctx, f)
Loading