diff --git a/README.md b/README.md index 60141fb..a5abad5 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # BatchHttp [![CircleCI](https://circleci.com/gh/digitalorigin/batch-http.svg?style=svg&circle-token=d196d5b828e9e0debb5c25f04e7279c1f342d675)](https://circleci.com/gh/digitalorigin/batch-http) A tool for processing HTTP request batches through a REST API. It reads the `stdin` for JSON lines representing HTTP requests, -converts each line to an HTTP and executes it, providing both the request and the response as an output in the `stdout`. +converts each line to an HTTP request and makes the request, providing both the request and the response as an output in the `stdout`. For example, when passed a JSON string such as ```json diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index e406ca0..05209ee 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -9,6 +9,10 @@ flow { port = 8080 path = "/maps/api/geocode/json" + # requests throughput can be throttled down to a number of requests per second + # if no throttling configuration is provided the maximum available is used + # throttle = 10 + parallelism = 8 # additional parameters to be inculded in the http query if any extra_params { diff --git a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala index f86d68b..dd85f8d 100644 --- a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala +++ b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala @@ -3,12 +3,13 @@ package com.pagantis.singer.flows import akka.actor.{ActorSystem, Props} import akka.event.{Logging, LoggingAdapter} import akka.http.scaladsl.Http -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{JsonFraming, StreamConverters} +import akka.http.scaladsl.model.HttpRequest +import akka.stream.{ActorMaterializer, Materializer} +import akka.stream.scaladsl.{Flow, JsonFraming, StreamConverters} import com.typesafe.config.ConfigFactory import net.ceedubs.ficus.Ficus._ -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContextExecutor} import scala.util.Success @@ -28,12 +29,24 @@ object BatchHttp extends App { val port = config.as[Option[Int]]("flow.port") val parallelism = config.getInt("flow.parallelism") val frameLength = config.getInt("flow.frame_length") - - // init actor system, loggers and execution context - implicit val system: ActorSystem = ActorSystem("BatchHttp") - implicit val materializer: ActorMaterializer = ActorMaterializer() - implicit val standardLogger: LoggingAdapter = Logging(system, clazz) - implicit val ec: ExecutionContextExecutor = system.dispatcher + val throttlingConfig = config.as[Option[Int]]("flow.throttle") + + /** + * Instantiates a host connection pool ([[Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]]]) + * and prepends a throttling stage if a throttling configuration is provided. + * @return + */ + def requestProcessor(implicit fm: Materializer) = { + val throttleOrNeutral = throttlingConfig match { + case Some(elements) => Flow[(HttpRequest, Request)].throttle(elements, 1 second) + case _ => Flow[(HttpRequest, Request)] + } + val connectionPool = port match { + case Some(number) => Http().cachedHostConnectionPoolHttps[Request](host = endpoint, port = number) + case None => Http().cachedHostConnectionPoolHttps[Request](host = endpoint) + } + throttleOrNeutral.via(connectionPool) + } // This shutdown sequence was copied from another related issue: https://github.com/akka/akka-http/issues/907#issuecomment-345288919 def shutdownSequence = { @@ -43,6 +56,11 @@ object BatchHttp extends App { } yield akka } + // init actor system, loggers and execution context + implicit val system: ActorSystem = ActorSystem("BatchHttp") + implicit val materializer: ActorMaterializer = ActorMaterializer() + implicit val standardLogger: LoggingAdapter = Logging(system, clazz) + implicit val ec: ExecutionContextExecutor = system.dispatcher val counter = system.actorOf(Props[CountLogger], "counter") @@ -62,12 +80,7 @@ object BatchHttp extends App { } ) .log(clazz) - .via( - port match { - case Some(number) => Http().cachedHostConnectionPoolHttps[Request](host = endpoint, port = number) - case None => Http().cachedHostConnectionPoolHttps[Request](host = endpoint) - } - ) + .via(requestProcessor) .log(clazz) .mapAsync(parallelism)(parseResponse(_)) .log(clazz)