diff --git a/csv/src/main/scala/akka/stream/alpakka/csv/CsvParsingStage.scala b/csv/src/main/scala/akka/stream/alpakka/csv/CsvParsingStage.scala index d6f9693dca..392dbfe406 100644 --- a/csv/src/main/scala/akka/stream/alpakka/csv/CsvParsingStage.scala +++ b/csv/src/main/scala/akka/stream/alpakka/csv/CsvParsingStage.scala @@ -4,11 +4,11 @@ package akka.stream.alpakka.csv import akka.event.Logging -import akka.stream.alpakka.csv.scaladsl.CsvParsing import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} import akka.stream.{Attributes, FlowShape, Inlet, Outlet} import akka.util.ByteString +import scala.annotation.tailrec import scala.util.control.NonFatal /** @@ -37,18 +37,30 @@ private[csv] class CsvParsingStage(delimiter: Byte, quoteChar: Byte, escapeChar: override def onPull(): Unit = tryPollBuffer() - override def onUpstreamFinish(): Unit = - buffer.poll(requireLineEnd = false) match { - case Some(csvLine) ⇒ emit(out, csvLine) - case _ ⇒ completeStage() - } + override def onUpstreamFinish(): Unit = { + emitRemaining() + completeStage() + } private def tryPollBuffer() = try buffer.poll(requireLineEnd = true) match { case Some(csvLine) ⇒ push(out, csvLine) - case _ ⇒ if (isClosed(in)) completeStage() else pull(in) + case _ ⇒ + if (isClosed(in)) { + emitRemaining() + completeStage() + } else pull(in) } catch { case NonFatal(ex) ⇒ failStage(ex) } + + @tailrec private def emitRemaining(): Unit = + buffer.poll(requireLineEnd = false) match { + case Some(csvLine) ⇒ + emit(out, csvLine) + emitRemaining() + case _ ⇒ + } + } } diff --git a/csv/src/test/scala/akka/stream/alpakka/csv/scaladsl/CsvParsingSpec.scala b/csv/src/test/scala/akka/stream/alpakka/csv/scaladsl/CsvParsingSpec.scala index fe545102ad..679311dc3a 100644 --- a/csv/src/test/scala/akka/stream/alpakka/csv/scaladsl/CsvParsingSpec.scala +++ b/csv/src/test/scala/akka/stream/alpakka/csv/scaladsl/CsvParsingSpec.scala @@ -3,16 +3,30 @@ */ package akka.stream.alpakka.csv.scaladsl -import java.nio.charset.StandardCharsets import java.nio.file.Paths import akka.NotUsed -import akka.stream.scaladsl.{FileIO, Flow, Sink, Source} +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source} +import akka.stream.testkit.scaladsl.{TestSink, TestSource} +import akka.testkit.TestKit import akka.util.ByteString +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers, WordSpecLike} import scala.collection.immutable.Seq +import scala.concurrent.duration.DurationInt -class CsvParsingSpec extends CsvSpec { +class CsvParsingSpec + extends TestKit(ActorSystem(classOf[CsvParsingSpec].getSimpleName)) + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with BeforeAndAfterEach + with ScalaFutures { + + implicit val materializer = ActorMaterializer() def documentation(): Unit = { import CsvParsing._ @@ -89,6 +103,23 @@ class CsvParsingSpec extends CsvSpec { res(1) should be(List("uno", "dos", "tres")) } + "emit completion even without new line at end" in { + val (source, sink) = TestSource + .probe[ByteString] + .via(CsvParsing.lineScanner()) + .map(_.map(_.utf8String)) + .toMat(TestSink.probe[List[String]])(Keep.both) + .run() + source.sendNext(ByteString("eins,zwei,drei\nuno,dos,tres\n1,2,3")) + sink.request(3) + sink.expectNext(List("eins", "zwei", "drei")) + sink.expectNext(List("uno", "dos", "tres")) + sink.expectNoMsg(100.millis) + source.sendComplete() + sink.expectNext(List("1", "2", "3")) + sink.expectComplete() + } + "parse Apple Numbers exported file" in { val fut = FileIO