Skip to content

Commit

Permalink
WIP: add configmap for flow controller
Browse files Browse the repository at this point in the history
  • Loading branch information
pmorie committed Jul 19, 2018
1 parent 3388445 commit 79030d6
Showing 1 changed file with 42 additions and 3 deletions.
45 changes: 42 additions & 3 deletions pkg/controller/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package flow
import (
"fmt"
"log"
"sync"
"time"

"github.com/golang/glog"
Expand All @@ -43,6 +44,7 @@ import (
// TODO: Get rid of these, but needed as other controllers use them.
servingclientset "github.com/knative/serving/pkg/client/clientset/versioned"
servinginformers "github.com/knative/serving/pkg/client/informers/externalversions"
servingconfigmaps "github.com/knative/serving/pkg/configmap"

"github.com/knative/eventing/pkg/controller"

Expand All @@ -60,8 +62,10 @@ import (

const controllerAgentName = "flow-controller"

// TODO: This should come from a configmap
const defaultBusName = "stub"
// defaultClusterBusConfigMapKey is the name of the key in this controller's
// ConfigMap that contains the name of the default cluster bus for the flow
// controller to use.
const defaultClusterBusConfigMapKey = "default-cluster-bus"

// What field do we assume Object Reference exports as a resolvable target
const targetFieldName = "domainInternal"
Expand Down Expand Up @@ -109,6 +113,18 @@ type Controller struct {
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface

// configMutex guards the controller state that comes from the controller's configmap.
configMutex sync.RWMutex

// controllerConfigMapWatcher is used to watch the knative-system/flow-controller-config configMap.
controllerConfigMapWatcher servingconfigmaps.Watcher

// defaultBusName is the default bus name to use to create channels; it is
// updated if the knative-system/flow-controller-config ConfigMap exists and
// contains the 'default-cluster-bus-name' key.
defaultClusterBusName string

// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
Expand Down Expand Up @@ -170,6 +186,9 @@ func NewController(
},
})

// TODO: const for knative-system
controller.controllerConfigMapWatcher = servingconfigmaps.NewDefaultWatcher(kubeclientset, "knative-system")

return controller
}

Expand All @@ -184,6 +203,9 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
// Start the informer factories to begin populating the informer caches
glog.Info("Starting Flow controller")

glog.Info("Watching controller config")
c.controllerConfigMapWatcher.Watch("flow-controller-config", c.receiveControllerConfig)

// Wait for the caches to be synced before starting workers
glog.Info("Waiting for Flow informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.flowsSynced); !ok {
Expand Down Expand Up @@ -213,6 +235,23 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
return nil
}

func (c *Controller) receiveControllerConfig(configMap *corev1.ConfigMap) {
c.configMutex.Lock()
defer c.configMutex.Unlock()

if value, ok := configMap.Data[defaultClusterBusConfigMapKey]; ok {
c.defaultClusterBusName = value
} else {
c.defaultClusterBusName = "stub"
}
}

func (c *Controller) getDefaultClusterBusName() string {
c.configMutex.RLock()
defer c.configMutex.RUnlock()
return c.defaultClusterBusName
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
Expand Down Expand Up @@ -474,7 +513,7 @@ func (c *Controller) createChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Chann
},
},
Spec: channelsv1alpha1.ChannelSpec{
ClusterBus: defaultBusName,
ClusterBus: c.getDefaultClusterBusName(),
},
}
return c.clientset.ChannelsV1alpha1().Channels(flow.Namespace).Create(channel)
Expand Down

0 comments on commit 79030d6

Please sign in to comment.