fix: eliminate deadlocks in the barrier node when delete is used
Modify two parts of the barrier node to prevent deadlocks. There were
two possible ways to deadlock, and one of them was being hit by people
using these nodes.

The first occurred when a point was received: `Point` would be called
and a message would be sent to reset the timer. If the message queue was
backed up, `emitBarrier` could be called and try to forward a message to
the node itself. But the goroutine reading points from the edge was
blocked trying to reset the timer, so the delete group message could
never be consumed and the timer could never be reset.

This first one is fixed by adding a buffer of size one to that channel
and making the reset non-blocking when the channel is full. This
guarantees that a reset signal is always pending after a reset and
prevents the sender from blocking if the reset message is sent twice in
quick succession.
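
A minimal sketch of this buffered, non-blocking signal pattern follows. The `signaler` type and its boolean return value are assumptions added for illustration; the method in the diff returns nothing.

```go
package main

import "fmt"

// signaler is a hypothetical stand-in for the part of the idle barrier
// that owns the reset channel.
type signaler struct {
	resetC chan struct{}
}

func newSignaler() *signaler {
	// A buffer of one lets a single reset signal stay pending.
	return &signaler{resetC: make(chan struct{}, 1)}
}

// reset never blocks. It reports whether the signal was queued (true)
// or dropped because a signal was already pending (false).
func (s *signaler) reset() bool {
	select {
	case s.resetC <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	s := newSignaler()
	fmt.Println(s.reset()) // true: signal queued
	fmt.Println(s.reset()) // false: a signal is already pending
	<-s.resetC             // the consumer handles the reset
	fmt.Println(s.reset()) // true: the buffer is free again
}
```

Dropping the second signal is safe here because the consumer only needs to know that at least one reset happened since it last looked.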

The second occurred when the delete group message was received, which
would call `Stop`. If `emitBarrier` was called while the delete group
message was being handled, it could deadlock in the same way as above:
handling the delete group message waits for the idle handler to exit,
while the idle handler is blocked in emit until the delete group
message has been handled.

The stop method was also not thread-safe. Since it could be called
from a different goroutine, it is now thread-safe.

This is fixed by giving the stop channel a buffer of 1 and making
stopping send a message on the channel instead of closing it, which
avoids a potential double close of the channel. The stop message is
sent non-blocking to prevent a deadlock, since only one message should
be received anyway. The call to `Stop()` in `DeleteGroup` is changed to
a non-blocking version that does not wait for the goroutine to exit,
while the other stop still waits for the goroutine to exit.
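
The split between a signal-only stop and a waiting stop can be sketched as below. The `worker` type and its `run` loop are hypothetical; only the shape of `stop` and `Stop` follows the diff.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for the idle barrier.
type worker struct {
	stopC chan struct{}
	wg    sync.WaitGroup
}

func newWorker() *worker {
	w := &worker{stopC: make(chan struct{}, 1)}
	w.wg.Add(1)
	go w.run()
	return w
}

// run exits after receiving the first stop signal.
func (w *worker) run() {
	defer w.wg.Done()
	<-w.stopC
}

// stop signals the goroutine without waiting for it. Because the channel
// is never closed, calling stop repeatedly or concurrently cannot panic.
func (w *worker) stop() {
	select {
	case w.stopC <- struct{}{}:
	default:
	}
}

// Stop signals the goroutine and waits for it to exit.
func (w *worker) Stop() {
	w.stop()
	w.wg.Wait()
}

func main() {
	w := newWorker()
	w.stop() // safe from a handler the goroutine may be waiting on
	w.stop() // extra signals are buffered or dropped
	w.Stop() // waits for run to return
	w.Stop() // calling again is harmless
	fmt.Println("stopped")
}
```

A handler that the worker goroutine may itself be waiting on should call `stop`, since `Stop` would block until that goroutine exits.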
jsternberg committed Mar 27, 2019
1 parent 969536e commit 947b7c7
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions barrier.go
@@ -109,8 +109,8 @@ func newIdleBarrier(name string, group edge.GroupInfo, in edge.Edge, idle time.D
 		lastBarrierT: atomic.Value{},
 		wg: sync.WaitGroup{},
 		outs: outs,
-		stopC: make(chan struct{}),
-		resetTimerC: make(chan struct{}),
+		stopC: make(chan struct{}, 1),
+		resetTimerC: make(chan struct{}, 1),
 		del: del,
 	}

@@ -128,11 +128,17 @@ func (n *idleBarrier) Init() {
 }

 func (n *idleBarrier) Stop() {
+	n.stop()
+	n.wg.Wait()
+}
+
+func (n *idleBarrier) stop() {
+	// Send a stop signal at least once to the stop channel.
+	// The stop channel has a buffer of size one and only the
+	// first stop signal matters.
 	select {
-	case <-n.stopC:
+	case n.stopC <- struct{}{}:
 	default:
-		close(n.stopC)
-		n.wg.Wait()
 	}
 }

@@ -161,7 +167,8 @@ func (n *idleBarrier) Barrier(m edge.BarrierMessage) (edge.Message, error) {
 }
 func (n *idleBarrier) DeleteGroup(m edge.DeleteGroupMessage) (edge.Message, error) {
 	if m.GroupID() == n.group.ID {
-		n.Stop()
+		// Signal that the idle barrier should stop.
+		n.stop()
 	}
 	return m, nil
 }
@@ -177,7 +184,13 @@ func (n *idleBarrier) Point(m edge.PointMessage) (edge.Message, error) {
 }

 func (n *idleBarrier) resetTimer() {
-	n.resetTimerC <- struct{}{}
+	// The first reset will be buffered and subsequent resets when the
+	// channel is full can be safely discarded because the reset signal
+	// has already been sent.
+	select {
+	case n.resetTimerC <- struct{}{}:
+	default:
+	}
 }

 func (n *idleBarrier) emitBarrier() error {