Skip to content

Commit

Permalink
Enable concurrent evaluations of matched expressions
Browse files Browse the repository at this point in the history
Do this via semaphores.
  • Loading branch information
tonyhb committed Jul 17, 2024
1 parent b064fe2 commit 5964869
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 29 deletions.
85 changes: 56 additions & 29 deletions expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/google/cel-go/common/operators"
"github.com/google/uuid"
"golang.org/x/sync/semaphore"
)

var (
Expand Down Expand Up @@ -70,11 +71,17 @@ func NewAggregateEvaluator(
parser TreeParser,
eval ExpressionEvaluator,
evalLoader EvaluableLoader,
concurrency int64,
) AggregateEvaluator {
if concurrency == 0 {
concurrency = 1
}

return &aggregator{
eval: eval,
parser: parser,
loader: evalLoader,
sem: semaphore.NewWeighted(concurrency),
engines: map[EngineType]MatchingEngine{
EngineTypeStringHash: newStringEqualityMatcher(),
EngineTypeNullMatch: newNullMatcher(),
Expand All @@ -92,6 +99,8 @@ type aggregator struct {
// engines records all engines
engines map[EngineType]MatchingEngine

sem *semaphore.Weighted

// lock prevents concurrent updates of data
lock *sync.RWMutex
// len stores the current len of aggregable expressions.
Expand Down Expand Up @@ -131,25 +140,35 @@ func (a *aggregator) Evaluate(ctx context.Context, data map[string]any) ([]Evalu
if err != nil {
return nil, 0, err
}
for _, expr := range constantEvals {
atomic.AddInt32(&matched, 1)

if expr.GetExpression() == "" {
result = append(result, expr)
continue
for _, item := range constantEvals {
if err := a.sem.Acquire(ctx, 1); err != nil {
return result, matched, err
}

// NOTE: We don't need to add lifted expression variables,
// because match.Parsed.Evaluable() returns the original expression
// string.
ok, evalerr := a.eval(ctx, expr, data)
if evalerr != nil {
err = errors.Join(err, evalerr)
continue
}
if ok {
result = append(result, expr)
}
expr := item
go func() {
defer a.sem.Release(1)

atomic.AddInt32(&matched, 1)

if expr.GetExpression() == "" {
result = append(result, expr)
return
}

// NOTE: We don't need to add lifted expression variables,
// because match.Parsed.Evaluable() returns the original expression
// string.
ok, evalerr := a.eval(ctx, expr, data)
if evalerr != nil {
err = errors.Join(err, evalerr)
return
}
if ok {
result = append(result, expr)
}
}()
}

matches, merr := a.AggregateMatch(ctx, data)
Expand Down Expand Up @@ -178,21 +197,29 @@ func (a *aggregator) Evaluate(ctx context.Context, data map[string]any) ([]Evalu
continue
}

atomic.AddInt32(&matched, 1)
// NOTE: We don't need to add lifted expression variables,
// because match.Parsed.Evaluable() returns the original expression
// string.
ok, evalerr := a.eval(ctx, match, data)
if err := a.sem.Acquire(ctx, 1); err != nil {
return result, matched, err
}

seen[match.GetID()] = struct{}{}
expr := match
go func() {

if evalerr != nil {
err = errors.Join(err, evalerr)
continue
}
if ok {
result = append(result, match)
}
atomic.AddInt32(&matched, 1)
// NOTE: We don't need to add lifted expression variables,
// because match.Parsed.Evaluable() returns the original expression
// string.
ok, evalerr := a.eval(ctx, expr, data)

seen[match.GetID()] = struct{}{}

if evalerr != nil {
err = errors.Join(err, evalerr)
return
}
if ok {
result = append(result, match)
}
}()
}

return result, matched, err
Expand Down
136 changes: 136 additions & 0 deletions vendor/golang.org/x/sync/semaphore/semaphore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ golang.org/x/exp/slices
# golang.org/x/sync v0.6.0
## explicit; go 1.18
golang.org/x/sync/errgroup
golang.org/x/sync/semaphore
# golang.org/x/text v0.9.0
## explicit; go 1.17
golang.org/x/text/transform
Expand Down

0 comments on commit 5964869

Please sign in to comment.