From 8bd24083200112b6db00281b4e3b65aed1d7251d Mon Sep 17 00:00:00 2001 From: David Norton Date: Tue, 25 Aug 2015 14:00:07 -0400 Subject: [PATCH 1/4] convert SHOW TAG KEYS to distributed query --- cmd/influxd/run/server_test.go | 4 +- tsdb/executor_test.go | 105 +++++++++++- tsdb/query_executor.go | 87 +++++++++- tsdb/query_executor_test.go | 5 +- tsdb/show_tag_keys.go | 301 +++++++++++++++++++++++++++++++++ tsdb/store.go | 2 + 6 files changed, 492 insertions(+), 12 deletions(-) create mode 100644 tsdb/show_tag_keys.go diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 6639f378f00..28e1ee9ee24 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -3849,8 +3849,8 @@ func TestServer_Query_ShowTagKeys(t *testing.T) { }, &Query{ name: "show tag keys measurement not found", - command: "SHOW TAG KEYS FROM bad", - exp: `{"results":[{"error":"measurement not found: bad"}]}`, + command: "SHOW TAG KEYS FROM doesntexist", + exp: `{"results":[{}]}`, params: url.Values{"db": []string{"db0"}}, }, &Query{ diff --git a/tsdb/executor_test.go b/tsdb/executor_test.go index a771b8cfcd3..cbae64959b7 100644 --- a/tsdb/executor_test.go +++ b/tsdb/executor_test.go @@ -434,7 +434,7 @@ func TestWritePointsAndExecuteTwoShardsTagSetOrdering(t *testing.T) { } // Test to ensure the engine handles measurements across stores. -func TestWritePointsAndExecuteTwoShardsShowMeasurements(t *testing.T) { +func TestShowMeasurementsMultipleShards(t *testing.T) { // Create two distinct stores, ensuring shard mappers will shard nothing. store0 := testStore() defer os.RemoveAll(store0.Path()) @@ -513,6 +513,109 @@ func TestWritePointsAndExecuteTwoShardsShowMeasurements(t *testing.T) { } } +// Test to ensure the engine handles tag keys across stores. +func TestShowShowTagKeysMultipleShards(t *testing.T) { + // Create two distinct stores, ensuring shard mappers will shard nothing. + store0 := testStore() + defer os.RemoveAll(store0.Path()) + store1 := testStore() + defer os.RemoveAll(store1.Path()) + + // Create a shard in each store. + database := "foo" + retentionPolicy := "bar" + store0.CreateShard(database, retentionPolicy, sID0) + store1.CreateShard(database, retentionPolicy, sID1) + + // Write two points across shards. + pt1time := time.Unix(1, 0).UTC() + if err := store0.WriteToShard(sID0, []tsdb.Point{ + tsdb.NewPoint( + "cpu", + map[string]string{"host": "serverA", "region": "uswest"}, + map[string]interface{}{"value1": 100}, + pt1time, + ), + tsdb.NewPoint( + "cpu", + map[string]string{"host": "serverB", "region": "useast"}, + map[string]interface{}{"value1": 100}, + pt1time, + ), + }); err != nil { + t.Fatalf(err.Error()) + } + pt2time := time.Unix(2, 0).UTC() + if err := store1.WriteToShard(sID1, []tsdb.Point{ + tsdb.NewPoint( + "cpu", + map[string]string{"host": "serverB", "region": "useast", "rack": "12"}, + map[string]interface{}{"value1": 100}, + pt1time, + ), + tsdb.NewPoint( + "mem", + map[string]string{"host": "serverB"}, + map[string]interface{}{"value2": 200}, + pt2time, + )}); err != nil { + t.Fatalf(err.Error()) + } + var tests = []struct { + skip bool // Skip test + stmt string // Query statement + chunkSize int // Chunk size for driving the executor + expected string // Expected results, rendered as a string + }{ + { + stmt: `SHOW TAG KEYS`, + expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"],["rack"],["region"]]},{"name":"mem","columns":["tagKey"],"values":[["host"]]}]`, + }, + { + stmt: `SHOW TAG KEYS FROM cpu`, + expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"],["rack"],["region"]]}]`, + }, + { + stmt: `SHOW TAG KEYS FROM cpu WHERE region = 'uswest'`, + expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"],["region"]]}]`, + }, + { + stmt: `SHOW TAG KEYS FROM doesntexist`, + expected: `null`, + }, + { + stmt: `SHOW TAG KEYS FROM cpu WHERE region = 'doesntexist'`, + expected: `null`, + }, + } + for _, tt := range tests { + if tt.skip { + t.Logf("Skipping test %s", tt.stmt) + continue + } + + parsedStmt := mustParseStatement(tt.stmt).(*influxql.ShowTagKeysStatement) + + // Create Mappers and Executor. + mapper0, err := store0.CreateMapper(sID0, parsedStmt, tt.chunkSize) + if err != nil { + t.Fatalf("failed to create mapper0: %s", err.Error()) + } + mapper1, err := store1.CreateMapper(sID1, parsedStmt, tt.chunkSize) + if err != nil { + t.Fatalf("failed to create mapper1: %s", err.Error()) + } + executor := tsdb.NewShowTagKeysExecutor(parsedStmt, []tsdb.Mapper{mapper0, mapper1}, tt.chunkSize) + + // Check the results. + got := executeAndGetResults(executor) + if got != tt.expected { + t.Fatalf("Test %s\nexp: %s\ngot: %s\n", tt.stmt, tt.expected, got) + } + + } +} + // TestProccessAggregateDerivative tests the RawQueryDerivativeProcessor transformation function on the engine. // The is called for a query with a GROUP BY. func TestProcessAggregateDerivative(t *testing.T) { diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index 7266f9c3997..0767775d984 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -159,7 +159,7 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu var res *influxql.Result switch stmt := stmt.(type) { case *influxql.SelectStatement: - if err := q.executeSelectStatement(i, stmt, results, chunkSize); err != nil { + if err := q.executeStatement(i, stmt, database, results, chunkSize); err != nil { results <- &influxql.Result{Err: err} break } @@ -172,12 +172,15 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu // TODO: handle this in a cluster res = q.executeDropMeasurementStatement(stmt, database) case *influxql.ShowMeasurementsStatement: - if err := q.executeShowMeasurementsStatement(i, stmt, database, results, chunkSize); err != nil { + if err := q.executeStatement(i, stmt, database, results, chunkSize); err != nil { results <- &influxql.Result{Err: err} break } case *influxql.ShowTagKeysStatement: - res = q.executeShowTagKeysStatement(stmt, database) + if err := q.executeStatement(i, stmt, database, results, chunkSize); err != nil { + results <- &influxql.Result{Err: err} + break + } case *influxql.ShowTagValuesStatement: res = q.executeShowTagValuesStatement(stmt, database) case *influxql.ShowFieldKeysStatement: @@ -570,7 +573,20 @@ func (q *QueryExecutor) filterShowSeriesResult(limit, offset int, rows models.Ro return filteredSeries } -// PlanShowMeasurements creates an execution plan for the given SelectStatement and returns an Executor. +func (q *QueryExecutor) planStatement(stmt influxql.Statement, database string, chunkSize int) (Executor, error) { + switch stmt := stmt.(type) { + case *influxql.SelectStatement: + return q.PlanSelect(stmt, chunkSize) + case *influxql.ShowMeasurementsStatement: + return q.PlanShowMeasurements(stmt, database, chunkSize) + case *influxql.ShowTagKeysStatement: + return q.PlanShowTagKeys(stmt, database, chunkSize) + default: + return nil, fmt.Errorf("can't plan statement type: %v", stmt) + } +} + +// PlanShowMeasurements creates an execution plan for a SHOW TAG KEYS statement and returns an Executor. func (q *QueryExecutor) PlanShowMeasurements(stmt *influxql.ShowMeasurementsStatement, database string, chunkSize int) (Executor, error) { // Get the database info. di, err := q.MetaStore.Database(database) @@ -601,8 +617,65 @@ func (q *QueryExecutor) PlanShowMeasurements(stmt *influxql.ShowMeasurementsStat return executor, nil } -func (q *QueryExecutor) executeShowMeasurementsStatement(statementID int, stmt *influxql.ShowMeasurementsStatement, database string, results chan *influxql.Result, chunkSize int) error { +// PlanShowTagKeys creates an execution plan for a SHOW MEASUREMENTS statement and returns an Executor. +func (q *QueryExecutor) PlanShowTagKeys(stmt *influxql.ShowTagKeysStatement, database string, chunkSize int) (Executor, error) { + // Get the database info. + di, err := q.MetaStore.Database(database) + if err != nil { + return nil, err + } else if di == nil { + return nil, ErrDatabaseNotFound(database) + } + + // Get info for all shards in the database. + shards := di.ShardInfos() + + // Build the Mappers, one per shard. + mappers := []Mapper{} + for _, sh := range shards { + m, err := q.ShardMapper.CreateMapper(sh, stmt, chunkSize) + if err != nil { + return nil, err + } + if m == nil { + // No data for this shard, skip it. + continue + } + mappers = append(mappers, m) + } + + executor := NewShowTagKeysExecutor(stmt, mappers, chunkSize) + return executor, nil +} + +func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statement, database string, results chan *influxql.Result, chunkSize int) error { // Plan statement execution. + e, err := q.planStatement(stmt, database, chunkSize) + if err != nil { + return err + } + + // Execute plan. + ch := e.Execute() + + // Stream results from the channel. We should send an empty result if nothing comes through. + resultSent := false + for row := range ch { + if row.Err != nil { + return row.Err + } + resultSent = true + results <- &influxql.Result{StatementID: statementID, Series: []*influxql.Row{row}} + } + + if !resultSent { + results <- &influxql.Result{StatementID: statementID, Series: make([]*influxql.Row, 0)} + } + + return nil +} + +func (q *QueryExecutor) executeShowMeasurementsStatement(statementID int, stmt *influxql.ShowMeasurementsStatement, database string, results chan *influxql.Result, chunkSize int) error { // Plan statement execution. e, err := q.PlanShowMeasurements(stmt, database, chunkSize) if err != nil { return err @@ -628,7 +701,7 @@ func (q *QueryExecutor) executeShowMeasurementsStatement(statementID int, stmt * return nil } -func (q *QueryExecutor) executeShowTagKeysStatement(stmt *influxql.ShowTagKeysStatement, database string) *influxql.Result { +func (q *QueryExecutor) executeShowTagKeysStatemen(stmt *influxql.ShowTagKeysStatement, database string) *influxql.Result { // Find the database. db := q.Store.DatabaseIndex(database) if db == nil { @@ -817,7 +890,7 @@ func measurementsFromSourcesOrDB(db *DatabaseIndex, sources ...influxql.Source) if m, ok := source.(*influxql.Measurement); ok { measurement := db.measurements[m.Name] if measurement == nil { - return nil, ErrMeasurementNotFound(m.Name) + continue } measurements = append(measurements, measurement) diff --git a/tsdb/query_executor_test.go b/tsdb/query_executor_test.go index 83ddb2a426a..d2795e2bfc1 100644 --- a/tsdb/query_executor_test.go +++ b/tsdb/query_executor_test.go @@ -144,7 +144,8 @@ func TestDropSeriesStatement(t *testing.T) { } got = executeAndGetJSON("show tag keys from cpu", executor) - exepected = `[{"series":[{"name":"cpu","columns":["tagKey"]}]}]` + exepected = `[{}]` + //exepected = `[{"series":[{"name":"cpu","columns":["tagKey"]}]}]` if exepected != got { t.Fatalf("exp: %s\ngot: %s", exepected, got) } @@ -163,7 +164,7 @@ func TestDropSeriesStatement(t *testing.T) { } got = executeAndGetJSON("show tag keys from cpu", executor) - exepected = `[{"series":[{"name":"cpu","columns":["tagKey"]}]}]` + exepected = `[{}]` if exepected != got { t.Fatalf("exp: %s\ngot: %s", exepected, got) } diff --git a/tsdb/show_tag_keys.go b/tsdb/show_tag_keys.go new file mode 100644 index 00000000000..3f3e1939ac5 --- /dev/null +++ b/tsdb/show_tag_keys.go @@ -0,0 +1,301 @@ +package tsdb + +import ( + "encoding/json" + "fmt" + "sort" + + "github.com/influxdb/influxdb/influxql" +) + +// ShowTagKeysExecutor implements the Executor interface for a SHOW MEASUREMENTS statement. +type ShowTagKeysExecutor struct { + stmt *influxql.ShowTagKeysStatement + mappers []Mapper + chunkSize int +} + +// NewShowTagKeysExecutor returns a new ShowTagKeysExecutor. +func NewShowTagKeysExecutor(stmt *influxql.ShowTagKeysStatement, mappers []Mapper, chunkSize int) *ShowTagKeysExecutor { + return &ShowTagKeysExecutor{ + stmt: stmt, + mappers: mappers, + chunkSize: chunkSize, + } +} + +// Execute begins execution of the query and returns a channel to receive rows. +func (e *ShowTagKeysExecutor) Execute() <-chan *influxql.Row { + // Create output channel and stream data in a separate goroutine. + out := make(chan *influxql.Row, 0) + + // It's important that all resources are released when execution completes. + defer e.close() + + go func() { + defer close(out) + // Open the mappers. + for _, m := range e.mappers { + if err := m.Open(); err != nil { + out <- &influxql.Row{Err: err} + return + } + } + + // Create a map of measurement to tags keys. + set := map[string]map[string]struct{}{} + // Iterate through mappers collecting measurement names. + for _, m := range e.mappers { + // Read all data from the mapper. + for { + c, err := m.NextChunk() + if err != nil { + out <- &influxql.Row{Err: err} + return + } else if c == nil { + // Mapper has been drained. + break + } + + // Convert the mapper chunk to an array of measurements with tag keys. + mtks, ok := c.(MeasurementsTagKeys) + if !ok { + out <- &influxql.Row{Err: fmt.Errorf("show tag keys mapper returned invalid type: %T", c)} + return + } + + // Merge mapper chunk with previous mapper outputs. + for _, mm := range mtks { + for _, key := range mm.TagKeys { + if set[mm.Measurement] == nil { + set[mm.Measurement] = map[string]struct{}{} + } + set[mm.Measurement][key] = struct{}{} + } + } + } + } + + // All mappers are drained. + + // Convert the set into an array of measurements and their tag keys. + mstks := make(MeasurementsTagKeys, 0) + for mm, tks := range set { + mtks := &MeasurementTagKeys{Measurement: mm} + for tk := range tks { + mtks.TagKeys = append(mtks.TagKeys, tk) + } + sort.Strings(mtks.TagKeys) + mstks = append(mstks, mtks) + } + // Sort by measurement name. + sort.Sort(mstks) + + // Calculate OFFSET and LIMIT. + off := e.stmt.Offset + lim := len(mstks) + stmtLim := e.stmt.Limit + + if stmtLim > 0 && off+stmtLim < lim { + lim = off + stmtLim + } else if off > lim { + off, lim = 0, 0 + } + + // Send results. + for _, mtks := range mstks[off:lim] { + row := &influxql.Row{ + Name: mtks.Measurement, + Columns: []string{"tagKey"}, + Values: make([][]interface{}, 0, len(mtks.TagKeys)), + } + + for _, tk := range mtks.TagKeys { + v := []interface{}{tk} + row.Values = append(row.Values, v) + } + + out <- row + } + }() + return out +} + +// Close closes the executor such that all resources are released. Once closed, +// an executor may not be re-used. +func (e *ShowTagKeysExecutor) close() { + if e != nil { + for _, m := range e.mappers { + m.Close() + } + } +} + +// ShowTagKeysMapper is a mapper for collecting measurement names from a shard. +type ShowTagKeysMapper struct { + remote Mapper + shard *Shard + stmt *influxql.ShowTagKeysStatement + chunkSize int + state interface{} +} + +// NewShowTagKeysMapper returns a mapper for the given shard, which will return data for the meta statement. +func NewShowTagKeysMapper(shard *Shard, stmt *influxql.ShowTagKeysStatement, chunkSize int) *ShowTagKeysMapper { + return &ShowTagKeysMapper{ + shard: shard, + stmt: stmt, + chunkSize: chunkSize, + } +} + +// MeasurementTagKeys represents measurement tag keys. +type MeasurementTagKeys struct { + Measurement string `json:"measurement"` + TagKeys []string `json:"tagkeys"` +} + +// MeasurementsTagKeys represents tag keys for multiple measurements. +type MeasurementsTagKeys []*MeasurementTagKeys + +func (a MeasurementsTagKeys) Len() int { return len(a) } +func (a MeasurementsTagKeys) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement } +func (a MeasurementsTagKeys) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// Size returns the total string length of measurement names & tag keys. +func (a MeasurementsTagKeys) Size() int { + n := 0 + for _, m := range a { + n += len(m.Measurement) + for _, k := range m.TagKeys { + n += len(k) + } + } + return n +} + +// Open opens the mapper for use. +func (m *ShowTagKeysMapper) Open() error { + if m.remote != nil { + return m.remote.Open() + } + + // This can happen when a shard has been assigned to this node but we have not + // written to it so it may not exist yet. + if m.shard == nil { + return nil + } + + sources := influxql.Sources{} + + // Expand regex expressions in the FROM clause. + if m.stmt.Sources != nil { + var err error + sources, err = expandSources(m.stmt.Sources, m.shard.index) + if err != nil { + return err + } + } + + // Get measurements from sources in the statement if provided or database if not. + measurements, err := measurementsFromSourcesOrDB(m.shard.index, sources...) + if err != nil { + return err + } + + // If a WHERE clause was specified, filter the measurements. + if m.stmt.Condition != nil { + var err error + whereMs, err := m.shard.index.measurementsByExpr(m.stmt.Condition) + if err != nil { + return err + } + + sort.Sort(whereMs) + + measurements = measurements.intersect(whereMs) + } + + // Create a channel to send measurement names on. + ch := make(chan *MeasurementTagKeys) + // Start a goroutine to send the names over the channel as needed. + go func() { + for _, mm := range measurements { + ch <- &MeasurementTagKeys{ + Measurement: mm.Name, + TagKeys: mm.TagKeys(), + } + } + close(ch) + }() + + // Store the channel as the state of the mapper. + m.state = ch + + return nil +} + +// SetRemote sets the remote mapper to use. +func (m *ShowTagKeysMapper) SetRemote(remote Mapper) error { + m.remote = remote + return nil +} + +// TagSets is only implemented on this mapper to satisfy the Mapper interface. +func (m *ShowTagKeysMapper) TagSets() []string { return nil } + +// Fields returns a list of field names for this mapper. +func (m *ShowTagKeysMapper) Fields() []string { return []string{"tagKey"} } + +// NextChunk returns the next chunk of measurements and tag keys. +func (m *ShowTagKeysMapper) NextChunk() (interface{}, error) { + if m.remote != nil { + b, err := m.remote.NextChunk() + if err != nil { + return nil, err + } else if b == nil { + return nil, nil + } + + mtks := []*MeasurementTagKeys{} + if err := json.Unmarshal(b.([]byte), &mtks); err != nil { + return nil, err + } else if len(mtks) == 0 { + // Mapper on other node sent 0 values so it's done. + return nil, nil + } + return mtks, nil + } + return m.nextChunk() +} + +// nextChunk implements next chunk logic for a local shard. +func (m *ShowTagKeysMapper) nextChunk() (interface{}, error) { + // Get the channel of measurement tag keys from the state. + ch, ok := m.state.(chan *MeasurementTagKeys) + if !ok { + return nil, nil + } + // Allocate array to hold measurement names. + mtks := make(MeasurementsTagKeys, 0) + // Get the next chunk of tag keys. + for n := range ch { + mtks = append(mtks, n) + if mtks.Size() >= m.chunkSize { + break + } + } + // See if we've read all the names. + if len(mtks) == 0 { + return nil, nil + } + + return mtks, nil +} + +// Close closes the mapper. +func (m *ShowTagKeysMapper) Close() { + if m.remote != nil { + m.remote.Close() + } +} diff --git a/tsdb/store.go b/tsdb/store.go index b37568dd3ab..57e7a037ee8 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -348,6 +348,8 @@ func (s *Store) CreateMapper(shardID uint64, stmt influxql.Statement, chunkSize return NewSelectMapper(shard, st, chunkSize), nil case *influxql.ShowMeasurementsStatement: return NewShowMeasurementsMapper(shard, st, chunkSize), nil + case *influxql.ShowTagKeysStatement: + return NewShowTagKeysMapper(shard, st, chunkSize), nil default: return nil, fmt.Errorf("can't create mapper for statement type: %v", st) } From 8e236532e839f4a8e274de7569d50c0424ecd196 Mon Sep 17 00:00:00 2001 From: David Norton Date: Tue, 15 Sep 2015 13:06:06 -0400 Subject: [PATCH 2/4] delete unused executeShowTagKeysStatement func --- tsdb/query_executor.go | 53 ------------------------------------------ 1 file changed, 53 deletions(-) diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index 0767775d984..d6d4cc69d66 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -701,59 +701,6 @@ func (q *QueryExecutor) executeShowMeasurementsStatement(statementID int, stmt * return nil } -func (q *QueryExecutor) executeShowTagKeysStatemen(stmt *influxql.ShowTagKeysStatement, database string) *influxql.Result { - // Find the database. - db := q.Store.DatabaseIndex(database) - if db == nil { - return &influxql.Result{} - } - - // Expand regex expressions in the FROM clause. - sources, err := q.expandSources(stmt.Sources) - if err != nil { - return &influxql.Result{Err: err} - } - - // Get the list of measurements we're interested in. - measurements, err := measurementsFromSourcesOrDB(db, sources...) - if err != nil { - return &influxql.Result{Err: err} - } - - // Make result. - result := &influxql.Result{ - Series: make(models.Rows, 0, len(measurements)), - } - - // Add one row per measurement to the result. - for _, m := range measurements { - // TODO: filter tag keys by stmt.Condition - - // Get the tag keys in sorted order. - keys := m.TagKeys() - - // Convert keys to an [][]interface{}. - values := make([][]interface{}, 0, len(m.seriesByTagKeyValue)) - for _, k := range keys { - v := interface{}(k) - values = append(values, []interface{}{v}) - } - - // Make a result row for the measurement. - r := &models.Row{ - Name: m.Name, - Columns: []string{"tagKey"}, - Values: values, - } - - result.Series = append(result.Series, r) - } - - // TODO: LIMIT & OFFSET - - return result -} - func (q *QueryExecutor) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) *influxql.Result { // Find the database. db := q.Store.DatabaseIndex(database) From e176d2c0ddee68cfc9be052d780133a1cde95ed8 Mon Sep 17 00:00:00 2001 From: David Norton Date: Wed, 16 Sep 2015 11:44:05 -0400 Subject: [PATCH 3/4] make SHOW TAG KEYS support (S)LIMIT & (S)OFFSET --- influxql/ast.go | 19 +++++++++++--- influxql/parser.go | 10 +++++++ influxql/parser_test.go | 12 +++++++++ tsdb/executor_test.go | 42 +++++++++++++++++++++++------- tsdb/query_executor.go | 4 +-- tsdb/query_executor_test.go | 1 - tsdb/show_tag_keys.go | 52 +++++++++++++++++++++++-------------- 7 files changed, 106 insertions(+), 34 deletions(-) diff --git a/influxql/ast.go b/influxql/ast.go index 56eb8683e34..0120ed4d86f 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -2006,12 +2006,17 @@ type ShowTagKeysStatement struct { // Fields to sort results by SortFields SortFields - // Maximum number of rows to be returned. - // Unlimited if zero. + // Maximum number of tag keys per measurement. Unlimited if zero. Limit int - // Returns rows starting at an offset from the first row. + // Returns tag keys starting at an offset from the first row. Offset int + + // Maxiumum number of series to be returned. Unlimited if zero. + SLimit int + + // Returns series starting at an offset from the first one. + SOffset int } // String returns a string representation of the statement. @@ -2039,6 +2044,14 @@ func (s *ShowTagKeysStatement) String() string { _, _ = buf.WriteString(" OFFSET ") _, _ = buf.WriteString(strconv.Itoa(s.Offset)) } + if s.SLimit > 0 { + _, _ = buf.WriteString(" SLIMIT ") + _, _ = buf.WriteString(strconv.Itoa(s.SLimit)) + } + if s.SOffset > 0 { + _, _ = buf.WriteString(" SOFFSET ") + _, _ = buf.WriteString(strconv.Itoa(s.SOffset)) + } return buf.String() } diff --git a/influxql/parser.go b/influxql/parser.go index 4c55e5e8456..8dbd964d2a3 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -979,6 +979,16 @@ func (p *Parser) parseShowTagKeysStatement() (*ShowTagKeysStatement, error) { return nil, err } + // Parse series limit: "SLIMIT ". + if stmt.SLimit, err = p.parseOptionalTokenAndInt(SLIMIT); err != nil { + return nil, err + } + + // Parse series offset: "SOFFSET ". + if stmt.SOffset, err = p.parseOptionalTokenAndInt(SOFFSET); err != nil { + return nil, err + } + return stmt, nil } diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 94944398a75..4d7ae3458f4 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -751,6 +751,15 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // SHOW TAG KEYS with LIMIT + { + s: `SHOW TAG KEYS FROM src LIMIT 2`, + stmt: &influxql.ShowTagKeysStatement{ + Sources: []influxql.Source{&influxql.Measurement{Name: "src"}}, + Limit: 2, + }, + }, + // SHOW TAG KEYS FROM // { s: `SHOW TAG KEYS FROM /[cg]pu/`, @@ -1554,6 +1563,9 @@ func TestParser_ParseStatement(t *testing.T) { } for i, tt := range tests { + if tt.s != `SHOW TAG KEYS FROM src LIMIT 2` { + continue + } if tt.skip { t.Logf("skipping test of '%s'", tt.s) continue diff --git a/tsdb/executor_test.go b/tsdb/executor_test.go index cbae64959b7..f5aa1e498f0 100644 --- a/tsdb/executor_test.go +++ b/tsdb/executor_test.go @@ -252,7 +252,7 @@ func TestWritePointsAndExecuteTwoShardsAlign(t *testing.T) { // Test to ensure the engine handles query re-writing across stores. func TestWritePointsAndExecuteTwoShardsQueryRewrite(t *testing.T) { - // Create two distinct stores, ensuring shard mappers will shard nothing. + // Create two distinct stores, ensuring shard mappers will share nothing. store0 := testStore() defer os.RemoveAll(store0.Path()) store1 := testStore() @@ -435,7 +435,7 @@ func TestWritePointsAndExecuteTwoShardsTagSetOrdering(t *testing.T) { // Test to ensure the engine handles measurements across stores. func TestShowMeasurementsMultipleShards(t *testing.T) { - // Create two distinct stores, ensuring shard mappers will shard nothing. + // Create two distinct stores, ensuring shard mappers will share nothing. store0 := testStore() defer os.RemoveAll(store0.Path()) store1 := testStore() @@ -515,7 +515,7 @@ func TestShowMeasurementsMultipleShards(t *testing.T) { // Test to ensure the engine handles tag keys across stores. func TestShowShowTagKeysMultipleShards(t *testing.T) { - // Create two distinct stores, ensuring shard mappers will shard nothing. + // Create two distinct stores, ensuring shard mappers will share nothing. store0 := testStore() defer os.RemoveAll(store0.Path()) store1 := testStore() @@ -529,14 +529,14 @@ func TestShowShowTagKeysMultipleShards(t *testing.T) { // Write two points across shards. pt1time := time.Unix(1, 0).UTC() - if err := store0.WriteToShard(sID0, []tsdb.Point{ - tsdb.NewPoint( + if err := store0.WriteToShard(sID0, []models.Point{ + models.NewPoint( "cpu", map[string]string{"host": "serverA", "region": "uswest"}, map[string]interface{}{"value1": 100}, pt1time, ), - tsdb.NewPoint( + models.NewPoint( "cpu", map[string]string{"host": "serverB", "region": "useast"}, map[string]interface{}{"value1": 100}, @@ -546,14 +546,14 @@ func TestShowShowTagKeysMultipleShards(t *testing.T) { t.Fatalf(err.Error()) } pt2time := time.Unix(2, 0).UTC() - if err := store1.WriteToShard(sID1, []tsdb.Point{ - tsdb.NewPoint( + if err := store1.WriteToShard(sID1, []models.Point{ + models.NewPoint( "cpu", map[string]string{"host": "serverB", "region": "useast", "rack": "12"}, map[string]interface{}{"value1": 100}, pt1time, ), - tsdb.NewPoint( + models.NewPoint( "mem", map[string]string{"host": "serverB"}, map[string]interface{}{"value2": 200}, @@ -571,6 +571,30 @@ func TestShowShowTagKeysMultipleShards(t *testing.T) { stmt: `SHOW TAG KEYS`, expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"],["rack"],["region"]]},{"name":"mem","columns":["tagKey"],"values":[["host"]]}]`, }, + { + stmt: `SHOW TAG KEYS SLIMIT 1`, + expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"],["rack"],["region"]]}]`, + }, + { + stmt: `SHOW TAG KEYS SLIMIT 1 SOFFSET 1`, + expected: `[{"name":"mem","columns":["tagKey"],"values":[["host"]]}]`, + }, + { + stmt: `SHOW TAG KEYS SOFFSET 1`, + expected: `[{"name":"mem","columns":["tagKey"],"values":[["host"]]}]`, + }, + { + stmt: `SHOW TAG KEYS LIMIT 1`, + expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"]]},{"name":"mem","columns":["tagKey"],"values":[["host"]]}]`, + }, + { + stmt: `SHOW TAG KEYS LIMIT 1 OFFSET 1`, + expected: `[{"name":"cpu","columns":["tagKey"],"values":[["rack"]]},{"name":"mem","columns":["tagKey"]}]`, + }, + { + stmt: `SHOW TAG KEYS OFFSET 1`, + expected: `[{"name":"cpu","columns":["tagKey"],"values":[["rack"],["region"]]},{"name":"mem","columns":["tagKey"]}]`, + }, { stmt: `SHOW TAG KEYS FROM cpu`, expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"],["rack"],["region"]]}]`, diff --git a/tsdb/query_executor.go b/tsdb/query_executor.go index d6d4cc69d66..2ad33be2781 100644 --- a/tsdb/query_executor.go +++ b/tsdb/query_executor.go @@ -665,11 +665,11 @@ func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statemen return row.Err } resultSent = true - results <- &influxql.Result{StatementID: statementID, Series: []*influxql.Row{row}} + results <- &influxql.Result{StatementID: statementID, Series: []*models.Row{row}} } if !resultSent { - results <- &influxql.Result{StatementID: statementID, Series: make([]*influxql.Row, 0)} + results <- &influxql.Result{StatementID: statementID, Series: make([]*models.Row, 0)} } return nil diff --git a/tsdb/query_executor_test.go b/tsdb/query_executor_test.go index d2795e2bfc1..b694ad1add2 100644 --- a/tsdb/query_executor_test.go +++ b/tsdb/query_executor_test.go @@ -145,7 +145,6 @@ func TestDropSeriesStatement(t *testing.T) { got = executeAndGetJSON("show tag keys from cpu", executor) exepected = `[{}]` - //exepected = `[{"series":[{"name":"cpu","columns":["tagKey"]}]}]` if exepected != got { t.Fatalf("exp: %s\ngot: %s", exepected, got) } diff --git a/tsdb/show_tag_keys.go b/tsdb/show_tag_keys.go index 3f3e1939ac5..29a39a7215e 100644 --- a/tsdb/show_tag_keys.go +++ b/tsdb/show_tag_keys.go @@ -6,6 +6,7 @@ import ( "sort" "github.com/influxdb/influxdb/influxql" + "github.com/influxdb/influxdb/models" ) // ShowTagKeysExecutor implements the Executor interface for a SHOW MEASUREMENTS statement. @@ -25,9 +26,9 @@ func NewShowTagKeysExecutor(stmt *influxql.ShowTagKeysStatement, mappers []Mappe } // Execute begins execution of the query and returns a channel to receive rows. -func (e *ShowTagKeysExecutor) Execute() <-chan *influxql.Row { +func (e *ShowTagKeysExecutor) Execute() <-chan *models.Row { // Create output channel and stream data in a separate goroutine. - out := make(chan *influxql.Row, 0) + out := make(chan *models.Row, 0) // It's important that all resources are released when execution completes. defer e.close() @@ -37,7 +38,7 @@ func (e *ShowTagKeysExecutor) Execute() <-chan *influxql.Row { // Open the mappers. for _, m := range e.mappers { if err := m.Open(); err != nil { - out <- &influxql.Row{Err: err} + out <- &models.Row{Err: err} return } } @@ -50,7 +51,7 @@ func (e *ShowTagKeysExecutor) Execute() <-chan *influxql.Row { for { c, err := m.NextChunk() if err != nil { - out <- &influxql.Row{Err: err} + out <- &models.Row{Err: err} return } else if c == nil { // Mapper has been drained. @@ -60,7 +61,7 @@ func (e *ShowTagKeysExecutor) Execute() <-chan *influxql.Row { // Convert the mapper chunk to an array of measurements with tag keys. mtks, ok := c.(MeasurementsTagKeys) if !ok { - out <- &influxql.Row{Err: fmt.Errorf("show tag keys mapper returned invalid type: %T", c)} + out <- &models.Row{Err: fmt.Errorf("show tag keys mapper returned invalid type: %T", c)} return } @@ -91,26 +92,19 @@ func (e *ShowTagKeysExecutor) Execute() <-chan *influxql.Row { // Sort by measurement name. sort.Sort(mstks) - // Calculate OFFSET and LIMIT. - off := e.stmt.Offset - lim := len(mstks) - stmtLim := e.stmt.Limit - - if stmtLim > 0 && off+stmtLim < lim { - lim = off + stmtLim - } else if off > lim { - off, lim = 0, 0 - } + slim, soff := limitAndOffset(e.stmt.SLimit, e.stmt.SOffset, len(mstks)) // Send results. - for _, mtks := range mstks[off:lim] { - row := &influxql.Row{ + for _, mtks := range mstks[soff:slim] { + lim, off := limitAndOffset(e.stmt.Limit, e.stmt.Offset, len(mtks.TagKeys)) + + row := &models.Row{ Name: mtks.Measurement, Columns: []string{"tagKey"}, - Values: make([][]interface{}, 0, len(mtks.TagKeys)), + Values: make([][]interface{}, 0, lim-off), } - for _, tk := range mtks.TagKeys { + for _, tk := range mtks.TagKeys[off:lim] { v := []interface{}{tk} row.Values = append(row.Values, v) } @@ -121,6 +115,26 @@ func (e *ShowTagKeysExecutor) Execute() <-chan *influxql.Row { return out } +// limitAndOffset calculates the limit and offset indexes for n things. +func limitAndOffset(lim, off, n int) (int, int) { + if off >= n { + return 0, 0 + } + + o := off + l := n + + if lim > 0 && o+lim < l { + l = o + lim + } + + if o > l { + return 0, 0 + } + + return l, o +} + // Close closes the executor such that all resources are released. Once closed, // an executor may not be re-used. func (e *ShowTagKeysExecutor) close() { From e4800df00696addb31659ec000c67a2505efebad Mon Sep 17 00:00:00 2001 From: David Norton Date: Tue, 22 Sep 2015 14:25:13 -0400 Subject: [PATCH 4/4] add more parser tests --- influxql/parser_test.go | 62 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 59 insertions(+), 3 deletions(-) diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 4d7ae3458f4..a8d96fe45fb 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -760,6 +760,65 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // SHOW TAG KEYS with OFFSET + { + s: `SHOW TAG KEYS FROM src OFFSET 1`, + stmt: &influxql.ShowTagKeysStatement{ + Sources: []influxql.Source{&influxql.Measurement{Name: "src"}}, + Offset: 1, + }, + }, + + // SHOW TAG KEYS with LIMIT and OFFSET + { + s: `SHOW TAG KEYS FROM src LIMIT 2 OFFSET 1`, + stmt: &influxql.ShowTagKeysStatement{ + Sources: []influxql.Source{&influxql.Measurement{Name: "src"}}, + Limit: 2, + Offset: 1, + }, + }, + + // SHOW TAG KEYS with SLIMIT + { + s: `SHOW TAG KEYS FROM src SLIMIT 2`, + stmt: &influxql.ShowTagKeysStatement{ + Sources: []influxql.Source{&influxql.Measurement{Name: "src"}}, + SLimit: 2, + }, + }, + + // SHOW TAG KEYS with SOFFSET + { + s: `SHOW TAG KEYS FROM src SOFFSET 1`, + stmt: &influxql.ShowTagKeysStatement{ + Sources: []influxql.Source{&influxql.Measurement{Name: "src"}}, + SOffset: 1, + }, + }, + + // SHOW TAG KEYS with SLIMIT and SOFFSET + { + s: `SHOW TAG KEYS FROM src SLIMIT 2 SOFFSET 1`, + stmt: &influxql.ShowTagKeysStatement{ + Sources: []influxql.Source{&influxql.Measurement{Name: "src"}}, + SLimit: 2, + SOffset: 1, + }, + }, + + // SHOW TAG KEYS with LIMIT, OFFSET, SLIMIT, and SOFFSET + { + s: `SHOW TAG KEYS FROM src LIMIT 4 OFFSET 3 SLIMIT 2 SOFFSET 1`, + stmt: &influxql.ShowTagKeysStatement{ + Sources: []influxql.Source{&influxql.Measurement{Name: "src"}}, + Limit: 4, + Offset: 3, + SLimit: 2, + SOffset: 1, + }, + }, + // SHOW TAG KEYS FROM // { s: `SHOW TAG KEYS FROM /[cg]pu/`, @@ -1563,9 +1622,6 @@ func TestParser_ParseStatement(t *testing.T) { } for i, tt := range tests { - if tt.s != `SHOW TAG KEYS FROM src LIMIT 2` { - continue - } if tt.skip { t.Logf("skipping test of '%s'", tt.s) continue