diff --git a/src/main/scala/ScalatraBootstrap.scala b/src/main/scala/ScalatraBootstrap.scala index b0e4d37a..098487d2 100644 --- a/src/main/scala/ScalatraBootstrap.scala +++ b/src/main/scala/ScalatraBootstrap.scala @@ -1,6 +1,7 @@ import javax.servlet.ServletContext import org.codeoverflow.chatoverflow.ui.web.rest.config.ConfigController import org.codeoverflow.chatoverflow.ui.web.rest.connector.ConnectorController +import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventsController, EventsDispatcher} import org.codeoverflow.chatoverflow.ui.web.rest.plugin.PluginInstanceController import org.codeoverflow.chatoverflow.ui.web.rest.types.TypeController import org.codeoverflow.chatoverflow.ui.web.{CodeOverflowSwagger, OpenAPIServlet} @@ -21,6 +22,9 @@ class ScalatraBootstrap extends LifeCycle { context.initParameters("org.scalatra.cors.allowedMethods") = "*" // Add all servlets and controller + val eventsController = new EventsController() + EventsDispatcher.init(eventsController) + context.mount(eventsController, "/events/*", "events") context.mount(new TypeController(), "/types/*", "types") context.mount(new ConfigController(), "/config/*", "config") context.mount(new PluginInstanceController(), "/instances/*", "instances") diff --git a/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala b/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala index 85a91c9b..10e5b4d8 100644 --- a/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala +++ b/src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala @@ -4,6 +4,7 @@ import java.util import org.codeoverflow.chatoverflow.WithLogger import org.codeoverflow.chatoverflow.api.plugin.{PluginLogMessage, PluginManager} +import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventMessage, EventsDispatcher} import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -23,11 +24,18 @@ class PluginManagerImpl(pluginInstanceName: String, logOutputOnConsole: Boolean) * @param message the message to show */ override def log(message: String): Unit = { - logMessages += new PluginLogMessage(message) + val logMessage = new PluginLogMessage(message) + logMessages += logMessage if (logOutputOnConsole) { logger info s"[$pluginInstanceName] $message" } + + EventsDispatcher.broadcast("instance", EventMessage("log", Map( + ("name", pluginInstanceName), + ("message", message), + ("timestamp", logMessage.getTimestamp.toString) + ))) } /** diff --git a/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala b/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala index 86aac0a8..a7adca24 100644 --- a/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala +++ b/src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala @@ -8,6 +8,7 @@ import org.codeoverflow.chatoverflow.api.plugin.{Plugin, PluginManager} import org.codeoverflow.chatoverflow.framework.PluginCompatibilityState.PluginCompatibilityState import org.codeoverflow.chatoverflow.framework.manager.{PluginManagerImpl, PluginManagerStub} import org.codeoverflow.chatoverflow.framework.{PluginCompatibilityState, PluginType} +import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventMessage, EventsDispatcher} /** * A plugin instance holds all the general information of the plugin type and specific information of @@ -149,6 +150,8 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W logger info s"Starting plugin '$instanceName' in new thread!" try { instanceThread = new Thread(() => { + EventsDispatcher.broadcast("instance", EventMessage("start", Map(("name", instanceName)))) + try { // Execute plugin setup @@ -191,6 +194,7 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W requirement.asInstanceOf[Requirement[Output]].get().shutdown() }) + EventsDispatcher.broadcast("instance", EventMessage("stop", Map(("name", instanceName)))) } }) instanceThread.start() diff --git a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventMessage.scala b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventMessage.scala new file mode 100644 index 00000000..704ae346 --- /dev/null +++ b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventMessage.scala @@ -0,0 +1,3 @@ +package org.codeoverflow.chatoverflow.ui.web.rest.events + +case class EventMessage[T](action: String, data: T) diff --git a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsController.scala b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsController.scala new file mode 100644 index 00000000..3b2600e4 --- /dev/null +++ b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsController.scala @@ -0,0 +1,84 @@ +package org.codeoverflow.chatoverflow.ui.web.rest.events + +import java.io.PrintWriter +import java.util.concurrent.ConcurrentHashMap + +import javax.servlet.AsyncContext +import javax.servlet.http.HttpServletRequest +import org.codeoverflow.chatoverflow.ui.web.JsonServlet +import org.scalatra.servlet.ScalatraAsyncSupport +import org.scalatra.{BadRequest, Unauthorized} +import org.scalatra.swagger.Swagger + +class EventsController(implicit val swagger: Swagger) extends JsonServlet with ScalatraAsyncSupport with EventsControllerDefinition { + private val connectionWriters = new ConcurrentHashMap[AsyncContext, PrintWriter]() + + def broadcast(messageType: String, message: String = null): Unit = { + connectionWriters.forEach((_, writer) => { + try { + sendMessage(writer, messageType, message) + } catch { + //probably lost or closed connection, remove from the list of connected clients + case _: Throwable => connectionWriters.remove(writer) + } + }) + } + + def closeConnections(): Unit = { + connectionWriters.forEach((_, writer) => { + try { + sendMessage(writer, "close", null) + writer.close() + } finally { + connectionWriters.remove(writer) + } + }) + } + + private def sendMessage(writer: PrintWriter, messageType: String, message: String): Unit = { + /* + Every message has the following format and ends with two line feeds (\n): + event: [name of event] + data: [first line] + data: [second line] + ... + + See also: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Examples + */ + + var msg = "event: " + messageType.replace("\n", "") + "\n" + if (message != null) + msg += "data: " + message.replace("\n", "\ndata: ") + "\n\n" + writer.write(msg) + writer.flush() + } + + get("/", operation(getEvents)) { + val accept = request.getHeader("Accept") + if (accept == null || !accept.replace(" ", "").split(",").contains("text/event-stream")) { + status = 406 + } else { + authParamRequired { + contentType = "text/event-stream" + + val asyncContext = request.startAsync() + asyncContext.setTimeout(0) + + val writer = asyncContext.getResponse.getWriter + connectionWriters.put(asyncContext, writer) + } + } + } + + private def authParamRequired(func: => Any)(implicit request: HttpServletRequest): Any = { + val authKeyKey = "authKey" + + if (!request.parameters.contains(authKeyKey) || request.getParameter(authKeyKey).isEmpty) { + BadRequest() + } else if (request.getParameter(authKeyKey) != chatOverflow.credentialsService.generateAuthKey()) { + Unauthorized() + } else { + func + } + } +} diff --git a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsControllerDefinition.scala b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsControllerDefinition.scala new file mode 100644 index 00000000..c303b3c4 --- /dev/null +++ b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsControllerDefinition.scala @@ -0,0 +1,21 @@ +package org.codeoverflow.chatoverflow.ui.web.rest.events + +import org.codeoverflow.chatoverflow.ui.web.rest.{AuthSupport, TagSupport} +import org.scalatra.swagger.{SwaggerSupport, SwaggerSupportSyntax} +import org.scalatra.swagger.SwaggerSupportSyntax.OperationBuilder + +trait EventsControllerDefinition extends SwaggerSupport with TagSupport with AuthSupport { + val getEvents: OperationBuilder = + (apiOperation[Object]("getEvents") + summary "Get events" + description "Get events from chatoverflow using the EventSource API. Requires an Accept-header with the value text/event-stream." + parameter authQuery + tags controllerTag) + + protected def authQuery: SwaggerSupportSyntax.ParameterBuilder[String] = + queryParam[String]("authKey").description("connection auth key required") + + override def controllerTag: String = "events" + + override protected def applicationDescription: String = "Handles chatoverflow events." +} diff --git a/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsDispatcher.scala b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsDispatcher.scala new file mode 100644 index 00000000..fa3a044b --- /dev/null +++ b/src/main/scala/org/codeoverflow/chatoverflow/ui/web/rest/events/EventsDispatcher.scala @@ -0,0 +1,50 @@ +package org.codeoverflow.chatoverflow.ui.web.rest.events + +import org.json4s.{DefaultFormats, Formats} +import org.json4s.jackson.Serialization + +/** + * The EventsDispatcher is the central point for realtime communication to the clients + */ +object EventsDispatcher { + private var controller: EventsController = _ + implicit val formats: Formats = DefaultFormats + + /** + * Initializes the EventsDispatcher with the registered controller + * Only to be used from the bootstrap + * @param eventsController registered controller that accepts the incoming connections + */ + def init(eventsController: EventsController): Unit = { + if (controller == null) + controller = eventsController + } + + /** + * Sends the message to all connected clients + * @param messageType type of the message / event + * @param message the message to send + * @tparam T type of the message data + */ + def broadcast[T](messageType: String, message: EventMessage[T]): Unit = { + broadcast(messageType, Serialization.write(message)) + } + + /** + * Sends the message to all connected clients + * @param messageType type of the message / event + * @param message the message to send + */ + def broadcast(messageType: String, message: String = null): Unit = { + if (controller != null) + controller.broadcast(messageType, message) + } + + /** + * Sends a close message to all connected clients and closes the connections + */ + def close(): Unit = { + if (controller != null) + controller.closeConnections() + } +}