From 4b170e1805bf4b416af51e058fbceb1f04bd55c0 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 14:37:03 +0100 Subject: [PATCH] core: allow pause/un-pause of eval broker on region leader. --- helper/broker/notify.go | 106 ++++++++++++++++++++++++++++++++ helper/broker/notify_test.go | 55 +++++++++++++++++ nomad/eval_broker.go | 12 +++- nomad/eval_endpoint.go | 17 +++++ nomad/eval_endpoint_test.go | 27 ++++++++ nomad/leader.go | 97 ++++++++++++++++++++++++----- nomad/leader_test.go | 86 ++++++++++++++++++++++++++ nomad/operator_endpoint.go | 12 ++++ nomad/operator_endpoint_test.go | 29 +++++---- nomad/server.go | 20 ++++-- nomad/structs/operator.go | 6 ++ 11 files changed, 434 insertions(+), 33 deletions(-) create mode 100644 helper/broker/notify.go create mode 100644 helper/broker/notify_test.go diff --git a/helper/broker/notify.go b/helper/broker/notify.go new file mode 100644 index 00000000000..24320dce45f --- /dev/null +++ b/helper/broker/notify.go @@ -0,0 +1,106 @@ +package broker + +import ( + "time" + + "github.com/hashicorp/nomad/helper" +) + +// GenericNotifier allows a process to send updates to many subscribers in an +// easy manner. +type GenericNotifier struct { + + // publishCh is the channel used to receive the update which will be sent + // to all subscribers. + publishCh chan interface{} + + // subscribeCh and unsubscribeCh are the channels used to modify the + // subscription membership mapping. + subscribeCh chan chan interface{} + unsubscribeCh chan chan interface{} +} + +// NewGenericNotifier returns a generic notifier which can be used by a process +// to notify many subscribers when a specific update is triggered. +func NewGenericNotifier() *GenericNotifier { + return &GenericNotifier{ + publishCh: make(chan interface{}, 1), + subscribeCh: make(chan chan interface{}, 1), + unsubscribeCh: make(chan chan interface{}, 1), + } +} + +// Notify allows the implementer to notify all subscribers with a specific +// update. 
There is no guarantee the order in which subscribers receive the +// message which is sent linearly. +func (g *GenericNotifier) Notify(msg interface{}) { + select { + case g.publishCh <- msg: + default: + } +} + +// Run is a long-lived process which handles updating subscribers as well as +// ensuring any update is sent to them. The passed stopCh is used to coordinate +// shutdown. +func (g *GenericNotifier) Run(stopCh <-chan struct{}) { + + // Store our subscribers inline with a map. This map can only be accessed + // via a single channel update at a time, meaning we can manage without + // using a lock. + subscribers := map[chan interface{}]struct{}{} + + for { + select { + case <-stopCh: + return + case msgCh := <-g.subscribeCh: + subscribers[msgCh] = struct{}{} + case msgCh := <-g.unsubscribeCh: + delete(subscribers, msgCh) + case update := <-g.publishCh: + for subscriberCh := range subscribers { + + // The subscribers channels are buffered, but ensure we don't + // block the whole process on this. + select { + case subscriberCh <- update: + default: + } + } + } + } +} + +// WaitForChange allows a subscriber to wait until there is a notification +// change, or the timeout is reached. The function will block until one +// condition is met. +func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} { + + // Create a channel and subscribe to any update. This channel is buffered + // to ensure we do not block the main broker process. + updateCh := make(chan interface{}, 1) + g.subscribeCh <- updateCh + + // Create a timeout timer and use the helper to ensure this routine doesn't + // panic and making the stop call clear. + timeoutTimer, timeoutStop := helper.NewSafeTimer(timeout) + + // Defer a function which performs all the required cleanup of the + // subscriber once it has been notified of a change, or reached its wait + // timeout. 
+ defer func() { + g.unsubscribeCh <- updateCh + close(updateCh) + timeoutStop() + }() + + // Enter the main loop which listens for an update or timeout and returns + // this information to the subscriber. + select { + case <-timeoutTimer.C: + return "wait timed out after " + timeout.String() + case update := <-updateCh: + return update + } +} diff --git a/helper/broker/notify_test.go b/helper/broker/notify_test.go new file mode 100644 index 00000000000..51943987e3f --- /dev/null +++ b/helper/broker/notify_test.go @@ -0,0 +1,55 @@ +package broker + +import ( + "sync" + "testing" + "time" + + "github.com/hashicorp/nomad/ci" + "github.com/stretchr/testify/require" +) + +func TestGenericNotifier(t *testing.T) { + ci.Parallel(t) + + // Create the new notifier. + stopChan := make(chan struct{}) + defer close(stopChan) + + notifier := NewGenericNotifier() + go notifier.Run(stopChan) + + // Ensure we have buffered channels. + require.Equal(t, 1, cap(notifier.publishCh)) + require.Equal(t, 1, cap(notifier.subscribeCh)) + require.Equal(t, 1, cap(notifier.unsubscribeCh)) + + // Test that the timeout works. + var timeoutWG sync.WaitGroup + + for i := 0; i < 6; i++ { + go func(wg *sync.WaitGroup) { + wg.Add(1) + msg := notifier.WaitForChange(100 * time.Millisecond) + require.Equal(t, "wait timed out after 100ms", msg) + wg.Done() + }(&timeoutWG) + } + timeoutWG.Wait() + + // Test that all subscribers receive an update when a single notification + // is sent. 
+ var notifiedWG sync.WaitGroup + + for i := 0; i < 6; i++ { + go func(wg *sync.WaitGroup) { + wg.Add(1) + msg := notifier.WaitForChange(3 * time.Second) + require.Equal(t, "we got an update and not a timeout", msg) + wg.Done() + }(&notifiedWG) + } + + notifier.Notify("we got an update and not a timeout") + notifiedWG.Wait() +} diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 9aa7542dcfe..b0b0b87d547 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -6,11 +6,13 @@ import ( "errors" "fmt" "math/rand" + "strconv" "sync" "time" metrics "github.com/armon/go-metrics" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/broker" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/lib/delayheap" "github.com/hashicorp/nomad/nomad/structs" @@ -48,8 +50,10 @@ type EvalBroker struct { nackTimeout time.Duration deliveryLimit int - enabled bool - stats *BrokerStats + enabled bool + enabledNotifier *broker.GenericNotifier + + stats *BrokerStats // evals tracks queued evaluations by ID to de-duplicate enqueue. // The counter is the number of times we've attempted delivery, @@ -131,6 +135,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, nackTimeout: timeout, deliveryLimit: deliveryLimit, enabled: false, + enabledNotifier: broker.NewGenericNotifier(), stats: new(BrokerStats), evals: make(map[string]int), jobEvals: make(map[structs.NamespacedID]string), @@ -176,6 +181,9 @@ func (b *EvalBroker) SetEnabled(enabled bool) { if !enabled { b.flush() } + + // Notify all subscribers to state changes of the broker enabled value. 
+ b.enabledNotifier.Notify("eval broker enabled status changed to " + strconv.FormatBool(enabled)) } // Enqueue is used to enqueue a new evaluation diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 2d6af727c32..43b4d8081e0 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -131,6 +131,23 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, args.Timeout = DefaultDequeueTimeout } + // If the eval broker is paused, attempt to block and wait for a state + // change before returning. This avoids a tight loop and mimics the + // behaviour where there are no evals to process. + // + // The call can return because either the timeout is reached or the broker + // SetEnabled function was called to modify its state. It is possible this + // is because of leadership transition, therefore the RPC should exit to + // allow all safety checks and RPC forwarding to occur again. + // + // The log line is trace, because the default worker timeout is 500ms which + // produces a large amount of logging. + if !e.srv.evalBroker.Enabled() { + message := e.srv.evalBroker.enabledNotifier.WaitForChange(args.Timeout) + e.logger.Trace("eval broker wait for un-pause", "message", message) + return nil + } + // Attempt the dequeue eval, token, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout) if err != nil { diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index b23a9f2b285..1a67d0e6a47 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -454,6 +454,33 @@ func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) { } } +func TestEvalEndpoint_Dequeue_BrokerDisabled(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue. + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create and enqueue an evaluation. 
+ eval1 := mock.Eval() + s1.evalBroker.Enqueue(eval1) + + // Disable the eval broker and try to dequeue. + s1.evalBroker.SetEnabled(false) + + get := &structs.EvalDequeueRequest{ + Schedulers: defaultSched, + SchedulerVersion: scheduler.SchedulerVersion, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.EvalDequeueResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp)) + require.Empty(t, resp.Eval) +} + func TestEvalEndpoint_Ack(t *testing.T) { ci.Parallel(t) diff --git a/nomad/leader.go b/nomad/leader.go index d4b40f94380..8ad91bc4cbd 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -289,8 +289,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { s.getOrCreateAutopilotConfig() s.autopilot.Start() - // Initialize scheduler configuration - s.getOrCreateSchedulerConfig() + // Initialize scheduler configuration. + schedulerConfig := s.getOrCreateSchedulerConfig() // Initialize the ClusterID _, _ = s.ClusterID() @@ -302,12 +302,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Start the plan evaluator go s.planApply() - // Enable the eval broker, since we are now the leader - s.evalBroker.SetEnabled(true) - - // Enable the blocked eval tracker, since we are now the leader - s.blockedEvals.SetEnabled(true) - s.blockedEvals.SetTimetable(s.fsm.TimeTable()) + // Start the eval broker and blocked eval broker if these are not paused by + // the operator. 
+ restoreEvals := s.handleEvalBrokerStateChange(schedulerConfig) // Enable the deployment watcher, since we are now the leader s.deploymentWatcher.SetEnabled(true, s.State()) @@ -318,9 +315,12 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Enable the volume watcher, since we are now the leader s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl()) - // Restore the eval broker state - if err := s.restoreEvals(); err != nil { - return err + // Restore the eval broker state and blocked eval state. If these are + // currently paused, we do not need to do this. + if restoreEvals { + if err := s.restoreEvals(); err != nil { + return err + } } // Activate the vault client @@ -1110,11 +1110,13 @@ func (s *Server) revokeLeadership() error { // Disable the plan queue, since we are no longer leader s.planQueue.SetEnabled(false) - // Disable the eval broker, since it is only useful as a leader + // Disable the eval broker and blocked evals. We do not need to check the + // scheduler configuration paused eval broker value, as the brokers should + // always be paused on the non-leader. + s.brokerLock.Lock() s.evalBroker.SetEnabled(false) - - // Disable the blocked eval tracker, since it is only useful as a leader s.blockedEvals.SetEnabled(false) + s.brokerLock.Unlock() // Disable the periodic dispatcher, since it is only useful as a leader s.periodicDispatcher.SetEnabled(false) @@ -1693,3 +1695,70 @@ func (s *Server) generateClusterID() (string, error) { s.logger.Named("core").Info("established cluster id", "cluster_id", newMeta.ClusterID, "create_time", newMeta.CreateTime) return newMeta.ClusterID, nil } + +// handleEvalBrokerStateChange handles changing the evalBroker and blockedEvals +// enabled status based on the passed scheduler configuration. The boolean +// response indicates whether the caller needs to call restoreEvals() due to +// the brokers being enabled. 
It is for use when the change must take the +// scheduler configuration into account. This is not needed when calling +// revokeLeadership, as the configuration doesn't matter, and we need to ensure +// the brokers are stopped. +// +// The function checks the server is the leader and uses a mutex to avoid any +// potential timing problems. Consider the following timings: +// - operator updates the configuration via the API +// - the RPC handler applies the change via Raft +// - leadership transitions with write barrier +// - the RPC handler calls this function to enact the change +// +// The mutex also protects against a situation where leadership is revoked +// while this function is being called, ensuring the correct series of actions +// occurs so that state stays consistent. +func (s *Server) handleEvalBrokerStateChange(schedConfig *structs.SchedulerConfiguration) bool { + + // Grab the lock first. Once we have this we can be sure to run everything + // needed before any leader transition can attempt to modify the state. + s.brokerLock.Lock() + defer s.brokerLock.Unlock() + + // If we are no longer the leader, exit early. + if !s.IsLeader() { + return false + } + + // enableBrokers tracks whether the evalBroker and blockedEvals + // processes should be enabled or not. It allows us to answer this question + // whether using a persisted Raft configuration, or the default bootstrap + // config. + var enableBrokers, restoreEvals bool + + // The scheduler config can only be persisted to Raft once quorum has been + // established. If this is a fresh cluster, we need to use the default + // scheduler config, otherwise we can use the persisted object. + switch schedConfig { + case nil: + enableBrokers = !s.config.DefaultSchedulerConfig.PauseEvalBroker + default: + enableBrokers = !schedConfig.PauseEvalBroker + } + + // If the evalBroker status is changing, set the new state. 
+ if enableBrokers != s.evalBroker.Enabled() { + s.logger.Info("eval broker status modified", "paused", !enableBrokers) + s.evalBroker.SetEnabled(enableBrokers) + restoreEvals = enableBrokers + } + + // If the blockedEvals status is changing, set the new state. + if enableBrokers != s.blockedEvals.Enabled() { + s.logger.Info("blocked evals status modified", "paused", !enableBrokers) + s.blockedEvals.SetEnabled(enableBrokers) + restoreEvals = enableBrokers + + if enableBrokers { + s.blockedEvals.SetTimetable(s.fsm.TimeTable()) + } + } + + return restoreEvals +} diff --git a/nomad/leader_test.go b/nomad/leader_test.go index c6d92160d3c..62864d115d0 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -1664,3 +1664,89 @@ func waitForStableLeadership(t *testing.T, servers []*Server) *Server { return leader } + +func TestServer_handleEvalBrokerStateChange(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + startValue bool + testServerCallBackConfig func(c *Config) + inputSchedulerConfiguration *structs.SchedulerConfiguration + expectedOutput bool + name string + }{ + { + startValue: false, + testServerCallBackConfig: func(c *Config) { c.DefaultSchedulerConfig.PauseEvalBroker = false }, + inputSchedulerConfiguration: nil, + expectedOutput: true, + name: "bootstrap un-paused", + }, + { + startValue: false, + testServerCallBackConfig: func(c *Config) { c.DefaultSchedulerConfig.PauseEvalBroker = true }, + inputSchedulerConfiguration: nil, + expectedOutput: false, + name: "bootstrap paused", + }, + { + startValue: true, + testServerCallBackConfig: nil, + inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true}, + expectedOutput: false, + name: "state change to paused", + }, + { + startValue: false, + testServerCallBackConfig: nil, + inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true}, + expectedOutput: false, + name: "no state change to paused", + }, + { + startValue: false, + 
testServerCallBackConfig: nil, + inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: false}, + expectedOutput: true, + name: "state change to un-paused", + }, + { + startValue: false, + testServerCallBackConfig: nil, + inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true}, + expectedOutput: false, + name: "no state change to un-paused", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + // Create a new server and wait for leadership to be established. + testServer, cleanupFn := TestServer(t, nil) + _ = waitForStableLeadership(t, []*Server{testServer}) + defer cleanupFn() + + // If we set a callback config, we are just testing the eventual + // state of the brokers. Otherwise, we set our starting value and + // then perform our state modification change and check. + if tc.testServerCallBackConfig == nil { + testServer.evalBroker.SetEnabled(tc.startValue) + testServer.blockedEvals.SetEnabled(tc.startValue) + actualOutput := testServer.handleEvalBrokerStateChange(tc.inputSchedulerConfiguration) + require.Equal(t, tc.expectedOutput, actualOutput) + } + + // Check the brokers are in the expected state. 
+ var expectedEnabledVal bool + + if tc.inputSchedulerConfiguration == nil { + expectedEnabledVal = !testServer.config.DefaultSchedulerConfig.PauseEvalBroker + } else { + expectedEnabledVal = !tc.inputSchedulerConfiguration.PauseEvalBroker + } + require.Equal(t, expectedEnabledVal, testServer.evalBroker.Enabled()) + require.Equal(t, expectedEnabledVal, testServer.blockedEvals.Enabled()) + }) + } +} diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index aab11c539ae..40c27669eac 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -334,7 +334,19 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe if respBool, ok := resp.(bool); ok { reply.Updated = respBool } + reply.Index = index + + // If we updated the configuration, handle any required state changes within + // the eval broker and blocked evals processes. The state change and + // restore functions have protections around leadership transitions and + // restoring into non-running brokers. + if reply.Updated { + if op.srv.handleEvalBrokerStateChange(&args.Config) { + return op.srv.restoreEvals() + } + } + return nil } diff --git a/nomad/operator_endpoint_test.go b/nomad/operator_endpoint_test.go index 1cac9b8029f..e96493e416d 100644 --- a/nomad/operator_endpoint_test.go +++ b/nomad/operator_endpoint_test.go @@ -402,39 +402,42 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) { c.Build = "0.9.0+unittest" }) defer cleanupS1() - codec := rpcClient(t, s1) + rpcCodec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) - require := require.New(t) - - // Disable preemption + // Disable preemption and pause the eval broker. 
arg := structs.SchedulerSetConfigRequest{ Config: structs.SchedulerConfiguration{ PreemptionConfig: structs.PreemptionConfig{ SystemSchedulerEnabled: false, }, + PauseEvalBroker: true, }, } arg.Region = s1.config.Region var setResponse structs.SchedulerSetConfigurationResponse - err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &setResponse) - require.Nil(err) - require.NotZero(setResponse.Index) + err := msgpackrpc.CallWithCodec(rpcCodec, "Operator.SchedulerSetConfiguration", &arg, &setResponse) + require.Nil(t, err) + require.NotZero(t, setResponse.Index) - // Read and verify that preemption is disabled + // Read and verify that preemption is disabled and the eval and blocked + // evals systems are disabled. readConfig := structs.GenericRequest{ QueryOptions: structs.QueryOptions{ Region: s1.config.Region, }, } var reply structs.SchedulerConfigurationResponse - if err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &readConfig, &reply); err != nil { - t.Fatalf("err: %v", err) - } + err = msgpackrpc.CallWithCodec(rpcCodec, "Operator.SchedulerGetConfiguration", &readConfig, &reply) + require.NoError(t, err) - require.NotZero(reply.Index) - require.False(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + require.NotZero(t, reply.Index) + require.False(t, reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + require.True(t, reply.SchedulerConfig.PauseEvalBroker) + + require.False(t, s1.evalBroker.Enabled()) + require.False(t, s1.blockedEvals.Enabled()) } func TestOperator_SchedulerGetConfiguration_ACL(t *testing.T) { diff --git a/nomad/server.go b/nomad/server.go index 9fb64d0a50f..3c5351addb0 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -190,6 +190,18 @@ type Server struct { // capacity changes. 
blockedEvals *BlockedEvals + // evalBroker is used to manage the in-progress evaluations + // that are waiting to be brokered to a sub-scheduler + evalBroker *EvalBroker + + // brokerLock is used to synchronise the alteration of the blockedEvals and + // evalBroker enabled state. These two subsystems change state when + // leadership changes or when the user modifies the setting via the + // operator scheduler configuration. This lock allows these actions to be + // performed safely, without potential for user interactions and leadership + // transitions to collide and create inconsistent state. + brokerLock sync.Mutex + // deploymentWatcher is used to watch deployments and their allocations and // make the required calls to continue to transition the deployment. deploymentWatcher *deploymentwatcher.Watcher @@ -200,10 +212,6 @@ type Server struct { // volumeWatcher is used to release volume claims volumeWatcher *volumewatcher.Watcher - // evalBroker is used to manage the in-progress evaluations - // that are waiting to be brokered to a sub-scheduler - evalBroker *EvalBroker - // periodicDispatcher is used to track and create evaluations for periodic jobs. periodicDispatcher *PeriodicDispatch @@ -423,6 +431,10 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr return nil, fmt.Errorf("failed to create volume watcher: %v", err) } + // Start the eval broker notification system so any subscribers can get + // updates when the processes SetEnabled is triggered. + go s.evalBroker.enabledNotifier.Run(s.shutdownCh) + // Setup the node drainer. 
s.setupNodeDrainer() diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index 633afa6c33e..28452a312f5 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -156,6 +156,12 @@ type SchedulerConfiguration struct { // management ACL token RejectJobRegistration bool `hcl:"reject_job_registration"` + // PauseEvalBroker is a boolean to control whether the evaluation broker + // should be paused on the cluster leader. Only a single broker runs per + // region, and it must be persisted to state so the parameter is consistent + // during leadership transitions. + PauseEvalBroker bool `hcl:"pause_eval_broker"` + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64