Skip to content

Commit

Permalink
feat: enable self service notification support (#2930)
Browse files Browse the repository at this point in the history
* self service rollouts

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* cleanup

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* update function name

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* continue sending other api's on error

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* update go mod

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* fix tests

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* codegen

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* codegen

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* add docs

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* put self serve notifications behind a flag

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* put self serve notifications behind a flag

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* remove un-needed len check

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* fix up typo and cleanup

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

* cleanup wording

Signed-off-by: zachaller <zachaller@users.noreply.github.com>

---------

Signed-off-by: zachaller <zachaller@users.noreply.github.com>
  • Loading branch information
zachaller authored Aug 8, 2023
1 parent fdb26d1 commit e1c6136
Show file tree
Hide file tree
Showing 16 changed files with 353 additions and 189 deletions.
86 changes: 54 additions & 32 deletions cmd/rollouts-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/argoproj/argo-rollouts/utils/record"

"github.com/argoproj/pkg/kubeclientmetrics"
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -44,31 +46,32 @@ const (

func newCommand() *cobra.Command {
var (
clientConfig clientcmd.ClientConfig
rolloutResyncPeriod int64
logLevel string
logFormat string
klogLevel int
metricsPort int
healthzPort int
instanceID string
qps float32
burst int
rolloutThreads int
experimentThreads int
analysisThreads int
serviceThreads int
ingressThreads int
istioVersion string
trafficSplitVersion string
ambassadorVersion string
ingressVersion string
appmeshCRDVersion string
albIngressClasses []string
nginxIngressClasses []string
awsVerifyTargetGroup bool
namespaced bool
printVersion bool
clientConfig clientcmd.ClientConfig
rolloutResyncPeriod int64
logLevel string
logFormat string
klogLevel int
metricsPort int
healthzPort int
instanceID string
qps float32
burst int
rolloutThreads int
experimentThreads int
analysisThreads int
serviceThreads int
ingressThreads int
istioVersion string
trafficSplitVersion string
ambassadorVersion string
ingressVersion string
appmeshCRDVersion string
albIngressClasses []string
nginxIngressClasses []string
awsVerifyTargetGroup bool
namespaced bool
printVersion bool
selfServiceNotificationEnabled bool
)
electOpts := controller.NewLeaderElectionOptions()
var command = cobra.Command{
Expand Down Expand Up @@ -151,12 +154,31 @@ func newCommand() *cobra.Command {
}
istioDynamicInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(istioPrimaryDynamicClient, resyncDuration, namespace, nil)

controllerNamespaceInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
var notificationConfigNamespace string
if selfServiceNotificationEnabled {
notificationConfigNamespace = metav1.NamespaceAll
} else {
notificationConfigNamespace = defaults.Namespace()
}
notificationSecretInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
kubeClient,
resyncDuration,
kubeinformers.WithNamespace(defaults.Namespace()))
configMapInformer := controllerNamespaceInformerFactory.Core().V1().ConfigMaps()
secretInformer := controllerNamespaceInformerFactory.Core().V1().Secrets()
kubeinformers.WithNamespace(notificationConfigNamespace),
kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.Kind = "Secrete"
options.FieldSelector = fmt.Sprintf("metadata.name=%s", record.NotificationSecret)
}),
)

notificationConfigMapInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
kubeClient,
resyncDuration,
kubeinformers.WithNamespace(notificationConfigNamespace),
kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.Kind = "ConfigMap"
options.FieldSelector = fmt.Sprintf("metadata.name=%s", record.NotificationConfigMap)
}),
)

mode, err := ingressutil.DetermineIngressMode(ingressVersion, kubeClient.DiscoveryClient)
checkError(err)
Expand All @@ -182,8 +204,8 @@ func newCommand() *cobra.Command {
istioPrimaryDynamicClient,
istioDynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer(),
istioDynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer(),
configMapInformer,
secretInformer,
notificationConfigMapInformerFactory,
notificationSecretInformerFactory,
resyncDuration,
instanceID,
metricsPort,
Expand All @@ -196,7 +218,6 @@ func newCommand() *cobra.Command {
istioDynamicInformerFactory,
namespaced,
kubeInformerFactory,
controllerNamespaceInformerFactory,
jobInformerFactory)

if err = cm.Run(ctx, rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts); err != nil {
Expand Down Expand Up @@ -240,6 +261,7 @@ func newCommand() *cobra.Command {
command.Flags().DurationVar(&electOpts.LeaderElectionLeaseDuration, "leader-election-lease-duration", controller.DefaultLeaderElectionLeaseDuration, "The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled.")
command.Flags().DurationVar(&electOpts.LeaderElectionRenewDeadline, "leader-election-renew-deadline", controller.DefaultLeaderElectionRenewDeadline, "The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.")
command.Flags().DurationVar(&electOpts.LeaderElectionRetryPeriod, "leader-election-retry-period", controller.DefaultLeaderElectionRetryPeriod, "The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.")
command.Flags().BoolVar(&selfServiceNotificationEnabled, "self-service-notification-enabled", false, "Allows rollouts controller to pull notification config from the namespace that the rollout resource is in. This is useful for self-service notification.")
return &command
}

Expand Down
105 changes: 54 additions & 51 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,15 @@ type Manager struct {

namespace string

dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory
clusterDynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory
istioDynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory
namespaced bool
kubeInformerFactory kubeinformers.SharedInformerFactory
controllerNamespaceInformerFactory kubeinformers.SharedInformerFactory
jobInformerFactory kubeinformers.SharedInformerFactory
istioPrimaryDynamicClient dynamic.Interface
dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory
clusterDynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory
istioDynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory
namespaced bool
kubeInformerFactory kubeinformers.SharedInformerFactory
notificationConfigMapInformerFactory kubeinformers.SharedInformerFactory
notificationSecretInformerFactory kubeinformers.SharedInformerFactory
jobInformerFactory kubeinformers.SharedInformerFactory
istioPrimaryDynamicClient dynamic.Interface
}

// NewManager returns a new manager to manage all the controllers
Expand All @@ -184,8 +185,8 @@ func NewManager(
istioPrimaryDynamicClient dynamic.Interface,
istioVirtualServiceInformer cache.SharedIndexInformer,
istioDestinationRuleInformer cache.SharedIndexInformer,
configMapInformer coreinformers.ConfigMapInformer,
secretInformer coreinformers.SecretInformer,
notificationConfigMapInformerFactory kubeinformers.SharedInformerFactory,
notificationSecretInformerFactory kubeinformers.SharedInformerFactory,
resyncPeriod time.Duration,
instanceID string,
metricsPort int,
Expand All @@ -198,10 +199,8 @@ func NewManager(
istioDynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory,
namespaced bool,
kubeInformerFactory kubeinformers.SharedInformerFactory,
controllerNamespaceInformerFactory kubeinformers.SharedInformerFactory,
jobInformerFactory kubeinformers.SharedInformerFactory,
) *Manager {

runtime.Must(rolloutscheme.AddToScheme(scheme.Scheme))
log.Info("Creating event broadcaster")

Expand All @@ -224,9 +223,9 @@ func NewManager(
ingressWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "Ingresses")

refResolver := rollout.NewInformerBasedWorkloadRefResolver(namespace, dynamicclientset, discoveryClient, argoprojclientset, rolloutsInformer.Informer())
apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(), defaults.Namespace(), secretInformer.Informer(), configMapInformer.Informer())
apiFactory := notificationapi.NewFactory(record.NewAPIFactorySettings(), defaults.Namespace(), notificationSecretInformerFactory.Core().V1().Secrets().Informer(), notificationConfigMapInformerFactory.Core().V1().ConfigMaps().Informer())
recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal, metrics.MetricNotificationFailedTotal, metrics.MetricNotificationSuccessTotal, metrics.MetricNotificationSend, apiFactory)
notificationsController := notificationcontroller.NewController(dynamicclientset.Resource(v1alpha1.RolloutGVR), rolloutsInformer.Informer(), apiFactory,
notificationsController := notificationcontroller.NewControllerWithNamespaceSupport(dynamicclientset.Resource(v1alpha1.RolloutGVR), rolloutsInformer.Informer(), apiFactory,
notificationcontroller.WithToUnstructured(func(obj metav1.Object) (*unstructured.Unstructured, error) {
data, err := json.Marshal(obj)
if err != nil {
Expand Down Expand Up @@ -320,42 +319,43 @@ func NewManager(
})

cm := &Manager{
wg: &sync.WaitGroup{},
metricsServer: metricsServer,
healthzServer: healthzServer,
rolloutSynced: rolloutsInformer.Informer().HasSynced,
serviceSynced: servicesInformer.Informer().HasSynced,
ingressSynced: ingressWrap.HasSynced,
jobSynced: jobInformer.Informer().HasSynced,
experimentSynced: experimentsInformer.Informer().HasSynced,
analysisRunSynced: analysisRunInformer.Informer().HasSynced,
analysisTemplateSynced: analysisTemplateInformer.Informer().HasSynced,
clusterAnalysisTemplateSynced: clusterAnalysisTemplateInformer.Informer().HasSynced,
replicasSetSynced: replicaSetInformer.Informer().HasSynced,
configMapSynced: configMapInformer.Informer().HasSynced,
secretSynced: secretInformer.Informer().HasSynced,
rolloutWorkqueue: rolloutWorkqueue,
experimentWorkqueue: experimentWorkqueue,
analysisRunWorkqueue: analysisRunWorkqueue,
serviceWorkqueue: serviceWorkqueue,
ingressWorkqueue: ingressWorkqueue,
rolloutController: rolloutController,
serviceController: serviceController,
ingressController: ingressController,
experimentController: experimentController,
analysisController: analysisController,
notificationsController: notificationsController,
refResolver: refResolver,
namespace: namespace,
kubeClientSet: kubeclientset,
dynamicInformerFactory: dynamicInformerFactory,
clusterDynamicInformerFactory: clusterDynamicInformerFactory,
istioDynamicInformerFactory: istioDynamicInformerFactory,
namespaced: namespaced,
kubeInformerFactory: kubeInformerFactory,
controllerNamespaceInformerFactory: controllerNamespaceInformerFactory,
jobInformerFactory: jobInformerFactory,
istioPrimaryDynamicClient: istioPrimaryDynamicClient,
wg: &sync.WaitGroup{},
metricsServer: metricsServer,
healthzServer: healthzServer,
rolloutSynced: rolloutsInformer.Informer().HasSynced,
serviceSynced: servicesInformer.Informer().HasSynced,
ingressSynced: ingressWrap.HasSynced,
jobSynced: jobInformer.Informer().HasSynced,
experimentSynced: experimentsInformer.Informer().HasSynced,
analysisRunSynced: analysisRunInformer.Informer().HasSynced,
analysisTemplateSynced: analysisTemplateInformer.Informer().HasSynced,
clusterAnalysisTemplateSynced: clusterAnalysisTemplateInformer.Informer().HasSynced,
replicasSetSynced: replicaSetInformer.Informer().HasSynced,
configMapSynced: notificationConfigMapInformerFactory.Core().V1().ConfigMaps().Informer().HasSynced,
secretSynced: notificationSecretInformerFactory.Core().V1().Secrets().Informer().HasSynced,
rolloutWorkqueue: rolloutWorkqueue,
experimentWorkqueue: experimentWorkqueue,
analysisRunWorkqueue: analysisRunWorkqueue,
serviceWorkqueue: serviceWorkqueue,
ingressWorkqueue: ingressWorkqueue,
rolloutController: rolloutController,
serviceController: serviceController,
ingressController: ingressController,
experimentController: experimentController,
analysisController: analysisController,
notificationsController: notificationsController,
refResolver: refResolver,
namespace: namespace,
kubeClientSet: kubeclientset,
dynamicInformerFactory: dynamicInformerFactory,
clusterDynamicInformerFactory: clusterDynamicInformerFactory,
istioDynamicInformerFactory: istioDynamicInformerFactory,
namespaced: namespaced,
kubeInformerFactory: kubeInformerFactory,
jobInformerFactory: jobInformerFactory,
istioPrimaryDynamicClient: istioPrimaryDynamicClient,
notificationConfigMapInformerFactory: notificationConfigMapInformerFactory,
notificationSecretInformerFactory: notificationSecretInformerFactory,
}

_, err := rolloutsConfig.InitializeConfig(kubeclientset, defaults.DefaultRolloutsConfigMapName)
Expand Down Expand Up @@ -470,7 +470,10 @@ func (c *Manager) startLeading(ctx context.Context, rolloutThreadiness, serviceT
c.clusterDynamicInformerFactory.Start(ctx.Done())
}
c.kubeInformerFactory.Start(ctx.Done())
c.controllerNamespaceInformerFactory.Start(ctx.Done())

c.notificationConfigMapInformerFactory.Start(ctx.Done())
c.notificationSecretInformerFactory.Start(ctx.Done())

c.jobInformerFactory.Start(ctx.Done())

// Check if Istio installed on cluster before starting dynamicInformerFactory
Expand Down
50 changes: 25 additions & 25 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,27 +67,29 @@ func (f *fixture) newManager(t *testing.T) *Manager {
analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns")

cm := &Manager{
wg: &sync.WaitGroup{},
healthzServer: NewHealthzServer(fmt.Sprintf(listenAddr, 8080)),
rolloutSynced: alwaysReady,
experimentSynced: alwaysReady,
analysisRunSynced: alwaysReady,
analysisTemplateSynced: alwaysReady,
clusterAnalysisTemplateSynced: alwaysReady,
serviceSynced: alwaysReady,
ingressSynced: alwaysReady,
jobSynced: alwaysReady,
replicasSetSynced: alwaysReady,
configMapSynced: alwaysReady,
secretSynced: alwaysReady,
rolloutWorkqueue: rolloutWorkqueue,
serviceWorkqueue: serviceWorkqueue,
ingressWorkqueue: ingressWorkqueue,
experimentWorkqueue: experimentWorkqueue,
analysisRunWorkqueue: analysisRunWorkqueue,
kubeClientSet: f.kubeclient,
namespace: "",
namespaced: false,
wg: &sync.WaitGroup{},
healthzServer: NewHealthzServer(fmt.Sprintf(listenAddr, 8080)),
rolloutSynced: alwaysReady,
experimentSynced: alwaysReady,
analysisRunSynced: alwaysReady,
analysisTemplateSynced: alwaysReady,
clusterAnalysisTemplateSynced: alwaysReady,
serviceSynced: alwaysReady,
ingressSynced: alwaysReady,
jobSynced: alwaysReady,
replicasSetSynced: alwaysReady,
configMapSynced: alwaysReady,
secretSynced: alwaysReady,
rolloutWorkqueue: rolloutWorkqueue,
serviceWorkqueue: serviceWorkqueue,
ingressWorkqueue: ingressWorkqueue,
experimentWorkqueue: experimentWorkqueue,
analysisRunWorkqueue: analysisRunWorkqueue,
kubeClientSet: f.kubeclient,
namespace: "",
namespaced: false,
notificationSecretInformerFactory: kubeinformers.NewSharedInformerFactoryWithOptions(f.kubeclient, noResyncPeriodFunc()),
notificationConfigMapInformerFactory: kubeinformers.NewSharedInformerFactoryWithOptions(f.kubeclient, noResyncPeriodFunc()),
}

metricsAddr := fmt.Sprintf(listenAddr, 8090)
Expand Down Expand Up @@ -120,7 +122,6 @@ func (f *fixture) newManager(t *testing.T) *Manager {
cm.dynamicInformerFactory = dynamicInformerFactory
cm.clusterDynamicInformerFactory = dynamicInformerFactory
cm.kubeInformerFactory = k8sI
cm.controllerNamespaceInformerFactory = k8sI
cm.jobInformerFactory = k8sI
cm.istioPrimaryDynamicClient = dynamicClient
cm.istioDynamicInformerFactory = dynamicInformerFactory
Expand Down Expand Up @@ -253,8 +254,8 @@ func TestNewManager(t *testing.T) {
dynamicClient,
istioVirtualServiceInformer,
istioDestinationRuleInformer,
k8sI.Core().V1().ConfigMaps(),
k8sI.Core().V1().Secrets(),
k8sI,
k8sI,
noResyncPeriodFunc(),
"test",
8090,
Expand All @@ -268,7 +269,6 @@ func TestNewManager(t *testing.T) {
false,
nil,
nil,
nil,
)

assert.NotNil(t, cm)
Expand Down
Loading

0 comments on commit e1c6136

Please sign in to comment.