Skip to content

Commit

Permalink
fix: fail a pull channel when there is a timeout receiving the Comple…
Browse files Browse the repository at this point in the history
…te message (#179)
  • Loading branch information
dirkmc authored Mar 30, 2021
1 parent c2872cb commit de9804f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 21 deletions.
61 changes: 45 additions & 16 deletions channelmonitor/channelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func checkConfig(cfg *Config) {
// This interface just makes it easier to abstract some methods between the
// push and pull monitor implementations
type monitoredChan interface {
Shutdown()
Shutdown() bool
checkDataRate()
}

Expand Down Expand Up @@ -125,11 +125,23 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) monitored
m.lk.Lock()
defer m.lk.Unlock()

// Check if there is already a monitor for this channel
if _, ok := m.channels[chid]; ok {
tp := "push"
if !isPush {
tp = "pull"
}
log.Warnf("ignoring add %s channel %s: %s channel with that id already exists",
tp, chid, tp)
return nil
}

// Create the channel monitor
var mpc monitoredChan
if isPush {
mpc = newMonitoredPushChannel(m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
mpc = newMonitoredPushChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
} else {
mpc = newMonitoredPullChannel(m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
mpc = newMonitoredPullChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
}
m.channels[chid] = mpc
return mpc
Expand Down Expand Up @@ -207,6 +219,9 @@ func (m *Monitor) checkDataRate() {
// monitoredChannel keeps track of the data-rate for a channel, and
// restarts the channel if the rate falls below the minimum allowed
type monitoredChannel struct {
// The parentCtx is used when sending a close message for a channel, so
// that operation can continue even after the monitoredChannel is shutdown
parentCtx context.Context
ctx context.Context
cancel context.CancelFunc
mgr monitorAPI
Expand All @@ -223,6 +238,7 @@ type monitoredChannel struct {
}

func newMonitoredChannel(
parentCtx context.Context,
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
Expand All @@ -231,6 +247,7 @@ func newMonitoredChannel(
) *monitoredChannel {
ctx, cancel := context.WithCancel(context.Background())
mpc := &monitoredChannel{
parentCtx: parentCtx,
ctx: ctx,
cancel: cancel,
mgr: mgr,
Expand All @@ -247,14 +264,15 @@ func newMonitoredChannel(
func (mc *monitoredChannel) checkDataRate() {
}

// Cancel the context and unsubscribe from events
func (mc *monitoredChannel) Shutdown() {
// Cancel the context and unsubscribe from events.
// Returns true if channel has not already been shutdown.
func (mc *monitoredChannel) Shutdown() bool {
mc.shutdownLk.Lock()
defer mc.shutdownLk.Unlock()

// Check if the channel was already shut down
if mc.cancel == nil {
return
return false
}
mc.cancel() // cancel context so all go-routines exit
mc.cancel = nil
Expand All @@ -264,6 +282,8 @@ func (mc *monitoredChannel) Shutdown() {

// Inform the Manager that this channel has shut down
go mc.onShutdown(mc.chid)

return true
}

func (mc *monitoredChannel) start() {
Expand Down Expand Up @@ -301,9 +321,9 @@ func (mc *monitoredChannel) start() {
log.Warnf("%s: data transfer transport send error, restarting data transfer", mc.chid)
go mc.restartChannel()
case datatransfer.FinishTransfer:
// The client has finished sending all data. Watch to make sure
// that the responder sends a message to acknowledge that the
// transfer is complete
// The channel initiator has finished sending / receiving all data.
// Watch to make sure that the responder sends a message to acknowledge
// that the transfer is complete
go mc.watchForResponderComplete()
default:
// Delegate to the push channel monitor or pull channel monitor to
Expand Down Expand Up @@ -425,14 +445,21 @@ func (mc *monitoredChannel) restartChannel() {
mc.restartLk.Unlock()
}

// Shut down the monitor and close the data transfer channel
func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
// Shutdown the monitor
firstShutdown := mc.Shutdown()
if !firstShutdown {
// Channel was already shutdown, ignore this second attempt to shutdown
return
}

// Close the data transfer channel and fire an error
log.Errorf("closing data-transfer channel: %s", cherr)
err := mc.mgr.CloseDataTransferChannelWithError(mc.ctx, mc.chid, cherr)
err := mc.mgr.CloseDataTransferChannelWithError(mc.parentCtx, mc.chid, cherr)
if err != nil {
log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err)
}

mc.Shutdown()
}

// Snapshot of the pending and sent data at a particular point in time.
Expand All @@ -454,6 +481,7 @@ type monitoredPushChannel struct {
}

func newMonitoredPushChannel(
parentCtx context.Context,
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
Expand All @@ -462,7 +490,7 @@ func newMonitoredPushChannel(
mpc := &monitoredPushChannel{
dataRatePoints: make(chan *dataRatePoint, cfg.ChecksPerInterval),
}
mpc.monitoredChannel = newMonitoredChannel(mgr, chid, cfg, onShutdown, mpc.onDTEvent)
mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent)
return mpc
}

Expand Down Expand Up @@ -538,6 +566,7 @@ type monitoredPullChannel struct {
}

func newMonitoredPullChannel(
parentCtx context.Context,
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
Expand All @@ -546,7 +575,7 @@ func newMonitoredPullChannel(
mpc := &monitoredPullChannel{
dataRatePoints: make(chan uint64, cfg.ChecksPerInterval),
}
mpc.monitoredChannel = newMonitoredChannel(mgr, chid, cfg, onShutdown, mpc.onDTEvent)
mpc.monitoredChannel = newMonitoredChannel(parentCtx, mgr, chid, cfg, onShutdown, mpc.onDTEvent)
return mpc
}

Expand Down Expand Up @@ -578,8 +607,8 @@ func (mc *monitoredPullChannel) checkDataRate() {
log.Debugf("%s: since last check: received: %d - %d = %d, required %d",
mc.chid, mc.received, atIntervalStart, rcvdInInterval, mc.cfg.MinBytesTransferred)
if rcvdInInterval < mc.cfg.MinBytesTransferred {
log.Warnf("%s: data-rate too low, restarting channel: since last check %s ago: received %d, required %d",
mc.chid, mc.cfg.Interval, rcvdInInterval, mc.cfg.MinBytesTransferred)
log.Warnf("%s: data-rate too low, restarting channel: since last check received %d but required %d",
mc.chid, rcvdInInterval, mc.cfg.MinBytesTransferred)
go mc.restartChannel()
}
}
Expand Down
14 changes: 9 additions & 5 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,13 +614,17 @@ func TestAutoRestart(t *testing.T) {
// right before the responder sends the complete message (ie responder sent
// all blocks, but the responder doesn't get a chance to tell the initiator
// before the disconnect)
name: "push: before requester sends complete message",
isPush: true,
name: "pull: before responder sends complete message",
isPush: false,
expectInitiatorDTFail: true,
disconnectOnRequestComplete: true,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
expectFailMsg := ""
if tc.expectInitiatorDTFail {
expectFailMsg = " (expect failure)"
}
t.Run(tc.name+expectFailMsg, func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -659,8 +663,8 @@ func TestAutoRestart(t *testing.T) {

// Set up
restartConf := ChannelRestartConfig(channelmonitor.Config{
MonitorPushChannels: true,
MonitorPullChannels: true,
MonitorPushChannels: tc.isPush,
MonitorPullChannels: !tc.isPush,
AcceptTimeout: 100 * time.Millisecond,
Interval: 100 * time.Millisecond,
MinBytesTransferred: 1,
Expand Down

0 comments on commit de9804f

Please sign in to comment.