Skip to content

Commit

Permalink
Remove the JS runtime from threshold calculations
Browse files Browse the repository at this point in the history
In this commit we replace the previously existing
thresholds condition evaluation, which was depending
on Goja's Runtime, with a new pure-Go one.

Thresholds are now parsed, and evaluated in o, and
no JS rutimes are involved in the process anymore. It
is built upong the thresholds parser, and parser
combinators library introduced in previous commits.
  • Loading branch information
oleiade committed Jan 11, 2022
1 parent dd2edd7 commit 81cccd9
Show file tree
Hide file tree
Showing 3 changed files with 441 additions and 180 deletions.
29 changes: 18 additions & 11 deletions core/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestEngine_processSamples(t *testing.T) {
})
t.Run("submetric", func(t *testing.T) {
t.Parallel()
ths, err := stats.NewThresholds([]string{`1+1==2`})
ths, err := stats.NewThresholds([]string{`value<2`})
assert.NoError(t, err)

e, _, wait := newTestEngine(t, nil, nil, nil, lib.Options{
Expand Down Expand Up @@ -286,7 +286,10 @@ func TestEngineThresholdsWillAbort(t *testing.T) {
t.Parallel()
metric := stats.New("my_metric", stats.Gauge)

ths, err := stats.NewThresholds([]string{"1+1==3"})
// The incoming samples for the metric set it to 1.25. Considering
// the metric is of type Gauge, value > 1.25 should always fail, and
// trigger an abort.
ths, err := stats.NewThresholds([]string{"value>1.25"})
assert.NoError(t, err)
ths.Thresholds[0].AbortOnFail = true

Expand All @@ -305,7 +308,11 @@ func TestEngineAbortedByThresholds(t *testing.T) {
t.Parallel()
metric := stats.New("my_metric", stats.Gauge)

ths, err := stats.NewThresholds([]string{"1+1==3"})
// The MiniRunner sets the value of the metric to 1.25. Considering
// the metric is of type Gauge, value > 1.25 should always fail, and
// trigger an abort.
// **N.B**: a threshold returning an error, won't trigger an abort.
ths, err := stats.NewThresholds([]string{"value>1.25"})
assert.NoError(t, err)
ths.Thresholds[0].AbortOnFail = true

Expand Down Expand Up @@ -343,14 +350,14 @@ func TestEngine_processThresholds(t *testing.T) {
ths map[string][]string
abort bool
}{
"passing": {true, map[string][]string{"my_metric": {"1+1==2"}}, false},
"failing": {false, map[string][]string{"my_metric": {"1+1==3"}}, false},
"aborting": {false, map[string][]string{"my_metric": {"1+1==3"}}, true},

"submetric,match,passing": {true, map[string][]string{"my_metric{a:1}": {"1+1==2"}}, false},
"submetric,match,failing": {false, map[string][]string{"my_metric{a:1}": {"1+1==3"}}, false},
"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"1+1==2"}}, false},
"submetric,nomatch,failing": {true, map[string][]string{"my_metric{a:2}": {"1+1==3"}}, false},
"passing": {true, map[string][]string{"my_metric": {"value<2"}}, false},
"failing": {false, map[string][]string{"my_metric": {"value>1.25"}}, false},
"aborting": {false, map[string][]string{"my_metric": {"value>1.25"}}, true},

"submetric,match,passing": {true, map[string][]string{"my_metric{a:1}": {"value<2"}}, false},
"submetric,match,failing": {false, map[string][]string{"my_metric{a:1}": {"value>1.25"}}, false},
"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"value<2"}}, false},
"submetric,nomatch,failing": {true, map[string][]string{"my_metric{a:2}": {"value>1.25"}}, false},
}

for name, data := range testdata {
Expand Down
181 changes: 112 additions & 69 deletions stats/thresholds.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,54 +17,35 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package stats

import (
"bytes"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/dop251/goja"

"go.k6.io/k6/lib/types"
)

const jsEnvSrc = `
function p(pct) {
return __sink__.P(pct/100.0);
};
`

var jsEnv *goja.Program

func init() {
pgm, err := goja.Compile("__env__", jsEnvSrc, true)
if err != nil {
panic(err)
}
jsEnv = pgm
}

// Threshold is a representation of a single threshold for a single metric
type Threshold struct {
// Source is the text based source of the threshold
Source string
// LastFailed is a makrer if the last testing of this threshold failed
// LastFailed is a marker if the last testing of this threshold failed
LastFailed bool
// AbortOnFail marks if a given threshold fails that the whole test should be aborted
AbortOnFail bool
// AbortGracePeriod is a the minimum amount of time a test should be running before a failing
// this threshold will abort the test
AbortGracePeriod types.NullDuration

pgm *goja.Program
rt *goja.Runtime
// parsed is the threshold expression parsed from the Source
parsed *thresholdExpression
}

func newThreshold(src string, newThreshold *goja.Runtime, abortOnFail bool, gracePeriod types.NullDuration) (*Threshold, error) {
pgm, err := goja.Compile("__threshold__", src, true)
func newThreshold(src string, abortOnFail bool, gracePeriod types.NullDuration) (*Threshold, error) {
parsedExpression, err := parseThresholdExpression(src)
if err != nil {
return nil, err
}
Expand All @@ -73,23 +54,57 @@ func newThreshold(src string, newThreshold *goja.Runtime, abortOnFail bool, grac
Source: src,
AbortOnFail: abortOnFail,
AbortGracePeriod: gracePeriod,
pgm: pgm,
rt: newThreshold,
parsed: parsedExpression,
}, nil
}

func (t Threshold) runNoTaint() (bool, error) {
v, err := t.rt.RunProgram(t.pgm)
if err != nil {
return false, err
func (t *Threshold) runNoTaint(sinks map[string]float64) (bool, error) {
// Extract the sink value for the aggregation method used in the threshold
// expression
lhs, ok := sinks[t.parsed.AggregationMethod]
if !ok {
return false, fmt.Errorf("unable to apply threshold %s over metrics; reason: "+
"no metric supporting the %s aggregation method found",
t.Source,
t.parsed.AggregationMethod)
}
return v.ToBoolean(), nil

// Apply the threshold expression operator to the left and
// right hand side values
var passes bool
switch t.parsed.Operator {
case ">":
passes = lhs > t.parsed.Value
case ">=":
passes = lhs >= t.parsed.Value
case "<=":
passes = lhs <= t.parsed.Value
case "<":
passes = lhs < t.parsed.Value
case "==", "===":
// Considering a sink always maps to float64 values,
// strictly equal is equivalent to loosely equal
passes = lhs == t.parsed.Value
case "!=":
passes = lhs != t.parsed.Value
default:
// The parseThresholdExpression function should ensure that no invalid
// operator gets through, but let's protect our future selves anyhow.
return false, fmt.Errorf("unable to apply threshold %s over metrics; "+
"reason: %s is an invalid operator",
t.Source,
t.parsed.Operator,
)
}

// Perform the actual threshold verification
return passes, nil
}

func (t *Threshold) run() (bool, error) {
b, err := t.runNoTaint()
t.LastFailed = !b
return b, err
func (t *Threshold) run(sinks map[string]float64) (bool, error) {
passes, err := t.runNoTaint(sinks)
t.LastFailed = !passes
return passes, err
}

type thresholdConfig struct {
Expand All @@ -98,11 +113,11 @@ type thresholdConfig struct {
AbortGracePeriod types.NullDuration `json:"delayAbortEval"`
}

//used internally for JSON marshalling
// used internally for JSON marshalling
type rawThresholdConfig thresholdConfig

func (tc *thresholdConfig) UnmarshalJSON(data []byte) error {
//shortcircuit unmarshalling for simple string format
// shortcircuit unmarshalling for simple string format
if err := json.Unmarshal(data, &tc.Threshold); err == nil {
return nil
}
Expand All @@ -122,9 +137,9 @@ func (tc thresholdConfig) MarshalJSON() ([]byte, error) {

// Thresholds is the combination of all Thresholds for a given metric
type Thresholds struct {
Runtime *goja.Runtime
Thresholds []*Threshold
Abort bool
sinked map[string]float64
}

// NewThresholds returns Thresholds objects representing the provided source strings
Expand All @@ -138,60 +153,88 @@ func NewThresholds(sources []string) (Thresholds, error) {
}

func newThresholdsWithConfig(configs []thresholdConfig) (Thresholds, error) {
rt := goja.New()
if _, err := rt.RunProgram(jsEnv); err != nil {
return Thresholds{}, fmt.Errorf("threshold builtin error: %w", err)
}
thresholds := make([]*Threshold, len(configs))
sinked := make(map[string]float64)

ts := make([]*Threshold, len(configs))
for i, config := range configs {
t, err := newThreshold(config.Threshold, rt, config.AbortOnFail, config.AbortGracePeriod)
t, err := newThreshold(config.Threshold, config.AbortOnFail, config.AbortGracePeriod)
if err != nil {
return Thresholds{}, fmt.Errorf("threshold %d error: %w", i, err)
}
ts[i] = t
thresholds[i] = t
}

return Thresholds{rt, ts, false}, nil
return Thresholds{thresholds, false, sinked}, nil
}

func (ts *Thresholds) updateVM(sink Sink, t time.Duration) error {
ts.Runtime.Set("__sink__", sink)
f := sink.Format(t)
for k, v := range f {
ts.Runtime.Set(k, v)
}
return nil
}

func (ts *Thresholds) runAll(t time.Duration) (bool, error) {
succ := true
for i, th := range ts.Thresholds {
b, err := th.run()
func (ts *Thresholds) runAll(timeSpentInTest time.Duration) (bool, error) {
succeeded := true
for i, threshold := range ts.Thresholds {
b, err := threshold.run(ts.sinked)
if err != nil {
return false, fmt.Errorf("threshold %d run error: %w", i, err)
}

if !b {
succ = false
succeeded = false

if ts.Abort || !th.AbortOnFail {
if ts.Abort || !threshold.AbortOnFail {
continue
}

ts.Abort = !th.AbortGracePeriod.Valid ||
th.AbortGracePeriod.Duration < types.Duration(t)
ts.Abort = !threshold.AbortGracePeriod.Valid ||
threshold.AbortGracePeriod.Duration < types.Duration(timeSpentInTest)
}
}
return succ, nil

return succeeded, nil
}

// Run processes all the thresholds with the provided Sink at the provided time and returns if any
// of them fails
func (ts *Thresholds) Run(sink Sink, t time.Duration) (bool, error) {
if err := ts.updateVM(sink, t); err != nil {
return false, err
func (ts *Thresholds) Run(sink Sink, duration time.Duration) (bool, error) {
// Initialize the sinks store
ts.sinked = make(map[string]float64)

// FIXME: Remove this comment as soon as the stats.Sink does not expose Format anymore.
//
// As of December 2021, this block reproduces the behavior of the
// stats.Sink.Format behavior. As we intend to try to get away from it,
// we instead implement the behavior directly here.
//
// For more details, see https://github.com/grafana/k6/issues/2320
switch sinkImpl := sink.(type) {
case *CounterSink:
ts.sinked["count"] = sinkImpl.Value
ts.sinked["rate"] = sinkImpl.Value / (float64(duration) / float64(time.Second))
case *GaugeSink:
ts.sinked["value"] = sinkImpl.Value
case *TrendSink:
ts.sinked["min"] = sinkImpl.Min
ts.sinked["max"] = sinkImpl.Max
ts.sinked["avg"] = sinkImpl.Avg
ts.sinked["med"] = sinkImpl.Med

// Parse the percentile thresholds and insert them in
// the sinks mapping.
for _, threshold := range ts.Thresholds {
if !strings.HasPrefix(threshold.parsed.AggregationMethod, "p(") {
continue
}

ts.sinked[threshold.parsed.AggregationMethod] = sinkImpl.P(threshold.parsed.AggregationValue.Float64 / 100)
}
case *RateSink:
ts.sinked["rate"] = float64(sinkImpl.Trues) / float64(sinkImpl.Total)
case DummySink:
for k, v := range sinkImpl {
ts.sinked[k] = v
}
default:
return false, fmt.Errorf("unable to run Thresholds; reason: unknown sink type")
}
return ts.runAll(t)

return ts.runAll(duration)
}

// UnmarshalJSON is implementation of json.Unmarshaler
Expand Down
Loading

0 comments on commit 81cccd9

Please sign in to comment.