Skip to content

Commit

Permalink
fixed dispatcher() on cutoff conditions (#210)
Browse files Browse the repository at this point in the history
* fixed dispatcher() on cutoff conditions

* added comments
  • Loading branch information
kristinapathak authored May 14, 2020
1 parent 0723afd commit 76e8eab
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- cleaned up shutdown logic for outbound sender [#205](https://github.com/xmidt-org/caduceus/pull/205)
- added resetting queue depth and current workers gauges to outbound sender [#205](https://github.com/xmidt-org/caduceus/pull/205)
- removed queueEmpty variable from outbound sender [#209](https://github.com/xmidt-org/caduceus/pull/209)
- fixed outbound sender's long running dispatcher() goroutine to not exit when a cutoff occurs [#210](https://github.com/xmidt-org/caduceus/pull/210)

## [v0.2.7]
- pared down logging, especially debugging logs [#196](https://github.com/xmidt-org/caduceus/pull/196)
Expand Down
36 changes: 34 additions & 2 deletions outboundSender.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,10 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti
return true
}

// Empty is called on cutoff or shutdown and swaps out the current queue for
// a fresh one, counting any current messages in the queue as dropped.
// It should never close a queue, as a queue not referenced anywhere will be
// cleaned up by the garbage collector without needing to be closed.
func (obs *CaduceusOutboundSender) Empty(droppedCounter metrics.Counter) {
droppedMsgs := obs.queue.Load().(chan *wrp.Message)
obs.queue.Store(make(chan *wrp.Message, obs.queueSize))
Expand All @@ -459,16 +463,40 @@ func (obs *CaduceusOutboundSender) dispatcher() {

Loop:
for {
// Always pull a new queue in case we have been cutoff or are shutting
// down.
msgQueue := obs.queue.Load().(chan *wrp.Message)
select {
// The dispatcher cannot get stuck blocking here forever (caused by an
// empty queue that is replaced and then Queue() starts adding to the
// new queue) because:
// - queue is only replaced on cutoff and shutdown
// - on cutoff, the first queue is always full so we will definitely
// get a message, drop it because we're cut off, then get the new
// queue and block until the cut off ends and Queue() starts queueing
// messages again.
// - on graceful shutdown, the queue is closed and then the dispatcher
// will send all messages, then break the loop, gather workers, and
// exit.
// - on non graceful shutdown, the queue is closed and then replaced
// with a new, empty queue that is also closed.
// - If the first queue is empty, we immediately break the loop,
// gather workers, and exit.
// - If the first queue has messages, we drop a message as expired
// pull in the new queue which is empty and closed, break the
// loop, gather workers, and exit.
case msg, ok = <-msgQueue:
// This is only true when a queue is empty and closed, which for us
// only happens on Shutdown().
if !ok {
break Loop
}
obs.queueDepthGauge.Add(-1.0)
obs.mutex.RLock()
urls = obs.urls
// Move to the next URL to try 1st the next time.
// This is okay because we run a single dispatcher and it's the
// only one updating this field.
obs.urls = obs.urls.Next()
deliverUntil := obs.deliverUntil
dropUntil := obs.dropUntil
Expand All @@ -480,7 +508,7 @@ Loop:

if now.Before(dropUntil) {
obs.droppedCutoffCounter.Add(1.0)
break Loop
continue
}
if now.After(deliverUntil) {
obs.droppedExpiredCounter.Add(1.0)
Expand Down Expand Up @@ -609,7 +637,9 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri
obs.deliveryCounter.With("url", obs.id, "code", code, "event", event).Add(1.0)
}

// queueOverflow handles the logic of what to do when a queue overflows
// queueOverflow handles the logic of what to do when a queue overflows:
// cutting off the webhook for a time and sending a cut off notification
// to the failure URL.
func (obs *CaduceusOutboundSender) queueOverflow() {
obs.mutex.Lock()
if time.Now().Before(obs.dropUntil) {
Expand All @@ -629,6 +659,8 @@ func (obs *CaduceusOutboundSender) queueOverflow() {

obs.cutOffCounter.Add(1.0)

// We empty the queue but don't close the channel, because we're not
// shutting down.
obs.Empty(obs.droppedCutoffCounter)

msg, err := json.Marshal(failureMsg)
Expand Down

0 comments on commit 76e8eab

Please sign in to comment.