
dht find connected peers #428

Merged · 11 commits · Dec 9, 2014
8 changes: 4 additions & 4 deletions Godeps/Godeps.json


2 changes: 1 addition & 1 deletion core/bootstrap.go
@@ -64,7 +64,7 @@ func bootstrap(ctx context.Context,

var notConnected []peer.Peer
for _, p := range bootstrapPeers {
- if !n.IsConnected(p) {
+ if n.Connectedness(p) != inet.Connected {
notConnected = append(notConnected, p)
}
}
31 changes: 29 additions & 2 deletions net/interface.go
@@ -20,14 +20,17 @@ type Network interface {
// Listen(*ma.Multiaddr) error
// TODO: for now, only listen on addrs in local peer when initializing.

+ // LocalPeer returns the local peer associated with this network
+ LocalPeer() peer.Peer

// DialPeer attempts to establish a connection to a given peer
DialPeer(context.Context, peer.Peer) error

// ClosePeer connection to peer
ClosePeer(peer.Peer) error

- // IsConnected returns whether a connection to given peer exists.
- IsConnected(peer.Peer) bool
+ // Connectedness returns a state signaling connection capabilities
+ Connectedness(peer.Peer) Connectedness

// GetProtocols returns the protocols registered in the network.
GetProtocols() *mux.ProtocolMap
@@ -71,7 +74,31 @@ type Service srv.Service
// (this is usually just a Network, but other services may not need the whole
// stack, and thus it becomes easier to mock)
type Dialer interface {
// LocalPeer returns the local peer associated with this network
LocalPeer() peer.Peer

// DialPeer attempts to establish a connection to a given peer
DialPeer(context.Context, peer.Peer) error

// Connectedness returns a state signaling connection capabilities
Connectedness(peer.Peer) Connectedness
}

// Connectedness signals the capacity for a connection with a given node.
// It is used to signal to services and other peers whether a node is reachable.
type Connectedness int

const (
// NotConnected means no connection to peer, and no extra information (default)
NotConnected Connectedness = iota

// Connected means has an open, live connection to peer
Connected

// CanConnect means recently connected to peer, terminated gracefully
CanConnect

// CannotConnect means recently attempted connecting but failed to connect.
// (should signal "made effort, failed")
CannotConnect
)
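The Dialer comment above promises that services can mock this surface instead of standing up a full Network. A minimal sketch of such a test double, assuming it sits alongside package net and uses the same peer and context imports as this file; fakeDialer and its fields are hypothetical, not part of this PR:

// fakeDialer is a hypothetical Dialer for tests: it serves canned
// Connectedness states and records dial attempts for later assertions.
type fakeDialer struct {
	local  peer.Peer
	states map[string]Connectedness // keyed by string(peer.ID)
	dialed []peer.Peer
}

func (d *fakeDialer) LocalPeer() peer.Peer { return d.local }

// DialPeer always succeeds and marks the peer Connected.
func (d *fakeDialer) DialPeer(ctx context.Context, p peer.Peer) error {
	d.dialed = append(d.dialed, p)
	d.states[string(p.ID())] = Connected
	return nil
}

// Connectedness returns the canned state; a missing entry yields the
// zero value, which the iota block above makes NotConnected by design.
func (d *fakeDialer) Connectedness(p peer.Peer) Connectedness {
	return d.states[string(p.ID())]
}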
16 changes: 15 additions & 1 deletion net/net.go
@@ -1,4 +1,4 @@
- // package net provides an interface for ipfs to interact with the network through
+ // Package net provides an interface for ipfs to interact with the network through
package net

import (
@@ -69,6 +69,11 @@ func (n *IpfsNetwork) DialPeer(ctx context.Context, p peer.Peer) error {
return err
}

// LocalPeer returns the network's local peer
func (n *IpfsNetwork) LocalPeer() peer.Peer {
return n.swarm.LocalPeer()
}

// ClosePeer connection to peer
func (n *IpfsNetwork) ClosePeer(p peer.Peer) error {
return n.swarm.CloseConnection(p)
@@ -126,3 +131,12 @@ func (n *IpfsNetwork) ListenAddresses() []ma.Multiaddr {
func (n *IpfsNetwork) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
return n.swarm.InterfaceListenAddresses()
}

// Connectedness returns a state signaling connection capabilities
// For now only returns Connected || NotConnected. Expand into more later.
func (n *IpfsNetwork) Connectedness(p peer.Peer) Connectedness {
if n.swarm.GetConnection(p.ID()) != nil {
return Connected
}
return NotConnected
}
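Even though this implementation only reports Connected or NotConnected for now, callers can already branch on all four states. A hedged consumer-side sketch, assuming a Dialer d is in scope and the errors package is imported; dialOrSkip is hypothetical, not part of this PR:

// dialOrSkip spends a dial only when Connectedness suggests it may pay off.
func dialOrSkip(ctx context.Context, d Dialer, p peer.Peer) error {
	switch d.Connectedness(p) {
	case Connected:
		return nil // live connection already open, nothing to do
	case CannotConnect:
		return errors.New("recent dial to peer failed; skipping")
	default:
		// NotConnected or CanConnect: a dial attempt is worthwhile.
		return d.DialPeer(ctx, p)
	}
}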
5 changes: 5 additions & 0 deletions net/swarm/swarm.go
@@ -217,3 +217,8 @@ func (s *Swarm) GetPeerList() []peer.Peer {
s.connsLock.RUnlock()
return out
}

// LocalPeer returns the local peer the swarm is associated with.
func (s *Swarm) LocalPeer() peer.Peer {
return s.local
}
4 changes: 4 additions & 0 deletions peer/peer.go
@@ -209,6 +209,10 @@ func (p *peer) Addresses() []ma.Multiaddr {
// AddAddress adds the given Multiaddr address to Peer's addresses.
// Returns whether this address was a newly added address
func (p *peer) AddAddress(a ma.Multiaddr) bool {
if a == nil {
panic("adding a nil Multiaddr")
}

p.Lock()
defer p.Unlock()

Contributor: err instead?

Member Author: This is programmer error. It's not an error that should be allowed at runtime / handled. Variable addresses should be addressed long before we get here.

Member Author: More generally, I'm coming to think that returning err all over the place can actually lead to harder-to-debug things than well-understood panics. It treats error as a catch-all, instead of purposeful error handling of things known to go wrong. Imagine if index errors on slices returned err? The stdlib panics all over the place to signal what we definitely shouldn't do.

Contributor: It's not the errors themselves that cause that. It's masking them behind log statements and continuing over them. Those practices will certainly make it harder to debug behavior. I'm in complete disagreement regarding panics versus erroring, but whatever.

Contributor: #412 documents a panic that takes down the daemon during ipfs swarm connect. I weep when I see more opportunities for panicking going into subsystems.

Member Author:
> it's masking them behind log statements and continuing over them. those practices will certainly make it harder to debug behavior.

This is a separate issue, and yes, if that's used incorrectly it's pretty bad. In servers, though, logging the error often IS exactly what we want to do (think 404s, 500s, etc). err should be handled correctly OR gracefully bubble back to {exit the application in cli, exit the request in server}.

Contributor:
> this is a separate issue, and yes, if that's used incorrectly it's pretty bad. In servers though, logging the error often IS exactly what we want to do (think 404s, 500s, etc)

I mean to call attention to the fact that this separate issue is the actual reason why one would begin to lose confidence in errors.

Member Author:
> I mean to call attention to the fact that this separate issue is the actual reason why one would begin to lose confidence in errors.

My point is that adding errors all over the place is itself bad practice. Look at the stdlib: https://gist.github.com/jbenet/9c20705c43a7ae47f9be. Programmer error that is always incorrect should not ship; it should panic. We disagree on this and I don't think we'll reach consensus now.

And yeah, #412 is a problem that needs to be addressed.
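The thread above is a general Go design argument. Here is a small, self-contained sketch of the two positions being argued, with hypothetical addAddress and dial functions standing in for the real ones (this is not code from the PR):

package main

import (
	"errors"
	"fmt"
)

// addAddress panics on nil: passing nil is a bug in the caller (programmer
// error), not a condition a correct program should ever handle at runtime.
func addAddress(addrs []string, a *string) []string {
	if a == nil {
		panic("adding a nil address")
	}
	return append(addrs, *a)
}

// dial returns an error: an unreachable address is an expected runtime
// condition, so the caller decides whether to retry, log, or give up.
func dial(addr string) error {
	if addr == "" {
		return errors.New("no address to dial")
	}
	return nil // pretend the dial succeeded
}

func main() {
	a := "/ip4/127.0.0.1/tcp/4001"
	fmt.Println(addAddress(nil, &a))
	if err := dial(""); err != nil {
		fmt.Println("dial failed:", err)
	}
}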
60 changes: 17 additions & 43 deletions routing/dht/dht.go
@@ -1,4 +1,4 @@
- // package dht implements a distributed hash table that satisfies the ipfs routing
+ // Package dht implements a distributed hash table that satisfies the ipfs routing
// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications.
package dht

@@ -227,7 +227,7 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) er
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)

// add self as the provider
- pmes.ProviderPeers = pb.PeersToPBPeers([]peer.Peer{dht.self})
+ pmes.ProviderPeers = pb.PeersToPBPeers(dht.dialer, []peer.Peer{dht.self})

rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
@@ -274,14 +274,11 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
}

// Perhaps we were given closer peers
- var peers []peer.Peer
- for _, pb := range pmes.GetCloserPeers() {
- pr, err := dht.peerFromInfo(pb)
+ peers, errs := pb.PBPeersToPeers(dht.peerstore, pmes.GetCloserPeers())
+ for _, err := range errs {
if err != nil {
log.Error(err)
- continue
}
- peers = append(peers, pr)
}

if len(peers) > 0 {
@@ -426,22 +423,20 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.
return dht.sendRequest(ctx, p, pmes)
}

- func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer {
- var provArr []peer.Peer
- for _, prov := range peers {
- p, err := dht.peerFromInfo(prov)
- if err != nil {
- log.Errorf("error getting peer from info: %v", err)
- continue
- }
-
- log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
+ func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.Peer {
+ peers, errs := pb.PBPeersToPeers(dht.peerstore, pbps)
+ for _, err := range errs {
+ log.Errorf("error converting peer: %v", err)
+ }
+
+ var provArr []peer.Peer
+ for _, p := range peers {
// Don't add ourselves to the list
if p.ID().Equal(dht.self.ID()) {
continue
}

+ log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
// TODO(jbenet) ensure providers is idempotent
dht.providers.AddProvider(key, p)
provArr = append(provArr, p)
@@ -500,35 +495,14 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
return p, nil
}

- // peerFromInfo returns a peer using info in the protobuf peer struct
- // to lookup or create a peer
- func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {
-
- id := peer.ID(pbp.GetId())
-
- // bail out if it's ourselves
- //TODO(jbenet) not sure this should be an error _here_
- if id.Equal(dht.self.ID()) {
- return nil, errors.New("found self")
- }
-
- p, err := dht.getPeer(id)
- if err != nil {
- return nil, err
- }
-
- maddr, err := pbp.Address()
+ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) {
+ p, err := pb.PBPeerToPeer(dht.peerstore, pbp)
if err != nil {
return nil, err
}
- p.AddAddress(maddr)
- return p, nil
- }
-
- func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) {
- p, err := dht.peerFromInfo(pbp)
- if err != nil {
- return nil, err
+ if dht.dialer.LocalPeer().ID().Equal(p.ID()) {
+ return nil, errors.New("attempting to ensure connection to self")
}

// dial connection
@@ -583,7 +557,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
rand.Read(id)
p, err := dht.FindPeer(ctx, peer.ID(id))
if err != nil {
log.Error("Bootstrap peer error: %s", err)
log.Errorf("Bootstrap peer error: %s", err)
}
err = dht.dialer.DialPeer(ctx, p)
if err != nil {
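The helpers pb.PBPeersToPeers and pb.PBPeerToPeer are introduced by this PR but their bodies fall outside this diff. Purely as a reading aid, a sketch of the shape the call sites above imply; the real code lives in routing/dht/pb, and collecting only the failures in errs (rather than one slot per input) is an assumption here:

// Sketch only: convert a batch of protobuf peers, skipping bad entries
// instead of aborting the whole batch.
func PBPeersToPeers(ps peer.Peerstore, pbps []*Message_Peer) ([]peer.Peer, []error) {
	var peers []peer.Peer
	var errs []error
	for _, pbp := range pbps {
		// Look up or create each peer from its protobuf info and
		// attach the advertised address.
		p, err := PBPeerToPeer(ps, pbp)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		peers = append(peers, p)
	}
	return peers, errs
}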
103 changes: 103 additions & 0 deletions routing/dht/dht_test.go
@@ -2,6 +2,7 @@ package dht

import (
"bytes"
"sort"
"testing"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
@@ -64,6 +65,14 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer
return addrs, peers, dhts
}

func makePeerString(t *testing.T, addr string) peer.Peer {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
t.Fatal(err)
}
return makePeer(maddr)
}

func makePeer(addr ma.Multiaddr) peer.Peer {
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
@@ -406,6 +415,100 @@ func TestFindPeer(t *testing.T) {
}
}

func TestFindPeersConnectedToPeer(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

ctx := context.Background()
u.Debug = false
Contributor:

prev := u.Debug
u.Debug = false
defer u.Debug = prev

?

Member Author: Woah, what. Sorry, I don't know how that got in there.

Member Author: By the way, lovely pattern.

Member Author: Huh, it's all over the place in this file. Don't remember why. @whyrusleeping any idea?

Member Author: Merging. We'll solve this later. #430

_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
dhts[i].dialer.(inet.Network).Close()
}
}()

// topology:
// 0-1, 1-2, 1-3, 2-3
err := dhts[0].Connect(ctx, peers[1])
if err != nil {
t.Fatal(err)
}

err = dhts[1].Connect(ctx, peers[2])
if err != nil {
t.Fatal(err)
}

err = dhts[1].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}

err = dhts[2].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}

// fmt.Println("0 is", peers[0])
// fmt.Println("1 is", peers[1])
// fmt.Println("2 is", peers[2])
// fmt.Println("3 is", peers[3])

ctxT, _ := context.WithTimeout(ctx, time.Second)
pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2].ID())
if err != nil {
t.Fatal(err)
}

// shouldFind := []peer.Peer{peers[1], peers[3]}
found := []peer.Peer{}
for nextp := range pchan {
found = append(found, nextp)
}

// fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2])
// fmt.Println("should find 1, 3", shouldFind)
// fmt.Println("found", found)

// testPeerListsMatch(t, shouldFind, found)

log.Warning("TestFindPeersConnectedToPeer is not quite correct")
if len(found) == 0 {
t.Fatal("didn't find any peers.")
}
}
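One loose end from the review thread earlier in this file: defer u.Debug = prev is not legal Go, since defer takes a function call. Written out fully, the suggested save-and-restore pattern might look like this (quietDebug is a hypothetical helper, not part of this PR):

// quietDebug silences debug output and returns a function that restores
// the previous setting. A test would start with: defer quietDebug()()
func quietDebug() (restore func()) {
	prev := u.Debug
	u.Debug = false
	return func() { u.Debug = prev }
}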

func testPeerListsMatch(t *testing.T, p1, p2 []peer.Peer) {

if len(p1) != len(p2) {
t.Fatal("did not find as many peers as should have", p1, p2)
}

ids1 := make([]string, len(p1))
ids2 := make([]string, len(p2))

for i, p := range p1 {
ids1[i] = p.ID().Pretty()
}

for i, p := range p2 {
ids2[i] = p.ID().Pretty()
}

sort.Sort(sort.StringSlice(ids1))
sort.Sort(sort.StringSlice(ids2))

for i := range ids1 {
if ids1[i] != ids2[i] {
t.Fatal("Didnt find expected peer", ids1[i], ids2)
}
}
}

func TestConnectCollision(t *testing.T) {
if testing.Short() {
t.SkipNow()