diff --git a/datastore/memory/memory.go b/datastore/memory/memory.go index 277c24a0fc..e650776623 100644 --- a/datastore/memory/memory.go +++ b/datastore/memory/memory.go @@ -25,6 +25,7 @@ type dsTxn struct { dsVersion uint64 txnVersion uint64 expiresAt time.Time + txn *basicTxn } func byDSVersion(a, b dsTxn) bool { @@ -57,22 +58,17 @@ func byKeys(a, b dsItem) bool { } } -type commit struct { - tx *basicTxn - err chan error -} - // Datastore uses a btree for internal storage. type Datastore struct { // Latest committed version. version *uint64 values *btree.BTreeG[dsItem] inFlightTxn *btree.BTreeG[dsTxn] - commit chan commit - closing chan struct{} - closed bool - closeLk sync.RWMutex + closing chan struct{} + closed bool + closeLk sync.RWMutex + commitLk sync.Mutex } var _ ds.Datastore = (*Datastore)(nil) @@ -87,10 +83,9 @@ func NewDatastore(ctx context.Context) *Datastore { values: btree.NewBTreeG(byKeys), inFlightTxn: btree.NewBTreeG(byDSVersion), closing: make(chan struct{}), - commit: make(chan commit), } go d.purgeOldVersions(ctx) - go d.commitHandler(ctx) + go d.handleContextDone(ctx) return d } @@ -124,7 +119,14 @@ func (d *Datastore) Close() error { d.closed = true close(d.closing) - close(d.commit) + + iter := d.inFlightTxn.Iter() + + for iter.Next() { + iter.Item().txn.close() + } + iter.Release() + return nil } @@ -203,15 +205,22 @@ func (d *Datastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, } // newTransaction returns a ds.Txn datastore. +// +// isInternal should be set to true if this transaction is created from within the +// datastore and is already protected by stuff like locks. Failure to correctly set +// this to true may result in deadlocks. Failure to correctly set it to false may lead +// to other concurrency issues. 
func (d *Datastore) newTransaction(readOnly bool) ds.Txn { v := d.getVersion() - d.inFlightTxn.Set(dsTxn{v, v + 1, time.Now().Add(1 * time.Hour)}) - return &basicTxn{ + txn := &basicTxn{ ops: btree.NewBTreeG(byKeys), ds: d, readOnly: readOnly, dsVersion: &v, } + + d.inFlightTxn.Set(dsTxn{v, v + 1, time.Now().Add(1 * time.Hour), txn}) + return txn } // Put implements ds.Put. @@ -337,37 +346,41 @@ func (d *Datastore) executePurge(ctx context.Context) { } } -func (d *Datastore) commitHandler(ctx context.Context) { - for { - select { - case <-ctx.Done(): - // It is safe to ignore the error since the only error that could occur is if the - // datastore is already closed, in which case the purpose of the `Close` call is already covered. - _ = d.Close() - return - case c, open := <-d.commit: - if !open { - return - } - err := c.tx.checkForConflicts(ctx) - if err != nil { - c.err <- err - continue - } - iter := c.tx.ops.Iter() - v := d.nextVersion() - for iter.Next() { - if iter.Item().isGet { - continue - } - item := iter.Item() - item.version = v - d.values.Set(item) - } - iter.Release() - close(c.err) +func (d *Datastore) handleContextDone(ctx context.Context) { + <-ctx.Done() + // It is safe to ignore the error since the only error that could occur is if the + // datastore is already closed, in which case the purpose of the `Close` call is already covered. + _ = d.Close() +} + +// commit commits the given transaction to the datastore. +// +// WARNING: This is a notable bottleneck, as commits can only be committed one at a time (handled internally). +// This is to ensure correct, thread-safe mutation of the datastore version. +func (d *Datastore) commit(ctx context.Context, t *basicTxn) error { + d.commitLk.Lock() + defer d.commitLk.Unlock() + + // The commitLk scope must include checkForConflicts, and it must be a write lock. 
The datastore version + // cannot be allowed to change between here and the release of the iterator, else the check for conflicts + // will be stale and potentially out of date. + err := t.checkForConflicts(ctx) + if err != nil { + return err + } + + iter := t.ops.Iter() + v := t.ds.nextVersion() + for iter.Next() { + if iter.Item().isGet { + continue } + item := iter.Item() + item.version = v + t.ds.values.Set(item) } + iter.Release() + return nil } func (d *Datastore) clearOldInFlightTxn(ctx context.Context) { diff --git a/datastore/memory/txn.go b/datastore/memory/txn.go index 416bda6ffc..3cd7ab2bf9 100644 --- a/datastore/memory/txn.go +++ b/datastore/memory/txn.go @@ -12,6 +12,7 @@ package memory import ( "context" + "sync" "sync/atomic" ds "github.com/ipfs/go-datastore" @@ -27,6 +28,9 @@ type basicTxn struct { dsVersion *uint64 readOnly bool discarded bool + + closed bool + closeLk sync.RWMutex } var _ ds.Txn = (*basicTxn)(nil) @@ -41,11 +45,12 @@ func (t *basicTxn) getTxnVersion() uint64 { // Delete implements ds.Delete. func (t *basicTxn) Delete(ctx context.Context, key ds.Key) error { - t.ds.closeLk.RLock() - defer t.ds.closeLk.RUnlock() - if t.ds.closed { + t.closeLk.RLock() + defer t.closeLk.RUnlock() + if t.closed { return ErrClosed } + if t.discarded { return ErrTxnDiscarded } @@ -82,11 +87,12 @@ func (t *basicTxn) get(ctx context.Context, key ds.Key) dsItem { // Get implements ds.Get. func (t *basicTxn) Get(ctx context.Context, key ds.Key) ([]byte, error) { - t.ds.closeLk.RLock() - defer t.ds.closeLk.RUnlock() - if t.ds.closed { + t.closeLk.RLock() + defer t.closeLk.RUnlock() + if t.closed { return nil, ErrClosed } + if t.discarded { return nil, ErrTxnDiscarded } @@ -99,11 +105,12 @@ func (t *basicTxn) Get(ctx context.Context, key ds.Key) ([]byte, error) { // GetSize implements ds.GetSize. 
func (t *basicTxn) GetSize(ctx context.Context, key ds.Key) (size int, err error) { - t.ds.closeLk.RLock() - defer t.ds.closeLk.RUnlock() - if t.ds.closed { + t.closeLk.RLock() + defer t.closeLk.RUnlock() + if t.closed { return 0, ErrClosed } + if t.discarded { return 0, ErrTxnDiscarded } @@ -121,6 +128,7 @@ func (t *basicTxn) Has(ctx context.Context, key ds.Key) (exists bool, err error) if t.ds.closed { return false, ErrClosed } + if t.discarded { return false, ErrTxnDiscarded } @@ -133,11 +141,12 @@ func (t *basicTxn) Has(ctx context.Context, key ds.Key) (exists bool, err error) // Put implements ds.Put. func (t *basicTxn) Put(ctx context.Context, key ds.Key, value []byte) error { - t.ds.closeLk.RLock() - defer t.ds.closeLk.RUnlock() - if t.ds.closed { + t.closeLk.RLock() + defer t.closeLk.RUnlock() + if t.closed { return ErrClosed } + if t.discarded { return ErrTxnDiscarded } @@ -151,11 +160,12 @@ func (t *basicTxn) Put(ctx context.Context, key ds.Key, value []byte) error { // Query implements ds.Query. func (t *basicTxn) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { - t.ds.closeLk.RLock() - defer t.ds.closeLk.RUnlock() + t.closeLk.RLock() + defer t.closeLk.RUnlock() if t.ds.closed { return nil, ErrClosed } + if t.discarded { return nil, ErrTxnDiscarded } @@ -235,23 +245,19 @@ func (t *basicTxn) Discard(ctx context.Context) { // Commit saves the operations to the underlying datastore. 
func (t *basicTxn) Commit(ctx context.Context) error { - t.ds.closeLk.RLock() - defer t.ds.closeLk.RUnlock() - if t.ds.closed { + t.closeLk.RLock() + defer t.closeLk.RUnlock() + if t.closed { return ErrClosed } + if t.discarded { return ErrTxnDiscarded } defer t.Discard(ctx) if !t.readOnly { - c := commit{ - tx: t, - err: make(chan error), - } - t.ds.commit <- c - return <-c.err + return t.ds.commit(ctx, t) } return nil @@ -282,3 +288,9 @@ func (t *basicTxn) clearInFlightTxn(ctx context.Context) { ) t.ds.clearOldInFlightTxn(ctx) } + +func (t *basicTxn) close() { + t.closeLk.Lock() + defer t.closeLk.Unlock() + t.closed = true +}