Skip to content

Commit

Permalink
Ensure we aggregate expressions via predicate groups (#28)
Browse files Browse the repository at this point in the history
* Ensure we aggregate expressions via predicate groups

This fully enables aggregate expressions with groups of OR chains.  Note
that this does not enable aggregate expressions for nested trees.

* Simplify lookups
  • Loading branch information
tonyhb authored Nov 4, 2024
1 parent 3505ed1 commit 01208c9
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 23 deletions.
61 changes: 38 additions & 23 deletions expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ func NewAggregateEvaluator(
EngineTypeNullMatch: newNullMatcher(),
EngineTypeBTree: newNumberMatcher(),
},
lock: &sync.RWMutex{},
mixed: map[uuid.UUID]struct{}{},
lock: &sync.RWMutex{},
constants: map[uuid.UUID]struct{}{},
mixed: map[uuid.UUID]struct{}{},
}
}

Expand Down Expand Up @@ -125,7 +126,7 @@ type aggregator struct {

// constants tracks evaluable IDs that must always be evaluated, due to
// the expression containing non-aggregateable clauses.
constants []uuid.UUID
constants map[uuid.UUID]struct{}
}

// Len returns the total number of aggregateable and constantly matched expressions
Expand Down Expand Up @@ -167,10 +168,17 @@ func (a *aggregator) Evaluate(ctx context.Context, data map[string]any) ([]Evalu
)

// TODO: Concurrently match constant expressions using a semaphore for capacity.

// Match constant expressions always.
a.lock.RLock()
constantEvals, err := a.loader(ctx, a.constants...)
uuids := make([]uuid.UUID, len(a.constants))
n := 0
for id := range a.constants {
uuids[n] = id
n++
}
a.lock.RUnlock()
constantEvals, err := a.loader(ctx, uuids...)
if err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -227,7 +235,7 @@ func (a *aggregator) Evaluate(ctx context.Context, data map[string]any) ([]Evalu
}

// Load all evaluable instances directly.
uuids := make([]uuid.UUID, len(matches))
uuids = make([]uuid.UUID, len(matches))
for n, m := range matches {
uuids[n] = m.Parsed.EvaluableID
}
Expand Down Expand Up @@ -322,6 +330,7 @@ func (a *aggregator) AggregateMatch(ctx context.Context, data map[string]any) ([
// Add all found items from the engine to the above list.
for _, eval := range matched {
counts[eval.GroupID] += 1

if _, ok := found[eval.GroupID]; !ok {
found[eval.GroupID] = []*StoredExpressionPart{}
}
Expand All @@ -348,15 +357,15 @@ func (a *aggregator) AggregateMatch(ctx context.Context, data map[string]any) ([
// matching engine, so we cannot use group sizes if the expr part
// has an OR.
for _, i := range found[groupID] {
// if this is purely aggregateable, we're safe to rely on group IDs.
//
// So, we only need to care if this expression is mixed. If it's mixed,
// we can ignore group IDs for the time being.
if _, ok := a.mixed[i.Parsed.EvaluableID]; ok {
// for now, mark this as viable as it had an OR
// this wasn't fully aggregatable so evaluate it.
result = append(result, i)
}

if len(i.Parsed.Root.Ors) > 0 {
// for now, mark this as viable as it had an OR
result = append(result, i)
}
}
}

Expand All @@ -380,7 +389,7 @@ func (a *aggregator) Add(ctx context.Context, eval Evaluable) (float64, error) {
if eval.GetExpression() == "" || parsed.HasMacros {
// This is an empty expression which always matches.
a.lock.Lock()
a.constants = append(a.constants, parsed.EvaluableID)
a.constants[parsed.EvaluableID] = struct{}{}
a.lock.Unlock()
return -1, nil
}
Expand All @@ -393,7 +402,7 @@ func (a *aggregator) Add(ctx context.Context, eval Evaluable) (float64, error) {
// This is the first time we're seeing a non-aggregateable
// group, so add it to the constants list and don't do anything else.
a.lock.Lock()
a.constants = append(a.constants, parsed.EvaluableID)
a.constants[parsed.EvaluableID] = struct{}{}
a.lock.Unlock()
return -1, err
}
Expand All @@ -405,7 +414,7 @@ func (a *aggregator) Add(ctx context.Context, eval Evaluable) (float64, error) {
// This is a non-aggregateable, slow expression.
// Add it to the constants list and don't do anything else.
a.lock.Lock()
a.constants = append(a.constants, parsed.EvaluableID)
a.constants[parsed.EvaluableID] = struct{}{}
a.lock.Unlock()
return stats.Ratio(), err
}
Expand Down Expand Up @@ -475,18 +484,11 @@ func (a *aggregator) removeConstantEvaluable(_ context.Context, eval Evaluable)
defer a.lock.Unlock()

// Find the index of the evaluable in constants and yank out.
idx := -1
for n, item := range a.constants {
if item == eval.GetID() {
idx = n
break
}
}
if idx == -1 {
if _, ok := a.constants[eval.GetID()]; !ok {
return ErrEvaluableNotFound
}

a.constants = append(a.constants[:idx], a.constants[idx+1:]...)
delete(a.constants, eval.GetID())
return nil
}

Expand Down Expand Up @@ -554,9 +556,22 @@ func (a *aggregator) iterGroup(ctx context.Context, node *Node, parsed *ParsedEx
}
}

all := node.Ands

// XXX: Here we must add the OR groups to make group IDs a success.
if len(node.Ors) > 0 {
for _, n := range node.Ors {
if !n.HasPredicate() || len(n.Ors) > 0 {
// Don't handle sub-branching for now.
// TODO: Recursively iterate.
stats.AddSlow()
continue
}

all = append(all, n)
}
}

all := node.Ands
if node.Predicate != nil {
if !isAggregateable(node) {
stats.AddSlow()
Expand Down
103 changes: 103 additions & 0 deletions expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,109 @@ func TestEvaluate_Null(t *testing.T) {
})
}

func TestMixedEngines(t *testing.T) {
ctx := context.Background()
parser := NewTreeParser(NewCachingCompiler(newEnv(), nil))

loader := newEvalLoader()

e := NewAggregateEvaluator(parser, testBoolEvaluator, loader.Load, 0).(*aggregator)

t.Run("Assert mixed engines", func(t *testing.T) {
exprs := []string{
// each id has 1, 2, 3 as a TS
`event.data.id == "a" && (event.ts == null || event.ts > 1)`,
`event.data.id == "a" && (event.ts == null || event.ts > 2)`,
`event.data.id == "a" && (event.ts == null || event.ts > 3)`,

`event.data.id == "b" && (event.ts == null || event.ts > 1)`,
`event.data.id == "b" && (event.ts == null || event.ts > 2)`,
`event.data.id == "b" && (event.ts == null || event.ts > 3)`,

`event.data.id == "c" && (event.ts == null || event.ts > 1)`,
`event.data.id == "c" && (event.ts == null || event.ts > 2)`,
`event.data.id == "c" && (event.ts == null || event.ts > 3)`,
}

for n, expr := range exprs {
eval := tex(expr, fmt.Sprintf("id-%d", n))
loader.AddEval(eval)
ratio, err := e.Add(ctx, eval)
require.NoError(t, err)
require.EqualValues(t, 1.0, ratio)
}

t.Run("Success matches", func(t *testing.T) {
// Should match no expressions - event.ts <= 1, 2, and 3.
eval, count, err := e.Evaluate(ctx, map[string]any{
"event": map[string]any{
"data": map[string]any{
"id": "a",
},
"ts": 1,
},
})
require.NoError(t, err)
require.EqualValues(t, 0, len(eval))
require.EqualValues(t, 0, count)

// Should match just the first "a" expression
eval, count, err = e.Evaluate(ctx, map[string]any{
"event": map[string]any{
"data": map[string]any{
"id": "a",
},
"ts": 2,
},
})
require.NoError(t, err)
require.EqualValues(t, 1, len(eval))
require.EqualValues(t, 1, count)
require.Equal(t, `event.data.id == "a" && (event.ts == null || event.ts > 1)`, eval[0].GetExpression())

// Should match the first and second "a" expr.
eval, count, err = e.Evaluate(ctx, map[string]any{
"event": map[string]any{
"data": map[string]any{
"id": "a",
},
"ts": 3,
},
})
require.NoError(t, err)
require.EqualValues(t, 2, len(eval))
require.EqualValues(t, 2, count)

// Null matches
eval, count, err = e.Evaluate(ctx, map[string]any{
"event": map[string]any{
"data": map[string]any{
"id": "a",
},
"ts": nil,
},
})
require.NoError(t, err)
require.EqualValues(t, 3, len(eval))
require.EqualValues(t, 3, count)
})
})

t.Run("Fail matches", func(t *testing.T) {
eval, count, err := e.Evaluate(ctx, map[string]any{
"event": map[string]any{
"data": map[string]any{
"id": "z",
},
"ts": 5,
},
})
require.NoError(t, err)
require.EqualValues(t, 0, len(eval))
require.EqualValues(t, 0, count)
})
}

// tex represents a test Evaluable expression
func tex(expr string, ids ...string) Evaluable {
return testEvaluable{
Expand Down

0 comments on commit 01208c9

Please sign in to comment.