diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/CustomVocabulary.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/CustomVocabulary.scala index ed53509b9..0cfc4fe66 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/CustomVocabulary.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/CustomVocabulary.scala @@ -78,15 +78,19 @@ import com.typesafe.config.Config * `:\$name`. * - `base-query`: query for the denominator. * - `keys`: tag keys that are available for use on the denominator. + * + * @param dependencies + * Other vocabularies to depend on, defaults to the `StyleVocabulary`. */ -class CustomVocabulary(config: Config) extends Vocabulary { +class CustomVocabulary(config: Config, dependencies: List[Vocabulary] = List(StyleVocabulary)) + extends Vocabulary { import CustomVocabulary.* import scala.jdk.CollectionConverters.* val name: String = "custom" - val dependsOn: List[Vocabulary] = List(StyleVocabulary) + val dependsOn: List[Vocabulary] = dependencies val words: List[Word] = { val vocab = config.getConfig("atlas.core.vocabulary") diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/TraceQuery.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/TraceQuery.scala index 8a641dcc3..c7df49e66 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/TraceQuery.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/TraceQuery.scala @@ -39,5 +39,8 @@ object TraceQuery { case class Child(q1: Query, q2: Query) extends TraceQuery /** Filter to select the set of spans from a trace to forward as events. */ - case class SpanFilter(q: TraceQuery, f: DataExpr) extends Expr + case class SpanFilter(q: TraceQuery, f: Query) extends Expr + + /** Time series based on data from a set of matching traces. */ + case class SpanTimeSeries(q: TraceQuery, expr: StyleExpr) extends Expr } diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/TraceVocabulary.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/TraceVocabulary.scala index 4d306e825..e0be14dbd 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/TraceVocabulary.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/TraceVocabulary.scala @@ -25,12 +25,13 @@ object TraceVocabulary extends Vocabulary { val name: String = "trace" - val dependsOn: List[Vocabulary] = List(DataVocabulary) + val dependsOn: List[Vocabulary] = List(StyleVocabulary) override def words: List[Word] = List( SpanAndWord, SpanOrWord, SpanFilterWord, + SpanTimeSeriesWord, ChildWord ) @@ -109,14 +110,11 @@ object TraceVocabulary extends Vocabulary { override def name: String = "span-filter" override protected def matcher: PartialFunction[List[Any], Boolean] = { - case (_: Query) :: TraceQueryType(_) :: _ => true - case (_: DataExpr) :: TraceQueryType(_) :: _ => true + case (_: Query) :: TraceQueryType(_) :: _ => true } override protected def executor: PartialFunction[List[Any], List[Any]] = { case (f: Query) :: TraceQueryType(q) :: stack => - TraceQuery.SpanFilter(q, DataExpr.All(f)) :: stack - case (f: DataExpr) :: TraceQueryType(q) :: stack => TraceQuery.SpanFilter(q, f) :: stack } @@ -129,4 +127,27 @@ object TraceVocabulary extends Vocabulary { override def examples: List[String] = List("app,foo,:eq,app,bar,:eq") } + + case object SpanTimeSeriesWord extends SimpleWord { + + override def name: String = "span-time-series" + + override protected def matcher: PartialFunction[List[Any], Boolean] = { + case PresentationType(_) :: TraceQueryType(_) :: _ => true + } + + override protected def executor: PartialFunction[List[Any], List[Any]] = { + case PresentationType(f: StyleExpr) :: TraceQueryType(q) :: stack => + TraceQuery.SpanTimeSeries(q, f) :: stack + } + + override def signature: String = "q:TraceQuery f:Query -- SpanFilter" + + override def summary: String = + """ + |Time series based on data from a set of matching traces. + |""".stripMargin + + override def examples: List[String] = List("app,foo,:eq,app,bar,:eq,:sum,ts,:legend") + } } diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/model/TraceVocabularySuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/model/TraceVocabularySuite.scala index a564441b5..5651b1cfd 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/model/TraceVocabularySuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/model/TraceVocabularySuite.scala @@ -37,6 +37,13 @@ class TraceVocabularySuite extends FunSuite { } } + private def parseTimeSeries(str: String): TraceQuery.SpanTimeSeries = { + interpreter.execute(str).stack match { + case (t: TraceQuery.SpanTimeSeries) :: Nil => t + case _ => throw new IllegalArgumentException(str) + } + } + test("simple Query coerced to TraceQuery") { val q = parseTraceQuery("app,foo,:eq") assertEquals(q, TraceQuery.Simple(Query.Equal("app", "foo"))) @@ -76,7 +83,19 @@ class TraceVocabularySuite extends FunSuite { Query.Equal("app", "foo"), Query.Equal("app", "bar") ), - DataExpr.All(Query.Equal("app", "foo")) + Query.Equal("app", "foo") + ) + assertEquals(q, expected) + } + + test("span-time-series") { + val q = parseTimeSeries("app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:span-time-series") + val expected = TraceQuery.SpanTimeSeries( + TraceQuery.Child( + Query.Equal("app", "foo"), + Query.Equal("app", "bar") + ), + StyleExpr(DataExpr.Sum(Query.Equal("app", "foo")), Map.empty) ) assertEquals(q, expected) } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/ExprType.java b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/ExprType.java index 667466feb..b56290ace 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/ExprType.java +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/ExprType.java @@ -17,15 +17,19 @@ /** Indicates the type of expression for a subscription. */ public enum ExprType { + + /** Expression to select a set of events to be passed through. */ + EVENTS, + /** * Time series expression such as used with Atlas Graph API. Can also be used for analytics * queries on top of event data. */ TIME_SERIES, - /** Expression to select a set of events to be passed through. */ - EVENTS, - /** Expression to select a set of traces to be passed through. */ - TRACES + TRACE_EVENTS, + + /** Time series expression based on data extraced from traces. */ + TRACE_TIME_SERIES } diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala index 87ffa4e72..6e0f59a4d 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala @@ -66,7 +66,7 @@ abstract class AbstractLwcEventClient extends LwcEventClient { // Trace pass-through traceHandlers = subscriptions.tracePassThrough.map { sub => - sub -> ExprUtils.parseTraceQuery(sub.expression) + sub -> ExprUtils.parseTraceEventsQuery(sub.expression) }.toMap index = idx @@ -125,7 +125,7 @@ abstract class AbstractLwcEventClient extends LwcEventClient { override def processTrace(trace: Seq[LwcEvent.Span]): Unit = { traceHandlers.foreachEntry { (sub, filter) => if (TraceMatcher.matches(filter.q, trace)) { - val filtered = trace.filter(event => ExprUtils.matches(filter.f.query, event.tagValue)) + val filtered = trace.filter(event => ExprUtils.matches(filter.f, event.tagValue)) if (filtered.nonEmpty) { submit(sub.id, LwcEvent.Events(filtered)) } diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/ExprUtils.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/ExprUtils.scala index 09f29dcec..0242aedc2 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/ExprUtils.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/ExprUtils.scala @@ -21,6 +21,7 @@ import com.netflix.atlas.core.model.EventExpr import com.netflix.atlas.core.model.EventVocabulary import com.netflix.atlas.core.model.ModelExtractors import com.netflix.atlas.core.model.Query +import com.netflix.atlas.core.model.StyleExpr import com.netflix.atlas.core.model.TraceQuery import com.netflix.atlas.core.model.TraceVocabulary import com.netflix.atlas.core.stacklang.Interpreter @@ -56,11 +57,11 @@ private[events] object ExprUtils { private val traceInterpreter = Interpreter(TraceVocabulary.allWords) private def matchAllSpans(q: TraceQuery): TraceQuery.SpanFilter = { - TraceQuery.SpanFilter(q, DataExpr.All(Query.True)) + TraceQuery.SpanFilter(q, Query.True) } - /** Parse a single trace query expression. */ - def parseTraceQuery(str: String): TraceQuery.SpanFilter = { + /** Parse a single trace events query expression. */ + def parseTraceEventsQuery(str: String): TraceQuery.SpanFilter = { traceInterpreter.execute(str).stack match { case TraceQueryType(q) :: Nil => matchAllSpans(q) case (f: TraceQuery.SpanFilter) :: Nil => f @@ -68,6 +69,19 @@ private[events] object ExprUtils { } } + private def sumAllSpans(q: TraceQuery): TraceQuery.SpanTimeSeries = { + TraceQuery.SpanTimeSeries(q, StyleExpr(DataExpr.Sum(Query.True), Map.empty)) + } + + /** Parse a single trace time series query expression. */ + def parseTraceTimeSeriesQuery(str: String): TraceQuery.SpanTimeSeries = { + traceInterpreter.execute(str).stack match { + case TraceQueryType(q) :: Nil => sumAllSpans(q) + case (f: TraceQuery.SpanTimeSeries) :: Nil => f + case _ => throw new IllegalArgumentException(str) + } + } + /** Convert from Atlas query model to Spectator to use with a QueryIndex. */ def toSpectatorQuery(query: Query): SpectatorQuery = { Parser.parseQuery(query.toString) diff --git a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/ExprUtilsSuite.scala b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/ExprUtilsSuite.scala index db1d87875..055f8e870 100644 --- a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/ExprUtilsSuite.scala +++ b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/ExprUtilsSuite.scala @@ -18,6 +18,7 @@ package com.netflix.atlas.lwc.events import com.netflix.atlas.core.model.DataExpr import com.netflix.atlas.core.model.EventExpr import com.netflix.atlas.core.model.Query +import com.netflix.atlas.core.model.StyleExpr import com.netflix.atlas.core.model.TraceQuery import munit.FunSuite @@ -53,9 +54,9 @@ class ExprUtilsSuite extends FunSuite { test("trace: simple query") { val expected = TraceQuery.SpanFilter( TraceQuery.Simple(Query.Equal("app", "foo")), - DataExpr.All(Query.True) + Query.True ) - assertEquals(ExprUtils.parseTraceQuery("app,foo,:eq"), expected) + assertEquals(ExprUtils.parseTraceEventsQuery("app,foo,:eq"), expected) } test("trace: complex") { @@ -64,21 +65,21 @@ class ExprUtilsSuite extends FunSuite { Query.Equal("app", "foo"), Query.Equal("app", "bar") ), - DataExpr.All(Query.Equal("app", "foo")) + Query.Equal("app", "foo") ) val expr = "app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:span-filter" - assertEquals(ExprUtils.parseTraceQuery(expr), expected) + assertEquals(ExprUtils.parseTraceEventsQuery(expr), expected) } test("trace: analytics") { - val expected = TraceQuery.SpanFilter( + val expected = TraceQuery.SpanTimeSeries( TraceQuery.Child( Query.Equal("app", "foo"), Query.Equal("app", "bar") ), - DataExpr.Sum(Query.Equal("app", "foo")) + StyleExpr(DataExpr.Sum(Query.Equal("app", "foo")), Map.empty) ) - val expr = "app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:sum,:span-filter" - assertEquals(ExprUtils.parseTraceQuery(expr), expected) + val expr = "app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:sum,:span-time-series" + assertEquals(ExprUtils.parseTraceTimeSeriesQuery(expr), expected) } } diff --git a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/TraceMatcherSuite.scala b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/TraceMatcherSuite.scala index 0f0819ca5..268770591 100644 --- a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/TraceMatcherSuite.scala +++ b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/TraceMatcherSuite.scala @@ -58,37 +58,37 @@ class TraceMatcherSuite extends FunSuite { } test("simple") { - checkMatch(ExprUtils.parseTraceQuery("app,a,:eq").q, true) - checkMatch(ExprUtils.parseTraceQuery("app,z,:eq").q, false) + checkMatch(ExprUtils.parseTraceEventsQuery("app,a,:eq").q, true) + checkMatch(ExprUtils.parseTraceEventsQuery("app,z,:eq").q, false) } test("span-and") { - val q1 = ExprUtils.parseTraceQuery("app,a,:eq,app,e,:eq,:span-and").q + val q1 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,e,:eq,:span-and").q checkMatch(q1, true) - val q2 = ExprUtils.parseTraceQuery("app,a,:eq,app,z,:eq,:span-and").q + val q2 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,z,:eq,:span-and").q checkMatch(q2, false) } test("span-or") { - val q1 = ExprUtils.parseTraceQuery("app,a,:eq,app,e,:eq,:span-or").q + val q1 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,e,:eq,:span-or").q checkMatch(q1, true) - val q2 = ExprUtils.parseTraceQuery("app,a,:eq,app,z,:eq,:span-or").q + val q2 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,z,:eq,:span-or").q checkMatch(q2, true) - val q3 = ExprUtils.parseTraceQuery("app,y,:eq,app,z,:eq,:span-or").q + val q3 = ExprUtils.parseTraceEventsQuery("app,y,:eq,app,z,:eq,:span-or").q checkMatch(q3, false) } test("child") { - val q1 = ExprUtils.parseTraceQuery("app,a,:eq,app,c,:eq,:child").q + val q1 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,c,:eq,:child").q checkMatch(q1, true) - val q2 = ExprUtils.parseTraceQuery("app,c,:eq,app,e,:eq,:child").q + val q2 = ExprUtils.parseTraceEventsQuery("app,c,:eq,app,e,:eq,:child").q checkMatch(q2, true) - val q3 = ExprUtils.parseTraceQuery("app,a,:eq,app,e,:eq,:child").q + val q3 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,e,:eq,:child").q checkMatch(q3, false) } }