From b6b39a8253ec187a4a561c1706f4330c0d0596de Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 29 Apr 2022 12:16:00 -0700 Subject: [PATCH 1/5] runner: static runners accept multiple jobs in parallel This modifies `internal/runner` to support accepting multiple jobs in parallel. Not much work here since we always designed the runner struct from the beginning to support this so there are no data races. This modifies `internal/cli` so that runners in non-ODR run in parallel mode by default. ODR doesn't make sense to have any parallelism since they always run exactly one job. Non-ODR runners typically ONLY launch ODR tasks, which are highly IO-bound, so we default to a multiple above CPU count for concurrency. This is a necessary pre-requisite for pipelines since they'll likely perform blocking jobs on the static runners to "watch" tasks. Today, tasks are launched and stopped, but not watched so this is not an issue. --- internal/cli/runner_agent.go | 31 +++++++++++- internal/runner/accept.go | 24 ++++++++++ internal/runner/accept_test.go | 86 ++++++++++++++++++++++++++++++++++ 3 files changed, 140 insertions(+), 1 deletion(-) diff --git a/internal/cli/runner_agent.go b/internal/cli/runner_agent.go index 50b0c2a426a..d68e9deab5a 100644 --- a/internal/cli/runner_agent.go +++ b/internal/cli/runner_agent.go @@ -5,6 +5,7 @@ import ( "errors" "io/ioutil" "net" + "runtime" "time" "github.com/hashicorp/go-hclog" @@ -53,6 +54,9 @@ type RunnerAgentCommand struct { // Labels for the runner. flagLabels map[string]string + + // The amount of concurrent jobs that can be running. + flagConcurrency int } // This is how long a runner in ODR mode will wait for its job assignment before @@ -76,6 +80,12 @@ func (c *RunnerAgentCommand) Run(args []string) int { plugin.InsideODR = c.flagODR + // Flag defaults + if c.flagConcurrency < 1 { + log.Warn("concurrency flag less than 1 has no effect, using 1") + c.flagConcurrency = 1 + } + // Connect to the server log.Info("sourcing credentials and connecting to the Waypoint server") conn, err := serverclient.Connect(ctx, @@ -227,9 +237,16 @@ func (c *RunnerAgentCommand) Run(args []string) int { go func() { defer cancel() + // In non-ODR mode, we accept many jobs in parallel. + if !c.flagODR { + runner.AcceptParallel(ctx, c.flagConcurrency) + return + } + + // In ODR mode, we accept a single job. for { err := runner.Accept(ctx) - if err == nil && c.flagODR { + if err == nil { log.Debug("handled our one job in ODR mode, exiting") return } @@ -344,6 +361,18 @@ func (c *RunnerAgentCommand) Flags() *flag.Sets { Target: &c.flagLabels, Usage: "Labels to set for this runner in 'k=v' format. Can be specified multiple times.", }) + + f.IntVar(&flag.IntVar{ + Name: "concurrency", + Target: &c.flagConcurrency, + Usage: "The number of concurrent jobs that can be running at one time. " + + "This has no effect if `-odr` is set. A value of less than 1 will " + + "default to 1.", + + // Most jobs that a non-ODR runner runs are IO bound, so we use + // just a heuristic here of allowing some multiple above the CPUs. + Default: runtime.NumCPU() * 3, + }) }) } diff --git a/internal/runner/accept.go b/internal/runner/accept.go index ab47a91fe43..28eb292452f 100644 --- a/internal/runner/accept.go +++ b/internal/runner/accept.go @@ -22,6 +22,30 @@ import ( var heartbeatDuration = 5 * time.Second +// AcceptParallel allows up to count jobs to be accepted and executing +// concurrently. +func (r *Runner) AcceptParallel(ctx context.Context, count int) { + // Create a new cancellable context so we can stop all the goroutines + // when one exits. We do this because if one exits, its likely that the + // unrecoverable error exists in all. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Start up all the goroutines + var wg sync.WaitGroup + wg.Add(count) + for i := 0; i < count; i++ { + go func() { + defer cancel() + defer wg.Done() + r.AcceptMany(ctx) + }() + } + + // Wait for them to exit + wg.Wait() +} + // AcceptMany will accept jobs and execute them on after another as they are accepted. // This is meant to be run in a goroutine and reports its own errors via r's logger. func (r *Runner) AcceptMany(ctx context.Context) { diff --git a/internal/runner/accept_test.go b/internal/runner/accept_test.go index 60b27730841..14634ab1a23 100644 --- a/internal/runner/accept_test.go +++ b/internal/runner/accept_test.go @@ -821,6 +821,92 @@ func TestRunnerAccept_jobHcl(t *testing.T) { require.Equal(pb.Job_Config_JOB, job.Config.Source) } +func TestRunnerAcceptParallel(t *testing.T) { + require := require.New(t) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + + // Setup our runner + client := singleprocess.TestServer(t) + runner := TestRunner(t, WithClient(client)) + require.NoError(runner.Start(ctx)) + + // Block our noop jobs so we can inspect their state + noopCh := make(chan struct{}) + runner.noopCh = noopCh + + // Initialize our app + singleprocess.TestApp(t, client, serverptypes.TestJobNew(t, nil).Application) + + // Queue jobs + queueResp, err := client.QueueJob(ctx, &pb.QueueJobRequest{ + Job: serverptypes.TestJobNew(t, &pb.Job{ + Workspace: &pb.Ref_Workspace{Workspace: "w1"}, + }), + }) + require.NoError(err) + jobId_1 := queueResp.JobId + + queueResp, err = client.QueueJob(ctx, &pb.QueueJobRequest{ + Job: serverptypes.TestJobNew(t, &pb.Job{ + Workspace: &pb.Ref_Workspace{Workspace: "w2"}, + }), + }) + require.NoError(err) + jobId_2 := queueResp.JobId + + // Accept should complete + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + runner.AcceptParallel(ctx, 2) + }() + + // Both jobs should be running at once eventually + require.Eventually(func() bool { + job, err := client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_1}) + require.NoError(err) + if job.State != pb.Job_RUNNING { + return false + } + + job, err = client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_2}) + require.NoError(err) + if job.State != pb.Job_RUNNING { + return false + } + + return true + }, 3*time.Second, 10*time.Millisecond) + + // Jobs should complete + close(noopCh) + require.Eventually(func() bool { + job, err := client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_1}) + require.NoError(err) + if job.State != pb.Job_SUCCESS { + return false + } + + job, err = client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_2}) + require.NoError(err) + if job.State != pb.Job_SUCCESS { + return false + } + + return true + }, 3*time.Second, 10*time.Millisecond) + + // Loop should exit + cancel() + select { + case <-time.After(2 * time.Second): + t.Fatal("accept should exit") + + default: + } +} + // testGitFixture MUST be called before TestRunner since TestRunner // changes our working directory. func testGitFixture(t *testing.T, n string) string { From b543b0ef2fab3875003b34e54f2e41fafb9f5350 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 29 Apr 2022 12:20:04 -0700 Subject: [PATCH 2/5] changelog --- .changelog/3300.txt | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changelog/3300.txt diff --git a/.changelog/3300.txt b/.changelog/3300.txt new file mode 100644 index 00000000000..a664534c743 --- /dev/null +++ b/.changelog/3300.txt @@ -0,0 +1,5 @@ +```release-note:improvement +runner: runners will now accept and execute multiple jobs concurrently +if multiple jobs are available. On-demand runners continue to execute exactly +one job since they are purpose launched for single job execution. +``` From 51efbbf03bebea4da60b1ecd07f3b235ab66949a Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 29 Apr 2022 12:23:40 -0700 Subject: [PATCH 3/5] lint --- internal/runner/accept_test.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/internal/runner/accept_test.go b/internal/runner/accept_test.go index 14634ab1a23..aec15a969ff 100644 --- a/internal/runner/accept_test.go +++ b/internal/runner/accept_test.go @@ -872,11 +872,7 @@ func TestRunnerAcceptParallel(t *testing.T) { job, err = client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_2}) require.NoError(err) - if job.State != pb.Job_RUNNING { - return false - } - - return true + return job.State == pb.Job_RUNNING }, 3*time.Second, 10*time.Millisecond) // Jobs should complete @@ -890,11 +886,7 @@ func TestRunnerAcceptParallel(t *testing.T) { job, err = client.GetJob(ctx, &pb.GetJobRequest{JobId: jobId_2}) require.NoError(err) - if job.State != pb.Job_SUCCESS { - return false - } - - return true + return job.State == pb.Job_SUCCESS }, 3*time.Second, 10*time.Millisecond) // Loop should exit From 62bcd9fc7021b11866c4ccfb3582b13c5b313d8a Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 29 Apr 2022 12:26:09 -0700 Subject: [PATCH 4/5] internal/runner: log how many jobs we will accept concurrently --- internal/runner/accept.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/runner/accept.go b/internal/runner/accept.go index 28eb292452f..72b7d38bfb3 100644 --- a/internal/runner/accept.go +++ b/internal/runner/accept.go @@ -32,6 +32,7 @@ func (r *Runner) AcceptParallel(ctx context.Context, count int) { defer cancel() // Start up all the goroutines + r.logger.Info("accepting jobs concurrently", "count", count) var wg sync.WaitGroup wg.Add(count) for i := 0; i < count; i++ { From 70724c9b16392f42eb84fdcaf2595cc69794f249 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 29 Apr 2022 12:46:44 -0700 Subject: [PATCH 5/5] website: regen for new concurrency flag --- website/content/commands/runner-agent.mdx | 1 + 1 file changed, 1 insertion(+) diff --git a/website/content/commands/runner-agent.mdx b/website/content/commands/runner-agent.mdx index 9f02ea5243a..689ccceaf80 100644 --- a/website/content/commands/runner-agent.mdx +++ b/website/content/commands/runner-agent.mdx @@ -54,5 +54,6 @@ not generally recommended. - `-cookie=` - The cookie value of the server to validate API requests. This is required for runner adoption. If you do not already have a runner token, this must be set. - `-state-dir=` - Directory to store state between restarts. This is optional. If this is set, then a runner can restart without re-triggering the adoption process. - `-label=` - Labels to set for this runner in 'k=v' format. Can be specified multiple times. +- `-concurrency=` - The number of concurrent jobs that can be running at one time. This has no effect if `-odr` is set. A value of less than 1 will default to 1. @include "commands/runner-agent_more.mdx"