Skip to content

Commit

Permalink
fix #631, use correct lists of ref vars for each expression
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jun 10, 2016
1 parent b8493e4 commit 6fae8ad
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

- [#621](https://github.com/influxdata/kapacitor/pull/621): Fix obscure error about single vs double quotes.
- [#623](https://github.com/influxdata/kapacitor/pull/623): Fix issues with recording metadata missing data url.
- [#631](https://github.com/influxdata/kapacitor/pull/631): Fix issues with using iterative lambda expressions in an EvalNode.


## v1.0.0-beta1 [2016-06-06]
Expand Down
15 changes: 10 additions & 5 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type EvalNode struct {
e *pipeline.EvalNode
expressions []stateful.Expression
expressionsByGroup map[models.GroupID][]stateful.Expression
refVarList [][]string
scopePool stateful.ScopePool

evalErrors *expvar.Int
Expand All @@ -39,6 +40,7 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
}
// Create stateful expressions
en.expressions = make([]stateful.Expression, len(n.Lambdas))
en.refVarList = make([][]string, len(n.Lambdas))
expressions := make([]ast.Node, len(n.Lambdas))
for i, lambda := range n.Lambdas {
expressions[i] = lambda.Expression
Expand All @@ -47,9 +49,12 @@ func newEvalNode(et *ExecutingTask, n *pipeline.EvalNode, l *log.Logger) (*EvalN
return nil, fmt.Errorf("Failed to compile %v expression: %v", i, err)
}
en.expressions[i] = statefulExpr
refVars := stateful.FindReferenceVariables(lambda.Expression)
en.refVarList[i] = refVars
}

// Create a single pool for the combination of all expressions
en.scopePool = stateful.NewScopePool(stateful.FindReferenceVariables(expressions...))

en.node.runF = en.runEval
return en, nil
}
Expand Down Expand Up @@ -113,10 +118,6 @@ func (e *EvalNode) runEval(snapshot []byte) error {
func (e *EvalNode) eval(now time.Time, group models.GroupID, fields models.Fields, tags map[string]string) (models.Fields, error) {
vars := e.scopePool.Get()
defer e.scopePool.Put(vars)
err := fillScope(vars, e.scopePool.ReferenceVariables(), now, fields, tags)
if err != nil {
return nil, err
}
expressions, ok := e.expressionsByGroup[group]
if !ok {
expressions = make([]stateful.Expression, len(e.expressions))
Expand All @@ -126,6 +127,10 @@ func (e *EvalNode) eval(now time.Time, group models.GroupID, fields models.Field
e.expressionsByGroup[group] = expressions
}
for i, expr := range expressions {
err := fillScope(vars, e.refVarList[i], now, fields, tags)
if err != nil {
return nil, err
}
v, err := expr.Eval(vars)
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ func fillScope(vars *stateful.Scope, referenceVariables []string, now time.Time,
vars.Set(refVariableName, tagValue)
}
if !isFieldExists && !isTagExists {
return fmt.Errorf("no field or tag exists for %s", refVariableName)
if !vars.Has(refVariableName) {
return fmt.Errorf("no field or tag exists for %s", refVariableName)
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions integrations/data/TestStream_Eval_Keep.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
dbname
rpname
types value0=0,value1=1 0000000001
89 changes: 88 additions & 1 deletion integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ stream
testStreamerWithOutput(t, "TestStream_SimpleMR", script, 15*time.Second, er, nil, false)
}

func TestStream_EvalAllTypes(t *testing.T) {
func TestStream_Eval_AllTypes(t *testing.T) {
var script = `
stream
|from()
Expand Down Expand Up @@ -627,6 +627,93 @@ stream
testStreamerWithOutput(t, "TestStream_EvalAllTypes", script, 2*time.Second, er, nil, false)
}

func TestStream_Eval_KeepAll(t *testing.T) {
var script = `
stream
|from()
.measurement('types')
|eval(lambda: "value0" + "value1", lambda: "value0" - "value1")
.as( 'pos', 'neg')
.keep()
|httpOut('TestStream_Eval_Keep')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "types",
Tags: nil,
Columns: []string{"time", "neg", "pos", "value0", "value1"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
-1.0,
1.0,
0.0,
1.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Eval_Keep", script, 2*time.Second, er, nil, false)
}

func TestStream_Eval_KeepSome(t *testing.T) {
var script = `
stream
|from()
.measurement('types')
|eval(lambda: "value0" + "value1", lambda: "value0" - "value1")
.as( 'pos', 'neg')
.keep('value0', 'pos', 'neg')
|httpOut('TestStream_Eval_Keep')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "types",
Tags: nil,
Columns: []string{"time", "neg", "pos", "value0"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
-1.0,
1.0,
0.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Eval_Keep", script, 2*time.Second, er, nil, false)
}

func TestStream_Eval_KeepSomeWithHidden(t *testing.T) {
var script = `
stream
|from()
.measurement('types')
|eval(lambda: "value0" + "value1", lambda: "pos" - "value1")
.as( 'pos', 'zero')
.keep('value0', 'zero')
|httpOut('TestStream_Eval_Keep')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "types",
Tags: nil,
Columns: []string{"time", "value0", "zero"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
0.0,
0.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Eval_Keep", script, 2*time.Second, er, nil, false)
}

func TestStream_EvalGroups(t *testing.T) {
var script = `
stream
Expand Down
19 changes: 18 additions & 1 deletion tick/stateful/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (

type DynamicMethod func(self interface{}, args ...interface{}) (interface{}, error)

// Special marker that a value is empty
var empty = new(interface{})

// Contains a set of variables references and their values.
type Scope struct {
variables map[string]interface{}
Expand All @@ -27,9 +30,15 @@ func (s *Scope) Set(name string, value interface{}) {
s.variables[name] = value
}

// Whether a value has been set on the scope
func (s *Scope) Has(name string) bool {
v, ok := s.variables[name]
return ok && v != empty
}

// Get returns the value of 'name'.
func (s *Scope) Get(name string) (interface{}, error) {
if v, ok := s.variables[name]; ok {
if v, ok := s.variables[name]; ok && v != empty {
return v, nil
}
var possible []string
Expand All @@ -39,6 +48,14 @@ func (s *Scope) Get(name string) (interface{}, error) {
return nil, fmt.Errorf("name %q is undefined. Names in scope: %s", name, strings.Join(possible, ","))
}

// Reset all scope values to an empty state.
func (s *Scope) Reset() {
// Scopes, are intended to be reused so do not free resources
for name := range s.variables {
s.Set(name, empty)
}
}

func (s *Scope) SetDynamicMethod(name string, m DynamicMethod) {
s.dynamicMethods[name] = m
}
Expand Down
3 changes: 2 additions & 1 deletion tick/stateful/scope_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewScopePool(referenceVariables []string) ScopePool {
New: func() interface{} {
scope := NewScope()
for _, refVariable := range scopePool.referenceVariables {
scope.Set(refVariable, nil)
scope.Set(refVariable, empty)
}

return scope
Expand All @@ -49,5 +49,6 @@ func (s *scopePool) Get() *Scope {

// Put - put used scope back to the pool
func (s *scopePool) Put(scope *Scope) {
scope.Reset()
s.pool.Put(scope)
}
31 changes: 21 additions & 10 deletions tick/stateful/scope_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,34 @@ import (
func TestScopePool_Sanity(t *testing.T) {
n := stateful.NewScopePool([]string{"value"})

// first
scope := n.Get()

_, existsErr := scope.Get("value")

if existsErr != nil {
t.Errorf("First: Expected \"value\" to exist in the scope, but go an error: %v", existsErr)
if scope.Has("value") {
t.Errorf("First: expected scope to not have a value set")
}
value := 42
scope.Set("value", value)
if !scope.Has("value") {
t.Errorf("First: expected scope to have a value set")
}
if v, err := scope.Get("value"); err != nil || v != value {
t.Errorf("First: unexpected scope value got %v exp %v", v, value)
}

// second, after put
n.Put(scope)

// Scope should be empty now
scope = n.Get()
_, existsErr = scope.Get("value")

if existsErr != nil {
t.Errorf("Second: Expected \"value\" to exist in the scope, but go an error: %v", existsErr)
if scope.Has("value") {
t.Errorf("Second: expected scope to not have a value set")
}
value = 24
scope.Set("value", value)
if !scope.Has("value") {
t.Errorf("Second: expected scope to have a value set")
}
if v, err := scope.Get("value"); err != nil || v != value {
t.Errorf("Second: unexpected scope value got %v exp %v", v, value)
}
}

Expand Down

0 comments on commit 6fae8ad

Please sign in to comment.