Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into refactor-select-m…
Browse files Browse the repository at this point in the history
…apper

Conflicts:
	tsdb/store.go
  • Loading branch information
benbjohnson committed Sep 22, 2015
2 parents 96715d7 + 410eb4e commit 8e27cf1
Show file tree
Hide file tree
Showing 9 changed files with 615 additions and 61 deletions.
4 changes: 2 additions & 2 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3851,8 +3851,8 @@ func TestServer_Query_ShowTagKeys(t *testing.T) {
},
&Query{
name: "show tag keys measurement not found",
command: "SHOW TAG KEYS FROM bad",
exp: `{"results":[{"error":"measurement not found: bad"}]}`,
command: "SHOW TAG KEYS FROM doesntexist",
exp: `{"results":[{}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand Down
19 changes: 16 additions & 3 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -2042,12 +2042,17 @@ type ShowTagKeysStatement struct {
// Fields to sort results by
SortFields SortFields

// Maximum number of rows to be returned.
// Unlimited if zero.
// Maximum number of tag keys per measurement. Unlimited if zero.
Limit int

// Returns rows starting at an offset from the first row.
// Returns tag keys starting at an offset from the first tag key.
Offset int

// Maximum number of series to be returned. Unlimited if zero.
SLimit int

// Returns series starting at an offset from the first one.
SOffset int
}

// String returns a string representation of the statement.
Expand Down Expand Up @@ -2075,6 +2080,14 @@ func (s *ShowTagKeysStatement) String() string {
_, _ = buf.WriteString(" OFFSET ")
_, _ = buf.WriteString(strconv.Itoa(s.Offset))
}
if s.SLimit > 0 {
_, _ = buf.WriteString(" SLIMIT ")
_, _ = buf.WriteString(strconv.Itoa(s.SLimit))
}
if s.SOffset > 0 {
_, _ = buf.WriteString(" SOFFSET ")
_, _ = buf.WriteString(strconv.Itoa(s.SOffset))
}
return buf.String()
}

Expand Down
10 changes: 10 additions & 0 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,16 @@ func (p *Parser) parseShowTagKeysStatement() (*ShowTagKeysStatement, error) {
return nil, err
}

// Parse series limit: "SLIMIT <n>".
if stmt.SLimit, err = p.parseOptionalTokenAndInt(SLIMIT); err != nil {
return nil, err
}

// Parse series offset: "SOFFSET <n>".
if stmt.SOffset, err = p.parseOptionalTokenAndInt(SOFFSET); err != nil {
return nil, err
}

return stmt, nil
}

Expand Down
68 changes: 68 additions & 0 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,74 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// SHOW TAG KEYS with LIMIT
{
s: `SHOW TAG KEYS FROM src LIMIT 2`,
stmt: &influxql.ShowTagKeysStatement{
Sources: []influxql.Source{&influxql.Measurement{Name: "src"}},
Limit: 2,
},
},

// SHOW TAG KEYS with OFFSET
{
s: `SHOW TAG KEYS FROM src OFFSET 1`,
stmt: &influxql.ShowTagKeysStatement{
Sources: []influxql.Source{&influxql.Measurement{Name: "src"}},
Offset: 1,
},
},

// SHOW TAG KEYS with LIMIT and OFFSET
{
s: `SHOW TAG KEYS FROM src LIMIT 2 OFFSET 1`,
stmt: &influxql.ShowTagKeysStatement{
Sources: []influxql.Source{&influxql.Measurement{Name: "src"}},
Limit: 2,
Offset: 1,
},
},

// SHOW TAG KEYS with SLIMIT
{
s: `SHOW TAG KEYS FROM src SLIMIT 2`,
stmt: &influxql.ShowTagKeysStatement{
Sources: []influxql.Source{&influxql.Measurement{Name: "src"}},
SLimit: 2,
},
},

// SHOW TAG KEYS with SOFFSET
{
s: `SHOW TAG KEYS FROM src SOFFSET 1`,
stmt: &influxql.ShowTagKeysStatement{
Sources: []influxql.Source{&influxql.Measurement{Name: "src"}},
SOffset: 1,
},
},

// SHOW TAG KEYS with SLIMIT and SOFFSET
{
s: `SHOW TAG KEYS FROM src SLIMIT 2 SOFFSET 1`,
stmt: &influxql.ShowTagKeysStatement{
Sources: []influxql.Source{&influxql.Measurement{Name: "src"}},
SLimit: 2,
SOffset: 1,
},
},

// SHOW TAG KEYS with LIMIT, OFFSET, SLIMIT, and SOFFSET
{
s: `SHOW TAG KEYS FROM src LIMIT 4 OFFSET 3 SLIMIT 2 SOFFSET 1`,
stmt: &influxql.ShowTagKeysStatement{
Sources: []influxql.Source{&influxql.Measurement{Name: "src"}},
Limit: 4,
Offset: 3,
SLimit: 2,
SOffset: 1,
},
},

// SHOW TAG KEYS FROM /<regex>/
{
s: `SHOW TAG KEYS FROM /[cg]pu/`,
Expand Down
133 changes: 130 additions & 3 deletions tsdb/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TestWritePointsAndExecuteTwoShardsAlign(t *testing.T) {

// Test to ensure the engine handles query re-writing across stores.
func TestWritePointsAndExecuteTwoShardsQueryRewrite(t *testing.T) {
// Create two distinct stores, ensuring shard mappers will shard nothing.
// Create two distinct stores, ensuring shard mappers will share nothing.
store0 := testStore()
defer os.RemoveAll(store0.Path())
store1 := testStore()
Expand Down Expand Up @@ -434,8 +434,8 @@ func TestWritePointsAndExecuteTwoShardsTagSetOrdering(t *testing.T) {
}

// Test to ensure the engine handles measurements across stores.
func TestWritePointsAndExecuteTwoShardsShowMeasurements(t *testing.T) {
// Create two distinct stores, ensuring shard mappers will shard nothing.
func TestShowMeasurementsMultipleShards(t *testing.T) {
// Create two distinct stores, ensuring shard mappers will share nothing.
store0 := testStore()
defer os.RemoveAll(store0.Path())
store1 := testStore()
Expand Down Expand Up @@ -532,6 +532,133 @@ func TestWritePointsAndExecuteTwoShardsShowMeasurements(t *testing.T) {
}
}

// Test to ensure the engine handles tag keys across stores.
//
// Exercises SHOW TAG KEYS with each combination of LIMIT/OFFSET (per-key
// paging) and SLIMIT/SOFFSET (per-series/measurement paging), plus FROM and
// WHERE filtering, against mappers drawn from two independent stores.
//
// NOTE(review): the doubled "Show" in the function name looks like a typo for
// TestShowTagKeysMultipleShards; the name is kept here to avoid churn.
func TestShowShowTagKeysMultipleShards(t *testing.T) {
	// Create two distinct stores, ensuring shard mappers will share nothing.
	store0 := testStore()
	defer os.RemoveAll(store0.Path())
	store1 := testStore()
	defer os.RemoveAll(store1.Path())

	// Create a shard in each store. Check the errors: a failed shard
	// creation would otherwise surface later as a confusing write failure.
	database := "foo"
	retentionPolicy := "bar"
	if err := store0.CreateShard(database, retentionPolicy, sID0); err != nil {
		t.Fatal(err)
	}
	if err := store1.CreateShard(database, retentionPolicy, sID1); err != nil {
		t.Fatal(err)
	}

	// Write points across both shards: two "cpu" points to shard 0, then a
	// "cpu" point and a "mem" point to shard 1, so tag keys span stores.
	pt1time := time.Unix(1, 0).UTC()
	if err := store0.WriteToShard(sID0, []models.Point{
		models.NewPoint(
			"cpu",
			map[string]string{"host": "serverA", "region": "uswest"},
			map[string]interface{}{"value1": 100},
			pt1time,
		),
		models.NewPoint(
			"cpu",
			map[string]string{"host": "serverB", "region": "useast"},
			map[string]interface{}{"value1": 100},
			pt1time,
		),
	}); err != nil {
		// t.Fatalf(err.Error()) would pass a non-constant format string,
		// which `go vet`'s printf check flags; t.Fatal is the safe form.
		t.Fatal(err)
	}
	pt2time := time.Unix(2, 0).UTC()
	if err := store1.WriteToShard(sID1, []models.Point{
		models.NewPoint(
			"cpu",
			map[string]string{"host": "serverB", "region": "useast", "rack": "12"},
			map[string]interface{}{"value1": 100},
			pt1time,
		),
		models.NewPoint(
			"mem",
			map[string]string{"host": "serverB"},
			map[string]interface{}{"value2": 200},
			pt2time,
		)}); err != nil {
		t.Fatal(err)
	}
	var tests = []struct {
		skip      bool   // Skip test
		stmt      string // Query statement
		chunkSize int    // Chunk size for driving the executor
		expected  string // Expected results, rendered as a string
	}{
		{
			stmt:     `SHOW TAG KEYS`,
			expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"],["rack"],["region"]]},{"name":"mem","columns":["tagKey"],"values":[["host"]]}]`,
		},
		{
			stmt:     `SHOW TAG KEYS SLIMIT 1`,
			expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"],["rack"],["region"]]}]`,
		},
		{
			stmt:     `SHOW TAG KEYS SLIMIT 1 SOFFSET 1`,
			expected: `[{"name":"mem","columns":["tagKey"],"values":[["host"]]}]`,
		},
		{
			stmt:     `SHOW TAG KEYS SOFFSET 1`,
			expected: `[{"name":"mem","columns":["tagKey"],"values":[["host"]]}]`,
		},
		{
			stmt:     `SHOW TAG KEYS LIMIT 1`,
			expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"]]},{"name":"mem","columns":["tagKey"],"values":[["host"]]}]`,
		},
		{
			stmt:     `SHOW TAG KEYS LIMIT 1 OFFSET 1`,
			expected: `[{"name":"cpu","columns":["tagKey"],"values":[["rack"]]},{"name":"mem","columns":["tagKey"]}]`,
		},
		{
			stmt:     `SHOW TAG KEYS OFFSET 1`,
			expected: `[{"name":"cpu","columns":["tagKey"],"values":[["rack"],["region"]]},{"name":"mem","columns":["tagKey"]}]`,
		},
		{
			stmt:     `SHOW TAG KEYS FROM cpu`,
			expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"],["rack"],["region"]]}]`,
		},
		{
			stmt:     `SHOW TAG KEYS FROM cpu WHERE region = 'uswest'`,
			expected: `[{"name":"cpu","columns":["tagKey"],"values":[["host"],["region"]]}]`,
		},
		{
			stmt:     `SHOW TAG KEYS FROM doesntexist`,
			expected: `null`,
		},
		{
			stmt:     `SHOW TAG KEYS FROM cpu WHERE region = 'doesntexist'`,
			expected: `null`,
		},
	}
	for _, tt := range tests {
		if tt.skip {
			t.Logf("Skipping test %s", tt.stmt)
			continue
		}

		parsedStmt := mustParseStatement(tt.stmt).(*influxql.ShowTagKeysStatement)

		// Create Mappers and Executor.
		mapper0, err := store0.CreateMapper(sID0, parsedStmt, tt.chunkSize)
		if err != nil {
			t.Fatalf("failed to create mapper0: %s", err.Error())
		}
		mapper1, err := store1.CreateMapper(sID1, parsedStmt, tt.chunkSize)
		if err != nil {
			t.Fatalf("failed to create mapper1: %s", err.Error())
		}
		executor := tsdb.NewShowTagKeysExecutor(parsedStmt, []tsdb.Mapper{mapper0, mapper1}, tt.chunkSize)

		// Check the results.
		got := executeAndGetResults(executor)
		if got != tt.expected {
			t.Fatalf("Test %s\nexp: %s\ngot: %s\n", tt.stmt, tt.expected, got)
		}
	}
}

// TestProcessAggregateDerivative tests the RawQueryDerivativeProcessor transformation function on the engine.
// This is called for a query with a GROUP BY.
func TestProcessAggregateDerivative(t *testing.T) {
Expand Down
Loading

0 comments on commit 8e27cf1

Please sign in to comment.