Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve the way migrations handle transactions #1737

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions api/http/handlerfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,26 +298,14 @@ func setMigrationHandler(rw http.ResponseWriter, req *http.Request) {
return
}

txn, err := db.NewTxn(req.Context(), false)
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
}

var cfg client.LensConfig
err = json.Unmarshal(cfgStr, &cfg)
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
}

err = db.LensRegistry().SetMigration(req.Context(), txn, cfg)
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
}

err = txn.Commit(req.Context())
err = db.LensRegistry().SetMigration(req.Context(), cfg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: I think this is not setting the migration with the above txn. You probably need a call to db.WithTxn for the registry to be using it.

Copy link
Contributor Author

@AndrewSisley AndrewSisley Aug 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good spot! It will now be using an implicit txn, and the explicit one here can be deleted completely

  • Remove pointless txn object

if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
Expand All @@ -338,7 +326,7 @@ func getMigrationHandler(rw http.ResponseWriter, req *http.Request) {
return
}

cfgs := db.LensRegistry().Config()
cfgs, err := db.LensRegistry().Config(req.Context())
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
Expand Down
26 changes: 20 additions & 6 deletions client/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ type LensConfig struct {
// LensRegistry exposes several useful thread-safe migration related functions which may
// be used to manage migrations.
type LensRegistry interface {
// WithTxn returns a new LensRegistry scoped to the given transaction.
//
// WARNING: Currently this does not provide snapshot isolation, if other transactions are commited
// after this has been created, the results of those commits will be visible within this scope.
WithTxn(datastore.Txn) LensRegistry

// SetMigration sets the migration for the given source-destination schema version IDs. Is equivilent to
// calling `Store.SetMigration(ctx, cfg)`.
//
Expand All @@ -55,29 +61,37 @@ type LensRegistry interface {
//
// Migrations will only run if there is a complete path from the document schema version to the latest local
// schema version.
SetMigration(context.Context, datastore.Txn, LensConfig) error
SetMigration(context.Context, LensConfig) error

// ReloadLenses clears any cached migrations, loads their configurations from the database and re-initializes
// them. It is run on database start if the database already existed.
ReloadLenses(ctx context.Context, txn datastore.Txn) error
ReloadLenses(context.Context) error

// MigrateUp returns an enumerable that feeds the given source through the Lens migration for the given
// schema version id if one is found, if there is no matching migration the given source will be returned.
MigrateUp(enumerable.Enumerable[map[string]any], string) (enumerable.Enumerable[map[string]any], error)
MigrateUp(
context.Context,
enumerable.Enumerable[map[string]any],
string,
) (enumerable.Enumerable[map[string]any], error)

// MigrateDown returns an enumerable that feeds the given source through the Lens migration for the schema
// version that precedes the given schema version id in reverse, if one is found, if there is no matching
// migration the given source will be returned.
//
// This downgrades any documents in the source enumerable if/when enumerated.
MigrateDown(enumerable.Enumerable[map[string]any], string) (enumerable.Enumerable[map[string]any], error)
MigrateDown(
context.Context,
enumerable.Enumerable[map[string]any],
string,
) (enumerable.Enumerable[map[string]any], error)

// Config returns a slice of the configurations of the currently loaded migrations.
//
// Modifying the slice does not affect the loaded configurations.
Config() []LensConfig
Config(context.Context) ([]LensConfig, error)

// HasMigration returns true if there is a migration registered for the given schema version id, otherwise
// will return false.
HasMigration(string) bool
HasMigration(context.Context, string) (bool, error)
}
4 changes: 3 additions & 1 deletion datastore/concurrent_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type concurrentTxn struct {
}

// NewConcurrentTxnFrom creates a new Txn from rootstore that supports concurrent API calls
func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) {
func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) {
var rootTxn ds.Txn
var err error

Expand All @@ -54,6 +54,8 @@ func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readon
return &txn{
rootConcurentTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
}, nil
Expand Down
8 changes: 4 additions & 4 deletions datastore/concurrent_txt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestNewConcurrentTxnFrom(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewConcurrentTxnFrom(ctx, rootstore, false)
txn, err := NewConcurrentTxnFrom(ctx, rootstore, 0, false)
require.NoError(t, err)

err = txn.Commit(ctx)
Expand All @@ -44,15 +44,15 @@ func TestNewConcurrentTxnFromWithStoreClosed(t *testing.T) {
err = rootstore.Close()
require.NoError(t, err)

_, err = NewConcurrentTxnFrom(ctx, rootstore, false)
_, err = NewConcurrentTxnFrom(ctx, rootstore, 0, false)
require.ErrorIs(t, err, badgerds.ErrClosed)
}

func TestNewConcurrentTxnFromNonIterable(t *testing.T) {
ctx := context.Background()
rootstore := memory.NewDatastore(ctx)

txn, err := NewConcurrentTxnFrom(ctx, rootstore, false)
txn, err := NewConcurrentTxnFrom(ctx, rootstore, 0, false)
require.NoError(t, err)

err = txn.Commit(ctx)
Expand All @@ -66,7 +66,7 @@ func TestNewConcurrentTxnFromNonIterableWithStoreClosed(t *testing.T) {
err := rootstore.Close()
require.NoError(t, err)

_, err = NewConcurrentTxnFrom(ctx, rootstore, false)
_, err = NewConcurrentTxnFrom(ctx, rootstore, 0, false)
require.ErrorIs(t, err, badgerds.ErrClosed)
}

Expand Down
74 changes: 74 additions & 0 deletions datastore/mocks/txn.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 32 additions & 8 deletions datastore/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
type Txn interface {
MultiStore

// ID returns the unique immutable identifier for this transaction.
ID() uint64

// Commit finalizes a transaction, attempting to commit it to the Datastore.
// May return an error if the transaction has gone stale. The presence of an
// error is an indication that the data was not committed to the Datastore.
Expand All @@ -32,22 +35,31 @@ type Txn interface {
// state of the Datastore, making it safe to defer.
Discard(ctx context.Context)

// OnSuccess registers a function to be called when the transaction is committed.
OnSuccess(fn func())

// OnError registers a function to be called when the transaction is rolled back.
OnError(fn func())

// OnDiscard registers a function to be called when the transaction is discarded.
OnDiscard(fn func())
}

type txn struct {
t ds.Txn
MultiStore

id uint64

successFns []func()
errorFns []func()
discardFns []func()
}

var _ Txn = (*txn)(nil)

// NewTxnFrom returns a new Txn from the rootstore.
func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) {
func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) {
// check if our datastore natively supports iterable transaction, transactions or batching
if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok {
rootTxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly)
Expand All @@ -58,6 +70,8 @@ func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (
return &txn{
rootTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
}, nil
Expand All @@ -73,24 +87,32 @@ func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (
return &txn{
rootTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
}, nil
}

// ID returns the unique immutable identifier for this transaction.
func (t *txn) ID() uint64 {
return t.id
}

// Commit finalizes a transaction, attempting to commit it to the Datastore.
func (t *txn) Commit(ctx context.Context) error {
if err := t.t.Commit(ctx); err != nil {
t.runErrorFns(ctx)
runFns(t.errorFns)
return err
}
t.runSuccessFns(ctx)
runFns(t.successFns)
return nil
}

// Discard throws away changes recorded in a transaction without committing.
func (t *txn) Discard(ctx context.Context) {
t.t.Discard(ctx)
runFns(t.discardFns)
}

// OnSuccess registers a function to be called when the transaction is committed.
Expand All @@ -109,14 +131,16 @@ func (txn *txn) OnError(fn func()) {
txn.errorFns = append(txn.errorFns, fn)
}

func (txn *txn) runErrorFns(ctx context.Context) {
for _, fn := range txn.errorFns {
fn()
// OnDiscard registers a function to be called when the transaction is discarded.
func (txn *txn) OnDiscard(fn func()) {
if fn == nil {
return
}
txn.discardFns = append(txn.discardFns, fn)
}

func (txn *txn) runSuccessFns(ctx context.Context) {
for _, fn := range txn.successFns {
func runFns(fns []func()) {
for _, fn := range fns {
fn()
}
}
Expand Down
8 changes: 4 additions & 4 deletions datastore/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestNewTxnFrom(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewTxnFrom(ctx, rootstore, false)
txn, err := NewTxnFrom(ctx, rootstore, 0, false)
require.NoError(t, err)

err = txn.Commit(ctx)
Expand All @@ -43,7 +43,7 @@ func TestNewTxnFromWithStoreClosed(t *testing.T) {
err = rootstore.Close()
require.NoError(t, err)

_, err = NewTxnFrom(ctx, rootstore, false)
_, err = NewTxnFrom(ctx, rootstore, 0, false)
require.ErrorIs(t, err, badgerds.ErrClosed)
}

Expand All @@ -53,7 +53,7 @@ func TestOnSuccess(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewTxnFrom(ctx, rootstore, false)
txn, err := NewTxnFrom(ctx, rootstore, 0, false)
require.NoError(t, err)

txn.OnSuccess(nil)
Expand All @@ -74,7 +74,7 @@ func TestOnError(t *testing.T) {
rootstore, err := badgerds.NewDatastore("", &opts)
require.NoError(t, err)

txn, err := NewTxnFrom(ctx, rootstore, false)
txn, err := NewTxnFrom(ctx, rootstore, 0, false)
require.NoError(t, err)

txn.OnError(nil)
Expand Down
Loading