Skip to content

Commit

Permalink
handle error cases on streams
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
  • Loading branch information
whyrusleeping committed Jun 8, 2016
1 parent bda3034 commit 970cc79
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions exchange/bitswap/wantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,6 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
mq.out = fullwantlist
mq.work <- struct{}{}

s, err := pm.network.NewMessageSender(pm.ctx, p)
if err != nil {
log.Error("error opening stream to peer: ", err)
return nil
}

mq.sender = s

pm.peers[p] = mq
go mq.runQueue(pm.ctx)
return mq
Expand All @@ -163,7 +155,11 @@ func (pm *WantManager) stopPeerHandler(p peer.ID) {
}

func (mq *msgQueue) runQueue(ctx context.Context) {
defer mq.sender.Close()
defer func() {
if mq.sender != nil {
mq.sender.Close()
}
}()
for {
select {
case <-mq.work: // there is work to be done
Expand All @@ -180,14 +176,25 @@ func (mq *msgQueue) doWork(ctx context.Context) {
// allow ten minutes for connections
// this includes looking them up in the dht
// dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()
if mq.sender == nil {
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

err := mq.network.ConnectTo(conctx, mq.p)
if err != nil {
log.Infof("cant connect to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}

err := mq.network.ConnectTo(conctx, mq.p)
if err != nil {
log.Infof("cant connect to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
nsender, err := mq.network.NewMessageSender(ctx, mq.p)
if err != nil {
log.Infof("cant open new stream to peer %s: %s", mq.p, err)
// TODO: cant open stream, what now?
return
}

mq.sender = nsender
}

// grab outgoing message
Expand All @@ -201,9 +208,11 @@ func (mq *msgQueue) doWork(ctx context.Context) {
mq.outlk.Unlock()

// send wantlist updates
err = mq.sender.SendMsg(wlm)
err := mq.sender.SendMsg(wlm)
if err != nil {
log.Infof("bitswap send error: %s", err)
mq.sender.Close()
mq.sender = nil
// TODO: what do we do if this fails?
return
}
Expand Down

0 comments on commit 970cc79

Please sign in to comment.