Skip to content

Commit

Permalink
feat(registry.Datasets): initial dataset registry integration
Browse files Browse the repository at this point in the history
we gave registries a few new tricks, and have started integrating them here.
If a qri repo is configured with a registry, it will fire off notifications
to the registry server within goroutines on dataset creation & deletion, which
will start to build up a list of datasets on the registry side. This comes back
to help users on core.datasetRequests.Get, which now includes a parallel check
to both p2p network *and* registries if we don't have a local copy.

The upside here is we can use registries to do checks if we aren't connect to p2p.
If this goes as expected the next place to expand is adding search on the registry
side, and then move on to tracking who has pinned what.

*might* make some inroads on #397
  • Loading branch information
b5 committed May 15, 2018
1 parent 9bcb303 commit 58d64c2
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 38 deletions.
21 changes: 6 additions & 15 deletions api/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/qri-io/qri/core"
"github.com/qri-io/qri/repo/profile"
"github.com/qri-io/qri/repo/test"
"github.com/qri-io/registry/regclient"
regmock "github.com/qri-io/registry/regserver/mock"
)

Expand All @@ -32,12 +31,8 @@ func TestProfileHandler(t *testing.T) {
golog.SetLogLevel("qriapi", "error")
defer golog.SetLogLevel("qriapi", "info")

// use a test registry server
registryServer := regmock.NewMockServer()
// and test registry client
rc := regclient.NewClient(&regclient.Config{Location: registryServer.URL})

// in order to have consistent responses
// use a test registry server & client & client
rc, registryServer := regmock.NewMockServer()
// we need to artificially specify the timestamp
// we use the dsfs.Timestamp func variable to override
// the actual time
Expand Down Expand Up @@ -109,10 +104,8 @@ func TestProfilePhotoHandler(t *testing.T) {
golog.SetLogLevel("qriapi", "error")
defer golog.SetLogLevel("qriapi", "info")

// use a test registry server
registryServer := regmock.NewMockServer()
// and test registry client
rc := regclient.NewClient(&regclient.Config{Location: registryServer.URL})
// use a test registry server & client
rc, registryServer := regmock.NewMockServer()

// in order to have consistent responses
// we need to artificially specify the timestamp
Expand Down Expand Up @@ -202,10 +195,8 @@ func TestProfilePosterHandler(t *testing.T) {
golog.SetLogLevel("qriapi", "error")
defer golog.SetLogLevel("qriapi", "info")

// use a test registry server
registryServer := regmock.NewMockServer()
// and test registry client
rc := regclient.NewClient(&regclient.Config{Location: registryServer.URL})
// use a test registry server & client
rc, registryServer := regmock.NewMockServer()

// in order to have consistent responses
// we need to artificially specify the timestamp
Expand Down
7 changes: 2 additions & 5 deletions api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/qri-io/qri/core"
"github.com/qri-io/qri/repo/profile"
"github.com/qri-io/qri/repo/test"
"github.com/qri-io/registry/regclient"
regmock "github.com/qri-io/registry/regserver/mock"
)

Expand All @@ -44,10 +43,8 @@ func TestServerRoutes(t *testing.T) {
golog.SetLogLevel("qriapi", "error")
defer golog.SetLogLevel("qriapi", "info")

// use a test registry server
registryServer := regmock.NewMockServer()
// and test registry client
rc := regclient.NewClient(&regclient.Config{Location: registryServer.URL})
// use a test registry server & client
rc, registryServer := regmock.NewMockServer()

// in order to have consistent responses
// we need to artificially specify the timestamp
Expand Down
2 changes: 1 addition & 1 deletion cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestCommandsIntegration(t *testing.T) {
t.Skip(err.Error())
}

registryServer := regmock.NewMockServer()
_, registryServer := regmock.NewMockServer()

path := filepath.Join(os.TempDir(), "qri_test_commands_integration")
t.Logf("test filepath: %s", path)
Expand Down
2 changes: 1 addition & 1 deletion cmd/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func TestConnect(t *testing.T) {

registryServer := regmock.NewMockServer()
_, registryServer := regmock.NewMockServer()

if err := confirmQriNotRunning(); err != nil {
t.Skip(err.Error())
Expand Down
64 changes: 62 additions & 2 deletions core/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/rpc"
"path/filepath"
"strings"
"time"

"github.com/ipfs/go-datastore"
"github.com/qri-io/cafs"
Expand Down Expand Up @@ -193,12 +194,71 @@ func (r *DatasetRequests) Get(p *repo.DatasetRef, res *repo.DatasetRef) (err err
}

store := r.repo.Store()

// try to load dataset locally
ds, err := dsfs.LoadDataset(store, datastore.NewKey(p.Path))
if err != nil {
var (
refs = make(chan repo.DatasetRef)
errs = make(chan error)
tries, fails int
)

// if we have a p2p node, check p2p network for deets
if r.Node != nil {
return r.Node.RequestDataset(p)
tries++
go func() {
ref := repo.DatasetRef{}
// TODO - should add a context to this call with a timeout
if err := r.Node.RequestDataset(&ref); err == nil {
refs <- ref
} else {
errs <- err
}
}()
}
return err

// if we have a registry check it for details
if rc := r.repo.Registry(); rc != nil {
go func() {
tries++
if dsp, err := rc.GetDataset(p.Peername, p.Name, p.ProfileID.String(), p.Path); err == nil {
ref := repo.DatasetRef{
Path: dsp.Path,
Peername: dsp.Peername,
Name: dsp.Name,
Dataset: dsp,
}

if pid, err := profile.IDB58Decode(dsp.ProfileID); err == nil {
ref.ProfileID = pid
}

refs <- ref
} else {
errs <- err
}
}()
}

for {
select {
case ref := <-refs:
*res = ref
return nil
case err := <-errs:
fails++
log.Debugf("error getting dataset: %s", err.Error())
if fails == tries {
return repo.ErrNotFound
}
case <-time.After(time.Second * 5):
// TODO- replace this with context.WithTimeout funcs on all network calls
return repo.ErrNotFound
}
}

return nil
}

*res = repo.DatasetRef{
Expand Down
26 changes: 16 additions & 10 deletions core/datasets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/qri-io/qri/p2p/test"
"github.com/qri-io/qri/repo"
testrepo "github.com/qri-io/qri/repo/test"
regmock "github.com/qri-io/registry/regserver/mock"
)

func TestDatasetRequestsInit(t *testing.T) {
Expand Down Expand Up @@ -189,7 +190,8 @@ func TestDatasetRequestsListP2p(t *testing.T) {
}

func TestDatasetRequestsGet(t *testing.T) {
mr, err := testrepo.NewTestRepo(nil)
rc, _ := regmock.NewMockServer()
mr, err := testrepo.NewTestRepo(rc)
if err != nil {
t.Errorf("error allocating test repo: %s", err.Error())
return
Expand All @@ -211,8 +213,8 @@ func TestDatasetRequestsGet(t *testing.T) {
res *dataset.Dataset
err string
}{
//TODO: probably delete some of these
{repo.DatasetRef{Peername: "peer", Path: "abc", Name: "ABC"}, nil, "error loading dataset: error getting file bytes: datastore: key not found"},
// TODO: probably delete some of these
{repo.DatasetRef{Peername: "peer", Path: "abc", Name: "ABC"}, nil, "repo: not found"},
{repo.DatasetRef{Peername: "peer", Path: ref.Path, Name: "ABC"}, nil, ""},
{repo.DatasetRef{Peername: "peer", Path: ref.Path, Name: "movies"}, moviesDs, ""},
{repo.DatasetRef{Peername: "peer", Path: ref.Path, Name: "cats"}, moviesDs, ""},
Expand All @@ -233,7 +235,8 @@ func TestDatasetRequestsGet(t *testing.T) {
}

func TestDatasetRequestsSave(t *testing.T) {
mr, err := testrepo.NewTestRepo(nil)
rc, _ := regmock.NewMockServer()
mr, err := testrepo.NewTestRepo(rc)
if err != nil {
t.Errorf("error allocating test repo: %s", err.Error())
return
Expand All @@ -246,7 +249,7 @@ func TestDatasetRequestsSave(t *testing.T) {
// {&SaveParams{Path: datastore.NewKey("abc"), Name: "ABC", Hash: "123"}, nil, "error loading dataset: error getting file bytes: datastore: key not found"},
// {&SaveParams{Path: path, Name: "ABC", Hash: "123"}, nil, ""},
{&SaveParams{Name: "movies", Peername: "peer", MetadataFilename: "meta.json", Metadata: bytes.NewReader([]byte(`{"title":"movies!"}`))}, ""},
{&SaveParams{Name: "unknown_dataset", Peername: "peer"}, "error getting previous dataset: error loading dataset: error getting file bytes: datastore: key not found"},
{&SaveParams{Name: "unknown_dataset", Peername: "peer"}, "error getting previous dataset: repo: not found"},
// {&SaveParams{Path: path, Name: "cats"}, moviesDs, ""},
}

Expand All @@ -265,7 +268,8 @@ func TestDatasetRequestsSave(t *testing.T) {
}

func TestDatasetRequestsRename(t *testing.T) {
mr, err := testrepo.NewTestRepo(nil)
rc, _ := regmock.NewMockServer()
mr, err := testrepo.NewTestRepo(rc)
if err != nil {
t.Errorf("error allocating test repo: %s", err.Error())
return
Expand Down Expand Up @@ -300,7 +304,8 @@ func TestDatasetRequestsRename(t *testing.T) {
}

func TestDatasetRequestsRemove(t *testing.T) {
mr, err := testrepo.NewTestRepo(nil)
rc, _ := regmock.NewMockServer()
mr, err := testrepo.NewTestRepo(rc)
if err != nil {
t.Errorf("error allocating test repo: %s", err.Error())
return
Expand Down Expand Up @@ -334,8 +339,8 @@ func TestDatasetRequestsRemove(t *testing.T) {
}

func TestDatasetRequestsStructuredData(t *testing.T) {

mr, err := testrepo.NewTestRepo(nil)
rc, _ := regmock.NewMockServer()
mr, err := testrepo.NewTestRepo(rc)
if err != nil {
t.Errorf("error allocating test repo: %s", err.Error())
return
Expand Down Expand Up @@ -496,7 +501,8 @@ Pirates of the Caribbean: At World's End ,foo
}

func TestDataRequestsDiff(t *testing.T) {
mr, err := testrepo.NewTestRepo(nil)
rc, _ := regmock.NewMockServer()
mr, err := testrepo.NewTestRepo(rc)
if err != nil {
t.Errorf("error allocating test repo: %s", err.Error())
return
Expand Down
2 changes: 1 addition & 1 deletion core/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestSetupTeardown(t *testing.T) {
registryServer := regmock.NewMockServer()
_, registryServer := regmock.NewMockServer()

path := filepath.Join(os.TempDir(), "test_core_setup_teardown")
cfg1 := config.DefaultConfig()
Expand Down
32 changes: 30 additions & 2 deletions repo/actions/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ func (act Dataset) CreateDataset(name string, ds *dataset.Dataset, data cafs.Fil
return
}

if rc := act.Registry(); rc != nil {
go func() {
if err := rc.PutDataset(pro.Peername, name, ds.Encode(), pro.PrivKey.GetPublic()); err != nil {
log.Errorf("registering dataset: %s", err.Error())
return
}
}()
}

if err = act.LogEvent(repo.ETDsCreated, ref); err != nil {
return
}
Expand Down Expand Up @@ -154,12 +163,31 @@ func (act Dataset) UnpinDataset(ref repo.DatasetRef) error {

// DeleteDataset removes a dataset from the store
func (act Dataset) DeleteDataset(ref repo.DatasetRef) error {
if err := act.DeleteRef(ref); err != nil {
pro, err := act.Profile()
if err != nil {
return err
}

ds, err := dsfs.LoadDataset(act.Store(), datastore.NewKey(ref.Path))
if err != nil {
return err
}

if err = act.DeleteRef(ref); err != nil {
return err
}
if err := act.UnpinDataset(ref); err != nil && err != repo.ErrNotPinner {
if err = act.UnpinDataset(ref); err != nil && err != repo.ErrNotPinner {
return err
}

if rc := act.Registry(); rc != nil {
go func() {
if err := rc.DeleteDataset(ref.Peername, ref.Name, ds.Encode(), pro.PrivKey.GetPublic()); err != nil {
log.Errorf("deleting dataset: %s", err.Error())
return
}
}()
}

return act.LogEvent(repo.ETDsDeleted, ref)
}
14 changes: 13 additions & 1 deletion repo/actions/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/libp2p/go-libp2p-crypto"
"github.com/qri-io/cafs"
"github.com/qri-io/dataset/dstest"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/profile"
"github.com/qri-io/registry/regserver/mock"
)

// base64-encoded Test Private Key, decoded in init
Expand Down Expand Up @@ -45,8 +47,10 @@ func init() {
}

func TestDataset(t *testing.T) {
rc, _ := mock.NewMockServer()

rmf := func(t *testing.T) repo.Repo {
mr, err := repo.NewMemRepo(testPeerProfile, cafs.NewMapstore(), profile.NewMemStore(), nil)
mr, err := repo.NewMemRepo(testPeerProfile, cafs.NewMapstore(), profile.NewMemStore(), rc)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -93,6 +97,14 @@ func createDataset(t *testing.T, rmf RepoMakerFunc) (repo.Repo, repo.DatasetRef)
t.Error(err.Error())
}

if rc := r.Registry(); rc != nil {
// silly sleep to make sure registry http req goroutine has happened
time.Sleep(time.Millisecond * 150)
if _, err := rc.GetDataset(testPeerProfile.Peername, tc.Name, "", ""); err != nil {
t.Errorf("registry should have received dataset: %s", err.Error())
}
}

return r, ref
}

Expand Down

0 comments on commit 58d64c2

Please sign in to comment.