Skip to content

Commit

Permalink
lwcapi: avoid queue for high priority messages (#1683)
Browse files Browse the repository at this point in the history
Some messages such as the subscription metadata need to go
through in order to properly process data. In some cases we
would see a lot of input data come in almost immediately and
result in the subscription info messages getting dropped. The
consumer was then unable to match the datapoints with an expr.

This changes the messages for setting up a new subscription to
be a high priority path that bypasses the queue for incoming
data points.
  • Loading branch information
brharrington authored Aug 9, 2024
1 parent 226e2a7 commit 727fd46
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class SubscribeApi(
registry.clock().wallTime() / step * step
}

private def register(streamMeta: StreamMetadata): (QueueHandler, Source[JsonSupport, NotUsed]) = {
private def register(streamMeta: StreamMetadata): Source[JsonSupport, NotUsed] = {

val streamId = streamMeta.streamId

Expand Down Expand Up @@ -183,28 +183,29 @@ class SubscribeApi(
Source(steps)
}

val source = Source
Source
.fromPublisher(pub)
.flatMapConcat(Source.apply)
.merge(heartbeatSrc)
.viaMat(StreamOps.monitorFlow(registry, "StreamApi"))(Keep.left)

handler -> source
}

private def subscribe(streamId: String, expressions: List[ExpressionMetadata]): List[ErrorMsg] = {
private def subscribe(
streamId: String,
expressions: List[ExpressionMetadata]
): List[JsonSupport] = {
evalsCounter.increment()
itemsCounter.increment(expressions.size)

val errors = scala.collection.mutable.ListBuffer[ErrorMsg]()
val messages = List.newBuilder[JsonSupport]
val subIdsBuilder = Set.newBuilder[String]

expressions.foreach { expr =>
try {
val splits = splitter.split(expr.expression, expr.exprType, expr.frequency)

// Add any new expressions
val (queue, addedSubs) = sm.subscribe(streamId, splits)
val (_, addedSubs) = sm.subscribe(streamId, splits)
val subMessages = addedSubs.map { sub =>
val meta = sub.metadata
val exprInfo = LwcDataExpr(meta.id, meta.expression, meta.frequency)
Expand All @@ -216,14 +217,14 @@ class SubscribeApi(
LwcSubscriptionV2(expr.expression, expr.exprType, List(exprInfo))
}
}
queue.offer(subMessages)
messages ++= subMessages

// Add expression ids in use by this split
subIdsBuilder ++= splits.map(_.metadata.id)
} catch {
case NonFatal(e) =>
logger.error(s"Unable to subscribe to expression ${expr.expression}", e)
errors += ErrorMsg(expr.expression, e.getMessage)
messages += DiagnosticMessage.error(s"[${expr.expression}] ${e.getMessage}")
}
}

Expand All @@ -233,7 +234,7 @@ class SubscribeApi(
.filter(s => !subIds.contains(s.metadata.id))
.foreach(s => sm.unsubscribe(streamId, s.metadata.id))

errors.toList
messages.result()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ import scala.util.control.NonFatal
*/
private[lwcapi] class WebSocketSessionManager(
val streamMeta: StreamMetadata,
val registerFunc: StreamMetadata => (QueueHandler, Source[JsonSupport, NotUsed]),
val subscribeFunc: (String, List[ExpressionMetadata]) => List[ErrorMsg]
val registerFunc: StreamMetadata => Source[JsonSupport, NotUsed],
val subscribeFunc: (String, List[ExpressionMetadata]) => List[JsonSupport]
) extends GraphStage[FlowShape[AnyRef, Source[JsonSupport, NotUsed]]]
with StrictLogging {

Expand All @@ -54,16 +54,17 @@ private[lwcapi] class WebSocketSessionManager(
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler with OutHandler {

var dataSourcePushed = false
var queueHandler: QueueHandler = _
var dataSource: Source[JsonSupport, NotUsed] = _
private var dataSourcePushed = false
private var dataSource: Source[JsonSupport, NotUsed] = _

// Messages that shouldn't get dropped and thus shouldn't go through the queue
// with other data
private var highPriorityMessages: List[JsonSupport] = Nil

setHandlers(in, out, this)

override def preStart(): Unit = {
val (_queueHandler, _dataSource) = registerFunc(streamMeta)
queueHandler = _queueHandler
dataSource = _dataSource
dataSource = registerFunc(streamMeta)
}

override def onPush(): Unit = {
Expand All @@ -75,12 +76,11 @@ private[lwcapi] class WebSocketSessionManager(
val metadata =
lwcExpressions.map(v => ExpressionMetadata(v.expression, v.exprType, v.step))
// Update subscription here
val errors = subscribeFunc(streamMeta.streamId, metadata).map { error =>
DiagnosticMessage.error(s"[${error.expression}] ${error.message}")
}
queueHandler.offer(errors)
val messages = subscribeFunc(streamMeta.streamId, metadata)
highPriorityMessages = highPriorityMessages ::: messages
} catch {
case NonFatal(t) => queueHandler.offer(Seq(DiagnosticMessage.error(t)))
case NonFatal(t) =>
highPriorityMessages = DiagnosticMessage.error(t) :: highPriorityMessages
} finally {
// Push out dataSource only once
if (!dataSourcePushed) {
Expand All @@ -95,7 +95,12 @@ private[lwcapi] class WebSocketSessionManager(
}

override def onPull(): Unit = {
pull(in)
if (highPriorityMessages.nonEmpty) {
push(out, Source(highPriorityMessages))
highPriorityMessages = Nil
} else {
pull(in)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class WebSocketSessionManagerSuite extends FunSuite {

private def run(
data: List[ByteString],
registerFunc: StreamMetadata => (QueueHandler, Source[JsonSupport, NotUsed]),
subscribeFunc: (String, List[ExpressionMetadata]) => List[ErrorMsg]
registerFunc: StreamMetadata => Source[JsonSupport, NotUsed],
subscribeFunc: (String, List[ExpressionMetadata]) => List[JsonSupport]
): List[String] = {
val future = Source(data)
.via(new WebSocketSessionManager(StreamMetadata(""), registerFunc, subscribeFunc))
Expand All @@ -91,23 +91,15 @@ class WebSocketSessionManagerSuite extends FunSuite {

private def createSubscribeFunc(
subsCollector: ArrayBuffer[List[ExpressionMetadata]]
): (String, List[ExpressionMetadata]) => List[ErrorMsg] = {
val subFunc = (_: String, expressions: List[ExpressionMetadata]) => {
subsCollector += expressions
List[ErrorMsg]()
}
subFunc
): (String, List[ExpressionMetadata]) => List[JsonSupport] = {
(_: String, expressions: List[ExpressionMetadata]) =>
{
subsCollector += expressions
Nil
}
}

private def createNoopRegisterFunc()
: StreamMetadata => (QueueHandler, Source[JsonSupport, NotUsed]) = {
val noopQueueHandler = new QueueHandler(StreamMetadata(""), null) {
override def offer(msgs: Seq[JsonSupport]): Unit = ()
override def complete(): Unit = ()
}
val noopSource = Source.empty[JsonSupport].mapMaterializedValue(_ => NotUsed)
val noopRegisterFunc = (_: StreamMetadata) => (noopQueueHandler, noopSource)

noopRegisterFunc
private def createNoopRegisterFunc(): StreamMetadata => Source[JsonSupport, NotUsed] = { _ =>
Source.empty
}
}

0 comments on commit 727fd46

Please sign in to comment.