From 295cf4e6a21443b013c1203c0bf7f6718e5cc457 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 31 Aug 2020 13:54:00 -0400 Subject: [PATCH] eager pingsource adapter creation --- config/500-pingsource-mt-adapter.yaml | 1 + config/core/deployments/controller.yaml | 3 - .../deployments/pingsource-mt-adapter.yaml | 61 ++++++++++ pkg/reconciler/pingsource/controller.go | 26 +---- pkg/reconciler/pingsource/controller_test.go | 108 +++++------------- pkg/reconciler/pingsource/pingsource.go | 41 ++++--- pkg/reconciler/pingsource/pingsource_test.go | 30 ++--- .../pingsource/resources/receive_adapter.go | 93 ++++----------- .../resources/receive_adapter_test.go | 45 +------- 9 files changed, 167 insertions(+), 241 deletions(-) create mode 120000 config/500-pingsource-mt-adapter.yaml create mode 100644 config/core/deployments/pingsource-mt-adapter.yaml diff --git a/config/500-pingsource-mt-adapter.yaml b/config/500-pingsource-mt-adapter.yaml new file mode 120000 index 00000000000..849b9341ff6 --- /dev/null +++ b/config/500-pingsource-mt-adapter.yaml @@ -0,0 +1 @@ +core/deployments/pingsource-mt-adapter.yaml \ No newline at end of file diff --git a/config/core/deployments/controller.yaml b/config/core/deployments/controller.yaml index 03b341c4f01..4279dddee2b 100644 --- a/config/core/deployments/controller.yaml +++ b/config/core/deployments/controller.yaml @@ -65,9 +65,6 @@ spec: value: config-observability - name: METRICS_DOMAIN value: knative.dev/eventing - # PingSource - - name: MT_PING_IMAGE - value: ko://knative.dev/eventing/cmd/mtping # APIServerSource - name: APISERVER_RA_IMAGE value: ko://knative.dev/eventing/cmd/apiserver_receive_adapter diff --git a/config/core/deployments/pingsource-mt-adapter.yaml b/config/core/deployments/pingsource-mt-adapter.yaml new file mode 100644 index 00000000000..7c4f670065b --- /dev/null +++ b/config/core/deployments/pingsource-mt-adapter.yaml @@ -0,0 +1,61 @@ +# Copyright 2018 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pingsource-mt-adapter + namespace: knative-eventing + labels: + eventing.knative.dev/release: devel +spec: + replicas: 1 + selector: + matchLabels: + eventing.knative.dev/source: ping-source-controller + sources.knative.dev/role: adapter + template: + metadata: + labels: + eventing.knative.dev/source: ping-source-controller + sources.knative.dev/role: adapter + eventing.knative.dev/release: devel + spec: + containers: + - name: dispatcher + image: ko://knative.dev/eventing/cmd/mtping + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + - name: K_METRICS_CONFIG + value: '' + - name: K_LOGGING_CONFIG + value: '' + - name: K_LOGGING_CONFIG + value: '' + ports: + - containerPort: 9090 + name: metrics + protocol: TCP + resources: + requests: + cpu: 125m + memory: 64Mi + limits: + cpu: 1000m + memory: 2048Mi + serviceAccountName: pingsource-mt-adapter diff --git a/pkg/reconciler/pingsource/controller.go b/pkg/reconciler/pingsource/controller.go index 1cfe2da6576..c48bad1e9bc 100644 --- a/pkg/reconciler/pingsource/controller.go +++ b/pkg/reconciler/pingsource/controller.go @@ -19,7 +19,6 @@ package pingsource import ( "context" - "github.com/kelseyhightower/envconfig" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" "k8s.io/client-go/tools/cache" @@ -40,13 +39,6 @@ import ( reconcilersource "knative.dev/eventing/pkg/reconciler/source" ) -// envConfig will be used to extract the required environment variables using -// github.com/kelseyhightower/envconfig. If this configuration cannot be extracted, then -// NewController will panic. -type envConfig struct { - Image string `envconfig:"MT_PING_IMAGE" required:"true"` -} - // NewController initializes the controller and is called by the generated code // Registers event handlers to enqueue events func NewController( @@ -55,11 +47,6 @@ func NewController( ) *controller.Impl { logger := logging.FromContext(ctx) - env := &envConfig{} - if err := envconfig.Process("", env); err != nil { - logger.Fatalw("unable to process PingSourceSource's required environment variables: %v", err) - } - // Retrieve leader election config leaderElectionConfig, err := sharedmain.GetLeaderElectionConfig(ctx) if err != nil { @@ -78,13 +65,12 @@ func NewController( pingSourceInformer := pingsourceinformer.Get(ctx) r := &Reconciler{ - kubeClientSet: kubeclient.Get(ctx), - pingLister: pingSourceInformer.Lister(), - deploymentLister: deploymentInformer.Lister(), - leConfig: leConfig, - loggingContext: ctx, - configs: reconcilersource.WatchConfigurations(ctx, component, cmw), - receiveAdapterImage: env.Image, + kubeClientSet: kubeclient.Get(ctx), + pingLister: pingSourceInformer.Lister(), + deploymentLister: deploymentInformer.Lister(), + leConfig: leConfig, + loggingContext: ctx, + configs: reconcilersource.WatchConfigurations(ctx, component, cmw), } impl := pingsourcereconciler.NewImpl(ctx, r) diff --git a/pkg/reconciler/pingsource/controller_test.go b/pkg/reconciler/pingsource/controller_test.go index 68691a713a8..0de90d7360b 100644 --- a/pkg/reconciler/pingsource/controller_test.go +++ b/pkg/reconciler/pingsource/controller_test.go @@ -17,7 +17,6 @@ limitations under the License. package pingsource import ( - "os" "testing" corev1 "k8s.io/api/core/v1" @@ -35,83 +34,38 @@ import ( ) func TestNew(t *testing.T) { - testCases := map[string]struct { - setEnv bool - }{ - "image not set": {}, - "image set": { - setEnv: true, + ctx, _ := SetupFakeContext(t) + c := NewController(ctx, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-observability", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "_example": "test-config", + }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-logging", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "zap-logger-config": "test-config", + "loglevel.controller": "info", + "loglevel.webhook": "info", + }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-tracing", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "_example": "test-config", + }, }, - } - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - if tc.setEnv { - if err := os.Setenv("PING_IMAGE", "anything"); err != nil { - t.Fatalf("Failed to set env var: %v", err) - } - if err := os.Setenv("MT_PING_IMAGE", "anything"); err != nil { - t.Fatalf("Failed to set env var: %v", err) - } - defer func() { - if err := os.Unsetenv("PING_IMAGE"); err != nil { - t.Fatalf("Failed to unset env var: %v", err) - } - if err := os.Unsetenv("MT_PING_IMAGE"); err != nil { - t.Fatalf("Failed to unset env var: %v", err) - } - }() - - if err := os.Setenv("METRICS_DOMAIN", "knative.dev/eventing"); err != nil { - t.Fatalf("Failed to set env var: %v", err) - } - defer func() { - if err := os.Unsetenv("METRICS_DOMAIN"); err != nil { - t.Fatalf("Failed to unset env var: %v", err) - } - }() - } else { - defer func() { - r := recover() - if r == nil { - t.Errorf("Expected NewController to panic, nothing recovered.") - } - }() - } - - ctx, _ := SetupFakeContext(t) - c := NewController(ctx, configmap.NewStaticWatcher( - &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "config-observability", - Namespace: "knative-eventing", - }, - Data: map[string]string{ - "_example": "test-config", - }, - }, &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "config-logging", - Namespace: "knative-eventing", - }, - Data: map[string]string{ - "zap-logger-config": "test-config", - "loglevel.controller": "info", - "loglevel.webhook": "info", - }, - }, &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: "config-tracing", - Namespace: "knative-eventing", - }, - Data: map[string]string{ - "_example": "test-config", - }, - }, - )) + )) - if c == nil { - t.Fatal("Expected NewController to return a non-nil value") - } - }) + if c == nil { + t.Fatal("Expected NewController to return a non-nil value") } } diff --git a/pkg/reconciler/pingsource/pingsource.go b/pkg/reconciler/pingsource/pingsource.go index 277e0ab1475..432ff5aedcc 100644 --- a/pkg/reconciler/pingsource/pingsource.go +++ b/pkg/reconciler/pingsource/pingsource.go @@ -54,6 +54,7 @@ const ( component = "pingsource" mtcomponent = "pingsource-mt-adapter" mtadapterName = "pingsource-mt-adapter" + containerName = "dispatcher" ) func newWarningSinkNotFound(sink *duckv1.Destination) pkgreconciler.Event { @@ -64,8 +65,6 @@ func newWarningSinkNotFound(sink *duckv1.Destination) pkgreconciler.Event { type Reconciler struct { kubeClientSet kubernetes.Interface - receiveAdapterImage string - // listers index properties about resources pingLister listers.PingSourceLister deploymentLister appsv1listers.DeploymentLister @@ -156,13 +155,11 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1alph } args := resources.Args{ - ServiceAccountName: mtadapterName, - AdapterName: mtadapterName, - Image: r.receiveAdapterImage, - LoggingConfig: loggingConfig, - MetricsConfig: metricsConfig, - LeConfig: r.leConfig, - NoShutdownAfter: mtping.GetNoShutDownAfterValue(), + AdapterName: mtadapterName, + LoggingConfig: loggingConfig, + MetricsConfig: metricsConfig, + LeConfig: r.leConfig, + NoShutdownAfter: mtping.GetNoShutDownAfterValue(), } expected := resources.MakeReceiveAdapter(args) @@ -178,8 +175,9 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1alph return d, nil } return nil, fmt.Errorf("error getting mt adapter deployment %v", err) - } else if podSpecChanged(d.Spec.Template.Spec, expected.Spec.Template.Spec) { - d.Spec.Template.Spec = expected.Spec.Template.Spec + } else if update, c := needsUpdating(ctx, &d.Spec.Template.Spec, &expected.Spec.Template.Spec); update { + c.Env = expected.Spec.Template.Spec.Containers[0].Env + if d, err = r.kubeClientSet.AppsV1().Deployments(system.Namespace()).Update(d); err != nil { return d, err } @@ -191,7 +189,22 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1alph return d, nil } -func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { - // We really care about the fields we set and ignore the test. - return !equality.Semantic.DeepDerivative(newPodSpec, oldPodSpec) +func needsUpdating(ctx context.Context, oldPodSpec *corev1.PodSpec, newPodSpec *corev1.PodSpec) (bool, *corev1.Container) { + // We just care about the environment of the dispatcher container + container := findContainer(oldPodSpec, containerName) + if container == nil { + logging.FromContext(ctx).Errorf("invalid %s deployment: missing the %s container", mtadapterName, containerName) + return false, nil + } + + return !equality.Semantic.DeepEqual(container.Env, newPodSpec.Containers[0].Env), container +} + +func findContainer(podSpec *corev1.PodSpec, name string) *corev1.Container { + for i, container := range podSpec.Containers { + if container.Name == name { + return &podSpec.Containers[i] + } + } + return nil } diff --git a/pkg/reconciler/pingsource/pingsource_test.go b/pkg/reconciler/pingsource/pingsource_test.go index 83439ca8256..e78bd06e457 100644 --- a/pkg/reconciler/pingsource/pingsource_test.go +++ b/pkg/reconciler/pingsource/pingsource_test.go @@ -67,16 +67,11 @@ var ( ) const ( - image = "github.com/knative/test/image" - mtimage = "github.com/knative/test/mtimage" - sourceName = "test-ping-source" - sourceUID = "1234" - sourceNameLong = "test-pingserver-source-with-a-very-long-name" - sourceUIDLong = "cafed00d-cafed00d-cafed00d-cafed00d-cafed00d" - testNS = "testnamespace" - testSchedule = "*/2 * * * *" - testData = "data" - crName = "knative-eventing-pingsource-adapter" + sourceName = "test-ping-source" + sourceUID = "1234" + testNS = "testnamespace" + testSchedule = "*/2 * * * *" + testData = "data" sinkName = "testsink" generation = 1 @@ -232,11 +227,10 @@ func TestAllCases(t *testing.T) { table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { ctx = addressable.WithDuck(ctx) r := &Reconciler{ - kubeClientSet: fakekubeclient.Get(ctx), - pingLister: listers.GetPingSourceV1alpha2Lister(), - deploymentLister: listers.GetDeploymentLister(), - tracker: tracker.New(func(types.NamespacedName) {}, 0), - receiveAdapterImage: mtimage, + kubeClientSet: fakekubeclient.Get(ctx), + pingLister: listers.GetPingSourceV1alpha2Lister(), + deploymentLister: listers.GetDeploymentLister(), + tracker: tracker.New(func(types.NamespacedName) {}, 0), } r.sinkResolver = resolver.NewURIResolver(ctx, func(types.NamespacedName) {}) @@ -251,10 +245,8 @@ func TestAllCases(t *testing.T) { func MakeMTAdapter() *appsv1.Deployment { args := resources.Args{ - ServiceAccountName: mtadapterName, - AdapterName: mtadapterName, - Image: mtimage, - NoShutdownAfter: mtping.GetNoShutDownAfterValue(), + AdapterName: mtadapterName, + NoShutdownAfter: mtping.GetNoShutDownAfterValue(), } return resources.MakeReceiveAdapter(args) } diff --git a/pkg/reconciler/pingsource/resources/receive_adapter.go b/pkg/reconciler/pingsource/resources/receive_adapter.go index fa4de10e255..9da9b58b2d1 100644 --- a/pkg/reconciler/pingsource/resources/receive_adapter.go +++ b/pkg/reconciler/pingsource/resources/receive_adapter.go @@ -21,34 +21,22 @@ import ( v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/adapter/mtping" "knative.dev/eventing/pkg/adapter/v2" "knative.dev/pkg/system" ) -var ( - mtlabels = map[string]string{ - "sources.knative.dev/role": "adapter", - "eventing.knative.dev/source": controllerAgentName, - } -) - type Args struct { - ServiceAccountName string - AdapterName string - Image string - MetricsConfig string - LoggingConfig string - LeConfig string - NoShutdownAfter int + AdapterName string + MetricsConfig string + LoggingConfig string + LeConfig string + NoShutdownAfter int } // MakeReceiveAdapter generates the mtping deployment for pingsources func MakeReceiveAdapter(args Args) *v1.Deployment { - replicas := int32(1) - return &v1.Deployment{ TypeMeta: metav1.TypeMeta{ APIVersion: "apps/v1", @@ -59,37 +47,30 @@ func MakeReceiveAdapter(args Args) *v1.Deployment { Name: args.AdapterName, }, Spec: v1.DeploymentSpec{ - Replicas: &replicas, - Selector: &metav1.LabelSelector{ - MatchLabels: mtlabels, - }, Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: mtlabels, - }, Spec: corev1.PodSpec{ - ServiceAccountName: args.ServiceAccountName, Containers: []corev1.Container{ { - Name: "dispatcher", - Image: args.Image, - Env: makeEnv(args), - - // Set low resource requests and limits. - // This should be configurable. - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("125m"), - corev1.ResourceMemory: resource.MustParse("64Mi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1000m"), - corev1.ResourceMemory: resource.MustParse("2048Mi"), + Name: "dispatcher", + Env: []corev1.EnvVar{{ + Name: system.NamespaceEnvKey, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, }, - }, - Ports: []corev1.ContainerPort{{ - Name: "metrics", - ContainerPort: 9090, + }, { + Name: adapter.EnvConfigMetricsConfig, + Value: args.MetricsConfig, + }, { + Name: adapter.EnvConfigLoggingConfig, + Value: args.LoggingConfig, + }, { + Name: adapter.EnvConfigLeaderElectionConfig, + Value: args.LeConfig, + }, { + Name: mtping.EnvNoShutdownAfter, + Value: strconv.Itoa(args.NoShutdownAfter), }}, }, }, @@ -98,29 +79,3 @@ func MakeReceiveAdapter(args Args) *v1.Deployment { }, } } - -func makeEnv(args Args) []corev1.EnvVar { - return []corev1.EnvVar{{ - Name: system.NamespaceEnvKey, - Value: system.Namespace(), - }, { - Name: adapter.EnvConfigNamespace, - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, { - Name: adapter.EnvConfigMetricsConfig, - Value: args.MetricsConfig, - }, { - Name: adapter.EnvConfigLoggingConfig, - Value: args.LoggingConfig, - }, { - Name: adapter.EnvConfigLeaderElectionConfig, - Value: args.LeConfig, - }, { - Name: mtping.EnvNoShutdownAfter, - Value: strconv.Itoa(args.NoShutdownAfter), - }} -} diff --git a/pkg/reconciler/pingsource/resources/receive_adapter_test.go b/pkg/reconciler/pingsource/resources/receive_adapter_test.go index 8195953a87e..991a36df465 100644 --- a/pkg/reconciler/pingsource/resources/receive_adapter_test.go +++ b/pkg/reconciler/pingsource/resources/receive_adapter_test.go @@ -22,22 +22,17 @@ import ( "github.com/google/go-cmp/cmp" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/system" _ "knative.dev/pkg/system/testing" ) func TestMakePingAdapter(t *testing.T) { - replicas := int32(1) - args := Args{ - ServiceAccountName: "test-sa", - AdapterName: "test-name", - Image: "test-image", - MetricsConfig: "metrics", - LoggingConfig: "logging", - NoShutdownAfter: 40, + AdapterName: "test-name", + MetricsConfig: "metrics", + LoggingConfig: "logging", + NoShutdownAfter: 40, } want := &v1.Deployment{ @@ -50,25 +45,13 @@ func TestMakePingAdapter(t *testing.T) { Name: args.AdapterName, }, Spec: v1.DeploymentSpec{ - Replicas: &replicas, - Selector: &metav1.LabelSelector{ - MatchLabels: mtlabels, - }, Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: mtlabels, - }, Spec: corev1.PodSpec{ - ServiceAccountName: args.ServiceAccountName, Containers: []corev1.Container{ { - Name: "dispatcher", - Image: args.Image, + Name: "dispatcher", Env: []corev1.EnvVar{{ - Name: system.NamespaceEnvKey, - Value: system.Namespace(), - }, { - Name: "NAMESPACE", + Name: system.NamespaceEnvKey, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ FieldPath: "metadata.namespace", @@ -87,22 +70,6 @@ func TestMakePingAdapter(t *testing.T) { Name: "K_NO_SHUTDOWN_AFTER", Value: "40", }}, - // Set low resource requests and limits. - // This should be configurable. - Resources: corev1.ResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("125m"), - corev1.ResourceMemory: resource.MustParse("64Mi"), - }, - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1000m"), - corev1.ResourceMemory: resource.MustParse("2048Mi"), - }, - }, - Ports: []corev1.ContainerPort{{ - Name: "metrics", - ContainerPort: 9090, - }}, }, }, },