diff --git a/sse/src/main/scala/akka/stream/alpakka/sse/scaladsl/EventSource.scala b/sse/src/main/scala/akka/stream/alpakka/sse/scaladsl/EventSource.scala index 1b58c89c1c..39d2e16a25 100644 --- a/sse/src/main/scala/akka/stream/alpakka/sse/scaladsl/EventSource.scala +++ b/sse/src/main/scala/akka/stream/alpakka/sse/scaladsl/EventSource.scala @@ -8,6 +8,7 @@ package scaladsl import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.client.RequestBuilding.Get +import akka.http.scaladsl.coding.Gzip import akka.http.scaladsl.model.headers.Accept import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} import akka.http.scaladsl.unmarshalling.Unmarshal @@ -26,7 +27,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration} * This stream processing stage establishes a continuous source of server-sent events from the given URI. * * A single source of server-sent events is obtained from the URI. Once completed, either normally or by failure, a next - * one is obtained thereby sending a Last-Evend-ID header if available. This continues in an endless cycle. + * one is obtained thereby sending a Last-Event-ID header if available. This continues in an endless cycle. * * The shape of this processing stage is a source of server-sent events; to take effect it must be connected and run. * Progress (including termination) is controlled by the connected flow or sink, e.g. a retry delay can be implemented @@ -76,7 +77,7 @@ object EventSource { /** * @param uri URI with absolute path, e.g. "http://myserver/events * @param send function to send a HTTP request - * @param initialLastEventId initial value for Last-Evend-ID header, `None` by default + * @param initialLastEventId initial value for Last-Event-ID header, `None` by default * @param retryDelay delay for retrying after completion, `0` by default * @param mat implicit `Materializer`, needed to obtain server-sent events * @return continuous source of server-sent events @@ -97,7 +98,10 @@ object EventSource { val r = Get(uri).addHeader(Accept(`text/event-stream`)) lastEventId.foldLeft(r)((r, i) => r.addHeader(`Last-Event-ID`(i))) } - send(request).flatMap(Unmarshal(_).to[EventSource]).fallbackTo(Future.successful(noEvents)) + send(request) + .map(response => Gzip.decodeMessage(response)) + .flatMap(Unmarshal(_).to[EventSource]) + .fallbackTo(Future.successful(noEvents)) } def recover(eventSource: EventSource) = eventSource.recoverWithRetries(1, { case _ => noEvents }) def delimit(eventSource: EventSource) = eventSource.concat(singleDelimiter) diff --git a/sse/src/test/scala/docs/scaladsl/EventSourceSpec.scala b/sse/src/test/scala/docs/scaladsl/EventSourceSpec.scala index 91bee94291..91aada7e4d 100644 --- a/sse/src/test/scala/docs/scaladsl/EventSourceSpec.scala +++ b/sse/src/test/scala/docs/scaladsl/EventSourceSpec.scala @@ -186,12 +186,7 @@ final class EventSourceSpec extends AsyncWordSpec with Matchers with BeforeAndAf val nrOfSamples = 20 val (host, port) = hostAndPort() val server = system.actorOf(Props(new Server(host, port, 2, shouldSetEventId = true))) - val send: HttpRequest => Future[HttpResponse] = Http() - .singleRequest(_) - .map { possiblyGzipped => - val response = akka.http.scaladsl.coding.Gzip.decodeMessage(possiblyGzipped) - response - } + val send: HttpRequest => Future[HttpResponse] = Http().singleRequest(_) val eventSource = EventSource(Uri(s"http://$host:$port/gzipped"), send, None, 1.second) val events = eventSource.take(nrOfSamples).runWith(Sink.seq)