Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supporting scaler 0.2.0 #15

Merged
Merged
4 changes: 2 additions & 2 deletions autoscaler/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ COPY . .

RUN go get github.com/v3io/scaler-types \
&& cd $GOPATH/src/github.com/v3io/scaler-types \
&& git checkout v1.3.0 \
&& git checkout v1.4.0 \
&& cd /go/src/github.com/v3io/app_resourcescaler

RUN mkdir -p /home/app_resourcescaler/bin \
Expand All @@ -30,6 +30,6 @@ WORKDIR /home/app_resourcescaler
ENV PATH=/home/app_resourcescaler:$PATH

COPY --from=builder /home/app_resourcescaler/plugins/plugin.so /home/app_resourcescaler/plugins/plugin.so
COPY --from=quay.io/v3io/autoscaler:v0.1.2 /home/v3io/bin/autoscaler /home/app_resourcescaler/autoscaler
COPY --from=quay.io/v3io/autoscaler:v0.2.0 /home/v3io/bin/autoscaler /home/app_resourcescaler/autoscaler

CMD ["autoscaler"]
4 changes: 2 additions & 2 deletions dlx/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ COPY . .

RUN go get github.com/v3io/scaler-types \
&& cd $GOPATH/src/github.com/v3io/scaler-types \
&& git checkout v1.3.0 \
&& git checkout v1.4.0 \
&& cd /go/src/github.com/v3io/app_resourcescaler

RUN mkdir -p /home/app_resourcescaler/bin \
Expand All @@ -30,6 +30,6 @@ WORKDIR /home/app_resourcescaler
ENV PATH=/home/app_resourcescaler:$PATH

COPY --from=builder /home/app_resourcescaler/plugins/plugin.so /home/app_resourcescaler/plugins/plugin.so
COPY --from=quay.io/v3io/dlx:v0.1.2 /home/v3io/bin/dlx /home/app_resourcescaler/dlx
COPY --from=quay.io/v3io/dlx:v0.2.0 /home/v3io/bin/dlx /home/app_resourcescaler/dlx

CMD ["dlx"]
186 changes: 103 additions & 83 deletions resourcescaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ func New(kubeconfigPath string, namespace string) (scaler_types.ResourceScaler,
}, nil
}

func (s *AppResourceScaler) SetScale(resource scaler_types.Resource, scale int) error {
func (s *AppResourceScaler) SetScale(resources []scaler_types.Resource, scale int) error {
serviceNames := make([]string, 0)
for _, resource := range resources {
serviceNames = append(serviceNames, resource.Name)
}
if scale == 0 {
return s.scaleServiceToZero(s.namespace, resource.Name)
return s.scaleServicesToZero(s.namespace, serviceNames)
}
return s.scaleServiceFromZero(s.namespace, resource.Name)
return s.scaleServicesFromZero(s.namespace, serviceNames)
}

func (s *AppResourceScaler) GetResources() ([]scaler_types.Resource, error) {
Expand Down Expand Up @@ -115,91 +119,89 @@ func (s *AppResourceScaler) GetConfig() (*scaler_types.ResourceScalerConfig, err
return nil, nil
}

func (s *AppResourceScaler) scaleServiceFromZero(namespace string, serviceName string) error {
func (s *AppResourceScaler) scaleServicesFromZero(namespace string, serviceNames []string) error {
var jsonPatchMapper []map[string]interface{}
s.logger.DebugWith("Scaling from zero", "namespace", namespace, "serviceName", serviceName)
desiredStatePath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/desired_state", string(serviceName))
markForRestartPath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/mark_for_restart", string(serviceName))
scaleToZeroStatusPath := fmt.Sprintf("/status/services/%s/scale_to_zero", string(serviceName))
lastScaleStatePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event", string(serviceName))
lastScaleStateTimePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event_time", string(serviceName))
s.logger.DebugWith("Scaling from zero", "namespace", namespace, "serviceNames", serviceNames)
marshaledTime, err := time.Now().MarshalText()
if err != nil {
return errors.Wrap(err, "Failed to marshal time")
}
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": desiredStatePath,
"value": "ready",
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": markForRestartPath,
"value": false,
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": "/status/state",
"value": "waitingForProvisioning",
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": scaleToZeroStatusPath,
"value": map[string]interface{}{},
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": lastScaleStatePath,
"value": string(scaler_types.ScaleFromZeroStartedScaleEvent),
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": lastScaleStateTimePath,
"value": string(marshaledTime),
})
for _, serviceName := range serviceNames {
jsonPatchMapper, err = s.appendServiceStateChangeJsonPatchOperations(jsonPatchMapper,
serviceName,
"ready",
scaler_types.ScaleFromZeroStartedScaleEvent,
marshaledTime)
if err != nil {
return errors.Wrap(err, "Failed appending service state change json patch operations")
}
}

err = s.patchIguazioTenantAppServiceSets(namespace, jsonPatchMapper)

if err != nil {
return errors.Wrap(err, "Failed to patch iguazio tenant app service sets")
}

err = s.waitForServiceState(namespace, serviceName, "ready")
err = s.waitForServicesState(namespace, serviceNames, "ready")

if err != nil {
return errors.Wrap(err, "Failed to wait for service readiness")
return errors.Wrap(err, "Failed to wait for services readiness")
}

return nil
}

func (s *AppResourceScaler) scaleServiceToZero(namespace string, serviceName string) error {
func (s *AppResourceScaler) scaleServicesToZero(namespace string, serviceNames []string) error {
var jsonPatchMapper []map[string]interface{}
s.logger.DebugWith("Scaling to zero", "namespace", namespace, "serviceName", serviceName)
s.logger.DebugWith("Scaling to zero", "namespace", namespace, "serviceNames", serviceNames)
marshaledTime, err := time.Now().MarshalText()
if err != nil {
return errors.Wrap(err, "Failed to marshal time")
}
for _, serviceName := range serviceNames {

jsonPatchMapper, err = s.appendServiceStateChangeJsonPatchOperations(jsonPatchMapper,
serviceName,
"scaledToZero",
scaler_types.ScaleToZeroStartedScaleEvent,
marshaledTime)
if err != nil {
return errors.Wrap(err, "Failed appending service state change json patch operations")
}
}

err = s.patchIguazioTenantAppServiceSets(namespace, jsonPatchMapper)

if err != nil {
return errors.Wrap(err, "Failed to patch iguazio tenant app service sets")
}

err = s.waitForServicesState(namespace, serviceNames, "scaledToZero")

if err != nil {
return errors.Wrap(err, "Failed to wait for services to scale to zero")
}

return nil
}

func (s *AppResourceScaler) appendServiceStateChangeJsonPatchOperations(jsonPatchMapper []map[string]interface{}, serviceName string, desiredState string, scaleEvent scaler_types.ScaleEvent, marshaledTime []byte) ([]map[string]interface{}, error) {
desiredStatePath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/desired_state", string(serviceName))
markForRestartPath := fmt.Sprintf("/spec/spec/tenants/0/spec/services/%s/mark_for_restart", string(serviceName))
scaleToZeroStatusPath := fmt.Sprintf("/status/services/%s/scale_to_zero", string(serviceName))
lastScaleStatePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event", string(serviceName))
lastScaleStateTimePath := fmt.Sprintf("/status/services/%s/scale_to_zero/last_scale_event_time", string(serviceName))
marshaledTime, err := time.Now().MarshalText()
if err != nil {
return errors.Wrap(err, "Failed to marshal time")
}
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": desiredStatePath,
"value": "scaledToZero",
"value": desiredState,
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": markForRestartPath,
"value": false,
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": "/status/state",
"value": "waitingForProvisioning",
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": scaleToZeroStatusPath,
Expand All @@ -208,30 +210,24 @@ func (s *AppResourceScaler) scaleServiceToZero(namespace string, serviceName str
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": lastScaleStatePath,
"value": string(scaler_types.ScaleToZeroStartedScaleEvent),
"value": string(scaleEvent),
})
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": lastScaleStateTimePath,
"value": string(marshaledTime),
})

err = s.patchIguazioTenantAppServiceSets(namespace, jsonPatchMapper)

if err != nil {
return errors.Wrap(err, "Failed to patch iguazio tenant app service sets")
}

err = s.waitForServiceState(namespace, serviceName, "scaledToZero")

if err != nil {
return errors.Wrap(err, "Failed to wait for service to scale to zero")
}

return nil
return jsonPatchMapper, nil
}

func (s *AppResourceScaler) patchIguazioTenantAppServiceSets(namespace string, jsonPatchMapper []map[string]interface{}) error {
jsonPatchMapper = append(jsonPatchMapper, map[string]interface{}{
"op": "add",
"path": "/status/state",
"value": "waitingForProvisioning",
})

err := s.waitForNoProvisioningInProcess(namespace)
if err != nil {
return errors.Wrap(err, "Failed waiting for IguazioTenantAppServiceSet to finish provisioning")
Expand Down Expand Up @@ -276,40 +272,45 @@ func (s *AppResourceScaler) waitForNoProvisioningInProcess(namespace string) err
}
}

func (s *AppResourceScaler) waitForServiceState(namespace string, serviceName string, state string) error {
s.logger.DebugWith("Waiting for service to reach state", "serviceName", serviceName, "state", state)
timeout := time.After(5 * time.Minute)
func (s *AppResourceScaler) waitForServicesState(namespace string, serviceNames []string, desiredState string) error {
s.logger.DebugWith("Waiting for services to reach desired state", "serviceNames", serviceNames, "desiredState", desiredState)
timeout := time.After(10 * time.Minute)
tick := time.Tick(5 * time.Second)
for {
select {
case <-timeout:
return errors.New("Timed out waiting for service state")
return errors.New("Timed out waiting for services to reach desired state")
case <-tick:

servicesToCheck := append([]string(nil), serviceNames...)
_, statusServicesMap, _, err := s.getIguazioTenantAppServiceSets()
if err != nil {
return errors.Wrap(err, "Failed to get iguazio tenant app service sets")
}

for statusServiceName, serviceStatus := range statusServicesMap {
if statusServiceName != serviceName {
for serviceName, serviceStatus := range statusServicesMap {
if !stringSliceContainsString(servicesToCheck, serviceName) {
continue
}

stateString, err := s.parseServiceState(serviceStatus)
currentState, err := s.parseServiceState(serviceStatus)
if err != nil {
return errors.Wrap(err, "Failed parsing the service state")
}

if stateString == state {
s.logger.InfoWith("Service reached state", "serviceName", serviceName, "state", state)
return nil
if currentState != desiredState {
s.logger.DebugWith("Service did not reach desired state yet",
"serviceName", serviceName,
"currentState", currentState,
"desiredState", desiredState)
break
}

s.logger.DebugWith("Service did not reach state yet",
"serviceName", serviceName,
"currentState", stateString,
"desiredState", state)
s.logger.DebugWith("Service reached desired state", "serviceName", serviceName, "desiredState", desiredState)
servicesToCheck = removeStringFromSlice(serviceName, servicesToCheck)

if len(servicesToCheck) == 0 {
return nil
}
}
}
}
Expand Down Expand Up @@ -532,3 +533,22 @@ func (s *AppResourceScaler) parseScaleResources(serviceSpecInterface interface{}

return parsedScaleResources, nil
}

func removeStringFromSlice(someString string, slice []string) []string {
var newSlice []string
for _, item := range slice {
if item != someString {
newSlice = append(newSlice, item)
}
}
return newSlice
}

func stringSliceContainsString(slice []string, str string) bool {
for _, stringInSlice := range slice {
if stringInSlice == str {
return true
}
}
return false
}