Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parMapNotNull #3110

Merged
merged 6 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public final class arrow/fx/coroutines/ParMapKt {
public static final fun parMap (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun parMap$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun parMap$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun parMapNotNull (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parMapNotNull (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun parMapNotNull$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun parMapNotNull$default (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun parMapOrAccumulate (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parMapOrAccumulate (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parMapOrAccumulate (Ljava/lang/Iterable;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ public suspend fun <A, B> Iterable<A>.parMap(
map { async(context) { transform.invoke(this, it) } }.awaitAll()
}

public suspend fun <A, B> Iterable<A>.parMapNotNull(
context: CoroutineContext = EmptyCoroutineContext,
concurrency: Int,
transform: suspend CoroutineScope.(A) -> B?
): List<B> =
parMap(context, concurrency, transform).filterNotNull()

public suspend fun <A, B> Iterable<A>.parMapNotNull(
context: CoroutineContext = EmptyCoroutineContext,
transform: suspend CoroutineScope.(A) -> B?
): List<B> =
parMap(context, transform).filterNotNull()

/** Temporary intersection type, until we have context receivers */
public class ScopedRaiseAccumulate<Error>(
raise: Raise<NonEmptyList<Error>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,75 @@ class ParMapTest : StringSpec({
} shouldBe NonEmptyList(e, (1 until 100).map { e }).left()
}
}

"parMapNotNull is stack-safe" {
val count = 20_000
val ref = Atomic(0)
(0 until count).parMapNotNull { _: Int ->
ref.update { it + 1 }
}
ref.get() shouldBe count
}

"parMapNotNull runs in parallel" {
val promiseA = CompletableDeferred<Unit>()
val promiseB = CompletableDeferred<Unit>()
val promiseC = CompletableDeferred<Unit>()

listOf(
suspend {
promiseA.await()
promiseC.complete(Unit)
},
suspend {
promiseB.await()
promiseA.complete(Unit)
},
suspend {
promiseB.complete(Unit)
promiseC.await()
}
).parMapNotNull { it.invoke() }
}

"parMapNotNull results in the correct error" {
checkAll(
Arb.int(min = 10, max = 20),
Arb.int(min = 1, max = 9),
Arb.throwable()
) { n, killOn, e ->
Either.catch {
(0 until n).parMapNotNull { i ->
if (i == killOn) throw e else Unit
}
} should leftException(e)
}
}

"parMapNotNull(concurrency = 1) only runs one task at a time" {
val promiseA = CompletableDeferred<Unit>()

withTimeoutOrNull(100.milliseconds) {
listOf(
suspend { promiseA.await() },
suspend { promiseA.complete(Unit) }
).parMapNotNull(concurrency = 1) { it.invoke() }
} shouldBe null
}

"parMapNotNull discards nulls" {
(0 until 100).parMapNotNull { _ ->
null
} shouldBe emptyList()
}

"parMapNotNull retains non-nulls" {
checkAll(Arb.int()) { i ->
(0 until 100).parMapNotNull { _ ->
i
} shouldBe List(100) { i }
}
}
})

private val emptyError: (Nothing, Nothing) -> Nothing =
Expand Down