Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add configmap for flow controller #240

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions pkg/controller/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package flow
import (
"fmt"
"log"
"sync"
"time"

"github.com/golang/glog"
Expand All @@ -43,6 +44,7 @@ import (
// TODO: Get rid of these, but needed as other controllers use them.
servingclientset "github.com/knative/serving/pkg/client/clientset/versioned"
servinginformers "github.com/knative/serving/pkg/client/informers/externalversions"
servingconfigmaps "github.com/knative/serving/pkg/configmap"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm totally fine for using this from the serving stack for now, but we should pull this into a common place so we don't have to take a dependency between repos. Perhaps add a TODO here and file an issue in Serving to move this out to some common repo.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is now in knative/pkg


"github.com/knative/eventing/pkg/controller"

Expand All @@ -60,8 +62,10 @@ import (

const controllerAgentName = "flow-controller"

// TODO: This should come from a configmap
const defaultBusName = "stub"
// defaultClusterBusConfigMapKey is the name of the key in this controller's
// ConfigMap that contains the name of the default cluster bus for the flow
// controller to use.
const defaultClusterBusConfigMapKey = "default-cluster-bus"

// What field do we assume Object Reference exports as a resolvable target
const targetFieldName = "domainInternal"
Expand Down Expand Up @@ -109,6 +113,18 @@ type Controller struct {
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface

// configMutex guards the controller state that comes from the controller's configmap.
configMutex sync.RWMutex

// controllerConfigMapWatcher is used to watch the knative-system/flow-controller-config configMap.
controllerConfigMapWatcher servingconfigmaps.Watcher

// defaultBusName is the default bus name to use to create channels; it is
// updated if the knative-system/flow-controller-config ConfigMap exists and
// contains the 'default-cluster-bus-name' key.
defaultClusterBusName string

// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
Expand Down Expand Up @@ -170,6 +186,9 @@ func NewController(
},
})

// TODO: const for knative-system
controller.controllerConfigMapWatcher = servingconfigmaps.NewDefaultWatcher(kubeclientset, "knative-system")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be knative-eventing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also think there's a way to get the eventing namespace... time passes... yeah, pkg/names.go

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use system.Namespace for now.


return controller
}

Expand All @@ -184,6 +203,9 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
// Start the informer factories to begin populating the informer caches
glog.Info("Starting Flow controller")

glog.Info("Watching controller config")
c.controllerConfigMapWatcher.Watch("flow-controller-config", c.receiveControllerConfig)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe hoist this into the const?


// Wait for the caches to be synced before starting workers
glog.Info("Waiting for Flow informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.flowsSynced); !ok {
Expand Down Expand Up @@ -213,6 +235,23 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
return nil
}

func (c *Controller) receiveControllerConfig(configMap *corev1.ConfigMap) {
c.configMutex.Lock()
defer c.configMutex.Unlock()

if value, ok := configMap.Data[defaultClusterBusConfigMapKey]; ok {
c.defaultClusterBusName = value
} else {
c.defaultClusterBusName = "stub"
}
}

func (c *Controller) getDefaultClusterBusName() string {
c.configMutex.RLock()
defer c.configMutex.RUnlock()
return c.defaultClusterBusName
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
Expand Down Expand Up @@ -474,7 +513,7 @@ func (c *Controller) createChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Chann
},
},
Spec: channelsv1alpha1.ChannelSpec{
ClusterBus: defaultBusName,
ClusterBus: c.getDefaultClusterBusName(),
},
}
return c.clientset.ChannelsV1alpha1().Channels(flow.Namespace).Create(channel)
Expand Down