Skip to content

Commit

Permalink
Add example showing consuming a stream of requests (#2270)
Browse files Browse the repository at this point in the history
* Add example showing consuming a stream of requests

- Fixes #1836
  • Loading branch information
easel authored and raboof committed Jul 9, 2019
1 parent 3bd6df9 commit d3f0705
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 0 deletions.
23 changes: 23 additions & 0 deletions docs/src/main/paradox/implications-of-streaming-http-entity.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ Scala
Java
: @@snip [HttpClientExampleDocTest.java]($test$/java/docs/http/javadsl/HttpClientExampleDocTest.java) { #manual-entity-consume-example-2 }

### Integrating with Akka Streams
In some cases, it is necessary to process the results of a series of Akka HTTP calls as Akka Streams. In order
to ensure that the HTTP Response Entity is consumed in a timely manner, the Akka HTTP stream for each request must
be executed and completely consumed, then sent along for further processing.

Failing to account for this behavior can result in seemingly non-deterministic failures due to complex interactions
between http and stream buffering. This manifests as errors such as the following:

```
Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it.
```

This error indicates that the http response has been available for too long without being consumed. It can be
partially worked around by increasing the subscription timeout, but you will still run the risk of running into network
level timeouts and could still exceed the timeout under load so it's best to resolve the issue properly such as in
the examples below:

Scala
: @@snip [HttpClientExampleSpec.scala]($test$/scala/docs/http/scaladsl/HttpClientExampleSpec.scala) { #manual-entity-consume-example-3 }

Java
: @@snip [HttpClientExampleDocTest.java]($test$/java/docs/http/javadsl/HttpClientExampleDocTest.java) { #manual-entity-consume-example-3 }

### Discarding the HTTP Response Entity (Client)

Sometimes when calling HTTP services we do not care about their response payload (e.g. all we care about is the response code),
Expand Down
44 changes: 44 additions & 0 deletions docs/src/test/java/docs/http/javadsl/HttpClientExampleDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package docs.http.javadsl;

import akka.Done;
import akka.NotUsed;
import akka.actor.*;
import akka.http.javadsl.model.headers.HttpCredentials;
import akka.http.javadsl.model.headers.SetCookie;
Expand All @@ -21,6 +22,9 @@
import static akka.util.ByteString.emptyByteString;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;

//#manual-entity-consume-example-1
Expand Down Expand Up @@ -110,6 +114,46 @@ public ExamplePerson parse(ByteString line) {
//#manual-entity-consume-example-2
}

private static class ConsumeExample3 {
//#manual-entity-consume-example-3
final class ExamplePerson {
final String name;
public ExamplePerson(String name) { this.name = name; }
}

public ExamplePerson parse(ByteString line) {
return new ExamplePerson(line.utf8String());
}

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

// run a single request, consuming it completely in a single stream
public CompletionStage<ExamplePerson> runRequest(HttpRequest request) {
return Http.get(system)
.singleRequest(request)
.thenCompose(response ->
response.entity().getDataBytes()
.runReduce((a, b) -> a.concat(b), materializer)
.thenApply(this::parse)
);
}

final List<HttpRequest> requests = new ArrayList<>();

final Flow<ExamplePerson, Integer, NotUsed> exampleProcessingFlow = Flow
.fromFunction(person -> person.toString().length());

final CompletionStage<Done> stream = Source
.from(requests)
.mapAsync(1, this::runRequest)
.via(exampleProcessingFlow)
.runWith(Sink.ignore(), materializer);

//#manual-entity-consume-example-3
}

void manualEntityDiscardExample1() {
//#manual-entity-discard-example-1
final ActorSystem system = ActorSystem.create();
Expand Down
51 changes: 51 additions & 0 deletions docs/src/test/scala/docs/http/scaladsl/HttpClientExampleSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,57 @@ class HttpClientExampleSpec extends WordSpec with Matchers with CompileOnlySpec
//#manual-entity-consume-example-2
}

"manual-entity-consume-example-3" in compileOnlySpec {
//#manual-entity-consume-example-3
import scala.concurrent.duration._
import scala.concurrent.Future

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.util.ByteString
import akka.stream.scaladsl.{ Flow, Sink, Source }

implicit val system = ActorSystem()
implicit val dispatcher = system.dispatcher
implicit val materializer = ActorMaterializer()

case class ExamplePerson(name: String)

def parse(line: ByteString): Option[ExamplePerson] =
line.utf8String.split(" ").headOption.map(ExamplePerson)

val requests: Source[HttpRequest, NotUsed] = Source
.fromIterator(() =>
Range(0, 10).map(i => HttpRequest(uri = Uri(s"https://localhost/people/$i"))).iterator
)

val processorFlow: Flow[Option[ExamplePerson], Int, NotUsed] =
Flow[Option[ExamplePerson]].map(_.map(_.name.length).getOrElse(0))

// Run and completely consume a single akka http request
def runRequest(req: HttpRequest): Future[Option[ExamplePerson]] =
Http()
.singleRequest(req)
.flatMap { response =>
response.entity.dataBytes
.runReduce(_ ++ _)
.map(parse)
}

// Run each akka http flow to completion, then continue processing. You'll want to tune the `parallelism`
// parameter to mapAsync -- higher values will create more cpu and memory load which may or may not positively
// impact performance.
requests
.mapAsync(2)(runRequest)
.via(processorFlow)
.runWith(Sink.ignore)

//#manual-entity-consume-example-3
}

"manual-entity-discard-example-1" in compileOnlySpec {
//#manual-entity-discard-example-1
import akka.actor.ActorSystem
Expand Down

0 comments on commit d3f0705

Please sign in to comment.