Skip to content

Commit

Permalink
WIP Review update cont
Browse files Browse the repository at this point in the history
Signed-off-by: aavarghese <avarghese@us.ibm.com>
  • Loading branch information
aavarghese committed Jan 7, 2022
1 parent 126c683 commit a82faba
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 8 deletions.
127 changes: 120 additions & 7 deletions control-plane/pkg/reconciler/channel/v2/channelv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,36 @@ package v2
import (
"context"
"fmt"
"strings"

"go.uber.org/multierr"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corelisters "k8s.io/client-go/listers/core/v1"

v1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/reconciler"
"knative.dev/pkg/system"

messagingv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"
"knative.dev/eventing-kafka-broker/control-plane/pkg/receiver"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka"

internals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing"
internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
internalsclient "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/clientset/versioned"
internalslst "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/listers/eventing/v1alpha1"
coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config"
kafkalogging "knative.dev/eventing-kafka-broker/control-plane/pkg/logging"
)

const (
Expand All @@ -55,34 +66,94 @@ var (
)

type Reconciler struct {
*base.Reconciler
*config.Env
ConfigMapLister corelisters.ConfigMapLister
ConsumerGroupLister internalslst.ConsumerGroupLister
InternalsClient internalsclient.Interface
}

func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta1.KafkaChannel) reconciler.Event {
logger := kafkalogging.CreateReconcileMethodLogger(ctx, channel)

err := r.reconcileConsumerGroup(ctx, channel)
topicName := kafka.ChannelTopic(TopicPrefix, channel)

channelConfigMap, err := r.channelConfigMap()
if err != nil {
return err
}
bootstrapServers, err := kafka.BootstrapServersFromConfigMap(logger, channelConfigMap)
if err != nil {
return err
}

// Get contract config map.
contractConfigMap, err := r.GetOrCreateDataPlaneConfigMap(ctx)
if err != nil {
return err
}
logger.Debug("Got contract config map")

// Get contract data.
ct, err := r.GetDataPlaneConfigMapData(logger, contractConfigMap)
if err != nil && ct == nil {
return err
}
if ct == nil {
ct = &contract.Contract{}
}
logger.Debug("Got contract data from config map", zap.Any(base.ContractLogKey, ct))

// Get channel configuration
channelConfig := &contract.Resource{
Uid: string(channel.UID),
Topics: []string{topicName},
Ingress: &contract.Ingress{
IngressType: &contract.Ingress_Path{
Path: receiver.PathFromObject(channel),
},
},
BootstrapServers: strings.Join(bootstrapServers, ","),
Reference: &contract.Reference{
Uuid: string(channel.GetUID()),
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
},
}

err = r.reconcileConsumerGroup(ctx, channel, topicName)
if err != nil {
channel.GetConditionSet().Manage(&channel.Status).MarkFalse(KafkaChannelConditionConsumerGroup, "failed to reconcile consumer group", err.Error())
return err
}
channel.GetConditionSet().Manage(&channel.Status).MarkTrue(KafkaChannelConditionConsumerGroup)

//todo other status fields relevant for channels
//SubscribableStatus
//AddressStatus
//DeadLetterSinkURI
// Update contract data with the new contract configuration (add/update channel resource)
channelIndex := coreconfig.FindResource(ct, channel.UID)
changed := coreconfig.AddOrUpdateResourceConfig(ct, channelConfig, channelIndex, logger)
logger.Debug("Change detector", zap.Int("changed", changed))

if changed == coreconfig.ResourceChanged {
// Resource changed, increment contract generation.
coreconfig.IncrementContractGeneration(ct)

// Update the configuration map with the new contract data.
if err := r.UpdateDataPlaneConfigMap(ctx, ct, contractConfigMap); err != nil {
return err
}
logger.Debug("Contract config map updated")
}

return nil
}

func (r Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messagingv1beta1.KafkaChannel) error {
func (r Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messagingv1beta1.KafkaChannel, topicName string) error {

topicName := kafka.ChannelTopic(TopicPrefix, channel)
var globalErr error

for i := range channel.Spec.Subscribers {
s := &channel.Spec.Subscribers[i]

newcg := &internalscg.ConsumerGroup{
ObjectMeta: metav1.ObjectMeta{
Name: string(s.UID),
Expand All @@ -109,13 +180,31 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messagi

cg, err := r.ConsumerGroupLister.ConsumerGroups(channel.GetNamespace()).Get(string(s.UID)) //Get by consumer group id
if err != nil && !apierrors.IsNotFound(err) {
channel.Status.Subscribers = append(channel.Status.Subscribers, v1.SubscriberStatus{
UID: s.UID,
ObservedGeneration: s.Generation,
Ready: corev1.ConditionFalse,
Message: fmt.Sprint("Subscription not ready", err),
})
globalErr = multierr.Append(globalErr, err)
}

if apierrors.IsNotFound(err) {
_, err = r.InternalsClient.InternalV1alpha1().ConsumerGroups(newcg.GetNamespace()).Create(ctx, newcg, metav1.CreateOptions{})
if err != nil && !apierrors.IsAlreadyExists(err) {
channel.Status.Subscribers = append(channel.Status.Subscribers, v1.SubscriberStatus{
UID: s.UID,
ObservedGeneration: s.Generation,
Ready: corev1.ConditionFalse,
Message: fmt.Sprint("Subscription not ready", err),
})
globalErr = multierr.Append(globalErr, fmt.Errorf("failed to create consumer group %s/%s: %w", newcg.GetNamespace(), newcg.GetName(), err))
} else {
channel.Status.Subscribers = append(channel.Status.Subscribers, v1.SubscriberStatus{
UID: s.UID,
ObservedGeneration: s.Generation,
Ready: corev1.ConditionTrue,
})
}
continue
}
Expand All @@ -128,7 +217,19 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messagi
Status: cg.Status,
}
if _, err = r.InternalsClient.InternalV1alpha1().ConsumerGroups(cg.GetNamespace()).Update(ctx, newCg, metav1.UpdateOptions{}); err != nil {
channel.Status.Subscribers = append(channel.Status.Subscribers, v1.SubscriberStatus{
UID: s.UID,
ObservedGeneration: s.Generation,
Ready: corev1.ConditionFalse,
Message: fmt.Sprint("Subscription not ready", err),
})
globalErr = multierr.Append(globalErr, fmt.Errorf("failed to update consumer group %s/%s: %w", newCg.GetNamespace(), newCg.GetName(), err))
} else {
channel.Status.Subscribers = append(channel.Status.Subscribers, v1.SubscriberStatus{
UID: s.UID,
ObservedGeneration: s.Generation,
Ready: corev1.ConditionTrue,
})
}
}
}
Expand All @@ -140,3 +241,15 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messagi
func consumerGroup(channel *messagingv1beta1.KafkaChannel, s *v1.SubscriberSpec) string {
return fmt.Sprintf("kafka.%s.%s.%s", channel.Namespace, channel.Name, string(s.UID))
}

func (r *Reconciler) channelConfigMap() (*corev1.ConfigMap, error) {
// TODO: do we want to support namespaced channels? they're not supported at the moment.

namespace := system.Namespace()
cm, err := r.ConfigMapLister.ConfigMaps(namespace).Get(r.Env.GeneralConfigMapName)
if err != nil {
return nil, fmt.Errorf("failed to get configmap %s/%s: %w", namespace, r.Env.GeneralConfigMapName, err)
}

return cm, nil
}
9 changes: 8 additions & 1 deletion control-plane/pkg/reconciler/channel/v2/controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"knative.dev/pkg/controller"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumergroup"
)

Expand All @@ -42,6 +43,12 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl {
messagingv1beta.RegisterAlternateKafkaChannelConditionSet(conditionSet)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
DataPlaneConfigMapNamespace: configs.DataPlaneConfigMapNamespace,
DataPlaneConfigMapName: configs.DataPlaneConfigMapName,
DataPlaneConfigFormat: configs.DataPlaneConfigFormat,
},
Env: configs,
ConsumerGroupLister: consumerGroupInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
}
Expand All @@ -55,7 +62,7 @@ func NewController(ctx context.Context, configs *config.Env) *controller.Impl {
// ConsumerGroup changes and enqueue associated channel
consumerGroupInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: consumergroup.Filter("kafkachannel"),
Handler: controller.HandleAll(impl.Enqueue),
Handler: controller.HandleAll(consumergroup.Enqueue("kafkachannel", impl.EnqueueKey)),
})

return impl
Expand Down

0 comments on commit a82faba

Please sign in to comment.