Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Fix Kafka Channel dispatcher ownerRef #1536

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions kafka/channel/config/500-kafka-ch-dispatcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright 2018 The Knative Authors
aliok marked this conversation as resolved.
Show resolved Hide resolved
aliok marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-ch-dispatcher
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
spec:
# this deployment is going to be scaled up by the
# controller when the very first KafkaChannel is created
replicas: 0
selector:
matchLabels:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
template:
metadata:
labels:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
eventing.knative.dev/release: devel
spec:
containers:
- name: dispatcher
image: ko://knative.dev/eventing-contrib/kafka/channel/cmd/channel_dispatcher
env:
- name: SYSTEM_NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: METRICS_DOMAIN
value: "knative.dev/eventing"
- name: CONFIG_LOGGING_NAME
value: "config-logging"
- name: CONFIG_LEADERELECTION_NAME
value: "config-leader-election-kafka"
ports:
- containerPort: 9090
name: metrics
protocol: TCP
volumeMounts:
- name: config-kafka
mountPath: /etc/config-kafka
serviceAccountName: kafka-ch-dispatcher
volumes:
- name: config-kafka
configMap:
name: config-kafka

---

apiVersion: v1
kind: Service
metadata:
labels:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
name: kafka-ch-dispatcher
namespace: knative-eventing
spec:
ports:
- name: http-dispatcher
port: 80
protocol: TCP
targetPort: 8080
selector:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
2 changes: 1 addition & 1 deletion kafka/channel/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat

producer, err := sarama.NewAsyncProducer(args.Brokers, conf)
if err != nil {
return nil, fmt.Errorf("unable to create kafka producer: %v", err)
return nil, fmt.Errorf("unable to create kafka producer for brokers %v : %v", args.Brokers, err)
aliok marked this conversation as resolved.
Show resolved Hide resolved
}

dispatcher := &KafkaDispatcher{
Expand Down
57 changes: 46 additions & 11 deletions kafka/channel/pkg/reconciler/controller/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
"context"
"errors"
"fmt"
"reflect"

"k8s.io/utils/pointer"

"github.com/Shopify/sarama"

Expand Down Expand Up @@ -294,17 +295,51 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp
logging.FromContext(ctx).Errorw("Unable to get the dispatcher deployment", zap.Error(err))
kc.Status.MarkDispatcherUnknown("DispatcherDeploymentFailed", "Failed to get dispatcher deployment: %v", err)
return nil, err
} else if !reflect.DeepEqual(expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image) {
logging.FromContext(ctx).Infof("Deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image)
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(expected)
if err == nil {
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
kc.Status.PropagateDispatcherStatus(&d.Status)
return d, nil
} else {
kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
} else {
existing := utils.FindContainer(d, resources.DispatcherContainerName)
if existing == nil {
logging.FromContext(ctx).Errorw("Container %s does not exist in existing dispatcher deployment. Updating the deployment", resources.DispatcherContainerName)
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(expected)
if err == nil {
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
kc.Status.PropagateDispatcherStatus(&d.Status)
return d, nil
} else {
kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
}
return d, newDeploymentWarn(err)
Comment on lines +303 to +311
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed a new commit and made it easier to understand now.
Looks more similar to old code: https://github.com/knative/eventing-contrib/blob/master/kafka/channel/pkg/reconciler/controller/kafkachannel.go#L307

}

expectedContainer := utils.FindContainer(expected, resources.DispatcherContainerName)
if expectedContainer == nil {
return nil, errors.New(fmt.Sprintf("Container %s does not exist in expected dispatcher deployment. Cannot check if the deployment needs an update.", resources.DispatcherContainerName))
}

needsUpdate := false

if existing.Image != expectedContainer.Image {
logging.FromContext(ctx).Infof("Dispatcher deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image)
existing.Image = expectedContainer.Image
needsUpdate = true
}

if *d.Spec.Replicas == 0 {
logging.FromContext(ctx).Infof("Dispatcher deployment has 0 replicas. Scaling up deployment to 1 replicas")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: replicas -> replica, both instances.

d.Spec.Replicas = pointer.Int32Ptr(1)
needsUpdate = true
}

if needsUpdate {
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(d)
if err == nil {
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
kc.Status.PropagateDispatcherStatus(&d.Status)
return d, nil
} else {
kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
}
return d, newDeploymentWarn(err)
}
return d, newDeploymentWarn(err)
}

kc.Status.PropagateDispatcherStatus(&d.Status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"knative.dev/pkg/system"
)

const DispatcherContainerName = "dispatcher"

var (
serviceAccountName = "kafka-ch-dispatcher"
dispatcherName = "kafka-ch-dispatcher"
Expand Down Expand Up @@ -65,7 +67,7 @@ func MakeDispatcher(args DispatcherArgs) *v1.Deployment {
ServiceAccountName: serviceAccountName,
Containers: []corev1.Container{
{
Name: "dispatcher",
Name: DispatcherContainerName,
Image: args.Image,
Env: makeEnv(args),
Ports: []corev1.ContainerPort{{
Expand Down
12 changes: 12 additions & 0 deletions kafka/channel/pkg/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package utils
import (
"errors"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"strings"

"knative.dev/pkg/configmap"
Expand Down Expand Up @@ -89,3 +91,13 @@ func TopicName(separator, namespace, name string) string {
topic := []string{knativeKafkaTopicPrefix, namespace, name}
return strings.Join(topic, separator)
}

func FindContainer(d *appsv1.Deployment, containerName string) *corev1.Container {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find a function like this in the codebase. Any pointers?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hrm...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

@aliok aliok Sep 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/hold
Ok, looks like I need to write some unit tests for this function I created...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/unhold

Done with unit tests.
Next time I need this, I will refactor it into a function in knative/pkg

for i := range d.Spec.Template.Spec.Containers {
if d.Spec.Template.Spec.Containers[i].Name == containerName {
return &d.Spec.Template.Spec.Containers[i]
}
}

return nil
}