Skip to content

Commit

Permalink
feat(p2p): shiny new peer-2-peer communication library
Browse files Browse the repository at this point in the history
This one has been a loooooooong time coming. Up until this point we've been heavily focused on
getting our dataset definitions to work properly, assuming that if peers are communicating about
inaccurate datasets, well, that's not a good starting point.

In the last little while datasets have turned a corner, and we can finally turn our attention to p2p
While I don't imagine this will be anywhere near a finished state for p2p, this might be one of the
biggest steps forward this package takes.

We've reworked the request/response model into an architecture that allows methods to form fit
to the task, polling peers & sending messages until the data in question is found, or a deadline
passes, whichever comes first.

What's more, WE ACTUALLY HAVE TESTS FOR THIS STUFF. While the tests, again, have a ways to go,
coverage of this package was around 18% before now.

Before we merge this I'm hoping to ship a modification to fsrepo that caches a bunch of the info
that's flying around the network to facilitate peers covering for each other in network churn.

fuego.
  • Loading branch information
b5 committed Mar 17, 2018
1 parent f7f5ad4 commit 7a4e292
Show file tree
Hide file tree
Showing 59 changed files with 24,826 additions and 1,276 deletions.
5 changes: 5 additions & 0 deletions api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ import (
"testing"
"time"

golog "github.com/ipfs/go-log"
"github.com/qri-io/dataset/dsfs"
"github.com/qri-io/qri/repo/test"
)

func TestServerRoutes(t *testing.T) {
// bump up log level to keep test output clean
golog.SetLogLevel("qriapi", "error")
defer golog.SetLogLevel("qriapi", "info")

// in order to have consistent responses
// we need to artificially specify the timestamp
// we use the dsfs.Timestamp func variable to override
Expand Down
2 changes: 1 addition & 1 deletion api/testdata/getResponseFamilyRelationships.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"data": {
"peername": "peer",
"name": "family_relationships",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "family_relationships",
"path": "/map/QmdbJGpmZKsbKpBGQbWS7PjodGtrXX3hAHvxdgUsuf9a3N",
"dataset": {
"commit": {
Expand Down
8 changes: 4 additions & 4 deletions api/testdata/historyResponse.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"data": [
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmcQsi93yUryyWvw6mPyDNoKRb7FcBx8QGBAeJ25kXQjnC",
"dataset": {
"commit": {
Expand Down Expand Up @@ -68,8 +68,8 @@
},
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmdvEDH2hNqasqWtWwJn6Jwdvi56jGoxT5u5DsSHbtSPYM",
"dataset": {
"commit": {
Expand Down Expand Up @@ -134,8 +134,8 @@
},
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/Qme1fYAJWVfcdJtqsiTcLNWFF8b4Dq7jy7knjUYUTuVmZS",
"dataset": {
"commit": {
Expand Down Expand Up @@ -196,8 +196,8 @@
},
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmaR2c8PFeUWPpbE6rxpD3WbhtSTuACbFBa7JMSZLAHRFX",
"dataset": {
"commit": {
Expand Down
8 changes: 4 additions & 4 deletions api/testdata/historyResponseAt.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"data": [
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmcQsi93yUryyWvw6mPyDNoKRb7FcBx8QGBAeJ25kXQjnC",
"dataset": {
"commit": {
Expand Down Expand Up @@ -68,8 +68,8 @@
},
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmdvEDH2hNqasqWtWwJn6Jwdvi56jGoxT5u5DsSHbtSPYM",
"dataset": {
"commit": {
Expand Down Expand Up @@ -134,8 +134,8 @@
},
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/Qme1fYAJWVfcdJtqsiTcLNWFF8b4Dq7jy7knjUYUTuVmZS",
"dataset": {
"commit": {
Expand Down Expand Up @@ -196,8 +196,8 @@
},
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmaR2c8PFeUWPpbE6rxpD3WbhtSTuACbFBa7JMSZLAHRFX",
"dataset": {
"commit": {
Expand Down
8 changes: 4 additions & 4 deletions api/testdata/historyResponsePath.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"data": [
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmcQsi93yUryyWvw6mPyDNoKRb7FcBx8QGBAeJ25kXQjnC",
"dataset": {
"commit": {
Expand Down Expand Up @@ -68,8 +68,8 @@
},
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmdvEDH2hNqasqWtWwJn6Jwdvi56jGoxT5u5DsSHbtSPYM",
"dataset": {
"commit": {
Expand Down Expand Up @@ -134,8 +134,8 @@
},
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/Qme1fYAJWVfcdJtqsiTcLNWFF8b4Dq7jy7knjUYUTuVmZS",
"dataset": {
"commit": {
Expand Down Expand Up @@ -196,8 +196,8 @@
},
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmaR2c8PFeUWPpbE6rxpD3WbhtSTuACbFBa7JMSZLAHRFX",
"dataset": {
"commit": {
Expand Down
8 changes: 4 additions & 4 deletions api/testdata/listResponse.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
"data": [
{
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmdvEDH2hNqasqWtWwJn6Jwdvi56jGoxT5u5DsSHbtSPYM",
"dataset": {
"commit": {
Expand Down Expand Up @@ -68,8 +68,8 @@
},
{
"peername": "peer",
"name": "counter",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "counter",
"path": "/map/Qmdg3KMzTT7UwxBZu8BGqBdF4YXM17rN51hn3hbQkJWT9v",
"dataset": {
"commit": {
Expand Down Expand Up @@ -111,8 +111,8 @@
},
{
"peername": "peer",
"name": "craigslist",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "craigslist",
"path": "/map/QmdV6TqbjvwDqZrqPDyXeMYujEZaiaG18YjXVRhD66VFfZ",
"dataset": {
"commit": {
Expand Down Expand Up @@ -24022,8 +24022,8 @@
},
{
"peername": "peer",
"name": "movies",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "movies",
"path": "/map/QmV8H7KjK3u3tVfJEeUqTR42Tbia7oQo1hMfeZjmkBQZQn",
"dataset": {
"commit": {
Expand Down
2 changes: 1 addition & 1 deletion api/testdata/removeResponseByPath.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"data": {
"peername": "peer",
"name": "family_relationships",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "family_relationships",
"path": "/map/QmdbJGpmZKsbKpBGQbWS7PjodGtrXX3hAHvxdgUsuf9a3N",
"dataset": {
"commit": {
Expand Down
2 changes: 1 addition & 1 deletion api/testdata/removeResponseWithPath.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"data": {
"peername": "peer",
"name": "cities",
"peerID": "QmZePf5LeXow3RW5U1AgEiNbW46YnRGhZ7HPvm1UmPFPwt",
"name": "cities",
"path": "/map/QmcQsi93yUryyWvw6mPyDNoKRb7FcBx8QGBAeJ25kXQjnC",
"dataset": {
"commit": {
Expand Down
21 changes: 14 additions & 7 deletions core/datasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/qri-io/jsonschema"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/profile"
"github.com/qri-io/varName"
)

Expand Down Expand Up @@ -87,7 +88,14 @@ func (r *DatasetRequests) List(p *ListParams, res *[]repo.DatasetRef) error {
return fmt.Errorf("cannot list remote datasets without p2p connection")
}

replies, err := r.Node.RequestDatasetsList(ds.Peername)
id, err := profile.IDB58Decode(ds.PeerID)
if err != nil {
return fmt.Errorf("error %s", err.Error())
}
replies, err := r.Node.RequestDatasetsList(id, p2p.DatasetsListParams{
Limit: p.Limit,
Offset: p.Offset,
})
*res = replies
return err
}
Expand Down Expand Up @@ -148,7 +156,8 @@ func (r *DatasetRequests) Get(p *repo.DatasetRef, res *repo.DatasetRef) (err err

getRemote := func(err error) error {
if r.Node != nil {
ref, err := r.Node.RequestDatasetInfo(p)
ref := p
err := r.Node.RequestDataset(ref)
if ref != nil {
ds := ref.Dataset
// TODO - this is really stupid, p2p.RequestDatasetInfo should return an error here
Expand Down Expand Up @@ -325,7 +334,7 @@ func (r *DatasetRequests) Init(p *InitParams, res *repo.DatasetRef) error {
}

dataf := cafs.NewMemfileBytes("data."+st.Format.String(), data)
dskey, err := r.repo.CreateDataset(ds, dataf, true)
dskey, err := r.repo.CreateDataset(name, ds, dataf, true)
if err != nil {
log.Debugf("error creating dataset: %s\n", err.Error())
return err
Expand Down Expand Up @@ -500,7 +509,7 @@ func (r *DatasetRequests) Save(p *SaveParams, res *repo.DatasetRef) (err error)
ds.Structure.SetPath("")

dataf = cafs.NewMemfileBytes("data."+st.Format.String(), data)
dspath, err := r.repo.CreateDataset(ds, dataf, true)
dspath, err := r.repo.CreateDataset(p.Name, ds, dataf, true)
if err != nil {
fmt.Printf("create ds error: %s\n", err.Error())
return err
Expand Down Expand Up @@ -735,11 +744,9 @@ func (r *DatasetRequests) Add(ref *repo.DatasetRef, res *repo.DatasetRef) (err e
}

if ref.Path == "" && r.Node != nil {
res, err := r.Node.RequestDatasetInfo(ref)
if err != nil {
if err := r.Node.RequestDataset(ref); err != nil {
return err
}
ref = res
}

fs, ok := r.repo.Store().(*ipfs.Filestore)
Expand Down
1 change: 0 additions & 1 deletion core/datasets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
func TestDatasetRequestsInit(t *testing.T) {
badDataFile := testrepo.BadDataFile
jobsByAutomationFile := testrepo.JobsByAutomationFile
// jobsByAutomationFile2 := testrepo.JobsByAutomationFile2
// badDataFormatFile := testrepo.BadDataFormatFile
// badStructureFile := testrepo.BadStructureFile

Expand Down
26 changes: 4 additions & 22 deletions core/peers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package core

import (
"encoding/json"
"fmt"
"net/rpc"

// "github.com/ipfs/go-datastore/query"
"github.com/qri-io/qri/p2p"
"github.com/qri-io/qri/repo"
"github.com/qri-io/qri/repo/profile"
Expand Down Expand Up @@ -200,27 +198,11 @@ func (d *PeerRequests) GetReferences(p *PeerRefsParams, res *[]repo.DatasetRef)
return err
}

r, err := d.qriNode.SendMessage(id, &p2p.Message{
Phase: p2p.MpRequest,
Type: p2p.MtDatasets,
Payload: &p2p.DatasetsReqParams{
Limit: p.Limit,
Offset: p.Offset,
},
refs, err := d.qriNode.RequestDatasetsList(id, p2p.DatasetsListParams{
Limit: p.Limit,
Offset: p.Offset,
})
if err != nil {
return fmt.Errorf("error sending message to peer: %s", err.Error())
}

data, err := json.Marshal(r.Payload)
if err != nil {
return fmt.Errorf("error encoding peer response: %s", err.Error())
}
refs := []repo.DatasetRef{}
if err := json.Unmarshal(data, &refs); err != nil {
return fmt.Errorf("error parsing peer response: %s", err.Error())
}

*res = refs
return nil
return err
}
21 changes: 5 additions & 16 deletions p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type NodeCfg struct {
PeerID peer.ID
PubKey crypto.PubKey
PrivKey crypto.PrivKey

// Port default port to bind a tcp listener to
// ignored if Addrs is supplied
Port int
Expand Down Expand Up @@ -54,14 +53,10 @@ func DefaultNodeCfg() *NodeCfg {
}

return &NodeCfg{
Online: true,
PeerID: pid,
PrivKey: priv,
PubKey: pub,
// RepoPath: "~/qri",
// TODO - enabling this causes all nodes to broadcast
// on the same address, which isn't good. figure out why
// Port: 4444,
Online: true,
PeerID: pid,
PrivKey: priv,
PubKey: pub,
QriBootstrapAddrs: DefaultBootstrapAddresses,
Secure: true,
}
Expand All @@ -83,14 +78,8 @@ func (cfg *NodeCfg) Validate(r repo.Repo) error {
// If no listening addresses are set, allocate
// a tcp multiaddress on local host bound to the default port
if cfg.Addrs == nil {
// find an open tcp port
port, err := LocalOpenPort("tcp", cfg.Port)
if err != nil {
return err
}

// Create a multiaddress
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", cfg.Port))
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 7a4e292

Please sign in to comment.