Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make peers sync secondary index #2390

Merged
merged 6 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ type Store interface {
// GetAllIndexes returns all the indexes that currently exist within this [Store].
GetAllIndexes(context.Context) (map[CollectionName][]IndexDescription, error)

// CreateDocIndex creates an index for the given document.
CreateDocIndex(context.Context, Collection, *Document) error
AndrewSisley marked this conversation as resolved.
Show resolved Hide resolved
islamaliev marked this conversation as resolved.
Show resolved Hide resolved

// UpdateDocIndex updates the index for the given document.
UpdateDocIndex(ctx context.Context, col Collection, oldDoc, newDoc *Document) error

// DeleteDocIndex deletes the index for the given document.
DeleteDocIndex(context.Context, Collection, *Document) error

// ExecRequest executes the given GQL request against the [Store].
ExecRequest(context.Context, string) *RequestResult
}
Expand Down
20 changes: 20 additions & 0 deletions client/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@

package client

import (
"context"

"github.com/sourcenetwork/defradb/datastore"
)

// IndexFieldDescription describes how a field is being indexed.
type IndexedFieldDescription struct {
// Name contains the name of the field.
Expand All @@ -30,6 +36,20 @@ type IndexDescription struct {
Unique bool
}

// CollectionIndex is an interface for indexing documents in a collection.
type CollectionIndex interface {
// Save indexes a document by storing it
Save(context.Context, datastore.Txn, *Document) error
// Update updates an existing document in the index
Update(context.Context, datastore.Txn, *Document, *Document) error
// Delete deletes an existing document from the index
Delete(context.Context, datastore.Txn, *Document) error
// Name returns the name of the index
Name() string
// Description returns the description of the index
Description() IndexDescription
}

// CollectIndexedFields returns all fields that are indexed by all collection indexes.
func (d CollectionDefinition) CollectIndexedFields() []FieldDefinition {
fieldsMap := make(map[string]bool)
Expand Down
27 changes: 24 additions & 3 deletions db/collection_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
return col.DropIndex(ctx, indexName)
}

// getAllIndexes returns all the indexes in the database.
func (db *db) getAllIndexes(
// getAllIndexDescriptions returns all the index descriptions in the database.
func (db *db) getAllIndexDescriptions(
ctx context.Context,
txn datastore.Txn,
) (map[client.CollectionName][]client.IndexDescription, error) {
Expand Down Expand Up @@ -107,6 +107,26 @@
return indexDescriptions, nil
}

func (db *db) getCollectionIndexes(
ctx context.Context,
txn datastore.Txn,
col client.Collection,
) ([]CollectionIndex, error) {
indexDescriptions, err := db.fetchCollectionIndexDescriptions(ctx, txn, col.ID())
islamaliev marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

Check warning on line 118 in db/collection_index.go

View check run for this annotation

Codecov / codecov/patch

db/collection_index.go#L117-L118

Added lines #L117 - L118 were not covered by tests
colIndexes := make([]CollectionIndex, 0, len(indexDescriptions))
for _, indexDesc := range indexDescriptions {
index, err := NewCollectionIndex(col, indexDesc)
if err != nil {
return nil, err
}

Check warning on line 124 in db/collection_index.go

View check run for this annotation

Codecov / codecov/patch

db/collection_index.go#L123-L124

Added lines #L123 - L124 were not covered by tests
colIndexes = append(colIndexes, index)
}
return colIndexes, nil
}

func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *client.Document) error {
err := c.loadIndexes(ctx, txn)
if err != nil {
Expand All @@ -133,7 +153,8 @@
oldDoc, err := c.get(
ctx,
txn,
c.getPrimaryKeyFromDocID(doc.ID()), c.Definition().CollectIndexedFields(),
c.getPrimaryKeyFromDocID(doc.ID()),
c.Definition().CollectIndexedFields(),
false,
)
if err != nil {
Expand Down
25 changes: 17 additions & 8 deletions db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,9 @@
// It abstracts away common index functionality to be implemented
// by different index types: non-unique, unique, and composite
type CollectionIndex interface {
// Save indexes a document by storing it
Save(context.Context, datastore.Txn, *client.Document) error
// Update updates an existing document in the index
Update(context.Context, datastore.Txn, *client.Document, *client.Document) error
client.CollectionIndex
// RemoveAll removes all documents from the index
RemoveAll(context.Context, datastore.Txn) error
// Name returns the name of the index
Name() string
// Description returns the description of the index
Description() client.IndexDescription
}

func canConvertIndexFieldValue[T any](val any) bool {
Expand Down Expand Up @@ -248,6 +241,14 @@
return index.Save(ctx, txn, newDoc)
}

func (index *collectionSimpleIndex) Delete(
ctx context.Context,
txn datastore.Txn,
doc *client.Document,
) error {
return index.deleteDocIndex(ctx, txn, doc)
}

func (index *collectionSimpleIndex) deleteDocIndex(
ctx context.Context,
txn datastore.Txn,
Expand Down Expand Up @@ -358,6 +359,14 @@
return key, val, nil
}

func (index *collectionUniqueIndex) Delete(
ctx context.Context,
txn datastore.Txn,
doc *client.Document,
) error {
return index.deleteDocIndex(ctx, txn, doc)

Check warning on line 367 in db/index.go

View check run for this annotation

Codecov / codecov/patch

db/index.go#L366-L367

Added lines #L366 - L367 were not covered by tests
}

func (index *collectionUniqueIndex) Update(
ctx context.Context,
txn datastore.Txn,
Expand Down
2 changes: 1 addition & 1 deletion db/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (f *indexTestFixture) createCollectionIndexFor(
}

func (f *indexTestFixture) getAllIndexes() (map[client.CollectionName][]client.IndexDescription, error) {
return f.db.getAllIndexes(f.ctx, f.txn)
return f.db.getAllIndexDescriptions(f.ctx, f.txn)
}

func (f *indexTestFixture) getCollectionIndexes(colID uint32) ([]client.IndexDescription, error) {
Expand Down
144 changes: 142 additions & 2 deletions db/txn_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,154 @@
}
defer txn.Discard(ctx)

return db.getAllIndexes(ctx, txn)
return db.getAllIndexDescriptions(ctx, txn)
}

// GetAllIndexes gets all the indexes in the database.
func (db *explicitTxnDB) GetAllIndexes(
ctx context.Context,
) (map[client.CollectionName][]client.IndexDescription, error) {
return db.getAllIndexes(ctx, db.txn)
return db.getAllIndexDescriptions(ctx, db.txn)

Check warning on line 184 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L184

Added line #L184 was not covered by tests
}

// CreateDocIndex creates a new index for the given document.
func (db *implicitTxnDB) CreateDocIndex(
ctx context.Context,
col client.Collection,
doc *client.Document,
) error {
txn, err := db.NewTxn(ctx, true)
if err != nil {
return err
}
defer txn.Discard(ctx)

indexes, err := db.getCollectionIndexes(ctx, txn, col)
if err != nil {
return err
}

Check warning on line 202 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L192-L202

Added lines #L192 - L202 were not covered by tests

for _, index := range indexes {
islamaliev marked this conversation as resolved.
Show resolved Hide resolved
err := index.Save(ctx, txn, doc)
if err != nil {
return err
}

Check warning on line 208 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L204-L208

Added lines #L204 - L208 were not covered by tests
}
return txn.Commit(ctx)

Check warning on line 210 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L210

Added line #L210 was not covered by tests
}

// CreateDocIndex creates a new index for the given document.
func (db *explicitTxnDB) CreateDocIndex(
ctx context.Context,
col client.Collection,
doc *client.Document,
) error {
indexes, err := db.getCollectionIndexes(ctx, db.txn, col)
if err != nil {
return err
}

Check warning on line 222 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L221-L222

Added lines #L221 - L222 were not covered by tests

for _, index := range indexes {
err := index.Save(ctx, db.txn, doc)
if err != nil {
return err
}

Check warning on line 228 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L227-L228

Added lines #L227 - L228 were not covered by tests
}
return nil
}

// UpdateDocIndex updates the indexes for the given document.
func (db *implicitTxnDB) UpdateDocIndex(
ctx context.Context,
col client.Collection,
oldDoc *client.Document,
newDoc *client.Document,
) error {
txn, err := db.NewTxn(ctx, true)
if err != nil {
return err
}
defer txn.Discard(ctx)

indexes, err := db.getCollectionIndexes(ctx, txn, col)
if err != nil {
return err
}

Check warning on line 249 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L239-L249

Added lines #L239 - L249 were not covered by tests

for _, index := range indexes {
err := index.Update(ctx, txn, oldDoc, newDoc)
if err != nil {
return err
}

Check warning on line 255 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L251-L255

Added lines #L251 - L255 were not covered by tests
}
return txn.Commit(ctx)

Check warning on line 257 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L257

Added line #L257 was not covered by tests
}

// UpdateDocIndex updates the indexes for the given document.
func (db *explicitTxnDB) UpdateDocIndex(
ctx context.Context,
col client.Collection,
oldDoc *client.Document,
newDoc *client.Document,
) error {
indexes, err := db.getCollectionIndexes(ctx, db.txn, col)
if err != nil {
return err
}

Check warning on line 270 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L269-L270

Added lines #L269 - L270 were not covered by tests

for _, index := range indexes {
err := index.Update(ctx, db.txn, oldDoc, newDoc)
if err != nil {
return err
}

Check warning on line 276 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L275-L276

Added lines #L275 - L276 were not covered by tests
}
return nil
}

// DeleteDocIndex deletes the indexes for the given document.
func (db *implicitTxnDB) DeleteDocIndex(
ctx context.Context,
col client.Collection,
doc *client.Document,
) error {
txn, err := db.NewTxn(ctx, true)
if err != nil {
return err
}
defer txn.Discard(ctx)

indexes, err := db.getCollectionIndexes(ctx, txn, col)
if err != nil {
return err
}

Check warning on line 296 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L286-L296

Added lines #L286 - L296 were not covered by tests

for _, index := range indexes {
err := index.Delete(ctx, txn, doc)
if err != nil {
return err
}

Check warning on line 302 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L298-L302

Added lines #L298 - L302 were not covered by tests
}
return txn.Commit(ctx)

Check warning on line 304 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L304

Added line #L304 was not covered by tests
}

// DeleteDocIndex deletes the indexes for the given document.
func (db *explicitTxnDB) DeleteDocIndex(
ctx context.Context,
col client.Collection,
doc *client.Document,
) error {
indexes, err := db.getCollectionIndexes(ctx, db.txn, col)
if err != nil {
return err
}

Check warning on line 316 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L315-L316

Added lines #L315 - L316 were not covered by tests

for _, index := range indexes {
err := index.Delete(ctx, db.txn, doc)
if err != nil {
return err
}

Check warning on line 322 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L321-L322

Added lines #L321 - L322 were not covered by tests
}
return nil
}

// AddSchema takes the provided GQL schema in SDL format, and applies it to the database,
Expand Down
29 changes: 27 additions & 2 deletions http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@
return result
}
if res.Header.Get("Content-Type") == "text/event-stream" {
result.Pub = c.execRequestSubscription(ctx, res.Body)
result.Pub = c.execRequestSubscription(res.Body)
return result
}
// ignore close errors because they have
Expand All @@ -367,7 +367,7 @@
return result
}

func (c *Client) execRequestSubscription(ctx context.Context, r io.ReadCloser) *events.Publisher[events.Update] {
func (c *Client) execRequestSubscription(r io.ReadCloser) *events.Publisher[events.Update] {
pubCh := events.New[events.Update](0, 0)
pub, err := events.NewPublisher[events.Update](pubCh, 0)
if err != nil {
Expand Down Expand Up @@ -434,3 +434,28 @@
func (c *Client) MaxTxnRetries() int {
panic("client side database")
}

func (c *Client) CreateDocIndex(
ctx context.Context,
col client.Collection,
doc *client.Document,
) error {
panic("client side database")

Check warning on line 443 in http/client.go

View check run for this annotation

Codecov / codecov/patch

http/client.go#L442-L443

Added lines #L442 - L443 were not covered by tests
islamaliev marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *Client) UpdateDocIndex(
ctx context.Context,
col client.Collection,
oldDoc *client.Document,
newDoc *client.Document,
) error {
panic("client side database")

Check warning on line 452 in http/client.go

View check run for this annotation

Codecov / codecov/patch

http/client.go#L451-L452

Added lines #L451 - L452 were not covered by tests
}

func (w *Client) DeleteDocIndex(
ctx context.Context,
col client.Collection,
newDoc *client.Document,
) error {
panic("client side database")

Check warning on line 460 in http/client.go

View check run for this annotation

Codecov / codecov/patch

http/client.go#L459-L460

Added lines #L459 - L460 were not covered by tests
}
Loading
Loading