diff --git a/api/http/handlerfuncs.go b/api/http/handlerfuncs.go index 9e5b212fe3..bd40e76b22 100644 --- a/api/http/handlerfuncs.go +++ b/api/http/handlerfuncs.go @@ -311,7 +311,7 @@ func setMigrationHandler(rw http.ResponseWriter, req *http.Request) { return } - err = db.LensRegistry().SetMigration(req.Context(), txn, cfg) + err = db.LensRegistry().SetMigration(req.Context(), cfg) if err != nil { handleErr(req.Context(), rw, err, http.StatusInternalServerError) return @@ -338,7 +338,7 @@ func getMigrationHandler(rw http.ResponseWriter, req *http.Request) { return } - cfgs := db.LensRegistry().Config() + cfgs, err := db.LensRegistry().Config(req.Context()) if err != nil { handleErr(req.Context(), rw, err, http.StatusInternalServerError) return diff --git a/client/lens.go b/client/lens.go index 1cffa19248..7b1264275f 100644 --- a/client/lens.go +++ b/client/lens.go @@ -43,6 +43,12 @@ type LensConfig struct { // LensRegistry exposes several useful thread-safe migration related functions which may // be used to manage migrations. type LensRegistry interface { + // WithTxn returns a new LensRegistry scoped to the given transaction. + // + // WARNING: Currently this does not provide snapshot isolation, if other transactions are commited + // after this has been created, the results of those commits will be visible within this scope. + WithTxn(datastore.Txn) LensRegistry + // SetMigration sets the migration for the given source-destination schema version IDs. Is equivilent to // calling `Store.SetMigration(ctx, cfg)`. // @@ -55,29 +61,37 @@ type LensRegistry interface { // // Migrations will only run if there is a complete path from the document schema version to the latest local // schema version. - SetMigration(context.Context, datastore.Txn, LensConfig) error + SetMigration(context.Context, LensConfig) error // ReloadLenses clears any cached migrations, loads their configurations from the database and re-initializes // them. It is run on database start if the database already existed. - ReloadLenses(ctx context.Context, txn datastore.Txn) error + ReloadLenses(context.Context) error // MigrateUp returns an enumerable that feeds the given source through the Lens migration for the given // schema version id if one is found, if there is no matching migration the given source will be returned. - MigrateUp(enumerable.Enumerable[map[string]any], string) (enumerable.Enumerable[map[string]any], error) + MigrateUp( + context.Context, + enumerable.Enumerable[map[string]any], + string, + ) (enumerable.Enumerable[map[string]any], error) // MigrateDown returns an enumerable that feeds the given source through the Lens migration for the schema // version that precedes the given schema version id in reverse, if one is found, if there is no matching // migration the given source will be returned. // // This downgrades any documents in the source enumerable if/when enumerated. - MigrateDown(enumerable.Enumerable[map[string]any], string) (enumerable.Enumerable[map[string]any], error) + MigrateDown( + context.Context, + enumerable.Enumerable[map[string]any], + string, + ) (enumerable.Enumerable[map[string]any], error) // Config returns a slice of the configurations of the currently loaded migrations. // // Modifying the slice does not affect the loaded configurations. - Config() []LensConfig + Config(context.Context) ([]LensConfig, error) // HasMigration returns true if there is a migration registered for the given schema version id, otherwise // will return false. - HasMigration(string) bool + HasMigration(context.Context, string) (bool, error) } diff --git a/db/db.go b/db/db.go index 3d6a65afa4..b9ea5ac2a3 100644 --- a/db/db.go +++ b/db/db.go @@ -146,7 +146,7 @@ func newDB(ctx context.Context, rootstore datastore.RootStore, options ...Option // lensPoolSize may be set by `options`, and because they are funcs on db // we have to mutate `db` here to set the registry. - db.lensRegistry = lens.NewRegistry(db.lensPoolSize) + db.lensRegistry = lens.NewRegistry(db.lensPoolSize, db) err = db.initialize(ctx) if err != nil { @@ -172,8 +172,9 @@ func (db *db) NewConcurrentTxn(ctx context.Context, readonly bool) (datastore.Tx // WithTxn returns a new [client.Store] that respects the given transaction. func (db *db) WithTxn(txn datastore.Txn) client.Store { return &explicitTxnDB{ - db: db, - txn: txn, + db: db, + txn: txn, + lensRegistry: db.lensRegistry.WithTxn(txn), } } @@ -221,7 +222,7 @@ func (db *db) initialize(ctx context.Context) error { return err } - err = db.lensRegistry.ReloadLenses(ctx, txn) + err = db.lensRegistry.ReloadLenses(ctx) if err != nil { return err } diff --git a/db/txn_db.go b/db/txn_db.go index a7096a46a7..b307d96e35 100644 --- a/db/txn_db.go +++ b/db/txn_db.go @@ -28,7 +28,8 @@ type implicitTxnDB struct { type explicitTxnDB struct { *db - txn datastore.Txn + txn datastore.Txn + lensRegistry client.LensRegistry } // ExecRequest executes a request against the database. @@ -286,7 +287,7 @@ func (db *implicitTxnDB) SetMigration(ctx context.Context, cfg client.LensConfig } defer txn.Discard(ctx) - err = db.lensRegistry.SetMigration(ctx, txn, cfg) + err = db.lensRegistry.SetMigration(ctx, cfg) if err != nil { return err } @@ -295,7 +296,7 @@ func (db *implicitTxnDB) SetMigration(ctx context.Context, cfg client.LensConfig } func (db *explicitTxnDB) SetMigration(ctx context.Context, cfg client.LensConfig) error { - return db.lensRegistry.SetMigration(ctx, db.txn, cfg) + return db.lensRegistry.SetMigration(ctx, cfg) } // SetReplicator adds a new replicator to the database. @@ -417,3 +418,10 @@ func (db *implicitTxnDB) BasicExport(ctx context.Context, config *client.BackupC func (db *explicitTxnDB) BasicExport(ctx context.Context, config *client.BackupConfig) error { return db.basicExport(ctx, db.txn, config) } + +// LensRegistry returns the LensRegistry in use by this database instance. +// +// It exposes several useful thread-safe migration related functions. +func (db *explicitTxnDB) LensRegistry() client.LensRegistry { + return db.lensRegistry +} diff --git a/lens/fetcher.go b/lens/fetcher.go index bfd8fca3bc..7951f872eb 100644 --- a/lens/fetcher.go +++ b/lens/fetcher.go @@ -76,15 +76,25 @@ func (f *lensedFetcher) Init( f.fieldDescriptionsByName[field.Name] = field } - history, err := getTargetedSchemaHistory(ctx, txn, f.registry.Config(), f.col.Schema.SchemaID, f.col.Schema.VersionID) + cfg, err := f.registry.Config(ctx) if err != nil { return err } - f.lens = new(f.registry, f.col.Schema.VersionID, history) + + history, err := getTargetedSchemaHistory(ctx, txn, cfg, f.col.Schema.SchemaID, f.col.Schema.VersionID) + if err != nil { + return err + } + f.lens = new(ctx, f.registry, f.col.Schema.VersionID, history) f.txn = txn for schemaVersionID := range history { - if f.registry.HasMigration(schemaVersionID) { + hasMigration, err := f.registry.HasMigration(ctx, schemaVersionID) + if err != nil { + return err + } + + if hasMigration { f.hasMigrations = true break } diff --git a/lens/lens.go b/lens/lens.go index e4d4895eea..86fcb0876f 100644 --- a/lens/lens.go +++ b/lens/lens.go @@ -11,6 +11,8 @@ package lens import ( + "context" + "github.com/sourcenetwork/immutable/enumerable" "github.com/sourcenetwork/defradb/client" @@ -42,6 +44,8 @@ type Lens interface { type lens struct { lensRegistry client.LensRegistry + ctx context.Context + // The primary access points to the lens pipes through which all things flow. lensPipesBySchemaVersionIDs map[schemaVersionID]enumerable.Concatenation[LensDoc] @@ -60,6 +64,7 @@ type lens struct { var _ Lens = (*lens)(nil) func new( + ctx context.Context, lensRegistry client.LensRegistry, targetSchemaVersionID schemaVersionID, schemaVersionHistory map[schemaVersionID]*targetedSchemaHistoryLink, @@ -69,6 +74,7 @@ func new( return &lens{ lensRegistry: lensRegistry, + ctx: ctx, source: enumerable.NewQueue[lensInput](), outputPipe: outputPipe, unknownVersionPipe: targetSource, @@ -175,7 +181,7 @@ func (l *lens) Next() (bool, error) { // Aquire a lens migration from the registery, using the junctionPipe as its source. // The new pipeHead will then be connected as a source to the next migration-stage on // the next loop. - pipeHead, err = l.lensRegistry.MigrateUp(junctionPipe, historyLocation.schemaVersionID) + pipeHead, err = l.lensRegistry.MigrateUp(l.ctx, junctionPipe, historyLocation.schemaVersionID) if err != nil { return false, err } @@ -185,7 +191,7 @@ func (l *lens) Next() (bool, error) { // Aquire a lens migration from the registery, using the junctionPipe as its source. // The new pipeHead will then be connected as a source to the next migration-stage on // the next loop. - pipeHead, err = l.lensRegistry.MigrateDown(junctionPipe, historyLocation.previous.Value().schemaVersionID) + pipeHead, err = l.lensRegistry.MigrateDown(l.ctx, junctionPipe, historyLocation.previous.Value().schemaVersionID) if err != nil { return false, err } diff --git a/lens/registry.go b/lens/registry.go index 6ea21433c4..a4074ca7f4 100644 --- a/lens/registry.go +++ b/lens/registry.go @@ -48,12 +48,43 @@ type lensRegistry struct { lensPoolsBySchemaVersionID map[string]*lensPool reversedPoolsBySchemaVersionID map[string]*lensPool + poolLock sync.RWMutex // lens configurations by source schema version ID - configs map[string]client.LensConfig + configs map[string]client.LensConfig + configLock sync.RWMutex + + // Writable transaction contexts by transaction ID. + // + // Read-only transaction contexts are not tracked. + txnCtxs map[uint64]*txnContext + txnLock sync.RWMutex +} + +// txnContext contains uncommitted transaction state tracked by the registry, +// stuff within here should be accessible from within this transaction but not +// from outside. +type txnContext struct { + txn datastore.Txn + lensPoolsBySchemaVersionID map[string]*lensPool + reversedPoolsBySchemaVersionID map[string]*lensPool + configs map[string]client.LensConfig +} + +func newTxnCtx(txn datastore.Txn) *txnContext { + return &txnContext{ + txn: txn, + lensPoolsBySchemaVersionID: map[string]*lensPool{}, + reversedPoolsBySchemaVersionID: map[string]*lensPool{}, + configs: map[string]client.LensConfig{}, + } } -var _ client.LensRegistry = (*lensRegistry)(nil) +// TxnSource represents an object capable of constructing the transactions that +// implicit-transaction registries need internally. +type TxnSource interface { + NewTxn(context.Context, bool) (datastore.Txn, error) +} // DefaultPoolSize is the default size of the lens pool for each schema version. const DefaultPoolSize int = 5 @@ -61,7 +92,7 @@ const DefaultPoolSize int = 5 // NewRegistry instantiates a new registery. // // It will be of size 5 (per schema version) if a size is not provided. -func NewRegistry(lensPoolSize immutable.Option[int]) *lensRegistry { +func NewRegistry(lensPoolSize immutable.Option[int], db TxnSource) client.LensRegistry { var size int if lensPoolSize.HasValue() { size = lensPoolSize.Value() @@ -69,17 +100,76 @@ func NewRegistry(lensPoolSize immutable.Option[int]) *lensRegistry { size = DefaultPoolSize } - return &lensRegistry{ - poolSize: size, - runtime: wazero.New(), - modulesByPath: map[string]module.Module{}, - lensPoolsBySchemaVersionID: map[string]*lensPool{}, - reversedPoolsBySchemaVersionID: map[string]*lensPool{}, - configs: map[string]client.LensConfig{}, + return &implicitTxnLensRegistry{ + db: db, + registry: &lensRegistry{ + poolSize: size, + runtime: wazero.New(), + modulesByPath: map[string]module.Module{}, + lensPoolsBySchemaVersionID: map[string]*lensPool{}, + reversedPoolsBySchemaVersionID: map[string]*lensPool{}, + configs: map[string]client.LensConfig{}, + txnCtxs: map[uint64]*txnContext{}, + }, } } -func (r *lensRegistry) SetMigration(ctx context.Context, txn datastore.Txn, cfg client.LensConfig) error { +func (r *lensRegistry) getCtx(txn datastore.Txn, readonly bool) *txnContext { + r.txnLock.RLock() + if txnCtx, ok := r.txnCtxs[txn.ID()]; ok { + r.txnLock.RUnlock() + return txnCtx + } + r.txnLock.RUnlock() + + txnCtx := newTxnCtx(txn) + if readonly { + return txnCtx + } + + r.txnLock.Lock() + r.txnCtxs[txn.ID()] = txnCtx + r.txnLock.Unlock() + + txnCtx.txn.OnSuccess(func() { + r.poolLock.Lock() + for schemaVersionID, locker := range txnCtx.lensPoolsBySchemaVersionID { + r.lensPoolsBySchemaVersionID[schemaVersionID] = locker + } + for schemaVersionID, locker := range txnCtx.reversedPoolsBySchemaVersionID { + r.reversedPoolsBySchemaVersionID[schemaVersionID] = locker + } + r.poolLock.Unlock() + + r.configLock.Lock() + for schemaVersionID, cfg := range txnCtx.configs { + r.configs[schemaVersionID] = cfg + } + r.configLock.Unlock() + + r.txnLock.Lock() + delete(r.txnCtxs, txn.ID()) + r.txnLock.Unlock() + }) + + txn.OnError(func() { + r.txnLock.Lock() + delete(r.txnCtxs, txn.ID()) + r.txnLock.Unlock() + }) + + txn.OnDiscard(func() { + // Delete it to help reduce the build up of memory, the txnCtx will be re-contructed if the + // txn is reused after discard. + r.txnLock.Lock() + delete(r.txnCtxs, txn.ID()) + r.txnLock.Unlock() + }) + + return txnCtx +} + +func (r *lensRegistry) setMigration(ctx context.Context, txnCtx *txnContext, cfg client.LensConfig) error { key := core.NewSchemaVersionMigrationKey(cfg.SourceSchemaVersionID) json, err := json.Marshal(cfg) @@ -87,12 +177,12 @@ func (r *lensRegistry) SetMigration(ctx context.Context, txn datastore.Txn, cfg return err } - err = txn.Systemstore().Put(ctx, key.ToDS(), json) + err = txnCtx.txn.Systemstore().Put(ctx, key.ToDS(), json) if err != nil { return err } - err = r.cacheLens(txn, cfg) + err = r.cacheLens(txnCtx, cfg) if err != nil { return err } @@ -100,7 +190,7 @@ func (r *lensRegistry) SetMigration(ctx context.Context, txn datastore.Txn, cfg return nil } -func (r *lensRegistry) cacheLens(txn datastore.Txn, cfg client.LensConfig) error { +func (r *lensRegistry) cacheLens(txnCtx *txnContext, cfg client.LensConfig) error { inversedModuleCfgs := make([]model.LensModule, len(cfg.Lenses)) for i, moduleCfg := range cfg.Lenses { // Reverse the order of the lenses for the inverse migration. @@ -122,11 +212,11 @@ func (r *lensRegistry) cacheLens(txn datastore.Txn, cfg client.LensConfig) error }, } - err := r.cachePool(txn, r.lensPoolsBySchemaVersionID, cfg) + err := r.cachePool(txnCtx.txn, txnCtx.lensPoolsBySchemaVersionID, cfg) if err != nil { return err } - err = r.cachePool(txn, r.reversedPoolsBySchemaVersionID, reversedCfg) + err = r.cachePool(txnCtx.txn, txnCtx.reversedPoolsBySchemaVersionID, reversedCfg) // For now, checking this error is the best way of determining if a migration has an inverse. // Inverses are optional. //nolint:revive @@ -134,59 +224,30 @@ func (r *lensRegistry) cacheLens(txn datastore.Txn, cfg client.LensConfig) error return err } - // todo - handling txns like this means that the migrations are not available within the current - // transaction if used for stuff (e.g. GQL requests) before commit. - // https://github.com/sourcenetwork/defradb/issues/1592 - txn.OnSuccess(func() { - r.configs[cfg.SourceSchemaVersionID] = cfg - }) + txnCtx.configs[cfg.SourceSchemaVersionID] = cfg return nil } func (r *lensRegistry) cachePool(txn datastore.Txn, target map[string]*lensPool, cfg client.LensConfig) error { - pool, poolAlreadyExists := target[cfg.SourceSchemaVersionID] - if !poolAlreadyExists { - pool = r.newPool(r.poolSize, cfg) - } + pool := r.newPool(r.poolSize, cfg) - newLensPipes := make([]*lensPipe, r.poolSize) for i := 0; i < r.poolSize; i++ { - var err error - newLensPipes[i], err = r.newLensPipe(cfg) + lensPipe, err := r.newLensPipe(cfg) if err != nil { return err } + pool.returnLens(lensPipe) } - // todo - handling txns like this means that the migrations are not available within the current - // transaction if used for stuff (e.g. GQL requests) before commit. - // https://github.com/sourcenetwork/defradb/issues/1592 - txn.OnSuccess(func() { - if !poolAlreadyExists { - target[cfg.SourceSchemaVersionID] = pool - } - - drainLoop: - for { - select { - case <-pool.pipes: - default: - break drainLoop - } - } - - for _, lensPipe := range newLensPipes { - pool.returnLens(lensPipe) - } - }) + target[cfg.SourceSchemaVersionID] = pool return nil } -func (r *lensRegistry) ReloadLenses(ctx context.Context, txn datastore.Txn) error { +func (r *lensRegistry) reloadLenses(ctx context.Context, txnCtx *txnContext) error { prefix := core.NewSchemaVersionMigrationKey("") - q, err := txn.Systemstore().Query(ctx, query.Query{ + q, err := txnCtx.txn.Systemstore().Query(ctx, query.Query{ Prefix: prefix.ToString(), }) if err != nil { @@ -226,7 +287,7 @@ func (r *lensRegistry) ReloadLenses(ctx context.Context, txn datastore.Txn) erro return err } - err = r.cacheLens(txn, cfg) + err = r.cacheLens(txnCtx, cfg) if err != nil { err = q.Close() if err != nil { @@ -244,26 +305,29 @@ func (r *lensRegistry) ReloadLenses(ctx context.Context, txn datastore.Txn) erro return nil } -func (r *lensRegistry) MigrateUp( +func (r *lensRegistry) migrateUp( + txnCtx *txnContext, src enumerable.Enumerable[LensDoc], schemaVersionID string, ) (enumerable.Enumerable[LensDoc], error) { - return migrate(r.lensPoolsBySchemaVersionID, src, schemaVersionID) + return r.migrate(r.lensPoolsBySchemaVersionID, txnCtx.lensPoolsBySchemaVersionID, src, schemaVersionID) } -func (r *lensRegistry) MigrateDown( +func (r *lensRegistry) migrateDown( + txnCtx *txnContext, src enumerable.Enumerable[LensDoc], schemaVersionID string, ) (enumerable.Enumerable[LensDoc], error) { - return migrate(r.reversedPoolsBySchemaVersionID, src, schemaVersionID) + return r.migrate(r.reversedPoolsBySchemaVersionID, txnCtx.reversedPoolsBySchemaVersionID, src, schemaVersionID) } -func migrate( +func (r *lensRegistry) migrate( pools map[string]*lensPool, + txnPools map[string]*lensPool, src enumerable.Enumerable[LensDoc], schemaVersionID string, ) (enumerable.Enumerable[LensDoc], error) { - lensPool, ok := pools[schemaVersionID] + lensPool, ok := r.getPool(pools, txnPools, schemaVersionID) if !ok { // If there are no migrations for this schema version, just return the given source. return src, nil @@ -279,19 +343,48 @@ func migrate( return lens, nil } -func (r *lensRegistry) Config() []client.LensConfig { +func (r *lensRegistry) config(txnCtx *txnContext) []client.LensConfig { + configs := map[string]client.LensConfig{} + r.configLock.RLock() + for schemaVersionID, cfg := range r.configs { + configs[schemaVersionID] = cfg + } + r.configLock.RUnlock() + + // If within a txn actively writing to this registry overwrite + // values from the (commited) registry. + // Note: Config cannot be removed, only replaced at the moment. + for schemaVersionID, cfg := range txnCtx.configs { + configs[schemaVersionID] = cfg + } + result := []client.LensConfig{} - for _, cfg := range r.configs { + for _, cfg := range configs { result = append(result, cfg) } return result } -func (r *lensRegistry) HasMigration(schemaVersionID string) bool { - _, hasMigration := r.lensPoolsBySchemaVersionID[schemaVersionID] +func (r *lensRegistry) hasMigration(txnCtx *txnContext, schemaVersionID string) bool { + _, hasMigration := r.getPool(r.lensPoolsBySchemaVersionID, txnCtx.lensPoolsBySchemaVersionID, schemaVersionID) return hasMigration } +func (r *lensRegistry) getPool( + pools map[string]*lensPool, + txnPools map[string]*lensPool, + schemaVersionID string, +) (*lensPool, bool) { + if pool, ok := txnPools[schemaVersionID]; ok { + return pool, true + } + + r.poolLock.RLock() + pool, ok := pools[schemaVersionID] + r.poolLock.RUnlock() + return pool, ok +} + // lensPool provides a pool-like mechanic for caching a limited number of wasm lens modules in // a thread safe fashion. // diff --git a/lens/txn_registry.go b/lens/txn_registry.go new file mode 100644 index 0000000000..954db01e0c --- /dev/null +++ b/lens/txn_registry.go @@ -0,0 +1,163 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package lens + +import ( + "context" + + "github.com/sourcenetwork/immutable/enumerable" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" +) + +type implicitTxnLensRegistry struct { + registry *lensRegistry + db TxnSource +} + +type explicitTxnLensRegistry struct { + registry *lensRegistry + txn datastore.Txn +} + +var _ client.LensRegistry = (*implicitTxnLensRegistry)(nil) +var _ client.LensRegistry = (*explicitTxnLensRegistry)(nil) + +func (r *implicitTxnLensRegistry) WithTxn(txn datastore.Txn) client.LensRegistry { + return &explicitTxnLensRegistry{ + registry: r.registry, + txn: txn, + } +} + +func (r *explicitTxnLensRegistry) WithTxn(txn datastore.Txn) client.LensRegistry { + return &explicitTxnLensRegistry{ + registry: r.registry, + txn: txn, + } +} + +func (r *implicitTxnLensRegistry) SetMigration(ctx context.Context, cfg client.LensConfig) error { + txn, err := r.db.NewTxn(ctx, false) + if err != nil { + return err + } + defer txn.Discard(ctx) + txnCtx := r.registry.getCtx(txn, false) + + err = r.registry.setMigration(ctx, txnCtx, cfg) + if err != nil { + return err + } + + return txn.Commit(ctx) +} + +func (r *explicitTxnLensRegistry) SetMigration(ctx context.Context, cfg client.LensConfig) error { + return r.registry.setMigration(ctx, r.registry.getCtx(r.txn, false), cfg) +} + +func (r *implicitTxnLensRegistry) ReloadLenses(ctx context.Context) error { + txn, err := r.db.NewTxn(ctx, true) + if err != nil { + return err + } + defer txn.Discard(ctx) + txnCtx := r.registry.getCtx(txn, false) + + err = r.registry.reloadLenses(ctx, txnCtx) + if err != nil { + return err + } + + return txn.Commit(ctx) +} + +func (r *explicitTxnLensRegistry) ReloadLenses(ctx context.Context) error { + return r.registry.reloadLenses(ctx, r.registry.getCtx(r.txn, true)) +} + +func (r *implicitTxnLensRegistry) MigrateUp( + ctx context.Context, + src enumerable.Enumerable[LensDoc], + schemaVersionID string, +) (enumerable.Enumerable[map[string]any], error) { + txn, err := r.db.NewTxn(ctx, true) + if err != nil { + return nil, err + } + defer txn.Discard(ctx) + txnCtx := newTxnCtx(txn) + + return r.registry.migrateUp(txnCtx, src, schemaVersionID) +} + +func (r *explicitTxnLensRegistry) MigrateUp( + ctx context.Context, + src enumerable.Enumerable[LensDoc], + schemaVersionID string, +) (enumerable.Enumerable[map[string]any], error) { + return r.registry.migrateUp(r.registry.getCtx(r.txn, true), src, schemaVersionID) +} + +func (r *implicitTxnLensRegistry) MigrateDown( + ctx context.Context, + src enumerable.Enumerable[LensDoc], + schemaVersionID string, +) (enumerable.Enumerable[map[string]any], error) { + txn, err := r.db.NewTxn(ctx, true) + if err != nil { + return nil, err + } + defer txn.Discard(ctx) + txnCtx := newTxnCtx(txn) + + return r.registry.migrateDown(txnCtx, src, schemaVersionID) +} + +func (r *explicitTxnLensRegistry) MigrateDown( + ctx context.Context, + src enumerable.Enumerable[LensDoc], + schemaVersionID string, +) (enumerable.Enumerable[map[string]any], error) { + return r.registry.migrateDown(r.registry.getCtx(r.txn, true), src, schemaVersionID) +} + +func (r *implicitTxnLensRegistry) Config(ctx context.Context) ([]client.LensConfig, error) { + txn, err := r.db.NewTxn(ctx, true) + if err != nil { + return nil, err + } + defer txn.Discard(ctx) + txnCtx := newTxnCtx(txn) + + return r.registry.config(txnCtx), nil +} + +func (r *explicitTxnLensRegistry) Config(ctx context.Context) ([]client.LensConfig, error) { + return r.registry.config(r.registry.getCtx(r.txn, true)), nil +} + +func (r *implicitTxnLensRegistry) HasMigration(ctx context.Context, schemaVersionID string) (bool, error) { + txn, err := r.db.NewTxn(ctx, true) + if err != nil { + return false, err + } + defer txn.Discard(ctx) + txnCtx := newTxnCtx(txn) + + return r.registry.hasMigration(txnCtx, schemaVersionID), nil +} + +func (r *explicitTxnLensRegistry) HasMigration(ctx context.Context, schemaVersionID string) (bool, error) { + return r.registry.hasMigration(r.registry.getCtx(r.txn, true), schemaVersionID), nil +} diff --git a/tests/integration/lens.go b/tests/integration/lens.go index dbdb4c1c70..2959867c1a 100644 --- a/tests/integration/lens.go +++ b/tests/integration/lens.go @@ -13,6 +13,7 @@ package tests import ( "github.com/sourcenetwork/immutable" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/sourcenetwork/defradb/client" ) @@ -72,7 +73,8 @@ func getMigrations( for _, node := range getNodes(action.NodeID, s.nodes) { db := getStore(s, node.DB, action.TransactionID, "") - configs := db.LensRegistry().Config() + configs, err := db.LensRegistry().Config(s.ctx) + require.NoError(s.t, err) // The order of the results is not deterministic, so do not assert on the element // locations. diff --git a/tests/integration/schema/migrations/query/with_txn_test.go b/tests/integration/schema/migrations/query/with_txn_test.go index 059af4d461..3c55fd7748 100644 --- a/tests/integration/schema/migrations/query/with_txn_test.go +++ b/tests/integration/schema/migrations/query/with_txn_test.go @@ -21,8 +21,6 @@ import ( "github.com/sourcenetwork/defradb/tests/lenses" ) -// todo: This test documents unwanted behaviour and should be fixed with -// https://github.com/sourcenetwork/defradb/issues/1592 func TestSchemaMigrationQueryWithTxn(t *testing.T) { test := testUtils.TestCase{ Description: "Test schema migration, with transaction", @@ -74,10 +72,8 @@ func TestSchemaMigrationQueryWithTxn(t *testing.T) { }`, Results: []map[string]any{ { - "name": "John", - // This is the bug - although the request and migration are on the same transaction - // the migration is not picked up during the request. - "verified": nil, + "name": "John", + "verified": true, }, }, }, diff --git a/tests/integration/schema/migrations/with_txn_test.go b/tests/integration/schema/migrations/with_txn_test.go index f8eb5b5611..827f40de5e 100644 --- a/tests/integration/schema/migrations/with_txn_test.go +++ b/tests/integration/schema/migrations/with_txn_test.go @@ -21,8 +21,6 @@ import ( "github.com/sourcenetwork/defradb/tests/lenses" ) -// todo: This test documents unwanted behaviour and should be fixed with -// https://github.com/sourcenetwork/defradb/issues/1592 func TestSchemaMigrationGetMigrationsWithTxn(t *testing.T) { test := testUtils.TestCase{ Description: "Test schema migration, with txn", @@ -49,7 +47,23 @@ func TestSchemaMigrationGetMigrationsWithTxn(t *testing.T) { TransactionID: immutable.Some(0), // This is the bug - although the GetMigrations call and migration are on the same transaction // the migration is not returned in the results. - ExpectedResults: []client.LensConfig{}, + ExpectedResults: []client.LensConfig{ + { + SourceSchemaVersionID: "does not exist", + DestinationSchemaVersionID: "also does not exist", + Lens: model.Lens{ + Lenses: []model.LensModule{ + { + Path: lenses.SetDefaultModulePath, + Arguments: map[string]any{ + "dst": "verified", + "value": false, + }, + }, + }, + }, + }, + }, }, }, }