This repository was archived by the owner on Aug 23, 2023. It is now read-only.

WIP - Basic clustering support - issue #56 #74

Merged
merged 15 commits on Dec 16, 2015
166 changes: 156 additions & 10 deletions metric_tank/aggmetric.go
@@ -29,6 +29,7 @@ type AggMetric struct {
aggregators []*Aggregator
writeQueue chan *Chunk
activeWrite bool
firstChunkT0 uint32
}

// re-order the chunks with the oldest at start of the list and newest at the end.
@@ -79,6 +80,98 @@ func NewAggMetric(key string, chunkSpan, numChunks uint32, maxDirtyChunks uint32
return &m
}

// Sync the saved state of a chunk by its T0.
func (a *AggMetric) SyncChunkSaveState(ts uint32) {
a.RLock()
defer a.RUnlock()
chunk := a.getChunkByT0(ts)
if chunk != nil {
log.Debug("marking chunk %s:%d as saved.", a.Key, chunk.T0)
chunk.Saved = true
}
}

/* Get a chunk by its T0. It is expected that the caller has acquired a.Lock() */
func (a *AggMetric) getChunkByT0(ts uint32) *Chunk {
Contributor: Function assumes lock is held? Put that in the comment. Also the ByT0 suffix can be dropped; rename arg1 to t0.
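A sketch of what the suggested change might look like (illustrative only, not part of this diff; the ByT0 suffix is kept here because a positional getChunk(pos int) already exists further down):

// getChunkByT0 returns the chunk whose T0 equals t0, or nil if no such chunk
// is in the ring buffer. The caller must hold a.Lock() or a.RLock().
func (a *AggMetric) getChunkByT0(t0 uint32) *Chunk {
	// ... body unchanged, with ts renamed to t0 ...
}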

// we have no chunks.
if len(a.Chunks) == 0 {
return nil
}

currentT0 := a.Chunks[a.CurrentChunkPos].T0

if ts == currentT0 {
//found our chunk.
return a.Chunks[a.CurrentChunkPos]
}

// requested Chunk is not in our dataset.
if ts > currentT0 {
return nil
}

// calculate the number of chunks ago our requested T0 is,
// assuming that chunks are sequential.
chunksAgo := int((currentT0 - ts) / a.ChunkSpan)

numChunks := len(a.Chunks)
oldestPos := a.CurrentChunkPos + 1
if oldestPos >= numChunks {
oldestPos = 0
}

var guess int

if chunksAgo >= (numChunks - 1) {
// set guess to the oldest chunk.
guess = oldestPos
} else {
guess = a.CurrentChunkPos - chunksAgo
if guess < 0 {
guess += numChunks
}
}

// we now have a good guess at which chunk position our requested T0 is in.
c := a.Chunks[guess]

if c.T0 == ts {
// found our chunk.
return c
}

if ts > c.T0 {
// we need to check newer chunks
for c.T0 < currentT0 {
guess += 1
if guess >= numChunks {
guess = 0
}
c = a.Chunks[guess]
if c.T0 == ts {
//found our chunk
return c
}
}
} else {
// we need to check older chunks
oldestT0 := a.Chunks[oldestPos].T0
for c.T0 >= oldestT0 {
guess -= 1
if guess < 0 {
guess += numChunks
}
c = a.Chunks[guess]
if c.T0 == ts {
// found our chunk.
return c
}
}
}
// chunk not found.
return nil
}
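For reference, the position guess above can be read as a standalone helper; a minimal sketch under the same assumption that chunks are contiguous and ChunkSpan-aligned (names are hypothetical):

// guessPos estimates the ring-buffer index of the chunk starting at t0.
// currentPos and currentT0 describe the newest chunk, chunkSpan is the width
// of each chunk and numChunks the length of the ring. The estimate is exact
// when no chunks were skipped.
func guessPos(currentPos int, currentT0, t0, chunkSpan uint32, numChunks int) int {
	chunksAgo := int((currentT0 - t0) / chunkSpan)
	if chunksAgo >= numChunks-1 {
		// t0 is at least as old as the oldest slot, so start at the oldest position.
		oldest := currentPos + 1
		if oldest >= numChunks {
			oldest = 0
		}
		return oldest
	}
	pos := currentPos - chunksAgo
	if pos < 0 {
		pos += numChunks
	}
	return pos
}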

func (a *AggMetric) getChunk(pos int) *Chunk {
if pos < 0 {
return nil
@@ -162,6 +255,21 @@ func (a *AggMetric) Get(from, to uint32) (uint32, []Iter) {
return math.MaxInt32, make([]Iter, 0)
}

// The first chunk is likely only a partial chunk. If we are not the primary node
// we should not serve data from this chunk, and should instead get the chunk from Cassandra.
// if we are the primary node, then there is likely no data in Cassandra anyway.
if !clusterStatus.IsPrimary() && oldestChunk.T0 == a.firstChunkT0 {
oldestPos++
if oldestPos >= len(a.Chunks) {
oldestPos = 0
}
oldestChunk = a.getChunk(oldestPos)
if oldestChunk == nil {
log.Error(3, "unexpected nil chunk.")
return math.MaxInt32, make([]Iter, 0)
}
}

if to <= oldestChunk.T0 {
// the requested time range ends before any data we have.
log.Debug("AggMetric %s Get(): no data for requested range", a.Key)
@@ -230,25 +338,53 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) {
func (a *AggMetric) persist(pos int) {
chunk := a.Chunks[pos]
chunk.Finish()
if *dryRun {
chunk.Saved = true
if !clusterStatus.IsPrimary() {
log.Debug("node is not primary, not saving chunk.")
return
}

log.Debug("sending chunk to write queue")
// create an array of chunks that need to be sent to the writeQueue.
pending := make([]*Chunk, 1)
// add the current chunk to the list of chunks to send to the writeQueue
pending[0] = chunk

// if we recently became the primary, there may be older chunks
// that the old primary did not save. We should check for those
// and save them.
previousPos := pos - 1
if previousPos < 0 {
previousPos += len(a.Chunks)
}
previousChunk := a.Chunks[previousPos]
for (previousChunk.T0 < chunk.T0) && !previousChunk.Saved && !previousChunk.Saving {
log.Debug("old chunk needs saving. Adding %s:%d to writeQueue", a.Key, previousChunk.T0)
pending = append(pending, previousChunk)
previousPos--
if previousPos < 0 {
previousPos += len(a.Chunks)
}
previousChunk = a.Chunks[previousPos]
}

log.Debug("sending %d chunks to write queue", len(pending))

ticker := time.NewTicker(2 * time.Second)
// Processing will remain in this for loop until the chunk can be
pendingChunk := len(pending) - 1

// Processing will remain in this for loop until the chunks can be
// added to the writeQueue. If the writeQueue is already full, then
// the calling function will block waiting for persist() complete.
// the calling function will block waiting for persist() to complete.
// This is intended to put backpressure on our message handlers so
// that they stop consuming messages, leaving them to buffer at
// the message bus.
WAIT:
for {
// the message bus. The "pending" array of chunks is processed
// last-to-first, ensuring that older data is added to the writeQueue
// before newer data.
for pendingChunk >= 0 {
select {
case a.writeQueue <- chunk:
case a.writeQueue <- pending[pendingChunk]:
pending[pendingChunk].Saving = true
pendingChunk--
log.Debug("chunk in write queue: length: %d", len(a.writeQueue))
break WAIT
case <-ticker.C:
log.Warn("%s:%d blocked pushing to writeQueue.", a.Key, chunk.T0)
}
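The backpressure loop above, reduced to its essentials, looks roughly like the following (a simplified sketch: it reuses the Chunk type from this file, swaps in the standard library logger, and drops the Saving bookkeeping and the last-to-first iteration over pending):

// enqueue blocks until c can be placed on the bounded write queue, logging a
// warning every two seconds while blocked. Because the caller stalls as well,
// the pressure propagates back to the message handlers feeding this metric.
func enqueue(queue chan *Chunk, c *Chunk) {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case queue <- c:
			return
		case <-ticker.C:
			log.Println("blocked pushing chunk to write queue")
		}
	}
}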
@@ -295,6 +431,12 @@ WAIT:
a.Lock()
c.Saved = true
a.Unlock()
msg := &PersistMessage{
Instance: *instance,
Key: a.Key,
T0: c.T0,
}
msg.Send()
}()
log.Debug("save complete. %s:%d %v", a.Key, c.T0, c)
chunkSaveOk.Inc(1)
@@ -348,6 +490,10 @@ func (a *AggMetric) Add(ts uint32, val float64) {
// no data has been added to this metric at all.
a.Chunks = append(a.Chunks, NewChunk(t0))

// The first chunk is typically going to be a partial chunk
// so we keep a record of it.
a.firstChunkT0 = t0

if err := a.Chunks[0].Push(ts, val); err != nil {
panic(fmt.Sprintf("FATAL ERROR: this should never happen. Pushing initial value <%d,%f> to new chunk at pos 0 failed: %q", ts, val, err))
}
55 changes: 29 additions & 26 deletions metric_tank/aggmetric_test.go
@@ -6,10 +6,6 @@ import (
"testing"
)

func init() {
*dryRun = true
}

type point struct {
ts uint32
val float64
@@ -38,7 +34,9 @@ func (c *Checker) Add(ts uint32, val float64) {
// from to is the range that gets requested from AggMetric
// first/last is what we use as data range to compare to (both inclusive)
// these may be different because AggMetric returns broader ranges (due to packed format)
func (c *Checker) Verify(from, to, first, last uint32) {
func (c *Checker) Verify(primary bool, from, to, first, last uint32) {
currentClusterStatus := clusterStatus.IsPrimary()
clusterStatus.Set(primary)
_, iters := c.agg.Get(from, to)
// we don't do checking or fancy logic, it is assumed that the caller made sure first and last are ts of actual points
var pi int // index of first point we want
@@ -65,62 +63,67 @@ func (c *Checker) Verify(from, to, first, last uint32) {
if index != pj {
c.t.Fatalf("not all values returned. missing %v", c.points[index:pj+1])
}
clusterStatus.Set(currentClusterStatus)
}
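The save-and-restore of the global primary flag could also be expressed with defer, which restores the previous state even when one of the checks calls t.Fatalf (deferred calls still run on Goexit); a possible variant, illustrative only:

func (c *Checker) Verify(primary bool, from, to, first, last uint32) {
	prev := clusterStatus.IsPrimary()
	clusterStatus.Set(primary)
	defer clusterStatus.Set(prev)
	_, iters := c.agg.Get(from, to)
	// ... range checks as above ...
}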

func TestAggMetric(t *testing.T) {
stats, _ := helper.New(false, "", "standard", "metrics_tank", "")
clusterStatus = NewClusterStatus("default", false)
initMetrics(stats)

c := NewChecker(t, NewAggMetric("foo", 100, 5, 1, []aggSetting{}...))

// basic case, single range
c.Add(101, 101)
c.Verify(100, 200, 101, 101)
c.Verify(true, 100, 200, 101, 101)
c.Add(105, 105)
c.Verify(100, 199, 101, 105)
c.Verify(true, 100, 199, 101, 105)
c.Add(115, 115)
c.Add(125, 125)
c.Add(135, 135)
c.Verify(100, 199, 101, 135)
c.Verify(true, 100, 199, 101, 135)

// add new ranges, aligned and unaligned
c.Add(200, 200)
c.Add(315, 315)
c.Verify(100, 399, 101, 315)
c.Verify(true, 100, 399, 101, 315)

// verify as secondary node. Data from the first chunk should not be returned.
c.Verify(false, 100, 399, 200, 315)

// get subranges
c.Verify(120, 299, 101, 200)
c.Verify(220, 299, 200, 200)
c.Verify(312, 330, 315, 315)
c.Verify(true, 120, 299, 101, 200)
c.Verify(true, 220, 299, 200, 200)
c.Verify(true, 312, 330, 315, 315)

// border dancing. good for testing inclusivity and exclusivity
c.Verify(100, 199, 101, 135)
c.Verify(100, 200, 101, 135)
c.Verify(100, 201, 101, 200)
c.Verify(198, 199, 101, 135)
c.Verify(199, 200, 101, 135)
c.Verify(200, 201, 200, 200)
c.Verify(201, 202, 200, 200)
c.Verify(299, 300, 200, 200)
c.Verify(300, 301, 315, 315)
c.Verify(true, 100, 199, 101, 135)
c.Verify(true, 100, 200, 101, 135)
c.Verify(true, 100, 201, 101, 200)
c.Verify(true, 198, 199, 101, 135)
c.Verify(true, 199, 200, 101, 135)
c.Verify(true, 200, 201, 200, 200)
c.Verify(true, 201, 202, 200, 200)
c.Verify(true, 299, 300, 200, 200)
c.Verify(true, 300, 301, 315, 315)

// skipping
c.Add(510, 510)
c.Add(512, 512)
c.Verify(100, 599, 101, 512)
c.Verify(true, 100, 599, 101, 512)

// basic wraparound
c.Add(610, 610)
c.Add(612, 612)
c.Add(710, 710)
c.Add(712, 712)
// TODO would be nice to test that it panics when requesting old range. something with recover?
//c.Verify(100, 799, 101, 512)
//c.Verify(true, 100, 799, 101, 512)

// largest range we have so far
c.Verify(300, 799, 315, 712)
c.Verify(true, 300, 799, 315, 712)
// a smaller range
c.Verify(502, 799, 510, 712)
c.Verify(true, 502, 799, 510, 712)

// the circular buffer had these ranges:
// 100 200 300 skipped 500
@@ -133,7 +136,7 @@ func TestAggMetric(t *testing.T) {
// but we just check we only get this point
c.Add(1299, 1299)
// TODO: implement skips and enable this
// c.Verify(800, 1299, 1299, 1299)
// c.Verify(true, 800, 1299, 1299, 1299)
}

// basic expected RAM usage for 1 iteration (= 1 day)
3 changes: 3 additions & 0 deletions metric_tank/aggregator_test.go
@@ -36,7 +36,9 @@ func TestAggBoundary(t *testing.T) {

// note that values don't get "committed" to the metric until the aggregation interval is complete
func TestAggregator(t *testing.T) {
clusterStatus = NewClusterStatus("default", false)
compare := func(key string, metric Metric, expected []Point) {
clusterStatus.Set(true)
_, iters := metric.Get(0, 1000)
got := make([]Point, 0, len(expected))
for _, iter := range iters {
@@ -56,6 +58,7 @@ func TestAggregator(t *testing.T) {
}
}
}
clusterStatus.Set(false)
}
agg := NewAggregator("test", 60, 120, 10, 10)
agg.Add(100, 123.4)
3 changes: 2 additions & 1 deletion metric_tank/chunk.go
@@ -15,13 +15,14 @@ type Chunk struct {
LastTs uint32 // last TS seen, not computed or anything
NumPoints uint32
Saved bool
Saving bool
LastWrite uint32
}

func NewChunk(t0 uint32) *Chunk {
// we must set LastWrite here as well to make sure a new Chunk doesn't get immediately
// garbage collected right after creating it, before we can push to it
return &Chunk{tsz.New(t0), t0, 0, 0, false, uint32(time.Now().Unix())}
return &Chunk{tsz.New(t0), t0, 0, 0, false, false, uint32(time.Now().Unix())}
}
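Adding the Saving field forced the positional literal above to grow by one element; a keyed literal would keep NewChunk stable across future field additions. A possible form (the name of the embedded series field is an assumption, it is not visible in this diff):

func NewChunk(t0 uint32) *Chunk {
	return &Chunk{
		Series:    tsz.New(t0), // assumed field name for the embedded series
		T0:        t0,
		LastWrite: uint32(time.Now().Unix()),
		// LastTs, NumPoints, Saved and Saving rely on their zero values
	}
}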

func (c *Chunk) String() string {