diff --git a/posting/mvcc.go b/posting/mvcc.go index c15b8744618..2f5b3d0a9d1 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -112,6 +112,28 @@ func (ir *incrRollupi) Process(closer *y.Closer) { defer limiter.Stop() cleanupTick := time.NewTicker(5 * time.Minute) defer cleanupTick.Stop() + forceRollupTick := time.NewTicker(2 * time.Second) + defer forceRollupTick.Stop() + + var batch *[][]byte + + doRollup := func() { + currTs := time.Now().Unix() + for _, key := range *batch { + hash := z.MemHash(key) + if elem := m[hash]; currTs-elem >= 10 { + // Key not present or Key present but last roll up was more than 10 sec ago. + // Add/Update map and rollup. + m[hash] = currTs + if err := ir.rollUpKey(writer, key); err != nil { + glog.Warningf("Error %v rolling up key %v\n", err, key) + } + } + } + // clear the batch and put it back in Sync keysPool + *batch = (*batch)[:0] + ir.keysPool.Put(batch) + } for { select { @@ -125,23 +147,15 @@ func (ir *incrRollupi) Process(closer *y.Closer) { delete(m, hash) } } - case batch := <-ir.keysCh: - currTs := time.Now().Unix() - for _, key := range *batch { - hash := z.MemHash(key) - if elem := m[hash]; currTs-elem >= 10 { - // Key not present or Key present but last roll up was more than 10 sec ago. - // Add/Update map and rollup. - m[hash] = currTs - if err := ir.rollUpKey(writer, key); err != nil { - glog.Warningf("Error %v rolling up key %v\n", err, key) - } - } + case <-forceRollupTick.C: + batch = ir.keysPool.Get().(*[][]byte) + if len(*batch) > 0 { + doRollup() + } else { + ir.keysPool.Put(batch) } - // clear the batch and put it back in Sync keysPool - *batch = (*batch)[:0] - ir.keysPool.Put(batch) - + case batch = <-ir.keysCh: + doRollup() // throttle to 1 batch = 64 rollups per 100 ms. <-limiter.C }