Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

storage/localstore: add subscriptions wait group before closing leveldb #1980

Merged
merged 3 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions storage/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ type DB struct {
collectGarbageWorkerDone chan struct{}

putToGCCheck func([]byte) bool

// wait for all subscriptions to finish before closing
// underlaying LevelDB to prevent possible panics from
// iterators
subscritionsWG sync.WaitGroup
}

// Options struct holds optional parameters for configuring DB.
Expand Down Expand Up @@ -432,6 +437,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
func (db *DB) Close() (err error) {
close(db.close)
db.updateGCWG.Wait()
db.subscritionsWG.Wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not introduced by this PR only, but is it ok to do Wait() on an actual Close method, which is supposed to be for shutting down something? I mean, is it ensured that Wait() will not wait forever, e.g. because of blocked go routines?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well if you want a clean shutdown you'd probably want to wait until all of the goroutines that use a certain resource exit no?
if they don't exit it means that there's a deadlock and that should be fixed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we talked, I've added a deadline of 5 seconds, that should be reasonable. And also I have added by @acud suggestion a goroutine profile dump, which we should improve int he future to control where it is written, currently it is in stdout.


// wait for gc worker to
// return before closing the shed
Expand Down
2 changes: 2 additions & 0 deletions storage/localstore/subscription_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
// stop subscription when until chunk descriptor is reached
var errStopSubscription = errors.New("stop subscription")

db.subscritionsWG.Add(1)
go func() {
defer db.subscritionsWG.Done()
defer metrics.GetOrRegisterCounter(metricName+".stop", nil).Inc(1)
// close the returned chunk.Descriptor channel at the end to
// signal that the subscription is done
Expand Down
2 changes: 2 additions & 0 deletions storage/localstore/subscription_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
stopChan := make(chan struct{})
var stopChanOnce sync.Once

db.subscritionsWG.Add(1)
go func() {
defer db.subscritionsWG.Done()
defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1)
// close the returned chunkInfo channel at the end to
// signal that the subscription is done
Expand Down