This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

internal/runner: make Accept resilient to the server going down
This starts by addressing only the initial job stream opening. This
required some changes to how we track the state of the underlying runner
registration. This changes the state tracking from a bool to a monotonic
integer.
mitchellh committed Mar 14, 2022
1 parent 9a1a65d commit 187c00b
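
The core idea of the change: a boolean can only say whether the stream is connected right now, while a monotonic generation counter also lets a waiter detect that a disconnect and reconnect happened while it wasn't looking, since every successful (re)registration increments the counter. Below is a minimal, self-contained sketch of that pattern; the names (genState, incr, waitGreater) are illustrative, not code from this commit.

package main

import (
	"fmt"
	"sync"
)

// genState sketches the monotonic state tracking this commit introduces.
type genState struct {
	cond *sync.Cond
	gen  uint
}

func newGenState() *genState {
	return &genState{cond: sync.NewCond(&sync.Mutex{})}
}

// incr records one more (re)connect and wakes all waiters.
func (s *genState) incr() {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()
	s.gen++
	s.cond.Broadcast()
}

// read snapshots the current generation.
func (s *genState) read() uint {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()
	return s.gen
}

// waitGreater blocks until the generation exceeds v, i.e. until at least
// one (re)connect has happened since the snapshot v was taken.
func (s *genState) waitGreater(v uint) {
	s.cond.L.Lock()
	defer s.cond.L.Unlock()
	for s.gen <= v {
		s.cond.Wait()
	}
}

func main() {
	s := newGenState()
	snap := s.read() // 0: nothing connected yet
	go s.incr()      // a (re)connect happens elsewhere
	s.waitGreater(snap)
	fmt.Println("saw a (re)connect; generation is now", s.read())
}

With a bool, the wait would be "wait until true", which a stale connection that is about to drop also satisfies; the counter makes "reconnected since I looked" a race-free, first-class question.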
Showing 4 changed files with 142 additions and 58 deletions.
internal/runner/accept.go: 34 additions & 10 deletions
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/go-hclog"
+	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"

@@ -89,11 +90,7 @@ func (r *Runner) AcceptExact(ctx context.Context, id string) error {
 var testRecvDelay time.Duration
 
 func (r *Runner) accept(ctx context.Context, id string) error {
-	r.runningCond.L.Lock()
-	shutdown := r.shutdown
-	r.runningCond.L.Unlock()
-
-	if shutdown {
+	if r.readState(&r.stateExit) > 0 {
 		return ErrClosed
 	}

@@ -114,10 +111,37 @@ func (r *Runner) accept(ctx context.Context, id string) error {
 	// Open a new job stream. NOTE: we purposely do NOT use ctx above
 	// since if the context is cancelled we want to continue reporting
 	// errors.
-	log.Debug("opening job stream")
-	client, err := r.client.RunnerJobStream(streamCtx)
-	if err != nil {
-		return err
+	retry := false
+	var client pb.Waypoint_RunnerJobStreamClient
+	for {
+		// Get our configuration state value. We use this so that we can detect
+		// when we've reconnected during failures.
+		stateGen := r.readState(&r.stateConfig)
+
+		log.Debug("opening job stream", "retry", retry)
+		var err error
+		client, err = r.client.RunnerJobStream(streamCtx, grpc.WaitForReady(retry))
+		if err != nil {
+			// If we decide to retry, then it'll be a retry, so mark it
+			retry = true
+
+			if status.Code(err) == codes.Unavailable || status.Code(err) == codes.NotFound {
+				log.Warn("server down during job stream open, will attempt reconnect")
+
+				// Since this is a disconnect, we have to wait for our
+				// RunnerConfig stream to re-establish. We wait for the config
+				// generation to increment.
+				if r.waitStateGreater(&r.stateConfig, stateGen) {
+					return status.Errorf(codes.Internal, "early exit while waiting for reconnect")
+				}
+
+				continue
+			}
+
+			return err
+		}
+
+		break
 	}
 	defer client.CloseSend()

@@ -193,7 +217,7 @@ func (r *Runner) accept(ctx context.Context, id string) error {
 	// and return.
 
 	r.runningCond.L.Lock()
-	shutdown = r.shutdown
+	shutdown := r.readState(&r.stateExit) > 0
 	if !shutdown {
 		r.runningJobs++
 	}
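
A note on the retry loop above: the first RunnerJobStream attempt is fail-fast (grpc.WaitForReady(false)), so a down server surfaces codes.Unavailable immediately and the runner can wait for its config stream to re-register; every later attempt passes grpc.WaitForReady(true), letting gRPC block until the connection is ready again. The same shape as a standalone helper, with hypothetical names (openWithRetry, open, waitReconnect) and assuming this file's imports; this is an illustrative distillation, not code from the commit:

func openWithRetry(
	ctx context.Context,
	open func(context.Context, ...grpc.CallOption) (pb.Waypoint_RunnerJobStreamClient, error),
	waitReconnect func() (exit bool),
) (pb.Waypoint_RunnerJobStreamClient, error) {
	retry := false
	for {
		client, err := open(ctx, grpc.WaitForReady(retry))
		if err == nil {
			return client, nil
		}
		retry = true // any later attempt is a retry

		// Only connectivity-style errors are retried; everything else
		// propagates to the caller.
		if c := status.Code(err); c != codes.Unavailable && c != codes.NotFound {
			return nil, err
		}

		// Block until we have re-registered (or we are exiting).
		if waitReconnect() {
			return nil, status.Errorf(codes.Internal, "early exit while waiting for reconnect")
		}
	}
}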
internal/runner/accept_test.go: 72 additions & 0 deletions
@@ -136,6 +136,78 @@ func TestRunnerAccept_timeout(t *testing.T) {
 	require.True(errors.Is(err, ErrTimeout))
 }
 
+// Test how accept behaves when the server is down to begin with.
+func TestRunnerAccept_serverDown(t *testing.T) {
+	require := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Setup the server
+	restartCh := make(chan struct{})
+	impl := singleprocess.TestImpl(t)
+	client := serverpkg.TestServer(t, impl,
+		serverpkg.TestWithContext(ctx),
+		serverpkg.TestWithRestart(restartCh),
+	)
+
+	// Setup our runner
+	runner := TestRunner(t, WithClient(client))
+	require.NoError(runner.Start(ctx))
+
+	// Initialize our app
+	singleprocess.TestApp(t, client, serverptypes.TestJobNew(t, nil).Application)
+
+	// Queue a job
+	queueResp, err := client.QueueJob(ctx, &pb.QueueJobRequest{
+		Job: serverptypes.TestJobNew(t, nil),
+	})
+	require.NoError(err)
+	jobId := queueResp.JobId
+
+	// Shut it down
+	cancel()
+	ctx, cancel = context.WithCancel(context.Background())
+	defer cancel()
+
+	// Wait to get an unavailable error so we know the server is down
+	require.Eventually(func() bool {
+		_, err := client.GetRunner(ctx, &pb.GetRunnerRequest{RunnerId: "A"})
+		return status.Code(err) == codes.Unavailable
+	}, 5*time.Second, 10*time.Millisecond)
+
+	// Start accept
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- runner.Accept(ctx)
+	}()
+
+	// The runner should not error
+	select {
+	case <-time.After(100 * time.Millisecond):
+		// Good
+
+	case <-errCh:
+		t.Fatal("runner should not return")
+	}
+
+	// Restart
+	restartCh <- struct{}{}
+
+	// Accept should return
+	select {
+	case err := <-errCh:
+		require.NoError(err)
+
+	case <-time.After(5 * time.Second):
+		t.Fatal("accept never returned")
+	}
+
+	// Verify that the job is completed
+	job, err := client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId})
+	require.NoError(err)
+	require.Equal(pb.Job_SUCCESS, job.State)
+}
+
 func TestRunnerAccept_closeCancelesAccept(t *testing.T) {
 	require := require.New(t)
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
internal/runner/config.go: 4 additions & 29 deletions
@@ -105,10 +105,6 @@ func (r *Runner) watchConfig(
 	log = log.Named("watcher")
 	defer log.Trace("exiting goroutine")
 
-	// If we exit, the job is no longer ready since we can't be sure we're
-	// processing our config.
-	defer r.setState(&r.stateJobReady, false)
-
 	// Start the app config watcher. This runs in its own goroutine so that
 	// stuff like dynamic config fetching doesn't block starting things like
 	// exec sessions.
@@ -164,15 +160,10 @@ func (r *Runner) watchConfig(
 			merr = multierror.Append(merr, err)
 		}
 
-		// If we had no errors, then we note that we're ready to process a job.
-		if merr == nil {
-			r.setState(&r.stateJobReady, true)
-		}
-
 		// Note that we processed our config at least once. People can
 		// wait on this state to know that success or fail, one config
 		// was received.
-		r.setState(&r.stateConfigOnce, true)
+		r.incrState(&r.stateConfigOnce)
 
 		// If we have an error, then we exit our loop. For runners,
 		// not being able to set config is a fatal error. We do this so
@@ -276,19 +267,9 @@ func (r *Runner) recvConfig(
 
 	// Keep track of our first receive
 	first := true
-	reconnected := false
-
-	defer func() {
-		// Any reason we exit, this client is done so we mark we're done sending, too.
-		client.CloseSend()
-
-		// On exit, we note that we're no longer connected to the config stream.
-		// We only do this if we didn't reconnect, since reconnection will
-		// transfer ownership of stateConfig.
-		if !reconnected {
-			r.setState(&r.stateConfig, false)
-		}
-	}()
+	// Any reason we exit, this client is done so we mark we're done sending, too.
+	defer client.CloseSend()
 
 	for {
 		// If the context is closed, exit
@@ -299,11 +280,6 @@ func (r *Runner) recvConfig(
 		// Wait for the next configuration
 		resp, err := client.Recv()
 		if err != nil {
-			// Note immediately that we have disconnected regardless of
-			// error type, since we've failed to receive. If we reconnect,
-			// it will reset this to true.
-			r.setState(&r.stateConfig, false)
-
 			// EOF means a graceful close, don't reconnect.
 			if err == io.EOF || clierrors.IsCanceled(err) {
 				log.Warn("EOF or cancellation received, graceful close of runner config stream")
@@ -318,7 +294,6 @@ func (r *Runner) recvConfig(
 
 			// If we successfully reconnected, then exit this.
 			if err == nil {
-				reconnected = true
 				return
 			}
 		}
@@ -331,7 +306,7 @@ func (r *Runner) recvConfig(
 		if first {
 			log.Debug("first config received, switching config state to true")
 			first = false
-			r.setState(&r.stateConfig, true)
+			r.incrState(&r.stateConfig)
 		}
 
 		log.Info("new configuration received")
internal/runner/runner.go: 32 additions & 19 deletions
@@ -68,7 +68,6 @@ type Runner struct {
 
 	// protects whether or not the runner is active or not.
 	runningCond *sync.Cond
-	shutdown    bool
 	runningJobs int
 
 	runningCtx context.Context
@@ -82,10 +81,10 @@ type Runner struct {
 	// also verify the context didn't cancel. The stateCond will be broadcasted
 	// when the root context cancels.
 	stateCond       *sync.Cond
-	stateConfig     bool // config stream is connected
-	stateConfigOnce bool // true once we process config once, success or error
-	stateJobReady   bool // ready to start accepting jobs
-	stateExit       bool // true when exiting
+	stateConfig     uint // config stream connected; incremented on each (re)connect
+	stateConfigOnce uint // non-zero once we process config at least once, success or error
+	stateJobReady   uint // non-zero when ready to start accepting jobs
+	stateExit       uint // non-zero when exiting
 
 	// config is the current runner config.
 	config *pb.RunnerConfig
@@ -235,7 +234,7 @@ func (r *Runner) Id() string {
 // server. This will spawn goroutines for management. This will return after
 // registration so this should not be executed in a goroutine.
 func (r *Runner) Start(ctx context.Context) error {
-	if r.shutdown {
+	if r.readState(&r.stateExit) > 0 {
 		return ErrClosed
 	}

@@ -321,13 +320,13 @@ func (r *Runner) Start(ctx context.Context) error {
 
 	// Wait for initial registration
 	log.Debug("waiting for registration")
-	if r.waitState(&r.stateConfig, true) {
+	if r.waitState(&r.stateConfig) {
 		return status.Errorf(codes.Internal, "early exit while waiting for first config")
 	}
 
 	// Wait for the initial configuration to be set
 	log.Debug("runner registered, waiting for first config processing")
-	if r.waitState(&r.stateConfigOnce, true) {
+	if r.waitState(&r.stateConfigOnce) {
 		return status.Errorf(codes.Internal, "early exit while waiting for first config processing")
 	}

@@ -348,7 +347,8 @@ func (r *Runner) Close() error {
 		r.runningCond.Wait()
 	}
 
-	r.shutdown = true
+	// Mark we're exiting
+	r.incrState(&r.stateExit)
 
 	// Cancel the context that is used by Accept to wait on the RunnerJobStream.
 	// This interrupts the Recv() call so that any goroutine running Accept()
@@ -364,25 +364,38 @@ func (r *Runner) Close() error {
 	return nil
 }
 
-// waitState waits for the given state boolean to go true. This boolean
-// must be a pointer to a state field on runner. This will also return if
-// stateExit flips true. The return value notes whether we should exit.
-func (r *Runner) waitState(state *bool, v bool) (exit bool) {
+// waitState waits for the given state to be set to an initial value.
+func (r *Runner) waitState(state *uint) bool {
+	return r.waitStateGreater(state, 0)
+}
+
+// waitStateGreater waits for the given state to increment above the value
+// v. This can be used with fields such as stateConfig to detect
+// when a reconnect occurs.
+func (r *Runner) waitStateGreater(state *uint, v uint) bool {
 	r.stateCond.L.Lock()
 	defer r.stateCond.L.Unlock()
-	for *state != v && !r.stateExit {
+	for *state <= v && r.stateExit == 0 {
 		r.stateCond.Wait()
 	}
 
-	return r.stateExit
+	return r.stateExit > 0
+}
+
+// readState reads the current state value. This can be used with
+// waitStateGreater to detect a change in state.
+func (r *Runner) readState(state *uint) uint {
+	r.stateCond.L.Lock()
+	defer r.stateCond.L.Unlock()
+	return *state
 }
 
-// setState sets the value of a state var on the runner struct and broadcasts
-// the condition variable.
-func (r *Runner) setState(state *bool, v bool) {
+// incrState increments the value of a state variable and broadcasts the
+// condition variable. The first increment also satisfies waitState.
+func (r *Runner) incrState(state *uint) {
 	r.stateCond.L.Lock()
 	defer r.stateCond.L.Unlock()
-	*state = v
+	*state += 1
 	r.stateCond.Broadcast()
 }
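
Together, readState and waitStateGreater give callers a race-free way to ask "has the config stream reconnected since I last looked?". A hypothetical wrapper (not part of the commit; assumes the Runner type and imports in this file) showing the calling pattern used by accept.go:

// waitForConfigReconnect blocks until stateConfig increments past the
// snapshot gen, i.e. until the RunnerConfig stream has re-registered after
// the disconnect we observed. gen should come from a prior
// r.readState(&r.stateConfig) taken before the failing RPC attempt.
func (r *Runner) waitForConfigReconnect(gen uint) error {
	if exit := r.waitStateGreater(&r.stateConfig, gen); exit {
		return status.Errorf(codes.Internal, "early exit while waiting for reconnect")
	}
	return nil
}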

