
Commit 740baee

fix: push channel monitor watch for accept and complete events
1 parent 20d3d75 commit 740baee

File tree: 4 files changed (+338, -58 lines)


impl/impl.go

Lines changed: 50 additions & 24 deletions
@@ -93,29 +93,9 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
 
 // PushChannelRestartConfig sets the configuration options for automatically
 // restarting push channels
-// - interval is the time over which minBytesSent must have been sent
-// - checksPerInterval is the number of times to check per interval
-// - minBytesSent is the minimum amount of data that must have been sent over
-//   the interval
-// - restartBackoff is the time to wait before checking again for restarts
-// - maxConsecutiveRestarts is the maximum number of restarts in a row to
-//   attempt where no data is transferred. When the limit is reached the
-//   channel is closed.
-func PushChannelRestartConfig(
-    interval time.Duration,
-    checksPerInterval uint32,
-    minBytesSent uint64,
-    restartBackoff time.Duration,
-    maxConsecutiveRestarts uint32,
-) DataTransferOption {
+func PushChannelRestartConfig(cfg pushchannelmonitor.Config) DataTransferOption {
     return func(m *manager) {
-        m.pushChannelMonitorCfg = &pushchannelmonitor.Config{
-            Interval:               interval,
-            ChecksPerInterval:      checksPerInterval,
-            MinBytesSent:           minBytesSent,
-            RestartBackoff:         restartBackoff,
-            MaxConsecutiveRestarts: maxConsecutiveRestarts,
-        }
+        m.pushChannelMonitorCfg = &cfg
     }
 }
 
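For callers, the option now takes the monitor's Config struct directly instead of positional arguments. Below is a minimal, hypothetical sketch of building the option under the new signature; the helper name and field values are illustrative only, and reading AcceptTimeout / CompleteTimeout as watch timeouts for the accept and complete events is inferred from the commit title and the integration-test changes further down.

package example

import (
    "time"

    dtimpl "github.com/filecoin-project/go-data-transfer/impl"
    "github.com/filecoin-project/go-data-transfer/pushchannelmonitor"
)

// pushRestartOption builds the push channel monitor option in the new style.
// Pass the result to dtimpl.NewDataTransfer alongside the other arguments.
func pushRestartOption() dtimpl.DataTransferOption {
    return dtimpl.PushChannelRestartConfig(pushchannelmonitor.Config{
        AcceptTimeout:          30 * time.Second, // assumed: max wait for the responder to accept the push
        Interval:               time.Minute,      // window over which MinBytesSent must be sent
        ChecksPerInterval:      10,
        MinBytesSent:           1 << 10,
        RestartBackoff:         20 * time.Second,
        MaxConsecutiveRestarts: 5,
        CompleteTimeout:        30 * time.Second, // assumed: max wait for the complete message after the last block
    })
}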

@@ -318,30 +298,76 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe
     if err != nil {
         return err
     }
+
+    // Close the channel on the local transport
     err = m.transport.CloseChannel(ctx, chid)
     if err != nil {
         log.Warnf("unable to close channel %s: %s", chid, err)
     }
 
-    log.Infof("%s: sending close channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
+    // Send a cancel message to the remote peer
+    log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
     err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
     if err != nil {
-        err = fmt.Errorf("Unable to send cancel message: %w", err)
+        err = fmt.Errorf("unable to send cancel message for channel %s to peer %s: %w",
+            chid, m.peerID, err)
         _ = m.OnRequestDisconnected(ctx, chid)
         log.Warn(err)
     }
 
+    // Fire a cancel event
     fsmerr := m.channels.Cancel(chid)
+    // If it wasn't possible to send a cancel message to the peer, return
+    // that error
     if err != nil {
         return err
     }
+    // If it wasn't possible to fire a cancel event, return that error
     if fsmerr != nil {
         return xerrors.Errorf("unable to send cancel to channel FSM: %w", fsmerr)
     }
 
     return nil
 }
 
+// close an open channel and fire an error event
+func (m *manager) CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error {
+    log.Infof("close channel %s with error %s", chid, cherr)
+
+    chst, err := m.channels.GetByID(ctx, chid)
+    if err != nil {
+        return err
+    }
+
+    // Cancel the channel on the local transport
+    err = m.transport.CloseChannel(ctx, chid)
+    if err != nil {
+        log.Warnf("unable to close channel %s: %s", chid, err)
+    }
+
+    // Try to send a cancel message to the remote peer. It's quite likely
+    // we aren't able to send the message to the peer because the channel
+    // is already in an error state, which is probably because of connection
+    // issues, so if we cant send the message just log a warning.
+    log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
+    err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
+    if err != nil {
+        // Just log a warning here because it's important that we fire the
+        // error event with the original error so that it doesn't get masked
+        // by subsequent errors.
+        log.Warnf("unable to send cancel message for channel %s to peer %s: %w",
+            chid, m.peerID, err)
+    }
+
+    // Fire an error event
+    err = m.channels.Error(chid, cherr)
+    if err != nil {
+        return xerrors.Errorf("unable to send error %s to channel FSM: %w", cherr, err)
+    }
+
+    return nil
+}
+
 // pause a running data transfer channel
 func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
     log.Infof("pause channel %s", chid)
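The new CloseDataTransferChannelWithError exists so that a watchdog (presumably the push channel monitor, per the commit title) can shut a channel down while the Failed event still carries the original cause rather than a later, less specific error. A hypothetical caller sketch follows; the anonymous interface simply mirrors the method added above, and the helper name and timeout error text are illustrative, not the monitor's actual wiring.

package example

import (
    "context"

    datatransfer "github.com/filecoin-project/go-data-transfer"
    "golang.org/x/xerrors"
)

// failStuckChannel closes chid with an explanatory error so that the error
// event fired on the channel records why the transfer was abandoned.
func failStuckChannel(ctx context.Context, mgr interface {
    CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error
}, chid datatransfer.ChannelID) error {
    return mgr.CloseDataTransferChannelWithError(ctx, chid,
        xerrors.Errorf("push channel %s: timed out waiting for accept/complete", chid))
}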

impl/integration_test.go

Lines changed: 68 additions & 20 deletions
@@ -36,6 +36,7 @@ import (
     . "github.com/filecoin-project/go-data-transfer/impl"
     "github.com/filecoin-project/go-data-transfer/message"
     "github.com/filecoin-project/go-data-transfer/network"
+    "github.com/filecoin-project/go-data-transfer/pushchannelmonitor"
     "github.com/filecoin-project/go-data-transfer/testutil"
     tp "github.com/filecoin-project/go-data-transfer/transport/graphsync"
     "github.com/filecoin-project/go-data-transfer/transport/graphsync/extension"
@@ -526,33 +527,37 @@ func (dc *disconnectCoordinator) onDisconnect() {
 // TestPushRequestAutoRestart tests that if the connection for a push request
 // goes down, it will automatically restart (given the right config options)
 func TestPushRequestAutoRestart(t *testing.T) {
+    //logging.SetLogLevel("dt-pushchanmon", "debug")
+
     testCases := []struct {
         name                        string
+        expectInitiatorDTFail       bool
         disconnectOnRequestComplete bool
         registerResponder           func(responder datatransfer.Manager, dc *disconnectCoordinator)
     }{{
-        // Test what happens when the disconnect happens right when the
-        // responder receives the open channel request (ie the responder
-        // doesn't get a chance to respond to the open channel request)
-        name: "when responder receives incoming request",
+        // Verify that the client fires an error event when the disconnect
+        // occurs right when the responder receives the open channel request
+        // (ie the responder doesn't get a chance to respond to the open
+        // channel request)
+        name:                  "when responder receives incoming request",
+        expectInitiatorDTFail: true,
         registerResponder: func(responder datatransfer.Manager, dc *disconnectCoordinator) {
             subscriber := func(event datatransfer.Event, channelState datatransfer.ChannelState) {
-                t.Logf("%s: %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
-
                 if event.Code == datatransfer.Open {
                     dc.signalReadyForDisconnect(true)
                 }
             }
             responder.SubscribeToEvents(subscriber)
         },
     }, {
-        // Test what happens when the disconnect happens right after the
-        // responder receives the first block
+        // Verify that if a disconnect happens right after the responder
+        // receives the first block, the transfer will complete automatically
+        // when the link comes back up
         name: "when responder receives first block",
         registerResponder: func(responder datatransfer.Manager, dc *disconnectCoordinator) {
             rcvdCount := 0
             subscriber := func(event datatransfer.Event, channelState datatransfer.ChannelState) {
-                t.Logf("%s: %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
+                //t.Logf("resp: %s / %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
                 if event.Code == datatransfer.DataReceived {
                     rcvdCount++
                     if rcvdCount == 1 {
@@ -563,11 +568,12 @@ func TestPushRequestAutoRestart(t *testing.T) {
             responder.SubscribeToEvents(subscriber)
         },
     }, {
-        // Test what happens when the disconnect happens right before the
-        // requester sends the complete message (ie all blocks have been
-        // received but the responder doesn't get a chance to tell
+        // Verify that the client fires an error event when disconnect occurs
+        // right before the responder sends the complete message (ie all blocks
+        // have been received but the responder doesn't get a chance to tell
         // the initiator before the disconnect)
         name:                        "before requester sends complete message",
+        expectInitiatorDTFail:       true,
         disconnectOnRequestComplete: true,
     }}
     for _, tc := range testCases {
@@ -579,8 +585,8 @@ func TestPushRequestAutoRestart(t *testing.T) {
             // Create an object to coordinate disconnect events
             dc := newDisconnectCoordinator()
 
-            // If the test should disconnect before the request is complete,
-            // add a hook to do so
+            // If the test should disconnect just before the responder sends
+            // the Complete message, add a hook to do so
             var responderTransportOpts []tp.Option
             if tc.disconnectOnRequestComplete {
                 responderTransportOpts = []tp.Option{
@@ -599,14 +605,27 @@ func TestPushRequestAutoRestart(t *testing.T) {
             initiatorGSTspt := gsData.SetupGSTransportHost1()
             responderGSTspt := gsData.SetupGSTransportHost2(responderTransportOpts...)
 
-            restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond, 5)
+            restartConf := PushChannelRestartConfig(pushchannelmonitor.Config{
+                AcceptTimeout:          100 * time.Millisecond,
+                Interval:               100 * time.Millisecond,
+                MinBytesSent:           1,
+                ChecksPerInterval:      10,
+                RestartBackoff:         200 * time.Millisecond,
+                MaxConsecutiveRestarts: 5,
+                CompleteTimeout:        100 * time.Millisecond,
+            })
             initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, gsData.StoredCounter1, restartConf)
             require.NoError(t, err)
             testutil.StartAndWaitForReady(ctx, t, initiator)
             responder, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, responderGSTspt, gsData.StoredCounter2)
             require.NoError(t, err)
             testutil.StartAndWaitForReady(ctx, t, responder)
 
+            //initiator.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
+            //    t.Logf("clnt: evt %s / status %s", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
+            //})
+
+            // Watch for successful completion
             finished := make(chan struct{}, 2)
             var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
                 if channelState.Status() == datatransfer.Completed {
@@ -633,6 +652,16 @@ func TestPushRequestAutoRestart(t *testing.T) {
                 tc.registerResponder(responder, dc)
             }
 
+            // If the initiator is expected to fail, watch for the Failed event
+            initiatorFailed := make(chan struct{})
+            if tc.expectInitiatorDTFail {
+                initiator.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
+                    if channelState.Status() == datatransfer.Failed {
+                        close(initiatorFailed)
+                    }
+                })
+            }
+
             // Open a push channel
             chid, err := initiator.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
             require.NoError(t, err)
@@ -655,14 +684,25 @@ func TestPushRequestAutoRestart(t *testing.T) {
             t.Logf("Sleep for a second")
             time.Sleep(1 * time.Second)
 
-            // Restore connection
-            t.Logf("Restore connection")
+            // Restore link
+            t.Logf("Restore link")
             require.NoError(t, gsData.Mn.LinkAll())
             time.Sleep(200 * time.Millisecond)
-            conn, err := gsData.Mn.ConnectPeers(host1.ID(), host2.ID())
-            require.NoError(t, err)
-            require.NotNil(t, conn)
 
+            // If we're expecting a Failed event, verify that it occurs
+            if tc.expectInitiatorDTFail {
+                select {
+                case <-ctx.Done():
+                    t.Fatal("Initiator data-transfer did not fail as expected")
+                    return
+                case <-initiatorFailed:
+                    t.Logf("Initiator data-transfer failed as expected")
+                    return
+                }
+            }
+
+            // We're not expecting a failure event, wait for the transfer to
+            // complete
             t.Logf("Waiting for auto-restart on push channel %s", chid)
 
             (func() {
@@ -1569,3 +1609,11 @@ func (r *receiver) ReceiveRestartExistingChannelRequest(ctx context.Context,
     incoming datatransfer.Request) {
 
 }
+
+//func SetDTLogLevelDebug() {
+//    _ = logging.SetLogLevel("dt-impl", "debug")
+//    _ = logging.SetLogLevel("dt-pushchanmon", "debug")
+//    _ = logging.SetLogLevel("dt_graphsync", "debug")
+//    _ = logging.SetLogLevel("data_transfer", "debug")
+//    _ = logging.SetLogLevel("data_transfer_network", "debug")
+//}
