feat: Remove CollectionDescription.Schema #1965
@@ -47,6 +47,7 @@
 	COLLECTION_SCHEMA_VERSION_HISTORY = "/collection/version/h"
 	COLLECTION_INDEX = "/collection/index"
 	SCHEMA_MIGRATION = "/schema/migration"
+	SCHEMA_VERSION = "/schema/version"
 	SEQ = "/seq"
 	PRIMARY_KEY = "/pk"
 	DATASTORE_DOC_VERSION_FIELD_ID = "v"
@@ -132,6 +133,15 @@

 var _ Key = (*CollectionIndexKey)(nil)

+// SchemaVersionKey points to the json serialized schema at the specified version.
+//
+// Its corresponding value is immutable.
+type SchemaVersionKey struct {
+	SchemaVersionID string
+}
+
+var _ Key = (*SchemaVersionKey)(nil)
+
 // SchemaHistoryKey holds the pathway through the schema version history for
 // any given schema.
 //
@@ -257,6 +267,11 @@
 	return CollectionSchemaVersionKey{SchemaVersionId: schemaVersionId}
 }

+func NewCollectionSchemaVersionKeyFromString(key string) CollectionSchemaVersionKey {
+	elements := strings.Split(key, "/")
+	return CollectionSchemaVersionKey{SchemaVersionId: elements[len(elements)-1]}
+}
+
 // NewCollectionIndexKey creates a new CollectionIndexKey from a collection name and index name.
 func NewCollectionIndexKey(colID, indexName string) CollectionIndexKey {
 	return CollectionIndexKey{CollectionName: colID, IndexName: indexName}

Review thread on NewCollectionSchemaVersionKeyFromString:

question: Don't you think it makes sense to add some unit tests? There is a dedicated file that tests keys.

I do not. This is very simple, and tested by pretty much every integration test we have. Unit tests would only slow down development (now and in the future).

Why would we rely on an integration test to flag an issue with something that can easily be tested locally? That means that if we make a change to any of the keys, we would have to run the integration test suite to see if there are any problems with the change, instead of running the much faster local unit tests.

I had a closer look, and I think that not unit testing our keys could let bad changes go unnoticed until the breaking-change tests are executed, which could be quite annoying.

I'm not really in the mood to get back into the integration vs unit test conversation at the moment. I have written a large amount on this topic before and you know my opinion on this matter.

We have two overall objectives with testing. I'll end my input on this conversation by reminding you of the above objectives. If you believe that simply relying on integration tests in this case contributes towards achieving them, then I think we're good.

I very strongly believe so. Sometimes the integration tests will need to be more imaginative than our current core suite, for example chaos-monkey-like magic, or providing a deliberately faulty test datastore implementation (the root datastore is something that can be publicly provided), but ultimately everything should be testable via public-only access points. If it isn't, it is dead code.

Just another note: it is practically impossible to reach a desired test coverage with integration tests alone. And those tests helped me write better code and fix bugs (not previously reported) in the existing code. Like this:

	err := someAction()
	if err != nil {
		return err
	}
	...
	res := resultsIter.NextSync()
	if res.Error != nil {
		return err // here is the problem: this returns err instead of res.Error
	}

Usually we don't write tests for such paths, but we can easily overlook the details and ship incorrect code.

Such resilience tests don't need to be coded like traditional do-this, do-that tests. You can use the chaos engineering approach (https://en.wikipedia.org/wiki/Chaos_engineering) instead. This can allow the tests to be somewhat proactive (searching for bugs) instead of reactive (testing known pathways where bugs are expected). Also, a lot of our errors can be reached by manipulating the datastore values directly; no need to mess around with mocks.

I also want to introduce a really magic testing layer, where randomized queries and setup data are fed in. Traditionally you can do this using recorded real-world data/actions, but there has been significant movement in using 'AI' to generate things like queries for such purposes.
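For illustration, the kind of fast, local unit test being debated might look like the following standalone sketch. The mirrored type and constructor are copied from the diff above; the project's real key-test file and any test helpers it uses are assumptions here.

```go
package main

import (
	"fmt"
	"strings"
)

// CollectionSchemaVersionKey mirrors the key type under discussion.
type CollectionSchemaVersionKey struct {
	SchemaVersionId string
}

// NewCollectionSchemaVersionKeyFromString mirrors the function added in the
// diff: the version id is taken from the last path element of the key string.
func NewCollectionSchemaVersionKeyFromString(key string) CollectionSchemaVersionKey {
	elements := strings.Split(key, "/")
	return CollectionSchemaVersionKey{SchemaVersionId: elements[len(elements)-1]}
}

func main() {
	// A table-driven check in the style of a typical Go key test.
	cases := []struct {
		in   string
		want string
	}{
		{"/collection/version/v/abc123", "abc123"},
		// No separator: strings.Split returns the whole input as the
		// only (and therefore last) element.
		{"abc123", "abc123"},
	}
	for _, c := range cases {
		got := NewCollectionSchemaVersionKeyFromString(c.in).SchemaVersionId
		fmt.Printf("%q -> %q (want %q)\n", c.in, got, c.want)
	}
}
```

Such a test runs in milliseconds and would flag a change to the key layout without a full integration-suite run.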
@@ -307,6 +322,10 @@
 	return ds.NewKey(k.ToString())
 }

+func NewSchemaVersionKey(schemaVersionID string) SchemaVersionKey {
+	return SchemaVersionKey{SchemaVersionID: schemaVersionID}
+}
+
 func NewSchemaHistoryKey(schemaId string, previousSchemaVersionID string) SchemaHistoryKey {
 	return SchemaHistoryKey{
 		SchemaID: schemaId,
@@ -625,6 +644,24 @@
 	return ds.NewKey(k.ToString())
 }

+func (k SchemaVersionKey) ToString() string {
+	result := SCHEMA_VERSION
+
+	if k.SchemaVersionID != "" {
+		result = result + "/" + k.SchemaVersionID
+	}
+
+	return result
+}
+
+func (k SchemaVersionKey) Bytes() []byte {
+	return []byte(k.ToString())
+}
+
+func (k SchemaVersionKey) ToDS() ds.Key {
+	return ds.NewKey(k.ToString())
+}
+
 func (k SchemaHistoryKey) ToString() string {
 	result := COLLECTION_SCHEMA_VERSION_HISTORY
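As a usage sketch, the new key's string form can be exercised in isolation. This is a standalone approximation of the `core` package code above; `ToDS` is omitted here to avoid the datastore dependency.

```go
package main

import "fmt"

// SCHEMA_VERSION mirrors the prefix constant added in this PR.
const SCHEMA_VERSION = "/schema/version"

// SchemaVersionKey points to the JSON-serialized schema at the specified
// version; the value stored at the key is immutable once written.
type SchemaVersionKey struct {
	SchemaVersionID string
}

// ToString renders the key as a path: the prefix, plus the version id
// when one is set (the bare prefix is useful for prefix queries).
func (k SchemaVersionKey) ToString() string {
	result := SCHEMA_VERSION
	if k.SchemaVersionID != "" {
		result = result + "/" + k.SchemaVersionID
	}
	return result
}

// Bytes returns the string form as a byte slice, as the Key interface expects.
func (k SchemaVersionKey) Bytes() []byte {
	return []byte(k.ToString())
}

func main() {
	k := SchemaVersionKey{SchemaVersionID: "bafybeihash"} // illustrative id
	fmt.Println(k.ToString())                  // /schema/version/bafybeihash
	fmt.Println(SchemaVersionKey{}.ToString()) // bare prefix: /schema/version
}
```

The empty-id case matters: it is what makes the bare `/schema/version` prefix usable for iterating all stored schema versions.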
@@ -28,14 +28,13 @@
 	"github.com/sourcenetwork/defradb/client"
 	"github.com/sourcenetwork/defradb/client/request"
 	"github.com/sourcenetwork/defradb/core"
-	ccid "github.com/sourcenetwork/defradb/core/cid"
 	"github.com/sourcenetwork/defradb/datastore"
 	"github.com/sourcenetwork/defradb/db/base"
+	"github.com/sourcenetwork/defradb/db/description"
 	"github.com/sourcenetwork/defradb/db/fetcher"
 	"github.com/sourcenetwork/defradb/errors"
 	"github.com/sourcenetwork/defradb/events"
-	"github.com/sourcenetwork/defradb/lens"
 	"github.com/sourcenetwork/defradb/logging"
 	"github.com/sourcenetwork/defradb/merkle/crdt"
 )
@@ -119,67 +118,41 @@
 	}
 	desc.ID = uint32(colID)

 	for i := range schema.Fields {
 		schema.Fields[i].ID = client.FieldID(i)
 	}

-	col, err := db.newCollection(desc, schema)
-	if err != nil {
-		return nil, err
-	}
-
-	// Local elements such as secondary indexes should be excluded
-	// from the (global) schemaId.
-	schemaBuf, err := json.Marshal(schema)
+	schema, err = description.CreateSchemaVersion(ctx, txn, schema)
 	if err != nil {
 		return nil, err
 	}

-	// add a reference to this DB by desc hash
-	cid, err := ccid.NewSHA256CidV1(schemaBuf)
-	if err != nil {
-		return nil, err
-	}
-	schemaID := cid.String()
-
-	// For new schemas the initial version id will match the schema id
-	schemaVersionID := schemaID
-
-	schema.VersionID = schemaVersionID
-	schema.SchemaID = schemaID
-	desc.Schema = schema
 	desc.SchemaVersionID = schema.VersionID

 	// buffer must include all the ids, as it is saved and loaded from the store later.
 	buf, err := json.Marshal(desc)
 	if err != nil {
 		return nil, err
 	}

-	collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionID)
+	collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schema.VersionID)
 	// Whilst the schemaVersionKey is global, the data persisted at the key's location
 	// is local to the node (the global only elements are not useful beyond key generation).
 	err = txn.Systemstore().Put(ctx, collectionSchemaVersionKey.ToDS(), buf)
 	if err != nil {
 		return nil, err
 	}

-	collectionSchemaKey := core.NewCollectionSchemaKey(schemaID)
-	err = txn.Systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schemaVersionID))
+	collectionSchemaKey := core.NewCollectionSchemaKey(schema.SchemaID)
+	err = txn.Systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schema.VersionID))
 	if err != nil {
 		return nil, err
 	}

-	err = txn.Systemstore().Put(ctx, collectionKey.ToDS(), []byte(schemaVersionID))
+	err = txn.Systemstore().Put(ctx, collectionKey.ToDS(), []byte(schema.VersionID))
 	if err != nil {
 		return nil, err
 	}

-	log.Debug(
-		ctx,
-		"Created collection",
-		logging.NewKV("Name", col.Name()),
-		logging.NewKV("SchemaID", col.SchemaID()),
-	)
+	col, err := db.newCollection(desc, schema)
+	if err != nil {
+		return nil, err
+	}

 	for _, index := range desc.Indexes {
 		if _, err := col.createIndex(ctx, txn, index); err != nil {

Review thread on lines +133 to +147:

thought: These make no sense anymore. I think this is what you were referring to here.

Yes :)
@@ -203,12 +176,9 @@
 	txn datastore.Txn,
 	existingSchemaByName map[string]client.SchemaDescription,
 	proposedDescriptionsByName map[string]client.SchemaDescription,
-	def client.CollectionDefinition,
+	schema client.SchemaDescription,
 	setAsDefaultVersion bool,
 ) (client.Collection, error) {
-	schema := def.Schema
-	desc := def.Description
-
 	hasChanged, err := db.validateUpdateSchema(
 		ctx,
 		txn,

@@ -221,7 +191,7 @@
 	}

 	if !hasChanged {
-		return db.getCollectionByName(ctx, txn, desc.Name)
+		return db.getCollectionByName(ctx, txn, schema.Name)
 	}

 	for _, field := range schema.Fields {
@@ -239,56 +209,40 @@
 	}

 	for i, field := range schema.Fields {
 		if field.ID == client.FieldID(0) {
 			// This is not wonderful and will probably break when we add the ability
 			// to delete fields, however it is good enough for now and matches the
 			// create behaviour.
 			field.ID = client.FieldID(i)
 			schema.Fields[i] = field
 		}

 		if field.Typ == client.NONE_CRDT {
 			// If no CRDT Type has been provided, default to LWW_REGISTER.
 			field.Typ = client.LWW_REGISTER
 			schema.Fields[i] = field
 		}
 	}

-	globalSchemaBuf, err := json.Marshal(schema)
+	schema, err = description.CreateSchemaVersion(ctx, txn, schema)
 	if err != nil {
 		return nil, err
 	}

-	cid, err := ccid.NewSHA256CidV1(globalSchemaBuf)
+	col, err := db.getCollectionByName(ctx, txn, schema.Name)
 	if err != nil {
 		return nil, err
 	}
-	previousSchemaVersionID := schema.VersionID
-	schemaVersionID := cid.String()
-	schema.VersionID = schemaVersionID
-	desc.Schema = schema
+	desc := col.Description()
 	desc.SchemaVersionID = schema.VersionID

 	buf, err := json.Marshal(desc)
 	if err != nil {
 		return nil, err
 	}

-	collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionID)
+	collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schema.VersionID)
 	// Whilst the schemaVersionKey is global, the data persisted at the key's location
 	// is local to the node (the global only elements are not useful beyond key generation).
 	err = txn.Systemstore().Put(ctx, collectionSchemaVersionKey.ToDS(), buf)
 	if err != nil {
 		return nil, err
 	}

-	schemaVersionHistoryKey := core.NewSchemaHistoryKey(schema.SchemaID, previousSchemaVersionID)
-	err = txn.Systemstore().Put(ctx, schemaVersionHistoryKey.ToDS(), []byte(schemaVersionID))
-	if err != nil {
-		return nil, err
-	}
-
 	if setAsDefaultVersion {
-		err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, schema.SchemaID, schemaVersionID)
+		err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, schema.SchemaID, schema.VersionID)
 		if err != nil {
 			return nil, err
 		}
@@ -308,6 +262,10 @@
 	proposedDescriptionsByName map[string]client.SchemaDescription,
 	proposedDesc client.SchemaDescription,
 ) (bool, error) {
+	if proposedDesc.Name == "" {
+		return false, ErrSchemaNameEmpty
+	}
+
 	existingDesc, collectionExists := existingDescriptionsByName[proposedDesc.Name]
 	if !collectionExists {
 		return false, NewErrAddCollectionWithPatch(proposedDesc.Name)
@@ -538,7 +496,7 @@
 	}

 	desc := col.Description()
-	err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, desc.Schema.SchemaID, schemaVersionID)
+	err = db.setDefaultSchemaVersionExplicit(ctx, txn, desc.Name, col.Schema().SchemaID, schemaVersionID)
 	if err != nil {
 		return err
 	}
@@ -597,11 +555,16 @@
 		return nil, err
 	}

+	schema, err := description.GetSchemaVersion(ctx, txn, desc.SchemaVersionID)
+	if err != nil {
+		return nil, err
+	}
+
 	col := &collection{
 		db: db,
 		def: client.CollectionDefinition{
 			Description: desc,
-			Schema:      desc.Schema,
+			Schema:      schema,
 		},
 	}
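On the load path, `description.GetSchemaVersion` presumably reverses the store above: read the immutable bytes at the schema-version key and decode them. A minimal, hypothetical sketch, with an in-memory map standing in for `txn.Systemstore()` and assumed field names:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SchemaDescription is a pared-down stand-in for client.SchemaDescription.
type SchemaDescription struct {
	Name      string
	VersionID string
}

// store stands in for the systemstore: key string -> serialized value.
type store map[string][]byte

// putSchemaVersion writes the serialized schema at its version key,
// mirroring the Put against SchemaVersionKey in the diff.
func putSchemaVersion(s store, schema SchemaDescription) error {
	buf, err := json.Marshal(schema)
	if err != nil {
		return err
	}
	s["/schema/version/"+schema.VersionID] = buf
	return nil
}

// getSchemaVersion mirrors the idea behind description.GetSchemaVersion:
// fetch the immutable value at the version key and decode it.
func getSchemaVersion(s store, versionID string) (SchemaDescription, error) {
	buf, ok := s["/schema/version/"+versionID]
	if !ok {
		return SchemaDescription{}, fmt.Errorf("schema version not found: %s", versionID)
	}
	var schema SchemaDescription
	if err := json.Unmarshal(buf, &schema); err != nil {
		return SchemaDescription{}, err
	}
	return schema, nil
}

func main() {
	s := store{}
	if err := putSchemaVersion(s, SchemaDescription{Name: "User", VersionID: "v1"}); err != nil {
		panic(err)
	}
	got, err := getSchemaVersion(s, "v1")
	if err != nil {
		panic(err)
	}
	fmt.Println(got.Name) // User
}
```

This is why `CollectionDescription.Schema` can be removed: the collection no longer carries an embedded schema copy, it is rehydrated from the version key on load.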
@@ -0,0 +1,30 @@
+// Copyright 2023 Democratized Data Foundation
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package description
+
+import "github.com/sourcenetwork/defradb/errors"
+
+const (
+	errFailedToCreateSchemaQuery string = "failed to create schema prefix query"
+	errFailedToCloseSchemaQuery string = "failed to close schema prefix query"
+)
+
+// NewErrFailedToCreateSchemaQuery returns a new error indicating that the
+// schema prefix query could not be created.
+func NewErrFailedToCreateSchemaQuery(inner error) error {
+	return errors.Wrap(errFailedToCreateSchemaQuery, inner)
+}
+
+// NewErrFailedToCloseSchemaQuery returns a new error indicating that the
+// schema prefix query failed to close.
+func NewErrFailedToCloseSchemaQuery(inner error) error {
+	return errors.Wrap(errFailedToCloseSchemaQuery, inner)
+}
Review thread:

todo: I think it needs a comment here.

Cheers - I missed that, will add.