From 8abd2674a4da8e8ec7c035fc7915fe35e2711ebc Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Fri, 14 Feb 2020 17:17:16 +0100 Subject: [PATCH 1/7] relax patience configuration for integration tests --- src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala index 2eec6e9..f08b898 100644 --- a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala +++ b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala @@ -33,7 +33,7 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S Http().cachedHostConnectionPoolHttps[Request]("jsonplaceholder.typicode.com") private implicit val defaultPatience: PatienceConfig = - PatienceConfig(timeout = Span(2, Seconds), interval = Span(5, Millis)) + PatienceConfig(timeout = Span(10, Seconds), interval = Span(5, Millis)) import Request._ From d36046029da65ef7e7970b801f2aad24e1f6e126 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Fri, 14 Feb 2020 17:49:46 +0100 Subject: [PATCH 2/7] port is an optional configuration parameter --- .../com/pagantis/singer/flows/BatchHttp.scala | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala index de2d5ae..f86d68b 100644 --- a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala +++ b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala @@ -6,6 +6,7 @@ import akka.http.scaladsl.Http import akka.stream.ActorMaterializer import akka.stream.scaladsl.{JsonFraming, StreamConverters} import com.typesafe.config.ConfigFactory +import net.ceedubs.ficus.Ficus._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContextExecutor} @@ -21,19 +22,19 @@ object BatchHttp extends App { } val inputStream = System.in - // init actor system, loggers and execution context - implicit val system: ActorSystem = ActorSystem("BatchHttp") - implicit val materializer: ActorMaterializer = ActorMaterializer() - implicit val standardLogger: LoggingAdapter = Logging(system, clazz) - implicit val ec: ExecutionContextExecutor = system.dispatcher - val config = ConfigFactory.load() val endpoint = config.getString("flow.endpoint") - val port = config.getInt("flow.port") + val port = config.as[Option[Int]]("flow.port") val parallelism = config.getInt("flow.parallelism") val frameLength = config.getInt("flow.frame_length") + // init actor system, loggers and execution context + implicit val system: ActorSystem = ActorSystem("BatchHttp") + implicit val materializer: ActorMaterializer = ActorMaterializer() + implicit val standardLogger: LoggingAdapter = Logging(system, clazz) + implicit val ec: ExecutionContextExecutor = system.dispatcher + // This shutdown sequence was copied from another related issue: https://github.com/akka/akka-http/issues/907#issuecomment-345288919 def shutdownSequence = { for { @@ -61,7 +62,12 @@ object BatchHttp extends App { } ) .log(clazz) - .via(Http().cachedHostConnectionPoolHttps[Request](host = endpoint, port = port)) + .via( + port match { + case Some(number) => Http().cachedHostConnectionPoolHttps[Request](host = endpoint, port = number) + case None => Http().cachedHostConnectionPoolHttps[Request](host = endpoint) + } + ) .log(clazz) .mapAsync(parallelism)(parseResponse(_)) .log(clazz) From 35898057277626880762644a33d290968b6ccfb6 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Fri, 14 Feb 2020 17:51:11 +0100 Subject: [PATCH 3/7] add custom bash start script --- src/templates/bash-template | 352 ++++++++++++++++++++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 100644 src/templates/bash-template diff --git a/src/templates/bash-template b/src/templates/bash-template new file mode 100644 index 0000000..dbd6a14 --- /dev/null +++ b/src/templates/bash-template @@ -0,0 +1,352 @@ +#!/usr/bin/env bash + +### ------------------------------- ### +### Helper methods for BASH scripts ### +### ------------------------------- ### + +die() { + echo "$@" 1>&2 + exit 1 +} + +realpath () { +( + TARGET_FILE="$1" + CHECK_CYGWIN="$2" + + cd "$(dirname "$TARGET_FILE")" + TARGET_FILE=$(basename "$TARGET_FILE") + + COUNT=0 + while [ -L "$TARGET_FILE" -a $COUNT -lt 100 ] + do + TARGET_FILE=$(readlink "$TARGET_FILE") + cd "$(dirname "$TARGET_FILE")" + TARGET_FILE=$(basename "$TARGET_FILE") + COUNT=$(($COUNT + 1)) + done + + if [ "$TARGET_FILE" == "." -o "$TARGET_FILE" == ".." ]; then + cd "$TARGET_FILE" + TARGET_FILEPATH= + else + TARGET_FILEPATH=/$TARGET_FILE + fi + + # make sure we grab the actual windows path, instead of cygwin's path. + if [[ "x$CHECK_CYGWIN" == "x" ]]; then + echo "$(pwd -P)/$TARGET_FILE" + else + echo $(cygwinpath "$(pwd -P)/$TARGET_FILE") + fi +) +} + +# TODO - Do we need to detect msys? + +# Uses uname to detect if we're in the odd cygwin environment. +is_cygwin() { + local os=$(uname -s) + case "$os" in + CYGWIN*) return 0 ;; + *) return 1 ;; + esac +} + +# This can fix cygwin style /cygdrive paths so we get the +# windows style paths. +cygwinpath() { + local file="$1" + if is_cygwin; then + echo $(cygpath -w $file) + else + echo $file + fi +} + +# Make something URI friendly +make_url() { + url="$1" + local nospaces=${url// /%20} + if is_cygwin; then + echo "/${nospaces//\\//}" + else + echo "$nospaces" + fi +} + +# This crazy function reads in a vanilla "linux" classpath string (only : are separators, and all /), +# and returns a classpath with windows style paths, and ; separators. +fixCygwinClasspath() { + OLDIFS=$IFS + IFS=":" + read -a classpath_members <<< "$1" + declare -a fixed_members + IFS=$OLDIFS + for i in "${!classpath_members[@]}" + do + fixed_members[i]=$(realpath "${classpath_members[i]}" "fix") + done + IFS=";" + echo "${fixed_members[*]}" + IFS=$OLDIFS +} + +# Fix the classpath we use for cygwin. +fix_classpath() { + cp="$1" + if is_cygwin; then + echo "$(fixCygwinClasspath "$cp")" + else + echo "$cp" + fi +} +# Detect if we should use JAVA_HOME or just try PATH. +get_java_cmd() { + # High-priority override for Jlink images + if [[ -n "$bundled_jvm" ]]; then + echo "$bundled_jvm/bin/java" + elif [[ -n "$JAVA_HOME" ]] && [[ -x "$JAVA_HOME/bin/java" ]]; then + echo "$JAVA_HOME/bin/java" + else + echo "java" + fi +} + +echoerr () { + echo 1>&2 "$@" +} +vlog () { + [[ $verbose || $debug ]] && echoerr "$@" +} +dlog () { + [[ $debug ]] && echoerr "$@" +} +execRunner () { + # print the arguments one to a line, quoting any containing spaces + [[ $verbose || $debug ]] && echo "# Executing command line:" && { + for arg; do + if printf "%s\n" "$arg" | grep -q ' '; then + printf "\"%s\"\n" "$arg" + else + printf "%s\n" "$arg" + fi + done + echo "" + } + + # we use "exec" here for our pids to be accurate. + exec "$@" +} +addJava () { + dlog "[addJava] arg = '$1'" + java_args+=( "$1" ) +} +addApp () { + dlog "[addApp] arg = '$1'" + app_commands+=( "$1" ) +} +addResidual () { + dlog "[residual] arg = '$1'" + residual_args+=( "$1" ) +} +addDebugger () { + addJava "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$1" +} + +require_arg () { + local type="$1" + local opt="$2" + local arg="$3" + if [[ -z "$arg" ]] || [[ "${arg:0:1}" == "-" ]]; then + die "$opt requires <$type> argument" + fi +} +is_function_defined() { + declare -f "$1" > /dev/null +} + +# Attempt to detect if the script is running via a GUI or not +# TODO - Determine where/how we use this generically +detect_terminal_for_ui() { + [[ ! -t 0 ]] && [[ "${#residual_args}" == "0" ]] && { + echo "true" + } + # SPECIAL TEST FOR MAC + [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]] && [[ "${#residual_args}" == "0" ]] && { + echo "true" + } +} + +# Processes incoming arguments and places them in appropriate global variables. called by the run method. +process_args () { + local no_more_snp_opts=0 + while [[ $# -gt 0 ]]; do + case "$1" in + --) shift && no_more_snp_opts=1 && break ;; + -h|-help) usage; exit 1 ;; + -version|--version|version) echo "$APP_VERSION" ; exit 0 ;; + -v|-verbose) verbose=1 && shift ;; + -d|-debug) debug=1 && shift ;; + + -no-version-check) no_version_check=1 && shift ;; + + -mem) echo "!! WARNING !! -mem option is ignored. Please use -J-Xmx and -J-Xms" && shift 2 ;; + -jvm-debug) require_arg port "$1" "$2" && addDebugger $2 && shift 2 ;; + + -main) custom_mainclass="$2" && shift 2 ;; + + -java-home) require_arg path "$1" "$2" && jre=`eval echo $2` && java_cmd="$jre/bin/java" && shift 2 ;; + + -D*|-agentlib*|-XX*) addJava "$1" && shift ;; + -J*) addJava "${1:2}" && shift ;; + *) addResidual "$1" && shift ;; + esac + done + + if [[ no_more_snp_opts ]]; then + while [[ $# -gt 0 ]]; do + addResidual "$1" && shift + done + fi + + is_function_defined process_my_args && { + myargs=("${residual_args[@]}") + residual_args=() + process_my_args "${myargs[@]}" + } +} + +# Actually runs the script. +run() { + # TODO - check for sane environment + + # process the combined args, then reset "$@" to the residuals + process_args "$@" + set -- "${residual_args[@]}" + argumentCount=$# + + #check for jline terminal fixes on cygwin + if is_cygwin; then + stty -icanon min 1 -echo > /dev/null 2>&1 + addJava "-Djline.terminal=jline.UnixTerminal" + addJava "-Dsbt.cygwin=true" + fi + + # check java version + if [[ ! $no_version_check ]]; then + java_version_check + fi + + if [ -n "$custom_mainclass" ]; then + mainclass=("$custom_mainclass") + else + mainclass=("${app_mainclass[@]}") + fi + + # Now we check to see if there are any java opts on the environment. These get listed first, with the script able to override them. + if [[ "$JAVA_OPTS" != "" ]]; then + java_opts="${JAVA_OPTS}" + fi + + # run sbt + execRunner "$java_cmd" \ + ${java_opts[@]} \ + "${java_args[@]}" \ + -cp "$(fix_classpath "$app_classpath")" \ + "${mainclass[@]}" \ + "${app_commands[@]}" \ + "${residual_args[@]}" + + local exit_code=$? + if is_cygwin; then + stty icanon echo > /dev/null 2>&1 + fi + exit $exit_code +} + +# Loads a configuration file full of default command line options for this script. +loadConfigFile() { + cat "$1" | sed $'/^\#/d;s/\r$//' +} + +# Now check to see if it's a good enough version +# TODO - Check to see if we have a configured default java version, otherwise use 1.6 +java_version_check() { + readonly java_version=$("$java_cmd" -version 2>&1 | awk -F '"' '/version/ {print $2}') + if [[ "$java_version" == "" ]]; then + echo + echo No java installations was detected. + echo Please go to http://www.java.com/getjava/ and download + echo + exit 1 + else + local major=$(echo "$java_version" | cut -d'.' -f1) + if [[ "$major" -eq "1" ]]; then + local major=$(echo "$java_version" | cut -d'.' -f2) + fi + if [[ "$major" -lt "6" ]]; then + echo + echo The java installation you have is not up to date + echo $app_name requires at least version 1.6+, you have + echo version $java_version + echo + echo Please go to http://www.java.com/getjava/ and download + echo a valid Java Runtime and install before running $app_name. + echo + exit 1 + fi + fi +} + +### ------------------------------- ### +### Start of customized settings ### +### ------------------------------- ### +usage() { + cat < Define a custom main class + -jvm-debug Turn on JVM debugging, open at the given port. + # java version (default: java from PATH, currently $(java -version 2>&1 | grep version)) + -java-home alternate JAVA_HOME + # jvm options and output control + JAVA_OPTS environment variable, if unset uses "$java_opts" + -Dkey=val pass -Dkey=val directly to the java runtime + -J-X pass option -X directly to the java runtime + (-J is stripped) + # special option + -- To stop parsing built-in commands from the rest of the command-line. + e.g.) enabling debug and sending -d as app argument + \$ ./start-script -d -- -d +In the case of duplicated or conflicting options, basically the order above +shows precedence: JAVA_OPTS lowest, command line options highest except "--". +${{available_main_classes}} +EOM +} + +### ------------------------------- ### +### Main script ### +### ------------------------------- ### + +declare -a residual_args +declare -a java_args +declare -a app_commands +declare -r real_script_path="$(realpath "$0")" +declare -r app_home="$(realpath "$(dirname "$real_script_path")")" +# TODO - Check whether this is ok in cygwin... +declare -r lib_dir="$(realpath "${app_home}/../lib")" +declare -a app_mainclass=(${{app_mainclass}}) + +${{template_declares}} +# java_cmd is overrode in process_args when -java-home is used +declare java_cmd=$(get_java_cmd) + +# if configuration files exist, prepend their contents to $@ so it can be processed by this runner +[[ -f "$script_conf_file" ]] && set -- $(loadConfigFile "$script_conf_file") "$@" + +run "$@" \ No newline at end of file From 48954a3f18e5ddfac0073eb357e9abd192899782 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Fri, 14 Feb 2020 18:01:16 +0100 Subject: [PATCH 4/7] relax patience even more --- src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala index f08b898..b94b957 100644 --- a/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala +++ b/src/it/scala/com/pagantis/singer/flows/it/TestRequest.scala @@ -33,7 +33,7 @@ class TestRequest extends FlatSpec with Matchers with DefaultJsonProtocol with S Http().cachedHostConnectionPoolHttps[Request]("jsonplaceholder.typicode.com") private implicit val defaultPatience: PatienceConfig = - PatienceConfig(timeout = Span(10, Seconds), interval = Span(5, Millis)) + PatienceConfig(timeout = Span(15, Seconds), interval = Span(5, Millis)) import Request._ From 93dbb909aa5c466a8f923fa4a2cfc3ddcdec50ab Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Sat, 15 Feb 2020 13:35:14 +0100 Subject: [PATCH 5/7] support request throttling --- src/main/resources/application.conf | 4 ++ .../com/pagantis/singer/flows/BatchHttp.scala | 43 ++++++++++++------- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index e406ca0..05209ee 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -9,6 +9,10 @@ flow { port = 8080 path = "/maps/api/geocode/json" + # requests throughput can be throttled down to a number of requests per second + # if no throttling configuration is provided the maximum available is used + # throttle = 10 + parallelism = 8 # additional parameters to be inculded in the http query if any extra_params { diff --git a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala index f86d68b..dd85f8d 100644 --- a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala +++ b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala @@ -3,12 +3,13 @@ package com.pagantis.singer.flows import akka.actor.{ActorSystem, Props} import akka.event.{Logging, LoggingAdapter} import akka.http.scaladsl.Http -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{JsonFraming, StreamConverters} +import akka.http.scaladsl.model.HttpRequest +import akka.stream.{ActorMaterializer, Materializer} +import akka.stream.scaladsl.{Flow, JsonFraming, StreamConverters} import com.typesafe.config.ConfigFactory import net.ceedubs.ficus.Ficus._ -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContextExecutor} import scala.util.Success @@ -28,12 +29,24 @@ object BatchHttp extends App { val port = config.as[Option[Int]]("flow.port") val parallelism = config.getInt("flow.parallelism") val frameLength = config.getInt("flow.frame_length") - - // init actor system, loggers and execution context - implicit val system: ActorSystem = ActorSystem("BatchHttp") - implicit val materializer: ActorMaterializer = ActorMaterializer() - implicit val standardLogger: LoggingAdapter = Logging(system, clazz) - implicit val ec: ExecutionContextExecutor = system.dispatcher + val throttlingConfig = config.as[Option[Int]]("flow.throttle") + + /** + * Instantiates a host connection pool ([[Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]]]) + * and prepends a throttling stage if a throttling configuration is provided. + * @return + */ + def requestProcessor(implicit fm: Materializer) = { + val throttleOrNeutral = throttlingConfig match { + case Some(elements) => Flow[(HttpRequest, Request)].throttle(elements, 1 second) + case _ => Flow[(HttpRequest, Request)] + } + val connectionPool = port match { + case Some(number) => Http().cachedHostConnectionPoolHttps[Request](host = endpoint, port = number) + case None => Http().cachedHostConnectionPoolHttps[Request](host = endpoint) + } + throttleOrNeutral.via(connectionPool) + } // This shutdown sequence was copied from another related issue: https://github.com/akka/akka-http/issues/907#issuecomment-345288919 def shutdownSequence = { @@ -43,6 +56,11 @@ object BatchHttp extends App { } yield akka } + // init actor system, loggers and execution context + implicit val system: ActorSystem = ActorSystem("BatchHttp") + implicit val materializer: ActorMaterializer = ActorMaterializer() + implicit val standardLogger: LoggingAdapter = Logging(system, clazz) + implicit val ec: ExecutionContextExecutor = system.dispatcher val counter = system.actorOf(Props[CountLogger], "counter") @@ -62,12 +80,7 @@ object BatchHttp extends App { } ) .log(clazz) - .via( - port match { - case Some(number) => Http().cachedHostConnectionPoolHttps[Request](host = endpoint, port = number) - case None => Http().cachedHostConnectionPoolHttps[Request](host = endpoint) - } - ) + .via(requestProcessor) .log(clazz) .mapAsync(parallelism)(parseResponse(_)) .log(clazz) From dcc47a273eb85e401c6bee43d324901c051c48aa Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Sat, 15 Feb 2020 13:37:25 +0100 Subject: [PATCH 6/7] fix typo in README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 60141fb..a5abad5 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # BatchHttp [![CircleCI](https://circleci.com/gh/digitalorigin/batch-http.svg?style=svg&circle-token=d196d5b828e9e0debb5c25f04e7279c1f342d675)](https://circleci.com/gh/digitalorigin/batch-http) A tool for processing HTTP request batches through a REST API. It reads the `stdin` for JSON lines representing HTTP requests, -converts each line to an HTTP and executes it, providing both the request and the response as an output in the `stdout`. +converts each line to an HTTP request and makes the request, providing both the request and the response as an output in the `stdout`. For example, when passed a JSON string such as ```json From 56b6eb1e2eaeb36c541e219c09339226b77044c8 Mon Sep 17 00:00:00 2001 From: dcereijodo Date: Mon, 17 Feb 2020 17:10:12 +0100 Subject: [PATCH 7/7] fix bad merge --- src/main/scala/com/pagantis/singer/flows/BatchHttp.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala index 72af291..dd85f8d 100644 --- a/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala +++ b/src/main/scala/com/pagantis/singer/flows/BatchHttp.scala @@ -48,12 +48,6 @@ object BatchHttp extends App { throttleOrNeutral.via(connectionPool) } - // init actor system, loggers and execution context - implicit val system: ActorSystem = ActorSystem("BatchHttp") - implicit val materializer: ActorMaterializer = ActorMaterializer() - implicit val standardLogger: LoggingAdapter = Logging(system, clazz) - implicit val ec: ExecutionContextExecutor = system.dispatcher - // This shutdown sequence was copied from another related issue: https://github.com/akka/akka-http/issues/907#issuecomment-345288919 def shutdownSequence = { for {