Skip to content

Commit

Permalink
Merge pull request ipfs#74 from libp2p/fix/70
Browse files Browse the repository at this point in the history
Properly track connections to peers in the DHT.
  • Loading branch information
Stebalien committed Jul 27, 2017
2 parents 9b87e92 + 1d55311 commit 27c6861
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .gx/lastpubver
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.5.5: QmTHyAbD9KzGrseLNzmEoNkVxA8F2h7LQG2iV6uhBqs6kX
2.5.6: QmRKEzkaiwud2LnwJ9CgBrKw122ddKPTMtLizV3DNimVRD
4 changes: 4 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type IpfsDHT struct {

strmap map[peer.ID]*messageSender
smlk sync.Mutex

plk sync.Mutex
peers map[peer.ID]*peerTracker
}

// NewDHT creates a new DHT object with the given peer as the 'local' host
Expand Down Expand Up @@ -106,6 +109,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore()),
peers: make(map[peer.ID]*peerTracker),

Validator: make(record.Validator),
Selector: make(record.Selector),
Expand Down
86 changes: 60 additions & 26 deletions notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,24 @@ package dht
import (
"context"
"io"
"time"

inet "github.com/libp2p/go-libp2p-net"
ma "github.com/multiformats/go-multiaddr"
mstream "github.com/multiformats/go-multistream"
)

// TODO: There is a race condition here where we could process notifications
// out-of-order and incorrectly mark some peers as DHT nodes (or not DHT nodes).
// The correct fix for this is nasty so I'm not really sure it's worth it.
// Incorrectly recording or failing to record a DHT node in the routing table
// isn't a big issue.

const dhtCheckTimeout = 10 * time.Second

// netNotifiee defines methods to be used with the IpfsDHT.
//
// It is a distinct named type over IpfsDHT so that the network
// notification callbacks (Connected/Disconnected/...) do not become
// part of IpfsDHT's public method set. NOTE(review): registration as
// the network's Notifiee presumably happens elsewhere in this file —
// not visible in this chunk.
type netNotifiee IpfsDHT

// DHT converts the notifiee back to the underlying *IpfsDHT.
// The conversion is valid because netNotifiee is defined as IpfsDHT.
func (nn *netNotifiee) DHT() *IpfsDHT {
	return (*IpfsDHT)(nn)
}

// peerTracker tracks the number of live connections to a single peer
// and owns the cancel func for that peer's background DHT-protocol
// probe. Entries live in IpfsDHT.peers, guarded by IpfsDHT.plk.
type peerTracker struct {
	refcount int    // count of open connections to this peer
	cancel   func() // cancels the probe context when refcount hits zero
}

func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
Expand All @@ -33,36 +29,59 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
default:
}

go func() {
dht.plk.Lock()
defer dht.plk.Unlock()

conn, ok := nn.peers[v.RemotePeer()]
if ok {
conn.refcount++
return
}

ctx, cancel := context.WithCancel(dht.Context())

nn.peers[v.RemotePeer()] = &peerTracker{
refcount: 1,
cancel: cancel,
}

// Note: We *could* just check the peerstore to see if the remote side supports the dht
// protocol, but its not clear that that information will make it into the peerstore
// by the time this notification is sent. So just to be very careful, we brute force this
// and open a new stream
// Note: We *could* just check the peerstore to see if the remote side supports the dht
// protocol, but its not clear that that information will make it into the peerstore
// by the time this notification is sent. So just to be very careful, we brute force this
// and open a new stream
go nn.testConnection(ctx, v)

// TODO: There's a race condition here where the connection may
// not be open (and we may sit here trying to connect). I've
// added a timeout but that's not really the correct fix.
}

ctx, cancel := context.WithTimeout(dht.Context(), dhtCheckTimeout)
defer cancel()
// testConnection probes the remote end of conn v by opening a fresh DHT
// stream; if the peer speaks the DHT protocol it is added to the
// routing table (under plk, and only if ctx is still live).
//
// NOTE(review): this span is a flattened diff — it contains both the
// removed and the added lines of the commit, so as plain Go it does
// not compile (duplicate `case io.EOF`, a pre-commit `dht.Update`
// call, and a stray `}()`). The residue lines are flagged below; the
// intended post-commit body is the version without them.
func (nn *netNotifiee) testConnection(ctx context.Context, v inet.Conn) {
	dht := nn.DHT()
	for {
		// Brute-force capability check: open a new stream rather than
		// trusting the peerstore (see the comment in Connected).
		s, err := dht.host.NewStream(ctx, v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)

		switch err {
		case nil:
			s.Close()
			// connected fine? full dht node
			// (review: the next line is the pre-commit call, removed by
			// this commit in favor of the locked, cancellation-checked
			// version just below)
			dht.Update(dht.Context(), v.RemotePeer())
			dht.plk.Lock()

			// Check if canceled under the lock — a concurrent
			// Disconnected may have cancelled us; don't re-add the peer.
			if ctx.Err() == nil {
				dht.Update(dht.Context(), v.RemotePeer())
			}

			dht.plk.Unlock()
		case io.EOF:
			if ctx.Err() == nil {
				// Connection died but we may still have *an* open connection (context not canceled) so try again.
				continue
			}
		case mstream.ErrNotSupported:
			// Client mode only, don't bother adding them to our routing table
		// (review: duplicate case below is pre-commit residue — the old
		// code logged EOF as a warning instead of retrying)
		case io.EOF:
			// This is kind of an error, but it happens somewhat often so make it a warning
			log.Warningf("checking dht client type: %s", err)
		default:
			// real error? that's odd
			log.Errorf("checking dht client type: %s", err)
		}
		// (review: `}()` below is residue of the old inline goroutine
		// closure that this commit replaced with this named method)
		}()
		return
	}
}

func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
Expand All @@ -72,7 +91,22 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
return
default:
}
go dht.routingTable.Remove(v.RemotePeer())

dht.plk.Lock()
defer dht.plk.Unlock()

conn, ok := nn.peers[v.RemotePeer()]
if !ok {
// Unmatched disconnects are fine. It just means that we were
// already connected when we registered the listener.
return
}
conn.refcount -= 1
if conn.refcount == 0 {
delete(nn.peers, v.RemotePeer())
conn.cancel()
dht.routingTable.Remove(v.RemotePeer())
}
}

func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
Expand Down
71 changes: 71 additions & 0 deletions notify_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package dht

import (
"context"
"testing"
)

// TestNotifieeMultipleConn simulates duplicate connect/disconnect
// notifications for the same peer pair and checks that the routing
// table only drops a peer after the final connection is closed.
func TestNotifieeMultipleConn(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dhtA := setupDHT(ctx, t, false)
	dhtB := setupDHT(ctx, t, false)

	notifA := (*netNotifiee)(dhtA)
	notifB := (*netNotifiee)(dhtB)

	connect(t, ctx, dhtA, dhtB)
	connAB := dhtA.host.Network().ConnsToPeer(dhtB.self)[0]
	connBA := dhtB.host.Network().ConnsToPeer(dhtA.self)[0]

	// Pretend to reestablish/re-kill connection
	notifA.Connected(dhtA.host.Network(), connAB)
	notifB.Connected(dhtB.host.Network(), connBA)

	if !checkRoutingTable(dhtA, dhtB) {
		t.Fatal("no routes")
	}

	// One simulated disconnect each: refcount drops but a connection
	// remains, so the routes must survive.
	notifA.Disconnected(dhtA.host.Network(), connAB)
	notifB.Disconnected(dhtB.host.Network(), connBA)

	if !checkRoutingTable(dhtA, dhtB) {
		t.Fatal("no routes")
	}

	// Now tear down the real connections on both sides.
	for _, c := range dhtA.host.Network().ConnsToPeer(dhtB.self) {
		c.Close()
	}
	for _, c := range dhtB.host.Network().ConnsToPeer(dhtA.self) {
		c.Close()
	}

	if checkRoutingTable(dhtA, dhtB) {
		t.Fatal("routes")
	}
}

// TestNotifieeFuzz rapidly churns connections between two DHTs and
// verifies that no stale routes survive, then confirms a normal
// connect still works afterwards.
func TestNotifieeFuzz(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	local := setupDHT(ctx, t, false)
	remote := setupDHT(ctx, t, false)

	for attempt := 0; attempt < 100; attempt++ {
		connectNoSync(t, ctx, local, remote)
		conns := local.host.Network().ConnsToPeer(remote.self)
		for _, c := range conns {
			c.Close()
		}
	}
	if checkRoutingTable(local, remote) {
		t.Fatal("should not have routes")
	}
	connect(t, ctx, local, remote)
}

// checkRoutingTable reports whether a and b each have the other in
// their routing table.
// loop until connection notification has been received.
// under high load, this may not happen as immediately as we would like.
func checkRoutingTable(a, b *IpfsDHT) bool {
	if a.routingTable.Find(b.self) == "" {
		return false
	}
	return b.routingTable.Find(a.self) != ""
}

0 comments on commit 27c6861

Please sign in to comment.