From 675be73f05d34c97fc03e2a3c7263bad446cb370 Mon Sep 17 00:00:00 2001 From: Milan Pavlik Date: Fri, 9 Sep 2022 13:40:29 +0000 Subject: [PATCH] [usage] Refactor controller package into scheduler --- .../ide-metrics/pkg/server/server_test.go | 2 +- components/usage/pkg/scheduler/job.go | 4 ++ components/usage/pkg/scheduler/job_test.go | 12 ++++- components/usage/pkg/scheduler/reporter.go | 2 +- components/usage/pkg/scheduler/scheduler.go | 17 ++++--- .../usage/pkg/scheduler/scheduler_test.go | 47 +++++++++++++++++++ components/usage/pkg/server/server.go | 3 ++ 7 files changed, 77 insertions(+), 10 deletions(-) create mode 100644 components/usage/pkg/scheduler/scheduler_test.go diff --git a/components/ide-metrics/pkg/server/server_test.go b/components/ide-metrics/pkg/server/server_test.go index e5c00a04b755e4..1efefc80683857 100644 --- a/components/ide-metrics/pkg/server/server_test.go +++ b/components/ide-metrics/pkg/server/server_test.go @@ -119,7 +119,7 @@ func Test_allowListCollector_Reconcile(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if got := c.Reconcile(tt.args.labels); !reflect.DeepEqual(got, tt.want) { - t.Errorf("allowListCollector.Run() = %v, want %v", got, tt.want) + t.Errorf("allowListCollector.Reconcile() = %v, want %v", got, tt.want) } }) } diff --git a/components/usage/pkg/scheduler/job.go b/components/usage/pkg/scheduler/job.go index 80783fce6c21d2..85db139470d8b9 100644 --- a/components/usage/pkg/scheduler/job.go +++ b/components/usage/pkg/scheduler/job.go @@ -54,6 +54,10 @@ func (r *LedgerJob) Run() (err error) { // attempt a write to signal we want to run case r.running <- struct{}{}: // we managed to write, there's no other job executing. Cases are not fall through so we continue executing our main logic. + defer func() { + // signal job completed + <-r.running + }() default: // we could not write, so another instance is already running. Skip current run. log.Infof("Skipping ledger run, another run is already in progress.") diff --git a/components/usage/pkg/scheduler/job_test.go b/components/usage/pkg/scheduler/job_test.go index 412f2241d2bb73..eaa44e159207f2 100644 --- a/components/usage/pkg/scheduler/job_test.go +++ b/components/usage/pkg/scheduler/job_test.go @@ -35,6 +35,16 @@ func TestLedgerJob_PreventsConcurrentInvocations(t *testing.T) { require.Equal(t, 1, int(client.ReconcileUsageWithLedgerCallCount)) } +func TestLedgerJob_CanRunRepeatedly(t *testing.T) { + client := &fakeUsageClient{} + job := NewLedgerTrigger(client, nil) + + _ = job.Run() + _ = job.Run() + + require.Equal(t, 2, int(client.ReconcileUsageWithLedgerCallCount)) +} + type fakeUsageClient struct { ReconcileUsageWithLedgerCallCount int32 } @@ -52,7 +62,7 @@ func (c *fakeUsageClient) SetCostCenter(ctx context.Context, in *v1.SetCostCente // Triggers reconciliation of usage with ledger implementation. func (c *fakeUsageClient) ReconcileUsageWithLedger(ctx context.Context, in *v1.ReconcileUsageWithLedgerRequest, opts ...grpc.CallOption) (*v1.ReconcileUsageWithLedgerResponse, error) { atomic.AddInt32(&c.ReconcileUsageWithLedgerCallCount, 1) - time.Sleep(1 * time.Second) + time.Sleep(50 * time.Millisecond) return nil, status.Error(codes.Unauthenticated, "not implemented") } diff --git a/components/usage/pkg/scheduler/reporter.go b/components/usage/pkg/scheduler/reporter.go index 18f7a374a4258d..4096c140520974 100644 --- a/components/usage/pkg/scheduler/reporter.go +++ b/components/usage/pkg/scheduler/reporter.go @@ -19,7 +19,7 @@ var ( jobStartedSeconds = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "scheduler_job_started", + Name: "scheduler_job_started_total", Help: "Number of jobs started", }, []string{"job"}) diff --git a/components/usage/pkg/scheduler/scheduler.go b/components/usage/pkg/scheduler/scheduler.go index 13b3ab3eb65c54..50f9c0bf085e96 100644 --- a/components/usage/pkg/scheduler/scheduler.go +++ b/components/usage/pkg/scheduler/scheduler.go @@ -36,19 +36,21 @@ func (c *Scheduler) Start() { log.Infof("Starting usage scheduler. Setting up %d jobs.", len(c.specs)) for _, job := range c.specs { + // need to re-assign job to avoid pointing to a different job spec once the `cron.FuncJob` executes. + j := job c.cron.Schedule(job.Schedule, cron.FuncJob(func() { c.runningJobs.Add(1) defer c.runningJobs.Done() now := time.Now().UTC() - logger := log.WithField("job_id", job.ID) + logger := log.WithField("job_id", j.ID) - logger.Infof("Starting scheduled job %s", job.ID) - reportJobStarted(job.ID) + logger.Infof("Starting scheduled job %s", j.ID) + reportJobStarted(j.ID) - err := job.Job.Run() + err := j.Job.Run() defer func() { - reportJobCompleted(job.ID, time.Since(now), err) + reportJobCompleted(j.ID, time.Since(now), err) }() if err != nil { logger.WithError(err).Errorf("Scheduled job %s failed.", job.ID) @@ -63,9 +65,10 @@ func (c *Scheduler) Start() { // Stop terminates the Scheduler and awaits for all running jobs to complete. func (c *Scheduler) Stop() { - log.Info("Stopping usage controller.") + log.Info("Stopping scheduler.") + c.cron.Stop() - log.Info("Awaiting existing reconciliation runs to complete..") + log.Info("Awaiting existing jobs to complete.") // Wait for existing jobs to finish c.runningJobs.Wait() diff --git a/components/usage/pkg/scheduler/scheduler_test.go b/components/usage/pkg/scheduler/scheduler_test.go new file mode 100644 index 00000000000000..27a1564ce30d0e --- /dev/null +++ b/components/usage/pkg/scheduler/scheduler_test.go @@ -0,0 +1,47 @@ +// Copyright (c) 2022 Gitpod GmbH. All rights reserved. +// Licensed under the GNU Affero General Public License (AGPL). +// See License-AGPL.txt in the project root for license information. + +package scheduler + +import ( + "github.com/robfig/cron" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestScheduler(t *testing.T) { + firstRan := false + secondRan := false + s := New( + JobSpec{ + Job: JobFunc(func() error { + firstRan = true + return nil + }), + ID: "first", + Schedule: cron.ConstantDelaySchedule{Delay: time.Second}, + }, + JobSpec{ + Job: JobFunc(func() error { + secondRan = true + return nil + }), + ID: "second", + Schedule: cron.ConstantDelaySchedule{Delay: time.Second}, + }, + ) + s.Start() + time.Sleep(1 * time.Second) + s.Stop() + + require.True(t, firstRan) + require.True(t, secondRan) +} + +type JobFunc func() error + +func (f JobFunc) Run() error { + return f() +} diff --git a/components/usage/pkg/server/server.go b/components/usage/pkg/server/server.go index 71af7cb48ee8a7..616be29ae94fe8 100644 --- a/components/usage/pkg/server/server.go +++ b/components/usage/pkg/server/server.go @@ -108,6 +108,9 @@ func Start(cfg Config) error { jobSpec, err := scheduler.NewLedgerTriggerJobSpec(schedule, scheduler.NewLedgerTrigger(v1.NewUsageServiceClient(selfConnection), v1.NewBillingServiceClient(selfConnection)), ) + if err != nil { + return fmt.Errorf("failed to setup ledger trigger job: %w", err) + } sched := scheduler.New(jobSpec) sched.Start()