@@ -28,9 +28,6 @@ import (
2828 "github.com/grpc/grpc/testctrl/svc/types"
2929)
3030
31- // executorCount specifies the maximum number of sessions that should be processed concurrently.
32- const executorCount = 1
33-
3431// Controller serves as the coordinator for orchestrating sessions. It manages active and idle
3532// sessions, as well as, interactions with Kubernetes through a set of a internal types.
3633type Controller struct {
@@ -40,30 +37,72 @@ type Controller struct {
4037 nl NodeLister
4138 watcher * Watcher
4239 waitQueue * queue
40+ executorCount int
4341 activeCount int
4442 running bool
4543 wg sync.WaitGroup
4644 mux sync.Mutex
4745 newExecutorFunc func () Executor
4846}
4947
50- // NewController creates a controller using a Kubernetes clientset and a store. The clientset allows
51- // the controller to interact with Kubernetes. The store is used to report significant orchestration
52- // events, so progress can be reported. A nil clientset will result in an error.
53- func NewController (clientset kubernetes.Interface , store store.Store ) (* Controller , error ) {
48+ // ControllerOptions overrides the defaults of the controller, allowing it to be
49+ // configured as needed.
50+ type ControllerOptions struct {
51+ // ExecutorCount specifies the maximum number of sessions that should be
52+ // processed at a time. It defaults to 1, disabling concurrent sessions.
53+ ExecutorCount int
54+
55+ // WatchEventBufferSize specifies the size of the buffered channel for
56+ // each channel the watcher creates. It allows the watcher to write
57+ // additional kubernetes events without blocking for reads. It defaults
58+ // to 32 events.
59+ WatchEventBufferSize int
60+ }
61+
62+ // NewController creates a controller using a Kubernetes clientset, store and an
63+ // optional ControllerOptions instance.
64+ //
65+ // The clientset allows the controller to interact with Kubernetes. If nil, an
66+ // error will be returned instead of a controller.
67+ //
68+ // The store is used to report orchestration events, so progress can be reported
69+ // to a user.
70+ //
71+ // The options value allows the controller to be customized. Specifying nil will
72+ // configure the controller to sane defaults described in the ControllerOptions
73+ // documentation.
74+ func NewController (clientset kubernetes.Interface , store store.Store , options * ControllerOptions ) (* Controller , error ) {
5475 if clientset == nil {
5576 return nil , errors .New ("cannot create controller from nil kubernetes clientset" )
5677 }
5778
79+ opts := options
80+ if opts == nil {
81+ opts = & ControllerOptions {}
82+ }
83+
84+ executorCount := opts .ExecutorCount
85+ if executorCount == 0 {
86+ executorCount = 1
87+ }
88+
89+ watcherOpts := & WatcherOptions {
90+ EventBufferSize : opts .WatchEventBufferSize ,
91+ }
92+ if watcherOpts .EventBufferSize == 0 {
93+ watcherOpts .EventBufferSize = 32
94+ }
95+
5896 coreV1Interface := clientset .CoreV1 ()
5997 podInterface := coreV1Interface .Pods (corev1 .NamespaceDefault )
6098
6199 c := & Controller {
62- pcd : podInterface ,
63- pw : podInterface ,
64- nl : coreV1Interface .Nodes (),
65- watcher : NewWatcher (podInterface ),
66- store : store ,
100+ pcd : podInterface ,
101+ pw : podInterface ,
102+ nl : coreV1Interface .Nodes (),
103+ watcher : NewWatcher (podInterface , watcherOpts ),
104+ store : store ,
105+ executorCount : executorCount ,
67106 }
68107
69108 c .newExecutorFunc = func () Executor {
@@ -197,7 +236,7 @@ func (c *Controller) next() (session *types.Session, quit bool) {
197236 return nil , true
198237 }
199238
200- if c .activeCount > executorCount {
239+ if c .activeCount > c . executorCount {
201240 return nil , false
202241 }
203242
0 commit comments