Skip to content

Commit

Permalink
Fixed #1039 - bug in Segment#take
Browse files Browse the repository at this point in the history
If a chunk was emitted whose size was exactly equal to the number of
remaining elements to take, the remainder was decremented to zero and
the take did not complete until the next element or chunk was emitted.
If another chunk was never emitted, the resulting segment hung.
  • Loading branch information
mpilquist committed Jan 2, 2018
1 parent 6d55021 commit 680de1a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
11 changes: 8 additions & 3 deletions core/shared/src/main/scala/fs2/Segment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -562,12 +562,17 @@ abstract class Segment[+O,+R] { self =>
var staged: Step[O,R] = null
staged = self.stage(depth.increment, defer,
o => { if (rem > 0) { rem -= 1; emit(o) } else done(Right(staged.remainder.cons(o))) },
os => { if (os.size <= rem) { rem -= os.size; emits(os) }
else {
os => os.size match {
case sz if sz < rem =>
rem -= os.size
emits(os)
case sz if sz == rem =>
emits(os)
done(Right(staged.remainder))
case _ =>
var i = 0
while (rem > 0) { rem -= 1; emit(os(i)); i += 1 }
done(Right(staged.remainder.prepend(Segment.chunk(os.drop(i)))))
}
},
r => done(Left(r -> rem))
).value
Expand Down
6 changes: 5 additions & 1 deletion core/shared/src/test/scala/fs2/SegmentSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,18 @@ class SegmentSpec extends Fs2Spec {
forAll { (s: Segment[Int,Unit], n: Int) =>
val v = s.force.toVector
s.take(n).force.toVector shouldBe v.take(n)
if (n > 0 && n >= v.size) {
if (n > 0 && n > v.size) {
s.take(n).drain.force.run shouldBe Left(((), n - v.size))
} else {
s.take(n).drain.force.run.map(_.force.toVector) shouldBe Right(Segment.vector(v.drop(n)).force.toVector)
}
}
}

"take eagerly exits" in {
Segment.from(0L).filter(_ < 2L).take(2).force.toVector shouldBe Vector(0L, 1L)
}

"takeWhile" in {
forAll { (s: Segment[Int,Unit], f: Int => Boolean) =>
s.takeWhile(f).force.toVector shouldBe s.force.toVector.takeWhile(f)
Expand Down

0 comments on commit 680de1a

Please sign in to comment.