Make webhook-based scale race-free #1477

Merged · 9 commits · Jun 27, 2022
3 changes: 3 additions & 0 deletions cmd/githubwebhookserver/main.go
@@ -72,6 +72,7 @@ func main() {
enableLeaderElection bool
syncPeriod time.Duration
logLevel string
queueLimit int

ghClient *github.Client
)
@@ -92,6 +93,7 @@ func main() {
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
flag.DurationVar(&syncPeriod, "sync-period", 10*time.Minute, "Determines the minimum frequency at which K8s resources managed by this controller are reconciled. When you use autoscaling, set this to a lower value like 10 minutes, because this corresponds to the minimum time to react to demand changes")
flag.StringVar(&logLevel, "log-level", logging.LogLevelDebug, `The verbosity of the logging. Valid values are "debug", "info", "warn", "error". Defaults to "debug".`)
flag.IntVar(&queueLimit, "queue-limit", controllers.DefaultQueueLimit, `The maximum length of the scale operation queue. A scale operation is enqueued for every matching webhook event, and the server returns a 500 HTTP status when the queue is already full at enqueue time.`)
flag.StringVar(&webhookSecretToken, "github-webhook-secret-token", "", "The secret token used to validate GitHub webhook payloads.")
flag.StringVar(&c.Token, "github-token", c.Token, "The personal access token of GitHub.")
flag.Int64Var(&c.AppID, "github-app-id", c.AppID, "The application ID of GitHub App.")
@@ -164,6 +166,7 @@ func main() {
SecretKeyBytes: []byte(webhookSecretToken),
Namespace: watchNamespace,
GitHubClient: ghClient,
QueueLimit: queueLimit,
}

if err = hraGitHubWebhook.SetupWithManager(mgr); err != nil {
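The new --queue-limit flag caps the webhook server's in-memory queue of scale operations; when the queue is already full, the server answers the delivery with HTTP 500 instead of blocking. Below is a minimal, self-contained sketch of that bounded, non-blocking enqueue pattern. The handler, channel, and payload names are illustrative only; the real server routes through its worker type, whose source is not part of the hunks shown here.

```go
// Sketch only: the behavior described by the --queue-limit help text.
package main

import "net/http"

var scaleQueue = make(chan struct{}, 100) // capacity taken from --queue-limit

func webhookHandler(w http.ResponseWriter, r *http.Request) {
	select {
	case scaleQueue <- struct{}{}: // room left: enqueue and let a background worker apply the scale
		w.WriteHeader(http.StatusOK)
	default: // queue already full at enqueue time: surface it as HTTP 500
		http.Error(w, "scale operation queue is full", http.StatusInternalServerError)
	}
}

func main() {
	// In the real server a worker drains the queue and batches HRA updates;
	// here a trivial goroutine just discards entries.
	go func() {
		for range scaleQueue {
		}
	}()
	http.HandleFunc("/webhook", webhookHandler)
	_ = http.ListenAndServe(":8080", nil)
}
```

Returning a non-2xx status also makes sustained overload visible in GitHub's webhook delivery history.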
207 changes: 207 additions & 0 deletions controllers/horizontal_runner_autoscaler_batch_scale.go
@@ -0,0 +1,207 @@
package controllers

import (
"context"
"fmt"
"sync"
"time"

"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type batchScaler struct {
Ctx context.Context
Client client.Client
Log logr.Logger
interval time.Duration

queue chan *ScaleTarget
workerStart sync.Once
}

func newBatchScaler(ctx context.Context, client client.Client, log logr.Logger) *batchScaler {
return &batchScaler{
Ctx: ctx,
Client: client,
Log: log,
interval: 3 * time.Second,
}
}

type batchScaleOperation struct {
namespacedName types.NamespacedName
scaleOps []scaleOperation
}

type scaleOperation struct {
trigger v1alpha1.ScaleUpTrigger
log logr.Logger
}

// Add adds the scale target to the unbounded queue, blocking until the target is successfully enqueued.
// All the targets in the queue are dequeued every 3 seconds, grouped by HRA, and applied.
// In the happy path, batchScaler updates each HRA only once, even when the HRA had two or more associated webhook events within the 3-second interval,
// which results in fewer K8s API calls and fewer HRA update conflicts when your ARC installation receives a lot of webhook events.
func (s *batchScaler) Add(st *ScaleTarget) {
if st == nil {
return
}

s.workerStart.Do(func() {
var expBackoff = []time.Duration{time.Second, 2 * time.Second, 4 * time.Second, 8 * time.Second, 16 * time.Second}

s.queue = make(chan *ScaleTarget)

log := s.Log

go func() {
log.Info("Starting batch worker")
defer log.Info("Stopped batch worker")

for {
select {
case <-s.Ctx.Done():
return
default:
}

log.V(2).Info("Batch worker is dequeueing operations")

batches := map[types.NamespacedName]batchScaleOperation{}
after := time.After(s.interval)
var ops uint

batch:
for {
select {
case <-after:
after = nil
break batch
case st := <-s.queue:
nsName := types.NamespacedName{
Namespace: st.HorizontalRunnerAutoscaler.Namespace,
Name: st.HorizontalRunnerAutoscaler.Name,
}
b, ok := batches[nsName]
if !ok {
b = batchScaleOperation{
namespacedName: nsName,
}
}
b.scaleOps = append(b.scaleOps, scaleOperation{
log: *st.log,
trigger: st.ScaleUpTrigger,
})
batches[nsName] = b
ops++
}
}

log.V(2).Info("Batch worker dequeued operations", "ops", ops, "batches", len(batches))

retry:
for i := 0; ; i++ {
failed := map[types.NamespacedName]batchScaleOperation{}

for nsName, b := range batches {
b := b
if err := s.batchScale(context.Background(), b); err != nil {
log.V(2).Info("Failed to scale due to error", "error", err)
failed[nsName] = b
} else {
log.V(2).Info("Successfully ran batch scale", "hra", b.namespacedName)
}
}

if len(failed) == 0 {
break retry
}

batches = failed

delay := 16 * time.Second
if i < len(expBackoff) {
delay = expBackoff[i]
}
time.Sleep(delay)
}
}
}()
})

s.queue <- st
}

func (s *batchScaler) batchScale(ctx context.Context, batch batchScaleOperation) error {
var hra v1alpha1.HorizontalRunnerAutoscaler

if err := s.Client.Get(ctx, batch.namespacedName, &hra); err != nil {
return err
}

copy := hra.DeepCopy()

copy.Spec.CapacityReservations = getValidCapacityReservations(copy)

var added, completed int

for _, scale := range batch.scaleOps {
amount := 1

if scale.trigger.Amount != 0 {
amount = scale.trigger.Amount
}

scale.log.V(2).Info("Adding capacity reservation", "amount", amount)

if amount > 0 {
now := time.Now()
copy.Spec.CapacityReservations = append(copy.Spec.CapacityReservations, v1alpha1.CapacityReservation{
EffectiveTime: metav1.Time{Time: now},
ExpirationTime: metav1.Time{Time: now.Add(scale.trigger.Duration.Duration)},
Replicas: amount,
})

added += amount
} else if amount < 0 {
var reservations []v1alpha1.CapacityReservation

var found bool

for _, r := range copy.Spec.CapacityReservations {
if !found && r.Replicas+amount == 0 {
found = true
} else {
reservations = append(reservations, r)
}
}

copy.Spec.CapacityReservations = reservations

completed += amount
}
}

before := len(hra.Spec.CapacityReservations)
expired := before - len(copy.Spec.CapacityReservations)
after := len(copy.Spec.CapacityReservations)

s.Log.V(1).Info(
fmt.Sprintf("Updating hra %s for capacityReservations update", hra.Name),
"before", before,
"expired", expired,
"added", added,
"completed", completed,
"after", after,
)

if err := s.Client.Update(ctx, copy); err != nil {
return fmt.Errorf("updating horizontalrunnerautoscaler to add capacity reservation: %w", err)
}

return nil
}
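The heart of this new file is the drain loop inside Add: the worker goroutine pulls targets off the channel until the 3-second time.After fires, groups them by the HRA's NamespacedName, and then calls batchScale once per group, retrying failed groups with exponential backoff capped at 16 seconds. The same pattern, stripped of the Kubernetes types, looks roughly like the sketch below; the names and the string key are illustrative, not the controller's code.

```go
// Standalone restatement of the drain-and-batch pattern used above.
package main

import (
	"fmt"
	"time"
)

func batchWorker(events <-chan string, interval time.Duration, apply func(key string, n int)) {
	for {
		after := time.After(interval)
		batch := map[string]int{}

	drain:
		for {
			select {
			case <-after:
				break drain // window closed; apply what we have
			case key := <-events:
				batch[key]++ // several events for the same key collapse into one entry
			}
		}

		for key, n := range batch {
			apply(key, n) // one call per key, regardless of how many events arrived
		}
	}
}

func main() {
	events := make(chan string)
	go batchWorker(events, 3*time.Second, func(key string, n int) {
		fmt.Printf("would update %s once, adding %d reservations\n", key, n)
	})
	for i := 0; i < 5; i++ {
		events <- "default/my-hra" // five events, one eventual update
	}
	time.Sleep(4 * time.Second)
}
```

Several workflow events for the same HRA inside one window therefore turn into a single Get/Update round trip, which is where the reduction in API calls and update conflicts comes from.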
86 changes: 26 additions & 60 deletions controllers/horizontal_runner_autoscaler_webhook.go
@@ -23,9 +23,9 @@ import (
"io/ioutil"
"net/http"
"strings"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

@@ -46,6 +46,8 @@ const (

keyPrefixEnterprise = "enterprises/"
keyRunnerGroup = "/group/"

DefaultQueueLimit = 100
)

// HorizontalRunnerAutoscalerGitHubWebhook autoscales a HorizontalRunnerAutoscaler and the RunnerDeployment on each
@@ -68,6 +70,15 @@ type HorizontalRunnerAutoscalerGitHubWebhook struct {
// Set to empty for letting it watch for all namespaces.
Namespace string
Name string

// QueueLimit is the maximum length of the bounded queue of scale targets and their associated operations.
// A scale target is enqueued for every eligible webhook event received, so that it is processed asynchronously.
QueueLimit int

worker *worker
workerInit sync.Once
workerStart sync.Once
batchCh chan *ScaleTarget
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
@@ -312,9 +323,19 @@ func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) Handle(w http.Respons
return
}

if err := autoscaler.tryScale(context.TODO(), target); err != nil {
log.Error(err, "could not scale up")
autoscaler.workerInit.Do(func() {
batchScaler := newBatchScaler(context.Background(), autoscaler.Client, autoscaler.Log)

queueLimit := autoscaler.QueueLimit
if queueLimit == 0 {
queueLimit = DefaultQueueLimit
}
autoscaler.worker = newWorker(context.Background(), queueLimit, batchScaler.Add)
})

target.log = &log
if ok := autoscaler.worker.Add(target); !ok {
log.Error(err, "Could not scale up due to queue full")
return
}

@@ -383,6 +404,8 @@ func matchTriggerConditionAgainstEvent(types []string, eventAction *string) bool
type ScaleTarget struct {
v1alpha1.HorizontalRunnerAutoscaler
v1alpha1.ScaleUpTrigger

log *logr.Logger
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) searchScaleTargets(hras []v1alpha1.HorizontalRunnerAutoscaler, f func(v1alpha1.ScaleUpTrigger) bool) []ScaleTarget {
@@ -770,63 +793,6 @@ HRA:
return nil, nil
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) tryScale(ctx context.Context, target *ScaleTarget) error {
if target == nil {
return nil
}

copy := target.HorizontalRunnerAutoscaler.DeepCopy()

amount := 1

if target.ScaleUpTrigger.Amount != 0 {
amount = target.ScaleUpTrigger.Amount
}

capacityReservations := getValidCapacityReservations(copy)

if amount > 0 {
now := time.Now()
copy.Spec.CapacityReservations = append(capacityReservations, v1alpha1.CapacityReservation{
EffectiveTime: metav1.Time{Time: now},
ExpirationTime: metav1.Time{Time: now.Add(target.ScaleUpTrigger.Duration.Duration)},
Replicas: amount,
})
} else if amount < 0 {
var reservations []v1alpha1.CapacityReservation

var found bool

for _, r := range capacityReservations {
if !found && r.Replicas+amount == 0 {
found = true
} else {
reservations = append(reservations, r)
}
}

copy.Spec.CapacityReservations = reservations
}

before := len(target.HorizontalRunnerAutoscaler.Spec.CapacityReservations)
expired := before - len(capacityReservations)
after := len(copy.Spec.CapacityReservations)

autoscaler.Log.V(1).Info(
fmt.Sprintf("Patching hra %s for capacityReservations update", target.HorizontalRunnerAutoscaler.Name),
"before", before,
"expired", expired,
"amount", amount,
"after", after,
)

if err := autoscaler.Client.Patch(ctx, copy, client.MergeFrom(&target.HorizontalRunnerAutoscaler)); err != nil {
return fmt.Errorf("patching horizontalrunnerautoscaler to add capacity reservation: %w", err)
}

return nil
}

func getValidCapacityReservations(autoscaler *v1alpha1.HorizontalRunnerAutoscaler) []v1alpha1.CapacityReservation {
var capacityReservations []v1alpha1.CapacityReservation

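The Handle changes above lazily construct a batchScaler and a worker via sync.Once and push each ScaleTarget through worker.Add, instead of patching the HRA inline; the removed tryScale, which issued one Patch per webhook event, is what raced when events arrived concurrently. The worker's own source file is not among the hunks shown here, but the newWorker and worker.Add calls imply a shape along the lines of the sketch below; treat it as a guess at the interface, not the actual implementation.

```go
// A guess at the worker behind newWorker/worker.Add, illustrating the single-
// consumer, bounded-queue design the shown calls imply.
package main

import "context"

type ScaleTarget struct{ Name string } // stand-in for the controller's ScaleTarget

type worker struct {
	queue chan *ScaleTarget
}

// newWorker starts a single consumer goroutine that hands every queued target
// to do() (in the controller, batchScaler.Add). One consumer means the HRA is
// only ever updated from one place, which is what makes the scale path
// race-free compared to the removed per-request tryScale/Patch.
func newWorker(ctx context.Context, queueLimit int, do func(*ScaleTarget)) *worker {
	w := &worker{queue: make(chan *ScaleTarget, queueLimit)}
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case st := <-w.queue:
				do(st)
			}
		}
	}()
	return w
}

// Add enqueues without blocking; false means the queue was already at
// --queue-limit, which the webhook handler reports as a failure.
func (w *worker) Add(st *ScaleTarget) bool {
	select {
	case w.queue <- st:
		return true
	default:
		return false
	}
}

func main() {
	w := newWorker(context.Background(), 100, func(st *ScaleTarget) { _ = st })
	_ = w.Add(&ScaleTarget{Name: "default/my-hra"})
}
```

Because a single consumer goroutine feeds batchScaler.Add, every HRA update goes through one writer, and a full queue is reported to the caller as ok == false, which the handler logs and which, per the --queue-limit help text, the server surfaces as an HTTP 500.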