From 970cc79e0fec21c6754ac9dcd1f6032b3f162a31 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 6 Jun 2016 15:45:58 -0700 Subject: [PATCH] handle error cases on streams License: MIT Signed-off-by: Jeromy --- exchange/bitswap/wantmanager.go | 43 ++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index e3714be9b0b8..24fd75c1e41a 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -133,14 +133,6 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { mq.out = fullwantlist mq.work <- struct{}{} - s, err := pm.network.NewMessageSender(pm.ctx, p) - if err != nil { - log.Error("error opening stream to peer: ", err) - return nil - } - - mq.sender = s - pm.peers[p] = mq go mq.runQueue(pm.ctx) return mq @@ -163,7 +155,11 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) { } func (mq *msgQueue) runQueue(ctx context.Context) { - defer mq.sender.Close() + defer func() { + if mq.sender != nil { + mq.sender.Close() + } + }() for { select { case <-mq.work: // there is work to be done @@ -180,14 +176,25 @@ func (mq *msgQueue) doWork(ctx context.Context) { // allow ten minutes for connections // this includes looking them up in the dht // dialing them, and handshaking - conctx, cancel := context.WithTimeout(ctx, time.Minute*10) - defer cancel() + if mq.sender == nil { + conctx, cancel := context.WithTimeout(ctx, time.Minute*10) + defer cancel() + + err := mq.network.ConnectTo(conctx, mq.p) + if err != nil { + log.Infof("cant connect to peer %s: %s", mq.p, err) + // TODO: cant connect, what now? + return + } - err := mq.network.ConnectTo(conctx, mq.p) - if err != nil { - log.Infof("cant connect to peer %s: %s", mq.p, err) - // TODO: cant connect, what now? - return + nsender, err := mq.network.NewMessageSender(ctx, mq.p) + if err != nil { + log.Infof("cant open new stream to peer %s: %s", mq.p, err) + // TODO: cant open stream, what now? + return + } + + mq.sender = nsender } // grab outgoing message @@ -201,9 +208,11 @@ func (mq *msgQueue) doWork(ctx context.Context) { mq.outlk.Unlock() // send wantlist updates - err = mq.sender.SendMsg(wlm) + err := mq.sender.SendMsg(wlm) if err != nil { log.Infof("bitswap send error: %s", err) + mq.sender.Close() + mq.sender = nil // TODO: what do we do if this fails? return }