From a82fabadcf8a2429a1f8ef50577986710a0f182f Mon Sep 17 00:00:00 2001 From: aavarghese Date: Fri, 7 Jan 2022 16:39:53 -0500 Subject: [PATCH] WIP Review update cont Signed-off-by: aavarghese --- .../pkg/reconciler/channel/v2/channelv2.go | 127 +++++++++++++++++- .../pkg/reconciler/channel/v2/controllerv2.go | 9 +- 2 files changed, 128 insertions(+), 8 deletions(-) diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2.go b/control-plane/pkg/reconciler/channel/v2/channelv2.go index 0af1b06daf..2c0d2398eb 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2.go @@ -19,25 +19,36 @@ package v2 import ( "context" "fmt" + "strings" "go.uber.org/multierr" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corelisters "k8s.io/client-go/listers/core/v1" v1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/pkg/apis" "knative.dev/pkg/kmeta" "knative.dev/pkg/reconciler" + "knative.dev/pkg/system" messagingv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" + "knative.dev/eventing-kafka-broker/control-plane/pkg/config" + "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" + "knative.dev/eventing-kafka-broker/control-plane/pkg/receiver" + "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" internals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing" internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" internalsclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/clientset/versioned" internalslst "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/listers/eventing/v1alpha1" + coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" + kafkalogging "knative.dev/eventing-kafka-broker/control-plane/pkg/logging" ) const ( @@ -55,34 +66,94 @@ var ( ) type Reconciler struct { + *base.Reconciler + *config.Env + ConfigMapLister corelisters.ConfigMapLister ConsumerGroupLister internalslst.ConsumerGroupLister InternalsClient internalsclient.Interface } func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta1.KafkaChannel) reconciler.Event { + logger := kafkalogging.CreateReconcileMethodLogger(ctx, channel) - err := r.reconcileConsumerGroup(ctx, channel) + topicName := kafka.ChannelTopic(TopicPrefix, channel) + + channelConfigMap, err := r.channelConfigMap() + if err != nil { + return err + } + bootstrapServers, err := kafka.BootstrapServersFromConfigMap(logger, channelConfigMap) + if err != nil { + return err + } + + // Get contract config map. + contractConfigMap, err := r.GetOrCreateDataPlaneConfigMap(ctx) + if err != nil { + return err + } + logger.Debug("Got contract config map") + + // Get contract data. + ct, err := r.GetDataPlaneConfigMapData(logger, contractConfigMap) + if err != nil && ct == nil { + return err + } + if ct == nil { + ct = &contract.Contract{} + } + logger.Debug("Got contract data from config map", zap.Any(base.ContractLogKey, ct)) + + // Get channel configuration + channelConfig := &contract.Resource{ + Uid: string(channel.UID), + Topics: []string{topicName}, + Ingress: &contract.Ingress{ + IngressType: &contract.Ingress_Path{ + Path: receiver.PathFromObject(channel), + }, + }, + BootstrapServers: strings.Join(bootstrapServers, ","), + Reference: &contract.Reference{ + Uuid: string(channel.GetUID()), + Namespace: channel.GetNamespace(), + Name: channel.GetName(), + }, + } + + err = r.reconcileConsumerGroup(ctx, channel, topicName) if err != nil { channel.GetConditionSet().Manage(&channel.Status).MarkFalse(KafkaChannelConditionConsumerGroup, "failed to reconcile consumer group", err.Error()) return err } channel.GetConditionSet().Manage(&channel.Status).MarkTrue(KafkaChannelConditionConsumerGroup) - //todo other status fields relevant for channels - //SubscribableStatus - //AddressStatus - //DeadLetterSinkURI + // Update contract data with the new contract configuration (add/update channel resource) + channelIndex := coreconfig.FindResource(ct, channel.UID) + changed := coreconfig.AddOrUpdateResourceConfig(ct, channelConfig, channelIndex, logger) + logger.Debug("Change detector", zap.Int("changed", changed)) + + if changed == coreconfig.ResourceChanged { + // Resource changed, increment contract generation. + coreconfig.IncrementContractGeneration(ct) + + // Update the configuration map with the new contract data. + if err := r.UpdateDataPlaneConfigMap(ctx, ct, contractConfigMap); err != nil { + return err + } + logger.Debug("Contract config map updated") + } return nil } -func (r Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messagingv1beta1.KafkaChannel) error { +func (r Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messagingv1beta1.KafkaChannel, topicName string) error { - topicName := kafka.ChannelTopic(TopicPrefix, channel) var globalErr error for i := range channel.Spec.Subscribers { s := &channel.Spec.Subscribers[i] + newcg := &internalscg.ConsumerGroup{ ObjectMeta: metav1.ObjectMeta{ Name: string(s.UID), @@ -109,13 +180,31 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messagi cg, err := r.ConsumerGroupLister.ConsumerGroups(channel.GetNamespace()).Get(string(s.UID)) //Get by consumer group id if err != nil && !apierrors.IsNotFound(err) { + channel.Status.Subscribers = append(channel.Status.Subscribers, v1.SubscriberStatus{ + UID: s.UID, + ObservedGeneration: s.Generation, + Ready: corev1.ConditionFalse, + Message: fmt.Sprint("Subscription not ready", err), + }) globalErr = multierr.Append(globalErr, err) } if apierrors.IsNotFound(err) { _, err = r.InternalsClient.InternalV1alpha1().ConsumerGroups(newcg.GetNamespace()).Create(ctx, newcg, metav1.CreateOptions{}) if err != nil && !apierrors.IsAlreadyExists(err) { + channel.Status.Subscribers = append(channel.Status.Subscribers, v1.SubscriberStatus{ + UID: s.UID, + ObservedGeneration: s.Generation, + Ready: corev1.ConditionFalse, + Message: fmt.Sprint("Subscription not ready", err), + }) globalErr = multierr.Append(globalErr, fmt.Errorf("failed to create consumer group %s/%s: %w", newcg.GetNamespace(), newcg.GetName(), err)) + } else { + channel.Status.Subscribers = append(channel.Status.Subscribers, v1.SubscriberStatus{ + UID: s.UID, + ObservedGeneration: s.Generation, + Ready: corev1.ConditionTrue, + }) } continue } @@ -128,7 +217,19 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messagi Status: cg.Status, } if _, err = r.InternalsClient.InternalV1alpha1().ConsumerGroups(cg.GetNamespace()).Update(ctx, newCg, metav1.UpdateOptions{}); err != nil { + channel.Status.Subscribers = append(channel.Status.Subscribers, v1.SubscriberStatus{ + UID: s.UID, + ObservedGeneration: s.Generation, + Ready: corev1.ConditionFalse, + Message: fmt.Sprint("Subscription not ready", err), + }) globalErr = multierr.Append(globalErr, fmt.Errorf("failed to update consumer group %s/%s: %w", newCg.GetNamespace(), newCg.GetName(), err)) + } else { + channel.Status.Subscribers = append(channel.Status.Subscribers, v1.SubscriberStatus{ + UID: s.UID, + ObservedGeneration: s.Generation, + Ready: corev1.ConditionTrue, + }) } } } @@ -140,3 +241,15 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messagi func consumerGroup(channel *messagingv1beta1.KafkaChannel, s *v1.SubscriberSpec) string { return fmt.Sprintf("kafka.%s.%s.%s", channel.Namespace, channel.Name, string(s.UID)) } + +func (r *Reconciler) channelConfigMap() (*corev1.ConfigMap, error) { + // TODO: do we want to support namespaced channels? they're not supported at the moment. + + namespace := system.Namespace() + cm, err := r.ConfigMapLister.ConfigMaps(namespace).Get(r.Env.GeneralConfigMapName) + if err != nil { + return nil, fmt.Errorf("failed to get configmap %s/%s: %w", namespace, r.Env.GeneralConfigMapName, err) + } + + return cm, nil +} diff --git a/control-plane/pkg/reconciler/channel/v2/controllerv2.go b/control-plane/pkg/reconciler/channel/v2/controllerv2.go index 8498b181b2..0bb45d5c70 100644 --- a/control-plane/pkg/reconciler/channel/v2/controllerv2.go +++ b/control-plane/pkg/reconciler/channel/v2/controllerv2.go @@ -31,6 +31,7 @@ import ( "knative.dev/pkg/controller" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" + "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup" ) @@ -42,6 +43,12 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl { messagingv1beta.RegisterAlternateKafkaChannelConditionSet(conditionSet) reconciler := &Reconciler{ + Reconciler: &base.Reconciler{ + DataPlaneConfigMapNamespace: configs.DataPlaneConfigMapNamespace, + DataPlaneConfigMapName: configs.DataPlaneConfigMapName, + DataPlaneConfigFormat: configs.DataPlaneConfigFormat, + }, + Env: configs, ConsumerGroupLister: consumerGroupInformer.Lister(), InternalsClient: consumergroupclient.Get(ctx), } @@ -55,7 +62,7 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl { // ConsumerGroup changes and enqueue associated channel consumerGroupInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: consumergroup.Filter("kafkachannel"), - Handler: controller.HandleAll(impl.Enqueue), + Handler: controller.HandleAll(consumergroup.Enqueue("kafkachannel", impl.EnqueueKey)), }) return impl