Moving data through queue leaks memory #1026

Closed
avakhrenev opened this issue Dec 19, 2017 · 22 comments
@avakhrenev (Contributor)

The following program

Stream.eval(async.boundedQueue[IO, Either[Throwable, Option[Int]]](10)).flatMap { queue =>
  queue.dequeueAvailable.rethrow.unNoneTerminate
    .concurrently(
      Stream.constant(1, 128).covary[IO].noneTerminate.attempt.evalMap(queue.enqueue1(_)))
    .evalMap(_ => IO.unit)
}

leaks memory. Here is what Eclipse Memory Analyzer shows:

[Eclipse Memory Analyzer screenshots omitted]

fs2 version is 0.10.0-M9
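
For reference, reproducing this in sbt needs roughly the following dependencies (a sketch assuming the standard fs2 coordinates; fs2-io is only required for the System.in-reading variant shown further down):

libraryDependencies ++= Seq(
  "co.fs2" %% "fs2-core" % "0.10.0-M9",
  "co.fs2" %% "fs2-io"   % "0.10.0-M9"
)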

@avakhrenev (Contributor, Author) commented Dec 19, 2017

I'm running it from sbt via runMain with this setting enabled:

connectInput in run := true

This allows me to interrupt the program by hitting Enter.

Here is the full program for copy-pasting:

package com.example

import cats.effect.IO
import fs2._
import fs2.StreamApp.ExitCode
import cats.syntax.apply._

object MemorySanityCheck extends StreamApp[IO] {
  import scala.concurrent.ExecutionContext.Implicits._

  override final def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] =
    (io
      .readInputStream[IO](IO(System.in), 32, false)
      .takeWhile(_ != '\n') // keep reading System.in until Enter is pressed
      .drain
      .onFinalize(IO(println("Exiting...")) *> requestShutdown) mergeHaltBoth app(
      args,
      requestShutdown).evalMap(_ => IO.unit).drain).handleErrorWith(err =>
      Stream.eval(IO {
        err.printStackTrace(System.out)
        ExitCode.Error
      }))

  def app(args: List[String], requestShutdown: IO[Unit]): Stream[IO, Any] =
    Stream.eval(async.boundedQueue[IO, Either[Throwable, Option[Int]]](10)).flatMap { queue =>
      queue.dequeueAvailable.rethrow.unNoneTerminate
        .concurrently(
          Stream.constant(1, 128).covary[IO].noneTerminate.attempt.evalMap(queue.enqueue1(_)))
        .evalMap(_ => IO.unit)
    }
}

@avakhrenev (Contributor, Author)

The Promise created at this line accumulates callbacks in its unset state.

@mpilquist (Member)

@avakhrenev I'm surprised it's the promise in unconsAsync and not one in Queue

@avakhrenev (Contributor, Author)

That's what I've seen after instrumenting Promise creation with sourcecode and adding printlns for when the waiting map reaches 1000 elements. It may not be the root cause, though.
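
For illustration, instrumentation along those lines might look roughly like this (a hypothetical helper, not fs2 code, assuming the com.lihaoyi sourcecode library is on the classpath):

// Hypothetical sketch: record the file and line that created each suspect object,
// so heap-dump suspects can be traced back to their allocation site.
object AllocationTrace {
  def tag(label: String)(implicit file: sourcecode.File, line: sourcecode.Line): Unit =
    println(s"$label allocated at ${file.value}:${line.value}")
}

// called from an instrumented copy of the Promise constructor, e.g.:
// AllocationTrace.tag("Promise")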

@mpilquist (Member)

@SystemFw Totally guessing here, but this could occur if the cancellation action in AsyncPull#race wasn't unregistering from the promise's state. A quick spot check looks fine, though.

@SystemFw (Collaborator)

AsyncPull#race is definitely the bit of the concurrency PR I'm less confident about. However, I'm still unsure exactly how and where unconsAsync enters the picture in the queue example...

@avakhrenev (Contributor, Author)

It may be triggered by the surrounding code, which reads from the input stream simultaneously with running data through the queue, using the mergeHaltBoth combinator.

@SystemFw (Collaborator) commented Dec 19, 2017

ah, mergeHaltBoth!
I didn't see that, but that's far more likely (and when you drain a stream and then merge, I prefer to use concurrently or join).

However, the minimal snippet does not have that.
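
To illustrate that preference, here is a hedged sketch (hypothetical background/foreground streams, fs2 0.10-style API): when one side is drained purely for its effects, concurrently expresses that directly instead of racing a drained stream against the foreground with merge.

import cats.effect.IO
import fs2._
import scala.concurrent.ExecutionContext.Implicits.global

object DrainMergeVsConcurrently {
  // `background` is run only for its effects; `foreground` produces the values we care about.
  def viaMerge(background: Stream[IO, Unit], foreground: Stream[IO, Int]): Stream[IO, Int] =
    background.drain merge foreground

  def viaConcurrently(background: Stream[IO, Unit], foreground: Stream[IO, Int]): Stream[IO, Int] =
    foreground.concurrently(background)
}

concurrently ties the background stream's lifetime to the foreground's, which is usually what is intended when the background is drained anyway (mergeHaltBoth additionally halts when either side halts, so it is not a drop-in replacement everywhere).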

@mpilquist (Member)

@avakhrenev I just ran the sample code without the surrounding stream interruption stuff and I haven't been able to reproduce a memory leak. I suspect the real leak is in merge when one of the sides doesn't make progress.

@SystemFw (Collaborator)

a leak on merge makes a lot more sense to me, since that's where AsyncPull.race is involved.
Not sure if it's a problem with race, or just another case of a.drain merge b not being good enough.

@avakhrenev (Contributor, Author)

Interesting. I've just removed the reading from the input stream via mergeHaltBoth and got the same result: instances of Promise allocated in ToPull#unconsAsync continue to grow. Here is the full code for reference:

package com.example

import cats.effect.IO
import fs2._
import fs2.StreamApp.ExitCode

object MemorySanityCheck extends StreamApp[IO] {
  import scala.concurrent.ExecutionContext.Implicits._

  override final def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] = {
    Stream.eval(async.boundedQueue[IO, Either[Throwable, Option[Int]]](10)).flatMap { queue =>
      queue.dequeueAvailable.rethrow.unNoneTerminate
        .concurrently(
          Stream.constant(1, 128).covary[IO].noneTerminate.attempt.evalMap(queue.enqueue1(_)))
        .evalMap(_ => IO.unit)
    }.drain ++ Stream(ExitCode.Error)
  }
}

@mpilquist (Member)

@avakhrenev Can you try without StreamApp? Here's what I tried:

import cats.effect.IO
import fs2._
import scala.concurrent.ExecutionContext

object QueueTest extends App {
  import ExecutionContext.Implicits.global
  Stream.eval(async.boundedQueue[IO, Either[Throwable, Option[Int]]](10)).flatMap { queue =>
    queue.dequeueAvailable.rethrow.unNoneTerminate.concurrently(
      Stream.constant(1, 128).covary[IO].noneTerminate.attempt.evalMap(queue.enqueue1(_))
    ).evalMap(_ => IO.unit)
  }.run.unsafeRunSync()
}

@mpilquist (Member)

This program definitely leaks:

import cats.effect.IO
import fs2._
import scala.concurrent.ExecutionContext

object HungMerge extends App {
  import ExecutionContext.Implicits.global
  val hung = Stream.eval(IO.async[Int](_ => ()))
  val progress = Stream.constant(1, 128).covary[IO]
  (hung merge progress).run.unsafeRunSync()
}

@mpilquist (Member)

Note that StreamApp uses interruptWhen on the result of def stream, which results in a call to merge.
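
Roughly speaking, every StreamApp program is wrapped in something like the following (a simplified sketch, not StreamApp's actual source), which is how a merge-style race enters the picture even when user code never calls merge:

import cats.effect.IO
import fs2._
import scala.concurrent.ExecutionContext.Implicits.global

object InterruptWhenSketch {
  // Hypothetical simplification: the user's stream is interrupted by a shutdown signal.
  def runWithShutdown[A](userStream: Stream[IO, A],
                         shutdown: async.immutable.Signal[IO, Boolean]): Stream[IO, A] =
    userStream.interruptWhen(shutdown.discrete)
}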

@avakhrenev (Contributor, Author)

Hm, your program also leaks, or maybe I'm misinterpreting the results. I've added a println at Promise.scala:96:

case State.Unset(waiting) =>
  val wu = waiting.updated(id, cb)
  State.Unset(wu) -> F.delay[Unit](
    if(wu.size > 100)
      println(s"Waiters size is: ${wu.size}")
    else ())

@SystemFw (Collaborator)

Wait, does QueueTest leak or not?

In any case, HungMerge does, so would you say this is a bug in AsyncPull.race (which I should probably start working on)? No problem with a non-hung merge, I'm assuming.

@mpilquist (Member)

@avakhrenev Yeah, I see a slow leak in QueueTest, though it is much, much slower than the leak in merge: roughly 200 MB over 5 minutes on my machine.

@SystemFw I'll test non-hung merge now.

@mpilquist (Member)

Merging two progressing streams does not leak:

import cats.effect.IO
import fs2._
import scala.concurrent.ExecutionContext

object ProgressMerge extends App {
  import ExecutionContext.Implicits.global
  val progress = Stream.constant(1, 128).covary[IO]
  (progress merge progress).run.unsafeRunSync()
}

@SystemFw (Collaborator)

@mpilquist I've probably found the cause: cancellableGet in Promise. I'll start working on a fix and a precise diagnosis, and then we can see if the leaks are also fixed.
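
For anyone following along, here is a toy model of the suspected failure mode (purely illustrative, none of this is fs2's actual Promise): cancellableGet registers a callback under a fresh token and hands back a cancel action; if that action does not remove the entry, every cancelled get against a promise that never completes stays in the waiting map forever.

import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

// Toy model, not fs2 code: a promise whose unset state holds a map of waiting callbacks.
final class ToyPromise[A] {
  private val waiting = new AtomicReference(Map.empty[Long, A => Unit])
  private val ids     = new AtomicLong(0L)

  // Register a callback and return a cancel action.
  def cancellableGet(cb: A => Unit): () => Unit = {
    val id = ids.getAndIncrement()
    waiting.updateAndGet(_ + (id -> cb))
    // Correct behaviour: cancellation unregisters the callback.
    () => { waiting.updateAndGet(_ - id); () }
    // The leak corresponds to returning a cancel action that leaves `waiting` untouched.
  }

  def pending: Int = waiting.get.size
}

In this toy model, a hung race whose cancel action is a no-op grows pending by one each time, matching the growing waiting maps reported above; with the unregistering cancel action, pending stays bounded.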

@mpilquist (Member)

Fixed by #1027

@SystemFw (Collaborator)

@avakhrenev Thanks a lot for spotting this; it turns out it was a pretty serious problem.
As a matter of fact, I'm surprised it "only" resulted in a memory leak and didn't break any tests.
Michael fixed it, in any case :)

@avakhrenev (Contributor, Author)

@mpilquist @SystemFw Thank you for your great work!

@mpilquist added this to the 0.10 milestone on Jan 6, 2018