diff --git a/docker/docker-cluster/metrictank.ini b/docker/docker-cluster/metrictank.ini
index 7b28a16e10..9d1ac189df 100644
--- a/docker/docker-cluster/metrictank.ini
+++ b/docker/docker-cluster/metrictank.ini
@@ -248,10 +248,8 @@ max-stale = 0
 prune-interval = 3h
 # synchronize index changes to cassandra. not all your nodes need to do this.
 update-cassandra-index = true
-#frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
+#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true.
 update-interval = 4h
-#fuzzyness factor for update-interval. should be in the range 0 > fuzzyness <= 1. With an updateInterval of 4hours and fuzzyness of 0.5, metricDefs will be updated every 4-6hours.
-update-fuzzyness = 0.5
 # enable SSL connection to cassandra
 ssl = false
 # cassandra CA certificate path when using SSL
diff --git a/docker/docker-dev-custom-cfg-kafka/metrictank.ini b/docker/docker-dev-custom-cfg-kafka/metrictank.ini
index 9c1e3b3ade..72d4519367 100644
--- a/docker/docker-dev-custom-cfg-kafka/metrictank.ini
+++ b/docker/docker-dev-custom-cfg-kafka/metrictank.ini
@@ -248,10 +248,8 @@ max-stale = 0
 prune-interval = 3h
 # synchronize index changes to cassandra. not all your nodes need to do this.
 update-cassandra-index = true
-#frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
+#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true.
 update-interval = 4h
-#fuzzyness factor for update-interval. should be in the range 0 > fuzzyness <= 1. With an updateInterval of 4hours and fuzzyness of 0.5, metricDefs will be updated every 4-6hours.
-update-fuzzyness = 0.5
 # enable SSL connection to cassandra
 ssl = false
 # cassandra CA certificate path when using SSL
diff --git a/docs/config.md b/docs/config.md
index 45eaf4bad7..418a4a274d 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -297,10 +297,8 @@ max-stale = 0
 prune-interval = 3h
 # synchronize index changes to cassandra. not all your nodes need to do this.
 update-cassandra-index = true
-#frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
+#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true.
 update-interval = 4h
-#fuzzyness factor for update-interval. should be in the range 0 > fuzzyness <= 1. With an updateInterval of 4hours and fuzzyness of 0.5, metricDefs will be updated every 4-6hours.
-update-fuzzyness = 0.5
 # enable SSL connection to cassandra
 ssl = false
 # cassandra CA certificate path when using SSL
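Aside (annotation, not part of the patch): the removed update-fuzzyness jitter is superseded by a per-metricDef save gate introduced in the Go changes below. Roughly, with times in whole seconds and `needsSave` as a hypothetical helper name:

```
// A def is re-written to cassandra only once its last successful save is at
// least update-interval seconds old; until then saves are skipped entirely.
func needsSave(lastSave, now, updateIntervalSecs uint32) bool {
	return lastSave < now-updateIntervalSecs
}
```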
diff --git a/docs/metadata.md b/docs/metadata.md
index 1e8eb1ce49..220ab118f0 100644
--- a/docs/metadata.md
+++ b/docs/metadata.md
@@ -25,7 +25,7 @@ enabled = true
 This is the recommended option because it persists.
 
 * type: Memory-Idx for search queries, backed by Cassandra for persistence
-* persistence: persists new metricDefinitions as they are seen. At startup, the internal memory index is rebuilt from all metricDefinitions that have been stored in Cassandra. Metrictank won’t be considered ready (be able to ingest metrics or handle searches) until the index has been completely rebuilt.
+* persistence: persists new metricDefinitions as they are seen, and re-saves them every update-interval. At startup, the internal memory index is rebuilt from all metricDefinitions that have been stored in Cassandra. Metrictank won’t be considered ready (be able to ingest metrics or handle searches) until the index has been completely rebuilt.
 * efficiency: On low end hardware the index rebuilds at about 70000 metricDefinitions per second. Saving new metrics works pretty fast.
 
 Metrictank will initialize Cassandra with the needed keyspace and table. However if you are running a Cassandra cluster then you should tune the keyspace to suit your deployment.
@@ -51,8 +51,6 @@ num-conns = 10
 write-queue-size = 100000
 ```
 
-Note:
-* All metrictanks write to Cassandra. this is not very efficient.
 
 ## The anatomy of a metricdef
 
@@ -65,16 +63,15 @@ ## The anatomy of a metricdef
 The schema is as follows:
 ```
 type MetricDefinition struct {
 	Id         string
-	OrgId      int
+	OrgId      int
+	Partition  int
 	Name       string // graphite format
 	Metric     string // kairosdb format (like graphite, but not including some tags)
 	Interval   int
 	Unit       string
 	Mtype      string
 	Tags       []string
-	LastUpdate int64
-	Nodes      map[string]string
-	NodeCount  int
+	LastUpdate int64
 }
 ```
diff --git a/docs/metrics.md b/docs/metrics.md
index bab3e2eac3..ac019bf0e7 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -132,6 +132,8 @@ how many insert queries for a metric failed (triggered by an add or an update)
 time inserts spent in queue before being executed
 * `idx.cassandra.update`:
 the duration of an update of one metric to the cassandra idx, including the update to the in-memory index, excluding any insert/delete queries
+* `idx.cassandra.save.skipped`:
+how many saves have been skipped due to the writeQueue being full
 * `idx.memory.add`:
 the duration of an add of a metric to the memory idx
 * `idx.memory.ops.add`:
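Annotation (not part of the patch): the new `idx.cassandra.save.skipped` counter comes from the non-blocking save path added further down in this diff. A minimal sketch of the pattern, reusing the patch's names but written here only for illustration:

```
// Illustrative sketch of the non-blocking save: when the write queue is full,
// the skip counter is incremented instead of blocking metric ingestion.
func trySave(writeQueue chan writeReq, req writeReq) bool {
	select {
	case writeQueue <- req:
		return true // queued; the caller advances LastSave
	default:
		statSaveSkipped.Inc() // surfaces as idx.cassandra.save.skipped
		return false // skipped; retried on the next point for this metric
	}
}
```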
diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go
index ff3ac3c118..b7b4742d14 100644
--- a/idx/cassandra/cassandra.go
+++ b/idx/cassandra/cassandra.go
@@ -3,7 +3,6 @@ package cassandra
 import (
 	"flag"
 	"fmt"
-	"math/rand"
 	"strings"
 	"sync"
 	"time"
@@ -60,7 +59,9 @@ var (
 	statPruneDuration = stats.NewLatencyHistogram15s32("idx.cassandra.prune")
 	// metric idx.cassandra.delete is the duration of a delete of one or more metrics from the cassandra idx, including the delete from the in-memory index and the delete query
 	statDeleteDuration = stats.NewLatencyHistogram15s32("idx.cassandra.delete")
-	errmetrics         = cassandra.NewErrMetrics("idx.cassandra")
+	// metric idx.cassandra.save.skipped is how many saves have been skipped due to the writeQueue being full
+	statSaveSkipped = stats.NewCounter32("idx.cassandra.save.skipped")
+	errmetrics      = cassandra.NewErrMetrics("idx.cassandra")
 
 	Enabled          bool
 	ssl              bool
@@ -80,7 +81,7 @@ var (
 	pruneInterval    time.Duration
 	updateCassIdx    bool
 	updateInterval   time.Duration
-	updateFuzzyness  float64
+	updateInterval32 uint32
 )
 
 func ConfigSetup() *flag.FlagSet {
@@ -95,7 +96,6 @@ func ConfigSetup() *flag.FlagSet {
 	casIdx.IntVar(&writeQueueSize, "write-queue-size", 100000, "Max number of metricDefs allowed to be unwritten to cassandra")
 	casIdx.BoolVar(&updateCassIdx, "update-cassandra-index", true, "synchronize index changes to cassandra. not all your nodes need to do this.")
 	casIdx.DurationVar(&updateInterval, "update-interval", time.Hour*3, "frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates")
-	casIdx.Float64Var(&updateFuzzyness, "update-fuzzyness", 0.5, "fuzzyness factor for update-interval. should be in the range 0 > fuzzyness <= 1. With an updateInterval of 4hours and fuzzyness of 0.5, metricDefs will be updated every 4-6hours.")
 	casIdx.DurationVar(&maxStale, "max-stale", 0, "clear series from the index if they have not been seen for this much time.")
 	casIdx.DurationVar(&pruneInterval, "prune-interval", time.Hour*3, "Interval at which the index should be checked for stale series.")
 	casIdx.IntVar(&protoVer, "protocol-version", 4, "cql protocol version to use")
@@ -154,6 +154,7 @@ func New() *CasIdx {
 	if updateCassIdx {
 		idx.writeQueue = make(chan writeReq, writeQueueSize)
 	}
+	updateInterval32 = uint32(updateInterval.Nanoseconds() / int64(time.Second))
 	return idx
 }
 
@@ -236,46 +237,68 @@ func (c *CasIdx) Stop() {
 func (c *CasIdx) AddOrUpdate(data *schema.MetricData, partition int32) idx.Archive {
 	pre := time.Now()
 	existing, inMemory := c.MemoryIdx.Get(data.Id)
-	updateIdx := false
+	archive := c.MemoryIdx.AddOrUpdate(data, partition)
 	stat := statUpdateDuration
+	if !inMemory {
+		stat = statAddDuration
+	}
+	if !updateCassIdx {
+		stat.Value(time.Since(pre))
+		return archive
+	}
 
-	if inMemory {
-		if existing.Partition == partition {
-			var oldest time.Time
-			if updateInterval > 0 {
-				oldest = time.Now().Add(-1 * updateInterval).Add(-1 * time.Duration(rand.Int63n(updateInterval.Nanoseconds()*int64(updateFuzzyness*100)/100)))
-			} else {
-				oldest = time.Now()
-			}
-			updateIdx = (existing.LastUpdate < oldest.Unix())
-		} else {
-			if updateCassIdx {
-				// the partition of the metric has changed. So we need to delete
-				// the current metricDef from cassandra. We do this in a separate
-				// goroutine as we dont want to block waiting for the delete to succeed.
-				go func() {
-					if err := c.deleteDef(&existing); err != nil {
-						log.Error(3, err.Error())
-					}
-				}()
+	now := uint32(time.Now().Unix())
+
+	// Cassandra uses the partition id as the partitioning key, so an "update" that changes the partition of
+	// an existing metricDef will just create a new row in the table and won't remove the old row.
+	// So we need to explicitly delete the old entry.
+	if inMemory && existing.Partition != partition {
+		go func() {
+			if err := c.deleteDef(&existing); err != nil {
+				log.Error(3, err.Error())
 			}
-			updateIdx = true
-		}
-	} else {
-		updateIdx = true
-		stat = statAddDuration
+		}()
 	}
-	if updateIdx {
-		archive := c.MemoryIdx.AddOrUpdate(data, partition)
-		if updateCassIdx {
-			log.Debug("cassandra-idx updating def in index.")
-			c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
-		}
+
+	// check if we need to save to cassandra.
+	if archive.LastSave >= (now - updateInterval32) {
 		stat.Value(time.Since(pre))
 		return archive
 	}
-	return existing
+
+	// This is just a safety precaution to prevent corrupt index entries.
+	// It ensures that the index entry always contains the correct metricDefinition data.
+	if inMemory {
+		archive.MetricDefinition = *schema.MetricDefinitionFromMetricData(data)
+		archive.MetricDefinition.Partition = partition
+	}
+
+	// if the entry has not been saved for 1.5x updateInterval
+	// then perform a blocking save. (right-shifting by 1 bit divides by 2)
+	if archive.LastSave < (now - updateInterval32 - (updateInterval32 >> 1)) {
+		log.Debug("cassandra-idx updating def in index.")
+		c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
+		archive.LastSave = now
+		c.MemoryIdx.Update(archive)
+	} else {
+		// perform a non-blocking write to the writeQueue. If the queue is full,
+		// this will fail and we won't update the LastSave timestamp. The next time
+		// the metric is seen, the previous LastSave timestamp will still be in place, so
+		// we will try to save again. This continues until we succeed or the
+		// LastSave timestamp becomes more than 1.5 x updateInterval old, in which case
+		// we do a blocking write to the queue.
+		select {
+		case c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
+			archive.LastSave = now
+			c.MemoryIdx.Update(archive)
+		default:
+			statSaveSkipped.Inc()
+			log.Debug("writeQueue is full, update not saved.")
+		}
+	}
+
+	stat.Value(time.Since(pre))
+	return archive
 }
 
 func (c *CasIdx) rebuildIndex() {
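Annotation (not part of the patch): worked numbers for the two thresholds, assuming update-interval = 4h, so updateInterval32 = 14400 seconds and updateInterval32 >> 1 = 7200. `saveAction` is an illustrative helper, not code from the patch:

```
// age := now - lastSave
//   age < 14400          -> skip, saved recently enough
//   14400 <= age < 21600 -> best-effort: non-blocking queue write
//   age >= 21600 (1.5x)  -> overdue: blocking queue write
func saveAction(lastSave, now, interval uint32) string {
	switch {
	case lastSave >= now-interval:
		return "skip"
	case lastSave < now-interval-(interval>>1):
		return "blocking save"
	default:
		return "non-blocking save"
	}
}
```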
diff --git a/idx/cassandra/cassandra_test.go b/idx/cassandra/cassandra_test.go
index 9afbb94de8..2f70863ac7 100644
--- a/idx/cassandra/cassandra_test.go
+++ b/idx/cassandra/cassandra_test.go
@@ -22,10 +22,18 @@ func init() {
 	numConns = 1
 	writeQueueSize = 1000
 	protoVer = 4
+	updateCassIdx = false
 	cluster.Init("default", "test", time.Now(), "http", 6060)
 }
 
+func initForTests(c *CasIdx) error {
+	if err := c.MemoryIdx.Init(); err != nil {
+		return err
+	}
+	return nil
+}
+
 func getSeriesNames(depth, count int, prefix string) []string {
 	series := make([]string, count)
 	for i := 0; i < count; i++ {
@@ -71,7 +79,7 @@ func getMetricData(orgId, depth, count, interval int, prefix string) []*schema.M
 
 func TestGetAddKey(t *testing.T) {
 	ix := New()
-	ix.Init()
+	initForTests(ix)
 
 	publicSeries := getMetricData(-1, 2, 5, 10, "metric.public")
 	org1Series := getMetricData(1, 2, 5, 10, "metric.org1")
@@ -108,9 +116,104 @@ func TestGetAddKey(t *testing.T) {
 	})
 }
 
+func TestAddToWriteQueue(t *testing.T) {
+	originalUpdateCassIdx := updateCassIdx
+	originalUpdateInterval := updateInterval
+	originalWriteQSize := writeQueueSize
+
+	defer func() {
+		updateCassIdx = originalUpdateCassIdx
+		updateInterval = originalUpdateInterval
+		writeQueueSize = originalWriteQSize
+	}()
+
+	updateCassIdx = true
+	updateInterval = 10
+	writeQueueSize = 5
+	ix := New()
+	initForTests(ix)
+	metrics := getMetricData(1, 2, 5, 10, "metric.demo")
+	Convey("When writeQueue is enabled", t, func() {
+		Convey("When new metrics are being added", func() {
+			for _, s := range metrics {
+				ix.AddOrUpdate(s, 1)
+				select {
+				case wr := <-ix.writeQueue:
+					So(wr.def.Id, ShouldEqual, s.Id)
+					archive, inMem := ix.Get(wr.def.Id)
+					So(inMem, ShouldBeTrue)
+					now := uint32(time.Now().Unix())
+					So(archive.LastSave, ShouldBeBetweenOrEqual, now-1, now+1)
+				case <-time.After(time.Second):
+					t.Fail()
+				}
+			}
+		})
+		Convey("When existing metrics are added and lastSave is recent", func() {
+			for _, s := range metrics {
+				s.Time = time.Now().Unix()
+				ix.AddOrUpdate(s, 1)
+			}
+			wrCount := 0
+
+		LOOP_WR:
+			for {
+				select {
+				case <-ix.writeQueue:
+					wrCount++
+				default:
+					break LOOP_WR
+				}
+			}
+			So(wrCount, ShouldEqual, 0)
+		})
+		Convey("When existing metrics are added and lastSave is old", func() {
+			for _, s := range metrics {
+				s.Time = time.Now().Unix()
+				archive, _ := ix.Get(s.Id)
+				archive.LastSave = uint32(time.Now().Unix() - 100)
+				ix.Update(archive)
+			}
+			for _, s := range metrics {
+				ix.AddOrUpdate(s, 1)
+				select {
+				case wr := <-ix.writeQueue:
+					So(wr.def.Id, ShouldEqual, s.Id)
+					archive, inMem := ix.Get(wr.def.Id)
+					So(inMem, ShouldBeTrue)
+					now := uint32(time.Now().Unix())
+					So(archive.LastSave, ShouldBeBetweenOrEqual, now-1, now+1)
+				case <-time.After(time.Second):
+					t.Fail()
+				}
+			}
+		})
+		Convey("When new metrics are added and writeQueue is full", func() {
+			newMetrics := getMetricData(1, 2, 6, 10, "metric.demo2")
+			pre := time.Now()
+			go func() {
+				time.Sleep(time.Second)
+				// drain the writeQueue
+				for range ix.writeQueue {
+					continue
+				}
+			}()
+
+			for _, s := range newMetrics {
+				ix.AddOrUpdate(s, 1)
+			}
+			// it should take at least 1 second to add the defs, as the queue will be full
+			// until the above goroutine empties it, leading to a blocking write.
+			So(time.Now(), ShouldHappenAfter, pre.Add(time.Second))
+		})
+	})
+	ix.MemoryIdx.Stop()
+	close(ix.writeQueue)
+}
+
 func TestFind(t *testing.T) {
 	ix := New()
-	ix.Init()
+	initForTests(ix)
 	for _, s := range getMetricData(-1, 2, 5, 10, "metric.demo") {
 		ix.AddOrUpdate(s, 1)
 	}
@@ -217,7 +320,7 @@ func BenchmarkIndexing(b *testing.B) {
 	writeQueueSize = 10
 	protoVer = 4
 	updateInterval = time.Hour
-	updateFuzzyness = 1.0
+	updateCassIdx = true
 	ix := New()
 	tmpSession, err := ix.cluster.CreateSession()
 	if err != nil {
@@ -262,7 +365,7 @@ func BenchmarkLoad(b *testing.B) {
 	writeQueueSize = 10
 	protoVer = 4
 	updateInterval = time.Hour
-	updateFuzzyness = 1.0
+	updateCassIdx = true
 	ix := New()
 
 	tmpSession, err := ix.cluster.CreateSession()
@@ -275,7 +378,6 @@ func BenchmarkLoad(b *testing.B) {
 	if err != nil {
 		b.Skipf("can't initialize cassandra: %s", err)
 	}
-
 	insertDefs(ix, b.N)
 	ix.Stop()
 
@@ -285,3 +387,50 @@ func BenchmarkLoad(b *testing.B) {
 	ix.Init()
 	ix.Stop()
 }
+
+func BenchmarkIndexingWithUpdates(b *testing.B) {
+	cluster.Manager.SetPartitions([]int32{1})
+	keyspace = "metrictank"
+	hosts = "localhost:9042"
+	consistency = "one"
+	timeout = time.Second
+	numConns = 10
+	writeQueueSize = 10
+	protoVer = 4
+	updateInterval = time.Hour
+	updateCassIdx = true
+	ix := New()
+	tmpSession, err := ix.cluster.CreateSession()
+	if err != nil {
+		b.Skipf("can't connect to cassandra: %s", err)
+	}
+	tmpSession.Query("TRUNCATE metrictank.metric_idx").Exec()
+	tmpSession.Close()
+	err = ix.Init()
+	if err != nil {
+		b.Skipf("can't initialize cassandra: %s", err)
+	}
+	insertDefs(ix, b.N)
+	updates := make([]*schema.MetricData, b.N)
+	var series string
+	var data *schema.MetricData
+	for n := 0; n < b.N; n++ {
+		series = "some.metric." + strconv.Itoa(n)
+		data = &schema.MetricData{
+			Name:     series,
+			Metric:   series,
+			Interval: 10,
+			OrgId:    1,
+			Time:     10,
+		}
+		data.SetId()
+		updates[n] = data
+	}
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for n := 0; n < b.N; n++ {
+		ix.AddOrUpdate(updates[n], 1)
+	}
+	ix.Stop()
+}
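Annotation (not part of the patch): the final Convey block's timing assertion rests on buffered-channel semantics — a send to a full channel blocks until a receiver frees a slot. A minimal standalone illustration:

```
package main

import (
	"fmt"
	"time"
)

func main() {
	q := make(chan int, 1) // tiny buffered queue, like writeQueue with size 1
	q <- 1                 // fills the buffer
	go func() {
		time.Sleep(time.Second)
		<-q // drain one slot after 1s
	}()
	start := time.Now()
	q <- 2 // blocks until the goroutine drains: ~1s
	fmt.Println(time.Since(start) >= time.Second) // prints: true
}
```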
diff --git a/idx/idx.go b/idx/idx.go
index b83169d064..6cac955b56 100644
--- a/idx/idx.go
+++ b/idx/idx.go
@@ -26,16 +26,15 @@ type Archive struct {
 	schema.MetricDefinition
 	SchemaId uint16 // index in mdata.schemas (not persisted)
 	AggId    uint16 // index in mdata.aggregations (not persisted)
+	LastSave uint32 // last time the metricDefinition was saved to a backend store (cassandra)
 }
 
 // used primarily by tests, for convenience
 func NewArchiveBare(name string) Archive {
 	return Archive{
-		schema.MetricDefinition{
+		MetricDefinition: schema.MetricDefinition{
 			Name: name,
 		},
-		0,
-		0,
 	}
 }
 
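Annotation (not part of the patch): switching the literals to named fields is what lets Archive grow without breaking every positional constructor, and the zero value of the new field is meaningful — a sketch of the resulting convention:

```
// LastSave == 0 reads as "never saved", so a freshly constructed Archive
// always qualifies for an initial save under the gate shown earlier.
archive := idx.Archive{
	MetricDefinition: def, // embedded definition, now named explicitly
	SchemaId:         schemaId,
	AggId:            aggId,
	// LastSave omitted -> 0 -> first AddOrUpdate will persist it
}
```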
diff --git a/idx/memory/memory.go b/idx/memory/memory.go
index df39fa496c..eec9bd1caa 100644
--- a/idx/memory/memory.go
+++ b/idx/memory/memory.go
@@ -120,6 +120,16 @@ func (m *MemoryIdx) AddOrUpdate(data *schema.MetricData, partition int32) idx.Ar
 	return archive
 }
 
+func (m *MemoryIdx) Update(entry idx.Archive) {
+	m.Lock()
+	if _, ok := m.DefById[entry.Id]; !ok {
+		m.Unlock()
+		return
+	}
+	*(m.DefById[entry.Id]) = entry
+	m.Unlock()
+}
+
 // Used to rebuild the index from an existing set of metricDefinitions.
 func (m *MemoryIdx) Load(defs []schema.MetricDefinition) int {
 	m.Lock()
@@ -132,6 +142,12 @@ func (m *MemoryIdx) Load(defs []schema.MetricDefinition) int {
 			continue
 		}
 		m.add(def)
+		// as we are loading the metricDefs from a persistent store, set the lastSave
+		// to the lastUpdate timestamp. This won't exactly match the true lastSave timestamp,
+		// but it is close enough, and it is always true that the last save happened at
+		// or after this time. For metrics that are sent at or close to real time (the typical
+		// use case), the value will be within a couple of seconds of the true lastSave.
+		m.DefById[def.Id].LastSave = uint32(def.LastUpdate)
 		num++
 		statMetricsActive.Inc()
 		statAddDuration.Value(time.Since(pre))
@@ -145,9 +161,9 @@ func (m *MemoryIdx) add(def *schema.MetricDefinition) idx.Archive {
 	schemaId, _ := mdata.MatchSchema(def.Name)
 	aggId, _ := mdata.MatchAgg(def.Name)
 	archive := &idx.Archive{
-		*def,
-		schemaId,
-		aggId,
+		MetricDefinition: *def,
+		SchemaId:         schemaId,
+		AggId:            aggId,
 	}
 
 	//first check to see if a tree has been created for this OrgId
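Annotation (not part of the patch): the intended use of the new Update method, as an illustrative caller — Get hands back the Archive by value, so mutations stay local until the copy is written back, and Update overwrites the stored entry under the index lock (`memIdx` and `id` are placeholder names):

```
// Mutate a copy, then publish it atomically; concurrent readers never see
// a half-updated entry. This is how AddOrUpdate advances LastSave.
archive, ok := memIdx.Get(id)
if ok {
	archive.LastSave = uint32(time.Now().Unix())
	memIdx.Update(archive)
}
```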
diff --git a/metrictank-sample.ini b/metrictank-sample.ini
index d0c7834ea5..58ff9cb3e9 100644
--- a/metrictank-sample.ini
+++ b/metrictank-sample.ini
@@ -251,10 +251,8 @@ max-stale = 0
 prune-interval = 3h
 # synchronize index changes to cassandra. not all your nodes need to do this.
 update-cassandra-index = true
-#frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
+#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true.
 update-interval = 4h
-#fuzzyness factor for update-interval. should be in the range 0 > fuzzyness <= 1. With an updateInterval of 4hours and fuzzyness of 0.5, metricDefs will be updated every 4-6hours.
-update-fuzzyness = 0.5
 # enable SSL connection to cassandra
 ssl = false
 # cassandra CA certificate path when using SSL
diff --git a/scripts/config/metrictank-docker.ini b/scripts/config/metrictank-docker.ini
index e8453da1de..2307f960ad 100644
--- a/scripts/config/metrictank-docker.ini
+++ b/scripts/config/metrictank-docker.ini
@@ -248,10 +248,8 @@ max-stale = 0
 prune-interval = 3h
 # synchronize index changes to cassandra. not all your nodes need to do this.
 update-cassandra-index = true
-#frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
+#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true.
 update-interval = 4h
-#fuzzyness factor for update-interval. should be in the range 0 > fuzzyness <= 1. With an updateInterval of 4hours and fuzzyness of 0.5, metricDefs will be updated every 4-6hours.
-update-fuzzyness = 0.5
 # enable SSL connection to cassandra
 ssl = false
 # cassandra CA certificate path when using SSL
diff --git a/scripts/config/metrictank-package.ini b/scripts/config/metrictank-package.ini
index 438bec27eb..24d5ce6b9b 100644
--- a/scripts/config/metrictank-package.ini
+++ b/scripts/config/metrictank-package.ini
@@ -248,10 +248,8 @@ max-stale = 0
 prune-interval = 3h
 # synchronize index changes to cassandra. not all your nodes need to do this.
 update-cassandra-index = true
-#frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
+#frequency at which we should flush changes to cassandra. only relevant if update-cassandra-index is true.
 update-interval = 4h
-#fuzzyness factor for update-interval. should be in the range 0 > fuzzyness <= 1. With an updateInterval of 4hours and fuzzyness of 0.5, metricDefs will be updated every 4-6hours.
-update-fuzzyness = 0.5
 # enable SSL connection to cassandra
 ssl = false
 # cassandra CA certificate path when using SSL