Skip to content

Commit

Permalink
Add ConsumerGroup scheduling
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
  • Loading branch information
pierDipi committed Jan 7, 2022
1 parent e32734a commit 4cc955a
Show file tree
Hide file tree
Showing 17 changed files with 1,357 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
)

const (
ConditionConsumers apis.ConditionType = "Consumers"
ConditionConsumerGroupConsumers apis.ConditionType = "Consumers"
ConditionConsumerGroupConsumersScheduled apis.ConditionType = "ConsumersScheduled"
)

var (
conditionSet = apis.NewLivingConditionSet(
ConditionConsumers,
ConditionConsumerGroupConsumers,
ConditionConsumerGroupConsumersScheduled,
)
)

Expand All @@ -38,10 +40,20 @@ func (c *ConsumerGroup) GetConditionSet() apis.ConditionSet {

func (cg *ConsumerGroup) MarkReconcileConsumersFailed(reason string, err error) error {
err = fmt.Errorf("failed to reconcile consumers: %w", err)
cg.GetConditionSet().Manage(cg.GetStatus()).MarkFalse(ConditionConsumers, reason, err.Error())
cg.GetConditionSet().Manage(cg.GetStatus()).MarkFalse(ConditionConsumerGroupConsumers, reason, err.Error())
return err
}

func (cg *ConsumerGroup) MarkReconcileConsumersSucceeded() {
cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrue(ConditionConsumers)
cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrue(ConditionConsumerGroupConsumers)
}

func (cg *ConsumerGroup) MarkScheduleConsumerFailed(reason string, err error) error {
err = fmt.Errorf("failed to schedule consumers: %w", err)
cg.GetConditionSet().Manage(cg.GetStatus()).MarkFalse(ConditionConsumerGroupConsumers, reason, err.Error())
return err
}

func (cg *ConsumerGroup) MarkScheduleSucceeded() {
cg.GetConditionSet().Manage(cg.GetStatus()).MarkTrue(ConditionConsumerGroupConsumersScheduled)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

Expand Down Expand Up @@ -53,6 +54,21 @@ type ConsumerGroup struct {
Status ConsumerGroupStatus `json:"status,omitempty"`
}

func (cg *ConsumerGroup) GetKey() types.NamespacedName {
return types.NamespacedName{
Namespace: cg.GetNamespace(),
Name: cg.GetName(),
}
}

func (cg *ConsumerGroup) GetVReplicas() int32 {
return *cg.Spec.Replicas
}

func (cg *ConsumerGroup) GetPlacements() []eventingduckv1alpha1.Placement {
return cg.Status.Placements
}

type ConsumerGroupSpec struct {

// Template is the object that describes the consumer that will be created if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type Consumer struct {
Status ConsumerStatus `json:"status,omitempty"`
}

type PodBind struct {
PodName string `json:"podName"`
PodNamespace string `json:"podNamespace"`
}

type ConsumerSpec struct {
// Topics is the list of topics to subscribe to.
Topics []string `json:"topics"`
Expand All @@ -71,6 +76,12 @@ type ConsumerSpec struct {

// Subscriber is the addressable that receives events that pass the Filters.
Subscriber duckv1.Destination `json:"subscriber"`

// VReplicas is the number of virtual replicas for a consumer.
VReplicas *int32

// PodBind represents a reference to the pod in which the consumer should be placed.
PodBind *PodBind `json:"podBind"`
}

type DeliverySpec struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ var ConsumerGroupGroupVersionKind = schema.GroupVersionKind{
Kind: "ConsumerGroup",
}

var ConsumerGroupVersionKind = schema.GroupVersionKind{
Group: SchemeGroupVersion.Group,
Version: SchemeGroupVersion.Version,
Kind: "Consumer",
}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind.
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

193 changes: 149 additions & 44 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,42 @@ package consumergroup
import (
"context"
"fmt"
"math"
"sort"

"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/utils/pointer"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/pkg/reconciler"

"knative.dev/eventing/pkg/scheduler"

kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
internalv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/clientset/versioned/typed/eventing/v1alpha1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/reconciler/eventing/v1alpha1/consumergroup"
kafkainternalslisters "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/listers/eventing/v1alpha1"
)

type Reconciler struct {
Scheduler scheduler.Scheduler
ConsumerLister kafkainternalslisters.ConsumerLister
InternalsClient internalv1alpha1.InternalV1alpha1Interface

NameGenerator names.NameGenerator

SystemNamespace string
}

func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
if err := r.schedule(cg); err != nil {
return err
}
cg.MarkScheduleSucceeded()

if err := r.reconcileConsumers(ctx, cg); err != nil {
return err
}
Expand All @@ -55,71 +71,87 @@ func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.C
return cg.MarkReconcileConsumersFailed("ListConsumers", err)
}

// Stable sort consumers so that we give consumers different deletion
// priorities based on their state.
//
// Consumers at the tail of the list are deleted first.
sort.SliceStable(existingConsumers, func(i, j int) bool {
return existingConsumers[i].IsLessThan(existingConsumers[j])
})
consumersByPlacement := r.joinConsumersByPlacement(cg.Status.Placements, existingConsumers)

nConsumers := int32(len(existingConsumers))

for i := int32(0); i < *cg.Spec.Replicas && i < nConsumers; i++ {
if err := r.reconcileConsumer(ctx, cg, existingConsumers[i]); err != nil {
return cg.MarkReconcileConsumersFailed("ReconcileConsumer", err)
}
}

if nConsumers > *cg.Spec.Replicas {
toBeDeleted := existingConsumers[:nConsumers-*cg.Spec.Replicas]
for _, c := range toBeDeleted {
if err := r.finalizeConsumer(ctx, c); err != nil {
return cg.MarkReconcileConsumersFailed("FinalizeConsumer", err)
for _, pc := range consumersByPlacement {
if pc.Placement == nil {
for _, c := range pc.Consumers {
if err := r.finalizeConsumer(ctx, c); err != nil {
return cg.MarkReconcileConsumersFailed("FinalizeConsumer", err)
}
}
continue
}
return nil
}
if nConsumers < *cg.Spec.Replicas {
for i := int32(0); i < *cg.Spec.Replicas-nConsumers; i++ {
if err := r.reconcileConsumer(ctx, cg, nil); err != nil {
return cg.MarkReconcileConsumersFailed("ReconcileConsumer", err)
}

if err := r.reconcileConsumersInPlacement(ctx, cg, *pc.Placement, pc.Consumers); err != nil {
return cg.MarkReconcileConsumersFailed("ReconcileConsumer", err)
}
return nil
}

return nil
}

func (r Reconciler) reconcileConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, c *kafkainternals.Consumer) error {
if c == nil {
// Consumer doesn't exist, create it.
c = cg.ConsumerFromTemplate()
func (r Reconciler) reconcileConsumersInPlacement(
ctx context.Context,
cg *kafkainternals.ConsumerGroup,
placement eventingduckv1alpha1.Placement,
consumers []*kafkainternals.Consumer) error {

if _, err := r.InternalsClient.Consumers(cg.GetNamespace()).Create(ctx, c, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create consumer %s/%s: %w", c.GetNamespace(), c.GetName(), err)
// Check if there is a consumer for the given placement.
if len(consumers) == 0 {
return r.createConsumer(ctx, cg, placement)
}

// Stable sort consumers so that we give consumers different deletion
// priorities based on their state.
//
// Consumers at the tail of the list are deleted.
sort.SliceStable(consumers, func(i, j int) bool { return consumers[i].IsLessThan(consumers[j]) })

for _, c := range consumers[1:] {
if err := r.finalizeConsumer(ctx, c); err != nil {
return cg.MarkReconcileConsumersFailed("FinalizeConsumer", err)
}
return nil
}
if equality.Semantic.DeepDerivative(cg.Spec.Template, c.Spec) {

// Do not modify informer copy.
c := &kafkainternals.Consumer{}
consumers[0].DeepCopyInto(c)

expectedSpec := kafkainternals.ConsumerSpec{}
cg.Spec.Template.Spec.DeepCopyInto(&expectedSpec)

expectedSpec.VReplicas = pointer.Int32Ptr(placement.VReplicas)

if equality.Semantic.DeepDerivative(expectedSpec, c.Spec) {
// Consumer is equal to the template.
return nil
}

c.Spec.PodBind = &kafkainternals.PodBind{PodName: placement.PodName, PodNamespace: r.SystemNamespace}
c.Spec.VReplicas = pointer.Int32Ptr(*expectedSpec.VReplicas)

// Update existing Consumer.
newC := &kafkainternals.Consumer{
TypeMeta: c.TypeMeta,
ObjectMeta: c.ObjectMeta,
Spec: cg.Spec.Template.Spec,
Status: c.Status,
}
if _, err := r.InternalsClient.Consumers(cg.GetNamespace()).Update(ctx, newC, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update consumer %s/%s: %w", newC.GetNamespace(), newC.GetName(), err)
if _, err := r.InternalsClient.Consumers(cg.GetNamespace()).Update(ctx, c, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update consumer %s/%s: %w", c.GetNamespace(), c.GetName(), err)
}

return nil
}

func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error {
c := cg.ConsumerFromTemplate()

c.Name = r.NameGenerator.GenerateName(cg.GetName() + "-")
c.Spec.VReplicas = pointer.Int32Ptr(placement.VReplicas)
c.Spec.PodBind = &kafkainternals.PodBind{PodName: placement.PodName, PodNamespace: r.SystemNamespace}

if _, err := r.InternalsClient.Consumers(cg.GetNamespace()).Create(ctx, c, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create consumer %s/%s: %w", c.GetNamespace(), c.GetName(), err)
}
return nil
}

func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error {
dOpts := metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{UID: &consumer.UID},
Expand All @@ -131,6 +163,79 @@ func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainterna
return nil
}

func (r Reconciler) schedule(cg *kafkainternals.ConsumerGroup) error {
placements, err := r.Scheduler.Schedule(cg)
if err != nil {
return cg.MarkScheduleConsumerFailed("Schedule", err)
}
// Sort placements by pod name.
sort.SliceStable(placements, func(i, j int) bool { return placements[i].PodName < placements[j].PodName })

cg.Status.Placements = placements

return nil
}

type ConsumersPerPlacement struct {
Placement *eventingduckv1alpha1.Placement
Consumers []*kafkainternals.Consumer
}

func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement {
placementConsumers := make([]ConsumersPerPlacement, 0, int(math.Max(float64(len(placements)), float64(len(consumers)))))

// Group consumers by Pod bind.
consumersByPod := make(map[kafkainternals.PodBind][]*kafkainternals.Consumer, len(consumers))
for i := range consumers {
consumersByPod[*consumers[i].Spec.PodBind] = append(consumersByPod[*consumers[i].Spec.PodBind], consumers[i])
}

// Group placements by Pod bind.
placementsByPod := make(map[kafkainternals.PodBind]eventingduckv1alpha1.Placement, len(placements))
for i := range placements {
pb := kafkainternals.PodBind{
PodName: placements[i].PodName,
PodNamespace: r.SystemNamespace,
}

v := placementsByPod[pb]
placementsByPod[pb] = eventingduckv1alpha1.Placement{
PodName: pb.PodName,
VReplicas: v.VReplicas + placements[i].VReplicas,
}
}

for k := range placementsByPod {
if _, ok := consumersByPod[k]; !ok {
consumersByPod[k] = nil
}
}

for pb := range consumersByPod {

var p *eventingduckv1alpha1.Placement
if v, ok := placementsByPod[pb]; ok {
p = &v
}

c := consumersByPod[pb]

placementConsumers = append(placementConsumers, ConsumersPerPlacement{Placement: p, Consumers: c})
}

sort.Slice(placementConsumers, func(i, j int) bool {
if placementConsumers[i].Placement == nil {
return true
}
if placementConsumers[j].Placement == nil {
return false
}
return placementConsumers[i].Placement.PodName < placementConsumers[j].Placement.PodName
})

return placementConsumers
}

var (
_ consumergroup.Interface = &Reconciler{}
)
Loading

0 comments on commit 4cc955a

Please sign in to comment.