Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit eb2f741

Browse files
committed
add writeQueue buffer to memoryIdx
Add support for a writeQueue that holds new MetricDefinitions in a buffer, then writes them into the index as a single batch.
1 parent 9f624c2 commit eb2f741

15 files changed

+327
-37
lines changed

docker/docker-chaos/metrictank.ini

+6
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,12 @@ find-cache-invalidate-max-size = 100
454454
find-cache-invalidate-max-wait = 5s
455455
# amount of time to disable the findCache when the invalidate queue fills up.
456456
find-cache-backoff-time = 60s
457+
# enable buffering new metricDefinitions and writing them to the index in batches
458+
write-queue-enabled = false
459+
# maximum delay between flushing buffered metric writes to the index
460+
write-queue-delay = 30s
461+
# maximum number of metricDefinitions that can be added to the index in a single batch
462+
write-max-batch-size = 5000
457463

458464
### Bigtable index
459465
[bigtable-idx]

docker/docker-cluster/metrictank.ini

+6
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,12 @@ find-cache-invalidate-max-size = 100
454454
find-cache-invalidate-max-wait = 5s
455455
# amount of time to disable the findCache when the invalidate queue fills up.
456456
find-cache-backoff-time = 60s
457+
# enable buffering new metricDefinitions and writing them to the index in batches
458+
write-queue-enabled = false
459+
# maximum delay between flushing buffered metric writes to the index
460+
write-queue-delay = 30s
461+
# maximum number of metricDefinitions that can be added to the index in a single batch
462+
write-max-batch-size = 5000
457463

458464
### Bigtable index
459465
[bigtable-idx]

docker/docker-dev-custom-cfg-kafka/metrictank.ini

+6
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,12 @@ find-cache-invalidate-max-size = 100
454454
find-cache-invalidate-max-wait = 5s
455455
# amount of time to disable the findCache when the invalidate queue fills up.
456456
find-cache-backoff-time = 60s
457+
# enable buffering new metricDefinitions and writing them to the index in batches
458+
write-queue-enabled = false
459+
# maximum delay between flushing buffered metric writes to the index
460+
write-queue-delay = 30s
461+
# maximum number of metricDefinitions that can be added to the index in a single batch
462+
write-max-batch-size = 5000
457463

458464
### Bigtable index
459465
[bigtable-idx]

docs/config.md

+6
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,12 @@ find-cache-invalidate-max-size = 100
528528
find-cache-invalidate-max-wait = 5s
529529
# amount of time to disable the findCache when the invalidate queue fills up.
530530
find-cache-backoff-time = 60s
531+
# enable buffering new metricDefinitions and writing them to the index in batches
532+
write-queue-enabled = false
533+
# maximum delay between flushing buffered metric writes to the index
534+
write-queue-delay = 30s
535+
# maximum number of metricDefinitions that can be added to the index in a single batch
536+
write-max-batch-size = 5000
531537
```
532538

533539
### Bigtable index

idx/bigtable/bigtable.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Arch
286286
log.Debugf("bigtable-idx: updating def %s in index.", archive.MetricDefinition.Id)
287287
b.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
288288
archive.LastSave = now
289-
b.MemoryIndex.UpdateArchive(archive)
289+
b.MemoryIndex.UpdateArchiveLastSave(archive.Id, archive.Partition, now)
290290
} else {
291291
// perform a non-blocking write to the writeQueue. If the queue is full, then
292292
// this will fail and we won't update the LastSave timestamp. The next time
@@ -297,7 +297,7 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Arch
297297
select {
298298
case b.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
299299
archive.LastSave = now
300-
b.MemoryIndex.UpdateArchive(archive)
300+
b.MemoryIndex.UpdateArchiveLastSave(archive.Id, archive.Partition, now)
301301
default:
302302
statSaveSkipped.Inc()
303303
log.Debugf("bigtable-idx: writeQueue is full, update of %s not saved this time", archive.MetricDefinition.Id)

idx/cassandra/cassandra.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
320320
log.Debugf("cassandra-idx: updating def %s in index.", archive.MetricDefinition.Id)
321321
c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
322322
archive.LastSave = now
323-
c.MemoryIndex.UpdateArchive(archive)
323+
c.MemoryIndex.UpdateArchiveLastSave(archive.Id, archive.Partition, now)
324324
} else {
325325
// perform a non-blocking write to the writeQueue. If the queue is full, then
326326
// this will fail and we won't update the LastSave timestamp. The next time
@@ -331,7 +331,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
331331
select {
332332
case c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
333333
archive.LastSave = now
334-
c.MemoryIndex.UpdateArchive(archive)
334+
c.MemoryIndex.UpdateArchiveLastSave(archive.Id, archive.Partition, now)
335335
default:
336336
statSaveSkipped.Inc()
337337
log.Debugf("cassandra-idx: writeQueue is full, update of %s not saved this time.", archive.MetricDefinition.Id)

idx/cassandra/cassandra_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ func TestAddToWriteQueue(t *testing.T) {
248248

249249
archive, _ := ix.Get(mkey)
250250
archive.LastSave = uint32(time.Now().Unix() - 100)
251-
ix.UpdateArchive(archive)
251+
ix.UpdateArchiveLastSave(archive.Id, archive.Partition, archive.LastSave)
252252
}
253253
for _, s := range metrics {
254254
mkey, err := schema.MKeyFromString(s.Id)

idx/memory/memory.go

+102-26
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ var (
6161
findCacheInvalidateMaxSize = 100
6262
findCacheInvalidateMaxWait = 5 * time.Second
6363
findCacheBackoffTime = time.Minute
64+
writeQueueEnabled = false
65+
writeQueueDelay = 30 * time.Second
66+
writeMaxBatchSize = 5000
6467
)
6568

6669
func ConfigSetup() {
@@ -73,6 +76,9 @@ func ConfigSetup() {
7376
memoryIdx.IntVar(&findCacheSize, "find-cache-size", 1000, "number of find expressions to cache (per org). 0 disables cache")
7477
memoryIdx.IntVar(&findCacheInvalidateQueueSize, "find-cache-invalidate-queue-size", 200, "size of queue for invalidating findCache entries")
7578
memoryIdx.IntVar(&findCacheInvalidateMaxSize, "find-cache-invalidate-max-size", 100, "max amount of invalidations to queue up in one batch")
79+
memoryIdx.BoolVar(&writeQueueEnabled, "write-queue-enabled", false, "enable buffering new metricDefinitions and writing them to the index in batches")
80+
memoryIdx.DurationVar(&writeQueueDelay, "write-queue-delay", 30*time.Second, "maximum delay between flushing buffered metric writes to the index")
81+
memoryIdx.IntVar(&writeMaxBatchSize, "write-max-batch-size", 5000, "maximum number of metricDefinitions that can be added to the index in a single batch")
7682
memoryIdx.DurationVar(&findCacheInvalidateMaxWait, "find-cache-invalidate-max-wait", 5*time.Second, "max duration to wait building up a batch to invalidate")
7783
memoryIdx.DurationVar(&findCacheBackoffTime, "find-cache-backoff-time", time.Minute, "amount of time to disable the findCache when the invalidate queue fills up.")
7884
memoryIdx.StringVar(&indexRulesFile, "rules-file", "/etc/metrictank/index-rules.conf", "path to index-rules.conf file")
@@ -109,8 +115,8 @@ func ConfigProcess() {
109115
type MemoryIndex interface {
110116
idx.MetricIndex
111117
LoadPartition(int32, []schema.MetricDefinition) int
112-
UpdateArchive(idx.Archive)
113-
add(*schema.MetricDefinition) idx.Archive
118+
UpdateArchiveLastSave(schema.MKey, int32, uint32)
119+
add(*idx.Archive)
114120
idsByTagQuery(uint32, TagQuery) IdSet
115121
PurgeFindCache()
116122
ForceInvalidationFindCache()
@@ -254,24 +260,29 @@ type UnpartitionedMemoryIdx struct {
254260
metaTagRecords map[uint32]metaTagRecords // by orgId
255261

256262
findCache *FindCache
263+
264+
writeQueue *WriteQueue
257265
}
258266

259267
func NewUnpartitionedMemoryIdx() *UnpartitionedMemoryIdx {
260-
m := UnpartitionedMemoryIdx{
268+
m := &UnpartitionedMemoryIdx{
261269
defById: make(map[schema.MKey]*idx.Archive),
262270
defByTagSet: make(defByTagSet),
263271
tree: make(map[uint32]*Tree),
264272
tags: make(map[uint32]TagIndex),
265273
metaTags: make(map[uint32]metaTagIndex),
266274
metaTagRecords: make(map[uint32]metaTagRecords),
267275
}
268-
return &m
276+
return m
269277
}
270278

271279
func (m *UnpartitionedMemoryIdx) Init() error {
272280
if findCacheSize > 0 {
273281
m.findCache = NewFindCache(findCacheSize, findCacheInvalidateQueueSize, findCacheInvalidateMaxSize, findCacheInvalidateMaxWait, findCacheBackoffTime)
274282
}
283+
if writeQueueEnabled {
284+
m.writeQueue = NewWriteQueue(m, writeQueueDelay, writeMaxBatchSize)
285+
}
275286
return nil
276287
}
277288

@@ -280,6 +291,10 @@ func (m *UnpartitionedMemoryIdx) Stop() {
280291
m.findCache.Shutdown()
281292
m.findCache = nil
282293
}
294+
if m.writeQueue != nil {
295+
m.writeQueue.Stop()
296+
m.writeQueue = nil
297+
}
283298
return
284299
}
285300

@@ -303,7 +318,6 @@ func (m *UnpartitionedMemoryIdx) Update(point schema.MetricPoint, partition int3
303318
pre := time.Now()
304319

305320
m.RLock()
306-
defer m.RUnlock()
307321

308322
existing, ok := m.defById[point.MKey]
309323
if ok {
@@ -316,8 +330,45 @@ func (m *UnpartitionedMemoryIdx) Update(point schema.MetricPoint, partition int3
316330
oldPart := atomic.SwapInt32(&existing.Partition, partition)
317331
statUpdate.Inc()
318332
statUpdateDuration.Value(time.Since(pre))
333+
m.RUnlock()
319334
return *existing, oldPart, true
320335
}
336+
m.RUnlock()
337+
338+
if m.writeQueue != nil {
339+
// if we are using the writeQueue, then the archive for this MKey might be queued
340+
// and not yet flushed to the index yet.
341+
existing, ok := m.writeQueue.Get(point.MKey)
342+
if ok {
343+
if log.IsLevelEnabled(log.DebugLevel) {
344+
log.Debugf("memory-idx: metricDef with id %v is in the writeQueue", point.MKey)
345+
}
346+
347+
bumpLastUpdate(&existing.LastUpdate, int64(point.Time))
348+
349+
oldPart := atomic.SwapInt32(&existing.Partition, partition)
350+
statUpdate.Inc()
351+
statUpdateDuration.Value(time.Since(pre))
352+
return *existing, oldPart, true
353+
}
354+
355+
// we need to do one final check of m.defById, as the writeQueue may have been flushed between
356+
// when we released m.RLock() and when the call to m.writeQueue.Get() was able to obtain its own lock.
357+
existing, ok = m.defById[point.MKey]
358+
if ok {
359+
if log.IsLevelEnabled(log.DebugLevel) {
360+
log.Debugf("memory-idx: metricDef with id %v already in index", point.MKey)
361+
}
362+
363+
bumpLastUpdate(&existing.LastUpdate, int64(point.Time))
364+
365+
oldPart := atomic.SwapInt32(&existing.Partition, partition)
366+
statUpdate.Inc()
367+
statUpdateDuration.Value(time.Since(pre))
368+
m.RUnlock()
369+
return *existing, oldPart, true
370+
}
371+
}
321372

322373
return idx.Archive{}, 0, false
323374
}
@@ -345,26 +396,34 @@ func (m *UnpartitionedMemoryIdx) AddOrUpdate(mkey schema.MKey, data *schema.Metr
345396
}
346397

347398
m.RUnlock()
348-
m.Lock()
349-
defer m.Unlock()
350-
351399
def := schema.MetricDefinitionFromMetricData(data)
352400
def.Partition = partition
353-
archive := m.add(def)
354-
statMetricsActive.Inc()
355-
statAddDuration.Value(time.Since(pre))
401+
archive := getArchive(def)
402+
if m.writeQueue == nil {
403+
m.Lock()
404+
m.add(archive)
405+
m.Unlock()
406+
statMetricsActive.Inc()
407+
statAddDuration.Value(time.Since(pre))
408+
} else {
409+
// push the new archive into the writeQueue
410+
m.writeQueue.Queue(archive)
411+
}
356412

357-
return archive, 0, false
413+
return *archive, 0, false
358414
}
359415

360-
// UpdateArchive updates the archive information
361-
func (m *UnpartitionedMemoryIdx) UpdateArchive(archive idx.Archive) {
362-
m.Lock()
363-
defer m.Unlock()
364-
if _, ok := m.defById[archive.Id]; !ok {
416+
// UpdateArchiveLastSave updates the LastSave timestamp of the archive
417+
func (m *UnpartitionedMemoryIdx) UpdateArchiveLastSave(id schema.MKey, partition int32, lastSave uint32) {
418+
m.RLock()
419+
if _, ok := m.defById[id]; !ok {
420+
// This will happen if the metricDefinition was saved to a backend store
421+
// before the archive was added to the index.
422+
m.RUnlock()
365423
return
366424
}
367-
*(m.defById[archive.Id]) = archive
425+
atomic.StoreUint32(&m.defById[id].LastSave, lastSave)
426+
m.RUnlock()
368427
}
369428

370429
// MetaTagRecordUpsert inserts or updates a meta record, depending on whether
@@ -530,7 +589,7 @@ func (m *UnpartitionedMemoryIdx) Load(defs []schema.MetricDefinition) int {
530589
continue
531590
}
532591

533-
m.add(def)
592+
m.add(getArchive(def))
534593

535594
// as we are loading the metricDefs from a persistent store, set the lastSave
536595
// to the lastUpdate timestamp. This won't exactly match the true lastSave Timestamp,
@@ -545,32 +604,46 @@ func (m *UnpartitionedMemoryIdx) Load(defs []schema.MetricDefinition) int {
545604
return num
546605
}
547606

548-
func (m *UnpartitionedMemoryIdx) add(def *schema.MetricDefinition) idx.Archive {
607+
func getArchive(def *schema.MetricDefinition) *idx.Archive {
549608
path := def.NameWithTags()
550609
schemaId, _ := mdata.MatchSchema(path, def.Interval)
551610
aggId, _ := mdata.MatchAgg(path)
552611
irId, _ := IndexRules.Match(path)
553612

554-
archive := &idx.Archive{
613+
return &idx.Archive{
555614
MetricDefinition: *def,
556615
SchemaId: schemaId,
557616
AggId: aggId,
558617
IrId: irId,
559618
}
619+
}
620+
621+
func (m *UnpartitionedMemoryIdx) add(archive *idx.Archive) {
622+
// there is a race condition that can lead to an archive being added
623+
// to the writeQueue just after a queued copy of the archive was flushed.
624+
// If that happens, we update the contents of the passed 'archive' to be
625+
// the contents of the 'existing' archive found.
626+
if existing, ok := m.defById[archive.Id]; ok {
627+
*archive = *existing
628+
return
629+
}
630+
631+
def := &archive.MetricDefinition
632+
path := def.NameWithTags()
560633

561634
if TagSupport {
562635
// Even if there are no tags, index at least "name". It's important to use the definition
563636
// in the archive pointer that we add to defById, because the pointers must reference the
564637
// same underlying object in m.defById and m.defByTagSet
565-
m.indexTags(&archive.MetricDefinition)
638+
m.indexTags(def)
566639

567640
if len(def.Tags) > 0 {
568641
if _, ok := m.defById[def.Id]; !ok {
569642
m.defById[def.Id] = archive
570643
statAdd.Inc()
571644
log.Debugf("memory-idx: adding %s to DefById", path)
572645
}
573-
return *archive
646+
return
574647
}
575648
}
576649

@@ -602,7 +675,7 @@ func (m *UnpartitionedMemoryIdx) add(def *schema.MetricDefinition) idx.Archive {
602675
node.Defs = append(node.Defs, def.Id)
603676
m.defById[def.Id] = archive
604677
statAdd.Inc()
605-
return *archive
678+
return
606679
}
607680
}
608681

@@ -649,7 +722,7 @@ func (m *UnpartitionedMemoryIdx) add(def *schema.MetricDefinition) idx.Archive {
649722
m.defById[def.Id] = archive
650723
statAdd.Inc()
651724

652-
return *archive
725+
return
653726
}
654727

655728
func (m *UnpartitionedMemoryIdx) Get(id schema.MKey) (idx.Archive, bool) {
@@ -1140,7 +1213,10 @@ func (m *UnpartitionedMemoryIdx) Find(orgId uint32, pattern string, from int64)
11401213
log.Debugf("memory-idx: from is %d, so skipping %s which has LastUpdate %d", from, def.Id, atomic.LoadInt64(&def.LastUpdate))
11411214
continue
11421215
}
1143-
log.Debugf("memory-idx: Find: adding to path %s archive id=%s name=%s int=%d schemaId=%d aggId=%d irId=%d lastSave=%d", n.Path, def.Id, def.Name, def.Interval, def.SchemaId, def.AggId, def.IrId, def.LastSave)
1216+
if log.IsLevelEnabled(log.DebugLevel) {
1217+
lastSave := atomic.LoadUint32(&def.LastSave)
1218+
log.Debugf("memory-idx: Find: adding to path %s archive id=%s name=%s int=%d schemaId=%d aggId=%d irId=%d lastSave=%d", n.Path, def.Id, def.Name, def.Interval, def.SchemaId, def.AggId, def.IrId, lastSave)
1219+
}
11441220
idxNode.Defs = append(idxNode.Defs, *def)
11451221
}
11461222
if len(idxNode.Defs) == 0 {

idx/memory/memory_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1228,7 +1228,7 @@ func testMatchSchemaWithTags(t *testing.T) {
12281228
defer ix.Stop()
12291229

12301230
data := make([]*schema.MetricDefinition, 10)
1231-
archives := make([]idx.Archive, 10)
1231+
archives := make([]*idx.Archive, 10)
12321232
for i := 0; i < 10; i++ {
12331233
name := fmt.Sprintf("some.id.of.a.metric.%d", i)
12341234
data[i] = &schema.MetricDefinition{
@@ -1239,7 +1239,8 @@ func testMatchSchemaWithTags(t *testing.T) {
12391239
Partition: getPartitionFromName(name),
12401240
}
12411241
data[i].SetId()
1242-
archives[i] = ix.add(data[i])
1242+
archives[i] = getArchive(data[i])
1243+
ix.add(archives[i])
12431244
}
12441245

12451246
// only those MDs with tag1=value3 or tag1=value5 should get the first schema id

0 commit comments

Comments
 (0)