diff --git a/testctrl/cmd/orchtest/main.go b/testctrl/cmd/orchtest/main.go
index 796455528bbb2..7752bee0201f4 100644
--- a/testctrl/cmd/orchtest/main.go
+++ b/testctrl/cmd/orchtest/main.go
@@ -15,6 +15,7 @@ package main
 
 import (
+	"context"
 	"flag"
 	"log"
 	"os"
 
@@ -49,11 +50,14 @@ func main() {
 		glog.Fatalf("Invalid config file specified by the KUBE_CONFIG_FILE env variable, unable to connect: %v", err)
 	}
 
-	c, _ := orch.NewController(clientset, nil)
+	c, _ := orch.NewController(clientset, nil, nil)
 	if err := c.Start(); err != nil {
 		panic(err)
 	}
-	defer c.Stop(*timeout)
+
+	shutdownCtx, cancel := context.WithTimeout(context.Background(), *timeout)
+	defer cancel()
+	defer c.Stop(shutdownCtx)
 
 	go func() {
 		for i := 0; i < *count; i++ {
diff --git a/testctrl/cmd/svc/main.go b/testctrl/cmd/svc/main.go
index cec343380758c..26aeae8c5e0c2 100644
--- a/testctrl/cmd/svc/main.go
+++ b/testctrl/cmd/svc/main.go
@@ -15,6 +15,7 @@ package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
 	"net"
 
@@ -64,6 +65,7 @@ func setupDevEnv(grpcServer *grpc.Server) *kubernetes.Clientset {
 
 func main() {
 	port := flag.Int("port", 50051, "Port to start the service.")
+	testTimeout := flag.Duration("testTimeout", 15*time.Minute, "Maximum time tests are allowed to run.")
 	shutdownTimeout := flag.Duration("shutdownTimeout", 5*time.Minute, "Time alloted to a graceful shutdown.")
 	flag.Parse()
 	defer glog.Flush()
@@ -82,7 +84,10 @@ func main() {
 
 	storageServer := store.NewStorageServer()
 
-	controller, err := orch.NewController(clientset, storageServer)
+	controllerOpts := &orch.ControllerOptions{
+		TestTimeout: *testTimeout,
+	}
+	controller, err := orch.NewController(clientset, storageServer, controllerOpts)
 	if err != nil {
 		glog.Fatalf("could not create a controller: %v", err)
 	}
@@ -91,7 +96,9 @@ func main() {
 		glog.Fatalf("unable to start orchestration controller: %v", err)
 	}
 
-	defer controller.Stop(*shutdownTimeout)
+	shutdownCtx, cancel := context.WithTimeout(context.Background(), *shutdownTimeout)
+	defer cancel()
+	defer controller.Stop(shutdownCtx)
 
 	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
 	if err != nil {
diff --git a/testctrl/svc/orch/controller.go b/testctrl/svc/orch/controller.go
index 7bce0ea2053c6..ebce47898c145 100644
--- a/testctrl/svc/orch/controller.go
+++ b/testctrl/svc/orch/controller.go
@@ -15,11 +15,13 @@ package orch
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"sync"
 	"time"
 
 	"github.com/golang/glog"
+	"github.com/google/uuid"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/kubernetes"
@@ -28,9 +30,6 @@ import (
 	"github.com/grpc/grpc/testctrl/svc/types"
 )
 
-// executorCount specifies the maximum number of sessions that should be processed concurrently.
-const executorCount = 1
-
 // Controller serves as the coordinator for orchestrating sessions. It manages active and idle
 // sessions, as well as, interactions with Kubernetes through a set of a internal types.
 type Controller struct {
@@ -40,34 +39,83 @@ type Controller struct {
 	store     store.Store
 	pcd       podCreateDeleter
 	pw        podWatcher
 	nl        NodeLister
 	watcher   *Watcher
 	waitQueue *queue
+	executorCount   int
 	activeCount     int
 	running         bool
 	wg              sync.WaitGroup
 	mux             sync.Mutex
 	newExecutorFunc func() Executor
+	testTimeout     time.Duration
+}
+
+// ControllerOptions overrides the defaults of the controller, allowing it to be
+// configured as needed.
+type ControllerOptions struct {
+	// ExecutorCount specifies the maximum number of sessions that should be
+	// processed at a time. It defaults to 1, disabling concurrent sessions.
+	ExecutorCount int
+
+	// WatcherOptions overrides the defaults of the watcher. The watcher
+	// listens for Kubernetes events and reports the health of components
+	// to the session's executor.
+	WatcherOptions *WatcherOptions
+
+	// TestTimeout is the maximum duration to wait for component containers
+	// to provision and terminate with a successful exit code. If this
+	// timeout is reached before an exit, the session will error.
+	//
+	// The zero value provides unlimited time to the test, so it should be
+	// avoided in production.
+	TestTimeout time.Duration
 }
 
-// NewController creates a controller using a Kubernetes clientset and a store. The clientset allows
-// the controller to interact with Kubernetes. The store is used to report significant orchestration
-// events, so progress can be reported. A nil clientset will result in an error.
-func NewController(clientset kubernetes.Interface, store store.Store) (*Controller, error) {
+// NewController creates a controller using a Kubernetes clientset, a store,
+// and an optional ControllerOptions instance.
+//
+// The clientset allows the controller to interact with Kubernetes. If nil, an
+// error will be returned instead of a controller.
+//
+// The store is used to report orchestration events, so progress can be reported
+// to a user.
+//
+// The options value allows the controller to be customized. Specifying nil will
+// configure the controller with sane defaults. These defaults are described in
+// the ControllerOptions documentation.
+func NewController(clientset kubernetes.Interface, store store.Store, options *ControllerOptions) (*Controller, error) {
 	if clientset == nil {
 		return nil, errors.New("cannot create controller from nil kubernetes clientset")
 	}
 
+	opts := options
+	if opts == nil {
+		opts = &ControllerOptions{}
+	}
+
+	executorCount := opts.ExecutorCount
+	if executorCount == 0 {
+		executorCount = 1
+	}
+
 	coreV1Interface := clientset.CoreV1()
 	podInterface := coreV1Interface.Pods(corev1.NamespaceDefault)
 
 	c := &Controller{
-		pcd:     podInterface,
-		pw:      podInterface,
-		nl:      coreV1Interface.Nodes(),
-		watcher: NewWatcher(podInterface),
-		store:   store,
+		pcd:           podInterface,
+		pw:            podInterface,
+		nl:            coreV1Interface.Nodes(),
+		watcher:       NewWatcher(podInterface, opts.WatcherOptions),
+		store:         store,
+		executorCount: executorCount,
+		testTimeout:   opts.TestTimeout,
 	}
 
 	c.newExecutorFunc = func() Executor {
-		return newKubeExecutor(0, c.pcd, c.watcher, c.store)
+		return &kubeExecutor{
+			name:    uuid.New().String(),
+			pcd:     c.pcd,
+			watcher: c.watcher,
+			store:   c.store,
+		}
 	}
 
 	return c, nil
@@ -119,15 +167,16 @@ func (c *Controller) Start() error {
 	return nil
 }
 
-// Stop attempts to terminate all orchestration threads spawned by a call to Start. It waits for a
-// graceful shutdown until for a specified timeout.
+// Stop attempts to terminate all orchestration threads spawned by a call to
+// Start. It waits for a graceful shutdown until the context is cancelled.
 //
-// If the timeout is reached before shutdown, an improper shutdown will occur. This may result in
-// unpredictable states for running sessions and their resources. To signal these potential issues,
-// an error is returned when this occurs.
+// If the context is cancelled before a graceful shutdown, an error is returned.
+// An improper shutdown may leave running sessions and their resources in
+// unpredictable states, so it should be avoided when possible.
 //
-// If Start was not called prior to Stop, there will be no adverse effects and nil will be returned.
-func (c *Controller) Stop(timeout time.Duration) error {
+// If Start was not called prior to Stop, there will be no adverse effects and
+// nil will be returned.
+func (c *Controller) Stop(ctx context.Context) error {
 	defer c.watcher.Stop()
 
 	c.mux.Lock()
@@ -143,7 +192,7 @@ func (c *Controller) Stop(timeout time.Duration) error {
 	select {
 	case <-done:
 		glog.Infof("controller: executors safely exited")
-	case <-time.After(timeout):
+	case <-ctx.Done():
 		glog.Warning("controller: unable to wait for executors to safely exit, timed out")
 		return fmt.Errorf("executors did not safely exit before timeout")
 	}
@@ -197,7 +246,7 @@ func (c *Controller) next() (session *types.Session, quit bool) {
 		return nil, true
 	}
 
-	if c.activeCount > executorCount {
+	if c.activeCount > c.executorCount {
 		return nil, false
 	}
 
@@ -228,12 +277,16 @@ func (c *Controller) spawnExecutor(session *types.Session) {
 	c.incExecutors()
 
 	go func() {
-		defer func() {
-			c.decExecutors()
-			c.waitQueue.Done(session)
-		}()
+		ctx, cancel := context.Background(), context.CancelFunc(func() {})
+		if c.testTimeout > 0 {
+			ctx, cancel = context.WithTimeout(ctx, c.testTimeout)
+		}
+
+		defer cancel()
+		defer c.decExecutors()
+		defer c.waitQueue.Done(session)
 
-		if err := executor.Execute(session); err != nil {
+		if err := executor.Execute(ctx, session); err != nil {
 			glog.Infof("%v", err)
 		}
 	}()
diff --git a/testctrl/svc/orch/controller_test.go b/testctrl/svc/orch/controller_test.go
index 6b3e10fa72b33..df95c803be2cc 100644
--- a/testctrl/svc/orch/controller_test.go
+++ b/testctrl/svc/orch/controller_test.go
@@ -1,6 +1,7 @@
 package orch
 
 import (
+	"context"
 	"errors"
 	"testing"
 	"time"
@@ -11,7 +12,7 @@ import (
 
 func TestNewController(t *testing.T) {
 	t.Run("nil clientset returns error", func(t *testing.T) {
-		controller, err := NewController(nil, nil)
+		controller, err := NewController(nil, nil, nil)
 		if err == nil {
 			t.Errorf("no error returned for nil clientset")
 		}
@@ -50,7 +51,7 @@ func TestControllerSchedule(t *testing.T) {
 
 	for _, tc := range cases {
 		t.Run(tc.description, func(t *testing.T) {
-			controller, _ := NewController(fake.NewSimpleClientset(), nil)
+			controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
 			executor := &executorMock{}
 			controller.newExecutorFunc = func() Executor {
 				return executor
@@ -58,7 +59,7 @@ func TestControllerSchedule(t *testing.T) {
 
 			if tc.start {
 				controller.Start()
-				defer controller.Stop(0)
+				defer controller.Stop(context.Background())
 			}
 
 			err := controller.Schedule(tc.session)
@@ -84,9 +85,9 @@ func TestControllerSchedule(t *testing.T) {
 
 func TestControllerStart(t *testing.T) {
 	t.Run("sets running state", func(t *testing.T) {
-		controller, _ := NewController(fake.NewSimpleClientset(), nil)
+		controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
 		controller.Start()
-		defer controller.Stop(0)
+		defer controller.Stop(context.Background())
 		if controller.Stopped() {
 			t.Errorf("Stopped unexpectedly returned true after starting controller")
 		}
@@ -112,7 +113,7 @@ func TestControllerStart(t *testing.T) {
 
 	for _, tc := range cases {
 		t.Run(tc.description, func(t *testing.T) {
-			controller, _ := NewController(fake.NewSimpleClientset(), nil)
+			controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
 			controller.waitQueue = newQueue(limitlessTracker{})
 
 			if tc.mockNL != nil {
@@ -121,7 +122,7 @@ func TestControllerStart(t *testing.T) {
 
 			if tc.mockPW != nil {
 				controller.pw = tc.mockPW
-				controller.watcher = NewWatcher(tc.mockPW)
+				controller.watcher = NewWatcher(tc.mockPW, nil)
 			}
 
 			err := controller.Start()
@@ -170,7 +171,7 @@ func TestControllerStop(t *testing.T) {
 
 	for _, tc := range cases {
 		t.Run(tc.description, func(t *testing.T) {
-			controller, _ := NewController(fake.NewSimpleClientset(), nil)
+			controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
 			controller.running = true
 			controller.waitQueue = newQueue(limitlessTracker{})
 
@@ -196,7 +197,11 @@ func TestControllerStop(t *testing.T) {
 			go controller.loop()
 
 			time.Sleep(timeout)
-			err := controller.Stop(tc.stopTimeout)
+
+			ctx, cancel := context.WithTimeout(context.Background(), tc.stopTimeout)
+			defer cancel()
+
+			err := controller.Stop(ctx)
 			if tc.shouldError && err == nil {
 				t.Errorf("executors unexpectedly finished before timeout")
 			} else if !tc.shouldError && err != nil {
diff --git a/testctrl/svc/orch/executor.go b/testctrl/svc/orch/executor.go
index 1506dc6475ac8..0f35c0791bba4 100644
--- a/testctrl/svc/orch/executor.go
+++ b/testctrl/svc/orch/executor.go
@@ -1,6 +1,7 @@
 package orch
 
 import (
+	"context"
 	"fmt"
 	"strings"
 	"time"
@@ -13,45 +14,40 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-// Executor executes sessions, returning an error if there is a problem with the infrastructure.
-// Executors are expected to provision, monitor and clean up resources related to a session.
-// An error is not indicative of a success or failure of the tests, but signals an orchestration
-// issue.
+// An Executor runs a test session by provisioning its components, monitoring
+// their health, and cleaning up after the session terminates.
 type Executor interface {
+	// Execute runs a test session. It accepts a context that can prevent
+	// problematic sessions from running indefinitely.
+	//
+	// An error is returned if there is a problem with the test itself;
+	// it does not include internal errors that are unrelated to the
+	// test.
+	Execute(context.Context, *types.Session) error
 }
 
 type kubeExecutor struct {
-	name      string
-	watcher   *Watcher
-	eventChan <-chan *PodWatchEvent
-	session   *types.Session
-	pcd       podCreateDeleter
-	store     store.Store
+	name      string
+	watcher   *Watcher
+	pcd       podCreateDeleter
+	store     store.Store
+	session   *types.Session
+	eventChan <-chan *PodWatchEvent
 }
 
-func newKubeExecutor(index int, pcd podCreateDeleter, watcher *Watcher, store store.Store) *kubeExecutor {
-	return &kubeExecutor{
-		name:    fmt.Sprintf("%d", index),
-		watcher: watcher,
-		pcd:     pcd,
-		store:   store,
-	}
-}
-
-func (k *kubeExecutor) Execute(session *types.Session) error {
+func (k *kubeExecutor) Execute(ctx context.Context, session *types.Session) error {
 	k.setSession(session)
 	k.writeEvent(types.AcceptEvent, nil, "kubernetes executor %v assigned session %v", k.name, session.Name)
 
 	k.writeEvent(types.ProvisionEvent, nil, "started provisioning components for session")
-	err := k.provision()
+	err := k.provision(ctx)
 	if err != nil {
 		err = fmt.Errorf("failed to provision: %v", err)
 		goto endSession
 	}
 
 	k.writeEvent(types.RunEvent, nil, "all containers appear healthy, monitoring during tests")
-	err = k.monitor()
+	err = k.monitor(ctx)
 	if err != nil {
 		err = fmt.Errorf("failed during test: %v", err)
 	}
@@ -84,7 +80,7 @@ endSession:
 	return nil
 }
 
-func (k *kubeExecutor) provision() error {
+func (k *kubeExecutor) provision(ctx context.Context) error {
 	var components []*types.Component
 	var workerIPs []string
 
@@ -125,7 +121,8 @@ func (k *kubeExecutor) provision() error {
 			default:
 				continue
 			}
-		// TODO(#54): Add timeout/deadline for provisioning resources
+		case <-ctx.Done():
+			return fmt.Errorf("provision did not complete before timeout")
 		default:
 			time.Sleep(1 * time.Second)
 		}
@@ -138,7 +135,7 @@ func (k *kubeExecutor) provision() error {
 	return nil
 }
 
-func (k *kubeExecutor) monitor() error {
+func (k *kubeExecutor) monitor(ctx context.Context) error {
 	glog.Infof("kubeExecutor[%v]: monitoring components while session %v runs", k.name, k.session.Name)
 
 	for {
@@ -150,8 +147,8 @@ func (k *kubeExecutor) monitor() error {
 			case Failed:
 				return fmt.Errorf("component %v has failed: %v", event.ComponentName, event.Error)
 			}
-
-			// TODO(#54): Add timeout/deadline for test execution (see concerns on GitHub)
+		case <-ctx.Done():
+			return fmt.Errorf("test did not complete before timeout")
 		}
 	}
 }
diff --git a/testctrl/svc/orch/executor_test.go b/testctrl/svc/orch/executor_test.go
index 878d4812614a3..7a43b60b7006b 100644
--- a/testctrl/svc/orch/executor_test.go
+++ b/testctrl/svc/orch/executor_test.go
@@ -1,9 +1,11 @@
 package orch
 
 import (
+	"context"
 	"errors"
 	"strings"
 	"testing"
+	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -13,105 +15,178 @@ import (
 )
 
 func TestKubeExecutorProvision(t *testing.T) {
-	// test provision with successful pods
-	fakePodInf := newFakePodInterface(t)
-
-	driver := types.NewComponent(testContainerImage, types.DriverComponent)
-	server := types.NewComponent(testContainerImage, types.ServerComponent)
-	client := types.NewComponent(testContainerImage, types.ClientComponent)
-
-	components := []*types.Component{server, client, driver}
-	session := types.NewSession(driver, components[:2], nil)
-
-	e := newKubeExecutor(0, fakePodInf, nil, nil)
-	eventChan := make(chan *PodWatchEvent)
-	e.eventChan = eventChan
-	e.session = session
-
-	go func() {
-		for _, c := range components {
-			eventChan <- &PodWatchEvent{
-				SessionName:   session.Name,
-				ComponentName: c.Name,
-				Pod:           nil,
-				PodIP:         "127.0.0.1",
-				Health:        Ready,
-				Error:         nil,
-			}
-		}
-	}()
-
-	if err := e.provision(); err != nil {
-		t.Fatalf("unexpected error in provision: %v", err)
+	cases := []struct {
+		description string
+		events      []fakeWatchEvent
+		ctxTimeout  time.Duration
+		errors      bool
+	}{
+		{
+			description: "successful pods",
+			events: []fakeWatchEvent{
+				{
+					component: types.ServerComponent,
+					health:    Ready,
+					podIP:     "127.0.0.1",
+				},
+				{
+					component: types.ClientComponent,
+					health:    Ready,
+					podIP:     "127.0.0.1",
+				},
+				{
+					component: types.DriverComponent,
+					health:    Ready,
+					podIP:     "127.0.0.1",
+				},
+			},
+			errors: false,
+		},
+		{
+			description: "failed driver pod",
+			events: []fakeWatchEvent{
+				{
+					component: types.ServerComponent,
+					health:    Ready,
+					podIP:     "127.0.0.1",
+				},
+				{
+					component: types.ClientComponent,
+					health:    Ready,
+					podIP:     "127.0.0.1",
+				},
+				{
+					component: types.DriverComponent,
+					health:    Failed,
+					podIP:     "127.0.0.1",
+				},
+			},
+			errors: true,
+		},
+		{
+			description: "cancelled context",
+			ctxTimeout:  1 * time.Millisecond * timeMultiplier,
+			events: []fakeWatchEvent{
+				{
+					component: types.ServerComponent,
+					sleep:     10 * time.Second * timeMultiplier,
+					health:    Succeeded,
+					podIP:     "127.0.0.1",
+				},
+				{
+					component: types.ClientComponent,
+					sleep:     10 * time.Second * timeMultiplier,
+					health:    Succeeded,
+					podIP:     "127.0.0.1",
+				},
+				{
+					component: types.DriverComponent,
+					sleep:     10 * time.Second * timeMultiplier,
+					health:    Succeeded,
+					podIP:     "127.0.0.1",
+				},
+			},
+			errors: true,
+		},
 	}
 
-	pods, err := listPods(t, fakePodInf)
-	if err != nil {
-		t.Fatalf("could not list pods from provision: %v", err)
-	}
+	for _, tc := range cases {
+		t.Run(tc.description, func(t *testing.T) {
+			fakePodInf := newFakePodInterface(t)
 
-	expectedNames := []string{driver.Name, server.Name, client.Name}
-	for _, en := range expectedNames {
-		found := false
+			server := types.NewComponent(testContainerImage, types.ServerComponent)
+			client := types.NewComponent(testContainerImage, types.ClientComponent)
+			driver := types.NewComponent(testContainerImage, types.DriverComponent)
 
-		for _, pod := range pods {
-			if strings.Compare(pod.ObjectMeta.Name, en) == 0 {
-				found = true
+			components := []*types.Component{server, client, driver}
+			session := types.NewSession(driver, components[:2], nil)
+
+			e := &kubeExecutor{
+				name:    "provision-test-executor",
+				pcd:     fakePodInf,
+				watcher: nil,
+				store:   nil,
 			}
-		}
+			eventChan := make(chan *PodWatchEvent)
+			e.eventChan = eventChan
+			e.session = session
 
-		if !found {
-			t.Errorf("provision did not create pod for component %v", en)
-		}
-	}
+			go func() {
+				for _, event := range tc.events {
+					var component *types.Component
 
-	// test provision with an unsuccessful pod
-	fakePodInf = newFakePodInterface(t)
-
-	driver = types.NewComponent(testContainerImage, types.DriverComponent)
-	server = types.NewComponent(testContainerImage, types.ServerComponent)
-	client = types.NewComponent(testContainerImage, types.ClientComponent)
-
-	components = []*types.Component{server, client, driver}
-	session = types.NewSession(driver, components[:2], nil)
-
-	e = newKubeExecutor(0, fakePodInf, nil, nil)
-	eventChan = make(chan *PodWatchEvent)
-	e.eventChan = eventChan
-	e.session = session
-
-	go func() {
-		for _, c := range components[:2] {
-			eventChan <- &PodWatchEvent{
-				SessionName:   session.Name,
-				ComponentName: c.Name,
-				Pod:           nil,
-				PodIP:         "127.0.0.1",
-				Health:        Ready,
-				Error:         nil,
+					switch event.component {
+					case types.ServerComponent:
+						component = server
+					case types.ClientComponent:
+						component = client
+					case types.DriverComponent:
+						component = driver
+					default:
+						t.Fatalf("bad component kind")
+					}
+
+					time.Sleep(event.sleep)
+					eventChan <- &PodWatchEvent{
+						SessionName:   session.Name,
+						ComponentName: component.Name,
+						Pod:           nil,
+						PodIP:         event.podIP,
+						Health:        event.health,
+						Error:         nil,
+					}
+				}
+			}()
+
+			ctx, cancel := context.Background(), context.CancelFunc(func() {})
+			if tc.ctxTimeout > 0 {
+				ctx, cancel = context.WithTimeout(ctx, tc.ctxTimeout)
+			}
+			defer cancel()
+
+			err := e.provision(ctx)
+			if tc.errors {
+				if err == nil {
+					t.Fatalf("provision did not error")
+				}
+				return
 			}
-		}
-
-		eventChan <- &PodWatchEvent{
-			SessionName:   session.Name,
-			ComponentName: components[2].Name,
-			Pod:           nil,
-			PodIP:         "",
-			Health:        Failed,
-			Error:         nil,
-		}
-	}()
-
-	if err := e.provision(); err == nil {
-		t.Errorf("expected error for failing pod, but returned nil")
+
+			if err != nil {
+				t.Fatalf("unexpected error in provision: %v", err)
+			}
+
+			pods, err := listPods(t, fakePodInf)
+			if err != nil {
+				t.Fatalf("could not list pods from provision: %v", err)
+			}
+
+			expectedNames := []string{driver.Name, server.Name, client.Name}
+			for _, en := range expectedNames {
+				found := false
+
+				for _, pod := range pods {
+					if strings.Compare(pod.ObjectMeta.Name, en) == 0 {
+						found = true
+					}
+				}
+
+				if !found {
+					t.Errorf("provision did not create pod for component %v", en)
+				}
+			}
+		})
 	}
 }
 
 func TestKubeExecutorMonitor(t *testing.T) {
 	cases := []struct {
-		description string
-		event       *PodWatchEvent
-		errors      bool
+		description  string
+		event        *PodWatchEvent
+		eventTimeout time.Duration
+		ctxTimeout   time.Duration
+		errors       bool
 	}{
 		{
 			description: "success event received",
@@ -123,6 +198,13 @@ func TestKubeExecutorMonitor(t *testing.T) {
 			event:       &PodWatchEvent{Health: Failed},
 			errors:      true,
 		},
+		{
+			description:  "cancelled context",
+			event:        &PodWatchEvent{Health: Succeeded},
+			eventTimeout: 10 * time.Second * timeMultiplier,
+			ctxTimeout:   1 * time.Millisecond * timeMultiplier,
+			errors:       true,
+		},
 	}
 
 	for _, tc := range cases {
@@ -136,12 +218,18 @@ func TestKubeExecutorMonitor(t *testing.T) {
 			components := []*types.Component{server, client, driver}
 			session := types.NewSession(driver, components[:2], nil)
 
-			e := newKubeExecutor(0, fakePodInf, nil, nil)
+			e := &kubeExecutor{
+				name:    "",
+				pcd:     fakePodInf,
+				watcher: nil,
+				store:   nil,
+			}
 			eventChan := make(chan *PodWatchEvent)
 			e.eventChan = eventChan
 			e.session = session
 
 			go func() {
+				time.Sleep(tc.eventTimeout)
 				eventChan <- &PodWatchEvent{
 					SessionName:   session.Name,
 					ComponentName: driver.Name,
@@ -152,7 +240,13 @@ func TestKubeExecutorMonitor(t *testing.T) {
 				}
 			}()
 
-			err := e.monitor()
+			ctx, cancel := context.Background(), context.CancelFunc(func() {})
+			if tc.ctxTimeout > 0 {
+				ctx, cancel = context.WithTimeout(ctx, tc.ctxTimeout)
+			}
+			defer cancel()
+
+			err := e.monitor(ctx)
 			if err == nil && tc.errors {
 				t.Errorf("case '%v' did not return error", tc.description)
 			} else if err != nil && !tc.errors {
@@ -162,6 +256,13 @@ func TestKubeExecutorMonitor(t *testing.T) {
 	}
 }
 
+type fakeWatchEvent struct {
+	component types.ComponentKind
+	health    Health
+	sleep     time.Duration
+	podIP     string
+}
+
 // TODO(@codeblooded): Refactor clean method, or choose to not test this method
 //func TestKubeExecutorClean(t *testing.T) {
 //	fakePodInf := newFakePodInterface(t)
diff --git a/testctrl/svc/orch/internal_test.go b/testctrl/svc/orch/internal_test.go
index 943b571660c89..5cd33ad2e8030 100644
--- a/testctrl/svc/orch/internal_test.go
+++ b/testctrl/svc/orch/internal_test.go
@@ -1,6 +1,7 @@
 package orch
 
 import (
+	"context"
"sync" "testing" "time" @@ -112,7 +113,7 @@ func (pwm *podWatcherMock) Watch(listOpts metav1.ListOptions) (watch.Interface, type nodeListerMock struct { nodes []corev1.Node - err error + err error } func (nlm *nodeListerMock) List(_ metav1.ListOptions) (*corev1.NodeList, error) { @@ -144,7 +145,7 @@ type executorMock struct { sessionArg *types.Session } -func (em *executorMock) Execute(session *types.Session) error { +func (em *executorMock) Execute(_ context.Context, session *types.Session) error { em.mux.Lock() defer em.mux.Unlock() diff --git a/testctrl/svc/orch/watcher.go b/testctrl/svc/orch/watcher.go index e688f547f8b20..814f01ccc0bce 100644 --- a/testctrl/svc/orch/watcher.go +++ b/testctrl/svc/orch/watcher.go @@ -21,19 +21,40 @@ import ( // // Create watcher instances with the NewWatcher constructor, not a literal. type Watcher struct { - eventChans map[string]chan *PodWatchEvent - pw podWatcher - quit chan struct{} - mux sync.Mutex - wi watch.Interface + eventChans map[string]chan *PodWatchEvent + eventBufferSize int + pw podWatcher + quit chan struct{} + mux sync.Mutex + wi watch.Interface +} + +// WatcherOptions overrides the defaults of the watcher, allowing it to be +// configured as needed. +type WatcherOptions struct { + // EventBufferSize specifies the size of the buffered channel for each + // session. It allows the watcher to write additional kubernetes events + // without blocking for reads. It defaults to 32 events. + EventBufferSize int } // NewWatcher creates and prepares a new watcher instance. -func NewWatcher(pw podWatcher) *Watcher { +func NewWatcher(pw podWatcher, options *WatcherOptions) *Watcher { + opts := options + if opts == nil { + opts = &WatcherOptions{} + } + + eventBufferSize := opts.EventBufferSize + if eventBufferSize == 0 { + eventBufferSize = 32 + } + return &Watcher{ - eventChans: make(map[string]chan *PodWatchEvent), - pw: pw, - quit: make(chan struct{}), + eventChans: make(map[string]chan *PodWatchEvent), + eventBufferSize: eventBufferSize, + pw: pw, + quit: make(chan struct{}), } } @@ -71,7 +92,7 @@ func (w *Watcher) Subscribe(sessionName string) (<-chan *PodWatchEvent, error) { return nil, fmt.Errorf("session %v already has a follower", sessionName) } - eventChan := make(chan *PodWatchEvent, eventBufferSize) + eventChan := make(chan *PodWatchEvent, w.eventBufferSize) w.eventChans[sessionName] = eventChan return eventChan, nil } @@ -201,5 +222,3 @@ type PodWatchEvent struct { // Error may provide the error details that led to the failing health. 
 	Error error
 }
-
-const eventBufferSize = 32
diff --git a/testctrl/svc/orch/watcher_test.go b/testctrl/svc/orch/watcher_test.go
index be450b0943e23..54958e717e686 100644
--- a/testctrl/svc/orch/watcher_test.go
+++ b/testctrl/svc/orch/watcher_test.go
@@ -28,7 +28,7 @@ func TestWatcherStart(t *testing.T) {
 
 	for _, tc := range cases {
 		t.Run(tc.description, func(t *testing.T) {
-			w := NewWatcher(tc.mock)
+			w := NewWatcher(tc.mock, nil)
 			defer w.Stop()
 
 			err := w.Start()
@@ -49,7 +49,7 @@ func TestWatcherStop(t *testing.T) {
 
 	wi := watch.NewRaceFreeFake()
 	mock := &podWatcherMock{wi: wi}
-	w := NewWatcher(mock)
+	w := NewWatcher(mock, nil)
 
 	if err = w.Start(); err != nil {
 		t.Fatalf("setup failed, Start returned error: %v", err)
@@ -76,7 +76,7 @@ func TestWatcherSubscribe(t *testing.T) {
 	var event *PodWatchEvent
 	timeout := 100 * time.Millisecond * timeMultiplier
 
-	w := NewWatcher(nil)
+	w := NewWatcher(nil, nil)
 	sharedSessionName := "double-subscription"
 	_, _ = w.Subscribe(sharedSessionName)
 	if _, err = w.Subscribe(sharedSessionName); err == nil {
@@ -85,7 +85,7 @@ func TestWatcherSubscribe(t *testing.T) {
 
 	wi := watch.NewRaceFreeFake()
 	mock := &podWatcherMock{wi: wi}
-	w = NewWatcher(mock)
+	w = NewWatcher(mock, nil)
 
 	if err = w.Start(); err != nil {
 		t.Fatalf("setup failed, Start returned error: %v", err)
@@ -140,7 +140,7 @@ func TestWatcherUnsubscribe(t *testing.T) {
 
 	// test an error is returned without subscription
 	t.Run("no subscription", func(t *testing.T) {
-		w := NewWatcher(nil)
+		w := NewWatcher(nil, nil)
 		if err := w.Unsubscribe("non-existent"); err == nil {
 			t.Errorf("did not return an error for Unsubscribe call without subscription")
 		}
@@ -150,7 +150,7 @@ func TestWatcherUnsubscribe(t *testing.T) {
 	t.Run("prevents further events", func(t *testing.T) {
 		wi := watch.NewRaceFreeFake()
 		mock := &podWatcherMock{wi: wi}
-		w := NewWatcher(mock)
+		w := NewWatcher(mock, nil)
 
 		if err = w.Start(); err != nil {
 			t.Fatalf("setup failed, Start returned error: %v", err)
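
Usage sketch (reviewer aid, not part of the patch): a minimal example of how a caller might wire the new ControllerOptions into NewController and perform a context-bounded shutdown. The run wrapper, its arguments, and the option values are illustrative placeholders, not defaults; the import paths are the ones this patch uses, and the shutdown pattern mirrors cmd/svc/main.go above.

```go
package main

import (
	"context"
	"time"

	"k8s.io/client-go/kubernetes"

	"github.com/grpc/grpc/testctrl/svc/orch"
	"github.com/grpc/grpc/testctrl/svc/store"
)

// run starts a controller with explicit options and arranges a bounded,
// graceful shutdown when it returns.
func run(clientset kubernetes.Interface, storageServer store.Store) error {
	opts := &orch.ControllerOptions{
		ExecutorCount: 2,                // allow 2 concurrent sessions (default 1)
		TestTimeout:   15 * time.Minute, // fail sessions that outlive this (0 = unlimited)
		WatcherOptions: &orch.WatcherOptions{
			EventBufferSize: 64, // buffer more pod events per session (default 32)
		},
	}

	c, err := orch.NewController(clientset, storageServer, opts)
	if err != nil {
		return err
	}
	if err := c.Start(); err != nil {
		return err
	}

	// Give executors up to 5 minutes to exit before Stop reports an error.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	defer c.Stop(shutdownCtx)

	// ... schedule sessions with c.Schedule(...) and serve traffic ...
	return nil
}

func main() {
	// Wiring a real clientset and store is environment-specific; see
	// testctrl/cmd/svc/main.go in this patch for the production setup.
}
```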