-
Notifications
You must be signed in to change notification settings - Fork 605
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Topic/interrupt scope #1019
Topic/interrupt scope #1019
Conversation
+ simplified concurrently definition with new Promise
@@ -114,32 +130,45 @@ private[fs2] object Algebra { | |||
, g: (B, O) => B | |||
, v: FreeC.ViewL[Algebra[F,O,?], Option[(Segment[O,Unit], FreeC[Algebra[F,O,?],Unit])]] | |||
)(implicit F: Sync[F]): F[B] = { | |||
|
|||
// println(s"RFL(${scope.id}): ${v.get}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove
haltWhenTrue.noneTerminate.either(self.noneTerminate). | ||
takeWhile(_.fold(halt => halt.map(!_).getOrElse(false), o => o.isDefined)). | ||
collect { case Right(Some(i)) => i } | ||
def interruptWhen(haltWhenTrue: Stream[F,Boolean])(implicit F: Effect[F], ec: ExecutionContext): Stream[F,O] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a similar refactoring for pauseWhen
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to go over all nondet combinators when that is done and clean them up. I think we may need to touch mergeXX as well. But prefferably I would like to do this as separate pr ?
@@ -175,7 +202,8 @@ private[fs2] object Algebra { | |||
} | |||
|
|||
case o: Algebra.OpenScope[F,_] => | |||
F.flatMap(scope.open) { innerScope => | |||
F.flatMap(scope.open(o.interruptible)) { innerScope => | |||
//println(s"RFL(${scope.id}): --- OPEN SCOPE -- ${innerScope.id} interruptible: ${innerScope.interruptible.nonEmpty}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove
@@ -88,11 +97,12 @@ private[internal] final class RunFoldScope[F[_]] private (val id: Token, private | |||
* If this scope is currently closed, then the child scope is opened on the first | |||
* open ancestor of this scope. | |||
*/ | |||
def open: F[RunFoldScope[F]] = { | |||
def open(interruptible: Option[(Effect[F], ExecutionContext, Promise[F, Throwable])]): F[RunFoldScope[F]] = { | |||
// println(s"OPENING new scope at $id") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove
* Like `scope` but allows this scope to be interrupted. | ||
* Note that this may fail with `Interrupted` when interruption occured | ||
*/ | ||
private[fs2] def interruptScope[F[_], O, R](pull: FreeC[Algebra[F,O,?],R])(implicit effect: Effect[F], ec: ExecutionContext): FreeC[Algebra[F,O,?],R] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we define both this and scope
in terms of a new method that has the bulk of this body?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great!
+ modified Stream#append to resume from intettruption + modified Stream#flatMap to not use Stream#append, but own interruptible version of append + added flatMap2 to FreeC
@mpilquist pushed changes, I think this shall be good to go. |
@mpilquist of the final build failed, so I need to find out reason why again ... |
Either[Throwable,Any] => FreeC[Algebra[F,O,?], Unit]] | ||
val fx = bound.fx.asInstanceOf[FreeC.Eval[Algebra[F,O,?],_]].fr | ||
fx match { | ||
case _: Algebra.CloseScope[F,_] => FreeC.Bind(FreeC.Eval(fx), { (_: Either[Throwable, Any]) => f(Left(e)) }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if there's another CloseScope
in the result of f(fx)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if we need deep search this. I am quite certain we always get the closest to root scope that may be needed to close when flatMap fails. I was not able to come with stream structure where that would not be enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we only care about the closest CloseScope
?
@@ -14,6 +14,11 @@ private[fs2] sealed abstract class FreeC[F[_], +R] { | |||
case Left(e) => FreeC.Fail(e) | |||
}) | |||
|
|||
def flatMap2[R2](f: Either[Throwable, R] => FreeC[F, R2]): FreeC[F, R2] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to transformWith
(matching naming convention used in other types)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kk
* Specific version of `Free.asHandler` that will output any CloseScope that may have | ||
* been opened before the `rsn` was yielded | ||
*/ | ||
def asHandler[F[_], O](free: FreeC[Algebra[F,O,?],Unit], e: Throwable): FreeC[Algebra[F,O,?],Unit] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems too adhoc to me, like we're working around a flaw somewhere else in the design.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well this is all realted to #1022. Any shortcuting of propert CloseScope causes the (A) resource leakage (B) interuption to be not possible. May be there is more princcipled approach, but that may // likely will require to touch algebra of either FreeC or Algebra itself
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re: #1022, handleErrorWith
can be defined like I suggested if we want it to introduce a scope and hence, release resources acquired during that stream. Critically, there's no resource leak there at all, just a mismatched expectation as to scoping.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a thought experiment, what would happen if ++
on Stream
introduced a new scope? Would we be able to keep the old definition of flatMap
then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not unfortunatelly helping (the issue with handleErrorWith). I will bring you example of resource leak thats because of the mismatched scoping later. See the issue is that with interrupt we need to interrupt flatMaps that are defined in program AFTER the scope was defined, however stays within the scope
of execution. i.e.
Stream(1).flatMap { i => xxx}.interruptWhen(...).scope // that will work
Stream(1).interruptWhen(...).scope.flatMap { i => xxx} // thats the problem
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re ++'s scope. That was one of the earliest attempts. If we introduce scope with every ++, performance sucks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I understand performance will be unacceptable, but my point is that all this adhoc machinery is really just trying to make up for lack of scopes on each append in a use case specific way. I'm really not convinced we should be doing this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I have good answer to that. If we won't we couldn't have interruption done properly (not just ++, but also error propagation) and that is cutting fs2 as option to define more complex programs. If we introduce scope to ++ we will sacrifice perfromance in place where we don't want to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to get some thoughts from @pchiusano -- maybe he has some ideas.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, sure.
// in that case, in certain situations, the flatMap followed by `handleErrorWith` may | ||
// without this change postpone closure of resources until the handler in handleErrorWith is interpreted. | ||
|
||
def fby( s1: Either[O, Stream[F, O]], s2: => Stream[F,O2]): Stream[F, O2] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not convinced we should ever be searching the tail for more work -- seems like this is patching for something else that's wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, what wrong is that we skip all cleanups on error. Thats it. And in that case we rely on the cleanups to be run by Algebra.runFold
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we would signal interrupt by exception, that's likely only way. We could perhaps decide it not to signal by exception but somehow build it in the FreeC itself, but I do not have a good solution for that either.
@mpilquist I have couple of more ideas how to move further here. We certainly need to fix #1022, and then it seems it would perhaps start to work as it is. |
@mpilquist this is next iteration. val s1 = Stream(1,0,2)
val s2 = Stream(3,4,5)
(s1.interruptWhen(x) onInterrupt s2).flatMap { x =>
Stream.eval(IO { x })
}
will still fail if, the flatMap is evaluated during interruption What do you think? |
@mpilquist hm, surprisingly the above example won't fail even when being interrupted in |
@pchlupacek This is looking really good, thanks for all the word you've done on this! Can we move |
yes and thanks for support on this one |
@mpilquist unfortunatelly this is still not good enough (Stream(1,0,2).covary[IO].interruptWhen(interrupt.get.attempt).onInterrupt(Stream(3,4,5)))
.flatMap { i => Stream.eval(promise.get.map { _ => i }) }
.collect { case x => x }
will fail with I may need to look further how the pulls are altering the scope hierarchy, as this seems to be related to that. |
The collect example should have the same scope structure as: (Stream(1,0,2).covary[IO].interruptWhen(interrupt.get.attempt).onInterrupt(Stream(3,4,5)))
.flatMap { i => Stream.eval(promise.get.map { _ => i }) }
.scope |
@mpilquist , @SystemFw, pushed another iteration.
Bad news :
Moving forward: I really need to resolve that first issue with SoE an single element flatMap. This is show stopper, and I was not able to figure solution to that I am sort of out of options how to improve this any further. So I'll appreciate any ideas guys. My guts are that to make this more cleaner, we will need to heavily alter algebra, and to be honest I even don't know how :-( atm. The whole complexity comes from the Guys, i think we have to give this a priority, hence first user examples are showing that may be related to interruption (#1030) for example |
…-for-scala/fs2 into topic/interrupt-scope
@mpilquist @SystemFw after playing with this little bit more, I run out of ideas on more proper way how to implement interruption than it is currently done. So If you agree, I would like you to do final review of this, and then we could merge. If you are unhappy with current interrupt implementation, I bet the only way around it is to say that only arbitrary Throwable is propagated correctly (there we don't have to look up for the correct point to recover from) and leave interrupt in old implementation with all the drawbacks it may bring. |
@pchlupacek A final review sounds good. I'll do that this morning and then we can merge after. Then hopefully get a milestone release out today. |
} | ||
|
||
"interrupt (3)" in { | ||
// tests the interruption of the constant stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super minor style nitpick -- these comments can be either put in to the test name -- e.g. intterupt (3) - interruption of constant stream
or wrapped in an info("interruption of constant stream")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure about info syntax, can you just write quick example pls?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"interrupt (3)" in {
info("tests interruption of the constant stream")
...
}
It just prints the info string to the test output in test reports. In this case, it's probably better in the test name itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have recently experimented with that, however info() causes to print out the text at every Gen
case, so perhaps 100x or so for every test.
I am not sure about moving test cases to the names, not sure what that will do with tools that using names of tests to filter execution of only one single test case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ouch. Moving text description to test name works fine -- e.g., testOnly *MergeJoinSpec -- -z interrupt
will run all the tests that include the word interrupt
in their name
def acquireResource[R](fr: F[R], release: R => F[Unit]): F[Either[Throwable, (R, Token)]] = { | ||
val resource = Resource.create | ||
F.flatMap(register(resource)) { mayAcquire => | ||
if (!mayAcquire) F.raiseError(Interrupted(id, 0)) // todo: not sure if we shall signal by interrupted anymore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see -- we used Interrupted
to signal this condition before but now Interrupted
has meaning that's specific to interruptWhen
. How about a new internal exception type here, AcquireAfterScopeClosed
or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kk makes sense
|
||
/** | ||
* Signals interruption of the evaluation. Contains id of last scope that shall be interrupted and | ||
* any children of that scope. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps we can reword a bit?
"Contains id of last scope that shall be interrupted along with its children"
Assuming that the meaning is "this will interrupt the scope with id scopeId and also interrupt all the children of said scope"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
* In each recursive iteration, this will increment by 1 up to limit defined in current scope, | ||
* After which this will Interrupt stream w/o searching further for any cleanups. | ||
*/ | ||
final case class Interrupted(scopeId: Token, loop: Int) extends Throwable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, the internal Token
type leaks to public API here. Can we make scopeId
and loop
private vals?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can make them private, however I was sort of thinking ot make the whole Interrupt() back to private again. I cannot imagine case where user will somehow catch/use the Interrupt. DO you think I should make it back private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay yeah, that's fine with me.
|
||
/** | ||
* Run the supplied effectful action at the end of this stream, regardless of how the stream terminates. | ||
*/ | ||
def onFinalize(f: F[Unit])(implicit F: Applicative[F]): Stream[F,O] = | ||
Stream.bracket(F.pure(()))(_ => self, _ => f) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra whitespace line
/** Deprecated alias for `compile.last`. */ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra whitespace line
, promise = Promise.unsafeCreate[F, Throwable](effect, ec) | ||
, ref = Ref.unsafeCreate[F, (Option[Throwable], Boolean)]((None, false)) | ||
, interruptScopeId = newScopeId | ||
, maxInterruptDepth = 256 // todo: want we somehow configure this ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we write a separate issue for configuring this value along with maxSteps and chunkSize in Algebra?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we ok with the side effects here (creating promise, ref, token)?
It's guarded by flatMap
, but it's threading the line (however, it might be hard to write it otherwise)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@SystemFw they are guarded by modify2, I think we are on the safe side here. The scope allocation is pretty costly now. It is also quite often, i.e. every repeatPull
(that is used by map
for example) closes and opens scope. So I would like to leave it as it is.
@@ -178,6 +226,11 @@ private[internal] final class CompileScope[F[_]] private (val id: Token, private | |||
go(self, Catenable.empty) | |||
} | |||
|
|||
/** yields to true, if this scope has ancestor with given scope Id **/ | |||
def hasAncestor(scopeId: Token): F[Boolean] = { | |||
F.map(ancestors) { c => Catenable.instance.exists(c)(_.id == scopeId) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that if you import cats.implicits._
, you can write this line as ancestors.map(_.exists(_.id == scopeId))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's quite a lot of F.map
and F.flatMap
, but imho it hurts readability a bit due to all the parenthesis, so unless there's a specific reason not to, definitely 👍 to use cats syntax
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure there is no extra allocation coming from the cats syntax implicit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, there's an allocation involved. I'm fine with the policy of fs2.internal
not using cats syntax.
// See docs on [[Scope#interrupt]] | ||
def interrupt(cause: Either[Throwable, Unit]): F[Unit] = { | ||
interruptible match { | ||
case None => F.raiseError(new Throwable("Scope#interrupt called for Scope that cannot be interrupted")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/Throwable/IllegalStateException/
// for every asynchronous child scope of the stream, so this assumption shall be completely safe. | ||
F.flatMap(iCtx.promise.cancellableGet) { case (get, cancel) => | ||
F.flatMap(fs2.async.race(get, F.attempt(f))(iCtx.effect, iCtx.ec)) { | ||
case Right(result) => F.map(cancel)(_ => result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With cats syntax this becomes cancel.as(result)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was trying to avoid cats.syntax in whole CompileScope. Syntax is zero cost right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally no -- some cats syntax is zero cost but anything provided via simulacrum is not. I'm fine with not using it in CompileScope
to avoid allocations.
|
||
/** | ||
* When the stream is evaluated, there may be `Eval` that needs to be cancelled early, when asynchronous interruption | ||
* is taking the place. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/taking the place/taking place
/** | ||
* When the stream is evaluated, there may be `Eval` that needs to be cancelled early, when asynchronous interruption | ||
* is taking the place. | ||
* This allows to augment eval so whenever this scope is interupted it will return on left the reason of interruption. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/interupted/interrupted
private[fs2] sealed trait Algebra[F[_],O,R] | ||
|
||
// Shared algebra between fold/uncons | ||
private[fs2] sealed trait AlgebraShared[F[_], O, R] extends Algebra[F, O, R] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like a better name here - also, let's move this trait inside Algebra companion object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about Algebra.Effectful
? Because the only constructors which aren't subtypes of it are Output
, Run
, and Uncons
which by themselves are all non-effectful (stepping through an uncons may involve evaluating some nested effectful nodes of course)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I am still playing with Scope to have specific implementation for Uncons/Fold to resolve the interrupt. But yes lets name it effectfull
, chunkSize: Int | ||
, maxSteps: Long | ||
)(implicit F: Sync[F]): F[(CompileScope[F], Option[(Segment[O,Unit], FreeC[Algebra[F,O,?],Unit])])] = { | ||
F.delay(s.viewL.get) flatMap { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to wrap s.viewL.get
in a delay constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, there are situation that will SoE without it. I think one of them was transform
stack safety
F.delay(s.viewL.get) flatMap { | ||
case done: FreeC.Pure[Algebra[F,O,?], Unit] => F.pure((scope, None)) | ||
case failed: FreeC.Fail[Algebra[F,O,?], Unit] => F.raiseError(failed.error) | ||
case bound: FreeC.Bind[Algebra[F,O,?], _, Unit] => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This lost a cleanup change that replaced the existential with a type variable, which then reduced need for casting below. I can make this change again after we merge though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no prob I put the existential there
compileFoldLoop(scope, output.values.fold(acc)(g).force.run._2, g, f(Right(()))) | ||
} | ||
catch { | ||
case err: Throwable => compileFoldLoop(scope, acc, g, f(Left(err))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace case err: Throwable
with a case NonFatal(err)
throughout here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
case failed: FreeC.Fail[Algebra[F,O,?], _] => F.raiseError(failed.error) | ||
case run: Algebra.Run[F, O, r] => | ||
try { | ||
val (r, b) = run.values.fold(acc)(g).force.run |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like we need to use chunkSize
and maxSteps
here to limit this call to run
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
run shall always terminate after the fixes should it? Except when you define one that never terminates and as pure. If that would be the case, then, perhaps I will revert to code that stepped through uncons, but I don't think so it is necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stream.segment(Segment.constant(0)).covary[IO].to(Sink.showLinesStdOut[IO,Int]).compile.drain
<-- I'd expect this to print stuff forever but as defined now, I don't think it will print anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The above example works fine with this branch (because to
uses evalMap
which uses flatMap
which calls uncons
. The result is the full infinite segment, which is then lazily folded an element at a time.
I think you can ignore the need for chunkSize
and maxSteps
here until/if I come up with an example that fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this shall work always, hence the uncons is implemented with splitAt
, honoring the chunkSize and maxSteps, however the runFold is implemented in fold
.
Also on the ChunkSize, wouldn't that mean here that chunk is always like 256 items? Just thinking loudly what that does with Stream[F, Byte]....
@@ -193,6 +228,7 @@ private[fs2] object Algebra { | |||
case o: Output[F,O2] => Output[G,O2](o.values) | |||
case Run(values) => Run[G,O2,X](values) | |||
case Eval(value) => Eval[G,O2,X](u(value)) | |||
case un:Uncons[F,x,O2] => Uncons[G,x,O2](FreeC.suspend(un.s.translate(algFtoG)), un.chunkSize, un.maxSteps).asInstanceOf[Algebra[G,O2,X]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is suspend needed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had problems with SoE and transform. Perhaps with delaying viewL, this could be removed too.
Looks good to me. Ready for merge? |
Just pushed the changes. |
Sounds like a good plan to me. I'll merge and then release M11. |
Resolves #916
This still needs a cleanup and perhaps more cases.
Still the
drained
streams won't get interrupted.@mpilquist @pchiusano @SystemFw pls feedback.
Thanks.