Merge pull request #3097 from hashicorp/f-runner-accept-resil
internal/runner: make Accept resilient to the server going down
mitchellh authored Mar 15, 2022
2 parents 43723e2 + ee9d016 commit 99de91c
Showing 4 changed files with 153 additions and 61 deletions.
52 changes: 39 additions & 13 deletions internal/runner/accept.go
@@ -10,6 +10,7 @@ import (
     "time"
 
     "github.com/hashicorp/go-hclog"
+    "google.golang.org/grpc"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
 
@@ -89,11 +90,7 @@ func (r *Runner) AcceptExact(ctx context.Context, id string) error {
 var testRecvDelay time.Duration
 
 func (r *Runner) accept(ctx context.Context, id string) error {
-    r.runningCond.L.Lock()
-    shutdown := r.shutdown
-    r.runningCond.L.Unlock()
-
-    if shutdown {
+    if r.readState(&r.stateExit) > 0 {
         return ErrClosed
     }
 
@@ -111,13 +108,42 @@ func (r *Runner) accept(ctx context.Context, id string) error {
         ctx = serverclient.TokenWithContext(ctx, tok)
     }
 
-    // Open a new job stream. NOTE: we purposely do NOT use ctx above
-    // since if the context is cancelled we want to continue reporting
-    // errors.
-    log.Debug("opening job stream")
-    client, err := r.client.RunnerJobStream(streamCtx)
-    if err != nil {
-        return err
+    // Open a new job stream. This retries on connection errors. Note that
+    // this loop doesn't respect the accept timeout because gRPC has no way
+    // to time out of a "WaitForReady" RPC call (it ignores context cancellation,
+    // too). TODO: do a manual backoff with WaitForReady(false) so we can
+    // weave in accept timeout.
+    retry := false
+    var client pb.Waypoint_RunnerJobStreamClient
+    for {
+        // Get our configuration state value. We use this so that we can detect
+        // when we've reconnected during failures.
+        stateGen := r.readState(&r.stateConfig)
+
+        // NOTE: we purposely do NOT use ctx above since if the context is
+        // cancelled we want to continue reporting errors.
+        log.Debug("opening job stream", "retry", retry)
+        var err error
+        client, err = r.client.RunnerJobStream(streamCtx, grpc.WaitForReady(retry))
+        if err != nil {
+            if status.Code(err) == codes.Unavailable || status.Code(err) == codes.NotFound {
+                log.Warn("server down during job stream open, will attempt reconnect")
+
+                // Since this is a disconnect, we have to wait for our
+                // RunnerConfig stream to re-establish. We wait for the config
+                // generation to increment.
+                if r.waitStateGreater(&r.stateConfig, stateGen) {
+                    return status.Error(codes.Internal, "early exit while waiting for reconnect")
+                }
+
+                retry = true
+                continue
+            }
+
+            return err
+        }
+
+        break
     }
     defer client.CloseSend()
 
@@ -193,7 +219,7 @@ func (r *Runner) accept(ctx context.Context, id string) error {
     // and return.
 
     r.runningCond.L.Lock()
-    shutdown = r.shutdown
+    shutdown := r.readState(&r.stateExit) > 0
     if !shutdown {
         r.runningJobs++
     }
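For reference, here is a condensed, self-contained sketch of the retry pattern this hunk introduces. openStream and waitForReconnect are hypothetical stand-ins for RunnerJobStream and the waitStateGreater call on the config generation; this illustrates the approach rather than reproducing the Waypoint code exactly.

// Condensed sketch (assumed names) of the WaitForReady retry pattern above.
package sketch

import (
    "context"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func openWithRetry[T any](
    ctx context.Context,
    openStream func(ctx context.Context, opts ...grpc.CallOption) (T, error),
    waitForReconnect func() (exit bool),
) (T, error) {
    var zero T
    retry := false
    for {
        // First attempt fails fast; after a disconnect, WaitForReady(true)
        // makes gRPC block until the connection is re-established.
        stream, err := openStream(ctx, grpc.WaitForReady(retry))
        if err != nil {
            if c := status.Code(err); c == codes.Unavailable || c == codes.NotFound {
                // Wait for the runner to re-register (config generation
                // advances), unless we are shutting down.
                if waitForReconnect() {
                    return zero, status.Error(codes.Internal, "early exit while waiting for reconnect")
                }
                retry = true
                continue
            }
            return zero, err
        }
        return stream, nil
    }
}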
72 changes: 72 additions & 0 deletions internal/runner/accept_test.go
@@ -136,6 +136,78 @@ func TestRunnerAccept_timeout(t *testing.T) {
     require.True(errors.Is(err, ErrTimeout))
 }
 
+// Test how accept behaves when the server is down to begin with.
+func TestRunnerAccept_serverDown(t *testing.T) {
+    require := require.New(t)
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    // Setup the server
+    restartCh := make(chan struct{})
+    impl := singleprocess.TestImpl(t)
+    client := serverpkg.TestServer(t, impl,
+        serverpkg.TestWithContext(ctx),
+        serverpkg.TestWithRestart(restartCh),
+    )
+
+    // Setup our runner
+    runner := TestRunner(t, WithClient(client))
+    require.NoError(runner.Start(ctx))
+
+    // Initialize our app
+    singleprocess.TestApp(t, client, serverptypes.TestJobNew(t, nil).Application)
+
+    // Queue a job
+    queueResp, err := client.QueueJob(ctx, &pb.QueueJobRequest{
+        Job: serverptypes.TestJobNew(t, nil),
+    })
+    require.NoError(err)
+    jobId := queueResp.JobId
+
+    // Shut it down
+    cancel()
+    ctx, cancel = context.WithCancel(context.Background())
+    defer cancel()
+
+    // Wait to get an unavailable error so we know the server is down
+    require.Eventually(func() bool {
+        _, err := client.GetRunner(ctx, &pb.GetRunnerRequest{RunnerId: "A"})
+        return status.Code(err) == codes.Unavailable
+    }, 5*time.Second, 10*time.Millisecond)
+
+    // Start accept
+    errCh := make(chan error, 1)
+    go func() {
+        errCh <- runner.Accept(ctx)
+    }()
+
+    // The runner should not error
+    select {
+    case <-time.After(100 * time.Millisecond):
+        // Good
+
+    case <-errCh:
+        t.Fatal("runner should not return")
+    }
+
+    // Restart
+    restartCh <- struct{}{}
+
+    // Accept should return
+    select {
+    case err := <-errCh:
+        require.NoError(err)
+
+    case <-time.After(5 * time.Second):
+        t.Fatal("accept never returned")
+    }
+
+    // Verify that the job is completed
+    job, err := client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId})
+    require.NoError(err)
+    require.Equal(pb.Job_SUCCESS, job.State)
+}
+
 func TestRunnerAccept_closeCancelesAccept(t *testing.T) {
     require := require.New(t)
     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
33 changes: 4 additions & 29 deletions internal/runner/config.go
@@ -105,10 +105,6 @@ func (r *Runner) watchConfig(
     log = log.Named("watcher")
     defer log.Trace("exiting goroutine")
 
-    // If we exit, the job is no longer ready since we can't be sure we're
-    // processing our config.
-    defer r.setState(&r.stateJobReady, false)
-
     // Start the app config watcher. This runs in its own goroutine so that
     // stuff like dynamic config fetching doesn't block starting things like
     // exec sessions.
@@ -164,15 +160,10 @@ func (r *Runner) watchConfig(
             merr = multierror.Append(merr, err)
         }
 
-        // If we had no errors, then we note that we're ready to process a job.
-        if merr == nil {
-            r.setState(&r.stateJobReady, true)
-        }
-
         // Note that we processed our config at least once. People can
         // wait on this state to know that success or fail, one config
        // was received.
-        r.setState(&r.stateConfigOnce, true)
+        r.incrState(&r.stateConfigOnce)
 
        // If we have an error, then we exit our loop. For runners,
        // not being able to set config is a fatal error. We do this so
@@ -276,19 +267,9 @@ func (r *Runner) recvConfig(
 
     // Keep track of our first receive
     first := true
-    reconnected := false
 
-    defer func() {
-        // Any reason we exit, this client is done so we mark we're done sending, too.
-        client.CloseSend()
-
-        // On exit, we note that we're no longer connected to the config stream.
-        // We only do this if we didn't reconnect, since reconnection will
-        // transfer ownership of stateConfig.
-        if !reconnected {
-            r.setState(&r.stateConfig, false)
-        }
-    }()
+    // Any reason we exit, this client is done so we mark we're done sending, too.
+    defer client.CloseSend()
 
     for {
         // If the context is closed, exit
@@ -299,11 +280,6 @@ func (r *Runner) recvConfig(
         // Wait for the next configuration
         resp, err := client.Recv()
         if err != nil {
-            // Note immediately that we have disconnected regardless of
-            // error type, since we've failed to receive. If we reconnect,
-            // it will reset this to true.
-            r.setState(&r.stateConfig, false)
-
            // EOF means a graceful close, don't reconnect.
            if err == io.EOF || clierrors.IsCanceled(err) {
                log.Warn("EOF or cancellation received, graceful close of runner config stream")
@@ -318,7 +294,6 @@ func (r *Runner) recvConfig(
 
            // If we successfully reconnected, then exit this.
            if err == nil {
-                reconnected = true
                return
            }
        }
@@ -331,7 +306,7 @@ func (r *Runner) recvConfig(
        if first {
            log.Debug("first config received, switching config state to true")
            first = false
-            r.setState(&r.stateConfig, true)
+            r.incrState(&r.stateConfig)
        }
 
        log.Info("new configuration received")
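A rough standalone sketch of the receive-loop shape after this change: EOF or cancellation means a graceful close, any other error hands off to a reconnect attempt, and the first config received on a stream bumps the stateConfig generation so waiters can see the (re)connect. Here recv, reconnect, apply, and markConnected are assumed placeholders for client.Recv, the stream re-establishment path, config handling, and incrState(&r.stateConfig); the real recvConfig differs in detail (logging, backoff, ownership handoff).

// Rough sketch (assumed names) of the config receive loop after this change.
package sketch

import (
    "context"
    "errors"
    "io"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func recvLoop(ctx context.Context, recv, reconnect func() error, apply, markConnected func()) {
    first := true
    for {
        if ctx.Err() != nil {
            return
        }

        if err := recv(); err != nil {
            // EOF or cancellation is a graceful close: don't reconnect.
            if errors.Is(err, io.EOF) || status.Code(err) == codes.Canceled {
                return
            }

            // Otherwise attempt to re-establish the stream. On success a
            // fresh receive loop owns the new stream, so this one exits;
            // on failure, loop and let reconnect (or ctx) decide when to stop.
            if reconnect() == nil {
                return
            }
            continue
        }

        // The first config on this stream marks the connection live by
        // bumping the stateConfig generation, which unblocks waiters.
        if first {
            first = false
            markConnected()
        }

        apply()
    }
}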
57 changes: 38 additions & 19 deletions internal/runner/runner.go
@@ -68,7 +68,6 @@ type Runner struct {
 
     // protects whether or not the runner is active or not.
     runningCond *sync.Cond
-    shutdown bool
     runningJobs int
 
     runningCtx context.Context
@@ -82,10 +81,9 @@ type Runner struct {
     // also verify the context didn't cancel. The stateCond will be broadcasted
     // when the root context cancels.
     stateCond *sync.Cond
-    stateConfig bool // config stream is connected
-    stateConfigOnce bool // true once we process config once, success or error
-    stateJobReady bool // ready to start accepting jobs
-    stateExit bool // true when exiting
+    stateConfig uint64 // config stream is connected, increments for each reconnect
+    stateConfigOnce uint64 // >0 once we process config once, success or error
+    stateExit uint64 // >0 when exiting
 
     // config is the current runner config.
     config *pb.RunnerConfig
@@ -235,7 +233,7 @@ func (r *Runner) Id() string {
 // server. This will spawn goroutines for management. This will return after
 // registration so this should not be executed in a goroutine.
 func (r *Runner) Start(ctx context.Context) error {
-    if r.shutdown {
+    if r.readState(&r.stateExit) > 0 {
         return ErrClosed
     }
 
@@ -321,13 +319,13 @@ func (r *Runner) Start(ctx context.Context) error {
 
     // Wait for initial registration
     log.Debug("waiting for registration")
-    if r.waitState(&r.stateConfig, true) {
+    if r.waitState(&r.stateConfig) {
         return status.Errorf(codes.Internal, "early exit while waiting for first config")
     }
 
     // Wait for the initial configuration to be set
     log.Debug("runner registered, waiting for first config processing")
-    if r.waitState(&r.stateConfigOnce, true) {
+    if r.waitState(&r.stateConfigOnce) {
         return status.Errorf(codes.Internal, "early exit while waiting for first config processing")
     }
 
@@ -348,7 +346,8 @@ func (r *Runner) Close() error {
         r.runningCond.Wait()
     }
 
-    r.shutdown = true
+    // Mark we're exiting
+    r.incrState(&r.stateExit)
 
     // Cancel the context that is used by Accept to wait on the RunnerJobStream.
     // This interrupts the Recv() call so that any goroutine running Accept()
@@ -364,25 +363,45 @@ func (r *Runner) Close() error {
     return nil
 }
 
-// waitState waits for the given state boolean to go true. This boolean
-// must be a pointer to a state field on runner. This will also return if
-// stateExit flips true. The return value notes whether we should exit.
-func (r *Runner) waitState(state *bool, v bool) (exit bool) {
+// waitState waits for the given state to be set to an initial value.
+func (r *Runner) waitState(state *uint64) bool {
+    return r.waitStateGreater(state, 0)
+}
+
+// waitStateGreater waits for the given state to increment above the value
+// v. This can be used with the fields such as stateConfig to detect
+// when a reconnect occurs.
+func (r *Runner) waitStateGreater(state *uint64, v uint64) bool {
     r.stateCond.L.Lock()
     defer r.stateCond.L.Unlock()
-    for *state != v && !r.stateExit {
+    for *state <= v && r.stateExit == 0 {
         r.stateCond.Wait()
     }
 
-    return r.stateExit
+    return r.stateExit > 0
 }
 
+// readState reads the current state value. This can be used with
+// waitStateGreater to detect a change in state.
+func (r *Runner) readState(state *uint64) uint64 {
+    r.stateCond.L.Lock()
+    defer r.stateCond.L.Unlock()
+
+    // note: we don't use sync/atomic because the writer can't use
+    // sync/atomic (see incrState)
+    return *state
+}
+
-// setState sets the value of a state var on the runner struct and broadcasts
-// the condition variable.
-func (r *Runner) setState(state *bool, v bool) {
+// incrState increments the value of a state variable. The first time
+// this is called will also trigger waitState.
+func (r *Runner) incrState(state *uint64) {
     r.stateCond.L.Lock()
     defer r.stateCond.L.Unlock()
-    *state = v
+
+    // Note: we don't use sync/atomic because we want to pair the increment
+    // with the condition variable broadcast. The broadcast requires a lock
+    // anyways so there is no need to bring in atomic ops.
+    *state += 1
     r.stateCond.Broadcast()
 }
 
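In isolation, the counter-plus-condition-variable pattern that replaces the boolean flags looks roughly like the following (a minimal sketch with assumed names, not the Runner type itself). Writers increment a generation under the condition variable's lock and broadcast; waiters block until the generation exceeds a previously observed value or shutdown is signaled, so a reconnect that happens between a read and a wait is never missed, which is the race a plain boolean could not express.

// Minimal sketch (assumed names) of the generation-counter pattern.
package sketch

import "sync"

type states struct {
    cond   *sync.Cond
    config uint64 // bumped on every (re)connect of the config stream
    exit   uint64 // >0 once shutdown begins
}

func newStates() *states {
    return &states{cond: sync.NewCond(&sync.Mutex{})}
}

// incr bumps a state counter and wakes all waiters.
func (s *states) incr(state *uint64) {
    s.cond.L.Lock()
    defer s.cond.L.Unlock()
    *state++
    s.cond.Broadcast()
}

// read returns the current generation so callers can later wait for it to advance.
func (s *states) read(state *uint64) uint64 {
    s.cond.L.Lock()
    defer s.cond.L.Unlock()
    return *state
}

// waitGreater blocks until *state > v or shutdown; it reports whether to exit.
func (s *states) waitGreater(state *uint64, v uint64) bool {
    s.cond.L.Lock()
    defer s.cond.L.Unlock()
    for *state <= v && s.exit == 0 {
        s.cond.Wait()
    }
    return s.exit > 0
}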
