Skip to content

Commit

Permalink
[usage] Refactor controller package into scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
easyCZ authored and roboquat committed Sep 13, 2022
1 parent 24c50d2 commit 8dcbb57
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 10 deletions.
2 changes: 1 addition & 1 deletion components/ide-metrics/pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func Test_allowListCollector_Reconcile(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := c.Reconcile(tt.args.labels); !reflect.DeepEqual(got, tt.want) {
t.Errorf("allowListCollector.Run() = %v, want %v", got, tt.want)
t.Errorf("allowListCollector.Reconcile() = %v, want %v", got, tt.want)
}
})
}
Expand Down
4 changes: 4 additions & 0 deletions components/usage/pkg/scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (r *LedgerJob) Run() (err error) {
// attempt a write to signal we want to run
case r.running <- struct{}{}:
// we managed to write, there's no other job executing. Cases are not fall through so we continue executing our main logic.
defer func() {
// signal job completed
<-r.running
}()
default:
// we could not write, so another instance is already running. Skip current run.
log.Infof("Skipping ledger run, another run is already in progress.")
Expand Down
12 changes: 11 additions & 1 deletion components/usage/pkg/scheduler/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ func TestLedgerJob_PreventsConcurrentInvocations(t *testing.T) {
require.Equal(t, 1, int(client.ReconcileUsageWithLedgerCallCount))
}

func TestLedgerJob_CanRunRepeatedly(t *testing.T) {
client := &fakeUsageClient{}
job := NewLedgerTrigger(client, nil)

_ = job.Run()
_ = job.Run()

require.Equal(t, 2, int(client.ReconcileUsageWithLedgerCallCount))
}

type fakeUsageClient struct {
ReconcileUsageWithLedgerCallCount int32
}
Expand All @@ -52,7 +62,7 @@ func (c *fakeUsageClient) SetCostCenter(ctx context.Context, in *v1.SetCostCente
// Triggers reconciliation of usage with ledger implementation.
func (c *fakeUsageClient) ReconcileUsageWithLedger(ctx context.Context, in *v1.ReconcileUsageWithLedgerRequest, opts ...grpc.CallOption) (*v1.ReconcileUsageWithLedgerResponse, error) {
atomic.AddInt32(&c.ReconcileUsageWithLedgerCallCount, 1)
time.Sleep(1 * time.Second)
time.Sleep(50 * time.Millisecond)

return nil, status.Error(codes.Unauthenticated, "not implemented")
}
Expand Down
2 changes: 1 addition & 1 deletion components/usage/pkg/scheduler/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (
jobStartedSeconds = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "scheduler_job_started",
Name: "scheduler_job_started_total",
Help: "Number of jobs started",
}, []string{"job"})

Expand Down
17 changes: 10 additions & 7 deletions components/usage/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,21 @@ func (c *Scheduler) Start() {
log.Infof("Starting usage scheduler. Setting up %d jobs.", len(c.specs))

for _, job := range c.specs {
// need to re-assign job to avoid pointing to a different job spec once the `cron.FuncJob` executes.
j := job
c.cron.Schedule(job.Schedule, cron.FuncJob(func() {
c.runningJobs.Add(1)
defer c.runningJobs.Done()

now := time.Now().UTC()
logger := log.WithField("job_id", job.ID)
logger := log.WithField("job_id", j.ID)

logger.Infof("Starting scheduled job %s", job.ID)
reportJobStarted(job.ID)
logger.Infof("Starting scheduled job %s", j.ID)
reportJobStarted(j.ID)

err := job.Job.Run()
err := j.Job.Run()
defer func() {
reportJobCompleted(job.ID, time.Since(now), err)
reportJobCompleted(j.ID, time.Since(now), err)
}()
if err != nil {
logger.WithError(err).Errorf("Scheduled job %s failed.", job.ID)
Expand All @@ -63,9 +65,10 @@ func (c *Scheduler) Start() {

// Stop terminates the Scheduler and awaits for all running jobs to complete.
func (c *Scheduler) Stop() {
log.Info("Stopping usage controller.")
log.Info("Stopping scheduler.")
c.cron.Stop()

log.Info("Awaiting existing reconciliation runs to complete..")
log.Info("Awaiting existing jobs to complete.")
// Wait for existing jobs to finish
c.runningJobs.Wait()

Expand Down
47 changes: 47 additions & 0 deletions components/usage/pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License-AGPL.txt in the project root for license information.

package scheduler

import (
"github.com/robfig/cron"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func TestScheduler(t *testing.T) {
firstRan := false
secondRan := false
s := New(
JobSpec{
Job: JobFunc(func() error {
firstRan = true
return nil
}),
ID: "first",
Schedule: cron.ConstantDelaySchedule{Delay: time.Second},
},
JobSpec{
Job: JobFunc(func() error {
secondRan = true
return nil
}),
ID: "second",
Schedule: cron.ConstantDelaySchedule{Delay: time.Second},
},
)
s.Start()
time.Sleep(1 * time.Second)
s.Stop()

require.True(t, firstRan)
require.True(t, secondRan)
}

type JobFunc func() error

func (f JobFunc) Run() error {
return f()
}
3 changes: 3 additions & 0 deletions components/usage/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func Start(cfg Config) error {
jobSpec, err := scheduler.NewLedgerTriggerJobSpec(schedule,
scheduler.NewLedgerTrigger(v1.NewUsageServiceClient(selfConnection), v1.NewBillingServiceClient(selfConnection)),
)
if err != nil {
return fmt.Errorf("failed to setup ledger trigger job: %w", err)
}

sched := scheduler.New(jobSpec)
sched.Start()
Expand Down

0 comments on commit 8dcbb57

Please sign in to comment.