Skip to content

Commit c6600db

Browse files
author
Ben Reed
committed
Add options struct for Controller and Watcher
This change adds an options struct for the Controller and Watcher types, allowing them to be configured. Previously, they used magic constants.
1 parent 56e0676 commit c6600db

File tree

4 files changed

+95
-37
lines changed

4 files changed

+95
-37
lines changed

testctrl/svc/orch/controller.go

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@ import (
2828
"github.com/grpc/grpc/testctrl/svc/types"
2929
)
3030

31-
// executorCount specifies the maximum number of sessions that should be processed concurrently.
32-
const executorCount = 1
33-
3431
// Controller serves as the coordinator for orchestrating sessions. It manages active and idle
3532
// sessions, as well as, interactions with Kubernetes through a set of a internal types.
3633
type Controller struct {
@@ -40,30 +37,72 @@ type Controller struct {
4037
nl NodeLister
4138
watcher *Watcher
4239
waitQueue *queue
40+
executorCount int
4341
activeCount int
4442
running bool
4543
wg sync.WaitGroup
4644
mux sync.Mutex
4745
newExecutorFunc func() Executor
4846
}
4947

50-
// NewController creates a controller using a Kubernetes clientset and a store. The clientset allows
51-
// the controller to interact with Kubernetes. The store is used to report significant orchestration
52-
// events, so progress can be reported. A nil clientset will result in an error.
53-
func NewController(clientset kubernetes.Interface, store store.Store) (*Controller, error) {
48+
// ControllerOptions overrides the defaults of the controller, allowing it to be
49+
// configured as needed.
50+
type ControllerOptions struct {
51+
// ExecutorCount specifies the maximum number of sessions that should be
52+
// processed at a time. It defaults to 1, disabling concurrent sessions.
53+
ExecutorCount int
54+
55+
// WatchEventBufferSize specifies the size of the buffered channel for
56+
// each channel the watcher creates. It allows the watcher to write
57+
// additional kubernetes events without blocking for reads. It defaults
58+
// to 32 events.
59+
WatchEventBufferSize int
60+
}
61+
62+
// NewController creates a controller using a Kubernetes clientset, store and an
63+
// optional ControllerOptions instance.
64+
//
65+
// The clientset allows the controller to interact with Kubernetes. If nil, an
66+
// error will be returned instead of a controller.
67+
//
68+
// The store is used to report orchestration events, so progress can be reported
69+
// to a user.
70+
//
71+
// The options value allows the controller to be customized. Specifying nil will
72+
// configure the controller to sane defaults described in the ControllerOptions
73+
// documentation.
74+
func NewController(clientset kubernetes.Interface, store store.Store, options *ControllerOptions) (*Controller, error) {
5475
if clientset == nil {
5576
return nil, errors.New("cannot create controller from nil kubernetes clientset")
5677
}
5778

79+
opts := options
80+
if opts == nil {
81+
opts = &ControllerOptions{}
82+
}
83+
84+
executorCount := opts.ExecutorCount
85+
if executorCount == 0 {
86+
executorCount = 1
87+
}
88+
89+
watcherOpts := &WatcherOptions{
90+
EventBufferSize: opts.WatchEventBufferSize,
91+
}
92+
if watcherOpts.EventBufferSize == 0 {
93+
watcherOpts.EventBufferSize = 32
94+
}
95+
5896
coreV1Interface := clientset.CoreV1()
5997
podInterface := coreV1Interface.Pods(corev1.NamespaceDefault)
6098

6199
c := &Controller{
62-
pcd: podInterface,
63-
pw: podInterface,
64-
nl: coreV1Interface.Nodes(),
65-
watcher: NewWatcher(podInterface),
66-
store: store,
100+
pcd: podInterface,
101+
pw: podInterface,
102+
nl: coreV1Interface.Nodes(),
103+
watcher: NewWatcher(podInterface, watcherOpts),
104+
store: store,
105+
executorCount: executorCount,
67106
}
68107

69108
c.newExecutorFunc = func() Executor {
@@ -197,7 +236,7 @@ func (c *Controller) next() (session *types.Session, quit bool) {
197236
return nil, true
198237
}
199238

200-
if c.activeCount > executorCount {
239+
if c.activeCount > c.executorCount {
201240
return nil, false
202241
}
203242

testctrl/svc/orch/controller_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
func TestNewController(t *testing.T) {
1313
t.Run("nil clientset returns error", func(t *testing.T) {
14-
controller, err := NewController(nil, nil)
14+
controller, err := NewController(nil, nil, nil)
1515
if err == nil {
1616
t.Errorf("no error returned for nil clientset")
1717
}
@@ -50,7 +50,7 @@ func TestControllerSchedule(t *testing.T) {
5050

5151
for _, tc := range cases {
5252
t.Run(tc.description, func(t *testing.T) {
53-
controller, _ := NewController(fake.NewSimpleClientset(), nil)
53+
controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
5454
executor := &executorMock{}
5555
controller.newExecutorFunc = func() Executor {
5656
return executor
@@ -84,7 +84,7 @@ func TestControllerSchedule(t *testing.T) {
8484

8585
func TestControllerStart(t *testing.T) {
8686
t.Run("sets running state", func(t *testing.T) {
87-
controller, _ := NewController(fake.NewSimpleClientset(), nil)
87+
controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
8888
controller.Start()
8989
defer controller.Stop(0)
9090
if controller.Stopped() {
@@ -112,7 +112,7 @@ func TestControllerStart(t *testing.T) {
112112

113113
for _, tc := range cases {
114114
t.Run(tc.description, func(t *testing.T) {
115-
controller, _ := NewController(fake.NewSimpleClientset(), nil)
115+
controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
116116
controller.waitQueue = newQueue(limitlessTracker{})
117117

118118
if tc.mockNL != nil {
@@ -121,7 +121,7 @@ func TestControllerStart(t *testing.T) {
121121

122122
if tc.mockPW != nil {
123123
controller.pw = tc.mockPW
124-
controller.watcher = NewWatcher(tc.mockPW)
124+
controller.watcher = NewWatcher(tc.mockPW, nil)
125125
}
126126

127127
err := controller.Start()
@@ -170,7 +170,7 @@ func TestControllerStop(t *testing.T) {
170170

171171
for _, tc := range cases {
172172
t.Run(tc.description, func(t *testing.T) {
173-
controller, _ := NewController(fake.NewSimpleClientset(), nil)
173+
controller, _ := NewController(fake.NewSimpleClientset(), nil, nil)
174174
controller.running = true
175175
controller.waitQueue = newQueue(limitlessTracker{})
176176

testctrl/svc/orch/watcher.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,40 @@ import (
2121
//
2222
// Create watcher instances with the NewWatcher constructor, not a literal.
2323
type Watcher struct {
24-
eventChans map[string]chan *PodWatchEvent
25-
pw podWatcher
26-
quit chan struct{}
27-
mux sync.Mutex
28-
wi watch.Interface
24+
eventChans map[string]chan *PodWatchEvent
25+
eventBufferSize int
26+
pw podWatcher
27+
quit chan struct{}
28+
mux sync.Mutex
29+
wi watch.Interface
30+
}
31+
32+
// WatcherOptions overrides the defaults of the controller, allowing it to be
33+
// configured as needed.
34+
type WatcherOptions struct {
35+
// EventBufferSize specifies the size of the buffered channel for each
36+
// session. It allows the watcher to write additional kubernetes events
37+
// without blocking for reads. It defaults to 32 events.
38+
EventBufferSize int
2939
}
3040

3141
// NewWatcher creates and prepares a new watcher instance.
32-
func NewWatcher(pw podWatcher) *Watcher {
42+
func NewWatcher(pw podWatcher, options *WatcherOptions) *Watcher {
43+
opts := options
44+
if opts == nil {
45+
opts = &WatcherOptions{}
46+
}
47+
48+
eventBufferSize := opts.EventBufferSize
49+
if eventBufferSize == 0 {
50+
eventBufferSize = 32
51+
}
52+
3353
return &Watcher{
34-
eventChans: make(map[string]chan *PodWatchEvent),
35-
pw: pw,
36-
quit: make(chan struct{}),
54+
eventChans: make(map[string]chan *PodWatchEvent),
55+
eventBufferSize: eventBufferSize,
56+
pw: pw,
57+
quit: make(chan struct{}),
3758
}
3859
}
3960

@@ -71,7 +92,7 @@ func (w *Watcher) Subscribe(sessionName string) (<-chan *PodWatchEvent, error) {
7192
return nil, fmt.Errorf("session %v already has a follower", sessionName)
7293
}
7394

74-
eventChan := make(chan *PodWatchEvent, eventBufferSize)
95+
eventChan := make(chan *PodWatchEvent, w.eventBufferSize)
7596
w.eventChans[sessionName] = eventChan
7697
return eventChan, nil
7798
}
@@ -201,5 +222,3 @@ type PodWatchEvent struct {
201222
// Error may provide the error details that led to the failing health.
202223
Error error
203224
}
204-
205-
const eventBufferSize = 32

testctrl/svc/orch/watcher_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestWatcherStart(t *testing.T) {
2828

2929
for _, tc := range cases {
3030
t.Run(tc.description, func(t *testing.T) {
31-
w := NewWatcher(tc.mock)
31+
w := NewWatcher(tc.mock, nil)
3232

3333
defer w.Stop()
3434
err := w.Start()
@@ -49,7 +49,7 @@ func TestWatcherStop(t *testing.T) {
4949

5050
wi := watch.NewRaceFreeFake()
5151
mock := &podWatcherMock{wi: wi}
52-
w := NewWatcher(mock)
52+
w := NewWatcher(mock, nil)
5353

5454
if err = w.Start(); err != nil {
5555
t.Fatalf("setup failed, Start returned error: %v", err)
@@ -76,7 +76,7 @@ func TestWatcherSubscribe(t *testing.T) {
7676
var event *PodWatchEvent
7777
timeout := 100 * time.Millisecond * timeMultiplier
7878

79-
w := NewWatcher(nil)
79+
w := NewWatcher(nil, nil)
8080
sharedSessionName := "double-subscription"
8181
_, _ = w.Subscribe(sharedSessionName)
8282
if _, err = w.Subscribe(sharedSessionName); err == nil {
@@ -85,7 +85,7 @@ func TestWatcherSubscribe(t *testing.T) {
8585

8686
wi := watch.NewRaceFreeFake()
8787
mock := &podWatcherMock{wi: wi}
88-
w = NewWatcher(mock)
88+
w = NewWatcher(mock, nil)
8989

9090
if err = w.Start(); err != nil {
9191
t.Fatalf("setup failed, Start returned error: %v", err)
@@ -140,7 +140,7 @@ func TestWatcherUnsubscribe(t *testing.T) {
140140

141141
// test an error is returned without subscription
142142
t.Run("no subscription", func(t *testing.T) {
143-
w := NewWatcher(nil)
143+
w := NewWatcher(nil, nil)
144144
if err := w.Unsubscribe("non-existent"); err == nil {
145145
t.Errorf("did not return an error for Unsubscribe call without subscription")
146146
}
@@ -150,7 +150,7 @@ func TestWatcherUnsubscribe(t *testing.T) {
150150
t.Run("prevents further events", func(t *testing.T) {
151151
wi := watch.NewRaceFreeFake()
152152
mock := &podWatcherMock{wi: wi}
153-
w := NewWatcher(mock)
153+
w := NewWatcher(mock, nil)
154154

155155
if err = w.Start(); err != nil {
156156
t.Fatalf("setup failed, Start returned error: %v", err)

0 commit comments

Comments
 (0)