Skip to content

Commit

Permalink
eval: preserve state with multi-level grouping (#1579)
Browse files Browse the repository at this point in the history
When executing an expression in a streaming context, stateful
operators used as input to a second level grouping were not
preserving the state.
  • Loading branch information
brharrington authored Sep 19, 2023
1 parent a34ca16 commit 834456f
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ object MathExpr {
List(TimeSeries(tags, k, t.data))
}

ResultSet(this, newData, context.state)
ResultSet(this, newData, inner.state)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,54 @@ class FinalExprEvalSuite extends FunSuite {
})
}

test("multi-level group by with stateful operation") {
val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("name", "rps")), List("node", "app"))
val input = List(
sources(ds("a", s"http://atlas/graph?q=$expr,2,:rolling-max,:max,(,app,),:by")),
group(
0,
AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 42.0),
AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 1.0)
),
group(
1,
AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 42.0),
AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 21.0)
),
group(
2,
AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 42.0),
AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 84.0)
),
group(
3,
AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 42.0),
AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 21.0)
),
group(
4,
AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 21.0),
AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 21.0)
),
group(
5,
AggrDatapoint(0, step, expr, "i-1", Map("app" -> "foo", "node" -> "i-1"), 21.0),
AggrDatapoint(0, step, expr, "i-2", Map("app" -> "foo", "node" -> "i-2"), 21.0)
)
)

val output = run(input)

val expected = Array(42.0, 42.0, 84.0, 84.0, 42.0, 21.0)
val timeseries = output.filter(isTimeSeries)
assertEquals(timeseries.size, 6)
timeseries.zip(expected).foreach {
case (envelope, expectedValue) =>
val ts = envelope.message.asInstanceOf[TimeSeriesMessage]
checkValue(ts, expectedValue)
}
}

// https://github.com/Netflix/atlas/issues/762
test(":legend is honored") {
val expr = DataExpr.Sum(Query.Equal("name", "rps"))
Expand Down

0 comments on commit 834456f

Please sign in to comment.