Skip to content

Commit

Permalink
fix: Fix deadlock on memory-datastore Close (sourcenetwork#1273)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley authored Apr 4, 2023
1 parent ef87d69 commit bb7c812
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 66 deletions.
99 changes: 56 additions & 43 deletions datastore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type dsTxn struct {
dsVersion uint64
txnVersion uint64
expiresAt time.Time
txn *basicTxn
}

func byDSVersion(a, b dsTxn) bool {
Expand Down Expand Up @@ -57,22 +58,17 @@ func byKeys(a, b dsItem) bool {
}
}

// commit is a request to commit a transaction. Values of this type are sent
// over the Datastore's commit channel and received by commitHandler.
type commit struct {
// tx is the transaction that is checked for conflicts and then applied.
tx *basicTxn
// err reports the outcome back to the sender: a conflict error is sent on it,
// or it is closed without a value once the operations have been applied.
err chan error
}

// Datastore uses a btree for internal storage.
type Datastore struct {
// Latest committed version.
version *uint64
values *btree.BTreeG[dsItem]
inFlightTxn *btree.BTreeG[dsTxn]
commit chan commit

closing chan struct{}
closed bool
closeLk sync.RWMutex
closing chan struct{}
closed bool
closeLk sync.RWMutex
commitLk sync.Mutex
}

var _ ds.Datastore = (*Datastore)(nil)
Expand All @@ -87,10 +83,9 @@ func NewDatastore(ctx context.Context) *Datastore {
values: btree.NewBTreeG(byKeys),
inFlightTxn: btree.NewBTreeG(byDSVersion),
closing: make(chan struct{}),
commit: make(chan commit),
}
go d.purgeOldVersions(ctx)
go d.commitHandler(ctx)
go d.handleContextDone(ctx)
return d
}

Expand Down Expand Up @@ -124,7 +119,14 @@ func (d *Datastore) Close() error {

d.closed = true
close(d.closing)
close(d.commit)

iter := d.inFlightTxn.Iter()

for iter.Next() {
iter.Item().txn.close()
}
iter.Release()

return nil
}

Expand Down Expand Up @@ -203,15 +205,22 @@ func (d *Datastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn,
}

// newTransaction returns a ds.Txn datastore.
//
// isInternal should be set to true if this transaction is created from within the
// datastore and is already protected by synchronization such as locks. Failure to
// correctly set this to true may result in deadlocks. Failure to correctly set it
// to false may lead to other concurrency issues.
func (d *Datastore) newTransaction(readOnly bool) ds.Txn {
v := d.getVersion()
d.inFlightTxn.Set(dsTxn{v, v + 1, time.Now().Add(1 * time.Hour)})
return &basicTxn{
txn := &basicTxn{
ops: btree.NewBTreeG(byKeys),
ds: d,
readOnly: readOnly,
dsVersion: &v,
}

d.inFlightTxn.Set(dsTxn{v, v + 1, time.Now().Add(1 * time.Hour), txn})
return txn
}

// Put implements ds.Put.
Expand Down Expand Up @@ -337,37 +346,41 @@ func (d *Datastore) executePurge(ctx context.Context) {
}
}

func (d *Datastore) commitHandler(ctx context.Context) {
for {
select {
case <-ctx.Done():
// It is safe to ignore the error since the only error that could occur is if the
// datastore is already closed, in which case the purpose of the `Close` call is already covered.
_ = d.Close()
return
case c, open := <-d.commit:
if !open {
return
}
err := c.tx.checkForConflicts(ctx)
if err != nil {
c.err <- err
continue
}
iter := c.tx.ops.Iter()
v := d.nextVersion()
for iter.Next() {
if iter.Item().isGet {
continue
}
item := iter.Item()
item.version = v
d.values.Set(item)
}
iter.Release()
close(c.err)
// handleContextDone closes the datastore once ctx is cancelled.
//
// It is intended to run on its own goroutine. It also returns as soon as the
// datastore's closing channel is closed (which Close does), so the goroutine
// does not stay blocked forever when the datastore is closed explicitly and
// ctx is never cancelled.
func (d *Datastore) handleContextDone(ctx context.Context) {
	select {
	case <-ctx.Done():
		// It is safe to ignore the error since the only error that could occur is if the
		// datastore is already closed, in which case the purpose of the `Close` call is already covered.
		_ = d.Close()
	case <-d.closing:
		// The datastore has already been closed; there is nothing left to do.
	}
}

// commit commits the given transaction to the datastore.
//
// The transaction is first checked for conflicts; if that check fails its
// error is returned and nothing is written. Otherwise every operation recorded
// in t.ops that is not a get is stored in the datastore's values, all stamped
// with the single version obtained from nextVersion.
//
// WARNING: This is a notable bottleneck, as commits can only be committed one at a time (handled internally).
// This is to ensure correct, threadsafe, mutation of the datastore version.
func (d *Datastore) commit(ctx context.Context, t *basicTxn) error {
	d.commitLk.Lock()
	defer d.commitLk.Unlock()

	// The commitLk scope must include checkForConflicts, and it must be an exclusive lock. The datastore
	// version cannot be allowed to change between here and the release of the iterator, else the check for
	// conflicts will be stale.
	if err := t.checkForConflicts(ctx); err != nil {
		return err
	}

	iter := t.ops.Iter()
	// Deferred so the iterator is released on every exit path; it runs before commitLk is unlocked.
	defer iter.Release()

	// All mutation goes through the receiver so that the datastore being written to is, by construction,
	// the one whose commitLk is held.
	v := d.nextVersion()
	for iter.Next() {
		item := iter.Item()
		if item.isGet {
			// Reads are only tracked for conflict detection; they are never written back.
			continue
		}
		item.version = v
		d.values.Set(item)
	}
	return nil
}

func (d *Datastore) clearOldInFlightTxn(ctx context.Context) {
Expand Down
58 changes: 35 additions & 23 deletions datastore/memory/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package memory

import (
"context"
"sync"
"sync/atomic"

ds "github.com/ipfs/go-datastore"
Expand All @@ -27,6 +28,9 @@ type basicTxn struct {
dsVersion *uint64
readOnly bool
discarded bool

closed bool
closeLk sync.RWMutex
}

var _ ds.Txn = (*basicTxn)(nil)
Expand All @@ -41,11 +45,12 @@ func (t *basicTxn) getTxnVersion() uint64 {

// Delete implements ds.Delete.
func (t *basicTxn) Delete(ctx context.Context, key ds.Key) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
t.closeLk.RLock()
defer t.closeLk.RUnlock()
if t.closed {
return ErrClosed
}

if t.discarded {
return ErrTxnDiscarded
}
Expand Down Expand Up @@ -82,11 +87,12 @@ func (t *basicTxn) get(ctx context.Context, key ds.Key) dsItem {

// Get implements ds.Get.
func (t *basicTxn) Get(ctx context.Context, key ds.Key) ([]byte, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
t.closeLk.RLock()
defer t.closeLk.RUnlock()
if t.closed {
return nil, ErrClosed
}

if t.discarded {
return nil, ErrTxnDiscarded
}
Expand All @@ -99,11 +105,12 @@ func (t *basicTxn) Get(ctx context.Context, key ds.Key) ([]byte, error) {

// GetSize implements ds.GetSize.
func (t *basicTxn) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
t.closeLk.RLock()
defer t.closeLk.RUnlock()
if t.closed {
return 0, ErrClosed
}

if t.discarded {
return 0, ErrTxnDiscarded
}
Expand All @@ -121,6 +128,7 @@ func (t *basicTxn) Has(ctx context.Context, key ds.Key) (exists bool, err error)
if t.ds.closed {
return false, ErrClosed
}

if t.discarded {
return false, ErrTxnDiscarded
}
Expand All @@ -133,11 +141,12 @@ func (t *basicTxn) Has(ctx context.Context, key ds.Key) (exists bool, err error)

// Put implements ds.Put.
func (t *basicTxn) Put(ctx context.Context, key ds.Key, value []byte) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
t.closeLk.RLock()
defer t.closeLk.RUnlock()
if t.closed {
return ErrClosed
}

if t.discarded {
return ErrTxnDiscarded
}
Expand All @@ -151,11 +160,12 @@ func (t *basicTxn) Put(ctx context.Context, key ds.Key, value []byte) error {

// Query implements ds.Query.
func (t *basicTxn) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
t.closeLk.RLock()
defer t.closeLk.RUnlock()
if t.ds.closed {
return nil, ErrClosed
}

if t.discarded {
return nil, ErrTxnDiscarded
}
Expand Down Expand Up @@ -235,23 +245,19 @@ func (t *basicTxn) Discard(ctx context.Context) {

// Commit saves the operations to the underlying datastore.
func (t *basicTxn) Commit(ctx context.Context) error {
t.ds.closeLk.RLock()
defer t.ds.closeLk.RUnlock()
if t.ds.closed {
t.closeLk.RLock()
defer t.closeLk.RUnlock()
if t.closed {
return ErrClosed
}

if t.discarded {
return ErrTxnDiscarded
}
defer t.Discard(ctx)

if !t.readOnly {
c := commit{
tx: t,
err: make(chan error),
}
t.ds.commit <- c
return <-c.err
return t.ds.commit(ctx, t)
}

return nil
Expand Down Expand Up @@ -282,3 +288,9 @@ func (t *basicTxn) clearInFlightTxn(ctx context.Context) {
)
t.ds.clearOldInFlightTxn(ctx)
}

// close marks the transaction as closed.
//
// The closed flag is written while holding closeLk exclusively; the
// transaction's operations (e.g. Delete, Get, Put) read it under
// closeLk.RLock.
func (t *basicTxn) close() {
	t.closeLk.Lock()
	t.closed = true
	t.closeLk.Unlock()
}

0 comments on commit bb7c812

Please sign in to comment.