Skip to content

Commit

Permalink
feat: Make queries utilise secondary indexes (#1925)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #1555

## Description

With this change the the secondary indexes are utilised during querying
data.

A dedicated `Indexer` fetcher is implemented to perform fetching of
values of indexed fields.

Now there is a separate `filter` package that houses a lot of methods
for working with filters.

A new metric `indexesFetched` is introduced into `@explain` to provide
information about how many indexes has been fetched.

It also includes an update to the testing framework to allow adding
custom asserters.
The new ExplainResultsAsserter is used with this new feature.
  • Loading branch information
islamaliev authored Oct 13, 2023
1 parent bc4c704 commit c8bde64
Show file tree
Hide file tree
Showing 67 changed files with 4,219 additions and 598 deletions.
19 changes: 19 additions & 0 deletions client/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,22 @@ type IndexDescription struct {
// Fields contains the fields that are being indexed.
Fields []IndexedFieldDescription
}

// CollectIndexedFields returns all fields that are indexed by all collection indexes.
func (d CollectionDescription) CollectIndexedFields(schema *SchemaDescription) []FieldDescription {
fieldsMap := make(map[string]bool)
fields := make([]FieldDescription, 0, len(d.Indexes))
for _, index := range d.Indexes {
for _, field := range index.Fields {
for i := range schema.Fields {
colField := schema.Fields[i]
if field.Name == colField.Name && !fieldsMap[field.Name] {
fieldsMap[field.Name] = true
fields = append(fields, colField)
break
}
}
}
}
return fields
}
10 changes: 10 additions & 0 deletions datastore/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/sourcenetwork/defradb/errors"
)

const (
errInvalidStoredValue string = "invalid stored value"
)

// Errors returnable from this package.
//
// This list is incomplete and undefined errors may also be returned.
Expand All @@ -26,3 +30,9 @@ var (
// ErrNotFound is an error returned when a block is not found.
ErrNotFound = errors.New("blockstore: block not found")
)

// NewErrInvalidStoredValue returns a new error indicating that the stored
// value in the database is invalid.
func NewErrInvalidStoredValue(inner error) error {
return errors.Wrap(errInvalidStoredValue, inner)
}
81 changes: 81 additions & 0 deletions datastore/prefix_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package datastore

import (
"context"
"encoding/json"

ds "github.com/ipfs/go-datastore"

"github.com/ipfs/go-datastore/query"
)

// DeserializePrefix deserializes all elements with the given prefix from the given storage.
// It returns the keys and their corresponding elements.
func DeserializePrefix[T any](
ctx context.Context,
prefix string,
store ds.Read,
) ([]string, []T, error) {
q, err := store.Query(ctx, query.Query{Prefix: prefix})
if err != nil {
return nil, nil, err
}

keys := make([]string, 0)
elements := make([]T, 0)
for res := range q.Next() {
if res.Error != nil {
_ = q.Close()
return nil, nil, res.Error
}

var element T
err = json.Unmarshal(res.Value, &element)
if err != nil {
_ = q.Close()
return nil, nil, NewErrInvalidStoredValue(err)
}
keys = append(keys, res.Key)
elements = append(elements, element)
}
if err := q.Close(); err != nil {
return nil, nil, err
}
return keys, elements, nil
}

// FetchKeysForPrefix fetches all keys with the given prefix from the given storage.
func FetchKeysForPrefix(
ctx context.Context,
prefix string,
store ds.Read,
) ([]ds.Key, error) {
q, err := store.Query(ctx, query.Query{Prefix: prefix})
if err != nil {
return nil, err
}

keys := make([]ds.Key, 0)
for res := range q.Next() {
if res.Error != nil {
_ = q.Close()
return nil, res.Error
}
keys = append(keys, ds.NewKey(res.Key))
}
if err = q.Close(); err != nil {
return nil, err
}

return keys, nil
}
89 changes: 17 additions & 72 deletions db/collection_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (
"strconv"
"strings"

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
Expand Down Expand Up @@ -63,7 +60,7 @@ func (db *db) getAllIndexes(
) (map[client.CollectionName][]client.IndexDescription, error) {
prefix := core.NewCollectionIndexKey("", "")

deserializedIndexes, err := deserializePrefix[client.IndexDescription](ctx,
keys, indexDescriptions, err := datastore.DeserializePrefix[client.IndexDescription](ctx,
prefix.ToString(), txn.Systemstore())

if err != nil {
Expand All @@ -72,12 +69,15 @@ func (db *db) getAllIndexes(

indexes := make(map[client.CollectionName][]client.IndexDescription)

for _, indexRec := range deserializedIndexes {
indexKey, err := core.NewCollectionIndexKeyFromString(indexRec.key)
for i := range keys {
indexKey, err := core.NewCollectionIndexKeyFromString(keys[i])
if err != nil {
return nil, NewErrInvalidStoredIndexKey(indexKey.ToString())
}
indexes[indexKey.CollectionName] = append(indexes[indexKey.CollectionName], indexRec.element)
indexes[indexKey.CollectionName] = append(
indexes[indexKey.CollectionName],
indexDescriptions[i],
)
}

return indexes, nil
Expand All @@ -89,16 +89,12 @@ func (db *db) fetchCollectionIndexDescriptions(
colName string,
) ([]client.IndexDescription, error) {
prefix := core.NewCollectionIndexKey(colName, "")
deserializedIndexes, err := deserializePrefix[client.IndexDescription](ctx,
_, indexDescriptions, err := datastore.DeserializePrefix[client.IndexDescription](ctx,
prefix.ToString(), txn.Systemstore())
if err != nil {
return nil, err
}
indexes := make([]client.IndexDescription, 0, len(deserializedIndexes))
for _, indexRec := range deserializedIndexes {
indexes = append(indexes, indexRec.element)
}
return indexes, nil
return indexDescriptions, nil
}

func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *client.Document) error {
Expand All @@ -115,27 +111,6 @@ func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *cl
return nil
}

// collectIndexedFields returns all fields that are indexed by all collection indexes.
func (c *collection) collectIndexedFields() []client.FieldDescription {
fieldsMap := make(map[string]client.FieldDescription)
for _, index := range c.indexes {
for _, field := range index.Description().Fields {
for i := range c.desc.Schema.Fields {
colField := c.desc.Schema.Fields[i]
if field.Name == colField.Name {
fieldsMap[field.Name] = colField
break
}
}
}
}
fields := make([]client.FieldDescription, 0, len(fieldsMap))
for _, field := range fieldsMap {
fields = append(fields, field)
}
return fields
}

func (c *collection) updateIndexedDoc(
ctx context.Context,
txn datastore.Txn,
Expand All @@ -145,7 +120,13 @@ func (c *collection) updateIndexedDoc(
if err != nil {
return err
}
oldDoc, err := c.get(ctx, txn, c.getPrimaryKeyFromDocKey(doc.Key()), c.collectIndexedFields(), false)
desc := c.Description()
oldDoc, err := c.get(
ctx,
txn,
c.getPrimaryKeyFromDocKey(doc.Key()), desc.CollectIndexedFields(&desc.Schema),
false,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +351,7 @@ func (c *collection) dropIndex(ctx context.Context, txn datastore.Txn, indexName
func (c *collection) dropAllIndexes(ctx context.Context, txn datastore.Txn) error {
prefix := core.NewCollectionIndexKey(c.Name(), "")

keys, err := fetchKeysForPrefix(ctx, prefix.ToString(), txn.Systemstore())
keys, err := datastore.FetchKeysForPrefix(ctx, prefix.ToString(), txn.Systemstore())
if err != nil {
return err
}
Expand Down Expand Up @@ -510,39 +491,3 @@ func generateIndexName(col client.Collection, fields []client.IndexedFieldDescri
}
return sb.String()
}

type deserializedElement[T any] struct {
key string
element T
}

func deserializePrefix[T any](
ctx context.Context,
prefix string,
storage ds.Read,
) ([]deserializedElement[T], error) {
q, err := storage.Query(ctx, query.Query{Prefix: prefix})
if err != nil {
return nil, NewErrFailedToCreateCollectionQuery(err)
}

elements := make([]deserializedElement[T], 0)
for res := range q.Next() {
if res.Error != nil {
_ = q.Close()
return nil, res.Error
}

var element T
err = json.Unmarshal(res.Value, &element)
if err != nil {
_ = q.Close()
return nil, NewErrInvalidStoredIndex(err)
}
elements = append(elements, deserializedElement[T]{key: res.Key, element: element})
}
if err := q.Close(); err != nil {
return nil, err
}
return elements, nil
}
18 changes: 18 additions & 0 deletions db/fetcher/encoded_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,24 @@ func Decode(encdoc EncodedDocument) (*client.Document, error) {
return doc, nil
}

// MergeProperties merges the properties of the given document into this document.
// Existing fields of the current document are overwritten.
func (encdoc *encodedDocument) MergeProperties(other EncodedDocument) {
otherEncDoc, ok := other.(*encodedDocument)
if !ok {
return
}
for field, prop := range otherEncDoc.properties {
encdoc.properties[field] = prop
}
if other.Key() != nil {
encdoc.key = other.Key()
}
if other.SchemaVersionID() != "" {
encdoc.schemaVersionID = other.SchemaVersionID()
}
}

// DecodeToDoc returns a decoded document as a
// map of field/value pairs
func DecodeToDoc(encdoc EncodedDocument, mapping *core.DocumentMapping, filter bool) (core.Doc, error) {
Expand Down
6 changes: 6 additions & 0 deletions db/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ type ExecInfo struct {
DocsFetched uint64
// Number of fields fetched.
FieldsFetched uint64
// Number of indexes fetched.
IndexesFetched uint64
}

// Add adds the other ExecInfo to the current ExecInfo.
func (s *ExecInfo) Add(other ExecInfo) {
s.DocsFetched += other.DocsFetched
s.FieldsFetched += other.FieldsFetched
s.IndexesFetched += other.IndexesFetched
}

// Reset resets the ExecInfo.
func (s *ExecInfo) Reset() {
s.DocsFetched = 0
s.FieldsFetched = 0
s.IndexesFetched = 0
}

// Fetcher is the interface for collecting documents from the underlying data store.
Expand Down Expand Up @@ -576,6 +580,8 @@ func (df *DocumentFetcher) fetchNext(ctx context.Context) (EncodedDocument, Exec
// keyparts := df.kv.Key.List()
// key := keyparts[len(keyparts)-2]

prevExecInfo := df.execInfo
defer func() { df.execInfo.Add(prevExecInfo) }()
df.execInfo.Reset()
// iterate until we have collected all the necessary kv pairs for the doc
// we'll know when were done when either
Expand Down
Loading

0 comments on commit c8bde64

Please sign in to comment.