Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Add KotlinX Coroutines as base dependency to Arrow Fx Coroutines #317

Merged
merged 39 commits into from
Dec 11, 2020

Conversation

nomisRev
Copy link
Member

Currently in the eco-system we see that KotlinX Coroutines is hardcoded everywhere in frameworks. Spring, Ktor, Android, SqlDelight, etc and thus in all places where we care or want to grow. Since suspend is very implicit, in the sense that you can bind directly in the environment it’s important you mix the environments correctly. That’s something we currently cannot do without depending on KotlinX Coroutines, so we do so through a separate artifact and the responsibility lies with the user to do it correctly. This inevitably leads to a lot of strange bugs, which we’ve already seen on Kotlin Slack.

This can all be solved by depending on KotlinX Coroutines since we then can guarantee that the effect enviroments are correctly mixed, and thus no such strange bugs can occur in the user code.

Additionally, the maintenance and future development costs is drastically reduced since we don’t have to build or maintain interopt modules such as JDK8, Reactive Streams, RxJava, Reactor, etc since those live in the KotlinX Coroutines space. And in some areas such as generated code it’s even impossible for us to build interopt such as Spring which relies on generated code, or SqlDelight and perhaps Compose.

As mentioned above all the frameworks we care about already relied on KotlinX Coroutines so this is on everyones classpath anyway already.

This makes Arrow Fx Coroutines the companion library to KotlinX Coroutines for functional operators, such as Arrow Core is a companion to Kotlin’s standard library.

TODO:

  • Flakyness in concurrently Stream operator

@nomisRev nomisRev requested a review from a team December 10, 2020 10:56
Copy link
Member

@raulraja raulraja left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great improvement @nomisRev Kudos. This gives us a better maintenance path forward and much easier for users where it all works drop-in with KotlinX 👏

iterations: Int,
map: (Int) -> A,
cb: suspend (A) -> Unit,
onEnd: suspend () -> Unit = { }
): Unit = suspend {
): Unit {
var i = 0
arrow.fx.coroutines.repeat(Schedule.recurs(iterations)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we are gonna be fully integrating with kotlinx we should perhaps consider renaming repeat or retyping the function in some way so that we don't have name collisions like this one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it could be a method on Schedule itself.

Schedule.recurs(iterations).repeat {
  ...
}

Only annoying is that in compositions you'd need to add parenthesis around it.

(Schedule.recurs(iterations) zipRight Schedule.identity()).repeat { ... } it might also worsen type inference since now repeat can infer which type Schedule has to be based on the return type of lambda. Which is not possible in the latter case, right? 🤔

uCont.resumeWith(usedAndReleasedOrSuspended as Result<B>)
}
}, { e -> uCont.resumeWith(Result.failure(e)) })
val res = try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was the right decision for maintenance and user happiness 👌

@@ -42,6 +42,8 @@ import kotlinx.atomicfu.atomic
* > Note that this only works if all operations over the [ConcurrentVar] follow the pattern of first taking and then putting back both exactly once and in order.
* Or use the helpers to also be safe in case of exceptions and cancellation.
*/
// TODO benchmark against Channel(1), if Channel(1) is faster deprecate ConcurrentVar.
// Channel(1) uses locks.
interface ConcurrentVar<A> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I'm wondering what each FP abstraction maps to in KotlinX names xD

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentVar is also the same as a bounded Queue of size. So this type was already slightly redundant 😅

null -> throw ArrowInternalException("$ArrowExceptionMessage\nSuspend execution should yield a valid result")
}
}
internal fun <A> unsafeRunSync(f: suspend () -> A): A =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also deprecate this and just let people use runBlocking as unsafe method if they wish too? I think most of the users know this by now/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this has always been internal but that's why I @Deprecated Enviroment type since you should just use structured concurrency to run unsafely side-effects from Java or runBlocking to run from main or similar.

@raulraja raulraja requested a review from a team December 10, 2020 11:35
Copy link
Member

@rachelcarmena rachelcarmena left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @nomisRev 🙌

@raulraja
Copy link
Member

Thanks @nomisRev !

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants