Skip to content

Commit

Permalink
core: allow pause/un-pause of eval broker on region leader.
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasell committed Jul 1, 2022
1 parent 1d8d1ab commit 4b170e1
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 33 deletions.
106 changes: 106 additions & 0 deletions helper/broker/notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package broker

import (
"time"

"github.com/hashicorp/nomad/helper"
)

// GenericNotifier lets one process broadcast an update to any number of
// subscribers. Its channels are serviced by Run: Notify feeds publishCh, and
// WaitForChange registers and deregisters a subscriber through subscribeCh and
// unsubscribeCh.
type GenericNotifier struct {

	// publishCh carries each update that is to be forwarded to every
	// subscriber.
	publishCh chan interface{}

	// subscribeCh and unsubscribeCh carry the subscriber channels that are to
	// be added to, or removed from, the subscription membership mapping.
	subscribeCh, unsubscribeCh chan chan interface{}
}

// NewGenericNotifier builds a GenericNotifier that a process can use to notify
// many subscribers when a specific update is triggered. Each of its channels
// is created with a buffer of one.
func NewGenericNotifier() *GenericNotifier {
	notifier := new(GenericNotifier)

	notifier.publishCh = make(chan interface{}, 1)
	notifier.subscribeCh = make(chan chan interface{}, 1)
	notifier.unsubscribeCh = make(chan chan interface{}, 1)

	return notifier
}

// Notify hands msg to the notifier so that it can be forwarded to all
// subscribers. Subscribers are sent the message one after another, and the
// order in which they receive it is not guaranteed.
//
// The call never blocks: if the publish channel cannot accept the message
// immediately, the message is dropped.
func (g *GenericNotifier) Notify(msg interface{}) {
	select {
	case g.publishCh <- msg:
		return
	default:
		// The publish channel is not ready to accept another update; drop
		// this one rather than block the caller.
		return
	}
}

// Run is the long-lived loop that owns the subscriber set. It registers and
// deregisters subscribers and forwards every published update to the current
// subscribers. It returns once stopCh is closed or receives a value.
func (g *GenericNotifier) Run(stopCh <-chan struct{}) {

	// The subscriber set lives entirely inside this function and is only ever
	// touched while handling a single channel event, so no lock is required.
	active := make(map[chan interface{}]struct{})

	// fanOut offers msg to every active subscriber. A subscriber that cannot
	// accept the message immediately is skipped so that one slow subscriber
	// never stalls the loop.
	fanOut := func(msg interface{}) {
		for ch := range active {
			select {
			case ch <- msg:
			default:
			}
		}
	}

	for {
		select {
		case <-stopCh:
			return
		case ch := <-g.subscribeCh:
			active[ch] = struct{}{}
		case ch := <-g.unsubscribeCh:
			delete(active, ch)
		case msg := <-g.publishCh:
			fanOut(msg)
		}
	}
}

// WaitForChange subscribes the caller to the notifier and blocks until either
// an update is published or the timeout elapses. It returns the published
// update, or a "wait timed out after <timeout>" string when the timeout is
// reached first.
//
// The timeout covers the whole call, including the time taken to register the
// subscription. If the subscription cannot be registered before the timeout
// fires, the timeout message is returned and nothing needs to be cleaned up.
//
// NOTE: deregistration is a blocking send on unsubscribeCh. It relies on Run
// still servicing that channel (or on the channel having buffer space).
func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {

	// Start the timeout timer first so that it also bounds the subscription
	// step below. The helper returns the timer along with its stop function.
	timeoutTimer, timeoutStop := helper.NewSafeTimer(timeout)
	defer timeoutStop()

	// The channel is buffered so that the Run loop can hand over an update
	// without waiting for this routine to be ready to receive it.
	updateCh := make(chan interface{}, 1)

	// Register the subscription, giving up if the timeout fires first. This
	// avoids blocking indefinitely when Run is not draining subscribeCh.
	select {
	case g.subscribeCh <- updateCh:
	case <-timeoutTimer.C:
		return "wait timed out after " + timeout.String()
	}

	// Once subscribed, always deregister on the way out. updateCh is
	// deliberately NOT closed: the unsubscribe request may still be queued
	// when Run forwards the next update, and a send on a closed channel
	// panics. The channel is garbage collected once Run drops its reference.
	defer func() {
		g.unsubscribeCh <- updateCh
	}()

	// Wait for whichever happens first: an update or the timeout.
	select {
	case <-timeoutTimer.C:
		return "wait timed out after " + timeout.String()
	case update := <-updateCh:
		return update
	}
}
55 changes: 55 additions & 0 deletions helper/broker/notify_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package broker

import (
"sync"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
"github.com/stretchr/testify/require"
)

// TestGenericNotifier exercises the notifier end to end: it checks the channel
// buffer sizes, that waiters time out when nothing is published, and that
// every waiter observes a published update.
func TestGenericNotifier(t *testing.T) {
	ci.Parallel(t)

	// Create the new notifier.
	stopChan := make(chan struct{})
	defer close(stopChan)

	notifier := NewGenericNotifier()
	go notifier.Run(stopChan)

	// Ensure we have buffered channels.
	require.Equal(t, 1, cap(notifier.publishCh))
	require.Equal(t, 1, cap(notifier.subscribeCh))
	require.Equal(t, 1, cap(notifier.unsubscribeCh))

	// Test that the timeout works.
	const timeoutMsg = "wait timed out after 100ms"

	var timeoutWG sync.WaitGroup

	for i := 0; i < 6; i++ {
		// Add must happen before the goroutine starts, otherwise Wait can
		// return before any waiter has been counted.
		timeoutWG.Add(1)
		go func() {
			defer timeoutWG.Done()
			msg := notifier.WaitForChange(100 * time.Millisecond)

			// Use t.Errorf rather than require here: FailNow must only be
			// called from the goroutine running the test function.
			if msg != timeoutMsg {
				t.Errorf("expected %q, got %v", timeoutMsg, msg)
			}
		}()
	}
	timeoutWG.Wait()

	// Test that all subscribers receive a published update.
	const updateMsg = "we got an update and not a timeout"

	var notifiedWG sync.WaitGroup

	for i := 0; i < 6; i++ {
		notifiedWG.Add(1)
		go func() {
			defer notifiedWG.Done()
			msg := notifier.WaitForChange(3 * time.Second)
			if msg != updateMsg {
				t.Errorf("expected %q, got %v", updateMsg, msg)
			}
		}()
	}

	notifiedDone := make(chan struct{})
	go func() {
		notifiedWG.Wait()
		close(notifiedDone)
	}()

	// Subscription happens asynchronously inside WaitForChange, so a single
	// notification sent straight away could be published before every waiter
	// has subscribed. Keep publishing until all waiters have returned.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		notifier.Notify(updateMsg)

		select {
		case <-notifiedDone:
			return
		case <-ticker.C:
		}
	}
}
12 changes: 10 additions & 2 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"time"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/broker"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/delayheap"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -48,8 +50,10 @@ type EvalBroker struct {
nackTimeout time.Duration
deliveryLimit int

enabled bool
stats *BrokerStats
enabled bool
enabledNotifier *broker.GenericNotifier

stats *BrokerStats

// evals tracks queued evaluations by ID to de-duplicate enqueue.
// The counter is the number of times we've attempted delivery,
Expand Down Expand Up @@ -131,6 +135,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
enabledNotifier: broker.NewGenericNotifier(),
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
Expand Down Expand Up @@ -176,6 +181,9 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
if !enabled {
b.flush()
}

// Notify all subscribers to state changes of the broker enabled value.
b.enabledNotifier.Notify("eval broker enabled status changed to " + strconv.FormatBool(enabled))
}

// Enqueue is used to enqueue a new evaluation
Expand Down
17 changes: 17 additions & 0 deletions nomad/eval_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,23 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
args.Timeout = DefaultDequeueTimeout
}

// If the eval broker is paused, attempt to block and wait for a state
// change before returning. This avoids a tight loop and mimics the
// behaviour where there are no evals to process.
//
// The call can return because either the timeout is reached or the broker
// SetEnabled function was called to modify its state. It is possible this
// is because of leadership transition, therefore the RPC should exit to
// allow all safety checks and RPC forwarding to occur again.
//
// The log line is trace, because the default worker timeout is 500ms which
// produces a large amount of logging.
if !e.srv.evalBroker.Enabled() {
message := e.srv.evalBroker.enabledNotifier.WaitForChange(args.Timeout)
e.logger.Trace("eval broker wait for un-pause", "message", message)
return nil
}

// Attempt the dequeue
eval, token, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout)
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions nomad/eval_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,33 @@ func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
}
}

// TestEvalEndpoint_Dequeue_BrokerDisabled checks that an Eval.Dequeue RPC made
// while the eval broker is disabled returns without an error and without an
// evaluation in the response.
func TestEvalEndpoint_Dequeue_BrokerDisabled(t *testing.T) {
	ci.Parallel(t)

	// Run the server without any scheduler workers so that nothing dequeues
	// automatically.
	srv, cleanup := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanup()

	codec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	// Enqueue an evaluation, then disable the eval broker before attempting
	// the dequeue.
	srv.evalBroker.Enqueue(mock.Eval())
	srv.evalBroker.SetEnabled(false)

	req := &structs.EvalDequeueRequest{
		Schedulers:       defaultSched,
		SchedulerVersion: scheduler.SchedulerVersion,
		WriteRequest:     structs.WriteRequest{Region: "global"},
	}

	var resp structs.EvalDequeueResponse
	err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", req, &resp)
	require.NoError(t, err)
	require.Empty(t, resp.Eval)
}

func TestEvalEndpoint_Ack(t *testing.T) {
ci.Parallel(t)

Expand Down
97 changes: 83 additions & 14 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
s.getOrCreateAutopilotConfig()
s.autopilot.Start()

// Initialize scheduler configuration
s.getOrCreateSchedulerConfig()
// Initialize scheduler configuration.
schedulerConfig := s.getOrCreateSchedulerConfig()

// Initialize the ClusterID
_, _ = s.ClusterID()
Expand All @@ -302,12 +302,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Start the plan evaluator
go s.planApply()

// Enable the eval broker, since we are now the leader
s.evalBroker.SetEnabled(true)

// Enable the blocked eval tracker, since we are now the leader
s.blockedEvals.SetEnabled(true)
s.blockedEvals.SetTimetable(s.fsm.TimeTable())
// Start the eval broker and blocked eval broker if these are not paused by
// the operator.
restoreEvals := s.handleEvalBrokerStateChange(schedulerConfig)

// Enable the deployment watcher, since we are now the leader
s.deploymentWatcher.SetEnabled(true, s.State())
Expand All @@ -318,9 +315,12 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Enable the volume watcher, since we are now the leader
s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl())

// Restore the eval broker state
if err := s.restoreEvals(); err != nil {
return err
// Restore the eval broker state and blocked eval state. If these are
// currently paused, we do not need to do this.
if restoreEvals {
if err := s.restoreEvals(); err != nil {
return err
}
}

// Activate the vault client
Expand Down Expand Up @@ -1110,11 +1110,13 @@ func (s *Server) revokeLeadership() error {
// Disable the plan queue, since we are no longer leader
s.planQueue.SetEnabled(false)

// Disable the eval broker, since it is only useful as a leader
// Disable the eval broker and blocked evals. We do not need to check the
// scheduler configuration paused eval broker value, as the brokers should
// always be paused on the non-leader.
s.brokerLock.Lock()
s.evalBroker.SetEnabled(false)

// Disable the blocked eval tracker, since it is only useful as a leader
s.blockedEvals.SetEnabled(false)
s.brokerLock.Unlock()

// Disable the periodic dispatcher, since it is only useful as a leader
s.periodicDispatcher.SetEnabled(false)
Expand Down Expand Up @@ -1693,3 +1695,70 @@ func (s *Server) generateClusterID() (string, error) {
s.logger.Named("core").Info("established cluster id", "cluster_id", newMeta.ClusterID, "create_time", newMeta.CreateTime)
return newMeta.ClusterID, nil
}

// handleEvalBrokerStateChange brings the enabled state of the evalBroker and
// blockedEvals in line with the passed scheduler configuration. It returns
// true when the caller needs to call restoreEvals() because a broker was
// enabled by this call, and false otherwise.
//
// Use it wherever the change has to respect the scheduler configuration.
// revokeLeadership does not need it: there the configuration is irrelevant and
// the brokers must simply be stopped.
//
// The function takes brokerLock and then verifies the server is still the
// leader, which guards against timing problems such as:
//   - an operator updates the configuration via the API
//   - the RPC handler applies the change via Raft
//   - leadership transitions with a write barrier
//   - the RPC handler calls this function to enact the change
//
// Holding the lock also covers the case where leadership is revoked while this
// function is running, so the sequence of actions keeps state consistent.
func (s *Server) handleEvalBrokerStateChange(schedConfig *structs.SchedulerConfiguration) bool {

	// Take the lock before anything else, so that everything below completes
	// before a leadership transition can modify the broker state.
	s.brokerLock.Lock()
	defer s.brokerLock.Unlock()

	// Nothing to do if leadership has already been lost.
	if !s.IsLeader() {
		return false
	}

	// Work out the desired state. The scheduler config can only be persisted
	// to Raft once quorum has been established, so a nil config (a fresh
	// cluster) falls back to the default scheduler config from the server
	// configuration; otherwise the passed object is used.
	var shouldEnable bool
	if schedConfig == nil {
		shouldEnable = !s.config.DefaultSchedulerConfig.PauseEvalBroker
	} else {
		shouldEnable = !schedConfig.PauseEvalBroker
	}

	needRestore := false

	// Only touch the evalBroker when its state actually changes.
	if s.evalBroker.Enabled() != shouldEnable {
		s.logger.Info("eval broker status modified", "paused", !shouldEnable)
		s.evalBroker.SetEnabled(shouldEnable)
		needRestore = shouldEnable
	}

	// Likewise for blockedEvals, which additionally needs the timetable set
	// when it is being enabled.
	if s.blockedEvals.Enabled() != shouldEnable {
		s.logger.Info("blocked evals status modified", "paused", !shouldEnable)
		s.blockedEvals.SetEnabled(shouldEnable)
		needRestore = shouldEnable

		if shouldEnable {
			s.blockedEvals.SetTimetable(s.fsm.TimeTable())
		}
	}

	return needRestore
}
Loading

0 comments on commit 4b170e1

Please sign in to comment.