Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions testctrl/cmd/orchtest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"context"
"flag"
"log"
"os"
Expand Down Expand Up @@ -49,11 +50,14 @@ func main() {
glog.Fatalf("Invalid config file specified by the KUBE_CONFIG_FILE env variable, unable to connect: %v", err)
}

c, _ := orch.NewController(clientset, nil)
c, _ := orch.NewController(clientset, nil, nil)
if err := c.Start(); err != nil {
panic(err)
}
defer c.Stop(*timeout)

shutdownCtx, cancel := context.WithTimeout(context.Background(), *timeout)
defer cancel()
defer c.Stop(shutdownCtx)

go func() {
for i := 0; i < *count; i++ {
Expand Down
11 changes: 9 additions & 2 deletions testctrl/cmd/svc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"context"
"flag"
"fmt"
"net"
Expand Down Expand Up @@ -64,6 +65,7 @@ func setupDevEnv(grpcServer *grpc.Server) *kubernetes.Clientset {

func main() {
port := flag.Int("port", 50051, "Port to start the service.")
testTimeout := flag.Duration("testTimeout", 15*time.Minute, "Maximum time tests are allowed to run")
shutdownTimeout := flag.Duration("shutdownTimeout", 5*time.Minute, "Time alloted to a graceful shutdown.")
flag.Parse()
defer glog.Flush()
Expand All @@ -82,7 +84,10 @@ func main() {

storageServer := store.NewStorageServer()

controller, err := orch.NewController(clientset, storageServer)
controllerOpts := &orch.ControllerOptions{
TestTimeout: *testTimeout,
}
controller, err := orch.NewController(clientset, storageServer, controllerOpts)
if err != nil {
glog.Fatalf("could not create a controller: %v", err)
}
Expand All @@ -91,7 +96,9 @@ func main() {
glog.Fatalf("unable to start orchestration controller: %v", err)
}

defer controller.Stop(*shutdownTimeout)
shutdownCtx, cancel := context.WithTimeout(context.Background(), *shutdownTimeout)
defer cancel()
defer controller.Stop(shutdownCtx)

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
Expand Down
107 changes: 80 additions & 27 deletions testctrl/svc/orch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package orch

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/google/uuid"
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -28,9 +30,6 @@ import (
"github.com/grpc/grpc/testctrl/svc/types"
)

// executorCount specifies the maximum number of sessions that should be processed concurrently.
const executorCount = 1

// Controller serves as the coordinator for orchestrating sessions. It manages active and idle
// sessions, as well as, interactions with Kubernetes through a set of a internal types.
type Controller struct {
Expand All @@ -40,34 +39,83 @@ type Controller struct {
nl NodeLister
watcher *Watcher
waitQueue *queue
executorCount int
activeCount int
running bool
wg sync.WaitGroup
mux sync.Mutex
newExecutorFunc func() Executor
testTimeout time.Duration
}

// ControllerOptions overrides the defaults of the controller, allowing it to be
// configured as needed.
type ControllerOptions struct {
// ExecutorCount specifies the maximum number of sessions that should be
// processed at a time. It defaults to 1, disabling concurrent sessions.
ExecutorCount int

// WatcherOptions overrides the defaults of the watcher. The watcher
// listens for Kubernetes events and reports the health of components
// to the session's executor.
WatcherOptions *WatcherOptions

// TestTimeout is the maximum duration to wait for component containers
// to provision and terminate with a successful exit code. If this
// timeout is reached before an exit, the session will error.
//
// The zero value provides unlimited time to the test, so it should be
// avoided in production.
TestTimeout time.Duration
}

// NewController creates a controller using a Kubernetes clientset and a store. The clientset allows
// the controller to interact with Kubernetes. The store is used to report significant orchestration
// events, so progress can be reported. A nil clientset will result in an error.
func NewController(clientset kubernetes.Interface, store store.Store) (*Controller, error) {
// NewController creates a controller using a Kubernetes clientset, store and an
// optional ControllerOptions instance.
//
// The clientset allows the controller to interact with Kubernetes. If nil, an
// error will be returned instead of a controller.
//
// The store is used to report orchestration events, so progress can be reported
// to a user.
//
// The options value allows the controller to be customized. Specifying nil will
// configure the controller to sane defaults. These defaults are described in
// the ControllerOptions documentation.
func NewController(clientset kubernetes.Interface, store store.Store, options *ControllerOptions) (*Controller, error) {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it is best to make the ControllerOptions parameter a pointer. It allows someone to specify nil and inherit defaults, but it decreases the readability of function calls. It may be better to have callers specify ControllerOptions{} to inherit the defaults. I believe I have seen both with Kubernetes. Would love input here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is too important. If it is not a pointer you would write ControllerOptions{} where you now write nil, and it avoids some logic inside the controller, so it may be a reasonable thing to do.

if clientset == nil {
return nil, errors.New("cannot create controller from nil kubernetes clientset")
}

opts := options
if opts == nil {
opts = &ControllerOptions{}
}

executorCount := opts.ExecutorCount
if executorCount == 0 {
executorCount = 1
}

coreV1Interface := clientset.CoreV1()
podInterface := coreV1Interface.Pods(corev1.NamespaceDefault)

c := &Controller{
pcd: podInterface,
pw: podInterface,
nl: coreV1Interface.Nodes(),
watcher: NewWatcher(podInterface),
store: store,
pcd: podInterface,
pw: podInterface,
nl: coreV1Interface.Nodes(),
watcher: NewWatcher(podInterface, opts.WatcherOptions),
store: store,
executorCount: executorCount,
testTimeout: opts.TestTimeout,
}

c.newExecutorFunc = func() Executor {
return newKubeExecutor(0, c.pcd, c.watcher, c.store)
return &kubeExecutor{
name: uuid.New().String(),
pcd: c.pcd,
watcher: c.watcher,
store: c.store,
}
}

return c, nil
Expand Down Expand Up @@ -119,15 +167,16 @@ func (c *Controller) Start() error {
return nil
}

// Stop attempts to terminate all orchestration threads spawned by a call to Start. It waits for a
// graceful shutdown until for a specified timeout.
// Stop attempts to terminate all orchestration threads spawned by a call to
// Start. It waits for a graceful shutdown until the context is cancelled.
//
// If the timeout is reached before shutdown, an improper shutdown will occur. This may result in
// unpredictable states for running sessions and their resources. To signal these potential issues,
// an error is returned when this occurs.
// If the context is cancelled before a graceful shutdown, an error is returned.
// This improper shutdown may result in unpredictable states. It should be
// avoided if possible.
//
// If Start was not called prior to Stop, there will be no adverse effects and nil will be returned.
func (c *Controller) Stop(timeout time.Duration) error {
// If Start was not called prior to Stop, there will be no adverse effects and
// nil will be returned.
func (c *Controller) Stop(ctx context.Context) error {
defer c.watcher.Stop()

c.mux.Lock()
Expand All @@ -143,7 +192,7 @@ func (c *Controller) Stop(timeout time.Duration) error {
select {
case <-done:
glog.Infof("controller: executors safely exited")
case <-time.After(timeout):
case <-ctx.Done():
glog.Warning("controller: unable to wait for executors to safely exit, timed out")
return fmt.Errorf("executors did not safely exit before timeout")
}
Expand Down Expand Up @@ -197,7 +246,7 @@ func (c *Controller) next() (session *types.Session, quit bool) {
return nil, true
}

if c.activeCount > executorCount {
if c.activeCount > c.executorCount {
return nil, false
}

Expand Down Expand Up @@ -228,12 +277,16 @@ func (c *Controller) spawnExecutor(session *types.Session) {
c.incExecutors()

go func() {
defer func() {
c.decExecutors()
c.waitQueue.Done(session)
}()
ctx, cancel := context.WithCancel(context.Background())
if c.testTimeout > 0 {
ctx, _ = context.WithTimeout(ctx, c.testTimeout)
}

defer cancel()
defer c.decExecutors()
defer c.waitQueue.Done(session)

if err := executor.Execute(session); err != nil {
if err := executor.Execute(ctx, session); err != nil {
glog.Infof("%v", err)
}
}()
Expand Down
23 changes: 14 additions & 9 deletions testctrl/svc/orch/controller_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package orch

import (
"context"
"errors"
"testing"
"time"
Expand All @@ -11,7 +12,7 @@ import (

func TestNewController(t *testing.T) {
t.Run("nil clientset returns error", func(t *testing.T) {
controller, err := NewController(nil, nil)
controller, err := NewController(nil, nil, nil)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All tests currently use the default options. In the future, we may want to add tests to experiment with multiple combinations. The defaults are very sane, so I doubt we will deviate much.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that means testing scheduling with two executors or with a timeout or with different watcher options. It may be ok to defer these, although eventually it would be good to test for instance that a second test can be scheduled when you have two executors, and so on.

if err == nil {
t.Errorf("no error returned for nil clientset")
}
Expand Down Expand Up @@ -50,15 +51,15 @@ func TestControllerSchedule(t *testing.T) {

for _, tc := range cases {
t.Run(tc.description, func(t *testing.T) {
controller, _ := NewController(fake.NewSimpleClientset(), nil)
controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
executor := &executorMock{}
controller.newExecutorFunc = func() Executor {
return executor
}

if tc.start {
controller.Start()
defer controller.Stop(0)
defer controller.Stop(context.Background())
}

err := controller.Schedule(tc.session)
Expand All @@ -84,9 +85,9 @@ func TestControllerSchedule(t *testing.T) {

func TestControllerStart(t *testing.T) {
t.Run("sets running state", func(t *testing.T) {
controller, _ := NewController(fake.NewSimpleClientset(), nil)
controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
controller.Start()
defer controller.Stop(0)
defer controller.Stop(context.Background())
if controller.Stopped() {
t.Errorf("Stopped unexpectedly returned true after starting controller")
}
Expand All @@ -112,7 +113,7 @@ func TestControllerStart(t *testing.T) {

for _, tc := range cases {
t.Run(tc.description, func(t *testing.T) {
controller, _ := NewController(fake.NewSimpleClientset(), nil)
controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
controller.waitQueue = newQueue(limitlessTracker{})

if tc.mockNL != nil {
Expand All @@ -121,7 +122,7 @@ func TestControllerStart(t *testing.T) {

if tc.mockPW != nil {
controller.pw = tc.mockPW
controller.watcher = NewWatcher(tc.mockPW)
controller.watcher = NewWatcher(tc.mockPW, nil)
}

err := controller.Start()
Expand Down Expand Up @@ -170,7 +171,7 @@ func TestControllerStop(t *testing.T) {

for _, tc := range cases {
t.Run(tc.description, func(t *testing.T) {
controller, _ := NewController(fake.NewSimpleClientset(), nil)
controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
controller.running = true
controller.waitQueue = newQueue(limitlessTracker{})

Expand All @@ -196,7 +197,11 @@ func TestControllerStop(t *testing.T) {

go controller.loop()
time.Sleep(timeout)
err := controller.Stop(tc.stopTimeout)

ctx, cancel := context.WithTimeout(context.Background(), tc.stopTimeout)
defer cancel()

err := controller.Stop(ctx)
if tc.shouldError && err == nil {
t.Errorf("executors unexpectedly finished before timeout")
} else if !tc.shouldError && err != nil {
Expand Down
Loading