Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add schema version id to commit queries #1061

Merged
merged 4 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions client/request/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ const (
LatestCommitsName = "latestCommits"
CommitsName = "commits"

CommitTypeName = "Commit"
LinksFieldName = "links"
HeightFieldName = "height"
CidFieldName = "cid"
DeltaFieldName = "delta"
CommitTypeName = "Commit"
LinksFieldName = "links"
HeightFieldName = "height"
CidFieldName = "cid"
SchemaVersionIDFieldName = "schemaVersionId"
DeltaFieldName = "delta"

LinksNameFieldName = "name"
LinksCidFieldName = "cid"
Expand Down Expand Up @@ -85,6 +86,7 @@ var (
VersionFields = []string{
HeightFieldName,
CidFieldName,
SchemaVersionIDFieldName,
DeltaFieldName,
}

Expand Down
35 changes: 24 additions & 11 deletions core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ var (
// LWWRegDelta is a single delta operation for an LWWRegister
// @todo: Expand delta metadata (investigate if needed)
type LWWRegDelta struct {
Priority uint64
Data []byte
DocKey []byte
SchemaVersionID string
Priority uint64
Data []byte
DocKey []byte
}

// GetPriority gets the current priority for this delta.
Expand All @@ -56,10 +57,11 @@ func (delta *LWWRegDelta) Marshal() ([]byte, error) {
buf := bytes.NewBuffer(nil)
enc := codec.NewEncoder(buf, h)
err := enc.Encode(struct {
Priority uint64
Data []byte
DocKey []byte
}{delta.Priority, delta.Data, delta.DocKey})
SchemaVersionID string
Priority uint64
Data []byte
DocKey []byte
}{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey})
Comment on lines +60 to +64
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Since the struct represents the LWWRegDelta struct, you could probably just use

LWWRegDelta{delta.SchemaVersionID, delta.Priority, delta.Data, delta.DocKey}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed a bit in the standup - this comment prompted the creation of #1065. In the short term I'll leave this as is, as it is not new and can be sorted out better if done in a dedicated yet broader ticket.

if err != nil {
return nil, err
}
Expand All @@ -74,12 +76,22 @@ func (delta *LWWRegDelta) Value() any {
// of an arbitrary data type that ensures convergence.
type LWWRegister struct {
baseCRDT

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at time of commit.
schemaVersionKey core.CollectionSchemaVersionKey
}

// NewLWWRegister returns a new instance of the LWWReg with the given ID.
func NewLWWRegister(store datastore.DSReaderWriter, key core.DataStoreKey) LWWRegister {
func NewLWWRegister(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
) LWWRegister {
return LWWRegister{
baseCRDT: newBaseCRDT(store, key),
baseCRDT: newBaseCRDT(store, key),
schemaVersionKey: schemaVersionKey,
// id: id,
// data: data,
// ts: ts,
Expand All @@ -105,8 +117,9 @@ func (reg LWWRegister) Value(ctx context.Context) ([]byte, error) {
func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
// return NewLWWRegister(reg.id, value, reg.clock.Apply(), reg.clock)
return &LWWRegDelta{
Data: value,
DocKey: reg.key.Bytes(),
Data: value,
DocKey: reg.key.Bytes(),
SchemaVersionID: reg.schemaVersionKey.SchemaVersionId,
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/crdt/lwwreg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newMockStore() datastore.DSReaderWriter {
func setupLWWRegister() LWWRegister {
store := newMockStore()
key := core.DataStoreKey{DocKey: "AAAA-BBBB"}
return NewLWWRegister(store, key)
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key)
}

func setupLoadedLWWRegster(ctx context.Context) LWWRegister {
Expand Down
3 changes: 3 additions & 0 deletions docs/data_format_changes/i1006-commits-schema-v-id.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Add schema version id to commit queries

The field commit block gained a new schema version id property and this caused the cids to change.
4 changes: 2 additions & 2 deletions merkle/clock/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ func newTestMerkleClock() *MerkleClock {

rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)
reg := crdt.NewLWWRegister(rw, core.DataStoreKey{})
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
return NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{DocKey: "dockey", FieldId: "1"}, reg).(*MerkleClock)
}

func TestNewMerkleClock(t *testing.T) {
s := newDS()
rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)
reg := crdt.NewLWWRegister(rw, core.DataStoreKey{})
reg := crdt.NewLWWRegister(rw, core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
clk := NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{}, reg).(*MerkleClock)

if clk.headstore != multistore.Headstore() {
Expand Down
6 changes: 4 additions & 2 deletions merkle/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (

var (
lwwFactoryFn = MerkleCRDTFactory(
func(mstore datastore.MultiStore, _ core.CollectionSchemaVersionKey, _ events.UpdateChannel) MerkleCRDTInitFn {
func(mstore datastore.MultiStore, schemaID core.CollectionSchemaVersionKey, _ events.UpdateChannel) MerkleCRDTInitFn {
return func(key core.DataStoreKey) MerkleCRDT {
return NewMerkleLWWRegister(
mstore.Datastore(),
mstore.Headstore(),
mstore.DAGstore(),
schemaID,
core.DataStoreKey{},
key,
)
Expand Down Expand Up @@ -60,9 +61,10 @@ func NewMerkleLWWRegister(
datastore datastore.DSReaderWriter,
headstore datastore.DSReaderWriter,
dagstore datastore.DAGStore,
schemaVersionKey core.CollectionSchemaVersionKey,
ns, key core.DataStoreKey,
) *MerkleLWWRegister {
register := corecrdt.NewLWWRegister(datastore, key /* stuff like namespace and ID */)
register := corecrdt.NewLWWRegister(datastore, schemaVersionKey, key /* stuff like namespace and ID */)
clk := clock.NewMerkleClock(headstore, dagstore, key.ToHeadStoreKey(), register)

// newBaseMerkleCRDT(clock, register)
Expand Down
2 changes: 1 addition & 1 deletion merkle/crdt/merklecrdt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func newTestBaseMerkleCRDT() (*baseMerkleCRDT, datastore.DSReaderWriter) {
rw := datastore.AsDSReaderWriter(s)
multistore := datastore.MultiStoreFrom(rw)

reg := corecrdt.NewLWWRegister(multistore.Datastore(), core.DataStoreKey{})
reg := corecrdt.NewLWWRegister(multistore.Datastore(), core.CollectionSchemaVersionKey{}, core.DataStoreKey{})
clk := clock.NewMerkleClock(multistore.Headstore(), multistore.DAGstore(), core.HeadStoreKey{}, reg)
return &baseMerkleCRDT{clock: clk, crdt: reg}, rw
}
Expand Down
5 changes: 5 additions & 0 deletions planner/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ func (n *dagScanNode) dagBlockToNodeDoc(block blocks.Block) (core.Doc, []*ipld.L
return core.Doc{}, nil, ErrDeltaMissingPriority
}

schemaVersionId, ok := delta["SchemaVersionID"].(string)
if ok {
n.parsed.DocumentMapping.SetFirstOfName(&commit, "schemaVersionId", schemaVersionId)
}

n.parsed.DocumentMapping.SetFirstOfName(&commit, "height", int64(prio))
n.parsed.DocumentMapping.SetFirstOfName(&commit, "delta", delta["Data"])

Expand Down
3 changes: 3 additions & 0 deletions planner/mapper/commitSelect.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type CommitSelect struct {

// The parent Cid for which commit information has been requested.
Cid immutable.Option[string]

// The SchemaVersionID at the time of commit.
SchemaVersionID immutable.Option[string]
}

func (s *CommitSelect) CloneTo(index int) Requestable {
Expand Down
11 changes: 0 additions & 11 deletions request/graphql/schema/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@ import (
schemaTypes "github.com/sourcenetwork/defradb/request/graphql/schema/types"
)

var (
QueryLatestCommits = &gql.Field{
Name: "latestCommits",
Type: gql.NewList(schemaTypes.CommitObject),
Args: gql.FieldConfigArgument{
"dockey": schemaTypes.NewArgConfig(gql.NewNonNull(gql.ID)),
"field": schemaTypes.NewArgConfig(gql.String),
},
}
)

// SchemaManager creates an instanced management point
// for schema intake/outtake, and updates.
type SchemaManager struct {
Expand Down
4 changes: 4 additions & 0 deletions request/graphql/schema/types/commits.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
// type Commit {
// Height: Int
// CID: String
// SchemaVersionID: String
// Delta: String
// Previous: [Commit]
// Links: [Commit]
Expand All @@ -45,6 +46,9 @@ var (
"cid": &gql.Field{
Type: gql.String,
},
"schemaVersionId": &gql.Field{
Type: gql.String,
},
"delta": &gql.Field{
Type: gql.String,
},
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/events/simple/with_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ func TestEventsSimpleWithUpdate(t *testing.T) {
ExpectedUpdates: []testUtils.ExpectedUpdate{
{
DocKey: immutable.Some(docKey1),
Cid: immutable.Some("bafybeidmgh5m52lo5ht5r6emiisdwf46s2sfrlx47fy5guz76ueh3znxze"),
Cid: immutable.Some("bafybeigpig5csogxswqwdkawjprfcqqumvkra43rwoebh2ugvx7hns3d7e"),
},
{
DocKey: immutable.Some(docKey2),
},
{
DocKey: immutable.Some(docKey1),
Cid: immutable.Some("bafybeibsvh46szrpx67z4rohjylcjb2p6mlhulv2yfp5a4wrz7xzwaolgq"),
Cid: immutable.Some("bafybeid74gzm5rpglto6yviav4gcl5dfoijttl2oj3dvlofsxwqfxek7eu"),
},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestMutationCreateSimpleReturnVersionCID(t *testing.T) {
{
"_version": []map[string]any{
{
"cid": "bafybeidfn4vxabyimc4xqq7ipkr4pbr65agr6f6vt2nmjwt72zd425pc6e",
"cid": "bafybeidzfrcnjm35ftztl44rskyeroysm3alwluirks5zabx6we55ysqcy",
},
},
},
Expand Down
54 changes: 45 additions & 9 deletions tests/integration/query/commits/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ func TestQueryCommits(t *testing.T) {
},
Results: []map[string]any{
{
"cid": "bafybeidst2mzxhdoh4ayjdjoh4vibo7vwnuoxk3xgyk5mzmep55jklni2a",
"cid": "bafybeigju7dgicfq3fxvtlxtjao7won4xc7kusykkvumngjfx5i2c7ibny",
},
{
"cid": "bafybeihhypcsqt7blkrqtcmpl43eo3yunrog5pchox5naji6hisdme4swm",
"cid": "bafybeiaqarrcayyoly2gdiam6mhh72ls4azwa7brozxxc3q2srnggkkqkq",
},
{
"cid": "bafybeid2b6a5vbqzxyxrzvwvkakqlzgcdpcdpkpmufthy4hnasu4zcyzua",
"cid": "bafybeid5l577igkgcn6wjqjeqxlta4dcc3a3iykwkborf4fklaenjuctoq",
},
},
}
Expand Down Expand Up @@ -70,22 +70,58 @@ func TestQueryCommitsMultipleDocs(t *testing.T) {
},
Results: []map[string]any{
{
"cid": "bafybeidsa74vl7xvw6tzgt5gmux5ts7lxldxculzgpxl5xura45ckf7e5i",
"cid": "bafybeibmprk2bxsv2nj2sf5ofmu7yuqe7dz2dze546nxkzwwylxyzpruoy",
},
{
"cid": "bafybeibek4lmrb5gtmahgsv33njmk3efty53n7z2rac7fuup7mwpho5zqa",
"cid": "bafybeidbb4dv2smuzmeodcrbt2dk6loqj7i3a6fofl32ejbx2gtinxguye",
},
{
"cid": "bafybeibnwzoekenlil5sdltgmkmfngoifd5uwcpgzzry7rokrd5qurgdve",
"cid": "bafybeifmifbksnwuwxhkwjqdojbddw2274f7wzd4jamllaoud3llunm5xu",
},
{
"cid": "bafybeidst2mzxhdoh4ayjdjoh4vibo7vwnuoxk3xgyk5mzmep55jklni2a",
"cid": "bafybeigju7dgicfq3fxvtlxtjao7won4xc7kusykkvumngjfx5i2c7ibny",
},
{
"cid": "bafybeihhypcsqt7blkrqtcmpl43eo3yunrog5pchox5naji6hisdme4swm",
"cid": "bafybeiaqarrcayyoly2gdiam6mhh72ls4azwa7brozxxc3q2srnggkkqkq",
},
{
"cid": "bafybeid2b6a5vbqzxyxrzvwvkakqlzgcdpcdpkpmufthy4hnasu4zcyzua",
"cid": "bafybeid5l577igkgcn6wjqjeqxlta4dcc3a3iykwkborf4fklaenjuctoq",
},
},
}

executeTestCase(t, test)
}

func TestQueryCommitsWithSchemaVersionIdField(t *testing.T) {
test := testUtils.RequestTestCase{
Description: "Simple commits query yielding schemaVersionId",
Request: `query {
commits {
cid
schemaVersionId
}
}`,
Docs: map[int][]string{
0: {
`{
"Name": "John",
"Age": 21
}`,
},
},
Results: []map[string]any{
{
"cid": "bafybeigju7dgicfq3fxvtlxtjao7won4xc7kusykkvumngjfx5i2c7ibny",
"schemaVersionId": "bafkreihaqmvbjvm2q4iwkjnuafavvsakiaztlqnridiybxystfm27uwlde",
},
{
"cid": "bafybeiaqarrcayyoly2gdiam6mhh72ls4azwa7brozxxc3q2srnggkkqkq",
"schemaVersionId": "bafkreihaqmvbjvm2q4iwkjnuafavvsakiaztlqnridiybxystfm27uwlde",
},
{
"cid": "bafybeid5l577igkgcn6wjqjeqxlta4dcc3a3iykwkborf4fklaenjuctoq",
"schemaVersionId": "bafkreihaqmvbjvm2q4iwkjnuafavvsakiaztlqnridiybxystfm27uwlde",
},
},
}
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/query/commits/with_cid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestQueryCommitsWithCid(t *testing.T) {
Description: "Simple all commits query with cid",
Request: `query {
commits(
cid: "bafybeigz4lfwqqunimseeok4w222e2vsje6dr53gpw3mtk7muuxkja3oiq"
cid: "bafybeid5l577igkgcn6wjqjeqxlta4dcc3a3iykwkborf4fklaenjuctoq"
) {
cid
}
Expand All @@ -45,7 +45,7 @@ func TestQueryCommitsWithCid(t *testing.T) {
},
Results: []map[string]any{
{
"cid": "bafybeigz4lfwqqunimseeok4w222e2vsje6dr53gpw3mtk7muuxkja3oiq",
"cid": "bafybeid5l577igkgcn6wjqjeqxlta4dcc3a3iykwkborf4fklaenjuctoq",
},
},
}
Expand All @@ -59,7 +59,7 @@ func TestQueryCommitsWithCidForFieldCommit(t *testing.T) {
Description: "Simple all commits query with cid",
Request: `query {
commits(
cid: "bafybeidst2mzxhdoh4ayjdjoh4vibo7vwnuoxk3xgyk5mzmep55jklni2a"
cid: "bafybeigju7dgicfq3fxvtlxtjao7won4xc7kusykkvumngjfx5i2c7ibny"
) {
cid
}
Expand All @@ -74,7 +74,7 @@ func TestQueryCommitsWithCidForFieldCommit(t *testing.T) {
},
Results: []map[string]any{
{
"cid": "bafybeidst2mzxhdoh4ayjdjoh4vibo7vwnuoxk3xgyk5mzmep55jklni2a",
"cid": "bafybeigju7dgicfq3fxvtlxtjao7won4xc7kusykkvumngjfx5i2c7ibny",
},
},
}
Expand Down
Loading