-
Notifications
You must be signed in to change notification settings - Fork 521
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Parallel[IO, IO.Par] implementation #115
Conversation
@@ -0,0 +1,102 @@ | |||
/* | |||
* Copyright 2017 Typelevel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2018
case error @ Left(_) => | ||
cb(error) | ||
rb match { | ||
case Left(error2) => throw error2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't this make it impossible to handle any errors in fb
? Won't this simply kill a thread just because fb
fails?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, was thinking that if the second error happens, then it needs to be handled somehow and we don't have a reporter.
Is ignoring this second error the right approach?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or maybe a printStackTrace()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure. If f
would have type (Attempt[A], Attempt[B]) => C
, it could handle both errors. But that probably won't work with Parallel
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been thinking about it and I think the right approach is to wrap both errors into a CompositeException
and throw that one instead.
I've had this approach in Monix in cases where multiple operations have to be executed without interruption even if any of them fails — collecting thrown exceptions into a list, then wrapping that into a CompositeException
and throwing that.
fb.unsafeRunAsync { attemptB => | ||
// Using Java 8 platform intrinsics | ||
state.getAndSet(Right(attemptB)) match { | ||
case null => () // wait for B |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait for A
?
PR is ready, asking for feedback. |
* caught from evaluating multiple independent IO actions | ||
* and that need to be signaled together. | ||
*/ | ||
class CompositeException(val errors: List[Throwable]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI, in FS2 we have this same type but we require 2+ throwables in order to construct it. https://github.com/functional-streams-for-scala/fs2/blob/180b811083296762a2270afcef38d7fe4b278a1a/core/shared/src/main/scala/fs2/CompositeFailure.scala
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 that's a good idea — might do that here as well.
Any reasonably sized project that cares about exception handling has this type 😜
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm in favor something stronger than List for CompositeException, otherwise I think this looks good. Questions are to ensure I understand behavior correctly.
case Right(b) => | ||
cb(try Right(f(a, b)) catch { case NonFatal(e) => Left(e) }) | ||
case error @ Left(_) => | ||
cb(error.asInstanceOf[Left[Throwable, C]]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this asInstanceOf
necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I want to reuse that value, as I know it's correct, but the compiler isn't smart enough 🙃
[error] cats-effect/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala:48:26: type mismatch;
[error] found : scala.util.Left[Throwable,B]
[error] required: Either[Throwable,C]
[error] cb(error)
[error] ^
state.getAndSet(Left(attemptA)) match { | ||
case null => () // wait for B | ||
case Right(attemptB) => complete(attemptA, attemptB) | ||
case left => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These blocks are due to Either[Either[Throwable, A], Either[Throwable, B]]
as we expect asynchronously to expect both values, if we already have a State of an A
set it should be impossible to receive another?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So these cannot happen if the IO.async
callback contract is respected, if that callback gets called at most once.
But the protocol can get violated, as the type system can't prevent it. Well, not by users because they are protected by that callback wrapper (IOPlatform.onceOnly
) injected in IO.async
. But if you workaround the library's encapsulation, or by mistakes from the library authors, then it's possible.
case Right(a) => | ||
rb match { | ||
case Right(b) => | ||
cb(try Right(f(a, b)) catch { case NonFatal(e) => Left(e) }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what restores the error handling behavior, as our case NonFatal(e) => Left(e)
enables the recovery from the eventual possible failure conditions within IO.async
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO I wouldn't leave IO.async
to catch exceptions. Currently the exception handling is reliable due to being protected by IOPlatform.onlyOnce
, but that requires extra synchronization.
In Monix there is no onlyOnce
backed by an AtomicReference
, that check being only a plain variable. And in this case what happens is that the error simply gets reported with the provided Scheduler
and thus it can't be recovered. From a usability perspective I find that reasonable, as the user is given a callback and you can't expect a Task
to complete without ensuring that the callback gets called.
Also in this particular case that error is triggered asynchronously. It might happen from another thread, depending on the IO
values it evaluated. So it won't be caught by IO.async
.
private[effect] final class TrampolineEC private (underlying: ExecutionContext) | ||
extends ExecutionContext { | ||
|
||
// Starts with `null`! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you comment why vs justing using Nil
? I guess you are using null
to signal that you are not in a localRunLoop. Is that right? Can you comment?
Could it be NonEmptyList[Runnable]
I wonder, and then use the null
trick on that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's the difference between null
and Nil
, null
signaling that we aren't in a run-loop and Nil
signaling that a Runnable
is in progress. This optimizes for the shallow case, as the first Runnable
being executed doesn't get stored and retrieved from that List
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this isn't so obvious, guess at least a code comment is needed.
Status — having problems with the |
Codecov Report
@@ Coverage Diff @@
## master #115 +/- ##
==========================================
+ Coverage 88.49% 89.36% +0.87%
==========================================
Files 20 23 +3
Lines 452 489 +37
Branches 41 36 -5
==========================================
+ Hits 400 437 +37
Misses 52 52 |
@mpilquist @durban I have now modified Only difference is that I do prefer |
I have now fixed the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 on green. Since travis is misbehaving, harder to tell.
Fixes #83
This adds:
internals.TrampolineEC
: an internalExecutionContext
that's based on a trampoline, for "light async boundaries" in order to prevent stack overflowparMap2
, reviewIOParMap.scala
newtype
calledIO.Par
, with an encoding inspired by alexknvl/newtypesParallel[IO, IO.Par]
andApplicative[IO.Par]
implementationsAs a matter of implementation detail:
AtomicReference
getAndSet
, which in terms of performance is pretty good (versuscompareAndSet
) because it uses platform intrinsics on Java 8TrampolineEC
)Waiting for the other to finish, even in the face of error, is necessary in order to avoid leaks.
Consider this:
If any of these
IO
values finish in error, if we don't wait for the other one, we've got ourselves a memory leak because the loop wouldn't back-pressure and it wouldn't cancel the other one. By waiting on both we avoid memory leaks and is the way to make this safe forIO
.Speaking of
TrampolineEC
, this raises the question — why isn'tIO.async
stack safe? Because we can definitely do it, noimplicit ExecutionContext
needed 😉