Skip to content

Commit

Permalink
Convert SHOW FIELD KEYS to the new query engine
Browse files Browse the repository at this point in the history
Fixes #5579.
  • Loading branch information
jsternberg committed Feb 25, 2016
1 parent 1cec8e7 commit aa0b603
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 14 deletions.
2 changes: 0 additions & 2 deletions cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
rows, err = e.executeShowDatabasesStatement(stmt)
case *influxql.ShowDiagnosticsStatement:
rows, err = e.executeShowDiagnosticsStatement(stmt)
case *influxql.ShowFieldKeysStatement:
rows, err = e.executeShowFieldKeysStatement(stmt, database)
case *influxql.ShowGrantsForUserStatement:
rows, err = e.executeShowGrantsForUserStatement(stmt)
case *influxql.ShowRetentionPoliciesStatement:
Expand Down
38 changes: 38 additions & 0 deletions influxql/statement_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
// RewriteStatement rewrites stmt into a new statement, if applicable.
func RewriteStatement(stmt Statement) (Statement, error) {
switch stmt := stmt.(type) {
case *ShowFieldKeysStatement:
return rewriteShowFieldKeysStatement(stmt)
case *ShowMeasurementsStatement:
return rewriteShowMeasurementsStatement(stmt)
case *ShowTagKeysStatement:
Expand All @@ -16,6 +18,42 @@ func RewriteStatement(stmt Statement) (Statement, error) {
}
}

func rewriteShowFieldKeysStatement(stmt *ShowFieldKeysStatement) (Statement, error) {
var condition Expr
if len(stmt.Sources) > 0 {
if source, ok := stmt.Sources[0].(*Measurement); ok {
if source.Regex != nil {
condition = &BinaryExpr{
Op: EQREGEX,
LHS: &VarRef{Val: "name"},
RHS: &RegexLiteral{Val: source.Regex.Val},
}
} else if source.Name != "" {
condition = &BinaryExpr{
Op: EQ,
LHS: &VarRef{Val: "name"},
RHS: &StringLiteral{Val: source.Name},
}
}
}
}

return &SelectStatement{
Fields: Fields([]*Field{
{Expr: &VarRef{Val: "fieldKey"}},
}),
Sources: Sources([]Source{
&Measurement{Name: "_fieldKeys"},
}),
Condition: condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
OmitTime: true,
Dedupe: true,
}, nil
}

func rewriteShowMeasurementsStatement(stmt *ShowMeasurementsStatement) (Statement, error) {
// Check for time in WHERE clause (not supported).
if HasTimeExpr(stmt.Condition) {
Expand Down
81 changes: 81 additions & 0 deletions influxql/statement_rewriter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package influxql_test

import (
"testing"

"github.com/influxdata/influxdb/influxql"
)

func TestRewriteStatement(t *testing.T) {
tests := []struct {
stmt string
s string
}{
{
stmt: `SHOW FIELD KEYS`,
s: `SELECT fieldKey FROM _fieldKeys`,
},
{
stmt: `SHOW FIELD KEYS FROM cpu`,
s: `SELECT fieldKey FROM _fieldKeys WHERE "name" = 'cpu'`,
},
{
stmt: `SHOW FIELD KEYS FROM /c.*/`,
s: `SELECT fieldKey FROM _fieldKeys WHERE "name" =~ /c.*/`,
},
{
stmt: `SHOW MEASUREMENTS`,
s: `SELECT "name" FROM _measurements`,
},
{
stmt: `SHOW MEASUREMENTS WITH MEASUREMENT = cpu`,
s: `SELECT "name" FROM _measurements WHERE "name" = 'cpu'`,
},
{
stmt: `SHOW MEASUREMENTS WITH MEASUREMENT =~ /c.*/`,
s: `SELECT "name" FROM _measurements WHERE "name" =~ /c.*/`,
},
{
stmt: `SHOW MEASUREMENTS WHERE region = 'uswest'`,
s: `SELECT "name" FROM _measurements WHERE region = 'uswest'`,
},
{
stmt: `SHOW MEASUREMENTS WITH MEASUREMENT = cpu WHERE region = 'uswest'`,
s: `SELECT "name" FROM _measurements WHERE "name" = 'cpu' AND region = 'uswest'`,
},
{
stmt: `SHOW TAG KEYS`,
s: `SELECT tagKey FROM _tagKeys`,
},
{
stmt: `SHOW TAG KEYS FROM cpu`,
s: `SELECT tagKey FROM _tagKeys WHERE "name" = 'cpu'`,
},
{
stmt: `SHOW TAG KEYS FROM /c.*/`,
s: `SELECT tagKey FROM _tagKeys WHERE "name" =~ /c.*/`,
},
{
stmt: `SHOW TAG KEYS FROM cpu WHERE region = 'uswest'`,
s: `SELECT tagKey FROM _tagKeys WHERE "name" = 'cpu' AND region = 'uswest'`,
},
{
stmt: `SELECT value FROM cpu`,
s: `SELECT value FROM cpu`,
},
}

for _, test := range tests {
stmt, err := influxql.ParseStatement(test.stmt)
if err != nil {
t.Errorf("error parsing statement: %s", err)
} else {
stmt, err = influxql.RewriteStatement(stmt)
if err != nil {
t.Errorf("error rewriting statement: %s", err)
} else if s := stmt.String(); s != test.s {
t.Errorf("error rendering string. expected %s, actual: %s", test.s, s)
}
}
}
}
46 changes: 34 additions & 12 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite

m := opt.Sources[0].(*influxql.Measurement)
switch m.Name {
case "_fieldKeys":
return NewFieldKeysIterator(s, opt)
case "_measurements":
return NewMeasurementIterator(s, opt)
case "_tagKeys":
Expand Down Expand Up @@ -824,6 +826,15 @@ func (ic *shardIteratorCreator) SeriesKeys(opt influxql.IteratorOptions) (influx
return ic.sh.SeriesKeys(opt)
}

func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
fn := func(m *Measurement) []string {
keys := m.FieldNames()
sort.Strings(keys)
return keys
}
return newMeasurementKeysIterator(sh, fn, opt)
}

// MeasurementIterator represents a string iterator that emits all measurement names in a shard.
type MeasurementIterator struct {
mms Measurements
Expand Down Expand Up @@ -872,18 +883,19 @@ func (itr *MeasurementIterator) Next() *influxql.FloatPoint {
}
}

// TagKeysIterator represents a string iterator that emits all tag keys in a shard.
type TagKeysIterator struct {
mms Measurements // remaining measurements
buf struct {
mm *Measurement // current measurement
keys []string // current measurement's keys
// NewTagKeysIterator returns a new instance of TagKeysIterator.
func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
fn := func(m *Measurement) []string {
return m.TagKeys()
}
return newMeasurementKeysIterator(sh, fn, opt)
}

// NewTagKeysIterator returns a new instance of TagKeysIterator.
func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (*TagKeysIterator, error) {
itr := &TagKeysIterator{}
// measurementKeyFunc is the function called by measurementKeysIterator.
type measurementKeyFunc func(m *Measurement) []string

func newMeasurementKeysIterator(sh *Shard, fn measurementKeyFunc, opt influxql.IteratorOptions) (*measurementKeysIterator, error) {
itr := &measurementKeysIterator{fn: fn}

// Retrieve measurements from shard. Filter if condition specified.
if opt.Condition == nil {
Expand All @@ -902,11 +914,21 @@ func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (*TagKeysIterat
return itr, nil
}

// measurementKeysIterator iterates over measurements and gets keys from each measurement.
type measurementKeysIterator struct {
mms Measurements // remaining measurements
buf struct {
mm *Measurement // current measurement
keys []string // current measurement's keys
}
fn measurementKeyFunc
}

// Close closes the iterator.
func (itr *TagKeysIterator) Close() error { return nil }
func (itr *measurementKeysIterator) Close() error { return nil }

// Next emits the next tag key name.
func (itr *TagKeysIterator) Next() *influxql.FloatPoint {
func (itr *measurementKeysIterator) Next() *influxql.FloatPoint {
for {
// If there are no more keys then move to the next measurements.
if len(itr.buf.keys) == 0 {
Expand All @@ -915,7 +937,7 @@ func (itr *TagKeysIterator) Next() *influxql.FloatPoint {
}

itr.buf.mm = itr.mms[0]
itr.buf.keys = itr.buf.mm.TagKeys()
itr.buf.keys = itr.fn(itr.buf.mm)
itr.mms = itr.mms[1:]
continue
}
Expand Down

0 comments on commit aa0b603

Please sign in to comment.