diff --git a/api/http/handlerfuncs.go b/api/http/handlerfuncs.go index 9e5b212fe3..e4163de05f 100644 --- a/api/http/handlerfuncs.go +++ b/api/http/handlerfuncs.go @@ -298,12 +298,6 @@ func setMigrationHandler(rw http.ResponseWriter, req *http.Request) { return } - txn, err := db.NewTxn(req.Context(), false) - if err != nil { - handleErr(req.Context(), rw, err, http.StatusInternalServerError) - return - } - var cfg client.LensConfig err = json.Unmarshal(cfgStr, &cfg) if err != nil { @@ -311,13 +305,7 @@ func setMigrationHandler(rw http.ResponseWriter, req *http.Request) { return } - err = db.LensRegistry().SetMigration(req.Context(), txn, cfg) - if err != nil { - handleErr(req.Context(), rw, err, http.StatusInternalServerError) - return - } - - err = txn.Commit(req.Context()) + err = db.LensRegistry().SetMigration(req.Context(), cfg) if err != nil { handleErr(req.Context(), rw, err, http.StatusInternalServerError) return @@ -338,7 +326,7 @@ func getMigrationHandler(rw http.ResponseWriter, req *http.Request) { return } - cfgs := db.LensRegistry().Config() + cfgs, err := db.LensRegistry().Config(req.Context()) if err != nil { handleErr(req.Context(), rw, err, http.StatusInternalServerError) return diff --git a/client/lens.go b/client/lens.go index 1cffa19248..7b1264275f 100644 --- a/client/lens.go +++ b/client/lens.go @@ -43,6 +43,12 @@ type LensConfig struct { // LensRegistry exposes several useful thread-safe migration related functions which may // be used to manage migrations. type LensRegistry interface { + // WithTxn returns a new LensRegistry scoped to the given transaction. + // + // WARNING: Currently this does not provide snapshot isolation, if other transactions are commited + // after this has been created, the results of those commits will be visible within this scope. + WithTxn(datastore.Txn) LensRegistry + // SetMigration sets the migration for the given source-destination schema version IDs. Is equivilent to // calling `Store.SetMigration(ctx, cfg)`. // @@ -55,29 +61,37 @@ type LensRegistry interface { // // Migrations will only run if there is a complete path from the document schema version to the latest local // schema version. - SetMigration(context.Context, datastore.Txn, LensConfig) error + SetMigration(context.Context, LensConfig) error // ReloadLenses clears any cached migrations, loads their configurations from the database and re-initializes // them. It is run on database start if the database already existed. - ReloadLenses(ctx context.Context, txn datastore.Txn) error + ReloadLenses(context.Context) error // MigrateUp returns an enumerable that feeds the given source through the Lens migration for the given // schema version id if one is found, if there is no matching migration the given source will be returned. - MigrateUp(enumerable.Enumerable[map[string]any], string) (enumerable.Enumerable[map[string]any], error) + MigrateUp( + context.Context, + enumerable.Enumerable[map[string]any], + string, + ) (enumerable.Enumerable[map[string]any], error) // MigrateDown returns an enumerable that feeds the given source through the Lens migration for the schema // version that precedes the given schema version id in reverse, if one is found, if there is no matching // migration the given source will be returned. // // This downgrades any documents in the source enumerable if/when enumerated. - MigrateDown(enumerable.Enumerable[map[string]any], string) (enumerable.Enumerable[map[string]any], error) + MigrateDown( + context.Context, + enumerable.Enumerable[map[string]any], + string, + ) (enumerable.Enumerable[map[string]any], error) // Config returns a slice of the configurations of the currently loaded migrations. // // Modifying the slice does not affect the loaded configurations. - Config() []LensConfig + Config(context.Context) ([]LensConfig, error) // HasMigration returns true if there is a migration registered for the given schema version id, otherwise // will return false. - HasMigration(string) bool + HasMigration(context.Context, string) (bool, error) } diff --git a/datastore/concurrent_txn.go b/datastore/concurrent_txn.go index 5b2b6defd2..2a8aed015e 100644 --- a/datastore/concurrent_txn.go +++ b/datastore/concurrent_txn.go @@ -31,7 +31,7 @@ type concurrentTxn struct { } // NewConcurrentTxnFrom creates a new Txn from rootstore that supports concurrent API calls -func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) { +func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) { var rootTxn ds.Txn var err error @@ -54,6 +54,8 @@ func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readon return &txn{ rootConcurentTxn, multistore, + id, + []func(){}, []func(){}, []func(){}, }, nil diff --git a/datastore/concurrent_txt_test.go b/datastore/concurrent_txt_test.go index c2c2de7985..f3e03b8c3e 100644 --- a/datastore/concurrent_txt_test.go +++ b/datastore/concurrent_txt_test.go @@ -28,7 +28,7 @@ func TestNewConcurrentTxnFrom(t *testing.T) { rootstore, err := badgerds.NewDatastore("", &opts) require.NoError(t, err) - txn, err := NewConcurrentTxnFrom(ctx, rootstore, false) + txn, err := NewConcurrentTxnFrom(ctx, rootstore, 0, false) require.NoError(t, err) err = txn.Commit(ctx) @@ -44,7 +44,7 @@ func TestNewConcurrentTxnFromWithStoreClosed(t *testing.T) { err = rootstore.Close() require.NoError(t, err) - _, err = NewConcurrentTxnFrom(ctx, rootstore, false) + _, err = NewConcurrentTxnFrom(ctx, rootstore, 0, false) require.ErrorIs(t, err, badgerds.ErrClosed) } @@ -52,7 +52,7 @@ func TestNewConcurrentTxnFromNonIterable(t *testing.T) { ctx := context.Background() rootstore := memory.NewDatastore(ctx) - txn, err := NewConcurrentTxnFrom(ctx, rootstore, false) + txn, err := NewConcurrentTxnFrom(ctx, rootstore, 0, false) require.NoError(t, err) err = txn.Commit(ctx) @@ -66,7 +66,7 @@ func TestNewConcurrentTxnFromNonIterableWithStoreClosed(t *testing.T) { err := rootstore.Close() require.NoError(t, err) - _, err = NewConcurrentTxnFrom(ctx, rootstore, false) + _, err = NewConcurrentTxnFrom(ctx, rootstore, 0, false) require.ErrorIs(t, err, badgerds.ErrClosed) } diff --git a/datastore/mocks/txn.go b/datastore/mocks/txn.go index c9840e3d0c..dd3fb60def 100644 --- a/datastore/mocks/txn.go +++ b/datastore/mocks/txn.go @@ -226,6 +226,80 @@ func (_c *Txn_Headstore_Call) RunAndReturn(run func() datastore.DSReaderWriter) return _c } +// ID provides a mock function with given fields: +func (_m *Txn) ID() uint64 { + ret := _m.Called() + + var r0 uint64 + if rf, ok := ret.Get(0).(func() uint64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(uint64) + } + + return r0 +} + +// Txn_ID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ID' +type Txn_ID_Call struct { + *mock.Call +} + +// ID is a helper method to define mock.On call +func (_e *Txn_Expecter) ID() *Txn_ID_Call { + return &Txn_ID_Call{Call: _e.mock.On("ID")} +} + +func (_c *Txn_ID_Call) Run(run func()) *Txn_ID_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *Txn_ID_Call) Return(_a0 uint64) *Txn_ID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *Txn_ID_Call) RunAndReturn(run func() uint64) *Txn_ID_Call { + _c.Call.Return(run) + return _c +} + +// OnDiscard provides a mock function with given fields: fn +func (_m *Txn) OnDiscard(fn func()) { + _m.Called(fn) +} + +// Txn_OnDiscard_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnDiscard' +type Txn_OnDiscard_Call struct { + *mock.Call +} + +// OnDiscard is a helper method to define mock.On call +// - fn func() +func (_e *Txn_Expecter) OnDiscard(fn interface{}) *Txn_OnDiscard_Call { + return &Txn_OnDiscard_Call{Call: _e.mock.On("OnDiscard", fn)} +} + +func (_c *Txn_OnDiscard_Call) Run(run func(fn func())) *Txn_OnDiscard_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(func())) + }) + return _c +} + +func (_c *Txn_OnDiscard_Call) Return() *Txn_OnDiscard_Call { + _c.Call.Return() + return _c +} + +func (_c *Txn_OnDiscard_Call) RunAndReturn(run func(func())) *Txn_OnDiscard_Call { + _c.Call.Return(run) + return _c +} + // OnError provides a mock function with given fields: fn func (_m *Txn) OnError(fn func()) { _m.Called(fn) diff --git a/datastore/txn.go b/datastore/txn.go index 45b968ea98..d0fa3d2f35 100644 --- a/datastore/txn.go +++ b/datastore/txn.go @@ -22,6 +22,9 @@ import ( type Txn interface { MultiStore + // ID returns the unique immutable identifier for this transaction. + ID() uint64 + // Commit finalizes a transaction, attempting to commit it to the Datastore. // May return an error if the transaction has gone stale. The presence of an // error is an indication that the data was not committed to the Datastore. @@ -32,22 +35,31 @@ type Txn interface { // state of the Datastore, making it safe to defer. Discard(ctx context.Context) + // OnSuccess registers a function to be called when the transaction is committed. OnSuccess(fn func()) + + // OnError registers a function to be called when the transaction is rolled back. OnError(fn func()) + + // OnDiscard registers a function to be called when the transaction is discarded. + OnDiscard(fn func()) } type txn struct { t ds.Txn MultiStore + id uint64 + successFns []func() errorFns []func() + discardFns []func() } var _ Txn = (*txn)(nil) // NewTxnFrom returns a new Txn from the rootstore. -func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) { +func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) { // check if our datastore natively supports iterable transaction, transactions or batching if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok { rootTxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly) @@ -58,6 +70,8 @@ func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) ( return &txn{ rootTxn, multistore, + id, + []func(){}, []func(){}, []func(){}, }, nil @@ -73,24 +87,32 @@ func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) ( return &txn{ rootTxn, multistore, + id, + []func(){}, []func(){}, []func(){}, }, nil } +// ID returns the unique immutable identifier for this transaction. +func (t *txn) ID() uint64 { + return t.id +} + // Commit finalizes a transaction, attempting to commit it to the Datastore. func (t *txn) Commit(ctx context.Context) error { if err := t.t.Commit(ctx); err != nil { - t.runErrorFns(ctx) + runFns(t.errorFns) return err } - t.runSuccessFns(ctx) + runFns(t.successFns) return nil } // Discard throws away changes recorded in a transaction without committing. func (t *txn) Discard(ctx context.Context) { t.t.Discard(ctx) + runFns(t.discardFns) } // OnSuccess registers a function to be called when the transaction is committed. @@ -109,14 +131,16 @@ func (txn *txn) OnError(fn func()) { txn.errorFns = append(txn.errorFns, fn) } -func (txn *txn) runErrorFns(ctx context.Context) { - for _, fn := range txn.errorFns { - fn() +// OnDiscard registers a function to be called when the transaction is discarded. +func (txn *txn) OnDiscard(fn func()) { + if fn == nil { + return } + txn.discardFns = append(txn.discardFns, fn) } -func (txn *txn) runSuccessFns(ctx context.Context) { - for _, fn := range txn.successFns { +func runFns(fns []func()) { + for _, fn := range fns { fn() } } diff --git a/datastore/txn_test.go b/datastore/txn_test.go index 2c25e9be00..e46dbdae8f 100644 --- a/datastore/txn_test.go +++ b/datastore/txn_test.go @@ -27,7 +27,7 @@ func TestNewTxnFrom(t *testing.T) { rootstore, err := badgerds.NewDatastore("", &opts) require.NoError(t, err) - txn, err := NewTxnFrom(ctx, rootstore, false) + txn, err := NewTxnFrom(ctx, rootstore, 0, false) require.NoError(t, err) err = txn.Commit(ctx) @@ -43,7 +43,7 @@ func TestNewTxnFromWithStoreClosed(t *testing.T) { err = rootstore.Close() require.NoError(t, err) - _, err = NewTxnFrom(ctx, rootstore, false) + _, err = NewTxnFrom(ctx, rootstore, 0, false) require.ErrorIs(t, err, badgerds.ErrClosed) } @@ -53,7 +53,7 @@ func TestOnSuccess(t *testing.T) { rootstore, err := badgerds.NewDatastore("", &opts) require.NoError(t, err) - txn, err := NewTxnFrom(ctx, rootstore, false) + txn, err := NewTxnFrom(ctx, rootstore, 0, false) require.NoError(t, err) txn.OnSuccess(nil) @@ -74,7 +74,7 @@ func TestOnError(t *testing.T) { rootstore, err := badgerds.NewDatastore("", &opts) require.NoError(t, err) - txn, err := NewTxnFrom(ctx, rootstore, false) + txn, err := NewTxnFrom(ctx, rootstore, 0, false) require.NoError(t, err) txn.OnError(nil) diff --git a/db/db.go b/db/db.go index 8ffda296b4..0bc9a361c3 100644 --- a/db/db.go +++ b/db/db.go @@ -17,6 +17,7 @@ package db import ( "context" "sync" + "sync/atomic" blockstore "github.com/ipfs/boxo/blockstore" ds "github.com/ipfs/go-datastore" @@ -70,6 +71,9 @@ type db struct { // The options used to init the database options any + + // The ID of the last transaction created. + previousTxnID atomic.Uint64 } // Functional option type. @@ -138,7 +142,7 @@ func newDB(ctx context.Context, rootstore datastore.RootStore, options ...Option // lensPoolSize may be set by `options`, and because they are funcs on db // we have to mutate `db` here to set the registry. - db.lensRegistry = lens.NewRegistry(db.lensPoolSize) + db.lensRegistry = lens.NewRegistry(db.lensPoolSize, db) err = db.initialize(ctx) if err != nil { @@ -150,19 +154,22 @@ func newDB(ctx context.Context, rootstore datastore.RootStore, options ...Option // NewTxn creates a new transaction. func (db *db) NewTxn(ctx context.Context, readonly bool) (datastore.Txn, error) { - return datastore.NewTxnFrom(ctx, db.rootstore, readonly) + txnId := db.previousTxnID.Add(1) + return datastore.NewTxnFrom(ctx, db.rootstore, txnId, readonly) } // NewConcurrentTxn creates a new transaction that supports concurrent API calls. func (db *db) NewConcurrentTxn(ctx context.Context, readonly bool) (datastore.Txn, error) { - return datastore.NewConcurrentTxnFrom(ctx, db.rootstore, readonly) + txnId := db.previousTxnID.Add(1) + return datastore.NewConcurrentTxnFrom(ctx, db.rootstore, txnId, readonly) } // WithTxn returns a new [client.Store] that respects the given transaction. func (db *db) WithTxn(txn datastore.Txn) client.Store { return &explicitTxnDB{ - db: db, - txn: txn, + db: db, + txn: txn, + lensRegistry: db.lensRegistry.WithTxn(txn), } } @@ -210,7 +217,7 @@ func (db *db) initialize(ctx context.Context) error { return err } - err = db.lensRegistry.ReloadLenses(ctx, txn) + err = db.lensRegistry.ReloadLenses(ctx) if err != nil { return err } diff --git a/db/fetcher/versioned.go b/db/fetcher/versioned.go index 53ae6b8eaf..69d54abd4f 100644 --- a/db/fetcher/versioned.go +++ b/db/fetcher/versioned.go @@ -121,6 +121,8 @@ func (vf *VersionedFetcher) Init( vf.store, err = datastore.NewTxnFrom( ctx, vf.root, + // We can take the parent txn id here + txn.ID(), false, ) // were going to discard and nuke this later if err != nil { diff --git a/db/txn_db.go b/db/txn_db.go index a7096a46a7..b307d96e35 100644 --- a/db/txn_db.go +++ b/db/txn_db.go @@ -28,7 +28,8 @@ type implicitTxnDB struct { type explicitTxnDB struct { *db - txn datastore.Txn + txn datastore.Txn + lensRegistry client.LensRegistry } // ExecRequest executes a request against the database. @@ -286,7 +287,7 @@ func (db *implicitTxnDB) SetMigration(ctx context.Context, cfg client.LensConfig } defer txn.Discard(ctx) - err = db.lensRegistry.SetMigration(ctx, txn, cfg) + err = db.lensRegistry.SetMigration(ctx, cfg) if err != nil { return err } @@ -295,7 +296,7 @@ func (db *implicitTxnDB) SetMigration(ctx context.Context, cfg client.LensConfig } func (db *explicitTxnDB) SetMigration(ctx context.Context, cfg client.LensConfig) error { - return db.lensRegistry.SetMigration(ctx, db.txn, cfg) + return db.lensRegistry.SetMigration(ctx, cfg) } // SetReplicator adds a new replicator to the database. @@ -417,3 +418,10 @@ func (db *implicitTxnDB) BasicExport(ctx context.Context, config *client.BackupC func (db *explicitTxnDB) BasicExport(ctx context.Context, config *client.BackupConfig) error { return db.basicExport(ctx, db.txn, config) } + +// LensRegistry returns the LensRegistry in use by this database instance. +// +// It exposes several useful thread-safe migration related functions. +func (db *explicitTxnDB) LensRegistry() client.LensRegistry { + return db.lensRegistry +} diff --git a/lens/fetcher.go b/lens/fetcher.go index cefe59a4b0..ee01aa7983 100644 --- a/lens/fetcher.go +++ b/lens/fetcher.go @@ -76,15 +76,25 @@ func (f *lensedFetcher) Init( f.fieldDescriptionsByName[field.Name] = field } - history, err := getTargetedSchemaHistory(ctx, txn, f.registry.Config(), f.col.Schema.SchemaID, f.col.Schema.VersionID) + cfg, err := f.registry.Config(ctx) if err != nil { return err } - f.lens = new(f.registry, f.col.Schema.VersionID, history) + + history, err := getTargetedSchemaHistory(ctx, txn, cfg, f.col.Schema.SchemaID, f.col.Schema.VersionID) + if err != nil { + return err + } + f.lens = new(ctx, f.registry, f.col.Schema.VersionID, history) f.txn = txn for schemaVersionID := range history { - if f.registry.HasMigration(schemaVersionID) { + hasMigration, err := f.registry.HasMigration(ctx, schemaVersionID) + if err != nil { + return err + } + + if hasMigration { f.hasMigrations = true break } diff --git a/lens/lens.go b/lens/lens.go index e4d4895eea..86fcb0876f 100644 --- a/lens/lens.go +++ b/lens/lens.go @@ -11,6 +11,8 @@ package lens import ( + "context" + "github.com/sourcenetwork/immutable/enumerable" "github.com/sourcenetwork/defradb/client" @@ -42,6 +44,8 @@ type Lens interface { type lens struct { lensRegistry client.LensRegistry + ctx context.Context + // The primary access points to the lens pipes through which all things flow. lensPipesBySchemaVersionIDs map[schemaVersionID]enumerable.Concatenation[LensDoc] @@ -60,6 +64,7 @@ type lens struct { var _ Lens = (*lens)(nil) func new( + ctx context.Context, lensRegistry client.LensRegistry, targetSchemaVersionID schemaVersionID, schemaVersionHistory map[schemaVersionID]*targetedSchemaHistoryLink, @@ -69,6 +74,7 @@ func new( return &lens{ lensRegistry: lensRegistry, + ctx: ctx, source: enumerable.NewQueue[lensInput](), outputPipe: outputPipe, unknownVersionPipe: targetSource, @@ -175,7 +181,7 @@ func (l *lens) Next() (bool, error) { // Aquire a lens migration from the registery, using the junctionPipe as its source. // The new pipeHead will then be connected as a source to the next migration-stage on // the next loop. - pipeHead, err = l.lensRegistry.MigrateUp(junctionPipe, historyLocation.schemaVersionID) + pipeHead, err = l.lensRegistry.MigrateUp(l.ctx, junctionPipe, historyLocation.schemaVersionID) if err != nil { return false, err } @@ -185,7 +191,7 @@ func (l *lens) Next() (bool, error) { // Aquire a lens migration from the registery, using the junctionPipe as its source. // The new pipeHead will then be connected as a source to the next migration-stage on // the next loop. - pipeHead, err = l.lensRegistry.MigrateDown(junctionPipe, historyLocation.previous.Value().schemaVersionID) + pipeHead, err = l.lensRegistry.MigrateDown(l.ctx, junctionPipe, historyLocation.previous.Value().schemaVersionID) if err != nil { return false, err } diff --git a/lens/registry.go b/lens/registry.go index 6ea21433c4..a4074ca7f4 100644 --- a/lens/registry.go +++ b/lens/registry.go @@ -48,12 +48,43 @@ type lensRegistry struct { lensPoolsBySchemaVersionID map[string]*lensPool reversedPoolsBySchemaVersionID map[string]*lensPool + poolLock sync.RWMutex // lens configurations by source schema version ID - configs map[string]client.LensConfig + configs map[string]client.LensConfig + configLock sync.RWMutex + + // Writable transaction contexts by transaction ID. + // + // Read-only transaction contexts are not tracked. + txnCtxs map[uint64]*txnContext + txnLock sync.RWMutex +} + +// txnContext contains uncommitted transaction state tracked by the registry, +// stuff within here should be accessible from within this transaction but not +// from outside. +type txnContext struct { + txn datastore.Txn + lensPoolsBySchemaVersionID map[string]*lensPool + reversedPoolsBySchemaVersionID map[string]*lensPool + configs map[string]client.LensConfig +} + +func newTxnCtx(txn datastore.Txn) *txnContext { + return &txnContext{ + txn: txn, + lensPoolsBySchemaVersionID: map[string]*lensPool{}, + reversedPoolsBySchemaVersionID: map[string]*lensPool{}, + configs: map[string]client.LensConfig{}, + } } -var _ client.LensRegistry = (*lensRegistry)(nil) +// TxnSource represents an object capable of constructing the transactions that +// implicit-transaction registries need internally. +type TxnSource interface { + NewTxn(context.Context, bool) (datastore.Txn, error) +} // DefaultPoolSize is the default size of the lens pool for each schema version. const DefaultPoolSize int = 5 @@ -61,7 +92,7 @@ const DefaultPoolSize int = 5 // NewRegistry instantiates a new registery. // // It will be of size 5 (per schema version) if a size is not provided. -func NewRegistry(lensPoolSize immutable.Option[int]) *lensRegistry { +func NewRegistry(lensPoolSize immutable.Option[int], db TxnSource) client.LensRegistry { var size int if lensPoolSize.HasValue() { size = lensPoolSize.Value() @@ -69,17 +100,76 @@ func NewRegistry(lensPoolSize immutable.Option[int]) *lensRegistry { size = DefaultPoolSize } - return &lensRegistry{ - poolSize: size, - runtime: wazero.New(), - modulesByPath: map[string]module.Module{}, - lensPoolsBySchemaVersionID: map[string]*lensPool{}, - reversedPoolsBySchemaVersionID: map[string]*lensPool{}, - configs: map[string]client.LensConfig{}, + return &implicitTxnLensRegistry{ + db: db, + registry: &lensRegistry{ + poolSize: size, + runtime: wazero.New(), + modulesByPath: map[string]module.Module{}, + lensPoolsBySchemaVersionID: map[string]*lensPool{}, + reversedPoolsBySchemaVersionID: map[string]*lensPool{}, + configs: map[string]client.LensConfig{}, + txnCtxs: map[uint64]*txnContext{}, + }, } } -func (r *lensRegistry) SetMigration(ctx context.Context, txn datastore.Txn, cfg client.LensConfig) error { +func (r *lensRegistry) getCtx(txn datastore.Txn, readonly bool) *txnContext { + r.txnLock.RLock() + if txnCtx, ok := r.txnCtxs[txn.ID()]; ok { + r.txnLock.RUnlock() + return txnCtx + } + r.txnLock.RUnlock() + + txnCtx := newTxnCtx(txn) + if readonly { + return txnCtx + } + + r.txnLock.Lock() + r.txnCtxs[txn.ID()] = txnCtx + r.txnLock.Unlock() + + txnCtx.txn.OnSuccess(func() { + r.poolLock.Lock() + for schemaVersionID, locker := range txnCtx.lensPoolsBySchemaVersionID { + r.lensPoolsBySchemaVersionID[schemaVersionID] = locker + } + for schemaVersionID, locker := range txnCtx.reversedPoolsBySchemaVersionID { + r.reversedPoolsBySchemaVersionID[schemaVersionID] = locker + } + r.poolLock.Unlock() + + r.configLock.Lock() + for schemaVersionID, cfg := range txnCtx.configs { + r.configs[schemaVersionID] = cfg + } + r.configLock.Unlock() + + r.txnLock.Lock() + delete(r.txnCtxs, txn.ID()) + r.txnLock.Unlock() + }) + + txn.OnError(func() { + r.txnLock.Lock() + delete(r.txnCtxs, txn.ID()) + r.txnLock.Unlock() + }) + + txn.OnDiscard(func() { + // Delete it to help reduce the build up of memory, the txnCtx will be re-contructed if the + // txn is reused after discard. + r.txnLock.Lock() + delete(r.txnCtxs, txn.ID()) + r.txnLock.Unlock() + }) + + return txnCtx +} + +func (r *lensRegistry) setMigration(ctx context.Context, txnCtx *txnContext, cfg client.LensConfig) error { key := core.NewSchemaVersionMigrationKey(cfg.SourceSchemaVersionID) json, err := json.Marshal(cfg) @@ -87,12 +177,12 @@ func (r *lensRegistry) SetMigration(ctx context.Context, txn datastore.Txn, cfg return err } - err = txn.Systemstore().Put(ctx, key.ToDS(), json) + err = txnCtx.txn.Systemstore().Put(ctx, key.ToDS(), json) if err != nil { return err } - err = r.cacheLens(txn, cfg) + err = r.cacheLens(txnCtx, cfg) if err != nil { return err } @@ -100,7 +190,7 @@ func (r *lensRegistry) SetMigration(ctx context.Context, txn datastore.Txn, cfg return nil } -func (r *lensRegistry) cacheLens(txn datastore.Txn, cfg client.LensConfig) error { +func (r *lensRegistry) cacheLens(txnCtx *txnContext, cfg client.LensConfig) error { inversedModuleCfgs := make([]model.LensModule, len(cfg.Lenses)) for i, moduleCfg := range cfg.Lenses { // Reverse the order of the lenses for the inverse migration. @@ -122,11 +212,11 @@ func (r *lensRegistry) cacheLens(txn datastore.Txn, cfg client.LensConfig) error }, } - err := r.cachePool(txn, r.lensPoolsBySchemaVersionID, cfg) + err := r.cachePool(txnCtx.txn, txnCtx.lensPoolsBySchemaVersionID, cfg) if err != nil { return err } - err = r.cachePool(txn, r.reversedPoolsBySchemaVersionID, reversedCfg) + err = r.cachePool(txnCtx.txn, txnCtx.reversedPoolsBySchemaVersionID, reversedCfg) // For now, checking this error is the best way of determining if a migration has an inverse. // Inverses are optional. //nolint:revive @@ -134,59 +224,30 @@ func (r *lensRegistry) cacheLens(txn datastore.Txn, cfg client.LensConfig) error return err } - // todo - handling txns like this means that the migrations are not available within the current - // transaction if used for stuff (e.g. GQL requests) before commit. - // https://github.com/sourcenetwork/defradb/issues/1592 - txn.OnSuccess(func() { - r.configs[cfg.SourceSchemaVersionID] = cfg - }) + txnCtx.configs[cfg.SourceSchemaVersionID] = cfg return nil } func (r *lensRegistry) cachePool(txn datastore.Txn, target map[string]*lensPool, cfg client.LensConfig) error { - pool, poolAlreadyExists := target[cfg.SourceSchemaVersionID] - if !poolAlreadyExists { - pool = r.newPool(r.poolSize, cfg) - } + pool := r.newPool(r.poolSize, cfg) - newLensPipes := make([]*lensPipe, r.poolSize) for i := 0; i < r.poolSize; i++ { - var err error - newLensPipes[i], err = r.newLensPipe(cfg) + lensPipe, err := r.newLensPipe(cfg) if err != nil { return err } + pool.returnLens(lensPipe) } - // todo - handling txns like this means that the migrations are not available within the current - // transaction if used for stuff (e.g. GQL requests) before commit. - // https://github.com/sourcenetwork/defradb/issues/1592 - txn.OnSuccess(func() { - if !poolAlreadyExists { - target[cfg.SourceSchemaVersionID] = pool - } - - drainLoop: - for { - select { - case <-pool.pipes: - default: - break drainLoop - } - } - - for _, lensPipe := range newLensPipes { - pool.returnLens(lensPipe) - } - }) + target[cfg.SourceSchemaVersionID] = pool return nil } -func (r *lensRegistry) ReloadLenses(ctx context.Context, txn datastore.Txn) error { +func (r *lensRegistry) reloadLenses(ctx context.Context, txnCtx *txnContext) error { prefix := core.NewSchemaVersionMigrationKey("") - q, err := txn.Systemstore().Query(ctx, query.Query{ + q, err := txnCtx.txn.Systemstore().Query(ctx, query.Query{ Prefix: prefix.ToString(), }) if err != nil { @@ -226,7 +287,7 @@ func (r *lensRegistry) ReloadLenses(ctx context.Context, txn datastore.Txn) erro return err } - err = r.cacheLens(txn, cfg) + err = r.cacheLens(txnCtx, cfg) if err != nil { err = q.Close() if err != nil { @@ -244,26 +305,29 @@ func (r *lensRegistry) ReloadLenses(ctx context.Context, txn datastore.Txn) erro return nil } -func (r *lensRegistry) MigrateUp( +func (r *lensRegistry) migrateUp( + txnCtx *txnContext, src enumerable.Enumerable[LensDoc], schemaVersionID string, ) (enumerable.Enumerable[LensDoc], error) { - return migrate(r.lensPoolsBySchemaVersionID, src, schemaVersionID) + return r.migrate(r.lensPoolsBySchemaVersionID, txnCtx.lensPoolsBySchemaVersionID, src, schemaVersionID) } -func (r *lensRegistry) MigrateDown( +func (r *lensRegistry) migrateDown( + txnCtx *txnContext, src enumerable.Enumerable[LensDoc], schemaVersionID string, ) (enumerable.Enumerable[LensDoc], error) { - return migrate(r.reversedPoolsBySchemaVersionID, src, schemaVersionID) + return r.migrate(r.reversedPoolsBySchemaVersionID, txnCtx.reversedPoolsBySchemaVersionID, src, schemaVersionID) } -func migrate( +func (r *lensRegistry) migrate( pools map[string]*lensPool, + txnPools map[string]*lensPool, src enumerable.Enumerable[LensDoc], schemaVersionID string, ) (enumerable.Enumerable[LensDoc], error) { - lensPool, ok := pools[schemaVersionID] + lensPool, ok := r.getPool(pools, txnPools, schemaVersionID) if !ok { // If there are no migrations for this schema version, just return the given source. return src, nil @@ -279,19 +343,48 @@ func migrate( return lens, nil } -func (r *lensRegistry) Config() []client.LensConfig { +func (r *lensRegistry) config(txnCtx *txnContext) []client.LensConfig { + configs := map[string]client.LensConfig{} + r.configLock.RLock() + for schemaVersionID, cfg := range r.configs { + configs[schemaVersionID] = cfg + } + r.configLock.RUnlock() + + // If within a txn actively writing to this registry overwrite + // values from the (commited) registry. + // Note: Config cannot be removed, only replaced at the moment. + for schemaVersionID, cfg := range txnCtx.configs { + configs[schemaVersionID] = cfg + } + result := []client.LensConfig{} - for _, cfg := range r.configs { + for _, cfg := range configs { result = append(result, cfg) } return result } -func (r *lensRegistry) HasMigration(schemaVersionID string) bool { - _, hasMigration := r.lensPoolsBySchemaVersionID[schemaVersionID] +func (r *lensRegistry) hasMigration(txnCtx *txnContext, schemaVersionID string) bool { + _, hasMigration := r.getPool(r.lensPoolsBySchemaVersionID, txnCtx.lensPoolsBySchemaVersionID, schemaVersionID) return hasMigration } +func (r *lensRegistry) getPool( + pools map[string]*lensPool, + txnPools map[string]*lensPool, + schemaVersionID string, +) (*lensPool, bool) { + if pool, ok := txnPools[schemaVersionID]; ok { + return pool, true + } + + r.poolLock.RLock() + pool, ok := pools[schemaVersionID] + r.poolLock.RUnlock() + return pool, ok +} + // lensPool provides a pool-like mechanic for caching a limited number of wasm lens modules in // a thread safe fashion. // diff --git a/lens/txn_registry.go b/lens/txn_registry.go new file mode 100644 index 0000000000..954db01e0c --- /dev/null +++ b/lens/txn_registry.go @@ -0,0 +1,163 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package lens + +import ( + "context" + + "github.com/sourcenetwork/immutable/enumerable" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" +) + +type implicitTxnLensRegistry struct { + registry *lensRegistry + db TxnSource +} + +type explicitTxnLensRegistry struct { + registry *lensRegistry + txn datastore.Txn +} + +var _ client.LensRegistry = (*implicitTxnLensRegistry)(nil) +var _ client.LensRegistry = (*explicitTxnLensRegistry)(nil) + +func (r *implicitTxnLensRegistry) WithTxn(txn datastore.Txn) client.LensRegistry { + return &explicitTxnLensRegistry{ + registry: r.registry, + txn: txn, + } +} + +func (r *explicitTxnLensRegistry) WithTxn(txn datastore.Txn) client.LensRegistry { + return &explicitTxnLensRegistry{ + registry: r.registry, + txn: txn, + } +} + +func (r *implicitTxnLensRegistry) SetMigration(ctx context.Context, cfg client.LensConfig) error { + txn, err := r.db.NewTxn(ctx, false) + if err != nil { + return err + } + defer txn.Discard(ctx) + txnCtx := r.registry.getCtx(txn, false) + + err = r.registry.setMigration(ctx, txnCtx, cfg) + if err != nil { + return err + } + + return txn.Commit(ctx) +} + +func (r *explicitTxnLensRegistry) SetMigration(ctx context.Context, cfg client.LensConfig) error { + return r.registry.setMigration(ctx, r.registry.getCtx(r.txn, false), cfg) +} + +func (r *implicitTxnLensRegistry) ReloadLenses(ctx context.Context) error { + txn, err := r.db.NewTxn(ctx, true) + if err != nil { + return err + } + defer txn.Discard(ctx) + txnCtx := r.registry.getCtx(txn, false) + + err = r.registry.reloadLenses(ctx, txnCtx) + if err != nil { + return err + } + + return txn.Commit(ctx) +} + +func (r *explicitTxnLensRegistry) ReloadLenses(ctx context.Context) error { + return r.registry.reloadLenses(ctx, r.registry.getCtx(r.txn, true)) +} + +func (r *implicitTxnLensRegistry) MigrateUp( + ctx context.Context, + src enumerable.Enumerable[LensDoc], + schemaVersionID string, +) (enumerable.Enumerable[map[string]any], error) { + txn, err := r.db.NewTxn(ctx, true) + if err != nil { + return nil, err + } + defer txn.Discard(ctx) + txnCtx := newTxnCtx(txn) + + return r.registry.migrateUp(txnCtx, src, schemaVersionID) +} + +func (r *explicitTxnLensRegistry) MigrateUp( + ctx context.Context, + src enumerable.Enumerable[LensDoc], + schemaVersionID string, +) (enumerable.Enumerable[map[string]any], error) { + return r.registry.migrateUp(r.registry.getCtx(r.txn, true), src, schemaVersionID) +} + +func (r *implicitTxnLensRegistry) MigrateDown( + ctx context.Context, + src enumerable.Enumerable[LensDoc], + schemaVersionID string, +) (enumerable.Enumerable[map[string]any], error) { + txn, err := r.db.NewTxn(ctx, true) + if err != nil { + return nil, err + } + defer txn.Discard(ctx) + txnCtx := newTxnCtx(txn) + + return r.registry.migrateDown(txnCtx, src, schemaVersionID) +} + +func (r *explicitTxnLensRegistry) MigrateDown( + ctx context.Context, + src enumerable.Enumerable[LensDoc], + schemaVersionID string, +) (enumerable.Enumerable[map[string]any], error) { + return r.registry.migrateDown(r.registry.getCtx(r.txn, true), src, schemaVersionID) +} + +func (r *implicitTxnLensRegistry) Config(ctx context.Context) ([]client.LensConfig, error) { + txn, err := r.db.NewTxn(ctx, true) + if err != nil { + return nil, err + } + defer txn.Discard(ctx) + txnCtx := newTxnCtx(txn) + + return r.registry.config(txnCtx), nil +} + +func (r *explicitTxnLensRegistry) Config(ctx context.Context) ([]client.LensConfig, error) { + return r.registry.config(r.registry.getCtx(r.txn, true)), nil +} + +func (r *implicitTxnLensRegistry) HasMigration(ctx context.Context, schemaVersionID string) (bool, error) { + txn, err := r.db.NewTxn(ctx, true) + if err != nil { + return false, err + } + defer txn.Discard(ctx) + txnCtx := newTxnCtx(txn) + + return r.registry.hasMigration(txnCtx, schemaVersionID), nil +} + +func (r *explicitTxnLensRegistry) HasMigration(ctx context.Context, schemaVersionID string) (bool, error) { + return r.registry.hasMigration(r.registry.getCtx(r.txn, true), schemaVersionID), nil +} diff --git a/tests/bench/query/planner/utils.go b/tests/bench/query/planner/utils.go index 148347aa2f..2f70245b23 100644 --- a/tests/bench/query/planner/utils.go +++ b/tests/bench/query/planner/utils.go @@ -133,3 +133,5 @@ func (*dummyTxn) Commit(ctx context.Context) error { return nil } func (*dummyTxn) Discard(ctx context.Context) {} func (*dummyTxn) OnSuccess(fn func()) {} func (*dummyTxn) OnError(fn func()) {} +func (*dummyTxn) OnDiscard(fn func()) {} +func (*dummyTxn) ID() uint64 { return 0 } diff --git a/tests/integration/lens.go b/tests/integration/lens.go index dbdb4c1c70..2959867c1a 100644 --- a/tests/integration/lens.go +++ b/tests/integration/lens.go @@ -13,6 +13,7 @@ package tests import ( "github.com/sourcenetwork/immutable" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/sourcenetwork/defradb/client" ) @@ -72,7 +73,8 @@ func getMigrations( for _, node := range getNodes(action.NodeID, s.nodes) { db := getStore(s, node.DB, action.TransactionID, "") - configs := db.LensRegistry().Config() + configs, err := db.LensRegistry().Config(s.ctx) + require.NoError(s.t, err) // The order of the results is not deterministic, so do not assert on the element // locations. diff --git a/tests/integration/schema/migrations/query/with_txn_test.go b/tests/integration/schema/migrations/query/with_txn_test.go index 059af4d461..3c55fd7748 100644 --- a/tests/integration/schema/migrations/query/with_txn_test.go +++ b/tests/integration/schema/migrations/query/with_txn_test.go @@ -21,8 +21,6 @@ import ( "github.com/sourcenetwork/defradb/tests/lenses" ) -// todo: This test documents unwanted behaviour and should be fixed with -// https://github.com/sourcenetwork/defradb/issues/1592 func TestSchemaMigrationQueryWithTxn(t *testing.T) { test := testUtils.TestCase{ Description: "Test schema migration, with transaction", @@ -74,10 +72,8 @@ func TestSchemaMigrationQueryWithTxn(t *testing.T) { }`, Results: []map[string]any{ { - "name": "John", - // This is the bug - although the request and migration are on the same transaction - // the migration is not picked up during the request. - "verified": nil, + "name": "John", + "verified": true, }, }, }, diff --git a/tests/integration/schema/migrations/with_txn_test.go b/tests/integration/schema/migrations/with_txn_test.go index f8eb5b5611..827f40de5e 100644 --- a/tests/integration/schema/migrations/with_txn_test.go +++ b/tests/integration/schema/migrations/with_txn_test.go @@ -21,8 +21,6 @@ import ( "github.com/sourcenetwork/defradb/tests/lenses" ) -// todo: This test documents unwanted behaviour and should be fixed with -// https://github.com/sourcenetwork/defradb/issues/1592 func TestSchemaMigrationGetMigrationsWithTxn(t *testing.T) { test := testUtils.TestCase{ Description: "Test schema migration, with txn", @@ -49,7 +47,23 @@ func TestSchemaMigrationGetMigrationsWithTxn(t *testing.T) { TransactionID: immutable.Some(0), // This is the bug - although the GetMigrations call and migration are on the same transaction // the migration is not returned in the results. - ExpectedResults: []client.LensConfig{}, + ExpectedResults: []client.LensConfig{ + { + SourceSchemaVersionID: "does not exist", + DestinationSchemaVersionID: "also does not exist", + Lens: model.Lens{ + Lenses: []model.LensModule{ + { + Path: lenses.SetDefaultModulePath, + Arguments: map[string]any{ + "dst": "verified", + "value": false, + }, + }, + }, + }, + }, + }, }, }, }