diff --git a/api/etok.dev/v1alpha1/run_types.go b/api/etok.dev/v1alpha1/run_types.go index 130d19a2..6a4a1c04 100644 --- a/api/etok.dev/v1alpha1/run_types.go +++ b/api/etok.dev/v1alpha1/run_types.go @@ -100,6 +100,13 @@ func (r *Run) PodName() string { return r.Name } type RunStatus struct { // Current phase of the run's lifecycle. Phase RunPhase `json:"phase,omitempty"` + + // True if resource has been reconciled at least once. + Reconciled bool `json:"reconciled,omitempty"` +} + +func (r *Run) IsReconciled() bool { + return r.RunStatus.Reconciled } type RunPhase string diff --git a/api/etok.dev/v1alpha1/workspace_types.go b/api/etok.dev/v1alpha1/workspace_types.go index 13f99ccf..b0deed45 100644 --- a/api/etok.dev/v1alpha1/workspace_types.go +++ b/api/etok.dev/v1alpha1/workspace_types.go @@ -124,6 +124,13 @@ type WorkspaceStatus struct { // Lifecycle phase of workspace. Phase WorkspacePhase `json:"phase,omitempty"` + + // True if resource has been reconciled at least once. + Reconciled bool `json:"reconciled,omitempty"` +} + +func (ws *Workspace) IsReconciled() bool { + return ws.Status.Reconciled } type WorkspacePhase string diff --git a/cmd/launcher/commands.go b/cmd/launcher/commands.go index 3324f402..1523cfe0 100644 --- a/cmd/launcher/commands.go +++ b/cmd/launcher/commands.go @@ -112,6 +112,8 @@ func (rc runCommand) cobraCommand(opts *cmdutil.Options, o *LauncherOptions) *co cmd.Flags().DurationVar(&o.EnqueueTimeout, "enqueue-timeout", 10*time.Second, "timeout waiting to be queued") cmd.Flags().StringVar(&namespacedWorkspace, "workspace", defaultWorkspace, "etok workspace") + cmd.Flags().DurationVar(&o.ReconcileTimeout, "reconcile-timeout", defaultReconcileTimeout, "timeout for resource to be reconciled") + return cmd } diff --git a/cmd/launcher/launcher.go b/cmd/launcher/launcher.go index 0a8bc278..275ddd36 100644 --- a/cmd/launcher/launcher.go +++ b/cmd/launcher/launcher.go @@ -33,13 +33,15 @@ import ( ) const ( - defaultWorkspace = "default/default" + defaultWorkspace = "default/default" + defaultReconcileTimeout = 10 * time.Second ) var ( errNotAuthorised = errors.New("you are not authorised") errEnqueueTimeout = errors.New("timed out waiting for run to be enqueued") errWorkspaceNotFound = errors.New("workspace not found") + errReconcileTimeout = errors.New("timed out waiting for run to be reconciled") ) // LauncherOptions deploys a new Run. It monitors not only its progress, but @@ -78,6 +80,8 @@ type LauncherOptions struct { PodTimeout time.Duration // timeout waiting to be queued EnqueueTimeout time.Duration + // Timeout for resource to be reconciled (at least once) + ReconcileTimeout time.Duration // Disable TTY detection DisableTTY bool @@ -85,6 +89,9 @@ type LauncherOptions struct { // Recall if resources are created so that if error occurs they can be cleaned up createdRun bool createdArchive bool + + // For testing purposes toggle obj having been reconciled + reconciled bool } func (o *LauncherOptions) lookupEnvFile(cmd *cobra.Command) error { @@ -123,6 +130,14 @@ func (o *LauncherOptions) Run(ctx context.Context) error { g, gctx := errgroup.WithContext(ctx) + // Wait for resource to have been successfully reconciled at least once + // within the ReconcileTimeout (If we don't do this and the operator is + // either not installed or malfunctioning then the user would be none the + // wiser until the much longer PodTimeout had expired). + g.Go(func() error { + return o.waitForReconcile(gctx, run) + }) + // Wait for pod and when ready send pod on chan podch := make(chan *corev1.Pod, 1) g.Go(func() error { @@ -133,7 +148,7 @@ func (o *LauncherOptions) Run(ctx context.Context) error { // Only commands other than plan are queued - wait for run to be // enqueued g.Go(func() error { - return o.waitForEnqueued(ctx, run) + return o.waitForEnqueued(gctx, run) }) } @@ -338,6 +353,9 @@ func (o *LauncherOptions) createRun(ctx context.Context, name, configMapName str run.Verbosity = o.Verbosity + // For testing purposes mimic obj having been reconciled + run.Reconciled = o.reconciled + if isTTY { run.AttachSpec.Handshake = true run.AttachSpec.HandshakeTimeout = o.HandshakeTimeout.String() @@ -384,3 +402,21 @@ func (o *LauncherOptions) createConfigMap(ctx context.Context, tarball []byte, n return nil } + +// waitForReconcile waits for the workspace resource to be reconciled. +func (o *LauncherOptions) waitForReconcile(ctx context.Context, run *v1alpha1.Run) error { + lw := &k8s.RunListWatcher{Client: o.EtokClient, Name: run.Name, Namespace: run.Namespace} + hdlr := handlers.Reconciled(run) + + ctx, cancel := context.WithTimeout(ctx, o.ReconcileTimeout) + defer cancel() + + _, err := watchtools.UntilWithSync(ctx, lw, &v1alpha1.Run{}, nil, hdlr) + if err != nil { + if errors.Is(err, wait.ErrWaitTimeout) { + return errReconcileTimeout + } + return err + } + return nil +} diff --git a/cmd/launcher/launcher_test.go b/cmd/launcher/launcher_test.go index dacb9e06..ddcfc2d9 100644 --- a/cmd/launcher/launcher_test.go +++ b/cmd/launcher/launcher_test.go @@ -43,9 +43,11 @@ func TestLauncher(t *testing.T) { // Size of content to be archived size int // Mock exit code of runner pod - code int32 - setOpts func(*cmdutil.Options) - assertions func(*LauncherOptions) + code int32 + // Toggle mocking a successful reconcile status + disableMockReconcile bool + setOpts func(*cmdutil.Options) + assertions func(*LauncherOptions) }{ { name: "plan", @@ -260,6 +262,13 @@ func TestLauncher(t *testing.T) { size: 1024*1024 + 1, err: archive.MaxSizeError(archive.MaxConfigSize), }, + { + name: "reconcile timeout exceeded", + args: []string{"--reconcile-timeout", "10ms"}, + objs: []runtime.Object{testobj.Workspace("default", "default")}, + disableMockReconcile: true, + err: errReconcileTimeout, + }, } // Run tests for each command @@ -285,6 +294,11 @@ func TestLauncher(t *testing.T) { cmdOpts := &LauncherOptions{RunName: "run-12345"} + if !tt.disableMockReconcile { + // Mock successful reconcile + cmdOpts.reconciled = true + } + // create cobra command cmd := rc.cobraCommand(opts, cmdOpts) cmd.SetOut(out) diff --git a/cmd/workspace/workspace_new.go b/cmd/workspace/workspace_new.go index d85e8ed7..3e54688e 100644 --- a/cmd/workspace/workspace_new.go +++ b/cmd/workspace/workspace_new.go @@ -16,6 +16,7 @@ import ( "github.com/leg100/etok/pkg/labels" "github.com/leg100/etok/pkg/monitors" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" "github.com/leg100/etok/pkg/env" "github.com/leg100/etok/pkg/logstreamer" @@ -27,7 +28,7 @@ import ( ) const ( - defaultTimeoutWorkspace = 10 * time.Second + defaultReconcileTimeout = 10 * time.Second defaultPodTimeout = 60 * time.Second defaultCacheSize = "1Gi" defaultSecretName = "etok" @@ -35,7 +36,8 @@ const ( ) var ( - errPodTimeout = errors.New("timed out waiting for pod to be ready") + errPodTimeout = errors.New("timed out waiting for pod to be ready") + errReconcileTimeout = errors.New("timed out waiting for workspace to be reconciled") ) type NewOptions struct { @@ -54,8 +56,9 @@ type NewOptions struct { DisableCreateServiceAccount bool // Create a secret if it does not exist DisableCreateSecret bool - // Timeout for workspace to be healthy - TimeoutWorkspace time.Duration + + // Timeout for resource to be reconciled (at least once) + ReconcileTimeout time.Duration // Timeout for workspace pod to be ready PodTimeout time.Duration @@ -68,6 +71,9 @@ type NewOptions struct { createdWorkspace bool createdServiceAccount bool createdSecret bool + + // For testing purposes toggle obj having been reconciled + reconciled bool } func NewCmd(opts *cmdutil.Options) (*cobra.Command, *NewOptions) { @@ -119,7 +125,7 @@ func NewCmd(opts *cmdutil.Options) (*cobra.Command, *NewOptions) { // that so use empty string and override later (see above) o.WorkspaceSpec.Cache.StorageClass = cmd.Flags().String("storage-class", "", "StorageClass of PersistentVolume for cache") - cmd.Flags().DurationVar(&o.TimeoutWorkspace, "timeout", defaultTimeoutWorkspace, "Time to wait for workspace to be healthy") + cmd.Flags().DurationVar(&o.ReconcileTimeout, "reconcile-timeout", defaultReconcileTimeout, "timeout for resource to be reconciled") cmd.Flags().DurationVar(&o.PodTimeout, "pod-timeout", defaultPodTimeout, "timeout for pod to be ready") cmd.Flags().StringSliceVar(&o.WorkspaceSpec.PrivilegedCommands, "privileged-commands", []string{}, "Set privileged commands") @@ -151,13 +157,31 @@ func (o *NewOptions) Run(ctx context.Context) error { o.createdWorkspace = true fmt.Printf("Created workspace %s\n", o.name()) - // Wait until container can be streamed from + g, gctx := errgroup.WithContext(ctx) + fmt.Println("Waiting for workspace pod to be ready...") - pod, err := o.waitForContainer(ctx, ws) - if err != nil { + podch := make(chan *corev1.Pod, 1) + g.Go(func() error { + return o.waitForContainer(gctx, ws, podch) + }) + + // Wait for resource to have been successfully reconciled at least once + // within the ReconcileTimeout (If we don't do this and the operator is + // either not installed or malfunctioning then the user would be none the + // wiser until the much longer PodTimeout had expired). + g.Go(func() error { + return o.waitForReconcile(gctx, ws) + }) + + // Wait for both workspace to have been reconciled and for its pod container + // to be ready + if err := g.Wait(); err != nil { return err } + // Receive ready pod + pod := <-podch + // Monitor exit code; non-blocking exit := monitors.ExitMonitor(ctx, o.KubeClient, pod.Name, pod.Namespace, controllers.InstallerContainerName) @@ -217,6 +241,9 @@ func (o *NewOptions) createWorkspace(ctx context.Context) (*v1alpha1.Workspace, ws.Spec.Verbosity = o.Verbosity + // For testing purposes mimic obj having been reconciled + ws.Status.Reconciled = o.reconciled + return o.WorkspacesClient(o.Namespace).Create(ctx, ws, metav1.CreateOptions{}) } @@ -294,7 +321,7 @@ func (o *NewOptions) createServiceAccount(ctx context.Context, name string) (*co // waitForContainer returns true once the installer container can be streamed // from -func (o *NewOptions) waitForContainer(ctx context.Context, ws *v1alpha1.Workspace) (*corev1.Pod, error) { +func (o *NewOptions) waitForContainer(ctx context.Context, ws *v1alpha1.Workspace, podch chan<- *corev1.Pod) error { lw := &k8s.PodListWatcher{Client: o.KubeClient, Name: ws.PodName(), Namespace: ws.Namespace} hdlr := handlers.ContainerReady(ws.PodName(), controllers.InstallerContainerName, true, false) @@ -304,9 +331,28 @@ func (o *NewOptions) waitForContainer(ctx context.Context, ws *v1alpha1.Workspac event, err := watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, hdlr) if err != nil { if errors.Is(err, wait.ErrWaitTimeout) { - return nil, errPodTimeout + return errPodTimeout } - return nil, err + return err } - return event.Object.(*corev1.Pod), err + podch <- event.Object.(*corev1.Pod) + return nil +} + +// waitForReconcile waits for the workspace resource to be reconciled. +func (o *NewOptions) waitForReconcile(ctx context.Context, ws *v1alpha1.Workspace) error { + lw := &k8s.WorkspaceListWatcher{Client: o.EtokClient, Name: ws.Name, Namespace: ws.Namespace} + hdlr := handlers.Reconciled(ws) + + ctx, cancel := context.WithTimeout(ctx, o.ReconcileTimeout) + defer cancel() + + _, err := watchtools.UntilWithSync(ctx, lw, &v1alpha1.Workspace{}, nil, hdlr) + if err != nil { + if errors.Is(err, wait.ErrWaitTimeout) { + return errReconcileTimeout + } + return err + } + return nil } diff --git a/cmd/workspace/workspace_new_test.go b/cmd/workspace/workspace_new_test.go index b945f9fc..e5d26da0 100644 --- a/cmd/workspace/workspace_new_test.go +++ b/cmd/workspace/workspace_new_test.go @@ -27,12 +27,14 @@ func TestNewWorkspace(t *testing.T) { var fakeError = errors.New("fake error") tests := []struct { - name string - args []string - err func(*testutil.T, error) - objs []runtime.Object - setOpts func(*cmdutil.Options) - assertions func(*NewOptions) + name string + args []string + err func(*testutil.T, error) + // Toggle mocking a successful reconcile status + disableMockReconcile bool + objs []runtime.Object + setOpts func(*cmdutil.Options) + assertions func(*NewOptions) }{ { name: "missing workspace name", @@ -260,6 +262,14 @@ func TestNewWorkspace(t *testing.T) { assert.Equal(t, []string{"apply", "destroy", "sh"}, ws.Spec.PrivilegedCommands) }, }, + { + name: "reconcile timeout exceeded", + args: []string{"default/foo", "--reconcile-timeout", "10ms"}, + disableMockReconcile: true, + err: func(t *testutil.T, err error) { + assert.True(t, errors.Is(err, errReconcileTimeout)) + }, + }, { name: "pod timeout exceeded", args: []string{"default/foo", "--pod-timeout", "10ms"}, @@ -289,6 +299,11 @@ func TestNewWorkspace(t *testing.T) { path := t.NewTempDir().Chdir().Root() cmdOpts.Path = path + if !tt.disableMockReconcile { + // Mock successful reconcile + cmdOpts.reconciled = true + } + err = cmd.ExecuteContext(context.Background()) if tt.err != nil { tt.err(t, err) diff --git a/config/crd/bases/etok.dev_all.yaml b/config/crd/bases/etok.dev_all.yaml index aac75c61..4e8cbcf2 100644 --- a/config/crd/bases/etok.dev_all.yaml +++ b/config/crd/bases/etok.dev_all.yaml @@ -95,6 +95,9 @@ spec: phase: description: Current phase of the run's lifecycle. type: string + reconciled: + description: True if resource has been reconciled at least once. + type: boolean type: object type: object served: true @@ -211,6 +214,9 @@ spec: items: type: string type: array + reconciled: + description: True if resource has been reconciled at least once. + type: boolean type: object type: object served: true diff --git a/config/crd/bases/etok.dev_runs.yaml b/config/crd/bases/etok.dev_runs.yaml index 92770267..e4cda02d 100644 --- a/config/crd/bases/etok.dev_runs.yaml +++ b/config/crd/bases/etok.dev_runs.yaml @@ -95,6 +95,9 @@ spec: phase: description: Current phase of the run's lifecycle. type: string + reconciled: + description: True if resource has been reconciled at least once. + type: boolean type: object type: object served: true diff --git a/config/crd/bases/etok.dev_workspaces.yaml b/config/crd/bases/etok.dev_workspaces.yaml index 2e82a0e4..2f92754b 100644 --- a/config/crd/bases/etok.dev_workspaces.yaml +++ b/config/crd/bases/etok.dev_workspaces.yaml @@ -101,6 +101,9 @@ spec: items: type: string type: array + reconciled: + description: True if resource has been reconciled at least once. + type: boolean type: object type: object served: true diff --git a/pkg/controllers/run_controller.go b/pkg/controllers/run_controller.go index 2a07e649..9f031373 100644 --- a/pkg/controllers/run_controller.go +++ b/pkg/controllers/run_controller.go @@ -56,7 +56,7 @@ func (r *RunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R // Run completed, nothing more to be done if run.Phase == v1alpha1.RunPhaseCompleted { - return ctrl.Result{}, nil + return r.success(ctx, log, &run) } // Fetch its Workspace object @@ -106,7 +106,7 @@ func (r *RunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R } } // Go no further - return ctrl.Result{}, nil + return r.success(ctx, log, &run) } // Front of queue, so continue reconciliation @@ -161,6 +161,18 @@ func (r *RunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R } } + return r.success(ctx, log, &run) +} + +// success marks a successful reconcile +func (r *RunReconciler) success(ctx context.Context, log logr.Logger, run *v1alpha1.Run) (ctrl.Result, error) { + if !run.Reconciled { + run.Reconciled = true + if err := r.Status().Update(ctx, run); err != nil { + log.Error(err, "unable to update status") + return ctrl.Result{}, err + } + } return ctrl.Result{}, nil } diff --git a/pkg/controllers/workspace_controller.go b/pkg/controllers/workspace_controller.go index 76f5fb61..e93e9cec 100644 --- a/pkg/controllers/workspace_controller.go +++ b/pkg/controllers/workspace_controller.go @@ -94,7 +94,7 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } // Cease reconciliation - return ctrl.Result{}, nil + return r.success(ctx, log, &ws) } // Manage PVC for workspace @@ -176,6 +176,18 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } } + return r.success(ctx, log, &ws) +} + +// success marks a successful reconcile +func (r *WorkspaceReconciler) success(ctx context.Context, log logr.Logger, ws *v1alpha1.Workspace) (ctrl.Result, error) { + if !ws.Status.Reconciled { + ws.Status.Reconciled = true + if err := r.Status().Update(ctx, ws); err != nil { + log.Error(err, "unable to update status") + return ctrl.Result{}, err + } + } return ctrl.Result{}, nil } diff --git a/pkg/handlers/errors.go b/pkg/handlers/errors.go new file mode 100644 index 00000000..3284bef6 --- /dev/null +++ b/pkg/handlers/errors.go @@ -0,0 +1,5 @@ +package handlers + +import "errors" + +var ErrResourceUnexpectedlyDeleted = errors.New("resource unexpectedly deleted") diff --git a/pkg/handlers/object.go b/pkg/handlers/object.go new file mode 100644 index 00000000..7433d3ab --- /dev/null +++ b/pkg/handlers/object.go @@ -0,0 +1,46 @@ +package handlers + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + watchtools "k8s.io/client-go/tools/watch" +) + +// reconcilable is a kubernetes resource that can state whether it has been +// reconciled or not. +type reconcilable interface { + metav1.Object + // IsReconciled is true if obj has been reconciled at least once by an + // operator. + IsReconciled() bool +} + +// Handler that returns true when a resource has been reconciled. Reconciled +// here means it has been reconciled at least once. This handler is useful to +// determine if an operator is functioning. +func Reconciled(obj reconcilable) watchtools.ConditionFunc { + return func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + return false, ErrResourceUnexpectedlyDeleted + } + + eventObj, ok := event.Object.(reconcilable) + if !ok { + // Skip non-reconcilable objects + return false, nil + } + + if eventObj.GetName() != obj.GetName() { + return false, nil + } + + if eventObj.IsReconciled() { + // Success: resource has been reconciled + return true, nil + } + + // Obj is yet to be reconciled + return false, nil + } +} diff --git a/pkg/handlers/pod.go b/pkg/handlers/pod.go index 8deebbb0..76fad4c2 100644 --- a/pkg/handlers/pod.go +++ b/pkg/handlers/pod.go @@ -116,7 +116,7 @@ func PodHandlerWrapper(name string, h podHandler) watchtools.ConditionFunc { } if event.Type == watch.Deleted { - return false, fmt.Errorf("pod was unexpectedly deleted") + return false, ErrResourceUnexpectedlyDeleted } if pod.Status.Phase != phase { diff --git a/pkg/handlers/run.go b/pkg/handlers/run.go index db661058..e276203e 100644 --- a/pkg/handlers/run.go +++ b/pkg/handlers/run.go @@ -1,8 +1,6 @@ package handlers import ( - "fmt" - "github.com/leg100/etok/api/etok.dev/v1alpha1" "k8s.io/apimachinery/pkg/watch" watchtools "k8s.io/client-go/tools/watch" @@ -17,7 +15,7 @@ func LogRunPhase() watchtools.ConditionFunc { return func(event watch.Event) (bool, error) { switch event.Type { case watch.Deleted: - return false, fmt.Errorf("run resource deleted") + return false, ErrResourceUnexpectedlyDeleted } switch run := event.Object.(type) { diff --git a/pkg/handlers/workspace_queue.go b/pkg/handlers/workspace_queue.go index eb9c21fc..5c57b9c2 100644 --- a/pkg/handlers/workspace_queue.go +++ b/pkg/handlers/workspace_queue.go @@ -53,7 +53,7 @@ func workspaceHandlerWrapper(handler workspaceHandler) watchtools.ConditionFunc return func(event watch.Event) (bool, error) { switch event.Type { case watch.Deleted: - return false, fmt.Errorf("workspace resource deleted") + return false, ErrResourceUnexpectedlyDeleted } switch ws := event.Object.(type) { diff --git a/pkg/testobj/k8s.go b/pkg/testobj/k8s.go index f9bfcf76..f5607ab3 100644 --- a/pkg/testobj/k8s.go +++ b/pkg/testobj/k8s.go @@ -20,6 +20,10 @@ func Workspace(namespace, name string, opts ...func(*v1alpha1.Workspace)) *v1alp Size: "1Gi", }, }, + Status: v1alpha1.WorkspaceStatus{ + // Mock obj as having been reconciled at least once. + Reconciled: true, + }, } for _, o := range opts { o(ws) @@ -171,6 +175,10 @@ func Run(namespace, name string, command string, opts ...func(*v1alpha1.Run)) *v HandshakeTimeout: "10s", }, }, + RunStatus: v1alpha1.RunStatus{ + // Mock obj as having been reconciled at least once. + Reconciled: true, + }, } for _, o := range opts {