Skip to content

Commit

Permalink
SSE: Support gzip encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeff Lai committed Jun 30, 2019
1 parent c137fcf commit d1d394b
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package scaladsl
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.coding.Gzip
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
Expand All @@ -26,7 +27,7 @@ import scala.concurrent.duration.{Duration, FiniteDuration}
* This stream processing stage establishes a continuous source of server-sent events from the given URI.
*
* A single source of server-sent events is obtained from the URI. Once completed, either normally or by failure, a next
* one is obtained thereby sending a Last-Evend-ID header if available. This continues in an endless cycle.
* one is obtained thereby sending a Last-Event-ID header if available. This continues in an endless cycle.
*
* The shape of this processing stage is a source of server-sent events; to take effect it must be connected and run.
* Progress (including termination) is controlled by the connected flow or sink, e.g. a retry delay can be implemented
Expand Down Expand Up @@ -76,7 +77,7 @@ object EventSource {
/**
* @param uri URI with absolute path, e.g. "http://myserver/events
* @param send function to send a HTTP request
* @param initialLastEventId initial value for Last-Evend-ID header, `None` by default
* @param initialLastEventId initial value for Last-Event-ID header, `None` by default
* @param retryDelay delay for retrying after completion, `0` by default
* @param mat implicit `Materializer`, needed to obtain server-sent events
* @return continuous source of server-sent events
Expand All @@ -97,7 +98,10 @@ object EventSource {
val r = Get(uri).addHeader(Accept(`text/event-stream`))
lastEventId.foldLeft(r)((r, i) => r.addHeader(`Last-Event-ID`(i)))
}
send(request).flatMap(Unmarshal(_).to[EventSource]).fallbackTo(Future.successful(noEvents))
send(request)
.map(response => Gzip.decodeMessage(response))
.flatMap(Unmarshal(_).to[EventSource])
.fallbackTo(Future.successful(noEvents))
}
def recover(eventSource: EventSource) = eventSource.recoverWithRetries(1, { case _ => noEvents })
def delimit(eventSource: EventSource) = eventSource.concat(singleDelimiter)
Expand Down
7 changes: 1 addition & 6 deletions sse/src/test/scala/docs/scaladsl/EventSourceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,7 @@ final class EventSourceSpec extends AsyncWordSpec with Matchers with BeforeAndAf
val nrOfSamples = 20
val (host, port) = hostAndPort()
val server = system.actorOf(Props(new Server(host, port, 2, shouldSetEventId = true)))
val send: HttpRequest => Future[HttpResponse] = Http()
.singleRequest(_)
.map { possiblyGzipped =>
val response = akka.http.scaladsl.coding.Gzip.decodeMessage(possiblyGzipped)
response
}
val send: HttpRequest => Future[HttpResponse] = Http().singleRequest(_)

val eventSource = EventSource(Uri(s"http://$host:$port/gzipped"), send, None, 1.second)
val events = eventSource.take(nrOfSamples).runWith(Sink.seq)
Expand Down

0 comments on commit d1d394b

Please sign in to comment.