Skip to content

Commit

Permalink
1496 asset sync namespaces (#1519)
Browse files Browse the repository at this point in the history
* Update controller to include watching apprepos in other namespaces when enabled.

* Add repos-per-namespace option to controller template.

* Remove debugging statement.

* Retrieve apprepos after delete depending on reposPerNamespace
  • Loading branch information
absoludity authored Feb 17, 2020
1 parent 29e729d commit e8ad3e4
Show file tree
Hide file tree
Showing 6 changed files with 331 additions and 110 deletions.
3 changes: 3 additions & 0 deletions chart/kubeapps/templates/apprepository-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ spec:
{{- if .Values.apprepository.crontab }}
- --crontab={{ .Values.apprepository.crontab }}
{{- end }}
{{- if .Values.featureFlags.reposPerNamespace }}
- --repos-per-namespace
{{- end }}
{{- if .Values.apprepository.resources }}
resources: {{- toYaml .Values.apprepository.resources | nindent 12 }}
{{- end }}
119 changes: 72 additions & 47 deletions cmd/apprepository-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (
// existing.
ErrResourceExists = "ErrResourceExists"

LabelRepoName = "apprepositories.kubeapps.com/repo-name"
LabelRepoNamespace = "apprepositories.kubeapps.com/repo-namespace"

// MessageResourceExists is the message used for Events when a resource
// fails to sync due to a CronJob already existing
MessageResourceExists = "Resource %q already exists and is not managed by AppRepository"
Expand Down Expand Up @@ -84,14 +87,17 @@ type Controller struct {
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder

kubeappsNamespace string
}

// NewController returns a new sample controller
func NewController(
kubeclientset kubernetes.Interface,
apprepoclientset clientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
apprepoInformerFactory informers.SharedInformerFactory) *Controller {
apprepoInformerFactory informers.SharedInformerFactory,
kubeappsNamespace string) *Controller {

// obtain references to shared index informers for the CronJob and
// AppRepository types.
Expand All @@ -109,14 +115,15 @@ func NewController(
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

controller := &Controller{
kubeclientset: kubeclientset,
apprepoclientset: apprepoclientset,
cronjobsLister: cronjobInformer.Lister(),
cronjobsSynced: cronjobInformer.Informer().HasSynced,
appreposLister: apprepoInformer.Lister(),
appreposSynced: apprepoInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AppRepositories"),
recorder: recorder,
kubeclientset: kubeclientset,
apprepoclientset: apprepoclientset,
cronjobsLister: cronjobInformer.Lister(),
cronjobsSynced: cronjobInformer.Informer().HasSynced,
appreposLister: apprepoInformer.Lister(),
appreposSynced: apprepoInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AppRepositories"),
recorder: recorder,
kubeappsNamespace: kubeappsNamespace,
}

log.Info("Setting up event handlers")
Expand Down Expand Up @@ -262,35 +269,35 @@ func (c *Controller) syncHandler(key string) error {
if errors.IsNotFound(err) {
log.Infof("AppRepository '%s' no longer exists so performing cleanup of charts from the DB", key)
// Trigger a Job to perfrom the cleanup of the charts in the DB corresponding to deleted AppRepository
_, err = c.kubeclientset.BatchV1().Jobs(namespace).Create(newCleanupJob(name, namespace))
_, err = c.kubeclientset.BatchV1().Jobs(namespace).Create(newCleanupJob(name, namespace, c.kubeappsNamespace))
return nil
}
return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
}

// Get the cronjob with the same name as AppRepository
cronjobName := cronJobName(apprepo)
cronjob, err := c.cronjobsLister.CronJobs(apprepo.Namespace).Get(cronjobName)
cronjob, err := c.cronjobsLister.CronJobs(c.kubeappsNamespace).Get(cronjobName)
// If the resource doesn't exist, we'll create it
if errors.IsNotFound(err) {
log.Infof("Creating CronJob %q for AppRepository %q", cronjobName, apprepo.GetName())
cronjob, err = c.kubeclientset.BatchV1beta1().CronJobs(apprepo.Namespace).Create(newCronJob(apprepo))
cronjob, err = c.kubeclientset.BatchV1beta1().CronJobs(c.kubeappsNamespace).Create(newCronJob(apprepo, c.kubeappsNamespace))
if err != nil {
return err
}

// Trigger a manual Job for the initial sync
_, err = c.kubeclientset.BatchV1().Jobs(apprepo.Namespace).Create(newSyncJob(apprepo))
_, err = c.kubeclientset.BatchV1().Jobs(c.kubeappsNamespace).Create(newSyncJob(apprepo, c.kubeappsNamespace))
} else if err == nil {
// If the resource already exists, we'll update it
log.Infof("Updating CronJob %q for AppRepository %q", cronjobName, apprepo.GetName())
cronjob, err = c.kubeclientset.BatchV1beta1().CronJobs(apprepo.Namespace).Update(newCronJob(apprepo))
log.Infof("Updating CronJob %q in namespace %q for AppRepository %q in namespace %q", cronjobName, c.kubeappsNamespace, apprepo.GetName(), apprepo.GetNamespace())
cronjob, err = c.kubeclientset.BatchV1beta1().CronJobs(c.kubeappsNamespace).Update(newCronJob(apprepo, c.kubeappsNamespace))
if err != nil {
return err
}

// The AppRepository has changed, launch a manual Job
_, err = c.kubeclientset.BatchV1().Jobs(apprepo.Namespace).Create(newSyncJob(apprepo))
_, err = c.kubeclientset.BatchV1().Jobs(c.kubeappsNamespace).Create(newSyncJob(apprepo, c.kubeappsNamespace))
}

// If an error occurs during Get/Create, we'll requeue the item so we can
Expand All @@ -300,9 +307,10 @@ func (c *Controller) syncHandler(key string) error {
return err
}

// If the CronJob is not controlled by this AppRepository resource, we should
// log a warning to the event recorder and ret
if !metav1.IsControlledBy(cronjob, apprepo) {
// If the CronJob is not controlled by this AppRepository resource and it is not a
// cronjob for an app repo in another namespace, then we should
// log a warning to the event recorder and return it.
if !metav1.IsControlledBy(cronjob, apprepo) && !objectBelongsTo(cronjob, apprepo) {
msg := fmt.Sprintf(MessageResourceExists, cronjob.Name)
c.recorder.Event(apprepo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
Expand All @@ -315,10 +323,19 @@ func (c *Controller) syncHandler(key string) error {
return err
}

c.recorder.Event(apprepo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
if apprepo.GetNamespace() == c.kubeappsNamespace {
c.recorder.Event(apprepo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
}
return nil
}

// belongsTo is similar to IsControlledBy, but enables us to establish a relationship
// between cronjobs and app repositories in different namespaces.
func objectBelongsTo(object, parent metav1.Object) bool {
labels := object.GetLabels()
return labels[LabelRepoName] == parent.GetName() && labels[LabelRepoNamespace] == parent.GetNamespace()
}

// enqueueAppRepo takes a AppRepository resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than AppRepository.
Expand Down Expand Up @@ -367,26 +384,40 @@ func (c *Controller) handleObject(obj interface{}) {
return
}

if apprepo.ObjectMeta.DeletionTimestamp != nil {
log.Infof("ignoring object %q of AppRepository %q with deletion timestamp %q", object.GetSelfLink(), ownerRef.Name, apprepo.ObjectMeta.DeletionTimestamp)
return
}

c.enqueueAppRepo(apprepo)
return
}
}

// ownerReferencesForAppRepo returns populated owner references for app repos in the same namespace
// as the cronjob and nil otherwise.
func ownerReferencesForAppRepo(apprepo *apprepov1alpha1.AppRepository, childNamespace string) []metav1.OwnerReference {
if apprepo.GetNamespace() == childNamespace {
return []metav1.OwnerReference{
*metav1.NewControllerRef(apprepo, schema.GroupVersionKind{
Group: apprepov1alpha1.SchemeGroupVersion.Group,
Version: apprepov1alpha1.SchemeGroupVersion.Version,
Kind: "AppRepository",
}),
}
}
return nil
}

// newCronJob creates a new CronJob for a AppRepository resource. It also sets
// the appropriate OwnerReferences on the resource so handleObject can discover
// the AppRepository resource that 'owns' it.
func newCronJob(apprepo *apprepov1alpha1.AppRepository) *batchv1beta1.CronJob {
func newCronJob(apprepo *apprepov1alpha1.AppRepository, kubeappsNamespace string) *batchv1beta1.CronJob {
return &batchv1beta1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: cronJobName(apprepo),
Namespace: apprepo.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(apprepo, schema.GroupVersionKind{
Group: apprepov1alpha1.SchemeGroupVersion.Group,
Version: apprepov1alpha1.SchemeGroupVersion.Version,
Kind: "AppRepository",
}),
},
Name: cronJobName(apprepo),
OwnerReferences: ownerReferencesForAppRepo(apprepo, kubeappsNamespace),
Labels: jobLabels(apprepo),
},
Spec: batchv1beta1.CronJobSpec{
Schedule: crontab,
Expand All @@ -403,18 +434,11 @@ func newCronJob(apprepo *apprepov1alpha1.AppRepository) *batchv1beta1.CronJob {

// newSyncJob triggers a job for the AppRepository resource. It also sets the
// appropriate OwnerReferences on the resource
func newSyncJob(apprepo *apprepov1alpha1.AppRepository) *batchv1.Job {
func newSyncJob(apprepo *apprepov1alpha1.AppRepository, kubeappsNamespace string) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
GenerateName: cronJobName(apprepo) + "-",
Namespace: apprepo.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(apprepo, schema.GroupVersionKind{
Group: apprepov1alpha1.SchemeGroupVersion.Group,
Version: apprepov1alpha1.SchemeGroupVersion.Version,
Kind: "AppRepository",
}),
},
GenerateName: cronJobName(apprepo) + "-",
OwnerReferences: ownerReferencesForAppRepo(apprepo, kubeappsNamespace),
},
Spec: syncJobSpec(apprepo),
}
Expand Down Expand Up @@ -475,11 +499,11 @@ func syncJobSpec(apprepo *apprepov1alpha1.AppRepository) batchv1.JobSpec {

// newCleanupJob triggers a job for the AppRepository resource. It also sets the
// appropriate OwnerReferences on the resource
func newCleanupJob(reponame, namespace string) *batchv1.Job {
func newCleanupJob(reponame, namespace, kubeappsNamespace string) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
GenerateName: deleteJobName(reponame) + "-",
Namespace: namespace,
GenerateName: deleteJobName(reponame, namespace) + "-",
Namespace: kubeappsNamespace,
},
Spec: cleanupJobSpec(reponame),
}
Expand Down Expand Up @@ -519,18 +543,19 @@ func cleanupJobSpec(repoName string) batchv1.JobSpec {
// jobLabels returns the labels for the job and cronjob resources
func jobLabels(apprepo *apprepov1alpha1.AppRepository) map[string]string {
return map[string]string{
"apprepositories.kubeapps.com/repo-name": apprepo.Name,
LabelRepoName: apprepo.GetName(),
LabelRepoNamespace: apprepo.GetNamespace(),
}
}

// cronJobName returns a unique name for the CronJob managed by an AppRepository
func cronJobName(apprepo *apprepov1alpha1.AppRepository) string {
return fmt.Sprintf("apprepo-sync-%s", apprepo.GetName())
return fmt.Sprintf("apprepo-%s-sync-%s", apprepo.GetNamespace(), apprepo.GetName())
}

// deleteJobName returns a unique name for the Job to cleanup AppRepository
func deleteJobName(reponame string) string {
return fmt.Sprintf("apprepo-cleanup-%s", reponame)
func deleteJobName(reponame, reponamespace string) string {
return fmt.Sprintf("apprepo-%s-cleanup-%s", reponamespace, reponame)
}

// apprepoSyncJobArgs returns a list of args for the sync container
Expand Down
Loading

0 comments on commit e8ad3e4

Please sign in to comment.