From c99cf66f6c64269a38a64ff6bd660c6406aabf41 Mon Sep 17 00:00:00 2001 From: Milan Pavlik Date: Fri, 9 Sep 2022 12:37:41 +0000 Subject: [PATCH] [usage] Refactor controller package into scheduler --- .../ide-metrics/pkg/server/server_test.go | 2 +- components/usage/pkg/controller/controller.go | 84 ------------------- .../usage/pkg/controller/controller_test.go | 55 ------------ .../reconciler.go => scheduler/job.go} | 49 +++++++---- components/usage/pkg/scheduler/job_test.go | 63 ++++++++++++++ .../pkg/{controller => scheduler}/reporter.go | 30 +++---- components/usage/pkg/scheduler/scheduler.go | 73 ++++++++++++++++ components/usage/pkg/server/server.go | 20 ++--- 8 files changed, 193 insertions(+), 183 deletions(-) delete mode 100644 components/usage/pkg/controller/controller.go delete mode 100644 components/usage/pkg/controller/controller_test.go rename components/usage/pkg/{controller/reconciler.go => scheduler/job.go} (53%) create mode 100644 components/usage/pkg/scheduler/job_test.go rename components/usage/pkg/{controller => scheduler}/reporter.go (53%) create mode 100644 components/usage/pkg/scheduler/scheduler.go diff --git a/components/ide-metrics/pkg/server/server_test.go b/components/ide-metrics/pkg/server/server_test.go index 1efefc80683857..e5c00a04b755e4 100644 --- a/components/ide-metrics/pkg/server/server_test.go +++ b/components/ide-metrics/pkg/server/server_test.go @@ -119,7 +119,7 @@ func Test_allowListCollector_Reconcile(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if got := c.Reconcile(tt.args.labels); !reflect.DeepEqual(got, tt.want) { - t.Errorf("allowListCollector.Reconcile() = %v, want %v", got, tt.want) + t.Errorf("allowListCollector.Run() = %v, want %v", got, tt.want) } }) } diff --git a/components/usage/pkg/controller/controller.go 
b/components/usage/pkg/controller/controller.go deleted file mode 100644 index 6ecbcffba41dcb..00000000000000 --- a/components/usage/pkg/controller/controller.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright (c) 2022 Gitpod GmbH. All rights reserved. -// Licensed under the GNU Affero General Public License (AGPL). -// See License-AGPL.txt in the project root for license information. - -package controller - -import ( - "fmt" - "github.com/gitpod-io/gitpod/common-go/log" - "github.com/robfig/cron" - "sync" - "time" -) - -func New(schedule time.Duration, reconciler Reconciler) (*Controller, error) { - return &Controller{ - schedule: schedule, - reconciler: reconciler, - scheduler: cron.NewWithLocation(time.UTC), - }, nil -} - -type Controller struct { - schedule time.Duration - reconciler Reconciler - - scheduler *cron.Cron - - jobs chan struct{} - runningJobs sync.WaitGroup -} - -func (c *Controller) Start() error { - log.Info("Starting usage controller.") - // Using channel of size 1 ensures we don't queue up overly many runs when there is already 1 queued up. - c.jobs = make(chan struct{}, 1) - - go func() { - // Here, we guarantee we're only ever executing 1 job at a time - in other words we always wait for the previous job to finish. 
- for range c.jobs { - c.runningJobs.Add(1) - defer c.runningJobs.Done() - - err := c.reconciler.Reconcile() - if err != nil { - log.WithError(err).Errorf("Reconciliation run failed.") - } else { - log.Info("Completed usage reconciliation run without errors.") - } - } - }() - - err := c.scheduler.AddFunc(fmt.Sprintf("@every %s", c.schedule.String()), cron.FuncJob(func() { - log.Info("Starting usage reconciliation.") - - select { - case c.jobs <- struct{}{}: - log.Info("Triggered next reconciliation.") - default: - log.Info("Previous reconciliation loop is still running, skipping.") - } - })) - if err != nil { - return fmt.Errorf("failed to add function to scheduler: %w", err) - } - - c.scheduler.Start() - - return nil -} - -// Stop terminates the Controller and awaits for all running jobs to complete. -func (c *Controller) Stop() { - log.Info("Stopping usage controller.") - // Stop any new jobs from running - c.scheduler.Stop() - - close(c.jobs) - - log.Info("Awaiting existing reconciliation runs to complete..") - // Wait for existing jobs to finish - c.runningJobs.Wait() - -} diff --git a/components/usage/pkg/controller/controller_test.go b/components/usage/pkg/controller/controller_test.go deleted file mode 100644 index 12026aeaa92130..00000000000000 --- a/components/usage/pkg/controller/controller_test.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (c) 2022 Gitpod GmbH. All rights reserved. -// Licensed under the GNU Affero General Public License (AGPL). -// See License-AGPL.txt in the project root for license information. 
- -package controller - -import ( - "github.com/stretchr/testify/require" - "sync/atomic" - "testing" - "time" -) - -func TestController(t *testing.T) { - schedule := time.Second - triggered := false - - ctrl, err := New(schedule, ReconcilerFunc(func() error { - triggered = true - return nil - })) - require.NoError(t, err) - - require.NoError(t, ctrl.Start()) - time.Sleep(schedule + 20*time.Millisecond) - require.True(t, triggered, "must trigger reconciler function") - ctrl.Stop() -} - -func TestController_PreventsConcurrentRunsOfReconcilerFunc(t *testing.T) { - schedule := 1 * time.Second - count := int32(0) - - ctrl, err := New(schedule, ReconcilerFunc(func() error { - atomic.AddInt32(&count, 1) - time.Sleep(3 * time.Second) - return nil - })) - require.NoError(t, err) - - require.NoError(t, ctrl.Start()) - time.Sleep(schedule + 2*time.Second) - require.Equal(t, int32(1), count, "must trigger reconciler function exactly once") - ctrl.Stop() -} - -func TestController_GracefullyHandlesPanic(t *testing.T) { - ctrl, err := New(20*time.Millisecond, ReconcilerFunc(func() error { - panic("pls help") - })) - require.NoError(t, err) - - require.NoError(t, ctrl.Start()) - ctrl.Stop() -} diff --git a/components/usage/pkg/controller/reconciler.go b/components/usage/pkg/scheduler/job.go similarity index 53% rename from components/usage/pkg/controller/reconciler.go rename to components/usage/pkg/scheduler/job.go index df9e5a5593b06b..80783fce6c21d2 100644 --- a/components/usage/pkg/controller/reconciler.go +++ b/components/usage/pkg/scheduler/job.go @@ -2,55 +2,72 @@ // Licensed under the GNU Affero General Public License (AGPL). // See License-AGPL.txt in the project root for license information. 
-package controller +package scheduler import ( "context" "fmt" "github.com/gitpod-io/gitpod/common-go/log" v1 "github.com/gitpod-io/gitpod/usage-api/v1" + "github.com/robfig/cron" "google.golang.org/protobuf/types/known/timestamppb" "time" ) -type Reconciler interface { - Reconcile() error +type Job interface { + Run() error } -type ReconcilerFunc func() error +func NewLedgerTriggerJobSpec(schedule time.Duration, job Job) (JobSpec, error) { + parsed, err := cron.Parse(fmt.Sprintf("@every %s", schedule.String())) + if err != nil { + return JobSpec{}, fmt.Errorf("failed to parse ledger job schedule: %w", err) + } -func (f ReconcilerFunc) Reconcile() error { - return f() + return JobSpec{ + Job: job, + ID: "ledger", + Schedule: parsed, + }, nil } -func NewLedgerReconciler(usageClient v1.UsageServiceClient, billingClient v1.BillingServiceClient) *LedgerReconciler { - return &LedgerReconciler{ +func NewLedgerTrigger(usageClient v1.UsageServiceClient, billingClient v1.BillingServiceClient) *LedgerJob { + return &LedgerJob{ usageClient: usageClient, billingClient: billingClient, + + running: make(chan struct{}, 1), } } -type LedgerReconciler struct { +type LedgerJob struct { usageClient v1.UsageServiceClient billingClient v1.BillingServiceClient + + running chan struct{} } -func (r *LedgerReconciler) Reconcile() (err error) { +func (r *LedgerJob) Run() (err error) { ctx := context.Background() + select { + // attempt a write to signal we want to run + case r.running <- struct{}{}: + // we managed to write, there's no other job executing. Cases are not fall through so we continue executing our main logic. + default: + // we could not write, so another instance is already running. Skip current run. + log.Infof("Skipping ledger run, another run is already in progress.") + return nil + } + now := time.Now().UTC() hourAgo := now.Add(-1 * time.Hour) - reportUsageReconcileStarted() - defer func() { - reportUsageReconcileFinished(time.Since(now), err) - }() - logger := log. 
WithField("from", hourAgo). WithField("to", now) - logger.Info("Starting ledger reconciliation.") + logger.Info("Running ledger job. Reconciling usage records.") _, err = r.usageClient.ReconcileUsageWithLedger(ctx, &v1.ReconcileUsageWithLedgerRequest{ From: timestamppb.New(hourAgo), To: timestamppb.New(now), diff --git a/components/usage/pkg/scheduler/job_test.go b/components/usage/pkg/scheduler/job_test.go new file mode 100644 index 00000000000000..412f2241d2bb73 --- /dev/null +++ b/components/usage/pkg/scheduler/job_test.go @@ -0,0 +1,63 @@ +// Copyright (c) 2022 Gitpod GmbH. All rights reserved. +// Licensed under the GNU Affero General Public License (AGPL). +// See License-AGPL.txt in the project root for license information. + +package scheduler + +import ( + "context" + v1 "github.com/gitpod-io/gitpod/usage-api/v1" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestLedgerJob_PreventsConcurrentInvocations(t *testing.T) { + client := &fakeUsageClient{} + job := NewLedgerTrigger(client, nil) + + invocations := 3 + wg := sync.WaitGroup{} + wg.Add(invocations) + for i := 0; i < invocations; i++ { + go func() { + _ = job.Run() + wg.Done() + }() + } + wg.Wait() + + require.Equal(t, 1, int(client.ReconcileUsageWithLedgerCallCount)) +} + +type fakeUsageClient struct { + ReconcileUsageWithLedgerCallCount int32 +} + +// GetCostCenter retrieves the active cost center for the given attributionID +func (c *fakeUsageClient) GetCostCenter(ctx context.Context, in *v1.GetCostCenterRequest, opts ...grpc.CallOption) (*v1.GetCostCenterResponse, error) { + return nil, status.Error(codes.Unauthenticated, "not implemented") +} + +// SetCostCenter stores the given cost center +func (c *fakeUsageClient) SetCostCenter(ctx context.Context, in *v1.SetCostCenterRequest, opts ...grpc.CallOption) (*v1.SetCostCenterResponse, error) { + return nil, 
status.Error(codes.Unauthenticated, "not implemented") +} + +// Triggers reconciliation of usage with ledger implementation. +func (c *fakeUsageClient) ReconcileUsageWithLedger(ctx context.Context, in *v1.ReconcileUsageWithLedgerRequest, opts ...grpc.CallOption) (*v1.ReconcileUsageWithLedgerResponse, error) { + atomic.AddInt32(&c.ReconcileUsageWithLedgerCallCount, 1) + time.Sleep(1 * time.Second) + + return nil, status.Error(codes.Unauthenticated, "not implemented") +} + +// ListUsage retrieves all usage for the specified attributionId and the given time range +func (c *fakeUsageClient) ListUsage(ctx context.Context, in *v1.ListUsageRequest, opts ...grpc.CallOption) (*v1.ListUsageResponse, error) { + return nil, status.Error(codes.Unauthenticated, "not implemented") +} diff --git a/components/usage/pkg/controller/reporter.go b/components/usage/pkg/scheduler/reporter.go similarity index 53% rename from components/usage/pkg/controller/reporter.go rename to components/usage/pkg/scheduler/reporter.go index 100c1e133249cb..18f7a374a4258d 100644 --- a/components/usage/pkg/controller/reporter.go +++ b/components/usage/pkg/scheduler/reporter.go @@ -2,7 +2,7 @@ // Licensed under the GNU Affero General Public License (AGPL). // See License-AGPL.txt in the project root for license information. 
-package controller +package scheduler import ( "fmt" @@ -16,26 +16,26 @@ const ( ) var ( - reconcileStartedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + jobStartedSeconds = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "reconcile_started_total", - Help: "Number of usage reconciliation runs started", - }, []string{}) + Name: "scheduler_job_started", + Help: "Number of jobs started", + }, []string{"job"}) - reconcileStartedDurationSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + jobCompletedSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "reconcile_completed_duration_seconds", - Help: "Histogram of reconcile duration", + Name: "scheduler_job_completed_seconds", + Help: "Histogram of job duration", Buckets: prometheus.LinearBuckets(30, 30, 10), // every 30 secs, starting at 30secs - }, []string{"outcome"}) + }, []string{"job", "outcome"}) ) func RegisterMetrics(reg *prometheus.Registry) error { metrics := []prometheus.Collector{ - reconcileStartedTotal, - reconcileStartedDurationSeconds, + jobStartedSeconds, + jobCompletedSeconds, } for _, metric := range metrics { err := reg.Register(metric) @@ -47,14 +47,14 @@ func RegisterMetrics(reg *prometheus.Registry) error { return nil } -func reportUsageReconcileStarted() { - reconcileStartedTotal.WithLabelValues().Inc() +func reportJobStarted(id string) { + jobStartedSeconds.WithLabelValues(id).Inc() } -func reportUsageReconcileFinished(duration time.Duration, err error) { +func reportJobCompleted(id string, duration time.Duration, err error) { outcome := "success" if err != nil { outcome = "error" } - reconcileStartedDurationSeconds.WithLabelValues(outcome).Observe(duration.Seconds()) + jobCompletedSeconds.WithLabelValues(id, outcome).Observe(duration.Seconds()) } diff --git a/components/usage/pkg/scheduler/scheduler.go b/components/usage/pkg/scheduler/scheduler.go new file 
mode 100644 index 00000000000000..13b3ab3eb65c54 --- /dev/null +++ b/components/usage/pkg/scheduler/scheduler.go @@ -0,0 +1,73 @@ +// Copyright (c) 2022 Gitpod GmbH. All rights reserved. +// Licensed under the GNU Affero General Public License (AGPL). +// See License-AGPL.txt in the project root for license information. + +package scheduler + +import ( + "github.com/gitpod-io/gitpod/common-go/log" + "github.com/robfig/cron" + "sync" + "time" +) + +func New(jobs ...JobSpec) *Scheduler { + return &Scheduler{ + specs: jobs, + runningJobs: sync.WaitGroup{}, + cron: cron.NewWithLocation(time.UTC), + } +} + +type Scheduler struct { + specs []JobSpec + runningJobs sync.WaitGroup + + cron *cron.Cron +} + +type JobSpec struct { + Job Job + ID string + Schedule cron.Schedule +} + +func (c *Scheduler) Start() { + log.Infof("Starting usage scheduler. Setting up %d jobs.", len(c.specs)) + + for _, job := range c.specs { + c.cron.Schedule(job.Schedule, cron.FuncJob(func() { + c.runningJobs.Add(1) + defer c.runningJobs.Done() + + now := time.Now().UTC() + logger := log.WithField("job_id", job.ID) + + logger.Infof("Starting scheduled job %s", job.ID) + reportJobStarted(job.ID) + + err := job.Job.Run() + defer func() { + reportJobCompleted(job.ID, time.Since(now), err) + }() + if err != nil { + logger.WithError(err).Errorf("Scheduled job %s failed.", job.ID) + return + } + logger.Infof("Scheduled job %s completed succesfully.", job.ID) + })) + } + + c.cron.Start() +} + +// Stop terminates the Scheduler and waits for all running jobs to complete. 
+func (c *Scheduler) Stop() { + log.Info("Stopping usage controller.") + + log.Info("Awaiting existing reconciliation runs to complete..") + // Wait for existing jobs to finish + c.runningJobs.Wait() + + log.Infof("All running jobs completed.") +} diff --git a/components/usage/pkg/server/server.go b/components/usage/pkg/server/server.go index 116740631582f0..71af7cb48ee8a7 100644 --- a/components/usage/pkg/server/server.go +++ b/components/usage/pkg/server/server.go @@ -6,6 +6,7 @@ package server import ( "fmt" + "github.com/gitpod-io/gitpod/usage/pkg/scheduler" "net" "os" "time" @@ -19,7 +20,6 @@ import ( "github.com/gitpod-io/gitpod/common-go/log" v1 "github.com/gitpod-io/gitpod/usage-api/v1" "github.com/gitpod-io/gitpod/usage/pkg/apiv1" - "github.com/gitpod-io/gitpod/usage/pkg/controller" "github.com/gitpod-io/gitpod/usage/pkg/db" "github.com/gitpod-io/gitpod/usage/pkg/stripe" "gorm.io/gorm" @@ -105,17 +105,13 @@ func Start(cfg Config) error { return fmt.Errorf("failed to parse schedule duration: %w", err) } - usageClient := v1.NewUsageServiceClient(selfConnection) - ctrl, err := controller.New(schedule, controller.NewLedgerReconciler(usageClient, v1.NewBillingServiceClient(selfConnection))) - if err != nil { - return fmt.Errorf("failed to initialize ledger controller: %w", err) - } + jobSpec, err := scheduler.NewLedgerTriggerJobSpec(schedule, + scheduler.NewLedgerTrigger(v1.NewUsageServiceClient(selfConnection), v1.NewBillingServiceClient(selfConnection)), + ) - err = ctrl.Start() - if err != nil { - return fmt.Errorf("failed tostart ledger controller: %w", err) - } - defer ctrl.Stop() + sched := scheduler.New(jobSpec) + sched.Start() + defer sched.Stop() } else { log.Info("No controller schedule specified, controller will be disabled.") } @@ -125,7 +121,7 @@ func Start(cfg Config) error { return fmt.Errorf("failed to register gRPC services: %w", err) } - err = controller.RegisterMetrics(srv.MetricsRegistry()) + err = 
scheduler.RegisterMetrics(srv.MetricsRegistry()) if err != nil { return fmt.Errorf("failed to register controller metrics: %w", err) }