Skip to content

Commit

Permalink
eager pingsource adapter creation
Browse files Browse the repository at this point in the history
  • Loading branch information
lionelvillard committed Sep 1, 2020
1 parent e55775f commit 295cf4e
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 241 deletions.
1 change: 1 addition & 0 deletions config/500-pingsource-mt-adapter.yaml
3 changes: 0 additions & 3 deletions config/core/deployments/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ spec:
value: config-observability
- name: METRICS_DOMAIN
value: knative.dev/eventing
# PingSource
- name: MT_PING_IMAGE
value: ko://knative.dev/eventing/cmd/mtping
# APIServerSource
- name: APISERVER_RA_IMAGE
value: ko://knative.dev/eventing/cmd/apiserver_receive_adapter
Expand Down
61 changes: 61 additions & 0 deletions config/core/deployments/pingsource-mt-adapter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apps/v1
kind: Deployment
metadata:
name: pingsource-mt-adapter
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
spec:
replicas: 1
selector:
matchLabels:
eventing.knative.dev/source: ping-source-controller
sources.knative.dev/role: adapter
template:
metadata:
labels:
eventing.knative.dev/source: ping-source-controller
sources.knative.dev/role: adapter
eventing.knative.dev/release: devel
spec:
containers:
- name: dispatcher
image: ko://knative.dev/eventing/cmd/mtping
env:
- name: SYSTEM_NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: K_METRICS_CONFIG
value: ''
- name: K_LOGGING_CONFIG
value: ''
- name: K_LOGGING_CONFIG
value: ''
ports:
- containerPort: 9090
name: metrics
protocol: TCP
resources:
requests:
cpu: 125m
memory: 64Mi
limits:
cpu: 1000m
memory: 2048Mi
serviceAccountName: pingsource-mt-adapter
26 changes: 6 additions & 20 deletions pkg/reconciler/pingsource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package pingsource
import (
"context"

"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -40,13 +39,6 @@ import (
reconcilersource "knative.dev/eventing/pkg/reconciler/source"
)

// envConfig will be used to extract the required environment variables using
// github.com/kelseyhightower/envconfig. If this configuration cannot be extracted, then
// NewController will panic.
type envConfig struct {
Image string `envconfig:"MT_PING_IMAGE" required:"true"`
}

// NewController initializes the controller and is called by the generated code
// Registers event handlers to enqueue events
func NewController(
Expand All @@ -55,11 +47,6 @@ func NewController(
) *controller.Impl {
logger := logging.FromContext(ctx)

env := &envConfig{}
if err := envconfig.Process("", env); err != nil {
logger.Fatalw("unable to process PingSourceSource's required environment variables: %v", err)
}

// Retrieve leader election config
leaderElectionConfig, err := sharedmain.GetLeaderElectionConfig(ctx)
if err != nil {
Expand All @@ -78,13 +65,12 @@ func NewController(
pingSourceInformer := pingsourceinformer.Get(ctx)

r := &Reconciler{
kubeClientSet: kubeclient.Get(ctx),
pingLister: pingSourceInformer.Lister(),
deploymentLister: deploymentInformer.Lister(),
leConfig: leConfig,
loggingContext: ctx,
configs: reconcilersource.WatchConfigurations(ctx, component, cmw),
receiveAdapterImage: env.Image,
kubeClientSet: kubeclient.Get(ctx),
pingLister: pingSourceInformer.Lister(),
deploymentLister: deploymentInformer.Lister(),
leConfig: leConfig,
loggingContext: ctx,
configs: reconcilersource.WatchConfigurations(ctx, component, cmw),
}

impl := pingsourcereconciler.NewImpl(ctx, r)
Expand Down
108 changes: 31 additions & 77 deletions pkg/reconciler/pingsource/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package pingsource

import (
"os"
"testing"

corev1 "k8s.io/api/core/v1"
Expand All @@ -35,83 +34,38 @@ import (
)

func TestNew(t *testing.T) {
testCases := map[string]struct {
setEnv bool
}{
"image not set": {},
"image set": {
setEnv: true,
ctx, _ := SetupFakeContext(t)
c := NewController(ctx, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-observability",
Namespace: "knative-eventing",
},
Data: map[string]string{
"_example": "test-config",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-logging",
Namespace: "knative-eventing",
},
Data: map[string]string{
"zap-logger-config": "test-config",
"loglevel.controller": "info",
"loglevel.webhook": "info",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-tracing",
Namespace: "knative-eventing",
},
Data: map[string]string{
"_example": "test-config",
},
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
if tc.setEnv {
if err := os.Setenv("PING_IMAGE", "anything"); err != nil {
t.Fatalf("Failed to set env var: %v", err)
}
if err := os.Setenv("MT_PING_IMAGE", "anything"); err != nil {
t.Fatalf("Failed to set env var: %v", err)
}
defer func() {
if err := os.Unsetenv("PING_IMAGE"); err != nil {
t.Fatalf("Failed to unset env var: %v", err)
}
if err := os.Unsetenv("MT_PING_IMAGE"); err != nil {
t.Fatalf("Failed to unset env var: %v", err)
}
}()

if err := os.Setenv("METRICS_DOMAIN", "knative.dev/eventing"); err != nil {
t.Fatalf("Failed to set env var: %v", err)
}
defer func() {
if err := os.Unsetenv("METRICS_DOMAIN"); err != nil {
t.Fatalf("Failed to unset env var: %v", err)
}
}()
} else {
defer func() {
r := recover()
if r == nil {
t.Errorf("Expected NewController to panic, nothing recovered.")
}
}()
}

ctx, _ := SetupFakeContext(t)
c := NewController(ctx, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-observability",
Namespace: "knative-eventing",
},
Data: map[string]string{
"_example": "test-config",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-logging",
Namespace: "knative-eventing",
},
Data: map[string]string{
"zap-logger-config": "test-config",
"loglevel.controller": "info",
"loglevel.webhook": "info",
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-tracing",
Namespace: "knative-eventing",
},
Data: map[string]string{
"_example": "test-config",
},
},
))
))

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
}
})
if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
}
}
41 changes: 27 additions & 14 deletions pkg/reconciler/pingsource/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
component = "pingsource"
mtcomponent = "pingsource-mt-adapter"
mtadapterName = "pingsource-mt-adapter"
containerName = "dispatcher"
)

func newWarningSinkNotFound(sink *duckv1.Destination) pkgreconciler.Event {
Expand All @@ -64,8 +65,6 @@ func newWarningSinkNotFound(sink *duckv1.Destination) pkgreconciler.Event {
type Reconciler struct {
kubeClientSet kubernetes.Interface

receiveAdapterImage string

// listers index properties about resources
pingLister listers.PingSourceLister
deploymentLister appsv1listers.DeploymentLister
Expand Down Expand Up @@ -156,13 +155,11 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1alph
}

args := resources.Args{
ServiceAccountName: mtadapterName,
AdapterName: mtadapterName,
Image: r.receiveAdapterImage,
LoggingConfig: loggingConfig,
MetricsConfig: metricsConfig,
LeConfig: r.leConfig,
NoShutdownAfter: mtping.GetNoShutDownAfterValue(),
AdapterName: mtadapterName,
LoggingConfig: loggingConfig,
MetricsConfig: metricsConfig,
LeConfig: r.leConfig,
NoShutdownAfter: mtping.GetNoShutDownAfterValue(),
}
expected := resources.MakeReceiveAdapter(args)

Expand All @@ -178,8 +175,9 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1alph
return d, nil
}
return nil, fmt.Errorf("error getting mt adapter deployment %v", err)
} else if podSpecChanged(d.Spec.Template.Spec, expected.Spec.Template.Spec) {
d.Spec.Template.Spec = expected.Spec.Template.Spec
} else if update, c := needsUpdating(ctx, &d.Spec.Template.Spec, &expected.Spec.Template.Spec); update {
c.Env = expected.Spec.Template.Spec.Containers[0].Env

if d, err = r.kubeClientSet.AppsV1().Deployments(system.Namespace()).Update(d); err != nil {
return d, err
}
Expand All @@ -191,7 +189,22 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1alph
return d, nil
}

func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool {
// We really care about the fields we set and ignore the test.
return !equality.Semantic.DeepDerivative(newPodSpec, oldPodSpec)
func needsUpdating(ctx context.Context, oldPodSpec *corev1.PodSpec, newPodSpec *corev1.PodSpec) (bool, *corev1.Container) {
// We just care about the environment of the dispatcher container
container := findContainer(oldPodSpec, containerName)
if container == nil {
logging.FromContext(ctx).Errorf("invalid %s deployment: missing the %s container", mtadapterName, containerName)
return false, nil
}

return !equality.Semantic.DeepEqual(container.Env, newPodSpec.Containers[0].Env), container
}

func findContainer(podSpec *corev1.PodSpec, name string) *corev1.Container {
for i, container := range podSpec.Containers {
if container.Name == name {
return &podSpec.Containers[i]
}
}
return nil
}
30 changes: 11 additions & 19 deletions pkg/reconciler/pingsource/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,11 @@ var (
)

const (
image = "github.com/knative/test/image"
mtimage = "github.com/knative/test/mtimage"
sourceName = "test-ping-source"
sourceUID = "1234"
sourceNameLong = "test-pingserver-source-with-a-very-long-name"
sourceUIDLong = "cafed00d-cafed00d-cafed00d-cafed00d-cafed00d"
testNS = "testnamespace"
testSchedule = "*/2 * * * *"
testData = "data"
crName = "knative-eventing-pingsource-adapter"
sourceName = "test-ping-source"
sourceUID = "1234"
testNS = "testnamespace"
testSchedule = "*/2 * * * *"
testData = "data"

sinkName = "testsink"
generation = 1
Expand Down Expand Up @@ -232,11 +227,10 @@ func TestAllCases(t *testing.T) {
table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler {
ctx = addressable.WithDuck(ctx)
r := &Reconciler{
kubeClientSet: fakekubeclient.Get(ctx),
pingLister: listers.GetPingSourceV1alpha2Lister(),
deploymentLister: listers.GetDeploymentLister(),
tracker: tracker.New(func(types.NamespacedName) {}, 0),
receiveAdapterImage: mtimage,
kubeClientSet: fakekubeclient.Get(ctx),
pingLister: listers.GetPingSourceV1alpha2Lister(),
deploymentLister: listers.GetDeploymentLister(),
tracker: tracker.New(func(types.NamespacedName) {}, 0),
}
r.sinkResolver = resolver.NewURIResolver(ctx, func(types.NamespacedName) {})

Expand All @@ -251,10 +245,8 @@ func TestAllCases(t *testing.T) {

func MakeMTAdapter() *appsv1.Deployment {
args := resources.Args{
ServiceAccountName: mtadapterName,
AdapterName: mtadapterName,
Image: mtimage,
NoShutdownAfter: mtping.GetNoShutDownAfterValue(),
AdapterName: mtadapterName,
NoShutdownAfter: mtping.GetNoShutDownAfterValue(),
}
return resources.MakeReceiveAdapter(args)
}
Expand Down
Loading

0 comments on commit 295cf4e

Please sign in to comment.