Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: grafana/metrictank
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 9561e0a92edb893ee20d5fd50507e1d876b696d2
Choose a base ref
..
head repository: grafana/metrictank
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 9e76b90c1c393ee5402921eb56e9c54257f8143a
Choose a head ref
Showing with 11 additions and 4 deletions.
  1. +7 −0 idx/bigtable/bigtable.go
  2. +4 −4 idx/cassandra/cassandra.go
7 changes: 7 additions & 0 deletions idx/bigtable/bigtable.go
Original file line number Diff line number Diff line change
@@ -225,6 +225,8 @@ func (b *BigtableIdx) Stop() {
}
}

// Update updates an existing archive, if found.
// It returns whether it was found, and - if so - the (updated) existing archive and its old partition
func (b *BigtableIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive, int32, bool) {
pre := time.Now()

@@ -265,11 +267,14 @@ func (b *BigtableIdx) Update(point schema.MetricPoint, partition int32) (idx.Arc

func (b *BigtableIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) (idx.Archive, int32, bool) {
pre := time.Now()

archive, oldPartition, inMemory := b.MemoryIdx.AddOrUpdate(mkey, data, partition)

stat := statUpdateDuration
if !inMemory {
stat = statAddDuration
}

if !updateBigTableIdx {
stat.Value(time.Since(pre))
return archive, oldPartition, inMemory
@@ -304,6 +309,8 @@ func (b *BigtableIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, par
return archive, oldPartition, inMemory
}

// updateBigtable saves the archive to bigtable and
// updates the memory index with the updated fields.
func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Archive, partition int32) idx.Archive {
// if the entry has not been saved for 1.5x updateInterval
// then perform a blocking save.
8 changes: 4 additions & 4 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
@@ -329,11 +329,10 @@ func (c *CasIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partitio
// updateCassandra saves the archive to cassandra and
// updates the memory index with the updated fields.
func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive, partition int32) idx.Archive {

// if the entry has not been saved for 1.5x updateInterval
// then perform a blocking save.
if archive.LastSave < (now - updateInterval32 - updateInterval32/2) {
log.Debug("cassandra-idx: updating def in index.")
log.Debugf("cassandra-idx: updating def %s in index.", archive.MetricDefinition.Id)
c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
archive.LastSave = now
c.MemoryIdx.UpdateArchive(archive)
@@ -350,7 +349,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
c.MemoryIdx.UpdateArchive(archive)
default:
statSaveSkipped.Inc()
log.Debug("writeQueue is full, update not saved.")
log.Debugf("cassandra-idx: writeQueue is full, update of %s not saved this time.", archive.MetricDefinition.Id)
}
}

@@ -360,11 +359,12 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
func (c *CasIdx) rebuildIndex() {
log.Info("cassandra-idx: Rebuilding Memory Index from metricDefinitions in Cassandra")
pre := time.Now()
var defs []schema.MetricDefinition

var staleTs uint32
if maxStale != 0 {
staleTs = uint32(time.Now().Add(maxStale * -1).Unix())
}
var defs []schema.MetricDefinition
defs = c.LoadPartitions(cluster.Manager.GetPartitions(), defs, staleTs)

num := c.MemoryIdx.Load(defs)