Skip to content

Commit

Permalink
Merge pull request #3986 from influxdb/jw-order-by
Browse files Browse the repository at this point in the history
Support sorting by time desc
  • Loading branch information
jwilder committed Sep 4, 2015
2 parents 3a8b02a + df70a1c commit 6f41c0f
Show file tree
Hide file tree
Showing 16 changed files with 778 additions and 140 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3901](https://github.com/influxdb/influxdb/pull/3901): Add consistency level option to influx cli Thanks @takayuki
- [#3876](https://github.com/influxdb/influxdb/pull/3876): Allow the following syntax in CQs: INTO "1hPolicy".:MEASUREMENT
- [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service
- [#3986](https://github.com/influxdb/influxdb/pull/3986): Support sorting by time desc

### Bugfixes
- [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803.
Expand Down
56 changes: 56 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2201,6 +2201,14 @@ func TestServer_Query_Aggregates(t *testing.T) {
command: `SELECT sum(value)/2 FROM load`,
exp: `{"results":[{"series":[{"name":"load","columns":["time",""],"values":[["1970-01-01T00:00:00Z",75]]}]}]}`,
},

// order by time desc
&Query{
name: "aggregate order by time desc",
params: url.Values{"db": []string{"db0"}},
command: `SELECT max(value) FROM intmany where time >= '2000-01-01T00:00:00Z' AND time <= '2000-01-01T00:01:00Z' group by time(10s) order by time desc`,
exp: `{"results":[{"series":[{"name":"intmany","columns":["time","max"],"values":[["2000-01-01T00:01:00Z",7],["2000-01-01T00:00:50Z",5],["2000-01-01T00:00:40Z",5],["2000-01-01T00:00:30Z",4],["2000-01-01T00:00:20Z",4],["2000-01-01T00:00:10Z",4],["2000-01-01T00:00:00Z",2]]}]}]}`,
},
}...)

for i, query := range test.queries {
Expand Down Expand Up @@ -3877,3 +3885,51 @@ func TestServer_Query_EvilIdentifiers(t *testing.T) {
}
}
}

func TestServer_Query_OrderByTime(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig(), "")
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}

writes := []string{
fmt.Sprintf(`cpu,host=server1 value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
fmt.Sprintf(`cpu,host=server1 value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
fmt.Sprintf(`cpu,host=server1 value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
}

test := NewTest("db0", "rp0")
test.write = strings.Join(writes, "\n")

test.addQueries([]*Query{
&Query{
name: "order on points",
params: url.Values{"db": []string{"db0"}},
command: `select value from "cpu" ORDER BY time DESC`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:03Z",3],["2000-01-01T00:00:02Z",2],["2000-01-01T00:00:01Z",1]]}]}]}`,
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
2 changes: 1 addition & 1 deletion influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ type SortField struct {
// String returns a string representation of a sort field
func (field *SortField) String() string {
var buf bytes.Buffer
if field.Name == "" {
if field.Name != "" {
_, _ = buf.WriteString(field.Name)
_, _ = buf.WriteString(" ")
}
Expand Down
42 changes: 23 additions & 19 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -1809,24 +1809,29 @@ func (p *Parser) parseOrderBy() (SortFields, error) {
func (p *Parser) parseSortFields() (SortFields, error) {
var fields SortFields

// If first token is ASC or DESC, all fields are sorted.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok == ASC || tok == DESC {
if tok == DESC {
// Token must be ASC, until other sort orders are supported.
return nil, errors.New("only ORDER BY time ASC supported at this time")
}
return append(fields, &SortField{Ascending: (tok == ASC)}), nil
} else if tok != IDENT {
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "ASC", "DESC"}, pos)
}
p.unscan()
tok, pos, lit := p.scanIgnoreWhitespace()

// At least one field is required.
field, err := p.parseSortField()
if err != nil {
return nil, err
switch tok {
// The first field after an order by may not have a field name (e.g. ORDER BY ASC)
case ASC, DESC:
fields = append(fields, &SortField{Ascending: (tok == ASC)})
// If it's a token, parse it as a sort field. At least one is required.
case IDENT:
p.unscan()
field, err := p.parseSortField()
if err != nil {
return nil, err
}

if lit != "time" {
return nil, errors.New("only ORDER BY time supported at this time")
}

fields = append(fields, field)
// Parse error...
default:
return nil, newParseError(tokstr(tok, lit), []string{"identifier", "ASC", "DESC"}, pos)
}
fields = append(fields, field)

// Parse additional fields.
for {
Expand All @@ -1845,9 +1850,8 @@ func (p *Parser) parseSortFields() (SortFields, error) {
fields = append(fields, field)
}

// First SortField must be time ASC, until other sort orders are supported.
if len(fields) > 1 || fields[0].Name != "time" || !fields[0].Ascending {
return nil, errors.New("only ORDER BY time ASC supported at this time")
if len(fields) > 1 {
return nil, errors.New("only ORDER BY time supported at this time")
}

return fields, nil
Expand Down
23 changes: 10 additions & 13 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ func TestParser_ParseStatement(t *testing.T) {

// SELECT statement
{
skip: true,
s: fmt.Sprintf(`SELECT mean(field1), sum(field2) ,count(field3) AS field_x FROM myseries WHERE host = 'hosta.influxdb.org' and time > '%s' GROUP BY time(10h) ORDER BY ASC LIMIT 20 OFFSET 10;`, now.UTC().Format(time.RFC3339Nano)),
s: fmt.Sprintf(`SELECT mean(field1), sum(field2) ,count(field3) AS field_x FROM myseries WHERE host = 'hosta.influxdb.org' and time > '%s' GROUP BY time(10h) ORDER BY DESC LIMIT 20 OFFSET 10;`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
Expand All @@ -101,7 +100,7 @@ func TestParser_ParseStatement(t *testing.T) {
},
Dimensions: []*influxql.Dimension{{Expr: &influxql.Call{Name: "time", Args: []influxql.Expr{&influxql.DurationLiteral{Val: 10 * time.Hour}}}}},
SortFields: []*influxql.SortField{
{Ascending: true},
{Ascending: false},
},
Limit: 20,
Offset: 10,
Expand Down Expand Up @@ -587,17 +586,17 @@ func TestParser_ParseStatement(t *testing.T) {
// SHOW SERIES WHERE with ORDER BY and LIMIT
{
skip: true,
s: `SHOW SERIES WHERE region = 'uswest' ORDER BY ASC, field1, field2 DESC LIMIT 10`,
s: `SHOW SERIES WHERE region = 'order by desc' ORDER BY DESC, field1, field2 DESC LIMIT 10`,
stmt: &influxql.ShowSeriesStatement{
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
RHS: &influxql.StringLiteral{Val: "uswest"},
RHS: &influxql.StringLiteral{Val: "order by desc"},
},
SortFields: []*influxql.SortField{
{Ascending: true},
{Name: "field1"},
{Name: "field2"},
&influxql.SortField{Ascending: false},
&influxql.SortField{Name: "field1", Ascending: true},
&influxql.SortField{Name: "field2"},
},
Limit: 10,
},
Expand Down Expand Up @@ -1260,10 +1259,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT field1 FROM myseries ORDER BY /`, err: `found /, expected identifier, ASC, DESC at line 1, char 38`},
{s: `SELECT field1 FROM myseries ORDER BY 1`, err: `found 1, expected identifier, ASC, DESC at line 1, char 38`},
{s: `SELECT field1 FROM myseries ORDER BY time ASC,`, err: `found EOF, expected identifier at line 1, char 47`},
{s: `SELECT field1 FROM myseries ORDER BY DESC`, err: `only ORDER BY time ASC supported at this time`},
{s: `SELECT field1 FROM myseries ORDER BY field1`, err: `only ORDER BY time ASC supported at this time`},
{s: `SELECT field1 FROM myseries ORDER BY time DESC`, err: `only ORDER BY time ASC supported at this time`},
{s: `SELECT field1 FROM myseries ORDER BY time, field1`, err: `only ORDER BY time ASC supported at this time`},
{s: `SELECT field1 FROM myseries ORDER BY time, field1`, err: `only ORDER BY time supported at this time`},
{s: `SELECT field1 AS`, err: `found EOF, expected identifier at line 1, char 18`},
{s: `SELECT field1 FROM foo group by time(1s)`, err: `GROUP BY requires at least one aggregate function`},
{s: `SELECT count(value), value FROM foo`, err: `mixing aggregate and non-aggregate queries is not supported`},
Expand Down Expand Up @@ -1440,7 +1436,8 @@ func TestParser_ParseStatement(t *testing.T) {
if !reflect.DeepEqual(tt.err, errstring(err)) {
t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
} else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
t.Logf("\nexp=%s\ngot=%s\n", mustMarshalJSON(tt.stmt), mustMarshalJSON(stmt))
t.Logf("\n# %s\nexp=%s\ngot=%s\n", tt.s, mustMarshalJSON(tt.stmt), mustMarshalJSON(stmt))
t.Logf("\nSQL exp=%s\nSQL got=%s\n", tt.stmt.String(), stmt.String())
t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
}
}
Expand Down
47 changes: 41 additions & 6 deletions tsdb/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,48 @@ import (
"container/heap"
)

// Direction represents a cursor navigation direction.
type Direction bool

const (
// Forward indicates that a cursor will move forward over its values.
Forward Direction = true
// Reverse indicates that a cursor will move backwards over its values.
Reverse Direction = false
)

func (d Direction) String() string {
if d.Forward() {
return "forward"
}
return "reverse"
}

// Forward returns true if direction is forward
func (d Direction) Forward() bool {
return d == Forward
}

// Forward returns true if direction is reverse
func (d Direction) Reverse() bool {
return d == Reverse
}

// MultiCursor returns a single cursor that combines the results of all cursors in order.
//
// If the same key is returned from multiple cursors then the first cursor
// specified will take precendence. A key will only be returned once from the
// returned cursor.
func MultiCursor(cursors ...Cursor) Cursor {
return &multiCursor{cursors: cursors}
func MultiCursor(d Direction, cursors ...Cursor) Cursor {
return &multiCursor{cursors: cursors, direction: d}
}

// multiCursor represents a cursor that combines multiple cursors into one.
type multiCursor struct {
cursors []Cursor
heap cursorHeap
prev []byte
cursors []Cursor
heap cursorHeap
prev []byte
direction Direction
}

// Seek moves the cursor to a given key.
Expand Down Expand Up @@ -48,6 +76,8 @@ func (mc *multiCursor) Seek(seek []byte) (key, value []byte) {
return mc.pop()
}

func (mc *multiCursor) Direction() Direction { return mc.direction }

// Next returns the next key/value from the cursor.
func (mc *multiCursor) Next() (key, value []byte) { return mc.pop() }

Expand Down Expand Up @@ -90,7 +120,12 @@ type cursorHeap []*cursorHeapItem
func (h cursorHeap) Len() int { return len(h) }
func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h cursorHeap) Less(i, j int) bool {
if cmp := bytes.Compare(h[i].key, h[j].key); cmp == -1 {
dir := -1
if !h[i].cursor.Direction() {
dir = 1
}

if cmp := bytes.Compare(h[i].key, h[j].key); cmp == dir {
return true
} else if cmp == 0 {
return h[i].priority > h[j].priority
Expand Down
Loading

0 comments on commit 6f41c0f

Please sign in to comment.