Skip to content

Commit c875cbe

Browse files
authored
feat: better reconnect behaviour (#162)
1 parent 42e0a5b commit c875cbe

File tree

15 files changed

+867
-577
lines changed

15 files changed

+867
-577
lines changed

pushchannelmonitor/pushchannelmonitor.go renamed to channelmonitor/channelmonitor.go

Lines changed: 251 additions & 102 deletions
Large diffs are not rendered by default.

pushchannelmonitor/pushchannelmonitor_test.go renamed to channelmonitor/channelmonitor_test.go

Lines changed: 334 additions & 167 deletions
Large diffs are not rendered by default.

channels/channels.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,8 +315,22 @@ func (c *Channels) Error(chid datatransfer.ChannelID, err error) error {
315315
return c.send(chid, datatransfer.Error, err)
316316
}
317317

318-
func (c *Channels) Disconnected(chid datatransfer.ChannelID) error {
319-
return c.send(chid, datatransfer.Disconnected)
318+
// Disconnected indicates that the connection went down and it was not possible
319+
// to restart it
320+
func (c *Channels) Disconnected(chid datatransfer.ChannelID, err error) error {
321+
return c.send(chid, datatransfer.Disconnected, err)
322+
}
323+
324+
// RequestTimedOut indicates that the transport layer had a timeout trying to
325+
// make a request
326+
func (c *Channels) RequestTimedOut(chid datatransfer.ChannelID, err error) error {
327+
return c.send(chid, datatransfer.RequestTimedOut, err)
328+
}
329+
330+
// SendDataError indicates that the transport layer had an error trying
331+
// to send data to the remote peer
332+
func (c *Channels) SendDataError(chid datatransfer.ChannelID, err error) error {
333+
return c.send(chid, datatransfer.SendDataError, err)
320334
}
321335

322336
// HasChannel returns true if the given channel id is being tracked

channels/channels_fsm.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ var transferringStates = []fsm.StateKey{
2424

2525
// ChannelEvents describe the events taht can
2626
var ChannelEvents = fsm.Events{
27+
// Open a channel
2728
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),
29+
30+
// Remote peer has accepted the Open channel request
2831
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing),
32+
2933
fsm.Event(datatransfer.Restart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
3034
chst.Message = ""
3135
return nil
@@ -52,15 +56,27 @@ var ChannelEvents = fsm.Events{
5256
chst.Queued += delta
5357
return nil
5458
}),
55-
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
56-
chst.Message = datatransfer.ErrDisconnected.Error()
59+
60+
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
61+
chst.Message = err.Error()
62+
return nil
63+
}),
64+
65+
fsm.Event(datatransfer.SendDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
66+
chst.Message = err.Error()
67+
return nil
68+
}),
69+
70+
fsm.Event(datatransfer.RequestTimedOut).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
71+
chst.Message = err.Error()
5772
return nil
5873
}),
5974

6075
fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
6176
chst.Message = err.Error()
6277
return nil
6378
}),
79+
6480
fsm.Event(datatransfer.NewVoucher).FromAny().ToNoChange().
6581
Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherBytes []byte) error {
6682
chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: vtype, Voucher: &cbg.Deferred{Raw: voucherBytes}})
@@ -72,41 +88,54 @@ var ChannelEvents = fsm.Events{
7288
internal.EncodedVoucherResult{Type: vtype, VoucherResult: &cbg.Deferred{Raw: voucherResultBytes}})
7389
return nil
7490
}),
91+
7592
fsm.Event(datatransfer.PauseInitiator).
7693
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.InitiatorPaused).
7794
From(datatransfer.ResponderPaused).To(datatransfer.BothPaused).
7895
FromAny().ToJustRecord(),
96+
7997
fsm.Event(datatransfer.PauseResponder).
8098
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.ResponderPaused).
8199
From(datatransfer.InitiatorPaused).To(datatransfer.BothPaused).
82100
FromAny().ToJustRecord(),
101+
83102
fsm.Event(datatransfer.ResumeInitiator).
84103
From(datatransfer.InitiatorPaused).To(datatransfer.Ongoing).
85104
From(datatransfer.BothPaused).To(datatransfer.ResponderPaused).
86105
FromAny().ToJustRecord(),
106+
87107
fsm.Event(datatransfer.ResumeResponder).
88108
From(datatransfer.ResponderPaused).To(datatransfer.Ongoing).
89109
From(datatransfer.BothPaused).To(datatransfer.InitiatorPaused).
90110
From(datatransfer.Finalizing).To(datatransfer.Completing).
91111
FromAny().ToJustRecord(),
112+
113+
// The transfer has finished on the local node - all data was sent / received
92114
fsm.Event(datatransfer.FinishTransfer).
93115
FromAny().To(datatransfer.TransferFinished).
94116
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
95117
From(datatransfer.ResponderCompleted).To(datatransfer.Completing).
96118
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished),
119+
97120
fsm.Event(datatransfer.ResponderBeginsFinalization).
98121
FromAny().To(datatransfer.ResponderFinalizing).
99122
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
100123
From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished),
124+
125+
// The remote peer sent a Complete message, meaning it has sent / received all data
101126
fsm.Event(datatransfer.ResponderCompletes).
102127
FromAny().To(datatransfer.ResponderCompleted).
103128
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
104129
From(datatransfer.ResponderPaused).To(datatransfer.ResponderFinalizing).
105130
From(datatransfer.TransferFinished).To(datatransfer.Completing).
106131
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted).
107132
From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing),
133+
108134
fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing),
135+
136+
// Both the local node and the remote peer have completed the transfer
109137
fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing),
138+
110139
fsm.Event(datatransfer.CleanupComplete).
111140
From(datatransfer.Cancelling).To(datatransfer.Cancelled).
112141
From(datatransfer.Failing).To(datatransfer.Failed).

channels/channels_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/ipfs/go-cid"
1313
"github.com/ipfs/go-datastore"
14+
dss "github.com/ipfs/go-datastore/sync"
1415
"github.com/ipld/go-ipld-prime/codec/dagcbor"
1516
basicnode "github.com/ipld/go-ipld-prime/node/basic"
1617
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
@@ -37,7 +38,7 @@ func TestChannels(t *testing.T) {
3738
ctx := context.Background()
3839
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
3940
defer cancel()
40-
ds := datastore.NewMapDatastore()
41+
ds := dss.MutexWrap(datastore.NewMapDatastore())
4142
received := make(chan event)
4243
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
4344
received <- event{evt, chst}
@@ -127,7 +128,7 @@ func TestChannels(t *testing.T) {
127128
})
128129

129130
t.Run("updating send/receive values", func(t *testing.T) {
130-
ds := datastore.NewMapDatastore()
131+
ds := dss.MutexWrap(datastore.NewMapDatastore())
131132
dir := os.TempDir()
132133
cidLists, err := cidlists.NewCIDLists(dir)
133134
require.NoError(t, err)
@@ -302,7 +303,7 @@ func TestChannels(t *testing.T) {
302303
})
303304

304305
t.Run("test disconnected", func(t *testing.T) {
305-
ds := datastore.NewMapDatastore()
306+
ds := dss.MutexWrap(datastore.NewMapDatastore())
306307
received := make(chan event)
307308
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
308309
received <- event{evt, chst}
@@ -320,10 +321,11 @@ func TestChannels(t *testing.T) {
320321
state := checkEvent(ctx, t, received, datatransfer.Open)
321322
require.Equal(t, datatransfer.Requested, state.Status())
322323

323-
err = channelList.Disconnected(chid)
324+
disconnectErr := xerrors.Errorf("disconnected")
325+
err = channelList.Disconnected(chid, disconnectErr)
324326
require.NoError(t, err)
325327
state = checkEvent(ctx, t, received, datatransfer.Disconnected)
326-
require.Equal(t, datatransfer.ErrDisconnected.Error(), state.Message())
328+
require.Equal(t, disconnectErr.Error(), state.Message())
327329
})
328330

329331
t.Run("test self peer and other peer", func(t *testing.T) {
@@ -364,7 +366,7 @@ func TestMigrationsV0(t *testing.T) {
364366
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
365367
defer cancel()
366368

367-
ds := datastore.NewMapDatastore()
369+
ds := dss.MutexWrap(datastore.NewMapDatastore())
368370
received := make(chan event)
369371
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
370372
received <- event{evt, chst}
@@ -484,7 +486,7 @@ func TestMigrationsV1(t *testing.T) {
484486
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
485487
defer cancel()
486488

487-
ds := datastore.NewMapDatastore()
489+
ds := dss.MutexWrap(datastore.NewMapDatastore())
488490
received := make(chan event)
489491
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
490492
received <- event{evt, chst}

errors.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,3 @@ const ErrRejected = errorType("response rejected")
3030

3131
// ErrUnsupported indicates an operation is not supported by the transport protocol
3232
const ErrUnsupported = errorType("unsupported")
33-
34-
// ErrDisconnected indicates the other peer may have hung up and you should try restarting the channel.
35-
const ErrDisconnected = errorType("other peer appears to have hung up. restart Channel")
36-
37-
// ErrRemoved indicates the channel was inactive long enough that it was put in a permaneant error state
38-
const ErrRemoved = errorType("channel removed due to inactivity")

events.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,14 @@ const (
9090
// the remote peer. It is used to measure progress of how much of the total
9191
// data has been received.
9292
DataReceivedProgress
93+
94+
// RequestTimedOut indicates that the transport layer had a timeout trying to
95+
// make a request
96+
RequestTimedOut
97+
98+
// SendDataError indicates that the transport layer had an error trying
99+
// to send data to the remote peer
100+
SendDataError
93101
)
94102

95103
// Events are human readable names for data transfer events

impl/environment.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,5 @@ func (ce *channelEnvironment) ID() peer.ID {
2323
}
2424

2525
func (ce *channelEnvironment) CleanupChannel(chid datatransfer.ChannelID) {
26-
ce.m.reconnectsLk.Lock()
27-
delete(ce.m.reconnects, chid)
28-
ce.m.reconnectsLk.Unlock()
2926
ce.m.transport.CleanupChannel(chid)
3027
}

0 commit comments

Comments
 (0)