Skip to content

Commit

Permalink
Print package version from CLI (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
dcereijodo authored Feb 17, 2020
1 parent 0265b07 commit c64237e
Show file tree
Hide file tree
Showing 3 changed files with 367 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S
Http().cachedHostConnectionPoolHttps[Request]("jsonplaceholder.typicode.com")

private implicit val defaultPatience: PatienceConfig =
PatienceConfig(timeout = Span(2, Seconds), interval = Span(5, Millis))
PatienceConfig(timeout = Span(15, Seconds), interval = Span(5, Millis))

import Request._

Expand Down
22 changes: 14 additions & 8 deletions src/main/scala/com/pagantis/singer/flows/BatchHttp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{JsonFraming, StreamConverters}
import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContextExecutor}
Expand All @@ -21,19 +22,19 @@ object BatchHttp extends App {
}
val inputStream = System.in

// init actor system, loggers and execution context
implicit val system: ActorSystem = ActorSystem("BatchHttp")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val standardLogger: LoggingAdapter = Logging(system, clazz)
implicit val ec: ExecutionContextExecutor = system.dispatcher

val config = ConfigFactory.load()

val endpoint = config.getString("flow.endpoint")
val port = config.getInt("flow.port")
val port = config.as[Option[Int]]("flow.port")
val parallelism = config.getInt("flow.parallelism")
val frameLength = config.getInt("flow.frame_length")

// init actor system, loggers and execution context
implicit val system: ActorSystem = ActorSystem("BatchHttp")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val standardLogger: LoggingAdapter = Logging(system, clazz)
implicit val ec: ExecutionContextExecutor = system.dispatcher

// This shutdown sequence was copied from another related issue: https://github.com/akka/akka-http/issues/907#issuecomment-345288919
def shutdownSequence = {
for {
Expand Down Expand Up @@ -61,7 +62,12 @@ object BatchHttp extends App {
}
)
.log(clazz)
.via(Http().cachedHostConnectionPoolHttps[Request](host = endpoint, port = port))
.via(
port match {
case Some(number) => Http().cachedHostConnectionPoolHttps[Request](host = endpoint, port = number)
case None => Http().cachedHostConnectionPoolHttps[Request](host = endpoint)
}
)
.log(clazz)
.mapAsync(parallelism)(parseResponse(_))
.log(clazz)
Expand Down
Loading

0 comments on commit c64237e

Please sign in to comment.