Feature/merge interrupt #1061
Conversation
…/fs2 into feature/interrupt-with-algebra
That test looks over-specified to me.
  .eval(async.unboundedQueue[F, Option[(F[Unit], Chunk[O2])]])
  .flatMap { outQ => // note that the queue actually contains up to 2 max elements thanks to semaphores guarding each side.
    def runUpstream(s: Stream[F, O2], interrupt: Promise[F, Unit]): F[Unit] =
      async.semaphore(1).flatMap { quard =>
s/quard/guard/
        s.chunks
Why operate in chunks instead of segments?
a) A Segment cannot be tested for emptiness.
b) I want the cost of processing the Segment to be paid concurrently by s1 and s2 rather than on the resulting stream, essentially utilising more concurrency.
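A minimal sketch of the distinction behind (a) and (b), using the fs2 Chunk API; this is illustrative only and not code from this PR:

```scala
import fs2.Chunk

// Point (a): a Chunk is strict, so emptiness is a cheap, direct check --
// exactly what the `if (chunk.isEmpty)` guard below relies on.
val nonEmpty = Chunk.seq(List(1, 2, 3))
val empty    = Chunk.seq(Nil)
assert(!nonEmpty.isEmpty && empty.isEmpty)

// A Segment, by contrast, is lazy: deciding whether it emits anything means
// forcing (part of) it, which is the evaluation cost point (b) wants each
// upstream side to pay concurrently rather than the merged stream.
```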
        s.chunks
          .interruptWhen(interrupt.get.attempt)
          .evalMap { chunk =>
            if (chunk.isEmpty) F.pure(())
Why elide empty chunks?
If we don't do it, the stream may in fact hang, with the exception unable to propagate downstream. I had the new merge tests (that I added) failing without this, as some of the Failure Stream examples seem to emit an empty chunk and then the failure. I don't think this does any harm, because if you care about empty chunks you can always do s1.chunks.merge(s2.chunks) and they won't be elided in that case. For Stream[F, A] I think the user doesn't care about empty chunks?
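A hedged sketch of that workaround (fs2 0.10-style signatures assumed, where merge needs an Effect and an implicit ExecutionContext; chunkMerge is a hypothetical helper, not part of fs2):

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import cats.effect.IO
import fs2.{Chunk, Stream}

// Merging at the chunk level: every element is a whole Chunk, so empty chunks
// are ordinary values here and nothing gets elided by merge.
def chunkMerge[A](s1: Stream[IO, A], s2: Stream[IO, A]): Stream[IO, Chunk[A]] =
  s1.chunks.merge(s2.chunks)

// To get back to plain elements afterwards (empty chunks simply contribute nothing):
//   chunkMerge(a, b).flatMap(Stream.chunk)
```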
Hm, this makes me think there's something subtly wrong. We don't elide empty chunks and we use segments in join -- why would merge be different?
Ok, I'll investigate this again and find the exact cause of this.
Ok, I remembered the scenario that forces us to drain empty chunks. Essentially it comes from the following spec:
(s1: PureStream[Int], f: Failure) =>
  an[Err.type] should be thrownBy {
    s1.get
      .merge(f.get)
      .flatMap { _ =>
        Stream.eval(IO.async[Unit](_ => ()))
      }
      .compile
      .drain
      .unsafeRunSync()
  }
If the f stream has the structure Stream.emits() ++ Stream.fail or similar, the stream will not fail unless that empty chunk gets processed. That may not be a big deal for this example code, but empty chunks may not be as explicit as you see there: they may in fact be the result of a stream like Stream.emit(42).map(_ => throw Err), where the exception has already been thrown, yet the stream won't fail until the empty chunk preceding the failure (Algebra.raiseError()) is processed (see the sketch below).
I think join may need similar treatment, hence I didn't really write tests of its behaviour on a blocking flatMap.
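A hedged illustration of the shape described above; Err stands in for the test suite's failure fixture and is defined here only for the sketch:

```scala
import cats.effect.IO
import fs2.Stream

// Hypothetical stand-in for the suite's Err fixture.
object Err extends RuntimeException("boom")

// Throwing inside map is caught by fs2 and turned into a raised error, so --
// per the discussion above -- a chunk-level consumer can see this stream as
// "an empty chunk, then the failure" rather than the failure alone; the failure
// only surfaces once that preceding (empty) chunk is actually pulled.
val failing: Stream[IO, Int] = Stream.emit(42).covary[IO].map(_ => throw Err)
```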
Sure, yes, no problem with that. Still, if you look at the Failure examples, do you think the user will have any idea that empty chunks are emitted before the failure? I mean, in case of a deadlock, how should they know that this filter has to be added for the combinator to work as expected? Again, just reassuring myself that your suggested path is correct.
There's nothing particularly special about empty chunks here; the deadlocks come from the shape of the stream after merge. You could construct an example that deadlocks on singleton chunks or any other shape.
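A hedged sketch of that point, reusing the blocking flatMap shape from the spec quoted earlier (fs2 0.10-era signatures assumed); whether it actually deadlocks depends on merge's interruption semantics, which is the subject of this PR:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import cats.effect.IO
import fs2.Stream

// The left side emits a perfectly ordinary singleton chunk, yet once an element
// is pulled into the never-completing flatMap below, the consumer is parked and
// a failure from the other side of the merge may never be observed downstream.
// Nothing about this shape requires empty chunks.
def blocked(failing: Stream[IO, Int]): Stream[IO, Unit] =
  Stream.emit(1).covary[IO]
    .merge(failing)
    .flatMap(_ => Stream.eval(IO.async[Unit](_ => ()))) // parks forever
```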
Ok, let's start with this and we'll see how that goes. In fact, the effects shall be run by concurrently anyhow, where that should not be an issue at all.
@mpilquist it seems that only the sink tests are affected when I change this. So I am going to revert it back without the emptiness guard (I still leave this as chunk for reason (b) mentioned above). Ok?
I think it should be at the segment level -- you can always resegment/rechunk to get more concurrency out of it if the workload calls for it.
Re the failing test, I am more concerned about how it should behave. IMHO observe should not somehow automagically swallow failures in the sink, as it does today. I think this may be a failure (async/sync consistency) in diamond that manifests itself now.
I'm fine with removing
@mpilquist perhaps I don't understand the contract, but if the observer is a => Stream.raiseError, how can it terminate with something other than a left, and exactly once? I mean it fails once and then just emits inputs?
It will terminate with a
Ah ok, I see now. I also see now why sync/async behave differently. So I will think about how to fix that and update this PR. Thanks.
@pchlupacek I fixed the 2 merge tests that were failing (they just needed a …). Looks like there's still an intermittent failure in …
@mpilquist I have been investigating the failures and there seems to be something odd with concurrently, or at least with how the sink is implemented. I have neither a solution nor a cause identified yet.
Okay, I'll take a look at that test and see if I can spot anything. I'll add the failing case to #1063 too.
merge is now implemented w/o unconsAsync: no race combinator is required, except the one resulting from interrupt in streams combined via s1.concurrently(s2); concurrently, however, has a little less overhead.

There is a spurious failure of the "Pipe.handle errors from observing sink" test. I am not sure whether the sink behaviour expected by that test is correct. It feels very weird that when the sink is about to fail for each O, it simply stops being observed for the next O. I am perhaps missing something here; @mpilquist any thoughts?
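To make the trade-off mentioned above concrete, a hedged reminder of the semantic difference between the two combinators (fs2 0.10-era signatures assumed; compare is a hypothetical helper, not part of this PR):

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import cats.effect.IO
import fs2.Stream

// concurrently runs its argument only for its effects and emits output solely
// from the receiver, whereas merge interleaves output from both sides -- which
// is why merge needs the queue-based plumbing even though concurrently itself
// carries a little less overhead.
def compare(s1: Stream[IO, Int], s2: Stream[IO, Int]): (Stream[IO, Int], Stream[IO, Int]) =
  (
    s1.concurrently(s2), // output comes from s1 only; s2 runs in the background
    s1.merge(s2)         // output interleaves elements from both s1 and s2
  )
```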