[Consolidated Channel] Update target pod IPs when dispatcher pods scale before the subscription is marked ready (openshift-knative#487)

* Restart probing dispatchers if the pods change before being ready

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* fix when to return subscribers reconciled

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Proper locking in status prober

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Add a debug log when dispatcher pods change

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Refresh pods probing on dispatcher pod changes

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* change some bad var names

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Add prober test for succeeding only after refreshing pod probing

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Update struct creation after unused field removed

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Reuse same handler for pod informer

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>

* Fix a couple of typos

Signed-off-by: Ahmed Abdalla <aabdelre@redhat.com>
devguyio authored Apr 1, 2021
1 parent 48961e3 commit c4ff783
Showing 5 changed files with 317 additions and 150 deletions.
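
For orientation before the per-file diffs, here is a minimal, self-contained sketch of the behavior this commit introduces (not code from the commit; all names are illustrative): when the set of dispatcher pod IPs changes before a subscription has been marked ready, the prober drops its in-flight state for that subscription and probes the current pods instead of the stale ones.

package main

import (
	"fmt"
	"sync"
)

// subState tracks probing progress for one subscription (illustrative only).
type subState struct {
	probedPods map[string]bool // dispatcher pod IPs probed so far
	ready      bool            // set once all partitions reported ready
}

// prober is a stripped-down stand-in for the consolidated channel status prober.
type prober struct {
	mu     sync.Mutex
	states map[string]*subState // keyed by subscription UID
}

func sameSet(a, b map[string]bool) bool {
	if len(a) != len(b) {
		return false
	}
	for k := range a {
		if !b[k] {
			return false
		}
	}
	return true
}

// refreshPodProbing restarts probing for every non-ready subscription whose
// probed pod IPs no longer match the current dispatcher pods.
func (p *prober) refreshPodProbing(currentPods map[string]bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for uid, st := range p.states {
		if st.ready {
			continue // ready subscriptions keep their state
		}
		if !sameSet(st.probedPods, currentPods) {
			fmt.Printf("pods changed for %s: re-probing current pods\n", uid)
			st.probedPods = currentPods // probe the new pod set instead of the stale one
		}
	}
}

func main() {
	p := &prober{states: map[string]*subState{
		"sub-1": {probedPods: map[string]bool{"10.0.0.1": true}},
	}}
	// The dispatcher scaled up before sub-1 became ready, so probing is refreshed.
	p.refreshPodProbing(map[string]bool{"10.0.0.1": true, "10.0.0.2": true})
}

In the actual change below, this flow is driven by the pod informer's Add and Delete handlers in controller.go, and the pod-set comparison lives in isOutdatedTargetState in status.go.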
29 changes: 17 additions & 12 deletions pkg/channel/consolidated/reconciler/controller/controller.go
@@ -19,16 +19,15 @@ package controller
import (
"context"

corev1 "k8s.io/api/core/v1"
knativeReconciler "knative.dev/pkg/reconciler"

"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/channel/consolidated/status"
kafkamessagingv1beta1 "knative.dev/eventing-kafka/pkg/client/informers/externalversions/messaging/v1beta1"
kafkaChannelClient "knative.dev/eventing-kafka/pkg/client/injection/client"
"knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
kafkaChannelReconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
@@ -45,6 +44,7 @@ import (
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
knativeReconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/system"
)

@@ -154,17 +154,22 @@ func NewController(
),
Handler: cache.ResourceEventHandlerFuncs{
// Cancel probing when a Pod is deleted
DeleteFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if ok && pod != nil {
logger.Debugw("Dispatcher pod deleted. Canceling pod probing.",
zap.String("pod", pod.GetName()))
statusProber.CancelPodProbing(*pod)
impl.GlobalResync(kafkaChannelInformer.Informer())
}
},
DeleteFunc: getPodInformerEventHandler(ctx, logger, statusProber, impl, kafkaChannelInformer, "Delete"),
AddFunc: getPodInformerEventHandler(ctx, logger, statusProber, impl, kafkaChannelInformer, "Add"),
},
})

return impl
}

func getPodInformerEventHandler(ctx context.Context, logger *zap.SugaredLogger, statusProber *status.Prober, impl *controller.Impl, kafkaChannelInformer kafkamessagingv1beta1.KafkaChannelInformer, handlerType string) func(obj interface{}) {
return func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if ok && pod != nil {
logger.Debugw("%s pods. Refreshing pod probing.", handlerType,
zap.String("pod", pod.GetName()))
statusProber.RefreshPodProbing(ctx)
impl.GlobalResync(kafkaChannelInformer.Informer())
}
}
}
15 changes: 4 additions & 11 deletions pkg/channel/consolidated/reconciler/controller/kafkachannel.go
@@ -162,6 +162,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel
kc.Status.MarkConfigFailed("InvalidConfiguration", "Unable to build Kafka admin client for channel %s: %v", kc.Name, err)
return err
}
defer kafkaClusterAdmin.Close()

kc.Status.MarkConfigTrue()

@@ -225,34 +226,26 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel
// Reconcile the k8s service representing the actual Channel. It points to the Dispatcher service via ExternalName
svc, err := r.reconcileChannelService(ctx, dispatcherNamespace, kc)
if err != nil {

return err
}
kc.Status.MarkChannelServiceTrue()
kc.Status.SetAddress(&apis.URL{
Scheme: "http",
Host: network.GetServiceHostname(svc.Name, svc.Namespace),
})
err = r.setupSubscriptionStatusWatcher(ctx, kc)
err = r.reconcileSubscribers(ctx, kc)
if err != nil {
logger.Errorw("error setting up some subscription status watchers", zap.Error(err))
}
// close the connection
err = kafkaClusterAdmin.Close()
if err != nil {
logger.Errorw("Error closing the connection", zap.Error(err))
return err
return fmt.Errorf("error reconciling subscribers %v", err)
}

// Ok, so now the Dispatcher Deployment & Service have been created, we're golden since the
// dispatcher watches the Channel and where it needs to dispatch events to.
return newReconciledNormal(kc.Namespace, kc.Name)
}

func (r *Reconciler) setupSubscriptionStatusWatcher(ctx context.Context, ch *v1beta1.KafkaChannel) error {
func (r *Reconciler) reconcileSubscribers(ctx context.Context, ch *v1beta1.KafkaChannel) error {
after := ch.DeepCopy()
after.Status.Subscribers = make([]v1.SubscriberStatus, 0)

for _, s := range ch.Spec.Subscribers {
if r, _ := r.statusManager.IsReady(ctx, *ch, s); r {
logging.FromContext(ctx).Debugw("marking subscription", zap.Any("subscription", s))
1 change: 0 additions & 1 deletion pkg/channel/consolidated/reconciler/controller/lister.go
@@ -70,7 +70,6 @@ func (t *DispatcherPodsLister) ListProbeTargets(ctx context.Context, kc v1beta1.
return &status.ProbeTarget{
PodIPs: sets.NewString(readyIPs...),
PodPort: "8081",
Port: "8081",
URL: u,
}, nil
}
132 changes: 90 additions & 42 deletions pkg/channel/consolidated/status/status.go
@@ -63,10 +63,10 @@ type targetState struct {
pendingCount atomic.Int32
// readyCount is the number of pods that have the subscription ready
readyPartitions sets.Int
initialCount int
probedPods sets.String
lastAccessed time.Time

cancel func()
ready bool
cancel func()
}

// podState represents the probing state of a Pod (for a specific subscription)
@@ -97,7 +97,6 @@ type workItem struct {
type ProbeTarget struct {
PodIPs sets.String
PodPort string
Port string
URL *url.URL
}

@@ -107,7 +106,7 @@ type ProbeTargetLister interface {
ListProbeTargets(ctx context.Context, ch messagingv1beta1.KafkaChannel) (*ProbeTarget, error)
}

// Manager provides a way to check if an Ingress is ready
// Manager provides a way to check if a Subscription is ready
type Manager interface {
IsReady(ctx context.Context, ch messagingv1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec) (bool, error)
CancelProbing(sub eventingduckv1.SubscriberSpec)
@@ -160,40 +159,52 @@ func NewProber(
}

func (m *Prober) checkReadiness(state *targetState) bool {
consumers := int32(state.initialCount)
state.readyLock.Lock()
defer state.readyLock.Unlock()
partitions := state.ch.Spec.NumPartitions
m.logger.Debugw("Checking subscription readiness",
zap.Any("initial probed consumers", consumers),
zap.Any("subscription", state.sub.UID),
zap.Any("channel", state.ch.Name),
zap.Any("pod ips", state.probedPods),
zap.Any("channel partitions", partitions),
zap.Any("ready partitions", state.readyPartitions.List()),
)
return state.readyPartitions.Len() == int(partitions)
if !state.ready {
state.ready = state.readyPartitions.Len() == int(partitions)
}
return state.ready
}

func (m *Prober) IsReady(ctx context.Context, ch messagingv1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec) (bool, error) {
subscriptionKey := sub.UID
logger := logging.FromContext(ctx)

if ready, ok := func() (bool, bool) {
// Get the probe targets
target, err := m.targetLister.ListProbeTargets(ctx, ch)
if err != nil {
logger.Errorw("Error listing probe targets", zap.Error(err),
zap.Any("subscription", sub.UID))
return false, err
}
// get the state while locking for very short scope
state, ok := func() (*targetState, bool) {
m.mu.Lock()
defer m.mu.Unlock()
if state, ok := m.targetStates[subscriptionKey]; ok {
if state.sub.Generation == sub.Generation {
state.lastAccessed = time.Now()
logger.Debugw("Subscription is cached. Checking readiness",
zap.Any("subscription", sub.UID))
return m.checkReadiness(state), true
}

// Cancel the polling for the outdated version
state.cancel()
delete(m.targetStates, subscriptionKey)
s, o := m.targetStates[sub.UID]
return s, o
}()
if ok {
if !isOutdatedTargetState(state, sub, target.PodIPs) {
return m.checkReadiness(state), nil
}
return false, false
}(); ok {
return ready, nil
m.ejectStateUnsafe(sub)
}
m.probeTarget(ctx, ch, sub, target)
return false, nil
}

func (m *Prober) probeTarget(ctx context.Context, ch messagingv1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec, target *ProbeTarget) {
subscriptionKey := sub.UID
logger := logging.FromContext(ctx)
subCtx, cancel := context.WithCancel(context.Background())
subscriptionState := &targetState{
sub: sub,
@@ -202,14 +213,7 @@ func (m *Prober) IsReady(ctx context.Context, ch messagingv1beta1.KafkaChannel,
cancel: cancel,
}

// Get the probe targets and group them by IP
target, err := m.targetLister.ListProbeTargets(ctx, ch)
if err != nil {
logger.Errorw("Error listing probe targets", zap.Error(err),
zap.Any("subscription", sub.UID))
return false, err
}

// Group the probe targets by IP
workItems := make(map[string][]*workItem)
for ip := range target.PodIPs {
workItems[ip] = append(workItems[ip], &workItem{
@@ -221,7 +225,7 @@ func (m *Prober) IsReady(ctx context.Context, ch messagingv1beta1.KafkaChannel,
})
}

subscriptionState.initialCount = target.PodIPs.Len()
subscriptionState.probedPods = target.PodIPs
subscriptionState.pendingCount.Store(int32(len(workItems)))
subscriptionState.readyPartitions = sets.Int{}

@@ -281,7 +285,6 @@ func (m *Prober) IsReady(ctx context.Context, ch messagingv1beta1.KafkaChannel,
defer m.mu.Unlock()
m.targetStates[subscriptionKey] = subscriptionState
}()
return false, nil
}

// Start starts the Manager background operations
@@ -317,6 +320,11 @@ func (m *Prober) Start(done <-chan struct{}) chan struct{} {
func (m *Prober) CancelProbing(sub eventingduckv1.SubscriberSpec) {
m.mu.Lock()
defer m.mu.Unlock()
m.ejectStateUnsafe(sub)
}

// ejectStateUnsafe ejects a state from the cache; it is not safe for concurrent access and is meant for internal use only, under proper locking.
func (m *Prober) ejectStateUnsafe(sub eventingduckv1.SubscriberSpec) {
if state, ok := m.targetStates[sub.UID]; ok {
m.logger.Debugw("Canceling state", zap.Any("subscription", sub))
state.cancel()
@@ -335,6 +343,35 @@ func (m *Prober) CancelPodProbing(pod corev1.Pod) {
}
}

// RefreshPodProbing lists probe targets and invalidates any in-flight (non-ready) states whose initial probed targets changed from the
// current ones.
func (m *Prober) RefreshPodProbing(ctx context.Context) {
m.mu.Lock()
defer m.mu.Unlock()
logger := logging.FromContext(ctx)
for _, state := range m.targetStates {
if !m.checkReadiness(state) {
// This is an in-flight state
sub := state.sub
ch := state.ch
// Get the probe targets
target, err := m.targetLister.ListProbeTargets(ctx, ch)
if err != nil {
logger.Errorw("Error listing probe targets", zap.Error(err),
zap.Any("subscription", sub.UID))
return
}
m.ejectStateUnsafe(sub)
func() {
// probeTarget requires an unlocked mutex.
m.mu.Unlock()
defer m.mu.Lock()
m.probeTarget(ctx, ch, sub, target)
}()
}
}
}

// processWorkItem processes a single work item from workQueue.
// It returns false when there is no more items to process, true otherwise.
func (m *Prober) processWorkItem() bool {
@@ -386,8 +423,10 @@ func (m *Prober) processWorkItem() bool {
if err != nil || !ok {
// In case of error, enqueue for retry
m.workQueue.AddRateLimited(obj)
item.logger.Debugw("Probing of %s failed, IP: %s:%s, ready: %t, error: %v (depth: %d)",
item.url, item.podIP, item.podPort, ok, err, m.workQueue.Len())
item.logger.Debugw("Probing failed",
zap.Any("url", item.url), zap.Any("IP", item.podIP),
zap.Any("port", item.podPort), zap.Bool("ready", ok), zap.Error(err),
zap.Int("depth", m.workQueue.Len()))
} else {
m.onProbingSuccess(item.targetStates, item.podState)
}
@@ -399,7 +438,7 @@ func (m *Prober) onProbingSuccess(subscriptionState *targetState, podState *podS
if podState.pendingCount.Dec() == 0 {
// Unlock the goroutine blocked on <-podCtx.Done()
podState.cancel()
// This is the last pod being successfully probed, the subscription is ready
// This is the last pod being successfully probed, the subscription might be ready
if m.checkReadiness(subscriptionState) {
subscriptionState.cancel()
m.readyCallback(subscriptionState.ch, subscriptionState.sub)
@@ -414,13 +453,13 @@ func (m *Prober) onProbingCancellation(subscriptionState *targetState, podState
// Probing succeeded, nothing to do
return
}

// Attempt to set pendingCount to 0.
if podState.pendingCount.CAS(pendingCount, 0) {
// This is the last pod being successfully probed, the subscription is ready
// This is the last pod being successfully probed, the subscription might be ready
if subscriptionState.pendingCount.Dec() == 0 {
subscriptionState.cancel()
m.readyCallback(subscriptionState.ch, subscriptionState.sub)
if m.checkReadiness(subscriptionState) {
m.readyCallback(subscriptionState.ch, subscriptionState.sub)
}
}
return
}
@@ -430,7 +469,8 @@ func (m *Prober) probeVerifier(item *workItem) prober.Verifier {
func (m *Prober) probeVerifier(item *workItem) prober.Verifier {
return func(r *http.Response, b []byte) (bool, error) {
m.logger.Debugw("Verifying response", zap.Int("status code", r.StatusCode),
zap.ByteString("body", b))
zap.ByteString("body", b), zap.Any("subscription", item.targetStates.sub.UID),
zap.Any("channel", item.targetStates.ch))
switch r.StatusCode {
case http.StatusOK:
var subscriptions = make(map[string][]int)
@@ -467,6 +507,14 @@ func (m *Prober) probeVerifier(item *workItem) prober.Verifier {
}
}

// A target state is outdated if the generation is different or if the target IPs change before it becomes
// ready.
func isOutdatedTargetState(state *targetState, sub eventingduckv1.SubscriberSpec, podIPs sets.String) bool {
state.readyLock.RLock()
defer state.readyLock.RUnlock()
return state.sub.Generation != sub.Generation || (!state.ready && !state.probedPods.Equal(podIPs))
}

// deepCopy copies a URL into a new one
func deepCopy(in *url.URL) *url.URL {
// Safe to ignore the error since this is a deep copy
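
One detail in the RefreshPodProbing hunk above: the loop holds the prober's mutex, yet probeTarget needs to acquire that same mutex when it stores the new target state, so the lock is released just for that call. A minimal, self-contained sketch of the unlock/relock idiom (all names illustrative, not the commit's code):

package main

import (
	"fmt"
	"sync"
)

type manager struct {
	mu    sync.Mutex
	items []string
}

// probeTarget acquires the lock itself, so callers must not hold mu when calling it.
func (m *manager) probeTarget(item string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	fmt.Println("probing", item)
}

// refresh iterates under the lock but releases it just for each probeTarget call.
func (m *manager) refresh() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, it := range m.items {
		func() {
			m.mu.Unlock()     // drop the lock so probeTarget can take it
			defer m.mu.Lock() // re-acquire before the next iteration and the outer defer
			m.probeTarget(it)
		}()
	}
}

func main() {
	m := &manager{items: []string{"sub-1", "sub-2"}}
	m.refresh()
}

The trade-off is that the shared state can change while the lock is dropped, so this idiom needs care whenever the iterated collection can be mutated concurrently.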