Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Fixes #4948 | Lazy event parsing (#4986)
Browse files Browse the repository at this point in the history
Change SSE event stream handler to use events that are
lazy parsed to JSON. This will reduce CPU time when most events
are filtered and/or when more then one subscriper is connected.
  • Loading branch information
janisz authored and unterstein committed Mar 9, 2017
1 parent e5903dc commit 62bb2c7
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 26 deletions.
5 changes: 5 additions & 0 deletions src/main/scala/mesosphere/marathon/core/event/Events.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mesosphere.marathon.core.event

import akka.event.EventStream
import com.fasterxml.jackson.annotation.JsonIgnore
import mesosphere.marathon.api.v2.json.Formats.eventToJson
import mesosphere.marathon.core.condition.Condition
import mesosphere.marathon.core.health.HealthCheck
import mesosphere.marathon.core.instance.update.InstanceChange
Expand All @@ -9,12 +11,15 @@ import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.state.{ AppDefinition, PathId, Timestamp }
import mesosphere.marathon.upgrade.{ DeploymentPlan, DeploymentStep }
import org.apache.mesos.{ Protos => Mesos }
import play.api.libs.json.Json

import scala.collection.immutable.Seq

sealed trait MarathonEvent {
val eventType: String
val timestamp: String
@JsonIgnore
lazy val jsonString: String = Json.stringify(eventToJson(this))
}

// api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mesosphere.marathon.core.event.impl.stream
import akka.actor._
import com.google.inject.Inject
import mesosphere.marathon.core.election.{ ElectionService, LocalLeadershipEvent }
import mesosphere.marathon.core.event.MarathonEvent
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamActor._
import mesosphere.marathon.metrics.Metrics.AtomicIntGauge
import mesosphere.marathon.metrics.{ MetricPrefixes, Metrics }
Expand All @@ -16,7 +17,7 @@ import scala.util.Try
trait HttpEventStreamHandle {
def id: String
def remoteAddress: String
def sendEvent(event: String, message: String): Unit
def sendEvent(event: MarathonEvent): Unit
def close(): Unit
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import java.io.EOFException
import akka.actor.{ Actor, ActorLogging, Status }
import akka.event.EventStream
import akka.pattern.pipe
import mesosphere.marathon.api.v2.json.Formats._
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamHandleActor._
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, MarathonEvent }
import mesosphere.util.ThreadPoolContext
import play.api.libs.json.Json

import scala.concurrent.Future
import scala.util.Try
Expand Down Expand Up @@ -60,7 +58,7 @@ class HttpEventStreamHandleActor(
outstanding = List.empty[MarathonEvent]
context.become(stashEvents)
val sendFuture = Future {
toSend.foreach(event => handle.sendEvent(event.eventType, Json.stringify(eventToJson(event))))
toSend.foreach(event => handle.sendEvent(event))
WorkDone
}(ThreadPoolContext.ioContext)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import javax.servlet.http.{ Cookie, HttpServletRequest, HttpServletResponse }

import akka.actor.ActorRef
import mesosphere.marathon.api.RequestFacade
import mesosphere.marathon.core.event.EventConf
import mesosphere.marathon.core.event.{ EventConf, MarathonEvent }
import mesosphere.marathon.core.event.impl.stream.HttpEventStreamActor._
import mesosphere.marathon.plugin.auth._
import mesosphere.marathon.plugin.http.HttpResponse
Expand All @@ -24,7 +24,7 @@ class HttpEventSSEHandle(request: HttpServletRequest, emitter: Emitter) extends

lazy val id: String = UUID.randomUUID().toString

val subscribedEventTypes = request.getParameterMap.getOrDefault("event_type", Array.empty).toSet
private val subscribedEventTypes = request.getParameterMap.getOrDefault("event_type", Array.empty).toSet

def subscribed(eventType: String): Boolean = {
subscribedEventTypes.isEmpty || subscribedEventTypes.contains(eventType)
Expand All @@ -34,8 +34,8 @@ class HttpEventSSEHandle(request: HttpServletRequest, emitter: Emitter) extends

override def close(): Unit = emitter.close()

override def sendEvent(event: String, message: String): Unit = {
if (subscribed(event)) blocking(emitter.event(event, message))
override def sendEvent(event: MarathonEvent): Unit = {
if (subscribed(event.eventType)) blocking(emitter.event(event.eventType, event.jsonString))
}

override def toString: String = s"HttpEventSSEHandle($id on $remoteAddress on event types from $subscribedEventTypes)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package core.event.impl.stream
import java.util.Collections
import javax.servlet.http.HttpServletRequest

import mesosphere.marathon.core.event.{ Subscribe, Unsubscribe }
import mesosphere.marathon.test.{ MarathonSpec, Mockito }
import org.eclipse.jetty.servlets.EventSource.Emitter
import org.scalatest.{ GivenWhenThen, Matchers }
Expand All @@ -12,30 +13,30 @@ import mesosphere.marathon.stream._
class HttpEventSSEHandleTest extends MarathonSpec with Matchers with Mockito with GivenWhenThen {

test("events should be filtered") {
Given("An emiter")
Given("An emitter")
val emitter = mock[Emitter]
Given("An request with params")
val req = mock[HttpServletRequest]
req.getParameterMap returns Map("event_type" -> Array("xyz"))
req.getParameterMap returns Map("event_type" -> Array(unsubscribe.eventType))

Given("handler for request is created")
val handle = new HttpEventSSEHandle(req, emitter)

When("Want to sent unwanted event")
handle.sendEvent("any event", "")
handle.sendEvent(subscribed)

Then("event should NOT be sent")
verify(emitter, never).event("any event", "")
verify(emitter, never).event(eq(subscribed.eventType), any[String])

When("Want to sent subscribed event")
handle.sendEvent("xyz", "")
handle.sendEvent(unsubscribe)

Then("event should be sent")
verify(emitter).event("xyz", "")
verify(emitter).event(eq(unsubscribe.eventType), any[String])
}

test("events should NOT be filtered") {
Given("An emiter")
Given("An emitter")
val emitter = mock[Emitter]

Given("An request without params")
Expand All @@ -46,16 +47,20 @@ class HttpEventSSEHandleTest extends MarathonSpec with Matchers with Mockito wit
val handle = new HttpEventSSEHandle(req, emitter)

When("Want to sent event")
handle.sendEvent("any event", "")
Then("event should NOT be sent")
handle.sendEvent(subscribed)

Then("event should be sent")
verify(emitter).event(eq(subscribed.eventType), any[String])

verify(emitter).event("any event", "")
When("Want to sent event")

handle.sendEvent("xyz", "")
handle.sendEvent(unsubscribe)

Then("event should be sent")
verify(emitter).event("xyz", "")
verify(emitter).event(eq(unsubscribe.eventType), any[String])
}

val subscribed = Subscribe("client IP", "callback URL")
val unsubscribe = Unsubscribe("client IP", "callback URL")
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.concurrent.CountDownLatch
import akka.actor.Props
import akka.event.EventStream
import akka.testkit.{ EventFilter, ImplicitSender, TestActorRef }
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, Subscribe }
import mesosphere.marathon.core.event.{ EventStreamAttached, EventStreamDetached, MarathonEvent, Subscribe }
import mesosphere.marathon.test.{ MarathonActorSupport, MarathonSpec, Mockito }
import org.scalatest.{ BeforeAndAfter, GivenWhenThen, Matchers }

Expand All @@ -18,20 +18,20 @@ class HttpEventStreamHandleActorTest extends MarathonActorSupport
test("A message send to the handle actor will be transferred to the stream handle") {
Given("A handler that will postpone sending until latch is hit")
val latch = new CountDownLatch(1)
handle.sendEvent(any[String], any[String]) answers (_ => latch.countDown())
handle.sendEvent(any[MarathonEvent]) answers (_ => latch.countDown())

When("The event is send to the actor, the outstanding messages is 1")
handleActor ! EventStreamAttached("remote")

Then("We need to wait for the future to succeed")
awaitCond(latch.getCount == 0)
verify(handle, times(1)).sendEvent(any[String], any[String])
verify(handle, times(1)).sendEvent(any[MarathonEvent])
}

test("If the consumer is slow and maxOutstanding limit is reached, messages get dropped") {
Given("A handler that will postpone the sending")
val latch = new CountDownLatch(1)
handle.sendEvent(any[String], any[String]) answers (_ => latch.await())
handle.sendEvent(any[MarathonEvent]) answers (_ => latch.await())
val filter = EventFilter(pattern = "Ignore event.*", occurrences = 1)

When("More than the max size of outstanding events is send to the actor")
Expand All @@ -46,7 +46,7 @@ class HttpEventStreamHandleActorTest extends MarathonActorSupport

test("If the handler throws an EOF exception, the actor stops acting") {
Given("A handler that will postpone the sending")
handle.sendEvent(any[String], any[String]) answers { _ => throw new EOFException() }
handle.sendEvent(any[MarathonEvent]) answers { _ => throw new EOFException() }
val filter = EventFilter(pattern = "Received EOF.*", occurrences = 1)

When("An event is send to actor")
Expand All @@ -60,7 +60,7 @@ class HttpEventStreamHandleActorTest extends MarathonActorSupport
Given("A handler that will postpone the sending")
val latch = new CountDownLatch(1)
var events = List.empty[String]
handle.sendEvent(any[String], any[String]) answers { args => events ::= args(0).asInstanceOf[String]; latch.await() }
handle.sendEvent(any[MarathonEvent]) answers { args => events ::= args(0).asInstanceOf[MarathonEvent].eventType; latch.await() }
handleActor = TestActorRef(Props(
new HttpEventStreamHandleActor(handle, stream, 50)
))
Expand Down

0 comments on commit 62bb2c7

Please sign in to comment.