Skip to content

Commit

Permalink
refactor: Simplify Merkle CRDT workflow (#2111)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #2110 
Possibly Resolves #917

## Description

This PR aims to simplify the CRDT packages ahead of the new CRDT types.
  • Loading branch information
fredcarle authored Dec 8, 2023
1 parent eda5b6b commit 48d0c24
Show file tree
Hide file tree
Showing 19 changed files with 146 additions and 735 deletions.
9 changes: 9 additions & 0 deletions client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
errMaxTxnRetries string = "reached maximum transaction reties"
errRelationOneSided string = "relation must be defined on both schemas"
errCollectionNotFound string = "collection not found"
errUnknownCRDT string = "unknown crdt"
)

// Errors returnable from this package.
Expand All @@ -47,6 +48,7 @@ var (
ErrMaxTxnRetries = errors.New(errMaxTxnRetries)
ErrRelationOneSided = errors.New(errRelationOneSided)
ErrCollectionNotFound = errors.New(errCollectionNotFound)
ErrUnknownCRDT = errors.New(errUnknownCRDT)
)

// NewErrFieldNotExist returns an error indicating that the given field does not exist.
Expand Down Expand Up @@ -123,3 +125,10 @@ func NewErrCollectionNotFoundForSchema(schemaRoot string) error {
errors.NewKV("SchemaRoot", schemaRoot),
)
}

func NewErrUnknownCRDT(cType CType) error {
return errors.New(
errUnknownCRDT,
errors.NewKV("Type", cType),
)
}
21 changes: 17 additions & 4 deletions core/crdt/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,26 @@ import (
type baseCRDT struct {
store datastore.DSReaderWriter
key core.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

fieldName string
}

// @TODO paramaterize ns/suffix
func newBaseCRDT(store datastore.DSReaderWriter, key core.DataStoreKey) baseCRDT {
func newBaseCRDT(
store datastore.DSReaderWriter,
key core.DataStoreKey,
schemaVersionKey core.CollectionSchemaVersionKey,
fieldName string,
) baseCRDT {
return baseCRDT{
store: store,
key: key,
store: store,
key: key,
schemaVersionKey: schemaVersionKey,
fieldName: fieldName,
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/crdt/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ func newSeededDS() datastore.DSReaderWriter {
}

func exampleBaseCRDT() baseCRDT {
return newBaseCRDT(newSeededDS(), core.DataStoreKey{})
return newBaseCRDT(newSeededDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "")
}

func TestBaseCRDTNew(t *testing.T) {
base := newBaseCRDT(newDS(), core.DataStoreKey{})
base := newBaseCRDT(newDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "")
if base.store == nil {
t.Error("newBaseCRDT needs to init store")
}
Expand Down
35 changes: 7 additions & 28 deletions core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ import (
"github.com/sourcenetwork/defradb/errors"
)

var (
_ core.ReplicatedData = (*CompositeDAG)(nil)
_ core.CompositeDelta = (*CompositeDAGDelta)(nil)
)

// CompositeDAGDelta represents a delta-state update made of sub-MerkleCRDTs.
type CompositeDAGDelta struct {
// SchemaVersionID is the schema version datastore key at the time of commit.
Expand All @@ -51,6 +46,8 @@ type CompositeDAGDelta struct {
FieldName string
}

var _ core.CompositeDelta = (*CompositeDAGDelta)(nil)

// GetPriority gets the current priority for this delta.
func (delta *CompositeDAGDelta) GetPriority() uint64 {
return delta.Priority
Expand Down Expand Up @@ -92,39 +89,21 @@ func (delta *CompositeDAGDelta) Links() []core.DAGLink {

// CompositeDAG is a CRDT structure that is used to track a collection of sub MerkleCRDTs.
type CompositeDAG struct {
store datastore.DSReaderWriter
key core.DataStoreKey
// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

fieldName string
baseCRDT
}

var _ core.ReplicatedData = CompositeDAG{}
var _ core.ReplicatedData = (*CompositeDAG)(nil)

func NewCompositeDAG(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
namespace core.Key,
key core.DataStoreKey,
fieldName string,
) CompositeDAG {
return CompositeDAG{
store: store,
key: key,
schemaVersionKey: schemaVersionKey,
fieldName: fieldName,
}
}

// ID returns the schema ID of the composite DAG CRDT.
func (c CompositeDAG) ID() string {
return c.key.ToString()
return CompositeDAG{newBaseCRDT(store, key, schemaVersionKey, fieldName)}
}

// Value returns the schema ID of the composite DAG CRDT.
// Value is a no-op for a CompositeDAG.
func (c CompositeDAG) Value(ctx context.Context) ([]byte, error) {
return nil, nil
}
Expand Down Expand Up @@ -226,7 +205,7 @@ func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key core.DataStoreKe
}

// DeltaDecode is a typed helper to extract.
// a LWWRegDelta from a ipld.Node
// a CompositeDAGDelta from a ipld.Node
// for now let's do cbor (quick to implement)
func (c CompositeDAG) DeltaDecode(node ipld.Node) (core.Delta, error) {
delta := &CompositeDAGDelta{}
Expand Down
40 changes: 5 additions & 35 deletions core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ import (
"github.com/sourcenetwork/defradb/errors"
)

var (
// ensure types implements core interfaces
_ core.ReplicatedData = (*LWWRegister)(nil)
_ core.Delta = (*LWWRegDelta)(nil)
)

// LWWRegDelta is a single delta operation for an LWWRegister
// @todo: Expand delta metadata (investigate if needed)
type LWWRegDelta struct {
Expand All @@ -42,6 +36,8 @@ type LWWRegDelta struct {
FieldName string
}

var _ core.Delta = (*LWWRegDelta)(nil)

// GetPriority gets the current priority for this delta.
func (delta *LWWRegDelta) GetPriority() uint64 {
return delta.Priority
Expand Down Expand Up @@ -79,31 +75,18 @@ func (delta *LWWRegDelta) Value() any {
// of an arbitrary data type that ensures convergence.
type LWWRegister struct {
baseCRDT

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

fieldName string
}

var _ core.ReplicatedData = (*LWWRegister)(nil)

// NewLWWRegister returns a new instance of the LWWReg with the given ID.
func NewLWWRegister(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
fieldName string,
) LWWRegister {
return LWWRegister{
baseCRDT: newBaseCRDT(store, key),
schemaVersionKey: schemaVersionKey,
fieldName: fieldName,
// id: id,
// data: data,
// ts: ts,
// clock: clock,
}
return LWWRegister{newBaseCRDT(store, key, schemaVersionKey, fieldName)}
}

// Value gets the current register value
Expand All @@ -120,7 +103,6 @@ func (reg LWWRegister) Value(ctx context.Context) ([]byte, error) {
// Set generates a new delta with the supplied value
// RETURN DELTA
func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
// return NewLWWRegister(reg.id, value, reg.clock.Apply(), reg.clock)
return &LWWRegDelta{
Data: value,
DocKey: []byte(reg.key.DocKey),
Expand All @@ -129,18 +111,6 @@ func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
}
}

func (reg LWWRegister) ID() string {
return reg.key.ToString()
}

// RETURN DELTA
// func (reg LWWRegister) setWithClock(value []byte, clock Clock) LWWRegDelta {
// // return NewLWWRegister(reg.id, value, clock.Apply(), clock)
// return LWWRegDelta{
// data: value,
// }
// }

// Merge implements ReplicatedData interface
// Merge two LWWRegisty based on the order of the timestamp (ts),
// if they are equal, compare IDs
Expand Down
6 changes: 0 additions & 6 deletions core/replicated.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
// ReplicatedData is a data type that allows concurrent writers to deterministically merge other
// replicated data so as to converge on the same state.
type ReplicatedData interface {
ID() string
Merge(ctx context.Context, other Delta) error
DeltaDecode(node ipld.Node) (Delta, error) // possibly rename to just Decode
Value(ctx context.Context) ([]byte, error)
Expand All @@ -31,8 +30,3 @@ type PersistedReplicatedData interface {
ReplicatedData
Publish(Delta) (cid.Cid, error)
}

// type EmbedableReplicatedData interface {
// ReplicatedData
// Apply(Operation) error
// }
Loading

0 comments on commit 48d0c24

Please sign in to comment.