Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix memory leak holding onto streams unnecessarily #93

Merged
merged 2 commits into from
Sep 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 58 additions & 13 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {

ms := dht.messageSenderForPeer(p)
ms, err := dht.messageSenderForPeer(p)
if err != nil {
return nil, err
}

start := time.Now()

Expand All @@ -97,8 +100,10 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

// sendMessage sends out a message
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {

ms := dht.messageSenderForPeer(p)
ms, err := dht.messageSenderForPeer(p)
if err != nil {
return err
}

if err := ms.SendMessage(ctx, pmes); err != nil {
return err
Expand All @@ -112,17 +117,36 @@ func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Me
return nil
}

func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) *messageSender {
func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
dht.smlk.Lock()
defer dht.smlk.Unlock()

ms, ok := dht.strmap[p]
if !ok {
ms = dht.newMessageSender(p)
dht.strmap[p] = ms
if ok {
dht.smlk.Unlock()
return ms, nil
}

return ms
ms = &messageSender{p: p, dht: dht}
dht.strmap[p] = ms
dht.smlk.Unlock()

if err := ms.prepOrInvalidate(); err != nil {
dht.smlk.Lock()
defer dht.smlk.Unlock()

if msCur, ok := dht.strmap[p]; ok {
// Changed. Use the new one, old one is invalid and
// not in the map so we can just throw it away.
if ms != msCur {
return msCur, nil
}
// Not changed, remove the now invalid stream from the
// map.
delete(dht.strmap, p)
}
// Invalid but not in map. Must have been removed by a disconnect.
return nil, err
}
// All ready to go.
return ms, nil
}

type messageSender struct {
Expand All @@ -133,14 +157,35 @@ type messageSender struct {
p peer.ID
dht *IpfsDHT

invalid bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the purpose of this? doesn't seem to be used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it is. We were toying with using it and decided we didn't need it but forgot to remove it. Now, we realized we really do need it.


If you can think of a better, race-free way of doing this dance, suggestions welcome!

singleMes int
}

func (dht *IpfsDHT) newMessageSender(p peer.ID) *messageSender {
return &messageSender{p: p, dht: dht}
// invalidate is called before this messageSender is removed from the strmap.
// It prevents the messageSender from being reused/reinitialized and then
// forgotten (leaving the stream open).
func (ms *messageSender) invalidate() {
ms.invalid = true
if ms.s != nil {
ms.s.Reset()
ms.s = nil
}
}

func (ms *messageSender) prepOrInvalidate() error {
ms.lk.Lock()
defer ms.lk.Unlock()
if err := ms.prep(); err != nil {
ms.invalidate()
return err
}
return nil
}

func (ms *messageSender) prep() error {
if ms.invalid {
return fmt.Errorf("message sender has been invalidated")
}
if ms.s != nil {
return nil
}
Expand Down
16 changes: 16 additions & 0 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,22 @@ func TestValueGetSet(t *testing.T) {
}
}

func TestInvalidMessageSenderTracking(t *testing.T) {
ctx := context.Background()
dht := setupDHT(ctx, t, false)
foo := peer.ID("asdasd")
_, err := dht.messageSenderForPeer(foo)
if err == nil {
t.Fatal("that shouldnt have succeeded")
}

dht.smlk.Lock()
defer dht.smlk.Unlock()
if len(dht.strmap) > 0 {
t.Fatal("should have no message senders in map")
}
}

func TestProvides(t *testing.T) {
// t.Skip("skipping test to debug another")
ctx := context.Background()
Expand Down
41 changes: 30 additions & 11 deletions notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,40 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
default:
}

dht.plk.Lock()
defer dht.plk.Unlock()
p := v.RemotePeer()

conn, ok := nn.peers[v.RemotePeer()]
func() {
dht.plk.Lock()
defer dht.plk.Unlock()

conn, ok := nn.peers[p]
if !ok {
// Unmatched disconnects are fine. It just means that we were
// already connected when we registered the listener.
return
}
conn.refcount -= 1
if conn.refcount == 0 {
delete(nn.peers, p)
conn.cancel()
dht.routingTable.Remove(p)
}
}()

dht.smlk.Lock()
defer dht.smlk.Unlock()
ms, ok := dht.strmap[p]
if !ok {
// Unmatched disconnects are fine. It just means that we were
// already connected when we registered the listener.
return
}
conn.refcount -= 1
if conn.refcount == 0 {
delete(nn.peers, v.RemotePeer())
conn.cancel()
dht.routingTable.Remove(v.RemotePeer())
}
delete(dht.strmap, p)

// Do this asynchronously as ms.lk can block for a while.
go func() {
ms.lk.Lock()
defer ms.lk.Unlock()
ms.invalidate()
}()
}

func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
Expand Down