
The stream gets stuck with the parser when a large number of files are pushed through #3

Open
wpoosanguansit opened this issue Jun 13, 2016 · 0 comments


wpoosanguansit commented Jun 13, 2016

Hi,

I managed to add file deletion to the stream processing by defining a runnable graph as follows:

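```scala
import java.nio.file.Path
import akka.stream.{ClosedShape, FlowShape}
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source, Unzip, Zip}

// `flow` is shown below; `id` and `deleteFlow` are defined elsewhere (not shown).
def g(source: Source[Path, Unit]) = RunnableGraph.fromGraph(GraphDSL.create() {
  implicit builder =>
    import GraphDSL.Implicits._

    // Fan each path out to the parsing branch (A) and the pass-through branch (B)
    val broadcast = builder.add(Broadcast[Path](2))

    val A: FlowShape[Path, JsValue] = builder.add(flow)
    val B: FlowShape[Path, Path] = builder.add(id)
    val C: FlowShape[Path, Boolean] = builder.add(deleteFlow)
    // Pair each parsed value with a path, then split the pairs again
    val zip = builder.add(Zip[JsValue, Path]())
    val unzip = builder.add(Unzip[JsValue, Path]())

    val out = Sink.foreach(println)
    source ~> broadcast ~> A ~> zip.in0
              broadcast ~> B ~> zip.in1
              zip.out ~> unzip.in
              unzip.out0 ~> out
              unzip.out1 ~> C ~> Sink.ignore
    ClosedShape
})
```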
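The graph re-associates each parsed value with its file purely by position: `Zip` pairs the n-th element of `in0` with the n-th element of `in1`, and `Broadcast` only emits once both branches are ready for an element. So the right file is deleted only if branch A emits exactly one `JsValue` per path. If A emits zero values for a path (for example, because a filter drops it) or several (a file containing more than one JSON value), the pairs drift. A stdlib-only analogy, where `List`s stand in for the two branches and the file names are hypothetical:

```scala
// Stdlib-only analogy: Lists stand in for the two branches of the graph.
// Hypothetical batch with one non-JSON file in the middle.
val paths = List("a.json", "b.txt", "c.json")

// Branch A: emits a value only for .json paths (zero values for b.txt).
val branchA: List[String] = paths.filter(_.endsWith(".json")).map(p => s"json($p)")

// Branch B: passes every path through unchanged, like `id`.
val branchB: List[String] = paths

// Like Zip, List.zip pairs strictly by position: c.json's value lands next to b.txt.
val zipped: List[(String, String)] = branchA.zip(branchB)

// Like Unzip, the second element of each pair is what would reach `deleteFlow`.
val (printed, toDelete) = zipped.unzip
```

Unlike `List.zip`, a running `Zip` does not discard the surplus path; it holds it until a partner arrives. A branch that keeps falling behind in this way is one plausible reason for a broadcast/zip graph to stop making progress, though that is a hypothesis, not a confirmed diagnosis.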

However, if I define `flow` as follows:

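```scala
import java.nio.file.Path
import scala.concurrent.duration._
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{FileIO, Flow}

val flow = Flow[Path]
  .filter(_.getFileName.toString.trim.endsWith(".json"))
  // throttle(cost, per, maximumBurst, costCalculation, mode): every element costs 2
  .throttle(1, 1.second, 1, _ => 2, ThrottleMode.Shaping)
  // Read each file as ByteString chunks, one file after another
  .flatMapConcat(p => FileIO.fromFile(p.toFile))
  .via(JsonStreamParser[JsValue])
```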
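Because this `flow` can emit zero values for a path (the `.json` filter) or more than one (if a file holds several JSON values), one idea worth testing is to attach the path to each parsed value inside the flow instead of re-pairing with `Zip`. In Akka terms that might look like `flatMapConcat(p => FileIO.fromFile(p.toFile).via(JsonStreamParser[JsValue]).map(j => (j, p)))`, which also gives each file its own parser; this is untested. A stdlib-only sketch of the idea, where `List` stands in for the stream and `parseFile` is a hypothetical placeholder for the file-reading and parsing step:

```scala
// Stdlib-only analogy: List stands in for the stream.
// Hypothetical stand-in for FileIO.fromFile + JsonStreamParser: 0..n values per file.
def parseFile(path: String): List[String] =
  if (path.endsWith(".json")) List(s"json($path)") else Nil

val paths = List("a.json", "b.txt", "c.json")

// Attach the originating path to every value, so no Zip is needed to re-pair them.
val tagged: List[(String, String)] =
  paths.flatMap(p => parseFile(p).map(json => (json, p)))

// Only paths that actually produced output reach the delete step, each once.
val toDelete: List[String] = tagged.map(_._2).distinct
```

With the path carried alongside each value, a file that yields no values or several values cannot shift which file gets deleted.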

The stream gets stuck if I push around 50 files through in succession. However, it works through them (slowly) if I use the following code instead:

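```scala
import java.nio.ByteBuffer
import java.nio.file.Path
import akka.stream.scaladsl.Flow

val flow = Flow[Path]
  .filter(_.getFileName.toString.trim.endsWith(".json"))
  // Read the whole file into memory, then parse it in one go
  .map(p => ByteBuffer.wrap(scala.io.Source.fromFile(p.toString).toArray.map(_.toByte)))
  .map(Parser.parseFromByteBuffer)
  .map(_.get) // unsafe: throws if parsing failed (testing only)
```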

The error that I got:

```
java.lang.RuntimeException: Nonzero exit code returned from runner: 255
	at scala.sys.package$.error(package.scala:27)
```

I know `_.get` is unsafe, but I am just testing, and the two versions turn out to behave differently. Again, I am not sure whether this has anything to do with the parser. Any hints or ideas on how to investigate this further would be appreciated. Thanks.
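A side note on the second version of `flow` (the one using `scala.io.Source.fromFile`): `Source` decodes the file into `Char`s, and `_.toByte` keeps only the low 8 bits of each one, so any non-ASCII character reaches the parser as a single wrong byte instead of its UTF-8 encoding. The `Source` is also never closed. `java.nio.file.Files.readAllBytes(p)` returns the raw bytes and closes the file itself, so it may be a safer input for `ByteBuffer.wrap`. A small stdlib-only check on an in-memory string:

```scala
import java.nio.charset.StandardCharsets

// U+00E9 ("é"): its UTF-8 encoding is the two bytes 0xC3 0xA9.
val text = "\u00e9"

// What the workaround does: decode into Chars, then keep only the low 8 bits of each.
val viaChars: Array[Byte] = scala.io.Source.fromString(text).toArray.map(_.toByte)

// What a UTF-8 JSON parser expects to receive: the actual encoded bytes.
val utf8: Array[Byte] = text.getBytes(StandardCharsets.UTF_8)
```

For pure ASCII files the two agree, which could hide the difference during testing.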
