diff --git a/.github/workflows/e2e-test.yaml b/.github/workflows/e2e-test.yaml index 68bed90f..2815f5b6 100644 --- a/.github/workflows/e2e-test.yaml +++ b/.github/workflows/e2e-test.yaml @@ -10,6 +10,9 @@ jobs: version: - v1.21 - v1.29 + setup: + - minimal + - production runs-on: ubuntu-latest name: test on minikube steps: @@ -19,4 +22,4 @@ jobs: with: kubernetes-version: ${{ matrix.version }} - name: Build and run wave - run: hack/run-test-in-minikube.sh + run: hack/run-test-in-minikube.sh ${{ matrix.setup }} diff --git a/charts/wave/templates/clusterrole.yaml b/charts/wave/templates/clusterrole.yaml index 2adcb98a..4858de6e 100644 --- a/charts/wave/templates/clusterrole.yaml +++ b/charts/wave/templates/clusterrole.yaml @@ -37,4 +37,10 @@ rules: - update - patch - watch + - verbs: + - '*' + apiGroups: + - coordination.k8s.io + resources: + - leases {{- end }} diff --git a/charts/wave/templates/deployment.yaml b/charts/wave/templates/deployment.yaml index b4296ca7..ab26187c 100644 --- a/charts/wave/templates/deployment.yaml +++ b/charts/wave/templates/deployment.yaml @@ -32,12 +32,36 @@ spec: {{- if .Values.syncPeriod }} - --sync-period={{ .Values.syncPeriod }} {{- end }} - volumeMounts: {{ toYaml .Values.extraVolumeMounts | nindent 12 }} + {{- if .Values.webhooks.enabled }} + - --enable-webhooks=true + {{- end }} + volumeMounts: + {{- if .Values.webhooks.enabled }} + - mountPath: /tmp/k8s-webhook-server/serving-certs + name: cert + readOnly: true + {{- end }} + {{- with .Values.extraVolumeMounts }} + {{- toYaml . | nindent 12 }} + {{- end }} + ports: + - containerPort: 9443 + name: webhook-server + protocol: TCP resources: {{- toYaml .Values.resources | nindent 12 }} securityContext: {{ toYaml .Values.securityContext | nindent 8 }} serviceAccountName: {{ .Values.serviceAccount.name | default (include "wave-fullname" .) 
}} nodeSelector: {{ toYaml .Values.nodeSelector | nindent 8 }} affinity: {{ toYaml .Values.affinity | nindent 8 }} tolerations: {{ toYaml .Values.tolerations | nindent 8 }} + volumes: + {{- if .Values.webhooks.enabled }} + - name: cert + secret: + defaultMode: 420 + secretName: {{ template "wave-fullname" . }}-webhook-server-cert + {{- end }} + {{- with .Values.extraVolumes }} + {{- toYaml . | nindent 8 }} + {{- end }} topologySpreadConstraints: {{ toYaml .Values.topologySpreadConstraints | nindent 8 }} - volumes: {{ toYaml .Values.extraVolumes | nindent 8 }} diff --git a/charts/wave/templates/poddisruptionbudget.yaml b/charts/wave/templates/poddisruptionbudget.yaml index 204bcc10..d11588a8 100644 --- a/charts/wave/templates/poddisruptionbudget.yaml +++ b/charts/wave/templates/poddisruptionbudget.yaml @@ -7,6 +7,6 @@ metadata: spec: selector: matchLabels: - {{ include "wave.selectorLabels" . | nindent 6}} + {{ include "wave-labels.chart" . | nindent 6 }} maxUnavailable: 1 {{- end }} \ No newline at end of file diff --git a/charts/wave/templates/webhook.yaml b/charts/wave/templates/webhook.yaml new file mode 100644 index 00000000..e81290ea --- /dev/null +++ b/charts/wave/templates/webhook.yaml @@ -0,0 +1,70 @@ +--- +{{- if .Values.webhooks.enabled }} +apiVersion: admissionregistration.k8s.io/v1 +kind: MutatingWebhookConfiguration +metadata: + name: '{{ template "wave-fullname" . }}-mutating-webhook-configuration' + annotations: + cert-manager.io/inject-ca-from: '{{ .Release.Namespace }}/{{ template "wave-fullname" . }}-serving-cert' +webhooks: + - admissionReviewVersions: + - v1 + clientConfig: + service: + name: '{{ template "wave-fullname" . 
}}-webhook-service' + namespace: '{{ .Release.Namespace }}' + path: /mutate-apps-v1-deployment + failurePolicy: Ignore + name: deployments.wave.pusher.com + rules: + - apiGroups: + - apps + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - deployments + sideEffects: NoneOnDryRun + - admissionReviewVersions: + - v1 + clientConfig: + service: + name: '{{ template "wave-fullname" . }}-webhook-service' + namespace: '{{ .Release.Namespace }}' + path: /mutate-apps-v1-statefulset + failurePolicy: Ignore + name: statefulsets.wave.pusher.com + rules: + - apiGroups: + - apps + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - statefulsets + sideEffects: NoneOnDryRun + - admissionReviewVersions: + - v1 + clientConfig: + service: + name: '{{ template "wave-fullname" . }}-webhook-service' + namespace: '{{ .Release.Namespace }}' + path: /mutate-apps-v1-daemonset + failurePolicy: Ignore + name: daemonsets.wave.pusher.com + rules: + - apiGroups: + - apps + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - daemonsets + sideEffects: NoneOnDryRun +{{- end }} \ No newline at end of file diff --git a/charts/wave/templates/webhook_certificate.yaml b/charts/wave/templates/webhook_certificate.yaml new file mode 100644 index 00000000..5d63f8ba --- /dev/null +++ b/charts/wave/templates/webhook_certificate.yaml @@ -0,0 +1,25 @@ +{{- if .Values.webhooks.enabled }} +# The following manifests contain a self-signed issuer CR and a certificate CR. +# More document can be found at https://docs.cert-manager.io +apiVersion: cert-manager.io/v1 +kind: Issuer +metadata: + name: {{ template "wave-fullname" . }}-selfsigned-issuer + namespace: {{ .Release.Namespace }} +spec: + selfSigned: {} +--- +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: {{ template "wave-fullname" . }}-serving-cert + namespace: {{ .Release.Namespace }} +spec: + dnsNames: + - {{ template "wave-fullname" . 
}}-webhook-service.{{ .Release.Namespace }}.svc + - {{ template "wave-fullname" . }}-webhook-service.{{ .Release.Namespace }}.svc.cluster.local + issuerRef: + kind: Issuer + name: {{ template "wave-fullname" . }}-selfsigned-issuer + secretName: {{ template "wave-fullname" . }}-webhook-server-cert +{{- end }} \ No newline at end of file diff --git a/charts/wave/templates/webhook_service.yaml b/charts/wave/templates/webhook_service.yaml new file mode 100644 index 00000000..a3b2532a --- /dev/null +++ b/charts/wave/templates/webhook_service.yaml @@ -0,0 +1,15 @@ +{{- if .Values.webhooks.enabled }} +apiVersion: v1 +kind: Service +metadata: + name: {{ template "wave-fullname" . }}-webhook-service + namespace: {{ .Release.Namespace }} + labels: + {{ include "wave-labels.chart" . | nindent 4 }} +spec: + ports: + - port: 443 + targetPort: 9443 + selector: + {{ include "wave-labels.chart" . | nindent 4 }} +{{- end }} diff --git a/charts/wave/values.yaml b/charts/wave/values.yaml index add081da..6ec2e4f5 100644 --- a/charts/wave/values.yaml +++ b/charts/wave/values.yaml @@ -47,6 +47,9 @@ serviceAccount: # If not set and create is true, a name is generated using the fullname template name: +webhooks: + enabled: false + # Period for reconciliation # syncPeriod: 5m diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 3065b755..8667f369 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -27,7 +27,11 @@ import ( "github.com/wave-k8s/wave/pkg/apis" "github.com/wave-k8s/wave/pkg/controller" - "github.com/wave-k8s/wave/pkg/webhook" + "github.com/wave-k8s/wave/pkg/controller/daemonset" + "github.com/wave-k8s/wave/pkg/controller/deployment" + "github.com/wave-k8s/wave/pkg/controller/statefulset" + k8swebhook "sigs.k8s.io/controller-runtime/pkg/webhook" + _ "k8s.io/client-go/plugin/pkg/client/auth" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -42,6 +46,7 @@ var ( leaderElectionNamespace = 
flag.String("leader-election-namespace", "", "Namespace for the configmap used by the leader election system") syncPeriod = flag.Duration("sync-period", 5*time.Minute, "Reconcile sync period") showVersion = flag.Bool("version", false, "Show version and exit") + enableWebhooks = flag.Bool("enable-webhooks", false, "Enable webhooks") setupLog = ctrl.Log.WithName("setup") ) @@ -69,7 +74,14 @@ func main() { // Create a new Cmd to provide shared dependencies and start components setupLog.Info("setting up manager") + var webhookServer k8swebhook.Server + if *enableWebhooks { + webhookServer = k8swebhook.NewServer(k8swebhook.Options{ + Port: 9443, + }) + } mgr, err := manager.New(cfg, manager.Options{ + WebhookServer: webhookServer, LeaderElection: *leaderElection, LeaderElectionID: *leaderElectionID, LeaderElectionNamespace: *leaderElectionNamespace, @@ -97,11 +109,21 @@ func main() { setupLog.Error(err, "unable to register controllers to the manager") os.Exit(1) } - - setupLog.Info("setting up webhooks") - if err := webhook.AddToManager(mgr); err != nil { - setupLog.Error(err, "unable to register webhooks to the manager") - os.Exit(1) + if *enableWebhooks { + if err := deployment.AddDeploymentWebhook(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "Deployment") + os.Exit(1) + } + + if err := statefulset.AddStatefulSetWebhook(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "StatefulSet") + os.Exit(1) + } + + if err := daemonset.AddDaemonSetWebhook(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "DaemonSet") + os.Exit(1) + } } // Start the Cmd diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index e69de29b..10be8448 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -0,0 +1,66 @@ +--- +apiVersion: admissionregistration.k8s.io/v1 +kind: MutatingWebhookConfiguration +metadata: + name: mutating-webhook-configuration 
+webhooks: +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate-apps-v1-daemonset + failurePolicy: Ignore + name: daemonsets.wave.pusher.com + rules: + - apiGroups: + - apps + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - daemonsets + sideEffects: NoneOnDryRun +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate-apps-v1-deployment + failurePolicy: Ignore + name: deployments.wave.pusher.com + rules: + - apiGroups: + - apps + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - deployments + sideEffects: NoneOnDryRun +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate-apps-v1-statefulset + failurePolicy: Ignore + name: statefulsets.wave.pusher.com + rules: + - apiGroups: + - apps + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - statefulsets + sideEffects: NoneOnDryRun diff --git a/hack/production.yaml b/hack/production.yaml new file mode 100644 index 00000000..fe40d0a6 --- /dev/null +++ b/hack/production.yaml @@ -0,0 +1,16 @@ +# production setup used in tests +replicas: 2 + +webhooks: + enabled: true + +pdb: + enabled: true + +topologySpreadConstraints: + - maxSkew: 1 + topologyKey: topology.kubernetes.io/zone + whenUnsatisfiable: ScheduleAnyway + labelSelector: + matchLabels: + app: wave diff --git a/hack/run-test-in-minikube.sh b/hack/run-test-in-minikube.sh index 940ef97f..4f8aa6f1 100755 --- a/hack/run-test-in-minikube.sh +++ b/hack/run-test-in-minikube.sh @@ -33,7 +33,16 @@ eval $(minikube -p minikube docker-env) docker build -f ./Dockerfile -t wave-local:local . echo Installing wave... 
-helm install wave charts/wave --set image.name=wave-local --set image.tag=local +if [ "$1" = "production" ]; then + # Install cert-manager + kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.14.5/cert-manager.yaml + while [ "$(kubectl get pods -n cert-manager | grep 'webhook' | grep -c '1/1')" -ne 1 ]; do echo Waiting for \"cert-manager-webhook\" to start; sleep 10; done + # Production setup + helm install wave charts/wave --set image.name=wave-local --set image.tag=local -f hack/production.yaml +else + # Default install without values + helm install wave charts/wave --set image.name=wave-local --set image.tag=local +fi while [ "$(kubectl get pods -A | grep -cEv 'Running|Completed')" -gt 1 ]; do echo Waiting for \"cluster\" to start; sleep 10; done diff --git a/pkg/controller/daemonset/daemonset_controller_suite_test.go b/pkg/controller/daemonset/daemonset_controller_suite_test.go index 4ad3d13e..a05cca5e 100644 --- a/pkg/controller/daemonset/daemonset_controller_suite_test.go +++ b/pkg/controller/daemonset/daemonset_controller_suite_test.go @@ -31,6 +31,9 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + admissionv1 "k8s.io/api/admissionregistration/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var cfg *rest.Config @@ -45,8 +48,51 @@ var t *envtest.Environment var testCtx, testCancel = context.WithCancel(context.Background()) var _ = BeforeSuite(func() { + failurePolicy := admissionv1.Ignore + sideEffects := admissionv1.SideEffectClassNone + webhookPath := "/mutate-apps-v1-daemonset" + webhookInstallOptions := envtest.WebhookInstallOptions{ + MutatingWebhooks: []*admissionv1.MutatingWebhookConfiguration{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "daemonset-operator", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "MutatingWebhookConfiguration", + APIVersion: "admissionregistration.k8s.io/v1", + }, + Webhooks: 
[]admissionv1.MutatingWebhook{ + { + Name: "daemonsets.wave.pusher.com", + AdmissionReviewVersions: []string{"v1"}, + FailurePolicy: &failurePolicy, + ClientConfig: admissionv1.WebhookClientConfig{ + Service: &admissionv1.ServiceReference{ + Path: &webhookPath, + }, + }, + Rules: []admissionv1.RuleWithOperations{ + { + Operations: []admissionv1.OperationType{ + admissionv1.Create, + admissionv1.Update, + }, + Rule: admissionv1.Rule{ + APIGroups: []string{"apps"}, + APIVersions: []string{"v1"}, + Resources: []string{"daemonsets"}, + }, + }, + }, + SideEffects: &sideEffects, + }, + }, + }, + }, + } t = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crds")}, + WebhookInstallOptions: webhookInstallOptions, + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crds")}, } logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) diff --git a/pkg/controller/daemonset/daemonset_controller_test.go b/pkg/controller/daemonset/daemonset_controller_test.go index efb9d3a8..b1fff383 100644 --- a/pkg/controller/daemonset/daemonset_controller_test.go +++ b/pkg/controller/daemonset/daemonset_controller_test.go @@ -34,6 +34,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/metrics" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/reconcile" + webhook "sigs.k8s.io/controller-runtime/pkg/webhook" ) var _ = Describe("DaemonSet controller Suite", func() { @@ -94,6 +95,11 @@ var _ = Describe("DaemonSet controller Suite", func() { Metrics: metricsserver.Options{ BindAddress: "0", }, + WebhookServer: webhook.NewServer(webhook.Options{ + Host: t.WebhookInstallOptions.LocalServingHost, + Port: t.WebhookInstallOptions.LocalServingPort, + CertDir: t.WebhookInstallOptions.LocalServingCertDir, + }), }) Expect(err).NotTo(HaveOccurred()) var cerr error @@ -106,6 +112,10 @@ var _ = Describe("DaemonSet controller Suite", func() { recFn, requestsStart, requests = 
SetupTestReconcile(r) Expect(add(mgr, recFn, r.handler)).NotTo(HaveOccurred()) + // register mutating pod webhook + err = AddDaemonSetWebhook(mgr) + Expect(err).ToNot(HaveOccurred()) + testCtx, testCancel = context.WithCancel(context.Background()) go Run(testCtx, mgr) @@ -131,11 +141,6 @@ var _ = Describe("DaemonSet controller Suite", func() { m.Get(s3, timeout).Should(Succeed()) daemonset = utils.ExampleDaemonSet.DeepCopy() - - // Create a daemonset and wait for it to be reconciled - clearReconciled() - m.Create(daemonset).Should(Succeed()) - waitForDaemonSetReconciled(daemonset) }) AfterEach(func() { @@ -149,7 +154,14 @@ var _ = Describe("DaemonSet controller Suite", func() { ) }) - Context("When a DaemonSet is reconciled", func() { + Context("When a DaemonSet with all children existing is reconciled", func() { + BeforeEach(func() { + // Create a daemonset and wait for it to be reconciled + clearReconciled() + m.Create(daemonset).Should(Succeed()) + waitForDaemonSetReconciled(daemonset) + }) + Context("And it has the required annotation", func() { BeforeEach(func() { addAnnotation := func(obj client.Object) client.Object { @@ -163,14 +175,18 @@ var _ = Describe("DaemonSet controller Suite", func() { } clearReconciled() m.Update(daemonset, addAnnotation).Should(Succeed()) - // Two runs since we the controller retriggers itself by changing the object - waitForDaemonSetReconciled(daemonset) waitForDaemonSetReconciled(daemonset) // Get the updated DaemonSet m.Get(daemonset, timeout).Should(Succeed()) }) + It("Has scheduling enabled", func() { + m.Get(daemonset, timeout).Should(Succeed()) + Expect(daemonset.Spec.Template.Spec.SchedulerName).To(Equal("default-scheduler")) + Expect(daemonset.ObjectMeta.Annotations).NotTo(HaveKey(core.SchedulingDisabledAnnotation)) + }) + It("Adds a config hash to the Pod Template", func() { Eventually(daemonset, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) }) @@ -205,7 +221,6 @@ var _ = 
Describe("DaemonSet controller Suite", func() { clearReconciled() m.Update(daemonset, removeContainer2).Should(Succeed()) waitForDaemonSetReconciled(daemonset) - waitForDaemonSetReconciled(daemonset) // Get the updated DaemonSet m.Get(daemonset, timeout).Should(Succeed()) @@ -419,4 +434,42 @@ var _ = Describe("DaemonSet controller Suite", func() { }) }) + Context("When a DaemonSet with missing children is reconciled", func() { + BeforeEach(func() { + m.Delete(cm1).Should(Succeed()) + + annotations := daemonset.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + annotations[core.RequiredAnnotation] = "true" + daemonset.SetAnnotations(annotations) + + // Create a daemonset and wait for it to be reconciled + clearReconciled() + m.Create(daemonset).Should(Succeed()) + waitForDaemonSetReconciled(daemonset) + }) + + It("Has scheduling disabled", func() { + m.Get(daemonset, timeout).Should(Succeed()) + Expect(daemonset.Spec.Template.Spec.SchedulerName).To(Equal(core.SchedulingDisabledSchedulerName)) + Expect(daemonset.ObjectMeta.Annotations[core.SchedulingDisabledAnnotation]).To(Equal("default-scheduler")) + }) + + Context("And the missing child is created", func() { + BeforeEach(func() { + clearReconciled() + cm1 = utils.ExampleConfigMap1.DeepCopy() + m.Create(cm1).Should(Succeed()) + waitForDaemonSetReconciled(daemonset) + }) + + It("Has scheduling renabled", func() { + m.Get(daemonset, timeout).Should(Succeed()) + Expect(daemonset.Spec.Template.Spec.SchedulerName).To(Equal("default-scheduler")) + Expect(daemonset.ObjectMeta.Annotations).NotTo(HaveKey(core.SchedulingDisabledAnnotation)) + }) + }) + }) }) diff --git a/pkg/controller/daemonset/daemonset_webhook.go b/pkg/controller/daemonset/daemonset_webhook.go new file mode 100644 index 00000000..eff15322 --- /dev/null +++ b/pkg/controller/daemonset/daemonset_webhook.go @@ -0,0 +1,39 @@ +package daemonset + +import ( + "context" + + "github.com/wave-k8s/wave/pkg/core" + appsv1 
"k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +// +kubebuilder:webhook:path=/mutate-apps-v1-daemonset,mutating=true,failurePolicy=ignore,groups=apps,resources=daemonsets,verbs=create;update,versions=v1,name=daemonsets.wave.pusher.com,admissionReviewVersions=v1,sideEffects=NoneOnDryRun + +type DaemonSetWebhook struct { + client.Client + Handler *core.Handler +} + +func (a *DaemonSetWebhook) Default(ctx context.Context, obj runtime.Object) error { + request, err := admission.RequestFromContext(ctx) + if err != nil { + return err + } + err = a.Handler.HandleDaemonSetWebhook(obj.(*appsv1.DaemonSet), request.DryRun, request.Operation == "CREATE") + return err +} + +func AddDaemonSetWebhook(mgr manager.Manager) error { + err := builder.WebhookManagedBy(mgr).For(&appsv1.DaemonSet{}).WithDefaulter( + &DaemonSetWebhook{ + Client: mgr.GetClient(), + Handler: core.NewHandler(mgr.GetClient(), mgr.GetEventRecorderFor("wave")), + }).Complete() + + return err +} diff --git a/pkg/controller/deployment/deployment_controller_suite_test.go b/pkg/controller/deployment/deployment_controller_suite_test.go index 0da71857..49c3213e 100644 --- a/pkg/controller/deployment/deployment_controller_suite_test.go +++ b/pkg/controller/deployment/deployment_controller_suite_test.go @@ -33,6 +33,9 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + admissionv1 "k8s.io/api/admissionregistration/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var cfg *rest.Config @@ -47,8 +50,51 @@ var t *envtest.Environment var testCtx, testCancel = context.WithCancel(context.Background()) var _ = BeforeSuite(func() { + failurePolicy := admissionv1.Ignore + sideEffects := 
admissionv1.SideEffectClassNone + webhookPath := "/mutate-apps-v1-deployment" + webhookInstallOptions := envtest.WebhookInstallOptions{ + MutatingWebhooks: []*admissionv1.MutatingWebhookConfiguration{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "deployment-operator", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "MutatingWebhookConfiguration", + APIVersion: "admissionregistration.k8s.io/v1", + }, + Webhooks: []admissionv1.MutatingWebhook{ + { + Name: "deployments.wave.pusher.com", + AdmissionReviewVersions: []string{"v1"}, + FailurePolicy: &failurePolicy, + ClientConfig: admissionv1.WebhookClientConfig{ + Service: &admissionv1.ServiceReference{ + Path: &webhookPath, + }, + }, + Rules: []admissionv1.RuleWithOperations{ + { + Operations: []admissionv1.OperationType{ + admissionv1.Create, + admissionv1.Update, + }, + Rule: admissionv1.Rule{ + APIGroups: []string{"apps"}, + APIVersions: []string{"v1"}, + Resources: []string{"deployments"}, + }, + }, + }, + SideEffects: &sideEffects, + }, + }, + }, + }, + } t = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crds")}, + WebhookInstallOptions: webhookInstallOptions, + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crds")}, } apis.AddToScheme(scheme.Scheme) diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index ba887726..6e666e57 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/reconcile" + webhook "sigs.k8s.io/controller-runtime/pkg/webhook" ) var _ = Describe("Deployment controller Suite", func() { @@ -101,6 +102,11 @@ var _ = Describe("Deployment controller Suite", func() { Metrics: metricsserver.Options{ BindAddress: "0", }, + WebhookServer: 
webhook.NewServer(webhook.Options{ + Host: t.WebhookInstallOptions.LocalServingHost, + Port: t.WebhookInstallOptions.LocalServingPort, + CertDir: t.WebhookInstallOptions.LocalServingCertDir, + }), }) Expect(err).NotTo(HaveOccurred()) var cerr error @@ -113,6 +119,10 @@ var _ = Describe("Deployment controller Suite", func() { recFn, requestsStart, requests = SetupTestReconcile(r) Expect(add(mgr, recFn, r.handler)).NotTo(HaveOccurred()) + // register mutating pod webhook + err = AddDeploymentWebhook(mgr) + Expect(err).ToNot(HaveOccurred()) + testCtx, testCancel = context.WithCancel(context.Background()) go Run(testCtx, mgr) @@ -156,11 +166,6 @@ var _ = Describe("Deployment controller Suite", func() { m.Get(s6, timeout).Should(Succeed()) deployment = utils.ExampleDeployment.DeepCopy() - - // Create a deployment and wait for it to be reconciled - clearReconciled() - m.Create(deployment).Should(Succeed()) - waitForDeploymentReconciled(deployment) }) AfterEach(func() { @@ -174,7 +179,14 @@ var _ = Describe("Deployment controller Suite", func() { ) }) - Context("When a Deployment is reconciled", func() { + Context("When a Deployment with all children existing is reconciled", func() { + BeforeEach(func() { + // Create a deployment and wait for it to be reconciled + clearReconciled() + m.Create(deployment).Should(Succeed()) + waitForDeploymentReconciled(deployment) + }) + Context("And it has the required annotation", func() { BeforeEach(func() { addAnnotation := func(obj client.Object) client.Object { @@ -188,14 +200,18 @@ var _ = Describe("Deployment controller Suite", func() { } clearReconciled() m.Update(deployment, addAnnotation).Should(Succeed()) - // Two runs since we the controller retriggers itself by changing the object - waitForDeploymentReconciled(deployment) waitForDeploymentReconciled(deployment) // Get the updated Deployment m.Get(deployment, timeout).Should(Succeed()) }) + It("Has scheduling enabled", func() { + m.Get(deployment, timeout).Should(Succeed()) + 
Expect(deployment.Spec.Template.Spec.SchedulerName).To(Equal("default-scheduler")) + Expect(deployment.ObjectMeta.Annotations).NotTo(HaveKey(core.SchedulingDisabledAnnotation)) + }) + It("Adds a config hash to the Pod Template", func() { Eventually(deployment, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) }) @@ -232,7 +248,6 @@ var _ = Describe("Deployment controller Suite", func() { clearReconciled() m.Update(deployment, removeContainer2).Should(Succeed()) waitForDeploymentReconciled(deployment) - waitForDeploymentReconciled(deployment) // Get the updated Deployment m.Get(deployment, timeout).Should(Succeed()) @@ -445,4 +460,43 @@ var _ = Describe("Deployment controller Suite", func() { }) }) + Context("When a Deployment with missing children is reconciled", func() { + BeforeEach(func() { + m.Delete(cm1).Should(Succeed()) + + annotations := deployment.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + annotations[core.RequiredAnnotation] = "true" + deployment.SetAnnotations(annotations) + + // Create a deployment and wait for it to be reconciled + clearReconciled() + m.Create(deployment).Should(Succeed()) + waitForDeploymentReconciled(deployment) + }) + + It("Has scheduling disabled", func() { + m.Get(deployment, timeout).Should(Succeed()) + Expect(deployment.Spec.Template.Spec.SchedulerName).To(Equal(core.SchedulingDisabledSchedulerName)) + Expect(deployment.ObjectMeta.Annotations[core.SchedulingDisabledAnnotation]).To(Equal("default-scheduler")) + }) + + Context("And the missing child is created", func() { + BeforeEach(func() { + clearReconciled() + cm1 = utils.ExampleConfigMap1.DeepCopy() + m.Create(cm1).Should(Succeed()) + waitForDeploymentReconciled(deployment) + }) + + It("Has scheduling renabled", func() { + m.Get(deployment, timeout).Should(Succeed()) + Expect(deployment.Spec.Template.Spec.SchedulerName).To(Equal("default-scheduler")) + 
Expect(deployment.ObjectMeta.Annotations).NotTo(HaveKey(core.SchedulingDisabledAnnotation)) + }) + }) + }) + }) diff --git a/pkg/controller/deployment/deployment_webhook.go b/pkg/controller/deployment/deployment_webhook.go new file mode 100644 index 00000000..7ba9e865 --- /dev/null +++ b/pkg/controller/deployment/deployment_webhook.go @@ -0,0 +1,39 @@ +package deployment + +import ( + "context" + + "github.com/wave-k8s/wave/pkg/core" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +// +kubebuilder:webhook:path=/mutate-apps-v1-deployment,mutating=true,failurePolicy=ignore,groups=apps,resources=deployments,verbs=create;update,versions=v1,name=deployments.wave.pusher.com,admissionReviewVersions=v1,sideEffects=NoneOnDryRun + +type DeploymentWebhook struct { + client.Client + Handler *core.Handler +} + +func (a *DeploymentWebhook) Default(ctx context.Context, obj runtime.Object) error { + request, err := admission.RequestFromContext(ctx) + if err != nil { + return err + } + err = a.Handler.HandleDeploymentWebhook(obj.(*appsv1.Deployment), request.DryRun, request.Operation == "CREATE") + return err +} + +func AddDeploymentWebhook(mgr manager.Manager) error { + err := builder.WebhookManagedBy(mgr).For(&appsv1.Deployment{}).WithDefaulter( + &DeploymentWebhook{ + Client: mgr.GetClient(), + Handler: core.NewHandler(mgr.GetClient(), mgr.GetEventRecorderFor("wave")), + }).Complete() + + return err +} diff --git a/pkg/controller/statefulset/statefulset_controller_suite_test.go b/pkg/controller/statefulset/statefulset_controller_suite_test.go index 93ddfd9f..67fc8705 100644 --- a/pkg/controller/statefulset/statefulset_controller_suite_test.go +++ b/pkg/controller/statefulset/statefulset_controller_suite_test.go @@ -33,6 +33,9 @@ import ( logf 
"sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + admissionv1 "k8s.io/api/admissionregistration/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var cfg *rest.Config @@ -47,8 +50,51 @@ var t *envtest.Environment var testCtx, testCancel = context.WithCancel(context.Background()) var _ = BeforeSuite(func() { + failurePolicy := admissionv1.Ignore + sideEffects := admissionv1.SideEffectClassNone + webhookPath := "/mutate-apps-v1-statefulset" + webhookInstallOptions := envtest.WebhookInstallOptions{ + MutatingWebhooks: []*admissionv1.MutatingWebhookConfiguration{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "statefulset-operator", + }, + TypeMeta: metav1.TypeMeta{ + Kind: "MutatingWebhookConfiguration", + APIVersion: "admissionregistration.k8s.io/v1", + }, + Webhooks: []admissionv1.MutatingWebhook{ + { + Name: "statefulsets.wave.pusher.com", + AdmissionReviewVersions: []string{"v1"}, + FailurePolicy: &failurePolicy, + ClientConfig: admissionv1.WebhookClientConfig{ + Service: &admissionv1.ServiceReference{ + Path: &webhookPath, + }, + }, + Rules: []admissionv1.RuleWithOperations{ + { + Operations: []admissionv1.OperationType{ + admissionv1.Create, + admissionv1.Update, + }, + Rule: admissionv1.Rule{ + APIGroups: []string{"apps"}, + APIVersions: []string{"v1"}, + Resources: []string{"statefulsets"}, + }, + }, + }, + SideEffects: &sideEffects, + }, + }, + }, + }, + } t = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crds")}, + WebhookInstallOptions: webhookInstallOptions, + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crds")}, } apis.AddToScheme(scheme.Scheme) diff --git a/pkg/controller/statefulset/statefulset_controller_test.go b/pkg/controller/statefulset/statefulset_controller_test.go index 578aa0d2..515794f6 100644 --- a/pkg/controller/statefulset/statefulset_controller_test.go +++ 
b/pkg/controller/statefulset/statefulset_controller_test.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/reconcile" + webhook "sigs.k8s.io/controller-runtime/pkg/webhook" ) var _ = Describe("StatefulSet controller Suite", func() { @@ -95,6 +96,11 @@ var _ = Describe("StatefulSet controller Suite", func() { Metrics: metricsserver.Options{ BindAddress: "0", }, + WebhookServer: webhook.NewServer(webhook.Options{ + Host: t.WebhookInstallOptions.LocalServingHost, + Port: t.WebhookInstallOptions.LocalServingPort, + CertDir: t.WebhookInstallOptions.LocalServingCertDir, + }), }) Expect(err).NotTo(HaveOccurred()) @@ -109,6 +115,10 @@ var _ = Describe("StatefulSet controller Suite", func() { recFn, requestsStart, requests = SetupTestReconcile(r) Expect(add(mgr, recFn, r.handler)).NotTo(HaveOccurred()) + // register mutating pod webhook + err = AddStatefulSetWebhook(mgr) + Expect(err).ToNot(HaveOccurred()) + testCtx, testCancel = context.WithCancel(context.Background()) go Run(testCtx, mgr) @@ -134,11 +144,6 @@ var _ = Describe("StatefulSet controller Suite", func() { m.Get(s3, timeout).Should(Succeed()) statefulset = utils.ExampleStatefulSet.DeepCopy() - - // Create a statefulset and wait for it to be reconciled - clearReconciled() - m.Create(statefulset).Should(Succeed()) - waitForStatefulSetReconciled(statefulset) }) AfterEach(func() { @@ -152,7 +157,14 @@ var _ = Describe("StatefulSet controller Suite", func() { ) }) - Context("When a StatefulSet is reconciled", func() { + Context("When a StatefulSet with all children existing is reconciled", func() { + BeforeEach(func() { + // Create a statefulset and wait for it to be reconciled + clearReconciled() + m.Create(statefulset).Should(Succeed()) + waitForStatefulSetReconciled(statefulset) + }) + Context("And it has the required annotation", func() { BeforeEach(func() { addAnnotation := func(obj client.Object) 
client.Object { @@ -166,14 +178,18 @@ var _ = Describe("StatefulSet controller Suite", func() { } clearReconciled() m.Update(statefulset, addAnnotation).Should(Succeed()) - // Two runs since we the controller retriggers itself by changing the object - waitForStatefulSetReconciled(statefulset) waitForStatefulSetReconciled(statefulset) // Get the updated StatefulSet m.Get(statefulset, timeout).Should(Succeed()) }) + It("Has scheduling enabled", func() { + m.Get(statefulset, timeout).Should(Succeed()) + Expect(statefulset.Spec.Template.Spec.SchedulerName).To(Equal("default-scheduler")) + Expect(statefulset.ObjectMeta.Annotations).NotTo(HaveKey(core.SchedulingDisabledAnnotation)) + }) + It("Adds a config hash to the Pod Template", func() { Eventually(statefulset, timeout).Should(utils.WithPodTemplateAnnotations(HaveKey(core.ConfigHashAnnotation))) }) @@ -210,7 +226,6 @@ var _ = Describe("StatefulSet controller Suite", func() { clearReconciled() m.Update(statefulset, removeContainer2).Should(Succeed()) waitForStatefulSetReconciled(statefulset) - waitForStatefulSetReconciled(statefulset) // Get the updated StatefulSet m.Get(statefulset, timeout).Should(Succeed()) @@ -423,4 +438,42 @@ var _ = Describe("StatefulSet controller Suite", func() { }) }) + Context("When a StatefulSet with missing children is reconciled", func() { + BeforeEach(func() { + m.Delete(cm1).Should(Succeed()) + + annotations := statefulset.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + annotations[core.RequiredAnnotation] = "true" + statefulset.SetAnnotations(annotations) + + // Create a statefulset and wait for it to be reconciled + clearReconciled() + m.Create(statefulset).Should(Succeed()) + waitForStatefulSetReconciled(statefulset) + }) + + It("Has scheduling disabled", func() { + m.Get(statefulset, timeout).Should(Succeed()) + Expect(statefulset.Spec.Template.Spec.SchedulerName).To(Equal(core.SchedulingDisabledSchedulerName)) + 
Expect(statefulset.ObjectMeta.Annotations[core.SchedulingDisabledAnnotation]).To(Equal("default-scheduler")) + }) + + Context("And the missing child is created", func() { + BeforeEach(func() { + clearReconciled() + cm1 = utils.ExampleConfigMap1.DeepCopy() + m.Create(cm1).Should(Succeed()) + waitForStatefulSetReconciled(statefulset) + }) + + It("Has scheduling re-enabled", func() { + m.Get(statefulset, timeout).Should(Succeed()) + Expect(statefulset.Spec.Template.Spec.SchedulerName).To(Equal("default-scheduler")) + Expect(statefulset.ObjectMeta.Annotations).NotTo(HaveKey(core.SchedulingDisabledAnnotation)) + }) + }) + }) }) diff --git a/pkg/controller/statefulset/statefulset_webhook.go b/pkg/controller/statefulset/statefulset_webhook.go new file mode 100644 index 00000000..ddb6c9ae --- /dev/null +++ b/pkg/controller/statefulset/statefulset_webhook.go @@ -0,0 +1,39 @@ +package statefulset + +import ( + "context" + + "github.com/wave-k8s/wave/pkg/core" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +// +kubebuilder:webhook:path=/mutate-apps-v1-statefulset,mutating=true,failurePolicy=ignore,groups=apps,resources=statefulsets,verbs=create;update,versions=v1,name=statefulsets.wave.pusher.com,admissionReviewVersions=v1,sideEffects=NoneOnDryRun + +type StatefulSetWebhook struct { + client.Client + Handler *core.Handler +} + +func (a *StatefulSetWebhook) Default(ctx context.Context, obj runtime.Object) error { + request, err := admission.RequestFromContext(ctx) + if err != nil { + return err + } + err = a.Handler.HandleStatefulSetWebhook(obj.(*appsv1.StatefulSet), request.DryRun, request.Operation == "CREATE") + return err +} + +func AddStatefulSetWebhook(mgr manager.Manager) error { + err := 
builder.WebhookManagedBy(mgr).For(&appsv1.StatefulSet{}).WithDefaulter( + &StatefulSetWebhook{ + Client: mgr.GetClient(), + Handler: core.NewHandler(mgr.GetClient(), mgr.GetEventRecorderFor("wave")), + }).Complete() + + return err +} diff --git a/pkg/core/children.go b/pkg/core/children.go index 9b109979..7d9b3513 100644 --- a/pkg/core/children.go +++ b/pkg/core/children.go @@ -22,6 +22,7 @@ import ( "strings" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -38,10 +39,21 @@ type configMetadata struct { keys map[string]struct{} } +type configMetadataMap map[types.NamespacedName]configMetadata + +type NotFoundError struct { + string +} + +func (e *NotFoundError) Error() string { + return e.string +} + // getResult is returned from the getObject method as a helper struct to be // passed into a channel type getResult struct { err error + notFound bool obj Object metadata configMetadata } @@ -50,29 +62,32 @@ type getResult struct { // referenced in the Deployment's spec. Any reference to a whole ConfigMap or Secret // (i.e. via an EnvFrom or a Volume) will result in one entry in the list, irrespective of // whether individual elements are also references (i.e. via an Env entry). 
-func (h *Handler) getCurrentChildren(obj podController) ([]configObject, error) { - configMaps, secrets := getChildNamesByType(obj) - +func (h *Handler) getCurrentChildren(configMaps configMetadataMap, secrets configMetadataMap) ([]configObject, error) { // get all of ConfigMaps and Secrets resultsChan := make(chan getResult) for name, metadata := range configMaps { - go func(name string, metadata configMetadata) { - resultsChan <- h.getConfigMap(obj.GetNamespace(), name, metadata) + go func(name types.NamespacedName, metadata configMetadata) { + resultsChan <- h.getConfigMap(name, metadata) }(name, metadata) } for name, metadata := range secrets { - go func(name string, metadata configMetadata) { - resultsChan <- h.getSecret(obj.GetNamespace(), name, metadata) + go func(name types.NamespacedName, metadata configMetadata) { + resultsChan <- h.getSecret(name, metadata) }(name, metadata) } // Range over and collect results from the gets var errs []string + var notFoundErrs []string var children []configObject for i := 0; i < len(configMaps)+len(secrets); i++ { result := <-resultsChan if result.err != nil { - errs = append(errs, result.err.Error()) + if result.notFound { + notFoundErrs = append(notFoundErrs, result.err.Error()) + } else { + errs = append(errs, result.err.Error()) + } } if result.obj != nil { children = append(children, configObject{ @@ -89,6 +104,11 @@ func (h *Handler) getCurrentChildren(obj podController) ([]configObject, error) return []configObject{}, fmt.Errorf("error(s) encountered when geting children: %s", strings.Join(errs, ", ")) } + // If there we did not find required elements + if len(notFoundErrs) > 0 { + return []configObject{}, &NotFoundError{fmt.Sprintf("required children not found: %s", strings.Join(notFoundErrs, ", "))} + } + // No errors, return the list of children return children, nil } @@ -96,43 +116,43 @@ func (h *Handler) getCurrentChildren(obj podController) ([]configObject, error) // getChildNamesByType parses the 
Deployment object and returns two maps, // the first containing ConfigMap metadata for all referenced ConfigMaps, keyed on the name of the ConfigMap, // the second containing Secret metadata for all referenced Secrets, keyed on the name of the Secrets -func getChildNamesByType(obj podController) (map[string]configMetadata, map[string]configMetadata) { +func getChildNamesByType(obj podController) (configMetadataMap, configMetadataMap) { // Create sets for storing the names fo the ConfigMaps/Secrets - configMaps := make(map[string]configMetadata) - secrets := make(map[string]configMetadata) + configMaps := make(configMetadataMap) + secrets := make(configMetadataMap) // Range through all Volumes and check the VolumeSources for ConfigMaps // and Secrets for _, vol := range obj.GetPodTemplate().Spec.Volumes { if cm := vol.VolumeSource.ConfigMap; cm != nil { - configMaps[cm.Name] = configMetadata{required: isRequired(cm.Optional), allKeys: true} + configMaps[GetNamespacedName(cm.Name, obj.GetNamespace())] = configMetadata{required: isRequired(cm.Optional), allKeys: true} } if s := vol.VolumeSource.Secret; s != nil { - secrets[s.SecretName] = configMetadata{required: isRequired(s.Optional), allKeys: true} + secrets[GetNamespacedName(s.SecretName, obj.GetNamespace())] = configMetadata{required: isRequired(s.Optional), allKeys: true} } if projection := vol.VolumeSource.Projected; projection != nil { for _, source := range projection.Sources { if cm := source.ConfigMap; cm != nil { if cm.Items == nil { - configMaps[cm.Name] = configMetadata{required: isRequired(cm.Optional), allKeys: true} + configMaps[GetNamespacedName(cm.Name, obj.GetNamespace())] = configMetadata{required: isRequired(cm.Optional), allKeys: true} } else { keys := make(map[string]struct{}) for _, item := range cm.Items { keys[item.Key] = struct{}{} } - configMaps[cm.Name] = configMetadata{required: isRequired(cm.Optional), allKeys: false, keys: keys} + configMaps[GetNamespacedName(cm.Name, 
obj.GetNamespace())] = configMetadata{required: isRequired(cm.Optional), allKeys: false, keys: keys} } } if s := source.Secret; s != nil { if s.Items == nil { - secrets[s.Name] = configMetadata{required: isRequired(s.Optional), allKeys: true} + secrets[GetNamespacedName(s.Name, obj.GetNamespace())] = configMetadata{required: isRequired(s.Optional), allKeys: true} } else { keys := make(map[string]struct{}) for _, item := range s.Items { keys[item.Key] = struct{}{} } - secrets[s.Name] = configMetadata{required: isRequired(s.Optional), allKeys: false, keys: keys} + secrets[GetNamespacedName(s.Name, obj.GetNamespace())] = configMetadata{required: isRequired(s.Optional), allKeys: false, keys: keys} } } } @@ -144,10 +164,10 @@ func getChildNamesByType(obj podController) (map[string]configMetadata, map[stri for _, container := range obj.GetPodTemplate().Spec.Containers { for _, env := range container.EnvFrom { if cm := env.ConfigMapRef; cm != nil { - configMaps[cm.Name] = configMetadata{required: isRequired(cm.Optional), allKeys: true} + configMaps[GetNamespacedName(cm.Name, obj.GetNamespace())] = configMetadata{required: isRequired(cm.Optional), allKeys: true} } if s := env.SecretRef; s != nil { - secrets[s.Name] = configMetadata{required: isRequired(s.Optional), allKeys: true} + secrets[GetNamespacedName(s.Name, obj.GetNamespace())] = configMetadata{required: isRequired(s.Optional), allKeys: true} } } } @@ -157,10 +177,10 @@ func getChildNamesByType(obj podController) (map[string]configMetadata, map[stri for _, env := range container.Env { if valFrom := env.ValueFrom; valFrom != nil { if cm := valFrom.ConfigMapKeyRef; cm != nil { - configMaps[cm.Name] = parseConfigMapKeyRef(configMaps[cm.Name], cm) + configMaps[GetNamespacedName(cm.Name, obj.GetNamespace())] = parseConfigMapKeyRef(configMaps[GetNamespacedName(cm.Name, obj.GetNamespace())], cm) } if s := valFrom.SecretKeyRef; s != nil { - secrets[s.Name] = parseSecretKeyRef(secrets[s.Name], s) + 
secrets[GetNamespacedName(s.Name, obj.GetNamespace())] = parseSecretKeyRef(secrets[GetNamespacedName(s.Name, obj.GetNamespace())], s) } } } @@ -203,28 +223,31 @@ func parseSecretKeyRef(metadata configMetadata, s *corev1.SecretKeySelector) con // getConfigMap gets a ConfigMap with the given name and namespace from the // API server. -func (h *Handler) getConfigMap(namespace, name string, metadata configMetadata) getResult { - return h.getObject(namespace, name, metadata, &corev1.ConfigMap{}) +func (h *Handler) getConfigMap(name types.NamespacedName, metadata configMetadata) getResult { + return h.getObject(name, metadata, &corev1.ConfigMap{}) } // getSecret gets a Secret with the given name and namespace from the // API server. -func (h *Handler) getSecret(namespace, name string, metadata configMetadata) getResult { - return h.getObject(namespace, name, metadata, &corev1.Secret{}) +func (h *Handler) getSecret(name types.NamespacedName, metadata configMetadata) getResult { + return h.getObject(name, metadata, &corev1.Secret{}) } // getObject gets the Object with the given name and namespace from the API // server -func (h *Handler) getObject(namespace, name string, metadata configMetadata, obj Object) getResult { - objectName := types.NamespacedName{Namespace: namespace, Name: name} +func (h *Handler) getObject(objectName types.NamespacedName, metadata configMetadata, obj Object) getResult { err := h.Get(context.TODO(), objectName, obj) if err != nil { - if metadata.required { - return getResult{err: err} + if errors.IsNotFound(err) { + if metadata.required { + return getResult{err: err, notFound: true} + } + return getResult{metadata: metadata, notFound: true} + } else { + return getResult{err: err, notFound: false} } - return getResult{metadata: metadata} } - return getResult{obj: obj, metadata: metadata} + return getResult{obj: obj, metadata: metadata, notFound: false} } // getExistingChildren returns a list of all Secrets and ConfigMaps that are diff --git 
a/pkg/core/children_test.go b/pkg/core/children_test.go index aa60d23e..4dba0f53 100644 --- a/pkg/core/children_test.go +++ b/pkg/core/children_test.go @@ -138,7 +138,8 @@ var _ = Describe("Wave children Suite", func() { Context("getCurrentChildren", func() { BeforeEach(func() { var err error - currentChildren, err = h.getCurrentChildren(podControllerDeployment) + configMaps, secrets := getChildNamesByType(podControllerDeployment) + currentChildren, err = h.getCurrentChildren(configMaps, secrets) Expect(err).NotTo(HaveOccurred()) }) @@ -259,94 +260,107 @@ var _ = Describe("Wave children Suite", func() { m.Delete(s2).Should(Succeed()) m.Get(s2, timeout).ShouldNot(Succeed()) - current, err := h.getCurrentChildren(podControllerDeployment) + configMaps, secrets := getChildNamesByType(podControllerDeployment) + current, err := h.getCurrentChildren(configMaps, secrets) Expect(err).To(HaveOccurred()) Expect(current).To(BeEmpty()) }) }) Context("getChildNamesByType", func() { - var configMaps map[string]configMetadata - var secrets map[string]configMetadata + var configMaps configMetadataMap + var secrets configMetadataMap BeforeEach(func() { configMaps, secrets = getChildNamesByType(podControllerDeployment) }) It("returns ConfigMaps referenced in Volumes", func() { - Expect(configMaps).To(HaveKeyWithValue(cm1.GetName(), configMetadata{required: true, allKeys: true})) + Expect(configMaps).To(HaveKeyWithValue(GetNamespacedName(cm1.GetName(), podControllerDeployment.GetNamespace()), + configMetadata{required: true, allKeys: true})) }) It("optional ConfigMaps referenced in Volumes are returned as optional", func() { - Expect(configMaps).To(HaveKeyWithValue("volume-optional", configMetadata{required: false, allKeys: true})) + Expect(configMaps).To(HaveKeyWithValue(GetNamespacedName("volume-optional", podControllerDeployment.GetNamespace()), + configMetadata{required: false, allKeys: true})) }) It("optional Secrets referenced in Volumes are returned as optional", func() { - 
Expect(secrets).To(HaveKeyWithValue("volume-optional", configMetadata{required: false, allKeys: true})) + Expect(secrets).To(HaveKeyWithValue(GetNamespacedName("volume-optional", podControllerDeployment.GetNamespace()), + configMetadata{required: false, allKeys: true})) }) It("returns ConfigMaps referenced in EnvFrom", func() { - Expect(configMaps).To(HaveKeyWithValue(cm2.GetName(), configMetadata{required: true, allKeys: true})) + Expect(configMaps).To(HaveKeyWithValue(GetNamespacedName(cm2.GetName(), podControllerDeployment.GetNamespace()), + configMetadata{required: true, allKeys: true})) }) It("optional ConfigMaps referenced in EnvFrom are returned as optional", func() { - Expect(configMaps).To(HaveKeyWithValue("envfrom-optional", configMetadata{required: false, allKeys: true})) + Expect(configMaps).To(HaveKeyWithValue(GetNamespacedName("envfrom-optional", podControllerDeployment.GetNamespace()), + configMetadata{required: false, allKeys: true})) }) It("returns ConfigMaps referenced in Env", func() { - Expect(configMaps).To(HaveKeyWithValue(cm3.GetName(), configMetadata{ - required: true, - allKeys: false, - keys: map[string]struct{}{ - "key1": {}, - "key2": {}, - "key4": {}, - }, - })) + Expect(configMaps).To(HaveKeyWithValue(GetNamespacedName(cm3.GetName(), podControllerDeployment.GetNamespace()), + configMetadata{ + required: true, + allKeys: false, + keys: map[string]struct{}{ + "key1": {}, + "key2": {}, + "key4": {}, + }, + })) }) It("returns ConfigMaps referenced in Env as optional correctly", func() { - Expect(configMaps).To(HaveKeyWithValue("env-optional", configMetadata{ - required: false, - allKeys: false, - keys: map[string]struct{}{ - "key2": {}, - }, - })) + Expect(configMaps).To(HaveKeyWithValue(GetNamespacedName("env-optional", podControllerDeployment.GetNamespace()), + configMetadata{ + required: false, + allKeys: false, + keys: map[string]struct{}{ + "key2": {}, + }, + })) }) It("returns Secrets referenced in Volumes", func() { - 
Expect(secrets).To(HaveKeyWithValue(s1.GetName(), configMetadata{required: true, allKeys: true})) + Expect(secrets).To(HaveKeyWithValue(GetNamespacedName(s1.GetName(), podControllerDeployment.GetNamespace()), + configMetadata{required: true, allKeys: true})) }) It("returns Secrets referenced in EnvFrom", func() { - Expect(secrets).To(HaveKeyWithValue(s2.GetName(), configMetadata{required: true, allKeys: true})) + Expect(secrets).To(HaveKeyWithValue(GetNamespacedName(s2.GetName(), podControllerDeployment.GetNamespace()), + configMetadata{required: true, allKeys: true})) }) It("optional Secrets referenced in EnvFrom are returned as optional", func() { - Expect(secrets).To(HaveKeyWithValue("envfrom-optional", configMetadata{required: false, allKeys: true})) + Expect(secrets).To(HaveKeyWithValue(GetNamespacedName("envfrom-optional", podControllerDeployment.GetNamespace()), + configMetadata{required: false, allKeys: true})) }) It("returns Secrets referenced in Env", func() { - Expect(secrets).To(HaveKeyWithValue(s3.GetName(), configMetadata{ - required: true, - allKeys: false, - keys: map[string]struct{}{ - "key1": {}, - "key2": {}, - "key4": {}, - }, - })) + Expect(secrets).To(HaveKeyWithValue(GetNamespacedName(s3.GetName(), podControllerDeployment.GetNamespace()), + configMetadata{ + required: true, + allKeys: false, + keys: map[string]struct{}{ + "key1": {}, + "key2": {}, + "key4": {}, + }, + })) }) It("returns secrets referenced in Env as optional correctly", func() { - Expect(secrets).To(HaveKeyWithValue("env-optional", configMetadata{ - required: false, - allKeys: false, - keys: map[string]struct{}{ - "key2": {}, - }, - })) + Expect(secrets).To(HaveKeyWithValue(GetNamespacedName("env-optional", podControllerDeployment.GetNamespace()), + configMetadata{ + required: false, + allKeys: false, + keys: map[string]struct{}{ + "key2": {}, + }, + })) }) It("does not return extra children", func() { diff --git a/pkg/core/handler.go b/pkg/core/handler.go index 
588eddf6..67be9198 100644 --- a/pkg/core/handler.go +++ b/pkg/core/handler.go @@ -48,6 +48,21 @@ func (h *Handler) HandleDeployment(instance *appsv1.Deployment) (reconcile.Resul return h.handlePodController(&deployment{Deployment: instance}) } +// HandleDeploymentWebhook is called by the deployment webhook +func (h *Handler) HandleDeploymentWebhook(instance *appsv1.Deployment, dryRun *bool, isCreate bool) error { + return h.updatePodController(&deployment{Deployment: instance}, (dryRun != nil && *dryRun), isCreate) +} + +// HandleStatefulSetWebhook is called by the statefulset webhook +func (h *Handler) HandleStatefulSetWebhook(instance *appsv1.StatefulSet, dryRun *bool, isCreate bool) error { + return h.updatePodController(&statefulset{StatefulSet: instance}, (dryRun != nil && *dryRun), isCreate) +} + +// HandleDaemonSetWebhook is called by the daemonset webhook +func (h *Handler) HandleDaemonSetWebhook(instance *appsv1.DaemonSet, dryRun *bool, isCreate bool) error { + return h.updatePodController(&daemonset{DaemonSet: instance}, (dryRun != nil && *dryRun), isCreate) +} + // HandleStatefulSet is called by the StatefulSet controller to reconcile StatefulSets func (h *Handler) HandleStatefulSet(instance *appsv1.StatefulSet) (reconcile.Result, error) { return h.handlePodController(&statefulset{StatefulSet: instance}) @@ -74,15 +89,23 @@ func (h *Handler) handlePodController(instance podController) (reconcile.Result, return reconcile.Result{}, nil } - // Get all children that the instance currently references - current, err := h.getCurrentChildren(instance) + log.V(5).Info("Reconciling") + + // Get all children and add watches + configMaps, secrets := getChildNamesByType(instance) + h.removeWatchesForInstance(instance) + h.watchChildrenForInstance(instance, configMaps, secrets) + + // Get content of children + current, err := h.getCurrentChildren(configMaps, secrets) if err != nil { + if _, ok := err.(*NotFoundError); ok { + // We are missing children but we added 
watchers for all children so we are done + return reconcile.Result{}, nil + } return reconcile.Result{}, fmt.Errorf("error fetching current children: %v", err) } - h.removeWatchesForInstance(instance) - h.watchChildrenForInstance(instance, current) - hash, err := calculateConfigHash(current) if err != nil { return reconcile.Result{}, fmt.Errorf("error calculating configuration hash: %v", err) @@ -92,6 +115,10 @@ func (h *Handler) handlePodController(instance podController) (reconcile.Result, copy := instance.DeepCopyPodController() setConfigHash(copy, hash) + if isSchedulingDisabled(copy) { + restoreScheduling(copy) + } + // If the desired state doesn't match the existing state, update it if !reflect.DeepEqual(instance, copy) { log.V(0).Info("Updating instance hash", "hash", hash) @@ -104,3 +131,47 @@ func (h *Handler) handlePodController(instance podController) (reconcile.Result, } return reconcile.Result{}, nil } + +// updatePodController will only update the hash. Everything else is left to the reconciler. +func (h *Handler) updatePodController(instance podController, dryRun bool, isCreate bool) error { + log := logf.Log.WithName("wave").WithValues("namespace", instance.GetNamespace(), "name", instance.GetName(), "dryRun", dryRun, "isCreate", isCreate) + log.V(5).Info("Running webhook") + + // If the required annotation isn't present, ignore the instance + if !hasRequiredAnnotation(instance) { + return nil + } + + // Get all children that the instance currently references + configMaps, secrets := getChildNamesByType(instance) + current, err := h.getCurrentChildren(configMaps, secrets) + if err != nil { + if _, ok := err.(*NotFoundError); ok { + if isCreate { + log.V(0).Info("Not all required children found yet. Disabling scheduling!", "err", err) + disableScheduling(instance) + } else { + log.V(0).Info("Not all required children found yet. 
Skipping mutation!", "err", err) + } + return nil + } else { + return fmt.Errorf("error fetching current children: %v", err) + } + } + + hash, err := calculateConfigHash(current) + if err != nil { + return fmt.Errorf("error calculating configuration hash: %v", err) + } + + // Update the desired state of the Deployment + oldHash := getConfigHash(instance) + setConfigHash(instance, hash) + + if !dryRun && oldHash != hash { + log.V(0).Info("Updating instance hash", "hash", hash) + h.recorder.Eventf(instance.GetApiObject(), corev1.EventTypeNormal, "ConfigChanged", "Configuration hash updated to %s", hash) + } + + return nil +} diff --git a/pkg/core/hash.go b/pkg/core/hash.go index 85befda8..c179f73b 100644 --- a/pkg/core/hash.go +++ b/pkg/core/hash.go @@ -105,7 +105,7 @@ func getSecretData(child configObject) map[string][]byte { return keyData } -// setConfigHash upates the configuration hash of the given Deployment to the +// setConfigHash updates the configuration hash of the given Deployment to the // given string func setConfigHash(obj podController, hash string) { // Get the existing annotations @@ -120,3 +120,9 @@ func setConfigHash(obj podController, hash string) { podTemplate.SetAnnotations(annotations) obj.SetPodTemplate(podTemplate) } + +// getConfigHash return the config hash string +func getConfigHash(obj podController) string { + podTemplate := obj.GetPodTemplate() + return podTemplate.GetAnnotations()[ConfigHashAnnotation] +} diff --git a/pkg/core/scheduler.go b/pkg/core/scheduler.go new file mode 100644 index 00000000..1bfdf163 --- /dev/null +++ b/pkg/core/scheduler.go @@ -0,0 +1,56 @@ +package core + +// disableScheduling sets an invalid scheduler and adds an annotation with the original scheduler +func disableScheduling(obj podController) { + if isSchedulingDisabled(obj) { + return + } + + // Get the existing annotations + annotations := obj.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + + // Store previous 
scheduler in annotation + schedulerName := obj.GetPodTemplate().Spec.SchedulerName + annotations[SchedulingDisabledAnnotation] = schedulerName + obj.SetAnnotations(annotations) + + // Set invalid scheduler + podTemplate := obj.GetPodTemplate() + podTemplate.Spec.SchedulerName = SchedulingDisabledSchedulerName + obj.SetPodTemplate(podTemplate) +} + +// isSchedulingDisabled returns true if scheduling has been disabled by wave +func isSchedulingDisabled(obj podController) bool { + // Get the existing annotations + annotations := obj.GetAnnotations() + if annotations == nil { + return false + } + _, ok := annotations[SchedulingDisabledAnnotation] + return ok +} + +// restoreScheduling restores scheduling if it has been disabled by wave +func restoreScheduling(obj podController) { + // Get the existing annotations + annotations := obj.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + schedulerName, ok := annotations[SchedulingDisabledAnnotation] + if !ok { + // Scheduling has not been disabled + return + } + delete(annotations, SchedulingDisabledAnnotation) + obj.SetAnnotations(annotations) + + // Restore scheduler + podTemplate := obj.GetPodTemplate() + podTemplate.Spec.SchedulerName = schedulerName + obj.SetPodTemplate(podTemplate) +} diff --git a/pkg/core/scheduler_test.go b/pkg/core/scheduler_test.go new file mode 100644 index 00000000..2a4a2db5 --- /dev/null +++ b/pkg/core/scheduler_test.go @@ -0,0 +1,76 @@ +/* +Copyright 2018 Pusher Ltd. and Wave Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/wave-k8s/wave/test/utils" + appsv1 "k8s.io/api/apps/v1" +) + +var _ = Describe("Wave scheduler Suite", func() { + var deploymentObject *appsv1.Deployment + var podControllerDeployment podController + + BeforeEach(func() { + deploymentObject = utils.ExampleDeployment.DeepCopy() + podControllerDeployment = &deployment{deploymentObject} + }) + + Context("When scheduler is disabled", func() { + BeforeEach(func() { + disableScheduling(podControllerDeployment) + }) + + It("Sets the annotations and stores the previous scheduler", func() { + annotations := podControllerDeployment.GetAnnotations() + Expect(annotations[SchedulingDisabledAnnotation]).To(Equal("default-scheduler")) + }) + + It("Disables scheduling", func() { + podTemplate := podControllerDeployment.GetPodTemplate() + Expect(podTemplate.Spec.SchedulerName).To(Equal(SchedulingDisabledSchedulerName)) + }) + + It("Is reports as disabled", func() { + Expect(isSchedulingDisabled(podControllerDeployment)).To(BeTrue()) + }) + + Context("And Is Restored", func() { + BeforeEach(func() { + restoreScheduling(podControllerDeployment) + }) + + It("Removes the annotations", func() { + annotations := podControllerDeployment.GetAnnotations() + Expect(annotations).NotTo(HaveKey(SchedulingDisabledAnnotation)) + }) + + It("Restores the scheduler", func() { + podTemplate := podControllerDeployment.GetPodTemplate() + Expect(podTemplate.Spec.SchedulerName).To(Equal("default-scheduler")) + }) + + It("Is does not report as disabled", func() { + Expect(isSchedulingDisabled(podControllerDeployment)).To(BeFalse()) + }) + + }) + + }) +}) diff --git a/pkg/core/types.go b/pkg/core/types.go index 7fd4e739..8bc1a945 100644 --- a/pkg/core/types.go +++ b/pkg/core/types.go @@ -4,6 +4,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 
"k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -16,6 +17,13 @@ const ( // perform advanced deletion logic FinalizerString = "wave.pusher.com/finalizer" + // SchedulingDisabledAnnotation is set on a deployment if scheduling has been disabled + // due to missing children and contains the original scheduler + SchedulingDisabledAnnotation = "wave.pusher.com/scheduling-disabled" + + // SchedulingDisabledSchedulerName is the dummy scheduler to disable scheduling of pods + SchedulingDisabledSchedulerName = "wave.pusher.com/invalid" + // RequiredAnnotation is the key of the annotation on the Deployment that Wave // checks for before processing the deployment RequiredAnnotation = "wave.pusher.com/update-on-config-change" @@ -125,3 +133,22 @@ func (d *daemonset) GetApiObject() client.Object { ObjectMeta: d.ObjectMeta, } } + +func GetNamespacedName(name string, namespace string) types.NamespacedName { + return types.NamespacedName{ + Name: name, + Namespace: namespace, + } +} + +type ObjectWithNameAndNamespace interface { + GetNamespace() string + GetName() string +} + +func GetNamespacedNameFromObject(obj ObjectWithNameAndNamespace) types.NamespacedName { + return types.NamespacedName{ + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + } +} diff --git a/pkg/core/watcher.go b/pkg/core/watcher.go index 7c4aaedc..1b167d5e 100644 --- a/pkg/core/watcher.go +++ b/pkg/core/watcher.go @@ -3,7 +3,6 @@ package core import ( "context" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" @@ -50,10 +49,7 @@ func (e *enqueueRequestForWatcher) Generic(ctx context.Context, evt event.Generi // queueOwnerReconcileRequest looks the object up in our watchList and queues reconcile.Request to reconcile // all owners of object func (e *enqueueRequestForWatcher) queueOwnerReconcileRequest(object 
metav1.Object, q workqueue.RateLimitingInterface) { - name := types.NamespacedName{ - Name: object.GetName(), - Namespace: object.GetNamespace(), - } + name := GetNamespacedNameFromObject(object) if watchers, ok := e.watcherList[name]; ok { for watcher := range watchers { request := reconcile.Request{NamespacedName: watcher} @@ -70,40 +66,25 @@ func (h *Handler) GetWatchedSecrets() map[types.NamespacedName]map[types.Namespa return h.watchedSecrets } -func (h *Handler) watchChildrenForInstance(instance podController, children []configObject) { - instanceName := types.NamespacedName{ - Name: instance.GetName(), - Namespace: instance.GetNamespace(), - } - for _, child := range children { - childName := types.NamespacedName{ - Name: child.object.GetName(), - Namespace: child.object.GetNamespace(), - } +func (h *Handler) watchChildrenForInstance(instance podController, configMaps configMetadataMap, secrets configMetadataMap) { + instanceName := GetNamespacedNameFromObject(instance) + for childName := range configMaps { - switch child.object.(type) { - case *corev1.ConfigMap: - if _, ok := h.watchedConfigmaps[childName]; !ok { - h.watchedConfigmaps[childName] = map[types.NamespacedName]bool{} - } - h.watchedConfigmaps[childName][instanceName] = true - case *corev1.Secret: - if _, ok := h.watchedSecrets[childName]; !ok { - h.watchedSecrets[childName] = map[types.NamespacedName]bool{} - } - h.watchedSecrets[childName][instanceName] = true - default: - panic(child.object) + if _, ok := h.watchedConfigmaps[childName]; !ok { + h.watchedConfigmaps[childName] = map[types.NamespacedName]bool{} } + h.watchedConfigmaps[childName][instanceName] = true + } + for childName := range secrets { + if _, ok := h.watchedSecrets[childName]; !ok { + h.watchedSecrets[childName] = map[types.NamespacedName]bool{} + } + h.watchedSecrets[childName][instanceName] = true } } func (h *Handler) removeWatchesForInstance(instance podController) { - instanceName := types.NamespacedName{ - Name: 
instance.GetName(), - Namespace: instance.GetNamespace(), - } - h.RemoveWatches(instanceName) + h.RemoveWatches(GetNamespacedNameFromObject(instance)) } func (h *Handler) RemoveWatches(instanceName types.NamespacedName) { diff --git a/test/utils/test_objects.go b/test/utils/test_objects.go index 61e0b1df..1429ac63 100644 --- a/test/utils/test_objects.go +++ b/test/utils/test_objects.go @@ -44,6 +44,7 @@ var ExampleDeployment = &appsv1.Deployment{ Labels: labels, }, Spec: corev1.PodSpec{ + SchedulerName: "default-scheduler", Volumes: []corev1.Volume{ { Name: "secret1",