From 8a3df7f08614e2445934c043be9252567c2c3986 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Mon, 30 May 2022 08:56:30 +0100 Subject: [PATCH] eval: add notification method when set enabled called. --- helper/broker/notify.go | 108 +++++++++++++++++++++++++++++++++++ helper/broker/notify_test.go | 55 ++++++++++++++++++ nomad/eval_broker.go | 12 +++- nomad/eval_endpoint.go | 24 +++++--- nomad/server.go | 4 ++ 5 files changed, 194 insertions(+), 9 deletions(-) create mode 100644 helper/broker/notify.go create mode 100644 helper/broker/notify_test.go diff --git a/helper/broker/notify.go b/helper/broker/notify.go new file mode 100644 index 00000000000..95807d6d863 --- /dev/null +++ b/helper/broker/notify.go @@ -0,0 +1,108 @@ +package broker + +import ( + "time" + + "github.com/hashicorp/nomad/helper" +) + +// GenericNotifier allows a process to send updates to many subscribers in an +// easy manner. +type GenericNotifier struct { + + // publishCh is the channel used to receive the update which will be sent + // to all subscribers. + publishCh chan interface{} + + // subscribeCh and unsubscribeCh are the channels used to modify the + // subscription membership mapping. + subscribeCh chan chan interface{} + unsubscribeCh chan chan interface{} +} + +// NewGenericNotifier returns a generic notifier which can be used by a process +// to notify many subscribers when a specific update is triggered. +func NewGenericNotifier() *GenericNotifier { + return &GenericNotifier{ + publishCh: make(chan interface{}, 1), + subscribeCh: make(chan chan interface{}, 1), + unsubscribeCh: make(chan chan interface{}, 1), + } +} + +// Notify allows the implementer to notify all subscribers will a specific +// update. There is no guarantee the order in which subscribers receive the +// message which is sent linearly. +func (g *GenericNotifier) Notify(msg interface{}) { + select { + case g.publishCh <- msg: + default: + } +} + +// Run is a long-lived process which handles updating subscribers as well as +// ensuring any update is sent to them. The passed stopCh is used to coordinate +// shutdown. +func (g *GenericNotifier) Run(stopCh <-chan struct{}) { + + // Store our subscribers inline with a map. This map can only be accessed + // via a single channel update at a time, meaning we can manage with + // without using a lock. + subscribers := map[chan interface{}]struct{}{} + + for { + select { + case <-stopCh: + return + case msgCh := <-g.subscribeCh: + subscribers[msgCh] = struct{}{} + case msgCh := <-g.unsubscribeCh: + delete(subscribers, msgCh) + case update := <-g.publishCh: + for subscriberCh := range subscribers { + + // The subscribers channels are buffered, but ensure we don't + // block the whole process on this. + select { + case subscriberCh <- update: + default: + } + } + } + } +} + +// WaitForChange allows a subscriber to wait until there is a notification +// change, or the timeout is reached. The function will block until one +// condition ie met. +func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} { + + // Create a channel and subscribe to any update. This channel is buffered + // to ensure we do not block the main broker process. + updateCh := make(chan interface{}, 1) + g.subscribeCh <- updateCh + + // Create a timeout timer and use the helper to ensure this routine doesn't + // panic and making the stop call clear. + timeoutTimer, timeoutStop := helper.NewSafeTimer(timeout) + + // Defer a function which performs all the required cleanup of the + // subscriber once it has been notified of a change, or reached its wait + // timeout. + defer func() { + g.unsubscribeCh <- updateCh + close(updateCh) + timeoutStop() + }() + + // Enter the main loop which listens for an update or timeout and returns + // this information to the subscriber. + for { + select { + case <-timeoutTimer.C: + return "wait timed out after " + timeout.String() + case update := <-updateCh: + return update + } + } +} diff --git a/helper/broker/notify_test.go b/helper/broker/notify_test.go new file mode 100644 index 00000000000..51943987e3f --- /dev/null +++ b/helper/broker/notify_test.go @@ -0,0 +1,55 @@ +package broker + +import ( + "sync" + "testing" + "time" + + "github.com/hashicorp/nomad/ci" + "github.com/stretchr/testify/require" +) + +func TestGenericNotifier(t *testing.T) { + ci.Parallel(t) + + // Create the new notifier. + stopChan := make(chan struct{}) + defer close(stopChan) + + notifier := NewGenericNotifier() + go notifier.Run(stopChan) + + // Ensure we have buffered channels. + require.Equal(t, 1, cap(notifier.publishCh)) + require.Equal(t, 1, cap(notifier.subscribeCh)) + require.Equal(t, 1, cap(notifier.unsubscribeCh)) + + // Test that the timeout works. + var timeoutWG sync.WaitGroup + + for i := 0; i < 6; i++ { + go func(wg *sync.WaitGroup) { + wg.Add(1) + msg := notifier.WaitForChange(100 * time.Millisecond) + require.Equal(t, "wait timed out after 100ms", msg) + wg.Done() + }(&timeoutWG) + } + timeoutWG.Wait() + + // Test that all subscribers recieve an update when a single notification + // is sent. + var notifiedWG sync.WaitGroup + + for i := 0; i < 6; i++ { + go func(wg *sync.WaitGroup) { + wg.Add(1) + msg := notifier.WaitForChange(3 * time.Second) + require.Equal(t, "we got an update and not a timeout", msg) + wg.Done() + }(¬ifiedWG) + } + + notifier.Notify("we got an update and not a timeout") + notifiedWG.Wait() +} diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 9aa7542dcfe..b0b0b87d547 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -6,11 +6,13 @@ import ( "errors" "fmt" "math/rand" + "strconv" "sync" "time" metrics "github.com/armon/go-metrics" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/broker" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/lib/delayheap" "github.com/hashicorp/nomad/nomad/structs" @@ -48,8 +50,10 @@ type EvalBroker struct { nackTimeout time.Duration deliveryLimit int - enabled bool - stats *BrokerStats + enabled bool + enabledNotifier *broker.GenericNotifier + + stats *BrokerStats // evals tracks queued evaluations by ID to de-duplicate enqueue. // The counter is the number of times we've attempted delivery, @@ -131,6 +135,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, nackTimeout: timeout, deliveryLimit: deliveryLimit, enabled: false, + enabledNotifier: broker.NewGenericNotifier(), stats: new(BrokerStats), evals: make(map[string]int), jobEvals: make(map[structs.NamespacedID]string), @@ -176,6 +181,9 @@ func (b *EvalBroker) SetEnabled(enabled bool) { if !enabled { b.flush() } + + // Notify all subscribers to state changes of the broker enabled value. + b.enabledNotifier.Notify("eval broker enabled status changed to " + strconv.FormatBool(enabled)) } // Enqueue is used to enqueue a new evaluation diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 732c0a333cc..43b4d8081e0 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -126,18 +126,28 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, args.SchedulerVersion, scheduler.SchedulerVersion) } - // If the eval broker is not enabled, but we are the leader, we have - // nothing to do. Not checking this will return errors to the workers which - // does nothing but clog up the logs. - if !e.srv.evalBroker.Enabled() { - return nil - } - // Ensure there is a default timeout if args.Timeout <= 0 { args.Timeout = DefaultDequeueTimeout } + // If the eval broker is paused, attempt to block and wait for a state + // change before returning. This avoids a tight loop and mimics the + // behaviour where there are no evals to process. + // + // The call can return because either the timeout is reached or the broker + // SetEnabled function was called to modify its state. It is possible this + // is because of leadership transition, therefore the RPC should exit to + // allow all safety checks and RPC forwarding to occur again. + // + // The log line is trace, because the default worker timeout is 500ms which + // produces a large amount of logging. + if !e.srv.evalBroker.Enabled() { + message := e.srv.evalBroker.enabledNotifier.WaitForChange(args.Timeout) + e.logger.Trace("eval broker wait for un-pause", "message", message) + return nil + } + // Attempt the dequeue eval, token, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout) if err != nil { diff --git a/nomad/server.go b/nomad/server.go index e47a72a77ec..4eb794bd0ab 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -431,6 +431,10 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr return nil, fmt.Errorf("failed to create volume watcher: %v", err) } + // Start the eval broker notification system so any subscribers can get + // updates when the processes SetEnabled is triggered. + go s.evalBroker.enabledNotifier.Run(s.shutdownCh) + // Setup the node drainer. s.setupNodeDrainer()