Skip to content

Commit d45f281

Browse files
author
Ben Reed
committed
Add timeouts to orch package
This change implements timeouts on orchestration events, preventing rouge operations from blocking indefinitely. Specifically, it: - Switches the Executor.Execute method to accept a context.Context - Modifies (*kubeExecutor).provision and (*kubeExecutor).monitor to use a context.Context. They now return an error when the context is cancelled before completion. - Adds a TestTimeout field to ControllerOptions. This sets the timeout on the Executor's context. It also performs a handful of clean-up tasks, including: - Using UUIDs to name executors - Making TestKubeExecutorProvision a table-driven test
1 parent c3458e1 commit d45f281

File tree

4 files changed

+230
-123
lines changed

4 files changed

+230
-123
lines changed

testctrl/svc/orch/controller.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"sync"
2222
"time"
2323

24+
"github.com/google/uuid"
2425
"github.com/golang/glog"
2526
corev1 "k8s.io/api/core/v1"
2627
"k8s.io/client-go/kubernetes"
@@ -44,6 +45,7 @@ type Controller struct {
4445
wg sync.WaitGroup
4546
mux sync.Mutex
4647
newExecutorFunc func() Executor
48+
testTimeout time.Duration
4749
}
4850

4951
// ControllerOptions overrides the defaults of the controller, allowing it to be
@@ -58,6 +60,14 @@ type ControllerOptions struct {
5860
// additional kubernetes events without blocking for reads. It defaults
5961
// to 32 events.
6062
WatchEventBufferSize int
63+
64+
// TestTimeout is the maximum duration to wait for component containers
65+
// to provision and terminate with a successful exit code. If this
66+
// timeout is reached before an exit, the session will error.
67+
//
68+
// The zero value provides unlimited time to the test, so it should be
69+
// avoided in production.
70+
TestTimeout time.Duration
6171
}
6272

6373
// NewController creates a controller using a Kubernetes clientset, store and an
@@ -104,10 +114,16 @@ func NewController(clientset kubernetes.Interface, store store.Store, options *C
104114
watcher: NewWatcher(podInterface, watcherOpts),
105115
store: store,
106116
executorCount: executorCount,
117+
testTimeout: opts.TestTimeout,
107118
}
108119

109120
c.newExecutorFunc = func() Executor {
110-
return newKubeExecutor(0, c.pcd, c.watcher, c.store)
121+
return &kubeExecutor{
122+
name: uuid.New().String(),
123+
pcd: c.pcd,
124+
watcher: c.watcher,
125+
store: c.store,
126+
}
111127
}
112128

113129
return c, nil
@@ -269,12 +285,16 @@ func (c *Controller) spawnExecutor(session *types.Session) {
269285
c.incExecutors()
270286

271287
go func() {
272-
defer func() {
273-
c.decExecutors()
274-
c.waitQueue.Done(session)
275-
}()
288+
ctx, cancel := context.WithCancel(context.Background())
289+
if c.testTimeout > 0 {
290+
ctx, _ = context.WithTimeout(ctx, c.testTimeout)
291+
}
292+
293+
defer cancel()
294+
defer c.decExecutors()
295+
defer c.waitQueue.Done(session)
276296

277-
if err := executor.Execute(session); err != nil {
297+
if err := executor.Execute(ctx, session); err != nil {
278298
glog.Infof("%v", err)
279299
}
280300
}()

testctrl/svc/orch/executor.go

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package orch
22

33
import (
4+
"context"
45
"fmt"
56
"strings"
67
"time"
@@ -13,45 +14,40 @@ import (
1314
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1415
)
1516

16-
// Executor executes sessions, returning an error if there is a problem with the infrastructure.
17-
// Executors are expected to provision, monitor and clean up resources related to a session.
18-
// An error is not indicative of a success or failure of the tests, but signals an orchestration
19-
// issue.
17+
// Executors can run a test session by provisioning its components, monitoring
18+
// its health and cleaning up after its termination.
2019
type Executor interface {
21-
Execute(*types.Session) error
20+
// Execute runs a test session. It accepts a context, which can prevent
21+
// problematic sessions from running indefinitely.
22+
//
23+
// An error is returned if there is a problem regarding the test itself.
24+
// This does not include internal errors that are not specific to the
25+
// test.
26+
Execute(context.Context, *types.Session) error
2227
}
2328

2429
type kubeExecutor struct {
25-
name string
26-
watcher *Watcher
27-
eventChan <-chan *PodWatchEvent
28-
session *types.Session
29-
pcd podCreateDeleter
30-
store store.Store
30+
name string
31+
watcher *Watcher
32+
pcd podCreateDeleter
33+
store store.Store
34+
session *types.Session
35+
eventChan <-chan *PodWatchEvent
3136
}
3237

33-
func newKubeExecutor(index int, pcd podCreateDeleter, watcher *Watcher, store store.Store) *kubeExecutor {
34-
return &kubeExecutor{
35-
name: fmt.Sprintf("%d", index),
36-
watcher: watcher,
37-
pcd: pcd,
38-
store: store,
39-
}
40-
}
41-
42-
func (k *kubeExecutor) Execute(session *types.Session) error {
38+
func (k *kubeExecutor) Execute(ctx context.Context, session *types.Session) error {
4339
k.setSession(session)
4440
k.writeEvent(types.AcceptEvent, nil, "kubernetes executor %v assigned session %v", k.name, session.Name)
4541

4642
k.writeEvent(types.ProvisionEvent, nil, "started provisioning components for session")
47-
err := k.provision()
43+
err := k.provision(ctx)
4844
if err != nil {
4945
err = fmt.Errorf("failed to provision: %v", err)
5046
goto endSession
5147
}
5248

5349
k.writeEvent(types.RunEvent, nil, "all containers appear healthy, monitoring during tests")
54-
err = k.monitor()
50+
err = k.monitor(ctx)
5551
if err != nil {
5652
err = fmt.Errorf("failed during test: %v", err)
5753
}
@@ -84,7 +80,7 @@ endSession:
8480
return nil
8581
}
8682

87-
func (k *kubeExecutor) provision() error {
83+
func (k *kubeExecutor) provision(ctx context.Context) error {
8884
var components []*types.Component
8985
var workerIPs []string
9086

@@ -125,7 +121,8 @@ func (k *kubeExecutor) provision() error {
125121
default:
126122
continue
127123
}
128-
// TODO(#54): Add timeout/deadline for provisioning resources
124+
case <-ctx.Done():
125+
return fmt.Errorf("provision did not complete before timeout")
129126
default:
130127
time.Sleep(1 * time.Second)
131128
}
@@ -138,7 +135,7 @@ func (k *kubeExecutor) provision() error {
138135
return nil
139136
}
140137

141-
func (k *kubeExecutor) monitor() error {
138+
func (k *kubeExecutor) monitor(ctx context.Context) error {
142139
glog.Infof("kubeExecutor[%v]: monitoring components while session %v runs", k.name, k.session.Name)
143140

144141
for {
@@ -150,8 +147,8 @@ func (k *kubeExecutor) monitor() error {
150147
case Failed:
151148
return fmt.Errorf("component %v has failed: %v", event.ComponentName, event.Error)
152149
}
153-
154-
// TODO(#54): Add timeout/deadline for test execution (see concerns on GitHub)
150+
case <-ctx.Done():
151+
return fmt.Errorf("test did not complete before timeout")
155152
}
156153
}
157154
}

0 commit comments

Comments
 (0)