Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Fix Kafka Channel dispatcher ownerRef #1536

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions kafka/channel/config/500-kafka-ch-dispatcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright 2018 The Knative Authors
aliok marked this conversation as resolved.
Show resolved Hide resolved
aliok marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-ch-dispatcher
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
spec:
# this deployment is going to be scaled up by the
# controller when the very first KafkaChannel is created
replicas: 0
selector:
matchLabels:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
template:
metadata:
labels:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
eventing.knative.dev/release: devel
spec:
containers:
- name: dispatcher
image: ko://knative.dev/eventing-contrib/kafka/channel/cmd/channel_dispatcher
env:
- name: SYSTEM_NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: METRICS_DOMAIN
value: "knative.dev/eventing"
- name: CONFIG_LOGGING_NAME
value: "config-logging"
- name: CONFIG_LEADERELECTION_NAME
value: "config-leader-election-kafka"
ports:
- containerPort: 9090
name: metrics
protocol: TCP
volumeMounts:
- name: config-kafka
mountPath: /etc/config-kafka
serviceAccountName: kafka-ch-dispatcher
volumes:
- name: config-kafka
configMap:
name: config-kafka

---

apiVersion: v1
kind: Service
metadata:
labels:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
name: kafka-ch-dispatcher
namespace: knative-eventing
spec:
ports:
- name: http-dispatcher
port: 80
protocol: TCP
targetPort: 8080
selector:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
31 changes: 21 additions & 10 deletions kafka/channel/pkg/reconciler/controller/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,17 +294,28 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp
logging.FromContext(ctx).Errorw("Unable to get the dispatcher deployment", zap.Error(err))
kc.Status.MarkDispatcherUnknown("DispatcherDeploymentFailed", "Failed to get dispatcher deployment: %v", err)
return nil, err
} else if !reflect.DeepEqual(expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image) {
logging.FromContext(ctx).Infof("Deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image)
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(expected)
if err == nil {
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
kc.Status.PropagateDispatcherStatus(&d.Status)
return d, nil
} else {
kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
} else {
needsUpdate := false

if !reflect.DeepEqual(expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image) {
aliok marked this conversation as resolved.
Show resolved Hide resolved
logging.FromContext(ctx).Infof("Deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image)
needsUpdate = true
} else if !reflect.DeepEqual(expected.Spec.Replicas, d.Spec.Replicas) {
aliok marked this conversation as resolved.
Show resolved Hide resolved
logging.FromContext(ctx).Infof("Deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image)
needsUpdate = true
}

if needsUpdate {
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(expected)
if err == nil {
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
kc.Status.PropagateDispatcherStatus(&d.Status)
return d, nil
} else {
kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
}
return d, newDeploymentWarn(err)
}
return d, newDeploymentWarn(err)
}

kc.Status.PropagateDispatcherStatus(&d.Status)
Expand Down