Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Breakup core/keys.go file #3198

Merged
merged 3 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/core/crdt/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

func setPriority(
ctx context.Context,
store datastore.DSReaderWriter,
key core.DataStoreKey,
key keys.DataStoreKey,
priority uint64,
) error {
prioK := key.WithPriorityFlag()
Expand All @@ -41,7 +41,7 @@ func setPriority(
func getPriority(
ctx context.Context,
store datastore.DSReaderWriter,
key core.DataStoreKey,
key keys.DataStoreKey,
) (uint64, error) {
pKey := key.WithPriorityFlag()
pbuf, err := store.Get(ctx, pKey.ToDS())
Expand Down
10 changes: 5 additions & 5 deletions internal/core/crdt/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@ import (
ds "github.com/ipfs/go-datastore"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

func newDS() datastore.DSReaderWriter {
return datastore.AsDSReaderWriter(ds.NewMapDatastore())
}

func TestBaseCRDTvalueKey(t *testing.T) {
vk := core.DataStoreKey{}.WithDocID("mykey").WithValueFlag()
vk := keys.DataStoreKey{}.WithDocID("mykey").WithValueFlag()
if vk.ToString() != "/v/mykey" {
t.Errorf("Incorrect valueKey. Have %v, want %v", vk.ToString(), "/v/mykey")
}
}

func TestBaseCRDTprioryKey(t *testing.T) {
pk := core.DataStoreKey{}.WithDocID("mykey").WithPriorityFlag()
pk := keys.DataStoreKey{}.WithDocID("mykey").WithPriorityFlag()
if pk.ToString() != "/p/mykey" {
t.Errorf("Incorrect priorityKey. Have %v, want %v", pk.ToString(), "/p/mykey")
}
Expand All @@ -42,13 +42,13 @@ func TestBaseCRDTSetGetPriority(t *testing.T) {
store := newDS()

ctx := context.Background()
err := setPriority(ctx, store, core.DataStoreKey{}.WithDocID("mykey"), 10)
err := setPriority(ctx, store, keys.DataStoreKey{}.WithDocID("mykey"), 10)
if err != nil {
t.Errorf("baseCRDT failed to set Priority. err: %v", err)
return
}

priority, err := getPriority(ctx, store, core.DataStoreKey{}.WithDocID("mykey"))
priority, err := getPriority(ctx, store, keys.DataStoreKey{}.WithDocID("mykey"))
if err != nil {
t.Errorf("baseCRDT failed to get priority. err: %v", err)
return
Expand Down
17 changes: 9 additions & 8 deletions internal/core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/keys"
)

// CompositeDAGDelta represents a delta-state update made of sub-MerkleCRDTs.
Expand Down Expand Up @@ -77,20 +78,20 @@ func (delta *CompositeDAGDelta) SetPriority(prio uint64) {
// CompositeDAG is a CRDT structure that is used to track a collection of sub MerkleCRDTs.
type CompositeDAG struct {
store datastore.DSReaderWriter
key core.DataStoreKey
key keys.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey
schemaVersionKey keys.CollectionSchemaVersionKey
}

var _ core.ReplicatedData = (*CompositeDAG)(nil)

func NewCompositeDAG(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
schemaVersionKey keys.CollectionSchemaVersionKey,
key keys.DataStoreKey,
) CompositeDAG {
return CompositeDAG{
store: store,
Expand Down Expand Up @@ -125,7 +126,7 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta) error {
// We cannot rely on the dagDelta.Status here as it may have been deleted locally, this is not
// reflected in `dagDelta.Status` if sourced via P2P. Updates synced via P2P should not undelete
// the local representation of the document.
versionKey := c.key.WithValueFlag().WithFieldID(core.DATASTORE_DOC_VERSION_FIELD_ID)
versionKey := c.key.WithValueFlag().WithFieldID(keys.DATASTORE_DOC_VERSION_FIELD_ID)
objectMarker, err := c.store.Get(ctx, c.key.ToPrimaryDataStoreKey().ToDS())
hasObjectMarker := !errors.Is(err, ds.ErrNotFound)
if err != nil && hasObjectMarker {
Expand Down Expand Up @@ -159,7 +160,7 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta) error {
return nil
}

func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key core.DataStoreKey) error {
func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key keys.DataStoreKey) error {
q := query.Query{
Prefix: key.ToString(),
}
Expand All @@ -168,12 +169,12 @@ func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key core.DataStoreKe
if e.Error != nil {
return err
}
dsKey, err := core.NewDataStoreKey(e.Key)
dsKey, err := keys.NewDataStoreKey(e.Key)
if err != nil {
return err
}

if dsKey.InstanceType == core.ValueKey {
if dsKey.InstanceType == keys.ValueKey {
err = c.store.Put(ctx, dsKey.WithDeletedFlag().ToDS(), e.Value)
if err != nil {
return err
Expand Down
13 changes: 7 additions & 6 deletions internal/core/crdt/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/keys"
)

type Incrementable interface {
Expand Down Expand Up @@ -78,12 +79,12 @@ func (delta *CounterDelta) SetPriority(prio uint64) {
// of an Int and Float data types that ensures convergence.
type Counter struct {
store datastore.DSReaderWriter
key core.DataStoreKey
key keys.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey
schemaVersionKey keys.CollectionSchemaVersionKey

// fieldName holds the name of the field hosting this CRDT, if this is a field level
// commit.
Expand All @@ -98,8 +99,8 @@ var _ core.ReplicatedData = (*Counter)(nil)
// NewCounter returns a new instance of the Counter with the given ID.
func NewCounter(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
schemaVersionKey keys.CollectionSchemaVersionKey,
key keys.DataStoreKey,
fieldName string,
allowDecrement bool,
kind client.ScalarKind,
Expand Down Expand Up @@ -205,7 +206,7 @@ func (c Counter) CType() client.CType {
func validateAndIncrement[T Incrementable](
ctx context.Context,
store datastore.DSReaderWriter,
key core.DataStoreKey,
key keys.DataStoreKey,
valueAsBytes []byte,
allowDecrement bool,
) ([]byte, error) {
Expand All @@ -230,7 +231,7 @@ func validateAndIncrement[T Incrementable](
func getCurrentValue[T Incrementable](
ctx context.Context,
store datastore.DSReaderWriter,
key core.DataStoreKey,
key keys.DataStoreKey,
) (T, error) {
curValue, err := store.Get(ctx, key.ToDS())
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions internal/core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/keys"
)

// LWWRegDelta is a single delta operation for an LWWRegister
Expand Down Expand Up @@ -66,12 +67,12 @@ func (delta *LWWRegDelta) SetPriority(prio uint64) {
// of an arbitrary data type that ensures convergence.
type LWWRegister struct {
store datastore.DSReaderWriter
key core.DataStoreKey
key keys.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey
schemaVersionKey keys.CollectionSchemaVersionKey

// fieldName holds the name of the field hosting this CRDT, if this is a field level
// commit.
Expand All @@ -83,8 +84,8 @@ var _ core.ReplicatedData = (*LWWRegister)(nil)
// NewLWWRegister returns a new instance of the LWWReg with the given ID.
func NewLWWRegister(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
schemaVersionKey keys.CollectionSchemaVersionKey,
key keys.DataStoreKey,
fieldName string,
) LWWRegister {
return LWWRegister{
Expand Down
5 changes: 3 additions & 2 deletions internal/core/crdt/lwwreg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

func newMockStore() datastore.DSReaderWriter {
Expand All @@ -26,8 +27,8 @@ func newMockStore() datastore.DSReaderWriter {

func setupLWWRegister() LWWRegister {
store := newMockStore()
key := core.DataStoreKey{DocID: "AAAA-BBBB"}
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key, "")
key := keys.DataStoreKey{DocID: "AAAA-BBBB"}
return NewLWWRegister(store, keys.CollectionSchemaVersionKey{}, key, "")
}

func TestLWWRegisterAddDelta(t *testing.T) {
Expand Down
29 changes: 13 additions & 16 deletions internal/core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,44 @@

package core

import "strings"
import (
"strings"

"github.com/sourcenetwork/defradb/internal/keys"
)

// Span is a range of keys from [Start, End).
type Span interface {
// Start returns the starting key of the Span.
Start() DataStoreKey
Start() keys.DataStoreKey
// End returns the ending key of the Span.
End() DataStoreKey
End() keys.DataStoreKey
// Compare returns -1 if the provided span is less, 0 if it is equal, and 1 if its greater.
Compare(Span) SpanComparisonResult
}

type span struct {
start DataStoreKey
end DataStoreKey
start keys.DataStoreKey
end keys.DataStoreKey
}

var _ Span = span{}

// NewSpan creates a new Span from the provided start and end keys.
func NewSpan(start, end DataStoreKey) Span {
func NewSpan(start, end keys.DataStoreKey) Span {
return span{
start: start,
end: end,
}
}

// Start returns the starting key of the Span.
func (s span) Start() DataStoreKey {
func (s span) Start() keys.DataStoreKey {
return s.start
}

// End returns the ending key of the Span.
func (s span) End() DataStoreKey {
func (s span) End() keys.DataStoreKey {
return s.end
}

Expand Down Expand Up @@ -136,7 +140,7 @@ func (this span) Compare(other Span) SpanComparisonResult {
return After
}

func isAdjacent(this DataStoreKey, other DataStoreKey) bool {
func isAdjacent(this keys.DataStoreKey, other keys.DataStoreKey) bool {
return len(this.ToString()) == len(other.ToString()) &&
(this.PrefixEnd().ToString() == other.ToString() ||
this.ToString() == other.PrefixEnd().ToString())
Expand All @@ -156,13 +160,6 @@ func NewSpans(spans ...Span) Spans {
}
}

// HeadKeyValue is a KV store response containing the resulting core.HeadStoreKey
// and byte array value.
type HeadKeyValue struct {
Key HeadStoreKey
Value []byte
}

// Merges an unordered, potentially overlapping and/or duplicated collection of Spans into
// a unique set in ascending order, where overlapping spans are merged into a single span.
// Will handle spans with keys of different lengths, where one might be a prefix of another.
Expand Down
Loading
Loading