Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ability to delete multiple documents using filter. #206

Merged
merged 4 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 133 additions & 40 deletions db/collection_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ func (c *Collection) DeleteWith(
}

// DeleteWithKey deletes using a DocKey to target a single document for delete.
func (c *Collection) DeleteWithKey(ctx context.Context, key key.DocKey, opts ...client.DeleteOpt) (*client.DeleteResult, error) {
func (c *Collection) DeleteWithKey(
ctx context.Context,
key key.DocKey,
opts ...client.DeleteOpt) (*client.DeleteResult, error) {

txn, err := c.getTxn(ctx, false)
if err != nil {
Expand All @@ -90,7 +93,10 @@ func (c *Collection) DeleteWithKey(ctx context.Context, key key.DocKey, opts ...
}

// DeleteWithKeys is the same as DeleteWithKey but accepts multiple keys as a slice.
func (c *Collection) DeleteWithKeys(ctx context.Context, keys []key.DocKey, opts ...client.DeleteOpt) (*client.DeleteResult, error) {
func (c *Collection) DeleteWithKeys(
ctx context.Context,
keys []key.DocKey,
opts ...client.DeleteOpt) (*client.DeleteResult, error) {

txn, err := c.getTxn(ctx, false)
if err != nil {
Expand All @@ -107,9 +113,67 @@ func (c *Collection) DeleteWithKeys(ctx context.Context, keys []key.DocKey, opts
return res, c.commitImplicitTxn(ctx, txn)
}

func (c *Collection) deleteWithKeys(ctx context.Context, txn core.Txn, keys []key.DocKey, opts ...client.DeleteOpt) (*client.DeleteResult, error) {
// DeleteWithFilter deletes using a filter to target documents for delete.
func (c *Collection) DeleteWithFilter(
ctx context.Context,
filter interface{},
opts ...client.DeleteOpt) (*client.DeleteResult, error) {

txn, err := c.getTxn(ctx, false)
if err != nil {
return nil, err
}

defer c.discardImplicitTxn(ctx, txn)

res, err := c.deleteWithFilter(ctx, txn, filter, opts...)
if err != nil {
return nil, err
}

return res, c.commitImplicitTxn(ctx, txn)

}

func (c *Collection) deleteWithKey(
ctx context.Context,
txn core.Txn,
key key.DocKey,
opts ...client.DeleteOpt) (*client.DeleteResult, error) {
// Check the docKey we have been given to delete with actually has a corresponding
// document (i.e. document actually exists in the collection).
found, err := c.exists(ctx, txn, key)
if err != nil {
return nil, err
}
if !found {
return nil, ErrDocumentNotFound
}

// Apply the function that will perform the full deletion of the document.
err = c.applyFullDelete(ctx, txn, key)
if err != nil {
return nil, err
}

// Upon successfull deletion, record a summary.
results := &client.DeleteResult{
Count: 1,
DocKeys: []string{key.String()},
}

return results, nil
}

keysDeleted := []string{}
func (c *Collection) deleteWithKeys(
ctx context.Context,
txn core.Txn,
keys []key.DocKey,
opts ...client.DeleteOpt) (*client.DeleteResult, error) {

results := &client.DeleteResult{
DocKeys: make([]string, 0),
}

for _, key := range keys {

Expand All @@ -130,47 +194,79 @@ func (c *Collection) deleteWithKeys(ctx context.Context, txn core.Txn, keys []ke
}

// Add this deleted key to our list.
keysDeleted = append(keysDeleted, key.String())
results.DocKeys = append(results.DocKeys, key.String())
}

// Upon successfull deletion, record a summary.
results := &client.DeleteResult{
Count: int64(len(keysDeleted)),
DocKeys: keysDeleted,
}
// Upon successfull deletion, record a summary of how many we deleted.
results.Count = int64(len(results.DocKeys))

return results, nil
}

func (c *Collection) deleteWithKey(ctx context.Context, txn core.Txn, key key.DocKey, opts ...client.DeleteOpt) (*client.DeleteResult, error) {
// Check the docKey we have been given to delete with actually has a corresponding
// document (i.e. document actually exists in the collection).
found, err := c.exists(ctx, txn, key)
func (c *Collection) deleteWithFilter(
ctx context.Context,
txn core.Txn,
filter interface{},
opts ...client.DeleteOpt) (*client.DeleteResult, error) {

// Do a selection query to scan through documents using the given filter.
query, err := c.makeSelectionQuery(ctx, txn, filter)
if err != nil {
return nil, err
}
if !found {
return nil, ErrDocumentNotFound
}

// Apply the function that will perform the full deletion of the document.
err = c.applyFullDelete(ctx, txn, key)
if err != nil {
if err := query.Start(); err != nil {
shahzadlone marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

// Upon successfull deletion, record a summary.
// If the query object isn't properly closed at any exit point log the error.
defer func() {
if err := query.Close(); err != nil {
log.Errorf("Failed to close query after filter delete: %w", err)
}
}()

results := &client.DeleteResult{
Count: 1,
DocKeys: []string{key.String()},
DocKeys: make([]string, 0),
}

// Keep looping until results from the filter query have been iterated through.
for {
next, err := query.Next()
if err != nil {
return nil, err
}

// If no results remaining / or gotten then break out of the loop.
if !next {
break
}

// Extract the dockey in the string format from the document value.
docKey := query.Values()[parser.DocKeyFieldName].(string)

// Convert from string to key.DocKey.
key, err := key.NewFromString(docKey)
if err != nil {
return nil, err
}

// Delete the document that is assosiated with this key we got from the filter.
err = c.applyFullDelete(ctx, txn, key)
if err != nil {
return nil, err
}

// Add key of successfully deleted document to our list.
results.DocKeys = append(results.DocKeys, docKey)
}

results.Count = int64(len(results.DocKeys))

return results, nil
}

type dagDeleter struct {
bstore core.DAGStore
// queue *list.List
}

func newDagDeleter(bstore core.DAGStore) dagDeleter {
Expand Down Expand Up @@ -204,7 +300,7 @@ func (c *Collection) applyFullDelete(
}

// 1. =========================== Delete blockstore state ===========================
// blocks: /db/blocks/CIQSDFKLJGHFKLSJGHHJKKLGHGLHSKLHKJGS => KLJSFHGLKJFHJKDLGKHDGLHGLFDHGLFDGKGHL
// blocks: /db/blocks/CIQSDFKLJGHFKLGHGLHSKLHKJGS => KGLKJFHJKDLGKHDGLHGLFDHGLFDGKGHL

// Covert dockey to compositeKey as follows:
// * dockey: bae-kljhLKHJG-lkjhgkldjhlzkdf-kdhflkhjsklgh-kjdhlkghjs
Expand Down Expand Up @@ -294,11 +390,14 @@ func (d dagDeleter) run(ctx context.Context, targetCid cid.Cid) error {
return err
}

// Attempt deleting the current block and it's links (in a mutally recursive fashion.)
// Attempt deleting the current block and it's links (in a mutally recursive fashion)
return d.delete(ctx, targetCid, block)
}

// (ipld.Block(ipldProtobufNode{Data: (cbor(crdt deltaPayload)), Links: (_head => parentCid, fieldName => fieldCid)))
// (ipld.Block
// (ipldProtobufNode{
// Data: (cbor(crdt deltaPayload)),
// Links: (_head => parentCid, fieldName => fieldCid)))
func (d dagDeleter) delete(
ctx context.Context,
targetCid cid.Cid,
Expand Down Expand Up @@ -326,22 +425,16 @@ func (d dagDeleter) delete(

// =================================== UNIMPLEMENTED ===================================

// DeleteWithFilter deletes using a filter to target documents for delete.
func (c *Collection) DeleteWithFilter(ctx context.Context, filter interface{}, opts ...client.DeleteOpt) (*client.DeleteResult, error) {
return nil, nil
}

// DeleteWithDoc deletes targeting the supplied document.
func (c *Collection) DeleteWithDoc(doc *document.SimpleDocument, opts ...client.DeleteOpt) error {
func (c *Collection) DeleteWithDoc(
doc *document.SimpleDocument,
opts ...client.DeleteOpt) error {
return nil
}

// DeleteWithDocs deletes all the supplied documents in the slice.
func (c *Collection) DeleteWithDocs(docs []*document.SimpleDocument, opts ...client.DeleteOpt) error {
func (c *Collection) DeleteWithDocs(
docs []*document.SimpleDocument,
opts ...client.DeleteOpt) error {
return nil
}

//nolint:unused
func (c *Collection) deleteWithFilter(ctx context.Context, txn core.Txn, filter interface{}, opts ...client.DeleteOpt) (*client.DeleteResult, error) {
return nil, nil
}
59 changes: 15 additions & 44 deletions db/collection_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,13 @@ func (c *Collection) updateWithKeys(ctx context.Context, txn core.Txn, keys []ke
return results, nil
}

func (c *Collection) updateWithFilter(ctx context.Context, txn core.Txn, filter interface{}, updater interface{}, opts ...client.UpdateOpt) (*client.UpdateResult, error) {
func (c *Collection) updateWithFilter(
ctx context.Context,
txn core.Txn,
filter interface{},
updater interface{},
opts ...client.UpdateOpt) (*client.UpdateResult, error) {

patch, err := parseUpdater(updater)
if err != nil {
return nil, err
Expand All @@ -234,14 +240,21 @@ func (c *Collection) updateWithFilter(ctx context.Context, txn core.Txn, filter
}

// scan through docs with filter
query, err := c.makeSelectionQuery(ctx, txn, filter, opts...)
query, err := c.makeSelectionQuery(ctx, txn, filter)
if err != nil {
return nil, err
}
if err = query.Start(); err != nil {
return nil, err
}

// If the query object isn't properly closed at any exit point log the error.
defer func() {
if err := query.Close(); err != nil {
log.Errorf("Failed to close query after filter update: %w", err)
}
}()

results := &client.UpdateResult{
DocKeys: make([]string, 0),
}
Expand Down Expand Up @@ -273,51 +286,9 @@ func (c *Collection) updateWithFilter(ctx context.Context, txn core.Txn, filter
results.Count++
}

err = query.Close()
if err != nil {
return nil, err
}
return results, nil
}

// func (c *Collection) updateWithFilterPatch(txn *Txn, filter map[string]interface{}, patch []map[string]interface{}, opts ...client.UpdateOpt) (*UpdateResult, error) {
// // scan through docs with filter
// query, err := c.makeQuery(filter, opts...)
// if err != nil {
// return nil, err
// }
// if err := query.Start(); err != nil {
// return nil, err
// }

// // loop while we still have results from the filter query
// for {
// next, err := query.Next()
// if err != nil {
// return nil, err
// }

// // if theres no more records from the query, jump out of the loop
// if !next {
// break
// }

// // Get the document, and apply the patch
// doc := query.Values()
// }

// // loop through patch ops
// // apply each
// // if op is a sub field, get target collection and docID, call c.applyUpdateWithPatch()
// return nil, nil
// }

// func (c *Collection) updateWithFilterMergePatch(txn *Txn, filter map[string]interface{}, merge map[string]interface{}, opts ...client.UpdateOpt) (*UpdateResult, error) {
// // loop through the fields of merge patch
// // apply
// return nil, nil
// }

func (c *Collection) applyPatch(txn core.Txn, doc map[string]interface{}, patch []map[string]interface{}) error {
for _, op := range patch {
path, ok := op["path"].(string)
Expand Down
Loading