diff --git a/cmd/habitat-operator/main.go b/cmd/habitat-operator/main.go index 7efaabb3..c516978b 100644 --- a/cmd/habitat-operator/main.go +++ b/cmd/habitat-operator/main.go @@ -24,7 +24,6 @@ import ( "github.com/go-kit/kit/log/level" flag "github.com/spf13/pflag" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -66,15 +65,10 @@ func run() int { return 1 } - // Create Habitat CRD. - _, crdErr := habitatclient.CreateCRD(apiextensionsclientset) - if crdErr != nil { - if !apierrors.IsAlreadyExists(crdErr) { - level.Error(logger).Log("msg", crdErr) - return 1 - } - - level.Info(logger).Log("msg", "Habitat CRD already exists, continuing") + // Create Habitat CRDs. + if crdErr := habitatclient.CreateCRDs(apiextensionsclientset, log.With(logger, "coponent", "crd")); crdErr != nil { + level.Error(logger).Log("msg", crdErr) + return 1 } else { level.Info(logger).Log("msg", "created Habitat CRD") } diff --git a/examples/standalone/habitat-promote.yml b/examples/standalone/habitat-promote.yml new file mode 100644 index 00000000..824809c4 --- /dev/null +++ b/examples/standalone/habitat-promote.yml @@ -0,0 +1,9 @@ +apiVersion: habitat.sh/v1 +kind: HabitatPromote +metadata: + name: example-standalone-habitat +spec: + habitatName: example-standalone-habitat + oldChannel: staging + newChannel: production + replace: false diff --git a/examples/standalone/habitat.yml b/examples/standalone/habitat.yml index 58956d32..ea1394f4 100644 --- a/examples/standalone/habitat.yml +++ b/examples/standalone/habitat.yml @@ -1,11 +1,14 @@ apiVersion: habitat.sh/v1 kind: Habitat metadata: - name: example-standalone-habitat + name: example-standalone-habitat-qqg4x + labels: + habitat-name: example-standalone-habitat spec: # the core/nginx habitat service packaged as a Docker image image: kinvolk/nginx-hab count: 1 + channel: staging service: topology: standalone # if not present, defaults to "default" diff --git a/pkg/habitat/apis/cr/v1/register.go b/pkg/habitat/apis/cr/v1/register.go index 6f95a630..04d4911e 100644 --- a/pkg/habitat/apis/cr/v1/register.go +++ b/pkg/habitat/apis/cr/v1/register.go @@ -41,6 +41,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { scheme.AddKnownTypes(SchemeGroupVersion, &Habitat{}, &HabitatList{}, + &HabitatPromote{}, + &HabitatPromoteList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) diff --git a/pkg/habitat/apis/cr/v1/types.go b/pkg/habitat/apis/cr/v1/types.go index 751dc0b1..f6986636 100644 --- a/pkg/habitat/apis/cr/v1/types.go +++ b/pkg/habitat/apis/cr/v1/types.go @@ -29,6 +29,12 @@ const ( HabitatNameLabel = "habitat-name" TopologyLabel = "topology" + + // HabitatChannelLabel contains the information about stability of application. + // Example: 'channel: production' + HabitatChannelLabel = "channel" + + HabitatPromoteResourcePlural = "habitatpromotes" ) type Habitat struct { @@ -44,6 +50,9 @@ type HabitatSpec struct { // Image is the Docker image of the Habitat Service. Image string `json:"image"` Service Service `json:"service"` + // Channel is the information about stability of the application, expressed as a label in Kubernetes. + // Optional. + Channel string `json:"channel,omitempty"` } type HabitatStatus struct { @@ -94,8 +103,35 @@ const ( TopologyLeader Topology = "leader" ) +type HabitatPromote struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` + Spec HabitatPromoteSpec `json:"spec"` + Status HabitatPromoteStatus `json:"status,omitempty"` +} + +type HabitatPromoteSpec struct { + HabitatName string `json:"habitatName"` + OldChannel string `json:"oldChannel"` + NewChannel string `json:"newChannel"` + Replace bool `json:"replace"` +} + +type HabitatPromoteStatus struct { + State HabitatPromoteState `json:"state,omitempty"` + Message string `json:"message,omitempty"` +} + +type HabitatPromoteState string + type HabitatList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata"` Items []Habitat `json:"items"` } + +type HabitatPromoteList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + Items []HabitatPromote `json:"items"` +} diff --git a/pkg/habitat/client/cr.go b/pkg/habitat/client/cr.go index e8dcabcb..2ea80284 100644 --- a/pkg/habitat/client/cr.go +++ b/pkg/habitat/client/cr.go @@ -15,13 +15,17 @@ package client import ( + "fmt" "reflect" "time" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" crv1 "github.com/kinvolk/habitat-operator/pkg/habitat/apis/cr/v1" apiv1 "k8s.io/api/core/v1" apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" @@ -29,40 +33,46 @@ import ( ) const ( - habitatCRDName = crv1.HabitatResourcePlural + "." + crv1.GroupName - habitatResourceShortName = "hab" + habitatCRDName = crv1.HabitatResourcePlural + "." + crv1.GroupName + habitatPromoteCRDName = crv1.HabitatPromoteResourcePlural + "." + crv1.GroupName + habitatResourceShortName = "hab" + habitatPromoteResourceShortName = "habprom" pollInterval = 500 * time.Millisecond timeOut = 10 * time.Second ) -// CreateCRD creates the Habitat Custom Resource Definition. +// createCRD creates the Custom Resource Definition with the given name. // It checks if creation has completed successfully, and deletes the CRD in case of error. -func CreateCRD(clientset apiextensionsclient.Interface) (*apiextensionsv1beta1.CustomResourceDefinition, error) { +func createCRD(clientset apiextensionsclient.Interface, logger log.Logger, crdName, plural, shortName string, kind interface{}) error { crd := &apiextensionsv1beta1.CustomResourceDefinition{ ObjectMeta: metav1.ObjectMeta{ - Name: habitatCRDName, + Name: crdName, }, Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{ Group: crv1.GroupName, Version: crv1.SchemeGroupVersion.Version, Scope: apiextensionsv1beta1.NamespaceScoped, Names: apiextensionsv1beta1.CustomResourceDefinitionNames{ - Plural: crv1.HabitatResourcePlural, - Kind: reflect.TypeOf(crv1.Habitat{}).Name(), - ShortNames: []string{habitatResourceShortName}, + Plural: plural, + Kind: reflect.TypeOf(kind).Name(), + ShortNames: []string{shortName}, }, }, } - _, err := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) - if err != nil { - return nil, err + if _, err := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd); err != nil { + if !apierrors.IsAlreadyExists(err) { + return err + } + level.Info(logger).Log("msg", fmt.Sprintf("%s CRD already exists, continuing", crdName)) } // wait for CRD being established. - err = wait.Poll(pollInterval, timeOut, func() (bool, error) { - crd, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Get(habitatCRDName, metav1.GetOptions{}) + if err := wait.Poll(pollInterval, timeOut, func() (bool, error) { + var err error + + crd, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Get(crdName, metav1.GetOptions{}) if err != nil { return false, err @@ -76,26 +86,37 @@ func CreateCRD(clientset apiextensionsclient.Interface) (*apiextensionsv1beta1.C } case apiextensionsv1beta1.NamesAccepted: if cond.Status == apiextensionsv1beta1.ConditionFalse { - // TODO re-introduce logging? - // fmt.Printf("Error: Name conflict: %v\n", cond.Reason) + level.Error(logger).Log("msg", fmt.Sprintf("Error: Name conflict: %v\n", cond.Reason)) } } } return false, err - }) - - // delete CRD if there was an error. - if err != nil { - deleteErr := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Delete(habitatCRDName, nil) + }); err != nil { + // delete CRD if there was an error. + deleteErr := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Delete(crdName, nil) if deleteErr != nil { - return nil, errors.NewAggregate([]error{err, deleteErr}) + return errors.NewAggregate([]error{err, deleteErr}) } - return nil, err + return err + } + + return nil +} + +func CreateCRDs(clientset apiextensionsclient.Interface, logger log.Logger) error { + // Create Habitat CRD. + if err := createCRD(clientset, logger, habitatCRDName, crv1.HabitatResourcePlural, habitatResourceShortName, crv1.Habitat{}); err != nil { + return err + } + + // Create HabitatPromote CRD. + if err := createCRD(clientset, logger, habitatPromoteCRDName, crv1.HabitatPromoteResourcePlural, habitatPromoteResourceShortName, crv1.HabitatPromote{}); err != nil { + return err } - return crd, nil + return nil } // WaitForHabitatInstanceProcessed polls the API for a specific Habitat with a state of "Processed". diff --git a/pkg/habitat/controller/controller.go b/pkg/habitat/controller/controller.go index 08f63ad4..15380342 100644 --- a/pkg/habitat/controller/controller.go +++ b/pkg/habitat/controller/controller.go @@ -56,6 +56,9 @@ const ( // Keys are saved to disk with the format `-.`. // This regexp captures the name part. ringKeyRegexp = `^([\w_-]+)-\d{14}$` + + // The default channel for habitat applications, if not specified. + defaultHabChannel = "stable" ) var ringRegexp *regexp.Regexp = regexp.MustCompile(ringKeyRegexp) @@ -67,11 +70,13 @@ type HabitatController struct { // queue contains the jobs that will be handled by syncHabitat. // A workqueue.RateLimitingInterface is a queue where failing jobs are re-enqueued with an exponential // delay, so that jobs in a crashing loop don't fill the queue. - queue workqueue.RateLimitingInterface + habitatQueue workqueue.RateLimitingInterface + habitatPromoteQueue workqueue.RateLimitingInterface - habInformer cache.SharedIndexInformer - deployInformer cache.SharedIndexInformer - cMInformer cache.SharedIndexInformer + habInformer cache.SharedIndexInformer + habPromoteInformer cache.SharedIndexInformer + deployInformer cache.SharedIndexInformer + cMInformer cache.SharedIndexInformer } type Config struct { @@ -95,9 +100,10 @@ func New(config Config, logger log.Logger) (*HabitatController, error) { } hc := &HabitatController{ - config: config, - logger: logger, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "habitat"), + config: config, + logger: logger, + habitatQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "habitat"), + habitatPromoteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "habitatPromote"), } return hc, nil @@ -108,16 +114,19 @@ func (hc *HabitatController) Run(ctx context.Context) error { level.Info(hc.logger).Log("msg", "Watching Habitat objects") hc.cacheHab() + hc.cacheHabPromote() hc.cacheDeployment() hc.cacheConfigMap() hc.watchPods(ctx) go hc.habInformer.Run(ctx.Done()) + go hc.habPromoteInformer.Run(ctx.Done()) go hc.deployInformer.Run(ctx.Done()) go hc.cMInformer.Run(ctx.Done()) // Start the synchronous queue consumer. - go hc.worker() + go hc.habitatWorker() + go hc.habitatPromoteWorker() // This channel is closed when the context is canceled or times out. <-ctx.Done() @@ -149,6 +158,29 @@ func (hc *HabitatController) cacheHab() { }) } +func (hc *HabitatController) cacheHabPromote() { + source := cache.NewListWatchFromClient( + hc.config.HabitatClient, + crv1.HabitatPromoteResourcePlural, + apiv1.NamespaceAll, + fields.Everything()) + + hc.habPromoteInformer = cache.NewSharedIndexInformer( + source, + + // The object type. + &crv1.HabitatPromote{}, + resyncPeriod, + cache.Indexers{}, + ) + + hc.habPromoteInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: hc.handleHabPromoteAdd, + UpdateFunc: hc.handleHabPromoteUpdate, + DeleteFunc: hc.handleHabPromoteDelete, + }) +} + func (hc *HabitatController) cacheDeployment() { source := cache.NewListWatchFromClient( hc.config.KubernetesClientset.AppsV1beta1().RESTClient(), @@ -227,7 +259,7 @@ func (hc *HabitatController) handleHabAdd(obj interface{}) { return } - hc.enqueue(h) + hc.enqueueHabitat(h) } func (hc *HabitatController) handleHabUpdate(oldObj, newObj interface{}) { @@ -243,8 +275,19 @@ func (hc *HabitatController) handleHabUpdate(oldObj, newObj interface{}) { return } - if hc.habitatNeedsUpdate(oldHab, newHab) { - hc.enqueue(newHab) + if hc.habitatIsPromoted(oldHab, newHab) { + // Remove old deployment + deploymentName := hc.deploymentName(oldHab.Name, oldHab.Spec.Channel) + if err := hc.config.KubernetesClientset.AppsV1beta1Client.Deployments(apiv1.NamespaceDefault).Delete(deploymentName, &metav1.DeleteOptions{}); err != nil { + level.Error(hc.logger).Log("msg", "Failed to delete deployment", "name", deploymentName) + return + } + level.Info(hc.logger).Log("msg", "created deployment", "name", deploymentName) + + hc.enqueueHabitat(newHab) + + } else if hc.habitatNeedsUpdate(oldHab, newHab) { + hc.enqueueHabitat(newHab) } } @@ -255,7 +298,45 @@ func (hc *HabitatController) handleHabDelete(obj interface{}) { return } - hc.enqueue(h) + hc.enqueueHabitat(h) +} + +func (hc *HabitatController) handleHabPromoteAdd(obj interface{}) { + hp, ok := obj.(*crv1.HabitatPromote) + if !ok { + level.Error(hc.logger).Log("msg", "Failed to type assert HabitatPromote", "obj", obj) + return + } + + hc.enqueueHabitatPromote(hp) +} + +func (hc *HabitatController) handleHabPromoteUpdate(oldObj, newObj interface{}) { + oldHabProm, ok := oldObj.(*crv1.HabitatPromote) + if !ok { + level.Error(hc.logger).Log("msg", "Failed to type assert HabitatPromote", "obj", oldObj) + return + } + + newHabProm, ok := newObj.(*crv1.HabitatPromote) + if !ok { + level.Error(hc.logger).Log("msg", "Failed to type assert HabitatPromote", "obj", newObj) + return + } + + if hc.habitatPromoteNeedsUpdate(oldHabProm, newHabProm) { + hc.enqueueHabitatPromote(newHabProm) + } +} + +func (hc *HabitatController) handleHabPromoteDelete(obj interface{}) { + hp, ok := obj.(*crv1.HabitatPromote) + if !ok { + level.Error(hc.logger).Log("msg", "Failed to type assert HabitatPromote", "obj", obj) + return + } + + hc.enqueueHabitatPromote(hp) } func (hc *HabitatController) handleDeployAdd(obj interface{}) { @@ -268,11 +349,16 @@ func (hc *HabitatController) handleDeployAdd(obj interface{}) { if isHabitatObject(&d.ObjectMeta) { h, err := hc.getHabitatFromLabeledResource(d) if err != nil { + if hErr, ok := err.(habitatNotFoundError); !ok { + level.Error(hc.logger).Log("msg", hErr) + + } + level.Error(hc.logger).Log("msg", "Could not find Habitat for Deployment", "name", d.Name) return } - hc.enqueue(h) + hc.enqueueHabitat(h) } } @@ -286,11 +372,17 @@ func (hc *HabitatController) handleDeployUpdate(oldObj, newObj interface{}) { if isHabitatObject(&d.ObjectMeta) { h, err := hc.getHabitatFromLabeledResource(d) if err != nil { + if hErr, ok := err.(habitatNotFoundError); !ok { + level.Error(hc.logger).Log("msg", hErr) + return + } + + // This only means the Deployment and the Habitat watchers are not in sync. level.Error(hc.logger).Log("msg", "Could not find Habitat for Deployment", "name", d.Name) return } - hc.enqueue(h) + hc.enqueueHabitat(h) } } @@ -301,16 +393,24 @@ func (hc *HabitatController) handleDeployDelete(obj interface{}) { return } - if isHabitatObject(&d.ObjectMeta) { - h, err := hc.getHabitatFromLabeledResource(d) - if err != nil { - // Could not find Habitat, it must have already been removed. - level.Debug(hc.logger).Log("msg", "Could not find Habitat for Deployment", "name", d.Name) + if !isHabitatObject(&d.ObjectMeta) { + return + } + + h, err := hc.getHabitatFromLabeledResource(d) + if err != nil { + if hErr, ok := err.(habitatNotFoundError); !ok { + level.Error(hc.logger).Log("msg", hErr) return } - hc.enqueue(h) + // This only means the Deployment and the Habitat watchers are not in sync. + level.Debug(hc.logger).Log("msg", "Could not find Habitat for Deployment", "name", d.Name) + + return } + + hc.enqueueHabitat(h) } func (hc *HabitatController) enqueueCM(obj interface{}) { @@ -328,7 +428,7 @@ func (hc *HabitatController) enqueueCM(obj interface{}) { return } if h.Namespace == cm.GetNamespace() { - hc.enqueue(h) + hc.enqueueHabitat(h) } }) } @@ -359,7 +459,7 @@ func (hc *HabitatController) handlePodAdd(obj interface{}) { level.Error(hc.logger).Log("msg", hErr) return } - hc.enqueue(h) + hc.enqueueHabitat(h) } } } @@ -394,7 +494,7 @@ func (hc *HabitatController) handlePodUpdate(oldObj, newObj interface{}) { return } - hc.enqueue(h) + hc.enqueueHabitat(h) } func (hc *HabitatController) handlePodDelete(obj interface{}) { @@ -421,7 +521,7 @@ func (hc *HabitatController) handlePodDelete(obj interface{}) { return } - hc.enqueue(h) + hc.enqueueHabitat(h) } func (hc *HabitatController) getRunningPods(namespace string) ([]apiv1.Pod, error) { @@ -561,6 +661,13 @@ func (hc *HabitatController) handleHabitatDeletion(key string) error { return nil } +func (hc *HabitatController) deploymentName(habitatName, channel string) string { + if channel == "" { + channel = defaultHabChannel + } + return fmt.Sprintf("%s-%s", habitatName, channel) +} + func (hc *HabitatController) newDeployment(h *crv1.Habitat) (*appsv1beta1.Deployment, error) { // This value needs to be passed as a *int32, so we convert it, assign it to a // variable and afterwards pass a pointer to it. @@ -603,16 +710,17 @@ func (hc *HabitatController) newDeployment(h *crv1.Habitat) (*appsv1beta1.Deploy base := &appsv1beta1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: h.Name, + Name: hc.deploymentName(h.Name, h.Spec.Channel), }, Spec: appsv1beta1.DeploymentSpec{ Replicas: &count, Template: apiv1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - crv1.HabitatLabel: "true", - crv1.HabitatNameLabel: h.Name, - crv1.TopologyLabel: topology.String(), + crv1.HabitatLabel: "true", + crv1.HabitatNameLabel: h.Name, + crv1.TopologyLabel: topology.String(), + crv1.HabitatChannelLabel: h.Spec.Channel, }, }, Spec: apiv1.PodSpec{ @@ -738,27 +846,27 @@ func (hc *HabitatController) newDeployment(h *crv1.Habitat) (*appsv1beta1.Deploy return base, nil } -func (hc *HabitatController) enqueue(hab *crv1.Habitat) { +func (hc *HabitatController) enqueueHabitat(hab *crv1.Habitat) { k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(hab) if err != nil { level.Error(hc.logger).Log("msg", "Habitat object key could not be retrieved", "object", hab) return } - hc.queue.Add(k) + hc.habitatQueue.Add(k) } -func (hc *HabitatController) worker() { - for hc.processNextItem() { +func (hc *HabitatController) habitatWorker() { + for hc.processNextHabitatItem() { } } -func (hc *HabitatController) processNextItem() bool { - key, quit := hc.queue.Get() +func (hc *HabitatController) processNextHabitatItem() bool { + key, quit := hc.habitatQueue.Get() if quit { return false } - defer hc.queue.Done(key) + defer hc.habitatQueue.Done(key) k, ok := key.(string) if !ok { @@ -766,16 +874,16 @@ func (hc *HabitatController) processNextItem() bool { return false } - err := hc.conform(k) + err := hc.conformHabitat(k) if err != nil { level.Error(hc.logger).Log("msg", "Habitat could not be synced, requeueing", "msg", err) - hc.queue.AddRateLimited(k) + hc.habitatQueue.AddRateLimited(k) return true } - hc.queue.Forget(k) + hc.habitatQueue.Forget(k) return true } @@ -783,7 +891,7 @@ func (hc *HabitatController) processNextItem() bool { // conform is where the reconciliation takes place. // It is invoked when any of the following resources get created, updated or deleted: // Habitat, Pod, Deployment, ConfigMap. -func (hc *HabitatController) conform(key string) error { +func (hc *HabitatController) conformHabitat(key string) error { obj, exists, err := hc.habInformer.GetStore().GetByKey(key) if err != nil { return err @@ -802,7 +910,7 @@ func (hc *HabitatController) conform(key string) error { level.Debug(hc.logger).Log("function", "handle Habitat Creation", "msg", h.ObjectMeta.SelfLink) // Validate object. - if err := validateCustomObject(*h); err != nil { + if err := validateHabitat(*h); err != nil { return err } @@ -842,6 +950,15 @@ func (hc *HabitatController) conform(key string) error { return nil } +func (hc *HabitatController) habitatIsPromoted(oldHabitat, newHabitat *crv1.Habitat) bool { + if oldHabitat.Spec.Channel != newHabitat.Spec.Channel { + level.Debug(hc.logger).Log("msg", "Habitat is promoted", "h", newHabitat) + return true + } + + return false +} + func (hc *HabitatController) habitatNeedsUpdate(oldHabitat, newHabitat *crv1.Habitat) bool { if reflect.DeepEqual(oldHabitat.Spec, newHabitat.Spec) { level.Debug(hc.logger).Log("msg", "Update ignored as it didn't change Habitat spec", "h", newHabitat) @@ -851,6 +968,102 @@ func (hc *HabitatController) habitatNeedsUpdate(oldHabitat, newHabitat *crv1.Hab return true } +func (hc *HabitatController) enqueueHabitatPromote(habProm *crv1.HabitatPromote) { + k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(habProm) + if err != nil { + level.Error(hc.logger).Log("msg", "Object key could not be retrieved", "object", habProm) + return + } + + hc.habitatPromoteQueue.Add(k) +} + +func (hc *HabitatController) habitatPromoteWorker() { + for hc.processNextHabitatPromoteItem() { + } +} + +func (hc *HabitatController) processNextHabitatPromoteItem() bool { + key, quit := hc.habitatPromoteQueue.Get() + if quit { + return false + } + defer hc.habitatPromoteQueue.Done(key) + + k, ok := key.(string) + if !ok { + level.Error(hc.logger).Log("msg", "Failed to type assert key", "obj", key) + return false + } + + if err := hc.conformHabitatPromote(k); err != nil { + level.Error(hc.logger).Log("msg", "HabitatPromote could not be synced, requeueing", "msg", err) + hc.habitatPromoteQueue.AddRateLimited(k) + + return true + } + + hc.habitatPromoteQueue.Forget(k) + + return true +} + +func (hc *HabitatController) conformHabitatPromote(key string) error { + obj, exists, err := hc.habPromoteInformer.GetStore().GetByKey(key) + if err != nil { + return err + } + if !exists { + // The HabitatPromote was deleted. + // No action is required there. + return nil + } + + // The HabitatPromote was either created or updated. + hp, ok := obj.(*crv1.HabitatPromote) + if !ok { + return fmt.Errorf("unknown event type") + } + + level.Debug(hc.logger).Log("function", "handle HabitatPromote creation", "msg", hp.ObjectMeta.SelfLink) + + var habList crv1.HabitatList + if err := hc.config.HabitatClient.Get(). + Resource(crv1.HabitatResourcePlural). + Namespace(apiv1.NamespaceDefault). + Param("labelSelector", fmt.Sprintf("%s=%s", crv1.HabitatNameLabel, hp.Spec.HabitatName)). + Do().Into(&habList); err != nil { + return err + } + + if len(habList.Items) < 1 { + return fmt.Errorf("cannot find habitat object with name %s", hp.Spec.HabitatName) + } + + for _, hab := range habList.Items { + var result crv1.Habitat + hab.Spec.Channel = hp.Spec.NewChannel + if err := hc.config.HabitatClient.Put(). + Resource(crv1.HabitatResourcePlural). + Name(hab.ObjectMeta.Name). + Namespace(apiv1.NamespaceDefault). + Body(&hab).Do().Into(&result); err != nil { + return err + } + } + + return nil +} + +func (hc *HabitatController) habitatPromoteNeedsUpdate(oldHabProm, newHabProm *crv1.HabitatPromote) bool { + if reflect.DeepEqual(oldHabProm.Spec, newHabProm.Spec) { + level.Debug(hc.logger).Log("msg", "Update ignored as it didn't change HabitatPromote spec", "hp", newHabProm) + return false + } + + return true +} + func (hc *HabitatController) podNeedsUpdate(oldPod, newPod *apiv1.Pod) bool { // Ignore identical objects. // https://github.com/kubernetes/kubernetes/blob/7e630154dfc7b2155f8946a06f92e96e268dcbcd/pkg/controller/replicaset/replica_set.go#L276-L277 diff --git a/pkg/habitat/controller/utils.go b/pkg/habitat/controller/utils.go index 7069c9bc..f63d2e2f 100644 --- a/pkg/habitat/controller/utils.go +++ b/pkg/habitat/controller/utils.go @@ -35,7 +35,7 @@ func (err habitatNotFoundError) Error() string { return fmt.Sprintf("could not find Habitat with key %s", err.key) } -func validateCustomObject(h crv1.Habitat) error { +func validateHabitat(h crv1.Habitat) error { spec := h.Spec switch spec.Service.Topology {