Skip to content

Commit

Permalink
Merge pull request #5937 from benbjohnson/show-series
Browse files Browse the repository at this point in the history
Rewrite SHOW SERIES to SELECT
  • Loading branch information
benbjohnson committed Mar 8, 2016
2 parents 8b6efb4 + 41dde61 commit 425ef2c
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 132 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
- [#5880](https://github.com/influxdata/influxdb/issues/5880): TCP connection closed after write (regression/change from 0.9.6)
- [#5865](https://github.com/influxdata/influxdb/issues/5865): Conversion to tsm fails with exceeds max index value
- [#5924](https://github.com/influxdata/influxdb/issues/5924): Missing data after using influx\_tsm
- [#5937](https://github.com/influxdata/influxdb/pull/5937): Rewrite SHOW SERIES to use query engine

## v0.10.2 [2016-03-03]
### Bugfixes
Expand Down
7 changes: 0 additions & 7 deletions cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,6 @@ func (e *QueryExecutor) executeQuery(query *influxql.Query, database string, chu
rows, err = e.executeShowGrantsForUserStatement(stmt)
case *influxql.ShowRetentionPoliciesStatement:
rows, err = e.executeShowRetentionPoliciesStatement(stmt)
case *influxql.ShowSeriesStatement:
rows, err = e.executeShowSeriesStatement(stmt, database)
case *influxql.ShowServersStatement:
rows, err = e.executeShowServersStatement(stmt)
case *influxql.ShowShardsStatement:
Expand Down Expand Up @@ -667,10 +665,6 @@ func (e *QueryExecutor) executeShowRetentionPoliciesStatement(q *influxql.ShowRe
return []*models.Row{row}, nil
}

func (e *QueryExecutor) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error) {
return e.TSDBStore.ExecuteShowSeriesStatement(stmt, database)
}

func (e *QueryExecutor) executeShowServersStatement(q *influxql.ShowServersStatement) (models.Rows, error) {
nis, err := e.MetaClient.DataNodes()
if err != nil {
Expand Down Expand Up @@ -1100,7 +1094,6 @@ type TSDBStore interface {
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error)
ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
ExpandSources(sources influxql.Sources) (influxql.Sources, error)
ShardIteratorCreator(id uint64) influxql.IteratorCreator
Expand Down
5 changes: 0 additions & 5 deletions cluster/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ type TSDBStore struct {
DeleteRetentionPolicyFn func(database, name string) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
ExecuteShowFieldKeysStatementFn func(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
ExecuteShowSeriesStatementFn func(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error)
ExecuteShowTagValuesStatementFn func(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
Expand Down Expand Up @@ -232,10 +231,6 @@ func (s *TSDBStore) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysSt
return s.ExecuteShowFieldKeysStatementFn(stmt, database)
}

func (s *TSDBStore) ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error) {
return s.ExecuteShowSeriesStatementFn(stmt, database)
}

func (s *TSDBStore) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error) {
return s.ExecuteShowTagValuesStatementFn(stmt, database)
}
Expand Down
12 changes: 6 additions & 6 deletions cmd/influxd/run/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func init() {
&Query{
name: "Show series is present",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand Down Expand Up @@ -239,7 +239,7 @@ func init() {
&Query{
name: "Show series is present again after re-write",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
},
Expand All @@ -260,7 +260,7 @@ func init() {
&Query{
name: "Show series is present",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"a","columns":["_key","host","region"],"values":[["a,host=serverA,region=uswest","serverA","uswest"]]},{"name":"aa","columns":["_key","host","region"],"values":[["aa,host=serverA,region=uswest","serverA","uswest"]]},{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["a,host=serverA,region=uswest"],["aa,host=serverA,region=uswest"],["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand All @@ -273,7 +273,7 @@ func init() {
&Query{
name: "Show series is gone",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand All @@ -286,7 +286,7 @@ func init() {
&Query{
name: "make sure DROP SERIES doesn't delete anything when regex doesn't match",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand All @@ -298,7 +298,7 @@ func init() {
&Query{
name: "make sure DROP SERIES with field in WHERE didn't delete data",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"b","columns":["_key","host","region"],"values":[["b,host=serverA,region=uswest","serverA","uswest"]]},{"name":"c","columns":["_key","host","region"],"values":[["c,host=serverA,region=uswest","serverA","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand Down
20 changes: 10 additions & 10 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4599,7 +4599,7 @@ func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) {
&Query{
name: "show series",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=serverA,region=uswest","serverA","uswest"]]},{"name":"memory","columns":["_key","host","region"],"values":[["memory,host=serverB,region=uswest","serverB","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"],["memory,host=serverB,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand All @@ -4623,7 +4623,7 @@ func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) {
&Query{
name: "verify series",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"name":"memory","columns":["_key","host","region"],"values":[["memory,host=serverB,region=uswest","serverB","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["memory,host=serverB,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand Down Expand Up @@ -4751,43 +4751,43 @@ func TestServer_Query_ShowSeries(t *testing.T) {
&Query{
name: `show series`,
command: "SHOW SERIES",
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01","server01",""],["cpu,host=server01,region=uswest","server01","uswest"],["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]},{"name":"disk","columns":["_key","host","region"],"values":[["disk,host=server03,region=caeast","server03","caeast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server02,region=useast","server02","useast"],["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01"],["cpu,host=server01,region=useast"],["cpu,host=server01,region=uswest"],["cpu,host=server02,region=useast"],["disk,host=server03,region=caeast"],["gpu,host=server02,region=useast"],["gpu,host=server03,region=caeast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series from measurement`,
command: "SHOW SERIES FROM cpu",
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01","server01",""],["cpu,host=server01,region=uswest","server01","uswest"],["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01"],["cpu,host=server01,region=useast"],["cpu,host=server01,region=uswest"],["cpu,host=server02,region=useast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series from regular expression`,
command: "SHOW SERIES FROM /[cg]pu/",
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01","server01",""],["cpu,host=server01,region=uswest","server01","uswest"],["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server02,region=useast","server02","useast"],["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01"],["cpu,host=server01,region=useast"],["cpu,host=server01,region=uswest"],["cpu,host=server02,region=useast"],["gpu,host=server02,region=useast"],["gpu,host=server03,region=caeast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series with where tag`,
command: "SHOW SERIES WHERE region = 'uswest'",
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01,region=uswest","server01","uswest"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series where tag matches regular expression`,
command: "SHOW SERIES WHERE region =~ /ca.*/",
exp: `{"results":[{"series":[{"name":"disk","columns":["_key","host","region"],"values":[["disk,host=server03,region=caeast","server03","caeast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["disk,host=server03,region=caeast"],["gpu,host=server03,region=caeast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series`,
command: "SHOW SERIES WHERE host !~ /server0[12]/",
exp: `{"results":[{"series":[{"name":"disk","columns":["_key","host","region"],"values":[["disk,host=server03,region=caeast","server03","caeast"]]},{"name":"gpu","columns":["_key","host","region"],"values":[["gpu,host=server03,region=caeast","server03","caeast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["disk,host=server03,region=caeast"],["gpu,host=server03,region=caeast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: `show series with from and where`,
command: "SHOW SERIES FROM cpu WHERE region = 'useast'",
exp: `{"results":[{"series":[{"name":"cpu","columns":["_key","host","region"],"values":[["cpu,host=server01,region=useast","server01","useast"],["cpu,host=server02,region=useast","server02","useast"]]}]}]}`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=server01,region=useast"],["cpu,host=server02,region=useast"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand All @@ -4799,7 +4799,7 @@ func TestServer_Query_ShowSeries(t *testing.T) {
&Query{
name: `show series with WHERE field should fail`,
command: "SHOW SERIES WHERE value > 10.0",
exp: `{"results":[{"error":"SHOW SERIES doesn't support fields in WHERE clause"}]}`,
exp: `{"results":[{"error":"invalid tag comparison operator"}]}`,
params: url.Values{"db": []string{"db0"}},
},
}...)
Expand Down
35 changes: 29 additions & 6 deletions influxql/statement_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ func RewriteStatement(stmt Statement) (Statement, error) {
return rewriteShowFieldKeysStatement(stmt)
case *ShowMeasurementsStatement:
return rewriteShowMeasurementsStatement(stmt)
case *ShowSeriesStatement:
return rewriteShowSeriesStatement(stmt)
case *ShowTagKeysStatement:
return rewriteShowTagKeysStatement(stmt)
case *ShowTagValuesStatement:
Expand Down Expand Up @@ -81,21 +83,20 @@ func rewriteShowMeasurementsStatement(stmt *ShowMeasurementsStatement) (Statemen
}, nil
}

func rewriteShowTagKeysStatement(stmt *ShowTagKeysStatement) (Statement, error) {
func rewriteShowSeriesStatement(stmt *ShowSeriesStatement) (Statement, error) {
// Check for time in WHERE clause (not supported).
if HasTimeExpr(stmt.Condition) {
return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause")
return nil, errors.New("SHOW SERIES doesn't support time in WHERE clause")
}

condition := rewriteSourcesCondition(stmt.Sources, stmt.Condition)
return &SelectStatement{
Fields: []*Field{
{Expr: &VarRef{Val: "tagKey"}},
{Expr: &VarRef{Val: "key"}},
},
Sources: []Source{
&Measurement{Name: "_tagKeys"},
&Measurement{Name: "_series"},
},
Condition: condition,
Condition: rewriteSourcesCondition(stmt.Sources, stmt.Condition),
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
Expand Down Expand Up @@ -161,6 +162,28 @@ func rewriteShowTagValuesStatement(stmt *ShowTagValuesStatement) (Statement, err
}, nil
}

func rewriteShowTagKeysStatement(stmt *ShowTagKeysStatement) (Statement, error) {
// Check for time in WHERE clause (not supported).
if HasTimeExpr(stmt.Condition) {
return nil, errors.New("SHOW TAG KEYS doesn't support time in WHERE clause")
}

return &SelectStatement{
Fields: []*Field{
{Expr: &VarRef{Val: "tagKey"}},
},
Sources: []Source{
&Measurement{Name: "_tagKeys"},
},
Condition: rewriteSourcesCondition(stmt.Sources, stmt.Condition),
Offset: stmt.Offset,
Limit: stmt.Limit,
SortFields: stmt.SortFields,
OmitTime: true,
Dedupe: true,
}, nil
}

// rewriteSourcesCondition rewrites sources into `name` expressions.
// Merges with cond and returns a new condition.
func rewriteSourcesCondition(sources Sources, cond Expr) Expr {
Expand Down
23 changes: 20 additions & 3 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bo
}
return nil, false, nil
default:
return nil, false, fmt.Errorf("invalid operator")
return nil, false, fmt.Errorf("invalid tag comparison operator")
}
case *influxql.ParenExpr:
return d.measurementsByExpr(e.Expr)
Expand Down Expand Up @@ -740,19 +740,27 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex

// For fields, return all series IDs from this measurement and return
// the expression passed in, as the filter.
if m.HasField(name.Val) {
if name.Val != "name" && m.HasField(name.Val) {
return m.seriesIDs, n, nil
}

tagVals, ok := m.seriesByTagKeyValue[name.Val]
if !ok {
if name.Val != "name" && !ok {
return nil, nil, nil
}

// if we're looking for series with a specific tag value
if str, ok := value.(*influxql.StringLiteral); ok {
var ids SeriesIDs

// Special handling for "name" to match measurement name.
if name.Val == "name" {
if (n.Op == influxql.EQ && str.Val == m.Name) || (n.Op == influxql.NEQ && str.Val != m.Name) {
return m.seriesIDs, &influxql.BooleanLiteral{Val: true}, nil
}
return nil, &influxql.BooleanLiteral{Val: true}, nil
}

if n.Op == influxql.EQ {
// return series that have a tag of specific value.
ids = tagVals[str.Val]
Expand All @@ -766,6 +774,15 @@ func (m *Measurement) idsForExpr(n *influxql.BinaryExpr) (SeriesIDs, influxql.Ex
if re, ok := value.(*influxql.RegexLiteral); ok {
var ids SeriesIDs

// Special handling for "name" to match measurement name.
if name.Val == "name" {
match := re.Val.MatchString(m.Name)
if (n.Op == influxql.EQREGEX && match) || (n.Op == influxql.NEQREGEX && !match) {
return m.seriesIDs, &influxql.BooleanLiteral{Val: true}, nil
}
return nil, &influxql.BooleanLiteral{Val: true}, nil
}

// The operation is a NEQREGEX, code must start by assuming all match, even
// series without any tags.
if n.Op == influxql.NEQREGEX {
Expand Down
78 changes: 78 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ func (s *Shard) createSystemIterator(opt influxql.IteratorOptions) (influxql.Ite
return NewFieldKeysIterator(s, opt)
case "_measurements":
return NewMeasurementIterator(s, opt)
case "_series":
return NewSeriesIterator(s, opt)
case "_tagKeys":
return NewTagKeysIterator(s, opt)
case "_tags":
Expand Down Expand Up @@ -882,6 +884,82 @@ func (itr *MeasurementIterator) Next() *influxql.FloatPoint {
}
}

// seriesIterator emits series ids.
type seriesIterator struct {
keys []string // remaining series
fields []string // fields to emit (key)
}

// NewSeriesIterator returns a new instance of SeriesIterator.
func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
// Retrieve a list of all measurements.
mms := sh.index.Measurements()
sort.Sort(mms)

// Only equality operators are allowed.
var err error
influxql.WalkFunc(opt.Condition, func(n influxql.Node) {
switch n := n.(type) {
case *influxql.BinaryExpr:
switch n.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX,
influxql.OR, influxql.AND:
default:
err = errors.New("invalid tag comparison operator")
}
}
})
if err != nil {
return nil, err
}

// Generate a list of all series keys.
keys := newStringSet()
for _, mm := range mms {
ids, err := mm.seriesIDsAllOrByExpr(opt.Condition)
if err != nil {
return nil, err
}

for _, id := range ids {
keys.add(mm.SeriesByID(id).Key)
}
}

return &seriesIterator{
keys: keys.list(),
fields: opt.Aux,
}, nil
}

// Close closes the iterator.
func (itr *seriesIterator) Close() error { return nil }

// Next emits the next point in the iterator.
func (itr *seriesIterator) Next() *influxql.FloatPoint {
// If there are no more keys then return nil.
if len(itr.keys) == 0 {
return nil
}

// Prepare auxiliary fields.
aux := make([]interface{}, len(itr.fields))
for i, f := range itr.fields {
switch f {
case "key":
aux[i] = itr.keys[0]
}
}

// Return next key.
p := &influxql.FloatPoint{
Aux: aux,
}
itr.keys = itr.keys[1:]

return p
}

// NewTagKeysIterator returns a new instance of TagKeysIterator.
func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error) {
fn := func(m *Measurement) []string {
Expand Down
Loading

0 comments on commit 425ef2c

Please sign in to comment.