Skip to content

Commit

Permalink
fix(remove): RemoveEntireDataset is better at cleaning up broken states
Browse files Browse the repository at this point in the history
RemoveEntireDataset is a function that cleans up all state related to a dataset in the repository: ipfs pinning, logbook, and the refstore. It continues despite individual errors, so that cleanup completes even when the repository is in a broken state. Move it to its own file, and add a test that demonstrates how it fixes a problem caused by running `qri init` followed by `rm`-ing the working directory. Add better logging to the `Remove` method in order to more easily debug problems like this in the future.
  • Loading branch information
dustmop committed Feb 12, 2020
1 parent 4e713ca commit efaf600
Show file tree
Hide file tree
Showing 9 changed files with 452 additions and 316 deletions.
Binary file modified api/testdata/api.snapshot
Binary file not shown.
81 changes: 0 additions & 81 deletions base/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/qri-io/dataset"
"github.com/qri-io/qfs"
"github.com/qri-io/qfs/cafs"
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/logbook"
"github.com/qri-io/qri/repo"
reporef "github.com/qri-io/qri/repo/ref"
)
Expand Down Expand Up @@ -309,82 +307,3 @@ func UnpinDataset(ctx context.Context, r repo.Repo, ref reporef.DatasetRef) erro
}
return repo.ErrNotPinner
}

// RemoveNVersionsFromStore removes n versions of a dataset from the store,
// starting with the most recent version and walking backwards along
// PreviousPath links.
// when n == -1, remove all versions
// does not remove the dataset reference
//
// Returns the oldest version reached during the walk (the new head of
// history, if any remains). Errors from the logbook write are propagated
// to the caller, except for logbook.ErrNoLogbook, which is treated as a
// non-error (repos without a logbook are valid).
func RemoveNVersionsFromStore(ctx context.Context, r repo.Repo, ref *reporef.DatasetRef, n int) (*reporef.DatasetRef, error) {
	var err error
	if r == nil {
		return nil, fmt.Errorf("need a repo")
	}
	// ref is nil or ref has no path err
	if ref == nil || ref.Path == "" {
		return nil, fmt.Errorf("need a dataset reference with a path")
	}

	if n < -1 {
		return nil, fmt.Errorf("invalid 'n', n should be n >= 0 or n == -1 to indicate removing all versions")
	}

	// load previous dataset into prev
	ref.Dataset, err = dsfs.LoadDatasetRefs(ctx, r.Store(), ref.Path)
	if err != nil {
		return nil, err
	}

	curr := *ref

	// Set a timeout for looking up the previous dataset versions. NOTE: the
	// deadline covers the entire history walk below, not each individual lookup.
	timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer timeoutCancel()

	i := n

	for i != 0 {
		// Decrement our counter. If counter was -1, this loop will continue forever, until a
		// blank PreviousPath is found.
		i--
		// unpin dataset, ignoring "not pinned" errors
		if err = UnpinDataset(ctx, r, curr); err != nil && !strings.Contains(err.Error(), "not pinned") {
			return nil, err
		}
		// if no previous path, break
		if curr.Dataset.PreviousPath == "" {
			break
		}
		// Load previous dataset into prev. Use a timeout on this lookup, because we don't want
		// IPFS to hang if it's not available, since that would only happen if it's not local to
		// our machine. This situation can occur if we have added a foreign dataset from the
		// registry, and don't have previous versions. In situations like that, it's okay to
		// break this loop since the only purpose of loading previous versions is to unpin them;
		// if we don't have all previous versions, we probably don't have them pinned.
		// TODO(dlong): If IPFS gains the ability to ask "do I have these blocks locally", use
		// that instead of the network-aware LoadDatasetRefs.
		next, err := dsfs.LoadDatasetRefs(timeoutCtx, r.Store(), curr.Dataset.PreviousPath)
		if err != nil {
			// Note: We want delete to succeed even if datasets are remote, so we don't fail on
			// this error, and break early instead.
			if strings.Contains(err.Error(), "context deadline exceeded") {
				log.Debugf("could not load dataset ref, not found locally")
				break
			}
			// TODO (b5) - removing dataset versions should rely on logbook, which is able
			// to traverse across missing datasets in qfs
			log.Debugf("error fetching previous: %s", err)
			break
		}
		curr = reporef.DatasetRef{
			Path:    next.Path,
			Dataset: next,
		}
	}

	// record the deletion in the logbook; a repo without a logbook is fine
	err = r.Logbook().WriteVersionDelete(ctx, reporef.ConvertToDsref(*ref), n)
	if err == logbook.ErrNoLogbook {
		err = nil
	}

	// Bug fix: propagate the logbook write error. Previously this returned a
	// hard-coded nil, silently discarding err despite the ErrNoLogbook
	// special-casing above, which only makes sense if err is returned.
	return &curr, err
}
193 changes: 0 additions & 193 deletions base/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package base

import (
"context"
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/qri-io/dataset"
"github.com/qri-io/dataset/dstest"
"github.com/qri-io/qfs/cafs"
"github.com/qri-io/qri/base/dsfs"
Expand Down Expand Up @@ -134,197 +132,6 @@ func TestDatasetPinning(t *testing.T) {
}
}

// TestRemoveNVersionsFromStore exercises argument validation and version
// removal for RemoveNVersionsFromStore, over a 10-version dataset history.
func TestRemoveNVersionsFromStore(t *testing.T) {
	ctx := context.Background()
	r := newTestRepo(t)

	// invalid-argument cases: each should fail with a specific error message
	bad := []struct {
		description string
		store       repo.Repo
		ref         *reporef.DatasetRef
		n           int
		err         string
	}{
		{"No repo", nil, nil, 0, "need a repo"},
		{"No ref", r, nil, 0, "need a dataset reference with a path"},
		{"No ref.Path", r, &reporef.DatasetRef{}, 0, "need a dataset reference with a path"},
		{"invalid n", r, &reporef.DatasetRef{Path: "path", Dataset: &dataset.Dataset{}}, -2, "invalid 'n', n should be n >= 0 or n == -1 to indicate removing all versions"},
	}

	for _, c := range bad {
		_, err := RemoveNVersionsFromStore(ctx, c.store, c.ref, c.n)
		if err == nil {
			t.Errorf("case %s expected: '%s', got no error", c.description, c.err)
			continue
		}
		if c.err != err.Error() {
			t.Errorf("case %s error mismatch. expected: '%s', got: '%s'", c.description, c.err, err.Error())
		}
	}

	// create test repo and history
	// create history of 10 versions
	initDs := addCitiesDataset(t, r)
	refs := []*reporef.DatasetRef{&initDs}
	historyTotal := 10
	for i := 2; i <= historyTotal; i++ {
		update := updateCitiesDataset(t, r, fmt.Sprintf("example city data version %d", i))
		refs = append(refs, &update)
	}

	good := []struct {
		description string
		n           int
	}{
		{"remove n == 0 versions", 0},
		{"remove n == 1 versions", 1},
		{"remove n == 3 versions", 3},
		// should not error when n is greater then the length of history
		{"remove n == 10 versions", 10},
	}

	for _, c := range good {
		// remove n versions from the head of history
		latestRef := refs[len(refs)-1]
		_, err := RemoveNVersionsFromStore(ctx, r, latestRef, c.n)
		if err != nil {
			t.Errorf("case '%s', unexpected err: %s", c.description, err.Error())
		}
		// verifyRefsRemoved will return an empty string
		// if the correct number of refs have been removed
		s := verifyRefsRemoved(ctx, r.Store(), refs, c.n)
		if s != "" {
			t.Errorf("case '%s', refs removed incorrectly: %s", c.description, s)
		}
		// drop the removed versions from our tracking list; if n exceeded the
		// remaining history, keep the list as-is (nothing left to trim against)
		shorten := len(refs) - c.n
		if shorten < 0 {
			shorten = len(refs)
		}
		refs = refs[:shorten]
	}

	// remove the ds reference to the cities dataset before we populate
	// the repo with cities datasets again
	// Bug fix: previously this error was silently ignored; a failed delete
	// would invalidate the rest of the test.
	if err := r.DeleteRef(initDs); err != nil {
		t.Fatalf("error deleting ref: %s", err.Error())
	}

	// create test repo and history
	// create history of 10 versions
	initDs = addCitiesDataset(t, r)
	refs = []*reporef.DatasetRef{&initDs}
	for i := 2; i <= historyTotal; i++ {
		update := updateCitiesDataset(t, r, fmt.Sprintf("example city data version %d", i))
		refs = append(refs, &update)
	}
	// n == -1 means remove everything
	_, err := RemoveNVersionsFromStore(ctx, r, refs[len(refs)-1], -1)
	if err != nil {
		t.Errorf("case 'remove all', unexpected err: %s", err.Error())
	}
	s := verifyRefsRemoved(ctx, r.Store(), refs, len(refs))
	if s != "" {
		t.Errorf("case 'remove all', refs removed incorrectly: %s", s)
	}

}

// verifyRefsRemoved checks which dataset versions remain in store s.
// takes a list of refs, where refs[0] is the initial (oldest) dataset
// takes int n where n is the number of MOST RECENT datasets that should
// have been removed
// assumes that each Dataset has a Meta component with a Title
// Returns an empty string when the store matches expectations, otherwise a
// newline-separated description of each mismatch.
func verifyRefsRemoved(ctx context.Context, s cafs.Filestore, refs []*reporef.DatasetRef, n int) string {

	// datasets at index < len(refs) - n SHOULD exist; error if they don't.
	// datasets at index >= len(refs) - n SHOULD NOT exist; error if they do.
	errString := ""
	for i, ref := range refs {

		exists, err := s.Has(ctx, ref.Path)
		if err != nil {
			// Include the underlying error; previously it was dropped, which
			// made store failures impossible to diagnose from the message.
			return fmt.Sprintf("error checking ref '%s' with title '%s' from store: %s", ref.Dataset.Path, ref.Dataset.Meta.Title, err)
		}

		// datasets that are less then len(refs) - n, should exist
		if i < len(refs)-n {
			if exists {
				continue
			}
			errString += fmt.Sprintf("\nref '%s' should exist in the store, but does NOT", ref.Dataset.Meta.Title)
			continue
		}

		// datasets that are greater then len(refs) - n, should NOT exist
		if !exists {
			continue
		}
		errString += fmt.Sprintf("\nref '%s' should NOT exist in the store, but does", ref.Dataset.Meta.Title)

	}
	return errString
}

// TestVerifyRefsRemove sanity-checks the verifyRefsRemoved helper itself:
// it should report nothing when the store matches expectations, and emit
// one line per mismatched ref otherwise.
func TestVerifyRefsRemove(t *testing.T) {
	ctx := context.Background()
	r := newTestRepo(t)

	// build a short history of 3 real versions in the repo
	first := addCitiesDataset(t, r)
	refs := []*reporef.DatasetRef{&first}
	for v := 2; v <= 3; v++ {
		next := updateCitiesDataset(t, r, fmt.Sprintf("example city data version %d", v))
		refs = append(refs, &next)
	}

	// with n = 0, every real ref exists, so no mismatches are reported
	got := verifyRefsRemoved(ctx, r.Store(), refs, 0)
	if got != "" {
		t.Errorf("case 'all refs should exist' should return empty string, got '%s'", got)
	}

	// claim the newest 2 refs were removed while they still exist in the
	// store: both should be flagged
	got = verifyRefsRemoved(ctx, r.Store(), refs, 2)
	want := "\nref 'example city data version 2' should NOT exist in the store, but does\nref 'example city data version 3' should NOT exist in the store, but does"
	if got != want {
		t.Errorf("case 'all refs exist, but we say 2 should not' response mismatch: expected '%s', got '%s'", want, got)
	}

	// append 3 refs whose paths were never added to the store
	for n := 0; n < 3; n++ {
		phantom := reporef.DatasetRef{
			Path: fmt.Sprintf("/map/%d", n),
			Dataset: &dataset.Dataset{
				Meta: &dataset.Meta{
					Title: fmt.Sprintf("Fake Ref version %d", n),
				},
			},
		}
		refs = append(refs, &phantom)
	}

	// n = 3 matches reality: real refs exist, the 3 fakes do not
	got = verifyRefsRemoved(ctx, r.Store(), refs, 3)
	if got != "" {
		t.Errorf("case '3 fake refs, with n == 3' should return empty string, got '%s'", got)
	}

	// n = 0 claims the fakes exist too; each missing fake should be flagged
	got = verifyRefsRemoved(ctx, r.Store(), refs, 0)
	want = `
ref 'Fake Ref version 0' should exist in the store, but does NOT
ref 'Fake Ref version 1' should exist in the store, but does NOT
ref 'Fake Ref version 2' should exist in the store, but does NOT`
	if got != want {
		t.Errorf("case 'expect empty refs to exist' response mismatch: expected '%s', got '%s'", want, got)
	}
}

func TestRawDatasetRefs(t *testing.T) {
// to keep hashes consistent, artificially specify the timestamp by overriding
// the dsfs.Timestamp func
Expand Down
Loading

0 comments on commit efaf600

Please sign in to comment.