Skip to content

Commit

Permalink
fix(peer sharing): fix peers not sharing info with each other
Browse files Browse the repository at this point in the history
Trying to make inroads on #510, I've re-worked the way peers share info about
each other. Instead of asking for profile information, a new request method "qri_peers"
asks for just the list of peer & profile id's a connected peer is connected to.
From there the local peer examines the list, and attempts to connect to any addresses it
hasn't seen before. This process repeats until no new peers are found.

Lots of work left to do here, but this should be a massive improvement
  • Loading branch information
b5 committed Oct 10, 2018
1 parent 00199a9 commit 8219bab
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 369 deletions.
7 changes: 4 additions & 3 deletions p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,14 +385,15 @@ func (n *QriNode) HostNetwork() net.Network {
// MakeHandlers generates a map of MsgTypes to their corresponding handler functions
func MakeHandlers(n *QriNode) map[MsgType]HandlerFunc {
return map[MsgType]HandlerFunc{
MtPing: n.handlePing,
MtProfile: n.handleProfile,
MtProfiles: n.handleProfiles,
MtPing: n.handlePing,
MtProfile: n.handleProfile,
// MtProfiles: n.handleProfiles,
MtDatasetInfo: n.handleDataset,
MtDatasets: n.handleDatasetsList,
MtEvents: n.handleEvents,
MtConnected: n.handleConnected,
MtResolveDatasetRef: n.handleResolveDatasetRef,
MtDatasetLog: n.handleDatasetLog,
MtQriPeers: n.handleQriPeers,
}
}
18 changes: 18 additions & 0 deletions p2p/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ func (n *QriNode) ConnectedQriProfiles() map[profile.ID]*config.ProfilePod {
if p, err := n.Repo.Profiles().PeerProfile(conn.RemotePeer()); err == nil {
if pe, err := p.Encode(); err == nil {
pe.Online = true
// Build host multiaddress,
// TODO - this should be a convenience func
hostAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", conn.RemotePeer().Pretty()))
if err != nil {
log.Debug(err.Error())
return nil
}

pe.NetworkAddrs = []string{conn.RemoteMultiaddr().Encapsulate(hostAddr).String()}
peers[p.ID] = pe
}
}
Expand Down Expand Up @@ -114,6 +123,15 @@ func (n *QriNode) AddQriPeer(pinfo pstore.PeerInfo) error {
return err
}

go func() {
ps, err := n.RequestQriPeers(pinfo.ID)
if err != nil {
log.Debugf("error fetching qri peers: %s", err)
return
}
n.RequestNewPeers(n.ctx, ps)
}()

return nil
}

Expand Down
16 changes: 11 additions & 5 deletions p2p/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ import (
p2ptest "github.com/qri-io/qri/p2p/test"
)

// Convert from test nodes to non-test nodes.
func asQriNodes(testPeers []p2ptest.TestablePeerNode) []*QriNode {
// Convert from test nodes to non-test nodes.
peers := make([]*QriNode, len(testPeers))
for i, node := range testPeers {
peers[i] = node.(*QriNode)
}
return peers
}

func TestConnectedQriProfiles(t *testing.T) {
ctx := context.Background()
f := p2ptest.NewTestNodeFactory(NewTestableQriNode)
Expand All @@ -21,11 +31,7 @@ func TestConnectedQriProfiles(t *testing.T) {
return
}

// Convert from test nodes to non-test nodes.
nodes := make([]*QriNode, len(testPeers))
for i, node := range testPeers {
nodes[i] = node.(*QriNode)
}
nodes := asQriNodes(testPeers)

pros := nodes[0].ConnectedQriProfiles()
if len(pros) != len(nodes)-1 {
Expand Down
117 changes: 0 additions & 117 deletions p2p/profiles.go

This file was deleted.

133 changes: 133 additions & 0 deletions p2p/qri_peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package p2p

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/qri-io/qri/config"
"github.com/qri-io/qri/repo/profile"

ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
)

// MtQriPeers is a request to get a list of known qri peers
var MtQriPeers = MsgType("qri_peers")

// QriPeer is a minimial struct that combines a profileID & network addresses
type QriPeer struct {
ProfileID string
PeerID string
NetworkAddrs []string
}

func toQriPeers(psm map[profile.ID]*config.ProfilePod) (peers []QriPeer) {
for id, pp := range psm {
p := QriPeer{
ProfileID: id.String(),
NetworkAddrs: pp.NetworkAddrs,
}
if len(pp.PeerIDs) >= 1 {
p.PeerID = pp.PeerIDs[0]
}
peers = append(peers, p)
}
return
}

// RequestNewPeers intersects a provided list of peer info with this node's existing
// connections and attempts to connect to any peers it doesn't have connections to
func (n *QriNode) RequestNewPeers(ctx context.Context, peers []QriPeer) {
var newPeers []QriPeer
connected := n.ConnectedQriProfiles()
for _, p := range peers {
proID, err := profile.NewB58ID(p.ProfileID)
if err != nil {
continue
}

if connected[proID] != nil {
continue
}
newPeers = append(newPeers, p)
}

for _, p := range newPeers {
// TODO -
ID, err := peer.IDB58Decode(strings.TrimPrefix(p.PeerID, "/ipfs/"))
if err != nil {
continue
}

ms, err := ParseMultiaddrs(p.NetworkAddrs)
if err != nil {
continue
}

var m ma.Multiaddr
if len(ms) > 0 {
m = ms[0]
}

go n.ConnectToPeer(ctx, PeerConnectionParams{
PeerID: ID,
Multiaddr: m,
})
}
}

// RequestQriPeers asks a designated peer for a list of qri peers
func (n *QriNode) RequestQriPeers(id peer.ID) ([]QriPeer, error) {
log.Debugf("%s RequestQriPeers: %s", n.ID, id)

if id == n.ID {
// requesting self isn't a network operation
return toQriPeers(n.ConnectedQriProfiles()), nil
}

if !n.Online {
return nil, fmt.Errorf("not connected to p2p network")
}

req, err := NewJSONBodyMessage(n.ID, MtQriPeers, nil)
if err != nil {
log.Debug(err.Error())
return nil, err
}

req = req.WithHeaders("phase", "request")

replies := make(chan Message)
err = n.SendMessage(req, replies, id)
if err != nil {
log.Debug(err.Error())
return nil, fmt.Errorf("send dataset info message error: %s", err.Error())
}

res := <-replies
peers := []QriPeer{}
err = json.Unmarshal(res.Body, &peers)
return peers, err
}

func (n *QriNode) handleQriPeers(ws *WrappedStream, msg Message) (hangup bool) {
hangup = true
switch msg.Header("phase") {
case "request":
ps := toQriPeers(n.ConnectedQriProfiles())
reply, err := msg.UpdateJSON(ps)
if err != nil {
log.Debug(err.Error())
return
}

reply = reply.WithHeaders("phase", "response")
if err := ws.sendMessage(reply); err != nil {
log.Debug(err.Error())
return
}
}
return
}
52 changes: 52 additions & 0 deletions p2p/qri_peers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package p2p

import (
"context"
"fmt"
"testing"
"time"

"github.com/qri-io/qri/p2p/test"
)

// This test connects four nodes to each other, then connects a fifth node to
// one of those four nodes.
// Test passes when the fifth node connects to the other three nodes by asking
// it's one connection for the other three peer's profiles
func TestSharePeers(t *testing.T) {
fmt.Println("hallo?")
ctx := context.Background()
f := p2ptest.NewTestNodeFactory(NewTestableQriNode)
testPeers, err := p2ptest.NewTestNetwork(ctx, f, 5)
if err != nil {
t.Fatalf("error creating network: %s", err.Error())
}

single := testPeers[0]
group := testPeers[1:]

if err := p2ptest.ConnectQriPeers(ctx, group); err != nil {
t.Errorf("error connecting peers: %s", err.Error())
}

nasma := single.(*QriNode)
done := make(chan bool)
deadline := time.NewTimer(time.Second * 2)
go func() {
for msg := range nasma.ReceiveMessages() {
if len(nasma.ConnectedPeers()) == len(group) {
fmt.Println(msg.Type, len(nasma.ConnectedPeers()))
done <- true
}
}
}()

nasma.AddQriPeer(group[0].SimplePeerInfo())

select {
case <-done:
return
case <-deadline.C:
t.Errorf("peers took too long to connect")
}
}
Loading

0 comments on commit 8219bab

Please sign in to comment.