Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit c8bde64
Author: Islam Aliev <aliev.islam@gmail.com>
Date:   Sat Oct 14 00:26:22 2023 +0200

    feat: Make queries utilise secondary indexes (sourcenetwork#1925)

    ## Relevant issue(s)

    Resolves sourcenetwork#1555

    ## Description

With this change, secondary indexes are utilised when querying
data.

    A dedicated `Indexer` fetcher is implemented to perform fetching of
    values of indexed fields.

    Now there is a separate `filter` package that houses a lot of methods
    for working with filters.

A new metric `indexesFetched` is introduced into `@explain` to provide
information about how many indexes have been fetched.

    It also includes an update to the testing framework to allow adding
    custom asserters.
    The new ExplainResultsAsserter is used with this new feature.
  • Loading branch information
nasdf committed Oct 13, 2023
1 parent 76c9b5f commit 03d2a6e
Show file tree
Hide file tree
Showing 68 changed files with 4,218 additions and 598 deletions.
19 changes: 19 additions & 0 deletions client/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,22 @@ type IndexDescription struct {
// Fields contains the fields that are being indexed.
Fields []IndexedFieldDescription
}

// CollectIndexedFields returns all fields that are indexed by all collection indexes.
//
// Each field is returned at most once, in the order it is first encountered
// while walking the collection's indexes. Index fields that do not match any
// schema field by name are skipped.
func (d CollectionDescription) CollectIndexedFields(schema *SchemaDescription) []FieldDescription {
	seen := make(map[string]bool)
	collected := make([]FieldDescription, 0, len(d.Indexes))
	for _, index := range d.Indexes {
		for _, indexedField := range index.Fields {
			if seen[indexedField.Name] {
				continue
			}
			// Resolve the indexed field name against the schema's field list.
			for i := range schema.Fields {
				if schema.Fields[i].Name == indexedField.Name {
					seen[indexedField.Name] = true
					collected = append(collected, schema.Fields[i])
					break
				}
			}
		}
	}
	return collected
}
10 changes: 10 additions & 0 deletions datastore/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/sourcenetwork/defradb/errors"
)

const (
errInvalidStoredValue string = "invalid stored value"
)

// Errors returnable from this package.
//
// This list is incomplete and undefined errors may also be returned.
Expand All @@ -26,3 +30,9 @@ var (
// ErrNotFound is an error returned when a block is not found.
ErrNotFound = errors.New("blockstore: block not found")
)

// NewErrInvalidStoredValue returns a new error indicating that the stored
// value in the database is invalid.
//
// The inner error (e.g. a deserialization failure) is wrapped together with
// the errInvalidStoredValue message so callers retain the original cause.
func NewErrInvalidStoredValue(inner error) error {
	return errors.Wrap(errInvalidStoredValue, inner)
}
81 changes: 81 additions & 0 deletions datastore/prefix_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package datastore

import (
"context"
"encoding/json"

ds "github.com/ipfs/go-datastore"

"github.com/ipfs/go-datastore/query"
)

// DeserializePrefix deserializes all elements with the given prefix from the given storage.
// It returns the keys and their corresponding elements.
func DeserializePrefix[T any](
	ctx context.Context,
	prefix string,
	store ds.Read,
) ([]string, []T, error) {
	results, err := store.Query(ctx, query.Query{Prefix: prefix})
	if err != nil {
		return nil, nil, err
	}

	// abort closes the result iterator (its close error is deliberately
	// ignored) and propagates the error that interrupted iteration.
	abort := func(cause error) ([]string, []T, error) {
		_ = results.Close()
		return nil, nil, cause
	}

	keys := make([]string, 0)
	elements := make([]T, 0)
	for entry := range results.Next() {
		if entry.Error != nil {
			return abort(entry.Error)
		}

		var decoded T
		if unmarshalErr := json.Unmarshal(entry.Value, &decoded); unmarshalErr != nil {
			return abort(NewErrInvalidStoredValue(unmarshalErr))
		}
		keys = append(keys, entry.Key)
		elements = append(elements, decoded)
	}
	if err := results.Close(); err != nil {
		return nil, nil, err
	}
	return keys, elements, nil
}

// FetchKeysForPrefix fetches all keys with the given prefix from the given storage.
func FetchKeysForPrefix(
	ctx context.Context,
	prefix string,
	store ds.Read,
) ([]ds.Key, error) {
	results, err := store.Query(ctx, query.Query{Prefix: prefix})
	if err != nil {
		return nil, err
	}

	keys := make([]ds.Key, 0)
	for entry := range results.Next() {
		if entry.Error != nil {
			// Best-effort close; the iteration error is what matters here.
			_ = results.Close()
			return nil, entry.Error
		}
		keys = append(keys, ds.NewKey(entry.Key))
	}
	if err = results.Close(); err != nil {
		return nil, err
	}

	return keys, nil
}
89 changes: 17 additions & 72 deletions db/collection_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (
"strconv"
"strings"

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
Expand Down Expand Up @@ -63,7 +60,7 @@ func (db *db) getAllIndexes(
) (map[client.CollectionName][]client.IndexDescription, error) {
prefix := core.NewCollectionIndexKey("", "")

deserializedIndexes, err := deserializePrefix[client.IndexDescription](ctx,
keys, indexDescriptions, err := datastore.DeserializePrefix[client.IndexDescription](ctx,
prefix.ToString(), txn.Systemstore())

if err != nil {
Expand All @@ -72,12 +69,15 @@ func (db *db) getAllIndexes(

indexes := make(map[client.CollectionName][]client.IndexDescription)

for _, indexRec := range deserializedIndexes {
indexKey, err := core.NewCollectionIndexKeyFromString(indexRec.key)
for i := range keys {
indexKey, err := core.NewCollectionIndexKeyFromString(keys[i])
if err != nil {
return nil, NewErrInvalidStoredIndexKey(indexKey.ToString())
}
indexes[indexKey.CollectionName] = append(indexes[indexKey.CollectionName], indexRec.element)
indexes[indexKey.CollectionName] = append(
indexes[indexKey.CollectionName],
indexDescriptions[i],
)
}

return indexes, nil
Expand All @@ -89,16 +89,12 @@ func (db *db) fetchCollectionIndexDescriptions(
colName string,
) ([]client.IndexDescription, error) {
prefix := core.NewCollectionIndexKey(colName, "")
deserializedIndexes, err := deserializePrefix[client.IndexDescription](ctx,
_, indexDescriptions, err := datastore.DeserializePrefix[client.IndexDescription](ctx,
prefix.ToString(), txn.Systemstore())
if err != nil {
return nil, err
}
indexes := make([]client.IndexDescription, 0, len(deserializedIndexes))
for _, indexRec := range deserializedIndexes {
indexes = append(indexes, indexRec.element)
}
return indexes, nil
return indexDescriptions, nil
}

func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *client.Document) error {
Expand All @@ -115,27 +111,6 @@ func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *cl
return nil
}

// collectIndexedFields returns all fields that are indexed by all collection indexes.
//
// Fields are returned in the order they are first encountered across the
// collection's indexes, with each field appearing at most once. Gathering the
// fields into a map and then ranging over the map (as the previous version
// did) made the slice order nondeterministic, because Go randomizes map
// iteration order; appending during the scan keeps the result stable.
func (c *collection) collectIndexedFields() []client.FieldDescription {
	seen := make(map[string]bool)
	fields := make([]client.FieldDescription, 0, len(c.indexes))
	for _, index := range c.indexes {
		for _, field := range index.Description().Fields {
			if seen[field.Name] {
				continue
			}
			// Resolve the indexed field name against the schema's fields.
			for i := range c.desc.Schema.Fields {
				colField := c.desc.Schema.Fields[i]
				if field.Name == colField.Name {
					seen[field.Name] = true
					fields = append(fields, colField)
					break
				}
			}
		}
	}
	return fields
}

func (c *collection) updateIndexedDoc(
ctx context.Context,
txn datastore.Txn,
Expand All @@ -145,7 +120,13 @@ func (c *collection) updateIndexedDoc(
if err != nil {
return err
}
oldDoc, err := c.get(ctx, txn, c.getPrimaryKeyFromDocKey(doc.Key()), c.collectIndexedFields(), false)
desc := c.Description()
oldDoc, err := c.get(
ctx,
txn,
c.getPrimaryKeyFromDocKey(doc.Key()), desc.CollectIndexedFields(&desc.Schema),
false,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +351,7 @@ func (c *collection) dropIndex(ctx context.Context, txn datastore.Txn, indexName
func (c *collection) dropAllIndexes(ctx context.Context, txn datastore.Txn) error {
prefix := core.NewCollectionIndexKey(c.Name(), "")

keys, err := fetchKeysForPrefix(ctx, prefix.ToString(), txn.Systemstore())
keys, err := datastore.FetchKeysForPrefix(ctx, prefix.ToString(), txn.Systemstore())
if err != nil {
return err
}
Expand Down Expand Up @@ -510,39 +491,3 @@ func generateIndexName(col client.Collection, fields []client.IndexedFieldDescri
}
return sb.String()
}

// deserializedElement pairs a raw datastore key with the value that was
// JSON-decoded from the entry stored under it.
type deserializedElement[T any] struct {
	key     string // datastore key the element was stored under
	element T      // the decoded value
}

// deserializePrefix queries storage for every entry under the given prefix
// and JSON-decodes each value into a deserializedElement of type T.
func deserializePrefix[T any](
	ctx context.Context,
	prefix string,
	storage ds.Read,
) ([]deserializedElement[T], error) {
	results, err := storage.Query(ctx, query.Query{Prefix: prefix})
	if err != nil {
		return nil, NewErrFailedToCreateCollectionQuery(err)
	}

	// abort closes the result iterator (close error intentionally dropped)
	// and returns the error that stopped iteration.
	abort := func(cause error) ([]deserializedElement[T], error) {
		_ = results.Close()
		return nil, cause
	}

	elements := make([]deserializedElement[T], 0)
	for entry := range results.Next() {
		if entry.Error != nil {
			return abort(entry.Error)
		}

		var decoded T
		if unmarshalErr := json.Unmarshal(entry.Value, &decoded); unmarshalErr != nil {
			return abort(NewErrInvalidStoredIndex(unmarshalErr))
		}
		elements = append(elements, deserializedElement[T]{key: entry.Key, element: decoded})
	}
	if err := results.Close(); err != nil {
		return nil, err
	}
	return elements, nil
}
18 changes: 18 additions & 0 deletions db/fetcher/encoded_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,24 @@ func Decode(encdoc EncodedDocument) (*client.Document, error) {
return doc, nil
}

// MergeProperties merges the properties of the given document into this document.
// Existing fields of the current document are overwritten.
func (encdoc *encodedDocument) MergeProperties(other EncodedDocument) {
	src, isEncoded := other.(*encodedDocument)
	if !isEncoded {
		// Only the concrete encodedDocument type can be merged.
		return
	}
	for field, prop := range src.properties {
		encdoc.properties[field] = prop
	}
	// Adopt the other document's key and schema version only when set.
	if other.Key() != nil {
		encdoc.key = other.Key()
	}
	if other.SchemaVersionID() != "" {
		encdoc.schemaVersionID = other.SchemaVersionID()
	}
}

// DecodeToDoc returns a decoded document as a
// map of field/value pairs
func DecodeToDoc(encdoc EncodedDocument, mapping *core.DocumentMapping, filter bool) (core.Doc, error) {
Expand Down
6 changes: 6 additions & 0 deletions db/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ type ExecInfo struct {
DocsFetched uint64
// Number of fields fetched.
FieldsFetched uint64
// Number of indexes fetched.
IndexesFetched uint64
}

// Add adds the other ExecInfo to the current ExecInfo.
// Each counter (docs, fields, and indexes fetched) is accumulated
// independently, allowing per-step stats to be rolled up into a total.
func (s *ExecInfo) Add(other ExecInfo) {
	s.DocsFetched += other.DocsFetched
	s.FieldsFetched += other.FieldsFetched
	s.IndexesFetched += other.IndexesFetched
}

// Reset resets the ExecInfo by zeroing all fetch counters.
func (s *ExecInfo) Reset() {
	s.DocsFetched, s.FieldsFetched, s.IndexesFetched = 0, 0, 0
}

// Fetcher is the interface for collecting documents from the underlying data store.
Expand Down Expand Up @@ -576,6 +580,8 @@ func (df *DocumentFetcher) fetchNext(ctx context.Context) (EncodedDocument, Exec
// keyparts := df.kv.Key.List()
// key := keyparts[len(keyparts)-2]

prevExecInfo := df.execInfo
defer func() { df.execInfo.Add(prevExecInfo) }()
df.execInfo.Reset()
// iterate until we have collected all the necessary kv pairs for the doc
// we'll know when were done when either
Expand Down
Loading

0 comments on commit 03d2a6e

Please sign in to comment.