Skip to content

Commit

Permalink
send cancel async (#245)
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 authored Aug 27, 2021
1 parent 6869eb1 commit 5493514
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 17 deletions.
30 changes: 15 additions & 15 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
Expand All @@ -25,6 +26,7 @@ import (
)

var log = logging.Logger("dt-impl")
var cancelSendTimeout = 30 * time.Second

type manager struct {
dataTransferNetwork network.DataTransferNetwork
Expand Down Expand Up @@ -294,24 +296,22 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe
log.Warnf("unable to close channel %s: %s", chid, err)
}

// Send a cancel message to the remote peer
log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
if err != nil {
err = fmt.Errorf("unable to send cancel message for channel %s to peer %s: %w",
chid, m.peerID, err)
_ = m.OnRequestDisconnected(chid, err)
log.Warn(err)
}
// Send a cancel message to the remote peer async
go func() {
sctx, cancel := context.WithTimeout(context.Background(), cancelSendTimeout)
defer cancel()
log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
err = m.dataTransferNetwork.SendMessage(sctx, chst.OtherPeer(), m.cancelMessage(chid))
if err != nil {
err = fmt.Errorf("unable to send cancel message for channel %s to peer %s: %w",
chid, m.peerID, err)
_ = m.OnRequestDisconnected(chid, err)
log.Warn(err)
}
}()

// Fire a cancel event
fsmerr := m.channels.Cancel(chid)
// If it wasn't possible to send a cancel message to the peer, return
// that error
if err != nil {
return err
}
// If it wasn't possible to fire a cancel event, return that error
if fsmerr != nil {
return xerrors.Errorf("unable to send cancel to channel FSM: %w", fsmerr)
}
Expand Down
11 changes: 9 additions & 2 deletions impl/initiating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ func TestDataTransferInitiating(t *testing.T) {
require.NoError(t, err)
require.Len(t, h.transport.ClosedChannels, 1)
require.Equal(t, h.transport.ClosedChannels[0], channelID)
require.Len(t, h.network.SentMessages, 2)

require.Eventually(t, func() bool {
return len(h.network.SentMessages) == 2
}, 5*time.Second, 200*time.Millisecond)
cancelMessage := h.network.SentMessages[1].Message
require.False(t, cancelMessage.IsUpdate())
require.False(t, cancelMessage.IsPaused())
Expand Down Expand Up @@ -266,7 +269,11 @@ func TestDataTransferInitiating(t *testing.T) {
require.NoError(t, err)
require.Len(t, h.transport.ClosedChannels, 1)
require.Equal(t, h.transport.ClosedChannels[0], channelID)
require.Len(t, h.network.SentMessages, 1)

require.Eventually(t, func() bool {
return len(h.network.SentMessages) == 1
}, 5*time.Second, 200*time.Millisecond)

cancelMessage := h.network.SentMessages[0].Message
require.False(t, cancelMessage.IsUpdate())
require.False(t, cancelMessage.IsPaused())
Expand Down

0 comments on commit 5493514

Please sign in to comment.