Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

storage/localstore: add subscriptions wait group before closing leveldb #1980

Merged
merged 3 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions network/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"encoding/hex"
"math/rand"
"os"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1144,6 +1146,9 @@ func (r *Registry) Stop() error {
case <-done:
case <-time.After(5 * time.Second):
log.Error("stream closed with still active handlers")
// Print a full goroutine dump to debug blocking.
// TODO: use a logger to write a goroutine profile
pprof.Lookup("goroutine").WriteTo(os.Stdout, 2)
}

for _, v := range r.providers {
Expand Down
27 changes: 22 additions & 5 deletions storage/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package localstore
import (
"encoding/binary"
"errors"
"os"
"runtime/pprof"
"sync"
"time"

Expand Down Expand Up @@ -119,6 +121,11 @@ type DB struct {
collectGarbageWorkerDone chan struct{}

putToGCCheck func([]byte) bool

// wait for all subscriptions to finish before closing
// underlaying LevelDB to prevent possible panics from
// iterators
subscritionsWG sync.WaitGroup
}

// Options struct holds optional parameters for configuring DB.
Expand Down Expand Up @@ -431,14 +438,24 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
// Close closes the underlying database.
func (db *DB) Close() (err error) {
close(db.close)
db.updateGCWG.Wait()

// wait for gc worker to
// return before closing the shed
// wait for all handlers to finish
done := make(chan struct{})
go func() {
db.updateGCWG.Wait()
db.subscritionsWG.Wait()
// wait for gc worker to
// return before closing the shed
<-db.collectGarbageWorkerDone
close(done)
}()
select {
case <-db.collectGarbageWorkerDone:
case <-done:
case <-time.After(5 * time.Second):
log.Error("localstore: collect garbage worker did not return after db close")
log.Error("localstore closed with still active goroutines")
// Print a full goroutine dump to debug blocking.
// TODO: use a logger to write a goroutine profile
pprof.Lookup("goroutine").WriteTo(os.Stdout, 2)
}
return db.shed.Close()
}
Expand Down
2 changes: 2 additions & 0 deletions storage/localstore/subscription_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
// stop subscription when until chunk descriptor is reached
var errStopSubscription = errors.New("stop subscription")

db.subscritionsWG.Add(1)
go func() {
defer db.subscritionsWG.Done()
defer metrics.GetOrRegisterCounter(metricName+".stop", nil).Inc(1)
// close the returned chunk.Descriptor channel at the end to
// signal that the subscription is done
Expand Down
2 changes: 2 additions & 0 deletions storage/localstore/subscription_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
stopChan := make(chan struct{})
var stopChanOnce sync.Once

db.subscritionsWG.Add(1)
go func() {
defer db.subscritionsWG.Done()
defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1)
// close the returned chunkInfo channel at the end to
// signal that the subscription is done
Expand Down