Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve SHOW TAG KEYS performance. #9073

Merged
merged 1 commit into from
Nov 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
- [#8986](https://github.com/influxdata/influxdb/issues/8986): Add long-line support to client importer. Thanks @lets00!
- [#9021](https://github.com/influxdata/influxdb/pull/9021): Update to go 1.9.2
- [#8891](https://github.com/influxdata/influxdb/pull/8891): Allow human-readable byte sizes in config
- [#9073](https://github.com/influxdata/influxdb/pull/9073): Improve SHOW TAG KEYS performance.

### Bugfixes

Expand Down
97 changes: 97 additions & 0 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx query.
rows, err = e.executeShowStatsStatement(stmt)
case *influxql.ShowSubscriptionsStatement:
rows, err = e.executeShowSubscriptionsStatement(stmt)
case *influxql.ShowTagKeysStatement:
return e.executeShowTagKeys(stmt, &ctx)
case *influxql.ShowTagValuesStatement:
return e.executeShowTagValues(stmt, &ctx)
case *influxql.ShowUsersStatement:
Expand Down Expand Up @@ -932,6 +934,96 @@ func (e *StatementExecutor) executeShowSubscriptionsStatement(stmt *influxql.Sho
return rows, nil
}

func (e *StatementExecutor) executeShowTagKeys(q *influxql.ShowTagKeysStatement, ctx *query.ExecutionContext) error {
if q.Database == "" {
return ErrDatabaseNameRequired
}

// Determine shard set based on database and time range.
// SHOW TAG KEYS returns all tag keys for the default retention policy.
di := e.MetaClient.Database(q.Database)
if di == nil {
return fmt.Errorf("database not found: %s", q.Database)
}

if di.DefaultRetentionPolicy == "" {
return fmt.Errorf("database %s does not have default retention policy", q.Database)
}

// Determine appropriate time range. If one or fewer time boundaries provided
// then min/max possible time should be used instead.
valuer := &influxql.NowValuer{Now: time.Now()}
cond, timeRange, err := influxql.ConditionExpr(q.Condition, valuer)
if err != nil {
return err
}

sgis, err := e.MetaClient.ShardGroupsByTimeRange(di.Name, di.DefaultRetentionPolicy, timeRange.MinTime(), timeRange.MaxTime())
if err != nil {
return err
}

var shardIDs []uint64
for _, sgi := range sgis {
for _, si := range sgi.Shards {
shardIDs = append(shardIDs, si.ID)
}
}

tagKeys, err := e.TSDBStore.TagKeys(ctx.Authorizer, shardIDs, cond)
if err != nil {
return ctx.Send(&query.Result{
StatementID: ctx.StatementID,
Err: err,
})
}

emitted := false
for _, m := range tagKeys {
keys := m.Keys

if q.Offset > 0 {
if q.Offset >= len(keys) {
keys = nil
} else {
keys = keys[q.Offset:]
}
}
if q.Limit > 0 && q.Limit < len(keys) {
keys = keys[:q.Limit]
}

if len(keys) == 0 {
continue
}

row := &models.Row{
Name: m.Measurement,
Columns: []string{"tagKey"},
Values: make([][]interface{}, len(keys)),
}
for i, key := range keys {
row.Values[i] = []interface{}{key}
}

if err := ctx.Send(&query.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{row},
}); err != nil {
return err
}
emitted = true
}

// Ensure at least one result is emitted.
if !emitted {
return ctx.Send(&query.Result{
StatementID: ctx.StatementID,
})
}
return nil
}

func (e *StatementExecutor) executeShowTagValues(q *influxql.ShowTagValuesStatement, ctx *query.ExecutionContext) error {
if q.Database == "" {
return ErrDatabaseNameRequired
Expand Down Expand Up @@ -1201,6 +1293,10 @@ func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultD
if node.Database == "" {
node.Database = defaultDatabase
}
case *influxql.ShowTagKeysStatement:
if node.Database == "" {
node.Database = defaultDatabase
}
case *influxql.ShowTagValuesStatement:
if node.Database == "" {
node.Database = defaultDatabase
Expand Down Expand Up @@ -1281,6 +1377,7 @@ type TSDBStore interface {
DeleteShard(id uint64) error

MeasurementNames(database string, cond influxql.Expr) ([][]byte, error)
TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)

SeriesCardinality(database string) (int64, error)
Expand Down
4 changes: 4 additions & 0 deletions internal/tsdb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type TSDBStoreMock struct {
ShardRelativePathFn func(id uint64) (string, error)
ShardsFn func(ids []uint64) []*tsdb.Shard
StatisticsFn func(tags map[string]string) []models.Statistic
TagKeysFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValuesFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
WithLoggerFn func(log zap.Logger)
WriteToShardFn func(shardID uint64, points []models.Point) error
Expand Down Expand Up @@ -128,6 +129,9 @@ func (s *TSDBStoreMock) Shards(ids []uint64) []*tsdb.Shard {
func (s *TSDBStoreMock) Statistics(tags map[string]string) []models.Statistic {
return s.StatisticsFn(tags)
}
func (s *TSDBStoreMock) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) {
return s.TagKeysFn(auth, shardIDs, cond)
}
func (s *TSDBStoreMock) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) {
return s.TagValuesFn(auth, shardIDs, cond)
}
Expand Down
39 changes: 8 additions & 31 deletions query/statement_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,38 +337,15 @@ func rewriteShowTagValuesCardinalityStatement(stmt *influxql.ShowTagValuesCardin
}

func rewriteShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement) (influxql.Statement, error) {
s := &influxql.SelectStatement{
Condition: stmt.Condition,
Offset: stmt.Offset,
Limit: stmt.Limit,
return &influxql.ShowTagKeysStatement{
Database: stmt.Database,
Condition: rewriteSourcesCondition(stmt.Sources, stmt.Condition),
SortFields: stmt.SortFields,
OmitTime: true,
Dedupe: true,
IsRawQuery: true,
}

// Check if we can exclusively use the index.
if !influxql.HasTimeExpr(stmt.Condition) {
s.Fields = []*influxql.Field{{Expr: &influxql.VarRef{Val: "tagKey"}}}
s.Sources = rewriteSources(stmt.Sources, "_tagKeys", stmt.Database)
s.Condition = rewriteSourcesCondition(s.Sources, stmt.Condition)
return s, nil
}

// The query is bounded by time then it will have to query TSM data rather
// than utilising the index via system iterators.
s.Fields = []*influxql.Field{
{
Expr: &influxql.Call{
Name: "distinct",
Args: []influxql.Expr{&influxql.VarRef{Val: "_tagKey"}},
},
Alias: "tagKey",
},
}

s.Sources = rewriteSources2(stmt.Sources, stmt.Database)
return s, nil
Limit: stmt.Limit,
Offset: stmt.Offset,
SLimit: stmt.SLimit,
SOffset: stmt.SOffset,
}, nil
}

func rewriteShowTagKeyCardinalityStatement(stmt *influxql.ShowTagKeyCardinalityStatement) (influxql.Statement, error) {
Expand Down
56 changes: 28 additions & 28 deletions query/statement_rewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,115 +126,115 @@ func TestRewriteStatement(t *testing.T) {
},
{
stmt: `SHOW TAG KEYS`,
s: `SELECT tagKey FROM _tagKeys`,
s: `SHOW TAG KEYS`,
},
{
stmt: `SHOW TAG KEYS ON db0`,
s: `SELECT tagKey FROM db0.._tagKeys`,
s: `SHOW TAG KEYS ON db0`,
},
{
stmt: `SHOW TAG KEYS FROM cpu`,
s: `SELECT tagKey FROM _tagKeys WHERE _name = 'cpu'`,
s: `SHOW TAG KEYS WHERE _name = 'cpu'`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM cpu`,
s: `SELECT tagKey FROM db0.._tagKeys WHERE _name = 'cpu'`,
s: `SHOW TAG KEYS ON db0 WHERE _name = 'cpu'`,
},
{
stmt: `SHOW TAG KEYS FROM /c.*/`,
s: `SELECT tagKey FROM _tagKeys WHERE _name =~ /c.*/`,
s: `SHOW TAG KEYS WHERE _name =~ /c.*/`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM /c.*/`,
s: `SELECT tagKey FROM db0.._tagKeys WHERE _name =~ /c.*/`,
s: `SHOW TAG KEYS ON db0 WHERE _name =~ /c.*/`,
},
{
stmt: `SHOW TAG KEYS FROM cpu WHERE region = 'uswest'`,
s: `SELECT tagKey FROM _tagKeys WHERE (_name = 'cpu') AND (region = 'uswest')`,
s: `SHOW TAG KEYS WHERE (_name = 'cpu') AND (region = 'uswest')`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM cpu WHERE region = 'uswest'`,
s: `SELECT tagKey FROM db0.._tagKeys WHERE (_name = 'cpu') AND (region = 'uswest')`,
s: `SHOW TAG KEYS ON db0 WHERE (_name = 'cpu') AND (region = 'uswest')`,
},
{
stmt: `SHOW TAG KEYS FROM mydb.myrp1.cpu`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE _name = 'cpu'`,
s: `SHOW TAG KEYS WHERE _name = 'cpu'`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM mydb.myrp1.cpu`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE _name = 'cpu'`,
s: `SHOW TAG KEYS ON db0 WHERE _name = 'cpu'`,
},
{
stmt: `SHOW TAG KEYS FROM mydb.myrp1./c.*/`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE _name =~ /c.*/`,
s: `SHOW TAG KEYS WHERE _name =~ /c.*/`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM mydb.myrp1./c.*/`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE _name =~ /c.*/`,
s: `SHOW TAG KEYS ON db0 WHERE _name =~ /c.*/`,
},
{
stmt: `SHOW TAG KEYS FROM mydb.myrp1.cpu WHERE region = 'uswest'`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE (_name = 'cpu') AND (region = 'uswest')`,
s: `SHOW TAG KEYS WHERE (_name = 'cpu') AND (region = 'uswest')`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM mydb.myrp1.cpu WHERE region = 'uswest'`,
s: `SELECT tagKey FROM mydb.myrp1._tagKeys WHERE (_name = 'cpu') AND (region = 'uswest')`,
s: `SHOW TAG KEYS ON db0 WHERE (_name = 'cpu') AND (region = 'uswest')`,
},
{
stmt: `SHOW TAG KEYS WHERE time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM /.+/ WHERE time > 0`,
s: `SHOW TAG KEYS WHERE time > 0`,
},
{
stmt: `SHOW TAG KEYS ON db0 WHERE time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM db0../.+/ WHERE time > 0`,
s: `SHOW TAG KEYS ON db0 WHERE time > 0`,
},
{
stmt: `SHOW TAG KEYS FROM cpu WHERE time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM cpu WHERE time > 0`,
s: `SHOW TAG KEYS WHERE (_name = 'cpu') AND (time > 0)`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM cpu WHERE time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM db0..cpu WHERE time > 0`,
s: `SHOW TAG KEYS ON db0 WHERE (_name = 'cpu') AND (time > 0)`,
},
{
stmt: `SHOW TAG KEYS FROM /c.*/ WHERE time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM /c.*/ WHERE time > 0`,
s: `SHOW TAG KEYS WHERE (_name =~ /c.*/) AND (time > 0)`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM /c.*/ WHERE time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM db0../c.*/ WHERE time > 0`,
s: `SHOW TAG KEYS ON db0 WHERE (_name =~ /c.*/) AND (time > 0)`,
},
{
stmt: `SHOW TAG KEYS FROM cpu WHERE region = 'uswest' AND time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM cpu WHERE region = 'uswest' AND time > 0`,
s: `SHOW TAG KEYS WHERE (_name = 'cpu') AND (region = 'uswest' AND time > 0)`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM cpu WHERE region = 'uswest' AND time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM db0..cpu WHERE region = 'uswest' AND time > 0`,
s: `SHOW TAG KEYS ON db0 WHERE (_name = 'cpu') AND (region = 'uswest' AND time > 0)`,
},
{
stmt: `SHOW TAG KEYS FROM mydb.myrp1.cpu WHERE time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1.cpu WHERE time > 0`,
s: `SHOW TAG KEYS WHERE (_name = 'cpu') AND (time > 0)`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM mydb.myrp1.cpu WHERE time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1.cpu WHERE time > 0`,
s: `SHOW TAG KEYS ON db0 WHERE (_name = 'cpu') AND (time > 0)`,
},
{
stmt: `SHOW TAG KEYS FROM mydb.myrp1./c.*/ WHERE time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1./c.*/ WHERE time > 0`,
s: `SHOW TAG KEYS WHERE (_name =~ /c.*/) AND (time > 0)`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM mydb.myrp1./c.*/ WHERE time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1./c.*/ WHERE time > 0`,
s: `SHOW TAG KEYS ON db0 WHERE (_name =~ /c.*/) AND (time > 0)`,
},
{
stmt: `SHOW TAG KEYS FROM mydb.myrp1.cpu WHERE region = 'uswest' AND time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1.cpu WHERE region = 'uswest' AND time > 0`,
s: `SHOW TAG KEYS WHERE (_name = 'cpu') AND (region = 'uswest' AND time > 0)`,
},
{
stmt: `SHOW TAG KEYS ON db0 FROM mydb.myrp1.cpu WHERE region = 'uswest' AND time > 0`,
s: `SELECT distinct(_tagKey) AS tagKey FROM mydb.myrp1.cpu WHERE region = 'uswest' AND time > 0`,
s: `SHOW TAG KEYS ON db0 WHERE (_name = 'cpu') AND (region = 'uswest' AND time > 0)`,
},
{
stmt: `SHOW TAG VALUES WITH KEY = "region"`,
Expand Down
12 changes: 4 additions & 8 deletions tests/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7868,12 +7868,6 @@ func TestServer_Query_ShowTagKeys(t *testing.T) {
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["tagKey"],"values":[["host"],["region"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"],["region"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "show tag keys where",
command: "SHOW TAG KEYS WHERE host = 'server03'",
exp: `{"results":[{"statement_id":0,"series":[{"name":"disk","columns":["tagKey"],"values":[["host"],["region"]]},{"name":"gpu","columns":["tagKey"],"values":[["host"],["region"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "show tag keys measurement not found",
command: "SHOW TAG KEYS FROM doesntexist",
Expand Down Expand Up @@ -7917,12 +7911,14 @@ func TestServer_Query_ShowTagKeys(t *testing.T) {
},
}...)

for i, query := range test.queries {
var initialized bool
for _, query := range test.queries {
t.Run(query.name, func(t *testing.T) {
if i == 0 {
if !initialized {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
initialized = true
}
if query.skip {
t.Skipf("SKIP:: %s", query.name)
Expand Down
8 changes: 8 additions & 0 deletions tsdb/index/inmem/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,14 @@ func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error

// tagKeysByExpr extracts the tag keys wanted by the expression.
func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error) {
if expr == nil {
set := make(map[string]struct{})
for _, key := range m.TagKeys() {
set[key] = struct{}{}
}
return set, nil
}

switch e := expr.(type) {
case *influxql.BinaryExpr:
switch e.Op {
Expand Down
Loading