Skip to content

Commit

Permalink
fix(rollups): rollup a batch if more than 2 seconds elapsed since las…
Browse files Browse the repository at this point in the history
…t batch (#6118)

(cherry picked from commit aca0921)
  • Loading branch information
parasssh committed Aug 4, 2020
1 parent 8bdc648 commit a5848af
Showing 1 changed file with 41 additions and 15 deletions.
56 changes: 41 additions & 15 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,27 +109,53 @@ func (ir *incrRollupi) Process(closer *y.Closer) {

m := make(map[uint64]int64) // map hash(key) to ts. hash(key) to limit the size of the map.
limiter := time.NewTicker(100 * time.Millisecond)
defer limiter.Stop()
cleanupTick := time.NewTicker(5 * time.Minute)
defer cleanupTick.Stop()
forceRollupTick := time.NewTicker(2 * time.Second)
defer forceRollupTick.Stop()

var batch *[][]byte

doRollup := func() {
currTs := time.Now().Unix()
for _, key := range *batch {
hash := z.MemHash(key)
if elem := m[hash]; currTs-elem >= 10 {
// Key not present or Key present but last roll up was more than 10 sec ago.
// Add/Update map and rollup.
m[hash] = currTs
if err := ir.rollUpKey(writer, key); err != nil {
glog.Warningf("Error %v rolling up key %v\n", err, key)
}
}
}
// clear the batch and put it back in Sync keysPool
*batch = (*batch)[:0]
ir.keysPool.Put(batch)
}

for {
select {
case <-closer.HasBeenClosed():
return
case batch := <-ir.keysCh:
currTs := time.Now().Unix()
for _, key := range *batch {
hash := z.MemHash(key)
if elem := m[hash]; currTs-elem >= 10 {
// Key not present or Key present but last roll up was more than 10 sec ago.
// Add/Update map and rollup.
m[hash] = currTs
if err := ir.rollUpKey(writer, key); err != nil {
glog.Warningf("Error %v rolling up key %v\n", err, key)
}
case <-cleanupTick.C:
currTs := time.Now().UnixNano()
for hash, ts := range m {
// Remove entries from map which have been there for there more than 10 seconds.
if currTs-ts >= int64(10*time.Second) {
delete(m, hash)
}
}
// clear the batch and put it back in Sync keysPool
*batch = (*batch)[:0]
ir.keysPool.Put(batch)

case <-forceRollupTick.C:
batch = ir.keysPool.Get().(*[][]byte)
if len(*batch) > 0 {
doRollup()
} else {
ir.keysPool.Put(batch)
}
case batch = <-ir.keysCh:
doRollup()
// throttle to 1 batch = 64 rollups per 100 ms.
<-limiter.C
}
Expand Down

0 comments on commit a5848af

Please sign in to comment.