Skip to content

Commit

Permalink
metadata: hold lock on storageitem update
Browse files Browse the repository at this point in the history
The locks usage is mixed up because two locks separate locks
are actually needed. With a specific lock, calls to SetValue
can be protected.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
  • Loading branch information
tonistiigi committed Jun 15, 2021
1 parent 6a4a14b commit 976b7e9
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions cache/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,11 @@ func (s *Store) Close() error {

type StorageItem struct {
id string
vmu sync.RWMutex
values map[string]*Value
qmu sync.Mutex
queue []func(*bolt.Bucket) error
storage *Store
mu sync.RWMutex
}

func newStorageItem(id string, b *bolt.Bucket, s *Store) (*StorageItem, error) {
Expand Down Expand Up @@ -242,10 +243,6 @@ func (s *StorageItem) ID() string {
return s.id
}

func (s *StorageItem) View(fn func(b *bolt.Bucket) error) error {
return s.storage.View(s.id, fn)
}

func (s *StorageItem) Update(fn func(b *bolt.Bucket) error) error {
return s.storage.Update(s.id, fn)
}
Expand All @@ -255,17 +252,19 @@ func (s *StorageItem) Metadata() *StorageItem {
}

func (s *StorageItem) Keys() []string {
s.vmu.RLock()
keys := make([]string, 0, len(s.values))
for k := range s.values {
keys = append(keys, k)
}
s.vmu.RUnlock()
return keys
}

func (s *StorageItem) Get(k string) *Value {
s.mu.RLock()
s.vmu.RLock()
v := s.values[k]
s.mu.RUnlock()
s.vmu.RUnlock()
return v
}

Expand Down Expand Up @@ -310,14 +309,14 @@ func (s *StorageItem) SetExternal(k string, dt []byte) error {
}

func (s *StorageItem) Queue(fn func(b *bolt.Bucket) error) {
s.mu.Lock()
defer s.mu.Unlock()
s.qmu.Lock()
defer s.qmu.Unlock()
s.queue = append(s.queue, fn)
}

func (s *StorageItem) Commit() error {
s.mu.Lock()
defer s.mu.Unlock()
s.qmu.Lock()
defer s.qmu.Unlock()
return errors.WithStack(s.Update(func(b *bolt.Bucket) error {
for _, fn := range s.queue {
if err := fn(b); err != nil {
Expand All @@ -330,15 +329,23 @@ func (s *StorageItem) Commit() error {
}

func (s *StorageItem) Indexes() (out []string) {
s.vmu.RLock()
for _, v := range s.values {
if v.Index != "" {
out = append(out, v.Index)
}
}
s.vmu.RUnlock()
return
}

func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error {
s.vmu.Lock()
defer s.vmu.Unlock()
return s.setValue(b, key, v)
}

func (s *StorageItem) setValue(b *bolt.Bucket, key string, v *Value) error {
if v == nil {
if old, ok := s.values[key]; ok {
if old.Index != "" {
Expand Down Expand Up @@ -378,16 +385,16 @@ func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error {
var ErrSkipSetValue = errors.New("skip setting metadata value")

func (s *StorageItem) GetAndSetValue(key string, fn func(*Value) (*Value, error)) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.Update(func(b *bolt.Bucket) error {
s.vmu.Lock()
defer s.vmu.Unlock()
v, err := fn(s.values[key])
if errors.Is(err, ErrSkipSetValue) {
return nil
} else if err != nil {
return err
}
return s.SetValue(b, key, v)
return s.setValue(b, key, v)
})
}

Expand Down

0 comments on commit 976b7e9

Please sign in to comment.