Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge should work with a regex #72

Merged
merged 7 commits into from
Oct 21, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cluster/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,7 @@ func (self *ClusterConfiguration) GetShardsForQuery(querySpec *parser.QuerySpec)
if err != nil {
return nil, err
}
log.Debug("Querying %d shards for query", len(shards))
shards = self.getShardRange(querySpec, shards)
if querySpec.IsAscending() {
SortShardsByTimeAscending(shards)
Expand Down
4 changes: 3 additions & 1 deletion cluster/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (self *ShardData) getProcessor(querySpec *parser.QuerySpec, processor engin
// We should aggregate at the shard level
if self.ShouldAggregateLocally(querySpec) {
log.Debug("creating a query engine")
processor, err = engine.NewQueryEngine(processor, query)
processor, err = engine.NewQueryEngine(processor, query, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -300,6 +300,8 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan<- *p.Res
var processor engine.Processor = NewResponseChannelProcessor(NewResponseChannelWrapper(response))
var err error

processor = NewShardIdInserterProcessor(self.Id(), processor)

processor, err = self.getProcessor(querySpec, processor)
if err != nil {
response <- &p.Response{
Expand Down
31 changes: 31 additions & 0 deletions cluster/shard_id_inserter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package cluster

import (
"fmt"

"github.com/influxdb/influxdb/engine"
"github.com/influxdb/influxdb/protocol"
)

// A processor to set the ShardId on the series to `id`
type ShardIdInserterProcessor struct {
id uint32
next engine.Processor
}

// NewShardIdInserterProcessor returns a processor that sets the given
// shard id on every series before handing it to next.
func NewShardIdInserterProcessor(id uint32, next engine.Processor) ShardIdInserterProcessor {
	return ShardIdInserterProcessor{
		id:   id,
		next: next,
	}
}

func (sip ShardIdInserterProcessor) Yield(s *protocol.Series) (bool, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not using "self" because the receiver isn't a pointer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not idiomatic to use self in Go code; I'm trying to slowly remove all uses of self and replace them with short names, e.g. the initials of the structure.

s.ShardId = &sip.id
return sip.next.Yield(s)
}

// Close closes the downstream processor; the inserter itself holds no
// resources of its own.
func (sip ShardIdInserterProcessor) Close() error {
	return sip.next.Close()
}

// Name returns a human-readable name for this processor, including
// the shard id it inserts.
func (sip ShardIdInserterProcessor) Name() string {
	return fmt.Sprintf("ShardIdInserterProcessor (%d)", sip.id)
}
25 changes: 24 additions & 1 deletion coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,11 @@ func (self *Coordinator) getShardsAndProcessor(querySpec *parser.QuerySpec, writ
if !shouldAggregateLocally {
// if we should aggregate in the coordinator (i.e. aggregation
// isn't happening locally at the shard level), create an engine
writer, err = engine.NewQueryEngine(writer, q)
shardIds := make([]uint32, len(shards))
for i, s := range shards {
shardIds[i] = s.Id()
}
writer, err = engine.NewQueryEngine(writer, q, shardIds)
return shards, writer, err
}

Expand Down Expand Up @@ -268,8 +272,27 @@ func (self *Coordinator) queryShards(querySpec *parser.QuerySpec, shards []*clus
return nil
}

// expandRegex rewrites a merge-with-regex from clause into an
// explicit merge over every series name matching the regex, so the
// rest of the query pipeline only ever sees a plain list of series
// names. Non-select queries and selects that don't use the
// merge-regex form are left untouched.
func (self *Coordinator) expandRegex(spec *parser.QuerySpec) {
	q := spec.SelectQuery()
	if q == nil {
		// Not a select query (e.g. a delete); nothing to expand.
		return
	}

	f := q.FromClause
	if f.Type != parser.FromClauseMergeFun {
		return
	}

	// Resolve the regex against the series known for this database,
	// then rewrite the clause in place as a plain merge over the
	// matched names. Read f.Regex before clearing it.
	series := self.clusterConfiguration.MetaStore.GetSeriesForDatabaseAndRegex(spec.Database(), f.Regex)
	f.Type = parser.FromClauseMerge
	f.Regex = nil
	for _, s := range series {
		f.Names = append(f.Names, &parser.TableName{
			Name: &parser.Value{Name: s, Type: parser.ValueTableName},
		})
	}
}

// We call this function only if we have a Select query (not continuous) or Delete query
func (self *Coordinator) runQuerySpec(querySpec *parser.QuerySpec, p engine.Processor) error {
self.expandRegex(querySpec)
shards, processor, err := self.getShardsAndProcessor(querySpec, p)
if err != nil {
return err
Expand Down
51 changes: 35 additions & 16 deletions datastore/point_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ type PointIterator struct {
asc bool
}

func NewPointIterator(itrs []storage.Iterator, fields []*metastore.Field, startTime, endTime time.Time, asc bool) PointIterator {
// Creates a new point iterator using the given column iterator,
// metadata columns, start and end time as well as the ascending
// flag. The iterator returned is already placed at the first point,
// there's no need to call Next() after the call to NewPointIterator,
// but the user should check Valid() to make sure the iterator is
// pointing at a valid point.
func NewPointIterator(itrs []storage.Iterator, fields []*metastore.Field, startTime, endTime time.Time, asc bool) *PointIterator {
pi := PointIterator{
valid: true,
err: nil,
Expand All @@ -38,9 +44,12 @@ func NewPointIterator(itrs []storage.Iterator, fields []*metastore.Field, startT

// seek to the first point
pi.Next()
return pi
return &pi
}

// public api

// Advance the iterator to the next point
func (pi *PointIterator) Next() {
valueBuffer := proto.NewBuffer(nil)
pi.valid = false
Expand Down Expand Up @@ -127,6 +136,30 @@ func (pi *PointIterator) Next() {
pi.point.SequenceNumber = proto.Uint64(next.sequence)
}

// Valid reports whether the iterator is positioned at a valid
// location. The behavior of Point() is undefined when Valid() returns
// false.
func (pi *PointIterator) Valid() bool {
	return pi.valid
}

// Point returns the point the iterator is currently positioned at.
// Only meaningful while Valid() returns true.
func (pi *PointIterator) Point() *protocol.Point { return pi.point }

// Error returns the error that invalidated the iterator, if any; it
// is nil when the iterator simply reached the end time.
func (pi *PointIterator) Error() error { return pi.err }

// Close releases every underlying storage iterator. The behavior of
// the PointIterator is undefined once Close has been called.
func (pi *PointIterator) Close() {
	for i := range pi.itrs {
		pi.itrs[i].Close()
	}
}

// private api

func (pi *PointIterator) getIteratorNextValue() error {
for i, it := range pi.itrs {
if pi.rawColumnValues[i].value != nil {
Expand Down Expand Up @@ -165,21 +198,7 @@ func (pi *PointIterator) getIteratorNextValue() error {
return nil
}

func (pi *PointIterator) Valid() bool {
return pi.valid
}

func (pi *PointIterator) setError(err error) {
pi.err = err
pi.valid = false
}

func (pi *PointIterator) Point() *protocol.Point { return pi.point }

func (pi *PointIterator) Error() error { return pi.err }

func (pi *PointIterator) Close() {
for _, itr := range pi.itrs {
itr.Close()
}
}
35 changes: 35 additions & 0 deletions datastore/point_iterator_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package datastore

import "github.com/influxdb/influxdb/protocol"

// PointIteratorStream implements the StreamQuery interface and is
// used by the shard with the Merger to merge the data points locally
// into a monotonic stream of points (increasing or decreasing
// timestamps).
type PointIteratorStream struct {
	pi     *PointIterator // source of the points being streamed
	name   string         // series name attached to each yielded series
	fields []string       // field names attached to each yielded series
}

// HasPoint returns true while the underlying point iterator is still
// valid, i.e. while there is a point available to consume.
func (pis PointIteratorStream) HasPoint() bool {
	return pis.pi.Valid()
}

// Next wraps the current point of the underlying iterator in a
// single-point series, advances the iterator, and returns the series.
func (pis PointIteratorStream) Next() *protocol.Series {
	series := &protocol.Series{
		Name:   &pis.name,
		Fields: pis.fields,
		Points: []*protocol.Point{pis.pi.Point()},
	}
	pis.pi.Next()
	return series
}

// Closed returns true once the underlying point iterator is no longer
// valid (exhausted or stopped by an error).
func (pis PointIteratorStream) Closed() bool {
	return !pis.pi.Valid()
}
Loading