diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathExpr.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathExpr.scala index ca1de4caf..a3c90dc8c 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathExpr.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/MathExpr.scala @@ -402,8 +402,9 @@ object MathExpr { override def eval(context: EvalContext, data: Map[DataExpr, List[TimeSeries]]): ResultSet = { val rs = expr.eval(context, data) val newData = rs.data.map { t => - // Assumes rate-per-second counter - val multiple = t.data.step / 1000 + // Assumes rate-per-second counter. If the step size is less than a second, then it + // will be a fractional multiple and reduce the amount. + val multiple = t.data.step / 1000.0 t.unaryOp(s"$name(%s)", v => v * multiple) } ResultSet(this, newData, rs.state) @@ -920,7 +921,7 @@ object MathExpr { val results = new Array[Double](pcts.length) // Inputs are counters reported as a rate per second. We need to convert to a rate per - // step to get the correct counts for the estimation + // step to get the correct counts for the estimation. val multiple = context.step / 1000.0 // If the input was a timer the unit for the buckets is nanoseconds. The type is reflected diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala index 1c96c928a..4c54179aa 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala @@ -33,12 +33,13 @@ class TimeSeriesBufferSuite extends FunSuite { private val emptyTags = Map.empty[String, String] - private def newBuffer(v: Double, start: Long = 0L) = { - TimeSeriesBuffer(emptyTags, 60000, start, Array.fill(1)(v)) - } - - private def newBufferN(v: Double, n: Int, start: Long = 0L) = { - TimeSeriesBuffer(emptyTags, 60000, start, Array.fill(n)(v)) + private def newBuffer( + v: Double, + step: Long = 60000L, + start: Long = 0L, + n: Int = 1 + ): TimeSeriesBuffer = { + TimeSeriesBuffer(emptyTags, step, start, Array.fill(n)(v)) } test("apply List[Block]") { @@ -107,7 +108,7 @@ class TimeSeriesBufferSuite extends FunSuite { test("add Block with step 10s cf 6m") { val tags = emptyTags val step = 10000L - val blockSize = 6 * 60 + val blockSize = 6 * 60 // each block is 1h val blocks = (0 until 1000).map(i => ConstantBlock(i * blockSize * step, blockSize, 4.0)).toList @@ -123,6 +124,44 @@ class TimeSeriesBufferSuite extends FunSuite { assert(m.values.forall(_ == 4.0)) } + test("add Block with step 50ms cf 30s") { + val tags = emptyTags + val step = 50L + val blockSize = 20 * 60 // each block is 1m + val blocks = + (0 until 1000).map(i => ConstantBlock(i * blockSize * step, blockSize, 4.0)).toList + + val multiple = 10 * 60 + val consol = multiple * step + val buffer = TimeSeriesBuffer(tags, consol, consol, 20 * consol) + blocks.foreach { b => + buffer.aggrBlock(b, Block.Sum, ConsolidationFunction.Max, multiple, Math.addNaN) + } + val m = buffer + assertEquals(m.step, consol) + assertEquals(m.start, consol) + assert(m.values.forall(_ == 4.0)) + } + + test("add Block with step 1ms cf 30s") { + val tags = emptyTags + val step = 1L + val blockSize = 1000 * 60 // each block is 1m + val blocks = + (0 until 1000).map(i => ConstantBlock(i * blockSize * step, blockSize, 4.0)).toList + + val multiple = 1000 * 30 + val consol = multiple * step + val buffer = TimeSeriesBuffer(tags, consol, consol, 20 * consol) + blocks.foreach { b => + buffer.aggrBlock(b, Block.Sum, ConsolidationFunction.Max, multiple, Math.addNaN) + } + val m = buffer + assertEquals(m.step, consol) + assertEquals(m.start, consol) + assert(m.values.forall(_ == 4.0)) + } + test("aggregate tags") { // No longer done, it will always use the tags from the initial buffer val common = Map("a" -> "b", "c" -> "d") @@ -349,17 +388,17 @@ class TimeSeriesBufferSuite extends FunSuite { } test("getValue prior to start") { - val b1 = newBuffer(42.0, 120000) + val b1 = newBuffer(42.0, start = 120000) assert(b1.getValue(60000).isNaN) } test("getValue after the end") { - val b1 = newBuffer(42.0, 120000) + val b1 = newBuffer(42.0, start = 120000) assert(b1.getValue(240000).isNaN) } test("getValue with match") { - val b1 = newBuffer(42.0, 120000) + val b1 = newBuffer(42.0, start = 120000) assertEquals(b1.getValue(129000), 42.0) } @@ -400,15 +439,15 @@ class TimeSeriesBufferSuite extends FunSuite { } test("merge diff sizes b1.length < b2.length") { - val b1 = newBufferN(1.0, 1) - val b2 = newBufferN(2.0, 2) + val b1 = newBuffer(1.0, n = 1) + val b2 = newBuffer(2.0, n = 2) b1.merge(b2) assert(b1.values(0) == 2.0) } test("merge diff sizes b1.length > b2.length") { - val b1 = newBufferN(7.0, 1) - val b2 = newBufferN(2.0, 2) + val b1 = newBuffer(7.0, n = 1) + val b2 = newBuffer(2.0, n = 2) b2.merge(b1) assert(b2.values(0) == 7.0) assert(b2.values(1) == 2.0) diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/model/PerStepSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/model/PerStepSuite.scala new file mode 100644 index 000000000..029344bff --- /dev/null +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/model/PerStepSuite.scala @@ -0,0 +1,79 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.core.model + +import com.netflix.atlas.core.stacklang.Interpreter +import munit.FunSuite + +import scala.language.postfixOps + +class PerStepSuite extends FunSuite { + + private val interpreter = Interpreter(MathVocabulary.allWords) + + def eval(str: String, step: Long, value: Double): List[TimeSeries] = { + val expr = interpreter.execute(str).stack match { + case (v: TimeSeriesExpr) :: _ => v + case _ => throw new IllegalArgumentException("invalid expr") + } + val seq = new FunctionTimeSeq(DsType.Rate, step, _ => value) + val input = List(LazyTimeSeries(Map("name" -> "test"), "test", seq)) + val context = EvalContext(0L, step * 2, step) + expr.eval(context, input).data + } + + test("1ms step") { + val step = 1L + val data = eval("name,test,:eq,:sum,:per-step", step, 4.0) + assertEquals(data.size, 1) + assertEquals(data.head.data.apply(step), 0.004) + } + + test("500ms step") { + val step = 500L + val data = eval("name,test,:eq,:sum,:per-step", step, 4.0) + assertEquals(data.size, 1) + assertEquals(data.head.data.apply(step), 2.0) + } + + test("1s step") { + val step = 1_000L + val data = eval("name,test,:eq,:sum,:per-step", step, 4.0) + assertEquals(data.size, 1) + assertEquals(data.head.data.apply(step), 4.0) + } + + test("5s step") { + val step = 5_000L + val data = eval("name,test,:eq,:sum,:per-step", step, 4.0) + assertEquals(data.size, 1) + assertEquals(data.head.data.apply(step), 20.0) + } + + test("1m step") { + val step = 60_000L + val data = eval("name,test,:eq,:sum,:per-step", step, 4.0) + assertEquals(data.size, 1) + assertEquals(data.head.data.apply(step), 240.0) + } + + test("10m step") { + val step = 600_000L + val data = eval("name,test,:eq,:sum,:per-step", step, 4.0) + assertEquals(data.size, 1) + assertEquals(data.head.data.apply(step), 2400.0) + } +} diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/model/TrendSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/model/TrendSuite.scala index aa0641a9f..7d892815f 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/model/TrendSuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/model/TrendSuite.scala @@ -21,27 +21,26 @@ import munit.FunSuite class TrendSuite extends FunSuite { - private val step = 60000L private val dataTags = Map("name" -> "cpu", "node" -> "i-1") - private val alignedStream = List( - List(Datapoint(dataTags, 0L * step, 1.0)), - List(Datapoint(dataTags, 1L * step, 1.5)), - List(Datapoint(dataTags, 2L * step, 1.6)), - List(Datapoint(dataTags, 3L * step, 1.7)), - List(Datapoint(dataTags, 4L * step, 1.4)), - List(Datapoint(dataTags, 5L * step, 1.3)), - List(Datapoint(dataTags, 6L * step, 1.2)), - List(Datapoint(dataTags, 7L * step, 1.0)), - List(Datapoint(dataTags, 8L * step, 0.0)), - List(Datapoint(dataTags, 9L * step, 0.0)), - List(Datapoint(dataTags, 10L * step, 1.0)), - List(Datapoint(dataTags, 11L * step, 1.1)), - List(Datapoint(dataTags, 12L * step, 1.2)), - List(Datapoint(dataTags, 13L * step, 1.2)) + private def alignedStream(step: Long): List[List[Datapoint]] = List( + List(Datapoint(dataTags, 0L * step, 1.0, step)), + List(Datapoint(dataTags, 1L * step, 1.5, step)), + List(Datapoint(dataTags, 2L * step, 1.6, step)), + List(Datapoint(dataTags, 3L * step, 1.7, step)), + List(Datapoint(dataTags, 4L * step, 1.4, step)), + List(Datapoint(dataTags, 5L * step, 1.3, step)), + List(Datapoint(dataTags, 6L * step, 1.2, step)), + List(Datapoint(dataTags, 7L * step, 1.0, step)), + List(Datapoint(dataTags, 8L * step, 0.0, step)), + List(Datapoint(dataTags, 9L * step, 0.0, step)), + List(Datapoint(dataTags, 10L * step, 1.0, step)), + List(Datapoint(dataTags, 11L * step, 1.1, step)), + List(Datapoint(dataTags, 12L * step, 1.2, step)), + List(Datapoint(dataTags, 13L * step, 1.2, step)) ) - private val alignedInputTS = TimeSeries( + private def alignedInputTS(step: Long): TimeSeries = TimeSeries( dataTags, new ArrayTimeSeq( DsType.Gauge, @@ -51,12 +50,16 @@ class TrendSuite extends FunSuite { ) ) - private val trend = StatefulExpr.Trend( + private def trend(step: Long): StatefulExpr.Trend = StatefulExpr.Trend( DataExpr.Sum(Query.Equal("name", "cpu")), Duration.ofMillis(step * 3) ) - def eval(expr: TimeSeriesExpr, data: List[List[Datapoint]]): List[List[TimeSeries]] = { + def eval( + step: Long, + expr: TimeSeriesExpr, + data: List[List[Datapoint]] + ): List[List[TimeSeries]] = { var state = Map.empty[StatefulExpr, Any] data.map { ts => val t = ts.head.timestamp @@ -67,12 +70,18 @@ class TrendSuite extends FunSuite { } } - test("trend: incremental exec matches global") { + private def incrementalMatchesGlobal(step: Long): Unit = { val s = 0L val e = 14L * step val context = EvalContext(s, e, step, Map.empty) - val expected = trend.eval(context, List(alignedInputTS)).data.head.data.bounded(s, e).data - val result = eval(trend, alignedStream) + val expected = trend(step) + .eval(context, List(alignedInputTS(step))) + .data + .head + .data + .bounded(s, e) + .data + val result = eval(step, trend(step), alignedStream(step)) result.zip(expected).zipWithIndex.foreach { case ((ts, v), i) => @@ -86,4 +95,16 @@ class TrendSuite extends FunSuite { } } } + + test("trend: incremental exec matches global, 1ms") { + incrementalMatchesGlobal(1L) + } + + test("trend: incremental exec matches global, 50ms") { + incrementalMatchesGlobal(50L) + } + + test("trend: incremental exec matches global, 1m") { + incrementalMatchesGlobal(60000L) + } } diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/norm/RateValueFunctionSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/norm/RateValueFunctionSuite.scala index 7c621fd79..0ae80636d 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/norm/RateValueFunctionSuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/norm/RateValueFunctionSuite.scala @@ -26,7 +26,14 @@ class RateValueFunctionSuite extends FunSuite { listVF } - test("basic") { + test("basic, 1ms step") { + val vf = newFunction + assertEquals(vf.update(1L, 1.0), Nil) + assertEquals(vf.update(2L, 2.0), List(2L -> 1000.0)) + assertEquals(vf.update(3L, 4.0), List(3L -> 2000.0)) + } + + test("basic, 5ms step") { val vf = newFunction assertEquals(vf.update(5L, 1.0), Nil) assertEquals(vf.update(15L, 2.0), List(15L -> 100.0))