feat(query/stdlib): enable read group min max #19158
cfde09c
to
df6b766
Compare
storage/flux/table.gen.go.tmpl
Outdated
if err != nil {
	panic(err)
I don't think that I should panic here, but I'm not sure how to handle this case.
I agree. It seems to me that if it's not possible to guarantee that only supported aggregate types will be passed down here, then `advance() bool` needs to be refactored to `advance() (bool, error)`.
It looks above that `t.do`, which takes the `advance` function, returns an error, so perhaps this would be a relatively small refactor?
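A minimal sketch of the refactor suggested above, not the actual code in `storage/flux`. The `table`, `aggregateKind`, `groupFn` and `selectGroupFn` names are hypothetical stand-ins; the only shapes taken from the discussion are `advance() (bool, error)` and a `do` method that already returns an error:

```go
package main

import (
	"errors"
	"fmt"
)

// aggregateKind is a hypothetical stand-in for the storage aggregate type.
type aggregateKind int

const (
	aggregateSum aggregateKind = iota
	aggregateMin
	aggregateUnsupported
)

// errUnsupportedAggregate is returned instead of panicking.
var errUnsupportedAggregate = errors.New("unsupported aggregate type")

// groupFn combines per-series results into a single value.
// It is only called with a non-empty slice in this sketch.
type groupFn func(values []float64) float64

// selectGroupFn returns an error for unknown kinds rather than panicking.
func selectGroupFn(kind aggregateKind) (groupFn, error) {
	switch kind {
	case aggregateSum:
		return func(vs []float64) float64 {
			var sum float64
			for _, v := range vs {
				sum += v
			}
			return sum
		}, nil
	case aggregateMin:
		return func(vs []float64) float64 {
			min := vs[0]
			for _, v := range vs[1:] {
				if v < min {
					min = v
				}
			}
			return min
		}, nil
	default:
		return nil, fmt.Errorf("%w: %d", errUnsupportedAggregate, kind)
	}
}

// table is a heavily simplified, hypothetical table that yields one result.
type table struct {
	kind   aggregateKind
	values []float64
	done   bool
	result float64
}

// advance reports whether a new result is available and propagates errors.
func (t *table) advance() (bool, error) {
	if t.done || len(t.values) == 0 {
		return false, nil
	}
	fn, err := selectGroupFn(t.kind)
	if err != nil {
		return false, err
	}
	t.result = fn(t.values)
	t.done = true
	return true, nil
}

// do drives advance and returns the first error it sees.
func (t *table) do(f func(float64) error) error {
	for {
		ok, err := t.advance()
		if err != nil {
			return err
		}
		if !ok {
			return nil
		}
		if err := f(t.result); err != nil {
			return err
		}
	}
}
```

With this shape, an unsupported aggregate surfaces as an ordinary error from `do` and the caller decides how to report it.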
b0a8f4f
to
5c6a1bd
Compare
storage/flux/table.gen.go.tmpl
Outdated
length := arr.Size()
for i := 0; i < length; i++ {
	t, v := groupBy(arr.Timestamps, arr.Values)
	timestamps = append(timestamps, t)
	values = append(values, v)
}

{{if and (ne .Name "Boolean") (ne .Name "String")}}
timestamp, next = groupBy(timestamps, values)
//todo(faith): trying to aggregate values between array cursors if the subsequent
// value is still part of the same group. Unsure of the logic here.
if len != 0 {
	value += next
} else {
	value = next
}
{{end}}
I'm missing something in my logic here to preserve values between array cursors. I feel like the overall logic is pretty close, but there's something off here.
storage/flux/table.gen.go.tmpl
Outdated
func groupByCount{{.Name}}(timestamps []int64, values []{{.Type}})(int64, {{.Type}}) {
	//todo(faith): I need a way to keep track of the values between array cursors
	return math.MaxInt64, values[0]
}
{{end}}

{{if and (ne .Name "Boolean") (ne .Name "String")}}
func groupBySum{{.Name}}(timestamps []int64, values []{{.Type}})(int64, {{.Type}}) {
	//todo(faith): I need a way to keep track of the values between array cursors
	return math.MaxInt64, values[0]
}
These are the affected cases that are causing the remaining tests to fail.
5c6a1bd
to
4ea9b11
Compare
This looks great to me @fchikwekwe. I haven't been keeping track of the pushdown work but I found this PR easy to digest :-)
Only requesting changes because I don't think we can let storage panic if we are sending down unsupported aggregates. I'm guessing a slight refactor on `advance` would fix that?
The performance stuff on sum aggregates is probably not important but I thought it was worth highlighting anyway.
storage/flux/table.gen.go
Outdated
	}
}

func groupByMinFloat(timestamps []int64, values []float64) (int64, float64) {
I'm interested in understanding whether, in some cases, we can have an invariant that `timestamps` is sorted. We always return data out of the storage engine in timestamp order, because that's how we store it. If we have done any merging of series then data could be out of order.
Anyway, not sure if this is something worth thinking about but I just wondered is all :-)
In this specific circumstance I think the timestamps would generally not be sorted. The reason is that these methods all aggregate series that have already been aggregated, so we're dealing with aggregating the series together here. So for `min` we can have the first series select a point at 40 seconds and then the second series select a point at 10 seconds.
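To make the 40-second and 10-second example concrete, here is a sketch of what a `min` re-aggregation over per-series results could look like. Only the signature comes from the diff above; the body is an assumption, as are the non-empty, equal-length inputs and the choice to keep the first minimum on ties:

```go
package main

// groupByMinFloat picks the smallest value among per-series minimums and
// returns it with its timestamp. Sketch only: it assumes both slices are
// non-empty and have the same length, and it keeps the first minimum on ties.
// The timestamps are not assumed to be sorted, so it scans every element.
func groupByMinFloat(timestamps []int64, values []float64) (int64, float64) {
	minTime, minValue := timestamps[0], values[0]
	for i := 1; i < len(values); i++ {
		if values[i] < minValue {
			minTime, minValue = timestamps[i], values[i]
		}
	}
	return minTime, minValue
}
```

Given the per-series results `(40s, 3.5)` and `(10s, 1.25)`, the function returns the second pair even though its timestamp is earlier than the first one, which is why a sortedness invariant on `timestamps` cannot be relied on here.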
storage/flux/table.gen.go
Outdated
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.

func groupBySumFloat(timestamps []int64, values []float64) (int64, float64) {
Typically this is something that would benefit from going as fast as possible, and it shouldn't be too complicated to improve performance for a nice tight loop like this.
One thing that might be worth trying here is to unroll the loop. Typically I have found that with tight loops like this you can improve performance by 30-40%, depending on the situation.
I haven't tested this but I would try something like:
// we need to know how many values might not fit into blocks of four.
// we can sum them up afterwards.
remainder := len(values) % 4
// we will store the intermediate results in this array.
var sums [4]float64
// sum up any extra values...
for _, v := range values[len(values)-remainder:] {
	sums[0] += v
}
// iterate over four values at a time. This can improve overall perf
// for a bunch of reasons (bounds checking, pipelining, better cache use, fewer instructions).
for i := 0; i < len(values)-remainder; i += 4 {
	sums[0] += values[i+3] // as a habit I put the biggest index first in case it helps reduce bounds checking...
	sums[1] += values[i+2]
	sums[2] += values[i+1]
	sums[3] += values[i]
}
// just need to sum up the intermediate results
return math.MaxInt64, sums[0] + sums[1] + sums[2] + sums[3]
Of course, I'm speculating and benchmarks are the source of truth 😄. But I would expect something like the above to be quite a bit faster for larger groups of values.
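For reference, the suggestion can be written out as a complete function. This is an untested-for-speed sketch that assumes the `groupBySumFloat` signature from the diff; any gain would still need to be confirmed with benchmarks:

```go
package main

import "math"

// groupBySumFloat sums values four at a time using independent accumulators.
// The timestamp argument is unused; math.MaxInt64 is returned as a
// placeholder because the final result does not contain _time.
func groupBySumFloat(_ []int64, values []float64) (int64, float64) {
	// Number of values that fit into whole blocks of four.
	remainder := len(values) % 4
	blocked := len(values) - remainder

	var sums [4]float64

	// Sum the trailing values that do not fill a block.
	for _, v := range values[blocked:] {
		sums[0] += v
	}

	// Process one block of four values per iteration.
	for i := 0; i < blocked; i += 4 {
		sums[0] += values[i+3] // highest index first, which may help bounds-check elimination
		sums[1] += values[i+2]
		sums[2] += values[i+1]
		sums[3] += values[i]
	}

	return math.MaxInt64, sums[0] + sums[1] + sums[2] + sums[3]
}
```

One design consequence worth noting: the four accumulators add the values in a different order than a single running total, so for floating-point inputs the result can differ from the naive loop in the last few bits.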
This is a cool idea and I'd be interested in benchmarking this as part of a future PR.
storage/flux/table.gen.go.tmpl
Outdated
{{end}}

{{if and (ne .Name "Boolean") (ne .Name "String")}}
func groupBySum{{.Name}}(timestamps []int64, values []{{.Type}})(int64, {{.Type}}) {
Since `timestamps` is not used it can be omitted for clarity.
- func groupBySum{{.Name}}(timestamps []int64, values []{{.Type}})(int64, {{.Type}}) {
+ func groupBySum{{.Name}}(_ []int64, values []{{.Type}})(int64, {{.Type}}) {
@@ -2564,6 +2564,115 @@ func TestStorageReader_EmptyTableNoEmptyWindows(t *testing.T) {
	}
}

func TestStorageReader_ReadGroup(t *testing.T) {
👍 nice tests here
3583c14
to
2bdfe43
Compare
@e-dard I think I addressed your feedback and I think it passes tests now. Can you re-review?
@jsternberg howdy. Is
I think it's a caching issue. It doesn't panic anymore. Can you refresh to try and confirm that?
LGTM 👍
I had a question about the extra plan node we're adding in the planner rule. I don't think we need it anymore. My guess is that you can remove the extra node and update the planner rule test, and the other tests will continue to pass and you'll be good to go.
I also had a suggestion about the templated code in `storage/flux`, but that's more subjective.
	SelectorConfig: execute.SelectorConfig{
		Column: execute.DefaultValueColLabel,
	},
})
I think that @yzhang1991 recently completed work so that storage will more completely compute aggregate values. It used to be that it would produce an aggregate value per input series, rather than per output group.
That was why this extra `min` node was needed, but I think that it is not needed anymore? Or if it is still needed, we should be doing the re-aggregation in storage in the same place we do it for `sum`, `count`, etc., rather than here.
Note that for `sum`, `count`, `first` and `last` above we just return the single source node.
I'll try this out.
storage/flux/table.gen.go.tmpl
Outdated
	return groupByLast{{.Name}}, nil
case datatypes.AggregateTypeCount:
	{{if eq .Name "Integer"}}
	return groupByCount{{.Name}}, nil
This is a little confusing, since `groupByCount` actually does a `sum`, as it should, but I had to double-check to make sure. Maybe just put a comment here explaining this?
storage/flux/table.gen.go.tmpl
Outdated
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.
{{if eq .Name "Integer"}}
func groupByCount{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
Do we need this? Can we just call `groupBySum` directly everywhere this would be called? It might make this a little easier to understand.
I don't think it's needed, but I didn't want it to be confusing. Do you think it would be better named `aggregateCountGroups`? That might better represent what this is: an aggregation of existing groups rather than the grouping itself.
Yes, I like that name just fine.
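A small sketch of why the count re-aggregation is a sum: each input value is already the count for one series, so the group count is the total of those counts. The function names below follow the `aggregateCountGroups` naming proposed in this thread but are hypothetical, as is the explicit `Integer` suffix standing in for the template's `{{.Name}}`:

```go
package main

import "math"

// aggregateSumGroupsInteger adds up per-series partial sums.
// The timestamp argument is ignored; math.MaxInt64 is a placeholder because
// the final result does not contain _time.
func aggregateSumGroupsInteger(_ []int64, values []int64) (int64, int64) {
	var sum int64
	for _, v := range values {
		sum += v
	}
	return math.MaxInt64, sum
}

// aggregateCountGroupsInteger combines per-series counts into a group count.
// Each value is already a count for one series, so combining them is a sum,
// not a count of the number of inputs.
func aggregateCountGroupsInteger(timestamps []int64, values []int64) (int64, int64) {
	return aggregateSumGroupsInteger(timestamps, values)
}
```

For example, three series with counts 3, 5 and 2 produce a group count of 10, whereas counting the inputs would incorrectly give 3.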
Enables the min and max aggregates for the ReadGroupAggregate pushdown behind a feature flag.
Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
2bdfe43
to
647f315
Compare
Looks good!
This PR will enable readGroup min/max in OSS behind a feature flag.
To do: