Skip to content

Commit

Permalink
Review updates
Browse files Browse the repository at this point in the history
Signed-off-by: aavarghese <avarghese@us.ibm.com>
  • Loading branch information
aavarghese committed Feb 2, 2022
1 parent 6069af7 commit e371d41
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
const (
ConditionConsumerGroupConsumers apis.ConditionType = "Consumers"
ConditionConsumerGroupConsumersScheduled apis.ConditionType = "ConsumersScheduled"
// Labels
KafkaChannelNameLabel = "kafkachannel-name"
)

var (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ var (

func (c *ConsumerGroup) Validate(ctx context.Context) *apis.FieldError {
ctx = apis.WithinParent(ctx, c.ObjectMeta)
if apis.IsInUpdate(ctx) {
err := c.CheckImmutableFields(ctx, apis.GetBaseline(ctx).(*ConsumerGroup).Labels)
if err != nil {
return err
}
}
return c.Spec.Validate(ctx).ViaField("spec")
}

Expand All @@ -48,3 +54,13 @@ func (cts *ConsumerTemplateSpec) Validate(ctx context.Context) *apis.FieldError
}
return cts.Spec.Validate(specCtx).ViaField("spec")
}

func (c *ConsumerGroup) CheckImmutableFields(ctx context.Context, original map[string]string) *apis.FieldError {
if _, ok := original[KafkaChannelNameLabel]; ok {
if _, ok := c.Labels[KafkaChannelNameLabel]; !ok {
return ErrImmutableField("Consumer Group Label",
"Removing the consumer group label is unsupported")
}
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -378,6 +379,149 @@ func TestConsumerGroup_Validate(t *testing.T) {
},
wantErr: true,
},
{
name: "valid with channel label",
ctx: apis.WithinUpdate(context.Background(), &ConsumerGroup{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
KafkaChannelNameLabel: "channelName", // identifies the new ConsumerGroup as associated with this channel
},
},
Spec: ConsumerGroupSpec{
Replicas: pointer.Int32Ptr(1),
Selector: map[string]string{"app": "app"},
Template: ConsumerTemplateSpec{
Spec: ConsumerSpec{
Topics: []string{"t1"},
Configs: ConsumerConfigs{
Configs: map[string]string{
"group.id": "g1",
"bootstrap.servers": "kafka:9092",
},
},
Delivery: &DeliverySpec{
DeliverySpec: &eventingduck.DeliverySpec{},
},
Subscriber: duckv1.Destination{
URI: &apis.URL{
Scheme: "http",
Host: "127.0.0.1",
},
},
PodBind: &PodBind{
PodName: "p-0",
PodNamespace: "ns",
},
},
},
},
}),
given: &ConsumerGroup{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
KafkaChannelNameLabel: "channelName", // identifies the new ConsumerGroup as associated with this channel
},
},
Spec: ConsumerGroupSpec{
Replicas: pointer.Int32Ptr(1),
Selector: map[string]string{"app": "app"},
Template: ConsumerTemplateSpec{
Spec: ConsumerSpec{
Topics: []string{"t1"},
Configs: ConsumerConfigs{
Configs: map[string]string{
"group.id": "g1",
"bootstrap.servers": "kafka:9092",
},
},
Delivery: &DeliverySpec{
DeliverySpec: &eventingduck.DeliverySpec{},
},
Subscriber: duckv1.Destination{
URI: &apis.URL{
Scheme: "http",
Host: "127.0.0.1",
},
},
PodBind: &PodBind{
PodName: "p-0",
PodNamespace: "ns",
},
},
},
},
},
wantErr: false,
},
{
name: "invalid without channel label",
ctx: apis.WithinUpdate(context.Background(), &ConsumerGroup{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
KafkaChannelNameLabel: "channelName", // identifies the new ConsumerGroup as associated with this channel
},
},
Spec: ConsumerGroupSpec{
Replicas: pointer.Int32Ptr(1),
Selector: map[string]string{"app": "app"},
Template: ConsumerTemplateSpec{
Spec: ConsumerSpec{
Topics: []string{"t1"},
Configs: ConsumerConfigs{
Configs: map[string]string{
"group.id": "g1",
"bootstrap.servers": "kafka:9092",
},
},
Delivery: &DeliverySpec{
DeliverySpec: &eventingduck.DeliverySpec{},
},
Subscriber: duckv1.Destination{
URI: &apis.URL{
Scheme: "http",
Host: "127.0.0.1",
},
},
PodBind: &PodBind{
PodName: "p-0",
PodNamespace: "ns",
},
},
},
},
}),
given: &ConsumerGroup{
Spec: ConsumerGroupSpec{
Replicas: pointer.Int32Ptr(1),
Selector: map[string]string{"app": "app"},
Template: ConsumerTemplateSpec{
Spec: ConsumerSpec{
Topics: []string{"t1"},
Configs: ConsumerConfigs{
Configs: map[string]string{
"group.id": "g1",
"bootstrap.servers": "kafka:9092",
},
},
Delivery: &DeliverySpec{
DeliverySpec: &eventingduck.DeliverySpec{},
},
Subscriber: duckv1.Destination{
URI: &apis.URL{
Scheme: "http",
Host: "127.0.0.1",
},
},
PodBind: &PodBind{
PodName: "p-0",
PodNamespace: "ns",
},
},
},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
26 changes: 18 additions & 8 deletions control-plane/pkg/reconciler/channel/v2/channelv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v2

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -64,9 +65,6 @@ const (
TopicPrefix = "knative-messaging-kafka"
DefaultDeliveryOrder = internals.Ordered

// Labels
KafkaChannelNameLabel = "kafkachannel-name"

KafkaChannelConditionConsumerGroup apis.ConditionType = "ConsumerGroup" //condition is registered by controller
)

Expand Down Expand Up @@ -320,6 +318,8 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1
logger.Debug("Contract config map updated")
}

channel.Status.Address = nil

// We update receiver and dispatcher pods annotation regardless of our contract changed or not due to the fact
// that in a previous reconciliation we might have failed to update one of our data plane pod annotation, so we want
// to update anyway remaining annotations with the contract generation that was saved in the CM.
Expand All @@ -334,12 +334,22 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1
return err
}

// TODO probe (as in #974) and check if status code is 404 otherwise requeue and return.
// Rationale: after deleting a topic closing a producer ends up blocking and requesting metadata for max.block.ms
// because topic metadata aren't available anymore.
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.Address(r.IngressHost, channel)
proberAddressable := prober.Addressable{
Address: address,
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
},
}
if status := r.Prober.Probe(ctx, proberAddressable); status != prober.StatusNotReady {
return nil // Object will get re-queued once probe status changes.
}

// get the channel configmap
channelConfigMap, err := r.channelConfigMap()
Expand Down Expand Up @@ -436,7 +446,7 @@ func (r *Reconciler) reconcileSubscribers(ctx context.Context, channel *messagin
Ready: corev1.ConditionUnknown,
Message: msg,
})
//globalErr = multierr.Append(globalErr, errors.New(msg))
globalErr = multierr.Append(globalErr, errors.New(msg))
} else {
msg := fmt.Sprint("Subscription not ready. ", topLevelCondition.Reason, topLevelCondition.Message)
channel.Status.Subscribers = append(channel.Status.Subscribers, v1.SubscriberStatus{
Expand All @@ -445,14 +455,14 @@ func (r *Reconciler) reconcileSubscribers(ctx context.Context, channel *messagin
Ready: corev1.ConditionFalse,
Message: msg,
})
//globalErr = multierr.Append(globalErr, errors.New(msg))
globalErr = multierr.Append(globalErr, errors.New(msg))
}
}
}
}

// Get all consumer groups associated with this Channel
selector := labels.SelectorFromSet(map[string]string{KafkaChannelNameLabel: channel.Name})
selector := labels.SelectorFromSet(map[string]string{internalscg.KafkaChannelNameLabel: channel.Name})
channelCgs, err := r.ConsumerGroupLister.ConsumerGroups(channel.GetNamespace()).List(selector)
if err != nil {
globalErr = multierr.Append(globalErr, err)
Expand Down Expand Up @@ -484,7 +494,7 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messagi
*kmeta.NewControllerRef(channel),
},
Labels: map[string]string{
KafkaChannelNameLabel: channel.Name, // identifies the new ConsumerGroup as associated with this channel (same namespace)
internalscg.KafkaChannelNameLabel: channel.Name, // identifies the new ConsumerGroup as associated with this channel (same namespace)
},
},
Spec: internalscg.ConsumerGroupSpec{
Expand Down

0 comments on commit e371d41

Please sign in to comment.