diff --git a/client/errors.go b/client/errors.go index 048d96c00d..28161c502d 100644 --- a/client/errors.go +++ b/client/errors.go @@ -24,6 +24,7 @@ const ( errMaxTxnRetries string = "reached maximum transaction reties" errRelationOneSided string = "relation must be defined on both schemas" errCollectionNotFound string = "collection not found" + errUnknownCRDT string = "unknown crdt" ) // Errors returnable from this package. @@ -47,6 +48,7 @@ var ( ErrMaxTxnRetries = errors.New(errMaxTxnRetries) ErrRelationOneSided = errors.New(errRelationOneSided) ErrCollectionNotFound = errors.New(errCollectionNotFound) + ErrUnknownCRDT = errors.New(errUnknownCRDT) ) // NewErrFieldNotExist returns an error indicating that the given field does not exist. @@ -123,3 +125,10 @@ func NewErrCollectionNotFoundForSchema(schemaRoot string) error { errors.NewKV("SchemaRoot", schemaRoot), ) } + +func NewErrUnknownCRDT(cType CType) error { + return errors.New( + errUnknownCRDT, + errors.NewKV("Type", cType), + ) +} diff --git a/core/crdt/base.go b/core/crdt/base.go index d24b263645..a0d8b5375f 100644 --- a/core/crdt/base.go +++ b/core/crdt/base.go @@ -28,13 +28,26 @@ import ( type baseCRDT struct { store datastore.DSReaderWriter key core.DataStoreKey + + // schemaVersionKey is the schema version datastore key at the time of commit. + // + // It can be used to identify the collection datastructure state at the time of commit. + schemaVersionKey core.CollectionSchemaVersionKey + + fieldName string } -// @TODO paramaterize ns/suffix -func newBaseCRDT(store datastore.DSReaderWriter, key core.DataStoreKey) baseCRDT { +func newBaseCRDT( + store datastore.DSReaderWriter, + key core.DataStoreKey, + schemaVersionKey core.CollectionSchemaVersionKey, + fieldName string, +) baseCRDT { return baseCRDT{ - store: store, - key: key, + store: store, + key: key, + schemaVersionKey: schemaVersionKey, + fieldName: fieldName, } } diff --git a/core/crdt/base_test.go b/core/crdt/base_test.go index 5fd7d9248e..e69d69f05e 100644 --- a/core/crdt/base_test.go +++ b/core/crdt/base_test.go @@ -29,11 +29,11 @@ func newSeededDS() datastore.DSReaderWriter { } func exampleBaseCRDT() baseCRDT { - return newBaseCRDT(newSeededDS(), core.DataStoreKey{}) + return newBaseCRDT(newSeededDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "") } func TestBaseCRDTNew(t *testing.T) { - base := newBaseCRDT(newDS(), core.DataStoreKey{}) + base := newBaseCRDT(newDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "") if base.store == nil { t.Error("newBaseCRDT needs to init store") } diff --git a/core/crdt/composite.go b/core/crdt/composite.go index 68f7824329..761cc07828 100644 --- a/core/crdt/composite.go +++ b/core/crdt/composite.go @@ -29,11 +29,6 @@ import ( "github.com/sourcenetwork/defradb/errors" ) -var ( - _ core.ReplicatedData = (*CompositeDAG)(nil) - _ core.CompositeDelta = (*CompositeDAGDelta)(nil) -) - // CompositeDAGDelta represents a delta-state update made of sub-MerkleCRDTs. type CompositeDAGDelta struct { // SchemaVersionID is the schema version datastore key at the time of commit. @@ -51,6 +46,8 @@ type CompositeDAGDelta struct { FieldName string } +var _ core.CompositeDelta = (*CompositeDAGDelta)(nil) + // GetPriority gets the current priority for this delta. func (delta *CompositeDAGDelta) GetPriority() uint64 { return delta.Priority @@ -92,39 +89,21 @@ func (delta *CompositeDAGDelta) Links() []core.DAGLink { // CompositeDAG is a CRDT structure that is used to track a collection of sub MerkleCRDTs. type CompositeDAG struct { - store datastore.DSReaderWriter - key core.DataStoreKey - // schemaVersionKey is the schema version datastore key at the time of commit. - // - // It can be used to identify the collection datastructure state at time of commit. - schemaVersionKey core.CollectionSchemaVersionKey - - fieldName string + baseCRDT } -var _ core.ReplicatedData = CompositeDAG{} +var _ core.ReplicatedData = (*CompositeDAG)(nil) func NewCompositeDAG( store datastore.DSReaderWriter, schemaVersionKey core.CollectionSchemaVersionKey, - namespace core.Key, key core.DataStoreKey, fieldName string, ) CompositeDAG { - return CompositeDAG{ - store: store, - key: key, - schemaVersionKey: schemaVersionKey, - fieldName: fieldName, - } -} - -// ID returns the schema ID of the composite DAG CRDT. -func (c CompositeDAG) ID() string { - return c.key.ToString() + return CompositeDAG{newBaseCRDT(store, key, schemaVersionKey, fieldName)} } -// Value returns the schema ID of the composite DAG CRDT. +// Value is a no-op for a CompositeDAG. func (c CompositeDAG) Value(ctx context.Context) ([]byte, error) { return nil, nil } @@ -226,7 +205,7 @@ func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key core.DataStoreKe } // DeltaDecode is a typed helper to extract. -// a LWWRegDelta from a ipld.Node +// a CompositeDAGDelta from a ipld.Node // for now let's do cbor (quick to implement) func (c CompositeDAG) DeltaDecode(node ipld.Node) (core.Delta, error) { delta := &CompositeDAGDelta{} diff --git a/core/crdt/lwwreg.go b/core/crdt/lwwreg.go index 60df739319..18979c1bfb 100644 --- a/core/crdt/lwwreg.go +++ b/core/crdt/lwwreg.go @@ -26,12 +26,6 @@ import ( "github.com/sourcenetwork/defradb/errors" ) -var ( - // ensure types implements core interfaces - _ core.ReplicatedData = (*LWWRegister)(nil) - _ core.Delta = (*LWWRegDelta)(nil) -) - // LWWRegDelta is a single delta operation for an LWWRegister // @todo: Expand delta metadata (investigate if needed) type LWWRegDelta struct { @@ -42,6 +36,8 @@ type LWWRegDelta struct { FieldName string } +var _ core.Delta = (*LWWRegDelta)(nil) + // GetPriority gets the current priority for this delta. func (delta *LWWRegDelta) GetPriority() uint64 { return delta.Priority @@ -79,15 +75,10 @@ func (delta *LWWRegDelta) Value() any { // of an arbitrary data type that ensures convergence. type LWWRegister struct { baseCRDT - - // schemaVersionKey is the schema version datastore key at the time of commit. - // - // It can be used to identify the collection datastructure state at time of commit. - schemaVersionKey core.CollectionSchemaVersionKey - - fieldName string } +var _ core.ReplicatedData = (*LWWRegister)(nil) + // NewLWWRegister returns a new instance of the LWWReg with the given ID. func NewLWWRegister( store datastore.DSReaderWriter, @@ -95,15 +86,7 @@ func NewLWWRegister( key core.DataStoreKey, fieldName string, ) LWWRegister { - return LWWRegister{ - baseCRDT: newBaseCRDT(store, key), - schemaVersionKey: schemaVersionKey, - fieldName: fieldName, - // id: id, - // data: data, - // ts: ts, - // clock: clock, - } + return LWWRegister{newBaseCRDT(store, key, schemaVersionKey, fieldName)} } // Value gets the current register value @@ -120,7 +103,6 @@ func (reg LWWRegister) Value(ctx context.Context) ([]byte, error) { // Set generates a new delta with the supplied value // RETURN DELTA func (reg LWWRegister) Set(value []byte) *LWWRegDelta { - // return NewLWWRegister(reg.id, value, reg.clock.Apply(), reg.clock) return &LWWRegDelta{ Data: value, DocKey: []byte(reg.key.DocKey), @@ -129,18 +111,6 @@ func (reg LWWRegister) Set(value []byte) *LWWRegDelta { } } -func (reg LWWRegister) ID() string { - return reg.key.ToString() -} - -// RETURN DELTA -// func (reg LWWRegister) setWithClock(value []byte, clock Clock) LWWRegDelta { -// // return NewLWWRegister(reg.id, value, clock.Apply(), clock) -// return LWWRegDelta{ -// data: value, -// } -// } - // Merge implements ReplicatedData interface // Merge two LWWRegisty based on the order of the timestamp (ts), // if they are equal, compare IDs diff --git a/core/replicated.go b/core/replicated.go index 86d0523e42..75a72ece7f 100644 --- a/core/replicated.go +++ b/core/replicated.go @@ -20,7 +20,6 @@ import ( // ReplicatedData is a data type that allows concurrent writers to deterministically merge other // replicated data so as to converge on the same state. type ReplicatedData interface { - ID() string Merge(ctx context.Context, other Delta) error DeltaDecode(node ipld.Node) (Delta, error) // possibly rename to just Decode Value(ctx context.Context) ([]byte, error) @@ -31,8 +30,3 @@ type PersistedReplicatedData interface { ReplicatedData Publish(Delta) (cid.Cid, error) } - -// type EmbedableReplicatedData interface { -// ReplicatedData -// Apply(Operation) error -// } diff --git a/db/collection.go b/db/collection.go index b4586be89b..65b0fbaa22 100644 --- a/db/collection.go +++ b/db/collection.go @@ -34,7 +34,7 @@ import ( "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/events" "github.com/sourcenetwork/defradb/lens" - "github.com/sourcenetwork/defradb/merkle/crdt" + merklecrdt "github.com/sourcenetwork/defradb/merkle/crdt" ) var _ client.Collection = (*collection)(nil) @@ -973,7 +973,7 @@ func (c *collection) save( return cid.Undef, err } - node, _, err := c.saveDocValue(ctx, txn, fieldKey, val) + node, _, err := c.saveFieldToMerkleCRDT(ctx, txn, fieldKey, val) if err != nil { return cid.Undef, err } @@ -1000,11 +1000,10 @@ func (c *collection) save( return cid.Undef, nil } - headNode, priority, err := c.saveValueToMerkleCRDT( + headNode, priority, err := c.saveCompositeToMerkleCRDT( ctx, txn, primaryKey.ToDataStoreKey(), - client.COMPOSITE, buf, links, client.Active, @@ -1179,7 +1178,7 @@ func (c *collection) exists( return true, false, nil } -func (c *collection) saveDocValue( +func (c *collection) saveFieldToMerkleCRDT( ctx context.Context, txn datastore.Txn, key core.DataStoreKey, @@ -1201,20 +1200,7 @@ func (c *collection) saveDocValue( return nil, 0, err } } - return c.saveValueToMerkleCRDT(ctx, txn, key, client.LWW_REGISTER, bytes) - default: - return nil, 0, ErrUnknownCRDT - } -} -func (c *collection) saveValueToMerkleCRDT( - ctx context.Context, - txn datastore.Txn, - key core.DataStoreKey, - ctype client.CType, - args ...any) (ipld.Node, uint64, error) { - switch ctype { - case client.LWW_REGISTER: fieldID, err := strconv.Atoi(key.FieldId) if err != nil { return nil, 0, err @@ -1227,68 +1213,40 @@ func (c *collection) saveValueToMerkleCRDT( return nil, 0, client.NewErrFieldIndexNotExist(fieldID) } - merkleCRDT, err := c.db.crdtFactory.InstanceWithStores( + merkleCRDT := merklecrdt.NewMerkleLWWRegister( txn, core.NewCollectionSchemaVersionKey(schema.VersionID, c.ID()), - c.db.events.Updates, - ctype, key, field.Name, ) - if err != nil { - return nil, 0, err - } - var bytes []byte - // parse args - if len(args) != 1 { - return nil, 0, ErrUnknownCRDTArgument - } - bytes, ok = args[0].([]byte) - if !ok { - return nil, 0, ErrUnknownCRDTArgument - } - lwwreg := merkleCRDT.(*crdt.MerkleLWWRegister) - return lwwreg.Set(ctx, bytes) - case client.COMPOSITE: - key = key.WithFieldId(core.COMPOSITE_NAMESPACE) - merkleCRDT, err := c.db.crdtFactory.InstanceWithStores( - txn, - core.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()), - c.db.events.Updates, - ctype, - key, - "", - ) - if err != nil { - return nil, 0, err - } + return merkleCRDT.Set(ctx, bytes) + default: + return nil, 0, client.NewErrUnknownCRDT(val.Type()) + } +} - // parse args - if len(args) < 2 { - return nil, 0, ErrUnknownCRDTArgument - } - bytes, ok := args[0].([]byte) - if !ok { - return nil, 0, ErrUnknownCRDTArgument - } - links, ok := args[1].([]core.DAGLink) - if !ok { - return nil, 0, ErrUnknownCRDTArgument - } - comp := merkleCRDT.(*crdt.MerkleCompositeDAG) - if len(args) > 2 { - status, ok := args[2].(client.DocumentStatus) - if !ok { - return nil, 0, ErrUnknownCRDTArgument - } - if status.IsDeleted() { - return comp.Delete(ctx, links) - } - } - return comp.Set(ctx, bytes, links) +func (c *collection) saveCompositeToMerkleCRDT( + ctx context.Context, + txn datastore.Txn, + key core.DataStoreKey, + buf []byte, + links []core.DAGLink, + status client.DocumentStatus, +) (ipld.Node, uint64, error) { + key = key.WithFieldId(core.COMPOSITE_NAMESPACE) + merkleCRDT := merklecrdt.NewMerkleCompositeDAG( + txn, + core.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()), + key, + "", + ) + + if status.IsDeleted() { + return merkleCRDT.Delete(ctx, links) } - return nil, 0, ErrUnknownCRDT + + return merkleCRDT.Set(ctx, buf, links) } // getTxn gets or creates a new transaction from the underlying db. diff --git a/db/collection_delete.go b/db/collection_delete.go index 7f6a968a97..afa7d64a92 100644 --- a/db/collection_delete.go +++ b/db/collection_delete.go @@ -261,11 +261,10 @@ func (c *collection) applyDelete( } } - headNode, priority, err := c.saveValueToMerkleCRDT( + headNode, priority, err := c.saveCompositeToMerkleCRDT( ctx, txn, dsKey, - client.COMPOSITE, []byte{}, dagLinks, client.Deleted, diff --git a/db/db.go b/db/db.go index f2f59ecdaf..1046b2db54 100644 --- a/db/db.go +++ b/db/db.go @@ -31,7 +31,6 @@ import ( "github.com/sourcenetwork/defradb/events" "github.com/sourcenetwork/defradb/lens" "github.com/sourcenetwork/defradb/logging" - "github.com/sourcenetwork/defradb/merkle/crdt" "github.com/sourcenetwork/defradb/request/graphql" ) @@ -56,8 +55,6 @@ type db struct { rootstore datastore.RootStore multistore datastore.MultiStore - crdtFactory *crdt.Factory - events events.Events parser core.Parser @@ -114,7 +111,6 @@ func NewDB(ctx context.Context, rootstore datastore.RootStore, options ...Option func newDB(ctx context.Context, rootstore datastore.RootStore, options ...Option) (*implicitTxnDB, error) { log.Debug(ctx, "Loading: internal datastores") multistore := datastore.MultiStoreFrom(rootstore) - crdtFactory := crdt.DefaultFactory.WithStores(multistore) parser, err := graphql.NewParser() if err != nil { @@ -125,8 +121,6 @@ func newDB(ctx context.Context, rootstore datastore.RootStore, options ...Option rootstore: rootstore, multistore: multistore, - crdtFactory: &crdtFactory, - parser: parser, options: options, } diff --git a/db/errors.go b/db/errors.go index 651bcbe42b..17e82c6738 100644 --- a/db/errors.go +++ b/db/errors.go @@ -111,7 +111,6 @@ var ( ErrDocumentAlreadyExists = errors.New(errDocumentAlreadyExists) ErrDocumentDeleted = errors.New(errDocumentDeleted) ErrUnknownCRDTArgument = errors.New("invalid CRDT arguments") - ErrUnknownCRDT = errors.New("unknown crdt") ErrCollectionAlreadyExists = errors.New("collection already exists") ErrCollectionNameEmpty = errors.New("collection name can't be empty") ErrSchemaNameEmpty = errors.New("schema name can't be empty") diff --git a/db/fetcher/versioned.go b/db/fetcher/versioned.go index 4ab8ef54a7..454bcf17c6 100644 --- a/db/fetcher/versioned.go +++ b/db/fetcher/versioned.go @@ -25,8 +25,7 @@ import ( "github.com/sourcenetwork/defradb/datastore/memory" "github.com/sourcenetwork/defradb/db/base" "github.com/sourcenetwork/defradb/errors" - "github.com/sourcenetwork/defradb/events" - "github.com/sourcenetwork/defradb/merkle/crdt" + merklecrdt "github.com/sourcenetwork/defradb/merkle/crdt" "github.com/sourcenetwork/defradb/planner/mapper" ) @@ -94,7 +93,7 @@ type VersionedFetcher struct { col client.Collection // @todo index *client.IndexDescription - mCRDTs map[uint32]crdt.MerkleCRDT + mCRDTs map[uint32]merklecrdt.MerkleCRDT } // Init initializes the VersionedFetcher. @@ -110,7 +109,7 @@ func (vf *VersionedFetcher) Init( ) error { vf.col = col vf.queuedCids = list.New() - vf.mCRDTs = make(map[uint32]crdt.MerkleCRDT) + vf.mCRDTs = make(map[uint32]merklecrdt.MerkleCRDT) vf.txn = txn // create store @@ -385,10 +384,9 @@ func (vf *VersionedFetcher) processNode( if err != nil { return err } - mcrdt, err = crdt.DefaultFactory.InstanceWithStores( + mcrdt, err = merklecrdt.InstanceWithStore( vf.store, core.CollectionSchemaVersionKey{}, - events.EmptyUpdateChannel, ctype, key, fieldName, diff --git a/merkle/crdt/composite.go b/merkle/crdt/composite.go index 704c65fcd0..f837ac3ef7 100644 --- a/merkle/crdt/composite.go +++ b/merkle/crdt/composite.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package crdt +package merklecrdt import ( "context" @@ -18,42 +18,9 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/core" corecrdt "github.com/sourcenetwork/defradb/core/crdt" - "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/events" "github.com/sourcenetwork/defradb/merkle/clock" ) -var ( - compFactoryFn = MerkleCRDTFactory( - func( - mstore datastore.MultiStore, - schemaRoot core.CollectionSchemaVersionKey, - uCh events.UpdateChannel, - fieldName string, - ) MerkleCRDTInitFn { - return func(key core.DataStoreKey) MerkleCRDT { - return NewMerkleCompositeDAG( - mstore.Datastore(), - mstore.Headstore(), - mstore.DAGstore(), - schemaRoot, - uCh, - core.DataStoreKey{}, - key, - fieldName, - ) - } - }, - ) -) - -func init() { - err := DefaultFactory.Register(client.COMPOSITE, &compFactoryFn) - if err != nil { - panic(err) - } -} - // MerkleCompositeDAG is a MerkleCRDT implementation of the CompositeDAG using MerkleClocks. type MerkleCompositeDAG struct { *baseMerkleCRDT @@ -64,25 +31,20 @@ type MerkleCompositeDAG struct { // NewMerkleCompositeDAG creates a new instance (or loaded from DB) of a MerkleCRDT // backed by a CompositeDAG CRDT. func NewMerkleCompositeDAG( - datastore datastore.DSReaderWriter, - headstore datastore.DSReaderWriter, - dagstore datastore.DAGStore, + store Stores, schemaVersionKey core.CollectionSchemaVersionKey, - uCh events.UpdateChannel, - ns, key core.DataStoreKey, fieldName string, ) *MerkleCompositeDAG { compositeDag := corecrdt.NewCompositeDAG( - datastore, + store.Datastore(), schemaVersionKey, - ns, - key, /* stuff like namespace and ID */ + key, fieldName, ) - clock := clock.NewMerkleClock(headstore, dagstore, key.ToHeadStoreKey(), compositeDag) - base := &baseMerkleCRDT{clock: clock, crdt: compositeDag, updateChannel: uCh} + clock := clock.NewMerkleClock(store.Headstore(), store.DAGstore(), key.ToHeadStoreKey(), compositeDag) + base := &baseMerkleCRDT{clock: clock, crdt: compositeDag} return &MerkleCompositeDAG{ baseMerkleCRDT: base, @@ -100,7 +62,7 @@ func (m *MerkleCompositeDAG) Delete( log.Debug(ctx, "Applying delta-mutator 'Delete' on CompositeDAG") delta := m.reg.Set([]byte{}, links) delta.Status = client.Deleted - nd, err := m.Publish(ctx, delta) + nd, err := m.clock.AddDAGNode(ctx, delta) if err != nil { return nil, 0, err } @@ -118,21 +80,10 @@ func (m *MerkleCompositeDAG) Set( // persist/publish delta log.Debug(ctx, "Applying delta-mutator 'Set' on CompositeDAG") delta := m.reg.Set(patch, links) - nd, err := m.Publish(ctx, delta) + nd, err := m.clock.AddDAGNode(ctx, delta) if err != nil { return nil, 0, err } return nd, delta.GetPriority(), nil } - -// Value is a no-op for a CompositeDAG. -func (m *MerkleCompositeDAG) Value(ctx context.Context) ([]byte, error) { - return m.reg.Value(ctx) -} - -// Merge writes the provided delta to state using a supplied merge semantic. -// @todo -func (m *MerkleCompositeDAG) Merge(ctx context.Context, other core.Delta) error { - return m.reg.Merge(ctx, other) -} diff --git a/merkle/crdt/errors.go b/merkle/crdt/errors.go deleted file mode 100644 index e33ec97a12..0000000000 --- a/merkle/crdt/errors.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2022 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package crdt - -import ( - "github.com/sourcenetwork/defradb/errors" -) - -var ( - ErrFactoryTypeNoExist = errors.New("no such factory for the given type exists") -) diff --git a/merkle/crdt/factory.go b/merkle/crdt/factory.go deleted file mode 100644 index 04dc3d5aef..0000000000 --- a/merkle/crdt/factory.go +++ /dev/null @@ -1,162 +0,0 @@ -// Copyright 2022 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package crdt - -import ( - "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/core" - "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/events" -) - -// MerkleCRDTInitFn instantiates a MerkleCRDT with a given key. -type MerkleCRDTInitFn func(core.DataStoreKey) MerkleCRDT - -// MerkleCRDTFactory instantiates a MerkleCRDTInitFn with a MultiStore. -// Returns a MerkleCRDTInitFn with all the necessary stores set. -type MerkleCRDTFactory func( - mstore datastore.MultiStore, - schemaVersionKey core.CollectionSchemaVersionKey, - uCh events.UpdateChannel, - fieldName string, -) MerkleCRDTInitFn - -// Factory is a helper utility for instantiating new MerkleCRDTs. -// It removes some of the overhead of having to coordinate all the various -// store parameters on every single new MerkleCRDT creation. -type Factory struct { - crdts map[client.CType]*MerkleCRDTFactory - multistore datastore.MultiStore -} - -var ( - // DefaultFactory is instantiated with no stores - // It is recommended to use this only after you call - // WithStores(...) so you get a new non-shared instance - DefaultFactory = NewFactory(nil) -) - -// NewFactory returns a newly instantiated factory object with the assigned stores. -// It may be called with all stores set to nil. -func NewFactory(multistore datastore.MultiStore) *Factory { - return &Factory{ - crdts: make(map[client.CType]*MerkleCRDTFactory), - multistore: multistore, - } -} - -// Register creates a new entry in the CRDTs map to register a factory function -// to a MerkleCRDT Type. -func (factory *Factory) Register(t client.CType, fn *MerkleCRDTFactory) error { - factory.crdts[t] = fn - return nil -} - -// Instance and execute the registered factory function for a given MerkleCRDT type -// supplied with all the current stores (passed in as a datastore.MultiStore object). -func (factory Factory) Instance( - schemaVersionKey core.CollectionSchemaVersionKey, - uCh events.UpdateChannel, - t client.CType, - key core.DataStoreKey, - fieldName string, -) (MerkleCRDT, error) { - // get the factory function for the given MerkleCRDT type - // and pass in the current factory state as a MultiStore parameter - fn, err := factory.getRegisteredFactory(t) - if err != nil { - return nil, err - } - return (*fn)(factory, schemaVersionKey, uCh, fieldName)(key), nil -} - -// InstanceWithStore executes the registered factory function for the given MerkleCRDT type -// with the additional supplied datastore.MultiStore instead of the saved one on the main Factory. -func (factory Factory) InstanceWithStores( - store datastore.MultiStore, - schemaVersionKey core.CollectionSchemaVersionKey, - uCh events.UpdateChannel, - t client.CType, - key core.DataStoreKey, - fieldName string, -) (MerkleCRDT, error) { - fn, err := factory.getRegisteredFactory(t) - if err != nil { - return nil, err - } - - return (*fn)(store, schemaVersionKey, uCh, fieldName)(key), nil -} - -func (factory Factory) getRegisteredFactory(t client.CType) (*MerkleCRDTFactory, error) { - fn, exists := factory.crdts[t] - if !exists { - return nil, ErrFactoryTypeNoExist - } - return fn, nil -} - -// SetStores sets all the current stores on the Factory in one call. -func (factory *Factory) SetStores(multistore datastore.MultiStore) error { - factory.multistore = multistore - return nil -} - -// WithStores returns a new instance of the Factory with all the stores set. -func (factory Factory) WithStores(multistore datastore.MultiStore) Factory { - factory.multistore = multistore - return factory -} - -// Rootstore implements MultiStore. -func (factory Factory) Rootstore() datastore.DSReaderWriter { - return nil -} - -// Data implements datastore.MultiStore and returns the current Datastore. -func (factory Factory) Datastore() datastore.DSReaderWriter { - if factory.multistore == nil { - return nil - } - return factory.multistore.Datastore() -} - -// Head implements datastore.MultiStore and returns the current Headstore. -func (factory Factory) Headstore() datastore.DSReaderWriter { - if factory.multistore == nil { - return nil - } - return factory.multistore.Headstore() -} - -// Peerstore implements datastore.MultiStore and returns the current Peerstore. -func (factory Factory) Peerstore() datastore.DSBatching { - if factory.multistore == nil { - return nil - } - return factory.multistore.Peerstore() -} - -// Head implements datastore.MultiStore and returns the current Headstore. -func (factory Factory) Systemstore() datastore.DSReaderWriter { - if factory.multistore == nil { - return nil - } - return factory.multistore.Systemstore() -} - -// DAGstore implements datastore.MultiStore and returns the current DAGstore. -func (factory Factory) DAGstore() datastore.DAGStore { - if factory.multistore == nil { - return nil - } - return factory.multistore.DAGstore() -} diff --git a/merkle/crdt/factory_test.go b/merkle/crdt/factory_test.go deleted file mode 100644 index 10e2f5c672..0000000000 --- a/merkle/crdt/factory_test.go +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright 2022 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package crdt - -import ( - "context" - "testing" - - ds "github.com/ipfs/go-datastore" - "github.com/stretchr/testify/assert" - - "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/core" - "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/events" -) - -func newStores() datastore.MultiStore { - root := ds.NewMapDatastore() - return datastore.MultiStoreFrom(root) -} - -func TestNewBlankFactory(t *testing.T) { - f := NewFactory(nil) - if f == nil { - t.Fatal("Returned factory is a nil pointer") - } -} - -func TestNewFactoryWithStores(t *testing.T) { - m := newStores() - f := NewFactory(m) - if f == nil { - t.Fatal("Returned factory is a nil pointer") - } - - assert.Equal(t, m.Datastore(), f.Datastore()) - assert.Equal(t, m.Headstore(), f.Headstore()) - assert.Equal(t, m.DAGstore(), f.DAGstore()) - assert.Equal(t, m.Systemstore(), f.Systemstore()) -} - -func TestFactoryMultiStoreInterface(t *testing.T) { - m := newStores() - f := NewFactory(m) - if f == nil { - t.Fatal("Returned factory is a nil pointer") - } - - // check interface implement - var _ datastore.MultiStore = f - // ms = f - - // check interface functions - assert.Equal(t, m.Datastore(), f.Datastore()) - assert.Equal(t, m.Headstore(), f.Headstore()) - assert.Equal(t, m.DAGstore(), f.DAGstore()) - assert.Equal(t, m.Systemstore(), f.Systemstore()) -} - -func TestFactorySetStores(t *testing.T) { - f := NewFactory(nil) - m := newStores() - err := f.SetStores(m) - assert.Nil(t, err) - - assert.Equal(t, m.Datastore(), f.Datastore()) - assert.Equal(t, m.Headstore(), f.Headstore()) - assert.Equal(t, m.DAGstore(), f.DAGstore()) - assert.Equal(t, m.Systemstore(), f.Systemstore()) -} - -func TestFactoryWithStores(t *testing.T) { - f := NewFactory(nil) - m := newStores() - f2 := f.WithStores(m) - // assert.NotEmpty - - assert.Nil(t, f.Datastore()) - assert.Nil(t, f.Headstore()) - assert.Nil(t, f.DAGstore()) - - assert.Equal(t, m.Datastore(), f2.Datastore()) - assert.Equal(t, m.Headstore(), f2.Headstore()) - assert.Equal(t, m.DAGstore(), f2.DAGstore()) - assert.Equal(t, m.Systemstore(), f2.Systemstore()) -} - -func TestFullFactoryRegister(t *testing.T) { - m := newStores() - f := NewFactory(m) - err := f.Register(client.LWW_REGISTER, &lwwFactoryFn) - assert.Nil(t, err) - assert.Equal(t, &lwwFactoryFn, f.crdts[client.LWW_REGISTER]) -} - -func TestBlankFactoryRegister(t *testing.T) { - f := NewFactory(nil) - err := f.Register(client.LWW_REGISTER, &lwwFactoryFn) - assert.Nil(t, err) - assert.Equal(t, &lwwFactoryFn, f.crdts[client.LWW_REGISTER]) -} - -func TestWithStoresFactoryRegister(t *testing.T) { - f := NewFactory(nil) - f.Register(client.LWW_REGISTER, &lwwFactoryFn) - m := newStores() - f2 := f.WithStores(m) - - assert.Equal(t, &lwwFactoryFn, f2.crdts[client.LWW_REGISTER]) -} - -func TestDefaultFactory(t *testing.T) { - assert.NotNil(t, DefaultFactory) - assert.Equal(t, &lwwFactoryFn, DefaultFactory.crdts[client.LWW_REGISTER]) -} - -func TestFactoryInstanceMissing(t *testing.T) { - m := newStores() - f := NewFactory(m) - - _, err := f.Instance( - core.CollectionSchemaVersionKey{}, - events.EmptyUpdateChannel, - client.LWW_REGISTER, - core.MustNewDataStoreKey("/1/0/MyKey"), - "", - ) - assert.Equal(t, err, ErrFactoryTypeNoExist) -} - -func TestBlankFactoryInstanceWithLWWRegister(t *testing.T) { - m := newStores() - f1 := NewFactory(nil) - f1.Register(client.LWW_REGISTER, &lwwFactoryFn) - f := f1.WithStores(m) - - crdt, err := f.Instance( - core.CollectionSchemaVersionKey{}, - events.EmptyUpdateChannel, - client.LWW_REGISTER, - core.MustNewDataStoreKey("/1/0/MyKey"), - "", - ) - assert.NoError(t, err) - - _, ok := crdt.(*MerkleLWWRegister) - assert.True(t, ok) -} - -func TestBlankFactoryInstanceWithCompositeRegister(t *testing.T) { - m := newStores() - f1 := NewFactory(nil) - f1.Register(client.COMPOSITE, &compFactoryFn) - f := f1.WithStores(m) - - crdt, err := f.Instance( - core.CollectionSchemaVersionKey{}, - events.EmptyUpdateChannel, - client.COMPOSITE, - core.MustNewDataStoreKey("/1/0/MyKey"), - "", - ) - assert.NoError(t, err) - - _, ok := crdt.(*MerkleCompositeDAG) - assert.True(t, ok) -} - -func TestFullFactoryInstanceLWWRegister(t *testing.T) { - m := newStores() - f := NewFactory(m) - f.Register(client.LWW_REGISTER, &lwwFactoryFn) - - crdt, err := f.Instance( - core.CollectionSchemaVersionKey{}, - events.EmptyUpdateChannel, - client.LWW_REGISTER, - core.MustNewDataStoreKey("/1/0/MyKey"), - "", - ) - assert.NoError(t, err) - - _, ok := crdt.(*MerkleLWWRegister) - assert.True(t, ok) -} - -func TestFullFactoryInstanceCompositeRegister(t *testing.T) { - m := newStores() - f := NewFactory(m) - f.Register(client.COMPOSITE, &compFactoryFn) - - crdt, err := f.Instance( - core.CollectionSchemaVersionKey{}, - events.EmptyUpdateChannel, - client.COMPOSITE, - core.MustNewDataStoreKey("/1/0/MyKey"), - "", - ) - assert.NoError(t, err) - - _, ok := crdt.(*MerkleCompositeDAG) - assert.True(t, ok) -} - -func TestLWWRegisterFactoryFn(t *testing.T) { - ctx := context.Background() - m := newStores() - f := NewFactory(m) // here factory is only needed to satisfy datastore.MultiStore interface - crdt := lwwFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel, "")(core.MustNewDataStoreKey("/1/0/MyKey")) - - lwwreg, ok := crdt.(*MerkleLWWRegister) - assert.True(t, ok) - - _, _, err := lwwreg.Set(ctx, []byte("hi")) - assert.NoError(t, err) -} - -func TestCompositeRegisterFactoryFn(t *testing.T) { - ctx := context.Background() - m := newStores() - f := NewFactory(m) // here factory is only needed to satisfy datastore.MultiStore interface - crdt := compFactoryFn(f, core.CollectionSchemaVersionKey{}, events.EmptyUpdateChannel, "")(core.MustNewDataStoreKey("/1/0/MyKey")) - - merkleReg, ok := crdt.(*MerkleCompositeDAG) - assert.True(t, ok) - - _, _, err := merkleReg.Set(ctx, []byte("hi"), []core.DAGLink{}) - assert.NoError(t, err) -} diff --git a/merkle/crdt/lwwreg.go b/merkle/crdt/lwwreg.go index 796451c041..8b47492b26 100644 --- a/merkle/crdt/lwwreg.go +++ b/merkle/crdt/lwwreg.go @@ -8,55 +8,21 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package crdt +package merklecrdt import ( "context" ipld "github.com/ipfs/go-ipld-format" - "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/core" corecrdt "github.com/sourcenetwork/defradb/core/crdt" - "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/events" "github.com/sourcenetwork/defradb/merkle/clock" ) -var ( - lwwFactoryFn = MerkleCRDTFactory( - func( - mstore datastore.MultiStore, - schemaRoot core.CollectionSchemaVersionKey, - _ events.UpdateChannel, - fieldName string, - ) MerkleCRDTInitFn { - return func(key core.DataStoreKey) MerkleCRDT { - return NewMerkleLWWRegister( - mstore.Datastore(), - mstore.Headstore(), - mstore.DAGstore(), - schemaRoot, - core.DataStoreKey{}, - key, - fieldName, - ) - } - }, - ) -) - -func init() { - err := DefaultFactory.Register(client.LWW_REGISTER, &lwwFactoryFn) - if err != nil { - panic(err) - } -} - // MerkleLWWRegister is a MerkleCRDT implementation of the LWWRegister using MerkleClocks. type MerkleLWWRegister struct { *baseMerkleCRDT - // core.ReplicatedData reg corecrdt.LWWRegister } @@ -64,20 +30,14 @@ type MerkleLWWRegister struct { // NewMerkleLWWRegister creates a new instance (or loaded from DB) of a MerkleCRDT // backed by a LWWRegister CRDT. func NewMerkleLWWRegister( - datastore datastore.DSReaderWriter, - headstore datastore.DSReaderWriter, - dagstore datastore.DAGStore, + store Stores, schemaVersionKey core.CollectionSchemaVersionKey, - ns, key core.DataStoreKey, + key core.DataStoreKey, fieldName string, ) *MerkleLWWRegister { - register := corecrdt.NewLWWRegister(datastore, schemaVersionKey, key, fieldName /* stuff like namespace and ID */) - clk := clock.NewMerkleClock(headstore, dagstore, key.ToHeadStoreKey(), register) - - // newBaseMerkleCRDT(clock, register) + register := corecrdt.NewLWWRegister(store.Datastore(), schemaVersionKey, key, fieldName) + clk := clock.NewMerkleClock(store.Headstore(), store.DAGstore(), key.ToHeadStoreKey(), register) base := &baseMerkleCRDT{clock: clk, crdt: register} - // instantiate MerkleLWWRegister - // return return &MerkleLWWRegister{ baseMerkleCRDT: base, reg: register, @@ -89,17 +49,6 @@ func (mlwwreg *MerkleLWWRegister) Set(ctx context.Context, value []byte) (ipld.N // Set() call on underlying LWWRegister CRDT // persist/publish delta delta := mlwwreg.reg.Set(value) - nd, err := mlwwreg.Publish(ctx, delta) + nd, err := mlwwreg.clock.AddDAGNode(ctx, delta) return nd, delta.GetPriority(), err } - -// Value will retrieve the current value from the db. -func (mlwwreg *MerkleLWWRegister) Value(ctx context.Context) ([]byte, error) { - return mlwwreg.reg.Value(ctx) -} - -// Merge writes the provided delta to state using a supplied -// merge semantic. -func (mlwwreg *MerkleLWWRegister) Merge(ctx context.Context, other core.Delta) error { - return mlwwreg.reg.Merge(ctx, other) -} diff --git a/merkle/crdt/merklecrdt.go b/merkle/crdt/merklecrdt.go index 89e8d0eb2e..07fb83e436 100644 --- a/merkle/crdt/merklecrdt.go +++ b/merkle/crdt/merklecrdt.go @@ -11,15 +11,16 @@ /* Package crdt provides CRDT implementations leveraging MerkleClock. */ -package crdt +package merklecrdt import ( "context" ipld "github.com/ipfs/go-ipld-format" + "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/core" - "github.com/sourcenetwork/defradb/events" + "github.com/sourcenetwork/defradb/datastore" "github.com/sourcenetwork/defradb/logging" ) @@ -27,6 +28,12 @@ var ( log = logging.MustNewLogger("merklecrdt") ) +type Stores interface { + Datastore() datastore.DSReaderWriter + DAGstore() datastore.DAGStore + Headstore() datastore.DSReaderWriter +} + // MerkleCRDT is the implementation of a Merkle Clock along with a // CRDT payload. It implements the ReplicatedData interface // so it can be merged with any given semantics. @@ -35,18 +42,13 @@ type MerkleCRDT interface { Clock() core.MerkleClock } -var ( - // defaultMerkleCRDTs = make(map[Type]MerkleCRDTFactory) - _ core.ReplicatedData = (*baseMerkleCRDT)(nil) -) +var _ core.ReplicatedData = (*baseMerkleCRDT)(nil) // baseMerkleCRDT handles the MerkleCRDT overhead functions that aren't CRDT specific like the mutations and state // retrieval functions. It handles creating and publishing the CRDT DAG with the help of the MerkleClock. type baseMerkleCRDT struct { clock core.MerkleClock crdt core.ReplicatedData - - updateChannel events.UpdateChannel } func (base *baseMerkleCRDT) Clock() core.MerkleClock { @@ -65,19 +67,28 @@ func (base *baseMerkleCRDT) Value(ctx context.Context) ([]byte, error) { return base.crdt.Value(ctx) } -func (base *baseMerkleCRDT) ID() string { - return base.crdt.ID() -} - -// Publishes the delta to state. -func (base *baseMerkleCRDT) Publish( - ctx context.Context, - delta core.Delta, -) (ipld.Node, error) { - log.Debug(ctx, "Processing CRDT state", logging.NewKV("DocKey", base.crdt.ID())) - nd, err := base.clock.AddDAGNode(ctx, delta) - if err != nil { - return nil, err +func InstanceWithStore( + store Stores, + schemaVersionKey core.CollectionSchemaVersionKey, + ctype client.CType, + key core.DataStoreKey, + fieldName string, +) (MerkleCRDT, error) { + switch ctype { + case client.LWW_REGISTER: + return NewMerkleLWWRegister( + store, + schemaVersionKey, + key, + fieldName, + ), nil + case client.COMPOSITE: + return NewMerkleCompositeDAG( + store, + schemaVersionKey, + key, + fieldName, + ), nil } - return nd, nil + return nil, client.NewErrUnknownCRDT(ctype) } diff --git a/merkle/crdt/merklecrdt_test.go b/merkle/crdt/merklecrdt_test.go index 675fcfe38f..47537add09 100644 --- a/merkle/crdt/merklecrdt_test.go +++ b/merkle/crdt/merklecrdt_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package crdt +package merklecrdt import ( "context" @@ -45,7 +45,7 @@ func TestMerkleCRDTPublish(t *testing.T) { Data: []byte("test"), } - nd, err := bCRDT.Publish(ctx, delta) + nd, err := bCRDT.clock.AddDAGNode(ctx, delta) if err != nil { t.Error("Failed to publish delta to MerkleCRDT:", err) return diff --git a/net/process.go b/net/process.go index 85748090ff..3d776cc1c1 100644 --- a/net/process.go +++ b/net/process.go @@ -28,9 +28,8 @@ import ( "github.com/sourcenetwork/defradb/datastore" "github.com/sourcenetwork/defradb/db/base" "github.com/sourcenetwork/defradb/errors" - "github.com/sourcenetwork/defradb/events" "github.com/sourcenetwork/defradb/logging" - "github.com/sourcenetwork/defradb/merkle/crdt" + merklecrdt "github.com/sourcenetwork/defradb/merkle/crdt" ) type blockProcessor struct { @@ -123,11 +122,11 @@ func (bp *blockProcessor) processBlock(ctx context.Context, nd ipld.Node, field func initCRDTForType( ctx context.Context, - txn datastore.MultiStore, + txn datastore.Txn, col client.Collection, dsKey core.DataStoreKey, field string, -) (crdt.MerkleCRDT, error) { +) (merklecrdt.MerkleCRDT, error) { var key core.DataStoreKey var ctype client.CType description := col.Description() @@ -140,24 +139,31 @@ func initCRDTForType( ).WithFieldId( core.COMPOSITE_NAMESPACE, ) - } else { - fd, ok := col.Schema().GetField(field) - if !ok { - return nil, errors.New(fmt.Sprintf("Couldn't find field %s for doc %s", field, dsKey)) - } - ctype = fd.Typ - fieldID := fd.ID.String() - key = base.MakeCollectionKey(description).WithInstanceInfo(dsKey).WithFieldId(fieldID) + + log.Debug(ctx, "Got CRDT Type", logging.NewKV("CType", ctype), logging.NewKV("Field", field)) + return merklecrdt.NewMerkleCompositeDAG( + txn, + core.NewCollectionSchemaVersionKey(col.Schema().VersionID, col.ID()), + key, + field, + ), nil } + + fd, ok := col.Schema().GetField(field) + if !ok { + return nil, errors.New(fmt.Sprintf("Couldn't find field %s for doc %s", field, dsKey)) + } + ctype = fd.Typ + fieldID := fd.ID.String() + key = base.MakeCollectionKey(description).WithInstanceInfo(dsKey).WithFieldId(fieldID) + log.Debug(ctx, "Got CRDT Type", logging.NewKV("CType", ctype), logging.NewKV("Field", field)) - return crdt.DefaultFactory.InstanceWithStores( + return merklecrdt.NewMerkleLWWRegister( txn, core.NewCollectionSchemaVersionKey(col.Schema().VersionID, col.ID()), - events.EmptyUpdateChannel, - ctype, key, field, - ) + ), nil } func decodeBlockBuffer(buf []byte, cid cid.Cid) (ipld.Node, error) {