Skip to content

Commit

Permalink
Alpakka Issue akka#60: CSV parsing stage
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru committed Mar 6, 2017
1 parent 46f5711 commit 5f0efb6
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 3 deletions.
3 changes: 3 additions & 0 deletions csv/src/main/scala/akka/stream/alpakka/csv/CsvParser.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.csv

import akka.util.{ByteString, ByteStringBuilder}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.csv.scaladsl

import akka.NotUsed
import akka.event.Logging
import akka.stream.alpakka.csv.CsvParser
import akka.stream.scaladsl.Flow
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString

import scala.util.control.NonFatal

/** Provides CSV framing stages that can separate CSV lines from incoming [[ByteString]] objects. */
object CsvFraming {

  /**
   * Creates a flow that feeds every incoming [[ByteString]] chunk into a [[CsvParser]] and
   * emits each line the parser yields as a list of column values.
   *
   * Failures thrown by the parser while polling fail the stage.
   *
   * @param escapeChar escape character handed to the parser, defaults to `\`
   * @param delimiter column delimiter handed to the parser, defaults to `,`
   * @param quoteChar quote character handed to the parser, defaults to `"`
   * @return a flow from raw chunks to parsed lines, one `List[ByteString]` per line
   */
  def lineScanner(escapeChar: Byte = '\\',
                  delimiter: Byte = ',',
                  quoteChar: Byte = '"'): Flow[ByteString, List[ByteString], NotUsed] =
    Flow[ByteString].via(new GraphStage[FlowShape[ByteString, List[ByteString]]] {

      val in = Inlet[ByteString](Logging.simpleName(this) + ".in")
      val out = Outlet[List[ByteString]](Logging.simpleName(this) + ".out")
      override val shape = FlowShape(in, out)

      override protected def initialAttributes: Attributes = Attributes.name("CsvFraming.lineScanner")

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) with InHandler with OutHandler {
          // Accumulates offered chunks and hands out complete lines via poll().
          private val buffer = new CsvParser(escapeChar, delimiter, quoteChar)

          setHandlers(in, out, this)

          override def onPush(): Unit = {
            buffer.offer(grab(in))
            tryPopBuffer()
          }

          override def onPull(): Unit =
            tryPopBuffer()

          // Upstream is done: hand out a line if the parser still has one, otherwise finish.
          // `emit` is used instead of `push` as this callback is not tied to downstream demand;
          // any further buffered lines are drained by later pulls through `tryPopBuffer`.
          override def onUpstreamFinish(): Unit =
            buffer.poll() match {
              case Some(csvLine) => emit(out, csvLine)
              case _ => completeStage()
            }

          // Pushes the next parsed line if one is available; otherwise requests more input,
          // or completes the stage when no more input can arrive.
          private def tryPopBuffer(): Unit =
            try buffer.poll() match {
              case Some(csvLine) => push(out, csvLine)
              case _ => if (isClosed(in)) completeStage() else pull(in)
            } catch {
              case NonFatal(ex) => failStage(ex)
            }
        }
    })

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.csv

import akka.stream.alpakka.csv.CsvParser.MalformedCSVException
Expand Down Expand Up @@ -42,7 +45,7 @@ class CsvParserSpec extends WordSpec with Matchers with OptionValues {
val in = ByteString.empty
val parser = new CsvParser()
parser.offer(in)
parser.poll() should be ('empty)
parser.poll() should be('empty)
}

"parse leading comma to be an empty column" in {
Expand Down Expand Up @@ -75,7 +78,7 @@ class CsvParserSpec extends WordSpec with Matchers with OptionValues {
parser.offer(in)
val res = parser.poll()
res.value.map(_.utf8String) should be(List(""))
parser.poll() should be ('empty)
parser.poll() should be('empty)
}

"parse an empty line with CR, LF into a single column" in {
Expand Down Expand Up @@ -208,7 +211,7 @@ class CsvParserSpec extends WordSpec with Matchers with OptionValues {
parser.offer(in)
val res = parser.poll()
res.value.map(_.utf8String) should be(List("one", "two", "three"))
parser.poll() should be ('empty)
parser.poll() should be('empty)
}

"ignore trailing \\n\\r" in {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package akka.stream.alpakka.csv.scaladsl

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers, WordSpec}

import scala.concurrent.duration.DurationInt

/**
 * Base class for the CSV stream specs.
 *
 * Provides an implicit [[ActorSystem]] named after the concrete spec class and an implicit
 * [[ActorMaterializer]] for running streams, and shuts the actor system down after all tests.
 */
abstract class CsvSpec
    extends WordSpec
    with Matchers
    with BeforeAndAfterAll
    with BeforeAndAfterEach
    with ScalaFutures {

  // Implicit definitions carry explicit types so implicit resolution does not depend on inference.
  implicit val system: ActorSystem = ActorSystem(this.getClass.getSimpleName)
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Terminates the actor system created for this spec once every test has run. */
  override protected def afterAll(): Unit =
    TestKit.shutdownActorSystem(system)
}

/** Runs small in-memory sources through `CsvFraming.lineScanner()` and checks the parsed lines. */
class CsvFramingSpec extends CsvSpec {
  override implicit val patienceConfig = PatienceConfig(2.seconds)

  // Builds the expected column list for one CSV line.
  private def columns(values: String*): List[ByteString] =
    values.map(value => ByteString(value)).toList

  "CSV Framing" should {
    "parse one line" in {
      // #line-scanner
      val fut = Source.single(ByteString("eins,zwei,drei"))
        .via(CsvFraming.lineScanner())
        .runWith(Sink.seq)
      // #line-scanner
      val firstLine = fut.futureValue.head
      firstLine shouldBe columns("eins", "zwei", "drei")
    }

    "parse two lines" in {
      val input = ByteString("eins,zwei,drei\nuno,dos,tres")
      val parsed = Source.single(input).via(CsvFraming.lineScanner()).runWith(Sink.seq)
      val lines = parsed.futureValue
      lines.head shouldBe columns("eins", "zwei", "drei")
      lines(1) shouldBe columns("uno", "dos", "tres")
    }

  }
}
70 changes: 70 additions & 0 deletions docs/src/main/paradox/csv.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Comma Separated Files - CSV

The CSV connector provides Akka Stream stages for parsing comma-separated values (CSV) data.

## Artifacts

sbt
: @@@vars
```scala
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-csv" % "$version$"
```
@@@

Maven
: @@@vars
```xml
<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-stream-alpakka-csv_$scala.binaryVersion$</artifactId>
<version>$version$</version>
</dependency>
```
@@@

Gradle
: @@@vars
```gradle
dependencies {
compile group: "com.lightbend.akka", name: "akka-stream-alpakka-csv_$scala.binaryVersion$", version: "$version$"
}
```
@@@

## Usage

### CSV parsing

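`CsvFraming.lineScanner` turns a stream of `ByteString` chunks into CSV lines. Every emitted element is one line, represented as a `List[ByteString]` holding its column values. The escape character, delimiter, and quote character are configurable and default to `\`, `,`, and `"`.

In this sample we parse comma-separated data into lines of columns: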

Scala
: @@snip (../../../../file/src/test/scala/akka/stream/alpakka/file/scaladsl/FileTailSourceSpec.scala) { #simple-lines }

Java
: @@snip (../../../../file/src/test/java/akka/stream/alpakka/file/javadsl/FileTailSourceTest.java) { #simple-lines }


### Running the example code

Both samples are contained in standalone runnable mains; they can be run
from `sbt` like this:

Scala
: ```
sbt
// tail source
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.scaladsl.FileTailSourceSpec /some/path/to/a/file
// or directory changes
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.scaladsl.DirectoryChangesSourceSpec /some/directory/path
```

Java
: ```
sbt
// tail source
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.javadsl.FileTailSourceTest /some/path/to/a/file
// or directory changes
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.javadsl.DirectoryChangesSourceTest /some/directory/path
```
1 change: 1 addition & 0 deletions docs/src/main/paradox/data-transformations.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

@@@ index

* [Comma Separated Files - CSV](csv.md)
* [RecordIO Framing](recordio.md)

@@@
Expand Down

0 comments on commit 5f0efb6

Please sign in to comment.