update idx handling #574
Changes from all commits
b853661
779c607
913cb6f
c3749a2
66df35a
@@ -3,7 +3,6 @@ package cassandra
 import (
     "flag"
     "fmt"
-    "math/rand"
     "strings"
     "sync"
     "time"
@@ -60,7 +59,9 @@ var (
     statPruneDuration = stats.NewLatencyHistogram15s32("idx.cassandra.prune")
     // metric idx.cassandra.delete is the duration of a delete of one or more metrics from the cassandra idx, including the delete from the in-memory index and the delete query
     statDeleteDuration = stats.NewLatencyHistogram15s32("idx.cassandra.delete")
-    errmetrics = cassandra.NewErrMetrics("idx.cassandra")
+    // metric idx.cassandra.save.skipped is how many saves have been skipped due to the writeQueue being full
+    statSaveSkipped = stats.NewCounter32("idx.cassandra.save.skipped")
+    errmetrics = cassandra.NewErrMetrics("idx.cassandra")

     Enabled bool
     ssl     bool
@@ -80,7 +81,7 @@ var (
     pruneInterval    time.Duration
     updateCassIdx    bool
     updateInterval   time.Duration
-    updateFuzzyness  float64
+    updateInterval32 uint32
 )

 func ConfigSetup() *flag.FlagSet {
@@ -95,7 +96,6 @@ func ConfigSetup() *flag.FlagSet {
     casIdx.IntVar(&writeQueueSize, "write-queue-size", 100000, "Max number of metricDefs allowed to be unwritten to cassandra")
     casIdx.BoolVar(&updateCassIdx, "update-cassandra-index", true, "synchronize index changes to cassandra. not all your nodes need to do this.")
     casIdx.DurationVar(&updateInterval, "update-interval", time.Hour*3, "frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates")
-    casIdx.Float64Var(&updateFuzzyness, "update-fuzzyness", 0.5, "fuzzyness factor for update-interval. should be in the range 0 > fuzzyness <= 1. With an updateInterval of 4hours and fuzzyness of 0.5, metricDefs will be updated every 4-6hours.")
     casIdx.DurationVar(&maxStale, "max-stale", 0, "clear series from the index if they have not been seen for this much time.")
     casIdx.DurationVar(&pruneInterval, "prune-interval", time.Hour*3, "Interval at which the index should be checked for stale series.")
     casIdx.IntVar(&protoVer, "protocol-version", 4, "cql protocol version to use")
@@ -154,6 +154,7 @@ func New() *CasIdx {
     if updateCassIdx {
         idx.writeQueue = make(chan writeReq, writeQueueSize)
     }
+    updateInterval32 = uint32(updateInterval.Nanoseconds() / int64(time.Second))
     return idx
 }
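As an aside on the line added in New(): the conversion truncates the configured Duration to whole seconds. A minimal standalone sketch of the equivalence (the variable name is reused here for illustration only):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	updateInterval := 3 * time.Hour

	// the conversion as written in New(): total nanoseconds divided
	// by the number of nanoseconds in one second
	a := uint32(updateInterval.Nanoseconds() / int64(time.Second))

	// equivalent: time.Duration division also truncates toward zero
	b := uint32(updateInterval / time.Second)

	fmt.Println(a, b) // both print 10800 (3h in seconds)
}
```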
@@ -236,46 +237,68 @@ func (c *CasIdx) Stop() {
 func (c *CasIdx) AddOrUpdate(data *schema.MetricData, partition int32) idx.Archive {
     pre := time.Now()
     existing, inMemory := c.MemoryIdx.Get(data.Id)
-    updateIdx := false
+    archive := c.MemoryIdx.AddOrUpdate(data, partition)
     stat := statUpdateDuration
+    if !inMemory {
+        stat = statAddDuration
+    }
+    if !updateCassIdx {
+        stat.Value(time.Since(pre))
+        return archive
+    }
-    if inMemory {
-        if existing.Partition == partition {
-            var oldest time.Time
-            if updateInterval > 0 {
-                oldest = time.Now().Add(-1 * updateInterval).Add(-1 * time.Duration(rand.Int63n(updateInterval.Nanoseconds()*int64(updateFuzzyness*100)/100)))
-            } else {
-                oldest = time.Now()
-            }
-            updateIdx = (existing.LastUpdate < oldest.Unix())
-        } else {
-            if updateCassIdx {
-                // the partition of the metric has changed. So we need to delete
-                // the current metricDef from cassandra. We do this in a separate
-                // goroutine as we dont want to block waiting for the delete to succeed.
-                go func() {
-                    if err := c.deleteDef(&existing); err != nil {
-                        log.Error(3, err.Error())
-                    }
-                }()
-            }
-            updateIdx = true
-        }
-    } else {
-        updateIdx = true
-        stat = statAddDuration
-    }
+    now := uint32(time.Now().Unix())
+
+    // Cassandra uses partition id as the partitioning key, so an "update" that changes the partition for
+    // an existing metricDef will just create a new row in the table and wont remove the old row.
+    // So we need to explicitly delete the old entry.
+    if inMemory && existing.Partition != partition {
+        go func() {
+            if err := c.deleteDef(&existing); err != nil {
+                log.Error(3, err.Error())
+            }
+        }()
+    }
-    if updateIdx {
-        archive := c.MemoryIdx.AddOrUpdate(data, partition)
-        if updateCassIdx {
-            log.Debug("cassandra-idx updating def in index.")
-            c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
-        }
-        stat.Value(time.Since(pre))
-        return archive
-    }
-    return existing
+    // check if we need to save to cassandra.
+    if archive.LastSave >= (now - updateInterval32) {
+        stat.Value(time.Since(pre))
+        return archive
+    }
+    // This is just a safety precaution to prevent corrupt index entries.
+    // This ensures that the index entry always contains the correct metricDefinition data.
+    if inMemory {
+        archive.MetricDefinition = *schema.MetricDefinitionFromMetricData(data)
+        archive.MetricDefinition.Partition = partition
+    }

Review comment: MemoryIdx.AddOrUpdate already made sure to update/set the Partition field correctly?
+    // if the entry has not been saved for 1.5x updateInterval
+    // then perform a blocking save. (bit shifting to the right 1 bit, divides by 2)
+    if archive.LastSave < (now - updateInterval32 - (updateInterval32 >> 1)) {
+        log.Debug("cassandra-idx updating def in index.")
+        c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
+        archive.LastSave = now
+        c.MemoryIdx.Update(archive)

Review comment: why do we use 1.5x the interval? wouldn't it make more sense (and be easier to reason about) to start doing blocking writes at exactly the 1 updateInterval mark? BTW the compiler should optimize uint divisions by factors of two, i don't think we need to do it manually?

Reply: No. Writes aren't tried until exactly the updateInterval or greater has passed. If you forced a blocking write at exactly the 1 updateInterval mark then you would only ever try the non-blocking write once, forcing all saves to be completed within 2x your metric interval. That is way too aggressive.

Reply: as for why 1.5x: because that is what we have been using in hosted-metrics, i.e. an updateFuzzyness of 0.5 leading to updates happening between updateInterval and updateInterval x 1.5.
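To make the two thresholds concrete, here is a worked example of the window arithmetic under the PR's default update-interval of 3h. This is a standalone sketch; shouldSave and shouldBlock are hypothetical helper names, not functions from the codebase.

```go
package main

import "fmt"

func main() {
	updateInterval32 := uint32(3 * 3600) // default update-interval of 3h, in seconds
	now := uint32(1000000)               // an arbitrary "current" unix timestamp

	// no save is attempted at all while LastSave >= now - updateInterval32
	shouldSave := func(lastSave uint32) bool {
		return lastSave < now-updateInterval32
	}

	// the write becomes blocking once LastSave falls behind by more than
	// 1.5x the interval; updateInterval32 >> 1 is updateInterval32 / 2
	shouldBlock := func(lastSave uint32) bool {
		return lastSave < now-updateInterval32-(updateInterval32>>1)
	}

	for _, age := range []uint32{7200, 10800, 14400, 16200, 18000} {
		lastSave := now - age
		fmt.Printf("age %5ds: save=%v block=%v\n", age, shouldSave(lastSave), shouldBlock(lastSave))
	}
	// age  7200s: save=false block=false  (fresh enough, skipped entirely)
	// age 10800s: save=false block=false  (exactly updateInterval, still skipped)
	// age 14400s: save=true block=false   (non-blocking attempt)
	// age 16200s: save=true block=false   (exactly 1.5x, still non-blocking)
	// age 18000s: save=true block=true    (past 1.5x, blocking write)
}
```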
Review comment: Should we maybe collect some statistics that track how many % of the updates happen due to the …

Reply: That is a good idea.

Reply: i added a counter idx.cassandra.save.skipped to keep track of how many saves are being skipped due to the writeQ being full. Spikes in this counter would be normal, but continued growth over an extended time would indicate a performance problem.
+    } else {
+        // perform a non-blocking write to the writeQueue. If the queue is full, then
+        // this will fail and we wont update the LastSave timestamp. The next time
+        // the metric is seen, the previous lastSave timestamp will still be in place and so
+        // we will try and save again. This will continue until we are successful or the
+        // lastSave timestamp becomes more than 1.5 x updateInterval, in which case we will
+        // do a blocking write to the queue.
+        select {
+        case c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
+            archive.LastSave = now
+            c.MemoryIdx.Update(archive)
+        default:
+            statSaveSkipped.Inc()
+            log.Debug("writeQueue is full, update not saved.")
+        }
+    }

Review comment (on the writeReq line above): shouldn't …

Reply: faster? yes. But as we use this for measuring how long items are spending in the queue …
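The select with a default case above is Go's standard idiom for a non-blocking channel send. A stripped-down, runnable illustration of the same pattern, using a toy queue instead of the writeQueue of writeReqs:

```go
package main

import "fmt"

func main() {
	writeQueue := make(chan int, 2) // tiny buffer so the "full" case triggers
	skipped := 0

	for i := 0; i < 5; i++ {
		select {
		case writeQueue <- i:
			// enqueued: in the index code this is the point where
			// LastSave gets bumped, so no retry happens next time
			fmt.Println("enqueued", i)
		default:
			// queue full: nothing is written and no state changes,
			// so the caller simply tries again on the next sample
			skipped++
			fmt.Println("skipped", i)
		}
	}
	fmt.Println("total skipped:", skipped) // 3
}
```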
     stat.Value(time.Since(pre))
     return archive
 }

 func (c *CasIdx) rebuildIndex() {
Review comment: note that the id is generated from (almost) all the properties. properties not included in the id are Partition and LastUpdate (and Name, but Name should always be the same as Metric so it's not relevant here; that's still something we have to clean up at some point btw). MemoryIdx.AddOrUpdate already made sure to update Partition and LastUpdate, so i see no need for this.

Reply: this is to fix https://github.com/raintank/ops/issues/394

Reply: aha ok. so after this has run for a while, at some point we'll be able to take out these lines again?

Reply: yes. Unless we introduce metadata fields in future that do not contribute to the generated id.
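The property this thread relies on: the metric id is a hash over the definition's descriptive fields, while Partition and LastUpdate live outside the hash. A conceptual sketch of that invariant (a toy type and hand-rolled hash, not the real schema package):

```go
package main

import (
	"crypto/md5"
	"fmt"
)

// toy stand-in for a metric definition; the field set is illustrative only
type def struct {
	OrgId      int
	Metric     string
	Unit       string
	Mtype      string
	Interval   int
	Partition  int32 // not part of the id
	LastUpdate int64 // not part of the id
}

// id hashes only the descriptive properties
func (d def) id() string {
	sum := md5.Sum([]byte(fmt.Sprintf("%d.%s.%s.%s.%d",
		d.OrgId, d.Metric, d.Unit, d.Mtype, d.Interval)))
	return fmt.Sprintf("%d.%x", d.OrgId, sum)
}

func main() {
	a := def{OrgId: 1, Metric: "some.metric", Unit: "ms", Mtype: "gauge", Interval: 10}
	b := a
	b.Partition = 5 // moving the def to another partition...
	b.LastUpdate = 12345
	// ...leaves the id unchanged, which is why the row keyed by the
	// old partition has to be deleted from cassandra explicitly
	fmt.Println(a.id() == b.id()) // true
}
```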