Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

559 deterministic update distribution #569

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Godeps/Readme

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 11 additions & 8 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package cassandra
import (
"flag"
"fmt"
"math/rand"
"strings"
"sync"
"time"

"github.com/gocql/gocql"
"github.com/huichen/murmur"
"github.com/raintank/metrictank/cassandra"
"github.com/raintank/metrictank/cluster"
"github.com/raintank/metrictank/idx"
Expand Down Expand Up @@ -233,6 +233,15 @@ func (c *CasIdx) Stop() {
c.session.Close()
}

func (c *CasIdx) shouldIdxUpdate(def *schema.MetricDefinition, now int64) bool {
if updateInterval == 0 {
return def.LastUpdate < now
}
updateIntervalInt64 := int64(updateInterval)
idHash := int64(murmur.Murmur3([]byte(def.Id)))
return now > (def.LastUpdate + (updateIntervalInt64 - (def.LastUpdate % updateIntervalInt64)) + (idHash % updateIntervalInt64))
}

func (c *CasIdx) AddOrUpdate(data *schema.MetricData, partition int32) idx.Archive {
pre := time.Now()
existing, inMemory := c.MemoryIdx.Get(data.Id)
Expand All @@ -241,13 +250,7 @@ func (c *CasIdx) AddOrUpdate(data *schema.MetricData, partition int32) idx.Archi

if inMemory {
if existing.Partition == partition {
var oldest time.Time
if updateInterval > 0 {
oldest = time.Now().Add(-1 * updateInterval).Add(-1 * time.Duration(rand.Int63n(updateInterval.Nanoseconds()*int64(updateFuzzyness*100)/100)))
} else {
oldest = time.Now()
}
updateIdx = (existing.LastUpdate < oldest.Unix())
updateIdx = c.shouldIdxUpdate(&existing.MetricDefinition, time.Now().Unix())
} else {
if updateCassIdx {
// the partition of the metric has changed. So we need to delete
Expand Down
61 changes: 61 additions & 0 deletions idx/cassandra/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,67 @@ func TestGetAddKey(t *testing.T) {
})
}

func TestShouldIdxUpdateWithIntervalZero(t *testing.T) {
ix := New()
ix.Init()

originalUpdateInterval := updateInterval
defer func() { updateInterval = originalUpdateInterval }()
updateInterval = 0

def := schema.MetricDefinition{}
decision := ix.shouldIdxUpdate(&def, def.LastUpdate+1)

Convey("When updateInterval is 0 and time>LastUpdate the decision", t, func() {
So(decision, ShouldBeTrue)
})
}

func TestShouldIdxUpdateWithVariousTimeRanges(t *testing.T) {
ix := New()
ix.Init()

originalUpdateInterval := updateInterval
defer func() { updateInterval = originalUpdateInterval }()
updateInterval = 10

getResults := func(from, to int64) []bool {
results := make([]bool, 0, int(to-from)*(int(updateInterval)))
def := schema.MetricDefinition{
Id: "someString",
}

for now := from; now < to; now++ {
for lastUpdate := now - int64(updateInterval); lastUpdate < now; lastUpdate++ {
def.LastUpdate = lastUpdate
results = append(results, ix.shouldIdxUpdate(&def, now))
}
}

return results
}

results1 := getResults(10000, 10100)
results2 := getResults(10050, 10150)
results3 := getResults(90, 190)

equal := true
for i := 0; i < len(results1); i++ {
if results1[i] != results2[i] {
equal = false
break
}
if results2[i] != results3[i] {
equal = false
break
}
}

Convey("When iterating over full updateIntervals the true/false sets comparison", t, func() {
So(equal, ShouldBeTrue)
})
}

func TestFind(t *testing.T) {
ix := New()
ix.Init()
Expand Down
8 changes: 8 additions & 0 deletions vendor/github.com/huichen/murmur/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions vendor/github.com/huichen/murmur/license.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 58 additions & 0 deletions vendor/github.com/huichen/murmur/murmur.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.