Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: DB transactions context #2513

Merged
merged 16 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Execute queries, add schema types, obtain node info, etc.`,
if err := setContextTransaction(cmd, txID); err != nil {
return err
}
return setContextStore(cmd)
return setContextDB(cmd)
},
}
cmd.PersistentFlags().Uint64Var(&txID, "tx", 0, "Transaction ID")
Expand Down
7 changes: 1 addition & 6 deletions cli/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore"
)

func MakeCollectionCommand() *cobra.Command {
Expand All @@ -41,7 +40,7 @@ func MakeCollectionCommand() *cobra.Command {
if err := setContextTransaction(cmd, txID); err != nil {
return err
}
if err := setContextStore(cmd); err != nil {
if err := setContextDB(cmd); err != nil {
return err
}
store := mustGetContextStore(cmd)
Expand Down Expand Up @@ -71,10 +70,6 @@ func MakeCollectionCommand() *cobra.Command {
}
col := cols[0]

if tx, ok := cmd.Context().Value(txContextKey).(datastore.Txn); ok {
col = col.WithTxn(tx)
}

ctx := context.WithValue(cmd.Context(), colContextKey, col)
cmd.SetContext(ctx)
return nil
Expand Down
4 changes: 0 additions & 4 deletions cli/index_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore"
)

func MakeIndexCreateCommand() *cobra.Command {
Expand Down Expand Up @@ -52,9 +51,6 @@ Example: create a named index for 'Users' collection on 'name' field:
if err != nil {
return err
}
if tx, ok := cmd.Context().Value(txContextKey).(datastore.Txn); ok {
col = col.WithTxn(tx)
}
desc, err = col.CreateIndex(cmd.Context(), desc)
if err != nil {
return err
Expand Down
5 changes: 0 additions & 5 deletions cli/index_drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ package cli

import (
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/datastore"
)

func MakeIndexDropCommand() *cobra.Command {
Expand All @@ -34,9 +32,6 @@ Example: drop the index 'UsersByName' for 'Users' collection:
if err != nil {
return err
}
if tx, ok := cmd.Context().Value(txContextKey).(datastore.Txn); ok {
col = col.WithTxn(tx)
}
return col.DropIndex(cmd.Context(), nameArg)
},
}
Expand Down
5 changes: 0 additions & 5 deletions cli/index_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ package cli

import (
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/datastore"
)

func MakeIndexListCommand() *cobra.Command {
Expand All @@ -38,9 +36,6 @@ Example: show all index for 'Users' collection:
if err != nil {
return err
}
if tx, ok := cmd.Context().Value(txContextKey).(datastore.Txn); ok {
col = col.WithTxn(tx)
}
indexes, err := col.GetIndexes(cmd.Context())
if err != nil {
return err
Expand Down
9 changes: 1 addition & 8 deletions cli/schema_migration_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (

"github.com/sourcenetwork/immutable/enumerable"
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/datastore"
)

func MakeSchemaMigrationDownCommand() *cobra.Command {
Expand Down Expand Up @@ -67,12 +65,7 @@ Example: migrate from stdin
if err := json.Unmarshal(srcData, &src); err != nil {
return err
}
lens := store.LensRegistry()
if tx, ok := cmd.Context().Value(txContextKey).(datastore.Txn); ok {
lens = lens.WithTxn(tx)
}

out, err := lens.MigrateDown(cmd.Context(), enumerable.New(src), collectionID)
out, err := store.LensRegistry().MigrateDown(cmd.Context(), enumerable.New(src), collectionID)
if err != nil {
return err
}
Expand Down
9 changes: 1 addition & 8 deletions cli/schema_migration_reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ package cli

import (
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/datastore"
)

func MakeSchemaMigrationReloadCommand() *cobra.Command {
Expand All @@ -23,12 +21,7 @@ func MakeSchemaMigrationReloadCommand() *cobra.Command {
Long: `Reload the schema migrations within DefraDB`,
RunE: func(cmd *cobra.Command, args []string) error {
store := mustGetContextStore(cmd)

lens := store.LensRegistry()
if tx, ok := cmd.Context().Value(txContextKey).(datastore.Txn); ok {
lens = lens.WithTxn(tx)
}
return lens.ReloadLenses(cmd.Context())
return store.LensRegistry().ReloadLenses(cmd.Context())
},
}
return cmd
Expand Down
9 changes: 1 addition & 8 deletions cli/schema_migration_up.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (

"github.com/sourcenetwork/immutable/enumerable"
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/datastore"
)

func MakeSchemaMigrationUpCommand() *cobra.Command {
Expand Down Expand Up @@ -67,12 +65,7 @@ Example: migrate from stdin
if err := json.Unmarshal(srcData, &src); err != nil {
return err
}
lens := store.LensRegistry()
if tx, ok := cmd.Context().Value(txContextKey).(datastore.Txn); ok {
lens = lens.WithTxn(tx)
}

out, err := lens.MigrateUp(cmd.Context(), enumerable.New(src), collectionID)
out, err := store.LensRegistry().MigrateUp(cmd.Context(), enumerable.New(src), collectionID)
if err != nil {
return err
}
Expand Down
44 changes: 15 additions & 29 deletions cli/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/spf13/viper"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/db"
"github.com/sourcenetwork/defradb/http"
)

Expand All @@ -32,17 +32,8 @@ var (
cfgContextKey = contextKey("cfg")
// rootDirContextKey is the context key for the root directory.
rootDirContextKey = contextKey("rootDir")
// txContextKey is the context key for the datastore.Txn
//
// This will only be set if a transaction id is specified.
txContextKey = contextKey("tx")
// dbContextKey is the context key for the client.DB
dbContextKey = contextKey("db")
// storeContextKey is the context key for the client.Store
//
// If a transaction exists, all operations will be executed
// in the current transaction context.
storeContextKey = contextKey("store")
// colContextKey is the context key for the client.Collection
//
// If a transaction exists, all operations will be executed
Expand All @@ -61,7 +52,7 @@ func mustGetContextDB(cmd *cobra.Command) client.DB {
//
// If a store is not set in the current context this function panics.
func mustGetContextStore(cmd *cobra.Command) client.Store {
return cmd.Context().Value(storeContextKey).(client.Store)
return cmd.Context().Value(dbContextKey).(client.Store)
}

// mustGetContextP2P returns the p2p implementation for the current command context.
Expand Down Expand Up @@ -92,6 +83,18 @@ func tryGetContextCollection(cmd *cobra.Command) (client.Collection, bool) {
return col, ok
}

// setContextDB sets the db for the current command context.
func setContextDB(cmd *cobra.Command) error {
cfg := mustGetContextConfig(cmd)
db, err := http.NewClient(cfg.GetString("api.address"))
if err != nil {
return err
}
ctx := context.WithValue(cmd.Context(), dbContextKey, db)
cmd.SetContext(ctx)
return nil
}

// setContextConfig sets teh config for the current command context.
func setContextConfig(cmd *cobra.Command) error {
rootdir := mustGetContextRootDir(cmd)
Expand All @@ -115,24 +118,7 @@ func setContextTransaction(cmd *cobra.Command, txId uint64) error {
if err != nil {
return err
}
ctx := context.WithValue(cmd.Context(), txContextKey, tx)
cmd.SetContext(ctx)
return nil
}

// setContextStore sets the store for the current command context.
func setContextStore(cmd *cobra.Command) error {
cfg := mustGetContextConfig(cmd)
db, err := http.NewClient(cfg.GetString("api.address"))
if err != nil {
return err
}
ctx := context.WithValue(cmd.Context(), dbContextKey, db)
if tx, ok := ctx.Value(txContextKey).(datastore.Txn); ok {
ctx = context.WithValue(ctx, storeContextKey, db.WithTxn(tx))
} else {
ctx = context.WithValue(ctx, storeContextKey, db)
}
ctx := db.SetContextTxn(cmd.Context(), tx)
cmd.SetContext(ctx)
return nil
}
Expand Down
6 changes: 0 additions & 6 deletions client/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"context"

"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/datastore"
)

// Collection represents a defradb collection.
Expand Down Expand Up @@ -192,10 +190,6 @@ type Collection interface {
showDeleted bool,
) (*Document, error)

// WithTxn returns a new instance of the collection, with a transaction
// handle instead of a raw DB handle.
WithTxn(datastore.Txn) Collection

// GetAllDocIDs returns all the document IDs that exist in the collection.
GetAllDocIDs(ctx context.Context, identity immutable.Option[string]) (<-chan DocIDResult, error)

Expand Down
3 changes: 0 additions & 3 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ type DB interface {
// can safely operate on it concurrently.
NewConcurrentTxn(context.Context, bool) (datastore.Txn, error)

// WithTxn returns a new [client.Store] that respects the given transaction.
WithTxn(datastore.Txn) Store

// Root returns the underlying root store, within which all data managed by DefraDB is held.
Root() datastore.RootStore

Expand Down
8 changes: 0 additions & 8 deletions client/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (

"github.com/lens-vm/lens/host-go/config/model"
"github.com/sourcenetwork/immutable/enumerable"

"github.com/sourcenetwork/defradb/datastore"
)

// LensConfig represents the configuration of a Lens migration in Defra.
Expand All @@ -43,12 +41,6 @@ type LensConfig struct {
// LensRegistry exposes several useful thread-safe migration related functions which may
// be used to manage migrations.
type LensRegistry interface {
// WithTxn returns a new LensRegistry scoped to the given transaction.
//
// WARNING: Currently this does not provide snapshot isolation, if other transactions are committed
// after this has been created, the results of those commits will be visible within this scope.
WithTxn(datastore.Txn) LensRegistry

// SetMigration caches the migration for the given collection ID. It does not persist the migration in long
// term storage, for that one should call [Store.SetMigration(ctx, cfg)].
//
Expand Down
9 changes: 4 additions & 5 deletions db/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (db *db) basicImport(ctx context.Context, txn datastore.Txn, filepath strin
}

// TODO-ACP: https://github.com/sourcenetwork/defradb/issues/2430 - Add identity ability to backup
err = col.WithTxn(txn).Create(ctx, acpIdentity.NoIdentity, doc)
err = col.Create(ctx, acpIdentity.NoIdentity, doc)
if err != nil {
return NewErrDocCreate(err)
}
Expand All @@ -104,7 +104,7 @@ func (db *db) basicImport(ctx context.Context, txn datastore.Txn, filepath strin
return NewErrDocUpdate(err)
}
// TODO-ACP: https://github.com/sourcenetwork/defradb/issues/2430 - Add identity ability to backup
err = col.WithTxn(txn).Update(ctx, acpIdentity.NoIdentity, doc)
err = col.Update(ctx, acpIdentity.NoIdentity, doc)
if err != nil {
return NewErrDocUpdate(err)
}
Expand Down Expand Up @@ -191,9 +191,8 @@ func (db *db) basicExport(ctx context.Context, txn datastore.Txn, config *client
if err != nil {
return err
}
colTxn := col.WithTxn(txn)
// TODO-ACP: https://github.com/sourcenetwork/defradb/issues/2430 - Add identity ability to export
docIDsCh, err := colTxn.GetAllDocIDs(ctx, acpIdentity.NoIdentity)
docIDsCh, err := col.GetAllDocIDs(ctx, acpIdentity.NoIdentity)
if err != nil {
return err
}
Expand All @@ -210,7 +209,7 @@ func (db *db) basicExport(ctx context.Context, txn datastore.Txn, config *client
}
}
// TODO-ACP: https://github.com/sourcenetwork/defradb/issues/2430 - Add identity ability to export
doc, err := colTxn.Get(ctx, acpIdentity.NoIdentity, docResultWithID.ID, false)
doc, err := col.Get(ctx, acpIdentity.NoIdentity, docResultWithID.ID, false)
if err != nil {
return err
}
Expand Down
Loading
Loading