Skip to content

Commit

Permalink
refactor(repo.Repo): overhaul repo interface
Browse files Browse the repository at this point in the history
Our repo interface has needed work for some time; this is a first crack at refactoring, focusing on
the seam between methods in the core package and the business logic repos take on. This moves a
bunch of logic from core into repos.

Based on this refactor it seems repos need a deeper rethink, possibly with a split into two
packages, where the first contains just the business logic that echoes the methods in core that work
with local storage (CreateDataset, ListDatasets, etc.), and the second contains the primitive APIs that business logic
needs to do its job (LogEvent, PutRef, etc.).
  • Loading branch information
b5 committed Mar 21, 2018
1 parent 35686fd commit 7a7de42
Show file tree
Hide file tree
Showing 38 changed files with 644 additions and 953 deletions.
1 change: 1 addition & 0 deletions api/testdata/addResponseFromURL.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"timestamp": "2001-01-01T01:01:01.000000001Z",
"title": "initial commit"
},
"dataPath": "/map/QmXGZq5E3HmHzNHuoF9bHuCNaBSxpVtazPbCAQMcMnQqAG",
"meta": {
"accrualPeriodicity": "R/P1W",
"downloadPath": "http://insight.dev.schoolwires.com/HelpAssets/C2Assets/C2Files/C2ImportFamRelSample.csv",
Expand Down
3 changes: 2 additions & 1 deletion api/testdata/historyResponse.json
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
"timestamp": "2001-01-01T01:01:01.000000001Z",
"title": "added row to include Seoul, Korea"
},
"dataPath": "/map/QmVYgdpvgnq3FABZFVWUgxr7UCwNSRJz97vBU9YX5g5pQ4",
"dataPath": "/map/QmY3dTEQLQFe3VTMi646N5NK2TRijPtCS2fi85z72owHPD",
"meta": {
"description": "test description",
"keywords": null,
Expand Down Expand Up @@ -206,6 +206,7 @@
"timestamp": "2001-01-01T01:01:01.000000001Z",
"title": "initial commit"
},
"dataPath": "/map/QmVYgdpvgnq3FABZFVWUgxr7UCwNSRJz97vBU9YX5g5pQ4",
"meta": {
"description": "test description",
"keywords": null,
Expand Down
3 changes: 2 additions & 1 deletion api/testdata/historyResponseAt.json
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
"timestamp": "2001-01-01T01:01:01.000000001Z",
"title": "added row to include Seoul, Korea"
},
"dataPath": "/map/QmVYgdpvgnq3FABZFVWUgxr7UCwNSRJz97vBU9YX5g5pQ4",
"dataPath": "/map/QmY3dTEQLQFe3VTMi646N5NK2TRijPtCS2fi85z72owHPD",
"meta": {
"description": "test description",
"keywords": null,
Expand Down Expand Up @@ -206,6 +206,7 @@
"timestamp": "2001-01-01T01:01:01.000000001Z",
"title": "initial commit"
},
"dataPath": "/map/QmVYgdpvgnq3FABZFVWUgxr7UCwNSRJz97vBU9YX5g5pQ4",
"meta": {
"description": "test description",
"keywords": null,
Expand Down
3 changes: 2 additions & 1 deletion api/testdata/historyResponsePath.json
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
"timestamp": "2001-01-01T01:01:01.000000001Z",
"title": "added row to include Seoul, Korea"
},
"dataPath": "/map/QmVYgdpvgnq3FABZFVWUgxr7UCwNSRJz97vBU9YX5g5pQ4",
"dataPath": "/map/QmY3dTEQLQFe3VTMi646N5NK2TRijPtCS2fi85z72owHPD",
"meta": {
"description": "test description",
"keywords": null,
Expand Down Expand Up @@ -206,6 +206,7 @@
"timestamp": "2001-01-01T01:01:01.000000001Z",
"title": "initial commit"
},
"dataPath": "/map/QmVYgdpvgnq3FABZFVWUgxr7UCwNSRJz97vBU9YX5g5pQ4",
"meta": {
"description": "test description",
"keywords": null,
Expand Down
3 changes: 2 additions & 1 deletion api/testdata/saveMetaResponse.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"data": {
"peername": "me",
"peername": "peer",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmcQsi93yUryyWvw6mPyDNoKRb7FcBx8QGBAeJ25kXQjnC",
"dataset": {
Expand Down
3 changes: 2 additions & 1 deletion cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ const profileData = `
// This is a basic integration test that makes sure basic happy paths work on the CLI
func TestCommandsIntegration(t *testing.T) {
path := filepath.Join(os.TempDir(), "qri_test_commands_integration")
t.Logf("temp path: %s", path)
// t.Logf("temp path: %s", path)
log.Info(path)
//clean up if previous cleanup failed
if _, err := os.Stat(path); os.IsNotExist(err) {
os.RemoveAll(path)
Expand Down
4 changes: 1 addition & 3 deletions cmd/connect.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cmd

import (
"github.com/qri-io/analytics"
"github.com/qri-io/cafs"
"github.com/qri-io/qri/api"
"github.com/qri-io/qri/core"
Expand Down Expand Up @@ -59,8 +58,7 @@ call it a “prime” port number.`,
Peername: "mem user",
},
cafs.NewMapstore(),
repo.MemPeers{},
&analytics.Memstore{})
repo.MemProfiles{})
ExitIfErr(err)
} else {
r = getRepo(true)
Expand Down
3 changes: 1 addition & 2 deletions core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package core
import (
"testing"

"github.com/qri-io/analytics"
"github.com/qri-io/cafs"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/repo"
Expand All @@ -25,7 +24,7 @@ func TestReceivers(t *testing.T) {
}

func testQriNode(cfgs ...func(c *p2p.NodeCfg)) (*p2p.QriNode, error) {
r, err := repo.NewMemRepo(&profile.Profile{}, cafs.NewMapstore(), repo.MemPeers{}, &analytics.Memstore{})
r, err := repo.NewMemRepo(&profile.Profile{}, cafs.NewMapstore(), repo.MemProfiles{})
if err != nil {
return nil, err
}
Expand Down
57 changes: 12 additions & 45 deletions core/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (r *DatasetRequests) Get(p *repo.DatasetRef, res *repo.DatasetRef) (err err
if r.Node != nil {
ref := p
err := r.Node.RequestDataset(ref)
if ref != nil {
if ref != nil && ref.Dataset != nil {
ds := ref.Dataset
// TODO - this is really stupid, p2p.RequestDatasetInfo should return an error here
if ds.IsEmpty() {
Expand Down Expand Up @@ -290,13 +290,6 @@ func (r *DatasetRequests) Init(p *InitParams, res *repo.DatasetRef) error {
return fmt.Errorf("invalid structure: %s", err.Error())
}

// TODO - restore
// if err := validate.DataFormat(st.Format, bytes.NewReader(data)); err != nil {
// return fmt.Errorf("invalid data format: %s", err.Error())
// }

// TODO - check for errors in dataset and warn user if errors exist

datakey, err := store.Put(cafs.NewMemfileBytes("data."+st.Format.String(), data), false)
if err != nil {
return fmt.Errorf("error putting data file in store: %s", err.Error())
Expand Down Expand Up @@ -334,29 +327,13 @@ func (r *DatasetRequests) Init(p *InitParams, res *repo.DatasetRef) error {
}

dataf := cafs.NewMemfileBytes("data."+st.Format.String(), data)
dskey, err := r.repo.CreateDataset(name, ds, dataf, true)
*res, err = r.repo.CreateDataset(name, ds, dataf, true)
if err != nil {
log.Debugf("error creating dataset: %s\n", err.Error())
return err
}

ref := repo.DatasetRef{Peername: p.Peername, Name: name, Path: dskey.String(), Dataset: ds}

if err := repo.CanonicalizePeer(r.repo, &ref); err != nil {
return fmt.Errorf("error canonicalizing peername: %s", err.Error())
}

if err = r.repo.PutRef(ref); err != nil {
return fmt.Errorf("error adding dataset name to repo: %s", err.Error())
}

// ds, err = r.repo.GetDataset(dskey)
// if err != nil {
// return fmt.Errorf("error reading dataset: '%s': %s", dskey.String(), err.Error())
// }

*res = ref
return nil
return r.repo.ReadDataset(res)
}

// SaveParams defines permeters for Dataset Saves
Expand Down Expand Up @@ -509,30 +486,20 @@ func (r *DatasetRequests) Save(p *SaveParams, res *repo.DatasetRef) (err error)
ds.Structure.SetPath("")

dataf = cafs.NewMemfileBytes("data."+st.Format.String(), data)
dspath, err := r.repo.CreateDataset(p.Name, ds, dataf, true)
ref, err := r.repo.CreateDataset(p.Name, ds, dataf, true)
if err != nil {
fmt.Printf("create ds error: %s\n", err.Error())
return err
}
ref.Dataset = ds

if prev.Name != "" {
if err := r.repo.DeleteRef(*prev); err != nil {
log.Debug(err.Error())
return err
}
prev.Path = dspath.String()
if err := r.repo.PutRef(*prev); err != nil {
log.Debug(err.Error())
return err
}
}

*res = repo.DatasetRef{
Peername: p.Peername,
Name: p.Name,
Path: dspath.String(),
Dataset: ds,
}
// *res = repo.DatasetRef{
// Peername: p.Peername,
// Name: p.Name,
// Path: dspath.String(),
// Dataset: ds,
// }
*res = ref

return nil
}
Expand Down
13 changes: 10 additions & 3 deletions core/datasets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,15 +451,22 @@ func TestDataRequestsDiff(t *testing.T) {
// Metadata: jobsMeta,
}
err = req.Init(initParams, dsRef1)
t.Log(dsRef1.String())
if err != nil {
t.Errorf("couldn't load file 1: %s", err.Error())
return
}
dsBase, err := dsfs.LoadDataset(mr.Store(), datastore.NewKey(dsRef1.Path))
if err != nil {
t.Errorf("error loading dataset 1: %s", err.Error())
if err := mr.ReadDataset(dsRef1); err != nil {
t.Errorf("error reading dataset 1: %s", err.Error())
return
}

dsBase := dsRef1.Dataset

// dsBase, err := dsfs.LoadDataset(mr.Store(), datastore.NewKey(dsRef1.Path))
// if err != nil {
// return
// }
// File 2
dsRef2 := &repo.DatasetRef{}
initParams = &InitParams{
Expand Down
4 changes: 1 addition & 3 deletions core/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"net/rpc"

"github.com/ipfs/go-datastore"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/repo"
)
Expand Down Expand Up @@ -81,8 +80,7 @@ func (d *HistoryRequests) Log(params *LogParams, res *[]repo.DatasetRef) (err er
limit := params.Limit

for {
ref.Dataset, err = d.repo.GetDataset(datastore.NewKey(ref.Path))
if err != nil {
if err = d.repo.ReadDataset(&ref); err != nil {
log.Debug(err.Error())
return fmt.Errorf("error adding datasets to log: %s", err.Error())
}
Expand Down
10 changes: 5 additions & 5 deletions core/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (d *PeerRequests) List(p *ListParams, res *[]*profile.Profile) error {
return err
}

ps, err := r.Peers().List()
ps, err := r.Profiles().List()
if err != nil {
return fmt.Errorf("error listing peers: %s", err.Error())
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func (d *PeerRequests) ConnectToPeer(pid *peer.ID, res *profile.Profile) error {
return d.cli.Call("PeerRequests.ConnectToPeer", pid, res)
}

if profile, err := d.qriNode.Repo.Peers().GetPeer(*pid); err == nil {
if profile, err := d.qriNode.Repo.Profiles().GetPeer(*pid); err == nil {
*pid, err = profile.IPFSPeerID()
if err != nil {
return fmt.Errorf("error getting IPFS peer ID: %s", err.Error())
Expand All @@ -136,7 +136,7 @@ func (d *PeerRequests) ConnectToPeer(pid *peer.ID, res *profile.Profile) error {
return fmt.Errorf("error connecting to peer: %s", err.Error())
}

profile, err := d.qriNode.Repo.Peers().GetPeer(*pid)
profile, err := d.qriNode.Repo.Profiles().GetPeer(*pid)
if err != nil {
return fmt.Errorf("error getting peer profile: %s", err.Error())
}
Expand All @@ -159,7 +159,7 @@ func (d *PeerRequests) Info(p *PeerInfoParams, res *profile.Profile) error {

r := d.qriNode.Repo

peers, err := r.Peers().List()
peers, err := r.Profiles().List()
if err != nil {
log.Debug(err.Error())
return err
Expand Down Expand Up @@ -193,7 +193,7 @@ func (d *PeerRequests) GetReferences(p *PeerRefsParams, res *[]repo.DatasetRef)
return fmt.Errorf("error decoding peer Id: %s", err.Error())
}

profile, err := d.qriNode.Repo.Peers().GetPeer(id)
profile, err := d.qriNode.Repo.Profiles().GetPeer(id)
if err != nil || profile == nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions core/self_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

/*
* TODO - work in progress
* self-updating checks IPNS entry that represents the desired hash of the previous version of
* this program, which is the result of adding the complied binary of this program to IPFS.
* If the returned value of a lookup differs, we have a version mismatch, and need to perform
Expand Down
10 changes: 3 additions & 7 deletions p2p/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"

"github.com/ipfs/go-datastore"
"github.com/qri-io/qri/repo"

peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
Expand All @@ -29,8 +28,7 @@ func (n *QriNode) RequestDataset(ref *repo.DatasetRef) (err error) {
// network request
if ref.PeerID != "" {
if pro, err := n.Repo.Profile(); err == nil && pro.ID == ref.PeerID {
if ds, err := n.Repo.GetDataset(datastore.NewKey(ref.Path)); err == nil {
ref.Dataset = ds
if err := n.Repo.ReadDataset(ref); err == nil {
return nil
}
}
Expand All @@ -45,8 +43,6 @@ func (n *QriNode) RequestDataset(ref *repo.DatasetRef) (err error) {

pids := n.ClosestConnectedPeers(pid, 15)
if len(pids) == 0 {
log.Debug(err.Error())

// TODO - start checking peerstore peers?
// something else should probably be trying to establish
// rolling connections
Expand Down Expand Up @@ -96,8 +92,8 @@ func (n *QriNode) handleDataset(ws *WrappedStream, msg Message) (hangup bool) {
if err := repo.CanonicalizeDatasetRef(n.Repo, &dsr); err == nil {
if ref, err := n.Repo.GetRef(dsr); err == nil {

if ds, err := n.Repo.GetDataset(datastore.NewKey(ref.Path)); err == nil {
ref.Dataset = ds
if err := n.Repo.ReadDataset(&ref); err != nil {
log.Debug(err.Error())
}

res, err = msg.UpdateJSON(ref)
Expand Down
2 changes: 1 addition & 1 deletion p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (n *QriNode) ConnectedQriPeers() map[peer.ID]*profile.Profile {
for _, c := range conns {
id := c.RemotePeer()
// if support, err := n.SupportsQriProtocol(id); err == nil && support {
if p, err := n.Repo.Peers().GetPeer(id); err == nil {
if p, err := n.Repo.Profiles().GetPeer(id); err == nil {
peers[id] = p
}
// }
Expand Down
3 changes: 1 addition & 2 deletions p2p/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"testing"

"github.com/qri-io/analytics"
"github.com/qri-io/cafs"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/profile"
Expand Down Expand Up @@ -36,5 +35,5 @@ func NewTestRepo(id peer.ID) (repo.Repo, error) {
return repo.NewMemRepo(&profile.Profile{
ID: id.Pretty(),
Peername: fmt.Sprintf("test-repo-%d", repoID),
}, cafs.NewMapstore(), repo.MemPeers{}, &analytics.Memstore{})
}, cafs.NewMapstore(), repo.MemProfiles{})
}
3 changes: 3 additions & 0 deletions p2p/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
// at a bare minimum we should grab a randomized set of peers
func (n *QriNode) ClosestConnectedPeers(id peer.ID, max int) (pid []peer.ID) {
added := 0
if !n.Online {
return []peer.ID{}
}

if len(n.Host.Network().ConnsToPeer(id)) > 0 {
added++
Expand Down
4 changes: 2 additions & 2 deletions p2p/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (n *QriNode) RequestProfile(pid peer.ID) (*profile.Profile, error) {
return nil, err
}

if err := n.Repo.Peers().PutPeer(pid, pro); err != nil {
if err := n.Repo.Profiles().PutPeer(pid, pro); err != nil {
log.Debug(err.Error())
return nil, err
}
Expand All @@ -69,7 +69,7 @@ func (n *QriNode) handleProfile(ws *WrappedStream, msg Message) (hangup bool) {
pro.Updated = time.Now()

log.Debugf("adding peer: %s", pid.Pretty())
if err := n.Repo.Peers().PutPeer(pid, pro); err != nil {
if err := n.Repo.Profiles().PutPeer(pid, pro); err != nil {
log.Debug(err.Error())
return
}
Expand Down
Loading

0 comments on commit 7a7de42

Please sign in to comment.