Skip to content

Commit

Permalink
eval: require precise scoping for hi-res streams (Netflix#1624)
Browse files Browse the repository at this point in the history
For hi-res streams, require more precise scoping that allows
us to more efficiently match the data and run it only where
needed. This would ideally be applied everywhere, but for
backwards compatibility the 1m step is opted out for now.
  • Loading branch information
brharrington authored and manolama committed May 22, 2024
1 parent 2792030 commit f9bfa17
Show file tree
Hide file tree
Showing 5 changed files with 725 additions and 629 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import com.netflix.atlas.eval.stream.Evaluator.DataSources
import com.netflix.atlas.eval.util.HostRewriter
import com.typesafe.config.Config

import java.time.Duration
import scala.util.Success

private[stream] class ExprInterpreter(config: Config) {
Expand All @@ -36,17 +35,9 @@ private[stream] class ExprInterpreter(config: Config) {

private val hostRewriter = new HostRewriter(config.getConfig("atlas.eval.host-rewrite"))

private val maxStep = config.getDuration("atlas.eval.stream.limits.max-step")

def eval(uri: Uri): GraphConfig = {
val graphCfg = grapher.toGraphConfig(uri)

// Check step size is within bounds
if (graphCfg.stepSize > maxStep.toMillis) {
val step = Duration.ofMillis(graphCfg.stepSize)
throw new IllegalArgumentException(s"max allowed step size exceeded ($step > $maxStep)")
}

// Check that data expressions are supported. The streaming path doesn't support
// time shifts, filters, and integral. The filters and integral are excluded because
// they can be confusing as the time window for evaluation is not bounded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ private[stream] class StreamContext(
/**
  * Maximum number of intermediate datapoints allowed while evaluating a single
  * expression; read from the `limits.max-intermediate-datapoints` config setting.
  */
def maxIntermediateDatapointsPerExpression: Int =
config.getInt("limits.max-intermediate-datapoints")

// Maximum allowed step size for a stream
private val maxStep = config.getDuration("limits.max-step")

val interpreter = new ExprInterpreter(rootConfig)

def findBackendForUri(uri: Uri): Backend = {
Expand Down Expand Up @@ -150,11 +153,25 @@ private[stream] class StreamContext(
Try {
val uri = Uri(ds.uri)

// Check step size is within bounds
if (ds.step.toMillis > maxStep.toMillis) {
throw new IllegalArgumentException(
s"max allowed step size exceeded (${ds.step} > $maxStep)"
)
}

// Check that expression is parseable and perform basic static analysis of DataExprs to
// weed out expensive queries up front
val results = interpreter.eval(uri).exprs
results.foreach(_.expr.dataExprs.foreach(validateDataExpr))

// For hi-res streams, require more precise scoping that allows us to more efficiently
// match the data and run it only where needed. This would ideally be applied everywhere,
// but for backwards compatibility the 1m step is opted out for now.
if (ds.step.toMillis < 60_000) {
results.foreach(_.expr.dataExprs.foreach(expr => restrictsNameAndApp(expr.query)))
}

// Check that there is a backend available for it
findBackendForUri(uri)

Expand All @@ -178,6 +195,27 @@ private[stream] class StreamContext(
}
}

/**
  * Verify that a query for a hi-res stream is scoped precisely enough. The query is
  * expanded to disjunctive normal form and every disjunct must restrict both the name
  * and an app-level key (see `isRestricted`).
  *
  * @throws IllegalArgumentException if any disjunct is not sufficiently restricted
  */
private def restrictsNameAndApp(query: Query): Unit = {
  val disjuncts = Query.dnfList(query)
  val hasUnrestricted = disjuncts.exists(d => !isRestricted(d))
  if (hasUnrestricted) {
    throw new IllegalArgumentException(
      s"rejected expensive query [$query], hi-res streams " +
      "must restrict name and nf.app with :eq or :in"
    )
  }
}

/**
  * A query is considered restricted if it pins down both the metric name and at least
  * one app-level scoping key (`nf.app`, `nf.cluster`, or `nf.asg`) via `:eq` or `:in`.
  */
private def isRestricted(query: Query): Boolean = {
  val appKeys = Set("nf.app", "nf.cluster", "nf.asg")
  isRestricted(query, Set("name")) && isRestricted(query, appKeys)
}

/**
  * Check whether the query restricts at least one of the provided keys using an exact
  * match (`:eq`) or an explicit value list (`:in`). For a conjunction it is enough for
  * either side to restrict a key; any other query shape (regex, has, not, etc.) does
  * not count as a restriction.
  */
private def isRestricted(query: Query, keys: Set[String]): Boolean = query match {
  case Query.And(lhs, rhs)                       => isRestricted(lhs, keys) || isRestricted(rhs, keys)
  case Query.Equal(key, _) if keys.contains(key) => true
  case Query.In(key, _) if keys.contains(key)    => true
  case _                                         => false
}

/**
* Emit an error to the sources where the number of input
* or intermediate datapoints exceed for an expression.
Expand Down
Loading

0 comments on commit f9bfa17

Please sign in to comment.