Skip to content

Commit

Permalink
Support throttling (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
dcereijodo authored Feb 17, 2020
1 parent c64237e commit 1f9f5dc
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# BatchHttp [![CircleCI](https://circleci.com/gh/digitalorigin/batch-http.svg?style=svg&circle-token=d196d5b828e9e0debb5c25f04e7279c1f342d675)](https://circleci.com/gh/digitalorigin/batch-http)
A tool for processing HTTP request batches through a REST API. It reads the `stdin` for JSON lines representing HTTP requests,
converts each line to an HTTP and executes it, providing both the request and the response as an output in the `stdout`.
converts each line to an HTTP request and makes the request, providing both the request and the response as an output in the `stdout`.

For example, when passed a JSON string such as
```json
Expand Down
4 changes: 4 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ flow {
port = 8080
path = "/maps/api/geocode/json"

# requests throughput can be throttled down to a number of requests per second
# if no throttling configuration is provided the maximum available is used
# throttle = 10

parallelism = 8
# additional parameters to be inculded in the http query if any
extra_params {
Expand Down
43 changes: 28 additions & 15 deletions src/main/scala/com/pagantis/singer/flows/BatchHttp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package com.pagantis.singer.flows
import akka.actor.{ActorSystem, Props}
import akka.event.{Logging, LoggingAdapter}
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{JsonFraming, StreamConverters}
import akka.http.scaladsl.model.HttpRequest
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, JsonFraming, StreamConverters}
import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._

import scala.concurrent.duration.Duration
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor}
import scala.util.Success

Expand All @@ -28,12 +29,24 @@ object BatchHttp extends App {
val port = config.as[Option[Int]]("flow.port")
val parallelism = config.getInt("flow.parallelism")
val frameLength = config.getInt("flow.frame_length")

// init actor system, loggers and execution context
implicit val system: ActorSystem = ActorSystem("BatchHttp")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val standardLogger: LoggingAdapter = Logging(system, clazz)
implicit val ec: ExecutionContextExecutor = system.dispatcher
val throttlingConfig = config.as[Option[Int]]("flow.throttle")

/**
* Instantiates a host connection pool ([[Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]]])
* and prepends a throttling stage if a throttling configuration is provided.
* @return
*/
def requestProcessor(implicit fm: Materializer) = {
val throttleOrNeutral = throttlingConfig match {
case Some(elements) => Flow[(HttpRequest, Request)].throttle(elements, 1 second)
case _ => Flow[(HttpRequest, Request)]
}
val connectionPool = port match {
case Some(number) => Http().cachedHostConnectionPoolHttps[Request](host = endpoint, port = number)
case None => Http().cachedHostConnectionPoolHttps[Request](host = endpoint)
}
throttleOrNeutral.via(connectionPool)
}

// This shutdown sequence was copied from another related issue: https://github.com/akka/akka-http/issues/907#issuecomment-345288919
def shutdownSequence = {
Expand All @@ -43,6 +56,11 @@ object BatchHttp extends App {
} yield akka
}

// init actor system, loggers and execution context
implicit val system: ActorSystem = ActorSystem("BatchHttp")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val standardLogger: LoggingAdapter = Logging(system, clazz)
implicit val ec: ExecutionContextExecutor = system.dispatcher

val counter = system.actorOf(Props[CountLogger], "counter")

Expand All @@ -62,12 +80,7 @@ object BatchHttp extends App {
}
)
.log(clazz)
.via(
port match {
case Some(number) => Http().cachedHostConnectionPoolHttps[Request](host = endpoint, port = number)
case None => Http().cachedHostConnectionPoolHttps[Request](host = endpoint)
}
)
.via(requestProcessor)
.log(clazz)
.mapAsync(parallelism)(parseResponse(_))
.log(clazz)
Expand Down

0 comments on commit 1f9f5dc

Please sign in to comment.