Skip to content

Commit

Permalink
eval: fix message drops for higher throughput (Netflix#1634)
Browse files Browse the repository at this point in the history
Diagnostic messages and events were using the ds logger
that has a separate queue. This made them prone to getting
dropped if the throughput was high. Refactor to pass these
messages over the same path as the data points.
  • Loading branch information
brharrington authored and manolama committed May 22, 2024
1 parent 0ac5550 commit 2fb4f04
Show file tree
Hide file tree
Showing 14 changed files with 477 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.model

import com.netflix.atlas.eval.stream.Evaluator

/**
 * Pairs a set of data points with other arbitrary messages that should be passed
 * through to the consumer.
 *
 * @param data
 *     Data points that are part of this tuple.
 * @param messages
 *     Messages to pass through along with the data points. Defaults to an empty list.
 */
case class DatapointsTuple(
  data: List[AggrDatapoint],
  messages: List[Evaluator.MessageEnvelope] = Nil
) {

  /** Step of the first data point, or `0` if the tuple has no data points. */
  def step: Long = data match {
    case first :: _ => first.step
    case Nil        => 0L
  }

  /**
   * Splits this tuple into one tuple per distinct step of the data points. If there
   * are messages, they are placed at the front of the result in a tuple of their own
   * that has no data points.
   */
  def groupByStep: List[DatapointsTuple] = {
    val perStep = data.groupBy(_.step).toList.map {
      case (_, datapoints) => DatapointsTuple(datapoints)
    }
    if (messages.isEmpty) perStep else DatapointsTuple(Nil, messages) :: perStep
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,31 @@
public enum ExprType {

/** Expression to select a set of events to be passed through. */
EVENTS,
EVENTS(false),

/**
* Time series expression such as used with Atlas Graph API. Can also be used for analytics
* queries on top of event data.
*/
TIME_SERIES,
TIME_SERIES(true),

/** Expression to select a set of traces to be passed through. */
TRACE_EVENTS,
TRACE_EVENTS(false),

/** Time series expression based on data extracted from traces. */
TRACE_TIME_SERIES
TRACE_TIME_SERIES(true);

/** Flag set by each constant: true for the time series types, false for the event types. */
private final boolean timeSeries;

/** Create a constant that is flagged as either a time series type or an event type. */
ExprType(boolean timeSeries) {
this.timeSeries = timeSeries;
}

/** Returns true if this type is not flagged as a time series type. */
public boolean isEventType() {
return !timeSeries;
}

/** Returns true if this type is flagged as a time series type. */
public boolean isTimeSeriesType() {
return timeSeries;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.model

import com.netflix.atlas.eval.stream.Evaluator

/**
 * Pairs a set of time groups with other arbitrary messages that should be passed
 * through to the consumer.
 *
 * @param groups
 *     Time groups that are part of this tuple.
 * @param messages
 *     Messages to pass through along with the groups. Defaults to an empty list.
 */
case class TimeGroupsTuple(
  groups: List[TimeGroup],
  messages: List[Evaluator.MessageEnvelope] = Nil
) {

  /** Step of the first time group, or `0` if the tuple has no groups. */
  def step: Long = groups match {
    case first :: _ => first.step
    case Nil        => 0L
  }

  /**
   * Splits this tuple into one tuple per distinct step of the time groups. If there
   * are messages, they are placed at the front of the result in a tuple of their own
   * that has no groups.
   */
  def groupByStep: List[TimeGroupsTuple] = {
    val perStep = groups.groupBy(_.step).toList.map {
      case (_, sameStep) => TimeGroupsTuple(sameStep)
    }
    if (messages.isEmpty) perStep else TimeGroupsTuple(Nil, messages) :: perStep
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import com.netflix.atlas.eval.model.ExprType
import com.netflix.atlas.eval.model.LwcExpression
import com.netflix.atlas.eval.model.LwcMessages
import com.netflix.atlas.eval.model.TimeGroup
import com.netflix.atlas.eval.model.TimeGroupsTuple
import com.netflix.atlas.eval.stream.EurekaSource.Instance
import com.netflix.atlas.eval.stream.Evaluator.DataSource
import com.netflix.atlas.eval.stream.Evaluator.DataSources
Expand Down Expand Up @@ -244,12 +245,11 @@ private[stream] abstract class EvaluatorImpl(
val intermediateEval = createInputFlow(context)
.via(context.monitorFlow("10_InputBatches"))
.via(new LwcToAggrDatapoint(context))
.flatMapConcat { vs =>
Source(vs.groupBy(_.step).map(_._2.toList))
.flatMapConcat { t =>
Source(t.groupByStep)
}
.groupBy(Int.MaxValue, _.head.step, allowClosedSubstreamRecreation = true)
.groupBy(Int.MaxValue, _.step, allowClosedSubstreamRecreation = true)
.via(new TimeGrouped(context))
.flatMapConcat(Source.apply)
.mergeSubstreams
.via(context.monitorFlow("11_GroupedDatapoints"))

Expand Down Expand Up @@ -362,7 +362,7 @@ private[stream] abstract class EvaluatorImpl(

aggregator match {
case Some(aggr) if aggr.limitExceeded =>
context.logDatapointsExceeded(group.timestamp, t._1)
context.logDatapointsExceeded(group.timestamp, t._1.toString)
AggrValuesInfo(Nil, t._2.size)
case Some(aggr) =>
AggrValuesInfo(aggr.datapoints, t._2.size)
Expand All @@ -389,9 +389,9 @@ private[stream] abstract class EvaluatorImpl(
* the objects so the FinalExprEval stage will only see a single step.
*/
private def stepSize: PartialFunction[AnyRef, Long] = {
// Data source updates report their step via DataSources.stepSize()
case ds: DataSources => ds.stepSize()
case grp: TimeGroup => grp.step
case v => throw new IllegalArgumentException(s"unexpected value in stream: $v")
case ds: DataSources => ds.stepSize()
// A tuple reports the step of its first group, or 0 when it has no groups
case t: TimeGroupsTuple => t.step
// Any other type is not expected at this point in the stream
case v => throw new IllegalArgumentException(s"unexpected value in stream: $v")
}

/**
Expand All @@ -408,6 +408,8 @@ private[stream] abstract class EvaluatorImpl(
new DataSources(sources.asJava)
}
.toList
case t: TimeGroupsTuple =>
t.groupByStep
case _ =>
List(value)
}
Expand Down Expand Up @@ -506,8 +508,10 @@ private[stream] abstract class EvaluatorImpl(

// Builds the set of LwcExpression values for all of the provided data sources.
private def toExprSet(dss: DataSources, interpreter: ExprInterpreter): Set[LwcExpression] = {
dss.sources.asScala.flatMap { dataSource =>
interpreter.eval(Uri(dataSource.uri)).exprs.map { expr =>
LwcExpression(expr.toString, ExprType.TIME_SERIES, dataSource.step.toMillis)
// The expression type is determined by the interpreter from the data source URI
val (exprType, exprs) = interpreter.parseQuery(Uri(dataSource.uri))
exprs.map { expr =>
// Only time series types use the step of the data source, other types use 0
val step = if (exprType.isTimeSeriesType) dataSource.step.toMillis else 0L
LwcExpression(expr.toString, exprType, step)
}
}.toSet
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,21 @@
*/
package com.netflix.atlas.eval.stream

import com.netflix.atlas.core.model.CustomVocabulary
import org.apache.pekko.http.scaladsl.model.Uri
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.EventExpr
import com.netflix.atlas.core.model.EventVocabulary
import com.netflix.atlas.core.model.Expr
import com.netflix.atlas.core.model.FilterExpr
import com.netflix.atlas.core.model.ModelExtractors
import com.netflix.atlas.core.model.StatefulExpr
import com.netflix.atlas.core.model.TraceQuery
import com.netflix.atlas.core.model.TraceVocabulary
import com.netflix.atlas.core.stacklang.Interpreter
import com.netflix.atlas.eval.graph.GraphConfig
import com.netflix.atlas.eval.graph.Grapher
import com.netflix.atlas.eval.model.ExprType
import com.netflix.atlas.eval.stream.Evaluator.DataSource
import com.netflix.atlas.eval.stream.Evaluator.DataSources
import com.netflix.atlas.eval.util.HostRewriter
Expand All @@ -31,6 +39,16 @@ import scala.util.Success

private[stream] class ExprInterpreter(config: Config) {

import ExprInterpreter.*

// Interpreter used by evalEvents to execute the query for event expressions. It is
// created from the words of a CustomVocabulary based on the EventVocabulary.
private val eventInterpreter = Interpreter(
new CustomVocabulary(config, List(EventVocabulary)).allWords
)

// Interpreter used by evalTraceEvents and evalTraceTimeSeries to execute trace queries.
// It is created from the words of a CustomVocabulary based on the TraceVocabulary.
private val traceInterpreter = Interpreter(
new CustomVocabulary(config, List(TraceVocabulary)).allWords
)

private val grapher = Grapher(config)

private val hostRewriter = new HostRewriter(config.getConfig("atlas.eval.host-rewrite"))
Expand Down Expand Up @@ -72,16 +90,103 @@ private[stream] class ExprInterpreter(config: Config) {
)
}

/**
 * Groups the data sources by the expression strings that `dataExprs` returns for the
 * URI of each source. A data source is listed under every expression string that was
 * returned for its URI.
 */
def dataExprMap(ds: DataSources): Map[DataExpr, List[DataSource]] = {
def dataExprMap(ds: DataSources): Map[String, List[DataSource]] = {
import scala.jdk.CollectionConverters.*
ds.sources.asScala.toList
.flatMap { s =>
val exprs = eval(Uri(s.uri)).exprs.flatMap(_.expr.dataExprs).distinct
exprs.map(_ -> s)
// Pair each expression string for the URI with the data source it came from
dataExprs(Uri(s.uri)).map(_ -> s)
}
// Collect the pairs by expression and keep only the data sources as the values
.groupBy(_._1)
.map {
case (expr, vs) => expr -> vs.map(_._2)
}
}

/** Creates the exception that is thrown when an unexpected value is found on the stack. */
private def invalidValue(value: Any): IllegalArgumentException = {
  val message = s"invalid value on stack; $value"
  new IllegalArgumentException(message)
}

/**
 * Executes the `q` parameter of the URI with the event interpreter and returns the
 * resulting stack as a list of event expressions.
 *
 * Throws an `IllegalArgumentException` if the `q` parameter is missing or if a value
 * on the stack is not matched by `ModelExtractors.EventExprType`.
 */
private def evalEvents(uri: Uri): List[EventExpr] = {
  // Fail early if there is no query to execute
  val query = uri.query().get("q").getOrElse {
    throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
  }
  eventInterpreter.execute(query).stack.map {
    case ModelExtractors.EventExprType(expr) => expr
    case other                               => throw invalidValue(other)
  }
}

/**
 * Executes the `q` parameter of the URI with the trace interpreter and returns the
 * resulting stack as a list of span filters.
 *
 * Throws an `IllegalArgumentException` if the `q` parameter is missing or if a value
 * on the stack is not matched by `ModelExtractors.TraceFilterType`.
 */
private def evalTraceEvents(uri: Uri): List[TraceQuery.SpanFilter] = {
  // Fail early if there is no query to execute
  val query = uri.query().get("q").getOrElse {
    throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
  }
  traceInterpreter.execute(query).stack.map {
    case ModelExtractors.TraceFilterType(filter) => filter
    case other                                   => throw invalidValue(other)
  }
}

/**
 * Executes the `q` parameter of the URI with the trace interpreter and returns the
 * resulting stack as a list of span time series expressions.
 *
 * Throws an `IllegalArgumentException` if the `q` parameter is missing or if a value
 * on the stack is not matched by `ModelExtractors.TraceTimeSeriesType`.
 */
private def evalTraceTimeSeries(uri: Uri): List[TraceQuery.SpanTimeSeries] = {
  // Fail early if there is no query to execute
  val query = uri.query().get("q").getOrElse {
    throw new IllegalArgumentException(s"missing required parameter: q ($uri)")
  }
  traceInterpreter.execute(query).stack.map {
    case ModelExtractors.TraceTimeSeriesType(ts) => ts
    case other                                   => throw invalidValue(other)
  }
}

/**
 * Determines the expression type for the URI and evaluates the query with the
 * evaluation method for that type.
 *
 * @param uri
 *     URI to parse. The type is derived from the path, see `determineExprType`.
 * @return
 *     Expression type along with the distinct expressions for the query.
 */
def parseQuery(uri: Uri): (ExprType, List[Expr]) = {
  val exprType = determineExprType(uri)
  val parsed = exprType match {
    case ExprType.TIME_SERIES       => eval(uri).exprs
    case ExprType.EVENTS            => evalEvents(uri)
    case ExprType.TRACE_EVENTS      => evalTraceEvents(uri)
    case ExprType.TRACE_TIME_SERIES => evalTraceTimeSeries(uri)
  }
  (exprType, parsed.distinct)
}

/**
 * Returns the distinct string representations of the expressions for the URI. For
 * the time series type these are the data expressions of each evaluated expression,
 * for the other types the evaluated expressions are used directly.
 */
def dataExprs(uri: Uri): List[String] = {
  val exprType = determineExprType(uri)
  val exprs = exprType match {
    case ExprType.TIME_SERIES       => eval(uri).exprs.flatMap(e => e.expr.dataExprs)
    case ExprType.EVENTS            => evalEvents(uri)
    case ExprType.TRACE_EVENTS      => evalTraceEvents(uri)
    case ExprType.TRACE_TIME_SERIES => evalTraceTimeSeries(uri)
  }
  exprs.map(e => e.toString).distinct
}

/**
 * Determines the expression type based on the path of the URI. The reversed path is
 * checked against the events, trace events, and trace time series prefixes in that
 * order. If none of them match, the type is `TIME_SERIES`.
 */
def determineExprType(uri: Uri): ExprType = {
  reversedPath(uri.path) match {
    case p if p.startsWith(eventsPrefix)          => ExprType.EVENTS
    case p if p.startsWith(traceEventsPrefix)     => ExprType.TRACE_EVENTS
    case p if p.startsWith(traceTimeSeriesPrefix) => ExprType.TRACE_TIME_SERIES
    case _                                        => ExprType.TIME_SERIES
  }
}

/** Reverses the path and drops the first element of the result if it is a slash. */
private def reversedPath(path: Uri.Path): Uri.Path = {
  path.reverse match {
    case slashFirst if slashFirst.startsWithSlash => slashFirst.tail
    case other                                    => other
  }
}
}

/**
 * Path prefixes used by `determineExprType`. They are compared against the reversed
 * path of the URI, so they describe how the original path ends and are written in
 * reverse order.
 */
private[stream] object ExprInterpreter {

// Reversed path starting with `events` is mapped to ExprType.EVENTS
private val eventsPrefix = Uri.Path("events")
// Reversed path starting with `traces` is mapped to ExprType.TRACE_EVENTS
private val traceEventsPrefix = Uri.Path("traces")
// Reversed path starting with `graph/traces` is mapped to ExprType.TRACE_TIME_SERIES
private val traceTimeSeriesPrefix = Uri.Path("graph") / "traces"
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.model.TimeSeries
import com.netflix.atlas.core.util.IdentityMap
import com.netflix.atlas.eval.model.TimeGroup
import com.netflix.atlas.eval.model.TimeGroupsTuple
import com.netflix.atlas.eval.model.TimeSeriesMessage
import com.netflix.atlas.eval.stream.Evaluator.DataSources
import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope
Expand Down Expand Up @@ -173,7 +174,7 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)

// Perform the final evaluation and create a source with the TimeSeriesMessages
// addressed to each recipient
private def handleData(group: TimeGroup): Unit = {
private def handleData(group: TimeGroup): List[MessageEnvelope] = {
// Finalize the DataExprs, needed as input for further evaluation
val timestamp = group.timestamp
val groupedDatapoints = group.dataExprValues
Expand Down Expand Up @@ -236,14 +237,26 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)
case (id, rate) => new MessageEnvelope(id, rate)
}.toList

push(out, Source(output ++ rateMessages))
output ++ rateMessages
}

// Evaluate a single time group and push the resulting messages downstream
private def handleSingleGroup(g: TimeGroup): Unit = {
  val envelopes = handleData(g)
  push(out, Source(envelopes))
}

// Push the pass-through messages of the tuple followed by the messages from
// evaluating each of its time groups as a single source
private def handleGroups(t: TimeGroupsTuple): Unit = {
  val envelopes: List[MessageEnvelope] = t.messages ++ t.groups.flatMap(handleData)
  push(out, Source(envelopes))
}

// Dispatch the element received from the inlet based on its type.
override def onPush(): Unit = {
grab(in) match {
case ds: DataSources => handleDataSources(ds)
case data: TimeGroup => handleData(data)
case v => throw new MatchError(v)
case ds: DataSources => handleDataSources(ds)
case data: TimeGroup => handleSingleGroup(data)
// Tuples carry pass-through messages in addition to the time groups
case t: TimeGroupsTuple => handleGroups(t)
// Any other type is not expected and is reported as a MatchError
case v => throw new MatchError(v)
}
}

Expand Down
Loading

0 comments on commit 2fb4f04

Please sign in to comment.