NOTE(review): this patch was restored from a copy whose newlines and
indentation had been collapsed. Context-line whitespace is a best guess, so
apply with `git apply --ignore-whitespace`. The blob ids on the `index` lines
of the two new files predate the edits made to those files here.

diff --git a/src/cluster/shard.go b/src/cluster/shard.go
index 580fed51e11..6a535ea3fad 100644
--- a/src/cluster/shard.go
+++ b/src/cluster/shard.go
@@ -219,20 +219,26 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
 			maxDeleteResults := 10000
 			processor = engine.NewPassthroughEngine(response, maxDeleteResults)
 		} else {
+			query := querySpec.SelectQuery()
 			if self.ShouldAggregateLocally(querySpec) {
 				log.Debug("creating a query engine\n")
-				processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), response)
+				processor, err = engine.NewQueryEngine(query, response)
 				if err != nil {
 					response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())}
 					log.Error("Error while creating engine: %s", err)
 					return
 				}
 				processor.SetShardInfo(int(self.Id()), self.IsLocal)
-			} else {
+			} else if query.HasAggregates() {
 				maxPointsToBufferBeforeSending := 1000
 				log.Debug("creating a passthrough engine\n")
 				processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending)
+			} else {
+				maxPointsToBufferBeforeSending := 1000
+				log.Debug("creating a passthrough engine with limit\n")
+				processor = engine.NewPassthroughEngineWithLimit(response, maxPointsToBufferBeforeSending, query.Limit)
 			}
+			processor = engine.NewFilteringEngine(query, processor)
 		}
 		shard, err := self.store.GetOrCreateShard(self.id)
 		if err != nil {
diff --git a/src/engine/engine.go b/src/engine/engine.go
index ba03b15be08..86a35df6808 100644
--- a/src/engine/engine.go
+++ b/src/engine/engine.go
@@ -1,7 +1,6 @@
 package engine
 
 import (
-	log "code.google.com/p/log4go"
 	"common"
 	"fmt"
 	"parser"
@@ -10,6 +9,8 @@ import (
 	"strconv"
 	"strings"
 	"time"
+
+	log "code.google.com/p/log4go"
 )
 
 type QueryEngine struct {
@@ -158,30 +159,9 @@ func (self *QueryEngine) YieldSeries(seriesIncoming *protocol.Series) (shouldCon
 }
 
 func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool {
-	var err error
-	if self.where != nil {
-		serieses, err := self.filter(series)
-		if err != nil {
-			log.Error("Error while filtering points: %s\n", err)
-			return false
-		}
-		for _, series := range serieses {
-			if len(series.Points) > 0 {
-				self.limiter.calculateLimitAndSlicePoints(series)
-				if len(series.Points) > 0 {
-					if err = self.yield(series); err != nil {
-						return false
-					}
-				}
-			}
-		}
-	} else {
-		self.limiter.calculateLimitAndSlicePoints(series)
+	self.limiter.calculateLimitAndSlicePoints(series)
 
-		if len(series.Points) > 0 {
-			err = self.yield(series)
-		}
-	}
+	err := self.yield(series)
 	if err != nil {
 		log.Error(err)
 		return false
@@ -189,28 +169,6 @@ func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool {
 	return !self.limiter.hitLimit(*series.Name)
 }
 
-func (self *QueryEngine) filter(series *protocol.Series) ([]*protocol.Series, error) {
-	aliases := self.query.GetTableAliases(*series.Name)
-	result := make([]*protocol.Series, len(aliases), len(aliases))
-	for i, alias := range aliases {
-		_alias := alias
-		newSeries := &protocol.Series{Name: &_alias, Points: series.Points, Fields: series.Fields}
-
-		filteredResult := newSeries
-		var err error
-
-		// var err error
-		if self.query.GetFromClause().Type != parser.FromClauseInnerJoin {
-			filteredResult, err = Filter(self.query, newSeries)
-			if err != nil {
-				return nil, err
-			}
-		}
-		result[i] = filteredResult
-	}
-	return result, nil
-}
-
 func (self *QueryEngine) Close() {
 	for _, series := range self.seriesToPoints {
 		if len(series.Points) == 0 {
diff --git a/src/engine/filtering_engine.go b/src/engine/filtering_engine.go
new file mode 100644
index 00000000000..2fa23a64311
--- /dev/null
+++ b/src/engine/filtering_engine.go
@@ -0,0 +1,67 @@
+package engine
+
+import (
+	"parser"
+	p "protocol"
+)
+
+// FilteringEngine is a QueryProcessor that applies the query's where
+// condition to incoming series before passing them to the wrapped processor.
+type FilteringEngine struct {
+	query        *parser.SelectQuery
+	processor    QueryProcessor
+	shouldFilter bool
+}
+
+// NewFilteringEngine wraps processor. Filtering is enabled only when the
+// query has a where condition; otherwise series are forwarded untouched.
+func NewFilteringEngine(query *parser.SelectQuery, processor QueryProcessor) *FilteringEngine {
+	shouldFilter := query.GetWhereCondition() != nil
+	return &FilteringEngine{query, processor, shouldFilter}
+}
+
+// YieldPoint wraps the point in a one-point series and delegates to
+// YieldSeries.
+// TODO: optimize for yield series and use it here
+func (self *FilteringEngine) YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool {
+	return self.YieldSeries(&p.Series{
+		Name:   seriesName,
+		Fields: columnNames,
+		Points: []*p.Point{point},
+	})
+}
+
+// YieldSeries filters seriesIncoming and forwards the surviving points to
+// the wrapped processor. It returns true if the query should continue.
+func (self *FilteringEngine) YieldSeries(seriesIncoming *p.Series) bool {
+	if !self.shouldFilter {
+		return self.processor.YieldSeries(seriesIncoming)
+	}
+
+	series, err := Filter(self.query, seriesIncoming)
+	if err != nil {
+		// NOTE(review): presumably recovered by the caller; confirm.
+		panic(err)
+	}
+	if len(series.Points) == 0 {
+		// Nothing in this batch matched. That is not a reason to stop the
+		// query, later batches may still match, so ask for more data.
+		return true
+	}
+	return self.processor.YieldSeries(series)
+}
+
+// Close closes the wrapped processor.
+func (self *FilteringEngine) Close() {
+	self.processor.Close()
+}
+
+// SetShardInfo forwards the shard info to the wrapped processor.
+func (self *FilteringEngine) SetShardInfo(shardId int, shardLocal bool) {
+	self.processor.SetShardInfo(shardId, shardLocal)
+}
+
+// GetName returns the name reported by the wrapped processor.
+func (self *FilteringEngine) GetName() string {
+	return self.processor.GetName()
+}
diff --git a/src/engine/query_processor.go b/src/engine/query_processor.go
new file mode 100644
index 00000000000..a00d9e8f103
--- /dev/null
+++ b/src/engine/query_processor.go
@@ -0,0 +1,21 @@
+package engine
+
+import (
+	p "protocol"
+)
+
+// QueryProcessor receives the points and series yielded while a query runs.
+type QueryProcessor interface {
+	// YieldPoint and YieldSeries return true if the query should continue.
+	// If the query should be stopped, e.g. because the limit was hit, they
+	// return false.
+	YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool
+	YieldSeries(seriesIncoming *p.Series) bool
+	Close()
+
+	// Set by the shard, so EXPLAIN query can know query against which shard is being measured
+	SetShardInfo(shardId int, shardLocal bool)
+
+	// Let QueryProcessor identify itself. What if it is a spy and we can't check that?
+	GetName() string
+}