diff --git a/apis/actions.github.com/v1alpha1/ephemeralrunner_types.go b/apis/actions.github.com/v1alpha1/ephemeralrunner_types.go index 631abde385..1bb74c8247 100644 --- a/apis/actions.github.com/v1alpha1/ephemeralrunner_types.go +++ b/apis/actions.github.com/v1alpha1/ephemeralrunner_types.go @@ -42,6 +42,10 @@ type EphemeralRunner struct { Status EphemeralRunnerStatus `json:"status,omitempty"` } +func (er *EphemeralRunner) IsDone() bool { + return er.Status.Phase == corev1.PodSucceeded || er.Status.Phase == corev1.PodFailed +} + // EphemeralRunnerSpec defines the desired state of EphemeralRunner type EphemeralRunnerSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster diff --git a/apis/actions.github.com/v1alpha1/ephemeralrunnerset_types.go b/apis/actions.github.com/v1alpha1/ephemeralrunnerset_types.go index 88524f2a41..42918b545e 100644 --- a/apis/actions.github.com/v1alpha1/ephemeralrunnerset_types.go +++ b/apis/actions.github.com/v1alpha1/ephemeralrunnerset_types.go @@ -24,6 +24,8 @@ import ( type EphemeralRunnerSetSpec struct { // Replicas is the number of desired EphemeralRunner resources in the k8s namespace. Replicas int `json:"replicas,omitempty"` + // PatchID is the unique identifier for the patch issued by the listener app + PatchID int `json:"patchID"` EphemeralRunnerSpec EphemeralRunnerSpec `json:"ephemeralRunnerSpec,omitempty"` } diff --git a/charts/gha-runner-scale-set-controller/crds/actions.github.com_ephemeralrunnersets.yaml b/charts/gha-runner-scale-set-controller/crds/actions.github.com_ephemeralrunnersets.yaml index 58890ad220..62c75cef46 100644 --- a/charts/gha-runner-scale-set-controller/crds/actions.github.com_ephemeralrunnersets.yaml +++ b/charts/gha-runner-scale-set-controller/crds/actions.github.com_ephemeralrunnersets.yaml @@ -6957,9 +6957,14 @@ spec: - containers type: object type: object + patchID: + description: PatchID is the unique identifier for the patch issued by the listener app + type: integer replicas: description: Replicas is the number of desired EphemeralRunner resources in the k8s namespace. type: integer + required: + - patchID type: object status: description: EphemeralRunnerSetStatus defines the observed state of EphemeralRunnerSet diff --git a/cmd/ghalistener/app/app.go b/cmd/ghalistener/app/app.go index 2d903fa955..76fc38249d 100644 --- a/cmd/ghalistener/app/app.go +++ b/cmd/ghalistener/app/app.go @@ -34,7 +34,7 @@ type Listener interface { //go:generate mockery --name Worker --output ./mocks --outpkg mocks --case underscore type Worker interface { HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error - HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) + HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) } func New(config config.Config) (*App, error) { diff --git a/cmd/ghalistener/app/mocks/worker.go b/cmd/ghalistener/app/mocks/worker.go index 69828c383f..9f24819df1 100644 --- a/cmd/ghalistener/app/mocks/worker.go +++ b/cmd/ghalistener/app/mocks/worker.go @@ -15,23 +15,23 @@ type Worker struct { mock.Mock } -// HandleDesiredRunnerCount provides a mock function with given fields: ctx, count -func (_m *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) { - ret := _m.Called(ctx, count) +// HandleDesiredRunnerCount provides a mock function with given fields: ctx, count, acquireCount +func (_m *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, acquireCount int) (int, error) { + ret := _m.Called(ctx, count, acquireCount) var r0 int var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int) (int, error)); ok { - return rf(ctx, count) + if rf, ok := ret.Get(0).(func(context.Context, int, int) (int, error)); ok { + return rf(ctx, count, acquireCount) } - if rf, ok := ret.Get(0).(func(context.Context, int) int); ok { - r0 = rf(ctx, count) + if rf, ok := ret.Get(0).(func(context.Context, int, int) int); ok { + r0 = rf(ctx, count, acquireCount) } else { r0 = ret.Get(0).(int) } - if rf, ok := ret.Get(1).(func(context.Context, int) error); ok { - r1 = rf(ctx, count) + if rf, ok := ret.Get(1).(func(context.Context, int, int) error); ok { + r1 = rf(ctx, count, acquireCount) } else { r1 = ret.Error(1) } diff --git a/cmd/ghalistener/listener/listener.go b/cmd/ghalistener/listener/listener.go index 37aeac0bcb..c9fc680190 100644 --- a/cmd/ghalistener/listener/listener.go +++ b/cmd/ghalistener/listener/listener.go @@ -114,7 +114,7 @@ func New(config Config) (*Listener, error) { //go:generate mockery --name Handler --output ./mocks --outpkg mocks --case underscore type Handler interface { HandleJobStarted(ctx context.Context, jobInfo *actions.JobStarted) error - HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) + HandleDesiredRunnerCount(ctx context.Context, count, jobsCompleted int) (int, error) } // Listen listens for incoming messages and handles them using the provided handler. @@ -145,7 +145,7 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error { } l.metrics.PublishStatistics(initialMessage.Statistics) - desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, initialMessage.Statistics.TotalAssignedJobs) + desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, initialMessage.Statistics.TotalAssignedJobs, 0) if err != nil { return fmt.Errorf("handling initial message failed: %w", err) } @@ -207,7 +207,7 @@ func (l *Listener) handleMessage(ctx context.Context, handler Handler, msg *acti l.metrics.PublishJobStarted(jobStarted) } - desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, parsedMsg.statistics.TotalAssignedJobs) + desiredRunners, err := handler.HandleDesiredRunnerCount(ctx, parsedMsg.statistics.TotalAssignedJobs, len(parsedMsg.jobsCompleted)) if err != nil { return fmt.Errorf("failed to handle desired runner count: %w", err) } @@ -284,7 +284,6 @@ func (l *Listener) getMessage(ctx context.Context) (*actions.RunnerScaleSetMessa } return msg, nil - } func (l *Listener) deleteLastMessage(ctx context.Context) error { diff --git a/cmd/ghalistener/listener/listener_test.go b/cmd/ghalistener/listener/listener_test.go index c4d01bb6f8..610abc4084 100644 --- a/cmd/ghalistener/listener/listener_test.go +++ b/cmd/ghalistener/listener/listener_test.go @@ -427,7 +427,7 @@ func TestListener_Listen(t *testing.T) { var called bool handler := listenermocks.NewHandler(t) - handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything). + handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0). Return(0, nil). Run( func(mock.Arguments) { @@ -485,11 +485,11 @@ func TestListener_Listen(t *testing.T) { config.Client = client handler := listenermocks.NewHandler(t) - handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything). + handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0). Return(0, nil). Once() - handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything). + handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 0). Return(0, nil). Once() diff --git a/cmd/ghalistener/listener/metrics_test.go b/cmd/ghalistener/listener/metrics_test.go index 0f94c28d99..c333615d59 100644 --- a/cmd/ghalistener/listener/metrics_test.go +++ b/cmd/ghalistener/listener/metrics_test.go @@ -86,7 +86,7 @@ func TestInitialMetrics(t *testing.T) { config.Client = client handler := listenermocks.NewHandler(t) - handler.On("HandleDesiredRunnerCount", mock.Anything, sessionStatistics.TotalAssignedJobs). + handler.On("HandleDesiredRunnerCount", mock.Anything, sessionStatistics.TotalAssignedJobs, 0). Return(sessionStatistics.TotalAssignedJobs, nil). Once() @@ -178,7 +178,7 @@ func TestHandleMessageMetrics(t *testing.T) { handler := listenermocks.NewHandler(t) handler.On("HandleJobStarted", mock.Anything, jobsStarted[0]).Return(nil).Once() - handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).Return(desiredResult, nil).Once() + handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything, 2).Return(desiredResult, nil).Once() client := listenermocks.NewClient(t) client.On("DeleteMessage", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() diff --git a/cmd/ghalistener/listener/mocks/handler.go b/cmd/ghalistener/listener/mocks/handler.go index edc1b30b38..b910d79f9e 100644 --- a/cmd/ghalistener/listener/mocks/handler.go +++ b/cmd/ghalistener/listener/mocks/handler.go @@ -15,23 +15,23 @@ type Handler struct { mock.Mock } -// HandleDesiredRunnerCount provides a mock function with given fields: ctx, count -func (_m *Handler) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) { - ret := _m.Called(ctx, count) +// HandleDesiredRunnerCount provides a mock function with given fields: ctx, count, jobsCompleted +func (_m *Handler) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) { + ret := _m.Called(ctx, count, jobsCompleted) var r0 int var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int) (int, error)); ok { - return rf(ctx, count) + if rf, ok := ret.Get(0).(func(context.Context, int, int) (int, error)); ok { + return rf(ctx, count, jobsCompleted) } - if rf, ok := ret.Get(0).(func(context.Context, int) int); ok { - r0 = rf(ctx, count) + if rf, ok := ret.Get(0).(func(context.Context, int, int) int); ok { + r0 = rf(ctx, count, jobsCompleted) } else { r0 = ret.Get(0).(int) } - if rf, ok := ret.Get(1).(func(context.Context, int) error); ok { - r1 = rf(ctx, count) + if rf, ok := ret.Get(1).(func(context.Context, int, int) error); ok { + r1 = rf(ctx, count, jobsCompleted) } else { r1 = ret.Error(1) } diff --git a/cmd/ghalistener/worker/worker.go b/cmd/ghalistener/worker/worker.go index 9387a5657e..3622753524 100644 --- a/cmd/ghalistener/worker/worker.go +++ b/cmd/ghalistener/worker/worker.go @@ -38,18 +38,20 @@ type Config struct { // The Worker's role is to process the messages it receives from the listener. // It then initiates Kubernetes API requests to carry out the necessary actions. type Worker struct { - clientset *kubernetes.Clientset - config Config - lastPatch int - logger *logr.Logger + clientset *kubernetes.Clientset + config Config + lastPatch int + lastPatchID int + logger *logr.Logger } var _ listener.Handler = (*Worker)(nil) func New(config Config, options ...Option) (*Worker, error) { w := &Worker{ - config: config, - lastPatch: -1, + config: config, + lastPatch: -1, + lastPatchID: -1, } conf, err := rest.InClusterConfig() @@ -161,7 +163,7 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart // The function then scales the ephemeral runner set by applying the merge patch. // Finally, it logs the scaled ephemeral runner set details and returns nil if successful. // If any error occurs during the process, it returns an error with a descriptive message. -func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) { +func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) { // Max runners should always be set by the resource builder either to the configured value, // or the maximum int32 (resourcebuilder.newAutoScalingListener()). targetRunnerCount := min(w.config.MinRunners+count, w.config.MaxRunners) @@ -172,17 +174,22 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int, "min", w.config.MinRunners, "max", w.config.MaxRunners, "currentRunnerCount", w.lastPatch, + "jobsCompleted", jobsCompleted, } - if targetRunnerCount == w.lastPatch { - w.logger.Info("Skipping patching of EphemeralRunnerSet as the desired count has not changed", logValues...) + if w.lastPatch == targetRunnerCount && jobsCompleted == 0 { + w.logger.Info("Skipping patch", logValues...) return targetRunnerCount, nil } + w.lastPatchID++ + w.lastPatch = targetRunnerCount + original, err := json.Marshal( &v1alpha1.EphemeralRunnerSet{ Spec: v1alpha1.EphemeralRunnerSetSpec{ Replicas: -1, + PatchID: -1, }, }, ) @@ -194,6 +201,7 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int) (int, &v1alpha1.EphemeralRunnerSet{ Spec: v1alpha1.EphemeralRunnerSetSpec{ Replicas: targetRunnerCount, + PatchID: w.lastPatchID, }, }, ) diff --git a/config/crd/bases/actions.github.com_ephemeralrunnersets.yaml b/config/crd/bases/actions.github.com_ephemeralrunnersets.yaml index 58890ad220..62c75cef46 100644 --- a/config/crd/bases/actions.github.com_ephemeralrunnersets.yaml +++ b/config/crd/bases/actions.github.com_ephemeralrunnersets.yaml @@ -6957,9 +6957,14 @@ spec: - containers type: object type: object + patchID: + description: PatchID is the unique identifier for the patch issued by the listener app + type: integer replicas: description: Replicas is the number of desired EphemeralRunner resources in the k8s namespace. type: integer + required: + - patchID type: object status: description: EphemeralRunnerSetStatus defines the observed state of EphemeralRunnerSet diff --git a/controllers/actions.github.com/constants.go b/controllers/actions.github.com/constants.go index 8aef858a10..cbf3309c11 100644 --- a/controllers/actions.github.com/constants.go +++ b/controllers/actions.github.com/constants.go @@ -42,6 +42,7 @@ const AutoscalingRunnerSetCleanupFinalizerName = "actions.github.com/cleanup-pro const ( AnnotationKeyGitHubRunnerGroupName = "actions.github.com/runner-group-name" AnnotationKeyGitHubRunnerScaleSetName = "actions.github.com/runner-scale-set-name" + AnnotationKeyPatchID = "actions.github.com/patch-id" ) // Labels applied to listener roles diff --git a/controllers/actions.github.com/ephemeralrunner_controller.go b/controllers/actions.github.com/ephemeralrunner_controller.go index f8ce363502..e5f5b30ec1 100644 --- a/controllers/actions.github.com/ephemeralrunner_controller.go +++ b/controllers/actions.github.com/ephemeralrunner_controller.go @@ -133,6 +133,23 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, nil } + if ephemeralRunner.IsDone() { + log.Info("Cleaning up resources after after ephemeral runner termination", "phase", ephemeralRunner.Status.Phase) + done, err := r.cleanupResources(ctx, ephemeralRunner, log) + if err != nil { + log.Error(err, "Failed to clean up ephemeral runner owned resources") + return ctrl.Result{}, err + } + if !done { + log.Info("Waiting for ephemeral runner owned resources to be deleted") + return ctrl.Result{Requeue: true}, nil + } + // Stop reconciling on this object. + // The EphemeralRunnerSet is responsible for cleaning it up. + log.Info("EphemeralRunner has already finished. Stopping reconciliation and waiting for EphemeralRunnerSet to clean it up", "phase", ephemeralRunner.Status.Phase) + return ctrl.Result{}, nil + } + if !controllerutil.ContainsFinalizer(ephemeralRunner, ephemeralRunnerActionsFinalizerName) { log.Info("Adding runner registration finalizer") err := patch(ctx, r.Client, ephemeralRunner, func(obj *v1alpha1.EphemeralRunner) { @@ -159,13 +176,6 @@ func (r *EphemeralRunnerReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, nil } - if ephemeralRunner.Status.Phase == corev1.PodSucceeded || ephemeralRunner.Status.Phase == corev1.PodFailed { - // Stop reconciling on this object. - // The EphemeralRunnerSet is responsible for cleaning it up. - log.Info("EphemeralRunner has already finished. Stopping reconciliation and waiting for EphemeralRunnerSet to clean it up", "phase", ephemeralRunner.Status.Phase) - return ctrl.Result{}, nil - } - if ephemeralRunner.Status.RunnerId == 0 { log.Info("Creating new ephemeral runner registration and updating status with runner config") return r.updateStatusWithRunnerConfig(ctx, ephemeralRunner, log) @@ -324,7 +334,7 @@ func (r *EphemeralRunnerReconciler) cleanupResources(ctx context.Context, epheme } } return false, nil - case err != nil && !kerrors.IsNotFound(err): + case !kerrors.IsNotFound(err): return false, err } log.Info("Pod is deleted") @@ -341,7 +351,7 @@ func (r *EphemeralRunnerReconciler) cleanupResources(ctx context.Context, epheme } } return false, nil - case err != nil && !kerrors.IsNotFound(err): + case !kerrors.IsNotFound(err): return false, err } log.Info("Secret is deleted") diff --git a/controllers/actions.github.com/ephemeralrunnerset_controller.go b/controllers/actions.github.com/ephemeralrunnerset_controller.go index 4650f9fda6..12a22f1dc3 100644 --- a/controllers/actions.github.com/ephemeralrunnerset_controller.go +++ b/controllers/actions.github.com/ephemeralrunnerset_controller.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "sort" + "strconv" "strings" "github.com/actions/actions-runner-controller/apis/actions.github.com/v1alpha1" @@ -156,14 +157,14 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R return ctrl.Result{}, err } - pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners := categorizeEphemeralRunners(ephemeralRunnerList) + ephemeralRunnerState := newEphemeralRunnerState(ephemeralRunnerList) log.Info("Ephemeral runner counts", - "pending", len(pendingEphemeralRunners), - "running", len(runningEphemeralRunners), - "finished", len(finishedEphemeralRunners), - "failed", len(failedEphemeralRunners), - "deleting", len(deletingEphemeralRunners), + "pending", len(ephemeralRunnerState.pending), + "running", len(ephemeralRunnerState.running), + "finished", len(ephemeralRunnerState.finished), + "failed", len(ephemeralRunnerState.failed), + "deleting", len(ephemeralRunnerState.deleting), ) if r.PublishMetrics { @@ -183,54 +184,52 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R Organization: parsedURL.Organization, Enterprise: parsedURL.Enterprise, }, - len(pendingEphemeralRunners), - len(runningEphemeralRunners), - len(failedEphemeralRunners), + len(ephemeralRunnerState.pending), + len(ephemeralRunnerState.running), + len(ephemeralRunnerState.failed), ) } - // cleanup finished runners and proceed - var errs []error - for i := range finishedEphemeralRunners { - log.Info("Deleting finished ephemeral runner", "name", finishedEphemeralRunners[i].Name) - if err := r.Delete(ctx, finishedEphemeralRunners[i]); err != nil { - if !kerrors.IsNotFound(err) { - errs = append(errs, err) + total := ephemeralRunnerState.scaleTotal() + if ephemeralRunnerSet.Spec.PatchID == 0 || ephemeralRunnerSet.Spec.PatchID != ephemeralRunnerState.latestPatchID { + defer func() { + if err := r.cleanupFinishedEphemeralRunners(ctx, ephemeralRunnerState.finished, log); err != nil { + log.Error(err, "failed to cleanup finished ephemeral runners") + } + }() + + log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas) + switch { + case total < ephemeralRunnerSet.Spec.Replicas: // Handle scale up + count := ephemeralRunnerSet.Spec.Replicas - total + log.Info("Creating new ephemeral runners (scale up)", "count", count) + if err := r.createEphemeralRunners(ctx, ephemeralRunnerSet, count, log); err != nil { + log.Error(err, "failed to make ephemeral runner") + return ctrl.Result{}, err } - } - } - - if len(errs) > 0 { - mergedErrs := multierr.Combine(errs...) - log.Error(mergedErrs, "Failed to delete finished ephemeral runners") - return ctrl.Result{}, mergedErrs - } - - total := len(pendingEphemeralRunners) + len(runningEphemeralRunners) + len(failedEphemeralRunners) - log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas) - switch { - case total < ephemeralRunnerSet.Spec.Replicas: // Handle scale up - count := ephemeralRunnerSet.Spec.Replicas - total - log.Info("Creating new ephemeral runners (scale up)", "count", count) - if err := r.createEphemeralRunners(ctx, ephemeralRunnerSet, count, log); err != nil { - log.Error(err, "failed to make ephemeral runner") - return ctrl.Result{}, err - } - case total > ephemeralRunnerSet.Spec.Replicas: // Handle scale down scenario. - count := total - ephemeralRunnerSet.Spec.Replicas - log.Info("Deleting ephemeral runners (scale down)", "count", count) - if err := r.deleteIdleEphemeralRunners(ctx, ephemeralRunnerSet, pendingEphemeralRunners, runningEphemeralRunners, count, log); err != nil { - log.Error(err, "failed to delete idle runners") - return ctrl.Result{}, err + case total > ephemeralRunnerSet.Spec.Replicas: // Handle scale down scenario. + count := total - ephemeralRunnerSet.Spec.Replicas + log.Info("Deleting ephemeral runners (scale down)", "count", count) + if err := r.deleteIdleEphemeralRunners( + ctx, + ephemeralRunnerSet, + ephemeralRunnerState.pending, + ephemeralRunnerState.running, + count, + log, + ); err != nil { + log.Error(err, "failed to delete idle runners") + return ctrl.Result{}, err + } } } desiredStatus := v1alpha1.EphemeralRunnerSetStatus{ CurrentReplicas: total, - PendingEphemeralRunners: len(pendingEphemeralRunners), - RunningEphemeralRunners: len(runningEphemeralRunners), - FailedEphemeralRunners: len(failedEphemeralRunners), + PendingEphemeralRunners: len(ephemeralRunnerState.pending), + RunningEphemeralRunners: len(ephemeralRunnerState.running), + FailedEphemeralRunners: len(ephemeralRunnerState.failed), } // Update the status if needed. @@ -247,6 +246,21 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R return ctrl.Result{}, nil } +func (r *EphemeralRunnerSetReconciler) cleanupFinishedEphemeralRunners(ctx context.Context, finishedEphemeralRunners []*v1alpha1.EphemeralRunner, log logr.Logger) error { + // cleanup finished runners and proceed + var errs []error + for i := range finishedEphemeralRunners { + log.Info("Deleting finished ephemeral runner", "name", finishedEphemeralRunners[i].Name) + if err := r.Delete(ctx, finishedEphemeralRunners[i]); err != nil { + if !kerrors.IsNotFound(err) { + errs = append(errs, err) + } + } + } + + return multierr.Combine(errs...) +} + func (r *EphemeralRunnerSetReconciler) cleanUpProxySecret(ctx context.Context, ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet, log logr.Logger) error { if ephemeralRunnerSet.Spec.EphemeralRunnerSpec.Proxy == nil { return nil @@ -284,19 +298,19 @@ func (r *EphemeralRunnerSetReconciler) cleanUpEphemeralRunners(ctx context.Conte return true, nil } - pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners := categorizeEphemeralRunners(ephemeralRunnerList) + ephemeralRunnerState := newEphemeralRunnerState(ephemeralRunnerList) log.Info("Clean up runner counts", - "pending", len(pendingEphemeralRunners), - "running", len(runningEphemeralRunners), - "finished", len(finishedEphemeralRunners), - "failed", len(failedEphemeralRunners), - "deleting", len(deletingEphemeralRunners), + "pending", len(ephemeralRunnerState.pending), + "running", len(ephemeralRunnerState.running), + "finished", len(ephemeralRunnerState.finished), + "failed", len(ephemeralRunnerState.failed), + "deleting", len(ephemeralRunnerState.deleting), ) log.Info("Cleanup finished or failed ephemeral runners") var errs []error - for _, ephemeralRunner := range append(finishedEphemeralRunners, failedEphemeralRunners...) { + for _, ephemeralRunner := range append(ephemeralRunnerState.finished, ephemeralRunnerState.failed...) { log.Info("Deleting ephemeral runner", "name", ephemeralRunner.Name) if err := r.Delete(ctx, ephemeralRunner); err != nil && !kerrors.IsNotFound(err) { errs = append(errs, err) @@ -310,7 +324,7 @@ func (r *EphemeralRunnerSetReconciler) cleanUpEphemeralRunners(ctx context.Conte } // avoid fetching the client if we have nothing left to do - if len(runningEphemeralRunners) == 0 && len(pendingEphemeralRunners) == 0 { + if len(ephemeralRunnerState.running) == 0 && len(ephemeralRunnerState.pending) == 0 { return false, nil } @@ -321,7 +335,7 @@ func (r *EphemeralRunnerSetReconciler) cleanUpEphemeralRunners(ctx context.Conte log.Info("Cleanup pending or running ephemeral runners") errs = errs[0:0] - for _, ephemeralRunner := range append(pendingEphemeralRunners, runningEphemeralRunners...) { + for _, ephemeralRunner := range append(ephemeralRunnerState.pending, ephemeralRunnerState.running...) { log.Info("Removing the ephemeral runner from the service", "name", ephemeralRunner.Name) _, err := r.deleteEphemeralRunnerWithActionsClient(ctx, ephemeralRunner, actionsClient, log) if err != nil { @@ -427,12 +441,13 @@ func (r *EphemeralRunnerSetReconciler) deleteIdleEphemeralRunners(ctx context.Co deletedCount := 0 for runners.next() { ephemeralRunner := runners.object() - if ephemeralRunner.Status.RunnerId == 0 { + isDone := ephemeralRunner.IsDone() + if !isDone && ephemeralRunner.Status.RunnerId == 0 { log.Info("Skipping ephemeral runner since it is not registered yet", "name", ephemeralRunner.Name) continue } - if ephemeralRunner.Status.JobRequestId > 0 { + if !isDone && ephemeralRunner.Status.JobRequestId > 0 { log.Info("Skipping ephemeral runner since it is running a job", "name", ephemeralRunner.Name, "jobRequestId", ephemeralRunner.Status.JobRequestId) continue } @@ -580,16 +595,22 @@ type ephemeralRunnerStepper struct { index int } -func newEphemeralRunnerStepper(pending, running []*v1alpha1.EphemeralRunner) *ephemeralRunnerStepper { - sort.Slice(pending, func(i, j int) bool { - return pending[i].GetCreationTimestamp().Time.Before(pending[j].GetCreationTimestamp().Time) - }) - sort.Slice(running, func(i, j int) bool { - return running[i].GetCreationTimestamp().Time.Before(running[j].GetCreationTimestamp().Time) +func newEphemeralRunnerStepper(primary []*v1alpha1.EphemeralRunner, othersOrdered ...[]*v1alpha1.EphemeralRunner) *ephemeralRunnerStepper { + sort.Slice(primary, func(i, j int) bool { + return primary[i].GetCreationTimestamp().Time.Before(primary[j].GetCreationTimestamp().Time) }) + for _, bucket := range othersOrdered { + sort.Slice(bucket, func(i, j int) bool { + return bucket[i].GetCreationTimestamp().Time.Before(bucket[j].GetCreationTimestamp().Time) + }) + } + + for _, bucket := range othersOrdered { + primary = append(primary, bucket...) + } return &ephemeralRunnerStepper{ - items: append(pending, running...), + items: primary, index: -1, } } @@ -613,28 +634,48 @@ func (s *ephemeralRunnerStepper) len() int { return len(s.items) } -func categorizeEphemeralRunners(ephemeralRunnerList *v1alpha1.EphemeralRunnerList) (pendingEphemeralRunners, runningEphemeralRunners, finishedEphemeralRunners, failedEphemeralRunners, deletingEphemeralRunners []*v1alpha1.EphemeralRunner) { +type ephemeralRunnerState struct { + pending []*v1alpha1.EphemeralRunner + running []*v1alpha1.EphemeralRunner + finished []*v1alpha1.EphemeralRunner + failed []*v1alpha1.EphemeralRunner + deleting []*v1alpha1.EphemeralRunner + + latestPatchID int +} + +func newEphemeralRunnerState(ephemeralRunnerList *v1alpha1.EphemeralRunnerList) *ephemeralRunnerState { + var ephemeralRunnerState ephemeralRunnerState + for i := range ephemeralRunnerList.Items { r := &ephemeralRunnerList.Items[i] + patchID, err := strconv.Atoi(r.Annotations[AnnotationKeyPatchID]) + if err == nil && patchID > ephemeralRunnerState.latestPatchID { + ephemeralRunnerState.latestPatchID = patchID + } if !r.ObjectMeta.DeletionTimestamp.IsZero() { - deletingEphemeralRunners = append(deletingEphemeralRunners, r) + ephemeralRunnerState.deleting = append(ephemeralRunnerState.deleting, r) continue } switch r.Status.Phase { case corev1.PodRunning: - runningEphemeralRunners = append(runningEphemeralRunners, r) + ephemeralRunnerState.running = append(ephemeralRunnerState.running, r) case corev1.PodSucceeded: - finishedEphemeralRunners = append(finishedEphemeralRunners, r) + ephemeralRunnerState.finished = append(ephemeralRunnerState.finished, r) case corev1.PodFailed: - failedEphemeralRunners = append(failedEphemeralRunners, r) + ephemeralRunnerState.failed = append(ephemeralRunnerState.failed, r) default: // Pending or no phase should be considered as pending. // // If field is not set, that means that the EphemeralRunner // did not yet have chance to update the Status.Phase field. - pendingEphemeralRunners = append(pendingEphemeralRunners, r) + ephemeralRunnerState.pending = append(ephemeralRunnerState.pending, r) } } - return + return &ephemeralRunnerState +} + +func (s *ephemeralRunnerState) scaleTotal() int { + return len(s.pending) + len(s.running) + len(s.failed) } diff --git a/controllers/actions.github.com/ephemeralrunnerset_controller_test.go b/controllers/actions.github.com/ephemeralrunnerset_controller_test.go index d1477596f7..271a352879 100644 --- a/controllers/actions.github.com/ephemeralrunnerset_controller_test.go +++ b/controllers/actions.github.com/ephemeralrunnerset_controller_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "encoding/base64" + "errors" "fmt" "net/http" "net/http/httptest" @@ -274,14 +275,17 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { }) Context("When a new EphemeralRunnerSet scale up and down", func() { - It("It should delete finished EphemeralRunner and create new EphemeralRunner", func() { + It("Should scale only on patch ID change", func() { created := new(actionsv1alpha1.EphemeralRunnerSet) err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, created) Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + patchID := 1 + // Scale up the EphemeralRunnerSet updated := created.DeepCopy() updated.Spec.Replicas = 5 + updated.Spec.PatchID = patchID err = k8sClient.Update(ctx, updated) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") @@ -317,7 +321,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") // Mark one of the EphemeralRunner as finished finishedRunner := runnerList.Items[4].DeepCopy() @@ -325,7 +330,7 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { err = k8sClient.Status().Patch(ctx, finishedRunner, client.MergeFrom(&runnerList.Items[4])) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - // Wait for the finished EphemeralRunner to be deleted + // Wait for the finished EphemeralRunner to be set to succeeded Eventually( func() error { runnerList := new(actionsv1alpha1.EphemeralRunnerList) @@ -335,17 +340,35 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { } for _, runner := range runnerList.Items { - if runner.Name == finishedRunner.Name { - return fmt.Errorf("EphemeralRunner is not deleted") + if runner.Name != finishedRunner.Name { + continue } + + if runner.Status.Phase != corev1.PodSucceeded { + return fmt.Errorf("EphemeralRunner is not finished") + } + // found pod succeeded + return nil } - return nil + return errors.New("Finished ephemeral runner is not found") }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(Succeed(), "Finished EphemeralRunner should be deleted") + ephemeralRunnerSetTestInterval, + ).Should(Succeed(), "Finished EphemeralRunner should be deleted") - // We should still have the EphemeralRunnerSet scale up + // After one ephemeral runner is finished, simulate job done patch + patchID++ + original := new(actionsv1alpha1.EphemeralRunnerSet) + err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + updated = original.DeepCopy() + updated.Spec.PatchID = patchID + updated.Spec.Replicas = 4 + err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") + + // Only finished ephemeral runner should be deleted runnerList = new(actionsv1alpha1.EphemeralRunnerList) Eventually( func() (int, error) { @@ -354,35 +377,27 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return -1, err } - // Set status to simulate a configured EphemeralRunner - refetch := false - for i, runner := range runnerList.Items { - if runner.Status.RunnerId == 0 { - updatedRunner := runner.DeepCopy() - updatedRunner.Status.Phase = corev1.PodRunning - updatedRunner.Status.RunnerId = i + 100 - err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner)) - Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - refetch = true - } - } - - if refetch { - err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) - if err != nil { - return -1, err + for _, runner := range runnerList.Items { + if runner.Status.Phase == corev1.PodSucceeded { + return -1, fmt.Errorf("Finished EphemeralRunner should be deleted") } } return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(4), "4 EphemeralRunner should be created") - // Scale down the EphemeralRunnerSet - updated = created.DeepCopy() + // Scaling down the EphemeralRunnerSet + patchID++ + original = new(actionsv1alpha1.EphemeralRunnerSet) + err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + updated = original.DeepCopy() + updated.Spec.PatchID = patchID updated.Spec.Replicas = 3 - err = k8sClient.Patch(ctx, updated, client.MergeFrom(created)) + err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") // Wait for the EphemeralRunnerSet to be scaled down @@ -417,7 +432,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(3), "3 EphemeralRunner should be created") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(3), "3 EphemeralRunner should be created") // We will not scale down runner that is running jobs runningRunner := runnerList.Items[0].DeepCopy() @@ -430,10 +446,15 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { err = k8sClient.Status().Patch(ctx, runningRunner, client.MergeFrom(&runnerList.Items[1])) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - // Scale down to 1 - updated = created.DeepCopy() + // Scale down to 1 while 2 are running + patchID++ + original = new(actionsv1alpha1.EphemeralRunnerSet) + err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + updated = original.DeepCopy() + updated.Spec.PatchID = patchID updated.Spec.Replicas = 1 - err = k8sClient.Patch(ctx, updated, client.MergeFrom(created)) + err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") // Wait for the EphemeralRunnerSet to be scaled down to 2 since we still have 2 runner running jobs @@ -468,7 +489,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") // We will not scale down failed runner failedRunner := runnerList.Items[0].DeepCopy() @@ -476,15 +498,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { err = k8sClient.Status().Patch(ctx, failedRunner, client.MergeFrom(&runnerList.Items[0])) Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") - // Scale down to 0 - updated = created.DeepCopy() - updated.Spec.Replicas = 0 - err = k8sClient.Patch(ctx, updated, client.MergeFrom(created)) - Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") - - // We should not scale down the EphemeralRunnerSet since we still have 1 runner running job and 1 failed runner runnerList = new(actionsv1alpha1.EphemeralRunnerList) - Consistently( + Eventually( func() (int, error) { err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) if err != nil { @@ -514,7 +529,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") // We will scale down to 0 when the running job is completed and the failed runner is deleted runningRunner = runnerList.Items[1].DeepCopy() @@ -525,6 +541,17 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { err = k8sClient.Delete(ctx, &runnerList.Items[0]) Expect(err).NotTo(HaveOccurred(), "failed to delete EphemeralRunner") + // Scale down to 0 while 1 ephemeral runner is failed + patchID++ + original = new(actionsv1alpha1.EphemeralRunnerSet) + err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original) + Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") + updated = original.DeepCopy() + updated.Spec.PatchID = patchID + updated.Spec.Replicas = 0 + err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) + Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") + // Wait for the EphemeralRunnerSet to be scaled down to 0 runnerList = new(actionsv1alpha1.EphemeralRunnerList) Eventually( @@ -557,7 +584,8 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() { return len(runnerList.Items), nil }, ephemeralRunnerSetTestTimeout, - ephemeralRunnerSetTestInterval).Should(BeEquivalentTo(0), "0 EphemeralRunner should be created") + ephemeralRunnerSetTestInterval, + ).Should(BeEquivalentTo(0), "0 EphemeralRunner should be created") }) It("Should update status on Ephemeral Runner state changes", func() { diff --git a/controllers/actions.github.com/resourcebuilder.go b/controllers/actions.github.com/resourcebuilder.go index 9ab5e21890..f6e54b2948 100644 --- a/controllers/actions.github.com/resourcebuilder.go +++ b/controllers/actions.github.com/resourcebuilder.go @@ -563,6 +563,7 @@ func (b *resourceBuilder) newEphemeralRunner(ephemeralRunnerSet *v1alpha1.Epheme for key, val := range ephemeralRunnerSet.Annotations { annotations[key] = val } + annotations[AnnotationKeyPatchID] = strconv.Itoa(ephemeralRunnerSet.Spec.PatchID) return &v1alpha1.EphemeralRunner{ TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{