Skip to content

Commit

Permalink
Local first optimizations (#450)
Browse files Browse the repository at this point in the history
feat: adds mod tags + other nits
other nits include:
* feat: don't error on delete of non-existant keys
* feat: allow save of non-existent instances
* feat: include mod tag in all instances
  • Loading branch information
carsonfarmer authored Oct 19, 2020
1 parent 8b51ad1 commit 43cf56e
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 24 deletions.
2 changes: 1 addition & 1 deletion db/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ with logical clocks.
#### Dispatcher
This is an internal component not available in the public API.
Main responsibility: Source of truth regarding known `db.Event`s for the
`DB`. Will notify registered parties to let them know about new ones..
`DB`. Will notify registered parties to let them know about new ones.

Every `Event` generated in the `DB` is sent to a `Dispatcher` when write
transactions are committed. The dispatcher is responsible for broadcasting
Expand Down
121 changes: 113 additions & 8 deletions db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/alecthomas/jsonschema"
"github.com/dop251/goja"
jsonpatch "github.com/evanphx/json-patch"
"github.com/ipfs/go-ipld-format"
format "github.com/ipfs/go-ipld-format"
ds "github.com/textileio/go-datastore"
"github.com/textileio/go-datastore/query"
"github.com/textileio/go-threads/core/app"
core "github.com/textileio/go-threads/core/db"
"github.com/textileio/go-threads/core/thread"
Expand All @@ -36,9 +39,9 @@ var (
ErrInvalidSchemaInstance = errors.New("instance doesn't correspond to schema")

errMissingInstanceID = errors.New("invalid instance: missing _id attribute")
errMissingModTag = errors.New("invalid instance: missing _mod attribute")
errAlreadyDiscardedCommitedTxn = errors.New("can't commit discarded/committed txn")
errCantCreateExistingInstance = errors.New("can't create already existing instance")
errCantSaveNonExistentInstance = errors.New("can't save unkown instance")

baseKey = dsPrefix.ChildString("collection")
)
Expand Down Expand Up @@ -263,6 +266,68 @@ func (c *Collection) Find(q *Query, opts ...TxnOption) (instances [][]byte, err
return
}

type filter struct {
Collection string
Time int
}

func (f filter) Filter(e query.Entry) bool {
// Easy out, are we in the right collection?
key := ds.NewKey(e.Key)
base := key.Parent().BaseNamespace()
number, _ := strconv.Atoi(base)
kind := key.Type()
return kind == f.Collection && number > f.Time

}

// ModifiedSince returns a list of all instances that have been modified (and/or touched) since `time`.
func (c *Collection) ModifiedSince(time int64, opts ...TxnOption) (modified []core.InstanceID, err error) {
// Need to identify instances that have been touched (created, saved, or deleted) since `time`.
// The _mod field tracks modified instances, but not those that have been deleted, so we need
// to query the dispatcher for all (unique) instances in this collection that have been modified
// at all since `time`.
c.db.dispatcher.Lock().Lock()
defer c.db.dispatcher.Lock().Unlock()

txn, err := c.db.dispatcher.Store().NewTransaction(false)
if err != nil {
return nil, err
}
defer txn.Discard()

query := query.Query{
Prefix: dsDispatcherPrefix.String(),
SeekPrefix: dsDispatcherPrefix.ChildString(strconv.FormatInt(time, 10)).String(),
Filters: []query.Filter{
filter{
Collection: c.name,
},
},
KeysOnly: true,
}
results, err := txn.Query(query)
if err != nil {
return nil, err
}

type void struct{}
var member void
set := make(map[core.InstanceID]void)

for r := range results.Next() {
if r.Error != nil {
return nil, r.Error
}
id := ds.NewKey(r.Key)
set[core.InstanceID(id.Name())] = member
}
for k := range set {
modified = append(modified, k)
}
return modified, nil
}

// validInstance validates the json object against the collection schema.
func (c *Collection) validInstance(v []byte) error {
r, err := gojsonschema.Validate(c.schemaLoader, gojsonschema.NewBytesLoader(v))
Expand Down Expand Up @@ -322,9 +387,8 @@ func (c *Collection) validWrite(identity thread.PubKey, e core.Event) error {
case bool:
if out.(bool) {
return nil
} else {
return app.ErrInvalidNetRecordBody
}
return app.ErrInvalidNetRecordBody
case nil:
return app.ErrInvalidNetRecordBody
default:
Expand Down Expand Up @@ -407,6 +471,9 @@ func (t *Txn) Create(new ...[]byte) ([]core.InstanceID, error) {
return nil, errCantCreateExistingInstance
}

// Update readonly/protected mod tag
_, updated = setModifiedTag(updated)

a := core.Action{
Type: core.Create,
InstanceID: id,
Expand Down Expand Up @@ -455,6 +522,7 @@ func (t *Txn) Save(updated ...[]byte) error {
if err != nil {
return err
}

t.actions = append(t.actions, actions...)
return nil
}
Expand All @@ -473,14 +541,28 @@ func (t *Txn) createSaveActions(identity thread.PubKey, updated ...[]byte) ([]co
return nil, err
}

// Update readonly/protected mod tag
_, next = setModifiedTag(next)

// Because this is a save event, even though we might still create the new instance
// it has to have a valid _id ahead of time.
id, err := getInstanceID(next)
if err != nil {
return nil, err
}
key := baseKey.ChildString(t.collection.name).ChildString(id.String())
previous, err := t.collection.db.datastore.Get(key)
if err == ds.ErrNotFound {
return nil, errCantSaveNonExistentInstance
a := core.Action{
Type: core.Create,
InstanceID: id,
CollectionName: t.collection.name,
Previous: nil,
Current: next,
}
t.actions = append(t.actions, a)
// Early out and make this a create event. Assumes valid _id as specified above
continue
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -513,7 +595,8 @@ func (t *Txn) Delete(ids ...core.InstanceID) error {
return err
}
if !exists {
return ErrInstanceNotFound
// Nothing to be done here
return nil
}
a := core.Action{
Type: core.Delete,
Expand Down Expand Up @@ -656,11 +739,11 @@ func getSchemaTypeProperties(jt *jsonschema.Type, defs jsonschema.Definitions) (
if jt.Ref != "" {
parts := strings.Split(jt.Ref, "/")
if len(parts) < 1 {
return nil, ErrInvalidCollectionSchema
return make(map[string]*jsonschema.Type), nil
}
def := defs[parts[len(parts)-1]]
if def == nil {
return nil, ErrInvalidCollectionSchema
return make(map[string]*jsonschema.Type), nil
}
properties = def.Properties
}
Expand Down Expand Up @@ -689,6 +772,28 @@ func setNewInstanceID(t []byte) (core.InstanceID, []byte) {
return newID, patchedValue
}

func getModifiedTag(t []byte) (int64, error) {
partial := &struct {
Mod *int64 `json:"_mod"`
}{}
if err := json.Unmarshal(t, partial); err != nil {
return 0, fmt.Errorf("unmarshaling json instance: %v", err)
}
if partial.Mod == nil {
return 0, errMissingInstanceID
}
return *partial.Mod, nil
}

func setModifiedTag(t []byte) (newTime int64, patchedValue []byte) {
newTime = time.Now().UnixNano()
patchedValue, err := jsonpatch.MergePatch(t, []byte(fmt.Sprintf(`{"%s": %d}`, modFieldName, newTime)))
if err != nil {
log.Fatalf("while automatically patching autogenerated _mod: %v", err)
}
return
}

func (t *Txn) createEvents(actions []core.Action) (events []core.Event, node format.Node, err error) {
if t.discarded || t.committed {
return nil, nil, errAlreadyDiscardedCommitedTxn
Expand Down
Loading

0 comments on commit 43cf56e

Please sign in to comment.