Skip to content

Commit 42e0a5b

Browse files
authored
Improve push channel to detect when not all data has been received (#157)
* test: add tests for auto-restarting a push channel * fix: push channel monitor watch for accept and complete events
1 parent 8b52e5e commit 42e0a5b

File tree

6 files changed

+520
-126
lines changed

6 files changed

+520
-126
lines changed

impl/impl.go

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -93,29 +93,9 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
9393

9494
// PushChannelRestartConfig sets the configuration options for automatically
9595
// restarting push channels
96-
// - interval is the time over which minBytesSent must have been sent
97-
// - checksPerInterval is the number of times to check per interval
98-
// - minBytesSent is the minimum amount of data that must have been sent over
99-
// the interval
100-
// - restartBackoff is the time to wait before checking again for restarts
101-
// - maxConsecutiveRestarts is the maximum number of restarts in a row to
102-
// attempt where no data is transferred. When the limit is reached the
103-
// channel is closed.
104-
func PushChannelRestartConfig(
105-
interval time.Duration,
106-
checksPerInterval uint32,
107-
minBytesSent uint64,
108-
restartBackoff time.Duration,
109-
maxConsecutiveRestarts uint32,
110-
) DataTransferOption {
96+
func PushChannelRestartConfig(cfg pushchannelmonitor.Config) DataTransferOption {
11197
return func(m *manager) {
112-
m.pushChannelMonitorCfg = &pushchannelmonitor.Config{
113-
Interval: interval,
114-
ChecksPerInterval: checksPerInterval,
115-
MinBytesSent: minBytesSent,
116-
RestartBackoff: restartBackoff,
117-
MaxConsecutiveRestarts: maxConsecutiveRestarts,
118-
}
98+
m.pushChannelMonitorCfg = &cfg
11999
}
120100
}
121101

@@ -318,30 +298,76 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe
318298
if err != nil {
319299
return err
320300
}
301+
302+
// Close the channel on the local transport
321303
err = m.transport.CloseChannel(ctx, chid)
322304
if err != nil {
323305
log.Warnf("unable to close channel %s: %s", chid, err)
324306
}
325307

326-
log.Infof("%s: sending close channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
308+
// Send a cancel message to the remote peer
309+
log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
327310
err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
328311
if err != nil {
329-
err = fmt.Errorf("Unable to send cancel message: %w", err)
312+
err = fmt.Errorf("unable to send cancel message for channel %s to peer %s: %w",
313+
chid, m.peerID, err)
330314
_ = m.OnRequestDisconnected(ctx, chid)
331315
log.Warn(err)
332316
}
333317

318+
// Fire a cancel event
334319
fsmerr := m.channels.Cancel(chid)
320+
// If it wasn't possible to send a cancel message to the peer, return
321+
// that error
335322
if err != nil {
336323
return err
337324
}
325+
// If it wasn't possible to fire a cancel event, return that error
338326
if fsmerr != nil {
339327
return xerrors.Errorf("unable to send cancel to channel FSM: %w", fsmerr)
340328
}
341329

342330
return nil
343331
}
344332

333+
// close an open channel and fire an error event
334+
func (m *manager) CloseDataTransferChannelWithError(ctx context.Context, chid datatransfer.ChannelID, cherr error) error {
335+
log.Infof("close channel %s with error %s", chid, cherr)
336+
337+
chst, err := m.channels.GetByID(ctx, chid)
338+
if err != nil {
339+
return err
340+
}
341+
342+
// Cancel the channel on the local transport
343+
err = m.transport.CloseChannel(ctx, chid)
344+
if err != nil {
345+
log.Warnf("unable to close channel %s: %s", chid, err)
346+
}
347+
348+
// Try to send a cancel message to the remote peer. It's quite likely
349+
// we aren't able to send the message to the peer because the channel
350+
// is already in an error state, which is probably because of connection
351+
// issues, so if we cant send the message just log a warning.
352+
log.Infof("%s: sending cancel channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
353+
err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
354+
if err != nil {
355+
// Just log a warning here because it's important that we fire the
356+
// error event with the original error so that it doesn't get masked
357+
// by subsequent errors.
358+
log.Warnf("unable to send cancel message for channel %s to peer %s: %w",
359+
chid, m.peerID, err)
360+
}
361+
362+
// Fire an error event
363+
err = m.channels.Error(chid, cherr)
364+
if err != nil {
365+
return xerrors.Errorf("unable to send error %s to channel FSM: %w", cherr, err)
366+
}
367+
368+
return nil
369+
}
370+
345371
// pause a running data transfer channel
346372
func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
347373
log.Infof("pause channel %s", chid)

0 commit comments

Comments
 (0)