diff --git a/pkg/api/config.go b/pkg/api/config.go index eefc41db..cf1d0e0d 100644 --- a/pkg/api/config.go +++ b/pkg/api/config.go @@ -29,6 +29,7 @@ type Config struct { DefaultTriggers []string // ServiceDefaultTriggers holds list of default triggers per service ServiceDefaultTriggers map[string][]string + Namespace string } // Returns list of destinations for the specified trigger @@ -76,6 +77,7 @@ func ParseConfig(configMap *v1.ConfigMap, secret *v1.Secret) (*Config, error) { Triggers: map[string][]triggers.Condition{}, ServiceDefaultTriggers: map[string][]string{}, Templates: map[string]services.Notification{}, + Namespace: configMap.Namespace, } if subscriptionYaml, ok := configMap.Data["subscriptions"]; ok { if err := yaml.Unmarshal([]byte(subscriptionYaml), &cfg.Subscriptions); err != nil { diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 9fb09853..501f1833 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -1,6 +1,7 @@ package api import ( + log "github.com/sirupsen/logrus" "sync" v1 "k8s.io/api/core/v1" @@ -18,6 +19,10 @@ type Settings struct { SecretName string // InitGetVars returns a function that produces notifications context variables InitGetVars func(cfg *Config, configMap *v1.ConfigMap, secret *v1.Secret) (GetVars, error) + // Default namespace for ConfigMap and Secret. + // For self-service notification, we get notification configurations from rollout resource namespace + // and also the default namespace + Namespace string } // Factory creates an API instance @@ -25,6 +30,13 @@ type Factory interface { GetAPI() (API, error) } +// For self-service notification, factory creates a map of APIs that include +// api in the namespace specified in input parameter +// and api in the namespace specified in the Settings +type FactoryWithMultipleAPIs interface { + GetAPIsWithNamespaceV2(namespace string) (map[string]API, error) +} + type apiFactory struct { Settings @@ -32,6 +44,18 @@ type apiFactory struct { secretLister v1listers.SecretNamespaceLister lock sync.Mutex api API + + // For self-service notification + cmInformer cache.SharedIndexInformer + secretsInformer cache.SharedIndexInformer + apiMap map[string]API + cacheList []apisCache +} + +type apisCache struct { + api API + namespace string + refresh bool } func NewFactory(settings Settings, namespace string, secretsInformer cache.SharedIndexInformer, cmInformer cache.SharedIndexInformer) *apiFactory { @@ -39,6 +63,12 @@ func NewFactory(settings Settings, namespace string, secretsInformer cache.Share Settings: settings, cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()).ConfigMaps(namespace), secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()).Secrets(namespace), + + // For self-service notification + cmInformer: cmInformer, + secretsInformer: secretsInformer, + apiMap: make(map[string]API), + cacheList: []apisCache{}, } secretsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -70,12 +100,12 @@ func (f *apiFactory) invalidateIfHasName(name string, obj interface{}) { return } if metaObj.GetName() == name { - f.invalidateCache() + f.invalidateCache(metaObj.GetNamespace()) } } -func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) { - cm, err := f.cmLister.Get(f.ConfigMapName) +func (f *apiFactory) getConfigMapAndSecretWithListers(cmLister v1listers.ConfigMapNamespaceLister, secretLister v1listers.SecretNamespaceLister) (*v1.ConfigMap, *v1.Secret, error) { + cm, err := cmLister.Get(f.ConfigMapName) if err != nil { if errors.IsNotFound(err) { cm = &v1.ConfigMap{} @@ -84,7 +114,7 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) } } - secret, err := f.secretLister.Get(f.SecretName) + secret, err := secretLister.Get(f.SecretName) if err != nil { if errors.IsNotFound(err) { secret = &v1.Secret{} @@ -96,29 +126,38 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) return cm, secret, err } -func (f *apiFactory) invalidateCache() { +func (f *apiFactory) getConfigMapAndSecret(namespace string) (*v1.ConfigMap, *v1.Secret, error) { + cmLister := v1listers.NewConfigMapLister(f.cmInformer.GetIndexer()).ConfigMaps(namespace) + secretLister := v1listers.NewSecretLister(f.secretsInformer.GetIndexer()).Secrets(namespace) + + return f.getConfigMapAndSecretWithListers(cmLister, secretLister) +} + +func (f *apiFactory) invalidateCache(namespace string) { f.lock.Lock() defer f.lock.Unlock() f.api = nil + + f.apiMap[namespace] = nil + + for _, mycache := range f.cacheList { + if mycache.namespace == namespace { + mycache.refresh = true + mycache.api = nil + } + } } func (f *apiFactory) GetAPI() (API, error) { f.lock.Lock() defer f.lock.Unlock() if f.api == nil { - cm, secret, err := f.getConfigMapAndSecret() - if err != nil { - return nil, err - } - cfg, err := ParseConfig(cm, secret) - if err != nil { - return nil, err - } - getVars, err := f.InitGetVars(cfg, cm, secret) + cm, secret, err := f.getConfigMapAndSecretWithListers(f.cmLister, f.secretLister) if err != nil { return nil, err } - api, err := NewAPI(*cfg, getVars) + + api, err := f.getApiFromConfigmapAndSecret(cm, secret) if err != nil { return nil, err } @@ -126,3 +165,147 @@ func (f *apiFactory) GetAPI() (API, error) { } return f.api, nil } + +// For self-service notification, we need a map of apis which include api in the namespace and api in the setting's namespace +func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, error) { + f.lock.Lock() + defer f.lock.Unlock() + + apis := make(map[string]API) + + if f.apiMap[namespace] != nil && f.apiMap[f.Settings.Namespace] != nil { + apis[namespace] = f.apiMap[namespace] + apis[f.Settings.Namespace] = f.apiMap[f.Settings.Namespace] + return apis, nil + } + + if f.apiMap[namespace] != nil { + apis[namespace] = f.apiMap[namespace] + api, err := f.getApiFromNamespace(f.Settings.Namespace) + if err == nil { + apis[f.Settings.Namespace] = api + f.apiMap[f.Settings.Namespace] = api + } else { + log.Warnf("getApiFromNamespace %s got error %s", f.Settings.Namespace, err) + } + return apis, nil + } + + if f.apiMap[f.Settings.Namespace] != nil { + apis[f.Settings.Namespace] = f.apiMap[f.Settings.Namespace] + api, err := f.getApiFromNamespace(namespace) + if err == nil { + apis[namespace] = api + f.apiMap[namespace] = api + } else { + log.Warnf("getApiFromNamespace %s got error %s", namespace, err) + } + return apis, nil + } + + apiFromNamespace, errApiFromNamespace := f.getApiFromNamespace(namespace) + apiFromSettings, errApiFromSettings := f.getApiFromNamespace(f.Settings.Namespace) + + if errApiFromNamespace == nil { + apis[namespace] = apiFromNamespace + f.apiMap[namespace] = apiFromNamespace + } else { + log.Warnf("getApiFromNamespace %s got error %s", namespace, errApiFromNamespace) + } + + if errApiFromSettings == nil { + apis[f.Settings.Namespace] = apiFromSettings + f.apiMap[f.Settings.Namespace] = apiFromSettings + } else { + log.Warnf("getApiFromNamespace %s got error %s", f.Settings.Namespace, errApiFromSettings) + } + + // Only return error when we received error from both namespace provided in the input paremeter and settings' namespace + if errApiFromNamespace != nil && errApiFromSettings != nil { + return apis, errApiFromSettings + } else { + return apis, nil + } +} + +// For self-service notification, we need a map of apis which include api in the namespace and api in the setting's namespace +func (f *apiFactory) GetAPIsWithNamespaceV2(namespace string) (map[string]API, error) { + f.lock.Lock() + defer f.lock.Unlock() + + apis := make(map[string]API) + + // namespaces to look for notification configurations + namespaces := []string{namespace} + if f.Settings.Namespace != "" && f.Settings.Namespace != namespace { + namespaces = append(namespaces, f.Settings.Namespace) + } + + for _, namespace := range namespaces { + //Look up the cacheList + //Exist in cacheList and does not need refresh, then use it + //Exist in cacheList and needs refresh, then retrieve it + //Doesn't exist in cacheList, get it and put in cacheList + foundInCache := false + for _, cache := range f.cacheList { + if cache.namespace == namespace { + foundInCache = true + if !cache.refresh { + //Found in cache, and no need to refresh + if cache.api != nil { + apis[namespace] = cache.api + } + } else { + //Found in cache, and need refresh + api, err := f.getApiFromNamespace(namespace) + if err == nil { + apis[namespace] = api + cache.api = api + cache.refresh = false + } else { + log.Warnf("getApiFromNamespace %s got error %s", namespace, err) + } + } + break + } + } + + if !foundInCache { + api, err := f.getApiFromNamespace(namespace) + if err == nil { + apis[namespace] = api + myCache := apisCache{refresh: false, api: api, namespace: namespace} + f.cacheList = append(f.cacheList, myCache) + } else { + log.Warnf("getApiFromNamespace %s got error %s", namespace, err) + } + } + } + + return apis, nil +} + +func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) { + cm, secret, err := f.getConfigMapAndSecret(namespace) + if err != nil { + return nil, err + } + return f.getApiFromConfigmapAndSecret(cm, secret) + +} + +func (f *apiFactory) getApiFromConfigmapAndSecret(cm *v1.ConfigMap, secret *v1.Secret) (API, error) { + cfg, err := ParseConfig(cm, secret) + if err != nil { + return nil, err + } + getVars, err := f.InitGetVars(cfg, cm, secret) + if err != nil { + return nil, err + } + api, err := NewAPI(*cfg, getVars) + if err != nil { + return nil, err + } + return api, nil +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d4441302..45347258 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -142,16 +142,32 @@ func NewController( return ctrl } +// For self-service notification +// This controller is using FactoryWithMultipleAPIs +func NewControllerWithMultipleNamespace( + client dynamic.NamespaceableResourceInterface, + informer cache.SharedIndexInformer, + apiFactoryWithMultipleNamespace api.FactoryWithMultipleAPIs, + opts ...Opts, +) *notificationController { + + ctrl := NewController(client, informer, nil, opts...) + ctrl.apiFactoryWithMultipleAPIs = apiFactoryWithMultipleNamespace + + return ctrl +} + type notificationController struct { - client dynamic.NamespaceableResourceInterface - informer cache.SharedIndexInformer - queue workqueue.RateLimitingInterface - apiFactory api.Factory - metricsRegistry *MetricsRegistry - skipProcessing func(obj v1.Object) (bool, string) - alterDestinations func(obj v1.Object, destinations services.Destinations, cfg api.Config) services.Destinations - toUnstructured func(obj v1.Object) (*unstructured.Unstructured, error) - eventCallback func(eventSequence NotificationEventSequence) + client dynamic.NamespaceableResourceInterface + informer cache.SharedIndexInformer + queue workqueue.RateLimitingInterface + apiFactory api.Factory + metricsRegistry *MetricsRegistry + skipProcessing func(obj v1.Object) (bool, string) + alterDestinations func(obj v1.Object, destinations services.Destinations, cfg api.Config) services.Destinations + toUnstructured func(obj v1.Object) (*unstructured.Unstructured, error) + eventCallback func(eventSequence NotificationEventSequence) + apiFactoryWithMultipleAPIs api.FactoryWithMultipleAPIs } func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) { @@ -169,12 +185,9 @@ func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) { log.Warn("Controller has stopped.") } -func (c *notificationController) processResource(resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) { +func (c *notificationController) processResourceWithAPI(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) { + apiNamespace := api.GetConfig().Namespace notificationsState := NewStateFromRes(resource) - api, err := c.apiFactory.GetAPI() - if err != nil { - return nil, err - } destinations := c.getDestinations(resource, api.GetConfig()) if len(destinations) == 0 { @@ -189,8 +202,8 @@ func (c *notificationController) processResource(resource v1.Object, logEntry *l for trigger, destinations := range destinations { res, err := api.RunTrigger(trigger, un.Object) if err != nil { - logEntry.Debugf("Failed to execute condition of trigger %s: %v", trigger, err) - eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %v", trigger, err)) + logEntry.Debugf("Failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace) + eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace)) } logEntry.Infof("Trigger %s result: %v", trigger, res) @@ -206,22 +219,22 @@ func (c *notificationController) processResource(resource v1.Object, logEntry *l for _, to := range destinations { if changed := notificationsState.SetAlreadyNotified(trigger, cr, to, true); !changed { - logEntry.Infof("Notification about condition '%s.%s' already sent to '%v'", trigger, cr.Key, to) + logEntry.Infof("Notification about condition '%s.%s' already sent to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace) eventSequence.addDelivered(NotificationDelivery{ Trigger: trigger, Destination: to, AlreadyNotified: true, }) } else { - logEntry.Infof("Sending notification about condition '%s.%s' to '%v'", trigger, cr.Key, to) + logEntry.Infof("Sending notification about condition '%s.%s' to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace) if err := api.Send(un.Object, cr.Templates, to); err != nil { - logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v", - to, resource.GetNamespace(), resource.GetName(), err) + logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s", + to, resource.GetNamespace(), resource.GetName(), err, apiNamespace) notificationsState.SetAlreadyNotified(trigger, cr, to, false) c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, false) - eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v", trigger, to, err)) + eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v using the configuration in namespace %s", trigger, to, err, apiNamespace)) } else { - logEntry.Debugf("Notification %s was sent", to.Recipient) + logEntry.Debugf("Notification %s was sent using the configuration in namespace %s", to.Recipient, apiNamespace) c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, true) eventSequence.addDelivered(NotificationDelivery{ Trigger: trigger, @@ -294,7 +307,32 @@ func (c *notificationController) processQueueItem() (processNext bool) { } } - annotations, err := c.processResource(resource, logEntry, &eventSequence) + if c.apiFactoryWithMultipleAPIs == nil { + api, err := c.apiFactory.GetAPI() + if err != nil { + logEntry.Errorf("Failed to process: %v", err) + eventSequence.addError(err) + return + } + c.processResource(api, resource, logEntry, &eventSequence) + } else { + apisWithNamespace, err := c.apiFactoryWithMultipleAPIs.GetAPIsWithNamespaceV2(resource.GetNamespace()) + if err != nil { + logEntry.Errorf("Failed to process: %v", err) + eventSequence.addError(err) + return + } + for _, api := range apisWithNamespace { + c.processResource(api, resource, logEntry, &eventSequence) + } + } + logEntry.Info("Processing completed") + + return +} + +func (c *notificationController) processResource(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) { + annotations, err := c.processResourceWithAPI(api, resource, logEntry, eventSequence) if err != nil { logEntry.Errorf("Failed to process: %v", err) eventSequence.addError(err) @@ -307,7 +345,7 @@ func (c *notificationController) processQueueItem() (processNext bool) { annotationsPatch[k] = v } for k := range resource.GetAnnotations() { - if _, ok = annotations[k]; !ok { + if _, ok := annotations[k]; !ok { annotationsPatch[k] = nil } } @@ -329,11 +367,9 @@ func (c *notificationController) processQueueItem() (processNext bool) { if err := c.informer.GetStore().Update(resource); err != nil { logEntry.Warnf("Failed to store update resource in informer: %v", err) eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err)) + return } } - logEntry.Info("Processing completed") - - return } func mapsEqual(first, second map[string]string) bool { diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 2128e15d..f122ff6a 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -115,7 +115,7 @@ func TestSendsNotificationIfTriggered(t *testing.T) { return true }), []string{"test"}, services.Destination{Service: "mock", Recipient: "recipient"}).Return(nil) - annotations, err := ctrl.processResource(app, logEntry, &NotificationEventSequence{}) + annotations, err := ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{}) if err != nil { logEntry.Errorf("Failed to process: %v", err) } @@ -141,7 +141,7 @@ func TestDoesNotSendNotificationIfAnnotationPresent(t *testing.T) { api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil) - _, err = ctrl.processResource(app, logEntry, &NotificationEventSequence{}) + _, err = ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{}) if err != nil { logEntry.Errorf("Failed to process: %v", err) } @@ -163,7 +163,7 @@ func TestRemovesAnnotationIfNoTrigger(t *testing.T) { api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: false}}, nil) - annotations, err := ctrl.processResource(app, logEntry, &NotificationEventSequence{}) + annotations, err := ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{}) if err != nil { logEntry.Errorf("Failed to process: %v", err) } @@ -298,7 +298,7 @@ func TestWithEventCallback(t *testing.T) { description: "EventCallback should be invoked with non-nil error on send failure", sendErr: errors.New("this is a send error"), expectedErrors: []error{ - errors.New("failed to deliver notification my-trigger to {mock recipient}: this is a send error"), + errors.New("failed to deliver notification my-trigger to {mock recipient}: this is a send error using the configuration in namespace "), }, }, { diff --git a/pkg/mocks/factory.go b/pkg/mocks/factory.go index 136955b1..c4aab2c4 100644 --- a/pkg/mocks/factory.go +++ b/pkg/mocks/factory.go @@ -10,3 +10,9 @@ type FakeFactory struct { func (f *FakeFactory) GetAPI() (api.API, error) { return f.Api, f.Err } + +func (f *FakeFactory) GetAPIsWithNamespaceV2(namespace string) (map[string]api.API, error) { + apiMap := make(map[string]api.API) + apiMap[namespace] = f.Api + return apiMap, f.Err +}