Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shards should check the limit on the query and stop returning data as soon as the limit is hit #370

Merged
merged 1 commit into from
Mar 31, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/cluster/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,20 +219,26 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
maxDeleteResults := 10000
processor = engine.NewPassthroughEngine(response, maxDeleteResults)
} else {
query := querySpec.SelectQuery()
if self.ShouldAggregateLocally(querySpec) {
log.Debug("creating a query engine\n")
processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), response)
processor, err = engine.NewQueryEngine(query, response)
if err != nil {
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())}
log.Error("Error while creating engine: %s", err)
return
}
processor.SetShardInfo(int(self.Id()), self.IsLocal)
} else {
} else if query.HasAggregates() {
maxPointsToBufferBeforeSending := 1000
log.Debug("creating a passthrough engine\n")
processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending)
} else {
maxPointsToBufferBeforeSending := 1000
log.Debug("creating a passthrough engine with limit\n")
processor = engine.NewPassthroughEngineWithLimit(response, maxPointsToBufferBeforeSending, query.Limit)
}
processor = engine.NewFilteringEngine(query, processor)
}
shard, err := self.store.GetOrCreateShard(self.id)
if err != nil {
Expand Down
50 changes: 4 additions & 46 deletions src/engine/engine.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package engine

import (
log "code.google.com/p/log4go"
"common"
"fmt"
"parser"
Expand All @@ -10,6 +9,8 @@ import (
"strconv"
"strings"
"time"

log "code.google.com/p/log4go"
)

type QueryEngine struct {
Expand Down Expand Up @@ -158,59 +159,16 @@ func (self *QueryEngine) YieldSeries(seriesIncoming *protocol.Series) (shouldCon
}

func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool {
var err error
if self.where != nil {
serieses, err := self.filter(series)
if err != nil {
log.Error("Error while filtering points: %s\n", err)
return false
}
for _, series := range serieses {
if len(series.Points) > 0 {
self.limiter.calculateLimitAndSlicePoints(series)
if len(series.Points) > 0 {
if err = self.yield(series); err != nil {
return false
}
}
}
}
} else {
self.limiter.calculateLimitAndSlicePoints(series)
self.limiter.calculateLimitAndSlicePoints(series)

if len(series.Points) > 0 {
err = self.yield(series)
}
}
err := self.yield(series)
if err != nil {
log.Error(err)
return false
}
return !self.limiter.hitLimit(*series.Name)
}

// filter applies the query's WHERE clause to the given series, producing one
// filtered copy per table alias the series' name is queried under. For
// inner-join queries the points are passed through unfiltered, since join
// filtering happens after the join is performed.
func (self *QueryEngine) filter(series *protocol.Series) ([]*protocol.Series, error) {
	aliases := self.query.GetTableAliases(*series.Name)
	result := make([]*protocol.Series, len(aliases))
	for i, alias := range aliases {
		// Copy the loop variable so each series' Name pointer is stable.
		_alias := alias
		newSeries := &protocol.Series{Name: &_alias, Points: series.Points, Fields: series.Fields}

		filteredResult := newSeries
		// Inner joins are filtered after joining, not per alias here.
		if self.query.GetFromClause().Type != parser.FromClauseInnerJoin {
			var err error
			filteredResult, err = Filter(self.query, newSeries)
			if err != nil {
				return nil, err
			}
		}
		result[i] = filteredResult
	}
	return result, nil
}

func (self *QueryEngine) Close() {
for _, series := range self.seriesToPoints {
if len(series.Points) == 0 {
Expand Down
52 changes: 52 additions & 0 deletions src/engine/filtering_engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package engine

import (
"parser"
p "protocol"
)

// FilteringEngine is a QueryProcessor decorator that applies the query's
// WHERE clause to every series before handing it to the wrapped processor.
type FilteringEngine struct {
	query        *parser.SelectQuery // query whose WHERE condition is applied
	processor    QueryProcessor      // downstream processor receiving filtered series
	shouldFilter bool                // cached: true when the query has a WHERE condition
}

// NewFilteringEngine wraps processor so that query's WHERE clause is applied
// to every series it yields. If the query has no WHERE condition the engine
// degenerates to a plain pass-through.
func NewFilteringEngine(query *parser.SelectQuery, processor QueryProcessor) *FilteringEngine {
	return &FilteringEngine{
		query:        query,
		processor:    processor,
		shouldFilter: query.GetWhereCondition() != nil,
	}
}

// YieldPoint wraps the single point in a one-element series and delegates to
// YieldSeries, so all filtering logic lives in one place.
// TODO: optimize for yield series and use it here.
func (self *FilteringEngine) YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool {
	singlePointSeries := &p.Series{
		Name:   seriesName,
		Fields: columnNames,
		Points: []*p.Point{point},
	}
	return self.YieldSeries(singlePointSeries)
}

// YieldSeries forwards the incoming series to the wrapped processor, first
// applying the query's WHERE clause when one is present. It returns true if
// the query should continue and false when the downstream processor wants to
// stop (e.g. its limit was hit).
func (self *FilteringEngine) YieldSeries(seriesIncoming *p.Series) bool {
	// No WHERE clause: nothing to filter, pass the series straight through.
	if !self.shouldFilter {
		return self.processor.YieldSeries(seriesIncoming)
	}

	series, err := Filter(self.query, seriesIncoming)
	if err != nil {
		// NOTE(review): a filter failure panics and kills the goroutine —
		// consider logging and returning false instead.
		panic(err)
	}
	// A batch in which no point matched the WHERE clause is not a stop
	// condition: return true so later batches are still processed. (Returning
	// false here would abort the whole query on the first non-matching batch,
	// violating the QueryProcessor contract.)
	if len(series.Points) == 0 {
		return true
	}
	return self.processor.YieldSeries(series)
}

// Close closes the wrapped processor, letting it flush any buffered data.
func (self *FilteringEngine) Close() {
	self.processor.Close()
}

// SetShardInfo forwards the shard identity to the wrapped processor so an
// EXPLAIN query can know which shard is being measured.
func (self *FilteringEngine) SetShardInfo(shardId int, shardLocal bool) {
	self.processor.SetShardInfo(shardId, shardLocal)
}
// GetName reports the name of the wrapped processor.
func (self *FilteringEngine) GetName() string {
	return self.processor.GetName()
}
19 changes: 19 additions & 0 deletions src/engine/query_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package engine

import (
p "protocol"
)

// QueryProcessor is the common interface of the query execution engines
// (query engine, passthrough engines, filtering engine): shards feed points
// or whole series into a processor, which may aggregate, filter, buffer or
// forward them downstream.
type QueryProcessor interface {
	// YieldPoint feeds a single point. It returns true if the query should
	// continue. If the query should be stopped, like maybe the limit was hit,
	// it should return false.
	YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool
	// YieldSeries feeds a whole series batch; same return convention as
	// YieldPoint.
	YieldSeries(seriesIncoming *p.Series) bool
	// Close flushes any buffered data and releases resources.
	Close()

	// SetShardInfo is called by the shard, so an EXPLAIN query can know which
	// shard the query is being measured against.
	SetShardInfo(shardId int, shardLocal bool)

	// GetName lets the QueryProcessor identify itself (e.g. for diagnostics).
	GetName() string
}