Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

runner: static runners accept multiple jobs in parallel #3300

Merged
merged 5 commits into from
May 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changelog/3300.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
```release-note:improvement
runner: runners will now accept and execute multiple jobs concurrently
if multiple jobs are available. On-demand runners continue to execute exactly
one job since they are purpose launched for single job execution.
```
31 changes: 30 additions & 1 deletion internal/cli/runner_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io/ioutil"
"net"
"runtime"
"time"

"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -53,6 +54,9 @@ type RunnerAgentCommand struct {

// Labels for the runner.
flagLabels map[string]string

// The amount of concurrent jobs that can be running.
flagConcurrency int
}

// This is how long a runner in ODR mode will wait for its job assignment before
Expand All @@ -76,6 +80,12 @@ func (c *RunnerAgentCommand) Run(args []string) int {

plugin.InsideODR = c.flagODR

// Flag defaults
if c.flagConcurrency < 1 {
log.Warn("concurrency flag less than 1 has no effect, using 1")
c.flagConcurrency = 1
}

// Connect to the server
log.Info("sourcing credentials and connecting to the Waypoint server")
conn, err := serverclient.Connect(ctx,
Expand Down Expand Up @@ -227,9 +237,16 @@ func (c *RunnerAgentCommand) Run(args []string) int {
go func() {
defer cancel()

// In non-ODR mode, we accept many jobs in parallel.
if !c.flagODR {
runner.AcceptParallel(ctx, c.flagConcurrency)
return
}

// In ODR mode, we accept a single job.
for {
err := runner.Accept(ctx)
if err == nil && c.flagODR {
if err == nil {
log.Debug("handled our one job in ODR mode, exiting")
return
}
Expand Down Expand Up @@ -344,6 +361,18 @@ func (c *RunnerAgentCommand) Flags() *flag.Sets {
Target: &c.flagLabels,
Usage: "Labels to set for this runner in 'k=v' format. Can be specified multiple times.",
})

f.IntVar(&flag.IntVar{
Name: "concurrency",
Target: &c.flagConcurrency,
Usage: "The number of concurrent jobs that can be running at one time. " +
"This has no effect if `-odr` is set. A value of less than 1 will " +
"default to 1.",

// Most jobs that a non-ODR runner runs are IO bound, so we use
// just a heuristic here of allowing some multiple above the CPUs.
Default: runtime.NumCPU() * 3,
})
})
}

Expand Down
25 changes: 25 additions & 0 deletions internal/runner/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,31 @@ import (

var heartbeatDuration = 5 * time.Second

// AcceptParallel allows up to count jobs to be accepted and executing
// concurrently.
func (r *Runner) AcceptParallel(ctx context.Context, count int) {
// Create a new cancellable context so we can stop all the goroutines
// when one exits. We do this because if one exits, its likely that the
// unrecoverable error exists in all.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Start up all the goroutines
r.logger.Info("accepting jobs concurrently", "count", count)
var wg sync.WaitGroup
wg.Add(count)
for i := 0; i < count; i++ {
go func() {
defer cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just saying this out loud, should the exit of ANY of the goroutines cause them all to exit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So actually, yes I think this needs to do slightly better, although its edge case-y. Here is the thought:

  1. If the user cancels the context, then this is effectively a no-op and all of the goroutines are canceled anyways. No issue.

  2. If a goroutine has an error, its likely the error will impact all, because there is considerable retry logic already in each Accept call -- including reconnection -- so if it actually errors it is likely unrecoverable. So we DO want to exit all the goroutines.

However, for #2, right now we're canceling the context which just causes a cascade effect to cancel ASAP. I think we can do better by just letting each existing job try to finish gracefully, and then say "don't accept any more jobs thereafter."

I'll work on this Monday.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think this is okay for now. Its a bit non-trivial to get this fix in and looking at the possible reasons for a return from AcceptMany, i do think things are really broken if they exit so cancelling all is okay for now. We can improve this later. I've added a TODO to note it.

defer wg.Done()
r.AcceptMany(ctx)
}()
}

// Wait for them to exit
wg.Wait()
}

// AcceptMany will accept jobs and execute them on after another as they are accepted.
// This is meant to be run in a goroutine and reports its own errors via r's logger.
func (r *Runner) AcceptMany(ctx context.Context) {
Expand Down
78 changes: 78 additions & 0 deletions internal/runner/accept_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,84 @@ func TestRunnerAccept_jobHcl(t *testing.T) {
require.Equal(pb.Job_Config_JOB, job.Config.Source)
}

func TestRunnerAcceptParallel(t *testing.T) {
require := require.New(t)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

// Setup our runner
client := singleprocess.TestServer(t)
runner := TestRunner(t, WithClient(client))
require.NoError(runner.Start(ctx))

// Block our noop jobs so we can inspect their state
noopCh := make(chan struct{})
runner.noopCh = noopCh

// Initialize our app
singleprocess.TestApp(t, client, serverptypes.TestJobNew(t, nil).Application)

// Queue jobs
queueResp, err := client.QueueJob(ctx, &pb.QueueJobRequest{
Job: serverptypes.TestJobNew(t, &pb.Job{
Workspace: &pb.Ref_Workspace{Workspace: "w1"},
}),
})
require.NoError(err)
jobId_1 := queueResp.JobId

queueResp, err = client.QueueJob(ctx, &pb.QueueJobRequest{
Job: serverptypes.TestJobNew(t, &pb.Job{
Workspace: &pb.Ref_Workspace{Workspace: "w2"},
}),
})
require.NoError(err)
jobId_2 := queueResp.JobId

// Accept should complete
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
runner.AcceptParallel(ctx, 2)
}()

// Both jobs should be running at once eventually
require.Eventually(func() bool {
job, err := client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_1})
require.NoError(err)
if job.State != pb.Job_RUNNING {
return false
}

job, err = client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_2})
require.NoError(err)
return job.State == pb.Job_RUNNING
}, 3*time.Second, 10*time.Millisecond)

// Jobs should complete
close(noopCh)
require.Eventually(func() bool {
job, err := client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_1})
require.NoError(err)
if job.State != pb.Job_SUCCESS {
return false
}

job, err = client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_2})
require.NoError(err)
return job.State == pb.Job_SUCCESS
}, 3*time.Second, 10*time.Millisecond)

// Loop should exit
cancel()
select {
case <-time.After(2 * time.Second):
t.Fatal("accept should exit")

default:
}
}

// testGitFixture MUST be called before TestRunner since TestRunner
// changes our working directory.
func testGitFixture(t *testing.T, n string) string {
Expand Down
1 change: 1 addition & 0 deletions website/content/commands/runner-agent.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ not generally recommended.
- `-cookie=<string>` - The cookie value of the server to validate API requests. This is required for runner adoption. If you do not already have a runner token, this must be set.
- `-state-dir=<string>` - Directory to store state between restarts. This is optional. If this is set, then a runner can restart without re-triggering the adoption process.
- `-label=<key=value>` - Labels to set for this runner in 'k=v' format. Can be specified multiple times.
- `-concurrency=<int>` - The number of concurrent jobs that can be running at one time. This has no effect if `-odr` is set. A value of less than 1 will default to 1.

@include "commands/runner-agent_more.mdx"