Skip to content

Commit

Permalink
Aggregate neq (#30)
Browse files Browse the repository at this point in the history
* wip

* Handle not equals in fast aggregation

* Remove denies from expr

* Lints

* comments

* remove unused atomic

* fix allocation

* fix locks

* Remove predicate - unnecessary memory

* Lower count for race detector

* Update engine_stringmap.go

Co-authored-by: Bruno Scheufler <git@brunoscheufler.com>

* Fix tests

* update all tests for !=

* Add removal of neq aggregates from stringmap

* remove prints

---------

Co-authored-by: Bruno Scheufler <git@brunoscheufler.com>
  • Loading branch information
tonyhb and BrunoScheufler authored Nov 6, 2024
1 parent 774832e commit b8451aa
Show file tree
Hide file tree
Showing 8 changed files with 522 additions and 167 deletions.
6 changes: 4 additions & 2 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ const (
type MatchingEngine interface {
// Type returns the EngineType
Type() EngineType

// Match takes an input event, containing key:value pairs of data, and
// matches the given data to any ExpressionParts stored in the engine.
//
// Each implementation of the engine may differ on granularity of
// expression parts received. Some may return false positives, but
// each MatchingEngine should NEVER omit ExpressionParts which match
// the given input.
Match(ctx context.Context, input map[string]any) ([]*StoredExpressionPart, error)
Match(ctx context.Context, input map[string]any) (matched []*StoredExpressionPart, err error)

// Add adds a new expression part to the matching engine for future matches.
Add(ctx context.Context, p ExpressionPart) error
// Remove removes an expression part from the matching engine, ensuring that the
Expand All @@ -44,7 +46,7 @@ type MatchingEngine interface {
// ignoring the variable name. Note that each MatchingEngine should NEVER
// omit ExpressionParts which match the given input; false positives are okay,
// but not returning valid matches must be impossible.
Search(ctx context.Context, variable string, input any) []*StoredExpressionPart
Search(ctx context.Context, variable string, input any) (matched []*StoredExpressionPart)
}

// Leaf represents the leaf within a tree. This stores all expressions
Expand Down
15 changes: 9 additions & 6 deletions engine_null.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ func (n *nullLookup) Type() EngineType {
return EngineTypeNullMatch
}

func (n *nullLookup) Match(ctx context.Context, data map[string]any) ([]*StoredExpressionPart, error) {

func (n *nullLookup) Match(ctx context.Context, data map[string]any) (matched []*StoredExpressionPart, err error) {
l := &sync.Mutex{}
found := []*StoredExpressionPart{}
matched = []*StoredExpressionPart{}
eg := errgroup.Group{}

for item := range n.paths {
Expand All @@ -55,17 +54,21 @@ func (n *nullLookup) Match(ctx context.Context, data map[string]any) ([]*StoredE

// This matches null, nil (as null), and any non-null items.
l.Lock()
found = append(found, n.Search(ctx, path, res[0])...)

// XXX: This engine hasn't been updated with denied items for !=. It needs consideration
// in how to handle these cases appropriately.
found := n.Search(ctx, path, res[0])
matched = append(matched, found...)
l.Unlock()

return nil
})
}

return found, eg.Wait()
return matched, eg.Wait()
}

func (n *nullLookup) Search(ctx context.Context, variable string, input any) []*StoredExpressionPart {
func (n *nullLookup) Search(ctx context.Context, variable string, input any) (matched []*StoredExpressionPart) {
if input == nil {
// The input data is null, so the only items that can match are equality
// comparisons to null.
Expand Down
29 changes: 14 additions & 15 deletions engine_number.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func (n numbers) Type() EngineType {
return EngineTypeBTree
}

func (n *numbers) Match(ctx context.Context, input map[string]any) ([]*StoredExpressionPart, error) {
func (n *numbers) Match(ctx context.Context, input map[string]any) (matched []*StoredExpressionPart, err error) {
l := &sync.Mutex{}
found := []*StoredExpressionPart{}
matched = []*StoredExpressionPart{}
eg := errgroup.Group{}

for item := range n.paths {
Expand Down Expand Up @@ -72,28 +72,27 @@ func (n *numbers) Match(ctx context.Context, input map[string]any) ([]*StoredExp

// This matches null, nil (as null), and any non-null items.
l.Lock()
found = append(found, n.Search(ctx, path, val)...)
found := n.Search(ctx, path, val)
matched = append(matched, found...)
l.Unlock()

return nil
})
}

err := eg.Wait()

return found, err
return matched, eg.Wait()
}

// Search returns all ExpressionParts which match the given input, ignoring the variable name
// entirely.
func (n *numbers) Search(ctx context.Context, variable string, input any) []*StoredExpressionPart {
func (n *numbers) Search(ctx context.Context, variable string, input any) (matched []*StoredExpressionPart) {
n.lock.RLock()
defer n.lock.RUnlock()

var (
val float64
found = []*StoredExpressionPart{}
)
// initialize matched
matched = []*StoredExpressionPart{}

var val float64

switch v := input.(type) {
case int:
Expand All @@ -114,7 +113,7 @@ func (n *numbers) Search(ctx context.Context, variable string, input any) []*Sto
continue
}
// This is a candidatre.
found = append(found, m)
matched = append(matched, m)
}
}

Expand All @@ -130,7 +129,7 @@ func (n *numbers) Search(ctx context.Context, variable string, input any) []*Sto
continue
}
// This is a candidatre.
found = append(found, m)
matched = append(matched, m)
}
return true
})
Expand All @@ -147,12 +146,12 @@ func (n *numbers) Search(ctx context.Context, variable string, input any) []*Sto
continue
}
// This is a candidatre.
found = append(found, m)
matched = append(matched, m)
}
return true
})

return found
return matched
}

func (n *numbers) Add(ctx context.Context, p ExpressionPart) error {
Expand Down
Loading

0 comments on commit b8451aa

Please sign in to comment.