Skip to content

Commit

Permalink
fix: close databases in deterministic order (#1623)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Oct 18, 2022
1 parent 7c83b32 commit 7a8b33c
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 31 deletions.
9 changes: 0 additions & 9 deletions pkg/storage/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,6 @@ func (s *Storage) newBadger(name string, p Prefix, codec cache.Codec) (BadgerDBW
return d, nil
}

func (d *db) Close() {
if d.Cache != nil {
d.Cache.Flush()
}
if err := d.DB.Close(); err != nil {
d.logger.WithError(err).Error("closing database")
}
}

func (d *db) Size() bytesize.ByteSize {
// The value is updated once per minute.
lsm, vlog := d.DB.Size()
Expand Down
64 changes: 43 additions & 21 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,49 @@ func (s *Storage) Close() error {
s.logger.Debug("waiting for storage tasks to finish")
s.tasksWG.Wait()
s.logger.Debug("storage tasks finished")
// Dictionaries DB has to close last because trees and profiles DBs depend on it.
s.goDB(func(d BadgerDBWithCache) {
if d != s.dicts {
d.Close()
}
})
s.dicts.Close()

// Flush caches. Dictionaries DB has to close last because trees depends on it.
// Exemplars DB does not have a cache but depends on Dictionaries DB as well:
// there is no need to force synchronization, as exemplars storage listens to
// the s.stop channel and stops synchronously.
caches := []BadgerDBWithCache{
s.trees,
s.segments,
s.dimensions,
}
wg := new(sync.WaitGroup)
wg.Add(len(caches))
for _, d := range caches {
go func(d BadgerDBWithCache) {
d.CacheInstance().Flush()
wg.Done()
}(d)
}
wg.Wait()

// Flush dictionaries cache only when all the dependant caches are flushed.
s.dicts.CacheInstance().Flush()

// Close databases. Order does not matter.
dbs := []BadgerDBWithCache{
s.trees,
s.segments,
s.dimensions,
s.exemplars.db,
s.dicts,
s.main, // Also stores labels.
}
wg = new(sync.WaitGroup)
wg.Add(len(dbs))
for _, d := range dbs {
go func(d BadgerDBWithCache) {
defer wg.Done()
if err := d.DBInstance().Close(); err != nil {
s.logger.WithField("name", d.Name()).WithError(err).Error("closing database")
}
}(d)
}
wg.Wait()
return nil
}

Expand Down Expand Up @@ -202,20 +238,6 @@ func (s *Storage) withContext(fn func(context.Context)) {
fn(ctx)
}

// goDB runs f for all DBs concurrently.
func (s *Storage) goDB(f func(BadgerDBWithCache)) {
dbs := s.databases()
wg := new(sync.WaitGroup)
wg.Add(len(dbs))
for _, d := range dbs {
go func(db BadgerDBWithCache) {
defer wg.Done()
f(db)
}(d)
}
wg.Wait()
}

// maintenanceTask periodically runs f exclusively.
func (s *Storage) maintenanceTask(interval time.Duration, f func()) {
s.periodicTask(interval, func() {
Expand Down
1 change: 0 additions & 1 deletion pkg/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ type BadgerDBWithCache interface {
BadgerDB
CacheLayer

Close()
Size() bytesize.ByteSize
CacheSize() uint64

Expand Down

0 comments on commit 7a8b33c

Please sign in to comment.