Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

storage/localstore: add subscriptions wait group before closing leveldb #1980

Merged
merged 3 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion network/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"encoding/hex"
"math/rand"
"os"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1143,7 +1145,10 @@ func (r *Registry) Stop() error {
select {
case <-done:
case <-time.After(5 * time.Second):
log.Error("stream closed with still active handlers")
r.logger.Error("stream closed with still active handlers")
// Print a full goroutine dump to debug blocking.
// TODO: use a logger to write a goroutine profile
pprof.Lookup("goroutine").WriteTo(os.Stdout, 2)
}

for _, v := range r.providers {
Expand Down
27 changes: 22 additions & 5 deletions storage/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package localstore
import (
"encoding/binary"
"errors"
"os"
"runtime/pprof"
"sync"
"time"

Expand Down Expand Up @@ -119,6 +121,11 @@ type DB struct {
collectGarbageWorkerDone chan struct{}

putToGCCheck func([]byte) bool

// wait for all subscriptions to finish before closing
// underlaying LevelDB to prevent possible panics from
// iterators
subscritionsWG sync.WaitGroup
}

// Options struct holds optional parameters for configuring DB.
Expand Down Expand Up @@ -431,14 +438,24 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
// Close closes the underlying database.
func (db *DB) Close() (err error) {
close(db.close)
db.updateGCWG.Wait()

// wait for gc worker to
// return before closing the shed
// wait for all handlers to finish
done := make(chan struct{})
go func() {
db.updateGCWG.Wait()
db.subscritionsWG.Wait()
// wait for gc worker to
// return before closing the shed
<-db.collectGarbageWorkerDone
close(done)
}()
select {
case <-db.collectGarbageWorkerDone:
case <-done:
case <-time.After(5 * time.Second):
log.Error("localstore: collect garbage worker did not return after db close")
log.Error("localstore closed with still active goroutines")
// Print a full goroutine dump to debug blocking.
// TODO: use a logger to write a goroutine profile
pprof.Lookup("goroutine").WriteTo(os.Stdout, 2)
}
return db.shed.Close()
}
Expand Down
2 changes: 2 additions & 0 deletions storage/localstore/subscription_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
// stop subscription when until chunk descriptor is reached
var errStopSubscription = errors.New("stop subscription")

db.subscritionsWG.Add(1)
go func() {
defer db.subscritionsWG.Done()
defer metrics.GetOrRegisterCounter(metricName+".stop", nil).Inc(1)
// close the returned chunk.Descriptor channel at the end to
// signal that the subscription is done
Expand Down
2 changes: 2 additions & 0 deletions storage/localstore/subscription_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
stopChan := make(chan struct{})
var stopChanOnce sync.Once

db.subscritionsWG.Add(1)
go func() {
defer db.subscritionsWG.Done()
defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1)
// close the returned chunkInfo channel at the end to
// signal that the subscription is done
Expand Down