Skip to content

Commit 39dfcb6

Browse files
zachallermayzhang2000Juneezeecrenshaw-devEricTendian
authored
Notification Self Service Namespaced Config (#191)
* self-service from event * chore: replace github.com/ghodss/yaml with sigs.k8s.io/yaml (#175) At the time of making this commit, the package `github.com/ghodss/yaml` is no longer actively maintained. `sigs.k8s.io/yaml` is a permanent fork of `ghodss/yaml` and is actively maintained by Kubernetes SIG. Reference: argoproj/argo-cd#13292 (comment) Signed-off-by: Eng Zer Jun <engzerjun@gmail.com> Co-authored-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> * feat: Adding new PagerDuty integration based on Events API v2 (#105) Signed-off-by: Eric Tendian <erictendian@gmail.com> * self-service from event * self-service from event * self-service from event * self-service from event * self-service from event * clean up factory.go * error handleing in factory.go * error handleing in factory.go * error handleing in factory.go * error handleing in factory.go * error handleing in factory.go * error handleing in factory.go * error handleing in factory.go * better caching * better caching * refactor Signed-off-by: zachaller <zachaller@users.noreply.github.com> * unused Signed-off-by: zachaller <zachaller@users.noreply.github.com> * lint Signed-off-by: zachaller <zachaller@users.noreply.github.com> * bump ci Signed-off-by: zachaller <zachaller@users.noreply.github.com> * small fixes Signed-off-by: zachaller <zachaller@users.noreply.github.com> * rename Signed-off-by: zachaller <zachaller@users.noreply.github.com> * rename Signed-off-by: zachaller <zachaller@users.noreply.github.com> * rename Signed-off-by: zachaller <zachaller@users.noreply.github.com> * change log line Signed-off-by: zachaller <zachaller@users.noreply.github.com> * update tests Signed-off-by: zachaller <zachaller@users.noreply.github.com> * continue on error Signed-off-by: zachaller <zachaller@users.noreply.github.com> * continue on errors Signed-off-by: zachaller <zachaller@users.noreply.github.com> * add godoc Signed-off-by: zachaller <zachaller@users.noreply.github.com> * continue on error Signed-off-by: zachaller <zachaller@users.noreply.github.com> * fix error by going back to non namespace support Signed-off-by: zachaller <zachaller@users.noreply.github.com> * fix log Signed-off-by: zachaller <zachaller@users.noreply.github.com> * improve log on error Signed-off-by: zachaller <zachaller@users.noreply.github.com> --------- Signed-off-by: Eng Zer Jun <engzerjun@gmail.com> Signed-off-by: Eric Tendian <erictendian@gmail.com> Signed-off-by: zachaller <zachaller@users.noreply.github.com> Co-authored-by: May Zhang <may_zhang@intuit.com> Co-authored-by: Eng Zer Jun <engzerjun@gmail.com> Co-authored-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> Co-authored-by: Eric Tendian <erictendian@gmail.com>
1 parent 71003c9 commit 39dfcb6

File tree

8 files changed

+167
-58
lines changed

8 files changed

+167
-58
lines changed

.github/workflows/ci.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ jobs:
2727
- name: Run golangci-lint
2828
uses: golangci/golangci-lint-action@v3
2929
with:
30-
version: v1.45.2
30+
version: v1.49.0
3131
args: --timeout 5m
3232
test:
3333
runs-on: ubuntu-latest

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ require (
2626
k8s.io/api v0.23.3
2727
k8s.io/apimachinery v0.23.3
2828
k8s.io/client-go v0.23.3
29+
k8s.io/utils v0.0.0-20211116205334-6203023598ed
2930
sigs.k8s.io/yaml v1.3.0
3031
)
3132

@@ -84,7 +85,6 @@ require (
8485
gopkg.in/yaml.v2 v2.4.0 // indirect
8586
k8s.io/klog/v2 v2.30.0 // indirect
8687
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
87-
k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect
8888
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
8989
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
9090
)

pkg/api/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type Config struct {
2929
DefaultTriggers []string
3030
// ServiceDefaultTriggers holds list of default triggers per service
3131
ServiceDefaultTriggers map[string][]string
32+
Namespace string
3233
}
3334

3435
// Returns list of destinations for the specified trigger
@@ -76,6 +77,7 @@ func ParseConfig(configMap *v1.ConfigMap, secret *v1.Secret) (*Config, error) {
7677
Triggers: map[string][]triggers.Condition{},
7778
ServiceDefaultTriggers: map[string][]string{},
7879
Templates: map[string]services.Notification{},
80+
Namespace: configMap.Namespace,
7981
}
8082
if subscriptionYaml, ok := configMap.Data["subscriptions"]; ok {
8183
if err := yaml.Unmarshal([]byte(subscriptionYaml), &cfg.Subscriptions); err != nil {

pkg/api/factory.go

+100-32
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
package api
22

33
import (
4+
"fmt"
45
"sync"
56

7+
log "github.com/sirupsen/logrus"
8+
9+
"k8s.io/utils/strings/slices"
10+
611
v1 "k8s.io/api/core/v1"
712
"k8s.io/apimachinery/pkg/api/errors"
813
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -18,27 +23,38 @@ type Settings struct {
1823
SecretName string
1924
// InitGetVars returns a function that produces notifications context variables
2025
InitGetVars func(cfg *Config, configMap *v1.ConfigMap, secret *v1.Secret) (GetVars, error)
26+
// DefaultNamespace default namespace for ConfigMap and Secret.
27+
// For self-service notification, we get notification configurations from rollout resource namespace
28+
// and also the default namespace
29+
DefaultNamespace string
2130
}
2231

2332
// Factory creates an API instance
2433
type Factory interface {
2534
GetAPI() (API, error)
35+
GetAPIsFromNamespace(namespace string) (map[string]API, error)
2636
}
2737

2838
type apiFactory struct {
2939
Settings
3040

31-
cmLister v1listers.ConfigMapNamespaceLister
32-
secretLister v1listers.SecretNamespaceLister
41+
cmLister v1listers.ConfigMapLister
42+
secretLister v1listers.SecretLister
3343
lock sync.Mutex
34-
api API
44+
apiMap map[string]API
3545
}
3646

37-
func NewFactory(settings Settings, namespace string, secretsInformer cache.SharedIndexInformer, cmInformer cache.SharedIndexInformer) *apiFactory {
47+
// NewFactory creates a new API factory if namespace is not empty, it will override the default namespace set in settings
48+
func NewFactory(settings Settings, defaultNamespace string, secretsInformer cache.SharedIndexInformer, cmInformer cache.SharedIndexInformer) *apiFactory {
49+
if defaultNamespace != "" {
50+
settings.DefaultNamespace = defaultNamespace
51+
}
52+
3853
factory := &apiFactory{
3954
Settings: settings,
40-
cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()).ConfigMaps(namespace),
41-
secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()).Secrets(namespace),
55+
cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()),
56+
secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()),
57+
apiMap: make(map[string]API),
4258
}
4359

4460
secretsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -70,12 +86,15 @@ func (f *apiFactory) invalidateIfHasName(name string, obj interface{}) {
7086
return
7187
}
7288
if metaObj.GetName() == name {
73-
f.invalidateCache()
89+
f.lock.Lock()
90+
defer f.lock.Unlock()
91+
f.apiMap[metaObj.GetNamespace()] = nil
92+
log.Info("invalidated cache for resource in namespace: ", metaObj.GetNamespace(), " with the name: ", metaObj.GetName())
7493
}
7594
}
7695

77-
func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) {
78-
cm, err := f.cmLister.Get(f.ConfigMapName)
96+
func (f *apiFactory) getConfigMapAndSecretWithListers(cmLister v1listers.ConfigMapNamespaceLister, secretLister v1listers.SecretNamespaceLister) (*v1.ConfigMap, *v1.Secret, error) {
97+
cm, err := cmLister.Get(f.ConfigMapName)
7998
if err != nil {
8099
if errors.IsNotFound(err) {
81100
cm = &v1.ConfigMap{}
@@ -84,7 +103,7 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error)
84103
}
85104
}
86105

87-
secret, err := f.secretLister.Get(f.SecretName)
106+
secret, err := secretLister.Get(f.SecretName)
88107
if err != nil {
89108
if errors.IsNotFound(err) {
90109
secret = &v1.Secret{}
@@ -93,36 +112,85 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error)
93112
}
94113
}
95114

115+
if errors.IsNotFound(err) {
116+
return cm, secret, nil
117+
}
96118
return cm, secret, err
97119
}
98120

99-
func (f *apiFactory) invalidateCache() {
100-
f.lock.Lock()
101-
defer f.lock.Unlock()
102-
f.api = nil
121+
func (f *apiFactory) getConfigMapAndSecret(namespace string) (*v1.ConfigMap, *v1.Secret, error) {
122+
cmLister := f.cmLister.ConfigMaps(namespace)
123+
secretLister := f.secretLister.Secrets(namespace)
124+
125+
return f.getConfigMapAndSecretWithListers(cmLister, secretLister)
103126
}
104127

105128
func (f *apiFactory) GetAPI() (API, error) {
129+
apis, err := f.GetAPIsFromNamespace(f.Settings.DefaultNamespace)
130+
if err != nil {
131+
return nil, err
132+
}
133+
return apis[f.Settings.DefaultNamespace], nil
134+
}
135+
136+
// GetAPIsFromNamespace returns a map of API instances for a given namespace, if there is an error in populating the API for a namespace, it will be skipped
137+
// and the error will be logged and returned. The caller is responsible for handling the error. The API map will also be returned with any successfully constructed
138+
// API instances.
139+
func (f *apiFactory) GetAPIsFromNamespace(namespace string) (map[string]API, error) {
106140
f.lock.Lock()
107141
defer f.lock.Unlock()
108-
if f.api == nil {
109-
cm, secret, err := f.getConfigMapAndSecret()
110-
if err != nil {
111-
return nil, err
112-
}
113-
cfg, err := ParseConfig(cm, secret)
114-
if err != nil {
115-
return nil, err
116-
}
117-
getVars, err := f.InitGetVars(cfg, cm, secret)
118-
if err != nil {
119-
return nil, err
120-
}
121-
api, err := NewAPI(*cfg, getVars)
122-
if err != nil {
123-
return nil, err
142+
143+
apis := make(map[string]API)
144+
145+
// namespaces to look for notification configurations
146+
namespaces := []string{namespace}
147+
if !slices.Contains(namespaces, f.Settings.DefaultNamespace) {
148+
namespaces = append(namespaces, f.Settings.DefaultNamespace)
149+
}
150+
151+
errors := []error{}
152+
for _, namespace := range namespaces {
153+
if f.apiMap[namespace] == nil {
154+
api, err := f.getApiFromNamespace(namespace)
155+
if err != nil {
156+
log.Error("error getting api from namespace: ", namespace, " error: ", err)
157+
errors = append(errors, err)
158+
continue
159+
}
160+
f.apiMap[namespace] = api
161+
apis[namespace] = f.apiMap[namespace]
162+
} else {
163+
apis[namespace] = f.apiMap[namespace]
124164
}
125-
f.api = api
126165
}
127-
return f.api, nil
166+
167+
if len(errors) > 0 {
168+
return apis, fmt.Errorf("errors getting apis: %s", errors)
169+
}
170+
return apis, nil
171+
}
172+
173+
func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) {
174+
cm, secret, err := f.getConfigMapAndSecret(namespace)
175+
if err != nil {
176+
return nil, err
177+
}
178+
return f.getApiFromConfigmapAndSecret(cm, secret)
179+
180+
}
181+
182+
func (f *apiFactory) getApiFromConfigmapAndSecret(cm *v1.ConfigMap, secret *v1.Secret) (API, error) {
183+
cfg, err := ParseConfig(cm, secret)
184+
if err != nil {
185+
return nil, err
186+
}
187+
getVars, err := f.InitGetVars(cfg, cm, secret)
188+
if err != nil {
189+
return nil, err
190+
}
191+
api, err := NewAPI(*cfg, getVars)
192+
if err != nil {
193+
return nil, err
194+
}
195+
return api, nil
128196
}

pkg/controller/controller.go

+50-18
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,18 @@ func NewController(
142142
return ctrl
143143
}
144144

145+
// NewControllerWithNamespaceSupport For self-service notification
146+
func NewControllerWithNamespaceSupport(
147+
client dynamic.NamespaceableResourceInterface,
148+
informer cache.SharedIndexInformer,
149+
apiFactory api.Factory,
150+
opts ...Opts,
151+
) *notificationController {
152+
ctrl := NewController(client, informer, apiFactory, opts...)
153+
ctrl.namespaceSupport = true
154+
return ctrl
155+
}
156+
145157
type notificationController struct {
146158
client dynamic.NamespaceableResourceInterface
147159
informer cache.SharedIndexInformer
@@ -152,6 +164,7 @@ type notificationController struct {
152164
alterDestinations func(obj v1.Object, destinations services.Destinations, cfg api.Config) services.Destinations
153165
toUnstructured func(obj v1.Object) (*unstructured.Unstructured, error)
154166
eventCallback func(eventSequence NotificationEventSequence)
167+
namespaceSupport bool
155168
}
156169

157170
func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) {
@@ -169,12 +182,9 @@ func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) {
169182
log.Warn("Controller has stopped.")
170183
}
171184

172-
func (c *notificationController) processResource(resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) {
185+
func (c *notificationController) processResourceWithAPI(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) {
186+
apiNamespace := api.GetConfig().Namespace
173187
notificationsState := NewStateFromRes(resource)
174-
api, err := c.apiFactory.GetAPI()
175-
if err != nil {
176-
return nil, err
177-
}
178188

179189
destinations := c.getDestinations(resource, api.GetConfig())
180190
if len(destinations) == 0 {
@@ -189,8 +199,8 @@ func (c *notificationController) processResource(resource v1.Object, logEntry *l
189199
for trigger, destinations := range destinations {
190200
res, err := api.RunTrigger(trigger, un.Object)
191201
if err != nil {
192-
logEntry.Debugf("Failed to execute condition of trigger %s: %v", trigger, err)
193-
eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %v", trigger, err))
202+
logEntry.Debugf("Failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace)
203+
eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace))
194204
}
195205
logEntry.Infof("Trigger %s result: %v", trigger, res)
196206

@@ -206,22 +216,22 @@ func (c *notificationController) processResource(resource v1.Object, logEntry *l
206216

207217
for _, to := range destinations {
208218
if changed := notificationsState.SetAlreadyNotified(trigger, cr, to, true); !changed {
209-
logEntry.Infof("Notification about condition '%s.%s' already sent to '%v'", trigger, cr.Key, to)
219+
logEntry.Infof("Notification about condition '%s.%s' already sent to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace)
210220
eventSequence.addDelivered(NotificationDelivery{
211221
Trigger: trigger,
212222
Destination: to,
213223
AlreadyNotified: true,
214224
})
215225
} else {
216-
logEntry.Infof("Sending notification about condition '%s.%s' to '%v'", trigger, cr.Key, to)
226+
logEntry.Infof("Sending notification about condition '%s.%s' to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace)
217227
if err := api.Send(un.Object, cr.Templates, to); err != nil {
218-
logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v",
219-
to, resource.GetNamespace(), resource.GetName(), err)
228+
logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s",
229+
to, resource.GetNamespace(), resource.GetName(), err, apiNamespace)
220230
notificationsState.SetAlreadyNotified(trigger, cr, to, false)
221231
c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, false)
222-
eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v", trigger, to, err))
232+
eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v using the configuration in namespace %s", trigger, to, err, apiNamespace))
223233
} else {
224-
logEntry.Debugf("Notification %s was sent", to.Recipient)
234+
logEntry.Debugf("Notification %s was sent using the configuration in namespace %s", to.Recipient, apiNamespace)
225235
c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, true)
226236
eventSequence.addDelivered(NotificationDelivery{
227237
Trigger: trigger,
@@ -294,7 +304,31 @@ func (c *notificationController) processQueueItem() (processNext bool) {
294304
}
295305
}
296306

297-
annotations, err := c.processResource(resource, logEntry, &eventSequence)
307+
if !c.namespaceSupport {
308+
api, err := c.apiFactory.GetAPI()
309+
if err != nil {
310+
logEntry.Errorf("Failed to get api: %v", err)
311+
eventSequence.addError(err)
312+
return
313+
}
314+
c.processResource(api, resource, logEntry, &eventSequence)
315+
} else {
316+
apisWithNamespace, err := c.apiFactory.GetAPIsFromNamespace(resource.GetNamespace())
317+
if err != nil {
318+
logEntry.Errorf("Failed to get api with namespace: %v", err)
319+
eventSequence.addError(err)
320+
}
321+
for _, api := range apisWithNamespace {
322+
c.processResource(api, resource, logEntry, &eventSequence)
323+
}
324+
}
325+
logEntry.Info("Processing completed")
326+
327+
return
328+
}
329+
330+
func (c *notificationController) processResource(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) {
331+
annotations, err := c.processResourceWithAPI(api, resource, logEntry, eventSequence)
298332
if err != nil {
299333
logEntry.Errorf("Failed to process: %v", err)
300334
eventSequence.addError(err)
@@ -307,7 +341,7 @@ func (c *notificationController) processQueueItem() (processNext bool) {
307341
annotationsPatch[k] = v
308342
}
309343
for k := range resource.GetAnnotations() {
310-
if _, ok = annotations[k]; !ok {
344+
if _, ok := annotations[k]; !ok {
311345
annotationsPatch[k] = nil
312346
}
313347
}
@@ -329,11 +363,9 @@ func (c *notificationController) processQueueItem() (processNext bool) {
329363
if err := c.informer.GetStore().Update(resource); err != nil {
330364
logEntry.Warnf("Failed to store update resource in informer: %v", err)
331365
eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err))
366+
return
332367
}
333368
}
334-
logEntry.Info("Processing completed")
335-
336-
return
337369
}
338370

339371
func mapsEqual(first, second map[string]string) bool {

0 commit comments

Comments
 (0)