Skip to content

Commit 3ed5718

Browse files
committed
fix: push channel monitor watch for accept and complete events
1 parent 25979d5 commit 3ed5718

File tree

4 files changed

+340
-58
lines changed

4 files changed

+340
-58
lines changed

impl/impl.go

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -93,29 +93,9 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
9393

9494
// PushChannelRestartConfig sets the configuration options for automatically
9595
// restarting push channels
96-
// - interval is the time over which minBytesSent must have been sent
97-
// - checksPerInterval is the number of times to check per interval
98-
// - minBytesSent is the minimum amount of data that must have been sent over
99-
// the interval
100-
// - restartBackoff is the time to wait before checking again for restarts
101-
// - maxConsecutiveRestarts is the maximum number of restarts in a row to
102-
// attempt where no data is transferred. When the limit is reached the
103-
// channel is closed.
104-
func PushChannelRestartConfig(
105-
interval time.Duration,
106-
checksPerInterval uint32,
107-
minBytesSent uint64,
108-
restartBackoff time.Duration,
109-
maxConsecutiveRestarts uint32,
110-
) DataTransferOption {
96+
func PushChannelRestartConfig(cfg pushchannelmonitor.Config) DataTransferOption {
11197
return func(m *manager) {
112-
m.pushChannelMonitorCfg = &pushchannelmonitor.Config{
113-
Interval: interval,
114-
ChecksPerInterval: checksPerInterval,
115-
MinBytesSent: minBytesSent,
116-
RestartBackoff: restartBackoff,
117-
MaxConsecutiveRestarts: maxConsecutiveRestarts,
118-
}
98+
m.pushChannelMonitorCfg = &cfg
11999
}
120100
}
121101

@@ -318,30 +298,74 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe
318298
if err != nil {
319299
return err
320300
}
301+
302+
// Close the channel on the local transport
321303
err = m.transport.CloseChannel(ctx, chid)
322304
if err != nil {
323305
log.Warnf("unable to close channel %s: %s", chid, err)
324306
}
325307

326-
log.Infof("%s: sending close channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
308+
// Send a cancel message to the remote peer
309+
log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
327310
err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
328311
if err != nil {
329-
err = fmt.Errorf("Unable to send cancel message: %w", err)
312+
err = fmt.Errorf("unable to send cancel message for channel %s to peer %s: %w",
313+
chid, m.peerID, err)
330314
_ = m.OnRequestDisconnected(ctx, chid)
331315
log.Warn(err)
332316
}
333317

318+
// Fire a cancel event
334319
fsmerr := m.channels.Cancel(chid)
320+
// If it wasn't possible to send a cancel message to the peer, return
321+
// that error
335322
if err != nil {
336323
return err
337324
}
325+
// If it wasn't possible to fire a cancel event, return that error
338326
if fsmerr != nil {
339327
return xerrors.Errorf("unable to send cancel to channel FSM: %w", fsmerr)
340328
}
341329

342330
return nil
343331
}
344332

333+
// close an open channel (effectively a cancel)
334+
func (m *manager) CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error {
335+
log.Infof("close channel %s with error %s", chid, cherr)
336+
337+
chst, err := m.channels.GetByID(ctx, chid)
338+
if err != nil {
339+
return err
340+
}
341+
342+
// Cancel the channel on the local transport
343+
err = m.transport.CloseChannel(ctx, chid)
344+
if err != nil {
345+
log.Warnf("unable to close channel %s: %s", chid, err)
346+
}
347+
348+
// Try to send a cancel message to the remote peer. It's quite likely
349+
// we aren't able to send the message to the peer because the channel
350+
// is already in an error state, which is probably because of connection
351+
// issues, so if we cant send the message just log a warning.
352+
log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
353+
err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
354+
if err != nil {
355+
err = fmt.Errorf("unable to send cancel message for channel %s to peer %s: %w",
356+
chid, m.peerID, err)
357+
log.Warn(err)
358+
}
359+
360+
// Fire an error event
361+
err = m.channels.Error(chid, cherr)
362+
if err != nil {
363+
return xerrors.Errorf("unable to send error to channel FSM: %w", err)
364+
}
365+
366+
return nil
367+
}
368+
345369
// pause a running data transfer channel
346370
func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
347371
log.Infof("pause channel %s", chid)

impl/integration_test.go

Lines changed: 71 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/filecoin-project/go-data-transfer/pushchannelmonitor"
12+
13+
logging "github.com/ipfs/go-log/v2"
14+
1115
"github.com/ipfs/go-blockservice"
1216
"github.com/ipfs/go-datastore"
1317
"github.com/ipfs/go-datastore/namespace"
@@ -526,33 +530,37 @@ func (dc *disconnectCoordinator) onDisconnect() {
526530
// TestPushRequestAutoRestart tests that if the connection for a push request
527531
// goes down, it will automatically restart (given the right config options)
528532
func TestPushRequestAutoRestart(t *testing.T) {
533+
//logging.SetLogLevel("dt-pushchanmon", "debug")
534+
529535
testCases := []struct {
530536
name string
537+
expectInitiatorDTFail bool
531538
disconnectOnRequestComplete bool
532539
registerResponder func(responder datatransfer.Manager, dc *disconnectCoordinator)
533540
}{{
534-
// Test what happens when the disconnect happens right when the
535-
// responder receives the open channel request (ie the responder
536-
// doesn't get a chance to respond to the open channel request)
537-
name: "when responder receives incoming request",
541+
// Verify that the client fires an error event when the disconnect
542+
// occurs right when the responder receives the open channel request
543+
// (ie the responder doesn't get a chance to respond to the open
544+
// channel request)
545+
name: "when responder receives incoming request",
546+
expectInitiatorDTFail: true,
538547
registerResponder: func(responder datatransfer.Manager, dc *disconnectCoordinator) {
539548
subscriber := func(event datatransfer.Event, channelState datatransfer.ChannelState) {
540-
t.Logf("%s: %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
541-
542549
if event.Code == datatransfer.Open {
543550
dc.signalReadyForDisconnect(true)
544551
}
545552
}
546553
responder.SubscribeToEvents(subscriber)
547554
},
548555
}, {
549-
// Test what happens when the disconnect happens right after the
550-
// responder receives the first block
556+
// Verify that if a disconnect happens right after the responder
557+
// receives the first block, the transfer will complete automatically
558+
// when the link comes back up
551559
name: "when responder receives first block",
552560
registerResponder: func(responder datatransfer.Manager, dc *disconnectCoordinator) {
553561
rcvdCount := 0
554562
subscriber := func(event datatransfer.Event, channelState datatransfer.ChannelState) {
555-
t.Logf("%s: %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
563+
//t.Logf("resp: %s / %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
556564
if event.Code == datatransfer.DataReceived {
557565
rcvdCount++
558566
if rcvdCount == 1 {
@@ -563,11 +571,12 @@ func TestPushRequestAutoRestart(t *testing.T) {
563571
responder.SubscribeToEvents(subscriber)
564572
},
565573
}, {
566-
// Test what happens when the disconnect happens right before the
567-
// requester sends the complete message (ie all blocks have been
568-
// received but the responder doesn't get a chance to tell
574+
// Verify that the client fires an error event when disconnect occurs
575+
// right before the responder sends the complete message (ie all blocks
576+
// have been received but the responder doesn't get a chance to tell
569577
// the initiator before the disconnect)
570578
name: "before requester sends complete message",
579+
expectInitiatorDTFail: true,
571580
disconnectOnRequestComplete: true,
572581
}}
573582
for _, tc := range testCases {
@@ -579,8 +588,8 @@ func TestPushRequestAutoRestart(t *testing.T) {
579588
// Create an object to coordinate disconnect events
580589
dc := newDisconnectCoordinator()
581590

582-
// If the test should disconnect before the request is complete,
583-
// add a hook to do so
591+
// If the test should disconnect just before the responder sends
592+
// the Complete message, add a hook to do so
584593
var responderTransportOpts []tp.Option
585594
if tc.disconnectOnRequestComplete {
586595
responderTransportOpts = []tp.Option{
@@ -599,14 +608,27 @@ func TestPushRequestAutoRestart(t *testing.T) {
599608
initiatorGSTspt := gsData.SetupGSTransportHost1()
600609
responderGSTspt := gsData.SetupGSTransportHost2(responderTransportOpts...)
601610

602-
restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond, 5)
611+
restartConf := PushChannelRestartConfig(pushchannelmonitor.Config{
612+
AcceptTimeout: 100 * time.Millisecond,
613+
Interval: 100 * time.Millisecond,
614+
MinBytesSent: 1,
615+
ChecksPerInterval: 10,
616+
RestartBackoff: 200 * time.Millisecond,
617+
MaxConsecutiveRestarts: 5,
618+
CompleteTimeout: 100 * time.Millisecond,
619+
})
603620
initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, gsData.StoredCounter1, restartConf)
604621
require.NoError(t, err)
605622
testutil.StartAndWaitForReady(ctx, t, initiator)
606623
responder, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, responderGSTspt, gsData.StoredCounter2)
607624
require.NoError(t, err)
608625
testutil.StartAndWaitForReady(ctx, t, responder)
609626

627+
//initiator.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
628+
// t.Logf("clnt: evt %s / status %s", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
629+
//})
630+
631+
// Watch for successful completion
610632
finished := make(chan struct{}, 2)
611633
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
612634
if channelState.Status() == datatransfer.Completed {
@@ -633,6 +655,16 @@ func TestPushRequestAutoRestart(t *testing.T) {
633655
tc.registerResponder(responder, dc)
634656
}
635657

658+
// If the initiator is expected to fail, watch for the Failed event
659+
initiatorFailed := make(chan struct{})
660+
if tc.expectInitiatorDTFail {
661+
initiator.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
662+
if channelState.Status() == datatransfer.Failed {
663+
close(initiatorFailed)
664+
}
665+
})
666+
}
667+
636668
// Open a push channel
637669
chid, err := initiator.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
638670
require.NoError(t, err)
@@ -655,14 +687,25 @@ func TestPushRequestAutoRestart(t *testing.T) {
655687
t.Logf("Sleep for a second")
656688
time.Sleep(1 * time.Second)
657689

658-
// Restore connection
659-
t.Logf("Restore connection")
690+
// Restore link
691+
t.Logf("Restore link")
660692
require.NoError(t, gsData.Mn.LinkAll())
661693
time.Sleep(200 * time.Millisecond)
662-
conn, err := gsData.Mn.ConnectPeers(host1.ID(), host2.ID())
663-
require.NoError(t, err)
664-
require.NotNil(t, conn)
665694

695+
// If we're expecting a Failed event, verify that it occurs
696+
if tc.expectInitiatorDTFail {
697+
select {
698+
case <-ctx.Done():
699+
t.Fatal("Initiator data-transfer did not fail as expected")
700+
return
701+
case <-initiatorFailed:
702+
t.Logf("Initiator data-transfer failed as expected")
703+
return
704+
}
705+
}
706+
707+
// We're not expecting a failure event, wait for the transfer to
708+
// complete
666709
t.Logf("Waiting for auto-restart on push channel %s", chid)
667710

668711
(func() {
@@ -1584,3 +1627,11 @@ func (r *receiver) ReceiveRestartExistingChannelRequest(ctx context.Context,
15841627
incoming datatransfer.Request) {
15851628

15861629
}
1630+
1631+
func SetDTLogLevelDebug() {
1632+
logging.SetLogLevel("dt-impl", "debug")
1633+
logging.SetLogLevel("dt-pushchanmon", "debug")
1634+
logging.SetLogLevel("dt_graphsync", "debug")
1635+
logging.SetLogLevel("data_transfer", "debug")
1636+
logging.SetLogLevel("data_transfer_network", "debug")
1637+
}

0 commit comments

Comments
 (0)