diff --git a/pkg/apis/siddhi/v1alpha2/siddhiprocess_functions.go b/pkg/apis/siddhi/v1alpha2/siddhiprocess_functions.go index 15bc4fb..bcc4516 100644 --- a/pkg/apis/siddhi/v1alpha2/siddhiprocess_functions.go +++ b/pkg/apis/siddhi/v1alpha2/siddhiprocess_functions.go @@ -60,11 +60,11 @@ func (p *MessagingSystem) Equals(q *MessagingSystem) bool { return (typeEq && cidEq) } -// Equals +// Equals function checks the equality of two SiddhiProcess specs func (p *SiddhiProcessSpec) Equals(q *SiddhiProcessSpec) bool { if !EqualApps(p.Apps, q.Apps) { return false - } + } if p.SiddhiConfig != q.SiddhiConfig { return false } @@ -83,7 +83,7 @@ func (p *SiddhiProcessSpec) Equals(q *SiddhiProcessSpec) bool { return true } -// EqualApps +// EqualApps checks the equality of two app slices func EqualApps(p []Apps, q []Apps) bool { if len(p) != len(q) { return false @@ -103,7 +103,7 @@ func EqualApps(p []Apps, q []Apps) bool { return true } -// EqualContainers +// EqualContainers checks the equality of two container specs func EqualContainers(p *corev1.Container, q *corev1.Container) bool { if p.Image != q.Image { return false diff --git a/pkg/apis/siddhi/v1alpha2/siddhiprocess_types.go b/pkg/apis/siddhi/v1alpha2/siddhiprocess_types.go index 934f43f..a1f5b18 100644 --- a/pkg/apis/siddhi/v1alpha2/siddhiprocess_types.go +++ b/pkg/apis/siddhi/v1alpha2/siddhiprocess_types.go @@ -87,7 +87,8 @@ type SiddhiProcessSpec struct { type SiddhiProcessStatus struct { Status string `json:"status"` Ready string `json:"ready"` - ObservedGeneration int64 `json:"observedGeneration,omitempty"` + CurrentVersion int64 `json:"currentVersion"` + PreviousVersion int64 `json:"previousVersion"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controller/siddhiprocess/artifacts.go b/pkg/controller/siddhiprocess/artifacts.go index f457edb..5c32ce8 100644 --- a/pkg/controller/siddhiprocess/artifacts.go +++ b/pkg/controller/siddhiprocess/artifacts.go @@ -75,7 +75,7 @@ func (rsp *ReconcileSiddhiProcess) CreateOrUpdateIngress( sp *siddhiv1alpha2.SiddhiProcess, siddhiApp SiddhiApp, configs Configs, -) (err error) { +) (operationResult controllerutil.OperationResult, err error) { var ingressPaths []extensionsv1beta1.HTTPIngressPath for _, port := range siddhiApp.ContainerPorts { @@ -142,7 +142,7 @@ func (rsp *ReconcileSiddhiProcess) CreateOrUpdateIngress( }, Spec: ingressSpec, } - _, err = controllerutil.CreateOrUpdate( + operationResult, err = controllerutil.CreateOrUpdate( context.TODO(), rsp.client, ingress, diff --git a/pkg/controller/siddhiprocess/artifacts_test.go b/pkg/controller/siddhiprocess/artifacts_test.go index 5c5d868..53d2ef7 100644 --- a/pkg/controller/siddhiprocess/artifacts_test.go +++ b/pkg/controller/siddhiprocess/artifacts_test.go @@ -64,7 +64,7 @@ func TestCreateOrUpdateIngress(t *testing.T) { cl := fake.NewFakeClient(objs...) rsp := &ReconcileSiddhiProcess{client: cl, scheme: s} configs := getTestConfigs(testSP) - err := rsp.CreateOrUpdateIngress(testSP, testSiddhiApp, configs) + _, err := rsp.CreateOrUpdateIngress(testSP, testSiddhiApp, configs) if err != nil { t.Error(err) } @@ -86,7 +86,7 @@ func TestCreateOrUpdateIngress(t *testing.T) { }, PersistenceEnabled: true, } - err = rsp.CreateOrUpdateIngress(testSP, sa, configs) + _, err = rsp.CreateOrUpdateIngress(testSP, sa, configs) if err != nil { t.Error(err) } diff --git a/pkg/controller/siddhiprocess/config.go b/pkg/controller/siddhiprocess/config.go index 499283d..e974395 100644 --- a/pkg/controller/siddhiprocess/config.go +++ b/pkg/controller/siddhiprocess/config.go @@ -262,6 +262,12 @@ type SiddhiAppConfig struct { Replicas int32 `json:"replicas"` } +// ConfigMapListner holds the change details of a config map +type ConfigMapListner struct { + SiddhiProcess string `json:"siddhiProcess"` + Changed bool `json:"changed"` +} + // Status of a Siddhi process type Status int diff --git a/pkg/controller/siddhiprocess/operator.go b/pkg/controller/siddhiprocess/operator.go index 1e64472..208c7f2 100644 --- a/pkg/controller/siddhiprocess/operator.go +++ b/pkg/controller/siddhiprocess/operator.go @@ -124,7 +124,7 @@ func (rsp *ReconcileSiddhiProcess) deployApp( RollingUpdate: &rollingUpdate, } } - configMapName := siddhiApp.Name + strconv.Itoa(int(sp.ObjectMeta.Generation)) + configMapName := siddhiApp.Name + strconv.Itoa(int(sp.Status.CurrentVersion)) for k, v := range siddhiApp.Apps { key := k + configs.SiddhiExt configMapData[key] = v @@ -251,10 +251,10 @@ func (rsp *ReconcileSiddhiProcess) createArtifacts( availableDep := 0 reqLogger := log.WithValues("Request.Namespace", sp.Namespace, "Request.Name", sp.Name) eventType := controllerutil.OperationResultNone - if sp.Status.ObservedGeneration == 0 { + if sp.Status.CurrentVersion == 0 { eventType = controllerutil.OperationResultCreated } else { - if sp.ObjectMeta.Generation > sp.Status.ObservedGeneration { + if sp.Status.CurrentVersion > sp.Status.PreviousVersion { eventType = controllerutil.OperationResultUpdated } else { eventType = controllerutil.OperationResultNone @@ -270,9 +270,8 @@ func (rsp *ReconcileSiddhiProcess) createArtifacts( sp = rsp.updateErrorStatus(sp, ER, ERROR, "AppDeploymentError", err) continue } - if (eventType != controllerutil.OperationResultNone) && + if (eventType == controllerutil.OperationResultCreated) && (operationResult == controllerutil.OperationResultCreated) { - availableDep++ sp = rsp.updateRunningStatus( sp, ER, @@ -280,9 +279,8 @@ func (rsp *ReconcileSiddhiProcess) createArtifacts( "DeploymentCreated", (siddhiApp.Name + " deployment created successfully"), ) - } else if (eventType != controllerutil.OperationResultNone) && + } else if (eventType == controllerutil.OperationResultUpdated) && (operationResult == controllerutil.OperationResultUpdated) { - availableDep++ sp = rsp.updateRunningStatus( sp, ER, @@ -291,14 +289,14 @@ func (rsp *ReconcileSiddhiProcess) createArtifacts( (siddhiApp.Name + " deployment updated successfully"), ) } - + availableDep++ if siddhiApp.ServiceEnabled { operationResult, err = rsp.CreateOrUpdateService(sp, siddhiApp, configs) if err != nil { sp = rsp.updateErrorStatus(sp, ER, WARNING, "ServiceCreationError", err) continue } - if (eventType != controllerutil.OperationResultNone) && + if (eventType == controllerutil.OperationResultCreated) && (operationResult == controllerutil.OperationResultCreated) { sp = rsp.updateRunningStatus( sp, @@ -307,7 +305,7 @@ func (rsp *ReconcileSiddhiProcess) createArtifacts( "ServiceCreated", (siddhiApp.Name + " service created successfully"), ) - } else if (eventType != controllerutil.OperationResultNone) && + } else if (eventType == controllerutil.OperationResultUpdated) && (operationResult == controllerutil.OperationResultUpdated) { sp = rsp.updateRunningStatus( sp, @@ -319,20 +317,23 @@ func (rsp *ReconcileSiddhiProcess) createArtifacts( } if configs.AutoCreateIngress { - err := rsp.CreateOrUpdateIngress(sp, siddhiApp, configs) + operationResult, err = rsp.CreateOrUpdateIngress(sp, siddhiApp, configs) if err != nil { sp = rsp.updateErrorStatus(sp, ER, ERROR, "IngressCreationError", err) continue } - if eventType == controllerutil.OperationResultCreated || - eventType == controllerutil.OperationResultUpdated { + if (eventType == controllerutil.OperationResultCreated) && + (operationResult == controllerutil.OperationResultCreated) { + reqLogger.Info("Ingress created", "Ingress.Name", configs.HostName) + } else if (eventType == controllerutil.OperationResultUpdated) && + (operationResult == controllerutil.OperationResultUpdated) { reqLogger.Info("Ingress changed", "Ingress.Name", configs.HostName) } } } } } - sp = rsp.syncGeneration(sp) + sp = rsp.syncVersion(sp) if (eventType == controllerutil.OperationResultCreated) || (eventType == controllerutil.OperationResultUpdated) { sp = rsp.updateReady(sp, availableDep, needDep) @@ -369,7 +370,7 @@ func (rsp *ReconcileSiddhiProcess) populateSiddhiApps( var siddhiApps []SiddhiApp var err error modified := false - if (sp.Status.ObservedGeneration > 0) && (sp.ObjectMeta.Generation > sp.Status.ObservedGeneration) { + if sp.Status.CurrentVersion > sp.Status.PreviousVersion { modified = true } if modified { @@ -425,6 +426,11 @@ func (rsp *ReconcileSiddhiProcess) getSiddhiApps(sp *siddhiv1alpha2.SiddhiProces if err != nil { return } + cmListner := ConfigMapListner{ + SiddhiProcess: sp.Name, + Changed: false, + } + CMContainer[app.ConfigMap] = cmListner for _, siddhiFileContent := range configMap.Data { siddhiApps = append(siddhiApps, siddhiFileContent) } @@ -436,11 +442,18 @@ func (rsp *ReconcileSiddhiProcess) getSiddhiApps(sp *siddhiv1alpha2.SiddhiProces return } -// syncGeneration synchronize the siddhi process internal generation number -// this simply assing ObjectMeta.Generation value to the Status.ObservedGeneration and update the sidhhi process +// syncVersion synchronize the siddhi process internal version number +// this simply assing Status.CurrentVersion value to the Status.PreviousVersion and update the siddhi process // this funtionality used for version controlling inside a siddhi process -func (rsp *ReconcileSiddhiProcess) syncGeneration(sp *siddhiv1alpha2.SiddhiProcess) *siddhiv1alpha2.SiddhiProcess { - sp.Status.ObservedGeneration = sp.ObjectMeta.Generation +func (rsp *ReconcileSiddhiProcess) syncVersion(sp *siddhiv1alpha2.SiddhiProcess) *siddhiv1alpha2.SiddhiProcess { + sp.Status.PreviousVersion = sp.Status.CurrentVersion + _ = rsp.client.Status().Update(context.TODO(), sp) + return sp +} + +// upgradeVersion upgrade the siddhi process internal version number +func (rsp *ReconcileSiddhiProcess) upgradeVersion(sp *siddhiv1alpha2.SiddhiProcess) *siddhiv1alpha2.SiddhiProcess { + sp.Status.CurrentVersion++ _ = rsp.client.Status().Update(context.TODO(), sp) return sp } @@ -513,7 +526,7 @@ func (rsp *ReconcileSiddhiProcess) cleanArtifacts( } } - cmName := artifactName + strconv.Itoa(int(sp.Status.ObservedGeneration)) + cmName := artifactName + strconv.Itoa(int(sp.Status.PreviousVersion)) cm := &corev1.ConfigMap{} er = rsp.client.Get( context.TODO(), diff --git a/pkg/controller/siddhiprocess/siddhiprocess_controller.go b/pkg/controller/siddhiprocess/siddhiprocess_controller.go index 3233040..9b8fcc2 100644 --- a/pkg/controller/siddhiprocess/siddhiprocess_controller.go +++ b/pkg/controller/siddhiprocess/siddhiprocess_controller.go @@ -44,6 +44,9 @@ var log = logf.Log.WithName("siddhi") // SPContainer holds siddhi apps var SPContainer map[string][]SiddhiApp +// CMContainer holds the config map name along with CM listner for listen changes of the CM +var CMContainer map[string]ConfigMapListner + // ER recoder var ER record.EventRecorder @@ -61,6 +64,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler { // add adds a new Controller to mgr with rsp as the reconcile.Reconciler func add(mgr manager.Manager, r reconcile.Reconciler) error { SPContainer = make(map[string][]SiddhiApp) + CMContainer = make(map[string]ConfigMapListner) ER = mgr.GetRecorder("siddhiprocess-controller") // Create a new controller @@ -69,7 +73,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return err } - pred := predicate.Funcs{ + spPredicate := predicate.Funcs{ DeleteFunc: func(e event.DeleteEvent) bool { if _, ok := SPContainer[e.Meta.GetName()]; ok { delete(SPContainer, e.Meta.GetName()) @@ -80,13 +84,15 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { oldObject := e.ObjectOld.(*siddhiv1alpha2.SiddhiProcess) newObject := e.ObjectNew.(*siddhiv1alpha2.SiddhiProcess) if !oldObject.Spec.Equals(&newObject.Spec) { + newObject.Status.CurrentVersion++ return true } return false }, CreateFunc: func(e event.CreateEvent) bool { object := e.Object.(*siddhiv1alpha2.SiddhiProcess) - object.Status.ObservedGeneration = 0 + object.Status.CurrentVersion = 0 + object.Status.PreviousVersion = 0 return true }, GenericFunc: func(e event.GenericEvent) bool { @@ -94,8 +100,29 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { }, } + cmPredicate := predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + if _, ok := CMContainer[e.MetaNew.GetName()]; ok { + cmListner := CMContainer[e.MetaNew.GetName()] + cmListner.Changed = true + CMContainer[e.MetaNew.GetName()] = cmListner + return true + } + return false + }, + CreateFunc: func(e event.CreateEvent) bool { + return false + }, + } + // Watch for changes to primary resource SiddhiProcess - err = c.Watch(&source.Kind{Type: &siddhiv1alpha2.SiddhiProcess{}}, &handler.EnqueueRequestForObject{}, pred) + err = c.Watch(&source.Kind{Type: &siddhiv1alpha2.SiddhiProcess{}}, &handler.EnqueueRequestForObject{}, spPredicate) + if err != nil { + return err + } + + // Watch for changes to secondary resource Config Map and requeue the owner SiddhiProcess + err = c.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, &handler.EnqueueRequestForObject{}, cmPredicate) if err != nil { return err } @@ -144,13 +171,27 @@ type ReconcileSiddhiProcess struct { // and what is in the SiddhiProcess.Spec func (rsp *ReconcileSiddhiProcess) Reconcile(request reconcile.Request) (reconcile.Result, error) { sp := &siddhiv1alpha2.SiddhiProcess{} - err := rsp.client.Get(context.TODO(), request.NamespacedName, sp) + cm := &corev1.ConfigMap{} + siddhiProcessName := request.NamespacedName + SiddhiProcessChanged := true + err := rsp.client.Get(context.TODO(), request.NamespacedName, cm) + if err == nil { + cmListner := CMContainer[request.NamespacedName.Name] + siddhiProcessName.Name = cmListner.SiddhiProcess + SiddhiProcessChanged = false + } + + err = rsp.client.Get(context.TODO(), siddhiProcessName, sp) if err != nil { if errors.IsNotFound(err) { return reconcile.Result{}, nil } return reconcile.Result{}, err } + if !SiddhiProcessChanged { + sp = rsp.upgradeVersion(sp) + } + configs := rsp.Configurations(sp) sp, siddhiApps, err := rsp.populateSiddhiApps(sp, configs) if err != nil { @@ -166,5 +207,10 @@ func (rsp *ReconcileSiddhiProcess) Reconcile(request reconcile.Request) (reconci sp = rsp.createArtifacts(sp, siddhiApps, configs) sp = rsp.checkDeployments(sp, siddhiApps) + if !SiddhiProcessChanged { + cmListner := CMContainer[request.NamespacedName.Name] + cmListner.Changed = false + CMContainer[request.NamespacedName.Name] = cmListner + } return reconcile.Result{Requeue: false}, nil }