From 2f0e2467571300fe4166741683534df221607a2f Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Sun, 6 Mar 2016 09:52:34 -0500 Subject: [PATCH] Implemented the tag values iterator for `SHOW TAG VALUES` `SHOW TAG VALUES` output has been modified to print the measurement name for every measurement and to return the output in two columns: key and value. An example output might be: > SHOW TAG VALUES WITH KEY IN (host, region) name: cpu --------- key value host server01 region useast name: mem --------- key value host server02 region useast `measurementsByExpr` has been taught how to handle reserved keys (ones with an underscore at the beginning) to allow reusing that function and skipping over expressions that don't matter to the call. Fixes #5593. --- CHANGELOG.md | 1 + cmd/influxd/run/server_test.go | 18 ++-- influxql/ast.go | 30 ++++++ influxql/ast_test.go | 21 ++++ influxql/statement_rewriter.go | 150 +++++++++++++++++++--------- influxql/statement_rewriter_test.go | 16 ++- tsdb/meta.go | 138 +++++++++++++++++++++---- tsdb/shard.go | 133 +++++++++++++++++++++++- 8 files changed, 429 insertions(+), 78 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e864aec953..dfcd19d7e7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ - [#5691](https://github.com/influxdata/influxdb/pull/5691): Remove associated shard data when retention policies are dropped. - [#5758](https://github.com/influxdata/influxdb/pull/5758): TSM engine stats for cache, WAL, and filestore. Thanks @jonseymour - [#5844](https://github.com/influxdata/influxdb/pull/5844): Tag TSM engine stats with database and retention policy +- [#5593](https://github.com/influxdata/influxdb/issues/5593): Modify `SHOW TAG VALUES` output for the new query engine to normalize the output. ### Bugfixes diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index f5d38c52232..b1926c49f3f 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -4973,37 +4973,37 @@ func TestServer_Query_ShowTagKeys(t *testing.T) { &Query{ name: "show tag values with key", command: "SHOW TAG VALUES WITH KEY = host", - exp: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server01"],["server02"],["server03"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server01"],["host","server02"]]},{"name":"disk","columns":["key","value"],"values":[["host","server03"]]},{"name":"gpu","columns":["key","value"],"values":[["host","server02"],["host","server03"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ name: `show tag values with key and where`, command: `SHOW TAG VALUES FROM cpu WITH KEY = host WHERE region = 'uswest'`, - exp: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server01"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server01"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ - name: `show tag values with key and where matches regular expression`, + name: `show tag values with key and where matches the regular expression`, command: `SHOW TAG VALUES WITH KEY = host WHERE region =~ /ca.*/`, - exp: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server03"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"disk","columns":["key","value"],"values":[["host","server03"]]},{"name":"gpu","columns":["key","value"],"values":[["host","server03"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ - name: `show tag values with key and where does not matche regular expression`, + name: `show tag values with key and where does not match the regular expression`, command: `SHOW TAG VALUES WITH KEY = region WHERE host !~ /server0[12]/`, - exp: `{"results":[{"series":[{"name":"regionTagValues","columns":["region"],"values":[["caeast"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"disk","columns":["key","value"],"values":[["region","caeast"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ - name: `show tag values with key in and where does not matche regular expression`, + name: `show tag values with key in and where does not match the regular expression`, command: `SHOW TAG VALUES FROM cpu WITH KEY IN (host, region) WHERE region = 'uswest'`, - exp: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server01"]]},{"name":"regionTagValues","columns":["region"],"values":[["uswest"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server01"],["region","uswest"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ name: `show tag values with key and measurement matches regular expression`, command: `SHOW TAG VALUES FROM /[cg]pu/ WITH KEY = host`, - exp: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server01"],["server02"],["server03"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server01"],["host","server02"]]},{"name":"gpu","columns":["key","value"],"values":[["host","server02"],["host","server03"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ diff --git a/influxql/ast.go b/influxql/ast.go index 08e8b76eff8..f4ab6370541 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -3440,6 +3440,36 @@ type rewriterFunc func(Node) Node func (fn rewriterFunc) Rewrite(n Node) Node { return fn(n) } +// RewriteExpr recursively invokes the function to replace each expr. +// Nodes are traversed depth-first and rewritten from leaf to root. +func RewriteExpr(expr Expr, fn func(Expr) Expr) Expr { + switch e := expr.(type) { + case *BinaryExpr: + e.LHS = RewriteExpr(e.LHS, fn) + e.RHS = RewriteExpr(e.RHS, fn) + if e.LHS != nil && e.RHS == nil { + expr = e.LHS + } else if e.RHS != nil && e.LHS == nil { + expr = e.RHS + } else if e.LHS == nil && e.RHS == nil { + return nil + } + + case *ParenExpr: + e.Expr = RewriteExpr(e.Expr, fn) + if e.Expr == nil { + return nil + } + + case *Call: + for i, expr := range e.Args { + e.Args[i] = RewriteExpr(expr, fn) + } + } + + return fn(expr) +} + // Eval evaluates expr against a map. func Eval(expr Expr, m map[string]interface{}) interface{} { if expr == nil { diff --git a/influxql/ast_test.go b/influxql/ast_test.go index 91b8ef6260d..592218e8971 100644 --- a/influxql/ast_test.go +++ b/influxql/ast_test.go @@ -861,6 +861,27 @@ func TestRewrite(t *testing.T) { } } +// Ensure an Expr can be rewritten handling nils. +func TestRewriteExpr(t *testing.T) { + expr := MustParseExpr(`(time > 1 AND time < 10) OR foo = 2`) + + // Remove all time expressions. + act := influxql.RewriteExpr(expr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + if lhs, ok := e.LHS.(*influxql.VarRef); ok && lhs.Val == "time" { + return nil + } + } + return e + }) + + // Verify that everything is flipped. + if act := act.String(); act != `foo = 2.000` { + t.Fatalf("unexpected result: %s", act) + } +} + // Ensure that the String() value of a statement is parseable func TestParseString(t *testing.T) { var tests = []struct { diff --git a/influxql/statement_rewriter.go b/influxql/statement_rewriter.go index 534335675b7..c72dbb12af6 100644 --- a/influxql/statement_rewriter.go +++ b/influxql/statement_rewriter.go @@ -1,8 +1,6 @@ package influxql -import ( - "errors" -) +import "errors" // RewriteStatement rewrites stmt into a new statement, if applicable. func RewriteStatement(stmt Statement) (Statement, error) { @@ -13,6 +11,8 @@ func RewriteStatement(stmt Statement) (Statement, error) { return rewriteShowMeasurementsStatement(stmt) case *ShowTagKeysStatement: return rewriteShowTagKeysStatement(stmt) + case *ShowTagValuesStatement: + return rewriteShowTagValuesStatement(stmt) default: return stmt, nil } @@ -61,28 +61,8 @@ func rewriteShowMeasurementsStatement(stmt *ShowMeasurementsStatement) (Statemen } condition := stmt.Condition - if source, ok := stmt.Source.(*Measurement); ok { - var expr Expr - if source.Regex != nil { - expr = &BinaryExpr{ - Op: EQREGEX, - LHS: &VarRef{Val: "name"}, - RHS: &RegexLiteral{Val: source.Regex.Val}, - } - } else if source.Name != "" { - expr = &BinaryExpr{ - Op: EQ, - LHS: &VarRef{Val: "name"}, - RHS: &StringLiteral{Val: source.Name}, - } - } - - // Set condition or "AND" together. - if condition == nil { - condition = expr - } else { - condition = &BinaryExpr{Op: AND, LHS: expr, RHS: condition} - } + if stmt.Source != nil { + condition = rewriteSourcesCondition(Sources([]Source{stmt.Source}), stmt.Condition) } return &SelectStatement{ @@ -107,39 +87,70 @@ func rewriteShowTagKeysStatement(stmt *ShowTagKeysStatement) (Statement, error) return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause") } + condition := rewriteSourcesCondition(stmt.Sources, stmt.Condition) + return &SelectStatement{ + Fields: []*Field{ + {Expr: &VarRef{Val: "tagKey"}}, + }, + Sources: []Source{ + &Measurement{Name: "_tagKeys"}, + }, + Condition: condition, + Offset: stmt.Offset, + Limit: stmt.Limit, + SortFields: stmt.SortFields, + OmitTime: true, + Dedupe: true, + }, nil +} + +func rewriteShowTagValuesStatement(stmt *ShowTagValuesStatement) (Statement, error) { + // Check for time in WHERE clause (not supported). + if HasTimeExpr(stmt.Condition) { + return nil, errors.New("SHOW TAG VALUES doesn't support time in WHERE clause") + } + condition := stmt.Condition - if len(stmt.Sources) > 0 { - if source, ok := stmt.Sources[0].(*Measurement); ok { - var expr Expr - if source.Regex != nil { - expr = &BinaryExpr{ - Op: EQREGEX, - LHS: &VarRef{Val: "name"}, - RHS: &RegexLiteral{Val: source.Regex.Val}, - } - } else if source.Name != "" { + if len(stmt.TagKeys) > 0 { + var expr Expr + for _, tagKey := range stmt.TagKeys { + tagExpr := &BinaryExpr{ + Op: EQ, + LHS: &VarRef{Val: "_tagKey"}, + RHS: &StringLiteral{Val: tagKey}, + } + + if expr != nil { expr = &BinaryExpr{ - Op: EQ, - LHS: &VarRef{Val: "name"}, - RHS: &StringLiteral{Val: source.Name}, + Op: OR, + LHS: expr, + RHS: tagExpr, } + } else { + expr = tagExpr } + } - // Set condition or "AND" together. - if condition == nil { - condition = expr - } else { - condition = &BinaryExpr{Op: AND, LHS: expr, RHS: condition} + // Set condition or "AND" together. + if condition == nil { + condition = expr + } else { + condition = &BinaryExpr{ + Op: AND, + LHS: &ParenExpr{Expr: condition}, + RHS: &ParenExpr{Expr: expr}, } } } + condition = rewriteSourcesCondition(stmt.Sources, condition) return &SelectStatement{ Fields: []*Field{ - {Expr: &VarRef{Val: "tagKey"}}, + {Expr: &VarRef{Val: "_tagKey"}, Alias: "key"}, + {Expr: &VarRef{Val: "value"}}, }, Sources: []Source{ - &Measurement{Name: "_tagKeys"}, + &Measurement{Name: "_tags"}, }, Condition: condition, Offset: stmt.Offset, @@ -149,3 +160,52 @@ func rewriteShowTagKeysStatement(stmt *ShowTagKeysStatement) (Statement, error) Dedupe: true, }, nil } + +// rewriteSourcesCondition rewrites sources into `name` expressions. +// Merges with cond and returns a new condition. +func rewriteSourcesCondition(sources Sources, cond Expr) Expr { + if len(sources) == 0 { + return cond + } + + // Generate an OR'd set of filters on source name. + var scond Expr + for _, source := range sources { + mm := source.(*Measurement) + + // Generate a filtering expression on the measurement name. + var expr Expr + if mm.Regex != nil { + expr = &BinaryExpr{ + Op: EQREGEX, + LHS: &VarRef{Val: "name"}, + RHS: &RegexLiteral{Val: mm.Regex.Val}, + } + } else if mm.Name != "" { + expr = &BinaryExpr{ + Op: EQ, + LHS: &VarRef{Val: "name"}, + RHS: &StringLiteral{Val: mm.Name}, + } + } + + if scond == nil { + scond = expr + } else { + scond = &BinaryExpr{ + Op: OR, + LHS: scond, + RHS: expr, + } + } + } + + if cond != nil { + return &BinaryExpr{ + Op: AND, + LHS: &ParenExpr{Expr: scond}, + RHS: &ParenExpr{Expr: cond}, + } + } + return scond +} diff --git a/influxql/statement_rewriter_test.go b/influxql/statement_rewriter_test.go index de5f4fd49a3..73226f28e2a 100644 --- a/influxql/statement_rewriter_test.go +++ b/influxql/statement_rewriter_test.go @@ -41,7 +41,7 @@ func TestRewriteStatement(t *testing.T) { }, { stmt: `SHOW MEASUREMENTS WITH MEASUREMENT = cpu WHERE region = 'uswest'`, - s: `SELECT "name" FROM _measurements WHERE "name" = 'cpu' AND region = 'uswest'`, + s: `SELECT "name" FROM _measurements WHERE ("name" = 'cpu') AND (region = 'uswest')`, }, { stmt: `SHOW TAG KEYS`, @@ -57,7 +57,19 @@ func TestRewriteStatement(t *testing.T) { }, { stmt: `SHOW TAG KEYS FROM cpu WHERE region = 'uswest'`, - s: `SELECT tagKey FROM _tagKeys WHERE "name" = 'cpu' AND region = 'uswest'`, + s: `SELECT tagKey FROM _tagKeys WHERE ("name" = 'cpu') AND (region = 'uswest')`, + }, + { + stmt: `SHOW TAG VALUES WITH KEY = region`, + s: `SELECT _tagKey AS "key", value FROM _tags WHERE _tagKey = 'region'`, + }, + { + stmt: `SHOW TAG VALUES FROM cpu WITH KEY = region`, + s: `SELECT _tagKey AS "key", value FROM _tags WHERE ("name" = 'cpu') AND (_tagKey = 'region')`, + }, + { + stmt: `SHOW TAG VALUES FROM cpu WITH KEY IN (region, host)`, + s: `SELECT _tagKey AS "key", value FROM _tags WHERE ("name" = 'cpu') AND (_tagKey = 'region' OR _tagKey = 'host')`, }, { stmt: `SELECT value FROM cpu`, diff --git a/tsdb/meta.go b/tsdb/meta.go index 2e50d6984cc..8849c775c74 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -142,16 +142,20 @@ func (d *DatabaseIndex) TagsForSeries(key string) map[string]string { return ss.Tags } -// measurementsByExpr takes and expression containing only tags and returns -// a list of matching *Measurement. -func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, error) { +// measurementsByExpr takes an expression containing only tags and returns a +// list of matching *Measurement. The bool return argument returns if the +// expression was a measurement expression. It is used to differentiate a list +// of no measurements because all measurements were filtered out (when the bool +// is true) against when there are no measurements because the expression +// wasn't evaluated (when the bool is false). +func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bool, error) { switch e := expr.(type) { case *influxql.BinaryExpr: switch e.Op { case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: tag, ok := e.LHS.(*influxql.VarRef) if !ok { - return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) + return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) } tf := &TagFilter{ @@ -162,46 +166,55 @@ func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, er if influxql.IsRegexOp(e.Op) { re, ok := e.RHS.(*influxql.RegexLiteral) if !ok { - return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) + return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) } tf.Regex = re.Val } else { s, ok := e.RHS.(*influxql.StringLiteral) if !ok { - return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) + return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) } tf.Value = s.Val } // Match on name, if specified. if tag.Val == "name" { - return d.measurementsByNameFilter(tf.Op, tf.Value, tf.Regex), nil + return d.measurementsByNameFilter(tf.Op, tf.Value, tf.Regex), true, nil + } else if strings.HasPrefix(tag.Val, "_") { + return nil, false, nil } - return d.measurementsByTagFilters([]*TagFilter{tf}), nil + return d.measurementsByTagFilters([]*TagFilter{tf}), true, nil case influxql.OR, influxql.AND: - lhsIDs, err := d.measurementsByExpr(e.LHS) + lhsIDs, lhsOk, err := d.measurementsByExpr(e.LHS) if err != nil { - return nil, err + return nil, false, err } - rhsIDs, err := d.measurementsByExpr(e.RHS) + rhsIDs, rhsOk, err := d.measurementsByExpr(e.RHS) if err != nil { - return nil, err + return nil, false, err } - if e.Op == influxql.OR { - return lhsIDs.union(rhsIDs), nil - } + if lhsOk && rhsOk { + if e.Op == influxql.OR { + return lhsIDs.union(rhsIDs), true, nil + } - return lhsIDs.intersect(rhsIDs), nil + return lhsIDs.intersect(rhsIDs), true, nil + } else if lhsOk { + return lhsIDs, true, nil + } else if rhsOk { + return rhsIDs, true, nil + } + return nil, false, nil default: - return nil, fmt.Errorf("invalid operator") + return nil, false, fmt.Errorf("invalid operator") } case *influxql.ParenExpr: return d.measurementsByExpr(e.Expr) } - return nil, fmt.Errorf("%#v", expr) + return nil, false, fmt.Errorf("%#v", expr) } // measurementsByNameFilter returns the sorted measurements matching a name. @@ -929,6 +942,95 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error return ids, nil } +// tagKeysByExpr extracts the tag keys wanted by the expression. +func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) { + switch e := expr.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok { + return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) + } + + if tag.Val != "_tagKey" { + return nil, false, nil + } + + tf := TagFilter{ + Op: e.Op, + } + + if influxql.IsRegexOp(e.Op) { + re, ok := e.RHS.(*influxql.RegexLiteral) + if !ok { + return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) + } + tf.Regex = re.Val + } else { + s, ok := e.RHS.(*influxql.StringLiteral) + if !ok { + return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) + } + tf.Value = s.Val + } + return m.tagKeysByFilter(tf.Op, tf.Value, tf.Regex), true, nil + case influxql.AND, influxql.OR: + lhsKeys, lhsOk, err := m.tagKeysByExpr(e.LHS) + if err != nil { + return nil, false, err + } + + rhsKeys, rhsOk, err := m.tagKeysByExpr(e.RHS) + if err != nil { + return nil, false, err + } + + if lhsOk && rhsOk { + if e.Op == influxql.OR { + return lhsKeys.union(rhsKeys), true, nil + } + + return lhsKeys.intersect(rhsKeys), true, nil + } else if lhsOk { + return lhsKeys, true, nil + } else if rhsOk { + return rhsKeys, true, nil + } + return nil, false, nil + default: + return nil, false, fmt.Errorf("invalid operator") + } + case *influxql.ParenExpr: + return m.tagKeysByExpr(e.Expr) + } + return nil, false, fmt.Errorf("%#v", expr) +} + +// tagKeysByFilter will filter the tag keys for the measurement. +func (m *Measurement) tagKeysByFilter(op influxql.Token, val string, regex *regexp.Regexp) stringSet { + ss := newStringSet() + for _, key := range m.TagKeys() { + var matched bool + switch op { + case influxql.EQ: + matched = key == val + case influxql.NEQ: + matched = key != val + case influxql.EQREGEX: + matched = regex.MatchString(key) + case influxql.NEQREGEX: + matched = !regex.MatchString(key) + } + + if !matched { + continue + } + ss.add(key) + } + return ss +} + // tagValuer is used during expression expansion to evaluate all sets of tag values. type tagValuer struct { tags map[string]*string diff --git a/tsdb/shard.go b/tsdb/shard.go index 6edbd06262a..8603cb44e41 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -10,6 +10,7 @@ import ( "math" "os" "sort" + "strings" "sync" "github.com/gogo/protobuf/proto" @@ -416,6 +417,8 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite return NewMeasurementIterator(s, opt) case "_tagKeys": return NewTagKeysIterator(s, opt) + case "_tags": + return NewTagValuesIterator(s, opt) default: return nil, fmt.Errorf("unknown system source: %s", m.Name) } @@ -455,8 +458,14 @@ func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, e if len(opt.Sources) > 1 { return nil, errors.New("cannot select from multiple system sources") } - // Meta queries don't need to know the series name and always have a single string. - return []influxql.Series{{Aux: []influxql.DataType{influxql.String}}}, nil + + // Meta queries don't need to know the series name and + // always have a single series of strings. + auxFields := make([]influxql.DataType, len(opt.Aux)) + for i := range auxFields { + auxFields[i] = influxql.String + } + return []influxql.Series{{Aux: auxFields}}, nil } return s.engine.SeriesKeys(opt) @@ -844,7 +853,7 @@ func NewMeasurementIterator(sh *Shard, opt influxql.IteratorOptions) (*Measureme if opt.Condition == nil { itr.mms = sh.index.Measurements() } else { - mms, err := sh.index.measurementsByExpr(opt.Condition) + mms, _, err := sh.index.measurementsByExpr(opt.Condition) if err != nil { return nil, err } @@ -881,6 +890,122 @@ func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Itera return newMeasurementKeysIterator(sh, fn, opt) } +// tagValuesIterator emits key/tag values +type tagValuesIterator struct { + series []*Series // remaining series + keys []string // tag keys to select from a series + fields []string // fields to emit (key or value) + buf struct { + s *Series // current series + keys []string // current tag's keys + } +} + +// NewTagValuesIterator returns a new instance of TagValuesIterator. +func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if opt.Condition == nil { + return nil, errors.New("a condition is required") + } + + mms, ok, err := sh.index.measurementsByExpr(opt.Condition) + if err != nil { + return nil, err + } else if !ok { + mms = sh.index.Measurements() + sort.Sort(mms) + } + + filterExpr := influxql.CloneExpr(opt.Condition) + filterExpr = influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || tag.Val == "name" || strings.HasPrefix(tag.Val, "_") { + return nil + } + } + } + return e + }) + + var series []*Series + keys := newStringSet() + for _, mm := range mms { + ss, ok, err := mm.tagKeysByExpr(opt.Condition) + if err != nil { + return nil, err + } else if !ok { + keys.add(mm.TagKeys()...) + } else { + keys = keys.union(ss) + } + + ids, err := mm.seriesIDsAllOrByExpr(filterExpr) + if err != nil { + return nil, err + } + + for _, id := range ids { + series = append(series, mm.SeriesByID(id)) + } + } + + return &tagValuesIterator{ + series: series, + keys: keys.list(), + fields: opt.Aux, + }, nil +} + +// Close closes the iterator. +func (itr *tagValuesIterator) Close() error { return nil } + +// Next emits the next point in the iterator. +func (itr *tagValuesIterator) Next() *influxql.FloatPoint { + for { + // If there are no more values then move to the next key. + if len(itr.buf.keys) == 0 { + if len(itr.series) == 0 { + return nil + } + + itr.buf.s = itr.series[0] + itr.buf.keys = itr.keys + itr.series = itr.series[1:] + continue + } + + key := itr.buf.keys[0] + value, ok := itr.buf.s.Tags[key] + if !ok { + itr.buf.keys = itr.buf.keys[1:] + continue + } + + // Prepare auxiliary fields. + auxFields := make([]interface{}, len(itr.fields)) + for i, f := range itr.fields { + switch f { + case "_tagKey": + auxFields[i] = key + case "value": + auxFields[i] = value + } + } + + // Return next key. + p := &influxql.FloatPoint{ + Name: itr.buf.s.measurement.Name, + Aux: auxFields, + } + itr.buf.keys = itr.buf.keys[1:] + + return p + } +} + // measurementKeyFunc is the function called by measurementKeysIterator. type measurementKeyFunc func(m *Measurement) []string @@ -891,7 +1016,7 @@ func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt influxql.I if opt.Condition == nil { itr.mms = sh.index.Measurements() } else { - mms, err := sh.index.measurementsByExpr(opt.Condition) + mms, _, err := sh.index.measurementsByExpr(opt.Condition) if err != nil { return nil, err }