@@ -19,12 +19,12 @@ package autoscaler
19
19
import (
20
20
"fmt"
21
21
"math"
22
- "sync"
23
22
"time"
24
23
25
24
"github.com/cortexlabs/cortex/pkg/lib/cron"
26
25
"github.com/cortexlabs/cortex/pkg/lib/errors"
27
26
libmath "github.com/cortexlabs/cortex/pkg/lib/math"
27
+ "github.com/cortexlabs/cortex/pkg/lib/pointer"
28
28
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
29
29
libtime "github.com/cortexlabs/cortex/pkg/lib/time"
30
30
"github.com/cortexlabs/cortex/pkg/types/spec"
@@ -44,19 +44,18 @@ type Scaler interface {
44
44
}
45
45
46
46
type Autoscaler struct {
47
- sync.Mutex
48
- logger * zap.SugaredLogger
49
- crons map [string ]cron.Cron
50
- scalers map [userconfig.Kind ]Scaler
51
- lastAwakenTimestamp map [string ]time.Time
47
+ logger * zap.SugaredLogger
48
+ crons map [string ]cron.Cron
49
+ scalers map [userconfig.Kind ]Scaler
50
+ recs map [string ]* recommendations
52
51
}
53
52
54
53
func New (logger * zap.SugaredLogger ) * Autoscaler {
55
54
return & Autoscaler {
56
- logger : logger ,
57
- crons : make (map [string ]cron.Cron ),
58
- scalers : make (map [userconfig.Kind ]Scaler ),
59
- lastAwakenTimestamp : make (map [string ]time. Time ),
55
+ logger : logger ,
56
+ crons : make (map [string ]cron.Cron ),
57
+ scalers : make (map [userconfig.Kind ]Scaler ),
58
+ recs : make (map [string ]* recommendations ),
60
59
}
61
60
}
62
61
@@ -65,9 +64,6 @@ func (a *Autoscaler) AddScaler(scaler Scaler, kind userconfig.Kind) {
65
64
}
66
65
67
66
func (a * Autoscaler ) Awaken (api userconfig.Resource ) error {
68
- a .Lock ()
69
- defer a .Unlock ()
70
-
71
67
scaler , ok := a .scalers [api .Kind ]
72
68
if ! ok {
73
69
return errors .ErrorUnexpected (
@@ -94,7 +90,7 @@ func (a *Autoscaler) Awaken(api userconfig.Resource) error {
94
90
return errors .Wrap (err , "failed to scale api to one" )
95
91
}
96
92
97
- a .lastAwakenTimestamp [api .Name ] = time . Now ( )
93
+ a .recs [api .Name ]. add ( 1 )
98
94
99
95
return nil
100
96
}
@@ -104,11 +100,6 @@ func (a *Autoscaler) AddAPI(api userconfig.Resource) error {
104
100
return nil
105
101
}
106
102
107
- autoscaleFn , err := a .autoscaleFn (api )
108
- if err != nil {
109
- return err
110
- }
111
-
112
103
errorHandler := func (err error ) {
113
104
log := a .logger .With (
114
105
zap .String ("apiName" , api .Name ),
@@ -119,12 +110,12 @@ func (a *Autoscaler) AddAPI(api userconfig.Resource) error {
119
110
telemetry .Error (err )
120
111
}
121
112
122
- a .crons [api .Name ] = cron .Run (autoscaleFn , errorHandler , spec .AutoscalingTickInterval )
113
+ autoscaleFn , err := a .autoscaleFn (api )
114
+ if err != nil {
115
+ return err
116
+ }
123
117
124
- // make sure there is no awaken call registered to an older API with the same name
125
- a .Lock ()
126
- delete (a .lastAwakenTimestamp , api .Name )
127
- a .Unlock ()
118
+ a .crons [api .Name ] = cron .Run (autoscaleFn , errorHandler , spec .AutoscalingTickInterval )
128
119
129
120
return nil
130
121
}
@@ -140,10 +131,7 @@ func (a *Autoscaler) RemoveAPI(api userconfig.Resource) {
140
131
delete (a .crons , api .Name )
141
132
}
142
133
143
- a .Lock ()
144
- delete (a .lastAwakenTimestamp , api .Name )
145
- a .Unlock ()
146
-
134
+ delete (a .recs , api .Name )
147
135
log .Info ("autoscaler stop" )
148
136
}
149
137
@@ -170,7 +158,7 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error)
170
158
log .Info ("autoscaler init" )
171
159
172
160
var startTime time.Time
173
- recs := make ( recommendations )
161
+ a . recs [ api . Name ] = newRecommendations ( )
174
162
175
163
return func () error {
176
164
autoscalingSpec , err := scaler .GetAutoscalingSpec (api .Name )
@@ -227,6 +215,8 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error)
227
215
recommendation = autoscalingSpec .MaxReplicas
228
216
}
229
217
218
+ recs := a .recs [api .Name ]
219
+
230
220
// Rule of thumb: any modifications that don't consider historical recommendations should be performed before
231
221
// recording the recommendation, any modifications that use historical recommendations should be performed after
232
222
recs .add (recommendation )
@@ -240,25 +230,20 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error)
240
230
241
231
if request < currentReplicas {
242
232
downscaleStabilizationFloor = recs .maxSince (autoscalingSpec .DownscaleStabilizationPeriod )
233
+ if downscaleStabilizationFloor != nil {
234
+ downscaleStabilizationFloor = pointer .Int32 (libmath .MinInt32 (* downscaleStabilizationFloor , currentReplicas ))
235
+ }
243
236
if time .Since (startTime ) < autoscalingSpec .DownscaleStabilizationPeriod {
244
237
request = currentReplicas
245
238
} else if downscaleStabilizationFloor != nil && request < * downscaleStabilizationFloor {
246
239
request = * downscaleStabilizationFloor
247
240
}
248
-
249
- // awaken state: was scaled from zero
250
- // This needs to be protected by a Mutex because an Awaken call will also modify it
251
- a .Lock ()
252
- lastAwakenTimestamp := a .lastAwakenTimestamp [api .Name ]
253
-
254
- // Make sure we don't scale below zero if API was recently awaken
255
- if time .Since (lastAwakenTimestamp ) < autoscalingSpec .DownscaleStabilizationPeriod {
256
- request = libmath .MaxInt32 (request , 1 )
257
- }
258
- a .Unlock ()
259
241
}
260
242
if request > currentReplicas {
261
243
upscaleStabilizationCeil = recs .minSince (autoscalingSpec .UpscaleStabilizationPeriod )
244
+ if upscaleStabilizationCeil != nil {
245
+ upscaleStabilizationCeil = pointer .Int32 (libmath .MaxInt32 (* upscaleStabilizationCeil , currentReplicas ))
246
+ }
262
247
if time .Since (startTime ) < autoscalingSpec .UpscaleStabilizationPeriod {
263
248
request = currentReplicas
264
249
} else if upscaleStabilizationCeil != nil && request > * upscaleStabilizationCeil {
0 commit comments