-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Min and max numerical aggregates #3078
Changes from all commits
35f7cef
621e2c3
d3ada97
27f2e21
63f062c
58a5093
87ecfd4
e779d9f
caa0862
9a10cb1
4f8b79c
3cff29d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
// Copyright 2024 Democratized Data Foundation | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package planner | ||
|
||
import ( | ||
"github.com/sourcenetwork/defradb/internal/core" | ||
"github.com/sourcenetwork/defradb/internal/planner/mapper" | ||
|
||
"github.com/sourcenetwork/immutable" | ||
"github.com/sourcenetwork/immutable/enumerable" | ||
) | ||
|
||
type number interface { | ||
int64 | float64 | ||
} | ||
|
||
func lessN[T number](a T, b T) bool { | ||
return a < b | ||
} | ||
|
||
func lessO[T number](a immutable.Option[T], b immutable.Option[T]) bool { | ||
if !a.HasValue() { | ||
return true | ||
} | ||
|
||
if !b.HasValue() { | ||
return false | ||
} | ||
|
||
return a.Value() < b.Value() | ||
} | ||
|
||
// inverse returns the logical inverse of the given sort func. | ||
func inverse[T any](original func(T, T) bool) func(T, T) bool { | ||
return func(t1, t2 T) bool { | ||
return !original(t1, t2) | ||
} | ||
} | ||
|
||
// reduces the documents in a slice, skipping over hidden items (a grouping mechanic). | ||
// | ||
// Docs should be reduced with this function to avoid applying offsets twice (once in the | ||
// select, then once here). | ||
func reduceDocs[T any]( | ||
docs []core.Doc, | ||
initialValue T, | ||
reduceFunc func(core.Doc, T) T, | ||
) T { | ||
var value = initialValue | ||
for _, doc := range docs { | ||
if !doc.Hidden { | ||
value = reduceFunc(doc, value) | ||
} | ||
} | ||
return value | ||
} | ||
|
||
func reduceItems[T any, V any]( | ||
source []T, | ||
aggregateTarget *mapper.AggregateTarget, | ||
less func(T, T) bool, | ||
initialValue V, | ||
reduceFunc func(T, V) V, | ||
) (V, error) { | ||
items := enumerable.New(source) | ||
if aggregateTarget.Filter != nil { | ||
items = enumerable.Where(items, func(item T) (bool, error) { | ||
return mapper.RunFilter(item, aggregateTarget.Filter) | ||
}) | ||
} | ||
|
||
if aggregateTarget.OrderBy != nil && len(aggregateTarget.OrderBy.Conditions) > 0 { | ||
if aggregateTarget.OrderBy.Conditions[0].Direction == mapper.ASC { | ||
items = enumerable.Sort(items, less, len(source)) | ||
} else { | ||
items = enumerable.Sort(items, inverse(less), len(source)) | ||
} | ||
} | ||
|
||
if aggregateTarget.Limit != nil { | ||
items = enumerable.Skip(items, aggregateTarget.Limit.Offset) | ||
items = enumerable.Take(items, aggregateTarget.Limit.Limit) | ||
} | ||
|
||
var value = initialValue | ||
err := enumerable.ForEach(items, func(item T) { | ||
value = reduceFunc(item, value) | ||
}) | ||
return value, err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,255 @@ | ||
// Copyright 2024 Democratized Data Foundation | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package planner | ||
|
||
import ( | ||
"math/big" | ||
|
||
"github.com/sourcenetwork/immutable" | ||
|
||
"github.com/sourcenetwork/defradb/client/request" | ||
"github.com/sourcenetwork/defradb/internal/core" | ||
"github.com/sourcenetwork/defradb/internal/planner/mapper" | ||
) | ||
|
||
type maxNode struct { | ||
documentIterator | ||
docMapper | ||
|
||
p *Planner | ||
plan planNode | ||
parent *mapper.Select | ||
|
||
// virtualFieldIndex is the index of the field | ||
// that contains the result of the aggregate. | ||
virtualFieldIndex int | ||
aggregateMapping []mapper.AggregateTarget | ||
|
||
execInfo maxExecInfo | ||
} | ||
|
||
type maxExecInfo struct { | ||
// Total number of times maxNode was executed. | ||
iterations uint64 | ||
} | ||
|
||
func (p *Planner) Max( | ||
field *mapper.Aggregate, | ||
parent *mapper.Select, | ||
) (*maxNode, error) { | ||
return &maxNode{ | ||
p: p, | ||
parent: parent, | ||
aggregateMapping: field.AggregateTargets, | ||
virtualFieldIndex: field.Index, | ||
docMapper: docMapper{field.DocumentMapping}, | ||
}, nil | ||
} | ||
|
||
func (n *maxNode) Kind() string { return "maxNode" } | ||
func (n *maxNode) Init() error { return n.plan.Init() } | ||
func (n *maxNode) Start() error { return n.plan.Start() } | ||
func (n *maxNode) Spans(spans core.Spans) { n.plan.Spans(spans) } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick: This should probably be covered by tests, I think having a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added a test that should cover it
shahzadlone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
func (n *maxNode) Close() error { return n.plan.Close() } | ||
func (n *maxNode) Source() planNode { return n.plan } | ||
func (n *maxNode) SetPlan(p planNode) { n.plan = p } | ||
|
||
func (n *maxNode) simpleExplain() (map[string]any, error) { | ||
sourceExplanations := make([]map[string]any, len(n.aggregateMapping)) | ||
|
||
for i, source := range n.aggregateMapping { | ||
simpleExplainMap := map[string]any{} | ||
|
||
// Add the filter attribute if it exists. | ||
if source.Filter == nil { | ||
simpleExplainMap[filterLabel] = nil | ||
} else { | ||
// get the target aggregate document mapping. Since the filters | ||
// are relative to the target aggregate collection (and doc mapper). | ||
var targetMap *core.DocumentMapping | ||
if source.Index < len(n.documentMapping.ChildMappings) && | ||
n.documentMapping.ChildMappings[source.Index] != nil { | ||
targetMap = n.documentMapping.ChildMappings[source.Index] | ||
} else { | ||
targetMap = n.documentMapping | ||
} | ||
simpleExplainMap[filterLabel] = source.Filter.ToMap(targetMap) | ||
shahzadlone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// Add the main field name. | ||
simpleExplainMap[fieldNameLabel] = source.Field.Name | ||
|
||
// Add the child field name if it exists. | ||
if source.ChildTarget.HasValue { | ||
simpleExplainMap[childFieldNameLabel] = source.ChildTarget.Name | ||
shahzadlone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
simpleExplainMap[childFieldNameLabel] = nil | ||
} | ||
|
||
sourceExplanations[i] = simpleExplainMap | ||
} | ||
|
||
return map[string]any{ | ||
sourcesLabel: sourceExplanations, | ||
}, nil | ||
} | ||
|
||
// Explain method returns a map containing all attributes of this node that | ||
// are to be explained, subscribes / opts-in this node to be an explainablePlanNode. | ||
func (n *maxNode) Explain(explainType request.ExplainType) (map[string]any, error) { | ||
switch explainType { | ||
case request.SimpleExplain: | ||
return n.simpleExplain() | ||
|
||
case request.ExecuteExplain: | ||
return map[string]any{ | ||
"iterations": n.execInfo.iterations, | ||
}, nil | ||
|
||
default: | ||
return nil, ErrUnknownExplainRequestType | ||
} | ||
} | ||
|
||
func (n *maxNode) Next() (bool, error) { | ||
n.execInfo.iterations++ | ||
|
||
hasNext, err := n.plan.Next() | ||
if err != nil || !hasNext { | ||
return hasNext, err | ||
} | ||
n.currentValue = n.plan.Value() | ||
|
||
var max *big.Float | ||
isFloat := false | ||
|
||
for _, source := range n.aggregateMapping { | ||
child := n.currentValue.Fields[source.Index] | ||
var collectionMax *big.Float | ||
var err error | ||
switch childCollection := child.(type) { | ||
case []core.Doc: | ||
collectionMax = reduceDocs( | ||
childCollection, | ||
nil, | ||
func(childItem core.Doc, value *big.Float) *big.Float { | ||
childProperty := childItem.Fields[source.ChildTarget.Index] | ||
res := &big.Float{} | ||
switch v := childProperty.(type) { | ||
case int: | ||
res = res.SetInt64(int64(v)) | ||
case int64: | ||
res = res.SetInt64(v) | ||
case uint64: | ||
res = res.SetUint64(v) | ||
case float64: | ||
res = res.SetFloat64(v) | ||
default: | ||
return nil | ||
} | ||
if value == nil || res.Cmp(value) > 0 { | ||
return res | ||
} | ||
return value | ||
}, | ||
) | ||
|
||
case []int64: | ||
collectionMax, err = reduceItems( | ||
childCollection, | ||
&source, | ||
lessN[int64], | ||
nil, | ||
func(childItem int64, value *big.Float) *big.Float { | ||
res := (&big.Float{}).SetInt64(childItem) | ||
if value == nil || res.Cmp(value) > 0 { | ||
return res | ||
} | ||
return value | ||
}, | ||
) | ||
|
||
case []immutable.Option[int64]: | ||
collectionMax, err = reduceItems( | ||
childCollection, | ||
&source, | ||
lessO[int64], | ||
nil, | ||
func(childItem immutable.Option[int64], value *big.Float) *big.Float { | ||
if !childItem.HasValue() { | ||
return value | ||
} | ||
res := (&big.Float{}).SetInt64(childItem.Value()) | ||
if value == nil || res.Cmp(value) > 0 { | ||
return res | ||
} | ||
return value | ||
}, | ||
) | ||
|
||
case []float64: | ||
collectionMax, err = reduceItems( | ||
childCollection, | ||
&source, | ||
lessN[float64], | ||
nil, | ||
func(childItem float64, value *big.Float) *big.Float { | ||
res := big.NewFloat(childItem) | ||
if value == nil || res.Cmp(value) > 0 { | ||
return res | ||
} | ||
return value | ||
}, | ||
) | ||
|
||
case []immutable.Option[float64]: | ||
collectionMax, err = reduceItems( | ||
childCollection, | ||
&source, | ||
lessO[float64], | ||
nil, | ||
func(childItem immutable.Option[float64], value *big.Float) *big.Float { | ||
if !childItem.HasValue() { | ||
return value | ||
} | ||
res := big.NewFloat(childItem.Value()) | ||
if value == nil || res.Cmp(value) > 0 { | ||
return res | ||
} | ||
return value | ||
}, | ||
) | ||
} | ||
if err != nil { | ||
return false, err | ||
} | ||
if collectionMax == nil || (max != nil && collectionMax.Cmp(max) <= 0) { | ||
continue | ||
} | ||
isTargetFloat, err := n.p.isValueFloat(n.parent, &source) | ||
if err != nil { | ||
return false, err | ||
} | ||
isFloat = isTargetFloat | ||
max = collectionMax | ||
} | ||
|
||
if max == nil { | ||
n.currentValue.Fields[n.virtualFieldIndex] = nil | ||
} else if isFloat { | ||
res, _ := max.Float64() | ||
n.currentValue.Fields[n.virtualFieldIndex] = res | ||
} else { | ||
res, _ := max.Int64() | ||
n.currentValue.Fields[n.virtualFieldIndex] = res | ||
} | ||
return true, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: I'm sorry about the bother, because you are copying my undocumented work in the other aggregates, but I think it would be worth a line of documentation on
virtualFieldIndex
on these nodes, its not very clear.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done