From a5848af343bb6de997790d4d058fad7c70d2d273 Mon Sep 17 00:00:00 2001 From: parasssh Date: Tue, 4 Aug 2020 15:28:17 -0700 Subject: [PATCH] fix(rollups): rollup a batch if more than 2 seconds elapsed since last batch (#6118) (cherry picked from commit aca09216f0cfa5a125a81235127285d45e2fe546) --- posting/mvcc.go | 56 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/posting/mvcc.go b/posting/mvcc.go index d71213e6745..c494b737edc 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -109,27 +109,53 @@ func (ir *incrRollupi) Process(closer *y.Closer) { m := make(map[uint64]int64) // map hash(key) to ts. hash(key) to limit the size of the map. limiter := time.NewTicker(100 * time.Millisecond) + defer limiter.Stop() + cleanupTick := time.NewTicker(5 * time.Minute) + defer cleanupTick.Stop() + forceRollupTick := time.NewTicker(2 * time.Second) + defer forceRollupTick.Stop() + + var batch *[][]byte + + doRollup := func() { + currTs := time.Now().Unix() + for _, key := range *batch { + hash := z.MemHash(key) + if elem := m[hash]; currTs-elem >= 10 { + // Key not present or Key present but last roll up was more than 10 sec ago. + // Add/Update map and rollup. + m[hash] = currTs + if err := ir.rollUpKey(writer, key); err != nil { + glog.Warningf("Error %v rolling up key %v\n", err, key) + } + } + } + // clear the batch and put it back in Sync keysPool + *batch = (*batch)[:0] + ir.keysPool.Put(batch) + } + for { select { case <-closer.HasBeenClosed(): return - case batch := <-ir.keysCh: - currTs := time.Now().Unix() - for _, key := range *batch { - hash := z.MemHash(key) - if elem := m[hash]; currTs-elem >= 10 { - // Key not present or Key present but last roll up was more than 10 sec ago. - // Add/Update map and rollup. - m[hash] = currTs - if err := ir.rollUpKey(writer, key); err != nil { - glog.Warningf("Error %v rolling up key %v\n", err, key) - } + case <-cleanupTick.C: + currTs := time.Now().UnixNano() + for hash, ts := range m { + // Remove entries from map which have been there for there more than 10 seconds. + if currTs-ts >= int64(10*time.Second) { + delete(m, hash) } } - // clear the batch and put it back in Sync keysPool - *batch = (*batch)[:0] - ir.keysPool.Put(batch) - + case <-forceRollupTick.C: + batch = ir.keysPool.Get().(*[][]byte) + if len(*batch) > 0 { + doRollup() + } else { + ir.keysPool.Put(batch) + } + case batch = <-ir.keysCh: + doRollup() // throttle to 1 batch = 64 rollups per 100 ms. <-limiter.C }