Skip to content

Commit

Permalink
fix #442. Fix some bugs in shouldQuerySequentially and add a test
Browse files Browse the repository at this point in the history
  • Loading branch information
jvshahid committed Apr 15, 2014
1 parent 545cc05 commit 826e9f9
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
6 changes: 5 additions & 1 deletion src/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (self *CoordinatorImpl) shouldAggregateLocally(shards []*cluster.ShardData,

func (self *CoordinatorImpl) shouldQuerySequentially(shards []*cluster.ShardData, querySpec *parser.QuerySpec) bool {
// if the query isn't a select, then it doesn't matter
if querySpec.SelectQuery != nil {
if querySpec.SelectQuery() == nil {
return false
}

Expand Down Expand Up @@ -244,6 +244,10 @@ func (self *CoordinatorImpl) shouldQuerySequentially(shards []*cluster.ShardData
return true
}

if !self.shouldAggregateLocally(shards, querySpec) {
return true
}

for _, shard := range shards {
bufferSize := shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize)
// if the number of repsonses is too big, do a sequential querying
Expand Down
42 changes: 42 additions & 0 deletions src/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package coordinator

import (
"cluster"
"configuration"
"fmt"
"parser"
"time"
. "launchpad.net/gocheck"
)

type CoordinatorSuite struct{}

var _ = Suite(&CoordinatorSuite{})

func (self *CoordinatorSuite) TestShouldQuerySequentially(c *C) {
end := time.Now().Truncate(24 * time.Hour)
start := end.Add(-7 * 24 * time.Hour)
shard := cluster.NewShard(1, start, end, cluster.SHORT_TERM, false, nil)
shards := []*cluster.ShardData{shard}
coordinator := NewCoordinatorImpl(&configuration.Configuration{
ClusterMaxResponseBufferSize: 1000,
}, nil, nil)
queries := map[string]bool{
"list series": false,
"select count(foo) from /.*bar.*/ group by time(1d)": true,
"select count(foo) from bar": true,
"select foo from bar": true,
"select count(foo) from bar group by baz": true,
"select count(foo) from bar group by time(1d)": false,
"select count(foo) from bar group by time(3d)": true,
}

for query, result := range queries {
fmt.Printf("Testing %s\n", query)
parsedQuery, err := parser.ParseQuery(query)
c.Assert(err, IsNil)
c.Assert(parsedQuery, HasLen, 1)
querySpec := parser.NewQuerySpec(nil, "", parsedQuery[0])
c.Assert(coordinator.shouldQuerySequentially(shards, querySpec), Equals, result)
}
}

0 comments on commit 826e9f9

Please sign in to comment.