Skip to content

Commit

Permalink
Indicate to client successful reconcile
Browse files Browse the repository at this point in the history
  • Loading branch information
leg100 committed Dec 20, 2020
1 parent 675ffcd commit 2376e4d
Show file tree
Hide file tree
Showing 18 changed files with 251 additions and 31 deletions.
7 changes: 7 additions & 0 deletions api/etok.dev/v1alpha1/run_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ func (r *Run) PodName() string { return r.Name }
type RunStatus struct {
// Current phase of the run's lifecycle.
Phase RunPhase `json:"phase,omitempty"`

// True if resource has been reconciled at least once.
Reconciled bool `json:"reconciled,omitempty"`
}

// IsReconciled reports the value of the Reconciled flag held in the run's
// status, i.e. whether the run has been reconciled at least once.
func (r *Run) IsReconciled() bool {
	// Reconciled is promoted from the embedded RunStatus.
	return r.Reconciled
}

type RunPhase string
Expand Down
7 changes: 7 additions & 0 deletions api/etok.dev/v1alpha1/workspace_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ type WorkspaceStatus struct {

// Lifecycle phase of workspace.
Phase WorkspacePhase `json:"phase,omitempty"`

// True if resource has been reconciled at least once.
Reconciled bool `json:"reconciled,omitempty"`
}

// IsReconciled reports the value of the Reconciled flag held in the
// workspace's status, i.e. whether the workspace has been reconciled at least
// once.
func (ws *Workspace) IsReconciled() bool {
	reconciled := ws.Status.Reconciled
	return reconciled
}

type WorkspacePhase string
Expand Down
2 changes: 2 additions & 0 deletions cmd/launcher/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func (rc runCommand) cobraCommand(opts *cmdutil.Options, o *LauncherOptions) *co
cmd.Flags().DurationVar(&o.EnqueueTimeout, "enqueue-timeout", 10*time.Second, "timeout waiting to be queued")
cmd.Flags().StringVar(&namespacedWorkspace, "workspace", defaultWorkspace, "etok workspace")

cmd.Flags().DurationVar(&o.ReconcileTimeout, "reconcile-timeout", defaultReconcileTimeout, "timeout for resource to be reconciled")

return cmd
}

Expand Down
40 changes: 38 additions & 2 deletions cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ import (
)

const (
defaultWorkspace = "default/default"
defaultWorkspace = "default/default"
defaultReconcileTimeout = 10 * time.Second
)

var (
errNotAuthorised = errors.New("you are not authorised")
errEnqueueTimeout = errors.New("timed out waiting for run to be enqueued")
errWorkspaceNotFound = errors.New("workspace not found")
errReconcileTimeout = errors.New("timed out waiting for run to be reconciled")
)

// LauncherOptions deploys a new Run. It monitors not only its progress, but
Expand Down Expand Up @@ -78,13 +80,18 @@ type LauncherOptions struct {
PodTimeout time.Duration
// timeout waiting to be queued
EnqueueTimeout time.Duration
// Timeout for resource to be reconciled (at least once)
ReconcileTimeout time.Duration

// Disable TTY detection
DisableTTY bool

// Recall if resources are created so that if error occurs they can be cleaned up
createdRun bool
createdArchive bool

// For testing purposes toggle obj having been reconciled
reconciled bool
}

func (o *LauncherOptions) lookupEnvFile(cmd *cobra.Command) error {
Expand Down Expand Up @@ -123,6 +130,14 @@ func (o *LauncherOptions) Run(ctx context.Context) error {

g, gctx := errgroup.WithContext(ctx)

// Wait for resource to have been successfully reconciled at least once
// within the ReconcileTimeout (If we don't do this and the operator is
// either not installed or malfunctioning then the user would be none the
// wiser until the much longer PodTimeout had expired).
g.Go(func() error {
return o.waitForReconcile(gctx, run)
})

// Wait for pod and when ready send pod on chan
podch := make(chan *corev1.Pod, 1)
g.Go(func() error {
Expand All @@ -133,7 +148,7 @@ func (o *LauncherOptions) Run(ctx context.Context) error {
// Only commands other than plan are queued - wait for run to be
// enqueued
g.Go(func() error {
return o.waitForEnqueued(ctx, run)
return o.waitForEnqueued(gctx, run)
})
}

Expand Down Expand Up @@ -338,6 +353,9 @@ func (o *LauncherOptions) createRun(ctx context.Context, name, configMapName str

run.Verbosity = o.Verbosity

// For testing purposes mimic obj having been reconciled
run.Reconciled = o.reconciled

if isTTY {
run.AttachSpec.Handshake = true
run.AttachSpec.HandshakeTimeout = o.HandshakeTimeout.String()
Expand Down Expand Up @@ -384,3 +402,21 @@ func (o *LauncherOptions) createConfigMap(ctx context.Context, tarball []byte, n

return nil
}

// waitForReconcile watches the run resource until it has been reconciled. The
// watch is bounded by ReconcileTimeout; when the watch reports a timeout,
// errReconcileTimeout is returned. Any other watch error is returned as-is.
func (o *LauncherOptions) waitForReconcile(ctx context.Context, run *v1alpha1.Run) error {
	watcher := &k8s.RunListWatcher{
		Client:    o.EtokClient,
		Name:      run.Name,
		Namespace: run.Namespace,
	}
	reconciled := handlers.Reconciled(run)

	// Bound the watch by the configured reconcile timeout.
	timeoutCtx, cancel := context.WithTimeout(ctx, o.ReconcileTimeout)
	defer cancel()

	_, err := watchtools.UntilWithSync(timeoutCtx, watcher, &v1alpha1.Run{}, nil, reconciled)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, wait.ErrWaitTimeout):
		// Translate the generic watch timeout into the launcher's sentinel.
		return errReconcileTimeout
	default:
		return err
	}
}
20 changes: 17 additions & 3 deletions cmd/launcher/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ func TestLauncher(t *testing.T) {
// Size of content to be archived
size int
// Mock exit code of runner pod
code int32
setOpts func(*cmdutil.Options)
assertions func(*LauncherOptions)
code int32
// Toggle mocking a successful reconcile status
disableMockReconcile bool
setOpts func(*cmdutil.Options)
assertions func(*LauncherOptions)
}{
{
name: "plan",
Expand Down Expand Up @@ -260,6 +262,13 @@ func TestLauncher(t *testing.T) {
size: 1024*1024 + 1,
err: archive.MaxSizeError(archive.MaxConfigSize),
},
{
name: "reconcile timeout exceeded",
args: []string{"--reconcile-timeout", "10ms"},
objs: []runtime.Object{testobj.Workspace("default", "default")},
disableMockReconcile: true,
err: errReconcileTimeout,
},
}

// Run tests for each command
Expand All @@ -285,6 +294,11 @@ func TestLauncher(t *testing.T) {

cmdOpts := &LauncherOptions{RunName: "run-12345"}

if !tt.disableMockReconcile {
// Mock successful reconcile
cmdOpts.reconciled = true
}

// create cobra command
cmd := rc.cobraCommand(opts, cmdOpts)
cmd.SetOut(out)
Expand Down
70 changes: 58 additions & 12 deletions cmd/workspace/workspace_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/leg100/etok/pkg/labels"
"github.com/leg100/etok/pkg/monitors"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"

"github.com/leg100/etok/pkg/env"
"github.com/leg100/etok/pkg/logstreamer"
Expand All @@ -27,15 +28,16 @@ import (
)

const (
defaultTimeoutWorkspace = 10 * time.Second
defaultReconcileTimeout = 10 * time.Second
defaultPodTimeout = 60 * time.Second
defaultCacheSize = "1Gi"
defaultSecretName = "etok"
defaultServiceAccountName = "etok"
)

var (
errPodTimeout = errors.New("timed out waiting for pod to be ready")
errPodTimeout = errors.New("timed out waiting for pod to be ready")
errReconcileTimeout = errors.New("timed out waiting for workspace to be reconciled")
)

type NewOptions struct {
Expand All @@ -54,8 +56,9 @@ type NewOptions struct {
DisableCreateServiceAccount bool
// Create a secret if it does not exist
DisableCreateSecret bool
// Timeout for workspace to be healthy
TimeoutWorkspace time.Duration

// Timeout for resource to be reconciled (at least once)
ReconcileTimeout time.Duration

// Timeout for workspace pod to be ready
PodTimeout time.Duration
Expand All @@ -68,6 +71,9 @@ type NewOptions struct {
createdWorkspace bool
createdServiceAccount bool
createdSecret bool

// For testing purposes toggle obj having been reconciled
reconciled bool
}

func NewCmd(opts *cmdutil.Options) (*cobra.Command, *NewOptions) {
Expand Down Expand Up @@ -119,7 +125,7 @@ func NewCmd(opts *cmdutil.Options) (*cobra.Command, *NewOptions) {
// that so use empty string and override later (see above)
o.WorkspaceSpec.Cache.StorageClass = cmd.Flags().String("storage-class", "", "StorageClass of PersistentVolume for cache")

cmd.Flags().DurationVar(&o.TimeoutWorkspace, "timeout", defaultTimeoutWorkspace, "Time to wait for workspace to be healthy")
cmd.Flags().DurationVar(&o.ReconcileTimeout, "reconcile-timeout", defaultReconcileTimeout, "timeout for resource to be reconciled")
cmd.Flags().DurationVar(&o.PodTimeout, "pod-timeout", defaultPodTimeout, "timeout for pod to be ready")

cmd.Flags().StringSliceVar(&o.WorkspaceSpec.PrivilegedCommands, "privileged-commands", []string{}, "Set privileged commands")
Expand Down Expand Up @@ -151,13 +157,31 @@ func (o *NewOptions) Run(ctx context.Context) error {
o.createdWorkspace = true
fmt.Printf("Created workspace %s\n", o.name())

// Wait until container can be streamed from
g, gctx := errgroup.WithContext(ctx)

fmt.Println("Waiting for workspace pod to be ready...")
pod, err := o.waitForContainer(ctx, ws)
if err != nil {
podch := make(chan *corev1.Pod, 1)
g.Go(func() error {
return o.waitForContainer(gctx, ws, podch)
})

// Wait for resource to have been successfully reconciled at least once
// within the ReconcileTimeout (If we don't do this and the operator is
// either not installed or malfunctioning then the user would be none the
// wiser until the much longer PodTimeout had expired).
g.Go(func() error {
return o.waitForReconcile(gctx, ws)
})

// Wait for both workspace to have been reconciled and for its pod container
// to be ready
if err := g.Wait(); err != nil {
return err
}

// Receive ready pod
pod := <-podch

// Monitor exit code; non-blocking
exit := monitors.ExitMonitor(ctx, o.KubeClient, pod.Name, pod.Namespace, controllers.InstallerContainerName)

Expand Down Expand Up @@ -217,6 +241,9 @@ func (o *NewOptions) createWorkspace(ctx context.Context) (*v1alpha1.Workspace,

ws.Spec.Verbosity = o.Verbosity

// For testing purposes mimic obj having been reconciled
ws.Status.Reconciled = o.reconciled

return o.WorkspacesClient(o.Namespace).Create(ctx, ws, metav1.CreateOptions{})
}

Expand Down Expand Up @@ -294,7 +321,7 @@ func (o *NewOptions) createServiceAccount(ctx context.Context, name string) (*co

// waitForContainer waits until the installer container can be streamed from,
// then hands the ready pod to the caller
func (o *NewOptions) waitForContainer(ctx context.Context, ws *v1alpha1.Workspace) (*corev1.Pod, error) {
func (o *NewOptions) waitForContainer(ctx context.Context, ws *v1alpha1.Workspace, podch chan<- *corev1.Pod) error {
lw := &k8s.PodListWatcher{Client: o.KubeClient, Name: ws.PodName(), Namespace: ws.Namespace}
hdlr := handlers.ContainerReady(ws.PodName(), controllers.InstallerContainerName, true, false)

Expand All @@ -304,9 +331,28 @@ func (o *NewOptions) waitForContainer(ctx context.Context, ws *v1alpha1.Workspac
event, err := watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, hdlr)
if err != nil {
if errors.Is(err, wait.ErrWaitTimeout) {
return nil, errPodTimeout
return errPodTimeout
}
return nil, err
return err
}
return event.Object.(*corev1.Pod), err
podch <- event.Object.(*corev1.Pod)
return nil
}

// waitForReconcile waits for the workspace resource to be reconciled.
func (o *NewOptions) waitForReconcile(ctx context.Context, ws *v1alpha1.Workspace) error {
lw := &k8s.WorkspaceListWatcher{Client: o.EtokClient, Name: ws.Name, Namespace: ws.Namespace}
hdlr := handlers.Reconciled(ws)

ctx, cancel := context.WithTimeout(ctx, o.ReconcileTimeout)
defer cancel()

_, err := watchtools.UntilWithSync(ctx, lw, &v1alpha1.Workspace{}, nil, hdlr)
if err != nil {
if errors.Is(err, wait.ErrWaitTimeout) {
return errReconcileTimeout
}
return err
}
return nil
}
27 changes: 21 additions & 6 deletions cmd/workspace/workspace_new_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ func TestNewWorkspace(t *testing.T) {
var fakeError = errors.New("fake error")

tests := []struct {
name string
args []string
err func(*testutil.T, error)
objs []runtime.Object
setOpts func(*cmdutil.Options)
assertions func(*NewOptions)
name string
args []string
err func(*testutil.T, error)
// Toggle mocking a successful reconcile status
disableMockReconcile bool
objs []runtime.Object
setOpts func(*cmdutil.Options)
assertions func(*NewOptions)
}{
{
name: "missing workspace name",
Expand Down Expand Up @@ -260,6 +262,14 @@ func TestNewWorkspace(t *testing.T) {
assert.Equal(t, []string{"apply", "destroy", "sh"}, ws.Spec.PrivilegedCommands)
},
},
{
name: "reconcile timeout exceeded",
args: []string{"default/foo", "--reconcile-timeout", "10ms"},
disableMockReconcile: true,
err: func(t *testutil.T, err error) {
assert.True(t, errors.Is(err, errReconcileTimeout))
},
},
{
name: "pod timeout exceeded",
args: []string{"default/foo", "--pod-timeout", "10ms"},
Expand Down Expand Up @@ -289,6 +299,11 @@ func TestNewWorkspace(t *testing.T) {
path := t.NewTempDir().Chdir().Root()
cmdOpts.Path = path

if !tt.disableMockReconcile {
// Mock successful reconcile
cmdOpts.reconciled = true
}

err = cmd.ExecuteContext(context.Background())
if tt.err != nil {
tt.err(t, err)
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/etok.dev_all.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ spec:
phase:
description: Current phase of the run's lifecycle.
type: string
reconciled:
description: True if resource has been reconciled at least once.
type: boolean
type: object
type: object
served: true
Expand Down Expand Up @@ -211,6 +214,9 @@ spec:
items:
type: string
type: array
reconciled:
description: True if resource has been reconciled at least once.
type: boolean
type: object
type: object
served: true
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/etok.dev_runs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ spec:
phase:
description: Current phase of the run's lifecycle.
type: string
reconciled:
description: True if resource has been reconciled at least once.
type: boolean
type: object
type: object
served: true
Expand Down
Loading

0 comments on commit 2376e4d

Please sign in to comment.