From 58d64c25cc7a20a5a3a1a56b85ab454bc999d026 Mon Sep 17 00:00:00 2001 From: b5 Date: Tue, 15 May 2018 10:40:55 -0400 Subject: [PATCH] feat(registry.Datasets): initial dataset registry integration we gave registries a few new tricks, and have started integrating them here. If a qri repo is configured with a registry, it will fire off notifications to the registry server within goroutines on dataset creation & deletion, which will start to build up a list of datasets on the registry side. This comes back to help users on core.datasetRequests.Get, which now includes a parallel check to both p2p network *and* registries if we don't have a local copy. The upside here is we can use registries to do checks if we aren't connect to p2p. If this goes as expected the next place to expand is adding search on the registry side, and then move on to tracking who has pinned what. *might* make some inroads on #397 --- api/profile_test.go | 21 ++++-------- api/server_test.go | 7 ++-- cmd/cmd_test.go | 2 +- cmd/connect_test.go | 2 +- core/datasets.go | 64 ++++++++++++++++++++++++++++++++++-- core/datasets_test.go | 26 +++++++++------ core/setup_test.go | 2 +- repo/actions/dataset.go | 32 ++++++++++++++++-- repo/actions/dataset_test.go | 14 +++++++- 9 files changed, 132 insertions(+), 38 deletions(-) diff --git a/api/profile_test.go b/api/profile_test.go index 3ac666f71..744d52261 100644 --- a/api/profile_test.go +++ b/api/profile_test.go @@ -19,7 +19,6 @@ import ( "github.com/qri-io/qri/core" "github.com/qri-io/qri/repo/profile" "github.com/qri-io/qri/repo/test" - "github.com/qri-io/registry/regclient" regmock "github.com/qri-io/registry/regserver/mock" ) @@ -32,12 +31,8 @@ func TestProfileHandler(t *testing.T) { golog.SetLogLevel("qriapi", "error") defer golog.SetLogLevel("qriapi", "info") - // use a test registry server - registryServer := regmock.NewMockServer() - // and test registry client - rc := regclient.NewClient(®client.Config{Location: registryServer.URL}) - - // in order to have consistent responses + // use a test registry server & client & client + rc, registryServer := regmock.NewMockServer() // we need to artificially specify the timestamp // we use the dsfs.Timestamp func variable to override // the actual time @@ -109,10 +104,8 @@ func TestProfilePhotoHandler(t *testing.T) { golog.SetLogLevel("qriapi", "error") defer golog.SetLogLevel("qriapi", "info") - // use a test registry server - registryServer := regmock.NewMockServer() - // and test registry client - rc := regclient.NewClient(®client.Config{Location: registryServer.URL}) + // use a test registry server & client + rc, registryServer := regmock.NewMockServer() // in order to have consistent responses // we need to artificially specify the timestamp @@ -202,10 +195,8 @@ func TestProfilePosterHandler(t *testing.T) { golog.SetLogLevel("qriapi", "error") defer golog.SetLogLevel("qriapi", "info") - // use a test registry server - registryServer := regmock.NewMockServer() - // and test registry client - rc := regclient.NewClient(®client.Config{Location: registryServer.URL}) + // use a test registry server & client + rc, registryServer := regmock.NewMockServer() // in order to have consistent responses // we need to artificially specify the timestamp diff --git a/api/server_test.go b/api/server_test.go index 41db0e557..e485e4030 100644 --- a/api/server_test.go +++ b/api/server_test.go @@ -21,7 +21,6 @@ import ( "github.com/qri-io/qri/core" "github.com/qri-io/qri/repo/profile" "github.com/qri-io/qri/repo/test" - "github.com/qri-io/registry/regclient" regmock "github.com/qri-io/registry/regserver/mock" ) @@ -44,10 +43,8 @@ func TestServerRoutes(t *testing.T) { golog.SetLogLevel("qriapi", "error") defer golog.SetLogLevel("qriapi", "info") - // use a test registry server - registryServer := regmock.NewMockServer() - // and test registry client - rc := regclient.NewClient(®client.Config{Location: registryServer.URL}) + // use a test registry server & client + rc, registryServer := regmock.NewMockServer() // in order to have consistent responses // we need to artificially specify the timestamp diff --git a/cmd/cmd_test.go b/cmd/cmd_test.go index 189b0a58d..4d8563e4d 100644 --- a/cmd/cmd_test.go +++ b/cmd/cmd_test.go @@ -98,7 +98,7 @@ func TestCommandsIntegration(t *testing.T) { t.Skip(err.Error()) } - registryServer := regmock.NewMockServer() + _, registryServer := regmock.NewMockServer() path := filepath.Join(os.TempDir(), "qri_test_commands_integration") t.Logf("test filepath: %s", path) diff --git a/cmd/connect_test.go b/cmd/connect_test.go index 90741bc7f..459e34fdd 100644 --- a/cmd/connect_test.go +++ b/cmd/connect_test.go @@ -11,7 +11,7 @@ import ( func TestConnect(t *testing.T) { - registryServer := regmock.NewMockServer() + _, registryServer := regmock.NewMockServer() if err := confirmQriNotRunning(); err != nil { t.Skip(err.Error()) diff --git a/core/datasets.go b/core/datasets.go index 4cd90ab42..2f0d3900b 100644 --- a/core/datasets.go +++ b/core/datasets.go @@ -10,6 +10,7 @@ import ( "net/rpc" "path/filepath" "strings" + "time" "github.com/ipfs/go-datastore" "github.com/qri-io/cafs" @@ -193,12 +194,71 @@ func (r *DatasetRequests) Get(p *repo.DatasetRef, res *repo.DatasetRef) (err err } store := r.repo.Store() + + // try to load dataset locally ds, err := dsfs.LoadDataset(store, datastore.NewKey(p.Path)) if err != nil { + var ( + refs = make(chan repo.DatasetRef) + errs = make(chan error) + tries, fails int + ) + + // if we have a p2p node, check p2p network for deets if r.Node != nil { - return r.Node.RequestDataset(p) + tries++ + go func() { + ref := repo.DatasetRef{} + // TODO - should add a context to this call with a timeout + if err := r.Node.RequestDataset(&ref); err == nil { + refs <- ref + } else { + errs <- err + } + }() } - return err + + // if we have a registry check it for details + if rc := r.repo.Registry(); rc != nil { + go func() { + tries++ + if dsp, err := rc.GetDataset(p.Peername, p.Name, p.ProfileID.String(), p.Path); err == nil { + ref := repo.DatasetRef{ + Path: dsp.Path, + Peername: dsp.Peername, + Name: dsp.Name, + Dataset: dsp, + } + + if pid, err := profile.IDB58Decode(dsp.ProfileID); err == nil { + ref.ProfileID = pid + } + + refs <- ref + } else { + errs <- err + } + }() + } + + for { + select { + case ref := <-refs: + *res = ref + return nil + case err := <-errs: + fails++ + log.Debugf("error getting dataset: %s", err.Error()) + if fails == tries { + return repo.ErrNotFound + } + case <-time.After(time.Second * 5): + // TODO- replace this with context.WithTimeout funcs on all network calls + return repo.ErrNotFound + } + } + + return nil } *res = repo.DatasetRef{ diff --git a/core/datasets_test.go b/core/datasets_test.go index 965dfd02b..d905ae6cd 100644 --- a/core/datasets_test.go +++ b/core/datasets_test.go @@ -19,6 +19,7 @@ import ( "github.com/qri-io/qri/p2p/test" "github.com/qri-io/qri/repo" testrepo "github.com/qri-io/qri/repo/test" + regmock "github.com/qri-io/registry/regserver/mock" ) func TestDatasetRequestsInit(t *testing.T) { @@ -189,7 +190,8 @@ func TestDatasetRequestsListP2p(t *testing.T) { } func TestDatasetRequestsGet(t *testing.T) { - mr, err := testrepo.NewTestRepo(nil) + rc, _ := regmock.NewMockServer() + mr, err := testrepo.NewTestRepo(rc) if err != nil { t.Errorf("error allocating test repo: %s", err.Error()) return @@ -211,8 +213,8 @@ func TestDatasetRequestsGet(t *testing.T) { res *dataset.Dataset err string }{ - //TODO: probably delete some of these - {repo.DatasetRef{Peername: "peer", Path: "abc", Name: "ABC"}, nil, "error loading dataset: error getting file bytes: datastore: key not found"}, + // TODO: probably delete some of these + {repo.DatasetRef{Peername: "peer", Path: "abc", Name: "ABC"}, nil, "repo: not found"}, {repo.DatasetRef{Peername: "peer", Path: ref.Path, Name: "ABC"}, nil, ""}, {repo.DatasetRef{Peername: "peer", Path: ref.Path, Name: "movies"}, moviesDs, ""}, {repo.DatasetRef{Peername: "peer", Path: ref.Path, Name: "cats"}, moviesDs, ""}, @@ -233,7 +235,8 @@ func TestDatasetRequestsGet(t *testing.T) { } func TestDatasetRequestsSave(t *testing.T) { - mr, err := testrepo.NewTestRepo(nil) + rc, _ := regmock.NewMockServer() + mr, err := testrepo.NewTestRepo(rc) if err != nil { t.Errorf("error allocating test repo: %s", err.Error()) return @@ -246,7 +249,7 @@ func TestDatasetRequestsSave(t *testing.T) { // {&SaveParams{Path: datastore.NewKey("abc"), Name: "ABC", Hash: "123"}, nil, "error loading dataset: error getting file bytes: datastore: key not found"}, // {&SaveParams{Path: path, Name: "ABC", Hash: "123"}, nil, ""}, {&SaveParams{Name: "movies", Peername: "peer", MetadataFilename: "meta.json", Metadata: bytes.NewReader([]byte(`{"title":"movies!"}`))}, ""}, - {&SaveParams{Name: "unknown_dataset", Peername: "peer"}, "error getting previous dataset: error loading dataset: error getting file bytes: datastore: key not found"}, + {&SaveParams{Name: "unknown_dataset", Peername: "peer"}, "error getting previous dataset: repo: not found"}, // {&SaveParams{Path: path, Name: "cats"}, moviesDs, ""}, } @@ -265,7 +268,8 @@ func TestDatasetRequestsSave(t *testing.T) { } func TestDatasetRequestsRename(t *testing.T) { - mr, err := testrepo.NewTestRepo(nil) + rc, _ := regmock.NewMockServer() + mr, err := testrepo.NewTestRepo(rc) if err != nil { t.Errorf("error allocating test repo: %s", err.Error()) return @@ -300,7 +304,8 @@ func TestDatasetRequestsRename(t *testing.T) { } func TestDatasetRequestsRemove(t *testing.T) { - mr, err := testrepo.NewTestRepo(nil) + rc, _ := regmock.NewMockServer() + mr, err := testrepo.NewTestRepo(rc) if err != nil { t.Errorf("error allocating test repo: %s", err.Error()) return @@ -334,8 +339,8 @@ func TestDatasetRequestsRemove(t *testing.T) { } func TestDatasetRequestsStructuredData(t *testing.T) { - - mr, err := testrepo.NewTestRepo(nil) + rc, _ := regmock.NewMockServer() + mr, err := testrepo.NewTestRepo(rc) if err != nil { t.Errorf("error allocating test repo: %s", err.Error()) return @@ -496,7 +501,8 @@ Pirates of the Caribbean: At World's End ,foo } func TestDataRequestsDiff(t *testing.T) { - mr, err := testrepo.NewTestRepo(nil) + rc, _ := regmock.NewMockServer() + mr, err := testrepo.NewTestRepo(rc) if err != nil { t.Errorf("error allocating test repo: %s", err.Error()) return diff --git a/core/setup_test.go b/core/setup_test.go index de469c456..6ce54b0ee 100644 --- a/core/setup_test.go +++ b/core/setup_test.go @@ -10,7 +10,7 @@ import ( ) func TestSetupTeardown(t *testing.T) { - registryServer := regmock.NewMockServer() + _, registryServer := regmock.NewMockServer() path := filepath.Join(os.TempDir(), "test_core_setup_teardown") cfg1 := config.DefaultConfig() diff --git a/repo/actions/dataset.go b/repo/actions/dataset.go index fc69cfc4b..9ec4085eb 100644 --- a/repo/actions/dataset.go +++ b/repo/actions/dataset.go @@ -60,6 +60,15 @@ func (act Dataset) CreateDataset(name string, ds *dataset.Dataset, data cafs.Fil return } + if rc := act.Registry(); rc != nil { + go func() { + if err := rc.PutDataset(pro.Peername, name, ds.Encode(), pro.PrivKey.GetPublic()); err != nil { + log.Errorf("registering dataset: %s", err.Error()) + return + } + }() + } + if err = act.LogEvent(repo.ETDsCreated, ref); err != nil { return } @@ -154,12 +163,31 @@ func (act Dataset) UnpinDataset(ref repo.DatasetRef) error { // DeleteDataset removes a dataset from the store func (act Dataset) DeleteDataset(ref repo.DatasetRef) error { - if err := act.DeleteRef(ref); err != nil { + pro, err := act.Profile() + if err != nil { + return err + } + + ds, err := dsfs.LoadDataset(act.Store(), datastore.NewKey(ref.Path)) + if err != nil { + return err + } + + if err = act.DeleteRef(ref); err != nil { return err } - if err := act.UnpinDataset(ref); err != nil && err != repo.ErrNotPinner { + if err = act.UnpinDataset(ref); err != nil && err != repo.ErrNotPinner { return err } + if rc := act.Registry(); rc != nil { + go func() { + if err := rc.DeleteDataset(ref.Peername, ref.Name, ds.Encode(), pro.PrivKey.GetPublic()); err != nil { + log.Errorf("deleting dataset: %s", err.Error()) + return + } + }() + } + return act.LogEvent(repo.ETDsDeleted, ref) } diff --git a/repo/actions/dataset_test.go b/repo/actions/dataset_test.go index 5897d163a..b99566165 100644 --- a/repo/actions/dataset_test.go +++ b/repo/actions/dataset_test.go @@ -6,12 +6,14 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/libp2p/go-libp2p-crypto" "github.com/qri-io/cafs" "github.com/qri-io/dataset/dstest" "github.com/qri-io/qri/repo" "github.com/qri-io/qri/repo/profile" + "github.com/qri-io/registry/regserver/mock" ) // base64-encoded Test Private Key, decoded in init @@ -45,8 +47,10 @@ func init() { } func TestDataset(t *testing.T) { + rc, _ := mock.NewMockServer() + rmf := func(t *testing.T) repo.Repo { - mr, err := repo.NewMemRepo(testPeerProfile, cafs.NewMapstore(), profile.NewMemStore(), nil) + mr, err := repo.NewMemRepo(testPeerProfile, cafs.NewMapstore(), profile.NewMemStore(), rc) if err != nil { panic(err) } @@ -93,6 +97,14 @@ func createDataset(t *testing.T, rmf RepoMakerFunc) (repo.Repo, repo.DatasetRef) t.Error(err.Error()) } + if rc := r.Registry(); rc != nil { + // silly sleep to make sure registry http req goroutine has happened + time.Sleep(time.Millisecond * 150) + if _, err := rc.GetDataset(testPeerProfile.Peername, tc.Name, "", ""); err != nil { + t.Errorf("registry should have received dataset: %s", err.Error()) + } + } + return r, ref }