From d3f0705d8b6a034de44388e7c949433a33e02604 Mon Sep 17 00:00:00 2001 From: Erik LaBianca Date: Tue, 9 Jul 2019 05:05:42 -0400 Subject: [PATCH] Add example showing consuming a stream of requests (#2270) * Add example showing consuming a stream of requests - Fixes #1836 --- .../implications-of-streaming-http-entity.md | 23 +++++++++ .../javadsl/HttpClientExampleDocTest.java | 44 ++++++++++++++++ .../http/scaladsl/HttpClientExampleSpec.scala | 51 +++++++++++++++++++ 3 files changed, 118 insertions(+) diff --git a/docs/src/main/paradox/implications-of-streaming-http-entity.md b/docs/src/main/paradox/implications-of-streaming-http-entity.md index c4cdcd48a1e..17cb5a28d5b 100644 --- a/docs/src/main/paradox/implications-of-streaming-http-entity.md +++ b/docs/src/main/paradox/implications-of-streaming-http-entity.md @@ -45,6 +45,29 @@ Scala Java : @@snip [HttpClientExampleDocTest.java]($test$/java/docs/http/javadsl/HttpClientExampleDocTest.java) { #manual-entity-consume-example-2 } +### Integrating with Akka Streams +In some cases, it is necessary to process the results of a series of Akka HTTP calls as Akka Streams. In order +to ensure that the HTTP Response Entity is consumed in a timely manner, the Akka HTTP stream for each request must +be executed and completely consumed, then sent along for further processing. + +Failing to account for this behavior can result in seemingly non-deterministic failures due to complex interactions +between http and stream buffering. This manifests as errors such as the following: + +``` +Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it. +``` + +This error indicates that the http response has been available for too long without being consumed. It can be +partially worked around by increasing the subscription timeout, but you will still run the risk of running into network +level timeouts and could still exceed the timeout under load so it's best to resolve the issue properly such as in +the examples below: + +Scala +: @@snip [HttpClientExampleSpec.scala]($test$/scala/docs/http/scaladsl/HttpClientExampleSpec.scala) { #manual-entity-consume-example-3 } + +Java +: @@snip [HttpClientExampleDocTest.java]($test$/java/docs/http/javadsl/HttpClientExampleDocTest.java) { #manual-entity-consume-example-3 } + ### Discarding the HTTP Response Entity (Client) Sometimes when calling HTTP services we do not care about their response payload (e.g. all we care about is the response code), diff --git a/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java b/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java index 5f2c675eede..e14834f9439 100644 --- a/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java +++ b/docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java @@ -5,6 +5,7 @@ package docs.http.javadsl; import akka.Done; +import akka.NotUsed; import akka.actor.*; import akka.http.javadsl.model.headers.HttpCredentials; import akka.http.javadsl.model.headers.SetCookie; @@ -21,6 +22,9 @@ import static akka.util.ByteString.emptyByteString; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletionStage; //#manual-entity-consume-example-1 @@ -110,6 +114,46 @@ public ExamplePerson parse(ByteString line) { //#manual-entity-consume-example-2 } + private static class ConsumeExample3 { + //#manual-entity-consume-example-3 + final class ExamplePerson { + final String name; + public ExamplePerson(String name) { this.name = name; } + } + + public ExamplePerson parse(ByteString line) { + return new ExamplePerson(line.utf8String()); + } + + final ActorSystem system = ActorSystem.create(); + final ExecutionContextExecutor dispatcher = system.dispatcher(); + final ActorMaterializer materializer = ActorMaterializer.create(system); + + // run a single request, consuming it completely in a single stream + public CompletionStage runRequest(HttpRequest request) { + return Http.get(system) + .singleRequest(request) + .thenCompose(response -> + response.entity().getDataBytes() + .runReduce((a, b) -> a.concat(b), materializer) + .thenApply(this::parse) + ); + } + + final List requests = new ArrayList<>(); + + final Flow exampleProcessingFlow = Flow + .fromFunction(person -> person.toString().length()); + + final CompletionStage stream = Source + .from(requests) + .mapAsync(1, this::runRequest) + .via(exampleProcessingFlow) + .runWith(Sink.ignore(), materializer); + + //#manual-entity-consume-example-3 + } + void manualEntityDiscardExample1() { //#manual-entity-discard-example-1 final ActorSystem system = ActorSystem.create(); diff --git a/docs/src/test/scala/docs/http/scaladsl/HttpClientExampleSpec.scala b/docs/src/test/scala/docs/http/scaladsl/HttpClientExampleSpec.scala index c3ff8e3f322..5ab9eff5195 100644 --- a/docs/src/test/scala/docs/http/scaladsl/HttpClientExampleSpec.scala +++ b/docs/src/test/scala/docs/http/scaladsl/HttpClientExampleSpec.scala @@ -71,6 +71,57 @@ class HttpClientExampleSpec extends WordSpec with Matchers with CompileOnlySpec //#manual-entity-consume-example-2 } + "manual-entity-consume-example-3" in compileOnlySpec { + //#manual-entity-consume-example-3 + import scala.concurrent.duration._ + import scala.concurrent.Future + + import akka.NotUsed + import akka.actor.ActorSystem + import akka.http.scaladsl.Http + import akka.http.scaladsl.model._ + import akka.stream.ActorMaterializer + import akka.util.ByteString + import akka.stream.scaladsl.{ Flow, Sink, Source } + + implicit val system = ActorSystem() + implicit val dispatcher = system.dispatcher + implicit val materializer = ActorMaterializer() + + case class ExamplePerson(name: String) + + def parse(line: ByteString): Option[ExamplePerson] = + line.utf8String.split(" ").headOption.map(ExamplePerson) + + val requests: Source[HttpRequest, NotUsed] = Source + .fromIterator(() => + Range(0, 10).map(i => HttpRequest(uri = Uri(s"https://localhost/people/$i"))).iterator + ) + + val processorFlow: Flow[Option[ExamplePerson], Int, NotUsed] = + Flow[Option[ExamplePerson]].map(_.map(_.name.length).getOrElse(0)) + + // Run and completely consume a single akka http request + def runRequest(req: HttpRequest): Future[Option[ExamplePerson]] = + Http() + .singleRequest(req) + .flatMap { response => + response.entity.dataBytes + .runReduce(_ ++ _) + .map(parse) + } + + // Run each akka http flow to completion, then continue processing. You'll want to tune the `parallelism` + // parameter to mapAsync -- higher values will create more cpu and memory load which may or may not positively + // impact performance. + requests + .mapAsync(2)(runRequest) + .via(processorFlow) + .runWith(Sink.ignore) + + //#manual-entity-consume-example-3 + } + "manual-entity-discard-example-1" in compileOnlySpec { //#manual-entity-discard-example-1 import akka.actor.ActorSystem