-
Notifications
You must be signed in to change notification settings - Fork 604
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New concurrency primitives #1006
New concurrency primitives #1006
Conversation
private val state = new AtomicReference[Ref.State[A]] | ||
state.set(new Ref.State[A](null, LinkedMap.empty, 0)) | ||
|
||
//TODO in the old implementation Read and Nevermind submit the continuation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Likely an oversight / bug.
override def setSyncPure(a: A): F[Unit] = | ||
F.async[Unit](cb => actor ! Msg.Set(a, () => cb(Right(())))) *> F.shift | ||
F.async[Unit](cb => set(a, () => cb(Right(())))) *> F.shift |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I just added this shift when doing the SyncRef
PR. Without it a test was hanging -- I think because the subsequent actions flatmapped on to the returned F[Unit]
were getting run on the actor thread (based on where the callback was being invoked). Perhaps this isn't necessary if we are consistent in only ever invoking callbacks from within an EC.
Overall I really like the idea of getting rid of I like the idea of getting rid of forking. Let's do all that stuff at the call site. Some miscellaneous thoughts on refs:
This leaves me wondering if we should rename this actor-less ref |
just an initial comment. I really think we can do most of the complexity away by defining UnsetRef in terms of SyncRef. I have a feeling that 1:1 implementation with Actor variant won't work nicely here. |
Ah @mpilquist excellent idea. with Ref/MVar I really like it. Lets do it! |
Also I think we can remove |
How so? Do you mean moving |
One thing I believe this makes possible is moving the EC and the cats-effect constraint at call site. Ah, you said so already, github race condition |
@mpilquist exactly. You can then use MVar/Ref as boundary between two concurrent programs in |
Yep, agreed then assuming we can make it work :) |
I think the only places where we need that |
Sounds like this is worth pursuing, so I'll keep going at it :) |
Yeah agreed -- I'm not as sure about @SystemFw Looking forward to what you come up with. :) |
Yeah I'm not even sure that an unsettable MVar carries its weight tbh. In haskell that makes it quite deadlock prone. But definitely we can improve Ref and move EC down for a start. |
@SystemFw I would start with mVar to just have read, and perhaps not implement take at first. I agree that |
|
@SystemFw up to you :-) |
So a couple of the things we might want to do, e.g. remove On another note, one thing we might do is have Ref track its state (Set or Unset). When Unset the implementation will stay as it is now (or close), but when Set it could delegate to a SyncRef. That way it would be "slow" on the blocking get and first set/modify, and faster afterwards. I'd have to think what use cases will actually benefit from this though, and if it can actually be made to work. Thoughts on either paragraph? |
Do you mean I thought about swapping out for a |
Yeah you'd have to dispatch. I only want to add it (since it's going to complicate things) if we can show it's convenient from a performance perspective. |
@SystemFw just a side note:-). I really liked @mpilquist idea to put any unset / async behaviour behind mVar, and leave Ref essentially just synchronous with forking. I wouldn't complicate Ref with any of the |
@pchlupacek I'm not sure whether you mean having three types (SyncRef, Ref, and MVar), or just renaming |
@SystemFw SyncRef => Ref, MVar as completely new type. Sorry to be not clear. |
Yeah I'm not against that. I'll see how far I go along modifying the current Ref, then tackle that. |
Yeah I'm not stuck on ability to unset an |
Just realised we are going in the direction of having IORef and MVar a la haskell |
Yep - not unintentional on my part. :) |
yeah, thats what it is exactly :-) |
d321d01
to
5eefcc1
Compare
e0e9f2c
to
a2489d9
Compare
0a5203c
to
7f6a305
Compare
Only exception being in Queue
} | ||
|
||
ref.modify2 { | ||
case s @ State.Set(_) => s -> F.unit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this be F.raiseError
instead? This way a second set is silently ignored. That way it would at least complain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for raising error violating the semantics
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mpilquist said he prefers not failing on double set. I have added a trySetSync
for cases in which you care (atm, it's just AsyncPull.race, which I will refactor a bit in a separate PR).
I don't have a strong opinion on this, I can see arguments for both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my opinion, silently ignoring subsequent sets is the worst of both worlds. (The 2 worlds being: (1) working mutiple sets and (2) first set succeeds and subsequent sets fail.) If this behavior remains, I'd recommend renaming it to complete
or even tryComplete
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd be inclined to agree (but I do want set-once semantics). Let's see what Michael thinks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with set-once semantics. I don't have strong opinions on how subsequent sets are signaled. I didn't want them to fail b/c I thought it would be error prone in certain usage patterns but I suppose folks can always call .attempt and explicitly ignore the error if it's appropriate for their use case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will change that F.unit
to F.raiseError
Does anyone know why the old EDIT: nope, the reason is explained in #1009 . So these two will go back to shared as they don't have property based tests in them. |
Re: why single set semantics for Promise. Having multiple The key realisation however, is that this doesn't buy you much, really. To build the vast majority of concurrent structures and combinators, you still need multiple MVars, i.e. MVar is a building block at the same level of abstraction of Promise, except with more complicated semantics, and a greater chance of deadlock due to multiple points in which blocking can happen. Promise achieves the same in a simpler, easier-to-get-right way. OldRef allowed multiple Now the remaining question is what to do if a user tries to
Note that scenarios where I don't have a strong opinion on this latter point, but at the moment I'm edging towards having a single method for all cases, which fails when called on a Promise that's already Set |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @mpilquist so that the second set could be handled in attempt. In fact I would propose that return type to be explicitly suggesting that as F[Either[Throwable, Unit]] instead just F[Unit] so the user is forced to resolve that case.
t0 <- self.cancellableGet.run | ||
(a, cancelA) = t0 | ||
t1 <- b.cancellableGet.run | ||
(b, cancelB) = t1 | ||
fa = a.run.map(Left(_): Either[A, B]) | ||
fb = b.run.map(Right(_): Either[A, B]) | ||
_ <- async.fork(fa.attempt.flatMap(ref.setAsyncPure)) | ||
_ <- async.fork(fb.attempt.flatMap(ref.setAsyncPure)) | ||
_ <- async.fork(fa.attempt.flatMap(x => promise.setSync(x))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we get rid of x
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I left it there because I already have code to make this function simpler, exploiting the fact that set
will fail on second set (and that needs the x
)
// and the fact that suspend is roughly 2x faster than delay flatMap | ||
def get: F[A] = F.suspend { | ||
val id = new Token | ||
getOrWait(id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just getOrWait(new Token)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new Token
is a side effect in this case, because Token
s are compared with reference equality, so I think that suspend
it's needed
} | ||
|
||
ref.modify2 { | ||
case s @ State.Set(_) => s -> F.unit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for raising error violating the semantics
// | ||
// NOTE: this differs in behaviour from the old Ref in that by the time readers are notified | ||
// the new value is already guaranteed to be in place. | ||
def setSync(a: A): F[Unit] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to trySet
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sprry set
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had renamed it to set
, then changed my mind. set: F[Unit]
does not have enough information to tell you if the operation is synchronous or asynchronous, especially because it's now up to the user to fork
to get async behaviour. setSync
is more informative in this respect, and doesn't force you to go read the scaladoc. Not too hung up on this though
// NOTE: this differs in behaviour from the old Ref in that by the time readers are notified | ||
// the new value is already guaranteed to be in place. | ||
def setSync(a: A): F[Unit] = { | ||
def notifyReaders(r: State.Unset[A]): Unit = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this private so trySet and set may share this implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking trySet
can go altogether if we raiseError
on second set
(which I will do, I think), since one can use handleError
or attempt
|
||
/** Like [[get]] but returns an `F[Unit]` that can be used to cancel the subscription. */ | ||
def cancellableGet: F[(F[A], F[Unit])] = F.delay { | ||
val id = new Token |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really think it is any longer necessary to wrap this in delay, instead I would suggest just to return tuple (F[A], F[Unit])
there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see reply above. new Token
is a side effect, due to the use of reference equality for Token
|
||
override def get: F[A] = F.flatMap(F.delay(new MsgId)) { mid => F.map(getStamped(mid))(_._1) } | ||
def access: F[(A, A => F[Boolean])] = F.delay { | ||
def snapshot = ar.get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think snapshot must be val for this to work correctly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mm it works regardless since snapshot
is evaluated when I put it into the tuple, but val
is clearer in any case, so I'll change it to that. Good catch!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah no, you're right!
get
in access
and snapshot
is different. Great catch then!
(Not used to side effects anymore :P )
@@ -114,8 +111,8 @@ package object async { | |||
* nowhere. | |||
*/ | |||
def race[F[_]: Effect, A, B](fa: F[A], fb: F[B])(implicit ec: ExecutionContext): F[Either[A, B]] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the either signature, but couldn't be the => F[C] just replaced by flatMap ?
@@ -47,8 +47,8 @@ abstract class Signal[F[_], A] extends immutable.Signal[F, A] { self => | |||
def get: F[B] = self.get.map(f) | |||
def set(b: B): F[Unit] = self.set(g(b)) | |||
def refresh: F[Unit] = self.refresh | |||
def modify(bb: B => B): F[Change[B]] = modify2( b => (bb(b),()) ).map(_._1) | |||
def modify2[B2](bb: B => (B,B2)):F[(Change[B], B2)] = | |||
def modify(bb: B => B): F[Ref.Change[B]] = modify2( b => (bb(b),()) ).map(_._1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to prefix this with Ref? How about moving Change to top-level fs2? We did similar with Scope, and I thing Change i used much more often.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a slight preference (in general) for keeping the top-level small but I can definitely move Change back to where it was if you prefer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we end up adding MVar
, we'll need a top-level Change
I think. If not, I'd rather keep it contained in Ref
companion to keep top level concepts smaller.
@@ -1,65 +1,65 @@ | |||
package fs2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is gonna be implemented?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I already have that code, I'll push shortly
Replied to review comments, except
I agree with raising error on second set, but not with an explicit Either I don't think. The vast majority of code already calls |
@SystemFw re the Either[Throwable,Unit] signature. I did that for scope too, and in fact was only way to discover few subtle bugs I was trying to catch. I really think we shall do this there, as finding the double sets will be really hard. |
I don't understand why this is though. With a double set your task will fail with an exception telling you "you have tried to set this to 1, but it was already set to 2", and you won't be forced to do In any case I'll do the rest of the work and we can then return on this, it's just an |
0206f36
to
6d5f172
Compare
@mpilquist @pchlupacek I think this PR is now in a mergeable state, bar the final minor decisions over the exact naming and type signatures of some operations. Please note that I've left a few things to separate PRs:
Let me know what you think |
This is a first sketch of a Ref that doesn't use Actor. I'm only running a very very simple benchmark, which shows performance to be identical to the Actor version. I think it can be a good starting point, so I reckon having a PR will facilitate discussion.This PR introduces a new concurrency scheme based on
Ref
+Promise
.It offers a more orthogonal api which separates concurrent state and synchronisation, a simpler implementation (no Actor), and greatly improved performance (on a ballpark benchmark).
Fixes #993