Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit c769795

Browse files
authored
Merge pull request #1114 from grafana/index-rules-bigtable
fix bigtable idx pruning + index-rules.conf pruning for bigtable idx
2 parents 90c56ab + 7e1d50e commit c769795

File tree

10 files changed

+39
-51
lines changed

10 files changed

+39
-51
lines changed

docker/docker-chaos/metrictank.ini

-2
Original file line numberDiff line numberDiff line change
@@ -426,8 +426,6 @@ write-concurrency = 5
426426
update-bigtable-index = true
427427
# frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
428428
update-interval = 3h
429-
# clear series from the index if they have not been seen for this much time.
430-
max-stale = 0
431429
# Interval at which the index should be checked for stale series.
432430
prune-interval = 3h
433431
# enable the creation of the table and column families

docker/docker-cluster/metrictank.ini

-2
Original file line numberDiff line numberDiff line change
@@ -426,8 +426,6 @@ write-concurrency = 5
426426
update-bigtable-index = true
427427
# frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
428428
update-interval = 3h
429-
# clear series from the index if they have not been seen for this much time.
430-
max-stale = 0
431429
# Interval at which the index should be checked for stale series.
432430
prune-interval = 3h
433431
# enable the creation of the table and column families

docker/docker-dev-custom-cfg-kafka/metrictank.ini

-2
Original file line numberDiff line numberDiff line change
@@ -426,8 +426,6 @@ write-concurrency = 5
426426
update-bigtable-index = true
427427
# frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
428428
update-interval = 3h
429-
# clear series from the index if they have not been seen for this much time.
430-
max-stale = 0
431429
# Interval at which the index should be checked for stale series.
432430
prune-interval = 3h
433431
# enable the creation of the table and column families

docs/config.md

-2
Original file line numberDiff line numberDiff line change
@@ -498,8 +498,6 @@ write-concurrency = 5
498498
update-bigtable-index = true
499499
# frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
500500
update-interval = 3h
501-
# clear series from the index if they have not been seen for this much time.
502-
max-stale = 0
503501
# Interval at which the index should be checked for stale series.
504502
prune-interval = 3h
505503
# enable the creation of the table and column families

idx/bigtable/bigtable.go

+33-28
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func (b *BigtableIdx) Init() error {
182182
}
183183

184184
b.rebuildIndex()
185-
if b.cfg.MaxStale > 0 {
185+
if memory.IndexRules.Prunable() {
186186
b.wg.Add(1)
187187
go b.prune()
188188
}
@@ -311,24 +311,20 @@ func (b *BigtableIdx) rebuildIndex() {
311311
log.Info("bigtable-idx: Rebuilding Memory Index from metricDefinitions in bigtable")
312312
pre := time.Now()
313313

314-
var staleTs uint32
315-
if b.cfg.MaxStale != 0 {
316-
staleTs = uint32(time.Now().Add(b.cfg.MaxStale * -1).Unix())
317-
}
318314
num := 0
319315
var defs []schema.MetricDefinition
320316
for _, partition := range cluster.Manager.GetPartitions() {
321-
defs = b.LoadPartition(partition, defs[:0], staleTs)
317+
defs = b.LoadPartition(partition, defs[:0], pre)
322318
num += b.MemoryIdx.Load(defs)
323319
}
324320

325321
log.Infof("bigtable-idx: Rebuilding Memory Index Complete. Imported %d. Took %s", num, time.Since(pre))
326322
}
327323

328-
func (b *BigtableIdx) LoadPartition(partition int32, defs []schema.MetricDefinition, cutoff uint32) []schema.MetricDefinition {
324+
func (b *BigtableIdx) LoadPartition(partition int32, defs []schema.MetricDefinition, now time.Time) []schema.MetricDefinition {
329325
ctx := context.Background()
330326
rr := bigtable.PrefixRange(fmt.Sprintf("%d_", partition))
331-
defsBySeries := make(map[string][]schema.MetricDefinition)
327+
defsByNames := make(map[string][]schema.MetricDefinition)
332328
var marshalErr error
333329
err := b.tbl.ReadRows(ctx, rr, func(r bigtable.Row) bool {
334330
def := schema.MetricDefinition{}
@@ -337,7 +333,8 @@ func (b *BigtableIdx) LoadPartition(partition int32, defs []schema.MetricDefinit
337333
return false
338334
}
339335
log.Debugf("bigtable-idx: found def %+v", def)
340-
defsBySeries[def.Name] = append(defsBySeries[def.Name], def)
336+
nameWithTags := def.NameWithTags()
337+
defsByNames[nameWithTags] = append(defsByNames[nameWithTags], def)
341338
return true
342339
}, bigtable.RowFilter(bigtable.FamilyFilter(COLUMN_FAMILY)))
343340
if err != nil {
@@ -347,17 +344,23 @@ func (b *BigtableIdx) LoadPartition(partition int32, defs []schema.MetricDefinit
347344
log.Fatalf("bigtable-idx: failed to marshal row to metricDef. %s", marshalErr)
348345
}
349346

350-
LOOP:
351-
for series, defList := range defsBySeries {
352-
for _, def := range defList {
353-
if def.LastUpdate > int64(cutoff) {
354-
// add all defs for this series.
355-
defs = append(defs, defsBySeries[series]...)
356-
continue LOOP
347+
// getting all cutoffs once saves having to recompute every time we have a match
348+
cutoffs := memory.IndexRules.Cutoffs(now)
349+
350+
NAMES:
351+
for nameWithTags, defsByName := range defsByNames {
352+
irId, _ := memory.IndexRules.Match(nameWithTags)
353+
cutoff := cutoffs[irId]
354+
for _, def := range defsByName {
355+
if def.LastUpdate > cutoff {
356+
// if any of the defs for a given nameWithTags is not stale, then we need to load
357+
// all the defs for that nameWithTags.
358+
defs = append(defs, defsByNames[nameWithTags]...)
359+
continue NAMES
357360
}
358361
}
359362
// all defs are stale
360-
delete(defsBySeries, series)
363+
delete(defsByNames, nameWithTags)
361364
}
362365

363366
return defs
@@ -493,10 +496,17 @@ func (b *BigtableIdx) deleteRow(key string) error {
493496
return nil
494497
}
495498

496-
func (b *BigtableIdx) Prune(oldest time.Time) ([]idx.Archive, error) {
497-
pre := time.Now()
498-
pruned, err := b.MemoryIdx.Prune(oldest)
499-
statPruneDuration.Value(time.Since(pre))
499+
func (b *BigtableIdx) Prune(now time.Time) ([]idx.Archive, error) {
500+
log.Info("bigtable-idx: start pruning of series")
501+
pruned, err := b.MemoryIdx.Prune(now)
502+
duration := time.Since(now)
503+
if err != nil {
504+
log.Errorf("bigtable-idx: prune error. %s", err)
505+
} else {
506+
statPruneDuration.Value(duration)
507+
log.Infof("bigtable-idx: finished pruning of %d series in %s", len(pruned), duration)
508+
509+
}
500510
return pruned, err
501511
}
502512

@@ -505,13 +515,8 @@ func (b *BigtableIdx) prune() {
505515
ticker := time.NewTicker(b.cfg.PruneInterval)
506516
for {
507517
select {
508-
case <-ticker.C:
509-
log.Debugf("bigtable-idx: pruning items from index that have not been seen for %s", b.cfg.MaxStale.String())
510-
staleTs := time.Now().Add(b.cfg.MaxStale * -1)
511-
_, err := b.Prune(staleTs)
512-
if err != nil {
513-
log.Errorf("bigtable-idx: prune error. %s", err)
514-
}
518+
case now := <-ticker.C:
519+
b.Prune(now)
515520
case <-b.shutdown:
516521
return
517522
}

idx/bigtable/config.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ type IdxConfig struct {
2020
UpdateBigtableIdx bool
2121
UpdateInterval time.Duration
2222
updateInterval32 uint32
23-
MaxStale time.Duration
2423
PruneInterval time.Duration
2524
CreateCF bool
2625
}
@@ -33,7 +32,7 @@ func (cfg *IdxConfig) Validate() error {
3332
if cfg.WriteMaxFlushSize >= cfg.WriteQueueSize {
3433
return errors.New("write-queue-size must be larger then write-max-flush-size")
3534
}
36-
if cfg.MaxStale > 0 && cfg.PruneInterval == 0 {
35+
if cfg.PruneInterval == 0 {
3736
return errors.New("pruneInterval must be greater then 0")
3837
}
3938
return nil
@@ -51,7 +50,6 @@ func NewIdxConfig() *IdxConfig {
5150
WriteConcurrency: 5,
5251
UpdateBigtableIdx: true,
5352
UpdateInterval: time.Hour * 3,
54-
MaxStale: 0,
5553
PruneInterval: time.Hour * 3,
5654
CreateCF: true,
5755
}
@@ -71,7 +69,6 @@ func ConfigSetup() {
7169
btIdx.IntVar(&CliConfig.WriteConcurrency, "write-concurrency", CliConfig.WriteConcurrency, "Number of writer threads to use")
7270
btIdx.BoolVar(&CliConfig.UpdateBigtableIdx, "update-bigtable-index", CliConfig.UpdateBigtableIdx, "synchronize index changes to bigtable. not all your nodes need to do this.")
7371
btIdx.DurationVar(&CliConfig.UpdateInterval, "update-interval", CliConfig.UpdateInterval, "frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates")
74-
btIdx.DurationVar(&CliConfig.MaxStale, "max-stale", CliConfig.MaxStale, "clear series from the index if they have not been seen for this much time.")
7572
btIdx.DurationVar(&CliConfig.PruneInterval, "prune-interval", CliConfig.PruneInterval, "Interval at which the index should be checked for stale series.")
7673
btIdx.BoolVar(&CliConfig.CreateCF, "create-cf", CliConfig.CreateCF, "enable the creation of the table and column families")
7774

idx/cassandra/cassandra.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -416,14 +416,14 @@ func (c *CasIdx) load(defs []schema.MetricDefinition, iter cqlIterator, now time
416416
cutoffs := memory.IndexRules.Cutoffs(now)
417417

418418
NAMES:
419-
for name, defsByName := range defsByNames {
420-
irId, _ := memory.IndexRules.Match(name)
419+
for nameWithTags, defsByName := range defsByNames {
420+
irId, _ := memory.IndexRules.Match(nameWithTags)
421421
cutoff := cutoffs[irId]
422422
for _, def := range defsByName {
423423
if def.LastUpdate >= cutoff {
424-
// if one of the defs in a name is not stale, then we'll need to add
425-
// all the associated MDs to the defs slice
426-
for _, defToAdd := range defsByNames[name] {
424+
// if any of the defs for a given nameWithTags is not stale, then we need to load
425+
// all the defs for that nameWithTags.
426+
for _, defToAdd := range defsByNames[nameWithTags] {
427427
defs = append(defs, *defToAdd)
428428
}
429429
continue NAMES

metrictank-sample.ini

-2
Original file line numberDiff line numberDiff line change
@@ -429,8 +429,6 @@ write-concurrency = 5
429429
update-bigtable-index = true
430430
# frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
431431
update-interval = 3h
432-
# clear series from the index if they have not been seen for this much time.
433-
max-stale = 0
434432
# Interval at which the index should be checked for stale series.
435433
prune-interval = 3h
436434
# enable the creation of the table and column families

scripts/config/metrictank-docker.ini

-2
Original file line numberDiff line numberDiff line change
@@ -426,8 +426,6 @@ write-concurrency = 5
426426
update-bigtable-index = true
427427
# frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
428428
update-interval = 3h
429-
# clear series from the index if they have not been seen for this much time.
430-
max-stale = 0
431429
# Interval at which the index should be checked for stale series.
432430
prune-interval = 3h
433431
# enable the creation of the table and column families

scripts/config/metrictank-package.ini

-2
Original file line numberDiff line numberDiff line change
@@ -426,8 +426,6 @@ write-concurrency = 5
426426
update-bigtable-index = true
427427
# frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates
428428
update-interval = 3h
429-
# clear series from the index if they have not been seen for this much time.
430-
max-stale = 0
431429
# Interval at which the index should be checked for stale series.
432430
prune-interval = 3h
433431
# enable the creation of the table and column families

0 commit comments

Comments
 (0)