Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce query planning allocations #7385

Merged
merged 2 commits into from
Oct 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
- [#2792](https://github.com/influxdata/influxdb/issues/2792): Exceeding max retention policy duration gives incorrect error message
- [#7226](https://github.com/influxdata/influxdb/issues/7226): Fix database locked up when deleting shards
- [#7382](https://github.com/influxdata/influxdb/issues/7382): Shard stats include wal path tag so disk bytes make more sense.
- [#7385](https://github.com/influxdata/influxdb/pull/7385): Reduce query planning allocations

## v1.0.1 [2016-09-26]

Expand Down
10 changes: 6 additions & 4 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -2001,17 +2001,19 @@ func walkRefs(exp Expr) []VarRef {
case *VarRef:
return []VarRef{*expr}
case *Call:
var a []VarRef
a := make([]VarRef, 0, len(expr.Args))
for _, expr := range expr.Args {
if ref, ok := expr.(*VarRef); ok {
a = append(a, *ref)
}
}
return a
case *BinaryExpr:
var ret []VarRef
ret = append(ret, walkRefs(expr.LHS)...)
ret = append(ret, walkRefs(expr.RHS)...)
lhs := walkRefs(expr.LHS)
rhs := walkRefs(expr.RHS)
ret := make([]VarRef, 0, len(lhs)+len(rhs))
ret = append(ret, lhs...)
ret = append(ret, rhs...)
return ret
case *ParenExpr:
return walkRefs(expr.Expr)
Expand Down
47 changes: 23 additions & 24 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsdb

import (
"bytes"
"fmt"
"regexp"
"sort"
Expand Down Expand Up @@ -760,18 +761,18 @@ func (m *Measurement) filters(condition influxql.Expr) (map[uint64]influxql.Expr
// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
m.mu.RLock()
defer m.mu.RUnlock()

// get the unique set of series ids and the filters that should be applied to each
filters, err := m.filters(condition)
if err != nil {
m.mu.RUnlock()
return nil, err
}

// For every series, get the tag values for the requested tag keys i.e. dimensions. This is the
// TagSet for that series. Series with the same TagSet are then grouped together, because for the
// purpose of GROUP BY they are part of the same composite series.
tagSets := make(map[string]*influxql.TagSet)
tagSets := make(map[string]*influxql.TagSet, 64)
for id, filter := range filters {
s := m.seriesByID[id]
tags := make(map[string]string, len(dimensions))
Expand All @@ -780,28 +781,25 @@ func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*
for _, dim := range dimensions {
tags[dim] = s.Tags.GetString(dim)
}

// Convert the TagSet to a string, so it can be added to a map allowing TagSets to be handled
// as a set.
tagsAsKey := string(MarshalTags(tags))
tagSet, ok := tagSets[tagsAsKey]
tagsAsKey := MarshalTags(tags)
tagSet, ok := tagSets[string(tagsAsKey)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember. Does this cause the map to use some kind of optimization for memory allocations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Supposed to. But it reduces the []byte{} -> string conversion from 2 -> 1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsternberg yeah this approach is significantly faster than the previous approach. Around 15ns per lookup versus 45ns from some recent benchmarking I did.

if !ok {
// This TagSet is new, create a new entry for it.
tagSet = &influxql.TagSet{}
tagsForSet := make(map[string]string, len(tags))
for k, v := range tags {
tagsForSet[k] = v
tagSet = &influxql.TagSet{
Tags: tags,
Key: tagsAsKey,
}
tagSet.Tags = tagsForSet
tagSet.Key = MarshalTags(tagsForSet)
}

// Associate the series and filter with the Tagset.
tagSet.AddFilter(m.seriesByID[id].Key, filter)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to my previous comment, this is a hot-zone I found while I was working on lazy iterators. When no GROUP BY * is used, a lot of time is spent growing the slice inside of the tag set. I have not figured out a good way to deal with this since attempting to count the number of elements ahead of time didn't really reduce the amount of time.


// Ensure it's back in the map.
tagSets[tagsAsKey] = tagSet
tagSets[string(tagsAsKey)] = tagSet
}
// Release the lock while we sort all the tags
m.mu.RUnlock()

// Sort the series in each tag set.
for _, t := range tagSets {
Expand All @@ -810,26 +808,21 @@ func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*

// The TagSets have been created, as a map of TagSets. Just send
// the values back as a slice, sorting for consistency.
sortedTagSetKeys := make([]string, 0, len(tagSets))
for k := range tagSets {
sortedTagSetKeys = append(sortedTagSetKeys, k)
}
sort.Strings(sortedTagSetKeys)

sortedTagsSets := make([]*influxql.TagSet, 0, len(sortedTagSetKeys))
for _, k := range sortedTagSetKeys {
sortedTagsSets = append(sortedTagsSets, tagSets[k])
sortedTagsSets := make([]*influxql.TagSet, 0, len(tagSets))
for _, v := range tagSets {
sortedTagsSets = append(sortedTagsSets, v)
}
sort.Sort(byTagKey(sortedTagsSets))

return sortedTagsSets, nil
}

// mergeSeriesFilters merges two sets of filter expressions and culls series IDs.
func mergeSeriesFilters(op influxql.Token, ids SeriesIDs, lfilters, rfilters FilterExprs) (SeriesIDs, FilterExprs) {
// Create a map to hold the final set of series filter expressions.
filters := make(map[uint64]influxql.Expr, 0)
filters := make(map[uint64]influxql.Expr, len(ids))
// Resulting list of series IDs
var series SeriesIDs
series := make(SeriesIDs, 0, len(ids))

// Combining logic:
// +==========+==========+==========+=======================+=======================+
Expand Down Expand Up @@ -1982,3 +1975,9 @@ type uint64Slice []uint64
// Len returns the number of IDs in the slice.
func (s uint64Slice) Len() int { return len(s) }

// Swap exchanges the IDs at indices i and j.
func (s uint64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// Less reports whether the ID at index i is ordered before the ID at index j.
func (s uint64Slice) Less(i, j int) bool { return s[i] < s[j] }

// byTagKey implements sort.Interface, ordering TagSets lexicographically
// by their Key bytes so results are returned in a deterministic order.
type byTagKey []*influxql.TagSet

// Len returns the number of tag sets.
func (a byTagKey) Len() int { return len(a) }

// Less reports whether the tag set at index i sorts before the one at
// index j, comparing their Key byte slices lexicographically.
func (a byTagKey) Less(i, j int) bool {
	return bytes.Compare(a[i].Key, a[j].Key) < 0
}

// Swap exchanges the tag sets at indices i and j.
func (a byTagKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }