Skip to content

Commit

Permalink
core: fixes for sub-second step size (#1700)
Browse files Browse the repository at this point in the history
Add some tests and put in some patches to address minor
issues with using sub-second step sizes. There is a known
issue with percentiles operator that will be addressed in
a follow up PR after some updates to spectator utility
classes.
  • Loading branch information
brharrington authored Oct 10, 2024
1 parent 9558c35 commit 85bb3a0
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,9 @@ object MathExpr {
override def eval(context: EvalContext, data: Map[DataExpr, List[TimeSeries]]): ResultSet = {
val rs = expr.eval(context, data)
val newData = rs.data.map { t =>
// Assumes rate-per-second counter
val multiple = t.data.step / 1000
// Assumes rate-per-second counter. If the step size is less than a second, then it
// will be a fractional multiple and reduce the amount.
val multiple = t.data.step / 1000.0
t.unaryOp(s"$name(%s)", v => v * multiple)
}
ResultSet(this, newData, rs.state)
Expand Down Expand Up @@ -920,7 +921,7 @@ object MathExpr {
val results = new Array[Double](pcts.length)

// Inputs are counters reported as a rate per second. We need to convert to a rate per
// step to get the correct counts for the estimation
// step to get the correct counts for the estimation.
val multiple = context.step / 1000.0

// If the input was a timer the unit for the buckets is nanoseconds. The type is reflected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ class TimeSeriesBufferSuite extends FunSuite {

private val emptyTags = Map.empty[String, String]

private def newBuffer(v: Double, start: Long = 0L) = {
TimeSeriesBuffer(emptyTags, 60000, start, Array.fill(1)(v))
}

private def newBufferN(v: Double, n: Int, start: Long = 0L) = {
TimeSeriesBuffer(emptyTags, 60000, start, Array.fill(n)(v))
private def newBuffer(
v: Double,
step: Long = 60000L,
start: Long = 0L,
n: Int = 1
): TimeSeriesBuffer = {
TimeSeriesBuffer(emptyTags, step, start, Array.fill(n)(v))
}

test("apply List[Block]") {
Expand Down Expand Up @@ -107,7 +108,7 @@ class TimeSeriesBufferSuite extends FunSuite {
test("add Block with step 10s cf 6m") {
val tags = emptyTags
val step = 10000L
val blockSize = 6 * 60
val blockSize = 6 * 60 // each block is 1h
val blocks =
(0 until 1000).map(i => ConstantBlock(i * blockSize * step, blockSize, 4.0)).toList

Expand All @@ -123,6 +124,44 @@ class TimeSeriesBufferSuite extends FunSuite {
assert(m.values.forall(_ == 4.0))
}

test("add Block with step 50ms cf 30s") {
val tags = emptyTags
val step = 50L
val blockSize = 20 * 60 // each block is 1m
val blocks =
(0 until 1000).map(i => ConstantBlock(i * blockSize * step, blockSize, 4.0)).toList

val multiple = 10 * 60
val consol = multiple * step
val buffer = TimeSeriesBuffer(tags, consol, consol, 20 * consol)
blocks.foreach { b =>
buffer.aggrBlock(b, Block.Sum, ConsolidationFunction.Max, multiple, Math.addNaN)
}
val m = buffer
assertEquals(m.step, consol)
assertEquals(m.start, consol)
assert(m.values.forall(_ == 4.0))
}

test("add Block with step 1ms cf 30s") {
val tags = emptyTags
val step = 1L
val blockSize = 1000 * 60 // each block is 1m
val blocks =
(0 until 1000).map(i => ConstantBlock(i * blockSize * step, blockSize, 4.0)).toList

val multiple = 1000 * 30
val consol = multiple * step
val buffer = TimeSeriesBuffer(tags, consol, consol, 20 * consol)
blocks.foreach { b =>
buffer.aggrBlock(b, Block.Sum, ConsolidationFunction.Max, multiple, Math.addNaN)
}
val m = buffer
assertEquals(m.step, consol)
assertEquals(m.start, consol)
assert(m.values.forall(_ == 4.0))
}

test("aggregate tags") {
// No longer done, it will always use the tags from the initial buffer
val common = Map("a" -> "b", "c" -> "d")
Expand Down Expand Up @@ -349,17 +388,17 @@ class TimeSeriesBufferSuite extends FunSuite {
}

test("getValue prior to start") {
val b1 = newBuffer(42.0, 120000)
val b1 = newBuffer(42.0, start = 120000)
assert(b1.getValue(60000).isNaN)
}

test("getValue after the end") {
val b1 = newBuffer(42.0, 120000)
val b1 = newBuffer(42.0, start = 120000)
assert(b1.getValue(240000).isNaN)
}

test("getValue with match") {
val b1 = newBuffer(42.0, 120000)
val b1 = newBuffer(42.0, start = 120000)
assertEquals(b1.getValue(129000), 42.0)
}

Expand Down Expand Up @@ -400,15 +439,15 @@ class TimeSeriesBufferSuite extends FunSuite {
}

test("merge diff sizes b1.length < b2.length") {
val b1 = newBufferN(1.0, 1)
val b2 = newBufferN(2.0, 2)
val b1 = newBuffer(1.0, n = 1)
val b2 = newBuffer(2.0, n = 2)
b1.merge(b2)
assert(b1.values(0) == 2.0)
}

test("merge diff sizes b1.length > b2.length") {
val b1 = newBufferN(7.0, 1)
val b2 = newBufferN(2.0, 2)
val b1 = newBuffer(7.0, n = 1)
val b2 = newBuffer(2.0, n = 2)
b2.merge(b1)
assert(b2.values(0) == 7.0)
assert(b2.values(1) == 2.0)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.core.model

import com.netflix.atlas.core.stacklang.Interpreter
import munit.FunSuite

import scala.language.postfixOps

class PerStepSuite extends FunSuite {

private val interpreter = Interpreter(MathVocabulary.allWords)

def eval(str: String, step: Long, value: Double): List[TimeSeries] = {
val expr = interpreter.execute(str).stack match {
case (v: TimeSeriesExpr) :: _ => v
case _ => throw new IllegalArgumentException("invalid expr")
}
val seq = new FunctionTimeSeq(DsType.Rate, step, _ => value)
val input = List(LazyTimeSeries(Map("name" -> "test"), "test", seq))
val context = EvalContext(0L, step * 2, step)
expr.eval(context, input).data
}

test("1ms step") {
val step = 1L
val data = eval("name,test,:eq,:sum,:per-step", step, 4.0)
assertEquals(data.size, 1)
assertEquals(data.head.data.apply(step), 0.004)
}

test("500ms step") {
val step = 500L
val data = eval("name,test,:eq,:sum,:per-step", step, 4.0)
assertEquals(data.size, 1)
assertEquals(data.head.data.apply(step), 2.0)
}

test("1s step") {
val step = 1_000L
val data = eval("name,test,:eq,:sum,:per-step", step, 4.0)
assertEquals(data.size, 1)
assertEquals(data.head.data.apply(step), 4.0)
}

test("5s step") {
val step = 5_000L
val data = eval("name,test,:eq,:sum,:per-step", step, 4.0)
assertEquals(data.size, 1)
assertEquals(data.head.data.apply(step), 20.0)
}

test("1m step") {
val step = 60_000L
val data = eval("name,test,:eq,:sum,:per-step", step, 4.0)
assertEquals(data.size, 1)
assertEquals(data.head.data.apply(step), 240.0)
}

test("10m step") {
val step = 600_000L
val data = eval("name,test,:eq,:sum,:per-step", step, 4.0)
assertEquals(data.size, 1)
assertEquals(data.head.data.apply(step), 2400.0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,26 @@ import munit.FunSuite

class TrendSuite extends FunSuite {

private val step = 60000L
private val dataTags = Map("name" -> "cpu", "node" -> "i-1")

private val alignedStream = List(
List(Datapoint(dataTags, 0L * step, 1.0)),
List(Datapoint(dataTags, 1L * step, 1.5)),
List(Datapoint(dataTags, 2L * step, 1.6)),
List(Datapoint(dataTags, 3L * step, 1.7)),
List(Datapoint(dataTags, 4L * step, 1.4)),
List(Datapoint(dataTags, 5L * step, 1.3)),
List(Datapoint(dataTags, 6L * step, 1.2)),
List(Datapoint(dataTags, 7L * step, 1.0)),
List(Datapoint(dataTags, 8L * step, 0.0)),
List(Datapoint(dataTags, 9L * step, 0.0)),
List(Datapoint(dataTags, 10L * step, 1.0)),
List(Datapoint(dataTags, 11L * step, 1.1)),
List(Datapoint(dataTags, 12L * step, 1.2)),
List(Datapoint(dataTags, 13L * step, 1.2))
private def alignedStream(step: Long): List[List[Datapoint]] = List(
List(Datapoint(dataTags, 0L * step, 1.0, step)),
List(Datapoint(dataTags, 1L * step, 1.5, step)),
List(Datapoint(dataTags, 2L * step, 1.6, step)),
List(Datapoint(dataTags, 3L * step, 1.7, step)),
List(Datapoint(dataTags, 4L * step, 1.4, step)),
List(Datapoint(dataTags, 5L * step, 1.3, step)),
List(Datapoint(dataTags, 6L * step, 1.2, step)),
List(Datapoint(dataTags, 7L * step, 1.0, step)),
List(Datapoint(dataTags, 8L * step, 0.0, step)),
List(Datapoint(dataTags, 9L * step, 0.0, step)),
List(Datapoint(dataTags, 10L * step, 1.0, step)),
List(Datapoint(dataTags, 11L * step, 1.1, step)),
List(Datapoint(dataTags, 12L * step, 1.2, step)),
List(Datapoint(dataTags, 13L * step, 1.2, step))
)

private val alignedInputTS = TimeSeries(
private def alignedInputTS(step: Long): TimeSeries = TimeSeries(
dataTags,
new ArrayTimeSeq(
DsType.Gauge,
Expand All @@ -51,12 +50,16 @@ class TrendSuite extends FunSuite {
)
)

private val trend = StatefulExpr.Trend(
private def trend(step: Long): StatefulExpr.Trend = StatefulExpr.Trend(
DataExpr.Sum(Query.Equal("name", "cpu")),
Duration.ofMillis(step * 3)
)

def eval(expr: TimeSeriesExpr, data: List[List[Datapoint]]): List[List[TimeSeries]] = {
def eval(
step: Long,
expr: TimeSeriesExpr,
data: List[List[Datapoint]]
): List[List[TimeSeries]] = {
var state = Map.empty[StatefulExpr, Any]
data.map { ts =>
val t = ts.head.timestamp
Expand All @@ -67,12 +70,18 @@ class TrendSuite extends FunSuite {
}
}

test("trend: incremental exec matches global") {
private def incrementalMatchesGlobal(step: Long): Unit = {
val s = 0L
val e = 14L * step
val context = EvalContext(s, e, step, Map.empty)
val expected = trend.eval(context, List(alignedInputTS)).data.head.data.bounded(s, e).data
val result = eval(trend, alignedStream)
val expected = trend(step)
.eval(context, List(alignedInputTS(step)))
.data
.head
.data
.bounded(s, e)
.data
val result = eval(step, trend(step), alignedStream(step))

result.zip(expected).zipWithIndex.foreach {
case ((ts, v), i) =>
Expand All @@ -86,4 +95,16 @@ class TrendSuite extends FunSuite {
}
}
}

test("trend: incremental exec matches global, 1ms") {
incrementalMatchesGlobal(1L)
}

test("trend: incremental exec matches global, 50ms") {
incrementalMatchesGlobal(50L)
}

test("trend: incremental exec matches global, 1m") {
incrementalMatchesGlobal(60000L)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ class RateValueFunctionSuite extends FunSuite {
listVF
}

test("basic") {
test("basic, 1ms step") {
val vf = newFunction
assertEquals(vf.update(1L, 1.0), Nil)
assertEquals(vf.update(2L, 2.0), List(2L -> 1000.0))
assertEquals(vf.update(3L, 4.0), List(3L -> 2000.0))
}

test("basic, 5ms step") {
val vf = newFunction
assertEquals(vf.update(5L, 1.0), Nil)
assertEquals(vf.update(15L, 2.0), List(15L -> 100.0))
Expand Down

0 comments on commit 85bb3a0

Please sign in to comment.