diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e864aec953..dfcd19d7e7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ - [#5691](https://github.com/influxdata/influxdb/pull/5691): Remove associated shard data when retention policies are dropped. - [#5758](https://github.com/influxdata/influxdb/pull/5758): TSM engine stats for cache, WAL, and filestore. Thanks @jonseymour - [#5844](https://github.com/influxdata/influxdb/pull/5844): Tag TSM engine stats with database and retention policy +- [#5593](https://github.com/influxdata/influxdb/issues/5593): Modify `SHOW TAG VALUES` output for the new query engine to normalize the output. ### Bugfixes diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index f5d38c52232..b1926c49f3f 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -4973,37 +4973,37 @@ func TestServer_Query_ShowTagKeys(t *testing.T) { &Query{ name: "show tag values with key", command: "SHOW TAG VALUES WITH KEY = host", - exp: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server01"],["server02"],["server03"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server01"],["host","server02"]]},{"name":"disk","columns":["key","value"],"values":[["host","server03"]]},{"name":"gpu","columns":["key","value"],"values":[["host","server02"],["host","server03"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ name: `show tag values with key and where`, command: `SHOW TAG VALUES FROM cpu WITH KEY = host WHERE region = 'uswest'`, - exp: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server01"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server01"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ - name: `show tag values with key and where matches regular expression`, + name: `show tag values with key and where matches the regular expression`, command: `SHOW TAG VALUES WITH KEY = host WHERE region =~ /ca.*/`, - exp: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server03"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"disk","columns":["key","value"],"values":[["host","server03"]]},{"name":"gpu","columns":["key","value"],"values":[["host","server03"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ - name: `show tag values with key and where does not matche regular expression`, + name: `show tag values with key and where does not match the regular expression`, command: `SHOW TAG VALUES WITH KEY = region WHERE host !~ /server0[12]/`, - exp: `{"results":[{"series":[{"name":"regionTagValues","columns":["region"],"values":[["caeast"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"disk","columns":["key","value"],"values":[["region","caeast"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ - name: `show tag values with key in and where does not matche regular expression`, + name: `show tag values with key in and where does not match the regular expression`, command: `SHOW TAG VALUES FROM cpu WITH KEY IN (host, region) WHERE region = 'uswest'`, - exp: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server01"]]},{"name":"regionTagValues","columns":["region"],"values":[["uswest"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server01"],["region","uswest"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ name: `show tag values with key and measurement matches regular expression`, command: `SHOW TAG VALUES FROM /[cg]pu/ WITH KEY = host`, - exp: `{"results":[{"series":[{"name":"hostTagValues","columns":["host"],"values":[["server01"],["server02"],["server03"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["key","value"],"values":[["host","server01"],["host","server02"]]},{"name":"gpu","columns":["key","value"],"values":[["host","server02"],["host","server03"]]}]}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ diff --git a/influxql/ast.go b/influxql/ast.go index 08e8b76eff8..f4ab6370541 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -3440,6 +3440,36 @@ type rewriterFunc func(Node) Node func (fn rewriterFunc) Rewrite(n Node) Node { return fn(n) } +// RewriteExpr recursively invokes the function to replace each expr. +// Nodes are traversed depth-first and rewritten from leaf to root. +func RewriteExpr(expr Expr, fn func(Expr) Expr) Expr { + switch e := expr.(type) { + case *BinaryExpr: + e.LHS = RewriteExpr(e.LHS, fn) + e.RHS = RewriteExpr(e.RHS, fn) + if e.LHS != nil && e.RHS == nil { + expr = e.LHS + } else if e.RHS != nil && e.LHS == nil { + expr = e.RHS + } else if e.LHS == nil && e.RHS == nil { + return nil + } + + case *ParenExpr: + e.Expr = RewriteExpr(e.Expr, fn) + if e.Expr == nil { + return nil + } + + case *Call: + for i, expr := range e.Args { + e.Args[i] = RewriteExpr(expr, fn) + } + } + + return fn(expr) +} + // Eval evaluates expr against a map. func Eval(expr Expr, m map[string]interface{}) interface{} { if expr == nil { diff --git a/influxql/ast_test.go b/influxql/ast_test.go index 91b8ef6260d..592218e8971 100644 --- a/influxql/ast_test.go +++ b/influxql/ast_test.go @@ -861,6 +861,27 @@ func TestRewrite(t *testing.T) { } } +// Ensure an Expr can be rewritten handling nils. +func TestRewriteExpr(t *testing.T) { + expr := MustParseExpr(`(time > 1 AND time < 10) OR foo = 2`) + + // Remove all time expressions. + act := influxql.RewriteExpr(expr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + if lhs, ok := e.LHS.(*influxql.VarRef); ok && lhs.Val == "time" { + return nil + } + } + return e + }) + + // Verify that everything is flipped. + if act := act.String(); act != `foo = 2.000` { + t.Fatalf("unexpected result: %s", act) + } +} + // Ensure that the String() value of a statement is parseable func TestParseString(t *testing.T) { var tests = []struct { diff --git a/influxql/statement_rewriter.go b/influxql/statement_rewriter.go index 534335675b7..c72dbb12af6 100644 --- a/influxql/statement_rewriter.go +++ b/influxql/statement_rewriter.go @@ -1,8 +1,6 @@ package influxql -import ( - "errors" -) +import "errors" // RewriteStatement rewrites stmt into a new statement, if applicable. func RewriteStatement(stmt Statement) (Statement, error) { @@ -13,6 +11,8 @@ func RewriteStatement(stmt Statement) (Statement, error) { return rewriteShowMeasurementsStatement(stmt) case *ShowTagKeysStatement: return rewriteShowTagKeysStatement(stmt) + case *ShowTagValuesStatement: + return rewriteShowTagValuesStatement(stmt) default: return stmt, nil } @@ -61,28 +61,8 @@ func rewriteShowMeasurementsStatement(stmt *ShowMeasurementsStatement) (Statemen } condition := stmt.Condition - if source, ok := stmt.Source.(*Measurement); ok { - var expr Expr - if source.Regex != nil { - expr = &BinaryExpr{ - Op: EQREGEX, - LHS: &VarRef{Val: "name"}, - RHS: &RegexLiteral{Val: source.Regex.Val}, - } - } else if source.Name != "" { - expr = &BinaryExpr{ - Op: EQ, - LHS: &VarRef{Val: "name"}, - RHS: &StringLiteral{Val: source.Name}, - } - } - - // Set condition or "AND" together. - if condition == nil { - condition = expr - } else { - condition = &BinaryExpr{Op: AND, LHS: expr, RHS: condition} - } + if stmt.Source != nil { + condition = rewriteSourcesCondition(Sources([]Source{stmt.Source}), stmt.Condition) } return &SelectStatement{ @@ -107,39 +87,70 @@ func rewriteShowTagKeysStatement(stmt *ShowTagKeysStatement) (Statement, error) return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause") } + condition := rewriteSourcesCondition(stmt.Sources, stmt.Condition) + return &SelectStatement{ + Fields: []*Field{ + {Expr: &VarRef{Val: "tagKey"}}, + }, + Sources: []Source{ + &Measurement{Name: "_tagKeys"}, + }, + Condition: condition, + Offset: stmt.Offset, + Limit: stmt.Limit, + SortFields: stmt.SortFields, + OmitTime: true, + Dedupe: true, + }, nil +} + +func rewriteShowTagValuesStatement(stmt *ShowTagValuesStatement) (Statement, error) { + // Check for time in WHERE clause (not supported). + if HasTimeExpr(stmt.Condition) { + return nil, errors.New("SHOW TAG VALUES doesn't support time in WHERE clause") + } + condition := stmt.Condition - if len(stmt.Sources) > 0 { - if source, ok := stmt.Sources[0].(*Measurement); ok { - var expr Expr - if source.Regex != nil { - expr = &BinaryExpr{ - Op: EQREGEX, - LHS: &VarRef{Val: "name"}, - RHS: &RegexLiteral{Val: source.Regex.Val}, - } - } else if source.Name != "" { + if len(stmt.TagKeys) > 0 { + var expr Expr + for _, tagKey := range stmt.TagKeys { + tagExpr := &BinaryExpr{ + Op: EQ, + LHS: &VarRef{Val: "_tagKey"}, + RHS: &StringLiteral{Val: tagKey}, + } + + if expr != nil { expr = &BinaryExpr{ - Op: EQ, - LHS: &VarRef{Val: "name"}, - RHS: &StringLiteral{Val: source.Name}, + Op: OR, + LHS: expr, + RHS: tagExpr, } + } else { + expr = tagExpr } + } - // Set condition or "AND" together. - if condition == nil { - condition = expr - } else { - condition = &BinaryExpr{Op: AND, LHS: expr, RHS: condition} + // Set condition or "AND" together. + if condition == nil { + condition = expr + } else { + condition = &BinaryExpr{ + Op: AND, + LHS: &ParenExpr{Expr: condition}, + RHS: &ParenExpr{Expr: expr}, } } } + condition = rewriteSourcesCondition(stmt.Sources, condition) return &SelectStatement{ Fields: []*Field{ - {Expr: &VarRef{Val: "tagKey"}}, + {Expr: &VarRef{Val: "_tagKey"}, Alias: "key"}, + {Expr: &VarRef{Val: "value"}}, }, Sources: []Source{ - &Measurement{Name: "_tagKeys"}, + &Measurement{Name: "_tags"}, }, Condition: condition, Offset: stmt.Offset, @@ -149,3 +160,52 @@ func rewriteShowTagKeysStatement(stmt *ShowTagKeysStatement) (Statement, error) Dedupe: true, }, nil } + +// rewriteSourcesCondition rewrites sources into `name` expressions. +// Merges with cond and returns a new condition. +func rewriteSourcesCondition(sources Sources, cond Expr) Expr { + if len(sources) == 0 { + return cond + } + + // Generate an OR'd set of filters on source name. + var scond Expr + for _, source := range sources { + mm := source.(*Measurement) + + // Generate a filtering expression on the measurement name. + var expr Expr + if mm.Regex != nil { + expr = &BinaryExpr{ + Op: EQREGEX, + LHS: &VarRef{Val: "name"}, + RHS: &RegexLiteral{Val: mm.Regex.Val}, + } + } else if mm.Name != "" { + expr = &BinaryExpr{ + Op: EQ, + LHS: &VarRef{Val: "name"}, + RHS: &StringLiteral{Val: mm.Name}, + } + } + + if scond == nil { + scond = expr + } else { + scond = &BinaryExpr{ + Op: OR, + LHS: scond, + RHS: expr, + } + } + } + + if cond != nil { + return &BinaryExpr{ + Op: AND, + LHS: &ParenExpr{Expr: scond}, + RHS: &ParenExpr{Expr: cond}, + } + } + return scond +} diff --git a/influxql/statement_rewriter_test.go b/influxql/statement_rewriter_test.go index de5f4fd49a3..73226f28e2a 100644 --- a/influxql/statement_rewriter_test.go +++ b/influxql/statement_rewriter_test.go @@ -41,7 +41,7 @@ func TestRewriteStatement(t *testing.T) { }, { stmt: `SHOW MEASUREMENTS WITH MEASUREMENT = cpu WHERE region = 'uswest'`, - s: `SELECT "name" FROM _measurements WHERE "name" = 'cpu' AND region = 'uswest'`, + s: `SELECT "name" FROM _measurements WHERE ("name" = 'cpu') AND (region = 'uswest')`, }, { stmt: `SHOW TAG KEYS`, @@ -57,7 +57,19 @@ func TestRewriteStatement(t *testing.T) { }, { stmt: `SHOW TAG KEYS FROM cpu WHERE region = 'uswest'`, - s: `SELECT tagKey FROM _tagKeys WHERE "name" = 'cpu' AND region = 'uswest'`, + s: `SELECT tagKey FROM _tagKeys WHERE ("name" = 'cpu') AND (region = 'uswest')`, + }, + { + stmt: `SHOW TAG VALUES WITH KEY = region`, + s: `SELECT _tagKey AS "key", value FROM _tags WHERE _tagKey = 'region'`, + }, + { + stmt: `SHOW TAG VALUES FROM cpu WITH KEY = region`, + s: `SELECT _tagKey AS "key", value FROM _tags WHERE ("name" = 'cpu') AND (_tagKey = 'region')`, + }, + { + stmt: `SHOW TAG VALUES FROM cpu WITH KEY IN (region, host)`, + s: `SELECT _tagKey AS "key", value FROM _tags WHERE ("name" = 'cpu') AND (_tagKey = 'region' OR _tagKey = 'host')`, }, { stmt: `SELECT value FROM cpu`, diff --git a/tsdb/meta.go b/tsdb/meta.go index 2e50d6984cc..8849c775c74 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -142,16 +142,20 @@ func (d *DatabaseIndex) TagsForSeries(key string) map[string]string { return ss.Tags } -// measurementsByExpr takes and expression containing only tags and returns -// a list of matching *Measurement. -func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, error) { +// measurementsByExpr takes an expression containing only tags and returns a +// list of matching *Measurement. The bool return argument returns if the +// expression was a measurement expression. It is used to differentiate a list +// of no measurements because all measurements were filtered out (when the bool +// is true) against when there are no measurements because the expression +// wasn't evaluated (when the bool is false). +func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bool, error) { switch e := expr.(type) { case *influxql.BinaryExpr: switch e.Op { case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: tag, ok := e.LHS.(*influxql.VarRef) if !ok { - return nil, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) + return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) } tf := &TagFilter{ @@ -162,46 +166,55 @@ func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, er if influxql.IsRegexOp(e.Op) { re, ok := e.RHS.(*influxql.RegexLiteral) if !ok { - return nil, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) + return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) } tf.Regex = re.Val } else { s, ok := e.RHS.(*influxql.StringLiteral) if !ok { - return nil, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) + return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) } tf.Value = s.Val } // Match on name, if specified. if tag.Val == "name" { - return d.measurementsByNameFilter(tf.Op, tf.Value, tf.Regex), nil + return d.measurementsByNameFilter(tf.Op, tf.Value, tf.Regex), true, nil + } else if strings.HasPrefix(tag.Val, "_") { + return nil, false, nil } - return d.measurementsByTagFilters([]*TagFilter{tf}), nil + return d.measurementsByTagFilters([]*TagFilter{tf}), true, nil case influxql.OR, influxql.AND: - lhsIDs, err := d.measurementsByExpr(e.LHS) + lhsIDs, lhsOk, err := d.measurementsByExpr(e.LHS) if err != nil { - return nil, err + return nil, false, err } - rhsIDs, err := d.measurementsByExpr(e.RHS) + rhsIDs, rhsOk, err := d.measurementsByExpr(e.RHS) if err != nil { - return nil, err + return nil, false, err } - if e.Op == influxql.OR { - return lhsIDs.union(rhsIDs), nil - } + if lhsOk && rhsOk { + if e.Op == influxql.OR { + return lhsIDs.union(rhsIDs), true, nil + } - return lhsIDs.intersect(rhsIDs), nil + return lhsIDs.intersect(rhsIDs), true, nil + } else if lhsOk { + return lhsIDs, true, nil + } else if rhsOk { + return rhsIDs, true, nil + } + return nil, false, nil default: - return nil, fmt.Errorf("invalid operator") + return nil, false, fmt.Errorf("invalid operator") } case *influxql.ParenExpr: return d.measurementsByExpr(e.Expr) } - return nil, fmt.Errorf("%#v", expr) + return nil, false, fmt.Errorf("%#v", expr) } // measurementsByNameFilter returns the sorted measurements matching a name. @@ -929,6 +942,95 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error return ids, nil } +// tagKeysByExpr extracts the tag keys wanted by the expression. +func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) { + switch e := expr.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok { + return nil, false, fmt.Errorf("left side of '%s' must be a tag key", e.Op.String()) + } + + if tag.Val != "_tagKey" { + return nil, false, nil + } + + tf := TagFilter{ + Op: e.Op, + } + + if influxql.IsRegexOp(e.Op) { + re, ok := e.RHS.(*influxql.RegexLiteral) + if !ok { + return nil, false, fmt.Errorf("right side of '%s' must be a regular expression", e.Op.String()) + } + tf.Regex = re.Val + } else { + s, ok := e.RHS.(*influxql.StringLiteral) + if !ok { + return nil, false, fmt.Errorf("right side of '%s' must be a tag value string", e.Op.String()) + } + tf.Value = s.Val + } + return m.tagKeysByFilter(tf.Op, tf.Value, tf.Regex), true, nil + case influxql.AND, influxql.OR: + lhsKeys, lhsOk, err := m.tagKeysByExpr(e.LHS) + if err != nil { + return nil, false, err + } + + rhsKeys, rhsOk, err := m.tagKeysByExpr(e.RHS) + if err != nil { + return nil, false, err + } + + if lhsOk && rhsOk { + if e.Op == influxql.OR { + return lhsKeys.union(rhsKeys), true, nil + } + + return lhsKeys.intersect(rhsKeys), true, nil + } else if lhsOk { + return lhsKeys, true, nil + } else if rhsOk { + return rhsKeys, true, nil + } + return nil, false, nil + default: + return nil, false, fmt.Errorf("invalid operator") + } + case *influxql.ParenExpr: + return m.tagKeysByExpr(e.Expr) + } + return nil, false, fmt.Errorf("%#v", expr) +} + +// tagKeysByFilter will filter the tag keys for the measurement. +func (m *Measurement) tagKeysByFilter(op influxql.Token, val string, regex *regexp.Regexp) stringSet { + ss := newStringSet() + for _, key := range m.TagKeys() { + var matched bool + switch op { + case influxql.EQ: + matched = key == val + case influxql.NEQ: + matched = key != val + case influxql.EQREGEX: + matched = regex.MatchString(key) + case influxql.NEQREGEX: + matched = !regex.MatchString(key) + } + + if !matched { + continue + } + ss.add(key) + } + return ss +} + // tagValuer is used during expression expansion to evaluate all sets of tag values. type tagValuer struct { tags map[string]*string diff --git a/tsdb/shard.go b/tsdb/shard.go index 6edbd06262a..8603cb44e41 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -10,6 +10,7 @@ import ( "math" "os" "sort" + "strings" "sync" "github.com/gogo/protobuf/proto" @@ -416,6 +417,8 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite return NewMeasurementIterator(s, opt) case "_tagKeys": return NewTagKeysIterator(s, opt) + case "_tags": + return NewTagValuesIterator(s, opt) default: return nil, fmt.Errorf("unknown system source: %s", m.Name) } @@ -455,8 +458,14 @@ func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, e if len(opt.Sources) > 1 { return nil, errors.New("cannot select from multiple system sources") } - // Meta queries don't need to know the series name and always have a single string. - return []influxql.Series{{Aux: []influxql.DataType{influxql.String}}}, nil + + // Meta queries don't need to know the series name and + // always have a single series of strings. + auxFields := make([]influxql.DataType, len(opt.Aux)) + for i := range auxFields { + auxFields[i] = influxql.String + } + return []influxql.Series{{Aux: auxFields}}, nil } return s.engine.SeriesKeys(opt) @@ -844,7 +853,7 @@ func NewMeasurementIterator(sh *Shard, opt influxql.IteratorOptions) (*Measureme if opt.Condition == nil { itr.mms = sh.index.Measurements() } else { - mms, err := sh.index.measurementsByExpr(opt.Condition) + mms, _, err := sh.index.measurementsByExpr(opt.Condition) if err != nil { return nil, err } @@ -881,6 +890,122 @@ func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Itera return newMeasurementKeysIterator(sh, fn, opt) } +// tagValuesIterator emits key/tag values +type tagValuesIterator struct { + series []*Series // remaining series + keys []string // tag keys to select from a series + fields []string // fields to emit (key or value) + buf struct { + s *Series // current series + keys []string // current tag's keys + } +} + +// NewTagValuesIterator returns a new instance of TagValuesIterator. +func NewTagValuesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if opt.Condition == nil { + return nil, errors.New("a condition is required") + } + + mms, ok, err := sh.index.measurementsByExpr(opt.Condition) + if err != nil { + return nil, err + } else if !ok { + mms = sh.index.Measurements() + sort.Sort(mms) + } + + filterExpr := influxql.CloneExpr(opt.Condition) + filterExpr = influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if !ok || tag.Val == "name" || strings.HasPrefix(tag.Val, "_") { + return nil + } + } + } + return e + }) + + var series []*Series + keys := newStringSet() + for _, mm := range mms { + ss, ok, err := mm.tagKeysByExpr(opt.Condition) + if err != nil { + return nil, err + } else if !ok { + keys.add(mm.TagKeys()...) + } else { + keys = keys.union(ss) + } + + ids, err := mm.seriesIDsAllOrByExpr(filterExpr) + if err != nil { + return nil, err + } + + for _, id := range ids { + series = append(series, mm.SeriesByID(id)) + } + } + + return &tagValuesIterator{ + series: series, + keys: keys.list(), + fields: opt.Aux, + }, nil +} + +// Close closes the iterator. +func (itr *tagValuesIterator) Close() error { return nil } + +// Next emits the next point in the iterator. +func (itr *tagValuesIterator) Next() *influxql.FloatPoint { + for { + // If there are no more values then move to the next key. + if len(itr.buf.keys) == 0 { + if len(itr.series) == 0 { + return nil + } + + itr.buf.s = itr.series[0] + itr.buf.keys = itr.keys + itr.series = itr.series[1:] + continue + } + + key := itr.buf.keys[0] + value, ok := itr.buf.s.Tags[key] + if !ok { + itr.buf.keys = itr.buf.keys[1:] + continue + } + + // Prepare auxiliary fields. + auxFields := make([]interface{}, len(itr.fields)) + for i, f := range itr.fields { + switch f { + case "_tagKey": + auxFields[i] = key + case "value": + auxFields[i] = value + } + } + + // Return next key. + p := &influxql.FloatPoint{ + Name: itr.buf.s.measurement.Name, + Aux: auxFields, + } + itr.buf.keys = itr.buf.keys[1:] + + return p + } +} + // measurementKeyFunc is the function called by measurementKeysIterator. type measurementKeyFunc func(m *Measurement) []string @@ -891,7 +1016,7 @@ func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt influxql.I if opt.Condition == nil { itr.mms = sh.index.Measurements() } else { - mms, err := sh.index.measurementsByExpr(opt.Condition) + mms, _, err := sh.index.measurementsByExpr(opt.Condition) if err != nil { return nil, err }