Skip to content

Commit

Permalink
fix: ensure channel monitor shuts down when transfer complete (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc authored Mar 23, 2021
1 parent 5f71a20 commit b1a43bd
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions channelmonitor/channelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Monitor struct {
cfg *Config

lk sync.RWMutex
channels map[monitoredChan]struct{}
channels map[datatransfer.ChannelID]monitoredChan
}

type Config struct {
Expand Down Expand Up @@ -59,7 +59,7 @@ func NewMonitor(mgr monitorAPI, cfg *Config) *Monitor {
stop: cancel,
mgr: mgr,
cfg: cfg,
channels: make(map[monitoredChan]struct{}),
channels: make(map[datatransfer.ChannelID]monitoredChan),
}
}

Expand Down Expand Up @@ -121,7 +121,7 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) monitored
} else {
mpc = newMonitoredPullChannel(m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown)
}
m.channels[mpc] = struct{}{}
m.channels[chid] = mpc
return mpc
}

Expand All @@ -136,17 +136,17 @@ func (m *Monitor) onShutdown() {
m.lk.RLock()
defer m.lk.RUnlock()

for ch := range m.channels {
for _, ch := range m.channels {
ch.Shutdown()
}
}

// onMonitoredChannelShutdown is called when a monitored channel shuts down
func (m *Monitor) onMonitoredChannelShutdown(mpc *monitoredChannel) {
func (m *Monitor) onMonitoredChannelShutdown(chid datatransfer.ChannelID) {
m.lk.Lock()
defer m.lk.Unlock()

delete(m.channels, mpc)
delete(m.channels, chid)
}

// enabled indicates whether the channel monitor is running
Expand Down Expand Up @@ -189,7 +189,7 @@ func (m *Monitor) checkDataRate() {
m.lk.RLock()
defer m.lk.RUnlock()

for ch := range m.channels {
for _, ch := range m.channels {
ch.checkDataRate()
}
}
Expand All @@ -203,7 +203,7 @@ type monitoredChannel struct {
chid datatransfer.ChannelID
cfg *Config
unsub datatransfer.Unsubscribe
onShutdown func(*monitoredChannel)
onShutdown func(datatransfer.ChannelID)
onDTEvent datatransfer.Subscriber
shutdownLk sync.Mutex

Expand All @@ -216,7 +216,7 @@ func newMonitoredChannel(
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
onShutdown func(*monitoredChannel),
onShutdown func(datatransfer.ChannelID),
onDTEvent datatransfer.Subscriber,
) *monitoredChannel {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -253,7 +253,7 @@ func (mc *monitoredChannel) Shutdown() {
mc.unsub()

// Inform the Manager that this channel has shut down
go mc.onShutdown(mc)
go mc.onShutdown(mc.chid)
}

func (mc *monitoredChannel) start() {
Expand All @@ -275,7 +275,8 @@ func (mc *monitoredChannel) start() {
// Once the channel completes, shut down the monitor
state := channelState.Status()
if channels.IsChannelCleaningUp(state) || channels.IsChannelTerminated(state) {
log.Debugf("%s: stopping channel data-rate monitoring", mc.chid)
log.Debugf("%s: stopping channel data-rate monitoring (event: %s / state: %s)",
mc.chid, datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])
go mc.Shutdown()
return
}
Expand Down Expand Up @@ -446,7 +447,7 @@ func newMonitoredPushChannel(
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
onShutdown func(*monitoredChannel),
onShutdown func(datatransfer.ChannelID),
) *monitoredPushChannel {
mpc := &monitoredPushChannel{
dataRatePoints: make(chan *dataRatePoint, cfg.ChecksPerInterval),
Expand Down Expand Up @@ -530,7 +531,7 @@ func newMonitoredPullChannel(
mgr monitorAPI,
chid datatransfer.ChannelID,
cfg *Config,
onShutdown func(*monitoredChannel),
onShutdown func(datatransfer.ChannelID),
) *monitoredPullChannel {
mpc := &monitoredPullChannel{
dataRatePoints: make(chan uint64, cfg.ChecksPerInterval),
Expand Down

0 comments on commit b1a43bd

Please sign in to comment.