Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add span-time-series operator #1612

Merged
merged 3 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,19 @@ import com.typesafe.config.Config
* `:\$name`.
* - `base-query`: query for the denominator.
* - `keys`: tag keys that are available for use on the denominator.
*
* @param dependencies
* Other vocabularies to depend on, defaults to the `StyleVocabulary`.
*/
class CustomVocabulary(config: Config) extends Vocabulary {
class CustomVocabulary(config: Config, dependencies: List[Vocabulary] = List(StyleVocabulary))
extends Vocabulary {

import CustomVocabulary.*
import scala.jdk.CollectionConverters.*

val name: String = "custom"

val dependsOn: List[Vocabulary] = List(StyleVocabulary)
val dependsOn: List[Vocabulary] = dependencies

val words: List[Word] = {
val vocab = config.getConfig("atlas.core.vocabulary")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,8 @@ object TraceQuery {
case class Child(q1: Query, q2: Query) extends TraceQuery

/** Filter to select the set of spans from a trace to forward as events. */
case class SpanFilter(q: TraceQuery, f: DataExpr) extends Expr
case class SpanFilter(q: TraceQuery, f: Query) extends Expr

/** Time series based on data from a set of matching traces. */
case class SpanTimeSeries(q: TraceQuery, expr: StyleExpr) extends Expr
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ object TraceVocabulary extends Vocabulary {

val name: String = "trace"

val dependsOn: List[Vocabulary] = List(DataVocabulary)
val dependsOn: List[Vocabulary] = List(StyleVocabulary)

override def words: List[Word] = List(
SpanAndWord,
SpanOrWord,
SpanFilterWord,
SpanTimeSeriesWord,
ChildWord
)

Expand Down Expand Up @@ -109,14 +110,11 @@ object TraceVocabulary extends Vocabulary {
override def name: String = "span-filter"

override protected def matcher: PartialFunction[List[Any], Boolean] = {
case (_: Query) :: TraceQueryType(_) :: _ => true
case (_: DataExpr) :: TraceQueryType(_) :: _ => true
case (_: Query) :: TraceQueryType(_) :: _ => true
}

override protected def executor: PartialFunction[List[Any], List[Any]] = {
case (f: Query) :: TraceQueryType(q) :: stack =>
TraceQuery.SpanFilter(q, DataExpr.All(f)) :: stack
case (f: DataExpr) :: TraceQueryType(q) :: stack =>
TraceQuery.SpanFilter(q, f) :: stack
}

Expand All @@ -129,4 +127,27 @@ object TraceVocabulary extends Vocabulary {

override def examples: List[String] = List("app,foo,:eq,app,bar,:eq")
}

case object SpanTimeSeriesWord extends SimpleWord {

override def name: String = "span-time-series"

override protected def matcher: PartialFunction[List[Any], Boolean] = {
case PresentationType(_) :: TraceQueryType(_) :: _ => true
}

override protected def executor: PartialFunction[List[Any], List[Any]] = {
case PresentationType(f: StyleExpr) :: TraceQueryType(q) :: stack =>
TraceQuery.SpanTimeSeries(q, f) :: stack
}

override def signature: String = "q:TraceQuery f:Query -- SpanFilter"

override def summary: String =
"""
|Time series based on data from a set of matching traces.
|""".stripMargin

override def examples: List[String] = List("app,foo,:eq,app,bar,:eq,:sum,ts,:legend")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ class TraceVocabularySuite extends FunSuite {
}
}

private def parseTimeSeries(str: String): TraceQuery.SpanTimeSeries = {
interpreter.execute(str).stack match {
case (t: TraceQuery.SpanTimeSeries) :: Nil => t
case _ => throw new IllegalArgumentException(str)
}
}

test("simple Query coerced to TraceQuery") {
val q = parseTraceQuery("app,foo,:eq")
assertEquals(q, TraceQuery.Simple(Query.Equal("app", "foo")))
Expand Down Expand Up @@ -76,7 +83,19 @@ class TraceVocabularySuite extends FunSuite {
Query.Equal("app", "foo"),
Query.Equal("app", "bar")
),
DataExpr.All(Query.Equal("app", "foo"))
Query.Equal("app", "foo")
)
assertEquals(q, expected)
}

test("span-time-series") {
val q = parseTimeSeries("app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:span-time-series")
val expected = TraceQuery.SpanTimeSeries(
TraceQuery.Child(
Query.Equal("app", "foo"),
Query.Equal("app", "bar")
),
StyleExpr(DataExpr.Sum(Query.Equal("app", "foo")), Map.empty)
)
assertEquals(q, expected)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@

/** Indicates the type of expression for a subscription. */
public enum ExprType {

/** Expression to select a set of events to be passed through. */
EVENTS,

/**
* Time series expression such as used with Atlas Graph API. Can also be used for analytics
* queries on top of event data.
*/
TIME_SERIES,

/** Expression to select a set of events to be passed through. */
EVENTS,

/** Expression to select a set of traces to be passed through. */
TRACES
TRACE_EVENTS,

/** Time series expression based on data extraced from traces. */
TRACE_TIME_SERIES
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ abstract class AbstractLwcEventClient extends LwcEventClient {

// Trace pass-through
traceHandlers = subscriptions.tracePassThrough.map { sub =>
sub -> ExprUtils.parseTraceQuery(sub.expression)
sub -> ExprUtils.parseTraceEventsQuery(sub.expression)
}.toMap

index = idx
Expand Down Expand Up @@ -125,7 +125,7 @@ abstract class AbstractLwcEventClient extends LwcEventClient {
override def processTrace(trace: Seq[LwcEvent.Span]): Unit = {
traceHandlers.foreachEntry { (sub, filter) =>
if (TraceMatcher.matches(filter.q, trace)) {
val filtered = trace.filter(event => ExprUtils.matches(filter.f.query, event.tagValue))
val filtered = trace.filter(event => ExprUtils.matches(filter.f, event.tagValue))
if (filtered.nonEmpty) {
submit(sub.id, LwcEvent.Events(filtered))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.netflix.atlas.core.model.EventExpr
import com.netflix.atlas.core.model.EventVocabulary
import com.netflix.atlas.core.model.ModelExtractors
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.model.TraceQuery
import com.netflix.atlas.core.model.TraceVocabulary
import com.netflix.atlas.core.stacklang.Interpreter
Expand Down Expand Up @@ -56,18 +57,31 @@ private[events] object ExprUtils {
private val traceInterpreter = Interpreter(TraceVocabulary.allWords)

private def matchAllSpans(q: TraceQuery): TraceQuery.SpanFilter = {
TraceQuery.SpanFilter(q, DataExpr.All(Query.True))
TraceQuery.SpanFilter(q, Query.True)
}

/** Parse a single trace query expression. */
def parseTraceQuery(str: String): TraceQuery.SpanFilter = {
/** Parse a single trace events query expression. */
def parseTraceEventsQuery(str: String): TraceQuery.SpanFilter = {
traceInterpreter.execute(str).stack match {
case TraceQueryType(q) :: Nil => matchAllSpans(q)
case (f: TraceQuery.SpanFilter) :: Nil => f
case _ => throw new IllegalArgumentException(str)
}
}

private def sumAllSpans(q: TraceQuery): TraceQuery.SpanTimeSeries = {
TraceQuery.SpanTimeSeries(q, StyleExpr(DataExpr.Sum(Query.True), Map.empty))
}

/** Parse a single trace time series query expression. */
def parseTraceTimeSeriesQuery(str: String): TraceQuery.SpanTimeSeries = {
traceInterpreter.execute(str).stack match {
case TraceQueryType(q) :: Nil => sumAllSpans(q)
case (f: TraceQuery.SpanTimeSeries) :: Nil => f
case _ => throw new IllegalArgumentException(str)
}
}

/** Convert from Atlas query model to Spectator to use with a QueryIndex. */
def toSpectatorQuery(query: Query): SpectatorQuery = {
Parser.parseQuery(query.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.netflix.atlas.lwc.events
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.EventExpr
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.StyleExpr
import com.netflix.atlas.core.model.TraceQuery
import munit.FunSuite

Expand Down Expand Up @@ -53,9 +54,9 @@ class ExprUtilsSuite extends FunSuite {
test("trace: simple query") {
val expected = TraceQuery.SpanFilter(
TraceQuery.Simple(Query.Equal("app", "foo")),
DataExpr.All(Query.True)
Query.True
)
assertEquals(ExprUtils.parseTraceQuery("app,foo,:eq"), expected)
assertEquals(ExprUtils.parseTraceEventsQuery("app,foo,:eq"), expected)
}

test("trace: complex") {
Expand All @@ -64,21 +65,21 @@ class ExprUtilsSuite extends FunSuite {
Query.Equal("app", "foo"),
Query.Equal("app", "bar")
),
DataExpr.All(Query.Equal("app", "foo"))
Query.Equal("app", "foo")
)
val expr = "app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:span-filter"
assertEquals(ExprUtils.parseTraceQuery(expr), expected)
assertEquals(ExprUtils.parseTraceEventsQuery(expr), expected)
}

test("trace: analytics") {
val expected = TraceQuery.SpanFilter(
val expected = TraceQuery.SpanTimeSeries(
TraceQuery.Child(
Query.Equal("app", "foo"),
Query.Equal("app", "bar")
),
DataExpr.Sum(Query.Equal("app", "foo"))
StyleExpr(DataExpr.Sum(Query.Equal("app", "foo")), Map.empty)
)
val expr = "app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:sum,:span-filter"
assertEquals(ExprUtils.parseTraceQuery(expr), expected)
val expr = "app,foo,:eq,app,bar,:eq,:child,app,foo,:eq,:sum,:span-time-series"
assertEquals(ExprUtils.parseTraceTimeSeriesQuery(expr), expected)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,37 +58,37 @@ class TraceMatcherSuite extends FunSuite {
}

test("simple") {
checkMatch(ExprUtils.parseTraceQuery("app,a,:eq").q, true)
checkMatch(ExprUtils.parseTraceQuery("app,z,:eq").q, false)
checkMatch(ExprUtils.parseTraceEventsQuery("app,a,:eq").q, true)
checkMatch(ExprUtils.parseTraceEventsQuery("app,z,:eq").q, false)
}

test("span-and") {
val q1 = ExprUtils.parseTraceQuery("app,a,:eq,app,e,:eq,:span-and").q
val q1 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,e,:eq,:span-and").q
checkMatch(q1, true)

val q2 = ExprUtils.parseTraceQuery("app,a,:eq,app,z,:eq,:span-and").q
val q2 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,z,:eq,:span-and").q
checkMatch(q2, false)
}

test("span-or") {
val q1 = ExprUtils.parseTraceQuery("app,a,:eq,app,e,:eq,:span-or").q
val q1 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,e,:eq,:span-or").q
checkMatch(q1, true)

val q2 = ExprUtils.parseTraceQuery("app,a,:eq,app,z,:eq,:span-or").q
val q2 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,z,:eq,:span-or").q
checkMatch(q2, true)

val q3 = ExprUtils.parseTraceQuery("app,y,:eq,app,z,:eq,:span-or").q
val q3 = ExprUtils.parseTraceEventsQuery("app,y,:eq,app,z,:eq,:span-or").q
checkMatch(q3, false)
}

test("child") {
val q1 = ExprUtils.parseTraceQuery("app,a,:eq,app,c,:eq,:child").q
val q1 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,c,:eq,:child").q
checkMatch(q1, true)

val q2 = ExprUtils.parseTraceQuery("app,c,:eq,app,e,:eq,:child").q
val q2 = ExprUtils.parseTraceEventsQuery("app,c,:eq,app,e,:eq,:child").q
checkMatch(q2, true)

val q3 = ExprUtils.parseTraceQuery("app,a,:eq,app,e,:eq,:child").q
val q3 = ExprUtils.parseTraceEventsQuery("app,a,:eq,app,e,:eq,:child").q
checkMatch(q3, false)
}
}
Expand Down
Loading