11package orch
22
33import (
4+ "context"
45 "fmt"
56 "strings"
67 "time"
@@ -13,45 +14,40 @@ import (
1314 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1415)
1516
16- // Executor executes sessions, returning an error if there is a problem with the infrastructure.
17- // Executors are expected to provision, monitor and clean up resources related to a session.
18- // An error is not indicative of a success or failure of the tests, but signals an orchestration
19- // issue.
17+ // Executors can run a test session by provisioning its components, monitoring
18+ // its health and cleaning up after its termination.
2019type Executor interface {
21- Execute (* types.Session ) error
20+ // Execute runs a test session. It accepts a context that can prevent
21+ // problematic sessions from running indefinitely.
22+ //
23+ // An error is returned if there is a problem regarding the test itself.
24+ // This does not include internal errors that are not specific to the
25+ // test.
26+ Execute (context.Context , * types.Session ) error
2227}
2328
2429type kubeExecutor struct {
25- name string
26- watcher * Watcher
27- eventChan <- chan * PodWatchEvent
28- session * types. Session
29- pcd podCreateDeleter
30- store store. Store
30+ name string
31+ watcher * Watcher
32+ pcd podCreateDeleter
33+ store store. Store
34+ session * types. Session
35+ eventChan <- chan * PodWatchEvent
3136}
3237
33- func newKubeExecutor (index int , pcd podCreateDeleter , watcher * Watcher , store store.Store ) * kubeExecutor {
34- return & kubeExecutor {
35- name : fmt .Sprintf ("%d" , index ),
36- watcher : watcher ,
37- pcd : pcd ,
38- store : store ,
39- }
40- }
41-
42- func (k * kubeExecutor ) Execute (session * types.Session ) error {
38+ func (k * kubeExecutor ) Execute (ctx context.Context , session * types.Session ) error {
4339 k .setSession (session )
4440 k .writeEvent (types .AcceptEvent , nil , "kubernetes executor %v assigned session %v" , k .name , session .Name )
4541
4642 k .writeEvent (types .ProvisionEvent , nil , "started provisioning components for session" )
47- err := k .provision ()
43+ err := k .provision (ctx )
4844 if err != nil {
4945 err = fmt .Errorf ("failed to provision: %v" , err )
5046 goto endSession
5147 }
5248
5349 k .writeEvent (types .RunEvent , nil , "all containers appear healthy, monitoring during tests" )
54- err = k .monitor ()
50+ err = k .monitor (ctx )
5551 if err != nil {
5652 err = fmt .Errorf ("failed during test: %v" , err )
5753 }
@@ -84,7 +80,7 @@ endSession:
8480 return nil
8581}
8682
87- func (k * kubeExecutor ) provision () error {
83+ func (k * kubeExecutor ) provision (ctx context. Context ) error {
8884 var components []* types.Component
8985 var workerIPs []string
9086
@@ -125,7 +121,8 @@ func (k *kubeExecutor) provision() error {
125121 default :
126122 continue
127123 }
128- // TODO(#54): Add timeout/deadline for provisioning resources
124+ case <- ctx .Done ():
125+ return fmt .Errorf ("provision did not complete before timeout" )
129126 default :
130127 time .Sleep (1 * time .Second )
131128 }
@@ -138,7 +135,7 @@ func (k *kubeExecutor) provision() error {
138135 return nil
139136}
140137
141- func (k * kubeExecutor ) monitor () error {
138+ func (k * kubeExecutor ) monitor (ctx context. Context ) error {
142139 glog .Infof ("kubeExecutor[%v]: monitoring components while session %v runs" , k .name , k .session .Name )
143140
144141 for {
@@ -150,8 +147,8 @@ func (k *kubeExecutor) monitor() error {
150147 case Failed :
151148 return fmt .Errorf ("component %v has failed: %v" , event .ComponentName , event .Error )
152149 }
153-
154- // TODO(#54): Add timeout/deadline for test execution (see concerns on GitHub )
150+ case <- ctx . Done ():
151+ return fmt . Errorf ( " test did not complete before timeout" )
155152 }
156153 }
157154}
0 commit comments