From e277c3485d77f5846cdd00c5a0cb00a079ddfb38 Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Thu, 4 May 2023 10:43:10 -0700 Subject: [PATCH 01/18] self-service from event --- pkg/api/factory.go | 91 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 88 insertions(+), 3 deletions(-) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 9fb09853..c95b3131 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -5,6 +5,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -18,6 +19,8 @@ type Settings struct { SecretName string // InitGetVars returns a function that produces notifications context variables InitGetVars func(cfg *Config, configMap *v1.ConfigMap, secret *v1.Secret) (GetVars, error) + // Default namespace for ConfigMap and Secret + Namespace string } // Factory creates an API instance @@ -25,6 +28,11 @@ type Factory interface { GetAPI() (API, error) } +// Factory creates an API instance +type MayFactory interface { + GetAPIWithNamespace(namespace string) (API, error) +} + type apiFactory struct { Settings @@ -32,13 +40,20 @@ type apiFactory struct { secretLister v1listers.SecretNamespaceLister lock sync.Mutex api API + + cmInformer cache.SharedIndexInformer + secretsInformer cache.SharedIndexInformer + apiMap map[string]API } func NewFactory(settings Settings, namespace string, secretsInformer cache.SharedIndexInformer, cmInformer cache.SharedIndexInformer) *apiFactory { factory := &apiFactory{ - Settings: settings, - cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()).ConfigMaps(namespace), - secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()).Secrets(namespace), + Settings: settings, + cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()).ConfigMaps(namespace), + secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()).Secrets(namespace), + cmInformer: cmInformer, + secretsInformer: secretsInformer, + apiMap: make(map[string]API), } secretsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -96,10 +111,39 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) return cm, secret, err } +func (f *apiFactory) getConfigMapAndSecretWithNamespace(namespace string) (*v1.ConfigMap, *v1.Secret, error) { + + cmLister := v1listers.NewConfigMapLister(f.cmInformer.GetIndexer()).ConfigMaps(namespace) + secretLister := v1listers.NewSecretLister(f.secretsInformer.GetIndexer()).Secrets(namespace) + + cm, err := cmLister.Get(f.ConfigMapName) + if err != nil { + if errors.IsNotFound(err) { + cm = &v1.ConfigMap{} + } else { + return nil, nil, err + } + } + + secret, err := secretLister.Get(f.SecretName) + if err != nil { + if errors.IsNotFound(err) { + secret = &v1.Secret{} + } else { + return nil, nil, err + } + } + + return cm, secret, err +} + func (f *apiFactory) invalidateCache() { f.lock.Lock() defer f.lock.Unlock() f.api = nil + for namespace := range f.apiMap { + f.apiMap[namespace] = nil + } } func (f *apiFactory) GetAPI() (API, error) { @@ -126,3 +170,44 @@ func (f *apiFactory) GetAPI() (API, error) { } return f.api, nil } + +func (f *apiFactory) GetAPIWithNamespace(namespace string) (API, error) { + f.lock.Lock() + defer f.lock.Unlock() + namespaceHasConfig := namespace + + if f.apiMap[namespaceHasConfig] != nil { + return f.apiMap[namespaceHasConfig], nil + } + + cm, secret, err := f.getConfigMapAndSecretWithNamespace(namespaceHasConfig) + if err != nil { + if !k8serrors.IsNotFound(err) { + return nil, err + } + // If could not find it in namespace, try the namespace from settings + namespaceHasConfig = f.Settings.Namespace + if f.apiMap[namespaceHasConfig] != nil { + return f.apiMap[namespaceHasConfig], nil + } + cm, secret, err = f.getConfigMapAndSecretWithNamespace(namespaceHasConfig) + if err != nil { + return nil, err + } + } + cfg, err := ParseConfig(cm, secret) + if err != nil { + return nil, err + } + getVars, err := f.InitGetVars(cfg, cm, secret) + if err != nil { + return nil, err + } + api, err := NewAPI(*cfg, getVars) + if err != nil { + return nil, err + } + f.apiMap[namespaceHasConfig] = api + + return f.apiMap[namespaceHasConfig], nil +} From d07364d42a601d713134120e22d5f760d553eb95 Mon Sep 17 00:00:00 2001 From: Eng Zer Jun <engzerjun@gmail.com> Date: Fri, 5 May 2023 21:32:18 +0800 Subject: [PATCH 02/18] chore: replace github.com/ghodss/yaml with sigs.k8s.io/yaml (#175) At the time of making this commit, the package `github.com/ghodss/yaml` is no longer actively maintained. `sigs.k8s.io/yaml` is a permanent fork of `ghodss/yaml` and is actively maintained by Kubernetes SIG. Reference: https://github.com/argoproj/argo-cd/pull/13292#issuecomment-1515918894 Signed-off-by: Eng Zer Jun <engzerjun@gmail.com> Co-authored-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- go.mod | 3 +-- go.sum | 4 ++-- pkg/api/config.go | 2 +- pkg/cmd/context.go | 2 +- pkg/cmd/trigger_test.go | 2 +- pkg/services/googlechat.go | 2 +- pkg/services/services.go | 2 +- pkg/subscriptions/annotations.go | 2 +- pkg/util/misc/misc.go | 2 +- 9 files changed, 10 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 61d88565..e66f2ea3 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/RocketChat/Rocket.Chat.Go.SDK v0.0.0-20210112200207-10ab4d695d60 github.com/antonmedv/expr v1.12.5 github.com/bradleyfalzon/ghinstallation/v2 v2.1.0 - github.com/ghodss/yaml v1.0.0 github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 github.com/golang/mock v1.6.0 github.com/google/go-github/v41 v41.0.0 @@ -26,6 +25,7 @@ require ( k8s.io/api v0.23.3 k8s.io/apimachinery v0.23.3 k8s.io/client-go v0.23.3 + sigs.k8s.io/yaml v1.3.0 ) require ( @@ -85,7 +85,6 @@ require ( k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect - sigs.k8s.io/yaml v1.2.0 // indirect ) replace github.com/prometheus/client_golang => github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index acf361d3..2298bb4f 100644 --- a/go.sum +++ b/go.sum @@ -128,7 +128,6 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/getkin/kin-openapi v0.76.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg= -github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -896,5 +895,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLzkkmAkf+A6Y= sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= -sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= +sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= +sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= diff --git a/pkg/api/config.go b/pkg/api/config.go index 95892791..eefc41db 100644 --- a/pkg/api/config.go +++ b/pkg/api/config.go @@ -9,11 +9,11 @@ import ( "github.com/argoproj/notifications-engine/pkg/subscriptions" "github.com/argoproj/notifications-engine/pkg/triggers" - "github.com/ghodss/yaml" log "github.com/sirupsen/logrus" yaml3 "gopkg.in/yaml.v3" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" + "sigs.k8s.io/yaml" ) type ServiceFactory func() (services.NotificationService, error) diff --git a/pkg/cmd/context.go b/pkg/cmd/context.go index 7bad9df7..c8ee688e 100644 --- a/pkg/cmd/context.go +++ b/pkg/cmd/context.go @@ -9,7 +9,6 @@ import ( "os" "time" - "github.com/ghodss/yaml" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -20,6 +19,7 @@ import ( informersv1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + "sigs.k8s.io/yaml" "github.com/argoproj/notifications-engine/pkg/api" ) diff --git a/pkg/cmd/trigger_test.go b/pkg/cmd/trigger_test.go index 37e655b1..a185bbd5 100644 --- a/pkg/cmd/trigger_test.go +++ b/pkg/cmd/trigger_test.go @@ -11,7 +11,6 @@ import ( "github.com/argoproj/notifications-engine/pkg/api" "github.com/argoproj/notifications-engine/pkg/services" - "github.com/ghodss/yaml" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -20,6 +19,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" dynamicfake "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/kubernetes/fake" + "sigs.k8s.io/yaml" ) func newTestResource(name string) *unstructured.Unstructured { diff --git a/pkg/services/googlechat.go b/pkg/services/googlechat.go index a137072f..a574a7e9 100644 --- a/pkg/services/googlechat.go +++ b/pkg/services/googlechat.go @@ -9,8 +9,8 @@ import ( "net/url" texttemplate "text/template" - "github.com/ghodss/yaml" log "github.com/sirupsen/logrus" + "sigs.k8s.io/yaml" httputil "github.com/argoproj/notifications-engine/pkg/util/http" ) diff --git a/pkg/services/services.go b/pkg/services/services.go index 85482481..ced22bbe 100644 --- a/pkg/services/services.go +++ b/pkg/services/services.go @@ -8,7 +8,7 @@ import ( texttemplate "text/template" _ "time/tzdata" - "github.com/ghodss/yaml" + "sigs.k8s.io/yaml" ) type Notification struct { diff --git a/pkg/subscriptions/annotations.go b/pkg/subscriptions/annotations.go index 312e68c7..ecedac2e 100644 --- a/pkg/subscriptions/annotations.go +++ b/pkg/subscriptions/annotations.go @@ -4,8 +4,8 @@ import ( "fmt" "strings" - "github.com/ghodss/yaml" log "github.com/sirupsen/logrus" + "sigs.k8s.io/yaml" "github.com/argoproj/notifications-engine/pkg/services" ) diff --git a/pkg/util/misc/misc.go b/pkg/util/misc/misc.go index f7850764..e4fcf928 100644 --- a/pkg/util/misc/misc.go +++ b/pkg/util/misc/misc.go @@ -7,7 +7,7 @@ import ( "reflect" "sort" - "github.com/ghodss/yaml" + "sigs.k8s.io/yaml" ) func PrintFormatted(input interface{}, output string, out io.Writer) error { From 905c8841e0b5df7d8560396f60e62fe6f846a95d Mon Sep 17 00:00:00 2001 From: Eric Tendian <erictendian@gmail.com> Date: Mon, 8 May 2023 09:11:06 -0500 Subject: [PATCH 03/18] feat: Adding new PagerDuty integration based on Events API v2 (#105) Signed-off-by: Eric Tendian <erictendian@gmail.com> --- docs/services/pagerduty_v2.md | 78 +++++++++ pkg/services/pagerdutyv2.go | 165 +++++++++++++++++++ pkg/services/pagerdutyv2_test.go | 272 +++++++++++++++++++++++++++++++ pkg/services/services.go | 11 ++ 4 files changed, 526 insertions(+) create mode 100644 docs/services/pagerduty_v2.md create mode 100644 pkg/services/pagerdutyv2.go create mode 100644 pkg/services/pagerdutyv2_test.go diff --git a/docs/services/pagerduty_v2.md b/docs/services/pagerduty_v2.md new file mode 100644 index 00000000..21e8d942 --- /dev/null +++ b/docs/services/pagerduty_v2.md @@ -0,0 +1,78 @@ +# PagerDuty V2 + +## Parameters + +The PagerDuty notification service is used to trigger PagerDuty events and requires specifying the following settings: + +* `serviceKeys` - a dictionary with the following structure: + * `service-name: $pagerduty-key-service-name` where `service-name` is the name you want to use for the service to make events for, and `$pagerduty-key-service-name` is a reference to the secret that contains the actual PagerDuty integration key (Events API v2 integration) + +If you want multiple Argo apps to trigger events to their respective PagerDuty services, create an integration key in each service you want to setup alerts for. + +To create a PagerDuty integration key, [follow these instructions](https://support.pagerduty.com/docs/services-and-integrations#create-a-generic-events-api-integration) to add an Events API v2 integration to the service of your choice. + +## Configuration + +The following snippet contains sample PagerDuty service configuration. It assumes the service you want to alert on is called `my-service`. + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: <secret-name> +stringData: + pagerduty-key-my-service: <pd-integration-key> +``` + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: <config-map-name> +data: + service.pagerdutyv2: | + serviceKeys: + my-service: $pagerduty-key-my-service +``` + +## Template + +[Notification templates](../templates.md) support specifying subject for PagerDuty notifications: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: <config-map-name> +data: + template.rollout-aborted: | + message: Rollout {{.rollout.metadata.name}} is aborted. + pagerdutyv2: + summary: "Rollout {{.rollout.metadata.name}} is aborted." + severity: "critical" + source: "{{.rollout.metadata.name}}" +``` + +The parameters for the PagerDuty configuration in the template generally match with the payload for the Events API v2 endpoint. All parameters are strings. + +* `summary` - (required) A brief text summary of the event, used to generate the summaries/titles of any associated alerts. +* `severity` - (required) The perceived severity of the status the event is describing with respect to the affected system. Allowed values: `critical`, `warning`, `error`, `info` +* `source` - (required) The unique location of the affected system, preferably a hostname or FQDN. +* `component` - Component of the source machine that is responsible for the event. +* `group` - Logical grouping of components of a service. +* `class` - The class/type of the event. +* `url` - The URL that should be used for the link "View in ArgoCD" in PagerDuty. + +The `timestamp` and `custom_details` parameters are not currently supported. + +## Annotation + +Annotation sample for PagerDuty notifications: + +```yaml +apiVersion: argoproj.io/v1alpha1 +kind: Rollout +metadata: + annotations: + notifications.argoproj.io/subscribe.on-rollout-aborted.pagerdutyv2: "<serviceID for Pagerduty>" +``` diff --git a/pkg/services/pagerdutyv2.go b/pkg/services/pagerdutyv2.go new file mode 100644 index 00000000..91641e54 --- /dev/null +++ b/pkg/services/pagerdutyv2.go @@ -0,0 +1,165 @@ +package services + +import ( + "bytes" + "context" + "fmt" + texttemplate "text/template" + + "github.com/PagerDuty/go-pagerduty" + log "github.com/sirupsen/logrus" +) + +type PagerDutyV2Notification struct { + Summary string `json:"summary"` + Severity string `json:"severity"` + Source string `json:"source"` + Component string `json:"component,omitempty"` + Group string `json:"group,omitempty"` + Class string `json:"class,omitempty"` + URL string `json:"url"` +} + +type PagerdutyV2Options struct { + ServiceKeys map[string]string `json:"serviceKeys"` +} + +func (p *PagerDutyV2Notification) GetTemplater(name string, f texttemplate.FuncMap) (Templater, error) { + summary, err := texttemplate.New(name).Funcs(f).Parse(p.Summary) + if err != nil { + return nil, err + } + severity, err := texttemplate.New(name).Funcs(f).Parse(p.Severity) + if err != nil { + return nil, err + } + source, err := texttemplate.New(name).Funcs(f).Parse(p.Source) + if err != nil { + return nil, err + } + component, err := texttemplate.New(name).Funcs(f).Parse(p.Component) + if err != nil { + return nil, err + } + group, err := texttemplate.New(name).Funcs(f).Parse(p.Group) + if err != nil { + return nil, err + } + class, err := texttemplate.New(name).Funcs(f).Parse(p.Class) + if err != nil { + return nil, err + } + url, err := texttemplate.New(name).Funcs(f).Parse(p.URL) + if err != nil { + return nil, err + } + + return func(notification *Notification, vars map[string]interface{}) error { + if notification.PagerdutyV2 == nil { + notification.PagerdutyV2 = &PagerDutyV2Notification{} + } + var summaryData bytes.Buffer + if err := summary.Execute(&summaryData, vars); err != nil { + return err + } + notification.PagerdutyV2.Summary = summaryData.String() + + var severityData bytes.Buffer + if err := severity.Execute(&severityData, vars); err != nil { + return err + } + notification.PagerdutyV2.Severity = severityData.String() + + var sourceData bytes.Buffer + if err := source.Execute(&sourceData, vars); err != nil { + return err + } + notification.PagerdutyV2.Source = sourceData.String() + + var componentData bytes.Buffer + if err := component.Execute(&componentData, vars); err != nil { + return err + } + notification.PagerdutyV2.Component = componentData.String() + + var groupData bytes.Buffer + if err := group.Execute(&groupData, vars); err != nil { + return err + } + notification.PagerdutyV2.Group = groupData.String() + + var classData bytes.Buffer + if err := class.Execute(&classData, vars); err != nil { + return err + } + notification.PagerdutyV2.Class = classData.String() + + var urlData bytes.Buffer + if err := url.Execute(&urlData, vars); err != nil { + return err + } + notification.PagerdutyV2.URL = urlData.String() + + return nil + }, nil +} + +func NewPagerdutyV2Service(opts PagerdutyV2Options) NotificationService { + return &pagerdutyV2Service{opts: opts} +} + +type pagerdutyV2Service struct { + opts PagerdutyV2Options +} + +func (p pagerdutyV2Service) Send(notification Notification, dest Destination) error { + routingKey, ok := p.opts.ServiceKeys[dest.Recipient] + if !ok { + return fmt.Errorf("no API key configured for recipient %s", dest.Recipient) + } + + if notification.PagerdutyV2 == nil { + return fmt.Errorf("no config found for pagerdutyv2") + } + + event := buildEvent(routingKey, notification) + + response, err := pagerduty.ManageEventWithContext(context.TODO(), event) + if err != nil { + log.Errorf("Error: %v", err) + return err + } + log.Debugf("PagerDuty event triggered succesfully. Status: %v, Message: %v", response.Status, response.Message) + return nil +} + +func buildEvent(routingKey string, notification Notification) pagerduty.V2Event { + payload := pagerduty.V2Payload{ + Summary: notification.PagerdutyV2.Summary, + Severity: notification.PagerdutyV2.Severity, + Source: notification.PagerdutyV2.Source, + } + + if len(notification.PagerdutyV2.Component) > 0 { + payload.Component = notification.PagerdutyV2.Component + } + if len(notification.PagerdutyV2.Group) > 0 { + payload.Group = notification.PagerdutyV2.Group + } + if len(notification.PagerdutyV2.Class) > 0 { + payload.Class = notification.PagerdutyV2.Class + } + + event := pagerduty.V2Event{ + RoutingKey: routingKey, + Action: "trigger", + Payload: &payload, + Client: "ArgoCD", + } + + if len(notification.PagerdutyV2.URL) > 0 { + event.ClientURL = notification.PagerdutyV2.URL + } + + return event +} diff --git a/pkg/services/pagerdutyv2_test.go b/pkg/services/pagerdutyv2_test.go new file mode 100644 index 00000000..b73810c6 --- /dev/null +++ b/pkg/services/pagerdutyv2_test.go @@ -0,0 +1,272 @@ +package services + +import ( + "errors" + "testing" + "text/template" + + "github.com/stretchr/testify/assert" +) + +func TestGetTemplater_PagerDutyV2(t *testing.T) { + t.Run("all parameters specified", func(t *testing.T) { + n := Notification{ + PagerdutyV2: &PagerDutyV2Notification{ + Summary: "{{.summary}}", + Severity: "{{.severity}}", + Source: "{{.source}}", + Component: "{{.component}}", + Group: "{{.group}}", + Class: "{{.class}}", + URL: "{{.url}}", + }, + } + + templater, err := n.GetTemplater("", template.FuncMap{}) + if !assert.NoError(t, err) { + return + } + + var notification Notification + + err = templater(¬ification, map[string]interface{}{ + "summary": "hello", + "severity": "critical", + "source": "my-app", + "component": "test-component", + "group": "test-group", + "class": "test-class", + "url": "http://example.com", + }) + + if !assert.NoError(t, err) { + return + } + + assert.Equal(t, "hello", notification.PagerdutyV2.Summary) + assert.Equal(t, "critical", notification.PagerdutyV2.Severity) + assert.Equal(t, "my-app", notification.PagerdutyV2.Source) + assert.Equal(t, "test-component", notification.PagerdutyV2.Component) + assert.Equal(t, "test-group", notification.PagerdutyV2.Group) + assert.Equal(t, "test-class", notification.PagerdutyV2.Class) + assert.Equal(t, "http://example.com", notification.PagerdutyV2.URL) + }) + + t.Run("handle error for summary", func(t *testing.T) { + n := Notification{ + PagerdutyV2: &PagerDutyV2Notification{ + Summary: "{{.summary}", + Severity: "{{.severity}", + Source: "{{.source}", + Component: "{{.component}", + Group: "{{.group}", + Class: "{{.class}", + URL: "{{.url}", + }, + } + + _, err := n.GetTemplater("", template.FuncMap{}) + assert.Error(t, err) + }) + + t.Run("handle error for severity", func(t *testing.T) { + n := Notification{ + PagerdutyV2: &PagerDutyV2Notification{ + Summary: "{{.summary}}", + Severity: "{{.severity}", + Source: "{{.source}", + Component: "{{.component}", + Group: "{{.group}", + Class: "{{.class}", + URL: "{{.url}", + }, + } + + _, err := n.GetTemplater("", template.FuncMap{}) + assert.Error(t, err) + }) + + t.Run("handle error for source", func(t *testing.T) { + n := Notification{ + PagerdutyV2: &PagerDutyV2Notification{ + Summary: "{{.summary}}", + Severity: "{{.severity}}", + Source: "{{.source}", + Component: "{{.component}", + Group: "{{.group}", + Class: "{{.class}", + URL: "{{.url}", + }, + } + + _, err := n.GetTemplater("", template.FuncMap{}) + assert.Error(t, err) + }) + + t.Run("handle error for component", func(t *testing.T) { + n := Notification{ + PagerdutyV2: &PagerDutyV2Notification{ + Summary: "{{.summary}}", + Severity: "{{.severity}}", + Source: "{{.source}}", + Component: "{{.component}", + Group: "{{.group}", + Class: "{{.class}", + URL: "{{.url}", + }, + } + + _, err := n.GetTemplater("", template.FuncMap{}) + assert.Error(t, err) + }) + + t.Run("handle error for group", func(t *testing.T) { + n := Notification{ + PagerdutyV2: &PagerDutyV2Notification{ + Summary: "{{.summary}}", + Severity: "{{.severity}}", + Source: "{{.source}}", + Component: "{{.component}}", + Group: "{{.group}", + Class: "{{.class}", + URL: "{{.url}", + }, + } + + _, err := n.GetTemplater("", template.FuncMap{}) + assert.Error(t, err) + }) + + t.Run("handle error for class", func(t *testing.T) { + n := Notification{ + PagerdutyV2: &PagerDutyV2Notification{ + Summary: "{{.summary}}", + Severity: "{{.severity}}", + Source: "{{.source}}", + Component: "{{.component}}", + Group: "{{.group}}", + Class: "{{.class}", + URL: "{{.url}", + }, + } + + _, err := n.GetTemplater("", template.FuncMap{}) + assert.Error(t, err) + }) + + t.Run("handle error for url", func(t *testing.T) { + n := Notification{ + PagerdutyV2: &PagerDutyV2Notification{ + Summary: "{{.summary}}", + Severity: "{{.severity}}", + Source: "{{.source}}", + Component: "{{.component}}", + Group: "{{.group}}", + Class: "{{.class}}", + URL: "{{.url}", + }, + } + + _, err := n.GetTemplater("", template.FuncMap{}) + assert.Error(t, err) + }) + + t.Run("only required parameters specified", func(t *testing.T) { + n := Notification{ + PagerdutyV2: &PagerDutyV2Notification{ + Summary: "{{.summary}}", Severity: "{{.severity}}", Source: "{{.source}}", + }, + } + + templater, err := n.GetTemplater("", template.FuncMap{}) + if !assert.NoError(t, err) { + return + } + + var notification Notification + + err = templater(¬ification, map[string]interface{}{ + "summary": "hello", + "severity": "critical", + "source": "my-app", + }) + + if !assert.NoError(t, err) { + return + } + + assert.Equal(t, "hello", notification.PagerdutyV2.Summary) + assert.Equal(t, "critical", notification.PagerdutyV2.Severity) + assert.Equal(t, "my-app", notification.PagerdutyV2.Source) + assert.Equal(t, "", notification.PagerdutyV2.Component) + assert.Equal(t, "", notification.PagerdutyV2.Group) + assert.Equal(t, "", notification.PagerdutyV2.Class) + }) +} + +func TestSend_PagerDuty(t *testing.T) { + t.Run("builds event with full payload", func(t *testing.T) { + routingKey := "routing-key" + summary := "test-app failed to deploy" + severity := "error" + source := "test-app" + component := "test-component" + group := "platform" + class := "test-class" + url := "https://www.example.com/" + + event := buildEvent(routingKey, Notification{ + Message: "message", + PagerdutyV2: &PagerDutyV2Notification{ + Summary: summary, + Severity: severity, + Source: source, + Component: component, + Group: group, + Class: class, + URL: url, + }, + }) + + assert.Equal(t, routingKey, event.RoutingKey) + assert.Equal(t, summary, event.Payload.Summary) + assert.Equal(t, severity, event.Payload.Severity) + assert.Equal(t, source, event.Payload.Source) + assert.Equal(t, component, event.Payload.Component) + assert.Equal(t, group, event.Payload.Group) + assert.Equal(t, class, event.Payload.Class) + assert.Equal(t, url, event.ClientURL) + }) + + t.Run("missing config", func(t *testing.T) { + service := NewPagerdutyV2Service(PagerdutyV2Options{ + ServiceKeys: map[string]string{ + "test-service": "key", + }, + }) + err := service.Send(Notification{ + Message: "message", + }, Destination{ + Service: "pagerdutyv2", + Recipient: "test-service", + }) + + if assert.Error(t, err) { + assert.Equal(t, err, errors.New("no config found for pagerdutyv2")) + } + }) + + t.Run("missing apiKey", func(t *testing.T) { + service := NewPagerdutyV2Service(PagerdutyV2Options{}) + err := service.Send(Notification{ + Message: "message", + }, Destination{ + Service: "pagerduty", + Recipient: "test-service", + }) + + if assert.Error(t, err) { + assert.Equal(t, err, errors.New("no API key configured for recipient test-service")) + } + }) +} diff --git a/pkg/services/services.go b/pkg/services/services.go index ced22bbe..d919981e 100644 --- a/pkg/services/services.go +++ b/pkg/services/services.go @@ -24,6 +24,7 @@ type Notification struct { Alertmanager *AlertmanagerNotification `json:"alertmanager,omitempty"` GoogleChat *GoogleChatNotification `json:"googlechat,omitempty"` Pagerduty *PagerDutyNotification `json:"pagerduty,omitempty"` + PagerdutyV2 *PagerDutyV2Notification `json:"pagerdutyv2,omitempty"` Newrelic *NewrelicNotification `json:"newrelic,omitempty"` } @@ -97,6 +98,10 @@ func (n *Notification) GetTemplater(name string, f texttemplate.FuncMap) (Templa sources = append(sources, n.Pagerduty) } + if n.PagerdutyV2 != nil { + sources = append(sources, n.PagerdutyV2) + } + if n.Newrelic != nil { sources = append(sources, n.Newrelic) } @@ -197,6 +202,12 @@ func NewService(serviceType string, optsData []byte) (NotificationService, error return nil, err } return NewPagerdutyService(opts), nil + case "pagerdutyv2": + var opts PagerdutyV2Options + if err := yaml.Unmarshal(optsData, &opts); err != nil { + return nil, err + } + return NewPagerdutyV2Service(opts), nil case "newrelic": var opts NewrelicOptions if err := yaml.Unmarshal(optsData, &opts); err != nil { From 560c56b29760c891f6ba05522b5425ebbba2366a Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Tue, 9 May 2023 10:05:42 -0700 Subject: [PATCH 04/18] self-service from event --- pkg/api/factory.go | 76 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index c95b3131..7231b7fb 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -33,6 +33,11 @@ type MayFactory interface { GetAPIWithNamespace(namespace string) (API, error) } +// Factory creates an API instance +type MayFactoryWithMultipleAPIs interface { + GetAPIsWithNamespace(namespace string) (map[string]API, error) +} + type apiFactory struct { Settings @@ -211,3 +216,74 @@ func (f *apiFactory) GetAPIWithNamespace(namespace string) (API, error) { return f.apiMap[namespaceHasConfig], nil } + +// Returns a map of api in the namespace and api in the setting's namespace +func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, error) { + f.lock.Lock() + defer f.lock.Unlock() + + apis := make(map[string]API) + + if f.apiMap[namespace] != nil && f.apiMap[f.Settings.Namespace] != nil { + apis[namespace] = f.apiMap[namespace] + apis[f.Settings.Namespace] = f.apiMap[f.Settings.Namespace] + return apis, nil + } + + if f.apiMap[namespace] != nil { + apis[namespace] = f.apiMap[namespace] + api, err := f.getApiFromNamespace(f.Settings.Namespace) + if err == nil { + apis[f.Settings.Namespace] = api + f.apiMap[f.Settings.Namespace] = api + } + return apis, nil + } + + if f.apiMap[f.Settings.Namespace] != nil { + apis[f.Settings.Namespace] = f.apiMap[f.Settings.Namespace] + api, err := f.getApiFromNamespace(namespace) + if err == nil { + apis[namespace] = api + f.apiMap[namespace] = api + } + + return apis, nil + } + + //Where is nothing in cache, then we retrieve them + apiFromNamespace, err := f.getApiFromNamespace(namespace) + if err == nil { + apis[namespace] = apiFromNamespace + } + apiFromSettings, err := f.getApiFromNamespace(f.Settings.Namespace) + if err == nil { + apis[f.Settings.Namespace] = apiFromSettings + } + f.apiMap[namespace] = apiFromNamespace + f.apiMap[f.Settings.Namespace] = apiFromSettings + return apis, nil + +} + +func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) { + cm, secret, err := f.getConfigMapAndSecretWithNamespace(namespace) + if err != nil { + if !k8serrors.IsNotFound(err) { + return nil, err + } + } + cfg, err := ParseConfig(cm, secret) + if err != nil { + return nil, err + } + getVars, err := f.InitGetVars(cfg, cm, secret) + if err != nil { + return nil, err + } + api, err := NewAPI(*cfg, getVars) + if err != nil { + return nil, err + } + return api, nil +} From dc25793c78a89b0addcf40220516769e6b2b89c2 Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Tue, 9 May 2023 16:39:21 -0700 Subject: [PATCH 05/18] self-service from event --- pkg/api/factory.go | 5 +- pkg/controller/controller.go | 237 +++++++++++++++++++++++++++++------ 2 files changed, 203 insertions(+), 39 deletions(-) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 7231b7fb..68076c03 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -255,13 +255,14 @@ func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, err apiFromNamespace, err := f.getApiFromNamespace(namespace) if err == nil { apis[namespace] = apiFromNamespace + f.apiMap[namespace] = apiFromNamespace } apiFromSettings, err := f.getApiFromNamespace(f.Settings.Namespace) if err == nil { apis[f.Settings.Namespace] = apiFromSettings + f.apiMap[f.Settings.Namespace] = apiFromSettings } - f.apiMap[namespace] = apiFromNamespace - f.apiMap[f.Settings.Namespace] = apiFromSettings + return apis, nil } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d4441302..e1df7b2d 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -142,16 +142,61 @@ func NewController( return ctrl } +func NewControllerWithMultipleNamespace( + client dynamic.NamespaceableResourceInterface, + informer cache.SharedIndexInformer, + apiFactoryWithMultipleNamespace api.MayFactoryWithMultipleAPIs, + opts ...Opts, +) *notificationController { + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + queue.Add(key) + } + }, + UpdateFunc: func(old, new interface{}) { + key, err := cache.MetaNamespaceKeyFunc(new) + if err == nil { + queue.Add(key) + } + }, + }, + ) + + ctrl := ¬ificationController{ + client: client, + informer: informer, + queue: queue, + metricsRegistry: NewMetricsRegistry(""), + apiFactoryWithMultipleAPIs: apiFactoryWithMultipleNamespace, + toUnstructured: func(obj v1.Object) (*unstructured.Unstructured, error) { + res, ok := obj.(*unstructured.Unstructured) + if !ok { + return nil, fmt.Errorf("Object must be *unstructured.Unstructured but was: %v", res) + } + return res, nil + }, + } + for i := range opts { + opts[i](ctrl) + } + return ctrl +} + type notificationController struct { - client dynamic.NamespaceableResourceInterface - informer cache.SharedIndexInformer - queue workqueue.RateLimitingInterface - apiFactory api.Factory - metricsRegistry *MetricsRegistry - skipProcessing func(obj v1.Object) (bool, string) - alterDestinations func(obj v1.Object, destinations services.Destinations, cfg api.Config) services.Destinations - toUnstructured func(obj v1.Object) (*unstructured.Unstructured, error) - eventCallback func(eventSequence NotificationEventSequence) + client dynamic.NamespaceableResourceInterface + informer cache.SharedIndexInformer + queue workqueue.RateLimitingInterface + apiFactory api.Factory + metricsRegistry *MetricsRegistry + skipProcessing func(obj v1.Object) (bool, string) + alterDestinations func(obj v1.Object, destinations services.Destinations, cfg api.Config) services.Destinations + toUnstructured func(obj v1.Object) (*unstructured.Unstructured, error) + eventCallback func(eventSequence NotificationEventSequence) + apiFactoryWithMultipleAPIs api.MayFactoryWithMultipleAPIs } func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) { @@ -237,6 +282,74 @@ func (c *notificationController) processResource(resource v1.Object, logEntry *l return notificationsState.Persist(resource) } +func (c *notificationController) processResourceWithAPI(api api.API, apiNamespace string, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) { + notificationsState := NewStateFromRes(resource) + //api, err := c.apiFactory.GetAPI() + //if err != nil { + // return nil, err + //} + + destinations := c.getDestinations(resource, api.GetConfig()) + if len(destinations) == 0 { + return resource.GetAnnotations(), nil + } + + un, err := c.toUnstructured(resource) + if err != nil { + return nil, err + } + + for trigger, destinations := range destinations { + res, err := api.RunTrigger(trigger, un.Object) + if err != nil { + logEntry.Debugf("Failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace) + eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace)) + } + logEntry.Infof("Trigger %s result: %v", trigger, res) + + for _, cr := range res { + c.metricsRegistry.IncTriggerEvaluationsCounter(trigger, cr.Triggered) + + if !cr.Triggered { + for _, to := range destinations { + notificationsState.SetAlreadyNotified(trigger, cr, to, false) + } + continue + } + + for _, to := range destinations { + if changed := notificationsState.SetAlreadyNotified(trigger, cr, to, true); !changed { + logEntry.Infof("Notification about condition '%s.%s' already sent to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace) + eventSequence.addDelivered(NotificationDelivery{ + Trigger: trigger, + Destination: to, + AlreadyNotified: true, + }) + } else { + logEntry.Infof("Sending notification about condition '%s.%s' to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace) + if err := api.Send(un.Object, cr.Templates, to); err != nil { + logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s", + to, resource.GetNamespace(), resource.GetName(), err, apiNamespace) + notificationsState.SetAlreadyNotified(trigger, cr, to, false) + c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, false) + eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v using the configuration in namespace %s", trigger, to, err, apiNamespace)) + } else { + logEntry.Debugf("Notification %s was sent using the configuration in namespace %s", to.Recipient, apiNamespace) + c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, true) + eventSequence.addDelivered(NotificationDelivery{ + Trigger: trigger, + Destination: to, + AlreadyNotified: false, + }) + } + } + } + } + } + + return notificationsState.Persist(resource) +} + func (c *notificationController) getDestinations(resource v1.Object, cfg api.Config) services.Destinations { res := cfg.GetGlobalDestinations(resource.GetLabels()) res.Merge(subscriptions.NewAnnotations(resource.GetAnnotations()).GetDestinations(cfg.DefaultTriggers, cfg.ServiceDefaultTriggers)) @@ -294,42 +407,92 @@ func (c *notificationController) processQueueItem() (processNext bool) { } } - annotations, err := c.processResource(resource, logEntry, &eventSequence) - if err != nil { - logEntry.Errorf("Failed to process: %v", err) - eventSequence.addError(err) - return - } - - if !mapsEqual(resource.GetAnnotations(), annotations) { - annotationsPatch := make(map[string]interface{}) - for k, v := range annotations { - annotationsPatch[k] = v + if c.apiFactoryWithMultipleAPIs == nil { + annotations, err := c.processResource(resource, logEntry, &eventSequence) + if err != nil { + logEntry.Errorf("Failed to process: %v", err) + eventSequence.addError(err) + return } - for k := range resource.GetAnnotations() { - if _, ok = annotations[k]; !ok { - annotationsPatch[k] = nil + + if !mapsEqual(resource.GetAnnotations(), annotations) { + annotationsPatch := make(map[string]interface{}) + for k, v := range annotations { + annotationsPatch[k] = v + } + for k := range resource.GetAnnotations() { + if _, ok = annotations[k]; !ok { + annotationsPatch[k] = nil + } } - } - patchData, err := json.Marshal(map[string]map[string]interface{}{ - "metadata": {"annotations": annotationsPatch}, - }) - if err != nil { - logEntry.Errorf("Failed to marshal resource patch: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to marshal annotations patch %v", err)) - return + patchData, err := json.Marshal(map[string]map[string]interface{}{ + "metadata": {"annotations": annotationsPatch}, + }) + if err != nil { + logEntry.Errorf("Failed to marshal resource patch: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to marshal annotations patch %v", err)) + return + } + resource, err = c.client.Namespace(resource.GetNamespace()).Patch(context.Background(), resource.GetName(), types.MergePatchType, patchData, v1.PatchOptions{}) + if err != nil { + logEntry.Errorf("Failed to patch resource: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to patch resource annotations %v", err)) + return + } + if err := c.informer.GetStore().Update(resource); err != nil { + logEntry.Warnf("Failed to store update resource in informer: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err)) + } } - resource, err = c.client.Namespace(resource.GetNamespace()).Patch(context.Background(), resource.GetName(), types.MergePatchType, patchData, v1.PatchOptions{}) + } else { + apisWithNamespace, err := c.apiFactoryWithMultipleAPIs.GetAPIsWithNamespace(resource.GetNamespace()) if err != nil { - logEntry.Errorf("Failed to patch resource: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to patch resource annotations %v", err)) + logEntry.Errorf("Failed to process: %v", err) + eventSequence.addError(err) return } - if err := c.informer.GetStore().Update(resource); err != nil { - logEntry.Warnf("Failed to store update resource in informer: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err)) + for apiNamespace, api := range apisWithNamespace { + annotations, err := c.processResourceWithAPI(api, apiNamespace, resource, logEntry, &eventSequence) + if err != nil { + logEntry.Errorf("Failed to process: %v", err) + eventSequence.addError(err) + return + } + + if !mapsEqual(resource.GetAnnotations(), annotations) { + annotationsPatch := make(map[string]interface{}) + for k, v := range annotations { + annotationsPatch[k] = v + } + for k := range resource.GetAnnotations() { + if _, ok = annotations[k]; !ok { + annotationsPatch[k] = nil + } + } + + patchData, err := json.Marshal(map[string]map[string]interface{}{ + "metadata": {"annotations": annotationsPatch}, + }) + if err != nil { + logEntry.Errorf("Failed to marshal resource patch: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to marshal annotations patch %v", err)) + return + } + resource, err = c.client.Namespace(resource.GetNamespace()).Patch(context.Background(), resource.GetName(), types.MergePatchType, patchData, v1.PatchOptions{}) + if err != nil { + logEntry.Errorf("Failed to patch resource: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to patch resource annotations %v", err)) + return + } + if err := c.informer.GetStore().Update(resource); err != nil { + logEntry.Warnf("Failed to store update resource in informer: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err)) + } + } } + + //end } logEntry.Info("Processing completed") From 714c7c3854a77573bce334dec5ac285259cb2f0b Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Thu, 4 May 2023 10:43:10 -0700 Subject: [PATCH 06/18] self-service from event --- pkg/api/factory.go | 91 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 88 insertions(+), 3 deletions(-) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 9fb09853..c95b3131 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -5,6 +5,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -18,6 +19,8 @@ type Settings struct { SecretName string // InitGetVars returns a function that produces notifications context variables InitGetVars func(cfg *Config, configMap *v1.ConfigMap, secret *v1.Secret) (GetVars, error) + // Default namespace for ConfigMap and Secret + Namespace string } // Factory creates an API instance @@ -25,6 +28,11 @@ type Factory interface { GetAPI() (API, error) } +// Factory creates an API instance +type MayFactory interface { + GetAPIWithNamespace(namespace string) (API, error) +} + type apiFactory struct { Settings @@ -32,13 +40,20 @@ type apiFactory struct { secretLister v1listers.SecretNamespaceLister lock sync.Mutex api API + + cmInformer cache.SharedIndexInformer + secretsInformer cache.SharedIndexInformer + apiMap map[string]API } func NewFactory(settings Settings, namespace string, secretsInformer cache.SharedIndexInformer, cmInformer cache.SharedIndexInformer) *apiFactory { factory := &apiFactory{ - Settings: settings, - cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()).ConfigMaps(namespace), - secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()).Secrets(namespace), + Settings: settings, + cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()).ConfigMaps(namespace), + secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()).Secrets(namespace), + cmInformer: cmInformer, + secretsInformer: secretsInformer, + apiMap: make(map[string]API), } secretsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -96,10 +111,39 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) return cm, secret, err } +func (f *apiFactory) getConfigMapAndSecretWithNamespace(namespace string) (*v1.ConfigMap, *v1.Secret, error) { + + cmLister := v1listers.NewConfigMapLister(f.cmInformer.GetIndexer()).ConfigMaps(namespace) + secretLister := v1listers.NewSecretLister(f.secretsInformer.GetIndexer()).Secrets(namespace) + + cm, err := cmLister.Get(f.ConfigMapName) + if err != nil { + if errors.IsNotFound(err) { + cm = &v1.ConfigMap{} + } else { + return nil, nil, err + } + } + + secret, err := secretLister.Get(f.SecretName) + if err != nil { + if errors.IsNotFound(err) { + secret = &v1.Secret{} + } else { + return nil, nil, err + } + } + + return cm, secret, err +} + func (f *apiFactory) invalidateCache() { f.lock.Lock() defer f.lock.Unlock() f.api = nil + for namespace := range f.apiMap { + f.apiMap[namespace] = nil + } } func (f *apiFactory) GetAPI() (API, error) { @@ -126,3 +170,44 @@ func (f *apiFactory) GetAPI() (API, error) { } return f.api, nil } + +func (f *apiFactory) GetAPIWithNamespace(namespace string) (API, error) { + f.lock.Lock() + defer f.lock.Unlock() + namespaceHasConfig := namespace + + if f.apiMap[namespaceHasConfig] != nil { + return f.apiMap[namespaceHasConfig], nil + } + + cm, secret, err := f.getConfigMapAndSecretWithNamespace(namespaceHasConfig) + if err != nil { + if !k8serrors.IsNotFound(err) { + return nil, err + } + // If could not find it in namespace, try the namespace from settings + namespaceHasConfig = f.Settings.Namespace + if f.apiMap[namespaceHasConfig] != nil { + return f.apiMap[namespaceHasConfig], nil + } + cm, secret, err = f.getConfigMapAndSecretWithNamespace(namespaceHasConfig) + if err != nil { + return nil, err + } + } + cfg, err := ParseConfig(cm, secret) + if err != nil { + return nil, err + } + getVars, err := f.InitGetVars(cfg, cm, secret) + if err != nil { + return nil, err + } + api, err := NewAPI(*cfg, getVars) + if err != nil { + return nil, err + } + f.apiMap[namespaceHasConfig] = api + + return f.apiMap[namespaceHasConfig], nil +} From c78fc69aed23023f2d2ea9a1fdc2f234a3ffe626 Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Tue, 9 May 2023 10:05:42 -0700 Subject: [PATCH 07/18] self-service from event --- pkg/api/factory.go | 76 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index c95b3131..7231b7fb 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -33,6 +33,11 @@ type MayFactory interface { GetAPIWithNamespace(namespace string) (API, error) } +// Factory creates an API instance +type MayFactoryWithMultipleAPIs interface { + GetAPIsWithNamespace(namespace string) (map[string]API, error) +} + type apiFactory struct { Settings @@ -211,3 +216,74 @@ func (f *apiFactory) GetAPIWithNamespace(namespace string) (API, error) { return f.apiMap[namespaceHasConfig], nil } + +// Returns a map of api in the namespace and api in the setting's namespace +func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, error) { + f.lock.Lock() + defer f.lock.Unlock() + + apis := make(map[string]API) + + if f.apiMap[namespace] != nil && f.apiMap[f.Settings.Namespace] != nil { + apis[namespace] = f.apiMap[namespace] + apis[f.Settings.Namespace] = f.apiMap[f.Settings.Namespace] + return apis, nil + } + + if f.apiMap[namespace] != nil { + apis[namespace] = f.apiMap[namespace] + api, err := f.getApiFromNamespace(f.Settings.Namespace) + if err == nil { + apis[f.Settings.Namespace] = api + f.apiMap[f.Settings.Namespace] = api + } + return apis, nil + } + + if f.apiMap[f.Settings.Namespace] != nil { + apis[f.Settings.Namespace] = f.apiMap[f.Settings.Namespace] + api, err := f.getApiFromNamespace(namespace) + if err == nil { + apis[namespace] = api + f.apiMap[namespace] = api + } + + return apis, nil + } + + //Where is nothing in cache, then we retrieve them + apiFromNamespace, err := f.getApiFromNamespace(namespace) + if err == nil { + apis[namespace] = apiFromNamespace + } + apiFromSettings, err := f.getApiFromNamespace(f.Settings.Namespace) + if err == nil { + apis[f.Settings.Namespace] = apiFromSettings + } + f.apiMap[namespace] = apiFromNamespace + f.apiMap[f.Settings.Namespace] = apiFromSettings + return apis, nil + +} + +func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) { + cm, secret, err := f.getConfigMapAndSecretWithNamespace(namespace) + if err != nil { + if !k8serrors.IsNotFound(err) { + return nil, err + } + } + cfg, err := ParseConfig(cm, secret) + if err != nil { + return nil, err + } + getVars, err := f.InitGetVars(cfg, cm, secret) + if err != nil { + return nil, err + } + api, err := NewAPI(*cfg, getVars) + if err != nil { + return nil, err + } + return api, nil +} From 8c380027bf2d6a898f2196bb629d883bc40cd95c Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Tue, 9 May 2023 16:39:21 -0700 Subject: [PATCH 08/18] self-service from event --- pkg/api/factory.go | 5 +- pkg/controller/controller.go | 237 +++++++++++++++++++++++++++++------ 2 files changed, 203 insertions(+), 39 deletions(-) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 7231b7fb..68076c03 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -255,13 +255,14 @@ func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, err apiFromNamespace, err := f.getApiFromNamespace(namespace) if err == nil { apis[namespace] = apiFromNamespace + f.apiMap[namespace] = apiFromNamespace } apiFromSettings, err := f.getApiFromNamespace(f.Settings.Namespace) if err == nil { apis[f.Settings.Namespace] = apiFromSettings + f.apiMap[f.Settings.Namespace] = apiFromSettings } - f.apiMap[namespace] = apiFromNamespace - f.apiMap[f.Settings.Namespace] = apiFromSettings + return apis, nil } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d4441302..e1df7b2d 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -142,16 +142,61 @@ func NewController( return ctrl } +func NewControllerWithMultipleNamespace( + client dynamic.NamespaceableResourceInterface, + informer cache.SharedIndexInformer, + apiFactoryWithMultipleNamespace api.MayFactoryWithMultipleAPIs, + opts ...Opts, +) *notificationController { + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + queue.Add(key) + } + }, + UpdateFunc: func(old, new interface{}) { + key, err := cache.MetaNamespaceKeyFunc(new) + if err == nil { + queue.Add(key) + } + }, + }, + ) + + ctrl := ¬ificationController{ + client: client, + informer: informer, + queue: queue, + metricsRegistry: NewMetricsRegistry(""), + apiFactoryWithMultipleAPIs: apiFactoryWithMultipleNamespace, + toUnstructured: func(obj v1.Object) (*unstructured.Unstructured, error) { + res, ok := obj.(*unstructured.Unstructured) + if !ok { + return nil, fmt.Errorf("Object must be *unstructured.Unstructured but was: %v", res) + } + return res, nil + }, + } + for i := range opts { + opts[i](ctrl) + } + return ctrl +} + type notificationController struct { - client dynamic.NamespaceableResourceInterface - informer cache.SharedIndexInformer - queue workqueue.RateLimitingInterface - apiFactory api.Factory - metricsRegistry *MetricsRegistry - skipProcessing func(obj v1.Object) (bool, string) - alterDestinations func(obj v1.Object, destinations services.Destinations, cfg api.Config) services.Destinations - toUnstructured func(obj v1.Object) (*unstructured.Unstructured, error) - eventCallback func(eventSequence NotificationEventSequence) + client dynamic.NamespaceableResourceInterface + informer cache.SharedIndexInformer + queue workqueue.RateLimitingInterface + apiFactory api.Factory + metricsRegistry *MetricsRegistry + skipProcessing func(obj v1.Object) (bool, string) + alterDestinations func(obj v1.Object, destinations services.Destinations, cfg api.Config) services.Destinations + toUnstructured func(obj v1.Object) (*unstructured.Unstructured, error) + eventCallback func(eventSequence NotificationEventSequence) + apiFactoryWithMultipleAPIs api.MayFactoryWithMultipleAPIs } func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) { @@ -237,6 +282,74 @@ func (c *notificationController) processResource(resource v1.Object, logEntry *l return notificationsState.Persist(resource) } +func (c *notificationController) processResourceWithAPI(api api.API, apiNamespace string, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) { + notificationsState := NewStateFromRes(resource) + //api, err := c.apiFactory.GetAPI() + //if err != nil { + // return nil, err + //} + + destinations := c.getDestinations(resource, api.GetConfig()) + if len(destinations) == 0 { + return resource.GetAnnotations(), nil + } + + un, err := c.toUnstructured(resource) + if err != nil { + return nil, err + } + + for trigger, destinations := range destinations { + res, err := api.RunTrigger(trigger, un.Object) + if err != nil { + logEntry.Debugf("Failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace) + eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace)) + } + logEntry.Infof("Trigger %s result: %v", trigger, res) + + for _, cr := range res { + c.metricsRegistry.IncTriggerEvaluationsCounter(trigger, cr.Triggered) + + if !cr.Triggered { + for _, to := range destinations { + notificationsState.SetAlreadyNotified(trigger, cr, to, false) + } + continue + } + + for _, to := range destinations { + if changed := notificationsState.SetAlreadyNotified(trigger, cr, to, true); !changed { + logEntry.Infof("Notification about condition '%s.%s' already sent to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace) + eventSequence.addDelivered(NotificationDelivery{ + Trigger: trigger, + Destination: to, + AlreadyNotified: true, + }) + } else { + logEntry.Infof("Sending notification about condition '%s.%s' to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace) + if err := api.Send(un.Object, cr.Templates, to); err != nil { + logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s", + to, resource.GetNamespace(), resource.GetName(), err, apiNamespace) + notificationsState.SetAlreadyNotified(trigger, cr, to, false) + c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, false) + eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v using the configuration in namespace %s", trigger, to, err, apiNamespace)) + } else { + logEntry.Debugf("Notification %s was sent using the configuration in namespace %s", to.Recipient, apiNamespace) + c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, true) + eventSequence.addDelivered(NotificationDelivery{ + Trigger: trigger, + Destination: to, + AlreadyNotified: false, + }) + } + } + } + } + } + + return notificationsState.Persist(resource) +} + func (c *notificationController) getDestinations(resource v1.Object, cfg api.Config) services.Destinations { res := cfg.GetGlobalDestinations(resource.GetLabels()) res.Merge(subscriptions.NewAnnotations(resource.GetAnnotations()).GetDestinations(cfg.DefaultTriggers, cfg.ServiceDefaultTriggers)) @@ -294,42 +407,92 @@ func (c *notificationController) processQueueItem() (processNext bool) { } } - annotations, err := c.processResource(resource, logEntry, &eventSequence) - if err != nil { - logEntry.Errorf("Failed to process: %v", err) - eventSequence.addError(err) - return - } - - if !mapsEqual(resource.GetAnnotations(), annotations) { - annotationsPatch := make(map[string]interface{}) - for k, v := range annotations { - annotationsPatch[k] = v + if c.apiFactoryWithMultipleAPIs == nil { + annotations, err := c.processResource(resource, logEntry, &eventSequence) + if err != nil { + logEntry.Errorf("Failed to process: %v", err) + eventSequence.addError(err) + return } - for k := range resource.GetAnnotations() { - if _, ok = annotations[k]; !ok { - annotationsPatch[k] = nil + + if !mapsEqual(resource.GetAnnotations(), annotations) { + annotationsPatch := make(map[string]interface{}) + for k, v := range annotations { + annotationsPatch[k] = v + } + for k := range resource.GetAnnotations() { + if _, ok = annotations[k]; !ok { + annotationsPatch[k] = nil + } } - } - patchData, err := json.Marshal(map[string]map[string]interface{}{ - "metadata": {"annotations": annotationsPatch}, - }) - if err != nil { - logEntry.Errorf("Failed to marshal resource patch: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to marshal annotations patch %v", err)) - return + patchData, err := json.Marshal(map[string]map[string]interface{}{ + "metadata": {"annotations": annotationsPatch}, + }) + if err != nil { + logEntry.Errorf("Failed to marshal resource patch: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to marshal annotations patch %v", err)) + return + } + resource, err = c.client.Namespace(resource.GetNamespace()).Patch(context.Background(), resource.GetName(), types.MergePatchType, patchData, v1.PatchOptions{}) + if err != nil { + logEntry.Errorf("Failed to patch resource: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to patch resource annotations %v", err)) + return + } + if err := c.informer.GetStore().Update(resource); err != nil { + logEntry.Warnf("Failed to store update resource in informer: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err)) + } } - resource, err = c.client.Namespace(resource.GetNamespace()).Patch(context.Background(), resource.GetName(), types.MergePatchType, patchData, v1.PatchOptions{}) + } else { + apisWithNamespace, err := c.apiFactoryWithMultipleAPIs.GetAPIsWithNamespace(resource.GetNamespace()) if err != nil { - logEntry.Errorf("Failed to patch resource: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to patch resource annotations %v", err)) + logEntry.Errorf("Failed to process: %v", err) + eventSequence.addError(err) return } - if err := c.informer.GetStore().Update(resource); err != nil { - logEntry.Warnf("Failed to store update resource in informer: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err)) + for apiNamespace, api := range apisWithNamespace { + annotations, err := c.processResourceWithAPI(api, apiNamespace, resource, logEntry, &eventSequence) + if err != nil { + logEntry.Errorf("Failed to process: %v", err) + eventSequence.addError(err) + return + } + + if !mapsEqual(resource.GetAnnotations(), annotations) { + annotationsPatch := make(map[string]interface{}) + for k, v := range annotations { + annotationsPatch[k] = v + } + for k := range resource.GetAnnotations() { + if _, ok = annotations[k]; !ok { + annotationsPatch[k] = nil + } + } + + patchData, err := json.Marshal(map[string]map[string]interface{}{ + "metadata": {"annotations": annotationsPatch}, + }) + if err != nil { + logEntry.Errorf("Failed to marshal resource patch: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to marshal annotations patch %v", err)) + return + } + resource, err = c.client.Namespace(resource.GetNamespace()).Patch(context.Background(), resource.GetName(), types.MergePatchType, patchData, v1.PatchOptions{}) + if err != nil { + logEntry.Errorf("Failed to patch resource: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to patch resource annotations %v", err)) + return + } + if err := c.informer.GetStore().Update(resource); err != nil { + logEntry.Warnf("Failed to store update resource in informer: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err)) + } + } } + + //end } logEntry.Info("Processing completed") From ad9af613f505804d7773bcbf714d448ea618eec7 Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Wed, 10 May 2023 15:57:38 -0700 Subject: [PATCH 09/18] clean up factory.go --- pkg/api/factory.go | 58 +++++----------------------------------------- 1 file changed, 6 insertions(+), 52 deletions(-) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 68076c03..3f4afd49 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -5,7 +5,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -28,12 +27,9 @@ type Factory interface { GetAPI() (API, error) } -// Factory creates an API instance -type MayFactory interface { - GetAPIWithNamespace(namespace string) (API, error) -} - -// Factory creates an API instance +// Factory creates a map of APIs that include +// api in the namespace specified in input parameter +// and api in the namespace specified in the Settings type MayFactoryWithMultipleAPIs interface { GetAPIsWithNamespace(namespace string) (map[string]API, error) } @@ -116,8 +112,7 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) return cm, secret, err } -func (f *apiFactory) getConfigMapAndSecretWithNamespace(namespace string) (*v1.ConfigMap, *v1.Secret, error) { - +func (f *apiFactory) getConfigMapAndSecretInNamespace(namespace string) (*v1.ConfigMap, *v1.Secret, error) { cmLister := v1listers.NewConfigMapLister(f.cmInformer.GetIndexer()).ConfigMaps(namespace) secretLister := v1listers.NewSecretLister(f.secretsInformer.GetIndexer()).Secrets(namespace) @@ -176,47 +171,6 @@ func (f *apiFactory) GetAPI() (API, error) { return f.api, nil } -func (f *apiFactory) GetAPIWithNamespace(namespace string) (API, error) { - f.lock.Lock() - defer f.lock.Unlock() - namespaceHasConfig := namespace - - if f.apiMap[namespaceHasConfig] != nil { - return f.apiMap[namespaceHasConfig], nil - } - - cm, secret, err := f.getConfigMapAndSecretWithNamespace(namespaceHasConfig) - if err != nil { - if !k8serrors.IsNotFound(err) { - return nil, err - } - // If could not find it in namespace, try the namespace from settings - namespaceHasConfig = f.Settings.Namespace - if f.apiMap[namespaceHasConfig] != nil { - return f.apiMap[namespaceHasConfig], nil - } - cm, secret, err = f.getConfigMapAndSecretWithNamespace(namespaceHasConfig) - if err != nil { - return nil, err - } - } - cfg, err := ParseConfig(cm, secret) - if err != nil { - return nil, err - } - getVars, err := f.InitGetVars(cfg, cm, secret) - if err != nil { - return nil, err - } - api, err := NewAPI(*cfg, getVars) - if err != nil { - return nil, err - } - f.apiMap[namespaceHasConfig] = api - - return f.apiMap[namespaceHasConfig], nil -} - // Returns a map of api in the namespace and api in the setting's namespace func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, error) { f.lock.Lock() @@ -268,9 +222,9 @@ func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, err } func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) { - cm, secret, err := f.getConfigMapAndSecretWithNamespace(namespace) + cm, secret, err := f.getConfigMapAndSecretInNamespace(namespace) if err != nil { - if !k8serrors.IsNotFound(err) { + if !errors.IsNotFound(err) { return nil, err } } From 3669f0a5c8aaf7cabe6998a95ea081ae9cc706c8 Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Wed, 10 May 2023 16:31:51 -0700 Subject: [PATCH 10/18] error handleing in factory.go --- pkg/api/factory.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 3f4afd49..9e2ee246 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -197,36 +197,36 @@ func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, err if f.apiMap[f.Settings.Namespace] != nil { apis[f.Settings.Namespace] = f.apiMap[f.Settings.Namespace] api, err := f.getApiFromNamespace(namespace) - if err == nil { - apis[namespace] = api - f.apiMap[namespace] = api + if err != nil { + return nil, err } - + apis[namespace] = api + f.apiMap[namespace] = api return apis, nil } //Where is nothing in cache, then we retrieve them apiFromNamespace, err := f.getApiFromNamespace(namespace) - if err == nil { - apis[namespace] = apiFromNamespace - f.apiMap[namespace] = apiFromNamespace + if err != nil { + return nil, err } + apis[namespace] = apiFromNamespace + f.apiMap[namespace] = apiFromNamespace + apiFromSettings, err := f.getApiFromNamespace(f.Settings.Namespace) - if err == nil { - apis[f.Settings.Namespace] = apiFromSettings - f.apiMap[f.Settings.Namespace] = apiFromSettings + if err != nil { + return nil, err } + apis[f.Settings.Namespace] = apiFromSettings + f.apiMap[f.Settings.Namespace] = apiFromSettings return apis, nil - } func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) { cm, secret, err := f.getConfigMapAndSecretInNamespace(namespace) if err != nil { - if !errors.IsNotFound(err) { - return nil, err - } + return nil, err } cfg, err := ParseConfig(cm, secret) if err != nil { From 7a729d6089ccbcb80ad680a268971cdb5425b313 Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Thu, 11 May 2023 08:52:08 -0700 Subject: [PATCH 11/18] error handleing in factory.go --- pkg/api/factory.go | 28 +++++----------------------- 1 file changed, 5 insertions(+), 23 deletions(-) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 9e2ee246..27a85582 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -90,8 +90,8 @@ func (f *apiFactory) invalidateIfHasName(name string, obj interface{}) { } } -func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) { - cm, err := f.cmLister.Get(f.ConfigMapName) +func (f *apiFactory) getConfigMapAndSecret(cmLister v1listers.ConfigMapNamespaceLister, secretLister v1listers.SecretNamespaceLister) (*v1.ConfigMap, *v1.Secret, error) { + cm, err := cmLister.Get(f.ConfigMapName) if err != nil { if errors.IsNotFound(err) { cm = &v1.ConfigMap{} @@ -100,7 +100,7 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) } } - secret, err := f.secretLister.Get(f.SecretName) + secret, err := secretLister.Get(f.SecretName) if err != nil { if errors.IsNotFound(err) { secret = &v1.Secret{} @@ -116,25 +116,7 @@ func (f *apiFactory) getConfigMapAndSecretInNamespace(namespace string) (*v1.Con cmLister := v1listers.NewConfigMapLister(f.cmInformer.GetIndexer()).ConfigMaps(namespace) secretLister := v1listers.NewSecretLister(f.secretsInformer.GetIndexer()).Secrets(namespace) - cm, err := cmLister.Get(f.ConfigMapName) - if err != nil { - if errors.IsNotFound(err) { - cm = &v1.ConfigMap{} - } else { - return nil, nil, err - } - } - - secret, err := secretLister.Get(f.SecretName) - if err != nil { - if errors.IsNotFound(err) { - secret = &v1.Secret{} - } else { - return nil, nil, err - } - } - - return cm, secret, err + return f.getConfigMapAndSecret(cmLister, secretLister) } func (f *apiFactory) invalidateCache() { @@ -150,7 +132,7 @@ func (f *apiFactory) GetAPI() (API, error) { f.lock.Lock() defer f.lock.Unlock() if f.api == nil { - cm, secret, err := f.getConfigMapAndSecret() + cm, secret, err := f.getConfigMapAndSecret(f.cmLister, f.secretLister) if err != nil { return nil, err } From c0d5bab047b5cdbd8a9c1ada279deb88f4258c9f Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Mon, 15 May 2023 12:57:54 -0700 Subject: [PATCH 12/18] error handleing in factory.go --- pkg/api/factory.go | 69 +++++++++++++++++++----------------- pkg/controller/controller.go | 4 +-- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 27a85582..39d76c75 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -18,7 +18,9 @@ type Settings struct { SecretName string // InitGetVars returns a function that produces notifications context variables InitGetVars func(cfg *Config, configMap *v1.ConfigMap, secret *v1.Secret) (GetVars, error) - // Default namespace for ConfigMap and Secret + // Default namespace for ConfigMap and Secret. + // For self-service notification, we get notification configurations from rollout resource namespace + // and also the default namespace Namespace string } @@ -27,10 +29,10 @@ type Factory interface { GetAPI() (API, error) } -// Factory creates a map of APIs that include +// For self-service notification, factory creates a map of APIs that include // api in the namespace specified in input parameter // and api in the namespace specified in the Settings -type MayFactoryWithMultipleAPIs interface { +type FactoryWithMultipleAPIs interface { GetAPIsWithNamespace(namespace string) (map[string]API, error) } @@ -42,6 +44,7 @@ type apiFactory struct { lock sync.Mutex api API + // For self-service notification cmInformer cache.SharedIndexInformer secretsInformer cache.SharedIndexInformer apiMap map[string]API @@ -49,9 +52,11 @@ type apiFactory struct { func NewFactory(settings Settings, namespace string, secretsInformer cache.SharedIndexInformer, cmInformer cache.SharedIndexInformer) *apiFactory { factory := &apiFactory{ - Settings: settings, - cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()).ConfigMaps(namespace), - secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()).Secrets(namespace), + Settings: settings, + cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()).ConfigMaps(namespace), + secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()).Secrets(namespace), + + // For self-service notification cmInformer: cmInformer, secretsInformer: secretsInformer, apiMap: make(map[string]API), @@ -136,15 +141,8 @@ func (f *apiFactory) GetAPI() (API, error) { if err != nil { return nil, err } - cfg, err := ParseConfig(cm, secret) - if err != nil { - return nil, err - } - getVars, err := f.InitGetVars(cfg, cm, secret) - if err != nil { - return nil, err - } - api, err := NewAPI(*cfg, getVars) + + api, err := f.getApiFromConfigmapAndSecret(cm, secret) if err != nil { return nil, err } @@ -153,7 +151,7 @@ func (f *apiFactory) GetAPI() (API, error) { return f.api, nil } -// Returns a map of api in the namespace and api in the setting's namespace +// For self-service notification, we need a map of apis which include api in the namespace and api in the setting's namespace func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, error) { f.lock.Lock() defer f.lock.Unlock() @@ -179,30 +177,32 @@ func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, err if f.apiMap[f.Settings.Namespace] != nil { apis[f.Settings.Namespace] = f.apiMap[f.Settings.Namespace] api, err := f.getApiFromNamespace(namespace) - if err != nil { - return nil, err + if err == nil { + apis[namespace] = api + f.apiMap[namespace] = api } - apis[namespace] = api - f.apiMap[namespace] = api return apis, nil } - //Where is nothing in cache, then we retrieve them - apiFromNamespace, err := f.getApiFromNamespace(namespace) - if err != nil { - return nil, err + apiFromNamespace, errApiFromNamespace := f.getApiFromNamespace(namespace) + apiFromSettings, errApiFromSettings := f.getApiFromNamespace(f.Settings.Namespace) + + if errApiFromNamespace == nil { + apis[namespace] = apiFromNamespace + f.apiMap[namespace] = apiFromNamespace } - apis[namespace] = apiFromNamespace - f.apiMap[namespace] = apiFromNamespace - apiFromSettings, err := f.getApiFromNamespace(f.Settings.Namespace) - if err != nil { - return nil, err + if errApiFromSettings == nil { + apis[f.Settings.Namespace] = apiFromSettings + f.apiMap[f.Settings.Namespace] = apiFromSettings } - apis[f.Settings.Namespace] = apiFromSettings - f.apiMap[f.Settings.Namespace] = apiFromSettings - return apis, nil + // Only return error when we received error from both namespace provided in the input paremeter and settings' namespace + if errApiFromNamespace != nil && errApiFromSettings != nil { + return apis, errApiFromSettings + } else { + return apis, nil + } } func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) { @@ -210,6 +210,11 @@ func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) { if err != nil { return nil, err } + return f.getApiFromConfigmapAndSecret(cm, secret) + +} + +func (f *apiFactory) getApiFromConfigmapAndSecret(cm *v1.ConfigMap, secret *v1.Secret) (API, error) { cfg, err := ParseConfig(cm, secret) if err != nil { return nil, err diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e1df7b2d..813824b4 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -145,7 +145,7 @@ func NewController( func NewControllerWithMultipleNamespace( client dynamic.NamespaceableResourceInterface, informer cache.SharedIndexInformer, - apiFactoryWithMultipleNamespace api.MayFactoryWithMultipleAPIs, + apiFactoryWithMultipleNamespace api.FactoryWithMultipleAPIs, opts ...Opts, ) *notificationController { queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) @@ -196,7 +196,7 @@ type notificationController struct { alterDestinations func(obj v1.Object, destinations services.Destinations, cfg api.Config) services.Destinations toUnstructured func(obj v1.Object) (*unstructured.Unstructured, error) eventCallback func(eventSequence NotificationEventSequence) - apiFactoryWithMultipleAPIs api.MayFactoryWithMultipleAPIs + apiFactoryWithMultipleAPIs api.FactoryWithMultipleAPIs } func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) { From b535cadfc14ef8cc0211fdeb9796ccce177d5d7d Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Mon, 15 May 2023 13:04:18 -0700 Subject: [PATCH 13/18] error handleing in factory.go --- pkg/api/factory.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 39d76c75..f9c07619 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -95,7 +95,7 @@ func (f *apiFactory) invalidateIfHasName(name string, obj interface{}) { } } -func (f *apiFactory) getConfigMapAndSecret(cmLister v1listers.ConfigMapNamespaceLister, secretLister v1listers.SecretNamespaceLister) (*v1.ConfigMap, *v1.Secret, error) { +func (f *apiFactory) getConfigMapAndSecretWithListers(cmLister v1listers.ConfigMapNamespaceLister, secretLister v1listers.SecretNamespaceLister) (*v1.ConfigMap, *v1.Secret, error) { cm, err := cmLister.Get(f.ConfigMapName) if err != nil { if errors.IsNotFound(err) { @@ -117,11 +117,11 @@ func (f *apiFactory) getConfigMapAndSecret(cmLister v1listers.ConfigMapNamespace return cm, secret, err } -func (f *apiFactory) getConfigMapAndSecretInNamespace(namespace string) (*v1.ConfigMap, *v1.Secret, error) { +func (f *apiFactory) getConfigMapAndSecret(namespace string) (*v1.ConfigMap, *v1.Secret, error) { cmLister := v1listers.NewConfigMapLister(f.cmInformer.GetIndexer()).ConfigMaps(namespace) secretLister := v1listers.NewSecretLister(f.secretsInformer.GetIndexer()).Secrets(namespace) - return f.getConfigMapAndSecret(cmLister, secretLister) + return f.getConfigMapAndSecretWithListers(cmLister, secretLister) } func (f *apiFactory) invalidateCache() { @@ -137,7 +137,7 @@ func (f *apiFactory) GetAPI() (API, error) { f.lock.Lock() defer f.lock.Unlock() if f.api == nil { - cm, secret, err := f.getConfigMapAndSecret(f.cmLister, f.secretLister) + cm, secret, err := f.getConfigMapAndSecretWithListers(f.cmLister, f.secretLister) if err != nil { return nil, err } @@ -206,7 +206,7 @@ func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, err } func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) { - cm, secret, err := f.getConfigMapAndSecretInNamespace(namespace) + cm, secret, err := f.getConfigMapAndSecret(namespace) if err != nil { return nil, err } From f7be52d8f90be30b9beeeccf432ccd2f511b1c90 Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Mon, 15 May 2023 14:07:00 -0700 Subject: [PATCH 14/18] error handleing in factory.go --- pkg/controller/controller.go | 39 +++++------------------------------- 1 file changed, 5 insertions(+), 34 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 813824b4..d9d78b29 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -142,47 +142,18 @@ func NewController( return ctrl } +// For self-service notification +// This controller is using FactoryWithMultipleAPIs func NewControllerWithMultipleNamespace( client dynamic.NamespaceableResourceInterface, informer cache.SharedIndexInformer, apiFactoryWithMultipleNamespace api.FactoryWithMultipleAPIs, opts ...Opts, ) *notificationController { - queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - informer.AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err == nil { - queue.Add(key) - } - }, - UpdateFunc: func(old, new interface{}) { - key, err := cache.MetaNamespaceKeyFunc(new) - if err == nil { - queue.Add(key) - } - }, - }, - ) - ctrl := ¬ificationController{ - client: client, - informer: informer, - queue: queue, - metricsRegistry: NewMetricsRegistry(""), - apiFactoryWithMultipleAPIs: apiFactoryWithMultipleNamespace, - toUnstructured: func(obj v1.Object) (*unstructured.Unstructured, error) { - res, ok := obj.(*unstructured.Unstructured) - if !ok { - return nil, fmt.Errorf("Object must be *unstructured.Unstructured but was: %v", res) - } - return res, nil - }, - } - for i := range opts { - opts[i](ctrl) - } + ctrl := NewController(client, informer, nil, opts...) + ctrl.apiFactoryWithMultipleAPIs = apiFactoryWithMultipleNamespace + return ctrl } From 8e233b88c89cfd0e662e9b9f7ae0fe4fd12a0b68 Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Mon, 15 May 2023 17:10:33 -0700 Subject: [PATCH 15/18] error handleing in factory.go --- pkg/api/config.go | 2 + pkg/controller/controller.go | 190 ++++++++---------------------- pkg/controller/controller_test.go | 8 +- 3 files changed, 52 insertions(+), 148 deletions(-) diff --git a/pkg/api/config.go b/pkg/api/config.go index eefc41db..cf1d0e0d 100644 --- a/pkg/api/config.go +++ b/pkg/api/config.go @@ -29,6 +29,7 @@ type Config struct { DefaultTriggers []string // ServiceDefaultTriggers holds list of default triggers per service ServiceDefaultTriggers map[string][]string + Namespace string } // Returns list of destinations for the specified trigger @@ -76,6 +77,7 @@ func ParseConfig(configMap *v1.ConfigMap, secret *v1.Secret) (*Config, error) { Triggers: map[string][]triggers.Condition{}, ServiceDefaultTriggers: map[string][]string{}, Templates: map[string]services.Notification{}, + Namespace: configMap.Namespace, } if subscriptionYaml, ok := configMap.Data["subscriptions"]; ok { if err := yaml.Unmarshal([]byte(subscriptionYaml), &cfg.Subscriptions); err != nil { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d9d78b29..b4a32f52 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -185,80 +185,9 @@ func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) { log.Warn("Controller has stopped.") } -func (c *notificationController) processResource(resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) { +func (c *notificationController) processResourceWithAPI(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) { + apiNamespace := api.GetConfig().Namespace notificationsState := NewStateFromRes(resource) - api, err := c.apiFactory.GetAPI() - if err != nil { - return nil, err - } - - destinations := c.getDestinations(resource, api.GetConfig()) - if len(destinations) == 0 { - return resource.GetAnnotations(), nil - } - - un, err := c.toUnstructured(resource) - if err != nil { - return nil, err - } - - for trigger, destinations := range destinations { - res, err := api.RunTrigger(trigger, un.Object) - if err != nil { - logEntry.Debugf("Failed to execute condition of trigger %s: %v", trigger, err) - eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %v", trigger, err)) - } - logEntry.Infof("Trigger %s result: %v", trigger, res) - - for _, cr := range res { - c.metricsRegistry.IncTriggerEvaluationsCounter(trigger, cr.Triggered) - - if !cr.Triggered { - for _, to := range destinations { - notificationsState.SetAlreadyNotified(trigger, cr, to, false) - } - continue - } - - for _, to := range destinations { - if changed := notificationsState.SetAlreadyNotified(trigger, cr, to, true); !changed { - logEntry.Infof("Notification about condition '%s.%s' already sent to '%v'", trigger, cr.Key, to) - eventSequence.addDelivered(NotificationDelivery{ - Trigger: trigger, - Destination: to, - AlreadyNotified: true, - }) - } else { - logEntry.Infof("Sending notification about condition '%s.%s' to '%v'", trigger, cr.Key, to) - if err := api.Send(un.Object, cr.Templates, to); err != nil { - logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v", - to, resource.GetNamespace(), resource.GetName(), err) - notificationsState.SetAlreadyNotified(trigger, cr, to, false) - c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, false) - eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v", trigger, to, err)) - } else { - logEntry.Debugf("Notification %s was sent", to.Recipient) - c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, true) - eventSequence.addDelivered(NotificationDelivery{ - Trigger: trigger, - Destination: to, - AlreadyNotified: false, - }) - } - } - } - } - } - - return notificationsState.Persist(resource) -} - -func (c *notificationController) processResourceWithAPI(api api.API, apiNamespace string, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) { - notificationsState := NewStateFromRes(resource) - //api, err := c.apiFactory.GetAPI() - //if err != nil { - // return nil, err - //} destinations := c.getDestinations(resource, api.GetConfig()) if len(destinations) == 0 { @@ -379,43 +308,13 @@ func (c *notificationController) processQueueItem() (processNext bool) { } if c.apiFactoryWithMultipleAPIs == nil { - annotations, err := c.processResource(resource, logEntry, &eventSequence) + api, err := c.apiFactory.GetAPI() if err != nil { logEntry.Errorf("Failed to process: %v", err) eventSequence.addError(err) return } - - if !mapsEqual(resource.GetAnnotations(), annotations) { - annotationsPatch := make(map[string]interface{}) - for k, v := range annotations { - annotationsPatch[k] = v - } - for k := range resource.GetAnnotations() { - if _, ok = annotations[k]; !ok { - annotationsPatch[k] = nil - } - } - - patchData, err := json.Marshal(map[string]map[string]interface{}{ - "metadata": {"annotations": annotationsPatch}, - }) - if err != nil { - logEntry.Errorf("Failed to marshal resource patch: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to marshal annotations patch %v", err)) - return - } - resource, err = c.client.Namespace(resource.GetNamespace()).Patch(context.Background(), resource.GetName(), types.MergePatchType, patchData, v1.PatchOptions{}) - if err != nil { - logEntry.Errorf("Failed to patch resource: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to patch resource annotations %v", err)) - return - } - if err := c.informer.GetStore().Update(resource); err != nil { - logEntry.Warnf("Failed to store update resource in informer: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err)) - } - } + c.processResource(api, resource, logEntry, &eventSequence) } else { apisWithNamespace, err := c.apiFactoryWithMultipleAPIs.GetAPIsWithNamespace(resource.GetNamespace()) if err != nil { @@ -423,51 +322,54 @@ func (c *notificationController) processQueueItem() (processNext bool) { eventSequence.addError(err) return } - for apiNamespace, api := range apisWithNamespace { - annotations, err := c.processResourceWithAPI(api, apiNamespace, resource, logEntry, &eventSequence) - if err != nil { - logEntry.Errorf("Failed to process: %v", err) - eventSequence.addError(err) - return - } + for _, api := range apisWithNamespace { + c.processResource(api, resource, logEntry, &eventSequence) + } + } + logEntry.Info("Processing completed") - if !mapsEqual(resource.GetAnnotations(), annotations) { - annotationsPatch := make(map[string]interface{}) - for k, v := range annotations { - annotationsPatch[k] = v - } - for k := range resource.GetAnnotations() { - if _, ok = annotations[k]; !ok { - annotationsPatch[k] = nil - } - } + return +} - patchData, err := json.Marshal(map[string]map[string]interface{}{ - "metadata": {"annotations": annotationsPatch}, - }) - if err != nil { - logEntry.Errorf("Failed to marshal resource patch: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to marshal annotations patch %v", err)) - return - } - resource, err = c.client.Namespace(resource.GetNamespace()).Patch(context.Background(), resource.GetName(), types.MergePatchType, patchData, v1.PatchOptions{}) - if err != nil { - logEntry.Errorf("Failed to patch resource: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to patch resource annotations %v", err)) - return - } - if err := c.informer.GetStore().Update(resource); err != nil { - logEntry.Warnf("Failed to store update resource in informer: %v", err) - eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err)) - } +func (c *notificationController) processResource(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) { + annotations, err := c.processResourceWithAPI(api, resource, logEntry, eventSequence) + if err != nil { + logEntry.Errorf("Failed to process: %v", err) + eventSequence.addError(err) + return + } + + if !mapsEqual(resource.GetAnnotations(), annotations) { + annotationsPatch := make(map[string]interface{}) + for k, v := range annotations { + annotationsPatch[k] = v + } + for k := range resource.GetAnnotations() { + if _, ok := annotations[k]; !ok { + annotationsPatch[k] = nil } } - //end + patchData, err := json.Marshal(map[string]map[string]interface{}{ + "metadata": {"annotations": annotationsPatch}, + }) + if err != nil { + logEntry.Errorf("Failed to marshal resource patch: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to marshal annotations patch %v", err)) + return + } + resource, err = c.client.Namespace(resource.GetNamespace()).Patch(context.Background(), resource.GetName(), types.MergePatchType, patchData, v1.PatchOptions{}) + if err != nil { + logEntry.Errorf("Failed to patch resource: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to patch resource annotations %v", err)) + return + } + if err := c.informer.GetStore().Update(resource); err != nil { + logEntry.Warnf("Failed to store update resource in informer: %v", err) + eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err)) + return + } } - logEntry.Info("Processing completed") - - return } func mapsEqual(first, second map[string]string) bool { diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 2128e15d..f122ff6a 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -115,7 +115,7 @@ func TestSendsNotificationIfTriggered(t *testing.T) { return true }), []string{"test"}, services.Destination{Service: "mock", Recipient: "recipient"}).Return(nil) - annotations, err := ctrl.processResource(app, logEntry, &NotificationEventSequence{}) + annotations, err := ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{}) if err != nil { logEntry.Errorf("Failed to process: %v", err) } @@ -141,7 +141,7 @@ func TestDoesNotSendNotificationIfAnnotationPresent(t *testing.T) { api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil) - _, err = ctrl.processResource(app, logEntry, &NotificationEventSequence{}) + _, err = ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{}) if err != nil { logEntry.Errorf("Failed to process: %v", err) } @@ -163,7 +163,7 @@ func TestRemovesAnnotationIfNoTrigger(t *testing.T) { api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: false}}, nil) - annotations, err := ctrl.processResource(app, logEntry, &NotificationEventSequence{}) + annotations, err := ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{}) if err != nil { logEntry.Errorf("Failed to process: %v", err) } @@ -298,7 +298,7 @@ func TestWithEventCallback(t *testing.T) { description: "EventCallback should be invoked with non-nil error on send failure", sendErr: errors.New("this is a send error"), expectedErrors: []error{ - errors.New("failed to deliver notification my-trigger to {mock recipient}: this is a send error"), + errors.New("failed to deliver notification my-trigger to {mock recipient}: this is a send error using the configuration in namespace "), }, }, { From 6df2afd50dc2d0e44347d1ea4954efbab4c5d48e Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Wed, 17 May 2023 15:06:54 -0700 Subject: [PATCH 16/18] error handleing in factory.go --- pkg/api/factory.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index f9c07619..16a9d070 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -1,6 +1,7 @@ package api import ( + log "github.com/sirupsen/logrus" "sync" v1 "k8s.io/api/core/v1" @@ -170,6 +171,8 @@ func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, err if err == nil { apis[f.Settings.Namespace] = api f.apiMap[f.Settings.Namespace] = api + } else { + log.Warnf("getApiFromNamespace %s got error %s", f.Settings.Namespace, err) } return apis, nil } @@ -180,6 +183,8 @@ func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, err if err == nil { apis[namespace] = api f.apiMap[namespace] = api + } else { + log.Warnf("getApiFromNamespace %s got error %s", namespace, err) } return apis, nil } @@ -190,11 +195,15 @@ func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, err if errApiFromNamespace == nil { apis[namespace] = apiFromNamespace f.apiMap[namespace] = apiFromNamespace + } else { + log.Warnf("getApiFromNamespace %s got error %s", namespace, errApiFromNamespace) } if errApiFromSettings == nil { apis[f.Settings.Namespace] = apiFromSettings f.apiMap[f.Settings.Namespace] = apiFromSettings + } else { + log.Warnf("getApiFromNamespace %s got error %s", f.Settings.Namespace, errApiFromSettings) } // Only return error when we received error from both namespace provided in the input paremeter and settings' namespace From 5625a215613f74c573c8f7182746d69e15e53f91 Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Thu, 18 May 2023 10:36:33 -0700 Subject: [PATCH 17/18] better caching --- pkg/api/factory.go | 81 +++++++++++++++++++++++++++++++++--- pkg/controller/controller.go | 2 +- 2 files changed, 77 insertions(+), 6 deletions(-) diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 16a9d070..501f1833 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -34,7 +34,7 @@ type Factory interface { // api in the namespace specified in input parameter // and api in the namespace specified in the Settings type FactoryWithMultipleAPIs interface { - GetAPIsWithNamespace(namespace string) (map[string]API, error) + GetAPIsWithNamespaceV2(namespace string) (map[string]API, error) } type apiFactory struct { @@ -49,6 +49,13 @@ type apiFactory struct { cmInformer cache.SharedIndexInformer secretsInformer cache.SharedIndexInformer apiMap map[string]API + cacheList []apisCache +} + +type apisCache struct { + api API + namespace string + refresh bool } func NewFactory(settings Settings, namespace string, secretsInformer cache.SharedIndexInformer, cmInformer cache.SharedIndexInformer) *apiFactory { @@ -61,6 +68,7 @@ func NewFactory(settings Settings, namespace string, secretsInformer cache.Share cmInformer: cmInformer, secretsInformer: secretsInformer, apiMap: make(map[string]API), + cacheList: []apisCache{}, } secretsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -92,7 +100,7 @@ func (f *apiFactory) invalidateIfHasName(name string, obj interface{}) { return } if metaObj.GetName() == name { - f.invalidateCache() + f.invalidateCache(metaObj.GetNamespace()) } } @@ -125,12 +133,18 @@ func (f *apiFactory) getConfigMapAndSecret(namespace string) (*v1.ConfigMap, *v1 return f.getConfigMapAndSecretWithListers(cmLister, secretLister) } -func (f *apiFactory) invalidateCache() { +func (f *apiFactory) invalidateCache(namespace string) { f.lock.Lock() defer f.lock.Unlock() f.api = nil - for namespace := range f.apiMap { - f.apiMap[namespace] = nil + + f.apiMap[namespace] = nil + + for _, mycache := range f.cacheList { + if mycache.namespace == namespace { + mycache.refresh = true + mycache.api = nil + } } } @@ -214,6 +228,63 @@ func (f *apiFactory) GetAPIsWithNamespace(namespace string) (map[string]API, err } } +// For self-service notification, we need a map of apis which include api in the namespace and api in the setting's namespace +func (f *apiFactory) GetAPIsWithNamespaceV2(namespace string) (map[string]API, error) { + f.lock.Lock() + defer f.lock.Unlock() + + apis := make(map[string]API) + + // namespaces to look for notification configurations + namespaces := []string{namespace} + if f.Settings.Namespace != "" && f.Settings.Namespace != namespace { + namespaces = append(namespaces, f.Settings.Namespace) + } + + for _, namespace := range namespaces { + //Look up the cacheList + //Exist in cacheList and does not need refresh, then use it + //Exist in cacheList and needs refresh, then retrieve it + //Doesn't exist in cacheList, get it and put in cacheList + foundInCache := false + for _, cache := range f.cacheList { + if cache.namespace == namespace { + foundInCache = true + if !cache.refresh { + //Found in cache, and no need to refresh + if cache.api != nil { + apis[namespace] = cache.api + } + } else { + //Found in cache, and need refresh + api, err := f.getApiFromNamespace(namespace) + if err == nil { + apis[namespace] = api + cache.api = api + cache.refresh = false + } else { + log.Warnf("getApiFromNamespace %s got error %s", namespace, err) + } + } + break + } + } + + if !foundInCache { + api, err := f.getApiFromNamespace(namespace) + if err == nil { + apis[namespace] = api + myCache := apisCache{refresh: false, api: api, namespace: namespace} + f.cacheList = append(f.cacheList, myCache) + } else { + log.Warnf("getApiFromNamespace %s got error %s", namespace, err) + } + } + } + + return apis, nil +} + func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) { cm, secret, err := f.getConfigMapAndSecret(namespace) if err != nil { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b4a32f52..45347258 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -316,7 +316,7 @@ func (c *notificationController) processQueueItem() (processNext bool) { } c.processResource(api, resource, logEntry, &eventSequence) } else { - apisWithNamespace, err := c.apiFactoryWithMultipleAPIs.GetAPIsWithNamespace(resource.GetNamespace()) + apisWithNamespace, err := c.apiFactoryWithMultipleAPIs.GetAPIsWithNamespaceV2(resource.GetNamespace()) if err != nil { logEntry.Errorf("Failed to process: %v", err) eventSequence.addError(err) From 152986e608a07cdd1e92f25076ca9383f7d77bdb Mon Sep 17 00:00:00 2001 From: May Zhang <may_zhang@intuit.com> Date: Fri, 19 May 2023 13:47:19 -0700 Subject: [PATCH 18/18] better caching --- pkg/mocks/factory.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/mocks/factory.go b/pkg/mocks/factory.go index 136955b1..c4aab2c4 100644 --- a/pkg/mocks/factory.go +++ b/pkg/mocks/factory.go @@ -10,3 +10,9 @@ type FakeFactory struct { func (f *FakeFactory) GetAPI() (api.API, error) { return f.Api, f.Err } + +func (f *FakeFactory) GetAPIsWithNamespaceV2(namespace string) (map[string]api.API, error) { + apiMap := make(map[string]api.API) + apiMap[namespace] = f.Api + return apiMap, f.Err +}