Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make queries utilise secondary indexes #1925

Merged
merged 99 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from 91 commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
38c5572
Enable custom asserters for integration tests
islamaliev Jul 3, 2023
4d0cccf
Add simple integration test for fetching with index
islamaliev Jul 3, 2023
3c8c353
Extract common operations
islamaliev Jul 11, 2023
c07d3d5
Make collection read only by db
islamaliev Jul 11, 2023
2bc0730
Add first primitive indexer version
islamaliev Jul 11, 2023
f623666
fetch indexed value one by one
islamaliev Jul 13, 2023
7e9857e
Create FetcherSwitcher
islamaliev Jul 13, 2023
739ed11
Refactor tests
islamaliev Jul 13, 2023
20a34b1
Add test with _qt filter
islamaliev Jul 13, 2023
e9264b8
Create index iterator
islamaliev Jul 14, 2023
b9d6927
Implement gt index search
islamaliev Jul 17, 2023
74e86c7
Add check to FetcherSwitcher
islamaliev Jul 17, 2023
afbdbda
Implement all filter types
islamaliev Jul 17, 2023
8a0d751
Polish
islamaliev Jul 18, 2023
19bf49d
Fix after lens rebase
islamaliev Jul 18, 2023
cd795bc
Format
islamaliev Jul 27, 2023
c6add52
Rename file
islamaliev Jul 27, 2023
4cbd3ae
Fix tests
islamaliev Jul 28, 2023
eb5d2e8
Use ExecInfo in indexer
islamaliev Jul 28, 2023
78dff86
Fix for nested test actions
islamaliev Jul 28, 2023
661da69
Improve ExplainResultAsserter
islamaliev Jul 28, 2023
97a6786
Set schemaVersionID on doc merge
islamaliev Jul 28, 2023
d99c599
Adjust tests
islamaliev Jul 28, 2023
e7e3017
Adjustments
islamaliev Aug 16, 2023
245b63c
Make combination of index and non-index filters work
islamaliev Aug 18, 2023
cf0a2dd
Refactor tests
islamaliev Aug 21, 2023
5744f3d
Make index iterator with more encapsulated code
islamaliev Aug 22, 2023
01c017a
Adjust tests
islamaliev Aug 22, 2023
05c4686
Introduce indexesFetched metric
islamaliev Aug 22, 2023
0d0fc04
Rename indexesFetches to indexFetches
islamaliev Aug 22, 2023
07a5857
Optimize _in index filter
islamaliev Aug 23, 2023
cf2724c
Refactor _in filter to allow multiple indexes with the same value
islamaliev Aug 23, 2023
8cd7fa8
Adjusted after rebase
islamaliev Sep 13, 2023
e5b9c7f
Polish
islamaliev Sep 13, 2023
fbd4efa
Consistent collection names
islamaliev Sep 15, 2023
6b166f6
Automatic creation of related docs
islamaliev Sep 15, 2023
d8e0780
Make possible to copy/remove field or related objects
islamaliev Sep 21, 2023
6de6139
Add ExtractRelation filter method
islamaliev Sep 21, 2023
1dec53f
Polish filter split
islamaliev Sep 21, 2023
d95fbf3
Make possible to assert total scan metrics
islamaliev Sep 22, 2023
008b71c
Rename
islamaliev Sep 22, 2023
1d32149
Implement ExtractProperties
islamaliev Sep 22, 2023
9400b4e
Basic implementation of one-to-many reverted fetching order
islamaliev Sep 22, 2023
1d5cbb3
Use filter ExtractProperties method
islamaliev Sep 25, 2023
1ba783e
Prepare tests for one-to-one relation
islamaliev Sep 26, 2023
f33a152
Move joinMany code into twoWayFetchDirector
islamaliev Sep 26, 2023
5671701
Move more code into twoWayFetchDirector
islamaliev Sep 26, 2023
03e954c
Polish
islamaliev Sep 26, 2023
4567338
Implement inverted one-to-one
islamaliev Sep 26, 2023
c293200
Restructure schema parsing
islamaliev Sep 26, 2023
038a230
Add check for @primary directive
islamaliev Sep 26, 2023
04333ee
Refactor doc generation
islamaliev Sep 27, 2023
1b64920
Adjust explain asserted to include sub scan
islamaliev Sep 29, 2023
fb9c128
Adjust explain tests
islamaliev Sep 29, 2023
c030199
use twoWayFetchDirector
islamaliev Sep 29, 2023
0254ecf
Use one method for fetching in both directions
islamaliev Oct 2, 2023
b5b4a68
Fix linter
islamaliev Oct 2, 2023
f10c13b
Add tests for more relations
islamaliev Oct 2, 2023
1fd0854
Add docs
islamaliev Oct 4, 2023
4806bcd
Extract index iterator into a separate file
islamaliev Oct 4, 2023
464a97b
Add docs
islamaliev Oct 4, 2023
090a511
Move explain asserter to common folder
islamaliev Oct 4, 2023
ebc476a
Update doc
islamaliev Oct 4, 2023
efef9df
Make result of CollectIndexedFields more deterministic
islamaliev Oct 4, 2023
71915f6
Fix lint
islamaliev Oct 4, 2023
05ae6ec
Core review polish
islamaliev Oct 5, 2023
32af4df
Small refactor
islamaliev Oct 5, 2023
8c7908f
Change tests to list testUtils.Request explicitly
islamaliev Oct 5, 2023
4570de6
Change bench framework to allow directives on schemas
islamaliev Oct 5, 2023
3e86164
Add querying with index to benchmarks
islamaliev Oct 5, 2023
c3084b6
Add benchmark comparison for indexed query
islamaliev Oct 9, 2023
53360da
Fix lint
islamaliev Oct 9, 2023
acf44f8
Fix imports
islamaliev Oct 9, 2023
f3fa9b5
Fix tests
islamaliev Oct 9, 2023
8b4f1ca
Adjust index performance benchmark
islamaliev Oct 9, 2023
f624bf7
Polish
islamaliev Oct 9, 2023
703a0f5
Enable benchmark to focus on client types
islamaliev Oct 9, 2023
154cfdd
Add specs to devices
islamaliev Oct 10, 2023
defc91f
Allow indexing of foreign fields
islamaliev Oct 10, 2023
427245c
Refactor index matchers
islamaliev Oct 11, 2023
d7c2784
Make index filter return error
islamaliev Oct 11, 2023
3b31a0a
Pass schema to CollectIndexedFields
islamaliev Oct 11, 2023
504c5d3
Add test for _like "pref%suf"
islamaliev Oct 11, 2023
35ce118
Add test for _like with double %
islamaliev Oct 11, 2023
1b9b2a5
Add doc
islamaliev Oct 11, 2023
4571107
Polish switch statement
islamaliev Oct 12, 2023
ebc5e0c
Refactor bench options
islamaliev Oct 12, 2023
cc860d3
Polish
islamaliev Oct 12, 2023
9c6b242
Add comment
islamaliev Oct 12, 2023
0f87637
Rename
islamaliev Oct 12, 2023
45ed665
Polish
islamaliev Oct 12, 2023
dd114e2
Polish
islamaliev Oct 13, 2023
c812f3e
Make comparison bench tests local to framework
islamaliev Oct 13, 2023
5cc6f29
Add an error
islamaliev Oct 13, 2023
17297b3
Remove optional nil
islamaliev Oct 13, 2023
0ac0a75
Assert errors for bench action
islamaliev Oct 13, 2023
82934ea
Add doc
islamaliev Oct 13, 2023
db8f075
Fix lint
islamaliev Oct 13, 2023
869f867
Adjust bench
islamaliev Oct 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions client/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,22 @@ type IndexDescription struct {
// Fields contains the fields that are being indexed.
Fields []IndexedFieldDescription
}

// CollectIndexedFields returns all fields that are indexed by all collection indexes.
func (d CollectionDescription) CollectIndexedFields(schema *SchemaDescription) []FieldDescription {
fieldsMap := make(map[string]bool)
fields := make([]FieldDescription, 0, len(d.Indexes))
for _, index := range d.Indexes {
for _, field := range index.Fields {
for i := range schema.Fields {
colField := schema.Fields[i]
if field.Name == colField.Name && !fieldsMap[field.Name] {
fieldsMap[field.Name] = true
fields = append(fields, colField)
break
}
}
}
}
return fields
}
81 changes: 81 additions & 0 deletions datastore/prefix_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package datastore

import (
"context"
"encoding/json"

ds "github.com/ipfs/go-datastore"

"github.com/ipfs/go-datastore/query"
)

// DeserializePrefix deserializes all elements with the given prefix from the given storage.
// It returns the keys and their corresponding elements.
func DeserializePrefix[T any](
ctx context.Context,
prefix string,
store ds.Read,
) ([]string, []T, error) {
q, err := store.Query(ctx, query.Query{Prefix: prefix})
if err != nil {
return nil, nil, err
}

keys := make([]string, 0)
elements := make([]T, 0)
for res := range q.Next() {
if res.Error != nil {
_ = q.Close()
return nil, nil, res.Error
}

var element T
err = json.Unmarshal(res.Value, &element)
if err != nil {
_ = q.Close()
return nil, nil, err
}
keys = append(keys, res.Key)
elements = append(elements, element)
}
if err := q.Close(); err != nil {
return nil, nil, err
}

Check warning on line 53 in datastore/prefix_query.go

View check run for this annotation

Codecov / codecov/patch

datastore/prefix_query.go#L52-L53

Added lines #L52 - L53 were not covered by tests
return keys, elements, nil
}

// FetchKeysForPrefix fetches all keys with the given prefix from the given storage.
func FetchKeysForPrefix(
ctx context.Context,
prefix string,
store ds.Read,
) ([]ds.Key, error) {
q, err := store.Query(ctx, query.Query{Prefix: prefix})
if err != nil {
return nil, err
}

keys := make([]ds.Key, 0)
for res := range q.Next() {
if res.Error != nil {
_ = q.Close()
return nil, res.Error
}
keys = append(keys, ds.NewKey(res.Key))
}
if err = q.Close(); err != nil {
return nil, err
}

return keys, nil
}
89 changes: 17 additions & 72 deletions db/collection_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (
"strconv"
"strings"

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
Expand Down Expand Up @@ -63,7 +60,7 @@ func (db *db) getAllIndexes(
) (map[client.CollectionName][]client.IndexDescription, error) {
prefix := core.NewCollectionIndexKey("", "")

deserializedIndexes, err := deserializePrefix[client.IndexDescription](ctx,
keys, indexDescriptions, err := datastore.DeserializePrefix[client.IndexDescription](ctx,
prefix.ToString(), txn.Systemstore())

if err != nil {
Expand All @@ -72,12 +69,15 @@ func (db *db) getAllIndexes(

indexes := make(map[client.CollectionName][]client.IndexDescription)

for _, indexRec := range deserializedIndexes {
indexKey, err := core.NewCollectionIndexKeyFromString(indexRec.key)
for i := range keys {
indexKey, err := core.NewCollectionIndexKeyFromString(keys[i])
if err != nil {
return nil, NewErrInvalidStoredIndexKey(indexKey.ToString())
}
indexes[indexKey.CollectionName] = append(indexes[indexKey.CollectionName], indexRec.element)
indexes[indexKey.CollectionName] = append(
indexes[indexKey.CollectionName],
indexDescriptions[i],
)
}

return indexes, nil
Expand All @@ -89,16 +89,12 @@ func (db *db) fetchCollectionIndexDescriptions(
colName string,
) ([]client.IndexDescription, error) {
prefix := core.NewCollectionIndexKey(colName, "")
deserializedIndexes, err := deserializePrefix[client.IndexDescription](ctx,
_, indexDescriptions, err := datastore.DeserializePrefix[client.IndexDescription](ctx,
prefix.ToString(), txn.Systemstore())
if err != nil {
return nil, err
}
indexes := make([]client.IndexDescription, 0, len(deserializedIndexes))
for _, indexRec := range deserializedIndexes {
indexes = append(indexes, indexRec.element)
}
return indexes, nil
return indexDescriptions, nil
}

func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *client.Document) error {
Expand All @@ -115,27 +111,6 @@ func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *cl
return nil
}

// collectIndexedFields returns all fields that are indexed by all collection indexes.
func (c *collection) collectIndexedFields() []client.FieldDescription {
fieldsMap := make(map[string]client.FieldDescription)
for _, index := range c.indexes {
for _, field := range index.Description().Fields {
for i := range c.desc.Schema.Fields {
colField := c.desc.Schema.Fields[i]
if field.Name == colField.Name {
fieldsMap[field.Name] = colField
break
}
}
}
}
fields := make([]client.FieldDescription, 0, len(fieldsMap))
for _, field := range fieldsMap {
fields = append(fields, field)
}
return fields
}

func (c *collection) updateIndexedDoc(
ctx context.Context,
txn datastore.Txn,
Expand All @@ -145,7 +120,13 @@ func (c *collection) updateIndexedDoc(
if err != nil {
return err
}
oldDoc, err := c.get(ctx, txn, c.getPrimaryKeyFromDocKey(doc.Key()), c.collectIndexedFields(), false)
desc := c.Description()
oldDoc, err := c.get(
ctx,
txn,
c.getPrimaryKeyFromDocKey(doc.Key()), desc.CollectIndexedFields(&desc.Schema),
false,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +351,7 @@ func (c *collection) dropIndex(ctx context.Context, txn datastore.Txn, indexName
func (c *collection) dropAllIndexes(ctx context.Context, txn datastore.Txn) error {
prefix := core.NewCollectionIndexKey(c.Name(), "")

keys, err := fetchKeysForPrefix(ctx, prefix.ToString(), txn.Systemstore())
keys, err := datastore.FetchKeysForPrefix(ctx, prefix.ToString(), txn.Systemstore())
if err != nil {
return err
}
Expand Down Expand Up @@ -510,39 +491,3 @@ func generateIndexName(col client.Collection, fields []client.IndexedFieldDescri
}
return sb.String()
}

type deserializedElement[T any] struct {
key string
element T
}

func deserializePrefix[T any](
ctx context.Context,
prefix string,
storage ds.Read,
) ([]deserializedElement[T], error) {
q, err := storage.Query(ctx, query.Query{Prefix: prefix})
if err != nil {
return nil, NewErrFailedToCreateCollectionQuery(err)
}

elements := make([]deserializedElement[T], 0)
for res := range q.Next() {
if res.Error != nil {
_ = q.Close()
return nil, res.Error
}

var element T
err = json.Unmarshal(res.Value, &element)
if err != nil {
_ = q.Close()
return nil, NewErrInvalidStoredIndex(err)
}
elements = append(elements, deserializedElement[T]{key: res.Key, element: element})
}
if err := q.Close(); err != nil {
return nil, err
}
return elements, nil
}
18 changes: 18 additions & 0 deletions db/fetcher/encoded_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,24 @@
return doc, nil
}

// MergeProperties merges the properties of the given document into this document.
// Existing fields of the current document are overwritten.
func (encdoc *encodedDocument) MergeProperties(other EncodedDocument) {
otherEncDoc, ok := other.(*encodedDocument)
if !ok {
return
}

Check warning on line 140 in db/fetcher/encoded_doc.go

View check run for this annotation

Codecov / codecov/patch

db/fetcher/encoded_doc.go#L139-L140

Added lines #L139 - L140 were not covered by tests
for field, prop := range otherEncDoc.properties {
encdoc.properties[field] = prop
}
if other.Key() != nil {
encdoc.key = other.Key()
}
if other.SchemaVersionID() != "" {
encdoc.schemaVersionID = other.SchemaVersionID()
}
}

// DecodeToDoc returns a decoded document as a
// map of field/value pairs
func DecodeToDoc(encdoc EncodedDocument, mapping *core.DocumentMapping, filter bool) (core.Doc, error) {
Expand Down
6 changes: 6 additions & 0 deletions db/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ type ExecInfo struct {
DocsFetched uint64
// Number of fields fetched.
FieldsFetched uint64
// Number of indexes fetched.
IndexesFetched uint64
}

// Add adds the other ExecInfo to the current ExecInfo.
func (s *ExecInfo) Add(other ExecInfo) {
s.DocsFetched += other.DocsFetched
s.FieldsFetched += other.FieldsFetched
s.IndexesFetched += other.IndexesFetched
}

// Reset resets the ExecInfo.
func (s *ExecInfo) Reset() {
s.DocsFetched = 0
s.FieldsFetched = 0
s.IndexesFetched = 0
}

// Fetcher is the interface for collecting documents from the underlying data store.
Expand Down Expand Up @@ -576,6 +580,8 @@ func (df *DocumentFetcher) fetchNext(ctx context.Context) (EncodedDocument, Exec
// keyparts := df.kv.Key.List()
// key := keyparts[len(keyparts)-2]

prevExecInfo := df.execInfo
defer func() { df.execInfo.Add(prevExecInfo) }()
df.execInfo.Reset()
// iterate until we have collected all the necessary kv pairs for the doc
// we'll know when were done when either
Expand Down
Loading