Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rfac: Improve cron scaler #6166

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ New deprecation(s):

### Other

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **Cron scaler**: Simplify cron scaler code ([#6056](https://github.com/kedacore/keda/issues/6056))

## v2.15.1

Expand Down
95 changes: 39 additions & 56 deletions pkg/scalers/cron_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
)

const (
defaultDesiredReplicas = 1
cronMetricType = "External"
cronMetricType = "External"
)

type cronScaler struct {
metricType v2.MetricTargetType
metadata cronMetadata
logger logr.Logger
metricType v2.MetricTargetType
metadata cronMetadata
logger logr.Logger
startSchedule cron.Schedule
endSchedule cron.Schedule
}

type cronMetadata struct {
Expand All @@ -35,21 +36,11 @@ type cronMetadata struct {
}

func (m *cronMetadata) Validate() error {
if m.Timezone == "" {
return fmt.Errorf("no timezone specified")
}

parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
if m.Start == "" {
return fmt.Errorf("no start schedule specified")
}
if _, err := parser.Parse(m.Start); err != nil {
return fmt.Errorf("error parsing start schedule: %w", err)
}

if m.End == "" {
return fmt.Errorf("no end schedule specified")
}
if _, err := parser.Parse(m.End); err != nil {
return fmt.Errorf("error parsing end schedule: %w", err)
}
Expand All @@ -65,7 +56,6 @@ func (m *cronMetadata) Validate() error {
return nil
}

// NewCronScaler creates a new cronScaler
func NewCronScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -77,25 +67,23 @@ func NewCronScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
return nil, fmt.Errorf("error parsing cron metadata: %w", err)
}

parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)

startSchedule, _ := parser.Parse(meta.Start)
endSchedule, _ := parser.Parse(meta.End)

return &cronScaler{
metricType: metricType,
metadata: meta,
logger: InitializeLogger(config, "cron_scaler"),
metricType: metricType,
metadata: meta,
logger: InitializeLogger(config, "cron_scaler"),
startSchedule: startSchedule,
endSchedule: endSchedule,
}, nil
}

func getCronTime(location *time.Location, spec string) (int64, error) {
c := cron.New(cron.WithLocation(location))
_, err := c.AddFunc(spec, func() { _ = fmt.Sprintf("Cron initialized for location %s", location.String()) })
if err != nil {
return 0, err
}

c.Start()
cronTime := c.Entries()[0].Next.Unix()
c.Stop()

return cronTime, nil
func getCronTime(location *time.Location, schedule cron.Schedule) time.Time {
// Use the pre-parsed cron schedule directly to get the next time
return schedule.Next(time.Now().In(location))
}

func parseCronMetadata(config *scalersconfig.ScalerConfig) (cronMetadata, error) {
Expand All @@ -120,48 +108,43 @@ func parseCronTimeFormat(s string) string {

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *cronScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var specReplicas int64 = 1
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, kedautil.NormalizeString(fmt.Sprintf("cron-%s-%s-%s", s.metadata.Timezone, parseCronTimeFormat(s.metadata.Start), parseCronTimeFormat(s.metadata.End)))),
},
Target: GetMetricTarget(s.metricType, specReplicas),
Target: GetMetricTarget(s.metricType, s.metadata.DesiredReplicas),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: cronMetricType}
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *cronScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
var defaultDesiredReplicas = int64(defaultDesiredReplicas)

location, err := time.LoadLocation(s.metadata.Timezone)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone. Error: %w", err)
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone: %w", err)
}

// Since we are considering the timestamp here and not the exact time, timezone does matter.
currentTime := time.Now().Unix()
currentTime := time.Now().In(location)

nextStartTime, startTimecronErr := getCronTime(location, s.metadata.Start)
if startTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error initializing start cron: %w", startTimecronErr)
}
// Use the pre-parsed schedules to get the next start and end times
nextStartTime := getCronTime(location, s.startSchedule)
nextEndTime := getCronTime(location, s.endSchedule)

isWithinInterval := false

nextEndTime, endTimecronErr := getCronTime(location, s.metadata.End)
if endTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error intializing end cron: %w", endTimecronErr)
if nextStartTime.Before(nextEndTime) {
// Interval within the same day
isWithinInterval = currentTime.After(nextStartTime) && currentTime.Before(nextEndTime)
} else {
// Interval spans midnight
isWithinInterval = currentTime.After(nextStartTime) || currentTime.Before(nextEndTime)
}

switch {
case nextStartTime < nextEndTime && currentTime < nextStartTime:
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, false, nil
case currentTime <= nextEndTime:
metric := GenerateMetricInMili(metricName, float64(s.metadata.DesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, true, nil
default:
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, false, nil
metricValue := float64(1)
if isWithinInterval {
metricValue = float64(s.metadata.DesiredReplicas)
}

metric := GenerateMetricInMili(metricName, metricValue)
return []external_metrics.ExternalMetricValue{metric}, isWithinInterval, nil
}
14 changes: 13 additions & 1 deletion pkg/scalers/cron_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
Expand Down Expand Up @@ -121,7 +122,18 @@ func TestCronGetMetricSpecForScaling(t *testing.T) {
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockCronScaler := cronScaler{"", meta, logr.Discard()}

parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
startSchedule, _ := parser.Parse(meta.Start)
endSchedule, _ := parser.Parse(meta.End)

mockCronScaler := cronScaler{
metricType: "",
metadata: meta,
logger: logr.Discard(),
startSchedule: startSchedule,
endSchedule: endSchedule,
}

metricSpec := mockCronScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
Expand Down
Loading