Skip to content

Commit

Permalink
Handle migration transactionality
Browse files Browse the repository at this point in the history
This commit ended up doing quite a lot, as I figured out various intertwined issues, it does:
- Adds locks around the various registry properties, these maps can be accessed concurrently and need to be protected.
- Removes the transaction continuity issue in the client.LenRegistry interface, where db.LensRegistry() returns an object that does not respect the transactionality of the parent store, and takes `txn`s as input parameters to some of its functions. It does this by following the same pattern as `db.db`.
- Fixes the bugs in the lens package where migrations set were not visible/accessible until after commit.  They are now visible within the transaction scope.
  • Loading branch information
AndrewSisley committed Aug 1, 2023
1 parent a0b3faa commit cd32b08
Show file tree
Hide file tree
Showing 11 changed files with 400 additions and 93 deletions.
4 changes: 2 additions & 2 deletions api/http/handlerfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func setMigrationHandler(rw http.ResponseWriter, req *http.Request) {
return
}

err = db.LensRegistry().SetMigration(req.Context(), txn, cfg)
err = db.LensRegistry().SetMigration(req.Context(), cfg)
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
Expand All @@ -338,7 +338,7 @@ func getMigrationHandler(rw http.ResponseWriter, req *http.Request) {
return
}

cfgs := db.LensRegistry().Config()
cfgs, err := db.LensRegistry().Config(req.Context())
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
Expand Down
26 changes: 20 additions & 6 deletions client/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ type LensConfig struct {
// LensRegistry exposes several useful thread-safe migration related functions which may
// be used to manage migrations.
type LensRegistry interface {
// WithTxn returns a new LensRegistry scoped to the given transaction.
//
// WARNING: Currently this does not provide snapshot isolation, if other transactions are commited
// after this has been created, the results of those commits will be visible within this scope.
WithTxn(datastore.Txn) LensRegistry

// SetMigration sets the migration for the given source-destination schema version IDs. Is equivilent to
// calling `Store.SetMigration(ctx, cfg)`.
//
Expand All @@ -55,29 +61,37 @@ type LensRegistry interface {
//
// Migrations will only run if there is a complete path from the document schema version to the latest local
// schema version.
SetMigration(context.Context, datastore.Txn, LensConfig) error
SetMigration(context.Context, LensConfig) error

// ReloadLenses clears any cached migrations, loads their configurations from the database and re-initializes
// them. It is run on database start if the database already existed.
ReloadLenses(ctx context.Context, txn datastore.Txn) error
ReloadLenses(context.Context) error

// MigrateUp returns an enumerable that feeds the given source through the Lens migration for the given
// schema version id if one is found, if there is no matching migration the given source will be returned.
MigrateUp(enumerable.Enumerable[map[string]any], string) (enumerable.Enumerable[map[string]any], error)
MigrateUp(
context.Context,
enumerable.Enumerable[map[string]any],
string,
) (enumerable.Enumerable[map[string]any], error)

// MigrateDown returns an enumerable that feeds the given source through the Lens migration for the schema
// version that precedes the given schema version id in reverse, if one is found, if there is no matching
// migration the given source will be returned.
//
// This downgrades any documents in the source enumerable if/when enumerated.
MigrateDown(enumerable.Enumerable[map[string]any], string) (enumerable.Enumerable[map[string]any], error)
MigrateDown(
context.Context,
enumerable.Enumerable[map[string]any],
string,
) (enumerable.Enumerable[map[string]any], error)

// Config returns a slice of the configurations of the currently loaded migrations.
//
// Modifying the slice does not affect the loaded configurations.
Config() []LensConfig
Config(context.Context) ([]LensConfig, error)

// HasMigration returns true if there is a migration registered for the given schema version id, otherwise
// will return false.
HasMigration(string) bool
HasMigration(context.Context, string) (bool, error)
}
9 changes: 5 additions & 4 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func newDB(ctx context.Context, rootstore datastore.RootStore, options ...Option

// lensPoolSize may be set by `options`, and because they are funcs on db
// we have to mutate `db` here to set the registry.
db.lensRegistry = lens.NewRegistry(db.lensPoolSize)
db.lensRegistry = lens.NewRegistry(db.lensPoolSize, db)

err = db.initialize(ctx)
if err != nil {
Expand All @@ -172,8 +172,9 @@ func (db *db) NewConcurrentTxn(ctx context.Context, readonly bool) (datastore.Tx
// WithTxn returns a new [client.Store] that respects the given transaction.
func (db *db) WithTxn(txn datastore.Txn) client.Store {
return &explicitTxnDB{
db: db,
txn: txn,
db: db,
txn: txn,
lensRegistry: db.lensRegistry.WithTxn(txn),
}
}

Expand Down Expand Up @@ -221,7 +222,7 @@ func (db *db) initialize(ctx context.Context) error {
return err
}

err = db.lensRegistry.ReloadLenses(ctx, txn)
err = db.lensRegistry.ReloadLenses(ctx)
if err != nil {
return err
}
Expand Down
14 changes: 11 additions & 3 deletions db/txn_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type implicitTxnDB struct {

type explicitTxnDB struct {
*db
txn datastore.Txn
txn datastore.Txn
lensRegistry client.LensRegistry
}

// ExecRequest executes a request against the database.
Expand Down Expand Up @@ -286,7 +287,7 @@ func (db *implicitTxnDB) SetMigration(ctx context.Context, cfg client.LensConfig
}
defer txn.Discard(ctx)

err = db.lensRegistry.SetMigration(ctx, txn, cfg)
err = db.lensRegistry.SetMigration(ctx, cfg)
if err != nil {
return err
}
Expand All @@ -295,7 +296,7 @@ func (db *implicitTxnDB) SetMigration(ctx context.Context, cfg client.LensConfig
}

func (db *explicitTxnDB) SetMigration(ctx context.Context, cfg client.LensConfig) error {
return db.lensRegistry.SetMigration(ctx, db.txn, cfg)
return db.lensRegistry.SetMigration(ctx, cfg)
}

// SetReplicator adds a new replicator to the database.
Expand Down Expand Up @@ -417,3 +418,10 @@ func (db *implicitTxnDB) BasicExport(ctx context.Context, config *client.BackupC
func (db *explicitTxnDB) BasicExport(ctx context.Context, config *client.BackupConfig) error {
return db.basicExport(ctx, db.txn, config)
}

// LensRegistry returns the LensRegistry in use by this database instance.
//
// It exposes several useful thread-safe migration related functions.
func (db *explicitTxnDB) LensRegistry() client.LensRegistry {
return db.lensRegistry
}
16 changes: 13 additions & 3 deletions lens/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,25 @@ func (f *lensedFetcher) Init(
f.fieldDescriptionsByName[field.Name] = field
}

history, err := getTargetedSchemaHistory(ctx, txn, f.registry.Config(), f.col.Schema.SchemaID, f.col.Schema.VersionID)
cfg, err := f.registry.Config(ctx)
if err != nil {
return err
}
f.lens = new(f.registry, f.col.Schema.VersionID, history)

history, err := getTargetedSchemaHistory(ctx, txn, cfg, f.col.Schema.SchemaID, f.col.Schema.VersionID)
if err != nil {
return err
}
f.lens = new(ctx, f.registry, f.col.Schema.VersionID, history)
f.txn = txn

for schemaVersionID := range history {
if f.registry.HasMigration(schemaVersionID) {
hasMigration, err := f.registry.HasMigration(ctx, schemaVersionID)
if err != nil {
return err
}

if hasMigration {
f.hasMigrations = true
break
}
Expand Down
10 changes: 8 additions & 2 deletions lens/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package lens

import (
"context"

"github.com/sourcenetwork/immutable/enumerable"

"github.com/sourcenetwork/defradb/client"
Expand Down Expand Up @@ -42,6 +44,8 @@ type Lens interface {
type lens struct {
lensRegistry client.LensRegistry

ctx context.Context

// The primary access points to the lens pipes through which all things flow.
lensPipesBySchemaVersionIDs map[schemaVersionID]enumerable.Concatenation[LensDoc]

Expand All @@ -60,6 +64,7 @@ type lens struct {
var _ Lens = (*lens)(nil)

func new(
ctx context.Context,
lensRegistry client.LensRegistry,
targetSchemaVersionID schemaVersionID,
schemaVersionHistory map[schemaVersionID]*targetedSchemaHistoryLink,
Expand All @@ -69,6 +74,7 @@ func new(

return &lens{
lensRegistry: lensRegistry,
ctx: ctx,
source: enumerable.NewQueue[lensInput](),
outputPipe: outputPipe,
unknownVersionPipe: targetSource,
Expand Down Expand Up @@ -175,7 +181,7 @@ func (l *lens) Next() (bool, error) {
// Aquire a lens migration from the registery, using the junctionPipe as its source.
// The new pipeHead will then be connected as a source to the next migration-stage on
// the next loop.
pipeHead, err = l.lensRegistry.MigrateUp(junctionPipe, historyLocation.schemaVersionID)
pipeHead, err = l.lensRegistry.MigrateUp(l.ctx, junctionPipe, historyLocation.schemaVersionID)
if err != nil {
return false, err
}
Expand All @@ -185,7 +191,7 @@ func (l *lens) Next() (bool, error) {
// Aquire a lens migration from the registery, using the junctionPipe as its source.
// The new pipeHead will then be connected as a source to the next migration-stage on
// the next loop.
pipeHead, err = l.lensRegistry.MigrateDown(junctionPipe, historyLocation.previous.Value().schemaVersionID)
pipeHead, err = l.lensRegistry.MigrateDown(l.ctx, junctionPipe, historyLocation.previous.Value().schemaVersionID)
if err != nil {
return false, err
}
Expand Down
Loading

0 comments on commit cd32b08

Please sign in to comment.