Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eval: require precise scoping for hi-res streams #1624

Merged
merged 5 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import com.netflix.atlas.eval.stream.Evaluator.DataSources
import com.netflix.atlas.eval.util.HostRewriter
import com.typesafe.config.Config

import java.time.Duration
import scala.util.Success

private[stream] class ExprInterpreter(config: Config) {
Expand All @@ -36,17 +35,9 @@ private[stream] class ExprInterpreter(config: Config) {

private val hostRewriter = new HostRewriter(config.getConfig("atlas.eval.host-rewrite"))

private val maxStep = config.getDuration("atlas.eval.stream.limits.max-step")

def eval(uri: Uri): GraphConfig = {
val graphCfg = grapher.toGraphConfig(uri)

// Check step size is within bounds
if (graphCfg.stepSize > maxStep.toMillis) {
val step = Duration.ofMillis(graphCfg.stepSize)
throw new IllegalArgumentException(s"max allowed step size exceeded ($step > $maxStep)")
}

// Check that data expressions are supported. The streaming path doesn't support
// time shifts, filters, and integral. The filters and integral are excluded because
// they can be confusing as the time window for evaluation is not bounded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ private[stream] class StreamContext(
def maxIntermediateDatapointsPerExpression: Int =
config.getInt("limits.max-intermediate-datapoints")

// Maximum allowed step size for a stream
private val maxStep = config.getDuration("limits.max-step")

val interpreter = new ExprInterpreter(rootConfig)

def findBackendForUri(uri: Uri): Backend = {
Expand Down Expand Up @@ -150,11 +153,25 @@ private[stream] class StreamContext(
Try {
val uri = Uri(ds.uri)

// Check step size is within bounds
if (ds.step.toMillis > maxStep.toMillis) {
throw new IllegalArgumentException(
s"max allowed step size exceeded (${ds.step} > $maxStep)"
)
}

// Check that expression is parseable and perform basic static analysis of DataExprs to
// weed out expensive queries up front
val results = interpreter.eval(uri).exprs
results.foreach(_.expr.dataExprs.foreach(validateDataExpr))

// For hi-res streams, require more precise scoping that allows us to more efficiently
// match the data and run it only where needed. This would ideally be applied everywhere,
// but for backwards compatiblity the 1m step is opted out for now.
if (ds.step.toMillis < 60_000) {
results.foreach(_.expr.dataExprs.foreach(expr => restrictsNameAndApp(expr.query)))
}

// Check that there is a backend available for it
findBackendForUri(uri)

Expand All @@ -178,6 +195,27 @@ private[stream] class StreamContext(
}
}

private def restrictsNameAndApp(query: Query): Unit = {
val dnf = Query.dnfList(query)
if (!dnf.forall(isRestricted)) {
throw new IllegalArgumentException(
s"rejected expensive query [$query], hi-res streams " +
"must restrict name and nf.app with :eq or :in"
)
}
}

private def isRestricted(query: Query): Boolean = {
isRestricted(query, Set("nf.app", "nf.cluster", "nf.asg")) && isRestricted(query, Set("name"))
}

private def isRestricted(query: Query, keys: Set[String]): Boolean = query match {
case Query.And(q1, q2) => isRestricted(q1, keys) || isRestricted(q2, keys)
case Query.Equal(k, _) => keys.contains(k)
case Query.In(k, _) => keys.contains(k)
case _ => false
}

/**
* Emit an error to the sources where the number of input
* or intermediate datapoints exceed for an expression.
Expand Down
Loading
Loading