Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable version controlling for config map changes #66

Merged
merged 2 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/apis/siddhi/v1alpha2/siddhiprocess_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ func (p *MessagingSystem) Equals(q *MessagingSystem) bool {
return (typeEq && cidEq)
}

// Equals
// Equals function checks the equality of two SiddhiProcess specs
func (p *SiddhiProcessSpec) Equals(q *SiddhiProcessSpec) bool {
if !EqualApps(p.Apps, q.Apps) {
return false
}
}
if p.SiddhiConfig != q.SiddhiConfig {
return false
}
Expand All @@ -83,7 +83,7 @@ func (p *SiddhiProcessSpec) Equals(q *SiddhiProcessSpec) bool {
return true
}

// EqualApps
// EqualApps checks the equality of two app slices
func EqualApps(p []Apps, q []Apps) bool {
if len(p) != len(q) {
return false
Expand All @@ -103,7 +103,7 @@ func EqualApps(p []Apps, q []Apps) bool {
return true
}

// EqualContainers
// EqualContainers checks the equality of two container specs
func EqualContainers(p *corev1.Container, q *corev1.Container) bool {
if p.Image != q.Image {
return false
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/siddhi/v1alpha2/siddhiprocess_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ type SiddhiProcessSpec struct {
type SiddhiProcessStatus struct {
Status string `json:"status"`
Ready string `json:"ready"`
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
CurrentVersion int64 `json:"currentVersion"`
BuddhiWathsala marked this conversation as resolved.
Show resolved Hide resolved
PreviousVersion int64 `json:"previousVersion"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/siddhiprocess/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (rsp *ReconcileSiddhiProcess) CreateOrUpdateIngress(
sp *siddhiv1alpha2.SiddhiProcess,
siddhiApp SiddhiApp,
configs Configs,
) (err error) {
) (operationResult controllerutil.OperationResult, err error) {

var ingressPaths []extensionsv1beta1.HTTPIngressPath
for _, port := range siddhiApp.ContainerPorts {
Expand Down Expand Up @@ -142,7 +142,7 @@ func (rsp *ReconcileSiddhiProcess) CreateOrUpdateIngress(
},
Spec: ingressSpec,
}
_, err = controllerutil.CreateOrUpdate(
operationResult, err = controllerutil.CreateOrUpdate(
context.TODO(),
rsp.client,
ingress,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/siddhiprocess/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestCreateOrUpdateIngress(t *testing.T) {
cl := fake.NewFakeClient(objs...)
rsp := &ReconcileSiddhiProcess{client: cl, scheme: s}
configs := getTestConfigs(testSP)
err := rsp.CreateOrUpdateIngress(testSP, testSiddhiApp, configs)
_, err := rsp.CreateOrUpdateIngress(testSP, testSiddhiApp, configs)
if err != nil {
t.Error(err)
}
Expand All @@ -86,7 +86,7 @@ func TestCreateOrUpdateIngress(t *testing.T) {
},
PersistenceEnabled: true,
}
err = rsp.CreateOrUpdateIngress(testSP, sa, configs)
_, err = rsp.CreateOrUpdateIngress(testSP, sa, configs)
if err != nil {
t.Error(err)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/siddhiprocess/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ type SiddhiAppConfig struct {
Replicas int32 `json:"replicas"`
}

// ConfigMapListner holds the change details of a config map
type ConfigMapListner struct {
SiddhiProcess string `json:"siddhiProcess"`
Changed bool `json:"changed"`
}

// Status of a Siddhi process
type Status int

Expand Down
53 changes: 33 additions & 20 deletions pkg/controller/siddhiprocess/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (rsp *ReconcileSiddhiProcess) deployApp(
RollingUpdate: &rollingUpdate,
}
}
configMapName := siddhiApp.Name + strconv.Itoa(int(sp.ObjectMeta.Generation))
configMapName := siddhiApp.Name + strconv.Itoa(int(sp.Status.CurrentVersion))
BuddhiWathsala marked this conversation as resolved.
Show resolved Hide resolved
for k, v := range siddhiApp.Apps {
key := k + configs.SiddhiExt
configMapData[key] = v
Expand Down Expand Up @@ -251,10 +251,10 @@ func (rsp *ReconcileSiddhiProcess) createArtifacts(
availableDep := 0
reqLogger := log.WithValues("Request.Namespace", sp.Namespace, "Request.Name", sp.Name)
eventType := controllerutil.OperationResultNone
if sp.Status.ObservedGeneration == 0 {
if sp.Status.CurrentVersion == 0 {
eventType = controllerutil.OperationResultCreated
} else {
if sp.ObjectMeta.Generation > sp.Status.ObservedGeneration {
if sp.Status.CurrentVersion > sp.Status.PreviousVersion {
eventType = controllerutil.OperationResultUpdated
} else {
eventType = controllerutil.OperationResultNone
Expand All @@ -270,19 +270,17 @@ func (rsp *ReconcileSiddhiProcess) createArtifacts(
sp = rsp.updateErrorStatus(sp, ER, ERROR, "AppDeploymentError", err)
continue
}
if (eventType != controllerutil.OperationResultNone) &&
if (eventType == controllerutil.OperationResultCreated) &&
BuddhiWathsala marked this conversation as resolved.
Show resolved Hide resolved
(operationResult == controllerutil.OperationResultCreated) {
availableDep++
sp = rsp.updateRunningStatus(
sp,
ER,
RUNNING,
"DeploymentCreated",
(siddhiApp.Name + " deployment created successfully"),
)
} else if (eventType != controllerutil.OperationResultNone) &&
} else if (eventType == controllerutil.OperationResultUpdated) &&
(operationResult == controllerutil.OperationResultUpdated) {
availableDep++
sp = rsp.updateRunningStatus(
sp,
ER,
Expand All @@ -291,14 +289,14 @@ func (rsp *ReconcileSiddhiProcess) createArtifacts(
(siddhiApp.Name + " deployment updated successfully"),
)
}

availableDep++
if siddhiApp.ServiceEnabled {
operationResult, err = rsp.CreateOrUpdateService(sp, siddhiApp, configs)
if err != nil {
sp = rsp.updateErrorStatus(sp, ER, WARNING, "ServiceCreationError", err)
continue
}
if (eventType != controllerutil.OperationResultNone) &&
if (eventType == controllerutil.OperationResultCreated) &&
(operationResult == controllerutil.OperationResultCreated) {
sp = rsp.updateRunningStatus(
sp,
Expand All @@ -307,7 +305,7 @@ func (rsp *ReconcileSiddhiProcess) createArtifacts(
"ServiceCreated",
(siddhiApp.Name + " service created successfully"),
)
} else if (eventType != controllerutil.OperationResultNone) &&
} else if (eventType == controllerutil.OperationResultUpdated) &&
(operationResult == controllerutil.OperationResultUpdated) {
sp = rsp.updateRunningStatus(
sp,
Expand All @@ -319,20 +317,23 @@ func (rsp *ReconcileSiddhiProcess) createArtifacts(
}

if configs.AutoCreateIngress {
err := rsp.CreateOrUpdateIngress(sp, siddhiApp, configs)
operationResult, err = rsp.CreateOrUpdateIngress(sp, siddhiApp, configs)
if err != nil {
sp = rsp.updateErrorStatus(sp, ER, ERROR, "IngressCreationError", err)
continue
}
if eventType == controllerutil.OperationResultCreated ||
eventType == controllerutil.OperationResultUpdated {
if (eventType == controllerutil.OperationResultCreated) &&
(operationResult == controllerutil.OperationResultCreated) {
reqLogger.Info("Ingress created", "Ingress.Name", configs.HostName)
} else if (eventType == controllerutil.OperationResultUpdated) &&
(operationResult == controllerutil.OperationResultUpdated) {
reqLogger.Info("Ingress changed", "Ingress.Name", configs.HostName)
}
}
}
}
}
sp = rsp.syncGeneration(sp)
sp = rsp.syncVersion(sp)
if (eventType == controllerutil.OperationResultCreated) ||
(eventType == controllerutil.OperationResultUpdated) {
sp = rsp.updateReady(sp, availableDep, needDep)
Expand Down Expand Up @@ -369,7 +370,7 @@ func (rsp *ReconcileSiddhiProcess) populateSiddhiApps(
var siddhiApps []SiddhiApp
var err error
modified := false
if (sp.Status.ObservedGeneration > 0) && (sp.ObjectMeta.Generation > sp.Status.ObservedGeneration) {
if sp.Status.CurrentVersion > sp.Status.PreviousVersion {
modified = true
}
if modified {
Expand Down Expand Up @@ -425,6 +426,11 @@ func (rsp *ReconcileSiddhiProcess) getSiddhiApps(sp *siddhiv1alpha2.SiddhiProces
if err != nil {
return
}
cmListner := ConfigMapListner{
SiddhiProcess: sp.Name,
Changed: false,
}
CMContainer[app.ConfigMap] = cmListner
for _, siddhiFileContent := range configMap.Data {
siddhiApps = append(siddhiApps, siddhiFileContent)
}
Expand All @@ -436,11 +442,18 @@ func (rsp *ReconcileSiddhiProcess) getSiddhiApps(sp *siddhiv1alpha2.SiddhiProces
return
}

// syncGeneration synchronize the siddhi process internal generation number
// this simply assing ObjectMeta.Generation value to the Status.ObservedGeneration and update the sidhhi process
// syncVersion synchronize the siddhi process internal version number
// this simply assing Status.CurrentVersion value to the Status.PreviousVersion and update the siddhi process
// this funtionality used for version controlling inside a siddhi process
func (rsp *ReconcileSiddhiProcess) syncGeneration(sp *siddhiv1alpha2.SiddhiProcess) *siddhiv1alpha2.SiddhiProcess {
sp.Status.ObservedGeneration = sp.ObjectMeta.Generation
func (rsp *ReconcileSiddhiProcess) syncVersion(sp *siddhiv1alpha2.SiddhiProcess) *siddhiv1alpha2.SiddhiProcess {
sp.Status.PreviousVersion = sp.Status.CurrentVersion
_ = rsp.client.Status().Update(context.TODO(), sp)
return sp
}

// upgradeVersion upgrade the siddhi process internal version number
func (rsp *ReconcileSiddhiProcess) upgradeVersion(sp *siddhiv1alpha2.SiddhiProcess) *siddhiv1alpha2.SiddhiProcess {
sp.Status.CurrentVersion++
_ = rsp.client.Status().Update(context.TODO(), sp)
return sp
}
Expand Down Expand Up @@ -513,7 +526,7 @@ func (rsp *ReconcileSiddhiProcess) cleanArtifacts(
}
}

cmName := artifactName + strconv.Itoa(int(sp.Status.ObservedGeneration))
cmName := artifactName + strconv.Itoa(int(sp.Status.PreviousVersion))
cm := &corev1.ConfigMap{}
er = rsp.client.Get(
context.TODO(),
Expand Down
54 changes: 50 additions & 4 deletions pkg/controller/siddhiprocess/siddhiprocess_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ var log = logf.Log.WithName("siddhi")
// SPContainer holds siddhi apps
var SPContainer map[string][]SiddhiApp

// CMContainer holds the config map name along with CM listner for listen changes of the CM
var CMContainer map[string]ConfigMapListner

// ER recoder
var ER record.EventRecorder

Expand All @@ -61,6 +64,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
// add adds a new Controller to mgr with rsp as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
SPContainer = make(map[string][]SiddhiApp)
CMContainer = make(map[string]ConfigMapListner)
ER = mgr.GetRecorder("siddhiprocess-controller")

// Create a new controller
Expand All @@ -69,7 +73,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

pred := predicate.Funcs{
spPredicate := predicate.Funcs{
DeleteFunc: func(e event.DeleteEvent) bool {
if _, ok := SPContainer[e.Meta.GetName()]; ok {
delete(SPContainer, e.Meta.GetName())
Expand All @@ -80,22 +84,45 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
oldObject := e.ObjectOld.(*siddhiv1alpha2.SiddhiProcess)
newObject := e.ObjectNew.(*siddhiv1alpha2.SiddhiProcess)
if !oldObject.Spec.Equals(&newObject.Spec) {
newObject.Status.CurrentVersion++
return true
}
return false
},
CreateFunc: func(e event.CreateEvent) bool {
object := e.Object.(*siddhiv1alpha2.SiddhiProcess)
object.Status.ObservedGeneration = 0
object.Status.CurrentVersion = 0
object.Status.PreviousVersion = 0
return true
},
GenericFunc: func(e event.GenericEvent) bool {
return false
},
}

cmPredicate := predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
if _, ok := CMContainer[e.MetaNew.GetName()]; ok {
cmListner := CMContainer[e.MetaNew.GetName()]
cmListner.Changed = true
CMContainer[e.MetaNew.GetName()] = cmListner
return true
}
return false
},
CreateFunc: func(e event.CreateEvent) bool {
return false
},
}

// Watch for changes to primary resource SiddhiProcess
err = c.Watch(&source.Kind{Type: &siddhiv1alpha2.SiddhiProcess{}}, &handler.EnqueueRequestForObject{}, pred)
err = c.Watch(&source.Kind{Type: &siddhiv1alpha2.SiddhiProcess{}}, &handler.EnqueueRequestForObject{}, spPredicate)
if err != nil {
return err
}

// Watch for changes to secondary resource Config Map and requeue the owner SiddhiProcess
err = c.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, &handler.EnqueueRequestForObject{}, cmPredicate)
if err != nil {
return err
}
Expand Down Expand Up @@ -144,13 +171,27 @@ type ReconcileSiddhiProcess struct {
// and what is in the SiddhiProcess.Spec
func (rsp *ReconcileSiddhiProcess) Reconcile(request reconcile.Request) (reconcile.Result, error) {
sp := &siddhiv1alpha2.SiddhiProcess{}
err := rsp.client.Get(context.TODO(), request.NamespacedName, sp)
cm := &corev1.ConfigMap{}
siddhiProcessName := request.NamespacedName
SiddhiProcessChanged := true
err := rsp.client.Get(context.TODO(), request.NamespacedName, cm)
if err == nil {
cmListner := CMContainer[request.NamespacedName.Name]
siddhiProcessName.Name = cmListner.SiddhiProcess
SiddhiProcessChanged = false
}

err = rsp.client.Get(context.TODO(), siddhiProcessName, sp)
if err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
if !SiddhiProcessChanged {
sp = rsp.upgradeVersion(sp)
}

configs := rsp.Configurations(sp)
sp, siddhiApps, err := rsp.populateSiddhiApps(sp, configs)
if err != nil {
Expand All @@ -166,5 +207,10 @@ func (rsp *ReconcileSiddhiProcess) Reconcile(request reconcile.Request) (reconci

sp = rsp.createArtifacts(sp, siddhiApps, configs)
sp = rsp.checkDeployments(sp, siddhiApps)
if !SiddhiProcessChanged {
cmListner := CMContainer[request.NamespacedName.Name]
cmListner.Changed = false
CMContainer[request.NamespacedName.Name] = cmListner
}
return reconcile.Result{Requeue: false}, nil
}