Skip to content

Commit

Permalink
Use the standard heap implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jvshahid committed Oct 10, 2014
1 parent 193c077 commit efcee88
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 148 deletions.
58 changes: 0 additions & 58 deletions common/heap/heap.go

This file was deleted.

58 changes: 0 additions & 58 deletions common/heap/heap_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion datastore/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (self *Shard) executeMergeQuery(querySpec *parser.QuerySpec, processor engi
i++
}

h := &engine.SeriesHeap{Ascending: querySpec.IsAscending()}
h := engine.NewSeriesHeap(querySpec.IsAscending())
merger := engine.NewCME("Shard", streams, h, processor, t == parser.FromClauseMerge)
if _, err := merger.Update(); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion engine/common_merge_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewCommonMergeEngine(shards []uint32, mergeColumns bool, ascending bool, ne
streams[i] = s
cme.streams[sh] = s
}
h := &SeriesHeap{Ascending: ascending}
h := NewSeriesHeap(ascending)
cme.merger = NewCME("Engine", streams, h, next, mergeColumns)
return cme
}
Expand Down
4 changes: 2 additions & 2 deletions engine/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Merger struct {
name string
s []StreamQuery
size int
h *SeriesHeap
h SeriesHeap
n Processor
lastStreamIdx int
initializing bool
Expand All @@ -34,7 +34,7 @@ type Merger struct {
// yields `column2` and `column3` then the result time series will
// have all 4 columns with two columns set to `nil` depending on which
// side the point came from.
func NewCME(name string, s []StreamQuery, h *SeriesHeap, n Processor, mergeColumns bool) *Merger {
func NewCME(name string, s []StreamQuery, h SeriesHeap, n Processor, mergeColumns bool) *Merger {
log4go.Debug("%sMerger: created with %d streams", name, len(s))
return &Merger{
name: name,
Expand Down
82 changes: 54 additions & 28 deletions engine/series_heap.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package engine

import (
"sort"
"container/heap"

"github.com/influxdb/influxdb/common/heap"
"github.com/influxdb/influxdb/protocol"
)

Expand All @@ -12,50 +11,79 @@ type Value struct {
s *protocol.Series
}

type ValueSlice []Value
type MinValueSlice struct {
values []Value
}

func (mvs *MinValueSlice) Len() int {
return len(mvs.values)
}

func (mvs *MinValueSlice) Less(i, j int) bool {
return mvs.values[i].s.Points[0].GetTimestamp() < mvs.values[j].s.Points[0].GetTimestamp()
}

func (mvs *MinValueSlice) Swap(i, j int) {
mvs.values[i], mvs.values[j] = mvs.values[j], mvs.values[i]
}

func (mvs *MinValueSlice) Push(x interface{}) {
mvs.values = append(mvs.values, x.(Value))
}

func (vs ValueSlice) Len() int {
return len(vs)
func (mvs *MinValueSlice) Pop() interface{} {
l := len(mvs.values)
var v interface{}
v, mvs.values = mvs.values[l-1], mvs.values[:l-1]
return v
}

func (vs ValueSlice) Less(i, j int) bool {
return vs[i].s.Points[0].GetTimestamp() < vs[j].s.Points[0].GetTimestamp()
type MaxValueSlice struct {
*MinValueSlice
}

func (vs ValueSlice) Swap(i, j int) {
vs[i], vs[j] = vs[j], vs[i]
func (mvs MaxValueSlice) Less(i, j int) bool {
return mvs.values[i].s.Points[0].GetTimestamp() >
mvs.values[j].s.Points[0].GetTimestamp()
}

// A heap that keeps holds one point series' (series that have one
// A heap that holds one point series' (series that have one
// point only) and their stream ids. See http://en.wikipedia.org/wiki/Heap_(data_structure)
// for more info on heaps. The heap is used by the Merger to emit
// points from multiple streams in monotic order.
type SeriesHeap struct {
Ascending bool
values []Value
values *MinValueSlice
}

func NewSeriesHeap(asc bool) SeriesHeap {
return SeriesHeap{
Ascending: asc,
values: &MinValueSlice{},
}
}

// returns the number of values in the heap so far
func (sh *SeriesHeap) Size() int {
return len(sh.values)
func (sh SeriesHeap) Size() int {
return sh.values.Len()
}

// Add another one point series with the given stream id. TODO: This
// is slightly inefficient way to construct the initial value slice,
// if we had a value slice we can construct the heap in O(n) instead
// of O(n logn) which is required if we construct the heap using
// multiple calls to Add()
func (sh *SeriesHeap) Add(streamId int, s *protocol.Series) {
sh.values = append(sh.values, Value{
s: s,
streamId: streamId,
})
l := sh.Size()
func (sh SeriesHeap) Add(streamId int, s *protocol.Series) {
var vs heap.Interface
if sh.Ascending {
heap.BubbleUp(ValueSlice(sh.values), l-1)
vs = sh.values
} else {
heap.BubbleUp(sort.Reverse(ValueSlice(sh.values)), l-1)
vs = MaxValueSlice{sh.values}
}
heap.Push(vs, Value{
s: s,
streamId: streamId,
})
}

// Get and remove the next one point series that has smallest (or
Expand All @@ -65,15 +93,13 @@ func (sh *SeriesHeap) Add(streamId int, s *protocol.Series) {
// the stream will be added immediately after and will cause a
// BubbleUp. In big O() notation this step doesn't change much, it
// only adds a contant to the upper bound.
func (sh *SeriesHeap) Next() (int, *protocol.Series) {
idx := 0
s := sh.Size()
v := sh.values[idx]
sh.values, sh.values[0] = sh.values[:s-1], sh.values[s-1]
func (sh SeriesHeap) Next() (int, *protocol.Series) {
var vs heap.Interface
if sh.Ascending {
heap.BubbleDown(ValueSlice(sh.values), 0)
vs = sh.values
} else {
heap.BubbleDown(sort.Reverse(ValueSlice(sh.values)), 0)
vs = MaxValueSlice{sh.values}
}
v := heap.Remove(vs, 0).(Value)
return v.streamId, v.s
}

0 comments on commit efcee88

Please sign in to comment.