Important: As of version 10.0.8, Akka HTTP now includes Akka SSE. Hence this project has come to an end.
Akka SSE adds support for Server-Sent Events (SSE) – a lightweight and standardized technology for pushing notifications from an HTTP server to an HTTP client – to Akka HTTP. In contrast to WebSocket, which enables two-way communication, SSE only allows one-way communication from the server to the client. If that's all you need, SSE has the advantage of being much simpler and relying on HTTP only.
Since version 2, Akka SSE supports both Scala and Java, although the examples below only show Scala.
Akka SSE is published to Bintray and Maven Central.
// All releases including intermediate ones are published here,
// final ones are also published to Maven Central.
resolvers += Resolver.bintrayRepo("hseeberger", "maven")
libraryDependencies ++= Seq(
  "de.heikoseeberger" %% "akka-sse" % "3.0.0",
  ...
)
Akka SSE models an event stream as Source[ServerSentEvent, Any], with Source from Akka Streams and ServerSentEvent from Akka SSE. ServerSentEvent is a case class with the following fields:
- data: String – the actual payload, may span multiple lines
- type: Option[String] with default None – optional qualifier, e.g. "added", "removed", etc.
- id: Option[String] with default None – optional identifier
- retry: Option[Int] with default None – optional reconnection delay in milliseconds
More information about the above fields can be found in the SSE specification.
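To make the fields more concrete, here is a minimal sketch of how they map onto the text/event-stream wire format from the SSE specification. The `Event` class and the `render` function are hypothetical stand-ins written for this illustration; they are not Akka SSE's own implementation, which may differ in details.

```scala
object SseWireFormat {
  // Hypothetical stand-in mirroring the fields listed above; not the library's class.
  final case class Event(
      data: String,
      `type`: Option[String] = None,
      id: Option[String] = None,
      retry: Option[Int] = None
  )

  // Renders an event as text/event-stream fields (assumed layout, following the spec).
  def render(event: Event): String = {
    // Each line of a multi-line payload gets its own "data:" field.
    val dataLines = event.data.split("\n", -1).toList.map(line => s"data:$line")
    // The type is sent under the field name "event".
    val typeLine  = event.`type`.map(t => s"event:$t").toList
    val idLine    = event.id.map(i => s"id:$i").toList
    val retryLine = event.retry.map(r => s"retry:$r").toList
    // A blank line terminates the event.
    (dataLines ++ typeLine ++ idLine ++ retryLine).mkString("", "\n", "\n\n")
  }
}
```

For example, `SseWireFormat.render(SseWireFormat.Event("a\nb", Some("added")))` produces two `data:` lines followed by an `event:added` line and a blank line.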
In order to respond to an HTTP request with an event stream, you have to bring the implicit ToResponseMarshaller[Source[ServerSentEvent, Any]] defined by EventStreamMarshalling into the scope defining the respective route:
object TimeServer {
  ...
  private def route = {
    import Directives._
    import EventStreamMarshalling._ // That does the trick!

    def assets = ...

    def events =
      path("events") {
        get {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map(_ => LocalTime.now())
              .map(timeToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }

    assets ~ events
  }

  private def timeToServerSentEvent(time: LocalTime) =
    ServerSentEvent(DateTimeFormatter.ISO_LOCAL_TIME.format(time))
}
To send periodic heartbeats, simply use the keepAlive standard stage with a ServerSentEvent.heartbeat, which has an empty data field and hence is ignored by clients according to the specification.
In order to unmarshal server-sent events as Source[ServerSentEvent, NotUsed], you have to bring the implicit FromEntityUnmarshaller[Source[ServerSentEvent, NotUsed]] defined by EventStreamUnmarshalling into scope:
import EventStreamUnmarshalling._ // That does the trick!
import system.dispatcher
Http()
  .singleRequest(Get("http://localhost:8000/events"))
  .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
  .foreach(_.runForeach(println))
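Instead of just printing each event, a client will often dispatch on the optional type field. The following is a rough sketch of that idea, using the "added" and "removed" qualifiers mentioned earlier. To stay self-contained it works on a plain Seq and a hypothetical `Event` class rather than on the library's ServerSentEvent and an Akka Streams Source; `applyEvent` and `replay` are illustrative names, not part of Akka SSE.

```scala
object EventDispatch {
  // Hypothetical stand-in for ServerSentEvent, reduced to the two fields used here.
  final case class Event(data: String, `type`: Option[String] = None)

  // Applies one event to the current set of items, dispatching on the optional type.
  def applyEvent(items: Set[String], event: Event): Set[String] =
    event.`type` match {
      case Some("added")   => items + event.data
      case Some("removed") => items - event.data
      case _               => items // untyped or unknown events leave the state unchanged
    }

  // Folds a finite batch of events into the resulting state.
  def replay(events: Seq[Event]): Set[String] =
    events.foldLeft(Set.empty[String])(applyEvent)
}
```

With a real event stream, the same kind of step function could presumably be applied with a fold over the unmarshalled Source, after adapting it to the library's ServerSentEvent.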
Contributions via GitHub pull requests are gladly accepted from their original author. Along with any pull requests, please state that the contribution is your original work and that you license the work to the project under the project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so.
This code is open source software licensed under the Apache 2.0 License.