Skip to content

Commit

Permalink
dht: FindPeersConnectedToPeer
Browse files Browse the repository at this point in the history
  • Loading branch information
jbenet committed Dec 9, 2014
1 parent b6bd79c commit ab1d12d
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 0 deletions.
95 changes: 95 additions & 0 deletions routing/dht/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dht

import (
"bytes"
"sort"
"testing"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Expand Down Expand Up @@ -414,6 +415,100 @@ func TestFindPeer(t *testing.T) {
}
}

func TestFindPeersConnectedToPeer(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

ctx := context.Background()
u.Debug = false

_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
dhts[i].dialer.(inet.Network).Close()
}
}()

// topology:
// 0-1, 1-2, 1-3, 2-3
err := dhts[0].Connect(ctx, peers[1])
if err != nil {
t.Fatal(err)
}

err = dhts[1].Connect(ctx, peers[2])
if err != nil {
t.Fatal(err)
}

err = dhts[1].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}

err = dhts[2].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}

// fmt.Println("0 is", peers[0])
// fmt.Println("1 is", peers[1])
// fmt.Println("2 is", peers[2])
// fmt.Println("3 is", peers[3])

ctxT, _ := context.WithTimeout(ctx, time.Second)
pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2].ID())
if err != nil {
t.Fatal(err)
}

// shouldFind := []peer.Peer{peers[1], peers[3]}
found := []peer.Peer{}
for nextp := range pchan {
found = append(found, nextp)
}

// fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2])
// fmt.Println("should find 1, 3", shouldFind)
// fmt.Println("found", found)

// testPeerListsMatch(t, shouldFind, found)

log.Warning("TestFindPeersConnectedToPeer is not quite correct")
if len(found) == 0 {
t.Fatal("didn't find any peers.")
}
}

func testPeerListsMatch(t *testing.T, p1, p2 []peer.Peer) {

if len(p1) != len(p2) {
t.Fatal("did not find as many peers as should have", p1, p2)
}

ids1 := make([]string, len(p1))
ids2 := make([]string, len(p2))

for i, p := range p1 {
ids1[i] = p.ID().Pretty()
}

for i, p := range p2 {
ids2[i] = p.ID().Pretty()
}

sort.Sort(sort.StringSlice(ids1))
sort.Sort(sort.StringSlice(ids2))

for i := range ids1 {
if ids1[i] != ids2[i] {
t.Fatal("Didnt find expected peer", ids1[i], ids2)
}
}
}

func TestConnectCollision(t *testing.T) {
if testing.Short() {
t.SkipNow()
Expand Down
1 change: 1 addition & 0 deletions routing/dht/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.Peer, pmes *pb.Me
for _, p := range withAddresses {
log.Debugf("handleFindPeer: sending back '%s'", p)
}

resp.CloserPeers = pb.PeersToPBPeers(dht.dialer, withAddresses)
return resp, nil
}
Expand Down
70 changes: 70 additions & 0 deletions routing/dht/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/routing"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
Expand Down Expand Up @@ -268,6 +269,75 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error)
return result.peer, nil
}

// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan peer.Peer, error) {

peerchan := make(chan peer.Peer, 10)
peersSeen := map[string]peer.Peer{}

routeLevel := 0
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if closest == nil || len(closest) == 0 {
return nil, kb.ErrLookupFailure
}

// setup the Query
query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {

pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
if err != nil {
return nil, err
}

var clpeers []peer.Peer
closer := pmes.GetCloserPeers()
for _, pbp := range closer {
// skip peers already seen
if _, found := peersSeen[string(pbp.GetId())]; found {
continue
}

// skip peers that fail to unmarshal
p, err := pb.PBPeerToPeer(dht.peerstore, pbp)
if err != nil {
log.Warning(err)
continue
}

// if peer is connected, send it to our client.
if pb.Connectedness(*pbp.Connection) == inet.Connected {
select {
case <-ctx.Done():
return nil, ctx.Err()
case peerchan <- p:
}
}

peersSeen[string(p.ID())] = p

// if peer is the peer we're looking for, don't bother querying it.
if pb.Connectedness(*pbp.Connection) != inet.Connected {
clpeers = append(clpeers, p)
}
}

return &dhtQueryResult{closerPeers: clpeers}, nil
})

// run it! run it asynchronously to gen peers as results are found.
// this does no error checking
go func() {
if _, err := query.Run(ctx, closest); err != nil {
log.Error(err)
}

// close the peerchan channel when done.
close(peerchan)
}()

return peerchan, nil
}

// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
Expand Down

0 comments on commit ab1d12d

Please sign in to comment.