Skip to content

Commit

Permalink
Use JSON framing for input stream (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
dcereijodo authored Sep 16, 2019
1 parent 7f23854 commit 911288c
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ flow {
extra_params {
key = ${?API_KEY}
}
# maximum size of framing buffer (input json lines cannot be longer than this)
frame_length = 2048
}
4 changes: 3 additions & 1 deletion src/main/scala/com/pagantis/singer/flows/BatchHttp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.StreamConverters
import akka.stream.scaladsl.{JsonFraming, StreamConverters}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration.Duration
Expand All @@ -31,6 +31,7 @@ object BatchHttp extends App {

val endpoint = config.getString("flow.endpoint")
val parallelism = config.getInt("flow.parallelism")
val frameLength = config.getInt("flow.frame_length")

val startTime = System.nanoTime()

Expand All @@ -44,6 +45,7 @@ object BatchHttp extends App {
val flowComputation =
StreamConverters
.fromInputStream(() => inputStream)
.via(JsonFraming.objectScanner(frameLength))
.log(clazz)
.mapConcat(_.utf8String.split("\n").toList)
.log(clazz)
Expand Down

0 comments on commit 911288c

Please sign in to comment.