diff --git a/datastore/concurrent_txn.go b/datastore/concurrent_txn.go index 2a8aed015e..151c499360 100644 --- a/datastore/concurrent_txn.go +++ b/datastore/concurrent_txn.go @@ -31,7 +31,7 @@ type concurrentTxn struct { } // NewConcurrentTxnFrom creates a new Txn from rootstore that supports concurrent API calls -func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) { +func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) { var rootTxn ds.Txn var err error @@ -54,7 +54,6 @@ func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uin return &txn{ rootConcurentTxn, multistore, - id, []func(){}, []func(){}, []func(){}, diff --git a/datastore/concurrent_txt_test.go b/datastore/concurrent_txt_test.go index f3e03b8c3e..c2c2de7985 100644 --- a/datastore/concurrent_txt_test.go +++ b/datastore/concurrent_txt_test.go @@ -28,7 +28,7 @@ func TestNewConcurrentTxnFrom(t *testing.T) { rootstore, err := badgerds.NewDatastore("", &opts) require.NoError(t, err) - txn, err := NewConcurrentTxnFrom(ctx, rootstore, 0, false) + txn, err := NewConcurrentTxnFrom(ctx, rootstore, false) require.NoError(t, err) err = txn.Commit(ctx) @@ -44,7 +44,7 @@ func TestNewConcurrentTxnFromWithStoreClosed(t *testing.T) { err = rootstore.Close() require.NoError(t, err) - _, err = NewConcurrentTxnFrom(ctx, rootstore, 0, false) + _, err = NewConcurrentTxnFrom(ctx, rootstore, false) require.ErrorIs(t, err, badgerds.ErrClosed) } @@ -52,7 +52,7 @@ func TestNewConcurrentTxnFromNonIterable(t *testing.T) { ctx := context.Background() rootstore := memory.NewDatastore(ctx) - txn, err := NewConcurrentTxnFrom(ctx, rootstore, 0, false) + txn, err := NewConcurrentTxnFrom(ctx, rootstore, false) require.NoError(t, err) err = txn.Commit(ctx) @@ -66,7 +66,7 @@ func TestNewConcurrentTxnFromNonIterableWithStoreClosed(t *testing.T) { err := rootstore.Close() require.NoError(t, err) - _, err = NewConcurrentTxnFrom(ctx, rootstore, 0, false) + _, err = NewConcurrentTxnFrom(ctx, rootstore, false) require.ErrorIs(t, err, badgerds.ErrClosed) } diff --git a/datastore/txn.go b/datastore/txn.go index d0fa3d2f35..499d225556 100644 --- a/datastore/txn.go +++ b/datastore/txn.go @@ -22,9 +22,6 @@ import ( type Txn interface { MultiStore - // ID returns the unique immutable identifier for this transaction. - ID() uint64 - // Commit finalizes a transaction, attempting to commit it to the Datastore. // May return an error if the transaction has gone stale. The presence of an // error is an indication that the data was not committed to the Datastore. @@ -49,8 +46,6 @@ type txn struct { t ds.Txn MultiStore - id uint64 - successFns []func() errorFns []func() discardFns []func() @@ -59,7 +54,7 @@ type txn struct { var _ Txn = (*txn)(nil) // NewTxnFrom returns a new Txn from the rootstore. -func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) { +func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) { // check if our datastore natively supports iterable transaction, transactions or batching if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok { rootTxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly) @@ -70,7 +65,6 @@ func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, reado return &txn{ rootTxn, multistore, - id, []func(){}, []func(){}, []func(){}, @@ -87,18 +81,12 @@ func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, reado return &txn{ rootTxn, multistore, - id, []func(){}, []func(){}, []func(){}, }, nil } -// ID returns the unique immutable identifier for this transaction. -func (t *txn) ID() uint64 { - return t.id -} - // Commit finalizes a transaction, attempting to commit it to the Datastore. func (t *txn) Commit(ctx context.Context) error { if err := t.t.Commit(ctx); err != nil { diff --git a/datastore/txn_test.go b/datastore/txn_test.go index e46dbdae8f..2c25e9be00 100644 --- a/datastore/txn_test.go +++ b/datastore/txn_test.go @@ -27,7 +27,7 @@ func TestNewTxnFrom(t *testing.T) { rootstore, err := badgerds.NewDatastore("", &opts) require.NoError(t, err) - txn, err := NewTxnFrom(ctx, rootstore, 0, false) + txn, err := NewTxnFrom(ctx, rootstore, false) require.NoError(t, err) err = txn.Commit(ctx) @@ -43,7 +43,7 @@ func TestNewTxnFromWithStoreClosed(t *testing.T) { err = rootstore.Close() require.NoError(t, err) - _, err = NewTxnFrom(ctx, rootstore, 0, false) + _, err = NewTxnFrom(ctx, rootstore, false) require.ErrorIs(t, err, badgerds.ErrClosed) } @@ -53,7 +53,7 @@ func TestOnSuccess(t *testing.T) { rootstore, err := badgerds.NewDatastore("", &opts) require.NoError(t, err) - txn, err := NewTxnFrom(ctx, rootstore, 0, false) + txn, err := NewTxnFrom(ctx, rootstore, false) require.NoError(t, err) txn.OnSuccess(nil) @@ -74,7 +74,7 @@ func TestOnError(t *testing.T) { rootstore, err := badgerds.NewDatastore("", &opts) require.NoError(t, err) - txn, err := NewTxnFrom(ctx, rootstore, 0, false) + txn, err := NewTxnFrom(ctx, rootstore, false) require.NoError(t, err) txn.OnError(nil) diff --git a/db/db.go b/db/db.go index b9ea5ac2a3..241f96a536 100644 --- a/db/db.go +++ b/db/db.go @@ -17,7 +17,6 @@ package db import ( "context" "sync" - "sync/atomic" blockstore "github.com/ipfs/boxo/blockstore" ds "github.com/ipfs/go-datastore" @@ -51,13 +50,6 @@ const ( // DB is the main interface for interacting with the // DefraDB storage system. type db struct { - // The ID of the last transaction created. - // - // Warning: we currently rely on this prop being 64-bit aligned in memory - // relative to the start of the `db` struct (for atomic.Add calls). The - // easiest way to ensure this alignment is to declare it at the top. - previousTxnID uint64 - glock sync.RWMutex rootstore datastore.RootStore @@ -158,15 +150,12 @@ func newDB(ctx context.Context, rootstore datastore.RootStore, options ...Option // NewTxn creates a new transaction. func (db *db) NewTxn(ctx context.Context, readonly bool) (datastore.Txn, error) { - txnId := atomic.AddUint64(&db.previousTxnID, 1) - - return datastore.NewTxnFrom(ctx, db.rootstore, txnId, readonly) + return datastore.NewTxnFrom(ctx, db.rootstore, readonly) } // NewConcurrentTxn creates a new transaction that supports concurrent API calls. func (db *db) NewConcurrentTxn(ctx context.Context, readonly bool) (datastore.Txn, error) { - txnId := atomic.AddUint64(&db.previousTxnID, 1) - return datastore.NewConcurrentTxnFrom(ctx, db.rootstore, txnId, readonly) + return datastore.NewConcurrentTxnFrom(ctx, db.rootstore, readonly) } // WithTxn returns a new [client.Store] that respects the given transaction. diff --git a/db/fetcher/versioned.go b/db/fetcher/versioned.go index 69d54abd4f..53ae6b8eaf 100644 --- a/db/fetcher/versioned.go +++ b/db/fetcher/versioned.go @@ -121,8 +121,6 @@ func (vf *VersionedFetcher) Init( vf.store, err = datastore.NewTxnFrom( ctx, vf.root, - // We can take the parent txn id here - txn.ID(), false, ) // were going to discard and nuke this later if err != nil { diff --git a/lens/registry.go b/lens/registry.go index a4074ca7f4..eb0c95f1bf 100644 --- a/lens/registry.go +++ b/lens/registry.go @@ -53,12 +53,6 @@ type lensRegistry struct { // lens configurations by source schema version ID configs map[string]client.LensConfig configLock sync.RWMutex - - // Writable transaction contexts by transaction ID. - // - // Read-only transaction contexts are not tracked. - txnCtxs map[uint64]*txnContext - txnLock sync.RWMutex } // txnContext contains uncommitted transaction state tracked by the registry, @@ -109,29 +103,16 @@ func NewRegistry(lensPoolSize immutable.Option[int], db TxnSource) client.LensRe lensPoolsBySchemaVersionID: map[string]*lensPool{}, reversedPoolsBySchemaVersionID: map[string]*lensPool{}, configs: map[string]client.LensConfig{}, - txnCtxs: map[uint64]*txnContext{}, }, } } func (r *lensRegistry) getCtx(txn datastore.Txn, readonly bool) *txnContext { - r.txnLock.RLock() - if txnCtx, ok := r.txnCtxs[txn.ID()]; ok { - r.txnLock.RUnlock() - return txnCtx - } - r.txnLock.RUnlock() - txnCtx := newTxnCtx(txn) if readonly { return txnCtx } - - r.txnLock.Lock() - r.txnCtxs[txn.ID()] = txnCtx - r.txnLock.Unlock() - - txnCtx.txn.OnSuccess(func() { + txn.OnSuccess(func() { r.poolLock.Lock() for schemaVersionID, locker := range txnCtx.lensPoolsBySchemaVersionID { r.lensPoolsBySchemaVersionID[schemaVersionID] = locker @@ -146,24 +127,6 @@ func (r *lensRegistry) getCtx(txn datastore.Txn, readonly bool) *txnContext { r.configs[schemaVersionID] = cfg } r.configLock.Unlock() - - r.txnLock.Lock() - delete(r.txnCtxs, txn.ID()) - r.txnLock.Unlock() - }) - - txn.OnError(func() { - r.txnLock.Lock() - delete(r.txnCtxs, txn.ID()) - r.txnLock.Unlock() - }) - - txn.OnDiscard(func() { - // Delete it to help reduce the build up of memory, the txnCtx will be re-contructed if the - // txn is reused after discard. - r.txnLock.Lock() - delete(r.txnCtxs, txn.ID()) - r.txnLock.Unlock() }) return txnCtx diff --git a/lens/txn_registry.go b/lens/txn_registry.go index 954db01e0c..38129f4983 100644 --- a/lens/txn_registry.go +++ b/lens/txn_registry.go @@ -27,6 +27,7 @@ type implicitTxnLensRegistry struct { type explicitTxnLensRegistry struct { registry *lensRegistry txn datastore.Txn + txnCtx *txnContext } var _ client.LensRegistry = (*implicitTxnLensRegistry)(nil) @@ -36,6 +37,7 @@ func (r *implicitTxnLensRegistry) WithTxn(txn datastore.Txn) client.LensRegistry return &explicitTxnLensRegistry{ registry: r.registry, txn: txn, + txnCtx: r.registry.getCtx(txn, false), } } @@ -43,6 +45,7 @@ func (r *explicitTxnLensRegistry) WithTxn(txn datastore.Txn) client.LensRegistry return &explicitTxnLensRegistry{ registry: r.registry, txn: txn, + txnCtx: r.registry.getCtx(txn, false), } } @@ -63,7 +66,7 @@ func (r *implicitTxnLensRegistry) SetMigration(ctx context.Context, cfg client.L } func (r *explicitTxnLensRegistry) SetMigration(ctx context.Context, cfg client.LensConfig) error { - return r.registry.setMigration(ctx, r.registry.getCtx(r.txn, false), cfg) + return r.registry.setMigration(ctx, r.txnCtx, cfg) } func (r *implicitTxnLensRegistry) ReloadLenses(ctx context.Context) error { @@ -83,7 +86,7 @@ func (r *implicitTxnLensRegistry) ReloadLenses(ctx context.Context) error { } func (r *explicitTxnLensRegistry) ReloadLenses(ctx context.Context) error { - return r.registry.reloadLenses(ctx, r.registry.getCtx(r.txn, true)) + return r.registry.reloadLenses(ctx, r.txnCtx) } func (r *implicitTxnLensRegistry) MigrateUp( @@ -106,7 +109,7 @@ func (r *explicitTxnLensRegistry) MigrateUp( src enumerable.Enumerable[LensDoc], schemaVersionID string, ) (enumerable.Enumerable[map[string]any], error) { - return r.registry.migrateUp(r.registry.getCtx(r.txn, true), src, schemaVersionID) + return r.registry.migrateUp(r.txnCtx, src, schemaVersionID) } func (r *implicitTxnLensRegistry) MigrateDown( @@ -144,7 +147,7 @@ func (r *implicitTxnLensRegistry) Config(ctx context.Context) ([]client.LensConf } func (r *explicitTxnLensRegistry) Config(ctx context.Context) ([]client.LensConfig, error) { - return r.registry.config(r.registry.getCtx(r.txn, true)), nil + return r.registry.config(r.txnCtx), nil } func (r *implicitTxnLensRegistry) HasMigration(ctx context.Context, schemaVersionID string) (bool, error) { @@ -159,5 +162,5 @@ func (r *implicitTxnLensRegistry) HasMigration(ctx context.Context, schemaVersio } func (r *explicitTxnLensRegistry) HasMigration(ctx context.Context, schemaVersionID string) (bool, error) { - return r.registry.hasMigration(r.registry.getCtx(r.txn, true), schemaVersionID), nil + return r.registry.hasMigration(r.txnCtx, schemaVersionID), nil }