Skip to content

Commit

Permalink
feat: Add schema version id to commit queries (#1061)
Browse files Browse the repository at this point in the history
* Remove deadcode

Spotted whilst looking for a type, this variable is no longer used and can be removed

* Break up long line

Will be added to shortly

* Persist schemaVersionId on field commits

* Add SchemaVersionId to commit query
  • Loading branch information
AndrewSisley authored Jan 31, 2023
1 parent c9a155a commit a4c6869
Show file tree
Hide file tree
Showing 32 changed files with 275 additions and 160 deletions.
12 changes: 7 additions & 5 deletions client/request/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ const (
LatestCommitsName = "latestCommits"
CommitsName = "commits"

CommitTypeName = "Commit"
LinksFieldName = "links"
HeightFieldName = "height"
CidFieldName = "cid"
DeltaFieldName = "delta"
CommitTypeName = "Commit"
LinksFieldName = "links"
HeightFieldName = "height"
CidFieldName = "cid"
SchemaVersionIDFieldName = "schemaVersionId"
DeltaFieldName = "delta"

LinksNameFieldName = "name"
LinksCidFieldName = "cid"
Expand Down Expand Up @@ -85,6 +86,7 @@ var (
VersionFields = []string{
HeightFieldName,
CidFieldName,
SchemaVersionIDFieldName,
DeltaFieldName,
}

Expand Down
35 changes: 24 additions & 11 deletions core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ var (
// LWWRegDelta is a single delta operation for an LWWRegister
// @todo: Expand delta metadata (investigate if needed)
type LWWRegDelta struct {
Priority uint64
Data []byte
DocKey []byte
SchemaVersionID string
Priority uint64
Data []byte
DocKey []byte
}

// GetPriority gets the current priority for this delta.
Expand All @@ -56,10 +57,11 @@ func (delta *LWWRegDelta) Marshal() ([]byte, error) {
buf := bytes.NewBuffer(nil)
enc := codec.NewEncoder(buf, h)
err := enc.Encode(struct {
Priority uint64
Data []byte
DocKey []byte
}{delta.Priority, delta.Data, delta.DocKey})
SchemaVersionID string
Priority uint64
Data []byte
DocKey []byte
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey})
if err != nil {
return nil, err
}
Expand All @@ -74,12 +76,22 @@ func (delta *LWWRegDelta) Value() any {
// of an arbitrary data type that ensures convergence.
type LWWRegister struct {
baseCRDT

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at time of commit.
schemaVersionKey core.CollectionSchemaVersionKey
}

// NewLWWRegister returns a new instance of the LWWReg with the given ID.
func NewLWWRegister(store datastore.DSReaderWriter, key core.DataStoreKey) LWWRegister {
func NewLWWRegister(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
) LWWRegister {
return LWWRegister{
baseCRDT: newBaseCRDT(store, key),
baseCRDT: newBaseCRDT(store, key),
schemaVersionKey: schemaVersionKey,
// id: id,
// data: data,
// ts: ts,
Expand All @@ -105,8 +117,9 @@ func (reg LWWRegister) Value(ctx context.Context) ([]byte, error) {
func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
// return NewLWWRegister(reg.id, value, reg.clock.Apply(), reg.clock)
return &LWWRegDelta{
Data: value,
DocKey: reg.key.Bytes(),
Data: value,
DocKey: reg.key.Bytes(),
SchemaVersionID: reg.schemaVersionKey.SchemaVersionId,
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/crdt/lwwreg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newMockStore() datastore.DSReaderWriter {
func setupLWWRegister() LWWRegister {
store := newMockStore()
key := core.DataStoreKey{DocKey: "AAAA-BBBB"}
return NewLWWRegister(store, key)
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key)
}

func setupLoadedLWWRegster(ctx context.Context) LWWRegister {
Expand Down
3 changes: 3 additions & 0 deletions docs/data_format_changes/i1006-commits-schema-v-id.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Add schema version id to commit queries

The field commit block gained a new schema version id property and this caused the cids to change.
4 changes: 2 additions & 2 deletions merkle/clock/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ func newTestMerkleClock() *MerkleClock {

rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)
reg := crdt.NewLWWRegister(rw, core.DataStoreKey{})
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
return NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{DocKey: "dockey", FieldId: "1"}, reg).(*MerkleClock)
}

func TestNewMerkleClock(t *testing.T) {
s := newDS()
rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)
reg := crdt.NewLWWRegister(rw, core.DataStoreKey{})
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
clk := NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{}, reg).(*MerkleClock)

if clk.headstore != multistore.Headstore() {
Expand Down
6 changes: 4 additions & 2 deletions merkle/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (

var (
lwwFactoryFn = MerkleCRDTFactory(
func(mstore datastore.MultiStore, _ core.CollectionSchemaVersionKey, _ events.UpdateChannel) MerkleCRDTInitFn {
func(mstore datastore.MultiStore, schemaID core.CollectionSchemaVersionKey, _ events.UpdateChannel) MerkleCRDTInitFn {
return func(key core.DataStoreKey) MerkleCRDT {
return NewMerkleLWWRegister(
mstore.Datastore(),
mstore.Headstore(),
mstore.DAGstore(),
schemaID,
core.DataStoreKey{},
key,
)
Expand Down Expand Up @@ -60,9 +61,10 @@ func NewMerkleLWWRegister(
datastore datastore.DSReaderWriter,
headstore datastore.DSReaderWriter,
dagstore datastore.DAGStore,
schemaVersionKey core.CollectionSchemaVersionKey,
ns, key core.DataStoreKey,
) *MerkleLWWRegister {
register := corecrdt.NewLWWRegister(datastore, key /* stuff like namespace and ID */)
register := corecrdt.NewLWWRegister(datastore, schemaVersionKey, key /* stuff like namespace and ID */)
clk := clock.NewMerkleClock(headstore, dagstore, key.ToHeadStoreKey(), register)

// newBaseMerkleCRDT(clock, register)
Expand Down
2 changes: 1 addition & 1 deletion merkle/crdt/merklecrdt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func newTestBaseMerkleCRDT() (*baseMerkleCRDT, datastore.DSReaderWriter) {
rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)

reg := corecrdt.NewLWWRegister(multistore.Datastore(), core.DataStoreKey{})
reg := corecrdt.NewLWWRegister(multistore.Datastore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
clk := clock.NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{}, reg)
return &baseMerkleCRDT{clock: clk, crdt: reg}, rw
}
Expand Down
5 changes: 5 additions & 0 deletions planner/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ func (n *dagScanNode) dagBlockToNodeDoc(block blocks.Block) (core.Doc, []*ipld.L
return core.Doc{}, nil, ErrDeltaMissingPriority
}

schemaVersionId, ok := delta["SchemaVersionID"].(string)
if ok {
n.parsed.DocumentMapping.SetFirstOfName(&commit, "schemaVersionId", schemaVersionId)
}

n.parsed.DocumentMapping.SetFirstOfName(&commit, "height", int64(prio))
n.parsed.DocumentMapping.SetFirstOfName(&commit, "delta", delta["Data"])

Expand Down
3 changes: 3 additions & 0 deletions planner/mapper/commitSelect.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type CommitSelect struct {

// The parent Cid for which commit information has been requested.
Cid immutable.Option[string]

// The SchemaVersionID at the time of commit.
SchemaVersionID immutable.Option[string]
}

func (s *CommitSelect) CloneTo(index int) Requestable {
Expand Down
11 changes: 0 additions & 11 deletions request/graphql/schema/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@ import (
schemaTypes "github.com/sourcenetwork/defradb/request/graphql/schema/types"
)

var (
QueryLatestCommits = &gql.Field{
Name: "latestCommits",
Type: gql.NewList(schemaTypes.CommitObject),
Args: gql.FieldConfigArgument{
"dockey": schemaTypes.NewArgConfig(gql.NewNonNull(gql.ID)),
"field": schemaTypes.NewArgConfig(gql.String),
},
}
)

// SchemaManager creates an instanced management point
// for schema intake/outtake, and updates.
type SchemaManager struct {
Expand Down
4 changes: 4 additions & 0 deletions request/graphql/schema/types/commits.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
// type Commit {
// Height: Int
// CID: String
// SchemaVersionID: String
// Delta: String
// Previous: [Commit]
// Links: [Commit]
Expand All @@ -45,6 +46,9 @@ var (
"cid": &gql.Field{
Type: gql.String,
},
"schemaVersionId": &gql.Field{
Type: gql.String,
},
"delta": &gql.Field{
Type: gql.String,
},
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/events/simple/with_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ func TestEventsSimpleWithUpdate(t *testing.T) {
ExpectedUpdates: []testUtils.ExpectedUpdate{
{
DocKey: immutable.Some(docKey1),
Cid: immutable.Some("bafybeidmgh5m52lo5ht5r6emiisdwf46s2sfrlx47fy5guz76ueh3znxze"),
Cid: immutable.Some("bafybeigpig5csogxswqwdkawjprfcqqumvkra43rwoebh2ugvx7hns3d7e"),
},
{
DocKey: immutable.Some(docKey2),
},
{
DocKey: immutable.Some(docKey1),
Cid: immutable.Some("bafybeibsvh46szrpx67z4rohjylcjb2p6mlhulv2yfp5a4wrz7xzwaolgq"),
Cid: immutable.Some("bafybeid74gzm5rpglto6yviav4gcl5dfoijttl2oj3dvlofsxwqfxek7eu"),
},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestMutationCreateSimpleReturnVersionCID(t *testing.T) {
{
"_version": []map[string]any{
{
"cid": "bafybeidfn4vxabyimc4xqq7ipkr4pbr65agr6f6vt2nmjwt72zd425pc6e",
"cid": "bafybeidzfrcnjm35ftztl44rskyeroysm3alwluirks5zabx6we55ysqcy",
},
},
},
Expand Down
54 changes: 45 additions & 9 deletions tests/integration/query/commits/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ func TestQueryCommits(t *testing.T) {
},
Results: []map[string]any{
{
"cid": "bafybeidst2mzxhdoh4ayjdjoh4vibo7vwnuoxk3xgyk5mzmep55jklni2a",
"cid": "bafybeigju7dgicfq3fxvtlxtjao7won4xc7kusykkvumngjfx5i2c7ibny",
},
{
"cid": "bafybeihhypcsqt7blkrqtcmpl43eo3yunrog5pchox5naji6hisdme4swm",
"cid": "bafybeiaqarrcayyoly2gdiam6mhh72ls4azwa7brozxxc3q2srnggkkqkq",
},
{
"cid": "bafybeid2b6a5vbqzxyxrzvwvkakqlzgcdpcdpkpmufthy4hnasu4zcyzua",
"cid": "bafybeid5l577igkgcn6wjqjeqxlta4dcc3a3iykwkborf4fklaenjuctoq",
},
},
}
Expand Down Expand Up @@ -70,22 +70,58 @@ func TestQueryCommitsMultipleDocs(t *testing.T) {
},
Results: []map[string]any{
{
"cid": "bafybeidsa74vl7xvw6tzgt5gmux5ts7lxldxculzgpxl5xura45ckf7e5i",
"cid": "bafybeibmprk2bxsv2nj2sf5ofmu7yuqe7dz2dze546nxkzwwylxyzpruoy",
},
{
"cid": "bafybeibek4lmrb5gtmahgsv33njmk3efty53n7z2rac7fuup7mwpho5zqa",
"cid": "bafybeidbb4dv2smuzmeodcrbt2dk6loqj7i3a6fofl32ejbx2gtinxguye",
},
{
"cid": "bafybeibnwzoekenlil5sdltgmkmfngoifd5uwcpgzzry7rokrd5qurgdve",
"cid": "bafybeifmifbksnwuwxhkwjqdojbddw2274f7wzd4jamllaoud3llunm5xu",
},
{
"cid": "bafybeidst2mzxhdoh4ayjdjoh4vibo7vwnuoxk3xgyk5mzmep55jklni2a",
"cid": "bafybeigju7dgicfq3fxvtlxtjao7won4xc7kusykkvumngjfx5i2c7ibny",
},
{
"cid": "bafybeihhypcsqt7blkrqtcmpl43eo3yunrog5pchox5naji6hisdme4swm",
"cid": "bafybeiaqarrcayyoly2gdiam6mhh72ls4azwa7brozxxc3q2srnggkkqkq",
},
{
"cid": "bafybeid2b6a5vbqzxyxrzvwvkakqlzgcdpcdpkpmufthy4hnasu4zcyzua",
"cid": "bafybeid5l577igkgcn6wjqjeqxlta4dcc3a3iykwkborf4fklaenjuctoq",
},
},
}

executeTestCase(t, test)
}

func TestQueryCommitsWithSchemaVersionIdField(t *testing.T) {
test := testUtils.RequestTestCase{
Description: "Simple commits query yielding schemaVersionId",
Request: `query {
commits {
cid
schemaVersionId
}
}`,
Docs: map[int][]string{
0: {
`{
"Name": "John",
"Age": 21
}`,
},
},
Results: []map[string]any{
{
"cid": "bafybeigju7dgicfq3fxvtlxtjao7won4xc7kusykkvumngjfx5i2c7ibny",
"schemaVersionId": "bafkreihaqmvbjvm2q4iwkjnuafavvsakiaztlqnridiybxystfm27uwlde",
},
{
"cid": "bafybeiaqarrcayyoly2gdiam6mhh72ls4azwa7brozxxc3q2srnggkkqkq",
"schemaVersionId": "bafkreihaqmvbjvm2q4iwkjnuafavvsakiaztlqnridiybxystfm27uwlde",
},
{
"cid": "bafybeid5l577igkgcn6wjqjeqxlta4dcc3a3iykwkborf4fklaenjuctoq",
"schemaVersionId": "bafkreihaqmvbjvm2q4iwkjnuafavvsakiaztlqnridiybxystfm27uwlde",
},
},
}
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/query/commits/with_cid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestQueryCommitsWithCid(t *testing.T) {
Description: "Simple all commits query with cid",
Request: `query {
commits(
cid: "bafybeigz4lfwqqunimseeok4w222e2vsje6dr53gpw3mtk7muuxkja3oiq"
cid: "bafybeid5l577igkgcn6wjqjeqxlta4dcc3a3iykwkborf4fklaenjuctoq"
) {
cid
}
Expand All @@ -45,7 +45,7 @@ func TestQueryCommitsWithCid(t *testing.T) {
},
Results: []map[string]any{
{
"cid": "bafybeigz4lfwqqunimseeok4w222e2vsje6dr53gpw3mtk7muuxkja3oiq",
"cid": "bafybeid5l577igkgcn6wjqjeqxlta4dcc3a3iykwkborf4fklaenjuctoq",
},
},
}
Expand All @@ -59,7 +59,7 @@ func TestQueryCommitsWithCidForFieldCommit(t *testing.T) {
Description: "Simple all commits query with cid",
Request: `query {
commits(
cid: "bafybeidst2mzxhdoh4ayjdjoh4vibo7vwnuoxk3xgyk5mzmep55jklni2a"
cid: "bafybeigju7dgicfq3fxvtlxtjao7won4xc7kusykkvumngjfx5i2c7ibny"
) {
cid
}
Expand All @@ -74,7 +74,7 @@ func TestQueryCommitsWithCidForFieldCommit(t *testing.T) {
},
Results: []map[string]any{
{
"cid": "bafybeidst2mzxhdoh4ayjdjoh4vibo7vwnuoxk3xgyk5mzmep55jklni2a",
"cid": "bafybeigju7dgicfq3fxvtlxtjao7won4xc7kusykkvumngjfx5i2c7ibny",
},
},
}
Expand Down
Loading

0 comments on commit a4c6869

Please sign in to comment.