Harden `Queue#take`'s cancelation properties #2920
Should the guarantee be that code like this never loses the item?

```scala
val tsk = IO.uncancelable { poll =>
  poll(q.take).flatTap { taken =>
    // save taken item somewhere
  }
}
```

So even if it is canceled, the item is either not taken at all, or it is saved. It seems that currently code like this can indeed lose the item. I think (one possible) reason is this line in the current implementation:

```scala
val cleanup = state.update { s => s.copy(takers = s.takers.filter(_ ne taker)) }
```

This runs when cancelling a `take`.
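For concreteness, a minimal sketch of how a `Deferred`-based `take` with exactly this kind of cleanup can look (names, types, and structure are assumptions for illustration, not the actual cats-effect code):

```scala
import cats.effect.{Deferred, IO, Ref}
import cats.syntax.all._

// Hypothetical, simplified queue state: buffered elements plus registered takers.
final case class State(items: Vector[Int], takers: Vector[Deferred[IO, Int]])

def take(state: Ref[IO, State]): IO[Int] =
  IO.uncancelable { poll =>
    IO.deferred[Int].flatMap { taker =>
      // the cancelation cleanup only deregisters the taker; it never checks
      // whether `taker` was already completed with a claimed element
      val cleanup = state.update(s => s.copy(takers = s.takers.filter(_ ne taker)))

      state.modify {
        case State(items, takers) if items.nonEmpty =>
          (State(items.tail, takers), IO.pure(items.head))
        case State(items, takers) =>
          (State(items, takers :+ taker), poll(taker.get).onCancel(cleanup))
      }.flatten
    }
  }
```

Between an offerer completing `taker` and the taker's fiber observing the value, the element lives only inside the `Deferred`; if the cleanup above runs in that window, nothing ever re-surfaces it.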
Okay, so "the whole thing must be uncancelable aside from an atomic poll" seems to be already true, if I'm reading it correctly. It is also not enough. I think what would need to happen in this case is that when the fiber running Or, alternatively, |
Going to write some more here in a bit, but to be extremely concrete, this is the test which I'm pretty sure fails right now and which I feel we need to pass:

```scala
"never lose data" in {
  // this is a race condition test so we iterate
  val test = 0.until(10000).toList traverse_ { _ =>
    for {
      q <- Queue.bounded[IO, Int](100)
      _ <- 0.until(100).toList.traverse_(q.offer(_))

      latch <- IO.deferred[Unit]
      backR <- IO.ref(Vector[Int]())

      fiber <- {
        val action = for {
          // take half of the queue's contents
          front <- 0.until(50).toVector.traverse(_ => q.take)
          _ <- backR.set(front)

          // release the canceler to race with us
          _ <- latch.complete(())

          // take the other half of the contents with atomic writing
          _ <- 50.until(100).toVector traverse { i =>
            IO uncancelable { poll =>
              // if data is lost, it would need to manifest here
              // specifically, take would claim a value but flatMap wouldn't run
              poll(q.take).flatMap(a => backR.update(_ :+ a))
            }
          }
        } yield ()

        action.start
      }

      _ <- latch.get
      _ <- fiber.cancel

      // grab whatever is left in the queue
      remainderOpt <- q.tryTakeN(None)
      remainderVec = remainderOpt.getOrElse(Nil).toVector
      _ <- backR.update(_ ++ remainderVec)

      // if we lost data, we'll be missing a value in backR
      results <- backR.get
      _ <- IO(results must contain(0.until(100)))
    } yield ()
  }

  test.as(ok)
}
```
Yeah, okay, I believe my pseudocode is the same as the important part of that test. And I can confirm that an item can become "lost" that way. I think this is a really difficult problem. I'll try to describe it with an example:
I think what could happen (and what causes the item to be lost) is this:
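In terms of the simplified sketch from earlier (hypothetical step numbering and names; one plausible sequence rather than a definitive reconstruction):

```scala
// Hypothetical interleaving between a taker fiber A and an offerer fiber B,
// against the simplified `take` sketched above:
//
// 1. A: `take` finds the queue empty, registers `taker` in `takers`,
//       and semantically blocks on poll(taker.get).
// 2. B: `offer(x)` finds the registered taker and completes it:
//       taker.complete(x)   // x now lives only inside the Deferred
// 3. A: is canceled before its continuation (the flatTap above) runs;
//       the onCancel cleanup merely removes `taker` from `takers`.
// 4. x is no longer in the queue, was never returned to A, and is never
//    re-offered: the element is lost.
```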
After 2., the item …
Okay, philosophy time. Proactively paging @SystemFw for rebuttal.

In a MPMC queue, ordering and fairness are not properties which are strongly defined. Ordering with respect to what? If you have two producers which generate values A and B close together in time, then it's really unclear which one is actually "first". Of course, the queue data structure forces us to pick a winner here, so we say that one of them is "first", but is it really? It is not. And note that this applies even with MPSC.

The same argument can be made with SPMC. A single producer generates A and B while two consumers are waiting. Who's to say which consumer is first to the value? When parallel events are close together in time, ordering is not even a meaningful thing to discuss. Note that the pathologies with SPMC only occur when multiple events are being produced close together in time; singular events do not have this issue.

Taking a step even further back, all of this stuff is based on … So having established that concurrent ordering isn't an absolute, let's look at the broader picture a bit.

"At least once" semantics for `take` … Conversely, if we strengthen `take` …

I'm pretty convinced that the only useful semantic here is "exactly once", and we really have to maintain it, even when it means corrupting fairness or ordering, so long as fairness and ordering are preserved in sparser temporal sequencing cases and relative partial ordering within a consumer is guaranteed; both of these properties are things that we can deliver.

Speaking more concretely… we can achieve these properties by changing the …
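Purely as an illustration of one possible shape for such a change (a sketch reusing the hypothetical `Deferred`-based state from earlier; names and structure are assumptions, and this is not necessarily the change actually being proposed): the cancelation finalizer can check whether the taker was already completed and, if so, put the claimed element back instead of dropping it.

```scala
import cats.effect.{Deferred, IO, Ref}
import cats.syntax.all._

final case class State(items: Vector[Int], takers: Vector[Deferred[IO, Int]])

// Hypothetical `take` whose finalizer restores an already-claimed element.
def take(state: Ref[IO, State]): IO[Int] =
  IO.uncancelable { poll =>
    IO.deferred[Int].flatMap { taker =>
      val cleanup: IO[Unit] =
        state.update(s => s.copy(takers = s.takers.filter(_ ne taker))) *>
          taker.tryGet.flatMap {
            // an offerer already handed us an element: give it back to the
            // queue rather than losing it (this is where ordering/fairness
            // can degrade, as discussed below)
            case Some(claimed) =>
              state.update(s => s.copy(items = claimed +: s.items))
            case None =>
              IO.unit
          }

      state.modify {
        case State(items, takers) if items.nonEmpty =>
          (State(items.tail, takers), IO.pure(items.head))
        case State(items, takers) =>
          (State(items, takers :+ taker), poll(taker.get).onCancel(cleanup))
      }.flatten
    }
  }
```

This sketch still leaves a window: an offerer that has already grabbed `taker` out of `takers` but has not yet completed it can race the `tryGet`, so a real implementation has to make offerers and the finalizer agree through the same state transition.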
I've been thinking about this for some days actually, and I'm almost convinced :), especially if the argument is made in general, and not just for the purpose of implementing `take`.

… is a very strong argument, and in some ways similar to the argument I made for …

I'm also generally a fan of this strategy; I used it for … With that being said though, I want the tradeoff we're making to be absolutely clear, because I think the argument somewhat brushes over it:
I think ordering comes into play in two ways: 1) there is no guarantee of ordering, but there is close to infinite probability that there is indeed an order, and 2) there is a guaranteed happens-before relationship. If you have neither, then you can say "order is not meaningful"; if you do, I feel the argument is weaker. Take this example:

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.std.Queue

Queue.unbounded[IO, Int].flatMap { q =>
  q.take.start >>
    IO.sleep(2.days) >>
    q.take.start >>
    IO.sleep(2.days) >>
    q.offer(1) >>
    q.offer(2)
}
```

Now, if we describe ordering as "in which order should elements be processed", then I agree that the very notion of *PMC queues is against it: because of concurrent consumers, you've already relinquished order of processing. If we instead define ordering as "who should get which element" or indeed "which producer is first to the value", then I think in that example it's pretty obvious what the ordering should be: the consumers fall into the "infinite probability" case, and the producer falls into the happens-before case. The proposed strategy breaks this latter notion of ordering if you are unlucky with your races (read this for a worked out example). If everyone is on board with this limitation, I'll be on board as well, since I can see the general argument for it.
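To make the "unlucky race" concrete, here is a made-up trace against the example above, assuming a strategy that re-offers a claimed element on cancelation (as sketched earlier):

```scala
// Hypothetical trace, assuming canceled takes re-offer claimed elements:
//
//   t0: consumerA runs q.take                 // registered first, long before B
//   t1: consumerB runs q.take                 // registered much later
//   t2: producer runs q.offer(1) >> q.offer(2)
//
//   - consumerA's take claims 1, but A is canceled before observing it;
//     the finalizer re-offers 1 back onto the queue
//   - consumerB's take receives 2
//   - a later take (or a retry by A's caller) receives 1
//
// "Who gets which element" no longer follows the producer's happens-before
// order: 2 is delivered before 1, even though the offers were sequential and
// the consumers registered far apart in time.
```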
Question: is there no relevant prior art on this issue? If not, I assume it's because the CE cancellation model is so original? Like, what does everyone else do? 😅
This is a great summary, and it goes directly to where I think the distinction in definitions lies. I would define ordering as the following pair of things:

- the order in which produced elements are handed out (which element is "first")
- the order in which waiting consumers are woken up (which consumer is "first")
These are both, fundamentally, the same problem. In a sense, the registration of consumers is effectively like producing a listener message back to the producers, in the same way that the production of elements is producing an element forward to the consumers. Thus, I believe the guarantees on both should be symmetrical. In a MP queue scenario, ordering on element production is degraded. In a MC queue scenario, ordering on consumer registration is degraded. (Also, I say "degraded" and not "lost" because, as you noted, the probability of strong ordering is asymptotic as relative temporal delta increases.)

So to that end, the limitation here is that, in any MC scenario, we cannot guarantee strong wake-up ordering between consumers in closely interspersed events, just as in any MP scenario we cannot guarantee strong ordering between elements in closely interspersed events. I believe that limitation is acceptable.
Regarding decreasing fairness: in my opinion, it's probably fine. As far as I understand #2885, actual FIFO semantics are not broken; an intuitive "fairness" is decreased (hopefully only slightly). FIFO semantics are not broken because registration is unobservable in the current queue API. So in this example:

```scala
q.take.start >>
  IO.sleep(x) >>
  q.take.start
```

The two `take`s …

An idea: to (probably) further decrease this fairness, and (maybe) increase performance, the queue in #2885 could use concurrent bags instead of queues for the waiters (but not for …).
`take` must have the property such that it either does not remove the element, or it removes the element and returns it, even when canceled. Thus the whole thing must be uncancelable aside from an atomic `poll` (when semantic blocking is needed). Without this property, it is possible in rare race conditions to cancel a `take` and lose data.

Applies to both the existing `Concurrent` queue and the new `Async` queue in #2885.