Skip to content

Commit

Permalink
add span-time-series operator (#1612)
Browse files Browse the repository at this point in the history
Just using span-filter for both pass-through and time series
use-cases on traces creates ambiguity for some use-cases
in particular with implicit conversions.
  • Loading branch information
brharrington authored Feb 27, 2024
1 parent 8a59ed9 commit 0d8a345
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,19 @@ import com.typesafe.config.Config
* `:\$name`.
* - `base-query`: query for the denominator.
* - `keys`: tag keys that are available for use on the denominator.
*
* @param dependencies
* Other vocabularies to depend on, defaults to the `StyleVocabulary`.
*/
class CustomVocabulary(config: Config) extends Vocabulary {
class CustomVocabulary(config: Config, dependencies: List[Vocabulary] = List(StyleVocabulary))
extends Vocabulary {

import CustomVocabulary.*
import scala.jdk.CollectionConverters.*

val name: String = "custom"

val dependsOn: List[Vocabulary] = List(StyleVocabulary)
val dependsOn: List[Vocabulary] = dependencies

val words: List[Word] = {
val vocab = config.getConfig("atlas.core.vocabulary")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,8 @@ object TraceQuery {
case class Child(q1: Query, q2: Query) extends TraceQuery

/** Filter to select the set of spans from a trace to forward as events. */
case class SpanFilter(q: TraceQuery, f: DataExpr) extends Expr
case class SpanFilter(q: TraceQuery, f: Query) extends Expr

/** Time series based on data from a set of matching traces. */
case class SpanTimeSeries(q: TraceQuery, expr: StyleExpr) extends Expr
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ object TraceVocabulary extends Vocabulary {

val name: String = "trace"

val dependsOn: List[Vocabulary] = List(DataVocabulary)
val dependsOn: List[Vocabulary] = List(StyleVocabulary)

override def words: List[Word] = List(
SpanAndWord,
SpanOrWord,
SpanFilterWord,
SpanTimeSeriesWord,
ChildWord
)

Expand Down Expand Up @@ -109,14 +110,11 @@ object TraceVocabulary extends Vocabulary {
override def name: String = "span-filter"

override protected def matcher: PartialFunction[List[Any], Boolean] = {
case (_: Query) :: TraceQueryType(_) :: _ => true
case (_: DataExpr) :: TraceQueryType(_) :: _ => true
case (_: Query) :: TraceQueryType(_) :: _ => true
}

override protected def executor: PartialFunction[List[Any], List[Any]] = {
case (f: Query) :: TraceQueryType(q) :: stack =>
TraceQuery.SpanFilter(q, DataExpr.All(f)) :: stack
case (f: DataExpr) :: TraceQueryType(q) :: stack =>
TraceQuery.SpanFilter(q, f) :: stack
}

Expand All @@ -129,4 +127,27 @@ object TraceVocabulary extends Vocabulary {

override def examples: List[String] = List("app,foo,:eq,app,bar,:eq")
}

case object SpanTimeSeriesWord extends SimpleWord {

override def name: String = "span-time-series"

override protected def matcher: PartialFunction[List[Any], Boolean] = {
case PresentationType(_) :: TraceQueryType(_) :: _ => true
}

override protected def executor: PartialFunction[List[Any], List[Any]] = {
case PresentationType(f: StyleExpr) :: TraceQueryType(q) :: stack =>
TraceQuery.SpanTimeSeries(q, f) :: stack
}

override def signature: String = "q:TraceQuery f:Query -- SpanFilter"

override def summary: String =
"""
|Time series based on data from a set of matching traces.
|""".stripMargin

override def examples: List[String] = List("app,foo,:eq,app,bar,:eq,:sum,ts,:legend")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ class TraceVocabularySuite extends FunSuite {
}
}

private def parseTimeSeries(str: String): TraceQuery.SpanTimeSeries = {
interpreter.execute(str).stack match {
case (t: TraceQuery.SpanTimeSeries) :: Nil => t
case _ => throw new IllegalArgumentException(str)
}
}

test("simple Query coerced to TraceQuery") {
val q = parseTraceQuery("app,foo,:eq")
assertEquals(q, TraceQuery.Simple(Query.Equal("app", "foo")))
Expand Down Expand Up @@ -76,7 +83,19 @@ class TraceVocabularySuite extends FunSuite {
Query.Equal("app", "foo"),
Query.Equal("app", "bar")
),
DataExpr.All(Query.Equal("app", "foo"))
Query.Equal("app", "foo")
)
assertEquals(q, expected)
}

test("span-time-series") {
val q = parseTimeSeries("app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:span-time-series")
val expected = TraceQuery.SpanTimeSeries(
TraceQuery.Child(
Query.Equal("app", "foo"),
Query.Equal("app", "bar")
),
StyleExpr(DataExpr.Sum(Query.Equal("app", "foo")), Map.empty)
)
assertEquals(q, expected)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@

/** Indicates the type of expression for a subscription. */
public enum ExprType {

/** Expression to select a set of events to be passed through. */
EVENTS,

/**
* Time series expression such as used with Atlas Graph API. Can also be used for analytics
* queries on top of event data.
*/
TIME_SERIES,

/** Expression to select a set of events to be passed through. */
EVENTS,

/** Expression to select a set of traces to be passed through. */
TRACES
TRACE_EVENTS,

/** Time series expression based on data extraced from traces. */
TRACE_TIME_SERIES
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ abstract class AbstractLwcEventClient extends LwcEventClient {

// Trace pass-through
traceHandlers = subscriptions.tracePassThrough.map { sub =>
sub -> ExprUtils.parseTraceQuery(sub.expression)
sub -> ExprUtils.parseTraceEventsQuery(sub.expression)
}.toMap

index = idx
Expand Down Expand Up @@ -125,7 +125,7 @@ abstract class AbstractLwcEventClient extends LwcEventClient {
override def processTrace(trace: Seq[LwcEvent.Span]): Unit = {
traceHandlers.foreachEntry { (sub, filter) =>
if (TraceMatcher.matches(filter.q, trace)) {
val filtered = trace.filter(event => ExprUtils.matches(filter.f.query, event.tagValue))
val filtered = trace.filter(event => ExprUtils.matches(filter.f, event.tagValue))
if (filtered.nonEmpty) {
submit(sub.id, LwcEvent.Events(filtered))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.netflix.atlas.core.model.EventExpr
import com.netflix.atlas.core.model.EventVocabulary
import com.netflix.atlas.core.model.ModelExtractors
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.model.TraceQuery
import com.netflix.atlas.core.model.TraceVocabulary
import com.netflix.atlas.core.stacklang.Interpreter
Expand Down Expand Up @@ -56,18 +57,31 @@ private[events] object ExprUtils {
private val traceInterpreter = Interpreter(TraceVocabulary.allWords)

private def matchAllSpans(q: TraceQuery): TraceQuery.SpanFilter = {
TraceQuery.SpanFilter(q, DataExpr.All(Query.True))
TraceQuery.SpanFilter(q, Query.True)
}

/** Parse a single trace query expression. */
def parseTraceQuery(str: String): TraceQuery.SpanFilter = {
/** Parse a single trace events query expression. */
def parseTraceEventsQuery(str: String): TraceQuery.SpanFilter = {
traceInterpreter.execute(str).stack match {
case TraceQueryType(q) :: Nil => matchAllSpans(q)
case (f: TraceQuery.SpanFilter) :: Nil => f
case _ => throw new IllegalArgumentException(str)
}
}

private def sumAllSpans(q: TraceQuery): TraceQuery.SpanTimeSeries = {
TraceQuery.SpanTimeSeries(q, StyleExpr(DataExpr.Sum(Query.True), Map.empty))
}

/** Parse a single trace time series query expression. */
def parseTraceTimeSeriesQuery(str: String): TraceQuery.SpanTimeSeries = {
traceInterpreter.execute(str).stack match {
case TraceQueryType(q) :: Nil => sumAllSpans(q)
case (f: TraceQuery.SpanTimeSeries) :: Nil => f
case _ => throw new IllegalArgumentException(str)
}
}

/** Convert from Atlas query model to Spectator to use with a QueryIndex. */
def toSpectatorQuery(query: Query): SpectatorQuery = {
Parser.parseQuery(query.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.netflix.atlas.lwc.events
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.EventExpr
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.model.TraceQuery
import munit.FunSuite

Expand Down Expand Up @@ -53,9 +54,9 @@ class ExprUtilsSuite extends FunSuite {
test("trace: simple query") {
val expected = TraceQuery.SpanFilter(
TraceQuery.Simple(Query.Equal("app", "foo")),
DataExpr.All(Query.True)
Query.True
)
assertEquals(ExprUtils.parseTraceQuery("app,foo,:eq"), expected)
assertEquals(ExprUtils.parseTraceEventsQuery("app,foo,:eq"), expected)
}

test("trace: complex") {
Expand All @@ -64,21 +65,21 @@ class ExprUtilsSuite extends FunSuite {
Query.Equal("app", "foo"),
Query.Equal("app", "bar")
),
DataExpr.All(Query.Equal("app", "foo"))
Query.Equal("app", "foo")
)
val expr = "app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:span-filter"
assertEquals(ExprUtils.parseTraceQuery(expr), expected)
assertEquals(ExprUtils.parseTraceEventsQuery(expr), expected)
}

test("trace: analytics") {
val expected = TraceQuery.SpanFilter(
val expected = TraceQuery.SpanTimeSeries(
TraceQuery.Child(
Query.Equal("app", "foo"),
Query.Equal("app", "bar")
),
DataExpr.Sum(Query.Equal("app", "foo"))
StyleExpr(DataExpr.Sum(Query.Equal("app", "foo")), Map.empty)
)
val expr = "app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:sum,:span-filter"
assertEquals(ExprUtils.parseTraceQuery(expr), expected)
val expr = "app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:sum,:span-time-series"
assertEquals(ExprUtils.parseTraceTimeSeriesQuery(expr), expected)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,37 +58,37 @@ class TraceMatcherSuite extends FunSuite {
}

test("simple") {
checkMatch(ExprUtils.parseTraceQuery("app,a,:eq").q, true)
checkMatch(ExprUtils.parseTraceQuery("app,z,:eq").q, false)
checkMatch(ExprUtils.parseTraceEventsQuery("app,a,:eq").q, true)
checkMatch(ExprUtils.parseTraceEventsQuery("app,z,:eq").q, false)
}

test("span-and") {
val q1 = ExprUtils.parseTraceQuery("app,a,:eq,app,e,:eq,:span-and").q
val q1 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,e,:eq,:span-and").q
checkMatch(q1, true)

val q2 = ExprUtils.parseTraceQuery("app,a,:eq,app,z,:eq,:span-and").q
val q2 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,z,:eq,:span-and").q
checkMatch(q2, false)
}

test("span-or") {
val q1 = ExprUtils.parseTraceQuery("app,a,:eq,app,e,:eq,:span-or").q
val q1 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,e,:eq,:span-or").q
checkMatch(q1, true)

val q2 = ExprUtils.parseTraceQuery("app,a,:eq,app,z,:eq,:span-or").q
val q2 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,z,:eq,:span-or").q
checkMatch(q2, true)

val q3 = ExprUtils.parseTraceQuery("app,y,:eq,app,z,:eq,:span-or").q
val q3 = ExprUtils.parseTraceEventsQuery("app,y,:eq,app,z,:eq,:span-or").q
checkMatch(q3, false)
}

test("child") {
val q1 = ExprUtils.parseTraceQuery("app,a,:eq,app,c,:eq,:child").q
val q1 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,c,:eq,:child").q
checkMatch(q1, true)

val q2 = ExprUtils.parseTraceQuery("app,c,:eq,app,e,:eq,:child").q
val q2 = ExprUtils.parseTraceEventsQuery("app,c,:eq,app,e,:eq,:child").q
checkMatch(q2, true)

val q3 = ExprUtils.parseTraceQuery("app,a,:eq,app,e,:eq,:child").q
val q3 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,e,:eq,:child").q
checkMatch(q3, false)
}
}
Expand Down

0 comments on commit 0d8a345

Please sign in to comment.