From 6cd7b040b59695e6b4cac91461eb43eb9b1de5a1 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Sat, 12 Aug 2017 22:38:09 +0200 Subject: [PATCH] fix: add deadlines to write and read requests Prevents buildup in case of non-responding peers --- dht_net.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dht_net.go b/dht_net.go index c2f38682c..d657cd043 100644 --- a/dht_net.go +++ b/dht_net.go @@ -13,7 +13,7 @@ import ( peer "github.com/libp2p/go-libp2p-peer" ) -var dhtReadMessageTimeout = time.Minute +var dhtMessageTimeout = 1 * time.Minute var ErrReadTimeout = fmt.Errorf("timed out reading response") // handleNewStream implements the inet.StreamHandler @@ -34,6 +34,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) { for { // receive msg pmes := new(pb.Message) + s.SetReadDeadline(time.Now().Add(dhtMessageTimeout)) if err := r.ReadMsg(pmes); err != nil { log.Debugf("Error unmarshaling data: %s", err) return @@ -63,6 +64,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) { } // send out response msg + s.SetWriteDeadline(time.Now().Add(dhtMessageTimeout)) if err := w.WriteMsg(rpmes); err != nil { log.Debugf("send response error: %s", err) return @@ -145,7 +147,6 @@ func (ms *messageSender) prep() error { if err != nil { return err } - ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax) ms.w = ggio.NewDelimitedWriter(nstr) ms.s = nstr @@ -178,6 +179,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro } func (ms *messageSender) writeMessage(pmes *pb.Message) error { + ms.s.SetWriteDeadline(time.Now().Add(dhtMessageTimeout)) err := ms.w.WriteMsg(pmes) if err != nil { // If the other side isnt expecting us to be reusing streams, we're gonna @@ -191,6 +193,7 @@ func (ms *messageSender) writeMessage(pmes *pb.Message) error { return err } + ms.s.SetWriteDeadline(time.Now().Add(dhtMessageTimeout)) if err := ms.w.WriteMsg(pmes); err != nil { return err } @@ -233,10 +236,11 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error { errc := make(chan error, 1) go func(r ggio.ReadCloser) { + ms.s.SetReadDeadline(time.Now().Add(dhtMessageTimeout)) errc <- r.ReadMsg(mes) }(ms.r) - t := time.NewTimer(dhtReadMessageTimeout) + t := time.NewTimer(dhtMessageTimeout) defer t.Stop() select {